// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/dataset/file_base.h"

#include <arrow/compute/exec/exec_plan.h>

#include <algorithm>
#include <unordered_map>
#include <vector>

#include "arrow/compute/exec/forest_internal.h"
#include "arrow/compute/exec/subtree_internal.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/dataset_writer.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/scanner_internal.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/io/compressed.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/util/compression.h"
#include "arrow/util/iterator.h"
#include "arrow/util/macros.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/map.h"
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/variant.h"

namespace arrow {

using internal::checked_cast;
using internal::checked_pointer_cast;

namespace dataset {

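// A FileSource is backed by exactly one of: a filesystem path, an in-memory buffer,
// or a custom open callback. Open() dispatches on whichever is set.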
Result<std::shared_ptr<io::RandomAccessFile>> FileSource::Open() const {
  if (filesystem_) {
    return filesystem_->OpenInputFile(file_info_);
  }

  if (buffer_) {
    return std::make_shared<io::BufferReader>(buffer_);
  }

  return custom_open_();
}

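// Opens the source, wrapping it in a decompressing stream when needed. If no
// compression is given explicitly, the codec is guessed from the file extension
// ("gz" is special-cased to GZIP); an unrecognized extension falls back to
// returning the raw stream.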
Result<std::shared_ptr<io::InputStream>> FileSource::OpenCompressed(
    util::optional<Compression::type> compression) const {
  ARROW_ASSIGN_OR_RAISE(auto file, Open());
  auto actual_compression = Compression::type::UNCOMPRESSED;
  if (!compression.has_value()) {
    // Guess compression from file extension
    auto extension = fs::internal::GetAbstractPathExtension(path());
    if (extension == "gz") {
      actual_compression = Compression::type::GZIP;
    } else {
      auto maybe_compression = util::Codec::GetCompressionType(extension);
      if (maybe_compression.ok()) {
        ARROW_ASSIGN_OR_RAISE(actual_compression, maybe_compression);
      }
    }
  } else {
    actual_compression = compression.value();
  }
  if (actual_compression == Compression::type::UNCOMPRESSED) {
    return file;
  }
  ARROW_ASSIGN_OR_RAISE(auto codec, util::Codec::Create(actual_compression));
  return io::CompressedInputStream::Make(codec.get(), std::move(file));
}

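// By default a format has no fast path for counting rows: returning nullopt tells
// the caller to fall back to a full scan. Formats with row-count metadata (e.g.
// Parquet) override this.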
Future<util::optional<int64_t>> FileFormat::CountRows(
    const std::shared_ptr<FileFragment>&, compute::Expression,
    const std::shared_ptr<ScanOptions>&) {
  return Future<util::optional<int64_t>>::MakeFinished(util::nullopt);
}

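// The two-argument MakeFragment overloads delegate to the three-argument overload
// below, defaulting the partition expression to literal(true) and/or the physical
// schema to nullptr (in which case it is inspected from the file on demand).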
Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
    FileSource source, std::shared_ptr<Schema> physical_schema) {
  return MakeFragment(std::move(source), compute::literal(true),
                      std::move(physical_schema));
}

Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
    FileSource source, compute::Expression partition_expression) {
  return MakeFragment(std::move(source), std::move(partition_expression), nullptr);
}

Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
    FileSource source, compute::Expression partition_expression,
    std::shared_ptr<Schema> physical_schema) {
  return std::shared_ptr<FileFragment>(
      new FileFragment(std::move(source), shared_from_this(),
                       std::move(partition_expression), std::move(physical_schema)));
}

// The following implementation of ScanBatchesAsync is both ugly and terribly
// inefficient: it merely wraps the synchronous ScanTaskIterator in an asynchronous
// generator. Each format should provide its own efficient implementation; this one
// is only a reasonable starting point for a dummy/mock format.
Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
    const std::shared_ptr<ScanOptions>& scan_options,
    const std::shared_ptr<FileFragment>& file) const {
  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
  struct State {
    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
        : scan_options(std::move(scan_options)),
          scan_task_it(std::move(scan_task_it)),
          current_rb_it(),
          finished(false) {}

    std::shared_ptr<ScanOptions> scan_options;
    ScanTaskIterator scan_task_it;
    RecordBatchIterator current_rb_it;
    bool finished;
  };
  struct Generator {
    Future<std::shared_ptr<RecordBatch>> operator()() {
      while (!state->finished) {
        if (!state->current_rb_it) {
          RETURN_NOT_OK(PumpScanTask());
          if (state->finished) {
            return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
          }
        }
        ARROW_ASSIGN_OR_RAISE(auto next_batch, state->current_rb_it.Next());
        if (IsIterationEnd(next_batch)) {
          state->current_rb_it = RecordBatchIterator();
        } else {
          return Future<std::shared_ptr<RecordBatch>>::MakeFinished(next_batch);
        }
      }
      return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
    }
    Status PumpScanTask() {
      ARROW_ASSIGN_OR_RAISE(auto next_task, state->scan_task_it.Next());
      if (IsIterationEnd(next_task)) {
        state->finished = true;
      } else {
        ARROW_ASSIGN_OR_RAISE(state->current_rb_it, next_task->Execute());
      }
      return Status::OK();
    }
    std::shared_ptr<State> state;
  };
  return Generator{std::make_shared<State>(scan_options, std::move(scan_task_it))};
}

Result<std::shared_ptr<Schema>> FileFragment::ReadPhysicalSchemaImpl() {
  return format_->Inspect(source_);
}

Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanOptions> options) {
  auto self = std::dynamic_pointer_cast<FileFragment>(shared_from_this());
  return format_->ScanFile(options, self);
}

Result<RecordBatchGenerator> FileFragment::ScanBatchesAsync(
    const std::shared_ptr<ScanOptions>& options) {
  auto self = std::dynamic_pointer_cast<FileFragment>(shared_from_this());
  return format_->ScanBatchesAsync(options, self);
}

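// Simplify the predicate against this fragment's partition expression first: if the
// result is unsatisfiable the count is 0 without touching the file, otherwise defer
// to the format's (possibly metadata-only) row count.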
Future<util::optional<int64_t>> FileFragment::CountRows(
    compute::Expression predicate, const std::shared_ptr<ScanOptions>& options) {
  ARROW_ASSIGN_OR_RAISE(predicate, compute::SimplifyWithGuarantee(std::move(predicate),
                                                                  partition_expression_));
  if (!predicate.IsSatisfiable()) {
    return Future<util::optional<int64_t>>::MakeFinished(0);
  }
  auto self = checked_pointer_cast<FileFragment>(shared_from_this());
  return format()->CountRows(self, std::move(predicate), options);
}

struct FileSystemDataset::FragmentSubtrees {
  // Forest for skipping fragments based on extracted subtree expressions
  compute::Forest forest;
  // fragment indices and subtree expressions in forest order
  std::vector<util::Variant<int, compute::Expression>> fragments_and_subtrees;
};

Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
    std::shared_ptr<Schema> schema, compute::Expression root_partition,
    std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem,
    std::vector<std::shared_ptr<FileFragment>> fragments,
    std::shared_ptr<Partitioning> partitioning) {
  std::shared_ptr<FileSystemDataset> out(
      new FileSystemDataset(std::move(schema), std::move(root_partition)));
  out->format_ = std::move(format);
  out->filesystem_ = std::move(filesystem);
  out->fragments_ = std::move(fragments);
  out->partitioning_ = std::move(partitioning);
  out->SetupSubtreePruning();
  return out;
}

Result<std::shared_ptr<Dataset>> FileSystemDataset::ReplaceSchema(
    std::shared_ptr<Schema> schema) const {
  RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
  return Make(std::move(schema), partition_expression_, format_, filesystem_,
              fragments_);
}

std::vector<std::string> FileSystemDataset::files() const {
  std::vector<std::string> files;

  for (const auto& fragment : fragments_) {
    files.push_back(fragment->source().path());
  }

  return files;
}

std::string FileSystemDataset::ToString() const {
  std::string repr = "FileSystemDataset:";

  if (fragments_.empty()) {
    return repr + " []";
  }

  for (const auto& fragment : fragments_) {
    repr += "\n" + fragment->source().path();

    const auto& partition = fragment->partition_expression();
    if (partition != compute::literal(true)) {
      repr += ": " + partition.ToString();
    }
  }

  return repr;
}

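// Encode each fragment's partition expression into a sorted sequence of guarantees
// and build a Forest over them, so that GetFragmentsImpl can later skip whole
// subtrees of fragments whose guarantees contradict a scan predicate.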
void FileSystemDataset::SetupSubtreePruning() {
  subtrees_ = std::make_shared<FragmentSubtrees>();
  compute::SubtreeImpl impl;

  auto encoded = impl.EncodeGuarantees(
      [&](int index) { return fragments_[index]->partition_expression(); },
      static_cast<int>(fragments_.size()));

  std::sort(encoded.begin(), encoded.end(), compute::SubtreeImpl::ByGuarantee());

  for (const auto& e : encoded) {
    if (e.index) {
      subtrees_->fragments_and_subtrees.emplace_back(*e.index);
    } else {
      subtrees_->fragments_and_subtrees.emplace_back(impl.GetSubtreeExpression(e));
    }
  }

  subtrees_->forest = compute::Forest(static_cast<int>(encoded.size()),
                                      compute::SubtreeImpl::IsAncestor{encoded});
}

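// Walk the forest depth-first, keeping a stack of predicates simplified against each
// enclosing subtree expression. A subtree whose simplified predicate is unsatisfiable
// is pruned along with every fragment beneath it.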
Result<FragmentIterator> FileSystemDataset::GetFragmentsImpl(
    compute::Expression predicate) {
  if (predicate == compute::literal(true)) {
    // trivial predicate; skip subtree pruning
    return MakeVectorIterator(FragmentVector(fragments_.begin(), fragments_.end()));
  }

  std::vector<int> fragment_indices;

  std::vector<compute::Expression> predicates{predicate};
  RETURN_NOT_OK(subtrees_->forest.Visit(
      [&](compute::Forest::Ref ref) -> Result<bool> {
        if (auto fragment_index =
                util::get_if<int>(&subtrees_->fragments_and_subtrees[ref.i])) {
          fragment_indices.push_back(*fragment_index);
          return false;
        }

        const auto& subtree_expr =
            util::get<compute::Expression>(subtrees_->fragments_and_subtrees[ref.i]);
        ARROW_ASSIGN_OR_RAISE(auto simplified,
                              SimplifyWithGuarantee(predicates.back(), subtree_expr));

        if (!simplified.IsSatisfiable()) {
          return false;
        }

        predicates.push_back(std::move(simplified));
        return true;
      },
      [&](compute::Forest::Ref ref) { predicates.pop_back(); }));

  std::sort(fragment_indices.begin(), fragment_indices.end());

  FragmentVector fragments(fragment_indices.size());
  std::transform(fragment_indices.begin(), fragment_indices.end(), fragments.begin(),
                 [this](int i) { return fragments_[i]; });

  return MakeVectorIterator(std::move(fragments));
}

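// Drain the reader, delegating each batch to the batch-level Write; a null batch
// signals the end of the stream.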
Status FileWriter::Write(RecordBatchReader* batches) {
  while (true) {
    ARROW_ASSIGN_OR_RAISE(auto batch, batches->Next());
    if (batch == nullptr) break;
    RETURN_NOT_OK(Write(batch));
  }
  return Status::OK();
}

Status FileWriter::Finish() {
  RETURN_NOT_OK(FinishInternal());
  return destination_->Close();
}

namespace {

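// Sink consumer that routes each ExecBatch through the partitioning scheme and hands
// the resulting per-partition batches to a DatasetWriter. When a write cannot finish
// immediately, the shared AsyncToggle is closed to apply backpressure upstream and
// reopened once the write completes.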
class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
 public:
  DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> schema,
                                 std::unique_ptr<internal::DatasetWriter> dataset_writer,
                                 FileSystemDatasetWriteOptions write_options,
                                 std::shared_ptr<util::AsyncToggle> backpressure_toggle)
      : schema_(std::move(schema)),
        dataset_writer_(std::move(dataset_writer)),
        write_options_(std::move(write_options)),
        backpressure_toggle_(std::move(backpressure_toggle)) {}

  Status Consume(compute::ExecBatch batch) {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> record_batch,
                          batch.ToRecordBatch(schema_));
    return WriteNextBatch(std::move(record_batch), batch.guarantee);
  }

  Future<> Finish() {
    RETURN_NOT_OK(task_group_.AddTask([this] { return dataset_writer_->Finish(); }));
    return task_group_.End();
  }

 private:
  Status WriteNextBatch(std::shared_ptr<RecordBatch> batch,
                        compute::Expression guarantee) {
    ARROW_ASSIGN_OR_RAISE(auto groups, write_options_.partitioning->Partition(batch));
    batch.reset();  // drop to hopefully conserve memory

    if (groups.batches.size() > static_cast<size_t>(write_options_.max_partitions)) {
      return Status::Invalid("Fragment would be written into ", groups.batches.size(),
                             " partitions. This exceeds the maximum of ",
                             write_options_.max_partitions);
    }

    for (std::size_t index = 0; index < groups.batches.size(); index++) {
      auto partition_expression = and_(groups.expressions[index], guarantee);
      auto next_batch = groups.batches[index];
      ARROW_ASSIGN_OR_RAISE(std::string destination,
                            write_options_.partitioning->Format(partition_expression));
      RETURN_NOT_OK(task_group_.AddTask([this, next_batch, destination] {
        Future<> has_room = dataset_writer_->WriteRecordBatch(next_batch, destination);
        if (!has_room.is_finished() && backpressure_toggle_) {
          backpressure_toggle_->Close();
          return has_room.Then([this] { backpressure_toggle_->Open(); });
        }
        return has_room;
      }));
    }
    return Status::OK();
  }

  std::shared_ptr<Schema> schema_;
  std::unique_ptr<internal::DatasetWriter> dataset_writer_;
  FileSystemDatasetWriteOptions write_options_;
  std::shared_ptr<util::AsyncToggle> backpressure_toggle_;
  util::SerializedAsyncTaskGroup task_group_;
};

}  // namespace

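// Writing is implemented as a four-node exec plan: scan the source dataset, filter by
// the scanner's predicate, project to the materialized columns, then pipe everything
// into the dataset writer's sink node.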
Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
                                std::shared_ptr<Scanner> scanner) {
  if (!scanner->options()->use_async) {
    return Status::Invalid(
        "A dataset write operation was invoked on a scanner that was configured for "
        "synchronous scanning. Dataset writing requires a scanner configured for "
        "asynchronous scanning. Please recreate the scanner with the use_async or "
        "UseAsync option set to true");
  }
  const io::IOContext& io_context = scanner->options()->io_context;
  std::shared_ptr<compute::ExecContext> exec_context =
      std::make_shared<compute::ExecContext>(io_context.pool(),
                                             ::arrow::internal::GetCpuThreadPool());

  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));

  auto exprs = scanner->options()->projection.call()->arguments;
  auto names = checked_cast<const compute::MakeStructOptions*>(
                   scanner->options()->projection.call()->options.get())
                   ->field_names;
  std::shared_ptr<Dataset> dataset = scanner->dataset();
  std::shared_ptr<util::AsyncToggle> backpressure_toggle =
      std::make_shared<util::AsyncToggle>();

  RETURN_NOT_OK(
      compute::Declaration::Sequence(
          {
              {"scan", ScanNodeOptions{dataset, scanner->options(), backpressure_toggle}},
              {"filter", compute::FilterNodeOptions{scanner->options()->filter}},
              {"project",
               compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
              {"write",
               WriteNodeOptions{write_options, scanner->options()->projected_schema,
                                backpressure_toggle}},
          })
          .AddToPlan(plan.get()));

  RETURN_NOT_OK(plan->StartProducing());
  return plan->finished().status();
}

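// Factory for the "write" node registered below: validates its single input, then
// wraps a DatasetWritingSinkNodeConsumer in a generic consuming_sink node.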
Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
                                         std::vector<compute::ExecNode*> inputs,
                                         const compute::ExecNodeOptions& options) {
  if (inputs.size() != 1) {
    return Status::Invalid("Write SinkNode requires exactly 1 input, got ",
                           inputs.size());
  }

  const WriteNodeOptions write_node_options =
      checked_cast<const WriteNodeOptions&>(options);
  const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
  const std::shared_ptr<Schema>& schema = write_node_options.schema;
  const std::shared_ptr<util::AsyncToggle>& backpressure_toggle =
      write_node_options.backpressure_toggle;

  ARROW_ASSIGN_OR_RAISE(auto dataset_writer,
                        internal::DatasetWriter::Make(write_options));

  std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
      std::make_shared<DatasetWritingSinkNodeConsumer>(
          schema, std::move(dataset_writer), write_options, backpressure_toggle);

  ARROW_ASSIGN_OR_RAISE(
      auto node,
      compute::MakeExecNode("consuming_sink", plan, std::move(inputs),
                            compute::ConsumingSinkNodeOptions{std::move(consumer)}));

  return node;
}

namespace internal {
void InitializeDatasetWriter(arrow::compute::ExecFactoryRegistry* registry) {
  DCHECK_OK(registry->AddFactory("write", MakeWriteNode));
}
}  // namespace internal

}  // namespace dataset

}  // namespace arrow