]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
// This module provides shared test utilities for the Parquet C++ library:
// random data generators, a mock PageReader, builders for data and dictionary
// pages, and a typed gtest fixture for primitive column types.
21 | ||
22 | #pragma once | |
23 | ||
24 | #include <algorithm> | |
25 | #include <limits> | |
26 | #include <memory> | |
27 | #include <random> | |
28 | #include <string> | |
29 | #include <utility> | |
30 | #include <vector> | |
31 | ||
32 | #include <gtest/gtest.h> | |
33 | ||
34 | #include "arrow/io/memory.h" | |
35 | #include "arrow/testing/util.h" | |
36 | ||
37 | #include "parquet/column_page.h" | |
38 | #include "parquet/column_reader.h" | |
39 | #include "parquet/column_writer.h" | |
40 | #include "parquet/encoding.h" | |
41 | #include "parquet/platform.h" | |
42 | ||
43 | namespace parquet { | |
44 | ||
45 | static constexpr int FLBA_LENGTH = 12; | |
46 | ||
47 | inline bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) { | |
48 | return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH); | |
49 | } | |
50 | ||
51 | namespace test { | |
52 | ||
53 | typedef ::testing::Types<BooleanType, Int32Type, Int64Type, Int96Type, FloatType, | |
54 | DoubleType, ByteArrayType, FLBAType> | |
55 | ParquetTypes; | |
56 | ||
57 | class ParquetTestException : public parquet::ParquetException { | |
58 | using ParquetException::ParquetException; | |
59 | }; | |
60 | ||
// Test-data location helpers, defined out of line.
// NOTE(review): semantics inferred from the names only — presumably they resolve
// the directory of good / bad Parquet test files and a file inside one of them
// (is_good selects which); confirm against the .cc definitions.
auto get_data_dir() -> const char*;
auto get_bad_data_dir() -> std::string;

auto get_data_file(const std::string& filename, bool is_good = true) -> std::string;
65 | ||
// gtest assertion helper: fails the current test unless `left` and `right` have
// the same size and element-wise equal contents. The index of the first mismatch
// is streamed into the failure message. ASSERT_* returns from this function on
// the first failure.
template <typename T>
static inline void assert_vector_equal(const std::vector<T>& left,
                                       const std::vector<T>& right) {
  ASSERT_EQ(left.size(), right.size());

  const size_t count = left.size();
  for (size_t idx = 0; idx != count; ++idx) {
    ASSERT_EQ(left[idx], right[idx]) << idx;
  }
}
75 | ||
// Returns true when both vectors have the same length and every pair of elements
// compares equal (tested with operator!=). On the first differing element, writes
// its index and both values to std::cerr and returns false.
template <typename T>
static inline bool vector_equal(const std::vector<T>& left, const std::vector<T>& right) {
  const size_t count = left.size();
  if (count != right.size()) {
    return false;
  }

  for (size_t idx = 0; idx != count; ++idx) {
    const bool differs = left[idx] != right[idx];
    if (differs) {
      std::cerr << "index " << idx << " left was " << left[idx] << " right was "
                << right[idx] << std::endl;
      return false;
    }
  }
  return true;
}
92 | ||
// Returns a copy of values[start, end). When end <= start the result is empty.
// No bounds checking is done: for a non-empty slice, callers must guarantee
// 0 <= start and end <= values.size().
template <typename T>
static std::vector<T> slice(const std::vector<T>& values, int start, int end) {
  if (end <= start) {
    return std::vector<T>();
  }
  // Range construction copies in a single pass and, unlike default-filling a
  // vector and then overwriting it, does not require T to be default-constructible.
  return std::vector<T>(values.begin() + start, values.begin() + end);
}
105 | ||
// Random generators defined out of line (declarations only).
// NOTE(review): behavior inferred from names — random_bytes presumably fills `out`
// with n bytes and random_bools writes n booleans with probability p; confirm in
// the .cc definitions.
auto random_bytes(int n, uint32_t seed, std::vector<uint8_t>* out) -> void;
auto random_bools(int n, double p, uint32_t seed, bool* out) -> void;
108 | ||
// Writes n pseudo-random values into out[0, n). Each call seeds a fresh
// std::default_random_engine with `seed`, so identical arguments produce identical
// output (for a given standard library implementation).
// Generic T (integral): uniform over the closed range [min_value, max_value].
template <typename T>
inline void random_numbers(int n, uint32_t seed, T min_value, T max_value, T* out) {
  std::default_random_engine engine(seed);
  std::uniform_int_distribution<T> dist(min_value, max_value);
  std::generate_n(out, n, [&engine, &dist]() { return dist(engine); });
}

// float: uniform over [min_value, max_value) via std::uniform_real_distribution.
template <>
inline void random_numbers(int n, uint32_t seed, float min_value, float max_value,
                           float* out) {
  std::default_random_engine engine(seed);
  std::uniform_real_distribution<float> dist(min_value, max_value);
  std::generate_n(out, n, [&engine, &dist]() { return dist(engine); });
}

// double: as for float, at double precision.
template <>
inline void random_numbers(int n, uint32_t seed, double min_value, double max_value,
                           double* out) {
  std::default_random_engine engine(seed);
  std::uniform_real_distribution<double> dist(min_value, max_value);
  std::generate_n(out, n, [&engine, &dist]() { return dist(engine); });
}
137 | ||
138 | void random_Int96_numbers(int n, uint32_t seed, int32_t min_value, int32_t max_value, | |
139 | Int96* out); | |
140 | ||
141 | void random_fixed_byte_array(int n, uint32_t seed, uint8_t* buf, int len, FLBA* out); | |
142 | ||
143 | void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int min_size, | |
144 | int max_size); | |
145 | ||
146 | void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int max_size); | |
147 | ||
148 | template <typename Type, typename Sequence> | |
149 | std::shared_ptr<Buffer> EncodeValues(Encoding::type encoding, bool use_dictionary, | |
150 | const Sequence& values, int length, | |
151 | const ColumnDescriptor* descr) { | |
152 | auto encoder = MakeTypedEncoder<Type>(encoding, use_dictionary, descr); | |
153 | encoder->Put(values, length); | |
154 | return encoder->FlushValues(); | |
155 | } | |
156 | ||
// Fills values[0, num_values) via random_numbers with the fixed seed 0, drawing from
// [numeric_limits<T>::min(), numeric_limits<T>::max()]. For floating-point T, min()
// is the smallest positive normal value, so only positive values result.
// `values` must already hold at least num_values elements. `buffer` is unused here;
// it exists so every InitValues specialization shares one signature.
template <typename T>
static void InitValues(int num_values, std::vector<T>& values,
                       std::vector<uint8_t>& /*buffer*/) {
  const T lowest = std::numeric_limits<T>::min();
  const T highest = std::numeric_limits<T>::max();
  random_numbers(num_values, 0, lowest, highest, values.data());
}
163 | ||
164 | template <typename T> | |
165 | static void InitDictValues(int num_values, int num_dicts, std::vector<T>& values, | |
166 | std::vector<uint8_t>& buffer) { | |
167 | int repeat_factor = num_values / num_dicts; | |
168 | InitValues<T>(num_dicts, values, buffer); | |
169 | // add some repeated values | |
170 | for (int j = 1; j < repeat_factor; ++j) { | |
171 | for (int i = 0; i < num_dicts; ++i) { | |
172 | std::memcpy(&values[num_dicts * j + i], &values[i], sizeof(T)); | |
173 | } | |
174 | } | |
175 | // computed only dict_per_page * repeat_factor - 1 values < num_values | |
176 | // compute remaining | |
177 | for (int i = num_dicts * repeat_factor; i < num_values; ++i) { | |
178 | std::memcpy(&values[i], &values[i - num_dicts * repeat_factor], sizeof(T)); | |
179 | } | |
180 | } | |
181 | ||
182 | template <> | |
183 | inline void InitDictValues<bool>(int num_values, int num_dicts, std::vector<bool>& values, | |
184 | std::vector<uint8_t>& buffer) { | |
185 | // No op for bool | |
186 | } | |
187 | ||
188 | class MockPageReader : public PageReader { | |
189 | public: | |
190 | explicit MockPageReader(const std::vector<std::shared_ptr<Page>>& pages) | |
191 | : pages_(pages), page_index_(0) {} | |
192 | ||
193 | std::shared_ptr<Page> NextPage() override { | |
194 | if (page_index_ == static_cast<int>(pages_.size())) { | |
195 | // EOS to consumer | |
196 | return std::shared_ptr<Page>(nullptr); | |
197 | } | |
198 | return pages_[page_index_++]; | |
199 | } | |
200 | ||
201 | // No-op | |
202 | void set_max_page_header_size(uint32_t size) override {} | |
203 | ||
204 | private: | |
205 | std::vector<std::shared_ptr<Page>> pages_; | |
206 | int page_index_; | |
207 | }; | |
208 | ||
209 | // TODO(wesm): this is only used for testing for now. Refactor to form part of | |
210 | // primary file write path | |
211 | template <typename Type> | |
212 | class DataPageBuilder { | |
213 | public: | |
214 | using c_type = typename Type::c_type; | |
215 | ||
216 | // This class writes data and metadata to the passed inputs | |
217 | explicit DataPageBuilder(ArrowOutputStream* sink) | |
218 | : sink_(sink), | |
219 | num_values_(0), | |
220 | encoding_(Encoding::PLAIN), | |
221 | definition_level_encoding_(Encoding::RLE), | |
222 | repetition_level_encoding_(Encoding::RLE), | |
223 | have_def_levels_(false), | |
224 | have_rep_levels_(false), | |
225 | have_values_(false) {} | |
226 | ||
227 | void AppendDefLevels(const std::vector<int16_t>& levels, int16_t max_level, | |
228 | Encoding::type encoding = Encoding::RLE) { | |
229 | AppendLevels(levels, max_level, encoding); | |
230 | ||
231 | num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_); | |
232 | definition_level_encoding_ = encoding; | |
233 | have_def_levels_ = true; | |
234 | } | |
235 | ||
236 | void AppendRepLevels(const std::vector<int16_t>& levels, int16_t max_level, | |
237 | Encoding::type encoding = Encoding::RLE) { | |
238 | AppendLevels(levels, max_level, encoding); | |
239 | ||
240 | num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_); | |
241 | repetition_level_encoding_ = encoding; | |
242 | have_rep_levels_ = true; | |
243 | } | |
244 | ||
245 | void AppendValues(const ColumnDescriptor* d, const std::vector<c_type>& values, | |
246 | Encoding::type encoding = Encoding::PLAIN) { | |
247 | std::shared_ptr<Buffer> values_sink = EncodeValues<Type>( | |
248 | encoding, false, values.data(), static_cast<int>(values.size()), d); | |
249 | PARQUET_THROW_NOT_OK(sink_->Write(values_sink->data(), values_sink->size())); | |
250 | ||
251 | num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_); | |
252 | encoding_ = encoding; | |
253 | have_values_ = true; | |
254 | } | |
255 | ||
256 | int32_t num_values() const { return num_values_; } | |
257 | ||
258 | Encoding::type encoding() const { return encoding_; } | |
259 | ||
260 | Encoding::type rep_level_encoding() const { return repetition_level_encoding_; } | |
261 | ||
262 | Encoding::type def_level_encoding() const { return definition_level_encoding_; } | |
263 | ||
264 | private: | |
265 | ArrowOutputStream* sink_; | |
266 | ||
267 | int32_t num_values_; | |
268 | Encoding::type encoding_; | |
269 | Encoding::type definition_level_encoding_; | |
270 | Encoding::type repetition_level_encoding_; | |
271 | ||
272 | bool have_def_levels_; | |
273 | bool have_rep_levels_; | |
274 | bool have_values_; | |
275 | ||
276 | // Used internally for both repetition and definition levels | |
277 | void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level, | |
278 | Encoding::type encoding) { | |
279 | if (encoding != Encoding::RLE) { | |
280 | ParquetException::NYI("only rle encoding currently implemented"); | |
281 | } | |
282 | ||
283 | // TODO: compute a more precise maximum size for the encoded levels | |
284 | std::vector<uint8_t> encode_buffer(levels.size() * 2); | |
285 | ||
286 | // We encode into separate memory from the output stream because the | |
287 | // RLE-encoded bytes have to be preceded in the stream by their absolute | |
288 | // size. | |
289 | LevelEncoder encoder; | |
290 | encoder.Init(encoding, max_level, static_cast<int>(levels.size()), | |
291 | encode_buffer.data(), static_cast<int>(encode_buffer.size())); | |
292 | ||
293 | encoder.Encode(static_cast<int>(levels.size()), levels.data()); | |
294 | ||
295 | int32_t rle_bytes = encoder.len(); | |
296 | PARQUET_THROW_NOT_OK( | |
297 | sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t))); | |
298 | PARQUET_THROW_NOT_OK(sink_->Write(encode_buffer.data(), rle_bytes)); | |
299 | } | |
300 | }; | |
301 | ||
302 | template <> | |
303 | inline void DataPageBuilder<BooleanType>::AppendValues(const ColumnDescriptor* d, | |
304 | const std::vector<bool>& values, | |
305 | Encoding::type encoding) { | |
306 | if (encoding != Encoding::PLAIN) { | |
307 | ParquetException::NYI("only plain encoding currently implemented"); | |
308 | } | |
309 | ||
310 | auto encoder = MakeTypedEncoder<BooleanType>(Encoding::PLAIN, false, d); | |
311 | dynamic_cast<BooleanEncoder*>(encoder.get()) | |
312 | ->Put(values, static_cast<int>(values.size())); | |
313 | std::shared_ptr<Buffer> buffer = encoder->FlushValues(); | |
314 | PARQUET_THROW_NOT_OK(sink_->Write(buffer->data(), buffer->size())); | |
315 | ||
316 | num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_); | |
317 | encoding_ = encoding; | |
318 | have_values_ = true; | |
319 | } | |
320 | ||
321 | template <typename Type> | |
322 | static std::shared_ptr<DataPageV1> MakeDataPage( | |
323 | const ColumnDescriptor* d, const std::vector<typename Type::c_type>& values, | |
324 | int num_vals, Encoding::type encoding, const uint8_t* indices, int indices_size, | |
325 | const std::vector<int16_t>& def_levels, int16_t max_def_level, | |
326 | const std::vector<int16_t>& rep_levels, int16_t max_rep_level) { | |
327 | int num_values = 0; | |
328 | ||
329 | auto page_stream = CreateOutputStream(); | |
330 | test::DataPageBuilder<Type> page_builder(page_stream.get()); | |
331 | ||
332 | if (!rep_levels.empty()) { | |
333 | page_builder.AppendRepLevels(rep_levels, max_rep_level); | |
334 | } | |
335 | if (!def_levels.empty()) { | |
336 | page_builder.AppendDefLevels(def_levels, max_def_level); | |
337 | } | |
338 | ||
339 | if (encoding == Encoding::PLAIN) { | |
340 | page_builder.AppendValues(d, values, encoding); | |
341 | num_values = page_builder.num_values(); | |
342 | } else { // DICTIONARY PAGES | |
343 | PARQUET_THROW_NOT_OK(page_stream->Write(indices, indices_size)); | |
344 | num_values = std::max(page_builder.num_values(), num_vals); | |
345 | } | |
346 | ||
347 | PARQUET_ASSIGN_OR_THROW(auto buffer, page_stream->Finish()); | |
348 | ||
349 | return std::make_shared<DataPageV1>(buffer, num_values, encoding, | |
350 | page_builder.def_level_encoding(), | |
351 | page_builder.rep_level_encoding(), buffer->size()); | |
352 | } | |
353 | ||
354 | template <typename TYPE> | |
355 | class DictionaryPageBuilder { | |
356 | public: | |
357 | typedef typename TYPE::c_type TC; | |
358 | static constexpr int TN = TYPE::type_num; | |
359 | using SpecializedEncoder = typename EncodingTraits<TYPE>::Encoder; | |
360 | ||
361 | // This class writes data and metadata to the passed inputs | |
362 | explicit DictionaryPageBuilder(const ColumnDescriptor* d) | |
363 | : num_dict_values_(0), have_values_(false) { | |
364 | auto encoder = MakeTypedEncoder<TYPE>(Encoding::PLAIN, true, d); | |
365 | dict_traits_ = dynamic_cast<DictEncoder<TYPE>*>(encoder.get()); | |
366 | encoder_.reset(dynamic_cast<SpecializedEncoder*>(encoder.release())); | |
367 | } | |
368 | ||
369 | ~DictionaryPageBuilder() {} | |
370 | ||
371 | std::shared_ptr<Buffer> AppendValues(const std::vector<TC>& values) { | |
372 | int num_values = static_cast<int>(values.size()); | |
373 | // Dictionary encoding | |
374 | encoder_->Put(values.data(), num_values); | |
375 | num_dict_values_ = dict_traits_->num_entries(); | |
376 | have_values_ = true; | |
377 | return encoder_->FlushValues(); | |
378 | } | |
379 | ||
380 | std::shared_ptr<Buffer> WriteDict() { | |
381 | std::shared_ptr<Buffer> dict_buffer = | |
382 | AllocateBuffer(::arrow::default_memory_pool(), dict_traits_->dict_encoded_size()); | |
383 | dict_traits_->WriteDict(dict_buffer->mutable_data()); | |
384 | return dict_buffer; | |
385 | } | |
386 | ||
387 | int32_t num_values() const { return num_dict_values_; } | |
388 | ||
389 | private: | |
390 | DictEncoder<TYPE>* dict_traits_; | |
391 | std::unique_ptr<SpecializedEncoder> encoder_; | |
392 | int32_t num_dict_values_; | |
393 | bool have_values_; | |
394 | }; | |
395 | ||
396 | template <> | |
397 | inline DictionaryPageBuilder<BooleanType>::DictionaryPageBuilder( | |
398 | const ColumnDescriptor* d) { | |
399 | ParquetException::NYI("only plain encoding currently implemented for boolean"); | |
400 | } | |
401 | ||
402 | template <> | |
403 | inline std::shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::WriteDict() { | |
404 | ParquetException::NYI("only plain encoding currently implemented for boolean"); | |
405 | return nullptr; | |
406 | } | |
407 | ||
408 | template <> | |
409 | inline std::shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::AppendValues( | |
410 | const std::vector<TC>& values) { | |
411 | ParquetException::NYI("only plain encoding currently implemented for boolean"); | |
412 | return nullptr; | |
413 | } | |
414 | ||
415 | template <typename Type> | |
416 | inline static std::shared_ptr<DictionaryPage> MakeDictPage( | |
417 | const ColumnDescriptor* d, const std::vector<typename Type::c_type>& values, | |
418 | const std::vector<int>& values_per_page, Encoding::type encoding, | |
419 | std::vector<std::shared_ptr<Buffer>>& rle_indices) { | |
420 | test::DictionaryPageBuilder<Type> page_builder(d); | |
421 | int num_pages = static_cast<int>(values_per_page.size()); | |
422 | int value_start = 0; | |
423 | ||
424 | for (int i = 0; i < num_pages; i++) { | |
425 | rle_indices.push_back(page_builder.AppendValues( | |
426 | slice(values, value_start, value_start + values_per_page[i]))); | |
427 | value_start += values_per_page[i]; | |
428 | } | |
429 | ||
430 | auto buffer = page_builder.WriteDict(); | |
431 | ||
432 | return std::make_shared<DictionaryPage>(buffer, page_builder.num_values(), | |
433 | Encoding::PLAIN); | |
434 | } | |
435 | ||
436 | // Given def/rep levels and values create multiple dict pages | |
437 | template <typename Type> | |
438 | inline static void PaginateDict(const ColumnDescriptor* d, | |
439 | const std::vector<typename Type::c_type>& values, | |
440 | const std::vector<int16_t>& def_levels, | |
441 | int16_t max_def_level, | |
442 | const std::vector<int16_t>& rep_levels, | |
443 | int16_t max_rep_level, int num_levels_per_page, | |
444 | const std::vector<int>& values_per_page, | |
445 | std::vector<std::shared_ptr<Page>>& pages, | |
446 | Encoding::type encoding = Encoding::RLE_DICTIONARY) { | |
447 | int num_pages = static_cast<int>(values_per_page.size()); | |
448 | std::vector<std::shared_ptr<Buffer>> rle_indices; | |
449 | std::shared_ptr<DictionaryPage> dict_page = | |
450 | MakeDictPage<Type>(d, values, values_per_page, encoding, rle_indices); | |
451 | pages.push_back(dict_page); | |
452 | int def_level_start = 0; | |
453 | int def_level_end = 0; | |
454 | int rep_level_start = 0; | |
455 | int rep_level_end = 0; | |
456 | for (int i = 0; i < num_pages; i++) { | |
457 | if (max_def_level > 0) { | |
458 | def_level_start = i * num_levels_per_page; | |
459 | def_level_end = (i + 1) * num_levels_per_page; | |
460 | } | |
461 | if (max_rep_level > 0) { | |
462 | rep_level_start = i * num_levels_per_page; | |
463 | rep_level_end = (i + 1) * num_levels_per_page; | |
464 | } | |
465 | std::shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>( | |
466 | d, {}, values_per_page[i], encoding, rle_indices[i]->data(), | |
467 | static_cast<int>(rle_indices[i]->size()), | |
468 | slice(def_levels, def_level_start, def_level_end), max_def_level, | |
469 | slice(rep_levels, rep_level_start, rep_level_end), max_rep_level); | |
470 | pages.push_back(data_page); | |
471 | } | |
472 | } | |
473 | ||
474 | // Given def/rep levels and values create multiple plain pages | |
475 | template <typename Type> | |
476 | static inline void PaginatePlain(const ColumnDescriptor* d, | |
477 | const std::vector<typename Type::c_type>& values, | |
478 | const std::vector<int16_t>& def_levels, | |
479 | int16_t max_def_level, | |
480 | const std::vector<int16_t>& rep_levels, | |
481 | int16_t max_rep_level, int num_levels_per_page, | |
482 | const std::vector<int>& values_per_page, | |
483 | std::vector<std::shared_ptr<Page>>& pages, | |
484 | Encoding::type encoding = Encoding::PLAIN) { | |
485 | int num_pages = static_cast<int>(values_per_page.size()); | |
486 | int def_level_start = 0; | |
487 | int def_level_end = 0; | |
488 | int rep_level_start = 0; | |
489 | int rep_level_end = 0; | |
490 | int value_start = 0; | |
491 | for (int i = 0; i < num_pages; i++) { | |
492 | if (max_def_level > 0) { | |
493 | def_level_start = i * num_levels_per_page; | |
494 | def_level_end = (i + 1) * num_levels_per_page; | |
495 | } | |
496 | if (max_rep_level > 0) { | |
497 | rep_level_start = i * num_levels_per_page; | |
498 | rep_level_end = (i + 1) * num_levels_per_page; | |
499 | } | |
500 | std::shared_ptr<DataPage> page = MakeDataPage<Type>( | |
501 | d, slice(values, value_start, value_start + values_per_page[i]), | |
502 | values_per_page[i], encoding, nullptr, 0, | |
503 | slice(def_levels, def_level_start, def_level_end), max_def_level, | |
504 | slice(rep_levels, rep_level_start, rep_level_end), max_rep_level); | |
505 | pages.push_back(page); | |
506 | value_start += values_per_page[i]; | |
507 | } | |
508 | } | |
509 | ||
510 | // Generates pages from randomly generated data | |
511 | template <typename Type> | |
512 | static inline int MakePages(const ColumnDescriptor* d, int num_pages, int levels_per_page, | |
513 | std::vector<int16_t>& def_levels, | |
514 | std::vector<int16_t>& rep_levels, | |
515 | std::vector<typename Type::c_type>& values, | |
516 | std::vector<uint8_t>& buffer, | |
517 | std::vector<std::shared_ptr<Page>>& pages, | |
518 | Encoding::type encoding = Encoding::PLAIN) { | |
519 | int num_levels = levels_per_page * num_pages; | |
520 | int num_values = 0; | |
521 | uint32_t seed = 0; | |
522 | int16_t zero = 0; | |
523 | int16_t max_def_level = d->max_definition_level(); | |
524 | int16_t max_rep_level = d->max_repetition_level(); | |
525 | std::vector<int> values_per_page(num_pages, levels_per_page); | |
526 | // Create definition levels | |
527 | if (max_def_level > 0) { | |
528 | def_levels.resize(num_levels); | |
529 | random_numbers(num_levels, seed, zero, max_def_level, def_levels.data()); | |
530 | for (int p = 0; p < num_pages; p++) { | |
531 | int num_values_per_page = 0; | |
532 | for (int i = 0; i < levels_per_page; i++) { | |
533 | if (def_levels[i + p * levels_per_page] == max_def_level) { | |
534 | num_values_per_page++; | |
535 | num_values++; | |
536 | } | |
537 | } | |
538 | values_per_page[p] = num_values_per_page; | |
539 | } | |
540 | } else { | |
541 | num_values = num_levels; | |
542 | } | |
543 | // Create repetition levels | |
544 | if (max_rep_level > 0) { | |
545 | rep_levels.resize(num_levels); | |
546 | random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data()); | |
547 | } | |
548 | // Create values | |
549 | values.resize(num_values); | |
550 | if (encoding == Encoding::PLAIN) { | |
551 | InitValues<typename Type::c_type>(num_values, values, buffer); | |
552 | PaginatePlain<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level, | |
553 | levels_per_page, values_per_page, pages); | |
554 | } else if (encoding == Encoding::RLE_DICTIONARY || | |
555 | encoding == Encoding::PLAIN_DICTIONARY) { | |
556 | // Calls InitValues and repeats the data | |
557 | InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer); | |
558 | PaginateDict<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level, | |
559 | levels_per_page, values_per_page, pages); | |
560 | } | |
561 | ||
562 | return num_values; | |
563 | } | |
564 | ||
565 | // ---------------------------------------------------------------------- | |
566 | // Test data generation | |
567 | ||
568 | template <> | |
569 | void inline InitValues<bool>(int num_values, std::vector<bool>& values, | |
570 | std::vector<uint8_t>& buffer) { | |
571 | values = {}; | |
572 | ::arrow::random_is_valid(num_values, 0.5, &values, | |
573 | static_cast<int>(::arrow::random_seed())); | |
574 | } | |
575 | ||
576 | template <> | |
577 | inline void InitValues<ByteArray>(int num_values, std::vector<ByteArray>& values, | |
578 | std::vector<uint8_t>& buffer) { | |
579 | int max_byte_array_len = 12; | |
580 | int num_bytes = static_cast<int>(max_byte_array_len + sizeof(uint32_t)); | |
581 | size_t nbytes = num_values * num_bytes; | |
582 | buffer.resize(nbytes); | |
583 | random_byte_array(num_values, 0, buffer.data(), values.data(), max_byte_array_len); | |
584 | } | |
585 | ||
586 | inline void InitWideByteArrayValues(int num_values, std::vector<ByteArray>& values, | |
587 | std::vector<uint8_t>& buffer, int min_len, | |
588 | int max_len) { | |
589 | int num_bytes = static_cast<int>(max_len + sizeof(uint32_t)); | |
590 | size_t nbytes = num_values * num_bytes; | |
591 | buffer.resize(nbytes); | |
592 | random_byte_array(num_values, 0, buffer.data(), values.data(), min_len, max_len); | |
593 | } | |
594 | ||
595 | template <> | |
596 | inline void InitValues<FLBA>(int num_values, std::vector<FLBA>& values, | |
597 | std::vector<uint8_t>& buffer) { | |
598 | size_t nbytes = num_values * FLBA_LENGTH; | |
599 | buffer.resize(nbytes); | |
600 | random_fixed_byte_array(num_values, 0, buffer.data(), FLBA_LENGTH, values.data()); | |
601 | } | |
602 | ||
603 | template <> | |
604 | inline void InitValues<Int96>(int num_values, std::vector<Int96>& values, | |
605 | std::vector<uint8_t>& buffer) { | |
606 | random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(), | |
607 | std::numeric_limits<int32_t>::max(), values.data()); | |
608 | } | |
609 | ||
// Returns the name used for the i-th generated test column: "column_<i>".
inline std::string TestColumnName(int i) {
  // std::to_string (from <string>) avoids std::stringstream, whose header <sstream>
  // this file never includes.
  return "column_" + std::to_string(i);
}
615 | ||
616 | // This class lives here because of its dependency on the InitValues specializations. | |
617 | template <typename TestType> | |
618 | class PrimitiveTypedTest : public ::testing::Test { | |
619 | public: | |
620 | using c_type = typename TestType::c_type; | |
621 | ||
622 | void SetUpSchema(Repetition::type repetition, int num_columns = 1) { | |
623 | std::vector<schema::NodePtr> fields; | |
624 | ||
625 | for (int i = 0; i < num_columns; ++i) { | |
626 | std::string name = TestColumnName(i); | |
627 | fields.push_back(schema::PrimitiveNode::Make(name, repetition, TestType::type_num, | |
628 | ConvertedType::NONE, FLBA_LENGTH)); | |
629 | } | |
630 | node_ = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields); | |
631 | schema_.Init(node_); | |
632 | } | |
633 | ||
634 | void GenerateData(int64_t num_values); | |
635 | void SetupValuesOut(int64_t num_values); | |
636 | void SyncValuesOut(); | |
637 | ||
638 | protected: | |
639 | schema::NodePtr node_; | |
640 | SchemaDescriptor schema_; | |
641 | ||
642 | // Input buffers | |
643 | std::vector<c_type> values_; | |
644 | ||
645 | std::vector<int16_t> def_levels_; | |
646 | ||
647 | std::vector<uint8_t> buffer_; | |
648 | // Pointer to the values, needed as we cannot use std::vector<bool>::data() | |
649 | c_type* values_ptr_; | |
650 | std::vector<uint8_t> bool_buffer_; | |
651 | ||
652 | // Output buffers | |
653 | std::vector<c_type> values_out_; | |
654 | std::vector<uint8_t> bool_buffer_out_; | |
655 | c_type* values_out_ptr_; | |
656 | }; | |
657 | ||
658 | template <typename TestType> | |
659 | inline void PrimitiveTypedTest<TestType>::SyncValuesOut() {} | |
660 | ||
661 | template <> | |
662 | inline void PrimitiveTypedTest<BooleanType>::SyncValuesOut() { | |
663 | std::vector<uint8_t>::const_iterator source_iterator = bool_buffer_out_.begin(); | |
664 | std::vector<c_type>::iterator destination_iterator = values_out_.begin(); | |
665 | while (source_iterator != bool_buffer_out_.end()) { | |
666 | *destination_iterator++ = *source_iterator++ != 0; | |
667 | } | |
668 | } | |
669 | ||
670 | template <typename TestType> | |
671 | inline void PrimitiveTypedTest<TestType>::SetupValuesOut(int64_t num_values) { | |
672 | values_out_.clear(); | |
673 | values_out_.resize(num_values); | |
674 | values_out_ptr_ = values_out_.data(); | |
675 | } | |
676 | ||
677 | template <> | |
678 | inline void PrimitiveTypedTest<BooleanType>::SetupValuesOut(int64_t num_values) { | |
679 | values_out_.clear(); | |
680 | values_out_.resize(num_values); | |
681 | ||
682 | bool_buffer_out_.clear(); | |
683 | bool_buffer_out_.resize(num_values); | |
684 | // Write once to all values so we can copy it without getting Valgrind errors | |
685 | // about uninitialised values. | |
686 | std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true); | |
687 | values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data()); | |
688 | } | |
689 | ||
690 | template <typename TestType> | |
691 | inline void PrimitiveTypedTest<TestType>::GenerateData(int64_t num_values) { | |
692 | def_levels_.resize(num_values); | |
693 | values_.resize(num_values); | |
694 | ||
695 | InitValues<c_type>(static_cast<int>(num_values), values_, buffer_); | |
696 | values_ptr_ = values_.data(); | |
697 | ||
698 | std::fill(def_levels_.begin(), def_levels_.end(), 1); | |
699 | } | |
700 | ||
701 | template <> | |
702 | inline void PrimitiveTypedTest<BooleanType>::GenerateData(int64_t num_values) { | |
703 | def_levels_.resize(num_values); | |
704 | values_.resize(num_values); | |
705 | ||
706 | InitValues<c_type>(static_cast<int>(num_values), values_, buffer_); | |
707 | bool_buffer_.resize(num_values); | |
708 | std::copy(values_.begin(), values_.end(), bool_buffer_.begin()); | |
709 | values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data()); | |
710 | ||
711 | std::fill(def_levels_.begin(), def_levels_.end(), 1); | |
712 | } | |
713 | ||
714 | } // namespace test | |
715 | } // namespace parquet |