1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "arrow/dataset/file_base.h"
20 #include <arrow/compute/exec/exec_plan.h>
23 #include <unordered_map>
26 #include "arrow/compute/exec/forest_internal.h"
27 #include "arrow/compute/exec/subtree_internal.h"
28 #include "arrow/dataset/dataset_internal.h"
29 #include "arrow/dataset/dataset_writer.h"
30 #include "arrow/dataset/scanner.h"
31 #include "arrow/dataset/scanner_internal.h"
32 #include "arrow/filesystem/filesystem.h"
33 #include "arrow/filesystem/path_util.h"
34 #include "arrow/io/compressed.h"
35 #include "arrow/io/interfaces.h"
36 #include "arrow/io/memory.h"
37 #include "arrow/util/compression.h"
38 #include "arrow/util/iterator.h"
39 #include "arrow/util/macros.h"
40 #include "arrow/util/make_unique.h"
41 #include "arrow/util/map.h"
42 #include "arrow/util/string.h"
43 #include "arrow/util/task_group.h"
44 #include "arrow/util/variant.h"
48 using internal::checked_pointer_cast
;
// Opens this source for random-access reading.
// Three return paths are visible: delegating to `filesystem_` with
// `file_info_`, wrapping `buffer_` in an io::BufferReader, and invoking the
// `custom_open_` callable.
// NOTE(review): the conditions that select between these paths are not visible
// in this view -- confirm the guards against the full definition.
52 Result
<std::shared_ptr
<io::RandomAccessFile
>> FileSource::Open() const {
// Path 1: let the filesystem open the file described by file_info_.
54 return filesystem_
->OpenInputFile(file_info_
);
// Path 2: serve reads directly from the in-memory buffer.
58 return std::make_shared
<io::BufferReader
>(buffer_
);
// Path 3: call the custom_open_ callable.
61 return custom_open_();
// Opens the source via Open() and, when a compression type applies, wraps the
// file in an io::CompressedInputStream driven by a matching codec.
//
// compression: explicit compression type. When it holds no value, the type is
//   guessed from the extension of path().
// Errors from Open(), the codec lookup assignment and Codec::Create are
//   propagated through ARROW_ASSIGN_OR_RAISE.
//
// NOTE(review): several branch boundaries (else branches, closing braces) and
// the body of the UNCOMPRESSED check are not visible in this view -- confirm
// against the full definition.
64 Result
<std::shared_ptr
<io::InputStream
>> FileSource::OpenCompressed(
65 util::optional
<Compression::type
> compression
) const {
// Open the underlying file first.
66 ARROW_ASSIGN_OR_RAISE(auto file
, Open());
// Start from UNCOMPRESSED; overwritten below when a type is given or guessed.
67 auto actual_compression
= Compression::type::UNCOMPRESSED
;
68 if (!compression
.has_value()) {
69 // Guess compression from file extension
70 auto extension
= fs::internal::GetAbstractPathExtension(path());
// "gz" is mapped to GZIP directly rather than through the codec name lookup.
71 if (extension
== "gz") {
72 actual_compression
= Compression::type::GZIP
;
// Look the extension up as a codec name.
74 auto maybe_compression
= util::Codec::GetCompressionType(extension
);
// Adopt the lookup result only when the lookup succeeded.
75 if (maybe_compression
.ok()) {
76 ARROW_ASSIGN_OR_RAISE(actual_compression
, maybe_compression
);
// Use the caller-supplied type (presumably the else-branch of the has_value()
// check above -- confirm).
80 actual_compression
= compression
.value();
// NOTE(review): the body of this check is not visible here; presumably the
// already-opened file is returned unwrapped -- confirm.
82 if (actual_compression
== Compression::type::UNCOMPRESSED
) {
// Create the codec and wrap the opened file in a decompressing stream.
85 ARROW_ASSIGN_OR_RAISE(auto codec
, util::Codec::Create(actual_compression
));
86 return io::CompressedInputStream::Make(codec
.get(), std::move(file
));
89 Future
<util::optional
<int64_t>> FileFormat::CountRows(
90 const std::shared_ptr
<FileFragment
>&, compute::Expression
,
91 const std::shared_ptr
<ScanOptions
>&) {
92 return Future
<util::optional
<int64_t>>::MakeFinished(util::nullopt
);
95 Result
<std::shared_ptr
<FileFragment
>> FileFormat::MakeFragment(
96 FileSource source
, std::shared_ptr
<Schema
> physical_schema
) {
97 return MakeFragment(std::move(source
), compute::literal(true),
98 std::move(physical_schema
));
101 Result
<std::shared_ptr
<FileFragment
>> FileFormat::MakeFragment(
102 FileSource source
, compute::Expression partition_expression
) {
103 return MakeFragment(std::move(source
), std::move(partition_expression
), nullptr);
106 Result
<std::shared_ptr
<FileFragment
>> FileFormat::MakeFragment(
107 FileSource source
, compute::Expression partition_expression
,
108 std::shared_ptr
<Schema
> physical_schema
) {
109 return std::shared_ptr
<FileFragment
>(
110 new FileFragment(std::move(source
), shared_from_this(),
111 std::move(partition_expression
), std::move(physical_schema
)));
114 // The following implementation of ScanBatchesAsync is both ugly and terribly inefficient.
115 // Each of the formats should provide their own efficient implementation. However, this
116 // is a reasonable starting point or implementation for a dummy/mock format.
//
// Adapts the scan-task iterator returned by ScanFile() into a generator that
// yields record batches as futures.
//
// scan_options: forwarded to ScanFile() and copied into the generator state.
// file:         the fragment to scan.
// Returns the generator, or the error produced by ScanFile().
//
// NOTE(review): in this view the struct headers, part of the State initializer
// list, the `finished` member declaration, several else branches and the
// trailing returns are not visible -- confirm against the full definition.
117 Result
<RecordBatchGenerator
> FileFormat::ScanBatchesAsync(
118 const std::shared_ptr
<ScanOptions
>& scan_options
,
119 const std::shared_ptr
<FileFragment
>& file
) const {
// Obtain the scan tasks up front; a failure here is returned to the caller.
120 ARROW_ASSIGN_OR_RAISE(auto scan_task_it
, ScanFile(scan_options
, file
));
// State constructor: takes ownership of the options and the task iterator.
122 State(std::shared_ptr
<ScanOptions
> scan_options
, ScanTaskIterator scan_task_it
)
123 : scan_options(std::move(scan_options
)),
124 scan_task_it(std::move(scan_task_it
)),
// State members visible here: the scan options, the iterator over scan tasks,
// and the batch iterator of the task currently being drained.
128 std::shared_ptr
<ScanOptions
> scan_options
;
129 ScanTaskIterator scan_task_it
;
130 RecordBatchIterator current_rb_it
;
// Generator call operator: produces the next batch, or the end marker.
134 Future
<std::shared_ptr
<RecordBatch
>> operator()() {
// Keep going until the task iterator has been exhausted.
135 while (!state
->finished
) {
// No active batch iterator: pull the next scan task.
136 if (!state
->current_rb_it
) {
137 RETURN_NOT_OK(PumpScanTask());
// PumpScanTask found no further task: signal end of stream.
138 if (state
->finished
) {
139 return AsyncGeneratorEnd
<std::shared_ptr
<RecordBatch
>>();
// Read the next batch of the current task.
142 ARROW_ASSIGN_OR_RAISE(auto next_batch
, state
->current_rb_it
.Next());
// Current task drained: reset the batch iterator so the next pass through the
// loop pumps another task.
143 if (IsIterationEnd(next_batch
)) {
144 state
->current_rb_it
= RecordBatchIterator();
// Hand the batch back wrapped in an already-finished future.
146 return Future
<std::shared_ptr
<RecordBatch
>>::MakeFinished(next_batch
);
// Loop ended because `finished` was set: signal end of stream.
149 return AsyncGeneratorEnd
<std::shared_ptr
<RecordBatch
>>();
// Advances to the next scan task: marks the state finished at the end of the
// task iterator; the Execute() call below installs the task's batch iterator
// (presumably in the else branch, which is not visible here).
151 Status
PumpScanTask() {
152 ARROW_ASSIGN_OR_RAISE(auto next_task
, state
->scan_task_it
.Next());
153 if (IsIterationEnd(next_task
)) {
154 state
->finished
= true;
156 ARROW_ASSIGN_OR_RAISE(state
->current_rb_it
, next_task
->Execute());
// Held through a shared_ptr -- presumably so that copies of the generator
// share one cursor; confirm.
160 std::shared_ptr
<State
> state
;
// scan_options is copied into the state; the task iterator is moved.
162 return Generator
{std::make_shared
<State
>(scan_options
, std::move(scan_task_it
))};
165 Result
<std::shared_ptr
<Schema
>> FileFragment::ReadPhysicalSchemaImpl() {
166 return format_
->Inspect(source_
);
169 Result
<ScanTaskIterator
> FileFragment::Scan(std::shared_ptr
<ScanOptions
> options
) {
170 auto self
= std::dynamic_pointer_cast
<FileFragment
>(shared_from_this());
171 return format_
->ScanFile(options
, self
);
174 Result
<RecordBatchGenerator
> FileFragment::ScanBatchesAsync(
175 const std::shared_ptr
<ScanOptions
>& options
) {
176 auto self
= std::dynamic_pointer_cast
<FileFragment
>(shared_from_this());
177 return format_
->ScanBatchesAsync(options
, self
);
180 Future
<util::optional
<int64_t>> FileFragment::CountRows(
181 compute::Expression predicate
, const std::shared_ptr
<ScanOptions
>& options
) {
182 ARROW_ASSIGN_OR_RAISE(predicate
, compute::SimplifyWithGuarantee(std::move(predicate
),
183 partition_expression_
));
184 if (!predicate
.IsSatisfiable()) {
185 return Future
<util::optional
<int64_t>>::MakeFinished(0);
187 auto self
= checked_pointer_cast
<FileFragment
>(shared_from_this());
188 return format()->CountRows(self
, std::move(predicate
), options
);
191 struct FileSystemDataset::FragmentSubtrees
{
192 // Forest for skipping fragments based on extracted subtree expressions
193 compute::Forest forest
;
194 // fragment indices and subtree expressions in forest order
195 std::vector
<util::Variant
<int, compute::Expression
>> fragments_and_subtrees
;
// Assembles a FileSystemDataset from its parts.
//
// The constructor only receives `schema` and `root_partition`; the format,
// filesystem, fragments and partitioning are assigned to the new instance
// afterwards, and SetupSubtreePruning() is then run on it.
//
// NOTE(review): the return statement is on a line not visible in this view --
// presumably `out` is returned; confirm.
198 Result
<std::shared_ptr
<FileSystemDataset
>> FileSystemDataset::Make(
199 std::shared_ptr
<Schema
> schema
, compute::Expression root_partition
,
200 std::shared_ptr
<FileFormat
> format
, std::shared_ptr
<fs::FileSystem
> filesystem
,
201 std::vector
<std::shared_ptr
<FileFragment
>> fragments
,
202 std::shared_ptr
<Partitioning
> partitioning
) {
// Constructed with `new` rather than make_shared (presumably because the
// constructor is not publicly accessible -- confirm).
203 std::shared_ptr
<FileSystemDataset
> out(
204 new FileSystemDataset(std::move(schema
), std::move(root_partition
)));
// Move the remaining parts into the instance.
205 out
->format_
= std::move(format
);
206 out
->filesystem_
= std::move(filesystem
);
207 out
->fragments_
= std::move(fragments
);
208 out
->partitioning_
= std::move(partitioning
);
// Must run after fragments_ is assigned: it reads the fragments' partition
// expressions.
209 out
->SetupSubtreePruning();
213 Result
<std::shared_ptr
<Dataset
>> FileSystemDataset::ReplaceSchema(
214 std::shared_ptr
<Schema
> schema
) const {
215 RETURN_NOT_OK(CheckProjectable(*schema_
, *schema
));
216 return Make(std::move(schema
), partition_expression_
, format_
, filesystem_
, fragments_
);
// Collects the source path of every fragment, in fragments_ order.
// NOTE(review): the return statement is on a line not visible in this view --
// presumably `files` is returned; confirm.
219 std::vector
<std::string
> FileSystemDataset::files() const {
220 std::vector
<std::string
> files
;
// One path per fragment.
222 for (const auto& fragment
: fragments_
) {
223 files
.push_back(fragment
->source().path());
// Builds a textual description of the dataset: the header "FileSystemDataset:"
// followed by one line per fragment holding its source path, with
// ": <partition expression>" appended when that expression is not
// literal(true).
// NOTE(review): the body of the empty-fragments branch and the final return
// are on lines not visible in this view -- confirm.
229 std::string
FileSystemDataset::ToString() const {
230 std::string repr
= "FileSystemDataset:";
// Special case for a dataset without fragments (body not visible here).
232 if (fragments_
.empty()) {
236 for (const auto& fragment
: fragments_
) {
237 repr
+= "\n" + fragment
->source().path();
// Only print partition expressions that carry information.
239 const auto& partition
= fragment
->partition_expression();
240 if (partition
!= compute::literal(true)) {
241 repr
+= ": " + partition
.ToString();
// Populates subtrees_: encodes every fragment's partition expression as a
// guarantee, sorts the encoded entries, records for each entry either a
// fragment index or a subtree expression, and builds the forest over the
// sorted entries.
248 void FileSystemDataset::SetupSubtreePruning() {
249 subtrees_
= std::make_shared
<FragmentSubtrees
>();
250 compute::SubtreeImpl impl
;
// The callback hands SubtreeImpl the partition expression of fragment `index`;
// the second argument is the number of fragments.
252 auto encoded
= impl
.EncodeGuarantees(
253 [&](int index
) { return fragments_
[index
]->partition_expression(); },
254 static_cast<int>(fragments_
.size()));
// Order the entries with SubtreeImpl::ByGuarantee before building the forest.
256 std::sort(encoded
.begin(), encoded
.end(), compute::SubtreeImpl::ByGuarantee());
// NOTE(review): the condition choosing between the two emplace_back calls is
// not visible in this view -- presumably entries that carry an index are
// fragments and all others are subtrees; confirm.
258 for (const auto& e
: encoded
) {
260 subtrees_
->fragments_and_subtrees
.emplace_back(*e
.index
);
262 subtrees_
->fragments_and_subtrees
.emplace_back(impl
.GetSubtreeExpression(e
));
// The forest has one node per encoded entry; IsAncestor supplies the ancestry
// test between entries.
266 subtrees_
->forest
= compute::Forest(static_cast<int>(encoded
.size()),
267 compute::SubtreeImpl::IsAncestor
{encoded
});
// Returns an iterator over the fragments selected for `predicate`.
//
// A predicate equal to literal(true) selects every fragment. Otherwise the
// subtree forest is visited: fragment nodes contribute their index, subtree
// nodes simplify the current predicate against the subtree's expression. The
// selected fragments are returned in fragments_ order.
//
// NOTE(review): the return values of the visitor lambda are on lines not
// visible in this view, so how an unsatisfiable subtree is skipped cannot be
// confirmed from here.
270 Result
<FragmentIterator
> FileSystemDataset::GetFragmentsImpl(
271 compute::Expression predicate
) {
272 if (predicate
== compute::literal(true)) {
273 // trivial predicate; skip subtree pruning
274 return MakeVectorIterator(FragmentVector(fragments_
.begin(), fragments_
.end()));
// Indices into fragments_ collected during the visit.
277 std::vector
<int> fragment_indices
;
// Stack of predicates: the back is the predicate simplified against the
// subtree expressions pushed so far.
279 std::vector
<compute::Expression
> predicates
{predicate
};
280 RETURN_NOT_OK(subtrees_
->forest
.Visit(
281 [&](compute::Forest::Ref ref
) -> Result
<bool> {
// The node holds an int: it is a fragment, so record its index.
282 if (auto fragment_index
=
283 util::get_if
<int>(&subtrees_
->fragments_and_subtrees
[ref
.i
])) {
284 fragment_indices
.push_back(*fragment_index
);
// Otherwise the node holds a subtree expression: simplify the current
// predicate with it as the guarantee.
288 const auto& subtree_expr
=
289 util::get
<compute::Expression
>(subtrees_
->fragments_and_subtrees
[ref
.i
]);
290 ARROW_ASSIGN_OR_RAISE(auto simplified
,
291 SimplifyWithGuarantee(predicates
.back(), subtree_expr
));
// Branch for a subtree whose guarantee contradicts the predicate (body not
// visible here).
293 if (!simplified
.IsSatisfiable()) {
// Push the simplified predicate for use while inside this subtree.
297 predicates
.push_back(std::move(simplified
));
// Second callback pops the predicate again (presumably invoked when the visit
// leaves a subtree -- confirm against Forest::Visit).
300 [&](compute::Forest::Ref ref
) { predicates
.pop_back(); }));
// Indices were collected in forest order; sort them ascending so the result
// follows fragments_ order.
302 std::sort(fragment_indices
.begin(), fragment_indices
.end());
// Map the indices back to fragment pointers.
304 FragmentVector
fragments(fragment_indices
.size());
305 std::transform(fragment_indices
.begin(), fragment_indices
.end(), fragments
.begin(),
306 [this](int i
) { return fragments_
[i
]; });
308 return MakeVectorIterator(std::move(fragments
));
// Pulls batches from `batches` and forwards each one to Write(batch); a null
// batch ends the loop. An error from Next() or Write() is returned
// immediately.
// NOTE(review): the loop header and the final return are on lines not visible
// in this view -- confirm.
311 Status
FileWriter::Write(RecordBatchReader
* batches
) {
313 ARROW_ASSIGN_OR_RAISE(auto batch
, batches
->Next());
// A null batch marks the end of the reader.
314 if (batch
== nullptr) break;
315 RETURN_NOT_OK(Write(batch
));
320 Status
FileWriter::Finish() {
321 RETURN_NOT_OK(FinishInternal());
322 return destination_
->Close();
// Sink consumer that turns each incoming ExecBatch into a RecordBatch, splits
// it with the configured partitioning and queues one write per partition on
// the dataset writer.
// NOTE(review): access specifiers, one method signature and several return
// statements are on lines not visible in this view.
327 class DatasetWritingSinkNodeConsumer
: public compute::SinkNodeConsumer
{
// Stores the schema, takes ownership of the dataset writer, and keeps the
// write options and the backpressure toggle (which may be null; see the check
// in WriteNextBatch).
329 DatasetWritingSinkNodeConsumer(std::shared_ptr
<Schema
> schema
,
330 std::unique_ptr
<internal::DatasetWriter
> dataset_writer
,
331 FileSystemDatasetWriteOptions write_options
,
332 std::shared_ptr
<util::AsyncToggle
> backpressure_toggle
)
333 : schema_(std::move(schema
)),
334 dataset_writer_(std::move(dataset_writer
)),
335 write_options_(std::move(write_options
)),
336 backpressure_toggle_(std::move(backpressure_toggle
)) {}
// Converts the batch to a RecordBatch using schema_ and forwards it together
// with the batch's guarantee.
338 Status
Consume(compute::ExecBatch batch
) {
339 ARROW_ASSIGN_OR_RAISE(std::shared_ptr
<RecordBatch
> record_batch
,
340 batch
.ToRecordBatch(schema_
));
341 return WriteNextBatch(std::move(record_batch
), batch
.guarantee
);
// NOTE(review): the signature enclosing the next two statements is not visible
// in this view. They queue dataset_writer_->Finish() on the task group and
// return task_group_.End().
345 RETURN_NOT_OK(task_group_
.AddTask([this] { return dataset_writer_
->Finish(); }));
346 return task_group_
.End();
// Splits `batch` with write_options_.partitioning and queues one write task
// per resulting group. Fails with Status::Invalid when the batch would be
// split into more than max_partitions groups.
350 Status
WriteNextBatch(std::shared_ptr
<RecordBatch
> batch
,
351 compute::Expression guarantee
) {
352 ARROW_ASSIGN_OR_RAISE(auto groups
, write_options_
.partitioning
->Partition(batch
));
353 batch
.reset(); // drop to hopefully conserve memory
// Refuse batches that fan out into too many partitions.
355 if (groups
.batches
.size() > static_cast<size_t>(write_options_
.max_partitions
)) {
356 return Status::Invalid("Fragment would be written into ", groups
.batches
.size(),
357 " partitions. This exceeds the maximum of ",
358 write_options_
.max_partitions
);
361 for (std::size_t index
= 0; index
< groups
.batches
.size(); index
++) {
// The destination is formatted from the group's expression combined (and_)
// with the guarantee of the incoming batch.
362 auto partition_expression
= and_(groups
.expressions
[index
], guarantee
);
363 auto next_batch
= groups
.batches
[index
];
364 ARROW_ASSIGN_OR_RAISE(std::string destination
,
365 write_options_
.partitioning
->Format(partition_expression
));
// The task captures the batch and the destination by value.
366 RETURN_NOT_OK(task_group_
.AddTask([this, next_batch
, destination
] {
367 Future
<> has_room
= dataset_writer_
->WriteRecordBatch(next_batch
, destination
);
// The write future is not finished yet: close the toggle (when one was
// supplied) and reopen it once the future completes.
// NOTE(review): the continuation captures `this` -- confirm the consumer
// outlives the future.
368 if (!has_room
.is_finished() && backpressure_toggle_
) {
369 backpressure_toggle_
->Close();
370 return has_room
.Then([this] { backpressure_toggle_
->Open(); });
// Schema used to convert ExecBatches to RecordBatches.
378 std::shared_ptr
<Schema
> schema_
;
// Writer that receives the partitioned batches.
379 std::unique_ptr
<internal::DatasetWriter
> dataset_writer_
;
// Supplies the partitioning and max_partitions used above.
380 FileSystemDatasetWriteOptions write_options_
;
// Optional toggle closed while a write is pending; may be null.
381 std::shared_ptr
<util::AsyncToggle
> backpressure_toggle_
;
// Task group that all write tasks and the final Finish() are queued on.
382 util::SerializedAsyncTaskGroup task_group_
;
// Writes the data produced by `scanner` according to `write_options` by
// building and running an exec plan.
//
// Returns Status::Invalid when the scanner's options do not have use_async
// set; otherwise returns the status of the plan.
//
// NOTE(review): the tail of the `names` initializer and some node labels of
// the declaration sequence are on lines not visible in this view.
387 Status
FileSystemDataset::Write(const FileSystemDatasetWriteOptions
& write_options
,
388 std::shared_ptr
<Scanner
> scanner
) {
// Writing is only supported on an asynchronous scanner.
389 if (!scanner
->options()->use_async
) {
390 return Status::Invalid(
391 "A dataset write operation was invoked on a scanner that was configured for "
392 "synchronous scanning. Dataset writing requires a scanner configured for "
393 "asynchronous scanning. Please recreate the scanner with the use_async or "
394 "UseAsync option set to true");
// The exec context uses the memory pool of the scanner's IO context and the
// CPU thread pool.
396 const io::IOContext
& io_context
= scanner
->options()->io_context
;
397 std::shared_ptr
<compute::ExecContext
> exec_context
=
398 std::make_shared
<compute::ExecContext
>(io_context
.pool(),
399 ::arrow::internal::GetCpuThreadPool());
401 ARROW_ASSIGN_OR_RAISE(auto plan
, compute::ExecPlan::Make(exec_context
.get()));
// Projection expressions are the arguments of the scanner's projection call;
// `names` is derived from that call's options, which are cast to
// MakeStructOptions.
403 auto exprs
= scanner
->options()->projection
.call()->arguments
;
404 auto names
= checked_cast
<const compute::MakeStructOptions
*>(
405 scanner
->options()->projection
.call()->options
.get())
407 std::shared_ptr
<Dataset
> dataset
= scanner
->dataset();
// The same toggle is handed to both the scan node and the write node options.
408 std::shared_ptr
<util::AsyncToggle
> backpressure_toggle
=
409 std::make_shared
<util::AsyncToggle
>();
// Declaration sequence added to the plan; visible stages are scan and filter,
// followed by project and write node options.
412 compute::Declaration::Sequence(
414 {"scan", ScanNodeOptions
{dataset
, scanner
->options(), backpressure_toggle
}},
415 {"filter", compute::FilterNodeOptions
{scanner
->options()->filter
}},
417 compute::ProjectNodeOptions
{std::move(exprs
), std::move(names
)}},
419 WriteNodeOptions
{write_options
, scanner
->options()->projected_schema
,
420 backpressure_toggle
}},
422 .AddToPlan(plan
.get()));
// Start the plan and report the status of its finished() future (presumably
// this waits for the plan to complete -- confirm).
424 RETURN_NOT_OK(plan
->StartProducing());
425 return plan
->finished().status();
// ExecNode factory for dataset writing: checks that there is exactly one
// input, creates a DatasetWriter and a DatasetWritingSinkNodeConsumer from the
// options, and builds a "consuming_sink" node around that consumer.
//
// plan:    plan the node is created in.
// inputs:  must contain exactly one node, otherwise Status::Invalid.
// options: cast to WriteNodeOptions with checked_cast.
//
// NOTE(review): the tail of the error message, the target of the last
// ARROW_ASSIGN_OR_RAISE and the return statement are on lines not visible in
// this view.
428 Result
<compute::ExecNode
*> MakeWriteNode(compute::ExecPlan
* plan
,
429 std::vector
<compute::ExecNode
*> inputs
,
430 const compute::ExecNodeOptions
& options
) {
431 if (inputs
.size() != 1) {
432 return Status::Invalid("Write SinkNode requires exactly 1 input, got ",
// NOTE(review): declared by value, so the WriteNodeOptions is copied here and
// the references below bind to that copy -- consider a const reference.
436 const WriteNodeOptions write_node_options
=
437 checked_cast
<const WriteNodeOptions
&>(options
);
438 const FileSystemDatasetWriteOptions
& write_options
= write_node_options
.write_options
;
439 const std::shared_ptr
<Schema
>& schema
= write_node_options
.schema
;
440 const std::shared_ptr
<util::AsyncToggle
>& backpressure_toggle
=
441 write_node_options
.backpressure_toggle
;
// Create the writer; a failure is returned to the caller.
443 ARROW_ASSIGN_OR_RAISE(auto dataset_writer
,
444 internal::DatasetWriter::Make(write_options
));
// The consumer takes ownership of the writer and copies the write options.
446 std::shared_ptr
<DatasetWritingSinkNodeConsumer
> consumer
=
447 std::make_shared
<DatasetWritingSinkNodeConsumer
>(
448 schema
, std::move(dataset_writer
), write_options
, backpressure_toggle
);
// Build the sink node that feeds batches to the consumer.
450 ARROW_ASSIGN_OR_RAISE(
452 compute::MakeExecNode("consuming_sink", plan
, std::move(inputs
),
453 compute::ConsumingSinkNodeOptions
{std::move(consumer
)}));
459 void InitializeDatasetWriter(arrow::compute::ExecFactoryRegistry
* registry
) {
460 DCHECK_OK(registry
->AddFactory("write", MakeWriteNode
));
462 } // namespace internal
464 } // namespace dataset