1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
23 #include <gmock/gmock.h>
24 #include <gtest/gtest.h>
26 #include "arrow/dataset/api.h"
27 #include "arrow/dataset/partition.h"
28 #include "arrow/dataset/test_util.h"
29 #include "arrow/filesystem/path_util.h"
30 #include "arrow/filesystem/test_util.h"
31 #include "arrow/status.h"
32 #include "arrow/testing/future_util.h"
33 #include "arrow/testing/gtest_util.h"
34 #include "arrow/util/io_util.h"
38 using internal::TemporaryDir
;
42 using fs::internal::GetAbstractPathExtension
;
43 using testing::ContainerEq
;
// Verifies that a path-based FileSource reports back the path, filesystem and
// compression it was constructed with; when no compression is passed it must
// report Compression::UNCOMPRESSED.
// NOTE(review): lines in this excerpt carry stray numeric prefixes and some
// statements appear to be missing (e.g. the declaration of source3 and the
// test's closing brace) — confirm against the full file.
45 TEST(FileSource
, PathBased
) {
46 auto localfs
= std::make_shared
<fs::LocalFileSystem
>();
48 std::string p1
= "/path/to/file.ext";
49 std::string p2
= "/path/to/file.ext.gz";
// source1 omits the compression argument; source2 passes GZIP explicitly.
51 FileSource
source1(p1
, localfs
);
52 FileSource
source2(p2
, localfs
, Compression::GZIP
);
// Filesystems are compared by value (Equals), not by pointer identity.
54 ASSERT_EQ(p1
, source1
.path());
55 ASSERT_TRUE(localfs
->Equals(*source1
.filesystem()));
56 ASSERT_EQ(Compression::UNCOMPRESSED
, source1
.compression());
58 ASSERT_EQ(p2
, source2
.path());
59 ASSERT_TRUE(localfs
->Equals(*source2
.filesystem()));
60 ASSERT_EQ(Compression::GZIP
, source2
.compression());
62 // Test copy constructor and comparison
// NOTE(review): source3 is used below but its declaration is not visible in
// this excerpt — presumably a copy of source1; confirm.
65 ASSERT_EQ(source1
.path(), source3
.path());
66 ASSERT_EQ(source1
.filesystem(), source3
.filesystem());
// Verifies that a buffer-based FileSource exposes the buffer it wraps and the
// compression it was constructed with; when no compression is passed it must
// report Compression::UNCOMPRESSED.
// NOTE(review): lines in this excerpt carry stray numeric prefixes and some
// statements appear to be missing (e.g. the declaration of source3 and the
// test's closing brace) — confirm against the full file.
69 TEST(FileSource
, BufferBased
) {
70 std::string the_data
= "this is the file contents";
// NOTE(review): if this Buffer constructor references the string's bytes
// rather than copying them, the_data must outlive buf — it does here, since
// both are locals of this test.
71 auto buf
= std::make_shared
<Buffer
>(the_data
);
73 FileSource
source1(buf
);
74 FileSource
source2(buf
, Compression::LZ4
);
// Buffer contents are compared by value (Equals).
76 ASSERT_TRUE(source1
.buffer()->Equals(*buf
));
77 ASSERT_EQ(Compression::UNCOMPRESSED
, source1
.compression());
79 ASSERT_TRUE(source2
.buffer()->Equals(*buf
));
80 ASSERT_EQ(Compression::LZ4
, source2
.compression());
// NOTE(review): source3 is used below but its declaration is not visible in
// this excerpt — presumably a copy of source1; confirm.
84 ASSERT_EQ(source1
.buffer(), source3
.buffer());
// Sizes of the synthetic data used by the tests in this file. The mock file
// format's ScanFile emits kNumScanTasks scan tasks, each holding
// kBatchesPerScanTask batches of kRowsPerBatch rows. kRowsPerBatch is also
// the row count of the in-memory batch written by the WriteProjected test.
constexpr int kNumScanTasks = 2;
constexpr int kBatchesPerScanTask = 2;
constexpr int kRowsPerBatch = 1024;
90 class MockFileFormat
: public FileFormat
{
91 std::string
type_name() const override
{ return "mock"; }
92 bool Equals(const FileFormat
& other
) const override
{ return false; }
93 Result
<bool> IsSupported(const FileSource
& source
) const override
{ return true; }
94 Result
<std::shared_ptr
<Schema
>> Inspect(const FileSource
& source
) const override
{
95 return Status::NotImplemented("Not needed for test");
97 Result
<std::shared_ptr
<FileWriter
>> MakeWriter(
98 std::shared_ptr
<io::OutputStream
> destination
, std::shared_ptr
<Schema
> schema
,
99 std::shared_ptr
<FileWriteOptions
> options
,
100 fs::FileLocator destination_locator
) const override
{
101 return Status::NotImplemented("Not needed for test");
103 std::shared_ptr
<FileWriteOptions
> DefaultWriteOptions() override
{ return nullptr; }
105 Result
<ScanTaskIterator
> ScanFile(
106 const std::shared_ptr
<ScanOptions
>& options
,
107 const std::shared_ptr
<FileFragment
>& file
) const override
{
108 auto sch
= schema({field("i32", int32())});
109 ScanTaskVector scan_tasks
;
110 for (int i
= 0; i
< kNumScanTasks
; i
++) {
111 RecordBatchVector batches
;
112 for (int j
= 0; j
< kBatchesPerScanTask
; j
++) {
113 batches
.push_back(ConstantArrayGenerator::Zeroes(kRowsPerBatch
, sch
));
115 scan_tasks
.push_back(std::make_shared
<InMemoryScanTask
>(
116 std::move(batches
), std::make_shared
<ScanOptions
>(), nullptr));
118 return MakeVectorIterator(std::move(scan_tasks
));
122 TEST(FileFormat
, ScanAsync
) {
123 MockFileFormat format
;
124 auto scan_options
= std::make_shared
<ScanOptions
>();
125 ASSERT_OK_AND_ASSIGN(auto batch_gen
, format
.ScanBatchesAsync(scan_options
, nullptr));
126 ASSERT_FINISHES_OK_AND_ASSIGN(auto batches
, CollectAsyncGenerator(batch_gen
));
127 ASSERT_EQ(kNumScanTasks
* kBatchesPerScanTask
, static_cast<int>(batches
.size()));
128 for (int i
= 0; i
< kNumScanTasks
* kBatchesPerScanTask
; i
++) {
129 ASSERT_EQ(kRowsPerBatch
, batches
[i
]->num_rows());
// Checks which fragments a FileSystemDataset built from a list of FileInfos
// exposes: one fragment per file entry, and none for directory entries.
// NOTE(review): the statement that sets up the (presumably empty) dataset_
// checked by the first assertion, and the test's closing brace, are not
// visible in this excerpt — confirm against the full file.
133 TEST_F(TestFileSystemDataset
, Basic
) {
135 AssertFragmentsAreFromPath(*dataset_
->GetFragments(), {});
137 MakeDataset({fs::File("a"), fs::File("b"), fs::File("c")});
138 AssertFragmentsAreFromPath(*dataset_
->GetFragments(), {"a", "b", "c"});
139 AssertFilesAre(dataset_
, {"a", "b", "c"});
141 // Should not create fragments from directories.
142 MakeDataset({fs::Dir("A"), fs::Dir("A/B"), fs::File("A/a"), fs::File("A/B/b")});
143 AssertFragmentsAreFromPath(*dataset_
->GetFragments(), {"A/a", "A/B/b"});
144 AssertFilesAre(dataset_
, {"A/a", "A/B/b"});
// Exercises FileSystemDataset::ReplaceSchema on a dataset whose schema is
// {i32: int32, f64: float64}. Two replacements are accepted: a subset of the
// fields, and an added nullable field. Changing a field's type and adding a
// non-nullable field are expected to raise TypeError. An
// incompatible-nullability case is also checked, but its assertion is only
// partially visible here.
// NOTE(review): lines in this excerpt carry stray numeric prefixes and some
// lines appear to be missing (the opening of the nullability ASSERT_RAISES,
// the trailing `.status());` calls, the closing brace) — confirm against the
// full file.
147 TEST_F(TestFileSystemDataset
, ReplaceSchema
) {
148 auto schm
= schema({field("i32", int32()), field("f64", float64())});
149 auto format
= std::make_shared
<DummyFileFormat
>(schm
);
// No fragments are supplied (empty list); only schema handling is tested.
150 ASSERT_OK_AND_ASSIGN(auto dataset
,
151 FileSystemDataset::Make(schm
, literal(true), format
, nullptr, {}));
// Keeping only i32 (dropping f64) is accepted.
154 ASSERT_OK(dataset
->ReplaceSchema(schema({field("i32", int32())})).status());
155 // add nullable field (will be materialized as null during projection)
156 ASSERT_OK(dataset
->ReplaceSchema(schema({field("str", utf8())})).status());
// Changing i32's type from int32 to utf8 is rejected.
158 ASSERT_RAISES(TypeError
,
159 dataset
->ReplaceSchema(schema({field("i32", utf8())})).status());
160 // incompatible nullability
163 dataset
->ReplaceSchema(schema({field("f64", float64(), /*nullable=*/false)}))
165 // add non-nullable field
166 ASSERT_RAISES(TypeError
,
167 dataset
->ReplaceSchema(schema({field("str", utf8(), /*nullable=*/false)}))
// Checks fragment pruning against a dataset-wide root partition expression
// (i32 == 5) over files "a" and "b". With no filter, or with a filter equal
// to the root partition, both fragments are returned. A filter contradicting
// it (i32 == 6) returns none. The dataset is then rebuilt without a root
// partition (literal(true)).
// NOTE(review): lines in this excerpt carry stray numeric prefixes and
// several expected-path arguments, the lambda's closing `};`, and the test's
// closing brace are not visible — confirm against the full file.
171 TEST_F(TestFileSystemDataset
, RootPartitionPruning
) {
172 auto root_partition
= equal(field_ref("i32"), literal(5));
173 MakeDataset({fs::File("a"), fs::File("b")}, root_partition
, {},
174 schema({field("i32", int32()), field("f32", float32())}));
// Binds `filter` to the current dataset_'s schema and returns the selected
// fragments. dataset_ is captured by reference and read on every call, so
// calls after the second MakeDataset below see the rebuilt dataset.
176 auto GetFragments
= [&](compute::Expression filter
) {
177 return *dataset_
->GetFragments(*filter
.Bind(*dataset_
->schema()));
180 // Default filter should always return all data.
181 AssertFragmentsAreFromPath(*dataset_
->GetFragments(), {"a", "b"});
183 // filter == partition
184 AssertFragmentsAreFromPath(GetFragments(root_partition
), {"a", "b"});
186 // Same partition key, but non matching filter
187 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("i32"), literal(6))), {});
189 AssertFragmentsAreFromPath(GetFragments(greater(field_ref("i32"), literal(1))),
192 // different key shouldn't prune
193 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("f32"), literal(3.F
))),
196 // No root partition: don't prune any fragments
197 MakeDataset({fs::File("a"), fs::File("b")}, literal(true), {},
198 schema({field("i32", int32()), field("f32", float32())}));
199 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("f32"), literal(3.F
))),
// Checks fragment pruning over a two-level state/city layout (NY and CA
// directories, each holding city files) under the root partition
// country == "US". With no filter, all four city files are returned.
// NOTE(review): lines in this excerpt carry stray numeric prefixes and
// several lines appear to be missing (the closing `};` of the initializer
// lists, the `MakeDataset(` call head, the expected-path arguments of most
// assertions, the lambda's closing `};`, the test's closing brace) — confirm
// against the full file.
203 TEST_F(TestFileSystemDataset
, TreePartitionPruning
) {
204 auto root_partition
= equal(field_ref("country"), literal("US"));
206 std::vector
<fs::FileInfo
> regions
= {
207 fs::Dir("NY"), fs::File("NY/New York"), fs::File("NY/Franklin"),
208 fs::Dir("CA"), fs::File("CA/San Francisco"), fs::File("CA/Franklin"),
// NOTE(review): partitions appears to be parallel to regions (one expression
// per entry: state-only for directories, state AND city for files) — confirm.
211 std::vector
<compute::Expression
> partitions
= {
212 equal(field_ref("state"), literal("NY")),
214 and_(equal(field_ref("state"), literal("NY")),
215 equal(field_ref("city"), literal("New York"))),
217 and_(equal(field_ref("state"), literal("NY")),
218 equal(field_ref("city"), literal("Franklin"))),
220 equal(field_ref("state"), literal("CA")),
222 and_(equal(field_ref("state"), literal("CA")),
223 equal(field_ref("city"), literal("San Francisco"))),
225 and_(equal(field_ref("state"), literal("CA")),
226 equal(field_ref("city"), literal("Franklin"))),
230 regions
, root_partition
, partitions
,
231 schema({field("country", utf8()), field("state", utf8()), field("city", utf8())}));
// Expected fragment path sets used by the assertions below.
233 std::vector
<std::string
> all_cities
= {"CA/San Francisco", "CA/Franklin", "NY/New York",
235 std::vector
<std::string
> ca_cities
= {"CA/San Francisco", "CA/Franklin"};
236 std::vector
<std::string
> franklins
= {"CA/Franklin", "NY/Franklin"};
238 // Default filter should always return all data.
239 AssertFragmentsAreFromPath(*dataset_
->GetFragments(), all_cities
);
// Binds `filter` to dataset_'s schema and returns the selected fragments.
241 auto GetFragments
= [&](compute::Expression filter
) {
242 return *dataset_
->GetFragments(*filter
.Bind(*dataset_
->schema()));
245 // Dataset's partitions are respected
246 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("country"), literal("US"))),
248 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("country"), literal("FR"))),
251 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("state"), literal("CA"))),
254 // Filter where no decisions can be made on inner nodes when the filter doesn't
255 // apply to inner partitions.
256 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("city"), literal("Franklin"))),
// Checks the partition expressions carried by the fragments of a state/city
// layout under the root partition country == "US". The expected expressions
// visible below are the state AND city conjunctions for the four city files.
// The root partition does not appear in them.
// NOTE(review): lines in this excerpt carry stray numeric prefixes and
// several lines appear to be missing (the closing `};` of the initializer
// lists, the `MakeDataset(` call head, the first argument of
// AssertFragmentsHavePartitionExpressions, the closing lines) — confirm
// against the full file.
260 TEST_F(TestFileSystemDataset
, FragmentPartitions
) {
261 auto root_partition
= equal(field_ref("country"), literal("US"));
262 std::vector
<fs::FileInfo
> regions
= {
263 fs::Dir("NY"), fs::File("NY/New York"), fs::File("NY/Franklin"),
264 fs::Dir("CA"), fs::File("CA/San Francisco"), fs::File("CA/Franklin"),
// NOTE(review): partitions appears to be parallel to regions (one expression
// per entry) — confirm.
267 std::vector
<compute::Expression
> partitions
= {
268 equal(field_ref("state"), literal("NY")),
270 and_(equal(field_ref("state"), literal("NY")),
271 equal(field_ref("city"), literal("New York"))),
273 and_(equal(field_ref("state"), literal("NY")),
274 equal(field_ref("city"), literal("Franklin"))),
276 equal(field_ref("state"), literal("CA")),
278 and_(equal(field_ref("state"), literal("CA")),
279 equal(field_ref("city"), literal("San Francisco"))),
281 and_(equal(field_ref("state"), literal("CA")),
282 equal(field_ref("city"), literal("Franklin"))),
286 regions
, root_partition
, partitions
,
287 schema({field("country", utf8()), field("state", utf8()), field("city", utf8())}));
// Expected per-fragment expressions, listed CA first, then NY.
289 AssertFragmentsHavePartitionExpressions(
291 and_(equal(field_ref("state"), literal("CA")),
292 equal(field_ref("city"), literal("San Francisco"))),
293 and_(equal(field_ref("state"), literal("CA")),
294 equal(field_ref("city"), literal("Franklin"))),
295 and_(equal(field_ref("state"), literal("NY")),
296 equal(field_ref("city"), literal("New York"))),
297 and_(equal(field_ref("state"), literal("NY")),
298 equal(field_ref("city"), literal("Franklin"))),
// Writes a projected scan of an in-memory dataset to an IPC file on a mock
// filesystem, then reads it back. The projection is "a" + 1, which the schema
// check below expects to be named "a_plus_one" with type int64. The test
// asserts that the read-back column contains no nulls.
// NOTE(review): lines in this excerpt carry stray numeric prefixes and some
// lines appear to be missing (the projected-name argument of Project, and
// the closing braces of the loops and the test) — confirm against the full
// file.
302 TEST_F(TestFileSystemDataset
, WriteProjected
) {
303 // Regression test for ARROW-12620
304 auto format
= std::make_shared
<IpcFileFormat
>();
305 auto fs
= std::make_shared
<fs::internal::MockFileSystem
>(fs::kNoTime
);
// Write configuration: IPC format into "root" on the mock filesystem, with a
// Hive partitioning over an empty schema. The output is presumably
// un-partitioned, since it is read back from "root/0.feather" below.
306 FileSystemDatasetWriteOptions write_options
;
307 write_options
.file_write_options
= format
->DefaultWriteOptions();
308 write_options
.filesystem
= fs
;
309 write_options
.base_dir
= "root";
310 write_options
.partitioning
= std::make_shared
<HivePartitioning
>(schema({}));
311 write_options
.basename_template
= "{i}.feather";
// Input: one batch of kRowsPerBatch rows in an int64 column "a", with no nulls
// (asserted).
313 auto dataset_schema
= schema({field("a", int64())});
314 RecordBatchVector batches
{
315 ConstantArrayGenerator::Zeroes(kRowsPerBatch
, dataset_schema
)};
316 ASSERT_EQ(0, batches
[0]->column(0)->null_count());
317 auto dataset
= std::make_shared
<InMemoryDataset
>(dataset_schema
, batches
);
318 ASSERT_OK_AND_ASSIGN(auto scanner_builder
, dataset
->NewScan());
319 ASSERT_OK(scanner_builder
->Project(
320 {compute::call("add", {compute::field_ref("a"), compute::literal(1)})},
322 ASSERT_OK(scanner_builder
->UseAsync(true));
323 ASSERT_OK_AND_ASSIGN(auto scanner
, scanner_builder
->Finish());
325 ASSERT_OK(FileSystemDataset::Write(write_options
, scanner
));
// Read the written file back and check its schema and contents.
327 ASSERT_OK_AND_ASSIGN(auto dataset_factory
, FileSystemDatasetFactory::Make(
328 fs
, {"root/0.feather"}, format
, {}));
329 ASSERT_OK_AND_ASSIGN(auto written_dataset
, dataset_factory
->Finish(FinishOptions
{}));
330 auto expected_schema
= schema({field("a_plus_one", int64())});
331 AssertSchemaEqual(*expected_schema
, *written_dataset
->schema());
332 ASSERT_OK_AND_ASSIGN(scanner_builder
, written_dataset
->NewScan());
333 ASSERT_OK_AND_ASSIGN(scanner
, scanner_builder
->Finish());
334 ASSERT_OK_AND_ASSIGN(auto table
, scanner
->ToTable());
335 auto col
= table
->column(0);
336 ASSERT_EQ(0, col
->null_count());
// Also check every value element-wise, chunk by chunk.
// NOTE(review): arr is dereferenced without a null check; a chunk that is not
// an Int64Array would make the cast return null. The schema assertion above
// makes that unlikely, but an ASSERT_NE(nullptr, arr) would fail more clearly.
337 for (auto chunk
: col
->chunks()) {
338 auto arr
= std::dynamic_pointer_cast
<Int64Array
>(chunk
);
339 for (auto val
: *arr
) {
340 ASSERT_TRUE(val
.has_value());
345 } // namespace dataset