]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / dataset / src / main / java / org / apache / arrow / dataset / jni / NativeScanner.java
diff --git a/ceph/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/ceph/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
new file mode 100644 (file)
index 0000000..24c2980
--- /dev/null
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.BufferLedger;
+import org.apache.arrow.memory.NativeUnderlyingMemory;
+import org.apache.arrow.memory.util.LargeMemoryUtil;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.SchemaUtility;
+
+/**
+ * Native implementation of {@link Scanner}. Note that it currently emits only a single scan task of type
+ * {@link NativeScanTask}, which is internally a combination of all scan task instances returned by the
+ * native scanner.
+ */
+public class NativeScanner implements Scanner {
+
+  private final AtomicBoolean executed = new AtomicBoolean(false);
+  private final NativeContext context;
+  private final long scannerId;
+
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Lock writeLock = lock.writeLock();
+  private final Lock readLock = lock.readLock();
+  private boolean closed = false;
+
+  public NativeScanner(NativeContext context, long scannerId) {
+    this.context = context;
+    this.scannerId = scannerId;
+  }
+
+  ScanTask.BatchIterator execute() {
+    if (closed) {
+      throw new NativeInstanceReleasedException();
+    }
+    if (!executed.compareAndSet(false, true)) {
+      throw new UnsupportedOperationException("NativeScanner cannot be executed more than once. Consider creating " +
+          "new scanner instead");
+    }
+    return new ScanTask.BatchIterator() {
+      private ArrowRecordBatch peek = null;
+
+      @Override
+      public void close() {
+        NativeScanner.this.close();
+      }
+
+      @Override
+      public boolean hasNext() {
+        if (peek != null) {
+          return true;
+        }
+        final NativeRecordBatchHandle handle;
+        readLock.lock();
+        try {
+          if (closed) {
+            throw new NativeInstanceReleasedException();
+          }
+          handle = JniWrapper.get().nextRecordBatch(scannerId);
+        } finally {
+          readLock.unlock();
+        }
+        if (handle == null) {
+          return false;
+        }
+        final ArrayList<ArrowBuf> buffers = new ArrayList<>();
+        for (NativeRecordBatchHandle.Buffer buffer : handle.getBuffers()) {
+          final BufferAllocator allocator = context.getAllocator();
+          final int size = LargeMemoryUtil.checkedCastToInt(buffer.size);
+          final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator,
+              size, buffer.nativeInstanceId, buffer.memoryAddress);
+          BufferLedger ledger = am.associate(allocator);
+          ArrowBuf buf = new ArrowBuf(ledger, null, size, buffer.memoryAddress);
+          buffers.add(buf);
+        }
+
+        try {
+          final int numRows = LargeMemoryUtil.checkedCastToInt(handle.getNumRows());
+          peek = new ArrowRecordBatch(numRows, handle.getFields().stream()
+              .map(field -> new ArrowFieldNode(field.length, field.nullCount))
+              .collect(Collectors.toList()), buffers);
+          return true;
+        } finally {
+          buffers.forEach(buffer -> buffer.getReferenceManager().release());
+        }
+      }
+
+      @Override
+      public ArrowRecordBatch next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+
+        try {
+          return peek;
+        } finally {
+          peek = null;
+        }
+      }
+    };
+  }
+
+  @Override
+  public Iterable<? extends NativeScanTask> scan() {
+    if (closed) {
+      throw new NativeInstanceReleasedException();
+    }
+    return Collections.singletonList(new NativeScanTask(this));
+  }
+
+  @Override
+  public Schema schema() {
+    readLock.lock();
+    try {
+      if (closed) {
+        throw new NativeInstanceReleasedException();
+      }
+      return SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public void close() {
+    writeLock.lock();
+    try {
+      if (closed) {
+        return;
+      }
+      closed = true;
+      JniWrapper.get().closeScanner(scannerId);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}