]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "arrow/dataset/file_orc.h" | |
19 | ||
20 | #include <memory> | |
21 | ||
22 | #include "arrow/adapters/orc/adapter.h" | |
23 | #include "arrow/dataset/dataset_internal.h" | |
24 | #include "arrow/dataset/file_base.h" | |
25 | #include "arrow/dataset/scanner.h" | |
26 | #include "arrow/util/checked_cast.h" | |
27 | #include "arrow/util/iterator.h" | |
28 | #include "arrow/util/logging.h" | |
29 | ||
30 | namespace arrow { | |
31 | ||
32 | using internal::checked_pointer_cast; | |
33 | ||
34 | namespace dataset { | |
35 | ||
36 | namespace { | |
37 | ||
38 | Result<std::unique_ptr<arrow::adapters::orc::ORCFileReader>> OpenORCReader( | |
39 | const FileSource& source, | |
40 | const std::shared_ptr<ScanOptions>& scan_options = nullptr) { | |
41 | ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); | |
42 | ||
43 | arrow::MemoryPool* pool; | |
44 | if (scan_options) { | |
45 | pool = scan_options->pool; | |
46 | } else { | |
47 | pool = default_memory_pool(); | |
48 | } | |
49 | ||
50 | auto reader = arrow::adapters::orc::ORCFileReader::Open(std::move(input), pool); | |
51 | auto status = reader.status(); | |
52 | if (!status.ok()) { | |
53 | return status.WithMessage("Could not open ORC input source '", source.path(), | |
54 | "': ", status.message()); | |
55 | } | |
56 | return reader; | |
57 | } | |
58 | ||
59 | /// \brief A ScanTask backed by an ORC file. | |
60 | class OrcScanTask : public ScanTask { | |
61 | public: | |
62 | OrcScanTask(std::shared_ptr<FileFragment> fragment, | |
63 | std::shared_ptr<ScanOptions> options) | |
64 | : ScanTask(std::move(options), fragment), source_(fragment->source()) {} | |
65 | ||
66 | Result<RecordBatchIterator> Execute() override { | |
67 | struct Impl { | |
68 | static Result<RecordBatchIterator> Make(const FileSource& source, | |
69 | const FileFormat& format, | |
70 | const ScanOptions& scan_options) { | |
71 | ARROW_ASSIGN_OR_RAISE( | |
72 | auto reader, | |
73 | OpenORCReader(source, std::make_shared<ScanOptions>(scan_options))); | |
74 | int num_stripes = reader->NumberOfStripes(); | |
75 | ||
76 | auto materialized_fields = scan_options.MaterializedFields(); | |
77 | // filter out virtual columns | |
78 | std::vector<std::string> included_fields; | |
79 | ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema()); | |
80 | for (auto name : materialized_fields) { | |
81 | FieldRef ref(name); | |
82 | ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*schema)); | |
83 | if (match.indices().empty()) continue; | |
84 | ||
85 | included_fields.push_back(name); | |
86 | } | |
87 | ||
88 | return RecordBatchIterator( | |
89 | Impl{std::move(reader), 0, num_stripes, included_fields}); | |
90 | } | |
91 | ||
92 | Result<std::shared_ptr<RecordBatch>> Next() { | |
93 | if (i_ == num_stripes_) { | |
94 | return nullptr; | |
95 | } | |
96 | std::shared_ptr<RecordBatch> batch; | |
97 | // TODO (https://issues.apache.org/jira/browse/ARROW-14153) | |
98 | // pass scan_options_->batch_size | |
99 | return reader_->ReadStripe(i_++, included_fields_); | |
100 | } | |
101 | ||
102 | std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader_; | |
103 | int i_; | |
104 | int num_stripes_; | |
105 | std::vector<std::string> included_fields_; | |
106 | }; | |
107 | ||
108 | return Impl::Make(source_, *checked_pointer_cast<FileFragment>(fragment_)->format(), | |
109 | *options_); | |
110 | } | |
111 | ||
112 | private: | |
113 | FileSource source_; | |
114 | }; | |
115 | ||
116 | class OrcScanTaskIterator { | |
117 | public: | |
118 | static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options, | |
119 | std::shared_ptr<FileFragment> fragment) { | |
120 | return ScanTaskIterator(OrcScanTaskIterator(std::move(options), std::move(fragment))); | |
121 | } | |
122 | ||
123 | Result<std::shared_ptr<ScanTask>> Next() { | |
124 | if (once_) { | |
125 | // Iteration is done. | |
126 | return nullptr; | |
127 | } | |
128 | ||
129 | once_ = true; | |
130 | return std::shared_ptr<ScanTask>(new OrcScanTask(fragment_, options_)); | |
131 | } | |
132 | ||
133 | private: | |
134 | OrcScanTaskIterator(std::shared_ptr<ScanOptions> options, | |
135 | std::shared_ptr<FileFragment> fragment) | |
136 | : options_(std::move(options)), fragment_(std::move(fragment)) {} | |
137 | ||
138 | bool once_ = false; | |
139 | std::shared_ptr<ScanOptions> options_; | |
140 | std::shared_ptr<FileFragment> fragment_; | |
141 | }; | |
142 | ||
143 | } // namespace | |
144 | ||
145 | Result<bool> OrcFileFormat::IsSupported(const FileSource& source) const { | |
146 | RETURN_NOT_OK(source.Open().status()); | |
147 | return OpenORCReader(source).ok(); | |
148 | } | |
149 | ||
150 | Result<std::shared_ptr<Schema>> OrcFileFormat::Inspect(const FileSource& source) const { | |
151 | ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(source)); | |
152 | return reader->ReadSchema(); | |
153 | } | |
154 | ||
155 | Result<ScanTaskIterator> OrcFileFormat::ScanFile( | |
156 | const std::shared_ptr<ScanOptions>& options, | |
157 | const std::shared_ptr<FileFragment>& fragment) const { | |
158 | return OrcScanTaskIterator::Make(options, fragment); | |
159 | } | |
160 | ||
161 | Future<util::optional<int64_t>> OrcFileFormat::CountRows( | |
162 | const std::shared_ptr<FileFragment>& file, compute::Expression predicate, | |
163 | const std::shared_ptr<ScanOptions>& options) { | |
164 | if (ExpressionHasFieldRefs(predicate)) { | |
165 | return Future<util::optional<int64_t>>::MakeFinished(util::nullopt); | |
166 | } | |
167 | auto self = checked_pointer_cast<OrcFileFormat>(shared_from_this()); | |
168 | return DeferNotOk(options->io_context.executor()->Submit( | |
169 | [self, file]() -> Result<util::optional<int64_t>> { | |
170 | ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(file->source())); | |
171 | return reader->NumberOfRows(); | |
172 | })); | |
173 | } | |
174 | ||
175 | // // | |
176 | // // OrcFileWriter, OrcFileWriteOptions | |
177 | // // | |
178 | ||
179 | std::shared_ptr<FileWriteOptions> OrcFileFormat::DefaultWriteOptions() { | |
180 | // TODO (https://issues.apache.org/jira/browse/ARROW-13796) | |
181 | return nullptr; | |
182 | } | |
183 | ||
184 | Result<std::shared_ptr<FileWriter>> OrcFileFormat::MakeWriter( | |
185 | std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema, | |
186 | std::shared_ptr<FileWriteOptions> options, | |
187 | fs::FileLocator destination_locator) const { | |
188 | // TODO (https://issues.apache.org/jira/browse/ARROW-13796) | |
189 | return Status::NotImplemented("ORC writer not yet implemented."); | |
190 | } | |
191 | ||
192 | } // namespace dataset | |
193 | } // namespace arrow |