]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "arrow/table.h" | |
19 | ||
20 | #include <algorithm> | |
21 | #include <cstdlib> | |
22 | #include <limits> | |
23 | #include <memory> | |
24 | #include <sstream> | |
25 | #include <utility> | |
26 | ||
27 | #include "arrow/array/array_base.h" | |
28 | #include "arrow/array/array_binary.h" | |
29 | #include "arrow/array/array_nested.h" | |
30 | #include "arrow/array/concatenate.h" | |
31 | #include "arrow/array/util.h" | |
32 | #include "arrow/chunked_array.h" | |
33 | #include "arrow/pretty_print.h" | |
34 | #include "arrow/record_batch.h" | |
35 | #include "arrow/result.h" | |
36 | #include "arrow/status.h" | |
37 | #include "arrow/type.h" | |
38 | #include "arrow/type_fwd.h" | |
39 | #include "arrow/type_traits.h" | |
40 | #include "arrow/util/checked_cast.h" | |
41 | #include "arrow/util/logging.h" | |
42 | #include "arrow/util/vector.h" | |
43 | ||
44 | namespace arrow { | |
45 | ||
46 | using internal::checked_cast; | |
47 | ||
48 | class KeyValueMetadata; | |
49 | class MemoryPool; | |
50 | struct ArrayData; | |
51 | ||
52 | // ---------------------------------------------------------------------- | |
53 | // Table methods | |
54 | ||
/// \class SimpleTable
/// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch
class SimpleTable : public Table {
 public:
  /// \brief Construct from ready-made ChunkedArray columns.
  ///
  /// \param schema the table schema (not cross-checked against the columns
  ///   here; use Validate()/ValidateFull() for consistency checks)
  /// \param columns one ChunkedArray per schema field
  /// \param num_rows the row count; when negative it is inferred from the
  ///   first column (0 if there are no columns)
  SimpleTable(std::shared_ptr<Schema> schema,
              std::vector<std::shared_ptr<ChunkedArray>> columns, int64_t num_rows = -1)
      : columns_(std::move(columns)) {
    schema_ = std::move(schema);
    if (num_rows < 0) {
      if (columns_.size() == 0) {
        num_rows_ = 0;
      } else {
        num_rows_ = columns_[0]->length();
      }
    } else {
      num_rows_ = num_rows;
    }
  }

  /// \brief Construct from plain Arrays; each Array becomes a
  /// single-chunk column.  Row-count inference works as above.
  SimpleTable(std::shared_ptr<Schema> schema,
              const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows = -1) {
    schema_ = std::move(schema);
    if (num_rows < 0) {
      if (columns.size() == 0) {
        num_rows_ = 0;
      } else {
        num_rows_ = columns[0]->length();
      }
    } else {
      num_rows_ = num_rows;
    }

    // Wrap every input Array in a one-chunk ChunkedArray.
    columns_.resize(columns.size());
    for (size_t i = 0; i < columns.size(); ++i) {
      columns_[i] = std::make_shared<ChunkedArray>(columns[i]);
    }
  }

  std::shared_ptr<ChunkedArray> column(int i) const override { return columns_[i]; }

  const std::vector<std::shared_ptr<ChunkedArray>>& columns() const override {
    return columns_;
  }

  /// \brief Zero-copy slice: every column is sliced with the same
  /// offset/length; the resulting row count is read back from the columns.
  std::shared_ptr<Table> Slice(int64_t offset, int64_t length) const override {
    auto sliced = columns_;
    int64_t num_rows = length;
    for (auto& column : sliced) {
      column = column->Slice(offset, length);
      // The sliced column may be shorter than `length` when the slice runs
      // past the end of the data, so take the actual column length.
      num_rows = column->length();
    }
    return Table::Make(schema_, std::move(sliced), num_rows);
  }

  /// \brief Return a new table with column i removed from both the schema
  /// and the column vector; existing columns are shared, not copied.
  Result<std::shared_ptr<Table>> RemoveColumn(int i) const override {
    ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->RemoveField(i));

    return Table::Make(std::move(new_schema), internal::DeleteVectorElement(columns_, i),
                       this->num_rows());
  }

  /// \brief Return a new table with `col` inserted at position i.
  /// Fails if the column's length or type does not match `field_arg`/table.
  Result<std::shared_ptr<Table>> AddColumn(
      int i, std::shared_ptr<Field> field_arg,
      std::shared_ptr<ChunkedArray> col) const override {
    DCHECK(col != nullptr);

    if (col->length() != num_rows_) {
      return Status::Invalid(
          "Added column's length must match table's length. Expected length ", num_rows_,
          " but got length ", col->length());
    }

    if (!field_arg->type()->Equals(col->type())) {
      return Status::Invalid("Field type did not match data type");
    }

    ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->AddField(i, field_arg));
    return Table::Make(std::move(new_schema),
                       internal::AddVectorElement(columns_, i, std::move(col)));
  }

  /// \brief Return a new table with column i replaced by `col`.
  /// Same length/type validation as AddColumn.
  Result<std::shared_ptr<Table>> SetColumn(
      int i, std::shared_ptr<Field> field_arg,
      std::shared_ptr<ChunkedArray> col) const override {
    DCHECK(col != nullptr);

    if (col->length() != num_rows_) {
      return Status::Invalid(
          "Added column's length must match table's length. Expected length ", num_rows_,
          " but got length ", col->length());
    }

    if (!field_arg->type()->Equals(col->type())) {
      return Status::Invalid("Field type did not match data type");
    }

    ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->SetField(i, field_arg));
    return Table::Make(std::move(new_schema),
                       internal::ReplaceVectorElement(columns_, i, std::move(col)));
  }

  /// \brief Return a new table sharing these columns but carrying different
  /// schema-level metadata.
  std::shared_ptr<Table> ReplaceSchemaMetadata(
      const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
    auto new_schema = schema_->WithMetadata(metadata);
    return Table::Make(std::move(new_schema), columns_);
  }

  /// \brief Expand each column via Field::Flatten / ChunkedArray::Flatten
  /// (one output column per flattened sub-field) and rebuild the schema,
  /// preserving the original schema metadata.
  Result<std::shared_ptr<Table>> Flatten(MemoryPool* pool) const override {
    std::vector<std::shared_ptr<Field>> flattened_fields;
    std::vector<std::shared_ptr<ChunkedArray>> flattened_columns;
    for (int i = 0; i < num_columns(); ++i) {
      std::vector<std::shared_ptr<Field>> new_fields = field(i)->Flatten();
      ARROW_ASSIGN_OR_RAISE(auto new_columns, column(i)->Flatten(pool));
      DCHECK_EQ(new_columns.size(), new_fields.size());
      for (size_t j = 0; j < new_columns.size(); ++j) {
        flattened_fields.push_back(new_fields[j]);
        flattened_columns.push_back(new_columns[j]);
      }
    }
    auto flattened_schema =
        std::make_shared<Schema>(std::move(flattened_fields), schema_->metadata());
    return Table::Make(std::move(flattened_schema), std::move(flattened_columns));
  }

  /// \brief Cheap structural validation: schema/column consistency plus
  /// per-column ChunkedArray::Validate.  Errors are prefixed with the
  /// offending column index.
  Status Validate() const override {
    RETURN_NOT_OK(ValidateMeta());
    for (int i = 0; i < num_columns(); ++i) {
      const ChunkedArray* col = columns_[i].get();
      Status st = col->Validate();
      if (!st.ok()) {
        std::stringstream ss;
        ss << "Column " << i << ": " << st.message();
        return st.WithMessage(ss.str());
      }
    }
    return Status::OK();
  }

  /// \brief Like Validate(), but runs the more expensive per-column
  /// ChunkedArray::ValidateFull as well.
  Status ValidateFull() const override {
    RETURN_NOT_OK(ValidateMeta());
    for (int i = 0; i < num_columns(); ++i) {
      const ChunkedArray* col = columns_[i].get();
      Status st = col->ValidateFull();
      if (!st.ok()) {
        std::stringstream ss;
        ss << "Column " << i << ": " << st.message();
        return st.WithMessage(ss.str());
      }
    }
    return Status::OK();
  }

 protected:
  /// \brief Check that columns agree with the schema (count and types) and
  /// with the table row count; also validates each column structurally.
  Status ValidateMeta() const {
    // Make sure columns and schema are consistent
    if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
      return Status::Invalid("Number of columns did not match schema");
    }
    for (int i = 0; i < num_columns(); ++i) {
      const ChunkedArray* col = columns_[i].get();
      if (col == nullptr) {
        return Status::Invalid("Column ", i, " was null");
      }
      if (!col->type()->Equals(*schema_->field(i)->type())) {
        return Status::Invalid("Column data for field ", i, " with type ",
                               col->type()->ToString(), " is inconsistent with schema ",
                               schema_->field(i)->type()->ToString());
      }
    }

    // Make sure columns are all the same length, and validate them
    for (int i = 0; i < num_columns(); ++i) {
      const ChunkedArray* col = columns_[i].get();
      if (col->length() != num_rows_) {
        return Status::Invalid("Column ", i, " named ", field(i)->name(),
                               " expected length ", num_rows_, " but got length ",
                               col->length());
      }
      Status st = col->Validate();
      if (!st.ok()) {
        std::stringstream ss;
        ss << "Column " << i << ": " << st.message();
        return st.WithMessage(ss.str());
      }
    }
    return Status::OK();
  }

 private:
  std::vector<std::shared_ptr<ChunkedArray>> columns_;
};
246 | ||
247 | Table::Table() : num_rows_(0) {} | |
248 | ||
249 | std::vector<std::shared_ptr<Field>> Table::fields() const { | |
250 | std::vector<std::shared_ptr<Field>> result; | |
251 | for (int i = 0; i < this->num_columns(); ++i) { | |
252 | result.emplace_back(this->field(i)); | |
253 | } | |
254 | return result; | |
255 | } | |
256 | ||
257 | std::shared_ptr<Table> Table::Make(std::shared_ptr<Schema> schema, | |
258 | std::vector<std::shared_ptr<ChunkedArray>> columns, | |
259 | int64_t num_rows) { | |
260 | return std::make_shared<SimpleTable>(std::move(schema), std::move(columns), num_rows); | |
261 | } | |
262 | ||
263 | std::shared_ptr<Table> Table::Make(std::shared_ptr<Schema> schema, | |
264 | const std::vector<std::shared_ptr<Array>>& arrays, | |
265 | int64_t num_rows) { | |
266 | return std::make_shared<SimpleTable>(std::move(schema), arrays, num_rows); | |
267 | } | |
268 | ||
269 | Result<std::shared_ptr<Table>> Table::FromRecordBatchReader(RecordBatchReader* reader) { | |
270 | std::shared_ptr<Table> table = nullptr; | |
271 | RETURN_NOT_OK(reader->ReadAll(&table)); | |
272 | return table; | |
273 | } | |
274 | ||
275 | Result<std::shared_ptr<Table>> Table::FromRecordBatches( | |
276 | std::shared_ptr<Schema> schema, | |
277 | const std::vector<std::shared_ptr<RecordBatch>>& batches) { | |
278 | const int nbatches = static_cast<int>(batches.size()); | |
279 | const int ncolumns = static_cast<int>(schema->num_fields()); | |
280 | ||
281 | int64_t num_rows = 0; | |
282 | for (int i = 0; i < nbatches; ++i) { | |
283 | if (!batches[i]->schema()->Equals(*schema, false)) { | |
284 | return Status::Invalid("Schema at index ", static_cast<int>(i), | |
285 | " was different: \n", schema->ToString(), "\nvs\n", | |
286 | batches[i]->schema()->ToString()); | |
287 | } | |
288 | num_rows += batches[i]->num_rows(); | |
289 | } | |
290 | ||
291 | std::vector<std::shared_ptr<ChunkedArray>> columns(ncolumns); | |
292 | std::vector<std::shared_ptr<Array>> column_arrays(nbatches); | |
293 | ||
294 | for (int i = 0; i < ncolumns; ++i) { | |
295 | for (int j = 0; j < nbatches; ++j) { | |
296 | column_arrays[j] = batches[j]->column(i); | |
297 | } | |
298 | columns[i] = std::make_shared<ChunkedArray>(column_arrays, schema->field(i)->type()); | |
299 | } | |
300 | ||
301 | return Table::Make(std::move(schema), std::move(columns), num_rows); | |
302 | } | |
303 | ||
304 | Result<std::shared_ptr<Table>> Table::FromRecordBatches( | |
305 | const std::vector<std::shared_ptr<RecordBatch>>& batches) { | |
306 | if (batches.size() == 0) { | |
307 | return Status::Invalid("Must pass at least one record batch or an explicit Schema"); | |
308 | } | |
309 | ||
310 | return FromRecordBatches(batches[0]->schema(), batches); | |
311 | } | |
312 | ||
313 | Result<std::shared_ptr<Table>> Table::FromChunkedStructArray( | |
314 | const std::shared_ptr<ChunkedArray>& array) { | |
315 | auto type = array->type(); | |
316 | if (type->id() != Type::STRUCT) { | |
317 | return Status::Invalid("Expected a chunked struct array, got ", *type); | |
318 | } | |
319 | int num_columns = type->num_fields(); | |
320 | int num_chunks = array->num_chunks(); | |
321 | ||
322 | const auto& struct_chunks = array->chunks(); | |
323 | std::vector<std::shared_ptr<ChunkedArray>> columns(num_columns); | |
324 | for (int i = 0; i < num_columns; ++i) { | |
325 | ArrayVector chunks(num_chunks); | |
326 | std::transform(struct_chunks.begin(), struct_chunks.end(), chunks.begin(), | |
327 | [i](const std::shared_ptr<Array>& struct_chunk) { | |
328 | return static_cast<const StructArray&>(*struct_chunk).field(i); | |
329 | }); | |
330 | columns[i] = | |
331 | std::make_shared<ChunkedArray>(std::move(chunks), type->field(i)->type()); | |
332 | } | |
333 | ||
334 | return Table::Make(::arrow::schema(type->fields()), std::move(columns), | |
335 | array->length()); | |
336 | } | |
337 | ||
338 | std::vector<std::string> Table::ColumnNames() const { | |
339 | std::vector<std::string> names(num_columns()); | |
340 | for (int i = 0; i < num_columns(); ++i) { | |
341 | names[i] = field(i)->name(); | |
342 | } | |
343 | return names; | |
344 | } | |
345 | ||
346 | Result<std::shared_ptr<Table>> Table::RenameColumns( | |
347 | const std::vector<std::string>& names) const { | |
348 | if (names.size() != static_cast<size_t>(num_columns())) { | |
349 | return Status::Invalid("tried to rename a table of ", num_columns(), | |
350 | " columns but only ", names.size(), " names were provided"); | |
351 | } | |
352 | std::vector<std::shared_ptr<ChunkedArray>> columns(num_columns()); | |
353 | std::vector<std::shared_ptr<Field>> fields(num_columns()); | |
354 | for (int i = 0; i < num_columns(); ++i) { | |
355 | columns[i] = column(i); | |
356 | fields[i] = field(i)->WithName(names[i]); | |
357 | } | |
358 | return Table::Make(::arrow::schema(std::move(fields)), std::move(columns), num_rows()); | |
359 | } | |
360 | ||
361 | Result<std::shared_ptr<Table>> Table::SelectColumns( | |
362 | const std::vector<int>& indices) const { | |
363 | int n = static_cast<int>(indices.size()); | |
364 | ||
365 | std::vector<std::shared_ptr<ChunkedArray>> columns(n); | |
366 | std::vector<std::shared_ptr<Field>> fields(n); | |
367 | for (int i = 0; i < n; i++) { | |
368 | int pos = indices[i]; | |
369 | if (pos < 0 || pos > num_columns() - 1) { | |
370 | return Status::Invalid("Invalid column index ", pos, " to select columns."); | |
371 | } | |
372 | columns[i] = column(pos); | |
373 | fields[i] = field(pos); | |
374 | } | |
375 | ||
376 | auto new_schema = | |
377 | std::make_shared<arrow::Schema>(std::move(fields), schema()->metadata()); | |
378 | return Table::Make(std::move(new_schema), std::move(columns), num_rows()); | |
379 | } | |
380 | ||
381 | std::string Table::ToString() const { | |
382 | std::stringstream ss; | |
383 | ARROW_CHECK_OK(PrettyPrint(*this, 0, &ss)); | |
384 | return ss.str(); | |
385 | } | |
386 | ||
387 | Result<std::shared_ptr<Table>> ConcatenateTables( | |
388 | const std::vector<std::shared_ptr<Table>>& tables, | |
389 | const ConcatenateTablesOptions options, MemoryPool* memory_pool) { | |
390 | if (tables.size() == 0) { | |
391 | return Status::Invalid("Must pass at least one table"); | |
392 | } | |
393 | ||
394 | std::vector<std::shared_ptr<Table>> promoted_tables; | |
395 | const std::vector<std::shared_ptr<Table>>* tables_to_concat = &tables; | |
396 | if (options.unify_schemas) { | |
397 | std::vector<std::shared_ptr<Schema>> schemas; | |
398 | schemas.reserve(tables.size()); | |
399 | for (const auto& t : tables) { | |
400 | schemas.push_back(t->schema()); | |
401 | } | |
402 | ||
403 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> unified_schema, | |
404 | UnifySchemas(schemas, options.field_merge_options)); | |
405 | ||
406 | promoted_tables.reserve(tables.size()); | |
407 | for (const auto& t : tables) { | |
408 | promoted_tables.emplace_back(); | |
409 | ARROW_ASSIGN_OR_RAISE(promoted_tables.back(), | |
410 | PromoteTableToSchema(t, unified_schema, memory_pool)); | |
411 | } | |
412 | tables_to_concat = &promoted_tables; | |
413 | } else { | |
414 | auto first_schema = tables[0]->schema(); | |
415 | for (size_t i = 1; i < tables.size(); ++i) { | |
416 | if (!tables[i]->schema()->Equals(*first_schema, false)) { | |
417 | return Status::Invalid("Schema at index ", i, " was different: \n", | |
418 | first_schema->ToString(), "\nvs\n", | |
419 | tables[i]->schema()->ToString()); | |
420 | } | |
421 | } | |
422 | } | |
423 | ||
424 | std::shared_ptr<Schema> schema = tables_to_concat->front()->schema(); | |
425 | ||
426 | const int ncolumns = schema->num_fields(); | |
427 | ||
428 | std::vector<std::shared_ptr<ChunkedArray>> columns(ncolumns); | |
429 | for (int i = 0; i < ncolumns; ++i) { | |
430 | std::vector<std::shared_ptr<Array>> column_arrays; | |
431 | for (const auto& table : *tables_to_concat) { | |
432 | const std::vector<std::shared_ptr<Array>>& chunks = table->column(i)->chunks(); | |
433 | for (const auto& chunk : chunks) { | |
434 | column_arrays.push_back(chunk); | |
435 | } | |
436 | } | |
437 | columns[i] = std::make_shared<ChunkedArray>(column_arrays, schema->field(i)->type()); | |
438 | } | |
439 | return Table::Make(std::move(schema), std::move(columns)); | |
440 | } | |
441 | ||
// Promote `table` to the target `schema`: columns are matched by field name;
// fields absent from the table become all-null columns, NullType columns may
// be promoted to any target type, and any other type mismatch is an error.
// The result carries `schema`'s metadata.  Fails if the target drops a field
// of the table, duplicates a field name, or tightens nullability.
Result<std::shared_ptr<Table>> PromoteTableToSchema(const std::shared_ptr<Table>& table,
                                                    const std::shared_ptr<Schema>& schema,
                                                    MemoryPool* pool) {
  const std::shared_ptr<Schema> current_schema = table->schema();
  // Fast path: same fields (metadata ignored) — just swap in the new
  // metadata without touching any column.
  if (current_schema->Equals(*schema, /*check_metadata=*/false)) {
    return table->ReplaceSchemaMetadata(schema->metadata());
  }

  // fields_seen[i] == true iff that field is also in `schema`.
  std::vector<bool> fields_seen(current_schema->num_fields(), false);

  std::vector<std::shared_ptr<ChunkedArray>> columns;
  columns.reserve(schema->num_fields());
  const int64_t num_rows = table->num_rows();
  // Helper: append a column of `num_rows` nulls of the given type.
  auto AppendColumnOfNulls = [pool, &columns,
                              num_rows](const std::shared_ptr<DataType>& type) {
    // TODO(bkietz): share the zero-filled buffers as much as possible across
    // the null-filled arrays created here.
    ARROW_ASSIGN_OR_RAISE(auto array_of_nulls, MakeArrayOfNull(type, num_rows, pool));
    columns.push_back(std::make_shared<ChunkedArray>(array_of_nulls));
    return Status::OK();
  };

  // Walk the target schema, producing one output column per target field.
  for (const auto& field : schema->fields()) {
    const std::vector<int> field_indices =
        current_schema->GetAllFieldIndices(field->name());
    // Field not present in the table: fill with nulls.
    if (field_indices.empty()) {
      RETURN_NOT_OK(AppendColumnOfNulls(field->type()));
      continue;
    }

    // Name-based matching is ambiguous with duplicate field names.
    if (field_indices.size() > 1) {
      return Status::Invalid(
          "PromoteTableToSchema cannot handle schemas with duplicate fields: ",
          field->name());
    }

    const int field_index = field_indices[0];
    const auto& current_field = current_schema->field(field_index);
    // A nullable source cannot be promoted into a non-nullable target.
    if (!field->nullable() && current_field->nullable()) {
      return Status::Invalid("Unable to promote field ", current_field->name(),
                             ": it was nullable but the target schema was not.");
    }

    fields_seen[field_index] = true;
    // Exact type match: reuse the existing column as-is.
    if (current_field->type()->Equals(field->type())) {
      columns.push_back(table->column(field_index));
      continue;
    }

    // A NullType column can be promoted to any type by materializing nulls.
    if (current_field->type()->id() == Type::NA) {
      RETURN_NOT_OK(AppendColumnOfNulls(field->type()));
      continue;
    }

    return Status::Invalid("Unable to promote field ", field->name(),
                           ": incompatible types: ", field->type()->ToString(), " vs ",
                           current_field->type()->ToString());
  }

  // Every field of the source table must appear in the target schema —
  // promotion must not silently drop data.
  auto unseen_field_iter = std::find(fields_seen.begin(), fields_seen.end(), false);
  if (unseen_field_iter != fields_seen.end()) {
    const size_t unseen_field_index = unseen_field_iter - fields_seen.begin();
    return Status::Invalid(
        "Incompatible schemas: field ",
        current_schema->field(static_cast<int>(unseen_field_index))->name(),
        " did not exist in the new schema.");
  }

  return Table::Make(schema, std::move(columns));
}
513 | ||
514 | bool Table::Equals(const Table& other, bool check_metadata) const { | |
515 | if (this == &other) { | |
516 | return true; | |
517 | } | |
518 | if (!schema_->Equals(*other.schema(), check_metadata)) { | |
519 | return false; | |
520 | } | |
521 | if (this->num_columns() != other.num_columns()) { | |
522 | return false; | |
523 | } | |
524 | ||
525 | for (int i = 0; i < this->num_columns(); i++) { | |
526 | if (!this->column(i)->Equals(other.column(i))) { | |
527 | return false; | |
528 | } | |
529 | } | |
530 | return true; | |
531 | } | |
532 | ||
533 | Result<std::shared_ptr<Table>> Table::CombineChunks(MemoryPool* pool) const { | |
534 | const int ncolumns = num_columns(); | |
535 | std::vector<std::shared_ptr<ChunkedArray>> compacted_columns(ncolumns); | |
536 | for (int i = 0; i < ncolumns; ++i) { | |
537 | const auto& col = column(i); | |
538 | if (col->num_chunks() <= 1) { | |
539 | compacted_columns[i] = col; | |
540 | continue; | |
541 | } | |
542 | ||
543 | if (is_binary_like(col->type()->id())) { | |
544 | // ARROW-5744 Allow binary columns to be combined into multiple chunks to avoid | |
545 | // buffer overflow | |
546 | ArrayVector chunks; | |
547 | int chunk_i = 0; | |
548 | while (chunk_i < col->num_chunks()) { | |
549 | ArrayVector safe_chunks; | |
550 | int64_t data_length = 0; | |
551 | for (; chunk_i < col->num_chunks(); ++chunk_i) { | |
552 | const auto& chunk = col->chunk(chunk_i); | |
553 | data_length += checked_cast<const BinaryArray&>(*chunk).total_values_length(); | |
554 | if (data_length >= kBinaryMemoryLimit) { | |
555 | break; | |
556 | } | |
557 | safe_chunks.push_back(chunk); | |
558 | } | |
559 | chunks.emplace_back(); | |
560 | ARROW_ASSIGN_OR_RAISE(chunks.back(), Concatenate(safe_chunks, pool)); | |
561 | } | |
562 | compacted_columns[i] = std::make_shared<ChunkedArray>(std::move(chunks)); | |
563 | } else { | |
564 | ARROW_ASSIGN_OR_RAISE(auto compacted, Concatenate(col->chunks(), pool)); | |
565 | compacted_columns[i] = std::make_shared<ChunkedArray>(compacted); | |
566 | } | |
567 | } | |
568 | return Table::Make(schema(), std::move(compacted_columns), num_rows_); | |
569 | } | |
570 | ||
571 | // ---------------------------------------------------------------------- | |
572 | // Convert a table to a sequence of record batches | |
573 | ||
574 | TableBatchReader::TableBatchReader(const Table& table) | |
575 | : table_(table), | |
576 | column_data_(table.num_columns()), | |
577 | chunk_numbers_(table.num_columns(), 0), | |
578 | chunk_offsets_(table.num_columns(), 0), | |
579 | absolute_row_position_(0), | |
580 | max_chunksize_(std::numeric_limits<int64_t>::max()) { | |
581 | for (int i = 0; i < table.num_columns(); ++i) { | |
582 | column_data_[i] = table.column(i).get(); | |
583 | } | |
584 | } | |
585 | ||
586 | std::shared_ptr<Schema> TableBatchReader::schema() const { return table_.schema(); } | |
587 | ||
588 | void TableBatchReader::set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; } | |
589 | ||
// Produce the next batch: the largest run of rows (capped at max_chunksize_)
// that does not cross a chunk boundary in any column, so each output column
// is a zero-copy slice of a single chunk.  Sets *out to nullptr at the end
// of the table.
Status TableBatchReader::ReadNext(std::shared_ptr<RecordBatch>* out) {
  if (absolute_row_position_ == table_.num_rows()) {
    *out = nullptr;
    return Status::OK();
  }

  // Determine the minimum contiguous slice across all columns
  int64_t chunksize = std::min(table_.num_rows(), max_chunksize_);
  std::vector<const Array*> chunks(table_.num_columns());
  for (int i = 0; i < table_.num_columns(); ++i) {
    auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get();
    // Rows left in this column's current chunk, past what we already emitted.
    int64_t chunk_remaining = chunk->length() - chunk_offsets_[i];

    if (chunk_remaining < chunksize) {
      chunksize = chunk_remaining;
    }

    chunks[i] = chunk;
  }

  // Slice chunks and advance chunk index as appropriate
  std::vector<std::shared_ptr<ArrayData>> batch_data(table_.num_columns());

  for (int i = 0; i < table_.num_columns(); ++i) {
    // Exhausted chunk
    const Array* chunk = chunks[i];
    const int64_t offset = chunk_offsets_[i];
    std::shared_ptr<ArrayData> slice_data;
    if ((chunk->length() - offset) == chunksize) {
      // This batch consumes the rest of the chunk: move to the next chunk
      // and reset the intra-chunk offset.
      ++chunk_numbers_[i];
      chunk_offsets_[i] = 0;
      if (offset > 0) {
        // Need to slice
        slice_data = chunk->Slice(offset, chunksize)->data();
      } else {
        // No slice
        slice_data = chunk->data();
      }
    } else {
      // Chunk only partially consumed: stay on it and advance the offset.
      chunk_offsets_[i] += chunksize;
      slice_data = chunk->Slice(offset, chunksize)->data();
    }
    batch_data[i] = std::move(slice_data);
  }

  absolute_row_position_ += chunksize;
  *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data));

  return Status::OK();
}
640 | ||
641 | } // namespace arrow |