]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/parquet/column_scanner.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / parquet / column_scanner.h
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#pragma once
19
20#include <stdio.h>
21
22#include <cstdint>
23#include <memory>
24#include <ostream>
25#include <string>
26#include <utility>
27#include <vector>
28
29#include "parquet/column_reader.h"
30#include "parquet/exception.h"
31#include "parquet/platform.h"
32#include "parquet/schema.h"
33#include "parquet/types.h"
34
35namespace parquet {
36
/// Batch size used by Scanner / TypedScanner (and Scanner::Make) when the
/// caller does not supply one.
static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE{128};
38
39class PARQUET_EXPORT Scanner {
40 public:
41 explicit Scanner(std::shared_ptr<ColumnReader> reader,
42 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
43 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
44 : batch_size_(batch_size),
45 level_offset_(0),
46 levels_buffered_(0),
47 value_buffer_(AllocateBuffer(pool)),
48 value_offset_(0),
49 values_buffered_(0),
50 reader_(std::move(reader)) {
51 def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0);
52 rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0);
53 }
54
55 virtual ~Scanner() {}
56
57 static std::shared_ptr<Scanner> Make(
58 std::shared_ptr<ColumnReader> col_reader,
59 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
60 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
61
62 virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) = 0;
63
64 bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); }
65
66 const ColumnDescriptor* descr() const { return reader_->descr(); }
67
68 int64_t batch_size() const { return batch_size_; }
69
70 void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; }
71
72 protected:
73 int64_t batch_size_;
74
75 std::vector<int16_t> def_levels_;
76 std::vector<int16_t> rep_levels_;
77 int level_offset_;
78 int levels_buffered_;
79
80 std::shared_ptr<ResizableBuffer> value_buffer_;
81 int value_offset_;
82 int64_t values_buffered_;
83 std::shared_ptr<ColumnReader> reader_;
84};
85
86template <typename DType>
87class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner {
88 public:
89 typedef typename DType::c_type T;
90
91 explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
92 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
93 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
94 : Scanner(std::move(reader), batch_size, pool) {
95 typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader_.get());
96 int value_byte_size = type_traits<DType::type_num>::value_byte_size;
97 PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
98 values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
99 }
100
101 virtual ~TypedScanner() {}
102
103 bool NextLevels(int16_t* def_level, int16_t* rep_level) {
104 if (level_offset_ == levels_buffered_) {
105 levels_buffered_ = static_cast<int>(
106 typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(),
107 rep_levels_.data(), values_, &values_buffered_));
108
109 value_offset_ = 0;
110 level_offset_ = 0;
111 if (!levels_buffered_) {
112 return false;
113 }
114 }
115 *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0;
116 *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0;
117 level_offset_++;
118 return true;
119 }
120
121 bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) {
122 if (level_offset_ == levels_buffered_) {
123 if (!HasNext()) {
124 // Out of data pages
125 return false;
126 }
127 }
128
129 NextLevels(def_level, rep_level);
130 *is_null = *def_level < descr()->max_definition_level();
131
132 if (*is_null) {
133 return true;
134 }
135
136 if (value_offset_ == values_buffered_) {
137 throw ParquetException("Value was non-null, but has not been buffered");
138 }
139 *val = values_[value_offset_++];
140 return true;
141 }
142
143 // Returns true if there is a next value
144 bool NextValue(T* val, bool* is_null) {
145 if (level_offset_ == levels_buffered_) {
146 if (!HasNext()) {
147 // Out of data pages
148 return false;
149 }
150 }
151
152 // Out of values
153 int16_t def_level = -1;
154 int16_t rep_level = -1;
155 NextLevels(&def_level, &rep_level);
156 *is_null = def_level < descr()->max_definition_level();
157
158 if (*is_null) {
159 return true;
160 }
161
162 if (value_offset_ == values_buffered_) {
163 throw ParquetException("Value was non-null, but has not been buffered");
164 }
165 *val = values_[value_offset_++];
166 return true;
167 }
168
169 virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) {
170 T val{};
171 int16_t def_level = -1;
172 int16_t rep_level = -1;
173 bool is_null = false;
174 char buffer[80];
175
176 if (!Next(&val, &def_level, &rep_level, &is_null)) {
177 throw ParquetException("No more values buffered");
178 }
179
180 if (with_levels) {
181 out << " D:" << def_level << " R:" << rep_level << " ";
182 if (!is_null) {
183 out << "V:";
184 }
185 }
186
187 if (is_null) {
188 std::string null_fmt = format_fwf<ByteArrayType>(width);
189 snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL");
190 } else {
191 FormatValue(&val, buffer, sizeof(buffer), width);
192 }
193 out << buffer;
194 }
195
196 private:
197 // The ownership of this object is expressed through the reader_ variable in the base
198 TypedColumnReader<DType>* typed_reader_;
199
200 inline void FormatValue(void* val, char* buffer, int bufsize, int width);
201
202 T* values_;
203};
204
205template <typename DType>
206inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize,
207 int width) {
208 std::string fmt = format_fwf<DType>(width);
209 snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val));
210}
211
212template <>
213inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize,
214 int width) {
215 std::string fmt = format_fwf<Int96Type>(width);
216 std::string result = Int96ToString(*reinterpret_cast<Int96*>(val));
217 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
218}
219
220template <>
221inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize,
222 int width) {
223 std::string fmt = format_fwf<ByteArrayType>(width);
224 std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val));
225 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
226}
227
228template <>
229inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize,
230 int width) {
231 std::string fmt = format_fwf<FLBAType>(width);
232 std::string result = FixedLenByteArrayToString(
233 *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length());
234 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
235}
236
237typedef TypedScanner<BooleanType> BoolScanner;
238typedef TypedScanner<Int32Type> Int32Scanner;
239typedef TypedScanner<Int64Type> Int64Scanner;
240typedef TypedScanner<Int96Type> Int96Scanner;
241typedef TypedScanner<FloatType> FloatScanner;
242typedef TypedScanner<DoubleType> DoubleScanner;
243typedef TypedScanner<ByteArrayType> ByteArrayScanner;
244typedef TypedScanner<FLBAType> FixedLenByteArrayScanner;
245
246template <typename RType>
247int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
248 uint8_t* values, int64_t* values_buffered,
249 parquet::ColumnReader* reader) {
250 typedef typename RType::T Type;
251 auto typed_reader = static_cast<RType*>(reader);
252 auto vals = reinterpret_cast<Type*>(&values[0]);
253 return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals,
254 values_buffered);
255}
256
257int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,
258 int16_t* rep_levels, uint8_t* values,
259 int64_t* values_buffered,
260 parquet::ColumnReader* reader);
261
262} // namespace parquet