]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/dataset/file_orc.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / dataset / file_orc.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/dataset/file_orc.h"
19
20 #include <memory>
21
22 #include "arrow/adapters/orc/adapter.h"
23 #include "arrow/dataset/dataset_internal.h"
24 #include "arrow/dataset/file_base.h"
25 #include "arrow/dataset/scanner.h"
26 #include "arrow/util/checked_cast.h"
27 #include "arrow/util/iterator.h"
28 #include "arrow/util/logging.h"
29
30 namespace arrow {
31
32 using internal::checked_pointer_cast;
33
34 namespace dataset {
35
36 namespace {
37
38 Result<std::unique_ptr<arrow::adapters::orc::ORCFileReader>> OpenORCReader(
39 const FileSource& source,
40 const std::shared_ptr<ScanOptions>& scan_options = nullptr) {
41 ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
42
43 arrow::MemoryPool* pool;
44 if (scan_options) {
45 pool = scan_options->pool;
46 } else {
47 pool = default_memory_pool();
48 }
49
50 auto reader = arrow::adapters::orc::ORCFileReader::Open(std::move(input), pool);
51 auto status = reader.status();
52 if (!status.ok()) {
53 return status.WithMessage("Could not open ORC input source '", source.path(),
54 "': ", status.message());
55 }
56 return reader;
57 }
58
59 /// \brief A ScanTask backed by an ORC file.
60 class OrcScanTask : public ScanTask {
61 public:
62 OrcScanTask(std::shared_ptr<FileFragment> fragment,
63 std::shared_ptr<ScanOptions> options)
64 : ScanTask(std::move(options), fragment), source_(fragment->source()) {}
65
66 Result<RecordBatchIterator> Execute() override {
67 struct Impl {
68 static Result<RecordBatchIterator> Make(const FileSource& source,
69 const FileFormat& format,
70 const ScanOptions& scan_options) {
71 ARROW_ASSIGN_OR_RAISE(
72 auto reader,
73 OpenORCReader(source, std::make_shared<ScanOptions>(scan_options)));
74 int num_stripes = reader->NumberOfStripes();
75
76 auto materialized_fields = scan_options.MaterializedFields();
77 // filter out virtual columns
78 std::vector<std::string> included_fields;
79 ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema());
80 for (auto name : materialized_fields) {
81 FieldRef ref(name);
82 ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*schema));
83 if (match.indices().empty()) continue;
84
85 included_fields.push_back(name);
86 }
87
88 return RecordBatchIterator(
89 Impl{std::move(reader), 0, num_stripes, included_fields});
90 }
91
92 Result<std::shared_ptr<RecordBatch>> Next() {
93 if (i_ == num_stripes_) {
94 return nullptr;
95 }
96 std::shared_ptr<RecordBatch> batch;
97 // TODO (https://issues.apache.org/jira/browse/ARROW-14153)
98 // pass scan_options_->batch_size
99 return reader_->ReadStripe(i_++, included_fields_);
100 }
101
102 std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader_;
103 int i_;
104 int num_stripes_;
105 std::vector<std::string> included_fields_;
106 };
107
108 return Impl::Make(source_, *checked_pointer_cast<FileFragment>(fragment_)->format(),
109 *options_);
110 }
111
112 private:
113 FileSource source_;
114 };
115
116 class OrcScanTaskIterator {
117 public:
118 static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
119 std::shared_ptr<FileFragment> fragment) {
120 return ScanTaskIterator(OrcScanTaskIterator(std::move(options), std::move(fragment)));
121 }
122
123 Result<std::shared_ptr<ScanTask>> Next() {
124 if (once_) {
125 // Iteration is done.
126 return nullptr;
127 }
128
129 once_ = true;
130 return std::shared_ptr<ScanTask>(new OrcScanTask(fragment_, options_));
131 }
132
133 private:
134 OrcScanTaskIterator(std::shared_ptr<ScanOptions> options,
135 std::shared_ptr<FileFragment> fragment)
136 : options_(std::move(options)), fragment_(std::move(fragment)) {}
137
138 bool once_ = false;
139 std::shared_ptr<ScanOptions> options_;
140 std::shared_ptr<FileFragment> fragment_;
141 };
142
143 } // namespace
144
145 Result<bool> OrcFileFormat::IsSupported(const FileSource& source) const {
146 RETURN_NOT_OK(source.Open().status());
147 return OpenORCReader(source).ok();
148 }
149
150 Result<std::shared_ptr<Schema>> OrcFileFormat::Inspect(const FileSource& source) const {
151 ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(source));
152 return reader->ReadSchema();
153 }
154
155 Result<ScanTaskIterator> OrcFileFormat::ScanFile(
156 const std::shared_ptr<ScanOptions>& options,
157 const std::shared_ptr<FileFragment>& fragment) const {
158 return OrcScanTaskIterator::Make(options, fragment);
159 }
160
161 Future<util::optional<int64_t>> OrcFileFormat::CountRows(
162 const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
163 const std::shared_ptr<ScanOptions>& options) {
164 if (ExpressionHasFieldRefs(predicate)) {
165 return Future<util::optional<int64_t>>::MakeFinished(util::nullopt);
166 }
167 auto self = checked_pointer_cast<OrcFileFormat>(shared_from_this());
168 return DeferNotOk(options->io_context.executor()->Submit(
169 [self, file]() -> Result<util::optional<int64_t>> {
170 ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(file->source()));
171 return reader->NumberOfRows();
172 }));
173 }
174
175 // //
176 // // OrcFileWriter, OrcFileWriteOptions
177 // //
178
179 std::shared_ptr<FileWriteOptions> OrcFileFormat::DefaultWriteOptions() {
180 // TODO (https://issues.apache.org/jira/browse/ARROW-13796)
181 return nullptr;
182 }
183
184 Result<std::shared_ptr<FileWriter>> OrcFileFormat::MakeWriter(
185 std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
186 std::shared_ptr<FileWriteOptions> options,
187 fs::FileLocator destination_locator) const {
188 // TODO (https://issues.apache.org/jira/browse/ARROW-13796)
189 return Status::NotImplemented("ORC writer not yet implemented.");
190 }
191
192 } // namespace dataset
193 } // namespace arrow