]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/dataset/file_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / dataset / file_test.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <cstdint>
19 #include <memory>
20 #include <string>
21 #include <vector>
22
23 #include <gmock/gmock.h>
24 #include <gtest/gtest.h>
25
26 #include "arrow/dataset/api.h"
27 #include "arrow/dataset/partition.h"
28 #include "arrow/dataset/test_util.h"
29 #include "arrow/filesystem/path_util.h"
30 #include "arrow/filesystem/test_util.h"
31 #include "arrow/status.h"
32 #include "arrow/testing/future_util.h"
33 #include "arrow/testing/gtest_util.h"
34 #include "arrow/util/io_util.h"
35
36 namespace arrow {
37
38 using internal::TemporaryDir;
39
40 namespace dataset {
41
42 using fs::internal::GetAbstractPathExtension;
43 using testing::ContainerEq;
44
45 TEST(FileSource, PathBased) {
46 auto localfs = std::make_shared<fs::LocalFileSystem>();
47
48 std::string p1 = "/path/to/file.ext";
49 std::string p2 = "/path/to/file.ext.gz";
50
51 FileSource source1(p1, localfs);
52 FileSource source2(p2, localfs, Compression::GZIP);
53
54 ASSERT_EQ(p1, source1.path());
55 ASSERT_TRUE(localfs->Equals(*source1.filesystem()));
56 ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression());
57
58 ASSERT_EQ(p2, source2.path());
59 ASSERT_TRUE(localfs->Equals(*source2.filesystem()));
60 ASSERT_EQ(Compression::GZIP, source2.compression());
61
62 // Test copy constructor and comparison
63 FileSource source3;
64 source3 = source1;
65 ASSERT_EQ(source1.path(), source3.path());
66 ASSERT_EQ(source1.filesystem(), source3.filesystem());
67 }
68
69 TEST(FileSource, BufferBased) {
70 std::string the_data = "this is the file contents";
71 auto buf = std::make_shared<Buffer>(the_data);
72
73 FileSource source1(buf);
74 FileSource source2(buf, Compression::LZ4);
75
76 ASSERT_TRUE(source1.buffer()->Equals(*buf));
77 ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression());
78
79 ASSERT_TRUE(source2.buffer()->Equals(*buf));
80 ASSERT_EQ(Compression::LZ4, source2.compression());
81
82 FileSource source3;
83 source3 = source1;
84 ASSERT_EQ(source1.buffer(), source3.buffer());
85 }
86
// Shape of the synthetic data used by the tests below.
// Number of scan tasks MockFileFormat::ScanFile produces.
constexpr int kNumScanTasks = 2;
// Number of record batches held by each of those scan tasks.
constexpr int kBatchesPerScanTask = 2;
// Number of rows in every generated record batch.
constexpr int kRowsPerBatch = 1024;
90 class MockFileFormat : public FileFormat {
91 std::string type_name() const override { return "mock"; }
92 bool Equals(const FileFormat& other) const override { return false; }
93 Result<bool> IsSupported(const FileSource& source) const override { return true; }
94 Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override {
95 return Status::NotImplemented("Not needed for test");
96 }
97 Result<std::shared_ptr<FileWriter>> MakeWriter(
98 std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
99 std::shared_ptr<FileWriteOptions> options,
100 fs::FileLocator destination_locator) const override {
101 return Status::NotImplemented("Not needed for test");
102 }
103 std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return nullptr; }
104
105 Result<ScanTaskIterator> ScanFile(
106 const std::shared_ptr<ScanOptions>& options,
107 const std::shared_ptr<FileFragment>& file) const override {
108 auto sch = schema({field("i32", int32())});
109 ScanTaskVector scan_tasks;
110 for (int i = 0; i < kNumScanTasks; i++) {
111 RecordBatchVector batches;
112 for (int j = 0; j < kBatchesPerScanTask; j++) {
113 batches.push_back(ConstantArrayGenerator::Zeroes(kRowsPerBatch, sch));
114 }
115 scan_tasks.push_back(std::make_shared<InMemoryScanTask>(
116 std::move(batches), std::make_shared<ScanOptions>(), nullptr));
117 }
118 return MakeVectorIterator(std::move(scan_tasks));
119 }
120 };
121
122 TEST(FileFormat, ScanAsync) {
123 MockFileFormat format;
124 auto scan_options = std::make_shared<ScanOptions>();
125 ASSERT_OK_AND_ASSIGN(auto batch_gen, format.ScanBatchesAsync(scan_options, nullptr));
126 ASSERT_FINISHES_OK_AND_ASSIGN(auto batches, CollectAsyncGenerator(batch_gen));
127 ASSERT_EQ(kNumScanTasks * kBatchesPerScanTask, static_cast<int>(batches.size()));
128 for (int i = 0; i < kNumScanTasks * kBatchesPerScanTask; i++) {
129 ASSERT_EQ(kRowsPerBatch, batches[i]->num_rows());
130 }
131 }
132
133 TEST_F(TestFileSystemDataset, Basic) {
134 MakeDataset({});
135 AssertFragmentsAreFromPath(*dataset_->GetFragments(), {});
136
137 MakeDataset({fs::File("a"), fs::File("b"), fs::File("c")});
138 AssertFragmentsAreFromPath(*dataset_->GetFragments(), {"a", "b", "c"});
139 AssertFilesAre(dataset_, {"a", "b", "c"});
140
141 // Should not create fragment from directories.
142 MakeDataset({fs::Dir("A"), fs::Dir("A/B"), fs::File("A/a"), fs::File("A/B/b")});
143 AssertFragmentsAreFromPath(*dataset_->GetFragments(), {"A/a", "A/B/b"});
144 AssertFilesAre(dataset_, {"A/a", "A/B/b"});
145 }
146
147 TEST_F(TestFileSystemDataset, ReplaceSchema) {
148 auto schm = schema({field("i32", int32()), field("f64", float64())});
149 auto format = std::make_shared<DummyFileFormat>(schm);
150 ASSERT_OK_AND_ASSIGN(auto dataset,
151 FileSystemDataset::Make(schm, literal(true), format, nullptr, {}));
152
153 // drop field
154 ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
155 // add nullable field (will be materialized as null during projection)
156 ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
157 // incompatible type
158 ASSERT_RAISES(TypeError,
159 dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
160 // incompatible nullability
161 ASSERT_RAISES(
162 TypeError,
163 dataset->ReplaceSchema(schema({field("f64", float64(), /*nullable=*/false)}))
164 .status());
165 // add non-nullable field
166 ASSERT_RAISES(TypeError,
167 dataset->ReplaceSchema(schema({field("str", utf8(), /*nullable=*/false)}))
168 .status());
169 }
170
171 TEST_F(TestFileSystemDataset, RootPartitionPruning) {
172 auto root_partition = equal(field_ref("i32"), literal(5));
173 MakeDataset({fs::File("a"), fs::File("b")}, root_partition, {},
174 schema({field("i32", int32()), field("f32", float32())}));
175
176 auto GetFragments = [&](compute::Expression filter) {
177 return *dataset_->GetFragments(*filter.Bind(*dataset_->schema()));
178 };
179
180 // Default filter should always return all data.
181 AssertFragmentsAreFromPath(*dataset_->GetFragments(), {"a", "b"});
182
183 // filter == partition
184 AssertFragmentsAreFromPath(GetFragments(root_partition), {"a", "b"});
185
186 // Same partition key, but non matching filter
187 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("i32"), literal(6))), {});
188
189 AssertFragmentsAreFromPath(GetFragments(greater(field_ref("i32"), literal(1))),
190 {"a", "b"});
191
192 // different key shouldn't prune
193 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("f32"), literal(3.F))),
194 {"a", "b"});
195
196 // No root partition: don't prune any fragments
197 MakeDataset({fs::File("a"), fs::File("b")}, literal(true), {},
198 schema({field("i32", int32()), field("f32", float32())}));
199 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("f32"), literal(3.F))),
200 {"a", "b"});
201 }
202
203 TEST_F(TestFileSystemDataset, TreePartitionPruning) {
204 auto root_partition = equal(field_ref("country"), literal("US"));
205
206 std::vector<fs::FileInfo> regions = {
207 fs::Dir("NY"), fs::File("NY/New York"), fs::File("NY/Franklin"),
208 fs::Dir("CA"), fs::File("CA/San Francisco"), fs::File("CA/Franklin"),
209 };
210
211 std::vector<compute::Expression> partitions = {
212 equal(field_ref("state"), literal("NY")),
213
214 and_(equal(field_ref("state"), literal("NY")),
215 equal(field_ref("city"), literal("New York"))),
216
217 and_(equal(field_ref("state"), literal("NY")),
218 equal(field_ref("city"), literal("Franklin"))),
219
220 equal(field_ref("state"), literal("CA")),
221
222 and_(equal(field_ref("state"), literal("CA")),
223 equal(field_ref("city"), literal("San Francisco"))),
224
225 and_(equal(field_ref("state"), literal("CA")),
226 equal(field_ref("city"), literal("Franklin"))),
227 };
228
229 MakeDataset(
230 regions, root_partition, partitions,
231 schema({field("country", utf8()), field("state", utf8()), field("city", utf8())}));
232
233 std::vector<std::string> all_cities = {"CA/San Francisco", "CA/Franklin", "NY/New York",
234 "NY/Franklin"};
235 std::vector<std::string> ca_cities = {"CA/San Francisco", "CA/Franklin"};
236 std::vector<std::string> franklins = {"CA/Franklin", "NY/Franklin"};
237
238 // Default filter should always return all data.
239 AssertFragmentsAreFromPath(*dataset_->GetFragments(), all_cities);
240
241 auto GetFragments = [&](compute::Expression filter) {
242 return *dataset_->GetFragments(*filter.Bind(*dataset_->schema()));
243 };
244
245 // Dataset's partitions are respected
246 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("country"), literal("US"))),
247 all_cities);
248 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("country"), literal("FR"))),
249 {});
250
251 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("state"), literal("CA"))),
252 ca_cities);
253
254 // Filter where no decisions can be made on inner nodes when filter don't
255 // apply to inner partitions.
256 AssertFragmentsAreFromPath(GetFragments(equal(field_ref("city"), literal("Franklin"))),
257 franklins);
258 }
259
260 TEST_F(TestFileSystemDataset, FragmentPartitions) {
261 auto root_partition = equal(field_ref("country"), literal("US"));
262 std::vector<fs::FileInfo> regions = {
263 fs::Dir("NY"), fs::File("NY/New York"), fs::File("NY/Franklin"),
264 fs::Dir("CA"), fs::File("CA/San Francisco"), fs::File("CA/Franklin"),
265 };
266
267 std::vector<compute::Expression> partitions = {
268 equal(field_ref("state"), literal("NY")),
269
270 and_(equal(field_ref("state"), literal("NY")),
271 equal(field_ref("city"), literal("New York"))),
272
273 and_(equal(field_ref("state"), literal("NY")),
274 equal(field_ref("city"), literal("Franklin"))),
275
276 equal(field_ref("state"), literal("CA")),
277
278 and_(equal(field_ref("state"), literal("CA")),
279 equal(field_ref("city"), literal("San Francisco"))),
280
281 and_(equal(field_ref("state"), literal("CA")),
282 equal(field_ref("city"), literal("Franklin"))),
283 };
284
285 MakeDataset(
286 regions, root_partition, partitions,
287 schema({field("country", utf8()), field("state", utf8()), field("city", utf8())}));
288
289 AssertFragmentsHavePartitionExpressions(
290 dataset_, {
291 and_(equal(field_ref("state"), literal("CA")),
292 equal(field_ref("city"), literal("San Francisco"))),
293 and_(equal(field_ref("state"), literal("CA")),
294 equal(field_ref("city"), literal("Franklin"))),
295 and_(equal(field_ref("state"), literal("NY")),
296 equal(field_ref("city"), literal("New York"))),
297 and_(equal(field_ref("state"), literal("NY")),
298 equal(field_ref("city"), literal("Franklin"))),
299 });
300 }
301
302 TEST_F(TestFileSystemDataset, WriteProjected) {
303 // Regression test for ARROW-12620
304 auto format = std::make_shared<IpcFileFormat>();
305 auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
306 FileSystemDatasetWriteOptions write_options;
307 write_options.file_write_options = format->DefaultWriteOptions();
308 write_options.filesystem = fs;
309 write_options.base_dir = "root";
310 write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
311 write_options.basename_template = "{i}.feather";
312
313 auto dataset_schema = schema({field("a", int64())});
314 RecordBatchVector batches{
315 ConstantArrayGenerator::Zeroes(kRowsPerBatch, dataset_schema)};
316 ASSERT_EQ(0, batches[0]->column(0)->null_count());
317 auto dataset = std::make_shared<InMemoryDataset>(dataset_schema, batches);
318 ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
319 ASSERT_OK(scanner_builder->Project(
320 {compute::call("add", {compute::field_ref("a"), compute::literal(1)})},
321 {"a_plus_one"}));
322 ASSERT_OK(scanner_builder->UseAsync(true));
323 ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
324
325 ASSERT_OK(FileSystemDataset::Write(write_options, scanner));
326
327 ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
328 fs, {"root/0.feather"}, format, {}));
329 ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{}));
330 auto expected_schema = schema({field("a_plus_one", int64())});
331 AssertSchemaEqual(*expected_schema, *written_dataset->schema());
332 ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
333 ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
334 ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
335 auto col = table->column(0);
336 ASSERT_EQ(0, col->null_count());
337 for (auto chunk : col->chunks()) {
338 auto arr = std::dynamic_pointer_cast<Int64Array>(chunk);
339 for (auto val : *arr) {
340 ASSERT_TRUE(val.has_value());
341 ASSERT_EQ(1, *val);
342 }
343 }
344 }
345 } // namespace dataset
346 } // namespace arrow