]>
git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/parquet/column_scanner_test.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include <gtest/gtest.h>
28 #include "arrow/testing/gtest_compat.h"
30 #include "parquet/column_page.h"
31 #include "parquet/column_scanner.h"
32 #include "parquet/schema.h"
33 #include "parquet/test_util.h"
34 #include "parquet/types.h"
38 using schema::NodePtr
;
// Test fixture derived from ::testing::Test and parameterised on a Parquet
// type class `Type`. The helpers that follow build pages, wrap them in a
// scanner and compare what the scanner returns with the generated data.
42 template <typename Type
>
43 class TestFlatScanner
: public ::testing::Test
{
// Native value type of `Type`; it is the element type of values_ below.
45 using c_type
= typename
Type::c_type
;
// Creates scanner_ for column descriptor `d`.
// pages_ is wrapped in a test::MockPageReader, the reader is moved into
// ColumnReader::Make together with `d`, and the resulting column reader is
// handed to Scanner::Make. The result is stored in scanner_.
47 void InitScanner(const ColumnDescriptor
* d
) {
48 std::unique_ptr
<PageReader
> pager(new test::MockPageReader(pages_
));
49 scanner_
= Scanner::Make(ColumnReader::Make(d
, std::move(pager
)));
// Reads num_levels_ entries from scanner_ with the given batch size and
// compares them with the expected data held by the fixture.
//   batch_size - value passed to SetBatchSize() before reading.
//   d          - descriptor of the column; its max definition / repetition
//                levels decide whether def_levels_ / rep_levels_ are compared.
// After the loop it asserts that j equals num_values_ and that one further
// Next() call returns false.
// NOTE(review): the declarations of val, def_level, rep_level, is_null and j,
// and the statements that advance j, are not visible in this excerpt (the
// embedded line numbers jump 53 -> 59, 61 -> 63 and 63 -> 66) - confirm
// against the full file.
52 void CheckResults(int batch_size
, const ColumnDescriptor
* d
) {
// Treat the stored Scanner as the typed scanner for `Type` (unchecked cast).
53 TypedScanner
<Type
>* scanner
= reinterpret_cast<TypedScanner
<Type
>*>(scanner_
.get());
59 scanner
->SetBatchSize(batch_size
);
60 for (int i
= 0; i
< num_levels_
; i
++) {
// Every one of the num_levels_ reads must succeed; i and j are streamed into
// the failure message.
61 ASSERT_TRUE(scanner
->Next(&val
, &def_level
, &rep_level
, &is_null
)) << i
<< j
;
// Values are indexed by j, levels by i.
63 ASSERT_EQ(values_
[j
], val
) << i
<< "V" << j
;
66 if (d
->max_definition_level() > 0) {
67 ASSERT_EQ(def_levels_
[i
], def_level
) << i
<< "D" << j
;
69 if (d
->max_repetition_level() > 0) {
70 ASSERT_EQ(rep_levels_
[i
], rep_level
) << i
<< "R" << j
;
// num_values_ is the count returned by MakePages in the callers.
73 ASSERT_EQ(num_values_
, j
);
// The scanner must report that nothing is left.
74 ASSERT_FALSE(scanner
->Next(&val
, &def_level
, &rep_level
, &is_null
));
// Generates pages for descriptor `d` and verifies them with CheckResults.
//   num_pages, levels_per_page - forwarded to MakePages<Type>; their product
//                                is stored in num_levels_.
//   batch_size                 - forwarded to CheckResults.
//   encoding                   - forwarded to MakePages<Type>.
// MakePages<Type> receives def_levels_, rep_levels_, values_, data_buffer_
// and pages_; its return value is stored in num_values_.
// NOTE(review): the embedded line numbers skip 89 and 91, so statements
// immediately before and after the CheckResults call may be missing from
// this excerpt - confirm against the full file.
84 void Execute(int num_pages
, int levels_per_page
, int batch_size
,
85 const ColumnDescriptor
* d
, Encoding::type encoding
) {
86 num_values_
= MakePages
<Type
>(d
, num_pages
, levels_per_page
, def_levels_
, rep_levels_
,
87 values_
, data_buffer_
, pages_
, encoding
);
88 num_levels_
= num_pages
* levels_per_page
;
90 CheckResults(batch_size
, d
);
// Resets d1, d2 and d3 to freshly allocated ColumnDescriptors.
// Each descriptor is built from a primitive node of physical type
// Type::type_num with ConvertedType::NONE and the given `length`:
//   d1 - node "c1", Repetition::REQUIRED, trailing arguments (0, 0)
//   d2 - node "c2", Repetition::OPTIONAL, trailing arguments (4, 0)
//   d3 - node "c3", Repetition::REPEATED, trailing arguments (4, 2)
// The trailing arguments are presumably the max definition and max
// repetition level - confirm against the ColumnDescriptor constructor.
// NOTE(review): the declaration of `type` is not visible in this excerpt
// (embedded line 97 is missing).
94 void InitDescriptors(std::shared_ptr
<ColumnDescriptor
>& d1
,
95 std::shared_ptr
<ColumnDescriptor
>& d2
,
96 std::shared_ptr
<ColumnDescriptor
>& d3
, int length
) {
98 type
= schema::PrimitiveNode::Make("c1", Repetition::REQUIRED
, Type::type_num
,
99 ConvertedType::NONE
, length
);
100 d1
.reset(new ColumnDescriptor(type
, 0, 0));
101 type
= schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL
, Type::type_num
,
102 ConvertedType::NONE
, length
);
103 d2
.reset(new ColumnDescriptor(type
, 4, 0));
104 type
= schema::PrimitiveNode::Make("c3", Repetition::REPEATED
, Type::type_num
,
105 ConvertedType::NONE
, length
);
106 d3
.reset(new ColumnDescriptor(type
, 4, 2));
// Runs Execute once for each of the three descriptors produced by
// InitDescriptors (REQUIRED, OPTIONAL, REPEATED), using the same page count,
// level count, batch size and encoding for all three.
//   type_length - forwarded to InitDescriptors as the node length.
//   encoding    - forwarded to Execute; defaults to Encoding::PLAIN.
109 void ExecuteAll(int num_pages
, int num_levels
, int batch_size
, int type_length
,
110 Encoding::type encoding
= Encoding::PLAIN
) {
111 std::shared_ptr
<ColumnDescriptor
> d1
;
112 std::shared_ptr
<ColumnDescriptor
> d2
;
113 std::shared_ptr
<ColumnDescriptor
> d3
;
114 InitDescriptors(d1
, d2
, d3
, type_length
);
115 // evaluate REQUIRED pages
116 Execute(num_pages
, num_levels
, batch_size
, d1
.get(), encoding
);
117 // evaluate OPTIONAL pages
118 Execute(num_pages
, num_levels
, batch_size
, d2
.get(), encoding
);
119 // evaluate REPEATED pages
120 Execute(num_pages
, num_levels
, batch_size
, d3
.get(), encoding
);
// Fixture state used by the helper methods above.
// NOTE(review): num_levels_ and num_values_ are used by those helpers but
// their declarations are not visible in this excerpt (embedded line numbers
// jump 120 -> 126).
// Pages passed to MakePages and wrapped by the MockPageReader in InitScanner.
126 std::vector
<std::shared_ptr
<Page
>> pages_
;
// Scanner created by InitScanner and read by CheckResults.
127 std::shared_ptr
<Scanner
> scanner_
;
// Expected values, compared against scanner output in CheckResults.
128 std::vector
<c_type
> values_
;
// Expected definition levels, indexed by level position.
129 std::vector
<int16_t> def_levels_
;
// Expected repetition levels, indexed by level position.
130 std::vector
<int16_t> rep_levels_
;
131 std::vector
<uint8_t> data_buffer_
; // For BA and FLBA
// Sizes shared by every scanner test below. They are only ever read (passed
// by value to ExecuteAll / SetBatchSize), so they are compile-time constants
// rather than mutable file-scope variables.
// Levels generated per page.
static constexpr int num_levels_per_page = 100;
// Pages generated per column.
static constexpr int num_pages = 20;
// Batch size given to the scanner.
static constexpr int batch_size = 32;
// List of Parquet type classes for the typed test suite.
// NOTE(review): the typedef is cut off after DoubleType in this excerpt
// (embedded line numbers jump 138 -> 142); the remaining types and the alias
// name are not visible here - confirm against the full file.
138 typedef ::testing::Types
<Int32Type
, Int64Type
, Int96Type
, FloatType
, DoubleType
,
// Concrete fixtures for BooleanType and FLBAType, used with TEST_F below.
142 using TestBooleanFlatScanner
= TestFlatScanner
<BooleanType
>;
143 using TestFLBAFlatScanner
= TestFlatScanner
<FLBAType
>;
// Registers TestFlatScanner as a typed suite over TestTypes.
145 TYPED_TEST_SUITE(TestFlatScanner
, TestTypes
);
// Typed test: runs ExecuteAll with Encoding::PLAIN and a type length of 0
// for each type in the suite, requiring that no fatal failure is raised.
147 TYPED_TEST(TestFlatScanner
, TestPlainScanner
) {
148 ASSERT_NO_FATAL_FAILURE(
149 this->ExecuteAll(num_pages
, num_levels_per_page
, batch_size
, 0, Encoding::PLAIN
));
// Typed test: same as the PLAIN case but with Encoding::RLE_DICTIONARY and a
// type length of 0.
152 TYPED_TEST(TestFlatScanner
, TestDictScanner
) {
153 ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages
, num_levels_per_page
, batch_size
, 0,
154 Encoding::RLE_DICTIONARY
));
// BooleanType fixture: runs ExecuteAll with a type length of 0 and no
// explicit encoding, so ExecuteAll's default (Encoding::PLAIN) applies.
157 TEST_F(TestBooleanFlatScanner
, TestPlainScanner
) {
158 ASSERT_NO_FATAL_FAILURE(
159 this->ExecuteAll(num_pages
, num_levels_per_page
, batch_size
, 0));
// FLBAType fixture: runs ExecuteAll with type length FLBA_LENGTH and no
// explicit encoding, so ExecuteAll's default (Encoding::PLAIN) applies.
162 TEST_F(TestFLBAFlatScanner
, TestPlainScanner
) {
163 ASSERT_NO_FATAL_FAILURE(
164 this->ExecuteAll(num_pages
, num_levels_per_page
, batch_size
, FLBA_LENGTH
));
// FLBAType fixture: runs ExecuteAll with type length FLBA_LENGTH and
// Encoding::RLE_DICTIONARY.
167 TEST_F(TestFLBAFlatScanner
, TestDictScanner
) {
168 ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages
, num_levels_per_page
, batch_size
,
169 FLBA_LENGTH
, Encoding::RLE_DICTIONARY
));
// FLBAType fixture: runs ExecuteAll with type length FLBA_LENGTH and
// Encoding::PLAIN_DICTIONARY.
172 TEST_F(TestFLBAFlatScanner
, TestPlainDictScanner
) {
173 ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages
, num_levels_per_page
, batch_size
,
174 FLBA_LENGTH
, Encoding::PLAIN_DICTIONARY
));
// FLBAType fixture: builds one page of 100 levels for a REQUIRED
// FIXED_LEN_BYTE_ARRAY column with ConvertedType::DECIMAL (node arguments
// FLBA_LENGTH, 10, 2) and verifies it through CheckResults with a batch size
// of 1.
// NOTE(review): the declaration of `type` and the statement on embedded line
// 186 are not visible in this excerpt (numbers skip 179 and 186) - confirm
// how the scanner is initialised against the full file.
178 TEST_F(TestFLBAFlatScanner
, TestSmallBatch
) {
180 schema::PrimitiveNode::Make("c1", Repetition::REQUIRED
, Type::FIXED_LEN_BYTE_ARRAY
,
181 ConvertedType::DECIMAL
, FLBA_LENGTH
, 10, 2);
182 const ColumnDescriptor
d(type
, 0, 0);
183 num_values_
= MakePages
<FLBAType
>(&d
, 1, 100, def_levels_
, rep_levels_
, values_
,
184 data_buffer_
, pages_
);
// One page times 100 levels per page.
185 num_levels_
= 1 * 100;
187 ASSERT_NO_FATAL_FAILURE(CheckResults(1, &d
));
// FLBAType fixture: builds one page of 100 levels for an OPTIONAL
// FIXED_LEN_BYTE_ARRAY column with ConvertedType::DECIMAL (node arguments
// FLBA_LENGTH, 10, 2) and checks that the scanner's descriptor reports
// precision 10, scale 2 and type length FLBA_LENGTH.
// NOTE(review): the declaration of `type` and the statement on embedded line
// 198 are not visible in this excerpt (numbers skip 191 and 198) - confirm
// how the scanner is initialised against the full file.
190 TEST_F(TestFLBAFlatScanner
, TestDescriptorAPI
) {
192 schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL
, Type::FIXED_LEN_BYTE_ARRAY
,
193 ConvertedType::DECIMAL
, FLBA_LENGTH
, 10, 2);
194 const ColumnDescriptor
d(type
, 4, 0);
195 num_values_
= MakePages
<FLBAType
>(&d
, 1, 100, def_levels_
, rep_levels_
, values_
,
196 data_buffer_
, pages_
);
// One page times 100 levels per page.
197 num_levels_
= 1 * 100;
// Treat the stored Scanner as the FLBA typed scanner (unchecked cast).
199 TypedScanner
<FLBAType
>* scanner
=
200 reinterpret_cast<TypedScanner
<FLBAType
>*>(scanner_
.get());
201 ASSERT_EQ(10, scanner
->descr()->type_precision());
202 ASSERT_EQ(2, scanner
->descr()->type_scale());
203 ASSERT_EQ(FLBA_LENGTH
, scanner
->descr()->type_length());
// FLBAType fixture: builds one page of 100 levels for an OPTIONAL
// FIXED_LEN_BYTE_ARRAY DECIMAL column, then calls PrintNext(ss, 17) once per
// level and asserts each printed string has at least 17 characters. A final
// PrintNext call is expected to throw ParquetException.
// NOTE(review): the declaration of `type` and the statement on embedded line
// 214 are not visible in this excerpt (numbers skip 207 and 214) - confirm
// how the scanner is initialised against the full file.
206 TEST_F(TestFLBAFlatScanner
, TestFLBAPrinterNext
) {
208 schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL
, Type::FIXED_LEN_BYTE_ARRAY
,
209 ConvertedType::DECIMAL
, FLBA_LENGTH
, 10, 2);
210 const ColumnDescriptor
d(type
, 4, 0);
211 num_values_
= MakePages
<FLBAType
>(&d
, 1, 100, def_levels_
, rep_levels_
, values_
,
212 data_buffer_
, pages_
);
// One page times 100 levels per page.
213 num_levels_
= 1 * 100;
// Treat the stored Scanner as the FLBA typed scanner (unchecked cast).
215 TypedScanner
<FLBAType
>* scanner
=
216 reinterpret_cast<TypedScanner
<FLBAType
>*>(scanner_
.get());
217 scanner
->SetBatchSize(batch_size
);
// Separate stream for the call that is expected to throw.
218 std::stringstream ss_fail
;
219 for (int i
= 0; i
< num_levels_
; i
++) {
// Fresh stream per level so result holds only this call's output.
220 std::stringstream ss
;
221 scanner
->PrintNext(ss
, 17);
222 std::string result
= ss
.str();
// Printed text must be at least 17 characters long.
223 ASSERT_LE(17, result
.size()) << i
;
// Printing once more after all levels were consumed must throw.
225 ASSERT_THROW(scanner
->PrintNext(ss_fail
, 17), ParquetException
);
229 } // namespace parquet