]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #pragma once | |
19 | ||
20 | #include <cstdint> | |
21 | #include <memory> | |
22 | #include <utility> | |
23 | #include <vector> | |
24 | ||
25 | #include "parquet/exception.h" | |
26 | #include "parquet/level_conversion.h" | |
27 | #include "parquet/platform.h" | |
28 | #include "parquet/schema.h" | |
29 | #include "parquet/types.h" | |
30 | ||
31 | namespace arrow { | |
32 | ||
33 | class Array; | |
34 | class ChunkedArray; | |
35 | ||
36 | namespace BitUtil { | |
37 | class BitReader; | |
38 | } // namespace BitUtil | |
39 | ||
40 | namespace util { | |
41 | class RleDecoder; | |
42 | } // namespace util | |
43 | ||
44 | } // namespace arrow | |
45 | ||
46 | namespace parquet { | |
47 | ||
48 | class Decryptor; | |
49 | class Page; | |
50 | ||
// 16 MB is the default maximum page header size.
// NOTE: `static` is redundant on a namespace-scope constexpr variable
// (constexpr implies const, which already gives internal linkage), so it is
// omitted here.
constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;

// 16 KB is the default expected page header size.
constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
56 | ||
// Decodes definition/repetition level streams for a column chunk. Levels may
// be stored RLE-encoded or bit-packed (see the two decoder members below).
class PARQUET_EXPORT LevelDecoder {
 public:
  LevelDecoder();
  ~LevelDecoder();

  // Initialize the LevelDecoder state with new data
  // and return the number of bytes consumed
  int SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values,
              const uint8_t* data, int32_t data_size);

  // Initialize from a DataPage V2 level section: the level byte length
  // (num_bytes) is known up front, so unlike SetData no encoding parameter
  // is taken (presumably always RLE for V2 pages -- confirm in the
  // implementation).
  void SetDataV2(int32_t num_bytes, int16_t max_level, int num_buffered_values,
                 const uint8_t* data);

  // Decodes a batch of levels into an array and returns the number of levels decoded
  int Decode(int batch_size, int16_t* levels);

 private:
  int bit_width_;             // bit width needed to encode levels in [0, max_level_]
  int num_values_remaining_;  // levels still available to Decode()
  Encoding::type encoding_;
  // NOTE(review): presumably only one of the two decoders below is active at
  // a time, selected by encoding_ -- confirm in the implementation.
  std::unique_ptr<::arrow::util::RleDecoder> rle_decoder_;
  std::unique_ptr<::arrow::BitUtil::BitReader> bit_packed_decoder_;
  int16_t max_level_;
};
81 | ||
// Decryption state needed to read one encrypted column chunk: separate
// decryptors for page metadata and page data, plus the row group / column
// ordinals identifying the chunk within the file.
struct CryptoContext {
  CryptoContext(bool start_with_dictionary_page, int16_t rg_ordinal, int16_t col_ordinal,
                std::shared_ptr<Decryptor> meta, std::shared_ptr<Decryptor> data)
      : start_decrypt_with_dictionary_page(start_with_dictionary_page),
        row_group_ordinal(rg_ordinal),
        column_ordinal(col_ordinal),
        meta_decryptor(std::move(meta)),
        data_decryptor(std::move(data)) {}
  CryptoContext() {}

  // True if the first page to decrypt is the chunk's dictionary page.
  bool start_decrypt_with_dictionary_page = false;
  // -1 means "unset" for both ordinals.
  int16_t row_group_ordinal = -1;
  int16_t column_ordinal = -1;
  // Decryptor for page headers/metadata; may differ from the data decryptor.
  std::shared_ptr<Decryptor> meta_decryptor;
  // Decryptor for page payloads.
  std::shared_ptr<Decryptor> data_decryptor;
};
98 | ||
// Abstract page iterator interface. This way, we can feed column pages to the
// ColumnReader through whatever mechanism we choose
class PARQUET_EXPORT PageReader {
 public:
  virtual ~PageReader() = default;

  // Create a PageReader over a stream of serialized pages.
  //
  // @param stream input stream containing the column chunk's pages
  // @param total_num_rows total number of rows in the column chunk
  // @param codec compression codec used for the page bodies
  // @param pool memory pool for internal allocations
  // @param ctx optional decryption context for encrypted columns
  //   (nullptr for unencrypted data)
  static std::unique_ptr<PageReader> Open(
      std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
      Compression::type codec, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
      const CryptoContext* ctx = NULLPTR);

  // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
  // containing new Page otherwise
  virtual std::shared_ptr<Page> NextPage() = 0;

  // Set the maximum serialized page header size that will be accepted
  // (presumably a guard against corrupted or malicious files -- see
  // kDefaultMaxPageHeaderSize for the default).
  virtual void set_max_page_header_size(uint32_t size) = 0;
};
116 | ||
// Type-erased base interface for reading values from a single column chunk.
// Concrete typed access goes through TypedColumnReader<DType>.
class PARQUET_EXPORT ColumnReader {
 public:
  virtual ~ColumnReader() = default;

  // Create a reader for the column described by descr, consuming pages
  // from the given PageReader.
  static std::shared_ptr<ColumnReader> Make(
      const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager,
      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

  // Returns true if there are still values in this column.
  virtual bool HasNext() = 0;

  // Physical Parquet type of the column.
  virtual Type::type type() const = 0;

  // Schema descriptor for the column being read.
  virtual const ColumnDescriptor* descr() const = 0;

  // Get the encoding that can be exposed by this reader. If it returns
  // dictionary encoding, then ReadBatchWithDictionary can be used to read data.
  //
  // \note API EXPERIMENTAL
  virtual ExposedEncoding GetExposedEncoding() = 0;

 protected:
  friend class RowGroupReader;
  // Set the encoding that can be exposed by this reader.
  //
  // \note API EXPERIMENTAL
  virtual void SetExposedEncoding(ExposedEncoding encoding) = 0;
};
145 | ||
// API to read values from a single column. This is a main client facing API.
template <typename DType>
class TypedColumnReader : public ColumnReader {
 public:
  // Underlying C value type for this physical type
  // (e.g. int32_t for Int32Type -- see the reader aliases at file bottom).
  typedef typename DType::c_type T;

  // Read a batch of repetition levels, definition levels, and values from the
  // column.
  //
  // Since null values are not stored in the values, the number of values read
  // may be less than the number of repetition and definition levels. With
  // nested data this is almost certainly true.
  //
  // Set def_levels or rep_levels to nullptr if you want to skip reading them.
  // This is only safe if you know through some other source that there are no
  // undefined values.
  //
  // To fully exhaust a row group, you must read batches until the number of
  // values read reaches the number of stored values according to the metadata.
  //
  // This API is the same for both V1 and V2 of the DataPage
  //
  // @returns: actual number of levels read (see values_read for number of values read)
  virtual int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
                            T* values, int64_t* values_read) = 0;

  /// Read a batch of repetition levels, definition levels, and values from the
  /// column and leave spaces for null entries on the lowest level in the values
  /// buffer.
  ///
  /// In comparison to ReadBatch the length of repetition and definition levels
  /// is the same as of the number of values read for max_definition_level == 1.
  /// In the case of max_definition_level > 1, the repetition and definition
  /// levels are larger than the values but the values include the null entries
  /// with definition_level == (max_definition_level - 1).
  ///
  /// To fully exhaust a row group, you must read batches until the number of
  /// values read reaches the number of stored values according to the metadata.
  ///
  /// @param batch_size the number of levels to read
  /// @param[out] def_levels The Parquet definition levels, output has
  ///   the length levels_read.
  /// @param[out] rep_levels The Parquet repetition levels, output has
  ///   the length levels_read.
  /// @param[out] values The values in the lowest nested level including
  ///   spacing for nulls on the lowest levels; output has the length
  ///   values_read.
  /// @param[out] valid_bits Memory allocated for a bitmap that indicates if
  ///   the row is null or on the maximum definition level. For performance
  ///   reasons the underlying buffer should be able to store 1 bit more than
  ///   required. If this requires an additional byte, this byte is only read
  ///   but never written to.
  /// @param valid_bits_offset The offset in bits of the valid_bits where the
  ///   first relevant bit resides.
  /// @param[out] levels_read The number of repetition/definition levels that were read.
  /// @param[out] values_read The number of values read, this includes all
  ///   non-null entries as well as all null-entries on the lowest level
  ///   (i.e. definition_level == max_definition_level - 1)
  /// @param[out] null_count The number of nulls on the lowest levels.
  ///   (i.e. (values_read - null_count) is total number of non-null entries)
  ///
  /// \deprecated Since 4.0.0
  ARROW_DEPRECATED("Doesn't handle nesting correctly and unused outside of unit tests.")
  virtual int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels,
                                  int16_t* rep_levels, T* values, uint8_t* valid_bits,
                                  int64_t valid_bits_offset, int64_t* levels_read,
                                  int64_t* values_read, int64_t* null_count) = 0;

  // Skip reading levels
  // Returns the number of levels skipped
  //
  // NOTE(review): the parameter is named num_rows_to_skip but the return value
  // is described as levels skipped; for repeated columns rows and levels
  // differ -- confirm the intended unit in the implementation.
  virtual int64_t Skip(int64_t num_rows_to_skip) = 0;

  // Read a batch of repetition levels, definition levels, and indices from the
  // column. And read the dictionary if a dictionary page is encountered during
  // reading pages. This API is similar to ReadBatch(), with ability to read
  // dictionary and indices. It is only valid to call this method when the reader can
  // expose dictionary encoding. (i.e., the reader's GetExposedEncoding() returns
  // DICTIONARY).
  //
  // The dictionary is read along with the data page. When there's no data page,
  // the dictionary won't be returned.
  //
  // @param batch_size The batch size to read
  // @param[out] def_levels The Parquet definition levels.
  // @param[out] rep_levels The Parquet repetition levels.
  // @param[out] indices The dictionary indices.
  // @param[out] indices_read The number of indices read.
  // @param[out] dict The pointer to dictionary values. It will return nullptr if
  //   there's no data page. Each column chunk only has one dictionary page. The dictionary
  //   is owned by the reader, so the caller is responsible for copying the dictionary
  //   values before the reader gets destroyed.
  // @param[out] dict_len The dictionary length. It will return 0 if there's no data
  //   page.
  // @returns: actual number of levels read (see indices_read for number of
  //   indices read)
  //
  // \note API EXPERIMENTAL
  virtual int64_t ReadBatchWithDictionary(int64_t batch_size, int16_t* def_levels,
                                          int16_t* rep_levels, int32_t* indices,
                                          int64_t* indices_read, const T** dict,
                                          int32_t* dict_len) = 0;
};
248 | ||
249 | namespace internal { | |
250 | ||
/// \brief Stateful column reader that delimits semantic records for both flat
/// and nested columns
///
/// \note API EXPERIMENTAL
/// \since 1.3.0
class RecordReader {
 public:
  /// \brief Create a RecordReader for a leaf column
  /// \param[in] descr schema descriptor of the leaf column
  /// \param[in] leaf_info definition/repetition level info for the leaf
  /// \param[in] pool memory pool backing the internal buffers
  /// \param[in] read_dictionary if true, read values directly as Arrow
  ///   dictionary-encoded data (see DictionaryRecordReader)
  static std::shared_ptr<RecordReader> Make(
      const ColumnDescriptor* descr, LevelInfo leaf_info,
      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
      const bool read_dictionary = false);

  virtual ~RecordReader() = default;

  /// \brief Attempt to read indicated number of records from column chunk
  /// \return number of records read
  virtual int64_t ReadRecords(int64_t num_records) = 0;

  /// \brief Pre-allocate space for data. Results in better flat read performance
  virtual void Reserve(int64_t num_values) = 0;

  /// \brief Clear consumed values and repetition/definition levels as the
  /// result of calling ReadRecords
  virtual void Reset() = 0;

  /// \brief Transfer filled values buffer to caller. A new one will be
  /// allocated in subsequent ReadRecords calls
  virtual std::shared_ptr<ResizableBuffer> ReleaseValues() = 0;

  /// \brief Transfer filled validity bitmap buffer to caller. A new one will
  /// be allocated in subsequent ReadRecords calls
  virtual std::shared_ptr<ResizableBuffer> ReleaseIsValid() = 0;

  /// \brief Return true if the record reader has more internal data yet to
  /// process
  virtual bool HasMoreData() const = 0;

  /// \brief Advance record reader to the next row group
  /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader
  virtual void SetPageReader(std::unique_ptr<PageReader> reader) = 0;

  /// \brief Dump internal state to stdout/stderr for debugging
  virtual void DebugPrintState() = 0;

  /// \brief Decoded definition levels
  int16_t* def_levels() const {
    return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
  }

  /// \brief Decoded repetition levels
  int16_t* rep_levels() const {
    return reinterpret_cast<int16_t*>(rep_levels_->mutable_data());
  }

  /// \brief Decoded values, including nulls, if any
  uint8_t* values() const { return values_->mutable_data(); }

  /// \brief Number of values written including nulls (if any)
  int64_t values_written() const { return values_written_; }

  /// \brief Number of definition / repetition levels (from those that have
  /// been decoded) that have been consumed inside the reader.
  int64_t levels_position() const { return levels_position_; }

  /// \brief Number of definition / repetition levels that have been written
  /// internally in the reader
  int64_t levels_written() const { return levels_written_; }

  /// \brief Number of nulls in the leaf
  int64_t null_count() const { return null_count_; }

  /// \brief True if the leaf values are nullable
  bool nullable_values() const { return nullable_values_; }

  /// \brief True if reading directly as Arrow dictionary-encoded
  bool read_dictionary() const { return read_dictionary_; }

 protected:
  // True if the leaf values are nullable (see nullable_values()).
  bool nullable_values_;

  // NOTE(review): presumably true when positioned on a record boundary,
  // used to delimit records across ReadRecords calls -- confirm in the
  // implementation.
  bool at_record_start_;
  int64_t records_read_;

  // Number of values written to values_ (including nulls) and the buffer's
  // current capacity in values.
  int64_t values_written_;
  int64_t values_capacity_;
  int64_t null_count_;

  // Levels decoded into the buffers, levels consumed so far, and the
  // buffers' capacity in levels (see levels_written()/levels_position()).
  int64_t levels_written_;
  int64_t levels_position_;
  int64_t levels_capacity_;

  std::shared_ptr<::arrow::ResizableBuffer> values_;
  // In the case of false, don't allocate the values buffer (when we directly read into
  // builder classes).
  bool uses_values_;

  std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
  std::shared_ptr<::arrow::ResizableBuffer> def_levels_;
  std::shared_ptr<::arrow::ResizableBuffer> rep_levels_;

  // True if reading directly as Arrow dictionary-encoded (set via Make).
  bool read_dictionary_ = false;
};
352 | ||
/// \brief RecordReader specialization that accumulates binary data into
/// Arrow array chunks (virtual inheritance allows combining with other
/// RecordReader specializations).
class BinaryRecordReader : virtual public RecordReader {
 public:
  /// \brief Transfer the accumulated chunks of decoded values to the caller.
  virtual std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() = 0;
};
357 | ||
/// \brief Read records directly to dictionary-encoded Arrow form (int32
/// indices). Only valid for BYTE_ARRAY columns
class DictionaryRecordReader : virtual public RecordReader {
 public:
  /// \brief Transfer the accumulated dictionary-encoded chunks to the caller.
  virtual std::shared_ptr<::arrow::ChunkedArray> GetResult() = 0;
};
364 | ||
365 | } // namespace internal | |
366 | ||
// Convenience reader aliases, one per physical Parquet type.
using BoolReader = TypedColumnReader<BooleanType>;
using Int32Reader = TypedColumnReader<Int32Type>;
using Int64Reader = TypedColumnReader<Int64Type>;
using Int96Reader = TypedColumnReader<Int96Type>;
using FloatReader = TypedColumnReader<FloatType>;
using DoubleReader = TypedColumnReader<DoubleType>;
using ByteArrayReader = TypedColumnReader<ByteArrayType>;
using FixedLenByteArrayReader = TypedColumnReader<FLBAType>;
375 | ||
376 | } // namespace parquet |