1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "arrow/dataset/file_orc.h"
22 #include "arrow/adapters/orc/adapter.h"
23 #include "arrow/dataset/dataset_internal.h"
24 #include "arrow/dataset/file_base.h"
25 #include "arrow/dataset/scanner.h"
26 #include "arrow/util/checked_cast.h"
27 #include "arrow/util/iterator.h"
28 #include "arrow/util/logging.h"
32 using internal::checked_pointer_cast
;
38 Result
<std::unique_ptr
<arrow::adapters::orc::ORCFileReader
>> OpenORCReader(
39 const FileSource
& source
,
40 const std::shared_ptr
<ScanOptions
>& scan_options
= nullptr) {
41 ARROW_ASSIGN_OR_RAISE(auto input
, source
.Open());
43 arrow::MemoryPool
* pool
;
45 pool
= scan_options
->pool
;
47 pool
= default_memory_pool();
50 auto reader
= arrow::adapters::orc::ORCFileReader::Open(std::move(input
), pool
);
51 auto status
= reader
.status();
53 return status
.WithMessage("Could not open ORC input source '", source
.path(),
54 "': ", status
.message());
59 /// \brief A ScanTask backed by an ORC file.
60 class OrcScanTask
: public ScanTask
{
62 OrcScanTask(std::shared_ptr
<FileFragment
> fragment
,
63 std::shared_ptr
<ScanOptions
> options
)
64 : ScanTask(std::move(options
), fragment
), source_(fragment
->source()) {}
66 Result
<RecordBatchIterator
> Execute() override
{
68 static Result
<RecordBatchIterator
> Make(const FileSource
& source
,
69 const FileFormat
& format
,
70 const ScanOptions
& scan_options
) {
71 ARROW_ASSIGN_OR_RAISE(
73 OpenORCReader(source
, std::make_shared
<ScanOptions
>(scan_options
)));
74 int num_stripes
= reader
->NumberOfStripes();
76 auto materialized_fields
= scan_options
.MaterializedFields();
77 // filter out virtual columns
78 std::vector
<std::string
> included_fields
;
79 ARROW_ASSIGN_OR_RAISE(auto schema
, reader
->ReadSchema());
80 for (auto name
: materialized_fields
) {
82 ARROW_ASSIGN_OR_RAISE(auto match
, ref
.FindOneOrNone(*schema
));
83 if (match
.indices().empty()) continue;
85 included_fields
.push_back(name
);
88 return RecordBatchIterator(
89 Impl
{std::move(reader
), 0, num_stripes
, included_fields
});
92 Result
<std::shared_ptr
<RecordBatch
>> Next() {
93 if (i_
== num_stripes_
) {
96 std::shared_ptr
<RecordBatch
> batch
;
97 // TODO (https://issues.apache.org/jira/browse/ARROW-14153)
98 // pass scan_options_->batch_size
99 return reader_
->ReadStripe(i_
++, included_fields_
);
102 std::unique_ptr
<arrow::adapters::orc::ORCFileReader
> reader_
;
105 std::vector
<std::string
> included_fields_
;
108 return Impl::Make(source_
, *checked_pointer_cast
<FileFragment
>(fragment_
)->format(),
116 class OrcScanTaskIterator
{
118 static Result
<ScanTaskIterator
> Make(std::shared_ptr
<ScanOptions
> options
,
119 std::shared_ptr
<FileFragment
> fragment
) {
120 return ScanTaskIterator(OrcScanTaskIterator(std::move(options
), std::move(fragment
)));
123 Result
<std::shared_ptr
<ScanTask
>> Next() {
125 // Iteration is done.
130 return std::shared_ptr
<ScanTask
>(new OrcScanTask(fragment_
, options_
));
134 OrcScanTaskIterator(std::shared_ptr
<ScanOptions
> options
,
135 std::shared_ptr
<FileFragment
> fragment
)
136 : options_(std::move(options
)), fragment_(std::move(fragment
)) {}
139 std::shared_ptr
<ScanOptions
> options_
;
140 std::shared_ptr
<FileFragment
> fragment_
;
145 Result
<bool> OrcFileFormat::IsSupported(const FileSource
& source
) const {
146 RETURN_NOT_OK(source
.Open().status());
147 return OpenORCReader(source
).ok();
150 Result
<std::shared_ptr
<Schema
>> OrcFileFormat::Inspect(const FileSource
& source
) const {
151 ARROW_ASSIGN_OR_RAISE(auto reader
, OpenORCReader(source
));
152 return reader
->ReadSchema();
155 Result
<ScanTaskIterator
> OrcFileFormat::ScanFile(
156 const std::shared_ptr
<ScanOptions
>& options
,
157 const std::shared_ptr
<FileFragment
>& fragment
) const {
158 return OrcScanTaskIterator::Make(options
, fragment
);
161 Future
<util::optional
<int64_t>> OrcFileFormat::CountRows(
162 const std::shared_ptr
<FileFragment
>& file
, compute::Expression predicate
,
163 const std::shared_ptr
<ScanOptions
>& options
) {
164 if (ExpressionHasFieldRefs(predicate
)) {
165 return Future
<util::optional
<int64_t>>::MakeFinished(util::nullopt
);
167 auto self
= checked_pointer_cast
<OrcFileFormat
>(shared_from_this());
168 return DeferNotOk(options
->io_context
.executor()->Submit(
169 [self
, file
]() -> Result
<util::optional
<int64_t>> {
170 ARROW_ASSIGN_OR_RAISE(auto reader
, OpenORCReader(file
->source()));
171 return reader
->NumberOfRows();
// OrcFileWriter, OrcFileWriteOptions
179 std::shared_ptr
<FileWriteOptions
> OrcFileFormat::DefaultWriteOptions() {
180 // TODO (https://issues.apache.org/jira/browse/ARROW-13796)
184 Result
<std::shared_ptr
<FileWriter
>> OrcFileFormat::MakeWriter(
185 std::shared_ptr
<io::OutputStream
> destination
, std::shared_ptr
<Schema
> schema
,
186 std::shared_ptr
<FileWriteOptions
> options
,
187 fs::FileLocator destination_locator
) const {
188 // TODO (https://issues.apache.org/jira/browse/ARROW-13796)
189 return Status::NotImplemented("ORC writer not yet implemented.");
192 } // namespace dataset