// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/dataset/file_parquet.h"

#include <memory>
#include <utility>
#include <vector>

#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/scanner_internal.h"
#include "arrow/dataset/test_util.h"
#include "arrow/io/memory.h"
#include "arrow/io/util_internal.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/util/range.h"

#include "parquet/arrow/writer.h"
#include "parquet/metadata.h"

namespace arrow {

using internal::checked_pointer_cast;

namespace dataset {

using parquet::ArrowWriterProperties;
using parquet::default_arrow_writer_properties;

using parquet::default_writer_properties;
using parquet::WriterProperties;

using parquet::CreateOutputStream;
using parquet::arrow::WriteTable;

using testing::Pointee;

class ParquetFormatHelper {
 public:
  using FormatType = ParquetFileFormat;

  static Result<std::shared_ptr<Buffer>> Write(RecordBatchReader* reader) {
    auto pool = ::arrow::default_memory_pool();
    auto sink = CreateOutputStream(pool);
    RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink));
    return sink->Finish();
  }
  static std::shared_ptr<ParquetFileFormat> MakeFormat() {
    return std::make_shared<ParquetFileFormat>();
  }

 private:
  static Status WriteRecordBatch(const RecordBatch& batch,
                                 parquet::arrow::FileWriter* writer) {
    auto schema = batch.schema();
    auto size = batch.num_rows();

    if (!schema->Equals(*writer->schema(), false)) {
      return Status::Invalid("RecordBatch schema does not match this writer's. batch:'",
                             schema->ToString(), "' this:'", writer->schema()->ToString(),
                             "'");
    }

    RETURN_NOT_OK(writer->NewRowGroup(size));
    for (int i = 0; i < batch.num_columns(); i++) {
      RETURN_NOT_OK(writer->WriteColumnChunk(*batch.column(i)));
    }

    return Status::OK();
  }

  static Status WriteRecordBatchReader(RecordBatchReader* reader,
                                       parquet::arrow::FileWriter* writer) {
    auto schema = reader->schema();

    if (!schema->Equals(*writer->schema(), false)) {
      return Status::Invalid("RecordBatch schema does not match this writer's. batch:'",
                             schema->ToString(), "' this:'", writer->schema()->ToString(),
                             "'");
    }

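    // Drain the reader: each non-null batch is written out as its own row
    // group via WriteRecordBatch above.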
    return MakeFunctionIterator([reader] { return reader->Next(); })
        .Visit([&](std::shared_ptr<RecordBatch> batch) {
          return WriteRecordBatch(*batch, writer);
        });
  }

  static Status WriteRecordBatchReader(
      RecordBatchReader* reader, MemoryPool* pool,
      const std::shared_ptr<io::OutputStream>& sink,
      const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
      const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
          default_arrow_writer_properties()) {
    std::unique_ptr<parquet::arrow::FileWriter> writer;
    RETURN_NOT_OK(parquet::arrow::FileWriter::Open(
        *reader->schema(), pool, sink, properties, arrow_properties, &writer));
    RETURN_NOT_OK(WriteRecordBatchReader(reader, writer.get()));
    return writer->Close();
  }
};
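
// A minimal usage sketch of the helper above (relying on the fixture helpers
// from test_util.h used throughout this file): Write() serializes an
// in-memory RecordBatchReader to a Parquet-formatted Buffer, one row group
// per batch, which can then be wrapped in a FileSource:
//
//   auto reader = GetRecordBatchReader(schema({field("i32", int32())}));
//   ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
//   FileSource source(buffer);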

class TestParquetFileFormat : public FileFormatFixtureMixin<ParquetFormatHelper> {
 public:
  RecordBatchIterator Batches(ScanTaskIterator scan_task_it) {
    return MakeFlattenIterator(MakeMaybeMapIterator(
        [](std::shared_ptr<ScanTask> scan_task) { return scan_task->Execute(); },
        std::move(scan_task_it)));
  }

  RecordBatchIterator Batches(Fragment* fragment) {
    EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
    return Batches(std::move(scan_task_it));
  }

  std::shared_ptr<RecordBatch> SingleBatch(Fragment* fragment) {
    auto batches = IteratorToVector(Batches(fragment));
    EXPECT_EQ(batches.size(), 1);
    return batches.front();
  }

  void CountRowsAndBatchesInScan(Fragment* fragment, int64_t expected_rows,
                                 int64_t expected_batches) {
    int64_t actual_rows = 0;
    int64_t actual_batches = 0;

    for (auto maybe_batch : Batches(fragment)) {
      ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
      actual_rows += batch->num_rows();
      ++actual_batches;
    }

    EXPECT_EQ(actual_rows, expected_rows);
    EXPECT_EQ(actual_batches, expected_batches);
  }

  void CountRowsAndBatchesInScan(const std::shared_ptr<Fragment>& fragment,
                                 int64_t expected_rows, int64_t expected_batches) {
    return CountRowsAndBatchesInScan(fragment.get(), expected_rows, expected_batches);
  }

  void CountRowGroupsInFragment(const std::shared_ptr<Fragment>& fragment,
                                std::vector<int> expected_row_groups,
                                compute::Expression filter) {
    SetFilter(filter);

    auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragment);
    ASSERT_OK_AND_ASSIGN(auto fragments,
                         parquet_fragment->SplitByRowGroup(opts_->filter));

    EXPECT_EQ(fragments.size(), expected_row_groups.size());
    for (size_t i = 0; i < fragments.size(); i++) {
      auto expected = expected_row_groups[i];
      auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragments[i]);

      EXPECT_EQ(parquet_fragment->row_groups(), std::vector<int>{expected});
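      // Row group indices are 0-based while the arithmetic dataset's batches
      // are keyed 1..n with i rows each, so row group `expected` is expected
      // to hold `expected + 1` rows.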
      EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), expected + 1);
    }
  }
};

TEST_F(TestParquetFileFormat, InspectFailureWithRelevantError) {
  TestInspectFailureWithRelevantError(StatusCode::Invalid, "Parquet");
}
TEST_F(TestParquetFileFormat, Inspect) { TestInspect(); }

TEST_F(TestParquetFileFormat, InspectDictEncoded) {
  auto reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
  auto source = GetFileSource(reader.get());

  format_->reader_options.dict_columns = {"utf8"};
  ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source));

  Schema expected_schema({field("utf8", dictionary(int32(), utf8()))});
  AssertSchemaEqual(*actual, expected_schema, /*check_metadata=*/false);
}

TEST_F(TestParquetFileFormat, IsSupported) { TestIsSupported(); }

TEST_F(TestParquetFileFormat, WriteRecordBatchReader) { TestWrite(); }

TEST_F(TestParquetFileFormat, WriteRecordBatchReaderCustomOptions) {
  TimeUnit::type coerce_timestamps_to = TimeUnit::MICRO,
                 coerce_timestamps_from = TimeUnit::NANO;

  auto reader =
      GetRecordBatchReader(schema({field("ts", timestamp(coerce_timestamps_from))}));
  auto options =
      checked_pointer_cast<ParquetFileWriteOptions>(format_->DefaultWriteOptions());
  options->writer_properties = parquet::WriterProperties::Builder()
                                   .created_by("TestParquetFileFormat")
                                   ->disable_statistics()
                                   ->build();
  options->arrow_writer_properties = parquet::ArrowWriterProperties::Builder()
                                         .coerce_timestamps(coerce_timestamps_to)
                                         ->allow_truncated_timestamps()
                                         ->build();

  auto written = WriteToBuffer(reader->schema(), options);

  EXPECT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(FileSource{written}));
  EXPECT_OK_AND_ASSIGN(auto actual_schema, fragment->ReadPhysicalSchema());
  AssertSchemaEqual(Schema({field("ts", timestamp(coerce_timestamps_to))}),
                    *actual_schema);
}

TEST_F(TestParquetFileFormat, CountRows) { TestCountRows(); }

TEST_F(TestParquetFileFormat, CountRowsPredicatePushdown) {
  constexpr int64_t kNumRowGroups = 16;
  constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;
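  // Row group i (1-based) contributes i rows, so 16 row groups hold
  // 1 + 2 + ... + 16 = 16 * 17 / 2 = 136 rows in total.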

  // See the PredicatePushdown test below for a description of the generated data
  auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
  auto source = GetFileSource(reader.get());
  auto options = std::make_shared<ScanOptions>();

  auto fragment = MakeFragment(*source);

  ASSERT_FINISHES_OK_AND_EQ(util::make_optional<int64_t>(kTotalNumRows),
                            fragment->CountRows(literal(true), options));

  for (int i = 1; i <= kNumRowGroups; i++) {
    SCOPED_TRACE(i);
    // The row group in which all values of column i64 equal i has i rows
    auto predicate = less_equal(field_ref("i64"), literal(i));
    ASSERT_OK_AND_ASSIGN(predicate, predicate.Bind(*reader->schema()));
    auto expected = i * (i + 1) / 2;
    ASSERT_FINISHES_OK_AND_EQ(util::make_optional<int64_t>(expected),
                              fragment->CountRows(predicate, options));

    predicate = and_(less_equal(field_ref("i64"), literal(i)),
                     greater_equal(field_ref("i64"), literal(i)));
    ASSERT_OK_AND_ASSIGN(predicate, predicate.Bind(*reader->schema()));
    ASSERT_FINISHES_OK_AND_EQ(util::make_optional<int64_t>(i),
                              fragment->CountRows(predicate, options));

    predicate = equal(field_ref("i64"), literal(i));
    ASSERT_OK_AND_ASSIGN(predicate, predicate.Bind(*reader->schema()));
    ASSERT_FINISHES_OK_AND_EQ(util::make_optional<int64_t>(i),
                              fragment->CountRows(predicate, options));
  }

  // Ensure nulls are properly handled
  {
    auto dataset_schema = schema({field("i64", int64())});
    auto null_batch = RecordBatchFromJSON(dataset_schema, R"([
        [null],
        [null],
        [null]
    ])");
    auto batch = RecordBatchFromJSON(dataset_schema, R"([
        [1],
        [2]
    ])");
    ASSERT_OK_AND_ASSIGN(auto reader,
                         RecordBatchReader::Make({null_batch, batch}, dataset_schema));
    auto source = GetFileSource(reader.get());
    auto fragment = MakeFragment(*source);
    ASSERT_OK_AND_ASSIGN(
        auto predicate,
        greater_equal(field_ref("i64"), literal(1)).Bind(*dataset_schema));
    ASSERT_FINISHES_OK_AND_EQ(util::make_optional<int64_t>(2),
                              fragment->CountRows(predicate, options));
    // TODO(ARROW-12659): SimplifyWithGuarantee can't handle not(is_null), so
    // trying to count with an is_null predicate doesn't work
  }
}

TEST_F(TestParquetFileFormat, MultithreadedScan) {
  constexpr int64_t kNumRowGroups = 16;

  // See the PredicatePushdown test below for a description of the generated data
  auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
  auto source = GetFileSource(reader.get());
  auto options = std::make_shared<ScanOptions>();

  auto fragment = MakeFragment(*source);

  FragmentDataset dataset(ArithmeticDatasetFixture::schema(), {fragment});
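  // Wrap the stack-allocated dataset in a non-owning shared_ptr (braced
  // pointer/no-op-deleter pair) so it can be passed where ScannerBuilder
  // expects a std::shared_ptr<Dataset>.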
  ScannerBuilder builder({&dataset, [](...) {}});

  ASSERT_OK(builder.UseAsync(true));
  ASSERT_OK(builder.UseThreads(true));
  ASSERT_OK(builder.Project({call("add", {field_ref("i64"), literal(3)})}, {""}));
  ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish());

  ASSERT_OK_AND_ASSIGN(auto gen, scanner->ScanBatchesUnorderedAsync());

  auto collect_fut = CollectAsyncGenerator(gen);
  ASSERT_OK_AND_ASSIGN(auto batches, collect_fut.result());

  ASSERT_EQ(batches.size(), kNumRowGroups);
}

class TestParquetFileSystemDataset : public WriteFileSystemDatasetMixin,
                                     public testing::Test {
 public:
  void SetUp() override {
    MakeSourceDataset();
    check_metadata_ = false;
    auto parquet_format = std::make_shared<ParquetFileFormat>();
    format_ = parquet_format;
    SetWriteOptions(parquet_format->DefaultWriteOptions());
  }
};

TEST_F(TestParquetFileSystemDataset, WriteWithIdenticalPartitioningSchema) {
  TestWriteWithIdenticalPartitioningSchema();
}

TEST_F(TestParquetFileSystemDataset, WriteWithUnrelatedPartitioningSchema) {
  TestWriteWithUnrelatedPartitioningSchema();
}

TEST_F(TestParquetFileSystemDataset, WriteWithSupersetPartitioningSchema) {
  TestWriteWithSupersetPartitioningSchema();
}

TEST_F(TestParquetFileSystemDataset, WriteWithEmptyPartitioningSchema) {
  TestWriteWithEmptyPartitioningSchema();
}

class TestParquetFileFormatScan : public FileFormatScanMixin<ParquetFormatHelper> {
 public:
  std::shared_ptr<RecordBatch> SingleBatch(std::shared_ptr<Fragment> fragment) {
    auto batches = IteratorToVector(PhysicalBatches(fragment));
    EXPECT_EQ(batches.size(), 1);
    return batches.front();
  }

  void CountRowsAndBatchesInScan(std::shared_ptr<Fragment> fragment,
                                 int64_t expected_rows, int64_t expected_batches) {
    int64_t actual_rows = 0;
    int64_t actual_batches = 0;

    for (auto maybe_batch : PhysicalBatches(fragment)) {
      ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
      actual_rows += batch->num_rows();
      ++actual_batches;
    }

    EXPECT_EQ(actual_rows, expected_rows);
    EXPECT_EQ(actual_batches, expected_batches);
  }

  void CountRowGroupsInFragment(const std::shared_ptr<Fragment>& fragment,
                                std::vector<int> expected_row_groups,
                                compute::Expression filter) {
    SetFilter(filter);

    auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragment);
    ASSERT_OK_AND_ASSIGN(auto fragments,
                         parquet_fragment->SplitByRowGroup(opts_->filter));

    EXPECT_EQ(fragments.size(), expected_row_groups.size());
    for (size_t i = 0; i < fragments.size(); i++) {
      auto expected = expected_row_groups[i];
      auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragments[i]);

      EXPECT_EQ(parquet_fragment->row_groups(), std::vector<int>{expected});
      EXPECT_EQ(SingleBatch(parquet_fragment)->num_rows(), expected + 1);
    }
  }
};

TEST_P(TestParquetFileFormatScan, ScanRecordBatchReader) { TestScan(); }
TEST_P(TestParquetFileFormatScan, ScanBatchSize) { TestScanBatchSize(); }
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
  TestScanProjectedMissingCols();
}
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderDictEncoded) {
  auto reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
  auto source = GetFileSource(reader.get());

  SetSchema(reader->schema()->fields());
  SetFilter(literal(true));
  format_->reader_options.dict_columns = {"utf8"};
  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));

  int64_t row_count = 0;
  Schema expected_schema({field("utf8", dictionary(int32(), utf8()))});

  for (auto maybe_batch : PhysicalBatches(fragment)) {
    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
    row_count += batch->num_rows();
    AssertSchemaEqual(*batch->schema(), expected_schema, /*check_metadata=*/false);
  }
  ASSERT_EQ(row_count, expected_rows());
}
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderPreBuffer) {
  auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
  auto source = GetFileSource(reader.get());

  SetSchema(reader->schema()->fields());
  SetFilter(literal(true));

  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
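  // Pre-buffering instructs the Parquet reader to coalesce the column-chunk
  // reads of each row group into fewer, larger I/O requests up front.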
  auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
  fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
  opts_->fragment_scan_options = fragment_scan_options;
  ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));

  int64_t row_count = 0;
  for (auto maybe_batch : PhysicalBatches(fragment)) {
    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
    row_count += batch->num_rows();
  }
  ASSERT_EQ(row_count, expected_rows());
}
TEST_P(TestParquetFileFormatScan, PredicatePushdown) {
  // Given a number `n`, the arithmetic dataset creates n RecordBatches where
  // each RecordBatch is keyed by a unique integer in [1, n]. Let `rb_i` denote
  // the record batch keyed by `i`. `rb_i` is composed of `i` rows where all
  // values are a variant of `i`, e.g. {"i64": i, "u8": i, ... }.
  //
  // Thus the ArithmeticDataset(n) has n RecordBatches and the total number of
  // rows is n(n+1)/2.
  //
  // This test uses the Fragment directly, so no post-filtering is applied
  // via the ScanOptions' evaluator. Thus, counting the number of returned
  // rows and row groups is a good enough proxy to check whether the pushdown
  // predicate is working.
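  //
  // For example, with n = 3 the file contains three row groups (other
  // columns omitted):
  //   rb_1 = {"i64": [1]}           (1 row)
  //   rb_2 = {"i64": [2, 2]}        (2 rows)
  //   rb_3 = {"i64": [3, 3, 3]}     (3 rows)
  // so a filter like i64 == 2 should prune everything but the second row
  // group, yielding exactly 2 rows in 1 batch.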

  constexpr int64_t kNumRowGroups = 16;
  constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;

  auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
  auto source = GetFileSource(reader.get());

  SetSchema(reader->schema()->fields());
  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));

  SetFilter(literal(true));
  CountRowsAndBatchesInScan(fragment, kTotalNumRows, kNumRowGroups);

  for (int64_t i = 1; i <= kNumRowGroups; i++) {
    SetFilter(equal(field_ref("i64"), literal(i)));
    CountRowsAndBatchesInScan(fragment, i, 1);
  }

  // Out-of-bound filters should skip all RowGroups.
  SetFilter(literal(false));
  CountRowsAndBatchesInScan(fragment, 0, 0);
  SetFilter(equal(field_ref("i64"), literal<int64_t>(kNumRowGroups + 1)));
  CountRowsAndBatchesInScan(fragment, 0, 0);
  SetFilter(equal(field_ref("i64"), literal<int64_t>(-1)));
  CountRowsAndBatchesInScan(fragment, 0, 0);
  // No rows satisfy both conjuncts (i64 == 1 and u8 == 2).
  SetFilter(and_(equal(field_ref("i64"), literal<int64_t>(1)),
                 equal(field_ref("u8"), literal<uint8_t>(2))));
  CountRowsAndBatchesInScan(fragment, 0, 0);

  SetFilter(or_(equal(field_ref("i64"), literal<int64_t>(2)),
                equal(field_ref("i64"), literal<int64_t>(4))));
  CountRowsAndBatchesInScan(fragment, 2 + 4, 2);

  SetFilter(less(field_ref("i64"), literal<int64_t>(6)));
  CountRowsAndBatchesInScan(fragment, 5 * (5 + 1) / 2, 5);

  SetFilter(greater_equal(field_ref("i64"), literal<int64_t>(6)));
  CountRowsAndBatchesInScan(fragment, kTotalNumRows - (5 * (5 + 1) / 2),
                            kNumRowGroups - 5);
}

TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragments) {
  constexpr int64_t kNumRowGroups = 16;

  auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
  auto source = GetFileSource(reader.get());

  SetSchema(reader->schema()->fields());
  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));

  auto all_row_groups = ::arrow::internal::Iota(static_cast<int>(kNumRowGroups));
  CountRowGroupsInFragment(fragment, all_row_groups, literal(true));

  for (int i = 0; i < kNumRowGroups; ++i) {
    CountRowGroupsInFragment(fragment, {i}, equal(field_ref("i64"), literal(i + 1)));
  }

  // Out-of-bound filters should skip all RowGroups.
  CountRowGroupsInFragment(fragment, {}, literal(false));
  CountRowGroupsInFragment(fragment, {},
                           equal(field_ref("i64"), literal(kNumRowGroups + 1)));
  CountRowGroupsInFragment(fragment, {}, equal(field_ref("i64"), literal(-1)));

  // No rows satisfy both conjuncts in either filter below.
  CountRowGroupsInFragment(
      fragment, {},
      and_(equal(field_ref("i64"), literal(1)), equal(field_ref("u8"), literal(2))));
  CountRowGroupsInFragment(
      fragment, {},
      and_(equal(field_ref("i64"), literal(2)), equal(field_ref("i64"), literal(4))));

  CountRowGroupsInFragment(
      fragment, {1, 3},
      or_(equal(field_ref("i64"), literal(2)), equal(field_ref("i64"), literal(4))));

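  // An is_in lookup against the set {2, 4} should prune exactly the same row
  // groups as the explicit disjunction above: only indices 1 and 3 survive.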
  auto set = ArrayFromJSON(int64(), "[2, 4]");
  CountRowGroupsInFragment(
      fragment, {1, 3},
      call("is_in", {field_ref("i64")}, compute::SetLookupOptions{set}));

  CountRowGroupsInFragment(fragment, {0, 1, 2, 3, 4}, less(field_ref("i64"), literal(6)));

  CountRowGroupsInFragment(fragment,
                           ::arrow::internal::Iota(5, static_cast<int>(kNumRowGroups)),
                           greater_equal(field_ref("i64"), literal(6)));

  CountRowGroupsInFragment(fragment, {5, 6},
                           and_(greater_equal(field_ref("i64"), literal(6)),
                                less(field_ref("i64"), literal(8))));
}

TEST_P(TestParquetFileFormatScan, ExplicitRowGroupSelection) {
  constexpr int64_t kNumRowGroups = 16;
  constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;

  auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
  auto source = GetFileSource(reader.get());

  SetSchema(reader->schema()->fields());
  SetFilter(literal(true));

  auto row_groups_fragment = [&](std::vector<int> row_groups) {
    EXPECT_OK_AND_ASSIGN(auto fragment,
                         format_->MakeFragment(*source, literal(true),
                                               /*physical_schema=*/nullptr, row_groups));
    return fragment;
  };

  // select all row groups
  EXPECT_OK_AND_ASSIGN(auto all_row_groups_fragment,
                       format_->MakeFragment(*source, literal(true))
                           .Map([](std::shared_ptr<FileFragment> f) {
                             return checked_pointer_cast<ParquetFileFragment>(f);
                           }));

  EXPECT_EQ(all_row_groups_fragment->row_groups(), std::vector<int>{});
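  // An empty row_groups() vector indicates that no explicit row-group subset
  // was requested; as checked below, the scan still covers every row group.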

  ARROW_EXPECT_OK(all_row_groups_fragment->EnsureCompleteMetadata());
  CountRowsAndBatchesInScan(all_row_groups_fragment, kTotalNumRows, kNumRowGroups);

  // individual selection selects a single row group
  for (int i = 0; i < kNumRowGroups; ++i) {
    CountRowsAndBatchesInScan(row_groups_fragment({i}), i + 1, 1);
    EXPECT_EQ(row_groups_fragment({i})->row_groups(), std::vector<int>{i});
  }

  for (int i = 0; i < kNumRowGroups; ++i) {
    // conflicting selection/filter
    SetFilter(equal(field_ref("i64"), literal(i)));
    CountRowsAndBatchesInScan(row_groups_fragment({i}), 0, 0);
  }

  for (int i = 0; i < kNumRowGroups; ++i) {
    // identical selection/filter
    SetFilter(equal(field_ref("i64"), literal(i + 1)));
    CountRowsAndBatchesInScan(row_groups_fragment({i}), i + 1, 1);
  }

  SetFilter(greater(field_ref("i64"), literal(3)));
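  // Row groups {2, 3, 4, 5} hold 3, 4, 5, and 6 rows respectively; the filter
  // i64 > 3 prunes the group of 3s, leaving 4 + 5 + 6 rows in 3 batches.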
  CountRowsAndBatchesInScan(row_groups_fragment({2, 3, 4, 5}), 4 + 5 + 6, 3);

  EXPECT_RAISES_WITH_MESSAGE_THAT(
      IndexError,
      testing::HasSubstr("only has " + std::to_string(kNumRowGroups) + " row groups"),
      row_groups_fragment({kNumRowGroups + 1})->Scan(opts_));
}

TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingStringColumn) {
  auto table = TableFromJSON(schema({field("x", utf8())}),
                             {
                                 R"([{"x": "a"}])",
                                 R"([{"x": "b"}, {"x": "b"}])",
                                 R"([{"x": "c"}, {"x": "c"}, {"x": "c"}])",
                                 R"([{"x": "a"}, {"x": "b"}, {"x": "c"}, {"x": "d"}])",
                             });
  TableBatchReader reader(*table);
  auto source = GetFileSource(&reader);

  SetSchema(reader.schema()->fields());
  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));

  CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"), literal("a")));
}

INSTANTIATE_TEST_SUITE_P(TestScan, TestParquetFileFormatScan,
                         ::testing::ValuesIn(TestFormatParams::Values()),
                         TestFormatParams::ToTestNameString);

}  // namespace dataset
}  // namespace arrow