]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/parquet/column_scanner_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / parquet / column_scanner_test.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <gtest/gtest.h>
19
20 #include <algorithm>
21 #include <cstdint>
22 #include <cstdlib>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27
28 #include "arrow/testing/gtest_compat.h"
29
30 #include "parquet/column_page.h"
31 #include "parquet/column_scanner.h"
32 #include "parquet/schema.h"
33 #include "parquet/test_util.h"
34 #include "parquet/types.h"
35
36 namespace parquet {
37
38 using schema::NodePtr;
39
40 namespace test {
41
42 template <typename Type>
43 class TestFlatScanner : public ::testing::Test {
44 public:
45 using c_type = typename Type::c_type;
46
47 void InitScanner(const ColumnDescriptor* d) {
48 std::unique_ptr<PageReader> pager(new test::MockPageReader(pages_));
49 scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager)));
50 }
51
52 void CheckResults(int batch_size, const ColumnDescriptor* d) {
53 TypedScanner<Type>* scanner = reinterpret_cast<TypedScanner<Type>*>(scanner_.get());
54 c_type val;
55 bool is_null = false;
56 int16_t def_level;
57 int16_t rep_level;
58 int j = 0;
59 scanner->SetBatchSize(batch_size);
60 for (int i = 0; i < num_levels_; i++) {
61 ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null)) << i << j;
62 if (!is_null) {
63 ASSERT_EQ(values_[j], val) << i << "V" << j;
64 j++;
65 }
66 if (d->max_definition_level() > 0) {
67 ASSERT_EQ(def_levels_[i], def_level) << i << "D" << j;
68 }
69 if (d->max_repetition_level() > 0) {
70 ASSERT_EQ(rep_levels_[i], rep_level) << i << "R" << j;
71 }
72 }
73 ASSERT_EQ(num_values_, j);
74 ASSERT_FALSE(scanner->Next(&val, &def_level, &rep_level, &is_null));
75 }
76
77 void Clear() {
78 pages_.clear();
79 values_.clear();
80 def_levels_.clear();
81 rep_levels_.clear();
82 }
83
84 void Execute(int num_pages, int levels_per_page, int batch_size,
85 const ColumnDescriptor* d, Encoding::type encoding) {
86 num_values_ = MakePages<Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
87 values_, data_buffer_, pages_, encoding);
88 num_levels_ = num_pages * levels_per_page;
89 InitScanner(d);
90 CheckResults(batch_size, d);
91 Clear();
92 }
93
94 void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1,
95 std::shared_ptr<ColumnDescriptor>& d2,
96 std::shared_ptr<ColumnDescriptor>& d3, int length) {
97 NodePtr type;
98 type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::type_num,
99 ConvertedType::NONE, length);
100 d1.reset(new ColumnDescriptor(type, 0, 0));
101 type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, Type::type_num,
102 ConvertedType::NONE, length);
103 d2.reset(new ColumnDescriptor(type, 4, 0));
104 type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, Type::type_num,
105 ConvertedType::NONE, length);
106 d3.reset(new ColumnDescriptor(type, 4, 2));
107 }
108
109 void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length,
110 Encoding::type encoding = Encoding::PLAIN) {
111 std::shared_ptr<ColumnDescriptor> d1;
112 std::shared_ptr<ColumnDescriptor> d2;
113 std::shared_ptr<ColumnDescriptor> d3;
114 InitDescriptors(d1, d2, d3, type_length);
115 // evaluate REQUIRED pages
116 Execute(num_pages, num_levels, batch_size, d1.get(), encoding);
117 // evaluate OPTIONAL pages
118 Execute(num_pages, num_levels, batch_size, d2.get(), encoding);
119 // evaluate REPEATED pages
120 Execute(num_pages, num_levels, batch_size, d3.get(), encoding);
121 }
122
123 protected:
124 int num_levels_;
125 int num_values_;
126 std::vector<std::shared_ptr<Page>> pages_;
127 std::shared_ptr<Scanner> scanner_;
128 std::vector<c_type> values_;
129 std::vector<int16_t> def_levels_;
130 std::vector<int16_t> rep_levels_;
131 std::vector<uint8_t> data_buffer_; // For BA and FLBA
132 };
133
// Sizing shared by the scanner tests in this file. The values are never modified, so
// they are compile-time constants rather than mutable globals.
static constexpr int num_levels_per_page = 100;
static constexpr int num_pages = 20;
static constexpr int batch_size = 32;
137
138 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
139 ByteArrayType>
140 TestTypes;
141
142 using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
143 using TestFLBAFlatScanner = TestFlatScanner<FLBAType>;
144
145 TYPED_TEST_SUITE(TestFlatScanner, TestTypes);
146
147 TYPED_TEST(TestFlatScanner, TestPlainScanner) {
148 ASSERT_NO_FATAL_FAILURE(
149 this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, Encoding::PLAIN));
150 }
151
152 TYPED_TEST(TestFlatScanner, TestDictScanner) {
153 ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0,
154 Encoding::RLE_DICTIONARY));
155 }
156
157 TEST_F(TestBooleanFlatScanner, TestPlainScanner) {
158 ASSERT_NO_FATAL_FAILURE(
159 this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0));
160 }
161
162 TEST_F(TestFLBAFlatScanner, TestPlainScanner) {
163 ASSERT_NO_FATAL_FAILURE(
164 this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH));
165 }
166
167 TEST_F(TestFLBAFlatScanner, TestDictScanner) {
168 ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size,
169 FLBA_LENGTH, Encoding::RLE_DICTIONARY));
170 }
171
172 TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) {
173 ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size,
174 FLBA_LENGTH, Encoding::PLAIN_DICTIONARY));
175 }
176
177 // PARQUET 502
178 TEST_F(TestFLBAFlatScanner, TestSmallBatch) {
179 NodePtr type =
180 schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
181 ConvertedType::DECIMAL, FLBA_LENGTH, 10, 2);
182 const ColumnDescriptor d(type, 0, 0);
183 num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
184 data_buffer_, pages_);
185 num_levels_ = 1 * 100;
186 InitScanner(&d);
187 ASSERT_NO_FATAL_FAILURE(CheckResults(1, &d));
188 }
189
190 TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) {
191 NodePtr type =
192 schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
193 ConvertedType::DECIMAL, FLBA_LENGTH, 10, 2);
194 const ColumnDescriptor d(type, 4, 0);
195 num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
196 data_buffer_, pages_);
197 num_levels_ = 1 * 100;
198 InitScanner(&d);
199 TypedScanner<FLBAType>* scanner =
200 reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get());
201 ASSERT_EQ(10, scanner->descr()->type_precision());
202 ASSERT_EQ(2, scanner->descr()->type_scale());
203 ASSERT_EQ(FLBA_LENGTH, scanner->descr()->type_length());
204 }
205
206 TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) {
207 NodePtr type =
208 schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
209 ConvertedType::DECIMAL, FLBA_LENGTH, 10, 2);
210 const ColumnDescriptor d(type, 4, 0);
211 num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
212 data_buffer_, pages_);
213 num_levels_ = 1 * 100;
214 InitScanner(&d);
215 TypedScanner<FLBAType>* scanner =
216 reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get());
217 scanner->SetBatchSize(batch_size);
218 std::stringstream ss_fail;
219 for (int i = 0; i < num_levels_; i++) {
220 std::stringstream ss;
221 scanner->PrintNext(ss, 17);
222 std::string result = ss.str();
223 ASSERT_LE(17, result.size()) << i;
224 }
225 ASSERT_THROW(scanner->PrintNext(ss_fail, 17), ParquetException);
226 }
227
228 } // namespace test
229 } // namespace parquet