]> git.proxmox.com Git - ceph.git/blame - ceph/src/s3select/include/s3select_parquet_intrf.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / s3select / include / s3select_parquet_intrf.h
CommitLineData
20effc67
TL
1
2#pragma once
3
4#if ! __has_include (<arrow/api.h>) || ! __has_include (<arrow/io/api.h>) || !__has_include (<parquet/arrow/reader.h>)
5# undef _ARROW_EXIST
6#endif
7
8#ifdef _ARROW_EXIST
9
10#include <iostream>
11#include <arrow/api.h>
12#include <arrow/io/api.h>
13#include <parquet/arrow/reader.h>
14#include <parquet/arrow/writer.h>
15#include <parquet/exception.h>
16#include <set>
17#include <parquet/column_reader.h>
18#include <arrow/util/io_util.h>
19
20#include <arrow/io/interfaces.h>
21#include <utility>
22
23#include <mutex>
24#include <functional>
25
26#include "internal_file_decryptor.h"
27#include "encryption_internal.h"
28
1e59de90
TL
// From Arrow 9 on, the descriptor member `fd_` is an
// arrow::internal::FileDescriptor instead of a raw int. _ARROW_FD yields the
// raw descriptor value and _ARROW_FD_TYPE names the member type, so the
// classes below compile against either Arrow API.
#if ARROW_VERSION_MAJOR >= 9
#define _ARROW_FD fd_.fd()
#define _ARROW_FD_TYPE arrow::internal::FileDescriptor
#else
#define _ARROW_FD fd_
#define _ARROW_FD_TYPE int
#endif
36
20effc67
TL
/******************************************/
/******************************************/
class optional_yield;
namespace s3selectEngine {

/// Global object that sets up the interface between RGW and the parquet reader.
///
/// RGW installs two callbacks here:
/// - a range-read callback, which RGWimpl::ReadAt invokes;
/// - an object-size callback, which RGWimpl::OpenReadable invokes.
/// `m_y` is forwarded unchanged as the last argument of every range request.
class rgw_s3select_api {
 public:
  /// Range-read callback: (offset, length, destination buffer, yield context).
  /// RGWimpl treats the returned int as the number of bytes read.
  std::function<int(int64_t, int64_t, void*, optional_yield*)> range_req_fptr;

  /// Returns the total size of the object being read.
  std::function<size_t(void)> get_size_fptr;

  /// Yield context handed to range_req_fptr. It is null until the owner sets
  /// it, so it is never read uninitialized.
  optional_yield* m_y = nullptr;

  /// Installs the range-read callback (the callable is moved, not copied).
  void set_range_req_api(std::function<int(int64_t, int64_t, void*, optional_yield*)> fp)
  {
    range_req_fptr = std::move(fp);
  }

  /// Installs the object-size callback (the callable is moved, not copied).
  void set_get_size_api(std::function<size_t(void)> fp)
  {
    get_size_fptr = std::move(fp);
  }
};
}
63
/******************************************/
/******************************************/
/******************************************/

// Parquet magic markers: "PAR1", and "PARE" for the encrypted-footer variant
// (naming mirrors parquet-cpp's constants of the same name).
static constexpr uint8_t kParquetMagic[] = {'P', 'A', 'R', '1'};
static constexpr uint8_t kParquetEMagic[] = {'P', 'A', 'R', 'E'};
// AES-GCM authentication tag length in bytes (presumably mirrors parquet-cpp's
// encryption constant).
constexpr int kGcmTagLength{16};
71
72namespace arrow {
73namespace io {
74namespace internal {
75
76ARROW_EXPORT void CloseFromDestructor(FileInterface* file);
77
78// Validate a (offset, size) region (as given to ReadAt) against
79// the file size. Return the actual read size.
80ARROW_EXPORT Result<int64_t> ValidateReadRange(int64_t offset, int64_t size,
81 int64_t file_size);
82// Validate a (offset, size) region (as given to WriteAt) against
83// the file size. Short writes are not allowed.
84ARROW_EXPORT Status ValidateWriteRange(int64_t offset, int64_t size, int64_t file_size);
85
86// Validate a (offset, size) region (as given to ReadAt or WriteAt), without
87// knowing the file size.
88ARROW_EXPORT Status ValidateRange(int64_t offset, int64_t size);
89
90ARROW_EXPORT
91std::vector<ReadRange> CoalesceReadRanges(std::vector<ReadRange> ranges,
92 int64_t hole_size_limit,
93 int64_t range_size_limit);
94
95ARROW_EXPORT
96::arrow::internal::ThreadPool* GetIOThreadPool();
97
98} // namespace internal
99} // namespace io
100}
101
102
// RGWimpl and OSFile implement access to storage objects: OSFile reads
// filesystem files, RGWimpl reads Ceph S3 (RGW) objects.
// ObjectInterface (temporary) is an "empty base class" that allows access
// functions for storage objects to be injected.
// ReadableFileImpl is an implementation layer on top of ObjectInterface objects.
// ReadableFile is a layer that calls into ReadableFileImpl, enabling runtime
// switching between implementations.
// ParquetFileReader is the main interface (the underlying implementation is
// transparent to this layer).
//
109
110
111namespace arrow {
112class Buffer;
113namespace io {
114
115class ObjectInterface {
116
117#define NOT_IMPLEMENTED {std::cout << "not implemented" << std::endl;}
118
119//purpose: to implement the range-request from single object
120public:
121 ObjectInterface() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {}
122
123 virtual ~ObjectInterface(){}
124
125 // Note: only one of the Open* methods below may be called on a given instance
126
127 virtual Status OpenWritable(const std::string& path, bool truncate, bool append, bool write_only){return Status::OK();}
128
129 // This is different from OpenWritable(string, ...) in that it doesn't
130 // truncate nor mandate a seekable file
131 virtual Status OpenWritable(int fd){return Status::OK();}
132
133 virtual Status OpenReadable(const std::string& path){return Status::OK();}
134
135 virtual Status OpenReadable(int fd){return Status::OK();}
136
137 virtual Status CheckClosed() const {return Status::OK();}
138
139 virtual Status Close(){return Status::OK();}
140
141 virtual Result<int64_t> Read(int64_t nbytes, void* out){return Result<int64_t>(-1);}
142
143 virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out){return Result<int64_t>(-1);}
144
145 virtual Status Seek(int64_t pos){return Status::OK();}
146
147 virtual Result<int64_t> Tell() const {return Result<int64_t>(-1);}
148
149 virtual Status Write(const void* data, int64_t length){return Status::OK();}
150
151 virtual int fd() const{return -1;}
152
153 virtual bool is_open() const{return false;}
154
155 virtual int64_t size() const{return -1;}
156
157 virtual FileMode::type mode() const{return FileMode::READ;}
158
159 #if 0
160 std::mutex& lock(){}
161 #endif
162
163 protected:
164 virtual Status SetFileName(const std::string& file_name){return Status::OK();}
165
166 virtual Status SetFileName(int fd){return Status::OK();}
167
168 virtual Status CheckPositioned(){return Status::OK();}
169
170 ::arrow::internal::PlatformFilename file_name_;
171
172 std::mutex lock_;
173
174 // File descriptor
1e59de90 175 _ARROW_FD_TYPE fd_;
20effc67
TL
176
177 FileMode::type mode_;
178
179 bool is_open_;
180 int64_t size_;
181 // Whether ReadAt made the file position non-deterministic.
182 std::atomic<bool> need_seeking_;
183
184}; //ObjectInterface
185
186} //namespace io
187} //namespace arrow
188
189namespace arrow {
190
191using internal::IOErrorFromErrno;
192
193namespace io {
194
195class OSFile : public ObjectInterface {
196 public:
197 OSFile() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {}
198
199 ~OSFile() {}
200
201 // Note: only one of the Open* methods below may be called on a given instance
202
203 Status OpenWritable(const std::string& path, bool truncate, bool append,
204 bool write_only) override {
205 RETURN_NOT_OK(SetFileName(path));
206
207 ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenWritable(file_name_, write_only,
208 truncate, append));
209 is_open_ = true;
210 mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
211
212 if (!truncate) {
1e59de90 213 ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(_ARROW_FD));
20effc67
TL
214 } else {
215 size_ = 0;
216 }
217 return Status::OK();
218 }
219
220 // This is different from OpenWritable(string, ...) in that it doesn't
221 // truncate nor mandate a seekable file
222 Status OpenWritable(int fd) override {
223 auto result = ::arrow::internal::FileGetSize(fd);
224 if (result.ok()) {
225 size_ = *result;
226 } else {
227 // Non-seekable file
228 size_ = -1;
229 }
230 RETURN_NOT_OK(SetFileName(fd));
231 is_open_ = true;
232 mode_ = FileMode::WRITE;
1e59de90 233 #if ARROW_VERSION_MAJOR < 9
20effc67 234 fd_ = fd;
1e59de90
TL
235 #else
236 fd_ = arrow::internal::FileDescriptor{fd};
237 #endif
20effc67
TL
238 return Status::OK();
239 }
240
241 Status OpenReadable(const std::string& path) override {
242 RETURN_NOT_OK(SetFileName(path));
243
244 ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenReadable(file_name_));
1e59de90 245 ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(_ARROW_FD));
20effc67
TL
246
247 is_open_ = true;
248 mode_ = FileMode::READ;
249 return Status::OK();
250 }
251
252 Status OpenReadable(int fd) override {
253 ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd));
254 RETURN_NOT_OK(SetFileName(fd));
255 is_open_ = true;
256 mode_ = FileMode::READ;
1e59de90 257 #if ARROW_VERSION_MAJOR < 9
20effc67 258 fd_ = fd;
1e59de90
TL
259 #else
260 fd_ = arrow::internal::FileDescriptor{fd};
261 #endif
20effc67
TL
262 return Status::OK();
263 }
264
265 Status CheckClosed() const override {
266 if (!is_open_) {
267 return Status::Invalid("Invalid operation on closed file");
268 }
269 return Status::OK();
270 }
271
272 Status Close() override {
273 if (is_open_) {
274 // Even if closing fails, the fd will likely be closed (perhaps it's
275 // already closed).
276 is_open_ = false;
1e59de90 277 #if ARROW_VERSION_MAJOR < 9
20effc67
TL
278 int fd = fd_;
279 fd_ = -1;
280 RETURN_NOT_OK(::arrow::internal::FileClose(fd));
1e59de90
TL
281 #else
282 RETURN_NOT_OK(fd_.Close());
283 #endif
20effc67
TL
284 }
285 return Status::OK();
286 }
287
288 Result<int64_t> Read(int64_t nbytes, void* out) override {
289 RETURN_NOT_OK(CheckClosed());
290 RETURN_NOT_OK(CheckPositioned());
1e59de90 291 return ::arrow::internal::FileRead(_ARROW_FD, reinterpret_cast<uint8_t*>(out), nbytes);
20effc67
TL
292 }
293
294 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
295 RETURN_NOT_OK(CheckClosed());
296 RETURN_NOT_OK(internal::ValidateRange(position, nbytes));
297 // ReadAt() leaves the file position undefined, so require that we seek
298 // before calling Read() or Write().
299 need_seeking_.store(true);
1e59de90 300 return ::arrow::internal::FileReadAt(_ARROW_FD, reinterpret_cast<uint8_t*>(out), position,
20effc67
TL
301 nbytes);
302 }
303
304 Status Seek(int64_t pos) override {
305 RETURN_NOT_OK(CheckClosed());
306 if (pos < 0) {
307 return Status::Invalid("Invalid position");
308 }
1e59de90 309 Status st = ::arrow::internal::FileSeek(_ARROW_FD, pos);
20effc67
TL
310 if (st.ok()) {
311 need_seeking_.store(false);
312 }
313 return st;
314 }
315
316 Result<int64_t> Tell() const override {
317 RETURN_NOT_OK(CheckClosed());
1e59de90 318 return ::arrow::internal::FileTell(_ARROW_FD);
20effc67
TL
319 }
320
321 Status Write(const void* data, int64_t length) override {
322 RETURN_NOT_OK(CheckClosed());
323
324 std::lock_guard<std::mutex> guard(lock_);
325 RETURN_NOT_OK(CheckPositioned());
326 if (length < 0) {
327 return Status::IOError("Length must be non-negative");
328 }
1e59de90 329 return ::arrow::internal::FileWrite(_ARROW_FD, reinterpret_cast<const uint8_t*>(data),
20effc67
TL
330 length);
331 }
332
1e59de90 333 int fd() const override { return _ARROW_FD; }
20effc67
TL
334
335 bool is_open() const override { return is_open_; }
336
337 int64_t size() const override { return size_; }
338
339 FileMode::type mode() const override { return mode_; }
340
341 std::mutex& lock() { return lock_; }
342
343 protected:
344 Status SetFileName(const std::string& file_name) override {
345 return ::arrow::internal::PlatformFilename::FromString(file_name).Value(&file_name_);
346 }
347
348 Status SetFileName(int fd) override {
349 std::stringstream ss;
350 ss << "<fd " << fd << ">";
351 return SetFileName(ss.str());
352 }
353
354 Status CheckPositioned() override {
355 if (need_seeking_.load()) {
356 return Status::Invalid(
357 "Need seeking after ReadAt() before "
358 "calling implicitly-positioned operation");
359 }
360 return Status::OK();
361 }
362
363 ::arrow::internal::PlatformFilename file_name_;
364
365 std::mutex lock_;
366
367 // File descriptor
1e59de90 368 _ARROW_FD_TYPE fd_;
20effc67
TL
369
370 FileMode::type mode_;
371
372 bool is_open_;
373 int64_t size_;
374 // Whether ReadAt made the file position non-deterministic.
375 std::atomic<bool> need_seeking_;
376};
377} // namespace io
378} // namespace arrow
379
380namespace arrow {
381class Buffer;
382namespace io {
383
384class RGWimpl : public ObjectInterface {
385
386//purpose: to implement the range-request from single object
387public:
388 RGWimpl(s3selectEngine::rgw_s3select_api* rgw) : fd_(-1), is_open_(false), size_(-1), need_seeking_(false),m_rgw_impl(rgw) {}
389
390 ~RGWimpl(){}
391
392#define NOT_IMPLEMENT { \
393 std::stringstream ss; \
394 ss << " method " << __FUNCTION__ << " is not implemented;"; \
395 throw parquet::ParquetException(ss.str()); \
396 }
397
398 // Note: only one of the Open* methods below may be called on a given instance
399
400 Status OpenWritable(const std::string& path, bool truncate, bool append, bool write_only) { NOT_IMPLEMENT;return Status::OK(); }
401
402 // This is different from OpenWritable(string, ...) in that it doesn't
403 // truncate nor mandate a seekable file
404 Status OpenWritable(int fd) {NOT_IMPLEMENT;return Status::OK(); }
405
406 Status OpenReadable(const std::string& path) {
407 //RGW-implement
408
409 RETURN_NOT_OK(SetFileName(path));//TODO can skip that
410 size_ = m_rgw_impl->get_size_fptr();
411
412 is_open_ = true;
413 mode_ = FileMode::READ;
414 return Status::OK();
415 }
416
417 Status OpenReadable(int fd) {NOT_IMPLEMENT;return Status::OK(); }
418
419 Status CheckClosed() const {
420 //RGW-implement
421 if (!is_open_) {
422 return Status::Invalid("Invalid operation on closed file");
423 }
424 return Status::OK();
425 }
426
427 Status Close() {
428 //RGW-implement
429 if (is_open_) {
430 // Even if closing fails, the fd will likely be closed (perhaps it's
431 // already closed).
432 is_open_ = false;
433 //int fd = fd_;
1e59de90 434 #if ARROW_VERSION_MAJOR < 9
20effc67 435 fd_ = -1;
1e59de90
TL
436 #else
437 fd_.Close();
438 #endif
20effc67
TL
439 //RETURN_NOT_OK(::arrow::internal::FileClose(fd));
440 }
441 return Status::OK();
442 }
443
444 Result<int64_t> Read(int64_t nbytes, void* out) {
445 NOT_IMPLEMENT;
446 RETURN_NOT_OK(CheckClosed());
447 RETURN_NOT_OK(CheckPositioned());
1e59de90 448 return ::arrow::internal::FileRead(_ARROW_FD, reinterpret_cast<uint8_t*>(out), nbytes);
20effc67
TL
449 }
450
451 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
452
453 Result<int64_t> status = m_rgw_impl->range_req_fptr(position,nbytes,out,m_rgw_impl->m_y);
454
455 return status;
456 }
457
458 Status Seek(int64_t pos) {NOT_IMPLEMENT;return Status::OK(); }
459
460 Result<int64_t> Tell() const {
461 NOT_IMPLEMENT;
462 return Result<int64_t>(0);
463 }
464
465 Status Write(const void* data, int64_t length) {
466 NOT_IMPLEMENT;
467 return Status::OK();
468 }
469
1e59de90 470 int fd() const { return _ARROW_FD; }
20effc67
TL
471
472 bool is_open() const { return is_open_; }
473
474 int64_t size() const { return size_; }
475
476 FileMode::type mode() const { return mode_; }
477
478 std::mutex& lock() { return lock_; } //TODO skip
479
480 protected:
481 Status SetFileName(const std::string& file_name) override {
482 return ::arrow::internal::PlatformFilename::FromString(file_name).Value(&file_name_);
483 }
484
485 Status SetFileName(int fd) {NOT_IMPLEMENT; return Status::OK(); }
486
487 Status CheckPositioned() {NOT_IMPLEMENT; return Status::OK(); }
488
489 ::arrow::internal::PlatformFilename file_name_;
490
491 std::mutex lock_;
492
493 // File descriptor
1e59de90 494 _ARROW_FD_TYPE fd_;
20effc67
TL
495
496 FileMode::type mode_;
497
498 bool is_open_;
499 int64_t size_;
500 // Whether ReadAt made the file position non-deterministic.
501 std::atomic<bool> need_seeking_;
502
503private:
504
505 s3selectEngine::rgw_s3select_api* m_rgw_impl;
506};
507
508} //namespace io
509} //namespace arrow
510
511namespace arrow {
512
513class Buffer;
514class MemoryPool;
515class Status;
516
517namespace io {
518namespace ceph {
519
520/// \brief An operating system file open in read-only mode.
521///
522/// Reads through this implementation are unbuffered. If many small reads
523/// need to be issued, it is recommended to use a buffering layer for good
524/// performance.
525class ARROW_EXPORT ReadableFile
526 : public internal::RandomAccessFileConcurrencyWrapper<ReadableFile> {
527 public:
528 ~ReadableFile() override;
529
530 /// \brief Open a local file for reading
531 /// \param[in] path with UTF8 encoding
532 /// \param[in] pool a MemoryPool for memory allocations
533 /// \return ReadableFile instance
534 static Result<std::shared_ptr<ReadableFile>> Open(
535 const std::string& path,s3selectEngine::rgw_s3select_api* rgw,MemoryPool* pool = default_memory_pool());
536
537 /// \brief Open a local file for reading
538 /// \param[in] fd file descriptor
539 /// \param[in] pool a MemoryPool for memory allocations
540 /// \return ReadableFile instance
541 ///
542 /// The file descriptor becomes owned by the ReadableFile, and will be closed
543 /// on Close() or destruction.
544 static Result<std::shared_ptr<ReadableFile>> Open(
545 int fd, MemoryPool* pool = default_memory_pool());
546
547 bool closed() const override;
548
549 int file_descriptor() const;
550
551 Status WillNeed(const std::vector<ReadRange>& ranges) override;
552
553 private:
554 friend RandomAccessFileConcurrencyWrapper<ReadableFile>;
555
556 explicit ReadableFile(MemoryPool* pool,s3selectEngine::rgw_s3select_api* rgw);
557
558 Status DoClose();
559 Result<int64_t> DoTell() const;
560 Result<int64_t> DoRead(int64_t nbytes, void* buffer);
561 Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
562
563 /// \brief Thread-safe implementation of ReadAt
564 Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
565
566 /// \brief Thread-safe implementation of ReadAt
567 Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
568
569 Result<int64_t> DoGetSize();
570 Status DoSeek(int64_t position);
571
572 class ARROW_NO_EXPORT ReadableFileImpl;
573 std::unique_ptr<ReadableFileImpl> impl_;
574};
575
576
577} // namespace ceph
578} // namespace io
579} // namespace arrow
580
581// ----------------------------------------------------------------------
582// ReadableFileImpl implementation
583
584namespace arrow {
585namespace io {
586namespace ceph {
587
588class ReadableFile::ReadableFileImpl : public ObjectInterface {
589 public:
590
591 ~ReadableFileImpl()
592 {
593 if(IMPL != nullptr)
594 {
595 delete IMPL;
596 }
597 }
598
599#ifdef CEPH_USE_FS
600 explicit ReadableFileImpl(MemoryPool* pool) : pool_(pool) {IMPL=new OSFile();}
601#endif
602 explicit ReadableFileImpl(MemoryPool* pool,s3selectEngine::rgw_s3select_api* rgw) : pool_(pool) {IMPL=new RGWimpl(rgw);}
603
604 Status Open(const std::string& path) { return IMPL->OpenReadable(path); }
605
606 Status Open(int fd) { return IMPL->OpenReadable(fd); }
607
608 Result<std::shared_ptr<Buffer>> ReadBuffer(int64_t nbytes) {
609 ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
610
611 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, IMPL->Read(nbytes, buffer->mutable_data()));
612 if (bytes_read < nbytes) {
613 RETURN_NOT_OK(buffer->Resize(bytes_read));
614 buffer->ZeroPadding();
615 }
1e59de90 616 return buffer;
20effc67
TL
617 }
618
619 Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t nbytes) {
620 ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
621
622 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
623 IMPL->ReadAt(position, nbytes, buffer->mutable_data()));
624 if (bytes_read < nbytes) {
625 RETURN_NOT_OK(buffer->Resize(bytes_read));
626 buffer->ZeroPadding();
627 }
1e59de90 628 return buffer;
20effc67
TL
629 }
630
631 Status WillNeed(const std::vector<ReadRange>& ranges) {
632 RETURN_NOT_OK(CheckClosed());
633 for (const auto& range : ranges) {
634 RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length));
635#if defined(POSIX_FADV_WILLNEED)
1e59de90 636 if (posix_fadvise(_ARROW_FD, range.offset, range.length, POSIX_FADV_WILLNEED)) {
20effc67
TL
637 return IOErrorFromErrno(errno, "posix_fadvise failed");
638 }
639#elif defined(F_RDADVISE) // macOS, BSD?
640 struct {
641 off_t ra_offset;
642 int ra_count;
643 } radvisory{range.offset, static_cast<int>(range.length)};
1e59de90 644 if (radvisory.ra_count > 0 && fcntl(_ARROW_FD, F_RDADVISE, &radvisory) == -1) {
20effc67
TL
645 return IOErrorFromErrno(errno, "fcntl(fd, F_RDADVISE, ...) failed");
646 }
647#endif
648 }
649 return Status::OK();
650 }
651
652 ObjectInterface *IMPL;//TODO to declare in ObjectInterface
653
654 private:
655
656 MemoryPool* pool_;
657
658};
659
660// ReadableFile implemmetation
661ReadableFile::ReadableFile(MemoryPool* pool,s3selectEngine::rgw_s3select_api* rgw) { impl_.reset(new ReadableFileImpl(pool,rgw)); }
662
663ReadableFile::~ReadableFile() { internal::CloseFromDestructor(this); }
664
665Result<std::shared_ptr<ReadableFile>> ReadableFile::Open(const std::string& path,
666 s3selectEngine::rgw_s3select_api* rgw,
667 MemoryPool* pool
668 ) {
669 auto file = std::shared_ptr<ReadableFile>(new ReadableFile(pool,rgw));
670 RETURN_NOT_OK(file->impl_->Open(path));
671 return file;
672}
673
674Result<std::shared_ptr<ReadableFile>> ReadableFile::Open(int fd, MemoryPool* pool) {
675 NOT_IMPLEMENT;
676 auto file = std::shared_ptr<ReadableFile>(new ReadableFile(pool,0));
677 RETURN_NOT_OK(file->impl_->Open(fd));
678 return file;
679}
680
681Status ReadableFile::DoClose() { return impl_->Close(); }
682
683bool ReadableFile::closed() const { return !impl_->is_open(); }
684
685Status ReadableFile::WillNeed(const std::vector<ReadRange>& ranges) {
686 return impl_->WillNeed(ranges);
687}
688
689Result<int64_t> ReadableFile::DoTell() const { return impl_->Tell(); }
690
691Result<int64_t> ReadableFile::DoRead(int64_t nbytes, void* out) {
692 return impl_->IMPL->Read(nbytes, out);
693}
694
695Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes, void* out) {
696 return impl_->IMPL->ReadAt(position, nbytes, out);
697}
698
699Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position, int64_t nbytes) {
700 return impl_->ReadBufferAt(position, nbytes);
701}
702
703Result<std::shared_ptr<Buffer>> ReadableFile::DoRead(int64_t nbytes) {
704 return impl_->ReadBuffer(nbytes);
705}
706
707Result<int64_t> ReadableFile::DoGetSize() { return impl_->IMPL->size(); }
708
709Status ReadableFile::DoSeek(int64_t pos) { return impl_->IMPL->Seek(pos); }
710
711int ReadableFile::file_descriptor() const { return impl_->IMPL->fd(); }
712
713} // namepace ceph
714} // namespace io
715} // namespace arrow
716
717
718namespace parquet {
719
720class ColumnReader;
721class FileMetaData;
722class PageReader;
723class RandomAccessSource;
724class RowGroupMetaData;
725
726namespace ceph {
727class PARQUET_EXPORT RowGroupReader {
728 public:
729 // Forward declare a virtual class 'Contents' to aid dependency injection and more
730 // easily create test fixtures
731 // An implementation of the Contents class is defined in the .cc file
732 struct Contents {
733 virtual ~Contents() {}
734 virtual std::unique_ptr<PageReader> GetColumnPageReader(int i) = 0;
735 virtual const RowGroupMetaData* metadata() const = 0;
736 virtual const ReaderProperties* properties() const = 0;
737 };
738
739 explicit RowGroupReader(std::unique_ptr<Contents> contents);
740
741 // Returns the rowgroup metadata
742 const RowGroupMetaData* metadata() const;
743
744 // Construct a ColumnReader for the indicated row group-relative
745 // column. Ownership is shared with the RowGroupReader.
746 std::shared_ptr<ColumnReader> Column(int i);
747
748 std::unique_ptr<PageReader> GetColumnPageReader(int i);
749
750 private:
751 // Holds a pointer to an instance of Contents implementation
752 std::unique_ptr<Contents> contents_;
753};
754
755class PARQUET_EXPORT ParquetFileReader {
756 public:
757 // Declare a virtual class 'Contents' to aid dependency injection and more
758 // easily create test fixtures
759 // An implementation of the Contents class is defined in the .cc file
760 struct PARQUET_EXPORT Contents {
761 static std::unique_ptr<Contents> Open(
762 std::shared_ptr<::arrow::io::RandomAccessFile> source,
763 const ReaderProperties& props = default_reader_properties(),
764 std::shared_ptr<FileMetaData> metadata = NULLPTR);
765
766 virtual ~Contents() = default;
767 // Perform any cleanup associated with the file contents
768 virtual void Close() = 0;
769 virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i) = 0;
770 virtual std::shared_ptr<FileMetaData> metadata() const = 0;
771 };
772
773 ParquetFileReader();
774 ~ParquetFileReader();
775
776 // Create a reader from some implementation of parquet-cpp's generic file
777 // input interface
778 //
779 // If you cannot provide exclusive access to your file resource, create a
780 // subclass of RandomAccessSource that wraps the shared resource
781 ARROW_DEPRECATED("Use arrow::io::RandomAccessFile version")
782 static std::unique_ptr<ParquetFileReader> Open(
783 std::unique_ptr<RandomAccessSource> source,
784 const ReaderProperties& props = default_reader_properties(),
785 std::shared_ptr<FileMetaData> metadata = NULLPTR);
786
787 // Create a file reader instance from an Arrow file object. Thread-safety is
788 // the responsibility of the file implementation
789 static std::unique_ptr<ParquetFileReader> Open(
790 std::shared_ptr<::arrow::io::RandomAccessFile> source,
791 const ReaderProperties& props = default_reader_properties(),
792 std::shared_ptr<FileMetaData> metadata = NULLPTR);
793
794 // API Convenience to open a serialized Parquet file on disk, using Arrow IO
795 // interfaces.
796 static std::unique_ptr<ParquetFileReader> OpenFile(
797 const std::string& path,s3selectEngine::rgw_s3select_api* rgw, bool memory_map = true,
798 const ReaderProperties& props = default_reader_properties(),
799 std::shared_ptr<FileMetaData> metadata = NULLPTR
800 );
801
802 void Open(std::unique_ptr<Contents> contents);
803 void Close();
804
805 // The RowGroupReader is owned by the FileReader
806 std::shared_ptr<RowGroupReader> RowGroup(int i);
807
808 // Returns the file metadata. Only one instance is ever created
809 std::shared_ptr<FileMetaData> metadata() const;
810
811 /// Pre-buffer the specified column indices in all row groups.
812 ///
813 /// Readers can optionally call this to cache the necessary slices
814 /// of the file in-memory before deserialization. Arrow readers can
815 /// automatically do this via an option. This is intended to
816 /// increase performance when reading from high-latency filesystems
817 /// (e.g. Amazon S3).
818 ///
819 /// After calling this, creating readers for row groups/column
820 /// indices that were not buffered may fail. Creating multiple
821 /// readers for the a subset of the buffered regions is
822 /// acceptable. This may be called again to buffer a different set
823 /// of row groups/columns.
824 ///
825 /// If memory usage is a concern, note that data will remain
826 /// buffered in memory until either \a PreBuffer() is called again,
827 /// or the reader itself is destructed. Reading - and buffering -
828 /// only one row group at a time may be useful.
829 void PreBuffer(const std::vector<int>& row_groups,
830 const std::vector<int>& column_indices,
831 const ::arrow::io::IOContext& ctx,
832 const ::arrow::io::CacheOptions& options);
833
834 private:
835 // Holds a pointer to an instance of Contents implementation
836 std::unique_ptr<Contents> contents_;
837};
838
839// Read only Parquet file metadata
840std::shared_ptr<FileMetaData> PARQUET_EXPORT
841ReadMetaData(const std::shared_ptr<::arrow::io::RandomAccessFile>& source);
842
843/// \brief Scan all values in file. Useful for performance testing
844/// \param[in] columns the column numbers to scan. If empty scans all
845/// \param[in] column_batch_size number of values to read at a time when scanning column
846/// \param[in] reader a ParquetFileReader instance
847/// \return number of semantic rows in file
848PARQUET_EXPORT
849int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
850 ParquetFileReader* reader);
851
852}//namespace ceph
853}//namespace parquet
854
855
856namespace parquet {
857
858namespace ceph {
859
// PARQUET-978: fetch the last 64 KiB of the file in one go to minimise the
// number of footer reads.
constexpr int64_t kDefaultFooterReadSize = 64 * 1024;
// Size in bytes of the fixed file trailer (presumably the 4-byte footer
// length followed by the 4-byte magic).
constexpr uint32_t kFooterSize = 8;

// PARQUET-816: upper bound on the padding added to column chunks written by
// old parquet-mr versions (used by ComputeColumnChunkRange).
constexpr int64_t kMaxDictHeaderSize = 100;
866
867// ----------------------------------------------------------------------
868// RowGroupReader public API
869
870RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents)
871 : contents_(std::move(contents)) {}
872
873std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
874 if (i >= metadata()->num_columns()) {
875 std::stringstream ss;
876 ss << "Trying to read column index " << i << " but row group metadata has only "
877 << metadata()->num_columns() << " columns";
878 throw ParquetException(ss.str());
879 }
880 const ColumnDescriptor* descr = metadata()->schema()->Column(i);
881
882 std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
883 return ColumnReader::Make(
884 descr, std::move(page_reader),
885 const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
886}
887
888std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
889 if (i >= metadata()->num_columns()) {
890 std::stringstream ss;
891 ss << "Trying to read column index " << i << " but row group metadata has only "
892 << metadata()->num_columns() << " columns";
893 throw ParquetException(ss.str());
894 }
895 return contents_->GetColumnPageReader(i);
896}
897
898// Returns the rowgroup metadata
899const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
900
901/// Compute the section of the file that should be read for the given
902/// row group and column chunk.
903::arrow::io::ReadRange ComputeColumnChunkRange(FileMetaData* file_metadata,
904 int64_t source_size, int row_group_index,
905 int column_index) {
906 auto row_group_metadata = file_metadata->RowGroup(row_group_index);
907 auto column_metadata = row_group_metadata->ColumnChunk(column_index);
908
909 int64_t col_start = column_metadata->data_page_offset();
910 if (column_metadata->has_dictionary_page() &&
911 column_metadata->dictionary_page_offset() > 0 &&
912 col_start > column_metadata->dictionary_page_offset()) {
913 col_start = column_metadata->dictionary_page_offset();
914 }
915
916 int64_t col_length = column_metadata->total_compressed_size();
917 // PARQUET-816 workaround for old files created by older parquet-mr
918 const ApplicationVersion& version = file_metadata->writer_version();
919 if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION())) {
920 // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
921 // dictionary page header size in total_compressed_size and total_uncompressed_size
922 // (see IMPALA-694). We add padding to compensate.
923 int64_t bytes_remaining = source_size - (col_start + col_length);
924 int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
925 col_length += padding;
926 }
927
928 return {col_start, col_length};
929}
930
931// RowGroupReader::Contents implementation for the Parquet file specification
932class SerializedRowGroup : public RowGroupReader::Contents {
933 public:
934 SerializedRowGroup(std::shared_ptr<ArrowInputFile> source,
935 std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source,
936 int64_t source_size, FileMetaData* file_metadata,
937 int row_group_number, const ReaderProperties& props,
938 std::shared_ptr<parquet::InternalFileDecryptor> file_decryptor = nullptr)
939 : source_(std::move(source)),
940 cached_source_(std::move(cached_source)),
941 source_size_(source_size),
942 file_metadata_(file_metadata),
943 properties_(props),
944 row_group_ordinal_(row_group_number),
945 file_decryptor_(file_decryptor) {
946 row_group_metadata_ = file_metadata->RowGroup(row_group_number);
947 }
948
949 const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); }
950
951 const ReaderProperties* properties() const override { return &properties_; }
952
953 std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
954 // Read column chunk from the file
955 auto col = row_group_metadata_->ColumnChunk(i);
956
957 ::arrow::io::ReadRange col_range =
958 ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i);
959 std::shared_ptr<ArrowInputStream> stream;
960 if (cached_source_) {
961 // PARQUET-1698: if read coalescing is enabled, read from pre-buffered
962 // segments.
963 PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range));
964 stream = std::make_shared<::arrow::io::BufferReader>(buffer);
965 } else {
966 stream = properties_.GetStream(source_, col_range.offset, col_range.length);
967 }
968
969 std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col->crypto_metadata();
970
971 // Column is encrypted only if crypto_metadata exists.
972 if (!crypto_metadata) {
973 return PageReader::Open(stream, col->num_values(), col->compression(),
974 properties_.memory_pool());
975 }
976
977 if (file_decryptor_ == nullptr) {
978 throw ParquetException("RowGroup is noted as encrypted but no file decryptor");
979 }
980
981 constexpr auto kEncryptedRowGroupsLimit = 32767;
982 if (i > kEncryptedRowGroupsLimit) {
983 throw ParquetException("Encrypted files cannot contain more than 32767 row groups");
984 }
985
986 // The column is encrypted
987 std::shared_ptr<::parquet::Decryptor> meta_decryptor;
988 std::shared_ptr<Decryptor> data_decryptor;
989 // The column is encrypted with footer key
990 if (crypto_metadata->encrypted_with_footer_key()) {
991 meta_decryptor = file_decryptor_->GetFooterDecryptorForColumnMeta();
992 data_decryptor = file_decryptor_->GetFooterDecryptorForColumnData();
993
994 CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_,
995 static_cast<int16_t>(i), meta_decryptor, data_decryptor);
996 return PageReader::Open(stream, col->num_values(), col->compression(),
1e59de90
TL
997 #if ARROW_VERSION_MAJOR > 8
998 false,
999 #endif
20effc67
TL
1000 properties_.memory_pool(), &ctx);
1001 }
1002
1003 // The column is encrypted with its own key
1004 std::string column_key_metadata = crypto_metadata->key_metadata();
1005 const std::string column_path = crypto_metadata->path_in_schema()->ToDotString();
1006
1007 meta_decryptor =
1008 file_decryptor_->GetColumnMetaDecryptor(column_path, column_key_metadata);
1009 data_decryptor =
1010 file_decryptor_->GetColumnDataDecryptor(column_path, column_key_metadata);
1011
1012 CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_,
1013 static_cast<int16_t>(i), meta_decryptor, data_decryptor);
1014 return PageReader::Open(stream, col->num_values(), col->compression(),
1e59de90
TL
1015 #if ARROW_VERSION_MAJOR > 8
1016 false,
1017 #endif
20effc67
TL
1018 properties_.memory_pool(), &ctx);
1019 }
1020
1021 private:
1022 std::shared_ptr<ArrowInputFile> source_;
1023 // Will be nullptr if PreBuffer() is not called.
1024 std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source_;
1025 int64_t source_size_;
1026 FileMetaData* file_metadata_;
1027 std::unique_ptr<RowGroupMetaData> row_group_metadata_;
1028 ReaderProperties properties_;
1029 int row_group_ordinal_;
1030 std::shared_ptr<InternalFileDecryptor> file_decryptor_;
1031};
1032
1033// ----------------------------------------------------------------------
1034// SerializedFile: An implementation of ParquetFileReader::Contents that deals
1035// with the Parquet file structure, Thrift deserialization, and other internal
1036// matters
1037
1038// This class takes ownership of the provided data source
1039class SerializedFile : public ParquetFileReader::Contents {
1040 public:
1041 SerializedFile(std::shared_ptr<ArrowInputFile> source,
1042 const ReaderProperties& props = default_reader_properties())
1043 : source_(std::move(source)), properties_(props) {
1044 PARQUET_ASSIGN_OR_THROW(source_size_, source_->GetSize());
1045 }
1046
1047 ~SerializedFile() override {
1048 try {
1049 Close();
1050 } catch (...) {
1051 }
1052 }
1053
1054 void Close() override {
1055 if (file_decryptor_) file_decryptor_->WipeOutDecryptionKeys();
1056 }
1057
1058 std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
1059 std::unique_ptr<SerializedRowGroup> contents(
1060 new SerializedRowGroup(source_, cached_source_, source_size_,
1061 file_metadata_.get(), i, properties_, file_decryptor_));
1062 return std::make_shared<RowGroupReader>(std::move(contents));
1063 }
1064
1065 std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; }
1066
1067 void set_metadata(std::shared_ptr<FileMetaData> metadata) {
1068 file_metadata_ = std::move(metadata);
1069 }
1070
1071 void PreBuffer(const std::vector<int>& row_groups,
1072 const std::vector<int>& column_indices,
1073 const ::arrow::io::IOContext& ctx,
1074 const ::arrow::io::CacheOptions& options) {
1075 cached_source_ =
1076 std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options);
1077 //std::vector<arrow::io::ReadRange> ranges;
1078 std::vector<::arrow::io::ReadRange> ranges;
1079 for (int row : row_groups) {
1080 for (int col : column_indices) {
1081 ranges.push_back(
1082 ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col));
1083 }
1084 }
1085 PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
1086 }
1087
1088 void ParseMetaData() {
1089 if (source_size_ == 0) {
1090 throw ParquetInvalidOrCorruptedFileException("Parquet file size is 0 bytes");
1091 } else if (source_size_ < kFooterSize) {
1092 throw ParquetInvalidOrCorruptedFileException(
1093 "Parquet file size is ", source_size_,
1094 " bytes, smaller than the minimum file footer (", kFooterSize, " bytes)");
1095 }
1096
1097 int64_t footer_read_size = std::min(source_size_, kDefaultFooterReadSize);
1098 PARQUET_ASSIGN_OR_THROW(
1099 auto footer_buffer,
1100 source_->ReadAt(source_size_ - footer_read_size, footer_read_size));
1101
1102 // Check if all bytes are read. Check if last 4 bytes read have the magic bits
1103 if (footer_buffer->size() != footer_read_size ||
1104 (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) != 0 &&
1105 memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) != 0)) {
1106 throw ParquetInvalidOrCorruptedFileException(
1107 "Parquet magic bytes not found in footer. Either the file is corrupted or this "
1108 "is not a parquet file.");
1109 }
1110
1111 if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0) {
1112 // Encrypted file with Encrypted footer.
1113 ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, footer_read_size);
1114 return;
1115 }
1116
1117 // No encryption or encryption with plaintext footer mode.
1118 std::shared_ptr<Buffer> metadata_buffer;
1119 uint32_t metadata_len, read_metadata_len;
1120 ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer,
1121 &metadata_len, &read_metadata_len);
1122
1123 auto file_decryption_properties = properties_.file_decryption_properties().get();
1124 if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file.
1125 if (file_decryption_properties != nullptr) {
1126 if (!file_decryption_properties->plaintext_files_allowed()) {
1127 throw ParquetException("Applying decryption properties on plaintext file");
1128 }
1129 }
1130 } else {
1131 // Encrypted file with plaintext footer mode.
1132 ParseMetaDataOfEncryptedFileWithPlaintextFooter(
1133 file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len);
1134 }
1135 }
1136
1137 private:
1138 std::shared_ptr<ArrowInputFile> source_;
1139 std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source_;
1140 int64_t source_size_;
1141 std::shared_ptr<FileMetaData> file_metadata_;
1142 ReaderProperties properties_;
1143
1144 std::shared_ptr<::parquet::InternalFileDecryptor> file_decryptor_;
1145
1146 void ParseUnencryptedFileMetadata(const std::shared_ptr<Buffer>& footer_buffer,
1147 int64_t footer_read_size,
1148 std::shared_ptr<Buffer>* metadata_buffer,
1149 uint32_t* metadata_len, uint32_t* read_metadata_len);
1150
1151 std::string HandleAadPrefix(FileDecryptionProperties* file_decryption_properties,
1152 EncryptionAlgorithm& algo);
1153
1154 void ParseMetaDataOfEncryptedFileWithPlaintextFooter(
1155 FileDecryptionProperties* file_decryption_properties,
1156 const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len,
1157 uint32_t read_metadata_len);
1158
1159 void ParseMetaDataOfEncryptedFileWithEncryptedFooter(
1160 const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size);
1161};
1162
1163void SerializedFile::ParseUnencryptedFileMetadata(
1164 const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size,
1165 std::shared_ptr<Buffer>* metadata_buffer, uint32_t* metadata_len,
1166 uint32_t* read_metadata_len) {
1167 *metadata_len = ::arrow::util::SafeLoadAs<uint32_t>(
1168 reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
1169 kFooterSize);
1170 int64_t metadata_start = source_size_ - kFooterSize - *metadata_len;
1171 if (*metadata_len > source_size_ - kFooterSize) {
1172 throw ParquetInvalidOrCorruptedFileException(
1173 "Parquet file size is ", source_size_,
1174 " bytes, smaller than the size reported by metadata (", metadata_len, "bytes)");
1175 }
1176
1177 // Check if the footer_buffer contains the entire metadata
1178 if (footer_read_size >= (*metadata_len + kFooterSize)) {
1179 *metadata_buffer = SliceBuffer(
1180 footer_buffer, footer_read_size - *metadata_len - kFooterSize, *metadata_len);
1181 } else {
1182 PARQUET_ASSIGN_OR_THROW(*metadata_buffer,
1183 source_->ReadAt(metadata_start, *metadata_len));
1184 if ((*metadata_buffer)->size() != *metadata_len) {
1185 throw ParquetException("Failed reading metadata buffer (requested " +
1186 std::to_string(*metadata_len) + " bytes but got " +
1187 std::to_string((*metadata_buffer)->size()) + " bytes)");
1188 }
1189 }
1190
1191 *read_metadata_len = *metadata_len;
1192 file_metadata_ = FileMetaData::Make((*metadata_buffer)->data(), read_metadata_len);
1193}
1194
1195void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter(
1196 const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size) {
1197 // encryption with encrypted footer
1198 // both metadata & crypto metadata length
1199 uint32_t footer_len = ::arrow::util::SafeLoadAs<uint32_t>(
1200 reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
1201 kFooterSize);
1202 int64_t crypto_metadata_start = source_size_ - kFooterSize - footer_len;
1203 if (kFooterSize + footer_len > source_size_) {
1204 throw ParquetInvalidOrCorruptedFileException(
1205 "Parquet file size is ", source_size_,
1206 " bytes, smaller than the size reported by footer's (", footer_len, "bytes)");
1207 }
1208 std::shared_ptr<Buffer> crypto_metadata_buffer;
1209 // Check if the footer_buffer contains the entire metadata
1210 if (footer_read_size >= (footer_len + kFooterSize)) {
1211 crypto_metadata_buffer = SliceBuffer(
1212 footer_buffer, footer_read_size - footer_len - kFooterSize, footer_len);
1213 } else {
1214 PARQUET_ASSIGN_OR_THROW(crypto_metadata_buffer,
1215 source_->ReadAt(crypto_metadata_start, footer_len));
1216 if (crypto_metadata_buffer->size() != footer_len) {
1217 throw ParquetException("Failed reading encrypted metadata buffer (requested " +
1218 std::to_string(footer_len) + " bytes but got " +
1219 std::to_string(crypto_metadata_buffer->size()) + " bytes)");
1220 }
1221 }
1222 auto file_decryption_properties = properties_.file_decryption_properties().get();
1223 if (file_decryption_properties == nullptr) {
1224 throw ParquetException(
1225 "Could not read encrypted metadata, no decryption found in reader's properties");
1226 }
1227 uint32_t crypto_metadata_len = footer_len;
1228 std::shared_ptr<FileCryptoMetaData> file_crypto_metadata =
1229 FileCryptoMetaData::Make(crypto_metadata_buffer->data(), &crypto_metadata_len);
1230 // Handle AAD prefix
1231 EncryptionAlgorithm algo = file_crypto_metadata->encryption_algorithm();
1232 std::string file_aad = HandleAadPrefix(file_decryption_properties, algo);
1233 file_decryptor_ = std::make_shared<::parquet::InternalFileDecryptor>(
1234 file_decryption_properties, file_aad, algo.algorithm,
1235 file_crypto_metadata->key_metadata(), properties_.memory_pool());
1236
1237 int64_t metadata_offset = source_size_ - kFooterSize - footer_len + crypto_metadata_len;
1238 uint32_t metadata_len = footer_len - crypto_metadata_len;
1239 PARQUET_ASSIGN_OR_THROW(auto metadata_buffer,
1240 source_->ReadAt(metadata_offset, metadata_len));
1241 if (metadata_buffer->size() != metadata_len) {
1242 throw ParquetException("Failed reading metadata buffer (requested " +
1243 std::to_string(metadata_len) + " bytes but got " +
1244 std::to_string(metadata_buffer->size()) + " bytes)");
1245 }
1246
1247 file_metadata_ =
1e59de90
TL
1248 FileMetaData::Make(metadata_buffer->data(), &metadata_len, file_decryptor_);
1249 //FileMetaData::Make(metadata_buffer->data(), &metadata_len, default_reader_properties(), file_decryptor_); //version>9
20effc67
TL
1250}
1251
1252void SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter(
1253 FileDecryptionProperties* file_decryption_properties,
1254 const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len,
1255 uint32_t read_metadata_len) {
1256 // Providing decryption properties in plaintext footer mode is not mandatory, for
1257 // example when reading by legacy reader.
1258 if (file_decryption_properties != nullptr) {
1259 EncryptionAlgorithm algo = file_metadata_->encryption_algorithm();
1260 // Handle AAD prefix
1261 std::string file_aad = HandleAadPrefix(file_decryption_properties, algo);
1262 file_decryptor_ = std::make_shared<::parquet::InternalFileDecryptor>(
1263 file_decryption_properties, file_aad, algo.algorithm,
1264 file_metadata_->footer_signing_key_metadata(), properties_.memory_pool());
1265 // set the InternalFileDecryptor in the metadata as well, as it's used
1266 // for signature verification and for ColumnChunkMetaData creation.
1267#if GAL_set_file_decryptor_declare_private
1268 file_metadata_->set_file_decryptor(file_decryptor_);
1269#endif
1270 if (file_decryption_properties->check_plaintext_footer_integrity()) {
1271 if (metadata_len - read_metadata_len !=
1272 (parquet::encryption::kGcmTagLength + parquet::encryption::kNonceLength)) {
1273 throw ParquetInvalidOrCorruptedFileException(
1274 "Failed reading metadata for encryption signature (requested ",
1275 parquet::encryption::kGcmTagLength + parquet::encryption::kNonceLength,
1276 " bytes but have ", metadata_len - read_metadata_len, " bytes)");
1277 }
1278
1279 if (!file_metadata_->VerifySignature(metadata_buffer->data() + read_metadata_len)) {
1280 throw ParquetInvalidOrCorruptedFileException(
1281 "Parquet crypto signature verification failed");
1282 }
1283 }
1284 }
1285}
1286
1287std::string SerializedFile::HandleAadPrefix(
1288 FileDecryptionProperties* file_decryption_properties, EncryptionAlgorithm& algo) {
1289 std::string aad_prefix_in_properties = file_decryption_properties->aad_prefix();
1290 std::string aad_prefix = aad_prefix_in_properties;
1291 bool file_has_aad_prefix = algo.aad.aad_prefix.empty() ? false : true;
1292 std::string aad_prefix_in_file = algo.aad.aad_prefix;
1293
1294 if (algo.aad.supply_aad_prefix && aad_prefix_in_properties.empty()) {
1295 throw ParquetException(
1296 "AAD prefix used for file encryption, "
1297 "but not stored in file and not supplied "
1298 "in decryption properties");
1299 }
1300
1301 if (file_has_aad_prefix) {
1302 if (!aad_prefix_in_properties.empty()) {
1303 if (aad_prefix_in_properties.compare(aad_prefix_in_file) != 0) {
1304 throw ParquetException(
1305 "AAD Prefix in file and in properties "
1306 "is not the same");
1307 }
1308 }
1309 aad_prefix = aad_prefix_in_file;
1310 std::shared_ptr<AADPrefixVerifier> aad_prefix_verifier =
1311 file_decryption_properties->aad_prefix_verifier();
1312 if (aad_prefix_verifier != nullptr) aad_prefix_verifier->Verify(aad_prefix);
1313 } else {
1314 if (!algo.aad.supply_aad_prefix && !aad_prefix_in_properties.empty()) {
1315 throw ParquetException(
1316 "AAD Prefix set in decryption properties, but was not used "
1317 "for file encryption");
1318 }
1319 std::shared_ptr<AADPrefixVerifier> aad_prefix_verifier =
1320 file_decryption_properties->aad_prefix_verifier();
1321 if (aad_prefix_verifier != nullptr) {
1322 throw ParquetException(
1323 "AAD Prefix Verifier is set, but AAD Prefix not found in file");
1324 }
1325 }
1326 return aad_prefix + algo.aad.aad_file_unique;
1327}
1328
1329// ----------------------------------------------------------------------
1330// ParquetFileReader public API
1331
1332ParquetFileReader::ParquetFileReader() {}
1333
1334ParquetFileReader::~ParquetFileReader() {
1335 try {
1336 Close();
1337 } catch (...) {
1338 }
1339}
1340
1341// Open the file. If no metadata is passed, it is parsed from the footer of
1342// the file
1343std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
1344 std::shared_ptr<ArrowInputFile> source, const ReaderProperties& props,
1345 std::shared_ptr<FileMetaData> metadata) {
1346 std::unique_ptr<ParquetFileReader::Contents> result(
1347 new SerializedFile(std::move(source), props));
1348
1349 // Access private methods here, but otherwise unavailable
1350 SerializedFile* file = static_cast<SerializedFile*>(result.get());
1351
1352 if (metadata == nullptr) {
1353 // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
1354 file->ParseMetaData();
1355 } else {
1356 file->set_metadata(std::move(metadata));
1357 }
1358
1359 return result;
1360}
1361
1362std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
1363 std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props,
1364 std::shared_ptr<FileMetaData> metadata) {
1365 auto contents = SerializedFile::Open(std::move(source), props, std::move(metadata));
1366 std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
1367 result->Open(std::move(contents));
1368 return result;
1369}
1370
#if GAL_NOT_IMPLEMENTED
// Adapts a legacy RandomAccessSource through ParquetInputWrapper and forwards to
// the Arrow-file overload. Compiled only when GAL_NOT_IMPLEMENTED is set.
std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
    std::shared_ptr<FileMetaData> metadata) {
  std::shared_ptr<ParquetInputWrapper> adapted =
      std::make_shared<ParquetInputWrapper>(std::move(source));
  return Open(std::move(adapted), props, std::move(metadata));
}
#endif
1379
1380std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
1381 const std::string& path, s3selectEngine::rgw_s3select_api* rgw, bool memory_map, const ReaderProperties& props,
1382 std::shared_ptr<FileMetaData> metadata) {
1383 std::shared_ptr<::arrow::io::RandomAccessFile> source;
1384 if (memory_map) {
1385 PARQUET_ASSIGN_OR_THROW(
1386 source, ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ));//GAL change that also, or to remove?
1387 } else {
1388 PARQUET_ASSIGN_OR_THROW(source,
1389 ::arrow::io::ceph::ReadableFile::Open(path, rgw, props.memory_pool()));
1390 }
1391
1392 return Open(std::move(source), props, std::move(metadata));
1393}
1394
1395void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
1396 contents_ = std::move(contents);
1397}
1398
1399void ParquetFileReader::Close() {
1400 if (contents_) {
1401 contents_->Close();
1402 }
1403}
1404
1405std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const {
1406 return contents_->metadata();
1407}
1408
1409std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
1410 if (i >= metadata()->num_row_groups()) {
1411 std::stringstream ss;
1412 ss << "Trying to read row group " << i << " but file only has "
1413 << metadata()->num_row_groups() << " row groups";
1414 throw ParquetException(ss.str());
1415 }
1416 return contents_->GetRowGroup(i);
1417}
1418
1419void ParquetFileReader::PreBuffer(const std::vector<int>& row_groups,
1420 const std::vector<int>& column_indices,
1421 const ::arrow::io::IOContext& ctx,
1422 const ::arrow::io::CacheOptions& options) {
1423 // Access private methods here
1424 SerializedFile* file =
1425 ::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
1426 file->PreBuffer(row_groups, column_indices, ctx, options);
1427}
1428
1429// ----------------------------------------------------------------------
1430// File metadata helpers
1431
1432std::shared_ptr<FileMetaData> ReadMetaData(
1433 const std::shared_ptr<::arrow::io::RandomAccessFile>& source) {
1434 return ParquetFileReader::Open(source)->metadata();
1435}
1436
// ----------------------------------------------------------------------
// File scanner for performance testing
#if GAL_ScanAllValues_is_no_declare
// Reads every value of the selected columns (all columns when `columns` is
// empty) in batches of `column_batch_size`, counting rows per column.
// Returns the row count. Returns 0 when there are no columns to scan.
// Throws ParquetException if the columns disagree on the total row count.
int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
                         ParquetFileReader* reader) {
  std::vector<int16_t> rep_levels(column_batch_size);
  std::vector<int16_t> def_levels(column_batch_size);

  // columns are not specified explicitly. Add all columns
  if (columns.empty()) {
    const int file_columns = reader->metadata()->num_columns();
    columns.resize(file_columns);
    for (int c = 0; c < file_columns; ++c) {
      columns[c] = c;
    }
  }

  const int num_columns = static_cast<int>(columns.size());
  if (num_columns == 0) {
    // Nothing to scan; avoids indexing total_rows[0] of an empty vector below.
    return 0;
  }

  std::vector<int64_t> total_rows(num_columns, 0);
  const int num_row_groups = reader->metadata()->num_row_groups();

  for (int r = 0; r < num_row_groups; ++r) {
    auto group_reader = reader->RowGroup(r);
    for (int slot = 0; slot < num_columns; ++slot) {
      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(columns[slot]);
      const size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type());
      std::vector<uint8_t> values(column_batch_size * value_byte_size);
      const bool repeated = col_reader->descr()->max_repetition_level() > 0;

      int64_t values_read = 0;
      while (col_reader->HasNext()) {
        const int64_t levels_read =
            ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
                          values.data(), &values_read, col_reader.get());
        if (repeated) {
          // In a repeated column only entries with repetition level 0 start a row.
          for (int64_t k = 0; k < levels_read; ++k) {
            if (rep_levels[k] == 0) {
              total_rows[slot]++;
            }
          }
        } else {
          total_rows[slot] += levels_read;
        }
      }
    }
  }

  for (int c = 1; c < num_columns; ++c) {
    if (total_rows[0] != total_rows[c]) {
      throw ParquetException("Parquet error: Total rows among columns do not match");
    }
  }

  return total_rows[0];
}
#endif
1494
1495} //namespace ceph
1496} //namespace parquet
1497
1498/******************************************/
1499/******************************************/
1500/******************************************/
1501class column_reader_wrap
1502{
1503
1504private:
1505
1506 int64_t m_rownum;
1507 parquet::Type::type m_type;
1508 std::shared_ptr<parquet::ceph::RowGroupReader> m_row_group_reader;
1509 int m_row_grouop_id;
1510 uint16_t m_col_id;
1511 parquet::ceph::ParquetFileReader* m_parquet_reader;
1512 std::shared_ptr<parquet::ColumnReader> m_ColumnReader;
1513 bool m_end_of_stream;
1514 bool m_read_last_value;
1515
1516
1517public:
1518
1519 enum class parquet_type
1520 {
1e59de90 1521 NA_TYPE,
20effc67
TL
1522 STRING,
1523 INT32,
1524 INT64,
1e59de90 1525 FLOAT,
20effc67
TL
1526 DOUBLE,
1527 TIMESTAMP,
1528 PARQUET_NULL
1529 };
1530
1e59de90 1531 struct parquet_value
20effc67
TL
1532 {
1533 int64_t num;
1534 char *str; //str is pointing to offset in string which is NOT null terminated.
1535 uint16_t str_len;
1536 double dbl;
1537 parquet_type type;
1e59de90
TL
1538
1539 parquet_value():type(parquet_type::NA_TYPE){}
1540 };
1541
1542 typedef struct parquet_value parquet_value_t;
1543
1544 enum class parquet_column_read_state {PARQUET_OUT_OF_RANGE,PARQUET_READ_OK};
20effc67
TL
1545
1546 private:
1547 parquet_value_t m_last_value;
1548
1549 public:
1550 column_reader_wrap(std::unique_ptr<parquet::ceph::ParquetFileReader> & parquet_reader,uint16_t col_id);
1551
1552 parquet::Type::type get_type();
1553
1554 bool HasNext();//TODO template
1555
1556 int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
1557 parquet_value_t* values, int64_t* values_read);
1558
1559 int64_t Skip(int64_t rows_to_skip);
1560
1e59de90 1561 parquet_column_read_state Read(uint64_t rownum,parquet_value_t & value);
20effc67
TL
1562
1563};
1564
1565class parquet_file_parser
1566{
1567
1568public:
1569
1570 typedef std::vector<std::pair<std::string, column_reader_wrap::parquet_type>> schema_t;
1571 typedef std::set<uint16_t> column_pos_t;
1572 typedef std::vector<column_reader_wrap::parquet_value_t> row_values_t;
1573
1574 typedef column_reader_wrap::parquet_value_t parquet_value_t;
1575 typedef column_reader_wrap::parquet_type parquet_type;
1576
1577private:
1578
1579 std::string m_parquet_file_name;
1580 uint32_t m_num_of_columms;
1581 uint64_t m_num_of_rows;
1582 uint64_t m_rownum;
1583 schema_t m_schm;
1584 int m_num_row_groups;
1585 std::shared_ptr<parquet::FileMetaData> m_file_metadata;
1586 std::unique_ptr<parquet::ceph::ParquetFileReader> m_parquet_reader;
1587 std::vector<column_reader_wrap*> m_column_readers;
1588 s3selectEngine::rgw_s3select_api* m_rgw_s3select_api;
1589
1590 public:
1591
1592 parquet_file_parser(std::string parquet_file_name,s3selectEngine::rgw_s3select_api* rgw_api) :
1593 m_parquet_file_name(parquet_file_name),
1594 m_num_of_columms(0),
1595 m_num_of_rows(0),
1596 m_rownum(0),
1597 m_num_row_groups(0),
1598 m_rgw_s3select_api(rgw_api)
1599
1600
1601 {
1602 load_meta_data();
1603 }
1604
1605 ~parquet_file_parser()
1606 {
1607 for(auto r : m_column_readers)
1608 {
1609 delete r;
1610 }
1611 }
1612
1613 int load_meta_data()
1614 {
1615 m_parquet_reader = parquet::ceph::ParquetFileReader::OpenFile(m_parquet_file_name,m_rgw_s3select_api,false);
1616 m_file_metadata = m_parquet_reader->metadata();
1617 m_num_of_columms = m_parquet_reader->metadata()->num_columns();
1618 m_num_row_groups = m_file_metadata->num_row_groups();
1619 m_num_of_rows = m_file_metadata->num_rows();
1620
1621 for (uint32_t i = 0; i < m_num_of_columms; i++)
1622 {
1623 parquet::Type::type tp = m_file_metadata->schema()->Column(i)->physical_type();
1624 std::pair<std::string, column_reader_wrap::parquet_type> elm;
1625
1626 switch (tp)
1627 {
1628 case parquet::Type::type::INT32:
1629 elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::INT32);
1630 m_schm.push_back(elm);
1631 break;
1632
1633 case parquet::Type::type::INT64:
1634 elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::INT64);
1635 m_schm.push_back(elm);
1636 break;
1637
1e59de90
TL
1638 case parquet::Type::type::FLOAT:
1639 elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::FLOAT);
1640 m_schm.push_back(elm);
1641 break;
1642
20effc67
TL
1643 case parquet::Type::type::DOUBLE:
1644 elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::DOUBLE);
1645 m_schm.push_back(elm);
1646 break;
1647
1648 case parquet::Type::type::BYTE_ARRAY:
1649 elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::STRING);
1650 m_schm.push_back(elm);
1651 break;
1652
1653 default:
1654 {
1655 std::stringstream err;
1656 err << "some parquet type not supported";
1657 throw std::runtime_error(err.str());
1658 }
1659 }
1660
1661 m_column_readers.push_back(new column_reader_wrap(m_parquet_reader,i));
1662 }
1663
1664 return 0;
1665 }
1666
1667 bool end_of_stream()
1668 {
1669
1e59de90 1670 if (m_rownum > (m_num_of_rows-1))
20effc67
TL
1671 return true;
1672 return false;
1673 }
1674
1675 uint64_t get_number_of_rows()
1676 {
1677 return m_num_of_rows;
1678 }
1679
1e59de90
TL
1680 uint64_t rownum()
1681 {
1682 return m_rownum;
1683 }
1684
20effc67
TL
1685 bool increase_rownum()
1686 {
1687 if (end_of_stream())
1688 return false;
1689
1690 m_rownum++;
1691 return true;
1692 }
1693
1694 uint64_t get_rownum()
1695 {
1696 return m_rownum;
1697 }
1698
1699 uint32_t get_num_of_columns()
1700 {
1701 return m_num_of_columms;
1702 }
1703
1704 int get_column_values_by_positions(column_pos_t positions, row_values_t &row_values)
1705 {
1706 column_reader_wrap::parquet_value_t column_value;
1707 row_values.clear();
1708
1709 for(auto col : positions)
1710 {
1711 if((col)>=m_num_of_columms)
1712 {//TODO should verified upon syntax phase
1713 //TODO throw exception
1714 return -1;
1715 }
1e59de90
TL
1716 auto status = m_column_readers[col]->Read(m_rownum,column_value);
1717 if(status == column_reader_wrap::parquet_column_read_state::PARQUET_OUT_OF_RANGE) return -1;
20effc67
TL
1718 row_values.push_back(column_value);//TODO intensive (should move)
1719 }
1720 return 0;
1721 }
1722
1723 schema_t get_schema()
1724 {
1725 return m_schm;
1726 }
1727};
1728
1729/******************************************/
1730
1731
1732 column_reader_wrap::column_reader_wrap(std::unique_ptr<parquet::ceph::ParquetFileReader> & parquet_reader,uint16_t col_id):
1733 m_rownum(-1),
1734 m_type(parquet::Type::type::UNDEFINED),
1735 m_row_grouop_id(0),
1736 m_col_id(col_id),
1737 m_end_of_stream(false),
1738 m_read_last_value(false)
1739 {
1740 m_parquet_reader = parquet_reader.get();
1741 m_row_group_reader = m_parquet_reader->RowGroup(m_row_grouop_id);
1742 m_ColumnReader = m_row_group_reader->Column(m_col_id);
1743 }
1744
1745 parquet::Type::type column_reader_wrap::get_type()
1746 {//TODO if UNDEFINED
1747 return m_parquet_reader->metadata()->schema()->Column(m_col_id)->physical_type();
1748 }
1749
1750 bool column_reader_wrap::HasNext()//TODO template
1751 {
1752 parquet::Int32Reader* int32_reader;
1753 parquet::Int64Reader* int64_reader;
1e59de90 1754 parquet::FloatReader* float_reader;
20effc67
TL
1755 parquet::DoubleReader* double_reader;
1756 parquet::ByteArrayReader* byte_array_reader;
1757
1758 switch (get_type())
1759 {
1760 case parquet::Type::type::INT32:
1761 int32_reader = static_cast<parquet::Int32Reader *>(m_ColumnReader.get());
1762 return int32_reader->HasNext();
1763 break;
1764
1765 case parquet::Type::type::INT64:
1766 int64_reader = static_cast<parquet::Int64Reader *>(m_ColumnReader.get());
1767 return int64_reader->HasNext();
1768 break;
1769
1e59de90
TL
1770 case parquet::Type::type::FLOAT:
1771 float_reader = static_cast<parquet::FloatReader *>(m_ColumnReader.get());
1772 return float_reader->HasNext();
1773 break;
1774
20effc67
TL
1775 case parquet::Type::type::DOUBLE:
1776 double_reader = static_cast<parquet::DoubleReader *>(m_ColumnReader.get());
1777 return double_reader->HasNext();
1778 break;
1779
1780 case parquet::Type::type::BYTE_ARRAY:
1781 byte_array_reader = static_cast<parquet::ByteArrayReader *>(m_ColumnReader.get());
1782 return byte_array_reader->HasNext();
1783 break;
1784
1785 default:
1e59de90
TL
1786
1787 std::stringstream err;
1788 err << "HasNext():" << "wrong type or type not exist" << std::endl;
1789 throw std::runtime_error(err.str());
1790
20effc67
TL
1791 return false;
1792 //TODO throw exception
1793 }
1794
1795 return false;
1796 }
1797
1798 int64_t column_reader_wrap::ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
1799 parquet_value_t* values, int64_t* values_read)
1800 {
1801 parquet::Int32Reader* int32_reader;
1802 parquet::Int64Reader* int64_reader;
1e59de90 1803 parquet::FloatReader* float_reader;
20effc67
TL
1804 parquet::DoubleReader* double_reader;
1805 parquet::ByteArrayReader* byte_array_reader;
1806
1807 parquet::ByteArray str_value;
1808 int64_t rows_read;
1809 int32_t i32_val;
1810
1811 auto error_msg = [&](std::exception &e)
1812 {
1813 std::stringstream err;
1814 err << "what() :" << e.what() << std::endl;
1815 err << "failed to parse column id:" << this->m_col_id << " name:" <<this->m_parquet_reader->metadata()->schema()->Column(m_col_id)->name();
1816 return err;
1817 };
1818 int16_t defintion_level;
1819 int16_t repeat_level;
1820
1821 switch (get_type())
1822 {
1823 case parquet::Type::type::INT32:
1824 int32_reader = static_cast<parquet::Int32Reader *>(m_ColumnReader.get());
1825 try {
1e59de90
TL
1826 rows_read = int32_reader->ReadBatch(1, &defintion_level, &repeat_level, &i32_val , values_read);
1827 if(defintion_level == 0)
1828 {
1829 values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
1830 } else
1831 {
1832 values->num = i32_val;
1833 values->type = column_reader_wrap::parquet_type::INT32;
1834 }
20effc67
TL
1835 }
1836 catch(std::exception &e)
1837 {
1838 throw std::runtime_error(error_msg(e).str());
1839 }
1840
20effc67
TL
1841 break;
1842
1843 case parquet::Type::type::INT64:
1844 int64_reader = static_cast<parquet::Int64Reader *>(m_ColumnReader.get());
1845 try{
1e59de90
TL
1846 rows_read = int64_reader->ReadBatch(1, &defintion_level, &repeat_level, (int64_t *)&(values->num), values_read);
1847 if(defintion_level == 0)
1848 {
1849 values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
1850 } else
1851 {
1852 auto logical_type = m_parquet_reader->metadata()->schema()->Column(m_col_id)->logical_type();
1853
1854 if (logical_type.get()->type() == parquet::LogicalType::Type::type::TIMESTAMP) //TODO missing sub-type (milli,micro)
1855 values->type = column_reader_wrap::parquet_type::TIMESTAMP;
1856 else
1857 values->type = column_reader_wrap::parquet_type::INT64;
1858 }
20effc67
TL
1859 }
1860 catch(std::exception &e)
1861 {
1862 throw std::runtime_error(error_msg(e).str());
1863 }
20effc67
TL
1864 break;
1865
1e59de90
TL
1866 case parquet::Type::type::FLOAT:
1867 float_reader = static_cast<parquet::FloatReader *>(m_ColumnReader.get());
20effc67 1868 try{
1e59de90
TL
1869 float data_source_float = 0;
1870 rows_read = float_reader->ReadBatch(1, &defintion_level, &repeat_level, &data_source_float , values_read);//TODO proper cast
1871 if(defintion_level == 0)
1872 {
1873 values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
1874 } else
1875 {
1876 values->type = column_reader_wrap::parquet_type::DOUBLE;
1877 values->dbl = data_source_float;
1878
1879 }
1880 }
1881 catch(std::exception &e)
1882 {
1883 throw std::runtime_error(error_msg(e).str());
1884 }
1885 break;
1886
1887 case parquet::Type::type::DOUBLE:
20effc67 1888 double_reader = static_cast<parquet::DoubleReader *>(m_ColumnReader.get());
1e59de90
TL
1889 try{
1890 rows_read = double_reader->ReadBatch(1, &defintion_level, &repeat_level, (double *)&(values->dbl), values_read);
1891 if(defintion_level == 0)
1892 {
1893 values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
1894 } else
1895 {
1896 values->type = column_reader_wrap::parquet_type::DOUBLE;
1897 }
20effc67
TL
1898 }
1899 catch(std::exception &e)
1900 {
1901 throw std::runtime_error(error_msg(e).str());
1902 }
20effc67
TL
1903 break;
1904
1905 case parquet::Type::type::BYTE_ARRAY:
1906 byte_array_reader = static_cast<parquet::ByteArrayReader *>(m_ColumnReader.get());
1907 try{
1908 rows_read = byte_array_reader->ReadBatch(1, &defintion_level, &repeat_level, &str_value , values_read);
1e59de90
TL
1909 if(defintion_level == 0)
1910 {
1911 values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
1912 } else
1913 {
1914 values->type = column_reader_wrap::parquet_type::STRING;
1915 values->str = (char*)str_value.ptr;
1916 values->str_len = str_value.len;
1917 }
20effc67
TL
1918 }
1919 catch(std::exception &e)
1920 {
1921 throw std::runtime_error(error_msg(e).str());
1922 }
20effc67
TL
1923 break;
1924
1925 default:
1926 {
1927 std::stringstream err;
1928 err << "wrong type" << std::endl;
1929 throw std::runtime_error(err.str());
1930 }
1931
1932 }
1933
1934 return rows_read;
1935 }
1936
1937 int64_t column_reader_wrap::Skip(int64_t rows_to_skip)
1938 {
1939 parquet::Int32Reader* int32_reader;
1940 parquet::Int64Reader* int64_reader;
1941 parquet::DoubleReader* double_reader;
1e59de90 1942 parquet::FloatReader* float_reader;
20effc67
TL
1943 parquet::ByteArrayReader* byte_array_reader;
1944
1945 parquet::ByteArray str_value;
1946 int64_t rows_read;
1947
1948 auto error_msg = [&](std::exception &e)
1949 {
1950 std::stringstream err;
1951 err << "what() :" << e.what() << std::endl;
1952 err << "failed to parse column id:" << this->m_col_id << " name:" <<this->m_parquet_reader->metadata()->schema()->Column(m_col_id)->name();
1953 return err;
1954 };
1955
1956 switch (get_type())
1957 {
1958 case parquet::Type::type::INT32:
1959 int32_reader = static_cast<parquet::Int32Reader *>(m_ColumnReader.get());
1960 try{
1961 rows_read = int32_reader->Skip(rows_to_skip);
1962 }
1963 catch(std::exception &e)
1964 {
1965 throw std::runtime_error(error_msg(e).str());
1966 }
1967 break;
1968
1969 case parquet::Type::type::INT64:
1970 int64_reader = static_cast<parquet::Int64Reader *>(m_ColumnReader.get());
1971 try{
1972 rows_read = int64_reader->Skip(rows_to_skip);
1973 }
1974 catch(std::exception &e)
1975 {
1976 throw std::runtime_error(error_msg(e).str());
1977 }
1978 break;
1979
1e59de90
TL
1980 case parquet::Type::type::FLOAT:
1981 float_reader = static_cast<parquet::FloatReader *>(m_ColumnReader.get());
1982 try {
1983 rows_read = float_reader->Skip(rows_to_skip);
1984 }
1985 catch(std::exception &e)
1986 {
1987 throw std::runtime_error(error_msg(e).str());
1988 }
1989 break;
1990
20effc67
TL
1991 case parquet::Type::type::DOUBLE:
1992 double_reader = static_cast<parquet::DoubleReader *>(m_ColumnReader.get());
1993 try {
1994 rows_read = double_reader->Skip(rows_to_skip);
1995 }
1996 catch(std::exception &e)
1997 {
1998 throw std::runtime_error(error_msg(e).str());
1999 }
2000 break;
2001
2002 case parquet::Type::type::BYTE_ARRAY:
2003 byte_array_reader = static_cast<parquet::ByteArrayReader *>(m_ColumnReader.get());
2004 try{
2005 rows_read = byte_array_reader->Skip(rows_to_skip);
2006 }
2007 catch(std::exception &e)
2008 {
2009 throw std::runtime_error(error_msg(e).str());
2010 }
2011 break;
2012
2013 default:
2014 {
2015 std::stringstream err;
2016 err << "wrong type" << std::endl;
2017 throw std::runtime_error(err.str());
2018 }
2019 }
2020
2021 return rows_read;
2022 }
2023
1e59de90
TL
2024
2025 column_reader_wrap::parquet_column_read_state column_reader_wrap::Read(const uint64_t rownum,parquet_value_t & value)
20effc67
TL
2026 {
2027 int64_t values_read = 0;
2028
2029 if (m_rownum < (int64_t)rownum)
2030 { //should skip
2031 m_read_last_value = false;
2032
1e59de90 2033 //TODO what about Skip(0)
20effc67
TL
2034 uint64_t skipped_rows = Skip(rownum - m_rownum -1);
2035 m_rownum += skipped_rows;
2036
2037 while (((m_rownum+1) < (int64_t)rownum) || HasNext() == false)
2038 {
2039 uint64_t skipped_rows = Skip(rownum - m_rownum -1);
2040 m_rownum += skipped_rows;
2041
2042 if (HasNext() == false)
2043 {
2044 if ((m_row_grouop_id + 1) >= m_parquet_reader->metadata()->num_row_groups())
2045 {
2046 m_end_of_stream = true;
1e59de90 2047 return column_reader_wrap::parquet_column_read_state::PARQUET_OUT_OF_RANGE;//end-of-stream
20effc67
TL
2048 }
2049 else
2050 {
2051 m_row_grouop_id++;
2052 m_row_group_reader = m_parquet_reader->RowGroup(m_row_grouop_id);
2053 m_ColumnReader = m_row_group_reader->Column(m_col_id);
2054 }
2055 }
2056 } //end-while
2057
2058 ReadBatch(1, nullptr, nullptr, &m_last_value, &values_read);
2059 m_read_last_value = true;
2060 m_rownum++;
2061 value = m_last_value;
2062 }
2063 else
2064 {
2065 if (m_read_last_value == false)
2066 {
2067 ReadBatch(1, nullptr, nullptr, &m_last_value, &values_read);
2068 m_read_last_value = true;
2069 m_rownum++;
2070 }
2071
2072 value = m_last_value;
2073 }
2074
1e59de90 2075 return column_reader_wrap::parquet_column_read_state::PARQUET_READ_OK;
20effc67
TL
2076 }
2077
2078#endif
2079