// ceph/src/arrow/cpp/src/arrow/table.cc
// (Apache Arrow vendored into Ceph; imported with quincy 17.2.0)
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/table.h"

#include <algorithm>
#include <cstdlib>
#include <limits>
#include <memory>
#include <sstream>
#include <utility>

#include "arrow/array/array_base.h"
#include "arrow/array/array_binary.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/concatenate.h"
#include "arrow/array/util.h"
#include "arrow/chunked_array.h"
#include "arrow/pretty_print.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/vector.h"

namespace arrow {

using internal::checked_cast;

class KeyValueMetadata;
class MemoryPool;
struct ArrayData;

// ----------------------------------------------------------------------
// Table methods

/// \class SimpleTable
/// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch
class SimpleTable : public Table {
 public:
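  // If num_rows is negative, the row count is inferred from the first
  // column (a table with no columns has zero rows).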
  SimpleTable(std::shared_ptr<Schema> schema,
              std::vector<std::shared_ptr<ChunkedArray>> columns, int64_t num_rows = -1)
      : columns_(std::move(columns)) {
    schema_ = std::move(schema);
    if (num_rows < 0) {
      if (columns_.size() == 0) {
        num_rows_ = 0;
      } else {
        num_rows_ = columns_[0]->length();
      }
    } else {
      num_rows_ = num_rows;
    }
  }

  SimpleTable(std::shared_ptr<Schema> schema,
              const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows = -1) {
    schema_ = std::move(schema);
    if (num_rows < 0) {
      if (columns.size() == 0) {
        num_rows_ = 0;
      } else {
        num_rows_ = columns[0]->length();
      }
    } else {
      num_rows_ = num_rows;
    }

    columns_.resize(columns.size());
    for (size_t i = 0; i < columns.size(); ++i) {
      columns_[i] = std::make_shared<ChunkedArray>(columns[i]);
    }
  }

  std::shared_ptr<ChunkedArray> column(int i) const override { return columns_[i]; }

  const std::vector<std::shared_ptr<ChunkedArray>>& columns() const override {
    return columns_;
  }

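  // Slicing is zero-copy: each sliced column shares its buffers with the
  // parent. num_rows is taken from the sliced columns, so an offset/length
  // beyond the end of the table is clamped to the data actually available.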
  std::shared_ptr<Table> Slice(int64_t offset, int64_t length) const override {
    auto sliced = columns_;
    int64_t num_rows = length;
    for (auto& column : sliced) {
      column = column->Slice(offset, length);
      num_rows = column->length();
    }
    return Table::Make(schema_, std::move(sliced), num_rows);
  }

  Result<std::shared_ptr<Table>> RemoveColumn(int i) const override {
    ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->RemoveField(i));

    return Table::Make(std::move(new_schema), internal::DeleteVectorElement(columns_, i),
                       this->num_rows());
  }

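  // AddColumn and SetColumn do not mutate this table: they validate the new
  // column's length and type, then return a new Table that shares the
  // untouched columns.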
  Result<std::shared_ptr<Table>> AddColumn(
      int i, std::shared_ptr<Field> field_arg,
      std::shared_ptr<ChunkedArray> col) const override {
    DCHECK(col != nullptr);

    if (col->length() != num_rows_) {
      return Status::Invalid(
          "Added column's length must match table's length. Expected length ", num_rows_,
          " but got length ", col->length());
    }

    if (!field_arg->type()->Equals(col->type())) {
      return Status::Invalid("Field type did not match data type");
    }

    ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->AddField(i, field_arg));
    return Table::Make(std::move(new_schema),
                       internal::AddVectorElement(columns_, i, std::move(col)));
  }

  Result<std::shared_ptr<Table>> SetColumn(
      int i, std::shared_ptr<Field> field_arg,
      std::shared_ptr<ChunkedArray> col) const override {
    DCHECK(col != nullptr);

    if (col->length() != num_rows_) {
      return Status::Invalid(
          "Added column's length must match table's length. Expected length ", num_rows_,
          " but got length ", col->length());
    }

    if (!field_arg->type()->Equals(col->type())) {
      return Status::Invalid("Field type did not match data type");
    }

    ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->SetField(i, field_arg));
    return Table::Make(std::move(new_schema),
                       internal::ReplaceVectorElement(columns_, i, std::move(col)));
  }

  std::shared_ptr<Table> ReplaceSchemaMetadata(
      const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
    auto new_schema = schema_->WithMetadata(metadata);
    return Table::Make(std::move(new_schema), columns_);
  }

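  // Flatten replaces each struct column with one top-level column per child
  // field; non-struct columns pass through unchanged.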
  Result<std::shared_ptr<Table>> Flatten(MemoryPool* pool) const override {
    std::vector<std::shared_ptr<Field>> flattened_fields;
    std::vector<std::shared_ptr<ChunkedArray>> flattened_columns;
    for (int i = 0; i < num_columns(); ++i) {
      std::vector<std::shared_ptr<Field>> new_fields = field(i)->Flatten();
      ARROW_ASSIGN_OR_RAISE(auto new_columns, column(i)->Flatten(pool));
      DCHECK_EQ(new_columns.size(), new_fields.size());
      for (size_t j = 0; j < new_columns.size(); ++j) {
        flattened_fields.push_back(new_fields[j]);
        flattened_columns.push_back(new_columns[j]);
      }
    }
    auto flattened_schema =
        std::make_shared<Schema>(std::move(flattened_fields), schema_->metadata());
    return Table::Make(std::move(flattened_schema), std::move(flattened_columns));
  }

  Status Validate() const override {
    RETURN_NOT_OK(ValidateMeta());
    for (int i = 0; i < num_columns(); ++i) {
      const ChunkedArray* col = columns_[i].get();
      Status st = col->Validate();
      if (!st.ok()) {
        std::stringstream ss;
        ss << "Column " << i << ": " << st.message();
        return st.WithMessage(ss.str());
      }
    }
    return Status::OK();
  }

  Status ValidateFull() const override {
    RETURN_NOT_OK(ValidateMeta());
    for (int i = 0; i < num_columns(); ++i) {
      const ChunkedArray* col = columns_[i].get();
      Status st = col->ValidateFull();
      if (!st.ok()) {
        std::stringstream ss;
        ss << "Column " << i << ": " << st.message();
        return st.WithMessage(ss.str());
      }
    }
    return Status::OK();
  }

 protected:
  Status ValidateMeta() const {
    // Make sure columns and schema are consistent
    if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
      return Status::Invalid("Number of columns did not match schema");
    }
    for (int i = 0; i < num_columns(); ++i) {
      const ChunkedArray* col = columns_[i].get();
      if (col == nullptr) {
        return Status::Invalid("Column ", i, " was null");
      }
      if (!col->type()->Equals(*schema_->field(i)->type())) {
        return Status::Invalid("Column data for field ", i, " with type ",
                               col->type()->ToString(), " is inconsistent with schema ",
                               schema_->field(i)->type()->ToString());
      }
    }

    // Make sure columns are all the same length, and validate them
    for (int i = 0; i < num_columns(); ++i) {
      const ChunkedArray* col = columns_[i].get();
      if (col->length() != num_rows_) {
        return Status::Invalid("Column ", i, " named ", field(i)->name(),
                               " expected length ", num_rows_, " but got length ",
                               col->length());
      }
      Status st = col->Validate();
      if (!st.ok()) {
        std::stringstream ss;
        ss << "Column " << i << ": " << st.message();
        return st.WithMessage(ss.str());
      }
    }
    return Status::OK();
  }

 private:
  std::vector<std::shared_ptr<ChunkedArray>> columns_;
};

Table::Table() : num_rows_(0) {}

std::vector<std::shared_ptr<Field>> Table::fields() const {
  std::vector<std::shared_ptr<Field>> result;
  for (int i = 0; i < this->num_columns(); ++i) {
    result.emplace_back(this->field(i));
  }
  return result;
}

std::shared_ptr<Table> Table::Make(std::shared_ptr<Schema> schema,
                                   std::vector<std::shared_ptr<ChunkedArray>> columns,
                                   int64_t num_rows) {
  return std::make_shared<SimpleTable>(std::move(schema), std::move(columns), num_rows);
}

std::shared_ptr<Table> Table::Make(std::shared_ptr<Schema> schema,
                                   const std::vector<std::shared_ptr<Array>>& arrays,
                                   int64_t num_rows) {
  return std::make_shared<SimpleTable>(std::move(schema), arrays, num_rows);
}

Result<std::shared_ptr<Table>> Table::FromRecordBatchReader(RecordBatchReader* reader) {
  std::shared_ptr<Table> table = nullptr;
  RETURN_NOT_OK(reader->ReadAll(&table));
  return table;
}

Result<std::shared_ptr<Table>> Table::FromRecordBatches(
    std::shared_ptr<Schema> schema,
    const std::vector<std::shared_ptr<RecordBatch>>& batches) {
  const int nbatches = static_cast<int>(batches.size());
  const int ncolumns = static_cast<int>(schema->num_fields());

  int64_t num_rows = 0;
  for (int i = 0; i < nbatches; ++i) {
    if (!batches[i]->schema()->Equals(*schema, false)) {
      return Status::Invalid("Schema at index ", static_cast<int>(i),
                             " was different: \n", schema->ToString(), "\nvs\n",
                             batches[i]->schema()->ToString());
    }
    num_rows += batches[i]->num_rows();
  }

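  // Transpose the batches into columns: column i of the table is a
  // ChunkedArray whose j-th chunk is column i of batch j.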
  std::vector<std::shared_ptr<ChunkedArray>> columns(ncolumns);
  std::vector<std::shared_ptr<Array>> column_arrays(nbatches);

  for (int i = 0; i < ncolumns; ++i) {
    for (int j = 0; j < nbatches; ++j) {
      column_arrays[j] = batches[j]->column(i);
    }
    columns[i] = std::make_shared<ChunkedArray>(column_arrays, schema->field(i)->type());
  }

  return Table::Make(std::move(schema), std::move(columns), num_rows);
}

Result<std::shared_ptr<Table>> Table::FromRecordBatches(
    const std::vector<std::shared_ptr<RecordBatch>>& batches) {
  if (batches.size() == 0) {
    return Status::Invalid("Must pass at least one record batch or an explicit Schema");
  }

  return FromRecordBatches(batches[0]->schema(), batches);
}

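// Unwrap a chunked struct array into a table: each struct child becomes a
// top-level column, reusing the existing child arrays without copying.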
Result<std::shared_ptr<Table>> Table::FromChunkedStructArray(
    const std::shared_ptr<ChunkedArray>& array) {
  auto type = array->type();
  if (type->id() != Type::STRUCT) {
    return Status::Invalid("Expected a chunked struct array, got ", *type);
  }
  int num_columns = type->num_fields();
  int num_chunks = array->num_chunks();

  const auto& struct_chunks = array->chunks();
  std::vector<std::shared_ptr<ChunkedArray>> columns(num_columns);
  for (int i = 0; i < num_columns; ++i) {
    ArrayVector chunks(num_chunks);
    std::transform(struct_chunks.begin(), struct_chunks.end(), chunks.begin(),
                   [i](const std::shared_ptr<Array>& struct_chunk) {
                     return static_cast<const StructArray&>(*struct_chunk).field(i);
                   });
    columns[i] =
        std::make_shared<ChunkedArray>(std::move(chunks), type->field(i)->type());
  }

  return Table::Make(::arrow::schema(type->fields()), std::move(columns),
                     array->length());
}

std::vector<std::string> Table::ColumnNames() const {
  std::vector<std::string> names(num_columns());
  for (int i = 0; i < num_columns(); ++i) {
    names[i] = field(i)->name();
  }
  return names;
}

Result<std::shared_ptr<Table>> Table::RenameColumns(
    const std::vector<std::string>& names) const {
  if (names.size() != static_cast<size_t>(num_columns())) {
    return Status::Invalid("tried to rename a table of ", num_columns(),
                           " columns but only ", names.size(), " names were provided");
  }
  std::vector<std::shared_ptr<ChunkedArray>> columns(num_columns());
  std::vector<std::shared_ptr<Field>> fields(num_columns());
  for (int i = 0; i < num_columns(); ++i) {
    columns[i] = column(i);
    fields[i] = field(i)->WithName(names[i]);
  }
  return Table::Make(::arrow::schema(std::move(fields)), std::move(columns), num_rows());
}

Result<std::shared_ptr<Table>> Table::SelectColumns(
    const std::vector<int>& indices) const {
  int n = static_cast<int>(indices.size());

  std::vector<std::shared_ptr<ChunkedArray>> columns(n);
  std::vector<std::shared_ptr<Field>> fields(n);
  for (int i = 0; i < n; i++) {
    int pos = indices[i];
    if (pos < 0 || pos > num_columns() - 1) {
      return Status::Invalid("Invalid column index ", pos, " to select columns.");
    }
    columns[i] = column(pos);
    fields[i] = field(pos);
  }

  auto new_schema =
      std::make_shared<arrow::Schema>(std::move(fields), schema()->metadata());
  return Table::Make(std::move(new_schema), std::move(columns), num_rows());
}

std::string Table::ToString() const {
  std::stringstream ss;
  ARROW_CHECK_OK(PrettyPrint(*this, 0, &ss));
  return ss.str();
}

Result<std::shared_ptr<Table>> ConcatenateTables(
    const std::vector<std::shared_ptr<Table>>& tables,
    const ConcatenateTablesOptions options, MemoryPool* memory_pool) {
  if (tables.size() == 0) {
    return Status::Invalid("Must pass at least one table");
  }

  std::vector<std::shared_ptr<Table>> promoted_tables;
  const std::vector<std::shared_ptr<Table>>* tables_to_concat = &tables;
  if (options.unify_schemas) {
    std::vector<std::shared_ptr<Schema>> schemas;
    schemas.reserve(tables.size());
    for (const auto& t : tables) {
      schemas.push_back(t->schema());
    }

    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> unified_schema,
                          UnifySchemas(schemas, options.field_merge_options));

    promoted_tables.reserve(tables.size());
    for (const auto& t : tables) {
      promoted_tables.emplace_back();
      ARROW_ASSIGN_OR_RAISE(promoted_tables.back(),
                            PromoteTableToSchema(t, unified_schema, memory_pool));
    }
    tables_to_concat = &promoted_tables;
  } else {
    auto first_schema = tables[0]->schema();
    for (size_t i = 1; i < tables.size(); ++i) {
      if (!tables[i]->schema()->Equals(*first_schema, false)) {
        return Status::Invalid("Schema at index ", i, " was different: \n",
                               first_schema->ToString(), "\nvs\n",
                               tables[i]->schema()->ToString());
      }
    }
  }

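  // Concatenation itself is zero-copy: each output column just gathers every
  // input table's chunks for that column into one longer ChunkedArray.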
  std::shared_ptr<Schema> schema = tables_to_concat->front()->schema();

  const int ncolumns = schema->num_fields();

  std::vector<std::shared_ptr<ChunkedArray>> columns(ncolumns);
  for (int i = 0; i < ncolumns; ++i) {
    std::vector<std::shared_ptr<Array>> column_arrays;
    for (const auto& table : *tables_to_concat) {
      const std::vector<std::shared_ptr<Array>>& chunks = table->column(i)->chunks();
      for (const auto& chunk : chunks) {
        column_arrays.push_back(chunk);
      }
    }
    columns[i] = std::make_shared<ChunkedArray>(column_arrays, schema->field(i)->type());
  }
  return Table::Make(std::move(schema), std::move(columns));
}

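// Adapt `table` to the (typically wider) target `schema`: fields missing from
// the table become all-null columns, and existing NullType columns are
// promoted to the target field's type.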
Result<std::shared_ptr<Table>> PromoteTableToSchema(const std::shared_ptr<Table>& table,
                                                    const std::shared_ptr<Schema>& schema,
                                                    MemoryPool* pool) {
  const std::shared_ptr<Schema> current_schema = table->schema();
  if (current_schema->Equals(*schema, /*check_metadata=*/false)) {
    return table->ReplaceSchemaMetadata(schema->metadata());
  }

  // fields_seen[i] == true iff that field is also in `schema`.
  std::vector<bool> fields_seen(current_schema->num_fields(), false);

  std::vector<std::shared_ptr<ChunkedArray>> columns;
  columns.reserve(schema->num_fields());
  const int64_t num_rows = table->num_rows();
  auto AppendColumnOfNulls = [pool, &columns,
                              num_rows](const std::shared_ptr<DataType>& type) {
    // TODO(bkietz): share the zero-filled buffers as much as possible across
    // the null-filled arrays created here.
    ARROW_ASSIGN_OR_RAISE(auto array_of_nulls, MakeArrayOfNull(type, num_rows, pool));
    columns.push_back(std::make_shared<ChunkedArray>(array_of_nulls));
    return Status::OK();
  };

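  // For each target field: reuse the matching column when types agree,
  // synthesize a null column when the field is absent or currently NullType,
  // and fail on duplicate names or any other type mismatch.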
  for (const auto& field : schema->fields()) {
    const std::vector<int> field_indices =
        current_schema->GetAllFieldIndices(field->name());
    if (field_indices.empty()) {
      RETURN_NOT_OK(AppendColumnOfNulls(field->type()));
      continue;
    }

    if (field_indices.size() > 1) {
      return Status::Invalid(
          "PromoteTableToSchema cannot handle schemas with duplicate fields: ",
          field->name());
    }

    const int field_index = field_indices[0];
    const auto& current_field = current_schema->field(field_index);
    if (!field->nullable() && current_field->nullable()) {
      return Status::Invalid("Unable to promote field ", current_field->name(),
                             ": it was nullable but the target schema was not.");
    }

    fields_seen[field_index] = true;
    if (current_field->type()->Equals(field->type())) {
      columns.push_back(table->column(field_index));
      continue;
    }

    if (current_field->type()->id() == Type::NA) {
      RETURN_NOT_OK(AppendColumnOfNulls(field->type()));
      continue;
    }

    return Status::Invalid("Unable to promote field ", field->name(),
                           ": incompatible types: ", field->type()->ToString(), " vs ",
                           current_field->type()->ToString());
  }

  auto unseen_field_iter = std::find(fields_seen.begin(), fields_seen.end(), false);
  if (unseen_field_iter != fields_seen.end()) {
    const size_t unseen_field_index = unseen_field_iter - fields_seen.begin();
    return Status::Invalid(
        "Incompatible schemas: field ",
        current_schema->field(static_cast<int>(unseen_field_index))->name(),
        " did not exist in the new schema.");
  }

  return Table::Make(schema, std::move(columns));
}

bool Table::Equals(const Table& other, bool check_metadata) const {
  if (this == &other) {
    return true;
  }
  if (!schema_->Equals(*other.schema(), check_metadata)) {
    return false;
  }
  if (this->num_columns() != other.num_columns()) {
    return false;
  }

  for (int i = 0; i < this->num_columns(); i++) {
    if (!this->column(i)->Equals(other.column(i))) {
      return false;
    }
  }
  return true;
}

Result<std::shared_ptr<Table>> Table::CombineChunks(MemoryPool* pool) const {
  const int ncolumns = num_columns();
  std::vector<std::shared_ptr<ChunkedArray>> compacted_columns(ncolumns);
  for (int i = 0; i < ncolumns; ++i) {
    const auto& col = column(i);
    if (col->num_chunks() <= 1) {
      compacted_columns[i] = col;
      continue;
    }

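    // Binary-like chunks cannot always be merged into a single array: their
    // 32-bit value offsets limit one array's data to kBinaryMemoryLimit
    // bytes, so chunks are grouped greedily below that limit and each group
    // is concatenated separately.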
    if (is_binary_like(col->type()->id())) {
      // ARROW-5744 Allow binary columns to be combined into multiple chunks to avoid
      // buffer overflow
      ArrayVector chunks;
      int chunk_i = 0;
      while (chunk_i < col->num_chunks()) {
        ArrayVector safe_chunks;
        int64_t data_length = 0;
        for (; chunk_i < col->num_chunks(); ++chunk_i) {
          const auto& chunk = col->chunk(chunk_i);
          data_length += checked_cast<const BinaryArray&>(*chunk).total_values_length();
          if (data_length >= kBinaryMemoryLimit) {
            break;
          }
          safe_chunks.push_back(chunk);
        }
        chunks.emplace_back();
        ARROW_ASSIGN_OR_RAISE(chunks.back(), Concatenate(safe_chunks, pool));
      }
      compacted_columns[i] = std::make_shared<ChunkedArray>(std::move(chunks));
    } else {
      ARROW_ASSIGN_OR_RAISE(auto compacted, Concatenate(col->chunks(), pool));
      compacted_columns[i] = std::make_shared<ChunkedArray>(compacted);
    }
  }
  return Table::Make(schema(), std::move(compacted_columns), num_rows_);
}

// ----------------------------------------------------------------------
// Convert a table to a sequence of record batches

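// TableBatchReader streams a table back out as record batches without
// copying data, keeping a (current chunk, offset within chunk) cursor for
// every column.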
TableBatchReader::TableBatchReader(const Table& table)
    : table_(table),
      column_data_(table.num_columns()),
      chunk_numbers_(table.num_columns(), 0),
      chunk_offsets_(table.num_columns(), 0),
      absolute_row_position_(0),
      max_chunksize_(std::numeric_limits<int64_t>::max()) {
  for (int i = 0; i < table.num_columns(); ++i) {
    column_data_[i] = table.column(i).get();
  }
}

std::shared_ptr<Schema> TableBatchReader::schema() const { return table_.schema(); }

void TableBatchReader::set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; }

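// Emits the next batch, or sets *out to nullptr once all rows have been
// consumed. Each batch spans the longest row range that stays inside a
// single chunk of every column, capped by max_chunksize_.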
Status TableBatchReader::ReadNext(std::shared_ptr<RecordBatch>* out) {
  if (absolute_row_position_ == table_.num_rows()) {
    *out = nullptr;
    return Status::OK();
  }

  // Determine the minimum contiguous slice across all columns
  int64_t chunksize = std::min(table_.num_rows(), max_chunksize_);
  std::vector<const Array*> chunks(table_.num_columns());
  for (int i = 0; i < table_.num_columns(); ++i) {
    auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get();
    int64_t chunk_remaining = chunk->length() - chunk_offsets_[i];

    if (chunk_remaining < chunksize) {
      chunksize = chunk_remaining;
    }

    chunks[i] = chunk;
  }

  // Slice chunks and advance chunk index as appropriate
  std::vector<std::shared_ptr<ArrayData>> batch_data(table_.num_columns());

  for (int i = 0; i < table_.num_columns(); ++i) {
    // Exhausted chunk
    const Array* chunk = chunks[i];
    const int64_t offset = chunk_offsets_[i];
    std::shared_ptr<ArrayData> slice_data;
    if ((chunk->length() - offset) == chunksize) {
      ++chunk_numbers_[i];
      chunk_offsets_[i] = 0;
      if (offset > 0) {
        // Need to slice
        slice_data = chunk->Slice(offset, chunksize)->data();
      } else {
        // No slice
        slice_data = chunk->data();
      }
    } else {
      chunk_offsets_[i] += chunksize;
      slice_data = chunk->Slice(offset, chunksize)->data();
    }
    batch_data[i] = std::move(slice_data);
  }

  absolute_row_position_ += chunksize;
  *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data));

  return Status::OK();
}

}  // namespace arrow