]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / vector / src / main / java / org / apache / arrow / vector / complex / AbstractStructVector.java
diff --git a/ceph/src/arrow/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java b/ceph/src/arrow/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java
new file mode 100644 (file)
index 0000000..be6d992
--- /dev/null
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.complex;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.BitVectorHelper;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.util.PromotableMultiMapWithOrdinal;
+import org.apache.arrow.vector.util.ValueVectorUtility;
+
+/**
+ * Base class for StructVectors. Currently used by NonNullableStructVector
+ */
+public abstract class AbstractStructVector extends AbstractContainerVector {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);
+  private static final String STRUCT_CONFLICT_POLICY_ENV = "ARROW_STRUCT_CONFLICT_POLICY";
+  private static final String STRUCT_CONFLICT_POLICY_JVM = "arrow.struct.conflict.policy";
+  private static final ConflictPolicy DEFAULT_CONFLICT_POLICY;
+  // Maintains a map with key as field name and value is the vector itself
+  private final PromotableMultiMapWithOrdinal<String, FieldVector> vectors;
+  protected final boolean allowConflictPolicyChanges;
+  private ConflictPolicy conflictPolicy;
+
+
+  static {
+    String conflictPolicyStr = System.getProperty(STRUCT_CONFLICT_POLICY_JVM,
+        ConflictPolicy.CONFLICT_REPLACE.toString());
+    if (conflictPolicyStr == null) {
+      conflictPolicyStr = System.getenv(STRUCT_CONFLICT_POLICY_ENV);
+    }
+    ConflictPolicy conflictPolicy;
+    try {
+      conflictPolicy = ConflictPolicy.valueOf(conflictPolicyStr.toUpperCase());
+    } catch (Exception e) {
+      conflictPolicy = ConflictPolicy.CONFLICT_REPLACE;
+    }
+    DEFAULT_CONFLICT_POLICY = conflictPolicy;
+  }
+
+  /**
+   * Policy to determine how to react when duplicate columns are encountered.
+   */
+  public enum ConflictPolicy {
+    // Ignore the conflict and append the field. This is the default behaviour
+    CONFLICT_APPEND,
+    // Keep the existing field and ignore the newer one.
+    CONFLICT_IGNORE,
+    // Replace the existing field with the newer one.
+    CONFLICT_REPLACE,
+    // Refuse the new field and error out.
+    CONFLICT_ERROR
+  }
+
+  /**
+   *  Base coonstructor that sets default conflict policy to APPEND.
+   */
+  protected AbstractStructVector(String name,
+                                 BufferAllocator allocator,
+                                 CallBack callBack,
+                                 ConflictPolicy conflictPolicy,
+                                 boolean allowConflictPolicyChanges) {
+    super(name, allocator, callBack);
+    this.conflictPolicy = conflictPolicy == null ? DEFAULT_CONFLICT_POLICY : conflictPolicy;
+    this.vectors = new PromotableMultiMapWithOrdinal<>(allowConflictPolicyChanges, this.conflictPolicy);
+    this.allowConflictPolicyChanges = allowConflictPolicyChanges;
+  }
+
+  /**
+   * Set conflict policy and return last conflict policy state.
+   */
+  public ConflictPolicy setConflictPolicy(ConflictPolicy conflictPolicy) {
+    ConflictPolicy tmp = this.conflictPolicy;
+    this.conflictPolicy = conflictPolicy;
+    this.vectors.setConflictPolicy(conflictPolicy);
+    return tmp;
+  }
+
+  public ConflictPolicy getConflictPolicy() {
+    return conflictPolicy;
+  }
+
+  @Override
+  public void close() {
+    for (final ValueVector valueVector : vectors.values()) {
+      valueVector.close();
+    }
+    vectors.clear();
+
+    super.close();
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+    /* boolean to keep track if all the memory allocation were successful
+     * Used in the case of composite vectors when we need to allocate multiple
+     * buffers for multiple vectors. If one of the allocations failed we need to
+     * clear all the memory that we allocated
+     */
+    boolean success = false;
+    try {
+      for (final ValueVector v : vectors.values()) {
+        if (!v.allocateNewSafe()) {
+          return false;
+        }
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        clear();
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void reAlloc() {
+    for (final ValueVector v : vectors.values()) {
+      v.reAlloc();
+    }
+  }
+
+  /**
+   * Adds a new field with the given parameters or replaces the existing one and consequently returns the resultant
+   * {@link org.apache.arrow.vector.ValueVector}.
+   *
+   * <p>Execution takes place in the following order:
+   * <ul>
+   * <li>
+   * if field is new, create and insert a new vector of desired type.
+   * </li>
+   * <li>
+   * if field exists and existing vector is of desired vector type, return the vector.
+   * </li>
+   * <li>
+   * if field exists and null filled, clear the existing vector; create and insert a new vector of desired type.
+   * </li>
+   * <li>
+   * otherwise, throw an {@link java.lang.IllegalStateException}
+   * </li>
+   * </ul>
+   *
+   * @param childName the name of the field
+   * @param fieldType the type for the vector
+   * @param clazz     class of expected vector type
+   * @param <T>       class type of expected vector type
+   * @return resultant {@link org.apache.arrow.vector.ValueVector}
+   * @throws java.lang.IllegalStateException raised if there is a hard schema change
+   */
+  public <T extends FieldVector> T addOrGet(String childName, FieldType fieldType, Class<T> clazz) {
+    final ValueVector existing = getChild(childName);
+    boolean create = false;
+    if (existing == null) {
+      create = true;
+    } else if (clazz.isAssignableFrom(existing.getClass())) {
+      return clazz.cast(existing);
+    } else if (nullFilled(existing)) {
+      existing.clear();
+      create = true;
+    }
+    if (create) {
+      final T vector = clazz.cast(fieldType.createNewSingleVector(childName, allocator, callBack));
+      putChild(childName, vector);
+      if (callBack != null) {
+        callBack.doWork();
+      }
+      return vector;
+    }
+    final String message = "Arrow does not support schema change yet. Existing[%s] and desired[%s] vector types " +
+        "mismatch";
+    throw new IllegalStateException(String.format(message, existing.getClass().getSimpleName(), clazz.getSimpleName()));
+  }
+
+  private boolean nullFilled(ValueVector vector) {
+    return BitVectorHelper.checkAllBitsEqualTo(vector.getValidityBuffer(), vector.getValueCount(), false);
+  }
+
+  /**
+   * Returns a {@link org.apache.arrow.vector.ValueVector} corresponding to the given ordinal identifier.
+   *
+   * @param id the ordinal of the child to return
+   * @return the corresponding child
+   */
+  public ValueVector getChildByOrdinal(int id) {
+    return vectors.getByOrdinal(id);
+  }
+
+  /**
+   * Returns a {@link org.apache.arrow.vector.ValueVector} instance of subtype of T corresponding to the given
+   * field name if exists or null.
+   *
+   * If there is more than one element for name this will return the first inserted.
+   *
+   * @param name  the name of the child to return
+   * @param clazz the expected type of the child
+   * @return the child corresponding to this name
+   */
+  @Override
+  public <T extends FieldVector> T getChild(String name, Class<T> clazz) {
+    final FieldVector f = vectors.get(name);
+    if (f == null) {
+      return null;
+    }
+    return typeify(f, clazz);
+  }
+
+  protected ValueVector add(String childName, FieldType fieldType) {
+    FieldVector vector = fieldType.createNewSingleVector(childName, allocator, callBack);
+    putChild(childName, vector);
+    if (callBack != null) {
+      callBack.doWork();
+    }
+    return vector;
+  }
+
+  /**
+   * Inserts the vector with the given name if it does not exist else replaces it with the new value.
+   *
+   * <p>Note that this method does not enforce any vector type check nor throws a schema change exception.
+   *
+   * @param name   the name of the child to add
+   * @param vector the vector to add as a child
+   */
+  protected void putChild(String name, FieldVector vector) {
+    putVector(name, vector);
+  }
+
+  private void put(String name, FieldVector vector, boolean overwrite) {
+    final boolean old = vectors.put(
+        Preconditions.checkNotNull(name, "field name cannot be null"),
+        Preconditions.checkNotNull(vector, "vector cannot be null"),
+        overwrite
+    );
+    if (old) {
+      logger.debug("Field [{}] mutated to [{}] ", name,
+          vector.getClass().getSimpleName());
+    }
+  }
+
+  /**
+   * Inserts the input vector into the map if it does not exist.
+   *
+   * <p>
+   * If the field name already exists the conflict is handled according to the currently set ConflictPolicy
+   * </p>
+   *
+   * @param name   field name
+   * @param vector vector to be inserted
+   */
+  protected void putVector(String name, FieldVector vector) {
+    switch (conflictPolicy) {
+      case CONFLICT_APPEND:
+        put(name, vector, false);
+        break;
+      case CONFLICT_IGNORE:
+        if (!vectors.containsKey(name)) {
+          put(name, vector, false);
+        }
+        break;
+      case CONFLICT_REPLACE:
+        if (vectors.containsKey(name)) {
+          vectors.removeAll(name);
+        }
+        put(name, vector, true);
+        break;
+      case CONFLICT_ERROR:
+        if (vectors.containsKey(name)) {
+          throw new IllegalStateException(String.format("Vector already exists: Existing[%s], Requested[%s] ",
+            vector.getClass().getSimpleName(), vector.getField().getFieldType()));
+        }
+        put(name, vector, false);
+        break;
+      default:
+        throw new IllegalStateException(String.format("%s type not a valid conflict state", conflictPolicy));
+    }
+
+  }
+
+  /**
+   * Get child vectors.
+   * @return a sequence of underlying child vectors.
+   */
+  protected List<FieldVector> getChildren() {
+    int size = vectors.size();
+    List<FieldVector> children = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      children.add(vectors.getByOrdinal(i));
+    }
+    return children;
+  }
+
+  /**
+   * Get child field names.
+   */
+  public List<String> getChildFieldNames() {
+    return getChildren().stream()
+        .map(child -> child.getField().getName())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the number of child vectors.
+   * @return the number of underlying child vectors.
+   */
+  @Override
+  public int size() {
+    return vectors.size();
+  }
+
+  @Override
+  public Iterator<ValueVector> iterator() {
+    return Collections.<ValueVector>unmodifiableCollection(vectors.values()).iterator();
+  }
+
+  /**
+   * Get primitive child vectors.
+   * @return a list of scalar child vectors recursing the entire vector hierarchy.
+   */
+  public List<ValueVector> getPrimitiveVectors() {
+    final List<ValueVector> primitiveVectors = new ArrayList<>();
+    for (final FieldVector v : vectors.values()) {
+      primitiveVectors.addAll(getPrimitiveVectors(v));
+    }
+    return primitiveVectors;
+  }
+
+  private List<ValueVector> getPrimitiveVectors(FieldVector v) {
+    final List<ValueVector> primitives = new ArrayList<>();
+    if (v instanceof AbstractStructVector) {
+      AbstractStructVector structVector = (AbstractStructVector) v;
+      primitives.addAll(structVector.getPrimitiveVectors());
+    } else if (v instanceof ListVector) {
+      ListVector listVector = (ListVector) v;
+      primitives.addAll(getPrimitiveVectors(listVector.getDataVector()));
+    } else if (v instanceof FixedSizeListVector) {
+      FixedSizeListVector listVector = (FixedSizeListVector) v;
+      primitives.addAll(getPrimitiveVectors(listVector.getDataVector()));
+    } else if (v instanceof UnionVector) {
+      UnionVector unionVector = (UnionVector) v;
+      for (final FieldVector vector : unionVector.getChildrenFromFields()) {
+        primitives.addAll(getPrimitiveVectors(vector));
+      }
+    } else {
+      primitives.add(v);
+    }
+    return primitives;
+  }
+
+  /**
+   * Get a child vector by name. If duplicate names this returns the first inserted.
+   * @param name the name of the child to return
+   * @return a vector with its corresponding ordinal mapping if field exists or null.
+   */
+  @Override
+  public VectorWithOrdinal getChildVectorWithOrdinal(String name) {
+    final int ordinal = vectors.getOrdinal(name);
+    if (ordinal < 0) {
+      return null;
+    }
+    final ValueVector vector = vectors.getByOrdinal(ordinal);
+    return new VectorWithOrdinal(vector, ordinal);
+  }
+
+  @Override
+  public ArrowBuf[] getBuffers(boolean clear) {
+    final List<ArrowBuf> buffers = new ArrayList<>();
+
+    for (final ValueVector vector : vectors.values()) {
+      for (final ArrowBuf buf : vector.getBuffers(false)) {
+        buffers.add(buf);
+        if (clear) {
+          buf.getReferenceManager().retain(1);
+        }
+      }
+      if (clear) {
+        vector.clear();
+      }
+    }
+
+    return buffers.toArray(new ArrowBuf[buffers.size()]);
+  }
+
+  @Override
+  public int getBufferSize() {
+    int actualBufSize = 0;
+
+    for (final ValueVector v : vectors.values()) {
+      for (final ArrowBuf buf : v.getBuffers(false)) {
+        actualBufSize += buf.writerIndex();
+      }
+    }
+    return actualBufSize;
+  }
+
+  @Override
+  public String toString() {
+    return ValueVectorUtility.getToString(this, 0 , getValueCount());
+  }
+
+}