]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/csv/reader.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / csv / reader.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/csv/reader.h"
19
20 #include <cstdint>
21 #include <cstring>
22 #include <functional>
23 #include <limits>
24 #include <memory>
25 #include <sstream>
26 #include <string>
27 #include <unordered_map>
28 #include <utility>
29 #include <vector>
30
31 #include "arrow/array.h"
32 #include "arrow/buffer.h"
33 #include "arrow/csv/chunker.h"
34 #include "arrow/csv/column_builder.h"
35 #include "arrow/csv/column_decoder.h"
36 #include "arrow/csv/options.h"
37 #include "arrow/csv/parser.h"
38 #include "arrow/io/interfaces.h"
39 #include "arrow/result.h"
40 #include "arrow/status.h"
41 #include "arrow/table.h"
42 #include "arrow/type.h"
43 #include "arrow/type_fwd.h"
44 #include "arrow/util/async_generator.h"
45 #include "arrow/util/future.h"
46 #include "arrow/util/iterator.h"
47 #include "arrow/util/logging.h"
48 #include "arrow/util/macros.h"
49 #include "arrow/util/optional.h"
50 #include "arrow/util/task_group.h"
51 #include "arrow/util/thread_pool.h"
52 #include "arrow/util/utf8.h"
53 #include "arrow/util/vector.h"
54
55 namespace arrow {
56 namespace csv {
57
58 using internal::Executor;
59
60 namespace {
61
62 struct ConversionSchema {
63 struct Column {
64 std::string name;
65 // Physical column index in CSV file
66 int32_t index;
67 // If true, make a column of nulls
68 bool is_missing;
69 // If set, convert the CSV column to this type
70 // If unset (and is_missing is false), infer the type from the CSV column
71 std::shared_ptr<DataType> type;
72 };
73
74 static Column NullColumn(std::string col_name, std::shared_ptr<DataType> type) {
75 return Column{std::move(col_name), -1, true, std::move(type)};
76 }
77
78 static Column TypedColumn(std::string col_name, int32_t col_index,
79 std::shared_ptr<DataType> type) {
80 return Column{std::move(col_name), col_index, false, std::move(type)};
81 }
82
83 static Column InferredColumn(std::string col_name, int32_t col_index) {
84 return Column{std::move(col_name), col_index, false, nullptr};
85 }
86
87 std::vector<Column> columns;
88 };
89
90 // An iterator of Buffers that makes sure there is no straddling CRLF sequence.
class CSVBufferIterator {
 public:
  // Wrap a synchronous buffer iterator with this transformation.
  static Iterator<std::shared_ptr<Buffer>> Make(
      Iterator<std::shared_ptr<Buffer>> buffer_iterator) {
    Transformer<std::shared_ptr<Buffer>, std::shared_ptr<Buffer>> fn =
        CSVBufferIterator();
    return MakeTransformedIterator(std::move(buffer_iterator), fn);
  }

  // Wrap an asynchronous buffer generator with this transformation.
  static AsyncGenerator<std::shared_ptr<Buffer>> MakeAsync(
      AsyncGenerator<std::shared_ptr<Buffer>> buffer_iterator) {
    Transformer<std::shared_ptr<Buffer>, std::shared_ptr<Buffer>> fn =
        CSVBufferIterator();
    return MakeTransformedGenerator(std::move(buffer_iterator), fn);
  }

  // Transform one incoming buffer:
  // - skip the UTF-8 BOM at the start of the first buffer, if present;
  // - skip a leading '\n' if the previous buffer ended with '\r', so that a
  //   "\r\n" sequence straddling two buffers is consumed exactly once.
  // A null input buffer, or a buffer left empty by the skipping above,
  // finishes the transformation.
  Result<TransformFlow<std::shared_ptr<Buffer>>> operator()(std::shared_ptr<Buffer> buf) {
    if (buf == nullptr) {
      // EOF
      return TransformFinish();
    }

    int64_t offset = 0;
    if (first_buffer_) {
      ARROW_ASSIGN_OR_RAISE(auto data, util::SkipUTF8BOM(buf->data(), buf->size()));
      offset += data - buf->data();
      DCHECK_GE(offset, 0);
      first_buffer_ = false;
    }

    if (trailing_cr_ && buf->data()[offset] == '\n') {
      // Skip '\r\n' line separator that started at the end of previous buffer
      ++offset;
    }

    // Recorded on the unsliced buffer, before applying `offset`
    trailing_cr_ = (buf->data()[buf->size() - 1] == '\r');
    buf = SliceBuffer(buf, offset);
    if (buf->size() == 0) {
      // EOF
      return TransformFinish();
    } else {
      return TransformYield(buf);
    }
  }

 protected:
  // Whether the next buffer received is the first one of the stream
  bool first_buffer_ = true;
  // Whether there was a trailing CR at the end of last received buffer
  bool trailing_cr_ = false;
};
141
// A delimited chunk of CSV data, ready for parsing.
struct CSVBlock {
  // (partial + completion + buffer) is an entire delimited CSV buffer.
  std::shared_ptr<Buffer> partial;
  std::shared_ptr<Buffer> completion;
  std::shared_ptr<Buffer> buffer;
  // Sequential index of this block in the stream; negative marks the
  // end-of-iteration sentinel (see IterationTraits below)
  int64_t block_index;
  // Whether this is the last block of the stream
  bool is_final;
  // Number of input bytes dropped while skipping rows before this block
  int64_t bytes_skipped;
  // Callback used by the serial reader: must be invoked with the number of
  // bytes actually parsed so the reader can advance its position
  std::function<Status(int64_t)> consume_bytes;
};
152
153 } // namespace
154 } // namespace csv
155
// Allow CSVBlock to be used with Iterator/AsyncGenerator APIs:
// a negative block_index marks the end-of-iteration sentinel.
template <>
struct IterationTraits<csv::CSVBlock> {
  static csv::CSVBlock End() { return csv::CSVBlock{{}, {}, {}, -1, true, 0, {}}; }
  static bool IsEnd(const csv::CSVBlock& val) { return val.block_index < 0; }
};
161
162 namespace csv {
163 namespace {
164
165 // This is a callable that can be used to transform an iterator. The source iterator
166 // will contain buffers of data and the output iterator will contain delimited CSV
167 // blocks. util::optional is used so that there is an end token (required by the
168 // iterator APIs (e.g. Visit)) even though an empty optional is never used in this code.
class BlockReader {
 public:
  BlockReader(std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer,
              int64_t skip_rows)
      : chunker_(std::move(chunker)),
        partial_(std::make_shared<Buffer>("")),
        buffer_(std::move(first_buffer)),
        skip_rows_(skip_rows) {}

 protected:
  // Delimits buffers into whole-row chunks
  std::unique_ptr<Chunker> chunker_;
  // `partial_`: leftover (incomplete) rows carried over from the previous
  // buffer; `buffer_`: the buffer currently being chunked (null at EOF)
  std::shared_ptr<Buffer> partial_, buffer_;
  // Number of rows still to be skipped before emitting data blocks
  int64_t skip_rows_;
  // Sequential index of the next block to emit
  int64_t block_index_ = 0;
  // Whether there was a trailing CR at the end of last received buffer
  // NOTE(review): not referenced by the readers visible in this file
  // (CRLF straddling is handled by CSVBufferIterator) — confirm before removing
  bool trailing_cr_ = false;
};
186
187 // An object that reads delimited CSV blocks for serial use.
188 // The number of bytes consumed should be notified after each read,
189 // using CSVBlock::consume_bytes.
class SerialBlockReader : public BlockReader {
 public:
  using BlockReader::BlockReader;

  // Wrap a buffer iterator into an iterator of delimited CSVBlocks.
  static Iterator<CSVBlock> MakeIterator(
      Iterator<std::shared_ptr<Buffer>> buffer_iterator, std::unique_ptr<Chunker> chunker,
      std::shared_ptr<Buffer> first_buffer, int64_t skip_rows) {
    auto block_reader =
        std::make_shared<SerialBlockReader>(std::move(chunker), first_buffer, skip_rows);
    // Wrap shared pointer in callable
    Transformer<std::shared_ptr<Buffer>, CSVBlock> block_reader_fn =
        [block_reader](std::shared_ptr<Buffer> buf) {
          return (*block_reader)(std::move(buf));
        };
    return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn);
  }

  // Async flavor of MakeIterator(), operating on an AsyncGenerator of buffers.
  static AsyncGenerator<CSVBlock> MakeAsyncIterator(
      AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
      std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer,
      int64_t skip_rows) {
    auto block_reader =
        std::make_shared<SerialBlockReader>(std::move(chunker), first_buffer, skip_rows);
    // Wrap shared pointer in callable
    Transformer<std::shared_ptr<Buffer>, CSVBlock> block_reader_fn =
        [block_reader](std::shared_ptr<Buffer> next) {
          return (*block_reader)(std::move(next));
        };
    return MakeTransformedGenerator(std::move(buffer_generator), block_reader_fn);
  }

  // Transform one incoming buffer into a delimited CSVBlock.
  // `next_buffer` is the buffer that follows the one currently held in
  // `buffer_`; it is null exactly when the end of the stream is reached.
  Result<TransformFlow<CSVBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
    if (buffer_ == nullptr) {
      // Already past the last buffer
      return TransformFinish();
    }

    bool is_final = (next_buffer == nullptr);
    int64_t bytes_skipped = 0;

    if (skip_rows_) {
      // Drop rows still to be skipped (ReadOptions::skip_rows_after_names)
      bytes_skipped += partial_->size();
      auto orig_size = buffer_->size();
      RETURN_NOT_OK(
          chunker_->ProcessSkip(partial_, buffer_, is_final, &skip_rows_, &buffer_));
      bytes_skipped += orig_size - buffer_->size();
      auto empty = std::make_shared<Buffer>(nullptr, 0);
      if (skip_rows_) {
        // Still have rows beyond this buffer to skip; return an empty block
        // so that `bytes_skipped` is still reported downstream
        partial_ = std::move(buffer_);
        buffer_ = next_buffer;
        return TransformYield<CSVBlock>(CSVBlock{empty, empty, empty, block_index_++,
                                                 is_final, bytes_skipped,
                                                 [](int64_t) { return Status::OK(); }});
      }
      partial_ = std::move(empty);
    }

    std::shared_ptr<Buffer> completion;

    if (is_final) {
      // End of file reached => compute completion from penultimate block
      RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &buffer_));
    } else {
      // Get completion of partial from previous block.
      RETURN_NOT_OK(
          chunker_->ProcessWithPartial(partial_, buffer_, &completion, &buffer_));
    }
    int64_t bytes_before_buffer = partial_->size() + completion->size();

    // Invoked by the parser once the number of consumed bytes is known;
    // advances our state to the first unparsed byte of `buffer_`.
    auto consume_bytes = [this, bytes_before_buffer,
                          next_buffer](int64_t nbytes) -> Status {
      DCHECK_GE(nbytes, 0);
      auto offset = nbytes - bytes_before_buffer;
      if (offset < 0) {
        // Should not happen
        return Status::Invalid("CSV parser got out of sync with chunker");
      }
      partial_ = SliceBuffer(buffer_, offset);
      buffer_ = next_buffer;
      return Status::OK();
    };

    return TransformYield<CSVBlock>(CSVBlock{partial_, completion, buffer_,
                                             block_index_++, is_final, bytes_skipped,
                                             std::move(consume_bytes)});
  }
};
277
278 // An object that reads delimited CSV blocks for threaded use.
// Unlike SerialBlockReader, the chunker delimits whole rows eagerly, so no
// consume_bytes callback is needed on the emitted blocks.
class ThreadedBlockReader : public BlockReader {
 public:
  using BlockReader::BlockReader;

  // Wrap a buffer generator into a generator of delimited CSVBlocks.
  static AsyncGenerator<CSVBlock> MakeAsyncIterator(
      AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
      std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer,
      int64_t skip_rows) {
    auto block_reader = std::make_shared<ThreadedBlockReader>(std::move(chunker),
                                                              first_buffer, skip_rows);
    // Wrap shared pointer in callable
    Transformer<std::shared_ptr<Buffer>, CSVBlock> block_reader_fn =
        [block_reader](std::shared_ptr<Buffer> next) { return (*block_reader)(next); };
    return MakeTransformedGenerator(std::move(buffer_generator), block_reader_fn);
  }

  // Transform one incoming buffer into a delimited CSVBlock.
  // `next_buffer` is null exactly when the end of the stream is reached.
  Result<TransformFlow<CSVBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
    if (buffer_ == nullptr) {
      // EOF
      return TransformFinish();
    }

    bool is_final = (next_buffer == nullptr);

    auto current_partial = std::move(partial_);
    auto current_buffer = std::move(buffer_);
    int64_t bytes_skipped = 0;

    if (skip_rows_) {
      // Drop rows still to be skipped from the current buffer
      auto orig_size = current_buffer->size();
      bytes_skipped = current_partial->size();
      RETURN_NOT_OK(chunker_->ProcessSkip(current_partial, current_buffer, is_final,
                                          &skip_rows_, &current_buffer));
      bytes_skipped += orig_size - current_buffer->size();
      current_partial = std::make_shared<Buffer>(nullptr, 0);
      if (skip_rows_) {
        // More rows to skip in the next buffer; emit an empty block so that
        // `bytes_skipped` is still reported downstream
        partial_ = std::move(current_buffer);
        buffer_ = std::move(next_buffer);
        return TransformYield<CSVBlock>(CSVBlock{current_partial,
                                                 current_partial,
                                                 current_partial,
                                                 block_index_++,
                                                 is_final,
                                                 bytes_skipped,
                                                 {}});
      }
    }

    std::shared_ptr<Buffer> whole, completion, next_partial;

    if (is_final) {
      // End of file reached => compute completion from penultimate block
      RETURN_NOT_OK(
          chunker_->ProcessFinal(current_partial, current_buffer, &completion, &whole));
    } else {
      // Get completion of partial from previous block.
      std::shared_ptr<Buffer> starts_with_whole;
      RETURN_NOT_OK(chunker_->ProcessWithPartial(current_partial, current_buffer,
                                                 &completion, &starts_with_whole));

      // Get a complete CSV block inside `partial + block`, and keep
      // the rest for the next iteration.
      RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
    }

    partial_ = std::move(next_partial);
    buffer_ = std::move(next_buffer);

    return TransformYield<CSVBlock>(CSVBlock{
        current_partial, completion, whole, block_index_++, is_final, bytes_skipped, {}});
  }
};
352
// Output of the parsing stage (CSVBlock -> ParsedBlock).
struct ParsedBlock {
  std::shared_ptr<BlockParser> parser;
  // Sequential index of the source block; negative marks end-of-iteration
  int64_t block_index;
  // Input bytes represented by this block (parsed plus skipped)
  int64_t bytes_parsed_or_skipped;
};
358
// Output of the decoding stage (ParsedBlock -> DecodedBlock).
struct DecodedBlock {
  std::shared_ptr<RecordBatch> record_batch;
  // Represents the number of input bytes represented by this batch
  // This will include bytes skipped when skipping rows after the header
  // (negative marks end-of-iteration)
  int64_t bytes_processed;
};
365
366 } // namespace
367
368 } // namespace csv
369
// Allow ParsedBlock to be used with Iterator/AsyncGenerator APIs:
// a negative block_index marks the end-of-iteration sentinel.
template <>
struct IterationTraits<csv::ParsedBlock> {
  static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; }
  static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; }
};
375
// Allow DecodedBlock to be used with Iterator/AsyncGenerator APIs:
// a negative bytes_processed marks the end-of-iteration sentinel.
template <>
struct IterationTraits<csv::DecodedBlock> {
  static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; }
  static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; }
};
381
382 namespace csv {
383 namespace {
384
385 // A function object that takes in a buffer of CSV data and returns a parsed batch of CSV
386 // data (CSVBlock -> ParsedBlock) for use with MakeMappedGenerator.
387 // The parsed batch contains a list of offsets for each of the columns so that columns
388 // can be individually scanned
389 //
390 // This operator is not re-entrant
class BlockParsingOperator {
 public:
  // `num_csv_cols`: expected number of columns per row.
  // `first_row`: row number of the first row to be parsed, or a negative
  // value to disable row tracking.
  BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options,
                       int num_csv_cols, int64_t first_row)
      : io_context_(io_context),
        parse_options_(parse_options),
        num_csv_cols_(num_csv_cols),
        count_rows_(first_row >= 0),
        num_rows_seen_(first_row) {}

  // Parse one delimited block.  Also notifies the block's consume_bytes
  // callback with the number of bytes parsed.
  Result<ParsedBlock> operator()(const CSVBlock& block) {
    constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
    auto parser = std::make_shared<BlockParser>(
        io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);

    std::shared_ptr<Buffer> straddling;
    std::vector<util::string_view> views;
    // Build the list of views to parse: straddling data (partial +
    // completion) followed by the block body.  Only concatenate when both
    // straddling pieces are non-empty, to avoid a copy.
    if (block.partial->size() != 0 || block.completion->size() != 0) {
      if (block.partial->size() == 0) {
        straddling = block.completion;
      } else if (block.completion->size() == 0) {
        straddling = block.partial;
      } else {
        ARROW_ASSIGN_OR_RAISE(
            straddling,
            ConcatenateBuffers({block.partial, block.completion}, io_context_.pool()));
      }
      views = {util::string_view(*straddling), util::string_view(*block.buffer)};
    } else {
      views = {util::string_view(*block.buffer)};
    }
    uint32_t parsed_size;
    if (block.is_final) {
      RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
    } else {
      RETURN_NOT_OK(parser->Parse(views, &parsed_size));
    }
    if (count_rows_) {
      num_rows_seen_ += parser->total_num_rows();
    }
    // Let the block reader advance past the parsed bytes
    RETURN_NOT_OK(block.consume_bytes(parsed_size));
    return ParsedBlock{std::move(parser), block.block_index,
                       static_cast<int64_t>(parsed_size) + block.bytes_skipped};
  }

 private:
  io::IOContext io_context_;
  ParseOptions parse_options_;
  // Expected number of columns per row
  int num_csv_cols_;
  // Whether num_rows_seen_ is being tracked
  bool count_rows_;
  // Row number of the next row to parse (mutated per call: not re-entrant)
  int64_t num_rows_seen_;
};
443
444 // A function object that takes in parsed batch of CSV data and decodes it to an arrow
445 // record batch (ParsedBlock -> DecodedBlock) for use with MakeMappedGenerator.
446 class BlockDecodingOperator {
447 public:
448 Future<DecodedBlock> operator()(const ParsedBlock& block) {
449 DCHECK(!state_->column_decoders.empty());
450 std::vector<Future<std::shared_ptr<Array>>> decoded_array_futs;
451 for (auto& decoder : state_->column_decoders) {
452 decoded_array_futs.push_back(decoder->Decode(block.parser));
453 }
454 auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped;
455 auto decoded_arrays_fut = All(std::move(decoded_array_futs));
456 auto state = state_;
457 return decoded_arrays_fut.Then(
458 [state, bytes_parsed_or_skipped](
459 const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays)
460 -> Result<DecodedBlock> {
461 ARROW_ASSIGN_OR_RAISE(auto decoded_arrays,
462 internal::UnwrapOrRaise(maybe_decoded_arrays));
463
464 ARROW_ASSIGN_OR_RAISE(auto batch,
465 state->DecodedArraysToBatch(std::move(decoded_arrays)));
466 return DecodedBlock{std::move(batch), bytes_parsed_or_skipped};
467 });
468 }
469
470 static Result<BlockDecodingOperator> Make(io::IOContext io_context,
471 ConvertOptions convert_options,
472 ConversionSchema conversion_schema) {
473 BlockDecodingOperator op(std::move(io_context), std::move(convert_options),
474 std::move(conversion_schema));
475 RETURN_NOT_OK(op.state_->MakeColumnDecoders(io_context));
476 return op;
477 }
478
479 private:
480 BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options,
481 ConversionSchema conversion_schema)
482 : state_(std::make_shared<State>(std::move(io_context), std::move(convert_options),
483 std::move(conversion_schema))) {}
484
485 struct State {
486 State(io::IOContext io_context, ConvertOptions convert_options,
487 ConversionSchema conversion_schema)
488 : convert_options(std::move(convert_options)),
489 conversion_schema(std::move(conversion_schema)) {}
490
491 Result<std::shared_ptr<RecordBatch>> DecodedArraysToBatch(
492 std::vector<std::shared_ptr<Array>> arrays) {
493 const auto n_rows = arrays[0]->length();
494
495 if (schema == nullptr) {
496 FieldVector fields(arrays.size());
497 for (size_t i = 0; i < arrays.size(); ++i) {
498 fields[i] = field(conversion_schema.columns[i].name, arrays[i]->type());
499 }
500
501 if (n_rows == 0) {
502 // No rows so schema is not reliable. return RecordBatch but do not set schema
503 return RecordBatch::Make(arrow::schema(std::move(fields)), n_rows,
504 std::move(arrays));
505 }
506
507 schema = arrow::schema(std::move(fields));
508 }
509
510 return RecordBatch::Make(schema, n_rows, std::move(arrays));
511 }
512
513 // Make column decoders from conversion schema
514 Status MakeColumnDecoders(io::IOContext io_context) {
515 for (const auto& column : conversion_schema.columns) {
516 std::shared_ptr<ColumnDecoder> decoder;
517 if (column.is_missing) {
518 ARROW_ASSIGN_OR_RAISE(decoder,
519 ColumnDecoder::MakeNull(io_context.pool(), column.type));
520 } else if (column.type != nullptr) {
521 ARROW_ASSIGN_OR_RAISE(
522 decoder, ColumnDecoder::Make(io_context.pool(), column.type, column.index,
523 convert_options));
524 } else {
525 ARROW_ASSIGN_OR_RAISE(
526 decoder,
527 ColumnDecoder::Make(io_context.pool(), column.index, convert_options));
528 }
529 column_decoders.push_back(std::move(decoder));
530 }
531 return Status::OK();
532 }
533
534 ConvertOptions convert_options;
535 ConversionSchema conversion_schema;
536 std::vector<std::shared_ptr<ColumnDecoder>> column_decoders;
537 std::shared_ptr<Schema> schema;
538 };
539
540 std::shared_ptr<State> state_;
541 };
542
543 /////////////////////////////////////////////////////////////////////////
544 // Base class for common functionality
545
546 class ReaderMixin {
547 public:
548 ReaderMixin(io::IOContext io_context, std::shared_ptr<io::InputStream> input,
549 const ReadOptions& read_options, const ParseOptions& parse_options,
550 const ConvertOptions& convert_options, bool count_rows)
551 : io_context_(std::move(io_context)),
552 read_options_(read_options),
553 parse_options_(parse_options),
554 convert_options_(convert_options),
555 count_rows_(count_rows),
556 num_rows_seen_(count_rows_ ? 1 : -1),
557 input_(std::move(input)) {}
558
559 protected:
560 // Read header and column names from buffer, create column builders
561 // Returns the # of bytes consumed
562 Result<int64_t> ProcessHeader(const std::shared_ptr<Buffer>& buf,
563 std::shared_ptr<Buffer>* rest) {
564 const uint8_t* data = buf->data();
565 const auto data_end = data + buf->size();
566 DCHECK_GT(data_end - data, 0);
567
568 if (read_options_.skip_rows) {
569 // Skip initial rows (potentially invalid CSV data)
570 auto num_skipped_rows = SkipRows(data, static_cast<uint32_t>(data_end - data),
571 read_options_.skip_rows, &data);
572 if (num_skipped_rows < read_options_.skip_rows) {
573 return Status::Invalid(
574 "Could not skip initial ", read_options_.skip_rows,
575 " rows from CSV file, "
576 "either file is too short or header is larger than block size");
577 }
578 if (count_rows_) {
579 num_rows_seen_ += num_skipped_rows;
580 }
581 }
582
583 if (read_options_.column_names.empty()) {
584 // Parse one row (either to read column names or to know the number of columns)
585 BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_,
586 num_rows_seen_, 1);
587 uint32_t parsed_size = 0;
588 RETURN_NOT_OK(parser.Parse(
589 util::string_view(reinterpret_cast<const char*>(data), data_end - data),
590 &parsed_size));
591 if (parser.num_rows() != 1) {
592 return Status::Invalid(
593 "Could not read first row from CSV file, either "
594 "file is too short or header is larger than block size");
595 }
596 if (parser.num_cols() == 0) {
597 return Status::Invalid("No columns in CSV file");
598 }
599
600 if (read_options_.autogenerate_column_names) {
601 column_names_ = GenerateColumnNames(parser.num_cols());
602 } else {
603 // Read column names from header row
604 auto visit = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status {
605 column_names_.emplace_back(reinterpret_cast<const char*>(data), size);
606 return Status::OK();
607 };
608 RETURN_NOT_OK(parser.VisitLastRow(visit));
609 DCHECK_EQ(static_cast<size_t>(parser.num_cols()), column_names_.size());
610 // Skip parsed header row
611 data += parsed_size;
612 if (count_rows_) {
613 ++num_rows_seen_;
614 }
615 }
616 } else {
617 column_names_ = read_options_.column_names;
618 }
619
620 if (count_rows_) {
621 // increase rows seen to skip past rows which will be skipped
622 num_rows_seen_ += read_options_.skip_rows_after_names;
623 }
624
625 auto bytes_consumed = data - buf->data();
626 *rest = SliceBuffer(buf, bytes_consumed);
627
628 num_csv_cols_ = static_cast<int32_t>(column_names_.size());
629 DCHECK_GT(num_csv_cols_, 0);
630
631 RETURN_NOT_OK(MakeConversionSchema());
632 return bytes_consumed;
633 }
634
635 std::vector<std::string> GenerateColumnNames(int32_t num_cols) {
636 std::vector<std::string> res;
637 res.reserve(num_cols);
638 for (int32_t i = 0; i < num_cols; ++i) {
639 std::stringstream ss;
640 ss << "f" << i;
641 res.push_back(ss.str());
642 }
643 return res;
644 }
645
646 // Make conversion schema from options and parsed CSV header
647 Status MakeConversionSchema() {
648 // Append a column converted from CSV data
649 auto append_csv_column = [&](std::string col_name, int32_t col_index) {
650 // Does the named column have a fixed type?
651 auto it = convert_options_.column_types.find(col_name);
652 if (it == convert_options_.column_types.end()) {
653 conversion_schema_.columns.push_back(
654 ConversionSchema::InferredColumn(std::move(col_name), col_index));
655 } else {
656 conversion_schema_.columns.push_back(
657 ConversionSchema::TypedColumn(std::move(col_name), col_index, it->second));
658 }
659 };
660
661 // Append a column of nulls
662 auto append_null_column = [&](std::string col_name) {
663 // If the named column has a fixed type, use it, otherwise use null()
664 std::shared_ptr<DataType> type;
665 auto it = convert_options_.column_types.find(col_name);
666 if (it == convert_options_.column_types.end()) {
667 type = null();
668 } else {
669 type = it->second;
670 }
671 conversion_schema_.columns.push_back(
672 ConversionSchema::NullColumn(std::move(col_name), std::move(type)));
673 };
674
675 if (convert_options_.include_columns.empty()) {
676 // Include all columns in CSV file order
677 for (int32_t col_index = 0; col_index < num_csv_cols_; ++col_index) {
678 append_csv_column(column_names_[col_index], col_index);
679 }
680 } else {
681 // Include columns from `include_columns` (in that order)
682 // Compute indices of columns in the CSV file
683 std::unordered_map<std::string, int32_t> col_indices;
684 col_indices.reserve(column_names_.size());
685 for (int32_t i = 0; i < static_cast<int32_t>(column_names_.size()); ++i) {
686 col_indices.emplace(column_names_[i], i);
687 }
688
689 for (const auto& col_name : convert_options_.include_columns) {
690 auto it = col_indices.find(col_name);
691 if (it != col_indices.end()) {
692 append_csv_column(col_name, it->second);
693 } else if (convert_options_.include_missing_columns) {
694 append_null_column(col_name);
695 } else {
696 return Status::KeyError("Column '", col_name,
697 "' in include_columns "
698 "does not exist in CSV file");
699 }
700 }
701 }
702 return Status::OK();
703 }
704
705 struct ParseResult {
706 std::shared_ptr<BlockParser> parser;
707 int64_t parsed_bytes;
708 };
709
710 Result<ParseResult> Parse(const std::shared_ptr<Buffer>& partial,
711 const std::shared_ptr<Buffer>& completion,
712 const std::shared_ptr<Buffer>& block, int64_t block_index,
713 bool is_final) {
714 static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
715 auto parser = std::make_shared<BlockParser>(
716 io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);
717
718 std::shared_ptr<Buffer> straddling;
719 std::vector<util::string_view> views;
720 if (partial->size() != 0 || completion->size() != 0) {
721 if (partial->size() == 0) {
722 straddling = completion;
723 } else if (completion->size() == 0) {
724 straddling = partial;
725 } else {
726 ARROW_ASSIGN_OR_RAISE(
727 straddling, ConcatenateBuffers({partial, completion}, io_context_.pool()));
728 }
729 views = {util::string_view(*straddling), util::string_view(*block)};
730 } else {
731 views = {util::string_view(*block)};
732 }
733 uint32_t parsed_size;
734 if (is_final) {
735 RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
736 } else {
737 RETURN_NOT_OK(parser->Parse(views, &parsed_size));
738 }
739 if (count_rows_) {
740 num_rows_seen_ += parser->total_num_rows();
741 }
742 return ParseResult{std::move(parser), static_cast<int64_t>(parsed_size)};
743 }
744
745 io::IOContext io_context_;
746 ReadOptions read_options_;
747 ParseOptions parse_options_;
748 ConvertOptions convert_options_;
749
750 // Number of columns in the CSV file
751 int32_t num_csv_cols_ = -1;
752 // Whether num_rows_seen_ tracks the number of rows seen in the CSV being parsed
753 bool count_rows_;
754 // Number of rows seen in the csv. Not used if count_rows is false
755 int64_t num_rows_seen_;
756 // Column names in the CSV file
757 std::vector<std::string> column_names_;
758 ConversionSchema conversion_schema_;
759
760 std::shared_ptr<io::InputStream> input_;
761 std::shared_ptr<internal::TaskGroup> task_group_;
762 };
763
764 /////////////////////////////////////////////////////////////////////////
765 // Base class for one-shot table readers
766
// Shared logic for one-shot table readers: one ColumnBuilder per output
// column is fed parsed blocks, then the results are assembled into a Table.
class BaseTableReader : public ReaderMixin, public csv::TableReader {
 public:
  using ReaderMixin::ReaderMixin;

  // Implementation-defined setup step (e.g. reading the header)
  virtual Status Init() = 0;

  Future<std::shared_ptr<Table>> ReadAsync() override {
    // Default: run the synchronous Read() and wrap it in a finished future
    return Future<std::shared_ptr<Table>>::MakeFinished(Read());
  }

 protected:
  // Make column builders from conversion schema
  Status MakeColumnBuilders() {
    for (const auto& column : conversion_schema_.columns) {
      std::shared_ptr<ColumnBuilder> builder;
      if (column.is_missing) {
        // Column absent from the CSV: build all-null chunks
        ARROW_ASSIGN_OR_RAISE(builder, ColumnBuilder::MakeNull(io_context_.pool(),
                                                               column.type, task_group_));
      } else if (column.type != nullptr) {
        // Column with a user-specified type
        ARROW_ASSIGN_OR_RAISE(
            builder, ColumnBuilder::Make(io_context_.pool(), column.type, column.index,
                                         convert_options_, task_group_));
      } else {
        // Column with type inferred from the data
        ARROW_ASSIGN_OR_RAISE(builder,
                              ColumnBuilder::Make(io_context_.pool(), column.index,
                                                  convert_options_, task_group_));
      }
      column_builders_.push_back(std::move(builder));
    }
    return Status::OK();
  }

  // Parse a block and feed the result to the column builders.
  // Returns the number of parsed bytes.
  Result<int64_t> ParseAndInsert(const std::shared_ptr<Buffer>& partial,
                                 const std::shared_ptr<Buffer>& completion,
                                 const std::shared_ptr<Buffer>& block,
                                 int64_t block_index, bool is_final) {
    ARROW_ASSIGN_OR_RAISE(auto result,
                          Parse(partial, completion, block, block_index, is_final));
    RETURN_NOT_OK(ProcessData(result.parser, block_index));
    return result.parsed_bytes;
  }

  // Trigger conversion of parsed block data
  Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) {
    for (auto& builder : column_builders_) {
      builder->Insert(block_index, parser);
    }
    return Status::OK();
  }

  // Finish all column builders and assemble the resulting Table.
  Result<std::shared_ptr<Table>> MakeTable() {
    DCHECK_EQ(column_builders_.size(), conversion_schema_.columns.size());

    std::vector<std::shared_ptr<Field>> fields;
    std::vector<std::shared_ptr<ChunkedArray>> columns;

    for (int32_t i = 0; i < static_cast<int32_t>(column_builders_.size()); ++i) {
      const auto& column = conversion_schema_.columns[i];
      ARROW_ASSIGN_OR_RAISE(auto array, column_builders_[i]->Finish());
      fields.push_back(::arrow::field(column.name, array->type()));
      columns.emplace_back(std::move(array));
    }
    return Table::Make(schema(std::move(fields)), std::move(columns));
  }

  // Column builders for target Table (in ConversionSchema order)
  std::vector<std::shared_ptr<ColumnBuilder>> column_builders_;
};
835
836 /////////////////////////////////////////////////////////////////////////
837 // Base class for streaming readers
838
839 class StreamingReaderImpl : public ReaderMixin,
840 public csv::StreamingReader,
841 public std::enable_shared_from_this<StreamingReaderImpl> {
842 public:
  StreamingReaderImpl(io::IOContext io_context, std::shared_ptr<io::InputStream> input,
                      const ReadOptions& read_options, const ParseOptions& parse_options,
                      const ConvertOptions& convert_options, bool count_rows)
      : ReaderMixin(io_context, std::move(input), read_options, parse_options,
                    convert_options, count_rows),
        // Cumulative decoded-byte counter, exposed via bytes_read();
        // held by shared_ptr so async continuations can share it
        bytes_decoded_(std::make_shared<std::atomic<int64_t>>(0)) {}
849
  // Start async initialization: build the buffer-reading pipeline, then
  // fetch the first buffer and continue in InitAfterFirstBuffer().
  Future<> Init(Executor* cpu_executor) {
    ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));

    // TODO Consider exposing readahead as a read option (ARROW-12090)
    // Read buffers on the I/O executor, off the calling thread
    ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
                                                              io_context_.executor()));

    // Resume downstream continuations on the CPU executor
    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor);

    // Normalize buffers (skip BOM, fix straddling CRLF)
    auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it));

    int max_readahead = cpu_executor->GetCapacity();
    auto self = shared_from_this();

    return buffer_generator().Then([self, buffer_generator, max_readahead](
                                       const std::shared_ptr<Buffer>& first_buffer) {
      return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead);
    });
  }
870
  // Schema of the emitted batches (set during initialization)
  std::shared_ptr<Schema> schema() const override { return schema_; }
872
  // Number of input bytes decoded so far (header included)
  int64_t bytes_read() const override { return bytes_decoded_->load(); }
874
  // Synchronous wrapper around ReadNextAsync(): blocks until the next
  // batch (or an error) is available.
  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
    auto next_fut = ReadNextAsync();
    auto next_result = next_fut.result();
    return std::move(next_result).Value(batch);
  }
880
  // Pull the next record batch from the async pipeline.
  Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
    return record_batch_gen_();
  }
884
885 protected:
  // Continue initialization once the first buffer is available: process the
  // header, then assemble the block -> parse -> decode pipeline and read the
  // first decoded block.
  Future<> InitAfterFirstBuffer(const std::shared_ptr<Buffer>& first_buffer,
                                AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
                                int max_readahead) {
    if (first_buffer == nullptr) {
      return Status::Invalid("Empty CSV file");
    }

    std::shared_ptr<Buffer> after_header;
    ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed,
                          ProcessHeader(first_buffer, &after_header));
    bytes_decoded_->fetch_add(header_bytes_consumed);

    auto parser_op =
        BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, num_rows_seen_);
    ARROW_ASSIGN_OR_RAISE(
        auto decoder_op,
        BlockDecodingOperator::Make(io_context_, convert_options_, conversion_schema_));

    // Delimit blocks serially, then map them through parsing and decoding
    auto block_gen = SerialBlockReader::MakeAsyncIterator(
        std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header),
        read_options_.skip_rows_after_names);
    auto parsed_block_gen =
        MakeMappedGenerator(std::move(block_gen), std::move(parser_op));
    auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op));

    auto self = shared_from_this();
    // Read the first decoded block to discover the output schema
    return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) {
      return self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0);
    });
  }
916
  // Finish initialization from the first decoded block: capture the schema,
  // skip over leading empty batches, then install the record batch generator.
  // `prev_bytes_processed` accumulates the byte counts of skipped empty
  // blocks so they are still reflected in bytes_read().
  Future<> InitFromBlock(const DecodedBlock& block,
                         AsyncGenerator<DecodedBlock> batch_gen, int max_readahead,
                         int64_t prev_bytes_processed) {
    if (!block.record_batch) {
      // End of file just return null batches
      record_batch_gen_ = MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
      return Status::OK();
    }

    schema_ = block.record_batch->schema();

    if (block.record_batch->num_rows() == 0) {
      // Keep consuming blocks until the first non empty block is found
      auto self = shared_from_this();
      prev_bytes_processed += block.bytes_processed;
      return batch_gen().Then([self, batch_gen, max_readahead,
                               prev_bytes_processed](const DecodedBlock& next_block) {
        return self->InitFromBlock(next_block, std::move(batch_gen), max_readahead,
                                   prev_bytes_processed);
      });
    }

    AsyncGenerator<DecodedBlock> readahead_gen;
    if (read_options_.use_threads) {
      // Decode up to `max_readahead` blocks ahead of the consumer.
      readahead_gen = MakeReadaheadGenerator(std::move(batch_gen), max_readahead);
    } else {
      readahead_gen = std::move(batch_gen);
    }

    // Re-prepend the block we already pulled so the consumer sees it first.
    AsyncGenerator<DecodedBlock> restarted_gen =
        MakeGeneratorStartsWith({block}, std::move(readahead_gen));

    auto bytes_decoded = bytes_decoded_;
    // Unwrap DecodedBlock -> RecordBatch while recording decoded bytes; the
    // deferred `prev_bytes_processed` is flushed on the first call only
    // (the lambda is `mutable` so it can zero its own copy afterwards).
    auto unwrap_and_record_bytes =
        [bytes_decoded, prev_bytes_processed](
            const DecodedBlock& block) mutable -> Result<std::shared_ptr<RecordBatch>> {
      bytes_decoded->fetch_add(block.bytes_processed + prev_bytes_processed);
      prev_bytes_processed = 0;
      return block.record_batch;
    };

    auto unwrapped =
        MakeMappedGenerator(std::move(restarted_gen), std::move(unwrap_and_record_bytes));

    // Honor the stop token: cancellation surfaces as an error from the
    // generator rather than from each downstream consumer.
    record_batch_gen_ = MakeCancellable(std::move(unwrapped), io_context_.stop_token());
    return Status::OK();
  }
964
965 std::shared_ptr<Schema> schema_;
966 AsyncGenerator<std::shared_ptr<RecordBatch>> record_batch_gen_;
967 // bytes which have been decoded and asked for by the caller
968 std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
969 };
970
971 /////////////////////////////////////////////////////////////////////////
972 // Serial TableReader implementation
973
// Single-threaded TableReader: reads, parses and converts blocks one at a
// time on the calling thread (conversion tasks run in a serial task group).
class SerialTableReader : public BaseTableReader {
 public:
  using BaseTableReader::BaseTableReader;

  // Set up a buffer iterator over the input stream with minimal readahead.
  Status Init() override {
    ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));

    // Since we're converting serially, no need to readahead more than one block
    int32_t block_queue_size = 1;
    ARROW_ASSIGN_OR_RAISE(auto rh_it,
                          MakeReadaheadIterator(std::move(istream_it), block_queue_size));
    buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
    return Status::OK();
  }

  // Read the whole CSV input synchronously, block by block, into a Table.
  Result<std::shared_ptr<Table>> Read() override {
    task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());

    // First block
    ARROW_ASSIGN_OR_RAISE(auto first_buffer, buffer_iterator_.Next());
    if (first_buffer == nullptr) {
      return Status::Invalid("Empty CSV file");
    }
    RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer));
    RETURN_NOT_OK(MakeColumnBuilders());

    auto block_iterator = SerialBlockReader::MakeIterator(
        std::move(buffer_iterator_), MakeChunker(parse_options_), std::move(first_buffer),
        read_options_.skip_rows_after_names);
    while (true) {
      // Honor cancellation requests between blocks.
      RETURN_NOT_OK(io_context_.stop_token().Poll());

      ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator.Next());
      if (IsIterationEnd(maybe_block)) {
        // EOF
        break;
      }
      ARROW_ASSIGN_OR_RAISE(
          int64_t parsed_bytes,
          ParseAndInsert(maybe_block.partial, maybe_block.completion, maybe_block.buffer,
                         maybe_block.block_index, maybe_block.is_final));
      // Report how many bytes were actually consumed so the block reader can
      // carry the remainder over into the next block.
      RETURN_NOT_OK(maybe_block.consume_bytes(parsed_bytes));
    }
    // Finish conversion, create schema and table
    RETURN_NOT_OK(task_group_->Finish());
    return MakeTable();
  }

 protected:
  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
};
1026
// Multi-threaded TableReader: blocks are read in the background and parse
// tasks are appended to a threaded task group as blocks become available.
class AsyncThreadedTableReader
    : public BaseTableReader,
      public std::enable_shared_from_this<AsyncThreadedTableReader> {
 public:
  using BaseTableReader::BaseTableReader;

  AsyncThreadedTableReader(io::IOContext io_context,
                           std::shared_ptr<io::InputStream> input,
                           const ReadOptions& read_options,
                           const ParseOptions& parse_options,
                           const ConvertOptions& convert_options, Executor* cpu_executor)
      // Count rows is currently not supported during parallel read
      : BaseTableReader(std::move(io_context), input, read_options, parse_options,
                        convert_options, /*count_rows=*/false),
        cpu_executor_(cpu_executor) {}

  ~AsyncThreadedTableReader() override {
    if (task_group_) {
      // In case of error, make sure all pending tasks are finished before
      // we start destroying BaseTableReader members
      ARROW_UNUSED(task_group_->Finish());
    }
  }

  // Set up the asynchronous buffer generator: buffers are produced on the IO
  // executor and continuations are transferred to the CPU executor.
  Status Init() override {
    ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));

    int max_readahead = cpu_executor_->GetCapacity();
    // Resume background reads once the queue drains to half capacity.
    int readahead_restart = std::max(1, max_readahead / 2);

    ARROW_ASSIGN_OR_RAISE(
        auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor(),
                                            max_readahead, readahead_restart));

    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
    buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
    return Status::OK();
  }

  // Synchronous entry point: block on the asynchronous implementation.
  Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }

  Future<std::shared_ptr<Table>> ReadAsync() override {
    task_group_ =
        internal::TaskGroup::MakeThreaded(cpu_executor_, io_context_.stop_token());

    // Every continuation captures `self` so the reader stays alive while
    // asynchronous work is outstanding.
    auto self = shared_from_this();
    return ProcessFirstBuffer().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
      auto block_generator = ThreadedBlockReader::MakeAsyncIterator(
          self->buffer_generator_, MakeChunker(self->parse_options_),
          std::move(first_buffer), self->read_options_.skip_rows_after_names);

      std::function<Status(CSVBlock)> block_visitor =
          [self](CSVBlock maybe_block) -> Status {
        // The logic in VisitAsyncGenerator ensures that we will never be
        // passed an empty block (visit does not call with the end token) so
        // we can be assured maybe_block has a value.
        DCHECK_GE(maybe_block.block_index, 0);
        DCHECK(!maybe_block.consume_bytes);

        // Launch parse task
        self->task_group_->Append([self, maybe_block] {
          return self
              ->ParseAndInsert(maybe_block.partial, maybe_block.completion,
                               maybe_block.buffer, maybe_block.block_index,
                               maybe_block.is_final)
              .status();
        });
        return Status::OK();
      };

      return VisitAsyncGenerator(std::move(block_generator), block_visitor)
          .Then([self]() -> Future<> {
            // By this point we've added all top level tasks so it is safe to call
            // FinishAsync
            return self->task_group_->FinishAsync();
          })
          .Then([self]() -> Result<std::shared_ptr<Table>> {
            // Finish conversion, create schema and table
            return self->MakeTable();
          });
    });
  }

 protected:
  // Pull the first buffer, consume the CSV header from it and create the
  // column builders; yields the first buffer with the header stripped.
  Future<std::shared_ptr<Buffer>> ProcessFirstBuffer() {
    // First block
    auto first_buffer_future = buffer_generator_();
    return first_buffer_future.Then([this](const std::shared_ptr<Buffer>& first_buffer)
                                        -> Result<std::shared_ptr<Buffer>> {
      if (first_buffer == nullptr) {
        return Status::Invalid("Empty CSV file");
      }
      std::shared_ptr<Buffer> first_buffer_processed;
      RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer_processed));
      RETURN_NOT_OK(MakeColumnBuilders());
      return first_buffer_processed;
    });
  }

  Executor* cpu_executor_;
  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
};
1130
1131 Result<std::shared_ptr<TableReader>> MakeTableReader(
1132 MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1133 const ReadOptions& read_options, const ParseOptions& parse_options,
1134 const ConvertOptions& convert_options) {
1135 RETURN_NOT_OK(parse_options.Validate());
1136 RETURN_NOT_OK(read_options.Validate());
1137 RETURN_NOT_OK(convert_options.Validate());
1138 std::shared_ptr<BaseTableReader> reader;
1139 if (read_options.use_threads) {
1140 auto cpu_executor = internal::GetCpuThreadPool();
1141 reader = std::make_shared<AsyncThreadedTableReader>(
1142 io_context, input, read_options, parse_options, convert_options, cpu_executor);
1143 } else {
1144 reader = std::make_shared<SerialTableReader>(io_context, input, read_options,
1145 parse_options, convert_options,
1146 /*count_rows=*/true);
1147 }
1148 RETURN_NOT_OK(reader->Init());
1149 return reader;
1150 }
1151
1152 Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
1153 io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1154 internal::Executor* cpu_executor, const ReadOptions& read_options,
1155 const ParseOptions& parse_options, const ConvertOptions& convert_options) {
1156 RETURN_NOT_OK(parse_options.Validate());
1157 RETURN_NOT_OK(read_options.Validate());
1158 RETURN_NOT_OK(convert_options.Validate());
1159 std::shared_ptr<StreamingReaderImpl> reader;
1160 reader = std::make_shared<StreamingReaderImpl>(
1161 io_context, input, read_options, parse_options, convert_options,
1162 /*count_rows=*/!read_options.use_threads || cpu_executor->GetCapacity() == 1);
1163 return reader->Init(cpu_executor).Then([reader] {
1164 return std::dynamic_pointer_cast<StreamingReader>(reader);
1165 });
1166 }
1167
1168 /////////////////////////////////////////////////////////////////////////
1169 // Row count implementation
1170
// Counts data rows in a CSV input asynchronously, parsing blocks without
// converting any values.
class CSVRowCounter : public ReaderMixin,
                      public std::enable_shared_from_this<CSVRowCounter> {
 public:
  CSVRowCounter(io::IOContext io_context, Executor* cpu_executor,
                std::shared_ptr<io::InputStream> input, const ReadOptions& read_options,
                const ParseOptions& parse_options)
      : ReaderMixin(io_context, std::move(input), read_options, parse_options,
                    ConvertOptions::Defaults(), /*count_rows=*/true),
        cpu_executor_(cpu_executor),
        row_count_(0) {}

  // Entry point: resolves to the total number of rows once the input has
  // been fully consumed.  `self` keeps the counter alive for the duration.
  Future<int64_t> Count() {
    auto self = shared_from_this();
    return Init(self).Then([self]() { return self->DoCount(self); });
  }

 private:
  // Build the buffer pipeline, consume the header and create the serial
  // block generator that DoCount() will drain.
  Future<> Init(const std::shared_ptr<CSVRowCounter>& self) {
    ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));
    // TODO Consider exposing readahead as a read option (ARROW-12090)
    ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
                                                              io_context_.executor()));
    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
    auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it));

    return buffer_generator().Then(
        [self, buffer_generator](std::shared_ptr<Buffer> first_buffer) {
          if (!first_buffer) {
            return Status::Invalid("Empty CSV file");
          }
          RETURN_NOT_OK(self->ProcessHeader(first_buffer, &first_buffer));
          // skip_rows_after_names is 0 here: every data row is counted.
          self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
              buffer_generator, MakeChunker(self->parse_options_),
              std::move(first_buffer), 0);
          return Status::OK();
        });
  }

  // Parse each block only to obtain its row count, accumulating into
  // row_count_, then report the total.
  Future<int64_t> DoCount(const std::shared_ptr<CSVRowCounter>& self) {
    // count_cb must return a value instead of Status/Future<> to work with
    // MakeMappedGenerator, and it must use a type with a valid end value to work with
    // IterationEnd.
    std::function<Result<util::optional<int64_t>>(const CSVBlock&)> count_cb =
        [self](const CSVBlock& maybe_block) -> Result<util::optional<int64_t>> {
      ARROW_ASSIGN_OR_RAISE(
          auto parser,
          self->Parse(maybe_block.partial, maybe_block.completion, maybe_block.buffer,
                      maybe_block.block_index, maybe_block.is_final));
      RETURN_NOT_OK(maybe_block.consume_bytes(parser.parsed_bytes));
      int32_t total_row_count = parser.parser->total_num_rows();
      self->row_count_ += total_row_count;
      return total_row_count;
    };
    auto count_gen = MakeMappedGenerator(block_generator_, std::move(count_cb));
    // Drain the generator purely for its side effect on row_count_.
    return DiscardAllFromAsyncGenerator(count_gen).Then(
        [self]() { return self->row_count_; });
  }

  Executor* cpu_executor_;
  AsyncGenerator<CSVBlock> block_generator_;
  int64_t row_count_;
};
1234
1235 } // namespace
1236
1237 /////////////////////////////////////////////////////////////////////////
1238 // Factory functions
1239
1240 Result<std::shared_ptr<TableReader>> TableReader::Make(
1241 io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1242 const ReadOptions& read_options, const ParseOptions& parse_options,
1243 const ConvertOptions& convert_options) {
1244 return MakeTableReader(io_context.pool(), io_context, std::move(input), read_options,
1245 parse_options, convert_options);
1246 }
1247
1248 Result<std::shared_ptr<TableReader>> TableReader::Make(
1249 MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1250 const ReadOptions& read_options, const ParseOptions& parse_options,
1251 const ConvertOptions& convert_options) {
1252 return MakeTableReader(pool, io_context, std::move(input), read_options, parse_options,
1253 convert_options);
1254 }
1255
1256 Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
1257 MemoryPool* pool, std::shared_ptr<io::InputStream> input,
1258 const ReadOptions& read_options, const ParseOptions& parse_options,
1259 const ConvertOptions& convert_options) {
1260 auto io_context = io::IOContext(pool);
1261 auto cpu_executor = internal::GetCpuThreadPool();
1262 auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
1263 read_options, parse_options, convert_options);
1264 auto reader_result = reader_fut.result();
1265 ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
1266 return reader;
1267 }
1268
1269 Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
1270 io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1271 const ReadOptions& read_options, const ParseOptions& parse_options,
1272 const ConvertOptions& convert_options) {
1273 auto cpu_executor = internal::GetCpuThreadPool();
1274 auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
1275 read_options, parse_options, convert_options);
1276 auto reader_result = reader_fut.result();
1277 ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
1278 return reader;
1279 }
1280
// Asynchronous factory: forwards directly to the internal implementation.
// The returned future completes once the header has been parsed.
Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
    io::IOContext io_context, std::shared_ptr<io::InputStream> input,
    internal::Executor* cpu_executor, const ReadOptions& read_options,
    const ParseOptions& parse_options, const ConvertOptions& convert_options) {
  return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
                             parse_options, convert_options);
}
1288
1289 Future<int64_t> CountRowsAsync(io::IOContext io_context,
1290 std::shared_ptr<io::InputStream> input,
1291 internal::Executor* cpu_executor,
1292 const ReadOptions& read_options,
1293 const ParseOptions& parse_options) {
1294 RETURN_NOT_OK(parse_options.Validate());
1295 RETURN_NOT_OK(read_options.Validate());
1296 auto counter = std::make_shared<CSVRowCounter>(
1297 io_context, cpu_executor, std::move(input), read_options, parse_options);
1298 return counter->Count();
1299 }
1300
1301 } // namespace csv
1302
1303 } // namespace arrow