1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
18 #include "arrow/dataset/file_parquet.h"
24 #include "arrow/dataset/dataset_internal.h"
25 #include "arrow/dataset/scanner_internal.h"
26 #include "arrow/dataset/test_util.h"
27 #include "arrow/io/memory.h"
28 #include "arrow/io/util_internal.h"
29 #include "arrow/record_batch.h"
30 #include "arrow/table.h"
31 #include "arrow/testing/gtest_util.h"
32 #include "arrow/testing/util.h"
33 #include "arrow/type.h"
34 #include "arrow/type_fwd.h"
35 #include "arrow/util/range.h"
37 #include "parquet/arrow/writer.h"
38 #include "parquet/metadata.h"
42 using internal::checked_pointer_cast
;
46 using parquet::ArrowWriterProperties
;
47 using parquet::default_arrow_writer_properties
;
49 using parquet::default_writer_properties
;
50 using parquet::WriterProperties
;
52 using parquet::CreateOutputStream
;
53 using parquet::arrow::WriteTable
;
55 using testing::Pointee
;
57 class ParquetFormatHelper
{
59 using FormatType
= ParquetFileFormat
;
61 static Result
<std::shared_ptr
<Buffer
>> Write(RecordBatchReader
* reader
) {
62 auto pool
= ::arrow::default_memory_pool();
63 std::shared_ptr
<Buffer
> out
;
64 auto sink
= CreateOutputStream(pool
);
65 RETURN_NOT_OK(WriteRecordBatchReader(reader
, pool
, sink
));
66 return sink
->Finish();
68 static std::shared_ptr
<ParquetFileFormat
> MakeFormat() {
69 return std::make_shared
<ParquetFileFormat
>();
73 static Status
WriteRecordBatch(const RecordBatch
& batch
,
74 parquet::arrow::FileWriter
* writer
) {
75 auto schema
= batch
.schema();
76 auto size
= batch
.num_rows();
78 if (!schema
->Equals(*writer
->schema(), false)) {
79 return Status::Invalid("RecordBatch schema does not match this writer's. batch:'",
80 schema
->ToString(), "' this:'", writer
->schema()->ToString(),
84 RETURN_NOT_OK(writer
->NewRowGroup(size
));
85 for (int i
= 0; i
< batch
.num_columns(); i
++) {
86 RETURN_NOT_OK(writer
->WriteColumnChunk(*batch
.column(i
)));
92 static Status
WriteRecordBatchReader(RecordBatchReader
* reader
,
93 parquet::arrow::FileWriter
* writer
) {
94 auto schema
= reader
->schema();
96 if (!schema
->Equals(*writer
->schema(), false)) {
97 return Status::Invalid("RecordBatch schema does not match this writer's. batch:'",
98 schema
->ToString(), "' this:'", writer
->schema()->ToString(),
102 return MakeFunctionIterator([reader
] { return reader
->Next(); })
103 .Visit([&](std::shared_ptr
<RecordBatch
> batch
) {
104 return WriteRecordBatch(*batch
, writer
);
108 static Status
WriteRecordBatchReader(
109 RecordBatchReader
* reader
, MemoryPool
* pool
,
110 const std::shared_ptr
<io::OutputStream
>& sink
,
111 const std::shared_ptr
<WriterProperties
>& properties
= default_writer_properties(),
112 const std::shared_ptr
<ArrowWriterProperties
>& arrow_properties
=
113 default_arrow_writer_properties()) {
114 std::unique_ptr
<parquet::arrow::FileWriter
> writer
;
115 RETURN_NOT_OK(parquet::arrow::FileWriter::Open(
116 *reader
->schema(), pool
, sink
, properties
, arrow_properties
, &writer
));
117 RETURN_NOT_OK(WriteRecordBatchReader(reader
, writer
.get()));
118 return writer
->Close();
122 class TestParquetFileFormat
: public FileFormatFixtureMixin
<ParquetFormatHelper
> {
124 RecordBatchIterator
Batches(ScanTaskIterator scan_task_it
) {
125 return MakeFlattenIterator(MakeMaybeMapIterator(
126 [](std::shared_ptr
<ScanTask
> scan_task
) { return scan_task
->Execute(); },
127 std::move(scan_task_it
)));
130 RecordBatchIterator
Batches(Fragment
* fragment
) {
131 EXPECT_OK_AND_ASSIGN(auto scan_task_it
, fragment
->Scan(opts_
));
132 return Batches(std::move(scan_task_it
));
135 std::shared_ptr
<RecordBatch
> SingleBatch(Fragment
* fragment
) {
136 auto batches
= IteratorToVector(Batches(fragment
));
137 EXPECT_EQ(batches
.size(), 1);
138 return batches
.front();
141 void CountRowsAndBatchesInScan(Fragment
* fragment
, int64_t expected_rows
,
142 int64_t expected_batches
) {
143 int64_t actual_rows
= 0;
144 int64_t actual_batches
= 0;
146 for (auto maybe_batch
: Batches(fragment
)) {
147 ASSERT_OK_AND_ASSIGN(auto batch
, maybe_batch
);
148 actual_rows
+= batch
->num_rows();
152 EXPECT_EQ(actual_rows
, expected_rows
);
153 EXPECT_EQ(actual_batches
, expected_batches
);
156 void CountRowsAndBatchesInScan(const std::shared_ptr
<Fragment
>& fragment
,
157 int64_t expected_rows
, int64_t expected_batches
) {
158 return CountRowsAndBatchesInScan(fragment
.get(), expected_rows
, expected_batches
);
161 void CountRowGroupsInFragment(const std::shared_ptr
<Fragment
>& fragment
,
162 std::vector
<int> expected_row_groups
,
163 compute::Expression filter
) {
166 auto parquet_fragment
= checked_pointer_cast
<ParquetFileFragment
>(fragment
);
167 ASSERT_OK_AND_ASSIGN(auto fragments
, parquet_fragment
->SplitByRowGroup(opts_
->filter
))
169 EXPECT_EQ(fragments
.size(), expected_row_groups
.size());
170 for (size_t i
= 0; i
< fragments
.size(); i
++) {
171 auto expected
= expected_row_groups
[i
];
172 auto parquet_fragment
= checked_pointer_cast
<ParquetFileFragment
>(fragments
[i
]);
174 EXPECT_EQ(parquet_fragment
->row_groups(), std::vector
<int>{expected
});
175 EXPECT_EQ(SingleBatch(parquet_fragment
.get())->num_rows(), expected
+ 1);
180 TEST_F(TestParquetFileFormat
, InspectFailureWithRelevantError
) {
181 TestInspectFailureWithRelevantError(StatusCode::Invalid
, "Parquet");
183 TEST_F(TestParquetFileFormat
, Inspect
) { TestInspect(); }
185 TEST_F(TestParquetFileFormat
, InspectDictEncoded
) {
186 auto reader
= GetRecordBatchReader(schema({field("utf8", utf8())}));
187 auto source
= GetFileSource(reader
.get());
189 format_
->reader_options
.dict_columns
= {"utf8"};
190 ASSERT_OK_AND_ASSIGN(auto actual
, format_
->Inspect(*source
.get()));
192 Schema
expected_schema({field("utf8", dictionary(int32(), utf8()))});
193 AssertSchemaEqual(*actual
, expected_schema
, /* check_metadata = */ false);
196 TEST_F(TestParquetFileFormat
, IsSupported
) { TestIsSupported(); }
198 TEST_F(TestParquetFileFormat
, WriteRecordBatchReader
) { TestWrite(); }
200 TEST_F(TestParquetFileFormat
, WriteRecordBatchReaderCustomOptions
) {
201 TimeUnit::type coerce_timestamps_to
= TimeUnit::MICRO
,
202 coerce_timestamps_from
= TimeUnit::NANO
;
205 GetRecordBatchReader(schema({field("ts", timestamp(coerce_timestamps_from
))}));
207 checked_pointer_cast
<ParquetFileWriteOptions
>(format_
->DefaultWriteOptions());
208 options
->writer_properties
= parquet::WriterProperties::Builder()
209 .created_by("TestParquetFileFormat")
210 ->disable_statistics()
212 options
->arrow_writer_properties
= parquet::ArrowWriterProperties::Builder()
213 .coerce_timestamps(coerce_timestamps_to
)
214 ->allow_truncated_timestamps()
217 auto written
= WriteToBuffer(reader
->schema(), options
);
219 EXPECT_OK_AND_ASSIGN(auto fragment
, format_
->MakeFragment(FileSource
{written
}));
220 EXPECT_OK_AND_ASSIGN(auto actual_schema
, fragment
->ReadPhysicalSchema());
221 AssertSchemaEqual(Schema({field("ts", timestamp(coerce_timestamps_to
))}),
225 TEST_F(TestParquetFileFormat
, CountRows
) { TestCountRows(); }
227 TEST_F(TestParquetFileFormat
, CountRowsPredicatePushdown
) {
228 constexpr int64_t kNumRowGroups
= 16;
229 constexpr int64_t kTotalNumRows
= kNumRowGroups
* (kNumRowGroups
+ 1) / 2;
231 // See PredicatePushdown test below for a description of the generated data
232 auto reader
= ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups
);
233 auto source
= GetFileSource(reader
.get());
234 auto options
= std::make_shared
<ScanOptions
>();
236 auto fragment
= MakeFragment(*source
);
238 ASSERT_FINISHES_OK_AND_EQ(util::make_optional
<int64_t>(kTotalNumRows
),
239 fragment
->CountRows(literal(true), options
));
241 for (int i
= 1; i
<= kNumRowGroups
; i
++) {
243 // The row group for which all values in column i64 == i has i rows
244 auto predicate
= less_equal(field_ref("i64"), literal(i
));
245 ASSERT_OK_AND_ASSIGN(predicate
, predicate
.Bind(*reader
->schema()));
246 auto expected
= i
* (i
+ 1) / 2;
247 ASSERT_FINISHES_OK_AND_EQ(util::make_optional
<int64_t>(expected
),
248 fragment
->CountRows(predicate
, options
));
250 predicate
= and_(less_equal(field_ref("i64"), literal(i
)),
251 greater_equal(field_ref("i64"), literal(i
)));
252 ASSERT_OK_AND_ASSIGN(predicate
, predicate
.Bind(*reader
->schema()));
253 ASSERT_FINISHES_OK_AND_EQ(util::make_optional
<int64_t>(i
),
254 fragment
->CountRows(predicate
, options
));
256 predicate
= equal(field_ref("i64"), literal(i
));
257 ASSERT_OK_AND_ASSIGN(predicate
, predicate
.Bind(*reader
->schema()));
258 ASSERT_FINISHES_OK_AND_EQ(util::make_optional
<int64_t>(i
),
259 fragment
->CountRows(predicate
, options
));
262 // Ensure nulls are properly handled
264 auto dataset_schema
= schema({field("i64", int64())});
265 auto null_batch
= RecordBatchFromJSON(dataset_schema
, R
"([
270 auto batch
= RecordBatchFromJSON(dataset_schema
, R
"([
274 ASSERT_OK_AND_ASSIGN(auto reader
,
275 RecordBatchReader::Make({null_batch
, batch
}, dataset_schema
));
276 auto source
= GetFileSource(reader
.get());
277 auto fragment
= MakeFragment(*source
);
278 ASSERT_OK_AND_ASSIGN(
280 greater_equal(field_ref("i64"), literal(1)).Bind(*dataset_schema
));
281 ASSERT_FINISHES_OK_AND_EQ(util::make_optional
<int64_t>(2),
282 fragment
->CountRows(predicate
, options
));
283 // TODO(ARROW-12659): SimplifyWithGuarantee can't handle
284 // not(is_null) so trying to count with is_null doesn't work
288 TEST_F(TestParquetFileFormat
, MultithreadedScan
) {
289 constexpr int64_t kNumRowGroups
= 16;
291 // See PredicatePushdown test below for a description of the generated data
292 auto reader
= ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups
);
293 auto source
= GetFileSource(reader
.get());
294 auto options
= std::make_shared
<ScanOptions
>();
296 auto fragment
= MakeFragment(*source
);
298 FragmentDataset
dataset(ArithmeticDatasetFixture::schema(), {fragment
});
299 ScannerBuilder
builder({&dataset
, [](...) {}});
301 ASSERT_OK(builder
.UseAsync(true));
302 ASSERT_OK(builder
.UseThreads(true));
303 ASSERT_OK(builder
.Project({call("add", {field_ref("i64"), literal(3)})}, {""}));
304 ASSERT_OK_AND_ASSIGN(auto scanner
, builder
.Finish());
306 ASSERT_OK_AND_ASSIGN(auto gen
, scanner
->ScanBatchesUnorderedAsync());
308 auto collect_fut
= CollectAsyncGenerator(gen
);
309 ASSERT_OK_AND_ASSIGN(auto batches
, collect_fut
.result());
311 ASSERT_EQ(batches
.size(), kNumRowGroups
);
314 class TestParquetFileSystemDataset
: public WriteFileSystemDatasetMixin
,
315 public testing::Test
{
317 void SetUp() override
{
319 check_metadata_
= false;
320 auto parquet_format
= std::make_shared
<ParquetFileFormat
>();
321 format_
= parquet_format
;
322 SetWriteOptions(parquet_format
->DefaultWriteOptions());
326 TEST_F(TestParquetFileSystemDataset
, WriteWithIdenticalPartitioningSchema
) {
327 TestWriteWithIdenticalPartitioningSchema();
330 TEST_F(TestParquetFileSystemDataset
, WriteWithUnrelatedPartitioningSchema
) {
331 TestWriteWithUnrelatedPartitioningSchema();
334 TEST_F(TestParquetFileSystemDataset
, WriteWithSupersetPartitioningSchema
) {
335 TestWriteWithSupersetPartitioningSchema();
338 TEST_F(TestParquetFileSystemDataset
, WriteWithEmptyPartitioningSchema
) {
339 TestWriteWithEmptyPartitioningSchema();
342 class TestParquetFileFormatScan
: public FileFormatScanMixin
<ParquetFormatHelper
> {
344 std::shared_ptr
<RecordBatch
> SingleBatch(std::shared_ptr
<Fragment
> fragment
) {
345 auto batches
= IteratorToVector(PhysicalBatches(fragment
));
346 EXPECT_EQ(batches
.size(), 1);
347 return batches
.front();
350 void CountRowsAndBatchesInScan(std::shared_ptr
<Fragment
> fragment
,
351 int64_t expected_rows
, int64_t expected_batches
) {
352 int64_t actual_rows
= 0;
353 int64_t actual_batches
= 0;
355 for (auto maybe_batch
: PhysicalBatches(fragment
)) {
356 ASSERT_OK_AND_ASSIGN(auto batch
, maybe_batch
);
357 actual_rows
+= batch
->num_rows();
361 EXPECT_EQ(actual_rows
, expected_rows
);
362 EXPECT_EQ(actual_batches
, expected_batches
);
365 void CountRowGroupsInFragment(const std::shared_ptr
<Fragment
>& fragment
,
366 std::vector
<int> expected_row_groups
,
367 compute::Expression filter
) {
370 auto parquet_fragment
= checked_pointer_cast
<ParquetFileFragment
>(fragment
);
371 ASSERT_OK_AND_ASSIGN(auto fragments
, parquet_fragment
->SplitByRowGroup(opts_
->filter
))
373 EXPECT_EQ(fragments
.size(), expected_row_groups
.size());
374 for (size_t i
= 0; i
< fragments
.size(); i
++) {
375 auto expected
= expected_row_groups
[i
];
376 auto parquet_fragment
= checked_pointer_cast
<ParquetFileFragment
>(fragments
[i
]);
378 EXPECT_EQ(parquet_fragment
->row_groups(), std::vector
<int>{expected
});
379 EXPECT_EQ(SingleBatch(parquet_fragment
)->num_rows(), expected
+ 1);
384 TEST_P(TestParquetFileFormatScan
, ScanRecordBatchReader
) { TestScan(); }
385 TEST_P(TestParquetFileFormatScan
, ScanBatchSize
) { TestScanBatchSize(); }
386 TEST_P(TestParquetFileFormatScan
, ScanRecordBatchReaderProjected
) { TestScanProjected(); }
387 TEST_P(TestParquetFileFormatScan
, ScanRecordBatchReaderProjectedMissingCols
) {
388 TestScanProjectedMissingCols();
390 TEST_P(TestParquetFileFormatScan
, ScanRecordBatchReaderDictEncoded
) {
391 auto reader
= GetRecordBatchReader(schema({field("utf8", utf8())}));
392 auto source
= GetFileSource(reader
.get());
394 SetSchema(reader
->schema()->fields());
395 SetFilter(literal(true));
396 format_
->reader_options
.dict_columns
= {"utf8"};
397 ASSERT_OK_AND_ASSIGN(auto fragment
, format_
->MakeFragment(*source
));
399 int64_t row_count
= 0;
400 Schema
expected_schema({field("utf8", dictionary(int32(), utf8()))});
402 for (auto maybe_batch
: PhysicalBatches(fragment
)) {
403 ASSERT_OK_AND_ASSIGN(auto batch
, maybe_batch
);
404 row_count
+= batch
->num_rows();
405 AssertSchemaEqual(*batch
->schema(), expected_schema
, /* check_metadata = */ false);
407 ASSERT_EQ(row_count
, expected_rows());
409 TEST_P(TestParquetFileFormatScan
, ScanRecordBatchReaderPreBuffer
) {
410 auto reader
= GetRecordBatchReader(schema({field("f64", float64())}));
411 auto source
= GetFileSource(reader
.get());
413 SetSchema(reader
->schema()->fields());
414 SetFilter(literal(true));
416 ASSERT_OK_AND_ASSIGN(auto fragment
, format_
->MakeFragment(*source
));
417 auto fragment_scan_options
= std::make_shared
<ParquetFragmentScanOptions
>();
418 fragment_scan_options
->arrow_reader_properties
->set_pre_buffer(true);
419 opts_
->fragment_scan_options
= fragment_scan_options
;
420 ASSERT_OK_AND_ASSIGN(auto scan_task_it
, fragment
->Scan(opts_
));
422 int64_t row_count
= 0;
423 for (auto maybe_batch
: PhysicalBatches(fragment
)) {
424 ASSERT_OK_AND_ASSIGN(auto batch
, maybe_batch
);
425 row_count
+= batch
->num_rows();
427 ASSERT_EQ(row_count
, expected_rows());
429 TEST_P(TestParquetFileFormatScan
, PredicatePushdown
) {
430 // Given a number `n`, the arithmetic dataset creates n RecordBatches where
431 // each RecordBatch is keyed by a unique integer in [1, n]. Let `rb_i` denote
432 // the record batch keyed by `i`. `rb_i` is composed of `i` rows where all
433 // values are a variant of `i`, e.g. {"i64": i, "u8": i, ... }.
435 // Thus the ArithmeticDataset(n) has n RecordBatches and the total number of
438 // This test uses the Fragment directly, and so no post-filtering is
439 // applied via ScanOptions' evaluator. Thus, counting the number of returned
440 // rows and returned row groups is a good enough proxy to check if pushdown
441 // predicate is working.
443 constexpr int64_t kNumRowGroups
= 16;
444 constexpr int64_t kTotalNumRows
= kNumRowGroups
* (kNumRowGroups
+ 1) / 2;
446 auto reader
= ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups
);
447 auto source
= GetFileSource(reader
.get());
449 SetSchema(reader
->schema()->fields());
450 ASSERT_OK_AND_ASSIGN(auto fragment
, format_
->MakeFragment(*source
));
452 SetFilter(literal(true));
453 CountRowsAndBatchesInScan(fragment
, kTotalNumRows
, kNumRowGroups
);
455 for (int64_t i
= 1; i
<= kNumRowGroups
; i
++) {
456 SetFilter(equal(field_ref("i64"), literal(i
)));
457 CountRowsAndBatchesInScan(fragment
, i
, 1);
460 // Out of bound filters should skip all RowGroups.
461 SetFilter(literal(false));
462 CountRowsAndBatchesInScan(fragment
, 0, 0);
463 SetFilter(equal(field_ref("i64"), literal
<int64_t>(kNumRowGroups
+ 1)));
464 CountRowsAndBatchesInScan(fragment
, 0, 0);
465 SetFilter(equal(field_ref("i64"), literal
<int64_t>(-1)));
466 CountRowsAndBatchesInScan(fragment
, 0, 0);
467 // No rows match 1 and 2.
468 SetFilter(and_(equal(field_ref("i64"), literal
<int64_t>(1)),
469 equal(field_ref("u8"), literal
<uint8_t>(2))));
470 CountRowsAndBatchesInScan(fragment
, 0, 0);
472 SetFilter(or_(equal(field_ref("i64"), literal
<int64_t>(2)),
473 equal(field_ref("i64"), literal
<int64_t>(4))));
474 CountRowsAndBatchesInScan(fragment
, 2 + 4, 2);
476 SetFilter(less(field_ref("i64"), literal
<int64_t>(6)));
477 CountRowsAndBatchesInScan(fragment
, 5 * (5 + 1) / 2, 5);
479 SetFilter(greater_equal(field_ref("i64"), literal
<int64_t>(6)));
480 CountRowsAndBatchesInScan(fragment
, kTotalNumRows
- (5 * (5 + 1) / 2),
484 TEST_P(TestParquetFileFormatScan
, PredicatePushdownRowGroupFragments
) {
485 constexpr int64_t kNumRowGroups
= 16;
487 auto reader
= ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups
);
488 auto source
= GetFileSource(reader
.get());
490 SetSchema(reader
->schema()->fields());
491 ASSERT_OK_AND_ASSIGN(auto fragment
, format_
->MakeFragment(*source
));
493 auto all_row_groups
= ::arrow::internal::Iota(static_cast<int>(kNumRowGroups
));
494 CountRowGroupsInFragment(fragment
, all_row_groups
, literal(true));
496 for (int i
= 0; i
< kNumRowGroups
; ++i
) {
497 CountRowGroupsInFragment(fragment
, {i
}, equal(field_ref("i64"), literal(i
+ 1)));
500 // Out of bound filters should skip all RowGroups.
501 CountRowGroupsInFragment(fragment
, {}, literal(false));
502 CountRowGroupsInFragment(fragment
, {},
503 equal(field_ref("i64"), literal(kNumRowGroups
+ 1)));
504 CountRowGroupsInFragment(fragment
, {}, equal(field_ref("i64"), literal(-1)));
506 // No rows match 1 and 2.
507 CountRowGroupsInFragment(
509 and_(equal(field_ref("i64"), literal(1)), equal(field_ref("u8"), literal(2))));
510 CountRowGroupsInFragment(
512 and_(equal(field_ref("i64"), literal(2)), equal(field_ref("i64"), literal(4))));
514 CountRowGroupsInFragment(
516 or_(equal(field_ref("i64"), literal(2)), equal(field_ref("i64"), literal(4))));
518 auto set
= ArrayFromJSON(int64(), "[2, 4]");
519 CountRowGroupsInFragment(
521 call("is_in", {field_ref("i64")}, compute::SetLookupOptions
{set
}));
523 CountRowGroupsInFragment(fragment
, {0, 1, 2, 3, 4}, less(field_ref("i64"), literal(6)));
525 CountRowGroupsInFragment(fragment
,
526 ::arrow::internal::Iota(5, static_cast<int>(kNumRowGroups
)),
527 greater_equal(field_ref("i64"), literal(6)));
529 CountRowGroupsInFragment(fragment
, {5, 6},
530 and_(greater_equal(field_ref("i64"), literal(6)),
531 less(field_ref("i64"), literal(8))));
534 TEST_P(TestParquetFileFormatScan
, ExplicitRowGroupSelection
) {
535 constexpr int64_t kNumRowGroups
= 16;
536 constexpr int64_t kTotalNumRows
= kNumRowGroups
* (kNumRowGroups
+ 1) / 2;
538 auto reader
= ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups
);
539 auto source
= GetFileSource(reader
.get());
541 SetSchema(reader
->schema()->fields());
542 SetFilter(literal(true));
544 auto row_groups_fragment
= [&](std::vector
<int> row_groups
) {
545 EXPECT_OK_AND_ASSIGN(auto fragment
,
546 format_
->MakeFragment(*source
, literal(true),
547 /*physical_schema=*/nullptr, row_groups
));
551 // select all row groups
552 EXPECT_OK_AND_ASSIGN(auto all_row_groups_fragment
,
553 format_
->MakeFragment(*source
, literal(true))
554 .Map([](std::shared_ptr
<FileFragment
> f
) {
555 return checked_pointer_cast
<ParquetFileFragment
>(f
);
558 EXPECT_EQ(all_row_groups_fragment
->row_groups(), std::vector
<int>{});
560 ARROW_EXPECT_OK(all_row_groups_fragment
->EnsureCompleteMetadata());
561 CountRowsAndBatchesInScan(all_row_groups_fragment
, kTotalNumRows
, kNumRowGroups
);
563 // individual selection selects a single row group
564 for (int i
= 0; i
< kNumRowGroups
; ++i
) {
565 CountRowsAndBatchesInScan(row_groups_fragment({i
}), i
+ 1, 1);
566 EXPECT_EQ(row_groups_fragment({i
})->row_groups(), std::vector
<int>{i
});
569 for (int i
= 0; i
< kNumRowGroups
; ++i
) {
570 // conflicting selection/filter
571 SetFilter(equal(field_ref("i64"), literal(i
)));
572 CountRowsAndBatchesInScan(row_groups_fragment({i
}), 0, 0);
575 for (int i
= 0; i
< kNumRowGroups
; ++i
) {
576 // identical selection/filter
577 SetFilter(equal(field_ref("i64"), literal(i
+ 1)));
578 CountRowsAndBatchesInScan(row_groups_fragment({i
}), i
+ 1, 1);
581 SetFilter(greater(field_ref("i64"), literal(3)));
582 CountRowsAndBatchesInScan(row_groups_fragment({2, 3, 4, 5}), 4 + 5 + 6, 3);
584 EXPECT_RAISES_WITH_MESSAGE_THAT(
586 testing::HasSubstr("only has " + std::to_string(kNumRowGroups
) + " row groups"),
587 row_groups_fragment({kNumRowGroups
+ 1})->Scan(opts_
));
590 TEST_P(TestParquetFileFormatScan
, PredicatePushdownRowGroupFragmentsUsingStringColumn
) {
591 auto table
= TableFromJSON(schema({field("x", utf8())}),
594 R
"([{"x
": "b
"}, {"x
": "b
"}])",
595 R
"([{"x
": "c
"}, {"x
": "c
"}, {"x
": "c
"}])",
596 R
"([{"x
": "a
"}, {"x
": "b
"}, {"x
": "c
"}, {"x
": "d
"}])",
598 TableBatchReader
reader(*table
);
599 auto source
= GetFileSource(&reader
);
601 SetSchema(reader
.schema()->fields());
602 ASSERT_OK_AND_ASSIGN(auto fragment
, format_
->MakeFragment(*source
));
604 CountRowGroupsInFragment(fragment
, {0, 3}, equal(field_ref("x"), literal("a")));
607 INSTANTIATE_TEST_SUITE_P(TestScan
, TestParquetFileFormatScan
,
608 ::testing::ValuesIn(TestFormatParams::Values()),
609 TestFormatParams::ToTestNameString
);
611 } // namespace dataset