]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/adapters/orc/adapter.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / adapters / orc / adapter.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/adapters/orc/adapter.h"
19
20 #include <algorithm>
21 #include <cstdint>
22 #include <functional>
23 #include <list>
24 #include <memory>
25 #include <sstream>
26 #include <string>
27 #include <utility>
28 #include <vector>
29
30 #include "arrow/adapters/orc/adapter_util.h"
31 #include "arrow/buffer.h"
32 #include "arrow/builder.h"
33 #include "arrow/io/interfaces.h"
34 #include "arrow/memory_pool.h"
35 #include "arrow/record_batch.h"
36 #include "arrow/status.h"
37 #include "arrow/table.h"
38 #include "arrow/table_builder.h"
39 #include "arrow/type.h"
40 #include "arrow/type_traits.h"
41 #include "arrow/util/bit_util.h"
42 #include "arrow/util/checked_cast.h"
43 #include "arrow/util/decimal.h"
44 #include "arrow/util/key_value_metadata.h"
45 #include "arrow/util/macros.h"
46 #include "arrow/util/range.h"
47 #include "arrow/util/visibility.h"
48 #include "orc/Exceptions.hh"
49
50 // alias to not interfere with nested orc namespace
51 namespace liborc = orc;
52
53 #define ORC_THROW_NOT_OK(s) \
54 do { \
55 Status _s = (s); \
56 if (!_s.ok()) { \
57 std::stringstream ss; \
58 ss << "Arrow error: " << _s.ToString(); \
59 throw liborc::ParseError(ss.str()); \
60 } \
61 } while (0)
62
63 #define ORC_ASSIGN_OR_THROW_IMPL(status_name, lhs, rexpr) \
64 auto status_name = (rexpr); \
65 ORC_THROW_NOT_OK(status_name.status()); \
66 lhs = std::move(status_name).ValueOrDie();
67
68 #define ORC_ASSIGN_OR_THROW(lhs, rexpr) \
69 ORC_ASSIGN_OR_THROW_IMPL(ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), \
70 lhs, rexpr);
71
// ORC_BEGIN_CATCH_NOT_OK / ORC_END_CATCH_NOT_OK bracket a region of code that
// calls into liborc and translate the C++ exceptions liborc throws into
// arrow::Status return values, so exceptions never cross Arrow's Status-based
// API. Usable only inside functions whose return type is constructible from
// Status. Mapping:
//   liborc::ParseError        -> Status::IOError
//   liborc::InvalidArgument   -> Status::Invalid
//   liborc::NotImplementedYet -> Status::NotImplemented
//   any other std::exception  -> Status::UnknownError
// The final std::exception handler exists so that errors liborc reports through
// other exception types (or allocation failures) are still returned as a Status
// instead of propagating out of the adapter.
#define ORC_BEGIN_CATCH_NOT_OK try {
#define ORC_END_CATCH_NOT_OK                   \
  }                                            \
  catch (const liborc::ParseError& e) {        \
    return Status::IOError(e.what());          \
  }                                            \
  catch (const liborc::InvalidArgument& e) {   \
    return Status::Invalid(e.what());          \
  }                                            \
  catch (const liborc::NotImplementedYet& e) { \
    return Status::NotImplemented(e.what());   \
  }                                            \
  catch (const std::exception& e) {            \
    return Status::UnknownError(e.what());     \
  }

// Runs the single statement/expression `_s` under the exception translation
// above, e.g. ORC_CATCH_NOT_OK(reader = createReader(...)).
#define ORC_CATCH_NOT_OK(_s) \
  ORC_BEGIN_CATCH_NOT_OK     \
  (_s);                      \
  ORC_END_CATCH_NOT_OK
88
89 namespace arrow {
90 namespace adapters {
91 namespace orc {
92
93 namespace {
94
95 // The following are required by ORC to be uint64_t
96 constexpr uint64_t kOrcWriterBatchSize = 128 * 1024;
97 constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024;
98
99 using internal::checked_cast;
100
101 class ArrowInputFile : public liborc::InputStream {
102 public:
103 explicit ArrowInputFile(const std::shared_ptr<io::RandomAccessFile>& file)
104 : file_(file) {}
105
106 uint64_t getLength() const override {
107 ORC_ASSIGN_OR_THROW(int64_t size, file_->GetSize());
108 return static_cast<uint64_t>(size);
109 }
110
111 uint64_t getNaturalReadSize() const override { return 128 * 1024; }
112
113 void read(void* buf, uint64_t length, uint64_t offset) override {
114 ORC_ASSIGN_OR_THROW(int64_t bytes_read, file_->ReadAt(offset, length, buf));
115
116 if (static_cast<uint64_t>(bytes_read) != length) {
117 throw liborc::ParseError("Short read from arrow input file");
118 }
119 }
120
121 const std::string& getName() const override {
122 static const std::string filename("ArrowInputFile");
123 return filename;
124 }
125
126 private:
127 std::shared_ptr<io::RandomAccessFile> file_;
128 };
129
// Cached location and row range of one ORC stripe, filled in from liborc's
// per-stripe metadata when the file is opened.
struct StripeInformation {
  std::uint64_t offset;               // start of the stripe, as passed to RowReaderOptions::range()
  std::uint64_t length;               // stripe extent, as passed to RowReaderOptions::range()
  std::uint64_t num_rows;             // rows stored in this stripe
  std::uint64_t first_row_of_stripe;  // file-wide index of this stripe's first row
};

// Upper bound on the number of rows decoded into a single
// liborc::ColumnVectorBatch when reading a whole stripe.
constexpr std::int64_t kReadRowsBatch = 1000;
139
140 class OrcStripeReader : public RecordBatchReader {
141 public:
142 OrcStripeReader(std::unique_ptr<liborc::RowReader> row_reader,
143 std::shared_ptr<Schema> schema, int64_t batch_size, MemoryPool* pool)
144 : row_reader_(std::move(row_reader)),
145 schema_(schema),
146 pool_(pool),
147 batch_size_{batch_size} {}
148
149 std::shared_ptr<Schema> schema() const override { return schema_; }
150
151 Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
152 std::unique_ptr<liborc::ColumnVectorBatch> batch;
153 ORC_CATCH_NOT_OK(batch = row_reader_->createRowBatch(batch_size_));
154
155 const liborc::Type& type = row_reader_->getSelectedType();
156 if (!row_reader_->next(*batch)) {
157 out->reset();
158 return Status::OK();
159 }
160
161 std::unique_ptr<RecordBatchBuilder> builder;
162 RETURN_NOT_OK(RecordBatchBuilder::Make(schema_, pool_, batch->numElements, &builder));
163
164 // The top-level type must be a struct to read into an arrow table
165 const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);
166
167 for (int i = 0; i < builder->num_fields(); i++) {
168 RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
169 batch->numElements, builder->GetField(i)));
170 }
171
172 RETURN_NOT_OK(builder->Flush(out));
173 return Status::OK();
174 }
175
176 private:
177 std::unique_ptr<liborc::RowReader> row_reader_;
178 std::shared_ptr<Schema> schema_;
179 MemoryPool* pool_;
180 int64_t batch_size_;
181 };
182
183 } // namespace
184
185 class ORCFileReader::Impl {
186 public:
187 Impl() {}
188 ~Impl() {}
189
190 Status Open(const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool) {
191 std::unique_ptr<ArrowInputFile> io_wrapper(new ArrowInputFile(file));
192 liborc::ReaderOptions options;
193 std::unique_ptr<liborc::Reader> liborc_reader;
194 ORC_CATCH_NOT_OK(liborc_reader = createReader(std::move(io_wrapper), options));
195 pool_ = pool;
196 reader_ = std::move(liborc_reader);
197 current_row_ = 0;
198
199 return Init();
200 }
201
202 Status Init() {
203 int64_t nstripes = reader_->getNumberOfStripes();
204 stripes_.resize(nstripes);
205 std::unique_ptr<liborc::StripeInformation> stripe;
206 uint64_t first_row_of_stripe = 0;
207 for (int i = 0; i < nstripes; ++i) {
208 stripe = reader_->getStripe(i);
209 stripes_[i] = StripeInformation({stripe->getOffset(), stripe->getLength(),
210 stripe->getNumberOfRows(), first_row_of_stripe});
211 first_row_of_stripe += stripe->getNumberOfRows();
212 }
213 return Status::OK();
214 }
215
216 int64_t NumberOfStripes() { return stripes_.size(); }
217
218 int64_t NumberOfRows() { return reader_->getNumberOfRows(); }
219
220 Status ReadSchema(std::shared_ptr<Schema>* out) {
221 const liborc::Type& type = reader_->getType();
222 return GetArrowSchema(type, out);
223 }
224
225 Status ReadSchema(const liborc::RowReaderOptions& opts, std::shared_ptr<Schema>* out) {
226 std::unique_ptr<liborc::RowReader> row_reader;
227 ORC_CATCH_NOT_OK(row_reader = reader_->createRowReader(opts));
228 const liborc::Type& type = row_reader->getSelectedType();
229 return GetArrowSchema(type, out);
230 }
231
232 Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() {
233 const std::list<std::string> keys = reader_->getMetadataKeys();
234 auto metadata = std::make_shared<KeyValueMetadata>();
235 for (const auto& key : keys) {
236 metadata->Append(key, reader_->getMetadataValue(key));
237 }
238 return std::const_pointer_cast<const KeyValueMetadata>(metadata);
239 }
240
241 Status GetArrowSchema(const liborc::Type& type, std::shared_ptr<Schema>* out) {
242 if (type.getKind() != liborc::STRUCT) {
243 return Status::NotImplemented(
244 "Only ORC files with a top-level struct "
245 "can be handled");
246 }
247 int size = static_cast<int>(type.getSubtypeCount());
248 std::vector<std::shared_ptr<Field>> fields;
249 for (int child = 0; child < size; ++child) {
250 std::shared_ptr<DataType> elemtype;
251 RETURN_NOT_OK(GetArrowType(type.getSubtype(child), &elemtype));
252 std::string name = type.getFieldName(child);
253 fields.push_back(field(name, elemtype));
254 }
255 ARROW_ASSIGN_OR_RAISE(auto metadata, ReadMetadata());
256 *out = std::make_shared<Schema>(std::move(fields), std::move(metadata));
257 return Status::OK();
258 }
259
260 Status Read(std::shared_ptr<Table>* out) {
261 liborc::RowReaderOptions opts;
262 std::shared_ptr<Schema> schema;
263 RETURN_NOT_OK(ReadSchema(opts, &schema));
264 return ReadTable(opts, schema, out);
265 }
266
267 Status Read(const std::shared_ptr<Schema>& schema, std::shared_ptr<Table>* out) {
268 liborc::RowReaderOptions opts;
269 return ReadTable(opts, schema, out);
270 }
271
272 Status Read(const std::vector<int>& include_indices, std::shared_ptr<Table>* out) {
273 liborc::RowReaderOptions opts;
274 RETURN_NOT_OK(SelectIndices(&opts, include_indices));
275 std::shared_ptr<Schema> schema;
276 RETURN_NOT_OK(ReadSchema(opts, &schema));
277 return ReadTable(opts, schema, out);
278 }
279
280 Status Read(const std::vector<std::string>& include_names,
281 std::shared_ptr<Table>* out) {
282 liborc::RowReaderOptions opts;
283 RETURN_NOT_OK(SelectNames(&opts, include_names));
284 std::shared_ptr<Schema> schema;
285 RETURN_NOT_OK(ReadSchema(opts, &schema));
286 return ReadTable(opts, schema, out);
287 }
288
289 Status Read(const std::shared_ptr<Schema>& schema,
290 const std::vector<int>& include_indices, std::shared_ptr<Table>* out) {
291 liborc::RowReaderOptions opts;
292 RETURN_NOT_OK(SelectIndices(&opts, include_indices));
293 return ReadTable(opts, schema, out);
294 }
295
296 Status ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out) {
297 liborc::RowReaderOptions opts;
298 RETURN_NOT_OK(SelectStripe(&opts, stripe));
299 std::shared_ptr<Schema> schema;
300 RETURN_NOT_OK(ReadSchema(opts, &schema));
301 return ReadBatch(opts, schema, stripes_[stripe].num_rows, out);
302 }
303
304 Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
305 std::shared_ptr<RecordBatch>* out) {
306 liborc::RowReaderOptions opts;
307 RETURN_NOT_OK(SelectIndices(&opts, include_indices));
308 RETURN_NOT_OK(SelectStripe(&opts, stripe));
309 std::shared_ptr<Schema> schema;
310 RETURN_NOT_OK(ReadSchema(opts, &schema));
311 return ReadBatch(opts, schema, stripes_[stripe].num_rows, out);
312 }
313
314 Status ReadStripe(int64_t stripe, const std::vector<std::string>& include_names,
315 std::shared_ptr<RecordBatch>* out) {
316 liborc::RowReaderOptions opts;
317 RETURN_NOT_OK(SelectNames(&opts, include_names));
318 RETURN_NOT_OK(SelectStripe(&opts, stripe));
319 std::shared_ptr<Schema> schema;
320 RETURN_NOT_OK(ReadSchema(opts, &schema));
321 return ReadBatch(opts, schema, stripes_[stripe].num_rows, out);
322 }
323
324 Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) {
325 ARROW_RETURN_IF(stripe < 0 || stripe >= NumberOfStripes(),
326 Status::Invalid("Out of bounds stripe: ", stripe));
327
328 opts->range(stripes_[stripe].offset, stripes_[stripe].length);
329 return Status::OK();
330 }
331
332 Status SelectStripeWithRowNumber(liborc::RowReaderOptions* opts, int64_t row_number,
333 StripeInformation* out) {
334 ARROW_RETURN_IF(row_number >= NumberOfRows(),
335 Status::Invalid("Out of bounds row number: ", row_number));
336
337 for (auto it = stripes_.begin(); it != stripes_.end(); it++) {
338 if (static_cast<uint64_t>(row_number) >= it->first_row_of_stripe &&
339 static_cast<uint64_t>(row_number) < it->first_row_of_stripe + it->num_rows) {
340 opts->range(it->offset, it->length);
341 *out = *it;
342 return Status::OK();
343 }
344 }
345
346 return Status::Invalid("Invalid row number", row_number);
347 }
348
349 Status SelectIndices(liborc::RowReaderOptions* opts,
350 const std::vector<int>& include_indices) {
351 std::list<uint64_t> include_indices_list;
352 for (auto it = include_indices.begin(); it != include_indices.end(); ++it) {
353 ARROW_RETURN_IF(*it < 0, Status::Invalid("Negative field index"));
354 include_indices_list.push_back(*it);
355 }
356 opts->includeTypes(include_indices_list);
357 return Status::OK();
358 }
359
360 Status SelectNames(liborc::RowReaderOptions* opts,
361 const std::vector<std::string>& include_names) {
362 std::list<std::string> include_names_list(include_names.begin(), include_names.end());
363 opts->include(include_names_list);
364 return Status::OK();
365 }
366
367 Status ReadTable(const liborc::RowReaderOptions& row_opts,
368 const std::shared_ptr<Schema>& schema, std::shared_ptr<Table>* out) {
369 liborc::RowReaderOptions opts(row_opts);
370 std::vector<std::shared_ptr<RecordBatch>> batches(stripes_.size());
371 for (size_t stripe = 0; stripe < stripes_.size(); stripe++) {
372 opts.range(stripes_[stripe].offset, stripes_[stripe].length);
373 RETURN_NOT_OK(ReadBatch(opts, schema, stripes_[stripe].num_rows, &batches[stripe]));
374 }
375 return Table::FromRecordBatches(schema, std::move(batches)).Value(out);
376 }
377
378 Status ReadBatch(const liborc::RowReaderOptions& opts,
379 const std::shared_ptr<Schema>& schema, int64_t nrows,
380 std::shared_ptr<RecordBatch>* out) {
381 std::unique_ptr<liborc::RowReader> row_reader;
382 std::unique_ptr<liborc::ColumnVectorBatch> batch;
383
384 ORC_BEGIN_CATCH_NOT_OK
385 row_reader = reader_->createRowReader(opts);
386 batch = row_reader->createRowBatch(std::min(nrows, kReadRowsBatch));
387 ORC_END_CATCH_NOT_OK
388
389 std::unique_ptr<RecordBatchBuilder> builder;
390 RETURN_NOT_OK(RecordBatchBuilder::Make(schema, pool_, nrows, &builder));
391
392 // The top-level type must be a struct to read into an arrow table
393 const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);
394
395 const liborc::Type& type = row_reader->getSelectedType();
396 while (row_reader->next(*batch)) {
397 for (int i = 0; i < builder->num_fields(); i++) {
398 RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
399 batch->numElements, builder->GetField(i)));
400 }
401 }
402 RETURN_NOT_OK(builder->Flush(out));
403 return Status::OK();
404 }
405
406 Status Seek(int64_t row_number) {
407 ARROW_RETURN_IF(row_number >= NumberOfRows(),
408 Status::Invalid("Out of bounds row number: ", row_number));
409
410 current_row_ = row_number;
411 return Status::OK();
412 }
413
414 Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
415 std::shared_ptr<RecordBatchReader>* out) {
416 if (current_row_ >= NumberOfRows()) {
417 out->reset();
418 return Status::OK();
419 }
420
421 liborc::RowReaderOptions opts;
422 if (!include_indices.empty()) {
423 RETURN_NOT_OK(SelectIndices(&opts, include_indices));
424 }
425 StripeInformation stripe_info({0, 0, 0, 0});
426 RETURN_NOT_OK(SelectStripeWithRowNumber(&opts, current_row_, &stripe_info));
427 std::shared_ptr<Schema> schema;
428 RETURN_NOT_OK(ReadSchema(opts, &schema));
429 std::unique_ptr<liborc::RowReader> row_reader;
430
431 ORC_BEGIN_CATCH_NOT_OK
432 row_reader = reader_->createRowReader(opts);
433 row_reader->seekToRow(current_row_);
434 current_row_ = stripe_info.first_row_of_stripe + stripe_info.num_rows;
435 ORC_END_CATCH_NOT_OK
436
437 *out = std::shared_ptr<RecordBatchReader>(
438 new OrcStripeReader(std::move(row_reader), schema, batch_size, pool_));
439 return Status::OK();
440 }
441
442 Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out) {
443 return NextStripeReader(batch_size, {}, out);
444 }
445
446 private:
447 MemoryPool* pool_;
448 std::unique_ptr<liborc::Reader> reader_;
449 std::vector<StripeInformation> stripes_;
450 int64_t current_row_;
451 };
452
453 ORCFileReader::ORCFileReader() { impl_.reset(new ORCFileReader::Impl()); }
454
455 ORCFileReader::~ORCFileReader() {}
456
457 Status ORCFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
458 MemoryPool* pool, std::unique_ptr<ORCFileReader>* reader) {
459 return Open(file, pool).Value(reader);
460 }
461
462 Result<std::unique_ptr<ORCFileReader>> ORCFileReader::Open(
463 const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool) {
464 auto result = std::unique_ptr<ORCFileReader>(new ORCFileReader());
465 RETURN_NOT_OK(result->impl_->Open(file, pool));
466 return std::move(result);
467 }
468
469 Result<std::shared_ptr<const KeyValueMetadata>> ORCFileReader::ReadMetadata() {
470 return impl_->ReadMetadata();
471 }
472
473 Status ORCFileReader::ReadSchema(std::shared_ptr<Schema>* out) {
474 return impl_->ReadSchema(out);
475 }
476
477 Result<std::shared_ptr<Schema>> ORCFileReader::ReadSchema() {
478 std::shared_ptr<Schema> schema;
479 RETURN_NOT_OK(impl_->ReadSchema(&schema));
480 return schema;
481 }
482
483 Status ORCFileReader::Read(std::shared_ptr<Table>* out) { return impl_->Read(out); }
484
485 Result<std::shared_ptr<Table>> ORCFileReader::Read() {
486 std::shared_ptr<Table> table;
487 RETURN_NOT_OK(impl_->Read(&table));
488 return table;
489 }
490
491 Status ORCFileReader::Read(const std::shared_ptr<Schema>& schema,
492 std::shared_ptr<Table>* out) {
493 return impl_->Read(schema, out);
494 }
495
496 Result<std::shared_ptr<Table>> ORCFileReader::Read(
497 const std::shared_ptr<Schema>& schema) {
498 std::shared_ptr<Table> table;
499 RETURN_NOT_OK(impl_->Read(schema, &table));
500 return table;
501 }
502
503 Status ORCFileReader::Read(const std::vector<int>& include_indices,
504 std::shared_ptr<Table>* out) {
505 return impl_->Read(include_indices, out);
506 }
507
508 Result<std::shared_ptr<Table>> ORCFileReader::Read(
509 const std::vector<int>& include_indices) {
510 std::shared_ptr<Table> table;
511 RETURN_NOT_OK(impl_->Read(include_indices, &table));
512 return table;
513 }
514
515 Result<std::shared_ptr<Table>> ORCFileReader::Read(
516 const std::vector<std::string>& include_names) {
517 std::shared_ptr<Table> table;
518 RETURN_NOT_OK(impl_->Read(include_names, &table));
519 return table;
520 }
521
522 Status ORCFileReader::Read(const std::shared_ptr<Schema>& schema,
523 const std::vector<int>& include_indices,
524 std::shared_ptr<Table>* out) {
525 return impl_->Read(schema, include_indices, out);
526 }
527
528 Result<std::shared_ptr<Table>> ORCFileReader::Read(
529 const std::shared_ptr<Schema>& schema, const std::vector<int>& include_indices) {
530 std::shared_ptr<Table> table;
531 RETURN_NOT_OK(impl_->Read(schema, include_indices, &table));
532 return table;
533 }
534
535 Status ORCFileReader::ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out) {
536 return impl_->ReadStripe(stripe, out);
537 }
538
539 Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe(int64_t stripe) {
540 std::shared_ptr<RecordBatch> recordBatch;
541 RETURN_NOT_OK(impl_->ReadStripe(stripe, &recordBatch));
542 return recordBatch;
543 }
544
545 Status ORCFileReader::ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
546 std::shared_ptr<RecordBatch>* out) {
547 return impl_->ReadStripe(stripe, include_indices, out);
548 }
549
550 Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe(
551 int64_t stripe, const std::vector<int>& include_indices) {
552 std::shared_ptr<RecordBatch> recordBatch;
553 RETURN_NOT_OK(impl_->ReadStripe(stripe, include_indices, &recordBatch));
554 return recordBatch;
555 }
556
557 Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe(
558 int64_t stripe, const std::vector<std::string>& include_names) {
559 std::shared_ptr<RecordBatch> recordBatch;
560 RETURN_NOT_OK(impl_->ReadStripe(stripe, include_names, &recordBatch));
561 return recordBatch;
562 }
563
564 Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); }
565
566 Status ORCFileReader::NextStripeReader(int64_t batch_sizes,
567 std::shared_ptr<RecordBatchReader>* out) {
568 return impl_->NextStripeReader(batch_sizes, out);
569 }
570
571 Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
572 int64_t batch_size) {
573 std::shared_ptr<RecordBatchReader> reader;
574 RETURN_NOT_OK(impl_->NextStripeReader(batch_size, &reader));
575 return reader;
576 }
577
578 Status ORCFileReader::NextStripeReader(int64_t batch_size,
579 const std::vector<int>& include_indices,
580 std::shared_ptr<RecordBatchReader>* out) {
581 return impl_->NextStripeReader(batch_size, include_indices, out);
582 }
583
584 Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
585 int64_t batch_size, const std::vector<int>& include_indices) {
586 std::shared_ptr<RecordBatchReader> reader;
587 RETURN_NOT_OK(impl_->NextStripeReader(batch_size, include_indices, &reader));
588 return reader;
589 }
590
591 int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
592
593 int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
594
595 namespace {
596
597 class ArrowOutputStream : public liborc::OutputStream {
598 public:
599 explicit ArrowOutputStream(arrow::io::OutputStream& output_stream)
600 : output_stream_(output_stream), length_(0) {}
601
602 uint64_t getLength() const override { return length_; }
603
604 uint64_t getNaturalWriteSize() const override { return kOrcNaturalWriteSize; }
605
606 void write(const void* buf, size_t length) override {
607 ORC_THROW_NOT_OK(output_stream_.Write(buf, static_cast<int64_t>(length)));
608 length_ += static_cast<int64_t>(length);
609 }
610
611 // Mandatory due to us implementing an ORC virtual class.
612 // Used by ORC for error messages, not used by Arrow
613 const std::string& getName() const override {
614 static const std::string filename("ArrowOutputFile");
615 return filename;
616 }
617
618 void close() override {
619 if (!output_stream_.closed()) {
620 ORC_THROW_NOT_OK(output_stream_.Close());
621 }
622 }
623
624 void set_length(int64_t length) { length_ = length; }
625
626 private:
627 arrow::io::OutputStream& output_stream_;
628 int64_t length_;
629 };
630
631 } // namespace
632
633 class ORCFileWriter::Impl {
634 public:
635 Status Open(arrow::io::OutputStream* output_stream) {
636 out_stream_ = std::unique_ptr<liborc::OutputStream>(
637 checked_cast<liborc::OutputStream*>(new ArrowOutputStream(*output_stream)));
638 return Status::OK();
639 }
640
641 Status Write(const Table& table) {
642 std::unique_ptr<liborc::WriterOptions> orc_options =
643 std::unique_ptr<liborc::WriterOptions>(new liborc::WriterOptions());
644 ARROW_ASSIGN_OR_RAISE(auto orc_schema, GetOrcType(*(table.schema())));
645 ORC_CATCH_NOT_OK(
646 writer_ = liborc::createWriter(*orc_schema, out_stream_.get(), *orc_options))
647
648 int64_t num_rows = table.num_rows();
649 const int num_cols_ = table.num_columns();
650 std::vector<int64_t> arrow_index_offset(num_cols_, 0);
651 std::vector<int> arrow_chunk_offset(num_cols_, 0);
652 std::unique_ptr<liborc::ColumnVectorBatch> batch =
653 writer_->createRowBatch(kOrcWriterBatchSize);
654 liborc::StructVectorBatch* root =
655 internal::checked_cast<liborc::StructVectorBatch*>(batch.get());
656 while (num_rows > 0) {
657 for (int i = 0; i < num_cols_; i++) {
658 RETURN_NOT_OK(adapters::orc::WriteBatch(
659 *(table.column(i)), kOrcWriterBatchSize, &(arrow_chunk_offset[i]),
660 &(arrow_index_offset[i]), (root->fields)[i]));
661 }
662 root->numElements = (root->fields)[0]->numElements;
663 writer_->add(*batch);
664 batch->clear();
665 num_rows -= kOrcWriterBatchSize;
666 }
667 return Status::OK();
668 }
669
670 Status Close() {
671 writer_->close();
672 return Status::OK();
673 }
674
675 private:
676 std::unique_ptr<liborc::Writer> writer_;
677 std::unique_ptr<liborc::OutputStream> out_stream_;
678 };
679
680 ORCFileWriter::~ORCFileWriter() {}
681
682 ORCFileWriter::ORCFileWriter() { impl_.reset(new ORCFileWriter::Impl()); }
683
684 Result<std::unique_ptr<ORCFileWriter>> ORCFileWriter::Open(
685 io::OutputStream* output_stream) {
686 std::unique_ptr<ORCFileWriter> result =
687 std::unique_ptr<ORCFileWriter>(new ORCFileWriter());
688 Status status = result->impl_->Open(output_stream);
689 RETURN_NOT_OK(status);
690 return std::move(result);
691 }
692
693 Status ORCFileWriter::Write(const Table& table) { return impl_->Write(table); }
694
695 Status ORCFileWriter::Close() { return impl_->Close(); }
696
697 } // namespace orc
698 } // namespace adapters
699 } // namespace arrow