]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/arrow/compute/exec/key_encode.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / compute / exec / key_encode.h
diff --git a/ceph/src/arrow/cpp/src/arrow/compute/exec/key_encode.h b/ceph/src/arrow/cpp/src/arrow/compute/exec/key_encode.h
new file mode 100644 (file)
index 0000000..69f4a16
--- /dev/null
@@ -0,0 +1,567 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/compute/exec/util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+class KeyColumnMetadata;
+
+/// Converts between key representation as a collection of arrays for
+/// individual columns and another representation as a single array of rows
+/// combining data from all columns into one value.
+/// This conversion is reversible.
+/// Row-oriented storage is beneficial when there is a need for random access
+/// of individual rows and at the same time all included columns are likely to
+/// be accessed together, as in the case of hash table key.
+class KeyEncoder {
+ public:
+  struct KeyEncoderContext {
+    bool has_avx2() const {
+      return (hardware_flags & arrow::internal::CpuInfo::AVX2) > 0;
+    }
+    int64_t hardware_flags;
+    util::TempVectorStack* stack;
+  };
+
+  /// Description of a storage format of a single key column as needed
+  /// for the purpose of row encoding.
+  struct KeyColumnMetadata {
+    KeyColumnMetadata() = default;
+    KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in)
+        : is_fixed_length(is_fixed_length_in), fixed_length(fixed_length_in) {}
+    /// Is column storing a varying-length binary, using offsets array
+    /// to find a beginning of a value, or is it a fixed-length binary.
+    bool is_fixed_length;
+    /// For a fixed-length binary column: number of bytes per value.
+    /// Zero has a special meaning, indicating a bit vector with one bit per value.
+    /// For a varying-length binary column: number of bytes per offset.
+    uint32_t fixed_length;
+  };
+
+  /// Description of a storage format for rows produced by encoder.
+  struct KeyRowMetadata {
+    /// Is row a varying-length binary, using offsets array to find a beginning of a row,
+    /// or is it a fixed-length binary.
+    bool is_fixed_length;
+
+    /// For a fixed-length binary row, common size of rows in bytes,
+    /// rounded up to the multiple of alignment.
+    ///
+    /// For a varying-length binary, size of all encoded fixed-length key columns,
+    /// including lengths of varying-length columns, rounded up to the multiple of string
+    /// alignment.
+    uint32_t fixed_length;
+
+    /// Offset within a row to the array of 32-bit offsets within a row of
+    /// ends of varbinary fields.
+    /// Used only when the row is not fixed-length, zero for fixed-length row.
+    /// There are N elements for N varbinary fields.
+    /// Each element is the offset within a row of the first byte after
+    /// the corresponding varbinary field bytes in that row.
+    /// If varbinary fields begin at aligned addresses, than the end of the previous
+    /// varbinary field needs to be rounded up according to the specified alignment
+    /// to obtain the beginning of the next varbinary field.
+    /// The first varbinary field starts at offset specified by fixed_length,
+    /// which should already be aligned.
+    uint32_t varbinary_end_array_offset;
+
+    /// Fixed number of bytes per row that are used to encode null masks.
+    /// Null masks indicate for a single row which of its key columns are null.
+    /// Nth bit in the sequence of bytes assigned to a row represents null
+    /// information for Nth field according to the order in which they are encoded.
+    int null_masks_bytes_per_row;
+
+    /// Power of 2. Every row will start at the offset aligned to that number of bytes.
+    int row_alignment;
+
+    /// Power of 2. Must be no greater than row alignment.
+    /// Every non-power-of-2 binary field and every varbinary field bytes
+    /// will start aligned to that number of bytes.
+    int string_alignment;
+
+    /// Metadata of encoded columns in their original order.
+    std::vector<KeyColumnMetadata> column_metadatas;
+
+    /// Order in which fields are encoded.
+    std::vector<uint32_t> column_order;
+
+    /// Offsets within a row to fields in their encoding order.
+    std::vector<uint32_t> column_offsets;
+
+    /// Rounding up offset to the nearest multiple of alignment value.
+    /// Alignment must be a power of 2.
+    static inline uint32_t padding_for_alignment(uint32_t offset,
+                                                 int required_alignment) {
+      ARROW_DCHECK(ARROW_POPCOUNT64(required_alignment) == 1);
+      return static_cast<uint32_t>((-static_cast<int32_t>(offset)) &
+                                   (required_alignment - 1));
+    }
+
+    /// Rounding up offset to the beginning of next column,
+    /// chosing required alignment based on the data type of that column.
+    static inline uint32_t padding_for_alignment(uint32_t offset, int string_alignment,
+                                                 const KeyColumnMetadata& col_metadata) {
+      if (!col_metadata.is_fixed_length ||
+          ARROW_POPCOUNT64(col_metadata.fixed_length) <= 1) {
+        return 0;
+      } else {
+        return padding_for_alignment(offset, string_alignment);
+      }
+    }
+
+    /// Returns an array of offsets within a row of ends of varbinary fields.
+    inline const uint32_t* varbinary_end_array(const uint8_t* row) const {
+      ARROW_DCHECK(!is_fixed_length);
+      return reinterpret_cast<const uint32_t*>(row + varbinary_end_array_offset);
+    }
+    inline uint32_t* varbinary_end_array(uint8_t* row) const {
+      ARROW_DCHECK(!is_fixed_length);
+      return reinterpret_cast<uint32_t*>(row + varbinary_end_array_offset);
+    }
+
+    /// Returns the offset within the row and length of the first varbinary field.
+    inline void first_varbinary_offset_and_length(const uint8_t* row, uint32_t* offset,
+                                                  uint32_t* length) const {
+      ARROW_DCHECK(!is_fixed_length);
+      *offset = fixed_length;
+      *length = varbinary_end_array(row)[0] - fixed_length;
+    }
+
+    /// Returns the offset within the row and length of the second and further varbinary
+    /// fields.
+    inline void nth_varbinary_offset_and_length(const uint8_t* row, int varbinary_id,
+                                                uint32_t* out_offset,
+                                                uint32_t* out_length) const {
+      ARROW_DCHECK(!is_fixed_length);
+      ARROW_DCHECK(varbinary_id > 0);
+      const uint32_t* varbinary_end = varbinary_end_array(row);
+      uint32_t offset = varbinary_end[varbinary_id - 1];
+      offset += padding_for_alignment(offset, string_alignment);
+      *out_offset = offset;
+      *out_length = varbinary_end[varbinary_id] - offset;
+    }
+
+    uint32_t encoded_field_order(uint32_t icol) const { return column_order[icol]; }
+
+    uint32_t encoded_field_offset(uint32_t icol) const { return column_offsets[icol]; }
+
+    uint32_t num_cols() const { return static_cast<uint32_t>(column_metadatas.size()); }
+
+    uint32_t num_varbinary_cols() const;
+
+    void FromColumnMetadataVector(const std::vector<KeyColumnMetadata>& cols,
+                                  int in_row_alignment, int in_string_alignment);
+
+    bool is_compatible(const KeyRowMetadata& other) const;
+  };
+
+  class KeyRowArray {
+   public:
+    KeyRowArray();
+    Status Init(MemoryPool* pool, const KeyRowMetadata& metadata);
+    void Clean();
+    Status AppendEmpty(uint32_t num_rows_to_append, uint32_t num_extra_bytes_to_append);
+    Status AppendSelectionFrom(const KeyRowArray& from, uint32_t num_rows_to_append,
+                               const uint16_t* source_row_ids);
+    const KeyRowMetadata& metadata() const { return metadata_; }
+    int64_t length() const { return num_rows_; }
+    const uint8_t* data(int i) const {
+      ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+      return buffers_[i];
+    }
+    uint8_t* mutable_data(int i) {
+      ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+      return mutable_buffers_[i];
+    }
+    const uint32_t* offsets() const { return reinterpret_cast<const uint32_t*>(data(1)); }
+    uint32_t* mutable_offsets() { return reinterpret_cast<uint32_t*>(mutable_data(1)); }
+    const uint8_t* null_masks() const { return null_masks_->data(); }
+    uint8_t* null_masks() { return null_masks_->mutable_data(); }
+
+    bool has_any_nulls(const KeyEncoderContext* ctx) const;
+
+   private:
+    Status ResizeFixedLengthBuffers(int64_t num_extra_rows);
+    Status ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes);
+
+    int64_t size_null_masks(int64_t num_rows);
+    int64_t size_offsets(int64_t num_rows);
+    int64_t size_rows_fixed_length(int64_t num_rows);
+    int64_t size_rows_varying_length(int64_t num_bytes);
+    void update_buffer_pointers();
+
+    static constexpr int64_t padding_for_vectors = 64;
+    MemoryPool* pool_;
+    KeyRowMetadata metadata_;
+    /// Buffers can only expand during lifetime and never shrink.
+    std::unique_ptr<ResizableBuffer> null_masks_;
+    std::unique_ptr<ResizableBuffer> offsets_;
+    std::unique_ptr<ResizableBuffer> rows_;
+    static constexpr int max_buffers_ = 3;
+    const uint8_t* buffers_[max_buffers_];
+    uint8_t* mutable_buffers_[max_buffers_];
+    int64_t num_rows_;
+    int64_t rows_capacity_;
+    int64_t bytes_capacity_;
+
+    // Mutable to allow lazy evaluation
+    mutable int64_t num_rows_for_has_any_nulls_;
+    mutable bool has_any_nulls_;
+  };
+
+  /// A lightweight description of an array representing one of key columns.
+  class KeyColumnArray {
+   public:
+    KeyColumnArray() = default;
+    /// Create as a mix of buffers according to the mask from two descriptions
+    /// (Nth bit is set to 0 if Nth buffer from the first input
+    /// should be used and is set to 1 otherwise).
+    /// Metadata is inherited from the first input.
+    KeyColumnArray(const KeyColumnMetadata& metadata, const KeyColumnArray& left,
+                   const KeyColumnArray& right, int buffer_id_to_replace);
+    /// Create for reading
+    KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                   const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                   int bit_offset0 = 0, int bit_offset1 = 0);
+    /// Create for writing
+    KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                   uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                   int bit_offset1 = 0);
+    /// Create as a window view of original description that is offset
+    /// by a given number of rows.
+    /// The number of rows used in offset must be divisible by 8
+    /// in order to not split bit vectors within a single byte.
+    KeyColumnArray(const KeyColumnArray& from, int64_t start, int64_t length);
+    uint8_t* mutable_data(int i) {
+      ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+      return mutable_buffers_[i];
+    }
+    const uint8_t* data(int i) const {
+      ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+      return buffers_[i];
+    }
+    uint32_t* mutable_offsets() { return reinterpret_cast<uint32_t*>(mutable_data(1)); }
+    const uint32_t* offsets() const { return reinterpret_cast<const uint32_t*>(data(1)); }
+    const KeyColumnMetadata& metadata() const { return metadata_; }
+    int64_t length() const { return length_; }
+    int bit_offset(int i) const {
+      ARROW_DCHECK(i >= 0 && i < max_buffers_);
+      return bit_offset_[i];
+    }
+
+   private:
+    static constexpr int max_buffers_ = 3;
+    const uint8_t* buffers_[max_buffers_];
+    uint8_t* mutable_buffers_[max_buffers_];
+    KeyColumnMetadata metadata_;
+    int64_t length_;
+    // Starting bit offset within the first byte (between 0 and 7)
+    // to be used when accessing buffers that store bit vectors.
+    int bit_offset_[max_buffers_ - 1];
+  };
+
+  void Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext* ctx,
+            int row_alignment, int string_alignment);
+
+  const KeyRowMetadata& row_metadata() { return row_metadata_; }
+
+  void PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
+                             const std::vector<KeyColumnArray>& cols);
+  Status EncodeSelected(KeyRowArray* rows, uint32_t num_selected,
+                        const uint16_t* selection);
+
+  /// Decode a window of row oriented data into a corresponding
+  /// window of column oriented storage.
+  /// The output buffers need to be correctly allocated and sized before
+  /// calling each method.
+  /// For that reason decoding is split into two functions.
+  /// The output of the first one, that processes everything except for
+  /// varying length buffers, can be used to find out required varying
+  /// length buffers sizes.
+  void DecodeFixedLengthBuffers(int64_t start_row_input, int64_t start_row_output,
+                                int64_t num_rows, const KeyRowArray& rows,
+                                std::vector<KeyColumnArray>* cols);
+
+  void DecodeVaryingLengthBuffers(int64_t start_row_input, int64_t start_row_output,
+                                  int64_t num_rows, const KeyRowArray& rows,
+                                  std::vector<KeyColumnArray>* cols);
+
+  const std::vector<KeyColumnArray>& GetBatchColumns() const { return batch_all_cols_; }
+
+ private:
+  /// Prepare column array vectors.
+  /// Output column arrays represent a range of input column arrays
+  /// specified by starting row and number of rows.
+  /// Three vectors are generated:
+  /// - all columns
+  /// - fixed-length columns only
+  /// - varying-length columns only
+  void PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
+                              const std::vector<KeyColumnArray>& cols_in);
+
+  class TransformBoolean {
+   public:
+    static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
+                                       const KeyColumnArray& temp);
+    static void PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
+                           KeyEncoderContext* ctx);
+  };
+
+  class EncoderInteger {
+   public:
+    static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+                       const KeyRowArray& rows, KeyColumnArray* col,
+                       KeyEncoderContext* ctx, KeyColumnArray* temp);
+    static bool UsesTransform(const KeyColumnArray& column);
+    static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
+                                       const KeyColumnArray& temp);
+    static void PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
+                           KeyEncoderContext* ctx);
+
+   private:
+    static bool IsBoolean(const KeyColumnMetadata& metadata);
+  };
+
+  class EncoderBinary {
+   public:
+    static void EncodeSelected(uint32_t offset_within_row, KeyRowArray* rows,
+                               const KeyColumnArray& col, uint32_t num_selected,
+                               const uint16_t* selection);
+    static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+                       const KeyRowArray& rows, KeyColumnArray* col,
+                       KeyEncoderContext* ctx, KeyColumnArray* temp);
+    static bool IsInteger(const KeyColumnMetadata& metadata);
+
+   private:
+    template <class COPY_FN, class SET_NULL_FN>
+    static void EncodeSelectedImp(uint32_t offset_within_row, KeyRowArray* rows,
+                                  const KeyColumnArray& col, uint32_t num_selected,
+                                  const uint16_t* selection, COPY_FN copy_fn,
+                                  SET_NULL_FN set_null_fn);
+
+    template <bool is_row_fixed_length, class COPY_FN>
+    static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
+                                    uint32_t offset_within_row,
+                                    const KeyRowArray* rows_const,
+                                    KeyRowArray* rows_mutable_maybe_null,
+                                    const KeyColumnArray* col_const,
+                                    KeyColumnArray* col_mutable_maybe_null,
+                                    COPY_FN copy_fn);
+    template <bool is_row_fixed_length>
+    static void DecodeImp(uint32_t start_row, uint32_t num_rows,
+                          uint32_t offset_within_row, const KeyRowArray& rows,
+                          KeyColumnArray* col);
+#if defined(ARROW_HAVE_AVX2)
+    static void DecodeHelper_avx2(bool is_row_fixed_length, uint32_t start_row,
+                                  uint32_t num_rows, uint32_t offset_within_row,
+                                  const KeyRowArray& rows, KeyColumnArray* col);
+    template <bool is_row_fixed_length>
+    static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+                               uint32_t offset_within_row, const KeyRowArray& rows,
+                               KeyColumnArray* col);
+#endif
+  };
+
+  class EncoderBinaryPair {
+   public:
+    static bool CanProcessPair(const KeyColumnMetadata& col1,
+                               const KeyColumnMetadata& col2) {
+      return EncoderBinary::IsInteger(col1) && EncoderBinary::IsInteger(col2);
+    }
+    static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+                       const KeyRowArray& rows, KeyColumnArray* col1,
+                       KeyColumnArray* col2, KeyEncoderContext* ctx,
+                       KeyColumnArray* temp1, KeyColumnArray* temp2);
+
+   private:
+    template <bool is_row_fixed_length, typename col1_type, typename col2_type>
+    static void DecodeImp(uint32_t num_rows_to_skip, uint32_t start_row,
+                          uint32_t num_rows, uint32_t offset_within_row,
+                          const KeyRowArray& rows, KeyColumnArray* col1,
+                          KeyColumnArray* col2);
+#if defined(ARROW_HAVE_AVX2)
+    static uint32_t DecodeHelper_avx2(bool is_row_fixed_length, uint32_t col_width,
+                                      uint32_t start_row, uint32_t num_rows,
+                                      uint32_t offset_within_row, const KeyRowArray& rows,
+                                      KeyColumnArray* col1, KeyColumnArray* col2);
+    template <bool is_row_fixed_length, uint32_t col_width>
+    static uint32_t DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+                                   uint32_t offset_within_row, const KeyRowArray& rows,
+                                   KeyColumnArray* col1, KeyColumnArray* col2);
+#endif
+  };
+
+  class EncoderOffsets {
+   public:
+    static void GetRowOffsetsSelected(KeyRowArray* rows,
+                                      const std::vector<KeyColumnArray>& cols,
+                                      uint32_t num_selected, const uint16_t* selection);
+    static void EncodeSelected(KeyRowArray* rows, const std::vector<KeyColumnArray>& cols,
+                               uint32_t num_selected, const uint16_t* selection);
+
+    static void Decode(uint32_t start_row, uint32_t num_rows, const KeyRowArray& rows,
+                       std::vector<KeyColumnArray>* varbinary_cols,
+                       const std::vector<uint32_t>& varbinary_cols_base_offset,
+                       KeyEncoderContext* ctx);
+
+   private:
+    template <bool has_nulls, bool is_first_varbinary>
+    static void EncodeSelectedImp(uint32_t ivarbinary, KeyRowArray* rows,
+                                  const std::vector<KeyColumnArray>& cols,
+                                  uint32_t num_selected, const uint16_t* selection);
+  };
+
+  class EncoderVarBinary {
+   public:
+    static void EncodeSelected(uint32_t ivarbinary, KeyRowArray* rows,
+                               const KeyColumnArray& cols, uint32_t num_selected,
+                               const uint16_t* selection);
+
+    static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t varbinary_col_id,
+                       const KeyRowArray& rows, KeyColumnArray* col,
+                       KeyEncoderContext* ctx);
+
+   private:
+    template <bool first_varbinary_col, class COPY_FN>
+    static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
+                                    uint32_t varbinary_col_id,
+                                    const KeyRowArray* rows_const,
+                                    KeyRowArray* rows_mutable_maybe_null,
+                                    const KeyColumnArray* col_const,
+                                    KeyColumnArray* col_mutable_maybe_null,
+                                    COPY_FN copy_fn);
+    template <bool first_varbinary_col>
+    static void DecodeImp(uint32_t start_row, uint32_t num_rows,
+                          uint32_t varbinary_col_id, const KeyRowArray& rows,
+                          KeyColumnArray* col);
+#if defined(ARROW_HAVE_AVX2)
+    static void DecodeHelper_avx2(uint32_t start_row, uint32_t num_rows,
+                                  uint32_t varbinary_col_id, const KeyRowArray& rows,
+                                  KeyColumnArray* col);
+    template <bool first_varbinary_col>
+    static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+                               uint32_t varbinary_col_id, const KeyRowArray& rows,
+                               KeyColumnArray* col);
+#endif
+  };
+
+  class EncoderNulls {
+   public:
+    static void EncodeSelected(KeyRowArray* rows, const std::vector<KeyColumnArray>& cols,
+                               uint32_t num_selected, const uint16_t* selection);
+
+    static void Decode(uint32_t start_row, uint32_t num_rows, const KeyRowArray& rows,
+                       std::vector<KeyColumnArray>* cols);
+  };
+
+  KeyEncoderContext* ctx_;
+
+  // Data initialized once, based on data types of key columns
+  KeyRowMetadata row_metadata_;
+
+  // Data initialized for each input batch.
+  // All elements are ordered according to the order of encoded fields in a row.
+  std::vector<KeyColumnArray> batch_all_cols_;
+  std::vector<KeyColumnArray> batch_varbinary_cols_;
+  std::vector<uint32_t> batch_varbinary_cols_base_offsets_;
+};
+
+template <bool is_row_fixed_length, class COPY_FN>
+inline void KeyEncoder::EncoderBinary::DecodeHelper(
+    uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+    const KeyRowArray* rows_const, KeyRowArray* rows_mutable_maybe_null,
+    const KeyColumnArray* col_const, KeyColumnArray* col_mutable_maybe_null,
+    COPY_FN copy_fn) {
+  ARROW_DCHECK(col_const && col_const->metadata().is_fixed_length);
+  uint32_t col_width = col_const->metadata().fixed_length;
+
+  if (is_row_fixed_length) {
+    uint32_t row_width = rows_const->metadata().fixed_length;
+    for (uint32_t i = 0; i < num_rows; ++i) {
+      const uint8_t* src;
+      uint8_t* dst;
+      src = rows_const->data(1) + row_width * (start_row + i) + offset_within_row;
+      dst = col_mutable_maybe_null->mutable_data(1) + col_width * i;
+      copy_fn(dst, src, col_width);
+    }
+  } else {
+    const uint32_t* row_offsets = rows_const->offsets();
+    for (uint32_t i = 0; i < num_rows; ++i) {
+      const uint8_t* src;
+      uint8_t* dst;
+      src = rows_const->data(2) + row_offsets[start_row + i] + offset_within_row;
+      dst = col_mutable_maybe_null->mutable_data(1) + col_width * i;
+      copy_fn(dst, src, col_width);
+    }
+  }
+}
+
+template <bool first_varbinary_col, class COPY_FN>
+inline void KeyEncoder::EncoderVarBinary::DecodeHelper(
+    uint32_t start_row, uint32_t num_rows, uint32_t varbinary_col_id,
+    const KeyRowArray* rows_const, KeyRowArray* rows_mutable_maybe_null,
+    const KeyColumnArray* col_const, KeyColumnArray* col_mutable_maybe_null,
+    COPY_FN copy_fn) {
+  // Column and rows need to be varying length
+  ARROW_DCHECK(!rows_const->metadata().is_fixed_length &&
+               !col_const->metadata().is_fixed_length);
+
+  const uint32_t* row_offsets_for_batch = rows_const->offsets() + start_row;
+  const uint32_t* col_offsets = col_const->offsets();
+
+  uint32_t col_offset_next = col_offsets[0];
+  for (uint32_t i = 0; i < num_rows; ++i) {
+    uint32_t col_offset = col_offset_next;
+    col_offset_next = col_offsets[i + 1];
+
+    uint32_t row_offset = row_offsets_for_batch[i];
+    const uint8_t* row = rows_const->data(2) + row_offset;
+
+    uint32_t offset_within_row;
+    uint32_t length;
+    if (first_varbinary_col) {
+      rows_const->metadata().first_varbinary_offset_and_length(row, &offset_within_row,
+                                                               &length);
+    } else {
+      rows_const->metadata().nth_varbinary_offset_and_length(row, varbinary_col_id,
+                                                             &offset_within_row, &length);
+    }
+
+    row_offset += offset_within_row;
+
+    const uint8_t* src;
+    uint8_t* dst;
+    src = rows_const->data(2) + row_offset;
+    dst = col_mutable_maybe_null->mutable_data(2) + col_offset;
+    copy_fn(dst, src, length);
+  }
+}
+
+}  // namespace compute
+}  // namespace arrow