# Source: ceph/src/arrow/python/pyarrow/tests/parquet/test_dataset.py
# (Apache Arrow vendored into Ceph, "quincy" 17.2.0 release)
1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
16 # under the License.
17
18 import datetime
19 import os
20
21 import numpy as np
22 import pytest
23
24 import pyarrow as pa
25 from pyarrow import fs
26 from pyarrow.filesystem import LocalFileSystem
27 from pyarrow.tests import util
28 from pyarrow.tests.parquet.common import (
29 parametrize_legacy_dataset, parametrize_legacy_dataset_fixed,
30 parametrize_legacy_dataset_not_supported)
31 from pyarrow.util import guid
32 from pyarrow.vendored.version import Version
33
try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import (
        _read_table, _test_dataframe, _write_table)
except ImportError:
    # Parquet support is an optional pyarrow component; record its absence
    # so the module-level ``pytest.mark.parquet`` marker can skip the tests.
    pq = None
40
41
try:
    import pandas as pd
    import pandas.testing as tm

except ImportError:
    # pandas is optional; tests needing it are guarded by @pytest.mark.pandas.
    pd = tm = None
48
49 pytestmark = pytest.mark.parquet
50
51
@pytest.mark.pandas
def test_parquet_piece_read(tempdir):
    """A deprecated ParquetDatasetPiece can still read back a written file."""
    frame = _test_dataframe(1000)
    expected = pa.Table.from_pandas(frame)

    target = tempdir / 'parquet_piece_read.parquet'
    _write_table(expected, target, version='2.6')

    # Constructing the piece is deprecated and must warn.
    with pytest.warns(DeprecationWarning):
        piece = pq.ParquetDatasetPiece(target)

    assert piece.read().equals(expected)
65
66
@pytest.mark.pandas
def test_parquet_piece_open_and_get_metadata(tempdir):
    """Deprecated ParquetDatasetPiece exposes both data and file metadata."""
    df = _test_dataframe(100)
    table = pa.Table.from_pandas(df)

    path = tempdir / 'parquet_piece_read.parquet'
    _write_table(table, path, version='2.6')

    # ParquetDatasetPiece is deprecated; both construction and use happen
    # inside the warns() block.
    with pytest.warns(DeprecationWarning):
        piece = pq.ParquetDatasetPiece(path)
        table1 = piece.read()
        assert isinstance(table1, pa.Table)
        meta1 = piece.get_metadata()
        assert isinstance(meta1, pq.FileMetaData)

    assert table.equals(table1)
83
84
@pytest.mark.filterwarnings("ignore:ParquetDatasetPiece:DeprecationWarning")
def test_parquet_piece_basics():
    """str() and equality semantics of ParquetDatasetPiece variants."""
    path = '/baz.parq'

    plain = pq.ParquetDatasetPiece(path)
    with_row_group = pq.ParquetDatasetPiece(path, row_group=1)
    with_partition = pq.ParquetDatasetPiece(
        path, row_group=1, partition_keys=[('foo', 0), ('bar', 1)])

    # String form encodes row group and partition keys when present.
    assert str(plain) == path
    assert str(with_row_group) == '/baz.parq | row_group=1'
    assert str(with_partition) == \
        'partition[foo=0, bar=1] /baz.parq | row_group=1'

    # Reflexive equality holds; differing partition info breaks equality.
    assert plain == plain
    assert with_row_group == with_row_group
    assert with_partition == with_partition
    assert plain != with_partition
102
103
def test_partition_set_dictionary_type():
    """PartitionSet infers string/integer dictionaries; other key types raise."""
    set1 = pq.PartitionSet('key1', ['foo', 'bar', 'baz'])
    set2 = pq.PartitionSet('key2', [2007, 2008, 2009])

    assert isinstance(set1.dictionary, pa.StringArray)
    assert isinstance(set2.dictionary, pa.IntegerArray)

    # datetime keys are not supported as partition dictionary values.
    set3 = pq.PartitionSet('key2', [datetime.datetime(2007, 1, 1)])
    with pytest.raises(TypeError):
        set3.dictionary
114
115
@parametrize_legacy_dataset_fixed
def test_filesystem_uri(tempdir, use_legacy_dataset):
    """read_table accepts both a filesystem object and a filesystem URI."""
    table = pa.table({"a": [1, 2, 3]})

    directory = tempdir / "data_dir"
    directory.mkdir()
    path = directory / "data.parquet"
    pq.write_table(table, str(path))

    # filesystem object
    result = pq.read_table(
        path, filesystem=fs.LocalFileSystem(),
        use_legacy_dataset=use_legacy_dataset)
    assert result.equals(table)

    # filesystem URI: the path is then relative to the URI root
    result = pq.read_table(
        "data_dir/data.parquet", filesystem=util._filesystem_uri(tempdir),
        use_legacy_dataset=use_legacy_dataset)
    assert result.equals(table)
136
137
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_read_partitioned_directory(tempdir, use_legacy_dataset):
    """Round-trip a hive-partitioned directory on the local filesystem."""
    local_fs = LocalFileSystem._get_instance()
    _partition_test_for_filesystem(local_fs, tempdir, use_legacy_dataset)
143
144
@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
@pytest.mark.pandas
def test_create_parquet_dataset_multi_threaded(tempdir):
    """Discovering dataset metadata with many threads matches single-threaded
    discovery (same partition names and levels)."""
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    _partition_test_for_filesystem(fs, base_path)

    # Reference discovery with a single metadata thread...
    manifest = pq.ParquetManifest(base_path, filesystem=fs,
                                  metadata_nthreads=1)
    # ...must agree with a 16-thread discovery.
    dataset = pq.ParquetDataset(base_path, filesystem=fs, metadata_nthreads=16)
    assert len(dataset.pieces) > 0
    partitions = dataset.partitions
    assert len(partitions.partition_names) > 0
    assert partitions.partition_names == manifest.partitions.partition_names
    assert len(partitions.levels) == len(manifest.partitions.levels)
161
162
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_read_partitioned_columns_selection(tempdir, use_legacy_dataset):
    # ARROW-3861 - do not include partition columns in resulting table when
    # `columns` keyword was passed without those columns
    fs = LocalFileSystem._get_instance()
    base_path = tempdir
    _partition_test_for_filesystem(fs, base_path)

    dataset = pq.ParquetDataset(
        base_path, use_legacy_dataset=use_legacy_dataset)
    result = dataset.read(columns=["values"])
    if use_legacy_dataset:
        # ParquetDataset implementation always includes the partition columns
        # automatically, and we can't easily "fix" this since dask relies on
        # this behaviour (ARROW-8644)
        assert result.column_names == ["values", "foo", "bar"]
    else:
        # New dataset API honours the column selection exactly.
        assert result.column_names == ["values"]
182
183
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_filters_equivalency(tempdir, use_legacy_dataset):
    """Single-conjunction filters and DNF (list-of-lists) filters both prune
    hive partitions as expected; embedded NUL bytes in predicate values are
    rejected (legacy) or match nothing (new dataset API)."""
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1]
    string_keys = ['a', 'b', 'c']
    boolean_keys = [True, False]
    partition_spec = [
        ['integer', integer_keys],
        ['string', string_keys],
        ['boolean', boolean_keys]
    ]

    df = pd.DataFrame({
        'integer': np.array(integer_keys, dtype='i4').repeat(15),
        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
                           3),
    }, columns=['integer', 'string', 'boolean'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    # Old filters syntax:
    # integer == 1 AND string != b AND boolean == True
    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[('integer', '=', 1), ('string', '!=', 'b'),
                 ('boolean', '==', 'True')],
        use_legacy_dataset=use_legacy_dataset,
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 0 not in result_df['integer'].values
    assert 'b' not in result_df['string'].values
    assert False not in result_df['boolean'].values

    # filters in disjunctive normal form:
    # (integer == 1 AND string != b AND boolean == True) OR
    # (integer == 0 AND boolean == False)
    # TODO(ARROW-3388): boolean columns are reconstructed as string
    filters = [
        [
            ('integer', '=', 1),
            ('string', '!=', 'b'),
            ('boolean', '==', 'True')
        ],
        [('integer', '=', 0), ('boolean', '==', 'False')]
    ]
    dataset = pq.ParquetDataset(
        base_path, filesystem=fs, filters=filters,
        use_legacy_dataset=use_legacy_dataset)
    table = dataset.read()
    result_df = table.to_pandas().reset_index(drop=True)

    # Check that all rows in the DF fulfill the filter
    # Pandas 0.23.x has problems with indexing constant memoryviews in
    # categoricals. Thus we need to make an explicit copy here with np.array.
    df_filter_1 = (np.array(result_df['integer']) == 1) \
        & (np.array(result_df['string']) != 'b') \
        & (np.array(result_df['boolean']) == 'True')
    df_filter_2 = (np.array(result_df['integer']) == 0) \
        & (np.array(result_df['boolean']) == 'False')
    assert df_filter_1.sum() > 0
    assert df_filter_2.sum() > 0
    assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum())

    if use_legacy_dataset:
        # Check for \0 in predicate values. Until they are correctly
        # implemented in ARROW-3391, they would otherwise lead to weird
        # results with the current code.
        with pytest.raises(NotImplementedError):
            filters = [[('string', '==', b'1\0a')]]
            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
        with pytest.raises(NotImplementedError):
            filters = [[('string', '==', '1\0a')]]
            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
    else:
        # New dataset API: NUL-containing values simply match no rows.
        for filters in [[[('string', '==', b'1\0a')]],
                        [[('string', '==', '1\0a')]]]:
            dataset = pq.ParquetDataset(
                base_path, filesystem=fs, filters=filters,
                use_legacy_dataset=False)
            assert dataset.read().num_rows == 0
270
271
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_filters_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
    """Strict < / > filters on an integer partition column exclude the
    boundary values themselves."""
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    # 1 < integers < 4  ->  only 2 and 3 survive.
    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[
            ('integers', '<', 4),
            ('integers', '>', 1),
        ],
        use_legacy_dataset=use_legacy_dataset
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    # Fix: the original wrapped map() in a redundant identity comprehension
    # ([x for x in map(int, ...)]); a plain list of int conversions suffices.
    result_list = [int(x) for x in result_df['integers'].values]
    assert result_list == [2, 3]
306
307
@pytest.mark.pandas
@parametrize_legacy_dataset
@pytest.mark.xfail(
    # different error with use_legacy_datasets because result_df is no longer
    # categorical
    raises=(TypeError, AssertionError),
    reason='Loss of type information in creation of categoricals.'
)
def test_filters_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):
    """Strict string-typed date bounds on a date-partitioned dataset keep
    only the date strictly between them (known to xfail, see reason)."""
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    date_keys = [
        datetime.date(2018, 4, 9),
        datetime.date(2018, 4, 10),
        datetime.date(2018, 4, 11),
        datetime.date(2018, 4, 12),
        datetime.date(2018, 4, 13)
    ]
    partition_spec = [
        ['dates', date_keys]
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'dates': np.array(date_keys, dtype='datetime64'),
    }, columns=['index', 'dates'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    # "2018-04-10" < dates < "2018-04-12"  ->  only 2018-04-11 remains.
    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[
            ('dates', '<', "2018-04-12"),
            ('dates', '>', "2018-04-10")
        ],
        use_legacy_dataset=use_legacy_dataset
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    expected = pd.Categorical(
        np.array([datetime.date(2018, 4, 11)], dtype='datetime64'),
        categories=np.array(date_keys, dtype='datetime64'))

    assert result_df['dates'].values == expected
357
358
@pytest.mark.pandas
@pytest.mark.dataset
def test_filters_inclusive_datetime(tempdir):
    # ARROW-11480: <= filters on int96 timestamps include the bound itself.
    path = tempdir / 'timestamps.parquet'

    frame = pd.DataFrame({
        "dates": pd.date_range("2020-01-01", periods=10, freq="D"),
        "id": range(10)
    })
    frame.to_parquet(path, use_deprecated_int96_timestamps=True)

    cutoff = datetime.datetime(2020, 1, 5)
    table = pq.read_table(path, filters=[("dates", "<=", cutoff)])

    assert table.column('id').to_pylist() == [0, 1, 2, 3, 4]
375
376
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_filters_inclusive_integer(tempdir, use_legacy_dataset):
    """Inclusive <= / >= filters on an integer partition column keep the
    boundary values."""
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    # 2 <= integers <= 3  ->  both bounds included.
    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[
            ('integers', '<=', 3),
            ('integers', '>=', 2),
        ],
        use_legacy_dataset=use_legacy_dataset
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    # Fix: the original converted each value to int twice
    # ([int(x) for x in map(int, ...)]); one conversion is enough.
    result_list = [int(x) for x in result_df['integers'].values]
    assert result_list == [2, 3]
411
412
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_filters_inclusive_set(tempdir, use_legacy_dataset):
    """'in' / 'not in' filter operators accept any iterable of values
    (string, list, tuple, set)."""
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1]
    string_keys = ['a', 'b', 'c']
    boolean_keys = [True, False]
    partition_spec = [
        ['integer', integer_keys],
        ['string', string_keys],
        ['boolean', boolean_keys]
    ]

    df = pd.DataFrame({
        'integer': np.array(integer_keys, dtype='i4').repeat(15),
        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
                           3),
    }, columns=['integer', 'string', 'boolean'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    # 'in' with a plain string: iterated character-wise, i.e. {'a', 'b'}.
    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[('string', 'in', 'ab')],
        use_legacy_dataset=use_legacy_dataset
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 'a' in result_df['string'].values
    assert 'b' in result_df['string'].values
    assert 'c' not in result_df['string'].values

    # Mixed container types: list, tuple, and set all work.
    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[('integer', 'in', [1]), ('string', 'in', ('a', 'b')),
                 ('boolean', 'not in', {False})],
        use_legacy_dataset=use_legacy_dataset
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 0 not in result_df['integer'].values
    assert 'c' not in result_df['string'].values
    assert False not in result_df['boolean'].values
461
462
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_filters_invalid_pred_op(tempdir, use_legacy_dataset):
    """Malformed filter predicates raise (legacy) or degrade gracefully
    (new dataset API)."""
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    # 'in' requires an iterable of values, not a bare scalar.
    with pytest.raises(TypeError):
        pq.ParquetDataset(base_path,
                          filesystem=fs,
                          filters=[('integers', 'in', 3), ],
                          use_legacy_dataset=use_legacy_dataset)

    # '=<' is not a recognized comparison operator.
    with pytest.raises(ValueError):
        pq.ParquetDataset(base_path,
                          filesystem=fs,
                          filters=[('integers', '=<', 3), ],
                          use_legacy_dataset=use_legacy_dataset)

    # 'in' with an empty set: legacy raises, new API yields an empty table.
    if use_legacy_dataset:
        with pytest.raises(ValueError):
            pq.ParquetDataset(base_path,
                              filesystem=fs,
                              filters=[('integers', 'in', set()), ],
                              use_legacy_dataset=use_legacy_dataset)
    else:
        # Dataset API returns empty table instead
        dataset = pq.ParquetDataset(base_path,
                                    filesystem=fs,
                                    filters=[('integers', 'in', set()), ],
                                    use_legacy_dataset=use_legacy_dataset)
        assert dataset.read().num_rows == 0

    # '!=' against a set: legacy raises at construction, new API only when
    # the scan is executed.
    if use_legacy_dataset:
        with pytest.raises(ValueError):
            pq.ParquetDataset(base_path,
                              filesystem=fs,
                              filters=[('integers', '!=', {3})],
                              use_legacy_dataset=use_legacy_dataset)
    else:
        dataset = pq.ParquetDataset(base_path,
                                    filesystem=fs,
                                    filters=[('integers', '!=', {3})],
                                    use_legacy_dataset=use_legacy_dataset)
        with pytest.raises(NotImplementedError):
            assert dataset.read().num_rows == 0
521
522
@pytest.mark.pandas
@parametrize_legacy_dataset_fixed
def test_filters_invalid_column(tempdir, use_legacy_dataset):
    # ARROW-5572 - raise error on invalid name in filter specification
    # works with new dataset / xfail with legacy implementation
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [['integers', integer_keys]]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    # Error message produced by the dataset expression machinery.
    msg = r"No match for FieldRef.Name\(non_existent_column\)"
    with pytest.raises(ValueError, match=msg):
        pq.ParquetDataset(base_path, filesystem=fs,
                          filters=[('non_existent_column', '<', 3), ],
                          use_legacy_dataset=use_legacy_dataset).read()
547
548
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_filters_read_table(tempdir, use_legacy_dataset):
    # test that filters keyword is passed through in read_table
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    # Flat (conjunction) filter syntax.
    table = pq.read_table(
        base_path, filesystem=fs, filters=[('integers', '<', 3)],
        use_legacy_dataset=use_legacy_dataset)
    assert table.num_rows == 3

    # DNF (list-of-lists) filter syntax.
    table = pq.read_table(
        base_path, filesystem=fs, filters=[[('integers', '<', 3)]],
        use_legacy_dataset=use_legacy_dataset)
    assert table.num_rows == 3

    # read_pandas passes filters through as well.
    table = pq.read_pandas(
        base_path, filters=[('integers', '<', 3)],
        use_legacy_dataset=use_legacy_dataset)
    assert table.num_rows == 3
583
584
@pytest.mark.pandas
@parametrize_legacy_dataset_fixed
def test_partition_keys_with_underscores(tempdir, use_legacy_dataset):
    # ARROW-5666 - partition field values with underscores preserve underscores
    # xfail with legacy dataset -> they get interpreted as integers
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    string_keys = ["2019_2", "2019_3"]
    partition_spec = [
        ['year_week', string_keys],
    ]
    N = 2

    df = pd.DataFrame({
        'index': np.arange(N),
        'year_week': np.array(string_keys, dtype='object'),
    }, columns=['index', 'year_week'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, use_legacy_dataset=use_legacy_dataset)
    result = dataset.read()
    # Values must round-trip as strings, not be parsed as integers.
    assert result.column("year_week").to_pylist() == string_keys
610
611
@pytest.mark.s3
@parametrize_legacy_dataset
def test_read_s3fs(s3_example_s3fs, use_legacy_dataset):
    """Write a table to S3 via s3fs and read the single file back."""
    s3, base = s3_example_s3fs
    file_path = base + "/test.parquet"
    expected = pa.table({"a": [1, 2, 3]})
    _write_table(expected, file_path, filesystem=s3)

    got = _read_table(
        file_path, filesystem=s3, use_legacy_dataset=use_legacy_dataset
    )
    assert got.equals(expected)
624
625
@pytest.mark.s3
@parametrize_legacy_dataset
def test_read_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
    """Reading the containing S3 directory discovers the file inside it."""
    s3, base_dir = s3_example_s3fs
    file_path = base_dir + "/test.parquet"
    expected = pa.table({"a": [1, 2, 3]})
    _write_table(expected, file_path, filesystem=s3)

    got = _read_table(
        base_dir, filesystem=s3, use_legacy_dataset=use_legacy_dataset
    )
    assert got.equals(expected)
638
639
@pytest.mark.pandas
@pytest.mark.s3
@parametrize_legacy_dataset
def test_read_partitioned_directory_s3fs_wrapper(
    s3_example_s3fs, use_legacy_dataset
):
    """Partitioned reads through the deprecated S3FSWrapper still work for
    old s3fs versions; newer s3fs is skipped."""
    import s3fs

    from pyarrow.filesystem import S3FSWrapper

    if Version(s3fs.__version__) >= Version("0.5"):
        pytest.skip("S3FSWrapper no longer working for s3fs 0.5+")

    fs, path = s3_example_s3fs
    # Wrapping is deprecated and warns.
    with pytest.warns(FutureWarning):
        wrapper = S3FSWrapper(fs)
    _partition_test_for_filesystem(wrapper, path)

    # Check that we can auto-wrap
    dataset = pq.ParquetDataset(
        path, filesystem=fs, use_legacy_dataset=use_legacy_dataset
    )
    dataset.read()
663
664
@pytest.mark.pandas
@pytest.mark.s3
@parametrize_legacy_dataset
def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
    """Run the shared partitioned-directory round-trip against s3fs."""
    s3, base = s3_example_s3fs
    _partition_test_for_filesystem(
        s3, base, use_legacy_dataset=use_legacy_dataset
    )
673
674
def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True):
    """Write a two-level hive-partitioned dataset (foo, bar) to `fs` under
    `base_path`, read it back as a ParquetDataset, and check the result
    matches the source frame (partition columns come back categorical)."""
    foo_keys = [0, 1]
    bar_keys = ['a', 'b', 'c']
    partition_spec = [
        ['foo', foo_keys],
        ['bar', bar_keys]
    ]
    N = 30

    df = pd.DataFrame({
        'index': np.arange(N),
        'foo': np.array(foo_keys, dtype='i4').repeat(15),
        'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
        'values': np.random.randn(N)
    }, columns=['index', 'foo', 'bar', 'values'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=fs, use_legacy_dataset=use_legacy_dataset)
    table = dataset.read()
    # Row order is not guaranteed across pieces; sort before comparing.
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    expected_df = (df.sort_values(by='index')
                   .reset_index(drop=True)
                   .reindex(columns=result_df.columns))

    # Partition columns are reconstructed as categoricals.
    expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
    expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)

    # Partition columns are appended after the data columns.
    assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()

    tm.assert_frame_equal(result_df, expected_df)
710
711
def _generate_partition_directories(fs, base_dir, partition_spec, df):
    # partition_spec : list of lists, e.g. [['foo', [0, 1, 2],
    #                                       ['bar', ['a', 'b', 'c']]
    # part_table : a pyarrow.Table to write to each partition
    DEPTH = len(partition_spec)

    # Legacy pyarrow filesystems expose `pathsep`, fsspec-style ones `sep`;
    # fall back to '/' when neither attribute exists.
    pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/"))

    def _visit_level(base_dir, level, part_keys):
        # Recursively create one directory per partition value at this level;
        # leaf level gets a data file, every level gets a _SUCCESS marker.
        name, values = partition_spec[level]
        for value in values:
            this_part_keys = part_keys + [(name, value)]

            level_dir = pathsep.join([
                str(base_dir),
                '{}={}'.format(name, value)
            ])
            fs.mkdir(level_dir)

            if level == DEPTH - 1:
                # Generate example data
                file_path = pathsep.join([level_dir, guid()])
                filtered_df = _filter_partition(df, this_part_keys)
                part_table = pa.Table.from_pandas(filtered_df)
                with fs.open(file_path, 'wb') as f:
                    _write_table(part_table, f)
                assert fs.exists(file_path)

                file_success = pathsep.join([level_dir, '_SUCCESS'])
                with fs.open(file_success, 'wb') as f:
                    pass
            else:
                _visit_level(level_dir, level + 1, this_part_keys)
                file_success = pathsep.join([level_dir, '_SUCCESS'])
                with fs.open(file_success, 'wb') as f:
                    pass

    _visit_level(base_dir, 0, [])
750
751
def _test_read_common_metadata_files(fs, base_path):
    """Write a data file plus a `_common_metadata` sidecar and verify the
    dataset picks up the sidecar path and schema."""
    import pandas as pd

    import pyarrow.parquet as pq

    N = 100
    df = pd.DataFrame({
        'index': np.arange(N),
        'values': np.random.randn(N)
    }, columns=['index', 'values'])

    base_path = str(base_path)
    data_path = os.path.join(base_path, 'data.parquet')

    table = pa.Table.from_pandas(df)

    with fs.open(data_path, 'wb') as f:
        _write_table(table, f)

    # Schema-only sidecar file shared by all pieces of the dataset.
    metadata_path = os.path.join(base_path, '_common_metadata')
    with fs.open(metadata_path, 'wb') as f:
        pq.write_metadata(table.schema, f)

    dataset = pq.ParquetDataset(base_path, filesystem=fs)
    assert dataset.common_metadata_path == str(metadata_path)

    with fs.open(data_path) as f:
        common_schema = pq.read_metadata(f).schema
    assert dataset.schema.equals(common_schema)

    # handle list of one directory
    dataset2 = pq.ParquetDataset([base_path], filesystem=fs)
    assert dataset2.schema.equals(dataset.schema)
785
786
@pytest.mark.pandas
def test_read_common_metadata_files(tempdir):
    """Exercise the _common_metadata discovery on the local filesystem."""
    local_fs = LocalFileSystem._get_instance()
    _test_read_common_metadata_files(local_fs, tempdir)
791
792
@pytest.mark.pandas
def test_read_metadata_files(tempdir):
    """A `_metadata` sidecar is discovered and its schema matches the data."""
    fs = LocalFileSystem._get_instance()

    N = 100
    df = pd.DataFrame({
        'index': np.arange(N),
        'values': np.random.randn(N)
    }, columns=['index', 'values'])

    data_path = tempdir / 'data.parquet'

    table = pa.Table.from_pandas(df)

    with fs.open(data_path, 'wb') as f:
        _write_table(table, f)

    # Dataset-level metadata sidecar.
    metadata_path = tempdir / '_metadata'
    with fs.open(metadata_path, 'wb') as f:
        pq.write_metadata(table.schema, f)

    dataset = pq.ParquetDataset(tempdir, filesystem=fs)
    assert dataset.metadata_path == str(metadata_path)

    with fs.open(data_path) as f:
        metadata_schema = pq.read_metadata(f).schema
    assert dataset.schema.equals(metadata_schema)
820
821
822 def _filter_partition(df, part_keys):
823 predicate = np.ones(len(df), dtype=bool)
824
825 to_drop = []
826 for name, value in part_keys:
827 to_drop.append(name)
828
829 # to avoid pandas warning
830 if isinstance(value, (datetime.date, datetime.datetime)):
831 value = pd.Timestamp(value)
832
833 predicate &= df[name] == value
834
835 return df[predicate].drop(to_drop, axis=1)
836
837
@parametrize_legacy_dataset
@pytest.mark.pandas
def test_filter_before_validate_schema(tempdir, use_legacy_dataset):
    # ARROW-4076 apply filter before schema validation
    # to avoid checking unneeded schemas

    # create partitioned dataset with mismatching schemas which would
    # otherwise raise if first validation all schemas
    dir1 = tempdir / 'A=0'
    dir1.mkdir()
    table1 = pa.Table.from_pandas(pd.DataFrame({'B': [1, 2, 3]}))
    pq.write_table(table1, dir1 / 'data.parquet')

    dir2 = tempdir / 'A=1'
    dir2.mkdir()
    # Same column name, incompatible type (string vs int).
    table2 = pa.Table.from_pandas(pd.DataFrame({'B': ['a', 'b', 'c']}))
    pq.write_table(table2, dir2 / 'data.parquet')

    # read single file using filter; only the A=0 schema should be touched
    table = pq.read_table(tempdir, filters=[[('A', '==', 0)]],
                          use_legacy_dataset=use_legacy_dataset)
    assert table.column('B').equals(pa.chunked_array([[1, 2, 3]]))
860
861
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_read_multiple_files(tempdir, use_legacy_dataset):
    """Reading a list of files (or their directory) concatenates them; also
    covers metadata/schema keywords and failure on non-uniform schemas."""
    nfiles = 10
    size = 5

    dirpath = tempdir / guid()
    dirpath.mkdir()

    test_data = []
    paths = []
    for i in range(nfiles):
        df = _test_dataframe(size, seed=i)

        # Hack so that we don't have a dtype cast in v1 files
        df['uint32'] = df['uint32'].astype(np.int64)

        path = dirpath / '{}.parquet'.format(i)

        table = pa.Table.from_pandas(df)
        _write_table(table, path)

        test_data.append(table)
        paths.append(path)

    # Write a _SUCCESS.crc file (non-parquet files must be ignored)
    (dirpath / '_SUCCESS.crc').touch()

    def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
        # Helper: read the given paths as one dataset.
        dataset = pq.ParquetDataset(
            paths, use_legacy_dataset=use_legacy_dataset, **kwargs)
        return dataset.read(columns=columns, use_threads=use_threads)

    result = read_multiple_files(paths)
    expected = pa.concat_tables(test_data)

    assert result.equals(expected)

    # Read with provided metadata
    # TODO(dataset) specifying metadata not yet supported
    metadata = pq.read_metadata(paths[0])
    if use_legacy_dataset:
        result2 = read_multiple_files(paths, metadata=metadata)
        assert result2.equals(expected)

        result3 = pq.ParquetDataset(dirpath, schema=metadata.schema).read()
        assert result3.equals(expected)
    else:
        with pytest.raises(ValueError, match="no longer supported"):
            pq.read_table(paths, metadata=metadata, use_legacy_dataset=False)

    # Read column subset
    to_read = [0, 2, 6, result.num_columns - 1]

    col_names = [result.field(i).name for i in to_read]
    out = pq.read_table(
        dirpath, columns=col_names, use_legacy_dataset=use_legacy_dataset
    )
    expected = pa.Table.from_arrays([result.column(i) for i in to_read],
                                    names=col_names,
                                    metadata=result.schema.metadata)
    assert out.equals(expected)

    # Read with multiple threads
    pq.read_table(
        dirpath, use_threads=True, use_legacy_dataset=use_legacy_dataset
    )

    # Test failure modes with non-uniform metadata
    bad_apple = _test_dataframe(size, seed=i).iloc[:, :4]
    bad_apple_path = tempdir / '{}.parquet'.format(guid())

    t = pa.Table.from_pandas(bad_apple)
    _write_table(t, bad_apple_path)

    if not use_legacy_dataset:
        # TODO(dataset) Dataset API skips bad files
        return

    bad_meta = pq.read_metadata(bad_apple_path)

    with pytest.raises(ValueError):
        read_multiple_files(paths + [bad_apple_path])

    with pytest.raises(ValueError):
        read_multiple_files(paths, metadata=bad_meta)

    mixed_paths = [bad_apple_path, paths[0]]

    with pytest.raises(ValueError):
        read_multiple_files(mixed_paths, schema=bad_meta.schema)

    with pytest.raises(ValueError):
        read_multiple_files(mixed_paths)
956
957
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_dataset_read_pandas(tempdir, use_legacy_dataset):
    """read_pandas on a dataset preserves the pandas index and accepts the
    `columns` argument as a list or a set."""
    nfiles = 5
    size = 5

    dirpath = tempdir / guid()
    dirpath.mkdir()

    test_data = []
    frames = []
    paths = []
    for i in range(nfiles):
        df = _test_dataframe(size, seed=i)
        # Give each file a distinct, contiguous index range so concatenation
        # order is verifiable.
        df.index = np.arange(i * size, (i + 1) * size)
        df.index.name = 'index'

        path = dirpath / '{}.parquet'.format(i)

        table = pa.Table.from_pandas(df)
        _write_table(table, path)
        test_data.append(table)
        frames.append(df)
        paths.append(path)

    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
    columns = ['uint8', 'strings']
    result = dataset.read_pandas(columns=columns).to_pandas()
    expected = pd.concat([x[columns] for x in frames])

    tm.assert_frame_equal(result, expected)

    # also be able to pass the columns as a set (ARROW-12314)
    result = dataset.read_pandas(columns=set(columns)).to_pandas()
    assert result.shape == expected.shape
    # column order can be different because of using a set
    tm.assert_frame_equal(result.reindex(columns=expected.columns), expected)
995
996
@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_dataset_memory_map(tempdir, use_legacy_dataset):
    # ARROW-2627: Check that we can use ParquetDataset with memory-mapping
    dirpath = tempdir / guid()
    dirpath.mkdir()

    df = _test_dataframe(10, seed=0)
    path = dirpath / '{}.parquet'.format(0)
    table = pa.Table.from_pandas(df)
    _write_table(table, path, version='2.6')

    dataset = pq.ParquetDataset(
        dirpath, memory_map=True, use_legacy_dataset=use_legacy_dataset)
    assert dataset.read().equals(table)
    if use_legacy_dataset:
        # `pieces` is only exposed by the legacy implementation.
        assert dataset.pieces[0].read().equals(table)
1015
1016
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
    """A negative buffer_size is rejected; positive sizes read correctly."""
    dirpath = tempdir / guid()
    dirpath.mkdir()

    frame = _test_dataframe(10, seed=0)
    target = dirpath / '{}.parquet'.format(0)
    expected = pa.Table.from_pandas(frame)
    _write_table(expected, target, version='2.6')

    # Invalid buffer size must raise at construction time.
    with pytest.raises(ValueError):
        pq.ParquetDataset(
            dirpath, buffer_size=-64,
            use_legacy_dataset=use_legacy_dataset)

    # Any positive buffer size yields identical data.
    for size in (128, 1024):
        dataset = pq.ParquetDataset(
            dirpath, buffer_size=size,
            use_legacy_dataset=use_legacy_dataset)
        assert dataset.read().equals(expected)
1038
1039
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_dataset_enable_pre_buffer(tempdir, use_legacy_dataset):
    """Reading must produce the same table with pre-buffering on or off."""
    dirpath = tempdir / guid()
    dirpath.mkdir()

    table = pa.Table.from_pandas(_test_dataframe(10, seed=0))
    _write_table(table, dirpath / '{}.parquet'.format(0), version='2.6')

    for pre_buffer in (True, False):
        # both the ParquetDataset and read_table entry points take the flag
        dataset = pq.ParquetDataset(
            dirpath, pre_buffer=pre_buffer,
            use_legacy_dataset=use_legacy_dataset)
        assert dataset.read().equals(table)
        actual = pq.read_table(dirpath, pre_buffer=pre_buffer,
                               use_legacy_dataset=use_legacy_dataset)
        assert actual.equals(table)
1059
1060
def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
    """Write ``nfiles`` small parquet files under *base_path*.

    Each file holds ``file_nrows`` rows of ``_test_dataframe`` data with a
    distinct seed so the files are distinguishable.

    Returns the list of paths of the files written.
    """
    # NB: the previous version also accumulated _write_table's return values
    # in a 'test_data' list that was never used; that dead local is removed.
    paths = []
    for i in range(nfiles):
        df = _test_dataframe(file_nrows, seed=i)
        path = base_path / '{}.parquet'.format(i)

        _write_table(df, path)
        paths.append(path)
    return paths
1071
1072
def _assert_dataset_paths(dataset, paths, use_legacy_dataset):
    """Check that *dataset* discovered exactly the files in *paths*."""
    if use_legacy_dataset:
        found = {piece.path for piece in dataset._pieces}
        assert found == set(map(str, paths))
    else:
        # the new API reports posix-style file paths
        expected = {str(p.as_posix()) for p in paths}
        assert expected == set(dataset._dataset.files)
1079
1080
@pytest.mark.pandas
@parametrize_legacy_dataset
@pytest.mark.parametrize('dir_prefix', ['_', '.'])
def test_ignore_private_directories(tempdir, dir_prefix, use_legacy_dataset):
    """Directories starting with '_' or '.' must be skipped by discovery."""
    dirpath = tempdir / guid()
    dirpath.mkdir()

    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
                                            file_nrows=5)

    # an empty "private" directory that discovery should ignore
    (dirpath / (dir_prefix + 'staging')).mkdir()

    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
1097
1098
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_ignore_hidden_files_dot(tempdir, use_legacy_dataset):
    """Dot-prefixed files (e.g. .DS_Store) must not be picked up."""
    dirpath = tempdir / guid()
    dirpath.mkdir()

    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
                                            file_nrows=5)

    # drop some non-parquet hidden files next to the data
    for name in ('.DS_Store', '.private'):
        with (dirpath / name).open('wb') as f:
            f.write(b'gibberish')

    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
1117
1118
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_ignore_hidden_files_underscore(tempdir, use_legacy_dataset):
    """Underscore-prefixed marker files must not be picked up."""
    dirpath = tempdir / guid()
    dirpath.mkdir()

    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
                                            file_nrows=5)

    # drop Spark-style marker files next to the data
    for name in ('_committed_123', '_started_321'):
        with (dirpath / name).open('wb') as f:
            f.write(b'abcd')

    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
1137
1138
@pytest.mark.pandas
@parametrize_legacy_dataset
@pytest.mark.parametrize('dir_prefix', ['_', '.'])
def test_ignore_no_private_directories_in_base_path(
    tempdir, dir_prefix, use_legacy_dataset
):
    # ARROW-8427 - don't ignore explicitly listed files if parent directory
    # is a private directory
    dirpath = tempdir / (dir_prefix + 'data') / guid()
    dirpath.mkdir(parents=True)

    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
                                            file_nrows=5)

    # when files are listed explicitly, none of them may be filtered out
    dataset = pq.ParquetDataset(paths, use_legacy_dataset=use_legacy_dataset)
    _assert_dataset_paths(dataset, paths, use_legacy_dataset)

    # ARROW-9644 - don't ignore full directory with underscore in base path
    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
1159
1160
@pytest.mark.pandas
@parametrize_legacy_dataset_fixed
def test_ignore_custom_prefixes(tempdir, use_legacy_dataset):
    # ARROW-9573 - allow override of default ignore_prefixes
    part = ["xxx", "xxx", "xxx", "yyy", "yyy", "yyy"]
    table = pa.table([
        pa.array(range(len(part))),
        pa.array(part).dictionary_encode(),
    ], names=['index', '_part'])

    # TODO use_legacy_dataset ARROW-10247
    pq.write_to_dataset(table, str(tempdir), partition_cols=['_part'])

    # a duplicate copy under a "_private" directory that must be skipped
    private_duplicate = tempdir / '_private_duplicate'
    private_duplicate.mkdir()
    pq.write_to_dataset(table, str(private_duplicate),
                        partition_cols=['_part'])

    # with the custom prefix ignored, only the top-level copy is read
    read = pq.read_table(
        tempdir, use_legacy_dataset=use_legacy_dataset,
        ignore_prefixes=['_private'])
    assert read.equals(table)
1184
1185
@parametrize_legacy_dataset_fixed
def test_empty_directory(tempdir, use_legacy_dataset):
    # ARROW-5310 - reading empty directory
    # fails with legacy implementation
    empty_dir = tempdir / 'dataset'
    empty_dir.mkdir()

    result = pq.ParquetDataset(
        empty_dir, use_legacy_dataset=use_legacy_dataset).read()
    # an empty dataset reads back as a 0 x 0 table
    assert result.num_columns == 0
    assert result.num_rows == 0
1198
1199
def _test_write_to_dataset_with_partitions(base_path,
                                           use_legacy_dataset=True,
                                           filesystem=None,
                                           schema=None,
                                           index_name=None):
    """Round-trip a partitioned dataset written via write_to_dataset.

    Writes a small frame partitioned on ['group1', 'group2'] to *base_path*,
    writes a ``_common_metadata`` sidecar, then reads the dataset back and
    checks that both the schema and the data match the original frame.

    NOTE(review): ``index_name`` is accepted but never referenced in this
    body — confirm whether callers expect it to have an effect.
    """
    import pandas as pd
    import pandas.testing as tm

    import pyarrow.parquet as pq

    # ARROW-1400
    output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
                              'group2': list('eefeffgeee'),
                              'num': list(range(10)),
                              'nan': [np.nan] * 10,
                              'date': np.arange('2017-01-01', '2017-01-11',
                                                dtype='datetime64[D]')})
    cols = output_df.columns.tolist()
    partition_by = ['group1', 'group2']
    output_table = pa.Table.from_pandas(output_df, schema=schema, safe=False,
                                        preserve_index=False)
    pq.write_to_dataset(output_table, base_path, partition_by,
                        filesystem=filesystem,
                        use_legacy_dataset=use_legacy_dataset)

    # write a _common_metadata file next to the data (through the given
    # filesystem when one is provided, otherwise locally)
    metadata_path = os.path.join(str(base_path), '_common_metadata')

    if filesystem is not None:
        with filesystem.open(metadata_path, 'wb') as f:
            pq.write_metadata(output_table.schema, f)
    else:
        pq.write_metadata(output_table.schema, metadata_path)

    # ARROW-2891: Ensure the output_schema is preserved when writing a
    # partitioned dataset
    dataset = pq.ParquetDataset(base_path,
                                filesystem=filesystem,
                                validate_schema=True,
                                use_legacy_dataset=use_legacy_dataset)
    # ARROW-2209: Ensure the dataset schema also includes the partition columns
    if use_legacy_dataset:
        dataset_cols = set(dataset.schema.to_arrow_schema().names)
    else:
        # NB schema property is an arrow and not parquet schema
        dataset_cols = set(dataset.schema.names)

    assert dataset_cols == set(output_table.schema.names)

    input_table = dataset.read()
    input_df = input_table.to_pandas()

    # Read data back in and compare with original DataFrame
    # Partitioned columns added to the end of the DataFrame when read
    input_df_cols = input_df.columns.tolist()
    assert partition_by == input_df_cols[-1 * len(partition_by):]

    input_df = input_df[cols]
    # Partitioned columns become 'categorical' dtypes
    for col in partition_by:
        output_df[col] = output_df[col].astype('category')
    tm.assert_frame_equal(output_df, input_df)
1261
1262
def _test_write_to_dataset_no_partitions(base_path,
                                         use_legacy_dataset=True,
                                         filesystem=None):
    """Append several unpartitioned files and verify a deduplicated round-trip.

    Calls write_to_dataset five times with the same table (each call appends
    a new file under *base_path*), then reads the dataset back and checks
    that dropping duplicate rows recovers the original frame.
    """
    import pandas as pd

    import pyarrow.parquet as pq

    # ARROW-1400
    output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
                              'group2': list('eefeffgeee'),
                              'num': list(range(10)),
                              'date': np.arange('2017-01-01', '2017-01-11',
                                                dtype='datetime64[D]')})
    cols = output_df.columns.tolist()
    output_table = pa.Table.from_pandas(output_df)

    if filesystem is None:
        filesystem = LocalFileSystem._get_instance()

    # Without partitions, append files to root_path
    n = 5
    for i in range(n):
        pq.write_to_dataset(output_table, base_path,
                            filesystem=filesystem)
    # each append should have created exactly one new parquet file
    output_files = [file for file in filesystem.ls(str(base_path))
                    if file.endswith(".parquet")]
    assert len(output_files) == n

    # Deduplicated incoming DataFrame should match
    # original outgoing Dataframe
    input_table = pq.ParquetDataset(
        base_path, filesystem=filesystem,
        use_legacy_dataset=use_legacy_dataset
    ).read()
    input_df = input_table.to_pandas()
    input_df = input_df.drop_duplicates()
    input_df = input_df[cols]
    assert output_df.equals(input_df)
1301
1302
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_write_to_dataset_with_partitions(tempdir, use_legacy_dataset):
    """Partitioned round-trip against the local filesystem."""
    _test_write_to_dataset_with_partitions(
        str(tempdir), use_legacy_dataset=use_legacy_dataset)
1307
1308
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_write_to_dataset_with_partitions_and_schema(
    tempdir, use_legacy_dataset
):
    """Partitioned round-trip with an explicitly provided schema."""
    fields = [pa.field('group1', type=pa.string()),
              pa.field('group2', type=pa.string()),
              pa.field('num', type=pa.int64()),
              pa.field('nan', type=pa.int32()),
              pa.field('date', type=pa.timestamp(unit='us'))]
    _test_write_to_dataset_with_partitions(
        str(tempdir), use_legacy_dataset, schema=pa.schema(fields))
1321
1322
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_write_to_dataset_with_partitions_and_index_name(
    tempdir, use_legacy_dataset
):
    """Partitioned round-trip forwarding a custom index name."""
    _test_write_to_dataset_with_partitions(
        str(tempdir), use_legacy_dataset=use_legacy_dataset,
        index_name='index_name')
1330
1331
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_write_to_dataset_no_partitions(tempdir, use_legacy_dataset):
    """Unpartitioned round-trip against the local filesystem."""
    _test_write_to_dataset_no_partitions(
        str(tempdir), use_legacy_dataset=use_legacy_dataset)
1336
1337
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_write_to_dataset_pathlib(tempdir, use_legacy_dataset):
    """pathlib.Path targets are accepted for local dataset writes."""
    partitioned_dir = tempdir / "test1"
    plain_dir = tempdir / "test2"
    _test_write_to_dataset_with_partitions(partitioned_dir, use_legacy_dataset)
    _test_write_to_dataset_no_partitions(plain_dir, use_legacy_dataset)
1345
1346
@pytest.mark.pandas
@pytest.mark.s3
@parametrize_legacy_dataset
def test_write_to_dataset_pathlib_nonlocal(
    tempdir, s3_example_s3fs, use_legacy_dataset
):
    # pathlib paths are only accepted for local files
    s3_fs, _ = s3_example_s3fs

    with pytest.raises(TypeError, match="path-like objects are only allowed"):
        _test_write_to_dataset_with_partitions(
            tempdir / "test1", use_legacy_dataset, filesystem=s3_fs)

    with pytest.raises(TypeError, match="path-like objects are only allowed"):
        _test_write_to_dataset_no_partitions(
            tempdir / "test2", use_legacy_dataset, filesystem=s3_fs)
1363
1364
@pytest.mark.pandas
@pytest.mark.s3
@parametrize_legacy_dataset
def test_write_to_dataset_with_partitions_s3fs(
    s3_example_s3fs, use_legacy_dataset
):
    """Partitioned round-trip against an s3fs filesystem."""
    s3_fs, path = s3_example_s3fs
    _test_write_to_dataset_with_partitions(
        path, use_legacy_dataset, filesystem=s3_fs)
1375
1376
@pytest.mark.pandas
@pytest.mark.s3
@parametrize_legacy_dataset
def test_write_to_dataset_no_partitions_s3fs(
    s3_example_s3fs, use_legacy_dataset
):
    """Unpartitioned round-trip against an s3fs filesystem."""
    s3_fs, path = s3_example_s3fs
    _test_write_to_dataset_no_partitions(
        path, use_legacy_dataset, filesystem=s3_fs)
1387
1388
@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
@pytest.mark.pandas
@parametrize_legacy_dataset_not_supported
def test_write_to_dataset_with_partitions_and_custom_filenames(
    tempdir, use_legacy_dataset
):
    """Legacy-only: a partition filename callback controls output basenames."""
    output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
                              'group2': list('eefeffgeee'),
                              'num': list(range(10)),
                              'nan': [np.nan] * 10,
                              'date': np.arange('2017-01-01', '2017-01-11',
                                                dtype='datetime64[D]')})
    partition_by = ['group1', 'group2']
    output_table = pa.Table.from_pandas(output_df)
    path = str(tempdir)

    def partition_filename_callback(keys):
        # one file per (group1, group2) combination, named "<g1>-<g2>.parquet"
        return "{}-{}.parquet".format(*keys)

    pq.write_to_dataset(output_table, path,
                        partition_by, partition_filename_callback,
                        use_legacy_dataset=use_legacy_dataset)

    dataset = pq.ParquetDataset(path)

    # ARROW-3538: Ensure partition filenames match the given pattern
    # defined in the local function partition_filename_callback
    expected_basenames = [
        'a-e.parquet', 'a-f.parquet',
        'b-e.parquet', 'b-f.parquet',
        'b-g.parquet', 'c-e.parquet'
    ]
    output_basenames = [os.path.basename(p.path) for p in dataset.pieces]

    assert sorted(expected_basenames) == sorted(output_basenames)
1424
1425
@pytest.mark.dataset
@pytest.mark.pandas
def test_write_to_dataset_filesystem(tempdir):
    """A new-style FileSystem object is accepted by write_to_dataset."""
    table = pa.Table.from_pandas(pd.DataFrame({'A': [1, 2, 3]}))
    path = str(tempdir)

    pq.write_to_dataset(table, path, filesystem=fs.LocalFileSystem())
    assert pq.read_table(path).equals(table)
1436
1437
# TODO(dataset) support pickling
def _make_dataset_for_pickling(tempdir, N=100):
    """Create a single-file dataset with multiple row groups plus a
    ``_metadata`` sidecar, for use by the pickling tests.

    Returns the resulting ParquetDataset.
    """
    path = tempdir / 'data.parquet'
    # NB: previously named 'fs', which shadowed the module-level
    # 'from pyarrow import fs' import
    local_fs = LocalFileSystem._get_instance()

    df = pd.DataFrame({
        'index': np.arange(N),
        'values': np.random.randn(N)
    }, columns=['index', 'values'])
    table = pa.Table.from_pandas(df)

    # write the same table several times to get multiple row groups
    num_groups = 3
    with pq.ParquetWriter(path, table.schema) as writer:
        for _ in range(num_groups):
            writer.write_table(table)

    reader = pq.ParquetFile(path)
    assert reader.metadata.num_row_groups == num_groups

    metadata_path = tempdir / '_metadata'
    with local_fs.open(metadata_path, 'wb') as f:
        pq.write_metadata(table.schema, f)

    dataset = pq.ParquetDataset(tempdir, filesystem=local_fs)
    assert dataset.metadata_path == str(metadata_path)

    return dataset
1465
1466
def _assert_dataset_is_picklable(dataset, pickler):
    """Check the dataset and all its metadata objects survive a pickle
    round-trip with the given pickler module."""
    def roundtrips(obj):
        return pickler.loads(pickler.dumps(obj)) == obj

    assert roundtrips(dataset)
    assert roundtrips(dataset.metadata)
    assert roundtrips(dataset.metadata.schema)
    assert len(dataset.metadata.schema)
    for column in dataset.metadata.schema:
        assert roundtrips(column)

    for piece in dataset._pieces:
        assert roundtrips(piece)
        metadata = piece.get_metadata()
        assert metadata.num_row_groups
        for i in range(metadata.num_row_groups):
            assert roundtrips(metadata.row_group(i))
1484
1485
@pytest.mark.pandas
def test_builtin_pickle_dataset(tempdir, datadir):
    """Datasets must be picklable with the stdlib pickle module."""
    import pickle
    _assert_dataset_is_picklable(
        _make_dataset_for_pickling(tempdir), pickler=pickle)
1491
1492
@pytest.mark.pandas
def test_cloudpickle_dataset(tempdir, datadir):
    """Datasets must be picklable with cloudpickle (skipped if absent)."""
    cp = pytest.importorskip('cloudpickle')
    _assert_dataset_is_picklable(
        _make_dataset_for_pickling(tempdir), pickler=cp)
1498
1499
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_partitioned_dataset(tempdir, use_legacy_dataset):
    # ARROW-3208: Segmentation fault when reading a Parquet partitioned dataset
    # to a Parquet file
    path = tempdir / "ARROW-3208"
    frame = pd.DataFrame({
        'one': [-1, 10, 2.5, 100, 1000, 1, 29.2],
        'two': [-1, 10, 2, 100, 1000, 1, 11],
        'three': [0, 0, 0, 0, 0, 0, 0]
    })
    pq.write_to_dataset(pa.Table.from_pandas(frame), root_path=str(path),
                        partition_cols=['one', 'two'])
    # reading the partitioned dataset back must not crash ...
    table = pq.ParquetDataset(
        path, use_legacy_dataset=use_legacy_dataset).read()
    # ... and the result must be writable back out as a single file
    pq.write_table(table, path / "output.parquet")
1517
1518
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
    """read_dictionary must yield dictionary-encoded chunks for column f0."""
    path = tempdir / "ARROW-3325-dataset"
    t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
    t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
    # TODO pass use_legacy_dataset (need to fix unique names)
    pq.write_to_dataset(t1, root_path=str(path))
    pq.write_to_dataset(t2, root_path=str(path))

    result = pq.ParquetDataset(
        path, read_dictionary=['f0'],
        use_legacy_dataset=use_legacy_dataset).read()

    # The order of the chunks is non-deterministic
    ex_chunks = [t1[0].chunk(0).dictionary_encode(),
                 t2[0].chunk(0).dictionary_encode()]

    # accept either chunk ordering, but both expected chunks must be present
    assert result[0].num_chunks == 2
    c0, c1 = result[0].chunk(0), result[0].chunk(1)
    if c0.equals(ex_chunks[0]):
        assert c1.equals(ex_chunks[1])
    else:
        assert c0.equals(ex_chunks[1])
        assert c1.equals(ex_chunks[0])
1544
1545
@pytest.mark.dataset
def test_dataset_unsupported_keywords():
    """Legacy-only keywords must raise when combined with the new API."""
    legacy_only_kwargs = [
        dict(schema=pa.schema([])),
        dict(metadata=pa.schema([])),
        dict(validate_schema=False),
        dict(split_row_groups=True),
        dict(metadata_nthreads=4),
    ]
    for kwargs in legacy_only_kwargs:
        with pytest.raises(ValueError, match="not yet supported with the new"):
            pq.ParquetDataset("", use_legacy_dataset=False, **kwargs)

    # read_table dropped 'metadata' support entirely
    with pytest.raises(ValueError, match="no longer supported"):
        pq.read_table("", use_legacy_dataset=False, metadata=pa.schema([]))
1566
1567
@pytest.mark.dataset
def test_dataset_partitioning(tempdir):
    """Explicit ``partitioning`` objects work with the new API only."""
    import pyarrow.dataset as ds

    # create small dataset with directory partitioning
    root_path = tempdir / "test_partitioning"
    (root_path / "2012" / "10" / "01").mkdir(parents=True)

    table = pa.table({'a': [1, 2, 3]})
    pq.write_table(
        table, str(root_path / "2012" / "10" / "01" / "data.parquet"))

    # This works with new dataset API

    # read_table
    part = ds.partitioning(field_names=["year", "month", "day"])
    result = pq.read_table(
        str(root_path), partitioning=part, use_legacy_dataset=False)
    # the directory levels surface as extra columns year/month/day
    assert result.column_names == ["a", "year", "month", "day"]

    result = pq.ParquetDataset(
        str(root_path), partitioning=part, use_legacy_dataset=False).read()
    assert result.column_names == ["a", "year", "month", "day"]

    # This raises an error for legacy dataset
    with pytest.raises(ValueError):
        pq.read_table(
            str(root_path), partitioning=part, use_legacy_dataset=True)

    with pytest.raises(ValueError):
        pq.ParquetDataset(
            str(root_path), partitioning=part, use_legacy_dataset=True)
1600
1601
@pytest.mark.dataset
def test_parquet_dataset_new_filesystem(tempdir):
    # Ensure we can pass new FileSystem object to ParquetDataset
    # (use new implementation automatically without specifying
    # use_legacy_dataset=False)
    table = pa.table({'a': [1, 2, 3]})
    pq.write_table(table, tempdir / 'data.parquet')
    # don't use simple LocalFileSystem (as that gets mapped to legacy one)
    filesystem = fs.SubTreeFileSystem(str(tempdir), fs.LocalFileSystem())
    result = pq.ParquetDataset('.', filesystem=filesystem).read()
    assert result.equals(table)
1614
1615
@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
def test_parquet_dataset_partitions_piece_path_with_fsspec(tempdir):
    # ARROW-10462 ensure that on Windows we properly use posix-style paths
    # as used by fsspec
    fsspec = pytest.importorskip("fsspec")
    filesystem = fsspec.filesystem('file')
    table = pa.table({'a': [1, 2, 3]})
    pq.write_table(table, tempdir / 'data.parquet')

    # pass a posix-style path (using "/" also on Windows)
    posix_path = str(tempdir).replace("\\", "/")
    dataset = pq.ParquetDataset(posix_path, filesystem=filesystem)
    # ensure the piece path is also posix-style
    assert dataset.pieces[0].path == posix_path + "/data.parquet"
1631
1632
1633 @pytest.mark.dataset
1634 def test_parquet_dataset_deprecated_properties(tempdir):
1635 table = pa.table({'a': [1, 2, 3]})
1636 path = tempdir / 'data.parquet'
1637 pq.write_table(table, path)
1638 dataset = pq.ParquetDataset(path)
1639
1640 with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
1641 dataset.pieces
1642
1643 with pytest.warns(DeprecationWarning, match="'ParquetDataset.partitions"):
1644 dataset.partitions
1645
1646 with pytest.warns(DeprecationWarning, match="'ParquetDataset.memory_map"):
1647 dataset.memory_map
1648
1649 with pytest.warns(DeprecationWarning, match="'ParquetDataset.read_dictio"):
1650 dataset.read_dictionary
1651
1652 with pytest.warns(DeprecationWarning, match="'ParquetDataset.buffer_size"):
1653 dataset.buffer_size
1654
1655 with pytest.warns(DeprecationWarning, match="'ParquetDataset.fs"):
1656 dataset.fs
1657
1658 dataset2 = pq.ParquetDataset(path, use_legacy_dataset=False)
1659
1660 with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
1661 dataset2.pieces