]>
git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/adapters/orc/adapter.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
18 #include "arrow/adapters/orc/adapter.h"
30 #include "arrow/adapters/orc/adapter_util.h"
31 #include "arrow/buffer.h"
32 #include "arrow/builder.h"
33 #include "arrow/io/interfaces.h"
34 #include "arrow/memory_pool.h"
35 #include "arrow/record_batch.h"
36 #include "arrow/status.h"
37 #include "arrow/table.h"
38 #include "arrow/table_builder.h"
39 #include "arrow/type.h"
40 #include "arrow/type_traits.h"
41 #include "arrow/util/bit_util.h"
42 #include "arrow/util/checked_cast.h"
43 #include "arrow/util/decimal.h"
44 #include "arrow/util/key_value_metadata.h"
45 #include "arrow/util/macros.h"
46 #include "arrow/util/range.h"
47 #include "arrow/util/visibility.h"
48 #include "orc/Exceptions.hh"
50 // alias to not interfere with nested orc namespace
51 namespace liborc
= orc
;
53 #define ORC_THROW_NOT_OK(s) \
57 std::stringstream ss; \
58 ss << "Arrow error: " << _s.ToString(); \
59 throw liborc::ParseError(ss.str()); \
// Implementation detail of ORC_ASSIGN_OR_THROW.
// - Evaluates `rexpr` into the uniquely named temporary `result_var`.
// - Hands its status to ORC_THROW_NOT_OK, which throws a liborc exception on error.
// - Otherwise moves the contained value into `lhs`. `lhs` may itself be a
//   declaration such as `int64_t size`, so no scope is opened around the statements.
#define ORC_ASSIGN_OR_THROW_IMPL(result_var, lhs, rexpr) \
  auto result_var = (rexpr);                              \
  ORC_THROW_NOT_OK(result_var.status());                  \
  lhs = std::move(result_var).ValueOrDie();
68 #define ORC_ASSIGN_OR_THROW(lhs, rexpr) \
69 ORC_ASSIGN_OR_THROW_IMPL(ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), \
// Opens a try block. Meant to be paired with ORC_END_CATCH_NOT_OK, whose catch
// clauses translate liborc::ParseError / InvalidArgument / NotImplementedYet
// into the corresponding Arrow Status values.
72 #define ORC_BEGIN_CATCH_NOT_OK try {
73 #define ORC_END_CATCH_NOT_OK \
75 catch (const liborc::ParseError& e) { \
76 return Status::IOError(e.what()); \
78 catch (const liborc::InvalidArgument& e) { \
79 return Status::Invalid(e.what()); \
81 catch (const liborc::NotImplementedYet& e) { \
82 return Status::NotImplemented(e.what()); \
85 #define ORC_CATCH_NOT_OK(_s) \
86 ORC_BEGIN_CATCH_NOT_OK(_s); \
95 // The following are required by ORC to be uint64_t
// Rows per ColumnVectorBatch created and flushed by the ORC writer.
constexpr uint64_t kOrcWriterBatchSize = uint64_t{128} * 1024;
// Value reported to liborc from ArrowOutputStream::getNaturalWriteSize().
constexpr uint64_t kOrcNaturalWriteSize = uint64_t{128} * 1024;
// Arrow's checked_cast, used below for downcasting liborc column batches and
// the output-stream wrapper.
99 using internal::checked_cast
;
101 class ArrowInputFile
: public liborc::InputStream
{
103 explicit ArrowInputFile(const std::shared_ptr
<io::RandomAccessFile
>& file
)
106 uint64_t getLength() const override
{
107 ORC_ASSIGN_OR_THROW(int64_t size
, file_
->GetSize());
108 return static_cast<uint64_t>(size
);
111 uint64_t getNaturalReadSize() const override
{ return 128 * 1024; }
113 void read(void* buf
, uint64_t length
, uint64_t offset
) override
{
114 ORC_ASSIGN_OR_THROW(int64_t bytes_read
, file_
->ReadAt(offset
, length
, buf
));
116 if (static_cast<uint64_t>(bytes_read
) != length
) {
117 throw liborc::ParseError("Short read from arrow input file");
121 const std::string
& getName() const override
{
122 static const std::string
filename("ArrowInputFile");
127 std::shared_ptr
<io::RandomAccessFile
> file_
;
130 struct StripeInformation
{
134 uint64_t first_row_of_stripe
;
137 // The number of rows to read in a ColumnVectorBatch
// Upper bound on rows per liborc batch in ReadBatch (applied via std::min).
constexpr int64_t kReadRowsBatch = int64_t{1000};
140 class OrcStripeReader
: public RecordBatchReader
{
142 OrcStripeReader(std::unique_ptr
<liborc::RowReader
> row_reader
,
143 std::shared_ptr
<Schema
> schema
, int64_t batch_size
, MemoryPool
* pool
)
144 : row_reader_(std::move(row_reader
)),
147 batch_size_
{batch_size
} {}
149 std::shared_ptr
<Schema
> schema() const override
{ return schema_
; }
151 Status
ReadNext(std::shared_ptr
<RecordBatch
>* out
) override
{
152 std::unique_ptr
<liborc::ColumnVectorBatch
> batch
;
153 ORC_CATCH_NOT_OK(batch
= row_reader_
->createRowBatch(batch_size_
));
155 const liborc::Type
& type
= row_reader_
->getSelectedType();
156 if (!row_reader_
->next(*batch
)) {
161 std::unique_ptr
<RecordBatchBuilder
> builder
;
162 RETURN_NOT_OK(RecordBatchBuilder::Make(schema_
, pool_
, batch
->numElements
, &builder
));
164 // The top-level type must be a struct to read into an arrow table
165 const auto& struct_batch
= checked_cast
<liborc::StructVectorBatch
&>(*batch
);
167 for (int i
= 0; i
< builder
->num_fields(); i
++) {
168 RETURN_NOT_OK(AppendBatch(type
.getSubtype(i
), struct_batch
.fields
[i
], 0,
169 batch
->numElements
, builder
->GetField(i
)));
172 RETURN_NOT_OK(builder
->Flush(out
));
177 std::unique_ptr
<liborc::RowReader
> row_reader_
;
178 std::shared_ptr
<Schema
> schema_
;
185 class ORCFileReader::Impl
{
190 Status
Open(const std::shared_ptr
<io::RandomAccessFile
>& file
, MemoryPool
* pool
) {
191 std::unique_ptr
<ArrowInputFile
> io_wrapper(new ArrowInputFile(file
));
192 liborc::ReaderOptions options
;
193 std::unique_ptr
<liborc::Reader
> liborc_reader
;
194 ORC_CATCH_NOT_OK(liborc_reader
= createReader(std::move(io_wrapper
), options
));
196 reader_
= std::move(liborc_reader
);
203 int64_t nstripes
= reader_
->getNumberOfStripes();
204 stripes_
.resize(nstripes
);
205 std::unique_ptr
<liborc::StripeInformation
> stripe
;
206 uint64_t first_row_of_stripe
= 0;
207 for (int i
= 0; i
< nstripes
; ++i
) {
208 stripe
= reader_
->getStripe(i
);
209 stripes_
[i
] = StripeInformation({stripe
->getOffset(), stripe
->getLength(),
210 stripe
->getNumberOfRows(), first_row_of_stripe
});
211 first_row_of_stripe
+= stripe
->getNumberOfRows();
216 int64_t NumberOfStripes() { return stripes_
.size(); }
218 int64_t NumberOfRows() { return reader_
->getNumberOfRows(); }
220 Status
ReadSchema(std::shared_ptr
<Schema
>* out
) {
221 const liborc::Type
& type
= reader_
->getType();
222 return GetArrowSchema(type
, out
);
225 Status
ReadSchema(const liborc::RowReaderOptions
& opts
, std::shared_ptr
<Schema
>* out
) {
226 std::unique_ptr
<liborc::RowReader
> row_reader
;
227 ORC_CATCH_NOT_OK(row_reader
= reader_
->createRowReader(opts
));
228 const liborc::Type
& type
= row_reader
->getSelectedType();
229 return GetArrowSchema(type
, out
);
232 Result
<std::shared_ptr
<const KeyValueMetadata
>> ReadMetadata() {
233 const std::list
<std::string
> keys
= reader_
->getMetadataKeys();
234 auto metadata
= std::make_shared
<KeyValueMetadata
>();
235 for (const auto& key
: keys
) {
236 metadata
->Append(key
, reader_
->getMetadataValue(key
));
238 return std::const_pointer_cast
<const KeyValueMetadata
>(metadata
);
241 Status
GetArrowSchema(const liborc::Type
& type
, std::shared_ptr
<Schema
>* out
) {
242 if (type
.getKind() != liborc::STRUCT
) {
243 return Status::NotImplemented(
244 "Only ORC files with a top-level struct "
247 int size
= static_cast<int>(type
.getSubtypeCount());
248 std::vector
<std::shared_ptr
<Field
>> fields
;
249 for (int child
= 0; child
< size
; ++child
) {
250 std::shared_ptr
<DataType
> elemtype
;
251 RETURN_NOT_OK(GetArrowType(type
.getSubtype(child
), &elemtype
));
252 std::string name
= type
.getFieldName(child
);
253 fields
.push_back(field(name
, elemtype
));
255 ARROW_ASSIGN_OR_RAISE(auto metadata
, ReadMetadata());
256 *out
= std::make_shared
<Schema
>(std::move(fields
), std::move(metadata
));
260 Status
Read(std::shared_ptr
<Table
>* out
) {
261 liborc::RowReaderOptions opts
;
262 std::shared_ptr
<Schema
> schema
;
263 RETURN_NOT_OK(ReadSchema(opts
, &schema
));
264 return ReadTable(opts
, schema
, out
);
267 Status
Read(const std::shared_ptr
<Schema
>& schema
, std::shared_ptr
<Table
>* out
) {
268 liborc::RowReaderOptions opts
;
269 return ReadTable(opts
, schema
, out
);
272 Status
Read(const std::vector
<int>& include_indices
, std::shared_ptr
<Table
>* out
) {
273 liborc::RowReaderOptions opts
;
274 RETURN_NOT_OK(SelectIndices(&opts
, include_indices
));
275 std::shared_ptr
<Schema
> schema
;
276 RETURN_NOT_OK(ReadSchema(opts
, &schema
));
277 return ReadTable(opts
, schema
, out
);
280 Status
Read(const std::vector
<std::string
>& include_names
,
281 std::shared_ptr
<Table
>* out
) {
282 liborc::RowReaderOptions opts
;
283 RETURN_NOT_OK(SelectNames(&opts
, include_names
));
284 std::shared_ptr
<Schema
> schema
;
285 RETURN_NOT_OK(ReadSchema(opts
, &schema
));
286 return ReadTable(opts
, schema
, out
);
289 Status
Read(const std::shared_ptr
<Schema
>& schema
,
290 const std::vector
<int>& include_indices
, std::shared_ptr
<Table
>* out
) {
291 liborc::RowReaderOptions opts
;
292 RETURN_NOT_OK(SelectIndices(&opts
, include_indices
));
293 return ReadTable(opts
, schema
, out
);
296 Status
ReadStripe(int64_t stripe
, std::shared_ptr
<RecordBatch
>* out
) {
297 liborc::RowReaderOptions opts
;
298 RETURN_NOT_OK(SelectStripe(&opts
, stripe
));
299 std::shared_ptr
<Schema
> schema
;
300 RETURN_NOT_OK(ReadSchema(opts
, &schema
));
301 return ReadBatch(opts
, schema
, stripes_
[stripe
].num_rows
, out
);
304 Status
ReadStripe(int64_t stripe
, const std::vector
<int>& include_indices
,
305 std::shared_ptr
<RecordBatch
>* out
) {
306 liborc::RowReaderOptions opts
;
307 RETURN_NOT_OK(SelectIndices(&opts
, include_indices
));
308 RETURN_NOT_OK(SelectStripe(&opts
, stripe
));
309 std::shared_ptr
<Schema
> schema
;
310 RETURN_NOT_OK(ReadSchema(opts
, &schema
));
311 return ReadBatch(opts
, schema
, stripes_
[stripe
].num_rows
, out
);
314 Status
ReadStripe(int64_t stripe
, const std::vector
<std::string
>& include_names
,
315 std::shared_ptr
<RecordBatch
>* out
) {
316 liborc::RowReaderOptions opts
;
317 RETURN_NOT_OK(SelectNames(&opts
, include_names
));
318 RETURN_NOT_OK(SelectStripe(&opts
, stripe
));
319 std::shared_ptr
<Schema
> schema
;
320 RETURN_NOT_OK(ReadSchema(opts
, &schema
));
321 return ReadBatch(opts
, schema
, stripes_
[stripe
].num_rows
, out
);
324 Status
SelectStripe(liborc::RowReaderOptions
* opts
, int64_t stripe
) {
325 ARROW_RETURN_IF(stripe
< 0 || stripe
>= NumberOfStripes(),
326 Status::Invalid("Out of bounds stripe: ", stripe
));
328 opts
->range(stripes_
[stripe
].offset
, stripes_
[stripe
].length
);
332 Status
SelectStripeWithRowNumber(liborc::RowReaderOptions
* opts
, int64_t row_number
,
333 StripeInformation
* out
) {
334 ARROW_RETURN_IF(row_number
>= NumberOfRows(),
335 Status::Invalid("Out of bounds row number: ", row_number
));
337 for (auto it
= stripes_
.begin(); it
!= stripes_
.end(); it
++) {
338 if (static_cast<uint64_t>(row_number
) >= it
->first_row_of_stripe
&&
339 static_cast<uint64_t>(row_number
) < it
->first_row_of_stripe
+ it
->num_rows
) {
340 opts
->range(it
->offset
, it
->length
);
346 return Status::Invalid("Invalid row number", row_number
);
349 Status
SelectIndices(liborc::RowReaderOptions
* opts
,
350 const std::vector
<int>& include_indices
) {
351 std::list
<uint64_t> include_indices_list
;
352 for (auto it
= include_indices
.begin(); it
!= include_indices
.end(); ++it
) {
353 ARROW_RETURN_IF(*it
< 0, Status::Invalid("Negative field index"));
354 include_indices_list
.push_back(*it
);
356 opts
->includeTypes(include_indices_list
);
360 Status
SelectNames(liborc::RowReaderOptions
* opts
,
361 const std::vector
<std::string
>& include_names
) {
362 std::list
<std::string
> include_names_list(include_names
.begin(), include_names
.end());
363 opts
->include(include_names_list
);
367 Status
ReadTable(const liborc::RowReaderOptions
& row_opts
,
368 const std::shared_ptr
<Schema
>& schema
, std::shared_ptr
<Table
>* out
) {
369 liborc::RowReaderOptions
opts(row_opts
);
370 std::vector
<std::shared_ptr
<RecordBatch
>> batches(stripes_
.size());
371 for (size_t stripe
= 0; stripe
< stripes_
.size(); stripe
++) {
372 opts
.range(stripes_
[stripe
].offset
, stripes_
[stripe
].length
);
373 RETURN_NOT_OK(ReadBatch(opts
, schema
, stripes_
[stripe
].num_rows
, &batches
[stripe
]));
375 return Table::FromRecordBatches(schema
, std::move(batches
)).Value(out
);
378 Status
ReadBatch(const liborc::RowReaderOptions
& opts
,
379 const std::shared_ptr
<Schema
>& schema
, int64_t nrows
,
380 std::shared_ptr
<RecordBatch
>* out
) {
381 std::unique_ptr
<liborc::RowReader
> row_reader
;
382 std::unique_ptr
<liborc::ColumnVectorBatch
> batch
;
384 ORC_BEGIN_CATCH_NOT_OK
385 row_reader
= reader_
->createRowReader(opts
);
386 batch
= row_reader
->createRowBatch(std::min(nrows
, kReadRowsBatch
));
389 std::unique_ptr
<RecordBatchBuilder
> builder
;
390 RETURN_NOT_OK(RecordBatchBuilder::Make(schema
, pool_
, nrows
, &builder
));
392 // The top-level type must be a struct to read into an arrow table
393 const auto& struct_batch
= checked_cast
<liborc::StructVectorBatch
&>(*batch
);
395 const liborc::Type
& type
= row_reader
->getSelectedType();
396 while (row_reader
->next(*batch
)) {
397 for (int i
= 0; i
< builder
->num_fields(); i
++) {
398 RETURN_NOT_OK(AppendBatch(type
.getSubtype(i
), struct_batch
.fields
[i
], 0,
399 batch
->numElements
, builder
->GetField(i
)));
402 RETURN_NOT_OK(builder
->Flush(out
));
406 Status
Seek(int64_t row_number
) {
407 ARROW_RETURN_IF(row_number
>= NumberOfRows(),
408 Status::Invalid("Out of bounds row number: ", row_number
));
410 current_row_
= row_number
;
414 Status
NextStripeReader(int64_t batch_size
, const std::vector
<int>& include_indices
,
415 std::shared_ptr
<RecordBatchReader
>* out
) {
416 if (current_row_
>= NumberOfRows()) {
421 liborc::RowReaderOptions opts
;
422 if (!include_indices
.empty()) {
423 RETURN_NOT_OK(SelectIndices(&opts
, include_indices
));
425 StripeInformation
stripe_info({0, 0, 0, 0});
426 RETURN_NOT_OK(SelectStripeWithRowNumber(&opts
, current_row_
, &stripe_info
));
427 std::shared_ptr
<Schema
> schema
;
428 RETURN_NOT_OK(ReadSchema(opts
, &schema
));
429 std::unique_ptr
<liborc::RowReader
> row_reader
;
431 ORC_BEGIN_CATCH_NOT_OK
432 row_reader
= reader_
->createRowReader(opts
);
433 row_reader
->seekToRow(current_row_
);
434 current_row_
= stripe_info
.first_row_of_stripe
+ stripe_info
.num_rows
;
437 *out
= std::shared_ptr
<RecordBatchReader
>(
438 new OrcStripeReader(std::move(row_reader
), schema
, batch_size
, pool_
));
442 Status
NextStripeReader(int64_t batch_size
, std::shared_ptr
<RecordBatchReader
>* out
) {
443 return NextStripeReader(batch_size
, {}, out
);
448 std::unique_ptr
<liborc::Reader
> reader_
;
449 std::vector
<StripeInformation
> stripes_
;
450 int64_t current_row_
;
453 ORCFileReader::ORCFileReader() { impl_
.reset(new ORCFileReader::Impl()); }
455 ORCFileReader::~ORCFileReader() {}
457 Status
ORCFileReader::Open(const std::shared_ptr
<io::RandomAccessFile
>& file
,
458 MemoryPool
* pool
, std::unique_ptr
<ORCFileReader
>* reader
) {
459 return Open(file
, pool
).Value(reader
);
462 Result
<std::unique_ptr
<ORCFileReader
>> ORCFileReader::Open(
463 const std::shared_ptr
<io::RandomAccessFile
>& file
, MemoryPool
* pool
) {
464 auto result
= std::unique_ptr
<ORCFileReader
>(new ORCFileReader());
465 RETURN_NOT_OK(result
->impl_
->Open(file
, pool
));
466 return std::move(result
);
469 Result
<std::shared_ptr
<const KeyValueMetadata
>> ORCFileReader::ReadMetadata() {
470 return impl_
->ReadMetadata();
473 Status
ORCFileReader::ReadSchema(std::shared_ptr
<Schema
>* out
) {
474 return impl_
->ReadSchema(out
);
477 Result
<std::shared_ptr
<Schema
>> ORCFileReader::ReadSchema() {
478 std::shared_ptr
<Schema
> schema
;
479 RETURN_NOT_OK(impl_
->ReadSchema(&schema
));
483 Status
ORCFileReader::Read(std::shared_ptr
<Table
>* out
) { return impl_
->Read(out
); }
485 Result
<std::shared_ptr
<Table
>> ORCFileReader::Read() {
486 std::shared_ptr
<Table
> table
;
487 RETURN_NOT_OK(impl_
->Read(&table
));
491 Status
ORCFileReader::Read(const std::shared_ptr
<Schema
>& schema
,
492 std::shared_ptr
<Table
>* out
) {
493 return impl_
->Read(schema
, out
);
496 Result
<std::shared_ptr
<Table
>> ORCFileReader::Read(
497 const std::shared_ptr
<Schema
>& schema
) {
498 std::shared_ptr
<Table
> table
;
499 RETURN_NOT_OK(impl_
->Read(schema
, &table
));
503 Status
ORCFileReader::Read(const std::vector
<int>& include_indices
,
504 std::shared_ptr
<Table
>* out
) {
505 return impl_
->Read(include_indices
, out
);
508 Result
<std::shared_ptr
<Table
>> ORCFileReader::Read(
509 const std::vector
<int>& include_indices
) {
510 std::shared_ptr
<Table
> table
;
511 RETURN_NOT_OK(impl_
->Read(include_indices
, &table
));
515 Result
<std::shared_ptr
<Table
>> ORCFileReader::Read(
516 const std::vector
<std::string
>& include_names
) {
517 std::shared_ptr
<Table
> table
;
518 RETURN_NOT_OK(impl_
->Read(include_names
, &table
));
522 Status
ORCFileReader::Read(const std::shared_ptr
<Schema
>& schema
,
523 const std::vector
<int>& include_indices
,
524 std::shared_ptr
<Table
>* out
) {
525 return impl_
->Read(schema
, include_indices
, out
);
528 Result
<std::shared_ptr
<Table
>> ORCFileReader::Read(
529 const std::shared_ptr
<Schema
>& schema
, const std::vector
<int>& include_indices
) {
530 std::shared_ptr
<Table
> table
;
531 RETURN_NOT_OK(impl_
->Read(schema
, include_indices
, &table
));
535 Status
ORCFileReader::ReadStripe(int64_t stripe
, std::shared_ptr
<RecordBatch
>* out
) {
536 return impl_
->ReadStripe(stripe
, out
);
539 Result
<std::shared_ptr
<RecordBatch
>> ORCFileReader::ReadStripe(int64_t stripe
) {
540 std::shared_ptr
<RecordBatch
> recordBatch
;
541 RETURN_NOT_OK(impl_
->ReadStripe(stripe
, &recordBatch
));
545 Status
ORCFileReader::ReadStripe(int64_t stripe
, const std::vector
<int>& include_indices
,
546 std::shared_ptr
<RecordBatch
>* out
) {
547 return impl_
->ReadStripe(stripe
, include_indices
, out
);
550 Result
<std::shared_ptr
<RecordBatch
>> ORCFileReader::ReadStripe(
551 int64_t stripe
, const std::vector
<int>& include_indices
) {
552 std::shared_ptr
<RecordBatch
> recordBatch
;
553 RETURN_NOT_OK(impl_
->ReadStripe(stripe
, include_indices
, &recordBatch
));
557 Result
<std::shared_ptr
<RecordBatch
>> ORCFileReader::ReadStripe(
558 int64_t stripe
, const std::vector
<std::string
>& include_names
) {
559 std::shared_ptr
<RecordBatch
> recordBatch
;
560 RETURN_NOT_OK(impl_
->ReadStripe(stripe
, include_names
, &recordBatch
));
564 Status
ORCFileReader::Seek(int64_t row_number
) { return impl_
->Seek(row_number
); }
566 Status
ORCFileReader::NextStripeReader(int64_t batch_sizes
,
567 std::shared_ptr
<RecordBatchReader
>* out
) {
568 return impl_
->NextStripeReader(batch_sizes
, out
);
571 Result
<std::shared_ptr
<RecordBatchReader
>> ORCFileReader::NextStripeReader(
572 int64_t batch_size
) {
573 std::shared_ptr
<RecordBatchReader
> reader
;
574 RETURN_NOT_OK(impl_
->NextStripeReader(batch_size
, &reader
));
578 Status
ORCFileReader::NextStripeReader(int64_t batch_size
,
579 const std::vector
<int>& include_indices
,
580 std::shared_ptr
<RecordBatchReader
>* out
) {
581 return impl_
->NextStripeReader(batch_size
, include_indices
, out
);
584 Result
<std::shared_ptr
<RecordBatchReader
>> ORCFileReader::NextStripeReader(
585 int64_t batch_size
, const std::vector
<int>& include_indices
) {
586 std::shared_ptr
<RecordBatchReader
> reader
;
587 RETURN_NOT_OK(impl_
->NextStripeReader(batch_size
, include_indices
, &reader
));
591 int64_t ORCFileReader::NumberOfStripes() { return impl_
->NumberOfStripes(); }
593 int64_t ORCFileReader::NumberOfRows() { return impl_
->NumberOfRows(); }
597 class ArrowOutputStream
: public liborc::OutputStream
{
599 explicit ArrowOutputStream(arrow::io::OutputStream
& output_stream
)
600 : output_stream_(output_stream
), length_(0) {}
602 uint64_t getLength() const override
{ return length_
; }
604 uint64_t getNaturalWriteSize() const override
{ return kOrcNaturalWriteSize
; }
606 void write(const void* buf
, size_t length
) override
{
607 ORC_THROW_NOT_OK(output_stream_
.Write(buf
, static_cast<int64_t>(length
)));
608 length_
+= static_cast<int64_t>(length
);
611 // Mandatory due to us implementing an ORC virtual class.
612 // Used by ORC for error messages, not used by Arrow
613 const std::string
& getName() const override
{
614 static const std::string
filename("ArrowOutputFile");
618 void close() override
{
619 if (!output_stream_
.closed()) {
620 ORC_THROW_NOT_OK(output_stream_
.Close());
624 void set_length(int64_t length
) { length_
= length
; }
627 arrow::io::OutputStream
& output_stream_
;
633 class ORCFileWriter::Impl
{
635 Status
Open(arrow::io::OutputStream
* output_stream
) {
636 out_stream_
= std::unique_ptr
<liborc::OutputStream
>(
637 checked_cast
<liborc::OutputStream
*>(new ArrowOutputStream(*output_stream
)));
641 Status
Write(const Table
& table
) {
642 std::unique_ptr
<liborc::WriterOptions
> orc_options
=
643 std::unique_ptr
<liborc::WriterOptions
>(new liborc::WriterOptions());
644 ARROW_ASSIGN_OR_RAISE(auto orc_schema
, GetOrcType(*(table
.schema())));
646 writer_
= liborc::createWriter(*orc_schema
, out_stream_
.get(), *orc_options
))
648 int64_t num_rows
= table
.num_rows();
649 const int num_cols_
= table
.num_columns();
650 std::vector
<int64_t> arrow_index_offset(num_cols_
, 0);
651 std::vector
<int> arrow_chunk_offset(num_cols_
, 0);
652 std::unique_ptr
<liborc::ColumnVectorBatch
> batch
=
653 writer_
->createRowBatch(kOrcWriterBatchSize
);
654 liborc::StructVectorBatch
* root
=
655 internal::checked_cast
<liborc::StructVectorBatch
*>(batch
.get());
656 while (num_rows
> 0) {
657 for (int i
= 0; i
< num_cols_
; i
++) {
658 RETURN_NOT_OK(adapters::orc::WriteBatch(
659 *(table
.column(i
)), kOrcWriterBatchSize
, &(arrow_chunk_offset
[i
]),
660 &(arrow_index_offset
[i
]), (root
->fields
)[i
]));
662 root
->numElements
= (root
->fields
)[0]->numElements
;
663 writer_
->add(*batch
);
665 num_rows
-= kOrcWriterBatchSize
;
676 std::unique_ptr
<liborc::Writer
> writer_
;
677 std::unique_ptr
<liborc::OutputStream
> out_stream_
;
680 ORCFileWriter::~ORCFileWriter() {}
682 ORCFileWriter::ORCFileWriter() { impl_
.reset(new ORCFileWriter::Impl()); }
684 Result
<std::unique_ptr
<ORCFileWriter
>> ORCFileWriter::Open(
685 io::OutputStream
* output_stream
) {
686 std::unique_ptr
<ORCFileWriter
> result
=
687 std::unique_ptr
<ORCFileWriter
>(new ORCFileWriter());
688 Status status
= result
->impl_
->Open(output_stream
);
689 RETURN_NOT_OK(status
);
690 return std::move(result
);
693 Status
ORCFileWriter::Write(const Table
& table
) { return impl_
->Write(table
); }
695 Status
ORCFileWriter::Close() { return impl_
->Close(); }
698 } // namespace adapters