// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/csv/reader.h"

#include <atomic>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/csv/chunker.h"
#include "arrow/csv/column_builder.h"
#include "arrow/csv/column_decoder.h"
#include "arrow/csv/options.h"
#include "arrow/csv/parser.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/optional.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/utf8.h"
#include "arrow/util/vector.h"

58 using internal::Executor
;
62 struct ConversionSchema
{
65 // Physical column index in CSV file
67 // If true, make a column of nulls
69 // If set, convert the CSV column to this type
70 // If unset (and is_missing is false), infer the type from the CSV column
71 std::shared_ptr
<DataType
> type
;
74 static Column
NullColumn(std::string col_name
, std::shared_ptr
<DataType
> type
) {
75 return Column
{std::move(col_name
), -1, true, std::move(type
)};
78 static Column
TypedColumn(std::string col_name
, int32_t col_index
,
79 std::shared_ptr
<DataType
> type
) {
80 return Column
{std::move(col_name
), col_index
, false, std::move(type
)};
83 static Column
InferredColumn(std::string col_name
, int32_t col_index
) {
84 return Column
{std::move(col_name
), col_index
, false, nullptr};
87 std::vector
<Column
> columns
;
90 // An iterator of Buffers that makes sure there is no straddling CRLF sequence.
91 class CSVBufferIterator
{
93 static Iterator
<std::shared_ptr
<Buffer
>> Make(
94 Iterator
<std::shared_ptr
<Buffer
>> buffer_iterator
) {
95 Transformer
<std::shared_ptr
<Buffer
>, std::shared_ptr
<Buffer
>> fn
=
97 return MakeTransformedIterator(std::move(buffer_iterator
), fn
);
100 static AsyncGenerator
<std::shared_ptr
<Buffer
>> MakeAsync(
101 AsyncGenerator
<std::shared_ptr
<Buffer
>> buffer_iterator
) {
102 Transformer
<std::shared_ptr
<Buffer
>, std::shared_ptr
<Buffer
>> fn
=
104 return MakeTransformedGenerator(std::move(buffer_iterator
), fn
);
107 Result
<TransformFlow
<std::shared_ptr
<Buffer
>>> operator()(std::shared_ptr
<Buffer
> buf
) {
108 if (buf
== nullptr) {
110 return TransformFinish();
115 ARROW_ASSIGN_OR_RAISE(auto data
, util::SkipUTF8BOM(buf
->data(), buf
->size()));
116 offset
+= data
- buf
->data();
117 DCHECK_GE(offset
, 0);
118 first_buffer_
= false;
121 if (trailing_cr_
&& buf
->data()[offset
] == '\n') {
122 // Skip '\r\n' line separator that started at the end of previous buffer
126 trailing_cr_
= (buf
->data()[buf
->size() - 1] == '\r');
127 buf
= SliceBuffer(buf
, offset
);
128 if (buf
->size() == 0) {
130 return TransformFinish();
132 return TransformYield(buf
);
137 bool first_buffer_
= true;
138 // Whether there was a trailing CR at the end of last received buffer
139 bool trailing_cr_
= false;
143 // (partial + completion + buffer) is an entire delimited CSV buffer.
144 std::shared_ptr
<Buffer
> partial
;
145 std::shared_ptr
<Buffer
> completion
;
146 std::shared_ptr
<Buffer
> buffer
;
149 int64_t bytes_skipped
;
150 std::function
<Status(int64_t)> consume_bytes
;
157 struct IterationTraits
<csv::CSVBlock
> {
158 static csv::CSVBlock
End() { return csv::CSVBlock
{{}, {}, {}, -1, true, 0, {}}; }
159 static bool IsEnd(const csv::CSVBlock
& val
) { return val
.block_index
< 0; }
165 // This is a callable that can be used to transform an iterator. The source iterator
166 // will contain buffers of data and the output iterator will contain delimited CSV
167 // blocks. util::optional is used so that there is an end token (required by the
168 // iterator APIs (e.g. Visit)) even though an empty optional is never used in this code.
171 BlockReader(std::unique_ptr
<Chunker
> chunker
, std::shared_ptr
<Buffer
> first_buffer
,
173 : chunker_(std::move(chunker
)),
174 partial_(std::make_shared
<Buffer
>("")),
175 buffer_(std::move(first_buffer
)),
176 skip_rows_(skip_rows
) {}
179 std::unique_ptr
<Chunker
> chunker_
;
180 std::shared_ptr
<Buffer
> partial_
, buffer_
;
182 int64_t block_index_
= 0;
183 // Whether there was a trailing CR at the end of last received buffer
184 bool trailing_cr_
= false;
187 // An object that reads delimited CSV blocks for serial use.
188 // The number of bytes consumed should be notified after each read,
189 // using CSVBlock::consume_bytes.
190 class SerialBlockReader
: public BlockReader
{
192 using BlockReader::BlockReader
;
194 static Iterator
<CSVBlock
> MakeIterator(
195 Iterator
<std::shared_ptr
<Buffer
>> buffer_iterator
, std::unique_ptr
<Chunker
> chunker
,
196 std::shared_ptr
<Buffer
> first_buffer
, int64_t skip_rows
) {
198 std::make_shared
<SerialBlockReader
>(std::move(chunker
), first_buffer
, skip_rows
);
199 // Wrap shared pointer in callable
200 Transformer
<std::shared_ptr
<Buffer
>, CSVBlock
> block_reader_fn
=
201 [block_reader
](std::shared_ptr
<Buffer
> buf
) {
202 return (*block_reader
)(std::move(buf
));
204 return MakeTransformedIterator(std::move(buffer_iterator
), block_reader_fn
);
207 static AsyncGenerator
<CSVBlock
> MakeAsyncIterator(
208 AsyncGenerator
<std::shared_ptr
<Buffer
>> buffer_generator
,
209 std::unique_ptr
<Chunker
> chunker
, std::shared_ptr
<Buffer
> first_buffer
,
212 std::make_shared
<SerialBlockReader
>(std::move(chunker
), first_buffer
, skip_rows
);
213 // Wrap shared pointer in callable
214 Transformer
<std::shared_ptr
<Buffer
>, CSVBlock
> block_reader_fn
=
215 [block_reader
](std::shared_ptr
<Buffer
> next
) {
216 return (*block_reader
)(std::move(next
));
218 return MakeTransformedGenerator(std::move(buffer_generator
), block_reader_fn
);
221 Result
<TransformFlow
<CSVBlock
>> operator()(std::shared_ptr
<Buffer
> next_buffer
) {
222 if (buffer_
== nullptr) {
223 return TransformFinish();
226 bool is_final
= (next_buffer
== nullptr);
227 int64_t bytes_skipped
= 0;
230 bytes_skipped
+= partial_
->size();
231 auto orig_size
= buffer_
->size();
233 chunker_
->ProcessSkip(partial_
, buffer_
, is_final
, &skip_rows_
, &buffer_
));
234 bytes_skipped
+= orig_size
- buffer_
->size();
235 auto empty
= std::make_shared
<Buffer
>(nullptr, 0);
237 // Still have rows beyond this buffer to skip return empty block
238 partial_
= std::move(buffer_
);
239 buffer_
= next_buffer
;
240 return TransformYield
<CSVBlock
>(CSVBlock
{empty
, empty
, empty
, block_index_
++,
241 is_final
, bytes_skipped
,
242 [](int64_t) { return Status::OK(); }});
244 partial_
= std::move(empty
);
247 std::shared_ptr
<Buffer
> completion
;
250 // End of file reached => compute completion from penultimate block
251 RETURN_NOT_OK(chunker_
->ProcessFinal(partial_
, buffer_
, &completion
, &buffer_
));
253 // Get completion of partial from previous block.
255 chunker_
->ProcessWithPartial(partial_
, buffer_
, &completion
, &buffer_
));
257 int64_t bytes_before_buffer
= partial_
->size() + completion
->size();
259 auto consume_bytes
= [this, bytes_before_buffer
,
260 next_buffer
](int64_t nbytes
) -> Status
{
261 DCHECK_GE(nbytes
, 0);
262 auto offset
= nbytes
- bytes_before_buffer
;
265 return Status::Invalid("CSV parser got out of sync with chunker");
267 partial_
= SliceBuffer(buffer_
, offset
);
268 buffer_
= next_buffer
;
272 return TransformYield
<CSVBlock
>(CSVBlock
{partial_
, completion
, buffer_
,
273 block_index_
++, is_final
, bytes_skipped
,
274 std::move(consume_bytes
)});
278 // An object that reads delimited CSV blocks for threaded use.
279 class ThreadedBlockReader
: public BlockReader
{
281 using BlockReader::BlockReader
;
283 static AsyncGenerator
<CSVBlock
> MakeAsyncIterator(
284 AsyncGenerator
<std::shared_ptr
<Buffer
>> buffer_generator
,
285 std::unique_ptr
<Chunker
> chunker
, std::shared_ptr
<Buffer
> first_buffer
,
287 auto block_reader
= std::make_shared
<ThreadedBlockReader
>(std::move(chunker
),
288 first_buffer
, skip_rows
);
289 // Wrap shared pointer in callable
290 Transformer
<std::shared_ptr
<Buffer
>, CSVBlock
> block_reader_fn
=
291 [block_reader
](std::shared_ptr
<Buffer
> next
) { return (*block_reader
)(next
); };
292 return MakeTransformedGenerator(std::move(buffer_generator
), block_reader_fn
);
295 Result
<TransformFlow
<CSVBlock
>> operator()(std::shared_ptr
<Buffer
> next_buffer
) {
296 if (buffer_
== nullptr) {
298 return TransformFinish();
301 bool is_final
= (next_buffer
== nullptr);
303 auto current_partial
= std::move(partial_
);
304 auto current_buffer
= std::move(buffer_
);
305 int64_t bytes_skipped
= 0;
308 auto orig_size
= current_buffer
->size();
309 bytes_skipped
= current_partial
->size();
310 RETURN_NOT_OK(chunker_
->ProcessSkip(current_partial
, current_buffer
, is_final
,
311 &skip_rows_
, ¤t_buffer
));
312 bytes_skipped
+= orig_size
- current_buffer
->size();
313 current_partial
= std::make_shared
<Buffer
>(nullptr, 0);
315 partial_
= std::move(current_buffer
);
316 buffer_
= std::move(next_buffer
);
317 return TransformYield
<CSVBlock
>(CSVBlock
{current_partial
,
327 std::shared_ptr
<Buffer
> whole
, completion
, next_partial
;
330 // End of file reached => compute completion from penultimate block
332 chunker_
->ProcessFinal(current_partial
, current_buffer
, &completion
, &whole
));
334 // Get completion of partial from previous block.
335 std::shared_ptr
<Buffer
> starts_with_whole
;
336 // Get completion of partial from previous block.
337 RETURN_NOT_OK(chunker_
->ProcessWithPartial(current_partial
, current_buffer
,
338 &completion
, &starts_with_whole
));
340 // Get a complete CSV block inside `partial + block`, and keep
341 // the rest for the next iteration.
342 RETURN_NOT_OK(chunker_
->Process(starts_with_whole
, &whole
, &next_partial
));
345 partial_
= std::move(next_partial
);
346 buffer_
= std::move(next_buffer
);
348 return TransformYield
<CSVBlock
>(CSVBlock
{
349 current_partial
, completion
, whole
, block_index_
++, is_final
, bytes_skipped
, {}});
354 std::shared_ptr
<BlockParser
> parser
;
356 int64_t bytes_parsed_or_skipped
;
359 struct DecodedBlock
{
360 std::shared_ptr
<RecordBatch
> record_batch
;
361 // Represents the number of input bytes represented by this batch
362 // This will include bytes skipped when skipping rows after the header
363 int64_t bytes_processed
;
371 struct IterationTraits
<csv::ParsedBlock
> {
372 static csv::ParsedBlock
End() { return csv::ParsedBlock
{nullptr, -1, -1}; }
373 static bool IsEnd(const csv::ParsedBlock
& val
) { return val
.block_index
< 0; }
377 struct IterationTraits
<csv::DecodedBlock
> {
378 static csv::DecodedBlock
End() { return csv::DecodedBlock
{nullptr, -1}; }
379 static bool IsEnd(const csv::DecodedBlock
& val
) { return val
.bytes_processed
< 0; }
385 // A function object that takes in a buffer of CSV data and returns a parsed batch of CSV
386 // data (CSVBlock -> ParsedBlock) for use with MakeMappedGenerator.
387 // The parsed batch contains a list of offsets for each of the columns so that columns
388 // can be individually scanned
390 // This operator is not re-entrant
391 class BlockParsingOperator
{
393 BlockParsingOperator(io::IOContext io_context
, ParseOptions parse_options
,
394 int num_csv_cols
, int64_t first_row
)
395 : io_context_(io_context
),
396 parse_options_(parse_options
),
397 num_csv_cols_(num_csv_cols
),
398 count_rows_(first_row
>= 0),
399 num_rows_seen_(first_row
) {}
401 Result
<ParsedBlock
> operator()(const CSVBlock
& block
) {
402 constexpr int32_t max_num_rows
= std::numeric_limits
<int32_t>::max();
403 auto parser
= std::make_shared
<BlockParser
>(
404 io_context_
.pool(), parse_options_
, num_csv_cols_
, num_rows_seen_
, max_num_rows
);
406 std::shared_ptr
<Buffer
> straddling
;
407 std::vector
<util::string_view
> views
;
408 if (block
.partial
->size() != 0 || block
.completion
->size() != 0) {
409 if (block
.partial
->size() == 0) {
410 straddling
= block
.completion
;
411 } else if (block
.completion
->size() == 0) {
412 straddling
= block
.partial
;
414 ARROW_ASSIGN_OR_RAISE(
416 ConcatenateBuffers({block
.partial
, block
.completion
}, io_context_
.pool()));
418 views
= {util::string_view(*straddling
), util::string_view(*block
.buffer
)};
420 views
= {util::string_view(*block
.buffer
)};
422 uint32_t parsed_size
;
423 if (block
.is_final
) {
424 RETURN_NOT_OK(parser
->ParseFinal(views
, &parsed_size
));
426 RETURN_NOT_OK(parser
->Parse(views
, &parsed_size
));
429 num_rows_seen_
+= parser
->total_num_rows();
431 RETURN_NOT_OK(block
.consume_bytes(parsed_size
));
432 return ParsedBlock
{std::move(parser
), block
.block_index
,
433 static_cast<int64_t>(parsed_size
) + block
.bytes_skipped
};
437 io::IOContext io_context_
;
438 ParseOptions parse_options_
;
441 int64_t num_rows_seen_
;
444 // A function object that takes in parsed batch of CSV data and decodes it to an arrow
445 // record batch (ParsedBlock -> DecodedBlock) for use with MakeMappedGenerator.
446 class BlockDecodingOperator
{
448 Future
<DecodedBlock
> operator()(const ParsedBlock
& block
) {
449 DCHECK(!state_
->column_decoders
.empty());
450 std::vector
<Future
<std::shared_ptr
<Array
>>> decoded_array_futs
;
451 for (auto& decoder
: state_
->column_decoders
) {
452 decoded_array_futs
.push_back(decoder
->Decode(block
.parser
));
454 auto bytes_parsed_or_skipped
= block
.bytes_parsed_or_skipped
;
455 auto decoded_arrays_fut
= All(std::move(decoded_array_futs
));
457 return decoded_arrays_fut
.Then(
458 [state
, bytes_parsed_or_skipped
](
459 const std::vector
<Result
<std::shared_ptr
<Array
>>>& maybe_decoded_arrays
)
460 -> Result
<DecodedBlock
> {
461 ARROW_ASSIGN_OR_RAISE(auto decoded_arrays
,
462 internal::UnwrapOrRaise(maybe_decoded_arrays
));
464 ARROW_ASSIGN_OR_RAISE(auto batch
,
465 state
->DecodedArraysToBatch(std::move(decoded_arrays
)));
466 return DecodedBlock
{std::move(batch
), bytes_parsed_or_skipped
};
470 static Result
<BlockDecodingOperator
> Make(io::IOContext io_context
,
471 ConvertOptions convert_options
,
472 ConversionSchema conversion_schema
) {
473 BlockDecodingOperator
op(std::move(io_context
), std::move(convert_options
),
474 std::move(conversion_schema
));
475 RETURN_NOT_OK(op
.state_
->MakeColumnDecoders(io_context
));
480 BlockDecodingOperator(io::IOContext io_context
, ConvertOptions convert_options
,
481 ConversionSchema conversion_schema
)
482 : state_(std::make_shared
<State
>(std::move(io_context
), std::move(convert_options
),
483 std::move(conversion_schema
))) {}
486 State(io::IOContext io_context
, ConvertOptions convert_options
,
487 ConversionSchema conversion_schema
)
488 : convert_options(std::move(convert_options
)),
489 conversion_schema(std::move(conversion_schema
)) {}
491 Result
<std::shared_ptr
<RecordBatch
>> DecodedArraysToBatch(
492 std::vector
<std::shared_ptr
<Array
>> arrays
) {
493 const auto n_rows
= arrays
[0]->length();
495 if (schema
== nullptr) {
496 FieldVector
fields(arrays
.size());
497 for (size_t i
= 0; i
< arrays
.size(); ++i
) {
498 fields
[i
] = field(conversion_schema
.columns
[i
].name
, arrays
[i
]->type());
502 // No rows so schema is not reliable. return RecordBatch but do not set schema
503 return RecordBatch::Make(arrow::schema(std::move(fields
)), n_rows
,
507 schema
= arrow::schema(std::move(fields
));
510 return RecordBatch::Make(schema
, n_rows
, std::move(arrays
));
513 // Make column decoders from conversion schema
514 Status
MakeColumnDecoders(io::IOContext io_context
) {
515 for (const auto& column
: conversion_schema
.columns
) {
516 std::shared_ptr
<ColumnDecoder
> decoder
;
517 if (column
.is_missing
) {
518 ARROW_ASSIGN_OR_RAISE(decoder
,
519 ColumnDecoder::MakeNull(io_context
.pool(), column
.type
));
520 } else if (column
.type
!= nullptr) {
521 ARROW_ASSIGN_OR_RAISE(
522 decoder
, ColumnDecoder::Make(io_context
.pool(), column
.type
, column
.index
,
525 ARROW_ASSIGN_OR_RAISE(
527 ColumnDecoder::Make(io_context
.pool(), column
.index
, convert_options
));
529 column_decoders
.push_back(std::move(decoder
));
534 ConvertOptions convert_options
;
535 ConversionSchema conversion_schema
;
536 std::vector
<std::shared_ptr
<ColumnDecoder
>> column_decoders
;
537 std::shared_ptr
<Schema
> schema
;
540 std::shared_ptr
<State
> state_
;
543 /////////////////////////////////////////////////////////////////////////
544 // Base class for common functionality
548 ReaderMixin(io::IOContext io_context
, std::shared_ptr
<io::InputStream
> input
,
549 const ReadOptions
& read_options
, const ParseOptions
& parse_options
,
550 const ConvertOptions
& convert_options
, bool count_rows
)
551 : io_context_(std::move(io_context
)),
552 read_options_(read_options
),
553 parse_options_(parse_options
),
554 convert_options_(convert_options
),
555 count_rows_(count_rows
),
556 num_rows_seen_(count_rows_
? 1 : -1),
557 input_(std::move(input
)) {}
560 // Read header and column names from buffer, create column builders
561 // Returns the # of bytes consumed
562 Result
<int64_t> ProcessHeader(const std::shared_ptr
<Buffer
>& buf
,
563 std::shared_ptr
<Buffer
>* rest
) {
564 const uint8_t* data
= buf
->data();
565 const auto data_end
= data
+ buf
->size();
566 DCHECK_GT(data_end
- data
, 0);
568 if (read_options_
.skip_rows
) {
569 // Skip initial rows (potentially invalid CSV data)
570 auto num_skipped_rows
= SkipRows(data
, static_cast<uint32_t>(data_end
- data
),
571 read_options_
.skip_rows
, &data
);
572 if (num_skipped_rows
< read_options_
.skip_rows
) {
573 return Status::Invalid(
574 "Could not skip initial ", read_options_
.skip_rows
,
575 " rows from CSV file, "
576 "either file is too short or header is larger than block size");
579 num_rows_seen_
+= num_skipped_rows
;
583 if (read_options_
.column_names
.empty()) {
584 // Parse one row (either to read column names or to know the number of columns)
585 BlockParser
parser(io_context_
.pool(), parse_options_
, num_csv_cols_
,
587 uint32_t parsed_size
= 0;
588 RETURN_NOT_OK(parser
.Parse(
589 util::string_view(reinterpret_cast<const char*>(data
), data_end
- data
),
591 if (parser
.num_rows() != 1) {
592 return Status::Invalid(
593 "Could not read first row from CSV file, either "
594 "file is too short or header is larger than block size");
596 if (parser
.num_cols() == 0) {
597 return Status::Invalid("No columns in CSV file");
600 if (read_options_
.autogenerate_column_names
) {
601 column_names_
= GenerateColumnNames(parser
.num_cols());
603 // Read column names from header row
604 auto visit
= [&](const uint8_t* data
, uint32_t size
, bool quoted
) -> Status
{
605 column_names_
.emplace_back(reinterpret_cast<const char*>(data
), size
);
608 RETURN_NOT_OK(parser
.VisitLastRow(visit
));
609 DCHECK_EQ(static_cast<size_t>(parser
.num_cols()), column_names_
.size());
610 // Skip parsed header row
617 column_names_
= read_options_
.column_names
;
621 // increase rows seen to skip past rows which will be skipped
622 num_rows_seen_
+= read_options_
.skip_rows_after_names
;
625 auto bytes_consumed
= data
- buf
->data();
626 *rest
= SliceBuffer(buf
, bytes_consumed
);
628 num_csv_cols_
= static_cast<int32_t>(column_names_
.size());
629 DCHECK_GT(num_csv_cols_
, 0);
631 RETURN_NOT_OK(MakeConversionSchema());
632 return bytes_consumed
;
// Generate `num_cols` column names "f0", "f1", ... for files without a header
// row.  A non-positive `num_cols` yields an empty vector.
// NOTE(review): the statement writing the name was not visible in the reviewed
// text; the "f<index>" form follows the documented behaviour of
// ReadOptions::autogenerate_column_names -- confirm.
std::vector<std::string> GenerateColumnNames(int32_t num_cols) {
  std::vector<std::string> res;
  if (num_cols <= 0) {
    // Avoid passing a negative count to reserve().
    return res;
  }
  res.reserve(static_cast<size_t>(num_cols));
  for (int32_t i = 0; i < num_cols; ++i) {
    res.push_back("f" + std::to_string(i));
  }
  return res;
}
646 // Make conversion schema from options and parsed CSV header
647 Status
MakeConversionSchema() {
648 // Append a column converted from CSV data
649 auto append_csv_column
= [&](std::string col_name
, int32_t col_index
) {
650 // Does the named column have a fixed type?
651 auto it
= convert_options_
.column_types
.find(col_name
);
652 if (it
== convert_options_
.column_types
.end()) {
653 conversion_schema_
.columns
.push_back(
654 ConversionSchema::InferredColumn(std::move(col_name
), col_index
));
656 conversion_schema_
.columns
.push_back(
657 ConversionSchema::TypedColumn(std::move(col_name
), col_index
, it
->second
));
661 // Append a column of nulls
662 auto append_null_column
= [&](std::string col_name
) {
663 // If the named column has a fixed type, use it, otherwise use null()
664 std::shared_ptr
<DataType
> type
;
665 auto it
= convert_options_
.column_types
.find(col_name
);
666 if (it
== convert_options_
.column_types
.end()) {
671 conversion_schema_
.columns
.push_back(
672 ConversionSchema::NullColumn(std::move(col_name
), std::move(type
)));
675 if (convert_options_
.include_columns
.empty()) {
676 // Include all columns in CSV file order
677 for (int32_t col_index
= 0; col_index
< num_csv_cols_
; ++col_index
) {
678 append_csv_column(column_names_
[col_index
], col_index
);
681 // Include columns from `include_columns` (in that order)
682 // Compute indices of columns in the CSV file
683 std::unordered_map
<std::string
, int32_t> col_indices
;
684 col_indices
.reserve(column_names_
.size());
685 for (int32_t i
= 0; i
< static_cast<int32_t>(column_names_
.size()); ++i
) {
686 col_indices
.emplace(column_names_
[i
], i
);
689 for (const auto& col_name
: convert_options_
.include_columns
) {
690 auto it
= col_indices
.find(col_name
);
691 if (it
!= col_indices
.end()) {
692 append_csv_column(col_name
, it
->second
);
693 } else if (convert_options_
.include_missing_columns
) {
694 append_null_column(col_name
);
696 return Status::KeyError("Column '", col_name
,
697 "' in include_columns "
698 "does not exist in CSV file");
706 std::shared_ptr
<BlockParser
> parser
;
707 int64_t parsed_bytes
;
710 Result
<ParseResult
> Parse(const std::shared_ptr
<Buffer
>& partial
,
711 const std::shared_ptr
<Buffer
>& completion
,
712 const std::shared_ptr
<Buffer
>& block
, int64_t block_index
,
714 static constexpr int32_t max_num_rows
= std::numeric_limits
<int32_t>::max();
715 auto parser
= std::make_shared
<BlockParser
>(
716 io_context_
.pool(), parse_options_
, num_csv_cols_
, num_rows_seen_
, max_num_rows
);
718 std::shared_ptr
<Buffer
> straddling
;
719 std::vector
<util::string_view
> views
;
720 if (partial
->size() != 0 || completion
->size() != 0) {
721 if (partial
->size() == 0) {
722 straddling
= completion
;
723 } else if (completion
->size() == 0) {
724 straddling
= partial
;
726 ARROW_ASSIGN_OR_RAISE(
727 straddling
, ConcatenateBuffers({partial
, completion
}, io_context_
.pool()));
729 views
= {util::string_view(*straddling
), util::string_view(*block
)};
731 views
= {util::string_view(*block
)};
733 uint32_t parsed_size
;
735 RETURN_NOT_OK(parser
->ParseFinal(views
, &parsed_size
));
737 RETURN_NOT_OK(parser
->Parse(views
, &parsed_size
));
740 num_rows_seen_
+= parser
->total_num_rows();
742 return ParseResult
{std::move(parser
), static_cast<int64_t>(parsed_size
)};
745 io::IOContext io_context_
;
746 ReadOptions read_options_
;
747 ParseOptions parse_options_
;
748 ConvertOptions convert_options_
;
750 // Number of columns in the CSV file
751 int32_t num_csv_cols_
= -1;
752 // Whether num_rows_seen_ tracks the number of rows seen in the CSV being parsed
754 // Number of rows seen in the csv. Not used if count_rows is false
755 int64_t num_rows_seen_
;
756 // Column names in the CSV file
757 std::vector
<std::string
> column_names_
;
758 ConversionSchema conversion_schema_
;
760 std::shared_ptr
<io::InputStream
> input_
;
761 std::shared_ptr
<internal::TaskGroup
> task_group_
;
764 /////////////////////////////////////////////////////////////////////////
765 // Base class for one-shot table readers
767 class BaseTableReader
: public ReaderMixin
, public csv::TableReader
{
769 using ReaderMixin::ReaderMixin
;
771 virtual Status
Init() = 0;
773 Future
<std::shared_ptr
<Table
>> ReadAsync() override
{
774 return Future
<std::shared_ptr
<Table
>>::MakeFinished(Read());
778 // Make column builders from conversion schema
779 Status
MakeColumnBuilders() {
780 for (const auto& column
: conversion_schema_
.columns
) {
781 std::shared_ptr
<ColumnBuilder
> builder
;
782 if (column
.is_missing
) {
783 ARROW_ASSIGN_OR_RAISE(builder
, ColumnBuilder::MakeNull(io_context_
.pool(),
784 column
.type
, task_group_
));
785 } else if (column
.type
!= nullptr) {
786 ARROW_ASSIGN_OR_RAISE(
787 builder
, ColumnBuilder::Make(io_context_
.pool(), column
.type
, column
.index
,
788 convert_options_
, task_group_
));
790 ARROW_ASSIGN_OR_RAISE(builder
,
791 ColumnBuilder::Make(io_context_
.pool(), column
.index
,
792 convert_options_
, task_group_
));
794 column_builders_
.push_back(std::move(builder
));
799 Result
<int64_t> ParseAndInsert(const std::shared_ptr
<Buffer
>& partial
,
800 const std::shared_ptr
<Buffer
>& completion
,
801 const std::shared_ptr
<Buffer
>& block
,
802 int64_t block_index
, bool is_final
) {
803 ARROW_ASSIGN_OR_RAISE(auto result
,
804 Parse(partial
, completion
, block
, block_index
, is_final
));
805 RETURN_NOT_OK(ProcessData(result
.parser
, block_index
));
806 return result
.parsed_bytes
;
809 // Trigger conversion of parsed block data
810 Status
ProcessData(const std::shared_ptr
<BlockParser
>& parser
, int64_t block_index
) {
811 for (auto& builder
: column_builders_
) {
812 builder
->Insert(block_index
, parser
);
817 Result
<std::shared_ptr
<Table
>> MakeTable() {
818 DCHECK_EQ(column_builders_
.size(), conversion_schema_
.columns
.size());
820 std::vector
<std::shared_ptr
<Field
>> fields
;
821 std::vector
<std::shared_ptr
<ChunkedArray
>> columns
;
823 for (int32_t i
= 0; i
< static_cast<int32_t>(column_builders_
.size()); ++i
) {
824 const auto& column
= conversion_schema_
.columns
[i
];
825 ARROW_ASSIGN_OR_RAISE(auto array
, column_builders_
[i
]->Finish());
826 fields
.push_back(::arrow::field(column
.name
, array
->type()));
827 columns
.emplace_back(std::move(array
));
829 return Table::Make(schema(std::move(fields
)), std::move(columns
));
832 // Column builders for target Table (in ConversionSchema order)
833 std::vector
<std::shared_ptr
<ColumnBuilder
>> column_builders_
;
836 /////////////////////////////////////////////////////////////////////////
837 // Base class for streaming readers
839 class StreamingReaderImpl
: public ReaderMixin
,
840 public csv::StreamingReader
,
841 public std::enable_shared_from_this
<StreamingReaderImpl
> {
843 StreamingReaderImpl(io::IOContext io_context
, std::shared_ptr
<io::InputStream
> input
,
844 const ReadOptions
& read_options
, const ParseOptions
& parse_options
,
845 const ConvertOptions
& convert_options
, bool count_rows
)
846 : ReaderMixin(io_context
, std::move(input
), read_options
, parse_options
,
847 convert_options
, count_rows
),
848 bytes_decoded_(std::make_shared
<std::atomic
<int64_t>>(0)) {}
850 Future
<> Init(Executor
* cpu_executor
) {
851 ARROW_ASSIGN_OR_RAISE(auto istream_it
,
852 io::MakeInputStreamIterator(input_
, read_options_
.block_size
));
854 // TODO Consider exposing readahead as a read option (ARROW-12090)
855 ARROW_ASSIGN_OR_RAISE(auto bg_it
, MakeBackgroundGenerator(std::move(istream_it
),
856 io_context_
.executor()));
858 auto transferred_it
= MakeTransferredGenerator(bg_it
, cpu_executor
);
860 auto buffer_generator
= CSVBufferIterator::MakeAsync(std::move(transferred_it
));
862 int max_readahead
= cpu_executor
->GetCapacity();
863 auto self
= shared_from_this();
865 return buffer_generator().Then([self
, buffer_generator
, max_readahead
](
866 const std::shared_ptr
<Buffer
>& first_buffer
) {
867 return self
->InitAfterFirstBuffer(first_buffer
, buffer_generator
, max_readahead
);
871 std::shared_ptr
<Schema
> schema() const override
{ return schema_
; }
873 int64_t bytes_read() const override
{ return bytes_decoded_
->load(); }
875 Status
ReadNext(std::shared_ptr
<RecordBatch
>* batch
) override
{
876 auto next_fut
= ReadNextAsync();
877 auto next_result
= next_fut
.result();
878 return std::move(next_result
).Value(batch
);
881 Future
<std::shared_ptr
<RecordBatch
>> ReadNextAsync() override
{
882 return record_batch_gen_();
886 Future
<> InitAfterFirstBuffer(const std::shared_ptr
<Buffer
>& first_buffer
,
887 AsyncGenerator
<std::shared_ptr
<Buffer
>> buffer_generator
,
889 if (first_buffer
== nullptr) {
890 return Status::Invalid("Empty CSV file");
893 std::shared_ptr
<Buffer
> after_header
;
894 ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed
,
895 ProcessHeader(first_buffer
, &after_header
));
896 bytes_decoded_
->fetch_add(header_bytes_consumed
);
899 BlockParsingOperator(io_context_
, parse_options_
, num_csv_cols_
, num_rows_seen_
);
900 ARROW_ASSIGN_OR_RAISE(
902 BlockDecodingOperator::Make(io_context_
, convert_options_
, conversion_schema_
));
904 auto block_gen
= SerialBlockReader::MakeAsyncIterator(
905 std::move(buffer_generator
), MakeChunker(parse_options_
), std::move(after_header
),
906 read_options_
.skip_rows_after_names
);
907 auto parsed_block_gen
=
908 MakeMappedGenerator(std::move(block_gen
), std::move(parser_op
));
909 auto rb_gen
= MakeMappedGenerator(std::move(parsed_block_gen
), std::move(decoder_op
));
911 auto self
= shared_from_this();
912 return rb_gen().Then([self
, rb_gen
, max_readahead
](const DecodedBlock
& first_block
) {
913 return self
->InitFromBlock(first_block
, std::move(rb_gen
), max_readahead
, 0);
917 Future
<> InitFromBlock(const DecodedBlock
& block
,
918 AsyncGenerator
<DecodedBlock
> batch_gen
, int max_readahead
,
919 int64_t prev_bytes_processed
) {
920 if (!block
.record_batch
) {
921 // End of file just return null batches
922 record_batch_gen_
= MakeEmptyGenerator
<std::shared_ptr
<RecordBatch
>>();
926 schema_
= block
.record_batch
->schema();
928 if (block
.record_batch
->num_rows() == 0) {
929 // Keep consuming blocks until the first non empty block is found
930 auto self
= shared_from_this();
931 prev_bytes_processed
+= block
.bytes_processed
;
932 return batch_gen().Then([self
, batch_gen
, max_readahead
,
933 prev_bytes_processed
](const DecodedBlock
& next_block
) {
934 return self
->InitFromBlock(next_block
, std::move(batch_gen
), max_readahead
,
935 prev_bytes_processed
);
939 AsyncGenerator
<DecodedBlock
> readahead_gen
;
940 if (read_options_
.use_threads
) {
941 readahead_gen
= MakeReadaheadGenerator(std::move(batch_gen
), max_readahead
);
943 readahead_gen
= std::move(batch_gen
);
946 AsyncGenerator
<DecodedBlock
> restarted_gen
=
947 MakeGeneratorStartsWith({block
}, std::move(readahead_gen
));
949 auto bytes_decoded
= bytes_decoded_
;
950 auto unwrap_and_record_bytes
=
951 [bytes_decoded
, prev_bytes_processed
](
952 const DecodedBlock
& block
) mutable -> Result
<std::shared_ptr
<RecordBatch
>> {
953 bytes_decoded
->fetch_add(block
.bytes_processed
+ prev_bytes_processed
);
954 prev_bytes_processed
= 0;
955 return block
.record_batch
;
959 MakeMappedGenerator(std::move(restarted_gen
), std::move(unwrap_and_record_bytes
));
961 record_batch_gen_
= MakeCancellable(std::move(unwrapped
), io_context_
.stop_token());
965 std::shared_ptr
<Schema
> schema_
;
966 AsyncGenerator
<std::shared_ptr
<RecordBatch
>> record_batch_gen_
;
967 // bytes which have been decoded and asked for by the caller
968 std::shared_ptr
<std::atomic
<int64_t>> bytes_decoded_
;
971 /////////////////////////////////////////////////////////////////////////
972 // Serial TableReader implementation
974 class SerialTableReader
: public BaseTableReader
{
976 using BaseTableReader::BaseTableReader
;
978 Status
Init() override
{
979 ARROW_ASSIGN_OR_RAISE(auto istream_it
,
980 io::MakeInputStreamIterator(input_
, read_options_
.block_size
));
982 // Since we're converting serially, no need to readahead more than one block
983 int32_t block_queue_size
= 1;
984 ARROW_ASSIGN_OR_RAISE(auto rh_it
,
985 MakeReadaheadIterator(std::move(istream_it
), block_queue_size
));
986 buffer_iterator_
= CSVBufferIterator::Make(std::move(rh_it
));
990 Result
<std::shared_ptr
<Table
>> Read() override
{
991 task_group_
= internal::TaskGroup::MakeSerial(io_context_
.stop_token());
994 ARROW_ASSIGN_OR_RAISE(auto first_buffer
, buffer_iterator_
.Next());
995 if (first_buffer
== nullptr) {
996 return Status::Invalid("Empty CSV file");
998 RETURN_NOT_OK(ProcessHeader(first_buffer
, &first_buffer
));
999 RETURN_NOT_OK(MakeColumnBuilders());
1001 auto block_iterator
= SerialBlockReader::MakeIterator(
1002 std::move(buffer_iterator_
), MakeChunker(parse_options_
), std::move(first_buffer
),
1003 read_options_
.skip_rows_after_names
);
1005 RETURN_NOT_OK(io_context_
.stop_token().Poll());
1007 ARROW_ASSIGN_OR_RAISE(auto maybe_block
, block_iterator
.Next());
1008 if (IsIterationEnd(maybe_block
)) {
1012 ARROW_ASSIGN_OR_RAISE(
1013 int64_t parsed_bytes
,
1014 ParseAndInsert(maybe_block
.partial
, maybe_block
.completion
, maybe_block
.buffer
,
1015 maybe_block
.block_index
, maybe_block
.is_final
));
1016 RETURN_NOT_OK(maybe_block
.consume_bytes(parsed_bytes
));
1018 // Finish conversion, create schema and table
1019 RETURN_NOT_OK(task_group_
->Finish());
1024 Iterator
<std::shared_ptr
<Buffer
>> buffer_iterator_
;
1027 class AsyncThreadedTableReader
1028 : public BaseTableReader
,
1029 public std::enable_shared_from_this
<AsyncThreadedTableReader
> {
1031 using BaseTableReader::BaseTableReader
;
1033 AsyncThreadedTableReader(io::IOContext io_context
,
1034 std::shared_ptr
<io::InputStream
> input
,
1035 const ReadOptions
& read_options
,
1036 const ParseOptions
& parse_options
,
1037 const ConvertOptions
& convert_options
, Executor
* cpu_executor
)
1038 // Count rows is currently not supported during parallel read
1039 : BaseTableReader(std::move(io_context
), input
, read_options
, parse_options
,
1040 convert_options
, /*count_rows=*/false),
1041 cpu_executor_(cpu_executor
) {}
1043 ~AsyncThreadedTableReader() override
{
1045 // In case of error, make sure all pending tasks are finished before
1046 // we start destroying BaseTableReader members
1047 ARROW_UNUSED(task_group_
->Finish());
1051 Status
Init() override
{
1052 ARROW_ASSIGN_OR_RAISE(auto istream_it
,
1053 io::MakeInputStreamIterator(input_
, read_options_
.block_size
));
1055 int max_readahead
= cpu_executor_
->GetCapacity();
1056 int readahead_restart
= std::max(1, max_readahead
/ 2);
1058 ARROW_ASSIGN_OR_RAISE(
1059 auto bg_it
, MakeBackgroundGenerator(std::move(istream_it
), io_context_
.executor(),
1060 max_readahead
, readahead_restart
));
1062 auto transferred_it
= MakeTransferredGenerator(bg_it
, cpu_executor_
);
1063 buffer_generator_
= CSVBufferIterator::MakeAsync(std::move(transferred_it
));
1064 return Status::OK();
1067 Result
<std::shared_ptr
<Table
>> Read() override
{ return ReadAsync().result(); }
1069 Future
<std::shared_ptr
<Table
>> ReadAsync() override
{
1071 internal::TaskGroup::MakeThreaded(cpu_executor_
, io_context_
.stop_token());
1073 auto self
= shared_from_this();
1074 return ProcessFirstBuffer().Then([self
](const std::shared_ptr
<Buffer
>& first_buffer
) {
1075 auto block_generator
= ThreadedBlockReader::MakeAsyncIterator(
1076 self
->buffer_generator_
, MakeChunker(self
->parse_options_
),
1077 std::move(first_buffer
), self
->read_options_
.skip_rows_after_names
);
1079 std::function
<Status(CSVBlock
)> block_visitor
=
1080 [self
](CSVBlock maybe_block
) -> Status
{
1081 // The logic in VisitAsyncGenerator ensures that we will never be
1082 // passed an empty block (visit does not call with the end token) so
1083 // we can be assured maybe_block has a value.
1084 DCHECK_GE(maybe_block
.block_index
, 0);
1085 DCHECK(!maybe_block
.consume_bytes
);
1087 // Launch parse task
1088 self
->task_group_
->Append([self
, maybe_block
] {
1090 ->ParseAndInsert(maybe_block
.partial
, maybe_block
.completion
,
1091 maybe_block
.buffer
, maybe_block
.block_index
,
1092 maybe_block
.is_final
)
1095 return Status::OK();
1098 return VisitAsyncGenerator(std::move(block_generator
), block_visitor
)
1099 .Then([self
]() -> Future
<> {
1100 // By this point we've added all top level tasks so it is safe to call
1102 return self
->task_group_
->FinishAsync();
1104 .Then([self
]() -> Result
<std::shared_ptr
<Table
>> {
1105 // Finish conversion, create schema and table
1106 return self
->MakeTable();
1112 Future
<std::shared_ptr
<Buffer
>> ProcessFirstBuffer() {
1114 auto first_buffer_future
= buffer_generator_();
1115 return first_buffer_future
.Then([this](const std::shared_ptr
<Buffer
>& first_buffer
)
1116 -> Result
<std::shared_ptr
<Buffer
>> {
1117 if (first_buffer
== nullptr) {
1118 return Status::Invalid("Empty CSV file");
1120 std::shared_ptr
<Buffer
> first_buffer_processed
;
1121 RETURN_NOT_OK(ProcessHeader(first_buffer
, &first_buffer_processed
));
1122 RETURN_NOT_OK(MakeColumnBuilders());
1123 return first_buffer_processed
;
1127 Executor
* cpu_executor_
;
1128 AsyncGenerator
<std::shared_ptr
<Buffer
>> buffer_generator_
;
1131 Result
<std::shared_ptr
<TableReader
>> MakeTableReader(
1132 MemoryPool
* pool
, io::IOContext io_context
, std::shared_ptr
<io::InputStream
> input
,
1133 const ReadOptions
& read_options
, const ParseOptions
& parse_options
,
1134 const ConvertOptions
& convert_options
) {
1135 RETURN_NOT_OK(parse_options
.Validate());
1136 RETURN_NOT_OK(read_options
.Validate());
1137 RETURN_NOT_OK(convert_options
.Validate());
1138 std::shared_ptr
<BaseTableReader
> reader
;
1139 if (read_options
.use_threads
) {
1140 auto cpu_executor
= internal::GetCpuThreadPool();
1141 reader
= std::make_shared
<AsyncThreadedTableReader
>(
1142 io_context
, input
, read_options
, parse_options
, convert_options
, cpu_executor
);
1144 reader
= std::make_shared
<SerialTableReader
>(io_context
, input
, read_options
,
1145 parse_options
, convert_options
,
1146 /*count_rows=*/true);
1148 RETURN_NOT_OK(reader
->Init());
1152 Future
<std::shared_ptr
<StreamingReader
>> MakeStreamingReader(
1153 io::IOContext io_context
, std::shared_ptr
<io::InputStream
> input
,
1154 internal::Executor
* cpu_executor
, const ReadOptions
& read_options
,
1155 const ParseOptions
& parse_options
, const ConvertOptions
& convert_options
) {
1156 RETURN_NOT_OK(parse_options
.Validate());
1157 RETURN_NOT_OK(read_options
.Validate());
1158 RETURN_NOT_OK(convert_options
.Validate());
1159 std::shared_ptr
<StreamingReaderImpl
> reader
;
1160 reader
= std::make_shared
<StreamingReaderImpl
>(
1161 io_context
, input
, read_options
, parse_options
, convert_options
,
1162 /*count_rows=*/!read_options
.use_threads
|| cpu_executor
->GetCapacity() == 1);
1163 return reader
->Init(cpu_executor
).Then([reader
] {
1164 return std::dynamic_pointer_cast
<StreamingReader
>(reader
);
1168 /////////////////////////////////////////////////////////////////////////
1169 // Row count implementation
1171 class CSVRowCounter
: public ReaderMixin
,
1172 public std::enable_shared_from_this
<CSVRowCounter
> {
1174 CSVRowCounter(io::IOContext io_context
, Executor
* cpu_executor
,
1175 std::shared_ptr
<io::InputStream
> input
, const ReadOptions
& read_options
,
1176 const ParseOptions
& parse_options
)
1177 : ReaderMixin(io_context
, std::move(input
), read_options
, parse_options
,
1178 ConvertOptions::Defaults(), /*count_rows=*/true),
1179 cpu_executor_(cpu_executor
),
1182 Future
<int64_t> Count() {
1183 auto self
= shared_from_this();
1184 return Init(self
).Then([self
]() { return self
->DoCount(self
); });
1188 Future
<> Init(const std::shared_ptr
<CSVRowCounter
>& self
) {
1189 ARROW_ASSIGN_OR_RAISE(auto istream_it
,
1190 io::MakeInputStreamIterator(input_
, read_options_
.block_size
));
1191 // TODO Consider exposing readahead as a read option (ARROW-12090)
1192 ARROW_ASSIGN_OR_RAISE(auto bg_it
, MakeBackgroundGenerator(std::move(istream_it
),
1193 io_context_
.executor()));
1194 auto transferred_it
= MakeTransferredGenerator(bg_it
, cpu_executor_
);
1195 auto buffer_generator
= CSVBufferIterator::MakeAsync(std::move(transferred_it
));
1197 return buffer_generator().Then(
1198 [self
, buffer_generator
](std::shared_ptr
<Buffer
> first_buffer
) {
1199 if (!first_buffer
) {
1200 return Status::Invalid("Empty CSV file");
1202 RETURN_NOT_OK(self
->ProcessHeader(first_buffer
, &first_buffer
));
1203 self
->block_generator_
= SerialBlockReader::MakeAsyncIterator(
1204 buffer_generator
, MakeChunker(self
->parse_options_
),
1205 std::move(first_buffer
), 0);
1206 return Status::OK();
1210 Future
<int64_t> DoCount(const std::shared_ptr
<CSVRowCounter
>& self
) {
1211 // count_cb must return a value instead of Status/Future<> to work with
1212 // MakeMappedGenerator, and it must use a type with a valid end value to work with
1214 std::function
<Result
<util::optional
<int64_t>>(const CSVBlock
&)> count_cb
=
1215 [self
](const CSVBlock
& maybe_block
) -> Result
<util::optional
<int64_t>> {
1216 ARROW_ASSIGN_OR_RAISE(
1218 self
->Parse(maybe_block
.partial
, maybe_block
.completion
, maybe_block
.buffer
,
1219 maybe_block
.block_index
, maybe_block
.is_final
));
1220 RETURN_NOT_OK(maybe_block
.consume_bytes(parser
.parsed_bytes
));
1221 int32_t total_row_count
= parser
.parser
->total_num_rows();
1222 self
->row_count_
+= total_row_count
;
1223 return total_row_count
;
1225 auto count_gen
= MakeMappedGenerator(block_generator_
, std::move(count_cb
));
1226 return DiscardAllFromAsyncGenerator(count_gen
).Then(
1227 [self
]() { return self
->row_count_
; });
1230 Executor
* cpu_executor_
;
1231 AsyncGenerator
<CSVBlock
> block_generator_
;
1237 /////////////////////////////////////////////////////////////////////////
1238 // Factory functions
1240 Result
<std::shared_ptr
<TableReader
>> TableReader::Make(
1241 io::IOContext io_context
, std::shared_ptr
<io::InputStream
> input
,
1242 const ReadOptions
& read_options
, const ParseOptions
& parse_options
,
1243 const ConvertOptions
& convert_options
) {
1244 return MakeTableReader(io_context
.pool(), io_context
, std::move(input
), read_options
,
1245 parse_options
, convert_options
);
1248 Result
<std::shared_ptr
<TableReader
>> TableReader::Make(
1249 MemoryPool
* pool
, io::IOContext io_context
, std::shared_ptr
<io::InputStream
> input
,
1250 const ReadOptions
& read_options
, const ParseOptions
& parse_options
,
1251 const ConvertOptions
& convert_options
) {
1252 return MakeTableReader(pool
, io_context
, std::move(input
), read_options
, parse_options
,
1256 Result
<std::shared_ptr
<StreamingReader
>> StreamingReader::Make(
1257 MemoryPool
* pool
, std::shared_ptr
<io::InputStream
> input
,
1258 const ReadOptions
& read_options
, const ParseOptions
& parse_options
,
1259 const ConvertOptions
& convert_options
) {
1260 auto io_context
= io::IOContext(pool
);
1261 auto cpu_executor
= internal::GetCpuThreadPool();
1262 auto reader_fut
= MakeStreamingReader(io_context
, std::move(input
), cpu_executor
,
1263 read_options
, parse_options
, convert_options
);
1264 auto reader_result
= reader_fut
.result();
1265 ARROW_ASSIGN_OR_RAISE(auto reader
, reader_result
);
1269 Result
<std::shared_ptr
<StreamingReader
>> StreamingReader::Make(
1270 io::IOContext io_context
, std::shared_ptr
<io::InputStream
> input
,
1271 const ReadOptions
& read_options
, const ParseOptions
& parse_options
,
1272 const ConvertOptions
& convert_options
) {
1273 auto cpu_executor
= internal::GetCpuThreadPool();
1274 auto reader_fut
= MakeStreamingReader(io_context
, std::move(input
), cpu_executor
,
1275 read_options
, parse_options
, convert_options
);
1276 auto reader_result
= reader_fut
.result();
1277 ARROW_ASSIGN_OR_RAISE(auto reader
, reader_result
);
1281 Future
<std::shared_ptr
<StreamingReader
>> StreamingReader::MakeAsync(
1282 io::IOContext io_context
, std::shared_ptr
<io::InputStream
> input
,
1283 internal::Executor
* cpu_executor
, const ReadOptions
& read_options
,
1284 const ParseOptions
& parse_options
, const ConvertOptions
& convert_options
) {
1285 return MakeStreamingReader(io_context
, std::move(input
), cpu_executor
, read_options
,
1286 parse_options
, convert_options
);
1289 Future
<int64_t> CountRowsAsync(io::IOContext io_context
,
1290 std::shared_ptr
<io::InputStream
> input
,
1291 internal::Executor
* cpu_executor
,
1292 const ReadOptions
& read_options
,
1293 const ParseOptions
& parse_options
) {
1294 RETURN_NOT_OK(parse_options
.Validate());
1295 RETURN_NOT_OK(read_options
.Validate());
1296 auto counter
= std::make_shared
<CSVRowCounter
>(
1297 io_context
, cpu_executor
, std::move(input
), read_options
, parse_options
);
1298 return counter
->Count();
1303 } // namespace arrow