]> git.proxmox.com Git - ceph.git/blame - ceph/src/s3select/include/s3select_parquet_intrf.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / s3select / include / s3select_parquet_intrf.h
CommitLineData
20effc67
TL
1
2#pragma once
3
4#if ! __has_include (<arrow/api.h>) || ! __has_include (<arrow/io/api.h>) || !__has_include (<parquet/arrow/reader.h>)
5# undef _ARROW_EXIST
6#endif
7
8#ifdef _ARROW_EXIST
9
10#include <iostream>
11#include <arrow/api.h>
12#include <arrow/io/api.h>
13#include <parquet/arrow/reader.h>
14#include <parquet/arrow/writer.h>
15#include <parquet/exception.h>
16#include <set>
17#include <parquet/column_reader.h>
18#include <arrow/util/io_util.h>
19
20#include <arrow/io/interfaces.h>
21#include <utility>
22
23#include <mutex>
24#include <functional>
25
26#include "internal_file_decryptor.h"
27#include "encryption_internal.h"
28
29/******************************************/
30/******************************************/
class optional_yield;

namespace s3selectEngine {

// Global object for setting the interface between RGW and the parquet reader.
// RGW registers two callbacks here: one reporting the object size and one
// serving a byte range of the object. RGWimpl invokes both.
class rgw_s3select_api {
 public:
  // Ranged-read callback: (offset, length, destination buffer, yield context).
  // RGWimpl::ReadAt treats the returned int as the number of bytes read.
  std::function<int(int64_t, int64_t, void*, optional_yield*)> range_req_fptr;

  // Returns the total size of the object being queried.
  std::function<size_t(void)> get_size_fptr;

  // Yield context forwarded to range_req_fptr on every read. It starts as
  // nullptr (instead of an indeterminate pointer) until the caller assigns it.
  optional_yield* m_y = nullptr;

  // Installs the ranged-read callback; the callable is moved, not copied.
  void set_range_req_api(std::function<int(int64_t, int64_t, void*, optional_yield*)> fp)
  {
    range_req_fptr = std::move(fp);
  }

  // Installs the object-size callback; the callable is moved, not copied.
  void set_get_size_api(std::function<size_t(void)> fp)
  {
    get_size_fptr = std::move(fp);
  }
};

}  // namespace s3selectEngine
55
56/******************************************/
57/******************************************/
58/******************************************/
59
// Magic bytes of a plaintext Parquet file ("PAR1").
static constexpr uint8_t kParquetMagic[] = {'P', 'A', 'R', '1'};
// Magic bytes used by Parquet modular encryption for encrypted-footer files ("PARE").
static constexpr uint8_t kParquetEMagic[] = {'P', 'A', 'R', 'E'};
// Length, in bytes, of an AES-GCM authentication tag.
constexpr int kGcmTagLength{16};
63
namespace arrow {
namespace io {
namespace internal {

// Re-declarations of helper functions exported by the Arrow IO library.
// NOTE(review): presumably copied from Arrow's non-public io/util_internal.h;
// confirm they still match the Arrow version this is linked against.
// In this file, CloseFromDestructor is used by ReadableFile's destructor and
// ValidateRange by OSFile::ReadAt and ReadableFileImpl::WillNeed.

// Closes `file` from a destructor context, where a failing Status cannot be
// returned to a caller (hence the void return).
ARROW_EXPORT void CloseFromDestructor(FileInterface* file);

// Validate a (offset, size) region (as given to ReadAt) against
// the file size. Return the actual read size.
ARROW_EXPORT Result<int64_t> ValidateReadRange(int64_t offset, int64_t size,
                                               int64_t file_size);
// Validate a (offset, size) region (as given to WriteAt) against
// the file size. Short writes are not allowed.
ARROW_EXPORT Status ValidateWriteRange(int64_t offset, int64_t size, int64_t file_size);

// Validate a (offset, size) region (as given to ReadAt or WriteAt), without
// knowing the file size.
ARROW_EXPORT Status ValidateRange(int64_t offset, int64_t size);

// Merges the given read ranges; presumably ranges separated by at most
// hole_size_limit bytes are joined, up to range_size_limit — confirm in Arrow.
ARROW_EXPORT
std::vector<ReadRange> CoalesceReadRanges(std::vector<ReadRange> ranges,
                                          int64_t hole_size_limit,
                                          int64_t range_size_limit);

// Declared for completeness; not referenced in this part of the header.
ARROW_EXPORT
::arrow::internal::ThreadPool* GetIOThreadPool();

}  // namespace internal
}  // namespace io
}  // namespace arrow
93
94
// RGWimpl and OSFile implement access to storage objects: OSFile reads filesystem files, RGWimpl reads Ceph S3 (RGW) objects.
// ObjectInterface (temporary) is an "empty base class" that allows injecting the storage-object access functions.
// ReadableFileImpl is an implementation layer over ObjectInterface objects.
// ReadableFile is a layer that calls into ReadableFileImpl, enabling runtime switching between implementations.
// ParquetFileReader is the main interface (the underlying implementation is transparent to this layer).
//
101
102
103namespace arrow {
104class Buffer;
105namespace io {
106
107class ObjectInterface {
108
109#define NOT_IMPLEMENTED {std::cout << "not implemented" << std::endl;}
110
111//purpose: to implement the range-request from single object
112public:
113 ObjectInterface() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {}
114
115 virtual ~ObjectInterface(){}
116
117 // Note: only one of the Open* methods below may be called on a given instance
118
119 virtual Status OpenWritable(const std::string& path, bool truncate, bool append, bool write_only){return Status::OK();}
120
121 // This is different from OpenWritable(string, ...) in that it doesn't
122 // truncate nor mandate a seekable file
123 virtual Status OpenWritable(int fd){return Status::OK();}
124
125 virtual Status OpenReadable(const std::string& path){return Status::OK();}
126
127 virtual Status OpenReadable(int fd){return Status::OK();}
128
129 virtual Status CheckClosed() const {return Status::OK();}
130
131 virtual Status Close(){return Status::OK();}
132
133 virtual Result<int64_t> Read(int64_t nbytes, void* out){return Result<int64_t>(-1);}
134
135 virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out){return Result<int64_t>(-1);}
136
137 virtual Status Seek(int64_t pos){return Status::OK();}
138
139 virtual Result<int64_t> Tell() const {return Result<int64_t>(-1);}
140
141 virtual Status Write(const void* data, int64_t length){return Status::OK();}
142
143 virtual int fd() const{return -1;}
144
145 virtual bool is_open() const{return false;}
146
147 virtual int64_t size() const{return -1;}
148
149 virtual FileMode::type mode() const{return FileMode::READ;}
150
151 #if 0
152 std::mutex& lock(){}
153 #endif
154
155 protected:
156 virtual Status SetFileName(const std::string& file_name){return Status::OK();}
157
158 virtual Status SetFileName(int fd){return Status::OK();}
159
160 virtual Status CheckPositioned(){return Status::OK();}
161
162 ::arrow::internal::PlatformFilename file_name_;
163
164 std::mutex lock_;
165
166 // File descriptor
167 int fd_;
168
169 FileMode::type mode_;
170
171 bool is_open_;
172 int64_t size_;
173 // Whether ReadAt made the file position non-deterministic.
174 std::atomic<bool> need_seeking_;
175
176}; //ObjectInterface
177
178} //namespace io
179} //namespace arrow
180
181namespace arrow {
182
183using internal::IOErrorFromErrno;
184
185namespace io {
186
187class OSFile : public ObjectInterface {
188 public:
189 OSFile() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {}
190
191 ~OSFile() {}
192
193 // Note: only one of the Open* methods below may be called on a given instance
194
195 Status OpenWritable(const std::string& path, bool truncate, bool append,
196 bool write_only) override {
197 RETURN_NOT_OK(SetFileName(path));
198
199 ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenWritable(file_name_, write_only,
200 truncate, append));
201 is_open_ = true;
202 mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
203
204 if (!truncate) {
205 ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
206 } else {
207 size_ = 0;
208 }
209 return Status::OK();
210 }
211
212 // This is different from OpenWritable(string, ...) in that it doesn't
213 // truncate nor mandate a seekable file
214 Status OpenWritable(int fd) override {
215 auto result = ::arrow::internal::FileGetSize(fd);
216 if (result.ok()) {
217 size_ = *result;
218 } else {
219 // Non-seekable file
220 size_ = -1;
221 }
222 RETURN_NOT_OK(SetFileName(fd));
223 is_open_ = true;
224 mode_ = FileMode::WRITE;
225 fd_ = fd;
226 return Status::OK();
227 }
228
229 Status OpenReadable(const std::string& path) override {
230 RETURN_NOT_OK(SetFileName(path));
231
232 ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenReadable(file_name_));
233 ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
234
235 is_open_ = true;
236 mode_ = FileMode::READ;
237 return Status::OK();
238 }
239
240 Status OpenReadable(int fd) override {
241 ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd));
242 RETURN_NOT_OK(SetFileName(fd));
243 is_open_ = true;
244 mode_ = FileMode::READ;
245 fd_ = fd;
246 return Status::OK();
247 }
248
249 Status CheckClosed() const override {
250 if (!is_open_) {
251 return Status::Invalid("Invalid operation on closed file");
252 }
253 return Status::OK();
254 }
255
256 Status Close() override {
257 if (is_open_) {
258 // Even if closing fails, the fd will likely be closed (perhaps it's
259 // already closed).
260 is_open_ = false;
261 int fd = fd_;
262 fd_ = -1;
263 RETURN_NOT_OK(::arrow::internal::FileClose(fd));
264 }
265 return Status::OK();
266 }
267
268 Result<int64_t> Read(int64_t nbytes, void* out) override {
269 RETURN_NOT_OK(CheckClosed());
270 RETURN_NOT_OK(CheckPositioned());
271 return ::arrow::internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes);
272 }
273
274 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
275 RETURN_NOT_OK(CheckClosed());
276 RETURN_NOT_OK(internal::ValidateRange(position, nbytes));
277 // ReadAt() leaves the file position undefined, so require that we seek
278 // before calling Read() or Write().
279 need_seeking_.store(true);
280 return ::arrow::internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position,
281 nbytes);
282 }
283
284 Status Seek(int64_t pos) override {
285 RETURN_NOT_OK(CheckClosed());
286 if (pos < 0) {
287 return Status::Invalid("Invalid position");
288 }
289 Status st = ::arrow::internal::FileSeek(fd_, pos);
290 if (st.ok()) {
291 need_seeking_.store(false);
292 }
293 return st;
294 }
295
296 Result<int64_t> Tell() const override {
297 RETURN_NOT_OK(CheckClosed());
298 return ::arrow::internal::FileTell(fd_);
299 }
300
301 Status Write(const void* data, int64_t length) override {
302 RETURN_NOT_OK(CheckClosed());
303
304 std::lock_guard<std::mutex> guard(lock_);
305 RETURN_NOT_OK(CheckPositioned());
306 if (length < 0) {
307 return Status::IOError("Length must be non-negative");
308 }
309 return ::arrow::internal::FileWrite(fd_, reinterpret_cast<const uint8_t*>(data),
310 length);
311 }
312
313 int fd() const override { return fd_; }
314
315 bool is_open() const override { return is_open_; }
316
317 int64_t size() const override { return size_; }
318
319 FileMode::type mode() const override { return mode_; }
320
321 std::mutex& lock() { return lock_; }
322
323 protected:
324 Status SetFileName(const std::string& file_name) override {
325 return ::arrow::internal::PlatformFilename::FromString(file_name).Value(&file_name_);
326 }
327
328 Status SetFileName(int fd) override {
329 std::stringstream ss;
330 ss << "<fd " << fd << ">";
331 return SetFileName(ss.str());
332 }
333
334 Status CheckPositioned() override {
335 if (need_seeking_.load()) {
336 return Status::Invalid(
337 "Need seeking after ReadAt() before "
338 "calling implicitly-positioned operation");
339 }
340 return Status::OK();
341 }
342
343 ::arrow::internal::PlatformFilename file_name_;
344
345 std::mutex lock_;
346
347 // File descriptor
348 int fd_;
349
350 FileMode::type mode_;
351
352 bool is_open_;
353 int64_t size_;
354 // Whether ReadAt made the file position non-deterministic.
355 std::atomic<bool> need_seeking_;
356};
357} // namespace io
358} // namespace arrow
359
360namespace arrow {
361class Buffer;
362namespace io {
363
364class RGWimpl : public ObjectInterface {
365
366//purpose: to implement the range-request from single object
367public:
368 RGWimpl(s3selectEngine::rgw_s3select_api* rgw) : fd_(-1), is_open_(false), size_(-1), need_seeking_(false),m_rgw_impl(rgw) {}
369
370 ~RGWimpl(){}
371
372#define NOT_IMPLEMENT { \
373 std::stringstream ss; \
374 ss << " method " << __FUNCTION__ << " is not implemented;"; \
375 throw parquet::ParquetException(ss.str()); \
376 }
377
378 // Note: only one of the Open* methods below may be called on a given instance
379
380 Status OpenWritable(const std::string& path, bool truncate, bool append, bool write_only) { NOT_IMPLEMENT;return Status::OK(); }
381
382 // This is different from OpenWritable(string, ...) in that it doesn't
383 // truncate nor mandate a seekable file
384 Status OpenWritable(int fd) {NOT_IMPLEMENT;return Status::OK(); }
385
386 Status OpenReadable(const std::string& path) {
387 //RGW-implement
388
389 RETURN_NOT_OK(SetFileName(path));//TODO can skip that
390 size_ = m_rgw_impl->get_size_fptr();
391
392 is_open_ = true;
393 mode_ = FileMode::READ;
394 return Status::OK();
395 }
396
397 Status OpenReadable(int fd) {NOT_IMPLEMENT;return Status::OK(); }
398
399 Status CheckClosed() const {
400 //RGW-implement
401 if (!is_open_) {
402 return Status::Invalid("Invalid operation on closed file");
403 }
404 return Status::OK();
405 }
406
407 Status Close() {
408 //RGW-implement
409 if (is_open_) {
410 // Even if closing fails, the fd will likely be closed (perhaps it's
411 // already closed).
412 is_open_ = false;
413 //int fd = fd_;
414 fd_ = -1;
415 //RETURN_NOT_OK(::arrow::internal::FileClose(fd));
416 }
417 return Status::OK();
418 }
419
420 Result<int64_t> Read(int64_t nbytes, void* out) {
421 NOT_IMPLEMENT;
422 RETURN_NOT_OK(CheckClosed());
423 RETURN_NOT_OK(CheckPositioned());
424 return ::arrow::internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes);
425 }
426
427 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
428
429 Result<int64_t> status = m_rgw_impl->range_req_fptr(position,nbytes,out,m_rgw_impl->m_y);
430
431 return status;
432 }
433
434 Status Seek(int64_t pos) {NOT_IMPLEMENT;return Status::OK(); }
435
436 Result<int64_t> Tell() const {
437 NOT_IMPLEMENT;
438 return Result<int64_t>(0);
439 }
440
441 Status Write(const void* data, int64_t length) {
442 NOT_IMPLEMENT;
443 return Status::OK();
444 }
445
446 int fd() const { return fd_; }
447
448 bool is_open() const { return is_open_; }
449
450 int64_t size() const { return size_; }
451
452 FileMode::type mode() const { return mode_; }
453
454 std::mutex& lock() { return lock_; } //TODO skip
455
456 protected:
457 Status SetFileName(const std::string& file_name) override {
458 return ::arrow::internal::PlatformFilename::FromString(file_name).Value(&file_name_);
459 }
460
461 Status SetFileName(int fd) {NOT_IMPLEMENT; return Status::OK(); }
462
463 Status CheckPositioned() {NOT_IMPLEMENT; return Status::OK(); }
464
465 ::arrow::internal::PlatformFilename file_name_;
466
467 std::mutex lock_;
468
469 // File descriptor
470 int fd_;
471
472 FileMode::type mode_;
473
474 bool is_open_;
475 int64_t size_;
476 // Whether ReadAt made the file position non-deterministic.
477 std::atomic<bool> need_seeking_;
478
479private:
480
481 s3selectEngine::rgw_s3select_api* m_rgw_impl;
482};
483
484} //namespace io
485} //namespace arrow
486
namespace arrow {

class Buffer;
class MemoryPool;
class Status;

namespace io {
namespace ceph {

/// \brief A read-only file whose operations are delegated to a pluggable
/// storage backend (an ObjectInterface implementation owned by ReadableFileImpl).
///
/// The Open(path, rgw, pool) factory builds an RGWimpl backend, so reads are
/// served by the range-request callback registered on `rgw`. Reads through
/// this implementation are unbuffered. If many small reads need to be issued,
/// it is recommended to use a buffering layer for good performance.
class ARROW_EXPORT ReadableFile
    : public internal::RandomAccessFileConcurrencyWrapper<ReadableFile> {
 public:
  ~ReadableFile() override;

  /// \brief Open an object for reading through the RGW callbacks in `rgw`
  /// \param[in] path object name; only recorded as the file name
  /// \param[in] rgw RGW bridge that supplies the object size and ranged reads.
  ///            The pointer is stored, not copied, so it must outlive the
  ///            returned file.
  /// \param[in] pool a MemoryPool for memory allocations
  /// \return ReadableFile instance
  static Result<std::shared_ptr<ReadableFile>> Open(
      const std::string& path,s3selectEngine::rgw_s3select_api* rgw,MemoryPool* pool = default_memory_pool());

  /// \brief Open from a file descriptor (not implemented)
  /// \param[in] fd file descriptor
  /// \param[in] pool a MemoryPool for memory allocations
  /// \return ReadableFile instance
  ///
  /// The current definition throws parquet::ParquetException (NOT_IMPLEMENT)
  /// before opening anything.
  static Result<std::shared_ptr<ReadableFile>> Open(
      int fd, MemoryPool* pool = default_memory_pool());

  /// \brief Whether the file has been closed
  bool closed() const override;

  /// \brief The backend's file descriptor (-1 when it has none, e.g. RGW)
  int file_descriptor() const;

  /// \brief Hint that the given ranges will be read soon
  Status WillNeed(const std::vector<ReadRange>& ranges) override;

 private:
  // The Do* hooks below are called by the RandomAccessFileConcurrencyWrapper
  // base, hence the friend declaration.
  friend RandomAccessFileConcurrencyWrapper<ReadableFile>;

  explicit ReadableFile(MemoryPool* pool,s3selectEngine::rgw_s3select_api* rgw);

  Status DoClose();
  Result<int64_t> DoTell() const;
  Result<int64_t> DoRead(int64_t nbytes, void* buffer);
  Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);

  /// \brief Thread-safe implementation of ReadAt
  Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);

  /// \brief Thread-safe implementation of ReadAt
  Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);

  Result<int64_t> DoGetSize();
  Status DoSeek(int64_t position);

  class ARROW_NO_EXPORT ReadableFileImpl;
  std::unique_ptr<ReadableFileImpl> impl_;
};


}  // namespace ceph
}  // namespace io
}  // namespace arrow
556
557// ----------------------------------------------------------------------
558// ReadableFileImpl implementation
559
560namespace arrow {
561namespace io {
562namespace ceph {
563
564class ReadableFile::ReadableFileImpl : public ObjectInterface {
565 public:
566
567 ~ReadableFileImpl()
568 {
569 if(IMPL != nullptr)
570 {
571 delete IMPL;
572 }
573 }
574
575#ifdef CEPH_USE_FS
576 explicit ReadableFileImpl(MemoryPool* pool) : pool_(pool) {IMPL=new OSFile();}
577#endif
578 explicit ReadableFileImpl(MemoryPool* pool,s3selectEngine::rgw_s3select_api* rgw) : pool_(pool) {IMPL=new RGWimpl(rgw);}
579
580 Status Open(const std::string& path) { return IMPL->OpenReadable(path); }
581
582 Status Open(int fd) { return IMPL->OpenReadable(fd); }
583
584 Result<std::shared_ptr<Buffer>> ReadBuffer(int64_t nbytes) {
585 ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
586
587 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, IMPL->Read(nbytes, buffer->mutable_data()));
588 if (bytes_read < nbytes) {
589 RETURN_NOT_OK(buffer->Resize(bytes_read));
590 buffer->ZeroPadding();
591 }
592 return std::move(buffer);
593 }
594
595 Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t nbytes) {
596 ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
597
598 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
599 IMPL->ReadAt(position, nbytes, buffer->mutable_data()));
600 if (bytes_read < nbytes) {
601 RETURN_NOT_OK(buffer->Resize(bytes_read));
602 buffer->ZeroPadding();
603 }
604 return std::move(buffer);
605 }
606
607 Status WillNeed(const std::vector<ReadRange>& ranges) {
608 RETURN_NOT_OK(CheckClosed());
609 for (const auto& range : ranges) {
610 RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length));
611#if defined(POSIX_FADV_WILLNEED)
612 if (posix_fadvise(fd_, range.offset, range.length, POSIX_FADV_WILLNEED)) {
613 return IOErrorFromErrno(errno, "posix_fadvise failed");
614 }
615#elif defined(F_RDADVISE) // macOS, BSD?
616 struct {
617 off_t ra_offset;
618 int ra_count;
619 } radvisory{range.offset, static_cast<int>(range.length)};
620 if (radvisory.ra_count > 0 && fcntl(fd_, F_RDADVISE, &radvisory) == -1) {
621 return IOErrorFromErrno(errno, "fcntl(fd, F_RDADVISE, ...) failed");
622 }
623#endif
624 }
625 return Status::OK();
626 }
627
628 ObjectInterface *IMPL;//TODO to declare in ObjectInterface
629
630 private:
631
632 MemoryPool* pool_;
633
634};
635
636// ReadableFile implemmetation
637ReadableFile::ReadableFile(MemoryPool* pool,s3selectEngine::rgw_s3select_api* rgw) { impl_.reset(new ReadableFileImpl(pool,rgw)); }
638
639ReadableFile::~ReadableFile() { internal::CloseFromDestructor(this); }
640
641Result<std::shared_ptr<ReadableFile>> ReadableFile::Open(const std::string& path,
642 s3selectEngine::rgw_s3select_api* rgw,
643 MemoryPool* pool
644 ) {
645 auto file = std::shared_ptr<ReadableFile>(new ReadableFile(pool,rgw));
646 RETURN_NOT_OK(file->impl_->Open(path));
647 return file;
648}
649
650Result<std::shared_ptr<ReadableFile>> ReadableFile::Open(int fd, MemoryPool* pool) {
651 NOT_IMPLEMENT;
652 auto file = std::shared_ptr<ReadableFile>(new ReadableFile(pool,0));
653 RETURN_NOT_OK(file->impl_->Open(fd));
654 return file;
655}
656
657Status ReadableFile::DoClose() { return impl_->Close(); }
658
659bool ReadableFile::closed() const { return !impl_->is_open(); }
660
661Status ReadableFile::WillNeed(const std::vector<ReadRange>& ranges) {
662 return impl_->WillNeed(ranges);
663}
664
665Result<int64_t> ReadableFile::DoTell() const { return impl_->Tell(); }
666
667Result<int64_t> ReadableFile::DoRead(int64_t nbytes, void* out) {
668 return impl_->IMPL->Read(nbytes, out);
669}
670
671Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes, void* out) {
672 return impl_->IMPL->ReadAt(position, nbytes, out);
673}
674
675Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position, int64_t nbytes) {
676 return impl_->ReadBufferAt(position, nbytes);
677}
678
679Result<std::shared_ptr<Buffer>> ReadableFile::DoRead(int64_t nbytes) {
680 return impl_->ReadBuffer(nbytes);
681}
682
683Result<int64_t> ReadableFile::DoGetSize() { return impl_->IMPL->size(); }
684
685Status ReadableFile::DoSeek(int64_t pos) { return impl_->IMPL->Seek(pos); }
686
687int ReadableFile::file_descriptor() const { return impl_->IMPL->fd(); }
688
689} // namepace ceph
690} // namespace io
691} // namespace arrow
692
693
namespace parquet {

class ColumnReader;
class FileMetaData;
class PageReader;
class RandomAccessSource;
class RowGroupMetaData;

namespace ceph {
// Reader for a single row group. Column access is delegated to a
// RowGroupReader::Contents implementation.
class PARQUET_EXPORT RowGroupReader {
 public:
  // Forward declare a virtual class 'Contents' to aid dependency injection and more
  // easily create test fixtures.
  // In this header the Contents implementation is SerializedRowGroup (defined below).
  struct Contents {
    virtual ~Contents() {}
    virtual std::unique_ptr<PageReader> GetColumnPageReader(int i) = 0;
    virtual const RowGroupMetaData* metadata() const = 0;
    virtual const ReaderProperties* properties() const = 0;
  };

  explicit RowGroupReader(std::unique_ptr<Contents> contents);

  // Returns the rowgroup metadata
  const RowGroupMetaData* metadata() const;

  // Construct a ColumnReader for the indicated row group-relative
  // column. Ownership is shared with the RowGroupReader.
  // Throws ParquetException if `i` is past the last column.
  std::shared_ptr<ColumnReader> Column(int i);

  // Returns the page reader for row group-relative column `i`.
  // Throws ParquetException if `i` is past the last column.
  std::unique_ptr<PageReader> GetColumnPageReader(int i);

 private:
  // Holds a pointer to an instance of Contents implementation
  std::unique_ptr<Contents> contents_;
};

class PARQUET_EXPORT ParquetFileReader {
 public:
  // Declare a virtual class 'Contents' to aid dependency injection and more
  // easily create test fixtures.
  // In this header the Contents implementation is SerializedFile (defined below).
  struct PARQUET_EXPORT Contents {
    static std::unique_ptr<Contents> Open(
        std::shared_ptr<::arrow::io::RandomAccessFile> source,
        const ReaderProperties& props = default_reader_properties(),
        std::shared_ptr<FileMetaData> metadata = NULLPTR);

    virtual ~Contents() = default;
    // Perform any cleanup associated with the file contents
    virtual void Close() = 0;
    virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i) = 0;
    virtual std::shared_ptr<FileMetaData> metadata() const = 0;
  };

  ParquetFileReader();
  ~ParquetFileReader();

  // Create a reader from some implementation of parquet-cpp's generic file
  // input interface
  //
  // If you cannot provide exclusive access to your file resource, create a
  // subclass of RandomAccessSource that wraps the shared resource
  ARROW_DEPRECATED("Use arrow::io::RandomAccessFile version")
  static std::unique_ptr<ParquetFileReader> Open(
      std::unique_ptr<RandomAccessSource> source,
      const ReaderProperties& props = default_reader_properties(),
      std::shared_ptr<FileMetaData> metadata = NULLPTR);

  // Create a file reader instance from an Arrow file object. Thread-safety is
  // the responsibility of the file implementation
  static std::unique_ptr<ParquetFileReader> Open(
      std::shared_ptr<::arrow::io::RandomAccessFile> source,
      const ReaderProperties& props = default_reader_properties(),
      std::shared_ptr<FileMetaData> metadata = NULLPTR);

  // API Convenience to open a serialized Parquet file, using Arrow IO
  // interfaces.
  // NOTE(review): the extra `rgw` argument presumably supplies the RGW
  // callbacks used by arrow::io::ceph::ReadableFile::Open. Confirm against the
  // definition, which is outside this view.
  static std::unique_ptr<ParquetFileReader> OpenFile(
      const std::string& path,s3selectEngine::rgw_s3select_api* rgw, bool memory_map = true,
      const ReaderProperties& props = default_reader_properties(),
      std::shared_ptr<FileMetaData> metadata = NULLPTR
      );

  void Open(std::unique_ptr<Contents> contents);
  void Close();

  // The RowGroupReader is owned by the FileReader
  std::shared_ptr<RowGroupReader> RowGroup(int i);

  // Returns the file metadata. Only one instance is ever created
  std::shared_ptr<FileMetaData> metadata() const;

  /// Pre-buffer the specified column indices in all row groups.
  ///
  /// Readers can optionally call this to cache the necessary slices
  /// of the file in-memory before deserialization. Arrow readers can
  /// automatically do this via an option. This is intended to
  /// increase performance when reading from high-latency filesystems
  /// (e.g. Amazon S3).
  ///
  /// After calling this, creating readers for row groups/column
  /// indices that were not buffered may fail. Creating multiple
  /// readers for a subset of the buffered regions is
  /// acceptable. This may be called again to buffer a different set
  /// of row groups/columns.
  ///
  /// If memory usage is a concern, note that data will remain
  /// buffered in memory until either \a PreBuffer() is called again,
  /// or the reader itself is destructed. Reading - and buffering -
  /// only one row group at a time may be useful.
  void PreBuffer(const std::vector<int>& row_groups,
                 const std::vector<int>& column_indices,
                 const ::arrow::io::IOContext& ctx,
                 const ::arrow::io::CacheOptions& options);

 private:
  // Holds a pointer to an instance of Contents implementation
  std::unique_ptr<Contents> contents_;
};

// Read only Parquet file metadata
std::shared_ptr<FileMetaData> PARQUET_EXPORT
ReadMetaData(const std::shared_ptr<::arrow::io::RandomAccessFile>& source);

/// \brief Scan all values in file. Useful for performance testing
/// \param[in] columns the column numbers to scan. If empty scans all
/// \param[in] column_batch_size number of values to read at a time when scanning column
/// \param[in] reader a ParquetFileReader instance
/// \return number of semantic rows in file
PARQUET_EXPORT
int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
                         ParquetFileReader* reader);

}//namespace ceph
}//namespace parquet
830
831
832namespace parquet {
833
834namespace ceph {
835
// PARQUET-978: keep footer reads to a minimum by reading the last 64 KiB of the file at once.
static constexpr int64_t kDefaultFooterReadSize = int64_t{64} * 1024;
// Size of the fixed file trailer. Per the Parquet format this is presumably
// the 4-byte footer length followed by the 4-byte magic.
static constexpr uint32_t kFooterSize = 8U;

// PARQUET-816: upper bound on the padding ComputeColumnChunkRange adds for the
// dictionary page header that old parquet-mr writers left out of the chunk size.
static constexpr int64_t kMaxDictHeaderSize = 100;
842
843// ----------------------------------------------------------------------
844// RowGroupReader public API
845
846RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents)
847 : contents_(std::move(contents)) {}
848
849std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
850 if (i >= metadata()->num_columns()) {
851 std::stringstream ss;
852 ss << "Trying to read column index " << i << " but row group metadata has only "
853 << metadata()->num_columns() << " columns";
854 throw ParquetException(ss.str());
855 }
856 const ColumnDescriptor* descr = metadata()->schema()->Column(i);
857
858 std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
859 return ColumnReader::Make(
860 descr, std::move(page_reader),
861 const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
862}
863
864std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
865 if (i >= metadata()->num_columns()) {
866 std::stringstream ss;
867 ss << "Trying to read column index " << i << " but row group metadata has only "
868 << metadata()->num_columns() << " columns";
869 throw ParquetException(ss.str());
870 }
871 return contents_->GetColumnPageReader(i);
872}
873
874// Returns the rowgroup metadata
875const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
876
877/// Compute the section of the file that should be read for the given
878/// row group and column chunk.
879::arrow::io::ReadRange ComputeColumnChunkRange(FileMetaData* file_metadata,
880 int64_t source_size, int row_group_index,
881 int column_index) {
882 auto row_group_metadata = file_metadata->RowGroup(row_group_index);
883 auto column_metadata = row_group_metadata->ColumnChunk(column_index);
884
885 int64_t col_start = column_metadata->data_page_offset();
886 if (column_metadata->has_dictionary_page() &&
887 column_metadata->dictionary_page_offset() > 0 &&
888 col_start > column_metadata->dictionary_page_offset()) {
889 col_start = column_metadata->dictionary_page_offset();
890 }
891
892 int64_t col_length = column_metadata->total_compressed_size();
893 // PARQUET-816 workaround for old files created by older parquet-mr
894 const ApplicationVersion& version = file_metadata->writer_version();
895 if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION())) {
896 // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
897 // dictionary page header size in total_compressed_size and total_uncompressed_size
898 // (see IMPALA-694). We add padding to compensate.
899 int64_t bytes_remaining = source_size - (col_start + col_length);
900 int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
901 col_length += padding;
902 }
903
904 return {col_start, col_length};
905}
906
907// RowGroupReader::Contents implementation for the Parquet file specification
908class SerializedRowGroup : public RowGroupReader::Contents {
909 public:
910 SerializedRowGroup(std::shared_ptr<ArrowInputFile> source,
911 std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source,
912 int64_t source_size, FileMetaData* file_metadata,
913 int row_group_number, const ReaderProperties& props,
914 std::shared_ptr<parquet::InternalFileDecryptor> file_decryptor = nullptr)
915 : source_(std::move(source)),
916 cached_source_(std::move(cached_source)),
917 source_size_(source_size),
918 file_metadata_(file_metadata),
919 properties_(props),
920 row_group_ordinal_(row_group_number),
921 file_decryptor_(file_decryptor) {
922 row_group_metadata_ = file_metadata->RowGroup(row_group_number);
923 }
924
925 const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); }
926
927 const ReaderProperties* properties() const override { return &properties_; }
928
929 std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
930 // Read column chunk from the file
931 auto col = row_group_metadata_->ColumnChunk(i);
932
933 ::arrow::io::ReadRange col_range =
934 ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i);
935 std::shared_ptr<ArrowInputStream> stream;
936 if (cached_source_) {
937 // PARQUET-1698: if read coalescing is enabled, read from pre-buffered
938 // segments.
939 PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range));
940 stream = std::make_shared<::arrow::io::BufferReader>(buffer);
941 } else {
942 stream = properties_.GetStream(source_, col_range.offset, col_range.length);
943 }
944
945 std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col->crypto_metadata();
946
947 // Column is encrypted only if crypto_metadata exists.
948 if (!crypto_metadata) {
949 return PageReader::Open(stream, col->num_values(), col->compression(),
950 properties_.memory_pool());
951 }
952
953 if (file_decryptor_ == nullptr) {
954 throw ParquetException("RowGroup is noted as encrypted but no file decryptor");
955 }
956
957 constexpr auto kEncryptedRowGroupsLimit = 32767;
958 if (i > kEncryptedRowGroupsLimit) {
959 throw ParquetException("Encrypted files cannot contain more than 32767 row groups");
960 }
961
962 // The column is encrypted
963 std::shared_ptr<::parquet::Decryptor> meta_decryptor;
964 std::shared_ptr<Decryptor> data_decryptor;
965 // The column is encrypted with footer key
966 if (crypto_metadata->encrypted_with_footer_key()) {
967 meta_decryptor = file_decryptor_->GetFooterDecryptorForColumnMeta();
968 data_decryptor = file_decryptor_->GetFooterDecryptorForColumnData();
969
970 CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_,
971 static_cast<int16_t>(i), meta_decryptor, data_decryptor);
972 return PageReader::Open(stream, col->num_values(), col->compression(),
973 properties_.memory_pool(), &ctx);
974 }
975
976 // The column is encrypted with its own key
977 std::string column_key_metadata = crypto_metadata->key_metadata();
978 const std::string column_path = crypto_metadata->path_in_schema()->ToDotString();
979
980 meta_decryptor =
981 file_decryptor_->GetColumnMetaDecryptor(column_path, column_key_metadata);
982 data_decryptor =
983 file_decryptor_->GetColumnDataDecryptor(column_path, column_key_metadata);
984
985 CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_,
986 static_cast<int16_t>(i), meta_decryptor, data_decryptor);
987 return PageReader::Open(stream, col->num_values(), col->compression(),
988 properties_.memory_pool(), &ctx);
989 }
990
991 private:
992 std::shared_ptr<ArrowInputFile> source_;
993 // Will be nullptr if PreBuffer() is not called.
994 std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source_;
995 int64_t source_size_;
996 FileMetaData* file_metadata_;
997 std::unique_ptr<RowGroupMetaData> row_group_metadata_;
998 ReaderProperties properties_;
999 int row_group_ordinal_;
1000 std::shared_ptr<InternalFileDecryptor> file_decryptor_;
1001};
1002
1003// ----------------------------------------------------------------------
1004// SerializedFile: An implementation of ParquetFileReader::Contents that deals
1005// with the Parquet file structure, Thrift deserialization, and other internal
1006// matters
1007
1008// This class takes ownership of the provided data source
1009class SerializedFile : public ParquetFileReader::Contents {
1010 public:
1011 SerializedFile(std::shared_ptr<ArrowInputFile> source,
1012 const ReaderProperties& props = default_reader_properties())
1013 : source_(std::move(source)), properties_(props) {
1014 PARQUET_ASSIGN_OR_THROW(source_size_, source_->GetSize());
1015 }
1016
1017 ~SerializedFile() override {
1018 try {
1019 Close();
1020 } catch (...) {
1021 }
1022 }
1023
1024 void Close() override {
1025 if (file_decryptor_) file_decryptor_->WipeOutDecryptionKeys();
1026 }
1027
1028 std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
1029 std::unique_ptr<SerializedRowGroup> contents(
1030 new SerializedRowGroup(source_, cached_source_, source_size_,
1031 file_metadata_.get(), i, properties_, file_decryptor_));
1032 return std::make_shared<RowGroupReader>(std::move(contents));
1033 }
1034
1035 std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; }
1036
1037 void set_metadata(std::shared_ptr<FileMetaData> metadata) {
1038 file_metadata_ = std::move(metadata);
1039 }
1040
1041 void PreBuffer(const std::vector<int>& row_groups,
1042 const std::vector<int>& column_indices,
1043 const ::arrow::io::IOContext& ctx,
1044 const ::arrow::io::CacheOptions& options) {
1045 cached_source_ =
1046 std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options);
1047 //std::vector<arrow::io::ReadRange> ranges;
1048 std::vector<::arrow::io::ReadRange> ranges;
1049 for (int row : row_groups) {
1050 for (int col : column_indices) {
1051 ranges.push_back(
1052 ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col));
1053 }
1054 }
1055 PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
1056 }
1057
1058 void ParseMetaData() {
1059 if (source_size_ == 0) {
1060 throw ParquetInvalidOrCorruptedFileException("Parquet file size is 0 bytes");
1061 } else if (source_size_ < kFooterSize) {
1062 throw ParquetInvalidOrCorruptedFileException(
1063 "Parquet file size is ", source_size_,
1064 " bytes, smaller than the minimum file footer (", kFooterSize, " bytes)");
1065 }
1066
1067 int64_t footer_read_size = std::min(source_size_, kDefaultFooterReadSize);
1068 PARQUET_ASSIGN_OR_THROW(
1069 auto footer_buffer,
1070 source_->ReadAt(source_size_ - footer_read_size, footer_read_size));
1071
1072 // Check if all bytes are read. Check if last 4 bytes read have the magic bits
1073 if (footer_buffer->size() != footer_read_size ||
1074 (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) != 0 &&
1075 memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) != 0)) {
1076 throw ParquetInvalidOrCorruptedFileException(
1077 "Parquet magic bytes not found in footer. Either the file is corrupted or this "
1078 "is not a parquet file.");
1079 }
1080
1081 if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0) {
1082 // Encrypted file with Encrypted footer.
1083 ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, footer_read_size);
1084 return;
1085 }
1086
1087 // No encryption or encryption with plaintext footer mode.
1088 std::shared_ptr<Buffer> metadata_buffer;
1089 uint32_t metadata_len, read_metadata_len;
1090 ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer,
1091 &metadata_len, &read_metadata_len);
1092
1093 auto file_decryption_properties = properties_.file_decryption_properties().get();
1094 if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file.
1095 if (file_decryption_properties != nullptr) {
1096 if (!file_decryption_properties->plaintext_files_allowed()) {
1097 throw ParquetException("Applying decryption properties on plaintext file");
1098 }
1099 }
1100 } else {
1101 // Encrypted file with plaintext footer mode.
1102 ParseMetaDataOfEncryptedFileWithPlaintextFooter(
1103 file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len);
1104 }
1105 }
1106
1107 private:
1108 std::shared_ptr<ArrowInputFile> source_;
1109 std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source_;
1110 int64_t source_size_;
1111 std::shared_ptr<FileMetaData> file_metadata_;
1112 ReaderProperties properties_;
1113
1114 std::shared_ptr<::parquet::InternalFileDecryptor> file_decryptor_;
1115
1116 void ParseUnencryptedFileMetadata(const std::shared_ptr<Buffer>& footer_buffer,
1117 int64_t footer_read_size,
1118 std::shared_ptr<Buffer>* metadata_buffer,
1119 uint32_t* metadata_len, uint32_t* read_metadata_len);
1120
1121 std::string HandleAadPrefix(FileDecryptionProperties* file_decryption_properties,
1122 EncryptionAlgorithm& algo);
1123
1124 void ParseMetaDataOfEncryptedFileWithPlaintextFooter(
1125 FileDecryptionProperties* file_decryption_properties,
1126 const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len,
1127 uint32_t read_metadata_len);
1128
1129 void ParseMetaDataOfEncryptedFileWithEncryptedFooter(
1130 const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size);
1131};
1132
1133void SerializedFile::ParseUnencryptedFileMetadata(
1134 const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size,
1135 std::shared_ptr<Buffer>* metadata_buffer, uint32_t* metadata_len,
1136 uint32_t* read_metadata_len) {
1137 *metadata_len = ::arrow::util::SafeLoadAs<uint32_t>(
1138 reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
1139 kFooterSize);
1140 int64_t metadata_start = source_size_ - kFooterSize - *metadata_len;
1141 if (*metadata_len > source_size_ - kFooterSize) {
1142 throw ParquetInvalidOrCorruptedFileException(
1143 "Parquet file size is ", source_size_,
1144 " bytes, smaller than the size reported by metadata (", metadata_len, "bytes)");
1145 }
1146
1147 // Check if the footer_buffer contains the entire metadata
1148 if (footer_read_size >= (*metadata_len + kFooterSize)) {
1149 *metadata_buffer = SliceBuffer(
1150 footer_buffer, footer_read_size - *metadata_len - kFooterSize, *metadata_len);
1151 } else {
1152 PARQUET_ASSIGN_OR_THROW(*metadata_buffer,
1153 source_->ReadAt(metadata_start, *metadata_len));
1154 if ((*metadata_buffer)->size() != *metadata_len) {
1155 throw ParquetException("Failed reading metadata buffer (requested " +
1156 std::to_string(*metadata_len) + " bytes but got " +
1157 std::to_string((*metadata_buffer)->size()) + " bytes)");
1158 }
1159 }
1160
1161 *read_metadata_len = *metadata_len;
1162 file_metadata_ = FileMetaData::Make((*metadata_buffer)->data(), read_metadata_len);
1163}
1164
1165void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter(
1166 const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size) {
1167 // encryption with encrypted footer
1168 // both metadata & crypto metadata length
1169 uint32_t footer_len = ::arrow::util::SafeLoadAs<uint32_t>(
1170 reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
1171 kFooterSize);
1172 int64_t crypto_metadata_start = source_size_ - kFooterSize - footer_len;
1173 if (kFooterSize + footer_len > source_size_) {
1174 throw ParquetInvalidOrCorruptedFileException(
1175 "Parquet file size is ", source_size_,
1176 " bytes, smaller than the size reported by footer's (", footer_len, "bytes)");
1177 }
1178 std::shared_ptr<Buffer> crypto_metadata_buffer;
1179 // Check if the footer_buffer contains the entire metadata
1180 if (footer_read_size >= (footer_len + kFooterSize)) {
1181 crypto_metadata_buffer = SliceBuffer(
1182 footer_buffer, footer_read_size - footer_len - kFooterSize, footer_len);
1183 } else {
1184 PARQUET_ASSIGN_OR_THROW(crypto_metadata_buffer,
1185 source_->ReadAt(crypto_metadata_start, footer_len));
1186 if (crypto_metadata_buffer->size() != footer_len) {
1187 throw ParquetException("Failed reading encrypted metadata buffer (requested " +
1188 std::to_string(footer_len) + " bytes but got " +
1189 std::to_string(crypto_metadata_buffer->size()) + " bytes)");
1190 }
1191 }
1192 auto file_decryption_properties = properties_.file_decryption_properties().get();
1193 if (file_decryption_properties == nullptr) {
1194 throw ParquetException(
1195 "Could not read encrypted metadata, no decryption found in reader's properties");
1196 }
1197 uint32_t crypto_metadata_len = footer_len;
1198 std::shared_ptr<FileCryptoMetaData> file_crypto_metadata =
1199 FileCryptoMetaData::Make(crypto_metadata_buffer->data(), &crypto_metadata_len);
1200 // Handle AAD prefix
1201 EncryptionAlgorithm algo = file_crypto_metadata->encryption_algorithm();
1202 std::string file_aad = HandleAadPrefix(file_decryption_properties, algo);
1203 file_decryptor_ = std::make_shared<::parquet::InternalFileDecryptor>(
1204 file_decryption_properties, file_aad, algo.algorithm,
1205 file_crypto_metadata->key_metadata(), properties_.memory_pool());
1206
1207 int64_t metadata_offset = source_size_ - kFooterSize - footer_len + crypto_metadata_len;
1208 uint32_t metadata_len = footer_len - crypto_metadata_len;
1209 PARQUET_ASSIGN_OR_THROW(auto metadata_buffer,
1210 source_->ReadAt(metadata_offset, metadata_len));
1211 if (metadata_buffer->size() != metadata_len) {
1212 throw ParquetException("Failed reading metadata buffer (requested " +
1213 std::to_string(metadata_len) + " bytes but got " +
1214 std::to_string(metadata_buffer->size()) + " bytes)");
1215 }
1216
1217 file_metadata_ =
1218 FileMetaData::Make(metadata_buffer->data(), &metadata_len, file_decryptor_);
1219}
1220
1221void SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter(
1222 FileDecryptionProperties* file_decryption_properties,
1223 const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len,
1224 uint32_t read_metadata_len) {
1225 // Providing decryption properties in plaintext footer mode is not mandatory, for
1226 // example when reading by legacy reader.
1227 if (file_decryption_properties != nullptr) {
1228 EncryptionAlgorithm algo = file_metadata_->encryption_algorithm();
1229 // Handle AAD prefix
1230 std::string file_aad = HandleAadPrefix(file_decryption_properties, algo);
1231 file_decryptor_ = std::make_shared<::parquet::InternalFileDecryptor>(
1232 file_decryption_properties, file_aad, algo.algorithm,
1233 file_metadata_->footer_signing_key_metadata(), properties_.memory_pool());
1234 // set the InternalFileDecryptor in the metadata as well, as it's used
1235 // for signature verification and for ColumnChunkMetaData creation.
1236#if GAL_set_file_decryptor_declare_private
1237 file_metadata_->set_file_decryptor(file_decryptor_);
1238#endif
1239 if (file_decryption_properties->check_plaintext_footer_integrity()) {
1240 if (metadata_len - read_metadata_len !=
1241 (parquet::encryption::kGcmTagLength + parquet::encryption::kNonceLength)) {
1242 throw ParquetInvalidOrCorruptedFileException(
1243 "Failed reading metadata for encryption signature (requested ",
1244 parquet::encryption::kGcmTagLength + parquet::encryption::kNonceLength,
1245 " bytes but have ", metadata_len - read_metadata_len, " bytes)");
1246 }
1247
1248 if (!file_metadata_->VerifySignature(metadata_buffer->data() + read_metadata_len)) {
1249 throw ParquetInvalidOrCorruptedFileException(
1250 "Parquet crypto signature verification failed");
1251 }
1252 }
1253 }
1254}
1255
1256std::string SerializedFile::HandleAadPrefix(
1257 FileDecryptionProperties* file_decryption_properties, EncryptionAlgorithm& algo) {
1258 std::string aad_prefix_in_properties = file_decryption_properties->aad_prefix();
1259 std::string aad_prefix = aad_prefix_in_properties;
1260 bool file_has_aad_prefix = algo.aad.aad_prefix.empty() ? false : true;
1261 std::string aad_prefix_in_file = algo.aad.aad_prefix;
1262
1263 if (algo.aad.supply_aad_prefix && aad_prefix_in_properties.empty()) {
1264 throw ParquetException(
1265 "AAD prefix used for file encryption, "
1266 "but not stored in file and not supplied "
1267 "in decryption properties");
1268 }
1269
1270 if (file_has_aad_prefix) {
1271 if (!aad_prefix_in_properties.empty()) {
1272 if (aad_prefix_in_properties.compare(aad_prefix_in_file) != 0) {
1273 throw ParquetException(
1274 "AAD Prefix in file and in properties "
1275 "is not the same");
1276 }
1277 }
1278 aad_prefix = aad_prefix_in_file;
1279 std::shared_ptr<AADPrefixVerifier> aad_prefix_verifier =
1280 file_decryption_properties->aad_prefix_verifier();
1281 if (aad_prefix_verifier != nullptr) aad_prefix_verifier->Verify(aad_prefix);
1282 } else {
1283 if (!algo.aad.supply_aad_prefix && !aad_prefix_in_properties.empty()) {
1284 throw ParquetException(
1285 "AAD Prefix set in decryption properties, but was not used "
1286 "for file encryption");
1287 }
1288 std::shared_ptr<AADPrefixVerifier> aad_prefix_verifier =
1289 file_decryption_properties->aad_prefix_verifier();
1290 if (aad_prefix_verifier != nullptr) {
1291 throw ParquetException(
1292 "AAD Prefix Verifier is set, but AAD Prefix not found in file");
1293 }
1294 }
1295 return aad_prefix + algo.aad.aad_file_unique;
1296}
1297
1298// ----------------------------------------------------------------------
1299// ParquetFileReader public API
1300
1301ParquetFileReader::ParquetFileReader() {}
1302
1303ParquetFileReader::~ParquetFileReader() {
1304 try {
1305 Close();
1306 } catch (...) {
1307 }
1308}
1309
1310// Open the file. If no metadata is passed, it is parsed from the footer of
1311// the file
1312std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
1313 std::shared_ptr<ArrowInputFile> source, const ReaderProperties& props,
1314 std::shared_ptr<FileMetaData> metadata) {
1315 std::unique_ptr<ParquetFileReader::Contents> result(
1316 new SerializedFile(std::move(source), props));
1317
1318 // Access private methods here, but otherwise unavailable
1319 SerializedFile* file = static_cast<SerializedFile*>(result.get());
1320
1321 if (metadata == nullptr) {
1322 // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
1323 file->ParseMetaData();
1324 } else {
1325 file->set_metadata(std::move(metadata));
1326 }
1327
1328 return result;
1329}
1330
1331std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
1332 std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props,
1333 std::shared_ptr<FileMetaData> metadata) {
1334 auto contents = SerializedFile::Open(std::move(source), props, std::move(metadata));
1335 std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
1336 result->Open(std::move(contents));
1337 return result;
1338}
1339
#if GAL_NOT_IMPLEMENTED
// Overload for the legacy RandomAccessSource interface: wraps it as an arrow file and
// forwards to the main Open(). NOTE(review): compiled only when GAL_NOT_IMPLEMENTED is
// set; presumably disabled in this port because ParquetInputWrapper is unavailable.
std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
    std::shared_ptr<FileMetaData> metadata) {
  std::shared_ptr<ParquetInputWrapper> adapted =
      std::make_shared<ParquetInputWrapper>(std::move(source));
  return Open(std::move(adapted), props, std::move(metadata));
}
#endif
1348
1349std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
1350 const std::string& path, s3selectEngine::rgw_s3select_api* rgw, bool memory_map, const ReaderProperties& props,
1351 std::shared_ptr<FileMetaData> metadata) {
1352 std::shared_ptr<::arrow::io::RandomAccessFile> source;
1353 if (memory_map) {
1354 PARQUET_ASSIGN_OR_THROW(
1355 source, ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ));//GAL change that also, or to remove?
1356 } else {
1357 PARQUET_ASSIGN_OR_THROW(source,
1358 ::arrow::io::ceph::ReadableFile::Open(path, rgw, props.memory_pool()));
1359 }
1360
1361 return Open(std::move(source), props, std::move(metadata));
1362}
1363
1364void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
1365 contents_ = std::move(contents);
1366}
1367
1368void ParquetFileReader::Close() {
1369 if (contents_) {
1370 contents_->Close();
1371 }
1372}
1373
1374std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const {
1375 return contents_->metadata();
1376}
1377
1378std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
1379 if (i >= metadata()->num_row_groups()) {
1380 std::stringstream ss;
1381 ss << "Trying to read row group " << i << " but file only has "
1382 << metadata()->num_row_groups() << " row groups";
1383 throw ParquetException(ss.str());
1384 }
1385 return contents_->GetRowGroup(i);
1386}
1387
1388void ParquetFileReader::PreBuffer(const std::vector<int>& row_groups,
1389 const std::vector<int>& column_indices,
1390 const ::arrow::io::IOContext& ctx,
1391 const ::arrow::io::CacheOptions& options) {
1392 // Access private methods here
1393 SerializedFile* file =
1394 ::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
1395 file->PreBuffer(row_groups, column_indices, ctx, options);
1396}
1397
1398// ----------------------------------------------------------------------
1399// File metadata helpers
1400
1401std::shared_ptr<FileMetaData> ReadMetaData(
1402 const std::shared_ptr<::arrow::io::RandomAccessFile>& source) {
1403 return ParquetFileReader::Open(source)->metadata();
1404}
1405
1406// ----------------------------------------------------------------------
1407// File scanner for performance testing
#if GAL_ScanAllValues_is_no_declare
// Reads every value of the requested columns (all columns when `columns` is empty) in
// batches of `column_batch_size` and returns the row count. Throws if the columns
// disagree on it. NOTE(review): compiled only when GAL_ScanAllValues_is_no_declare is
// set; presumably off in this port because ScanAllValues is not declared here.
int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
                         ParquetFileReader* reader) {
  std::vector<int16_t> rep_levels(column_batch_size);
  std::vector<int16_t> def_levels(column_batch_size);

  // columns are not specified explicitly. Add all columns
  if (columns.empty()) {
    const int all_columns = reader->metadata()->num_columns();
    columns.resize(all_columns);
    for (int c = 0; c < all_columns; ++c) {
      columns[c] = c;
    }
  }
  const int num_columns = static_cast<int>(columns.size());

  std::vector<int64_t> total_rows(num_columns, 0);

  const int num_row_groups = reader->metadata()->num_row_groups();
  for (int rg = 0; rg < num_row_groups; ++rg) {
    auto group_reader = reader->RowGroup(rg);
    for (int slot = 0; slot < num_columns; ++slot) {
      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(columns[slot]);
      const size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type());
      std::vector<uint8_t> values(column_batch_size * value_byte_size);
      // In repeated columns a row starts wherever the repetition level is 0.
      const bool repeated = col_reader->descr()->max_repetition_level() > 0;

      int64_t values_read = 0;
      while (col_reader->HasNext()) {
        const int64_t levels_read =
            ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
                          values.data(), &values_read, col_reader.get());
        if (!repeated) {
          total_rows[slot] += levels_read;
          continue;
        }
        for (int64_t lvl = 0; lvl < levels_read; ++lvl) {
          if (rep_levels[lvl] == 0) {
            ++total_rows[slot];
          }
        }
      }
    }
  }

  for (int c = 1; c < num_columns; ++c) {
    if (total_rows[c] != total_rows[0]) {
      throw ParquetException("Parquet error: Total rows among columns do not match");
    }
  }

  return total_rows[0];
}
#endif
1463
1464} //namespace ceph
1465} //namespace parquet
1466
1467/******************************************/
1468/******************************************/
1469/******************************************/
1470class column_reader_wrap
1471{
1472
1473private:
1474
1475 int64_t m_rownum;
1476 parquet::Type::type m_type;
1477 std::shared_ptr<parquet::ceph::RowGroupReader> m_row_group_reader;
1478 int m_row_grouop_id;
1479 uint16_t m_col_id;
1480 parquet::ceph::ParquetFileReader* m_parquet_reader;
1481 std::shared_ptr<parquet::ColumnReader> m_ColumnReader;
1482 bool m_end_of_stream;
1483 bool m_read_last_value;
1484
1485
1486public:
1487
1488 enum class parquet_type
1489 {
1490 STRING,
1491 INT32,
1492 INT64,
1493 DOUBLE,
1494 TIMESTAMP,
1495 PARQUET_NULL
1496 };
1497
1498 typedef struct
1499 {
1500 int64_t num;
1501 char *str; //str is pointing to offset in string which is NOT null terminated.
1502 uint16_t str_len;
1503 double dbl;
1504 parquet_type type;
1505 } parquet_value_t;
1506
1507 private:
1508 parquet_value_t m_last_value;
1509
1510 public:
1511 column_reader_wrap(std::unique_ptr<parquet::ceph::ParquetFileReader> & parquet_reader,uint16_t col_id);
1512
1513 parquet::Type::type get_type();
1514
1515 bool HasNext();//TODO template
1516
1517 int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
1518 parquet_value_t* values, int64_t* values_read);
1519
1520 int64_t Skip(int64_t rows_to_skip);
1521
1522 int Read(uint64_t rownum,parquet_value_t & value);
1523
1524};
1525
1526class parquet_file_parser
1527{
1528
1529public:
1530
1531 typedef std::vector<std::pair<std::string, column_reader_wrap::parquet_type>> schema_t;
1532 typedef std::set<uint16_t> column_pos_t;
1533 typedef std::vector<column_reader_wrap::parquet_value_t> row_values_t;
1534
1535 typedef column_reader_wrap::parquet_value_t parquet_value_t;
1536 typedef column_reader_wrap::parquet_type parquet_type;
1537
1538private:
1539
1540 std::string m_parquet_file_name;
1541 uint32_t m_num_of_columms;
1542 uint64_t m_num_of_rows;
1543 uint64_t m_rownum;
1544 schema_t m_schm;
1545 int m_num_row_groups;
1546 std::shared_ptr<parquet::FileMetaData> m_file_metadata;
1547 std::unique_ptr<parquet::ceph::ParquetFileReader> m_parquet_reader;
1548 std::vector<column_reader_wrap*> m_column_readers;
1549 s3selectEngine::rgw_s3select_api* m_rgw_s3select_api;
1550
1551 public:
1552
1553 parquet_file_parser(std::string parquet_file_name,s3selectEngine::rgw_s3select_api* rgw_api) :
1554 m_parquet_file_name(parquet_file_name),
1555 m_num_of_columms(0),
1556 m_num_of_rows(0),
1557 m_rownum(0),
1558 m_num_row_groups(0),
1559 m_rgw_s3select_api(rgw_api)
1560
1561
1562 {
1563 load_meta_data();
1564 }
1565
1566 ~parquet_file_parser()
1567 {
1568 for(auto r : m_column_readers)
1569 {
1570 delete r;
1571 }
1572 }
1573
1574 int load_meta_data()
1575 {
1576 m_parquet_reader = parquet::ceph::ParquetFileReader::OpenFile(m_parquet_file_name,m_rgw_s3select_api,false);
1577 m_file_metadata = m_parquet_reader->metadata();
1578 m_num_of_columms = m_parquet_reader->metadata()->num_columns();
1579 m_num_row_groups = m_file_metadata->num_row_groups();
1580 m_num_of_rows = m_file_metadata->num_rows();
1581
1582 for (uint32_t i = 0; i < m_num_of_columms; i++)
1583 {
1584 parquet::Type::type tp = m_file_metadata->schema()->Column(i)->physical_type();
1585 std::pair<std::string, column_reader_wrap::parquet_type> elm;
1586
1587 switch (tp)
1588 {
1589 case parquet::Type::type::INT32:
1590 elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::INT32);
1591 m_schm.push_back(elm);
1592 break;
1593
1594 case parquet::Type::type::INT64:
1595 elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::INT64);
1596 m_schm.push_back(elm);
1597 break;
1598
1599 case parquet::Type::type::DOUBLE:
1600 elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::DOUBLE);
1601 m_schm.push_back(elm);
1602 break;
1603
1604 case parquet::Type::type::BYTE_ARRAY:
1605 elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::STRING);
1606 m_schm.push_back(elm);
1607 break;
1608
1609 default:
1610 {
1611 std::stringstream err;
1612 err << "some parquet type not supported";
1613 throw std::runtime_error(err.str());
1614 }
1615 }
1616
1617 m_column_readers.push_back(new column_reader_wrap(m_parquet_reader,i));
1618 }
1619
1620 return 0;
1621 }
1622
1623 bool end_of_stream()
1624 {
1625
1626 if (m_rownum >= m_num_of_rows)
1627 return true;
1628 return false;
1629 }
1630
1631 uint64_t get_number_of_rows()
1632 {
1633 return m_num_of_rows;
1634 }
1635
1636 bool increase_rownum()
1637 {
1638 if (end_of_stream())
1639 return false;
1640
1641 m_rownum++;
1642 return true;
1643 }
1644
1645 uint64_t get_rownum()
1646 {
1647 return m_rownum;
1648 }
1649
1650 uint32_t get_num_of_columns()
1651 {
1652 return m_num_of_columms;
1653 }
1654
1655 int get_column_values_by_positions(column_pos_t positions, row_values_t &row_values)
1656 {
1657 column_reader_wrap::parquet_value_t column_value;
1658 row_values.clear();
1659
1660 for(auto col : positions)
1661 {
1662 if((col)>=m_num_of_columms)
1663 {//TODO should verified upon syntax phase
1664 //TODO throw exception
1665 return -1;
1666 }
1667 m_column_readers[col]->Read(m_rownum,column_value);
1668 row_values.push_back(column_value);//TODO intensive (should move)
1669 }
1670 return 0;
1671 }
1672
1673 schema_t get_schema()
1674 {
1675 return m_schm;
1676 }
1677};
1678
1679/******************************************/
1680
1681
1682 column_reader_wrap::column_reader_wrap(std::unique_ptr<parquet::ceph::ParquetFileReader> & parquet_reader,uint16_t col_id):
1683 m_rownum(-1),
1684 m_type(parquet::Type::type::UNDEFINED),
1685 m_row_grouop_id(0),
1686 m_col_id(col_id),
1687 m_end_of_stream(false),
1688 m_read_last_value(false)
1689 {
1690 m_parquet_reader = parquet_reader.get();
1691 m_row_group_reader = m_parquet_reader->RowGroup(m_row_grouop_id);
1692 m_ColumnReader = m_row_group_reader->Column(m_col_id);
1693 }
1694
1695 parquet::Type::type column_reader_wrap::get_type()
1696 {//TODO if UNDEFINED
1697 return m_parquet_reader->metadata()->schema()->Column(m_col_id)->physical_type();
1698 }
1699
1700 bool column_reader_wrap::HasNext()//TODO template
1701 {
1702 parquet::Int32Reader* int32_reader;
1703 parquet::Int64Reader* int64_reader;
1704 parquet::DoubleReader* double_reader;
1705 parquet::ByteArrayReader* byte_array_reader;
1706
1707 switch (get_type())
1708 {
1709 case parquet::Type::type::INT32:
1710 int32_reader = static_cast<parquet::Int32Reader *>(m_ColumnReader.get());
1711 return int32_reader->HasNext();
1712 break;
1713
1714 case parquet::Type::type::INT64:
1715 int64_reader = static_cast<parquet::Int64Reader *>(m_ColumnReader.get());
1716 return int64_reader->HasNext();
1717 break;
1718
1719 case parquet::Type::type::DOUBLE:
1720 double_reader = static_cast<parquet::DoubleReader *>(m_ColumnReader.get());
1721 return double_reader->HasNext();
1722 break;
1723
1724 case parquet::Type::type::BYTE_ARRAY:
1725 byte_array_reader = static_cast<parquet::ByteArrayReader *>(m_ColumnReader.get());
1726 return byte_array_reader->HasNext();
1727 break;
1728
1729 default:
1730 return false;
1731 //TODO throw exception
1732 }
1733
1734 return false;
1735 }
1736
1737 int64_t column_reader_wrap::ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
1738 parquet_value_t* values, int64_t* values_read)
1739 {
1740 parquet::Int32Reader* int32_reader;
1741 parquet::Int64Reader* int64_reader;
1742 parquet::DoubleReader* double_reader;
1743 parquet::ByteArrayReader* byte_array_reader;
1744
1745 parquet::ByteArray str_value;
1746 int64_t rows_read;
1747 int32_t i32_val;
1748
1749 auto error_msg = [&](std::exception &e)
1750 {
1751 std::stringstream err;
1752 err << "what() :" << e.what() << std::endl;
1753 err << "failed to parse column id:" << this->m_col_id << " name:" <<this->m_parquet_reader->metadata()->schema()->Column(m_col_id)->name();
1754 return err;
1755 };
1756 int16_t defintion_level;
1757 int16_t repeat_level;
1758
1759 switch (get_type())
1760 {
1761 case parquet::Type::type::INT32:
1762 int32_reader = static_cast<parquet::Int32Reader *>(m_ColumnReader.get());
1763 try {
1764 rows_read = int32_reader->ReadBatch(1, nullptr, nullptr,&i32_val, values_read);
1765 }
1766 catch(std::exception &e)
1767 {
1768 throw std::runtime_error(error_msg(e).str());
1769 }
1770
1771 values->num = i32_val;
1772 values->type = column_reader_wrap::parquet_type::INT32;
1773 break;
1774
1775 case parquet::Type::type::INT64:
1776 int64_reader = static_cast<parquet::Int64Reader *>(m_ColumnReader.get());
1777 try{
1778 rows_read = int64_reader->ReadBatch(1, nullptr, nullptr, (int64_t *)&(values->num), values_read);
1779 }
1780 catch(std::exception &e)
1781 {
1782 throw std::runtime_error(error_msg(e).str());
1783 }
1784 values->type = column_reader_wrap::parquet_type::INT64;
1785 break;
1786
1787 case parquet::Type::type::DOUBLE:
1788 try{
1789 double_reader = static_cast<parquet::DoubleReader *>(m_ColumnReader.get());
1790 }
1791 catch(std::exception &e)
1792 {
1793 throw std::runtime_error(error_msg(e).str());
1794 }
1795 rows_read = double_reader->ReadBatch(1, nullptr, nullptr, (double *)&(values->dbl), values_read);
1796 values->type = column_reader_wrap::parquet_type::DOUBLE;
1797 break;
1798
1799 case parquet::Type::type::BYTE_ARRAY:
1800 byte_array_reader = static_cast<parquet::ByteArrayReader *>(m_ColumnReader.get());
1801 try{
1802 rows_read = byte_array_reader->ReadBatch(1, &defintion_level, &repeat_level, &str_value , values_read);
1803 }
1804 catch(std::exception &e)
1805 {
1806 throw std::runtime_error(error_msg(e).str());
1807 }
1808 values->str = (char*)str_value.ptr;
1809 values->str_len = str_value.len;
1810 if(defintion_level == 0)
1811 {
1812 values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
1813 } else
1814 {
1815 values->type = column_reader_wrap::parquet_type::STRING;
1816 }
1817 break;
1818
1819 default:
1820 {
1821 std::stringstream err;
1822 err << "wrong type" << std::endl;
1823 throw std::runtime_error(err.str());
1824 }
1825
1826 }
1827
1828 return rows_read;
1829 }
1830
1831 int64_t column_reader_wrap::Skip(int64_t rows_to_skip)
1832 {
1833 parquet::Int32Reader* int32_reader;
1834 parquet::Int64Reader* int64_reader;
1835 parquet::DoubleReader* double_reader;
1836 parquet::ByteArrayReader* byte_array_reader;
1837
1838 parquet::ByteArray str_value;
1839 int64_t rows_read;
1840
1841 auto error_msg = [&](std::exception &e)
1842 {
1843 std::stringstream err;
1844 err << "what() :" << e.what() << std::endl;
1845 err << "failed to parse column id:" << this->m_col_id << " name:" <<this->m_parquet_reader->metadata()->schema()->Column(m_col_id)->name();
1846 return err;
1847 };
1848
1849 switch (get_type())
1850 {
1851 case parquet::Type::type::INT32:
1852 int32_reader = static_cast<parquet::Int32Reader *>(m_ColumnReader.get());
1853 try{
1854 rows_read = int32_reader->Skip(rows_to_skip);
1855 }
1856 catch(std::exception &e)
1857 {
1858 throw std::runtime_error(error_msg(e).str());
1859 }
1860 break;
1861
1862 case parquet::Type::type::INT64:
1863 int64_reader = static_cast<parquet::Int64Reader *>(m_ColumnReader.get());
1864 try{
1865 rows_read = int64_reader->Skip(rows_to_skip);
1866 }
1867 catch(std::exception &e)
1868 {
1869 throw std::runtime_error(error_msg(e).str());
1870 }
1871 break;
1872
1873 case parquet::Type::type::DOUBLE:
1874 double_reader = static_cast<parquet::DoubleReader *>(m_ColumnReader.get());
1875 try {
1876 rows_read = double_reader->Skip(rows_to_skip);
1877 }
1878 catch(std::exception &e)
1879 {
1880 throw std::runtime_error(error_msg(e).str());
1881 }
1882 break;
1883
1884 case parquet::Type::type::BYTE_ARRAY:
1885 byte_array_reader = static_cast<parquet::ByteArrayReader *>(m_ColumnReader.get());
1886 try{
1887 rows_read = byte_array_reader->Skip(rows_to_skip);
1888 }
1889 catch(std::exception &e)
1890 {
1891 throw std::runtime_error(error_msg(e).str());
1892 }
1893 break;
1894
1895 default:
1896 {
1897 std::stringstream err;
1898 err << "wrong type" << std::endl;
1899 throw std::runtime_error(err.str());
1900 }
1901 }
1902
1903 return rows_read;
1904 }
1905
1906 int column_reader_wrap::Read(const uint64_t rownum,parquet_value_t & value)
1907 {
1908 int64_t values_read = 0;
1909
1910 if (m_rownum < (int64_t)rownum)
1911 { //should skip
1912 m_read_last_value = false;
1913
1914 uint64_t skipped_rows = Skip(rownum - m_rownum -1);
1915 m_rownum += skipped_rows;
1916
1917 while (((m_rownum+1) < (int64_t)rownum) || HasNext() == false)
1918 {
1919 uint64_t skipped_rows = Skip(rownum - m_rownum -1);
1920 m_rownum += skipped_rows;
1921
1922 if (HasNext() == false)
1923 {
1924 if ((m_row_grouop_id + 1) >= m_parquet_reader->metadata()->num_row_groups())
1925 {
1926 m_end_of_stream = true;
1927 return -2; //end-of-stream
1928 }
1929 else
1930 {
1931 m_row_grouop_id++;
1932 m_row_group_reader = m_parquet_reader->RowGroup(m_row_grouop_id);
1933 m_ColumnReader = m_row_group_reader->Column(m_col_id);
1934 }
1935 }
1936 } //end-while
1937
1938 ReadBatch(1, nullptr, nullptr, &m_last_value, &values_read);
1939 m_read_last_value = true;
1940 m_rownum++;
1941 value = m_last_value;
1942 }
1943 else
1944 {
1945 if (m_read_last_value == false)
1946 {
1947 ReadBatch(1, nullptr, nullptr, &m_last_value, &values_read);
1948 m_read_last_value = true;
1949 m_rownum++;
1950 }
1951
1952 value = m_last_value;
1953 }
1954
1955 return 0;
1956 }
1957
1958#endif
1959