]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / c / src / main / java / org / apache / arrow / c / ArrayImporter.java
diff --git a/ceph/src/arrow/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java b/ceph/src/arrow/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
new file mode 100644 (file)
index 0000000..e82cef6
--- /dev/null
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import static org.apache.arrow.c.NativeUtil.NULL;
+import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt;
+import static org.apache.arrow.util.Preconditions.checkNotNull;
+import static org.apache.arrow.util.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.TypeLayout;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+
+/**
+ * Importer for {@link ArrowArray}.
+ */
+final class ArrayImporter {
+  private static final int MAX_IMPORT_RECURSION_LEVEL = 64;
+
+  private final BufferAllocator allocator;
+  private final FieldVector vector;
+  private final DictionaryProvider dictionaryProvider;
+
+  private CDataReferenceManager referenceManager;
+  private int recursionLevel;
+
+  ArrayImporter(BufferAllocator allocator, FieldVector vector, DictionaryProvider dictionaryProvider) {
+    this.allocator = allocator;
+    this.vector = vector;
+    this.dictionaryProvider = dictionaryProvider;
+  }
+
+  void importArray(ArrowArray src) {
+    ArrowArray.Snapshot snapshot = src.snapshot();
+    checkState(snapshot.release != NULL, "Cannot import released ArrowArray");
+
+    // Move imported array
+    ArrowArray ownedArray = ArrowArray.allocateNew(allocator);
+    ownedArray.save(snapshot);
+    src.markReleased();
+    src.close();
+
+    recursionLevel = 0;
+
+    // This keeps the array alive as long as there are any buffers that need it
+    referenceManager = new CDataReferenceManager(ownedArray);
+    try {
+      referenceManager.increment();
+      doImport(snapshot);
+    } finally {
+      referenceManager.release();
+    }
+  }
+
+  private void importChild(ArrayImporter parent, ArrowArray src) {
+    ArrowArray.Snapshot snapshot = src.snapshot();
+    checkState(snapshot.release != NULL, "Cannot import released ArrowArray");
+    recursionLevel = parent.recursionLevel + 1;
+    checkState(recursionLevel <= MAX_IMPORT_RECURSION_LEVEL, "Recursion level in ArrowArray struct exceeded");
+    // Child buffers will keep the entire parent import alive.
+    // Perhaps we can move the child structs on import,
+    // but that is another level of complication.
+    referenceManager = parent.referenceManager;
+    doImport(snapshot);
+  }
+
+  private void doImport(ArrowArray.Snapshot snapshot) {
+    // First import children (required for reconstituting parent array data)
+    long[] children = NativeUtil.toJavaArray(snapshot.children, checkedCastToInt(snapshot.n_children));
+    if (children != null && children.length > 0) {
+      List<FieldVector> childVectors = vector.getChildrenFromFields();
+      checkState(children.length == childVectors.size(), "ArrowArray struct has %s children (expected %s)",
+          children.length, childVectors.size());
+      for (int i = 0; i < children.length; i++) {
+        checkState(children[i] != NULL, "ArrowArray struct has NULL child at position %s", i);
+        ArrayImporter childImporter = new ArrayImporter(allocator, childVectors.get(i), dictionaryProvider);
+        childImporter.importChild(this, ArrowArray.wrap(children[i]));
+      }
+    }
+
+    // Handle import of a dictionary encoded vector
+    if (snapshot.dictionary != NULL) {
+      DictionaryEncoding encoding = vector.getField().getDictionary();
+      checkNotNull(encoding, "Missing encoding on import of ArrowArray with dictionary");
+
+      Dictionary dictionary = dictionaryProvider.lookup(encoding.getId());
+      checkNotNull(dictionary, "Dictionary lookup failed on import of ArrowArray with dictionary");
+
+      // reset the dictionary vector to the initial state
+      dictionary.getVector().clear();
+
+      ArrayImporter dictionaryImporter = new ArrayImporter(allocator, dictionary.getVector(), dictionaryProvider);
+      dictionaryImporter.importChild(this, ArrowArray.wrap(snapshot.dictionary));
+    }
+
+    // Import main data
+    ArrowFieldNode fieldNode = new ArrowFieldNode(snapshot.length, snapshot.null_count);
+    List<ArrowBuf> buffers = importBuffers(snapshot);
+    try {
+      vector.loadFieldBuffers(fieldNode, buffers);
+    } catch (RuntimeException e) {
+      throw new IllegalArgumentException(
+          "Could not load buffers for field " + vector.getField() + ". error message: " + e.getMessage(), e);
+    }
+  }
+
+  private List<ArrowBuf> importBuffers(ArrowArray.Snapshot snapshot) {
+    long[] buffers = NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers));
+    if (buffers == null || buffers.length == 0) {
+      return new ArrayList<>();
+    }
+
+    int buffersCount = TypeLayout.getTypeBufferCount(vector.getField().getType());
+    checkState(buffers.length == buffersCount, "Expected %s buffers for imported type %s, ArrowArray struct has %s",
+        buffersCount, vector.getField().getType().getTypeID(), buffers.length);
+
+    List<ArrowBuf> result = new ArrayList<>(buffersCount);
+    for (long bufferPtr : buffers) {
+      ArrowBuf buffer = null;
+      if (bufferPtr != NULL) {
+        // TODO(roee88): an API for getting the size for each buffer is not yet
+        // available
+        buffer = new ArrowBuf(referenceManager, null, Integer.MAX_VALUE, bufferPtr);
+      }
+      result.add(buffer);
+    }
+    return result;
+  }
+}