]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #pragma once | |
19 | ||
20 | #include <stdio.h> | |
21 | ||
22 | #include <cstdint> | |
23 | #include <memory> | |
24 | #include <ostream> | |
25 | #include <string> | |
26 | #include <utility> | |
27 | #include <vector> | |
28 | ||
29 | #include "parquet/column_reader.h" | |
30 | #include "parquet/exception.h" | |
31 | #include "parquet/platform.h" | |
32 | #include "parquet/schema.h" | |
33 | #include "parquet/types.h" | |
34 | ||
35 | namespace parquet { | |
36 | ||
// Batch size used by the scanner constructors and factory when the caller
// does not pass one explicitly.
static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE{128};
38 | ||
39 | class PARQUET_EXPORT Scanner { | |
40 | public: | |
41 | explicit Scanner(std::shared_ptr<ColumnReader> reader, | |
42 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, | |
43 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) | |
44 | : batch_size_(batch_size), | |
45 | level_offset_(0), | |
46 | levels_buffered_(0), | |
47 | value_buffer_(AllocateBuffer(pool)), | |
48 | value_offset_(0), | |
49 | values_buffered_(0), | |
50 | reader_(std::move(reader)) { | |
51 | def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0); | |
52 | rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0); | |
53 | } | |
54 | ||
55 | virtual ~Scanner() {} | |
56 | ||
57 | static std::shared_ptr<Scanner> Make( | |
58 | std::shared_ptr<ColumnReader> col_reader, | |
59 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, | |
60 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); | |
61 | ||
62 | virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) = 0; | |
63 | ||
64 | bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); } | |
65 | ||
66 | const ColumnDescriptor* descr() const { return reader_->descr(); } | |
67 | ||
68 | int64_t batch_size() const { return batch_size_; } | |
69 | ||
70 | void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; } | |
71 | ||
72 | protected: | |
73 | int64_t batch_size_; | |
74 | ||
75 | std::vector<int16_t> def_levels_; | |
76 | std::vector<int16_t> rep_levels_; | |
77 | int level_offset_; | |
78 | int levels_buffered_; | |
79 | ||
80 | std::shared_ptr<ResizableBuffer> value_buffer_; | |
81 | int value_offset_; | |
82 | int64_t values_buffered_; | |
83 | std::shared_ptr<ColumnReader> reader_; | |
84 | }; | |
85 | ||
86 | template <typename DType> | |
87 | class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner { | |
88 | public: | |
89 | typedef typename DType::c_type T; | |
90 | ||
91 | explicit TypedScanner(std::shared_ptr<ColumnReader> reader, | |
92 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, | |
93 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) | |
94 | : Scanner(std::move(reader), batch_size, pool) { | |
95 | typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader_.get()); | |
96 | int value_byte_size = type_traits<DType::type_num>::value_byte_size; | |
97 | PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size)); | |
98 | values_ = reinterpret_cast<T*>(value_buffer_->mutable_data()); | |
99 | } | |
100 | ||
101 | virtual ~TypedScanner() {} | |
102 | ||
103 | bool NextLevels(int16_t* def_level, int16_t* rep_level) { | |
104 | if (level_offset_ == levels_buffered_) { | |
105 | levels_buffered_ = static_cast<int>( | |
106 | typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(), | |
107 | rep_levels_.data(), values_, &values_buffered_)); | |
108 | ||
109 | value_offset_ = 0; | |
110 | level_offset_ = 0; | |
111 | if (!levels_buffered_) { | |
112 | return false; | |
113 | } | |
114 | } | |
115 | *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0; | |
116 | *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0; | |
117 | level_offset_++; | |
118 | return true; | |
119 | } | |
120 | ||
121 | bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) { | |
122 | if (level_offset_ == levels_buffered_) { | |
123 | if (!HasNext()) { | |
124 | // Out of data pages | |
125 | return false; | |
126 | } | |
127 | } | |
128 | ||
129 | NextLevels(def_level, rep_level); | |
130 | *is_null = *def_level < descr()->max_definition_level(); | |
131 | ||
132 | if (*is_null) { | |
133 | return true; | |
134 | } | |
135 | ||
136 | if (value_offset_ == values_buffered_) { | |
137 | throw ParquetException("Value was non-null, but has not been buffered"); | |
138 | } | |
139 | *val = values_[value_offset_++]; | |
140 | return true; | |
141 | } | |
142 | ||
143 | // Returns true if there is a next value | |
144 | bool NextValue(T* val, bool* is_null) { | |
145 | if (level_offset_ == levels_buffered_) { | |
146 | if (!HasNext()) { | |
147 | // Out of data pages | |
148 | return false; | |
149 | } | |
150 | } | |
151 | ||
152 | // Out of values | |
153 | int16_t def_level = -1; | |
154 | int16_t rep_level = -1; | |
155 | NextLevels(&def_level, &rep_level); | |
156 | *is_null = def_level < descr()->max_definition_level(); | |
157 | ||
158 | if (*is_null) { | |
159 | return true; | |
160 | } | |
161 | ||
162 | if (value_offset_ == values_buffered_) { | |
163 | throw ParquetException("Value was non-null, but has not been buffered"); | |
164 | } | |
165 | *val = values_[value_offset_++]; | |
166 | return true; | |
167 | } | |
168 | ||
169 | virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) { | |
170 | T val{}; | |
171 | int16_t def_level = -1; | |
172 | int16_t rep_level = -1; | |
173 | bool is_null = false; | |
174 | char buffer[80]; | |
175 | ||
176 | if (!Next(&val, &def_level, &rep_level, &is_null)) { | |
177 | throw ParquetException("No more values buffered"); | |
178 | } | |
179 | ||
180 | if (with_levels) { | |
181 | out << " D:" << def_level << " R:" << rep_level << " "; | |
182 | if (!is_null) { | |
183 | out << "V:"; | |
184 | } | |
185 | } | |
186 | ||
187 | if (is_null) { | |
188 | std::string null_fmt = format_fwf<ByteArrayType>(width); | |
189 | snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL"); | |
190 | } else { | |
191 | FormatValue(&val, buffer, sizeof(buffer), width); | |
192 | } | |
193 | out << buffer; | |
194 | } | |
195 | ||
196 | private: | |
197 | // The ownership of this object is expressed through the reader_ variable in the base | |
198 | TypedColumnReader<DType>* typed_reader_; | |
199 | ||
200 | inline void FormatValue(void* val, char* buffer, int bufsize, int width); | |
201 | ||
202 | T* values_; | |
203 | }; | |
204 | ||
205 | template <typename DType> | |
206 | inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize, | |
207 | int width) { | |
208 | std::string fmt = format_fwf<DType>(width); | |
209 | snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val)); | |
210 | } | |
211 | ||
212 | template <> | |
213 | inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize, | |
214 | int width) { | |
215 | std::string fmt = format_fwf<Int96Type>(width); | |
216 | std::string result = Int96ToString(*reinterpret_cast<Int96*>(val)); | |
217 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); | |
218 | } | |
219 | ||
220 | template <> | |
221 | inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize, | |
222 | int width) { | |
223 | std::string fmt = format_fwf<ByteArrayType>(width); | |
224 | std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val)); | |
225 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); | |
226 | } | |
227 | ||
228 | template <> | |
229 | inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize, | |
230 | int width) { | |
231 | std::string fmt = format_fwf<FLBAType>(width); | |
232 | std::string result = FixedLenByteArrayToString( | |
233 | *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length()); | |
234 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); | |
235 | } | |
236 | ||
237 | typedef TypedScanner<BooleanType> BoolScanner; | |
238 | typedef TypedScanner<Int32Type> Int32Scanner; | |
239 | typedef TypedScanner<Int64Type> Int64Scanner; | |
240 | typedef TypedScanner<Int96Type> Int96Scanner; | |
241 | typedef TypedScanner<FloatType> FloatScanner; | |
242 | typedef TypedScanner<DoubleType> DoubleScanner; | |
243 | typedef TypedScanner<ByteArrayType> ByteArrayScanner; | |
244 | typedef TypedScanner<FLBAType> FixedLenByteArrayScanner; | |
245 | ||
246 | template <typename RType> | |
247 | int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels, | |
248 | uint8_t* values, int64_t* values_buffered, | |
249 | parquet::ColumnReader* reader) { | |
250 | typedef typename RType::T Type; | |
251 | auto typed_reader = static_cast<RType*>(reader); | |
252 | auto vals = reinterpret_cast<Type*>(&values[0]); | |
253 | return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals, | |
254 | values_buffered); | |
255 | } | |
256 | ||
257 | int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels, | |
258 | int16_t* rep_levels, uint8_t* values, | |
259 | int64_t* values_buffered, | |
260 | parquet::ColumnReader* reader); | |
261 | ||
262 | } // namespace parquet |