# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
import os
import pickle

import numpy as np
import pytest

import pyarrow as pa
from pyarrow import fs
from pyarrow.filesystem import LocalFileSystem
from pyarrow.tests import util
from pyarrow.tests.parquet.common import (
    parametrize_legacy_dataset, parametrize_legacy_dataset_fixed,
    parametrize_legacy_dataset_not_supported)
from pyarrow.util import guid
from pyarrow.vendored.version import Version

import pyarrow.parquet as pq
from pyarrow.tests.parquet.common import (
    _read_table, _test_dataframe, _write_table)

import pandas as pd
import pandas.testing as tm

pytestmark = pytest.mark.parquet
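

# Note: the parametrize_legacy_dataset* decorators imported above run each
# test body twice, once with use_legacy_dataset=True (the legacy
# ParquetDataset implementation) and once with use_legacy_dataset=False (the
# pyarrow.dataset-backed one). A minimal sketch of the idea, assuming the
# simplest form (the real helpers in pyarrow.tests.parquet.common also attach
# skip/xfail marks for the unsupported combinations):
#
#     parametrize_legacy_dataset = pytest.mark.parametrize(
#         "use_legacy_dataset", [True, False])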


def test_parquet_piece_read(tempdir):
    df = _test_dataframe(1000)
    table = pa.Table.from_pandas(df)

    path = tempdir / 'parquet_piece_read.parquet'
    _write_table(table, path, version='2.6')

    with pytest.warns(DeprecationWarning):
        piece1 = pq.ParquetDatasetPiece(path)

    result = piece1.read()
    assert result.equals(table)


def test_parquet_piece_open_and_get_metadata(tempdir):
    df = _test_dataframe(100)
    table = pa.Table.from_pandas(df)

    path = tempdir / 'parquet_piece_read.parquet'
    _write_table(table, path, version='2.6')

    with pytest.warns(DeprecationWarning):
        piece = pq.ParquetDatasetPiece(path)
    table1 = piece.read()
    assert isinstance(table1, pa.Table)
    meta1 = piece.get_metadata()
    assert isinstance(meta1, pq.FileMetaData)

    assert table.equals(table1)


@pytest.mark.filterwarnings("ignore:ParquetDatasetPiece:DeprecationWarning")
def test_parquet_piece_basics():
    path = '/baz.parq'

    piece1 = pq.ParquetDatasetPiece(path)
    piece2 = pq.ParquetDatasetPiece(path, row_group=1)
    piece3 = pq.ParquetDatasetPiece(
        path, row_group=1, partition_keys=[('foo', 0), ('bar', 1)])

    assert str(piece1) == path
    assert str(piece2) == '/baz.parq | row_group=1'
    assert str(piece3) == 'partition[foo=0, bar=1] /baz.parq | row_group=1'

    assert piece1 == piece1
    assert piece2 == piece2
    assert piece3 == piece3
    assert piece1 != piece3


def test_partition_set_dictionary_type():
    set1 = pq.PartitionSet('key1', ['foo', 'bar', 'baz'])
    set2 = pq.PartitionSet('key2', [2007, 2008, 2009])

    assert isinstance(set1.dictionary, pa.StringArray)
    assert isinstance(set2.dictionary, pa.IntegerArray)

    set3 = pq.PartitionSet('key2', [datetime.datetime(2007, 1, 1)])
    with pytest.raises(TypeError):
        set3.dictionary


@parametrize_legacy_dataset_fixed
def test_filesystem_uri(tempdir, use_legacy_dataset):
    table = pa.table({"a": [1, 2, 3]})

    directory = tempdir / "data_dir"
    directory.mkdir()
    path = directory / "data.parquet"
    pq.write_table(table, str(path))

    # reading through a filesystem object works
    result = pq.read_table(
        path, filesystem=fs.LocalFileSystem(),
        use_legacy_dataset=use_legacy_dataset)
    assert result.equals(table)

    # reading with a filesystem URI (relative path) works as well
    result = pq.read_table(
        "data_dir/data.parquet", filesystem=util._filesystem_uri(tempdir),
        use_legacy_dataset=use_legacy_dataset)
    assert result.equals(table)


@parametrize_legacy_dataset
def test_read_partitioned_directory(tempdir, use_legacy_dataset):
    fs = LocalFileSystem._get_instance()
    _partition_test_for_filesystem(fs, tempdir, use_legacy_dataset)


@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
def test_create_parquet_dataset_multi_threaded(tempdir):
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    _partition_test_for_filesystem(fs, base_path)

    manifest = pq.ParquetManifest(base_path, filesystem=fs,
                                  metadata_nthreads=1)
    dataset = pq.ParquetDataset(base_path, filesystem=fs, metadata_nthreads=16)
    assert len(dataset.pieces) > 0
    partitions = dataset.partitions
    assert len(partitions.partition_names) > 0
    assert partitions.partition_names == manifest.partitions.partition_names
    assert len(partitions.levels) == len(manifest.partitions.levels)


@parametrize_legacy_dataset
def test_read_partitioned_columns_selection(tempdir, use_legacy_dataset):
    # ARROW-3861 - do not include partition columns in resulting table when
    # `columns` keyword was passed without those columns
    fs = LocalFileSystem._get_instance()
    base_path = tempdir
    _partition_test_for_filesystem(fs, base_path)

    dataset = pq.ParquetDataset(
        base_path, use_legacy_dataset=use_legacy_dataset)
    result = dataset.read(columns=["values"])
    if use_legacy_dataset:
        # ParquetDataset implementation always includes the partition columns
        # automatically, and we can't easily "fix" this since dask relies on
        # this behaviour (ARROW-8644)
        assert result.column_names == ["values", "foo", "bar"]
    else:
        assert result.column_names == ["values"]


@parametrize_legacy_dataset
def test_filters_equivalency(tempdir, use_legacy_dataset):
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1]
    string_keys = ['a', 'b', 'c']
    boolean_keys = [True, False]
    partition_spec = [
        ['integer', integer_keys],
        ['string', string_keys],
        ['boolean', boolean_keys]
    ]

    df = pd.DataFrame({
        'integer': np.array(integer_keys, dtype='i4').repeat(15),
        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
                           3),
    }, columns=['integer', 'string', 'boolean'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    # Old filters syntax:
    #  integer == 1 AND string != b AND boolean == True
    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[('integer', '=', 1), ('string', '!=', 'b'),
                 ('boolean', '==', 'True')],
        use_legacy_dataset=use_legacy_dataset,
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 0 not in result_df['integer'].values
    assert 'b' not in result_df['string'].values
    assert False not in result_df['boolean'].values

    # filters in disjunctive normal form:
    #  (integer == 1 AND string != b AND boolean == True) OR
    #  (integer == 0 AND boolean == False)
    # TODO(ARROW-3388): boolean columns are reconstructed as string
    filters = [
        [
            ('integer', '=', 1),
            ('string', '!=', 'b'),
            ('boolean', '==', 'True')
        ],
        [('integer', '=', 0), ('boolean', '==', 'False')]
    ]
    dataset = pq.ParquetDataset(
        base_path, filesystem=fs, filters=filters,
        use_legacy_dataset=use_legacy_dataset)
    table = dataset.read()
    result_df = table.to_pandas().reset_index(drop=True)

    # Check that all rows in the DF fulfill the filter
    # Pandas 0.23.x has problems with indexing constant memoryviews in
    # categoricals. Thus we need to make an explicit copy here with np.array.
    df_filter_1 = (np.array(result_df['integer']) == 1) \
        & (np.array(result_df['string']) != 'b') \
        & (np.array(result_df['boolean']) == 'True')
    df_filter_2 = (np.array(result_df['integer']) == 0) \
        & (np.array(result_df['boolean']) == 'False')
    assert df_filter_1.sum() > 0
    assert df_filter_2.sum() > 0
    assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum())

    if use_legacy_dataset:
        # Check for \0 in predicate values. Until they are correctly
        # implemented in ARROW-3391, they would otherwise lead to weird
        # results with the current code.
        with pytest.raises(NotImplementedError):
            filters = [[('string', '==', b'1\0a')]]
            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
        with pytest.raises(NotImplementedError):
            filters = [[('string', '==', '1\0a')]]
            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
    else:
        for filters in [[[('string', '==', b'1\0a')]],
                        [[('string', '==', '1\0a')]]]:
            dataset = pq.ParquetDataset(
                base_path, filesystem=fs, filters=filters,
                use_legacy_dataset=False)
            assert dataset.read().num_rows == 0
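

# For reference, the `filters` argument above is in disjunctive normal form:
# the outer list ORs the inner lists together and each inner list ANDs its
# (column, op, value) tuples, so e.g.
#
#     [[('integer', '=', 1), ('string', '!=', 'b')], [('integer', '=', 0)]]
#
# reads as (integer == 1 AND string != 'b') OR (integer == 0). A flat list of
# tuples, as in the first read above, is shorthand for a single AND group.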


@parametrize_legacy_dataset
def test_filters_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[
            ('integers', '<', 4),
            ('integers', '>', 1),
        ],
        use_legacy_dataset=use_legacy_dataset
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    result_list = [x for x in map(int, result_df['integers'].values)]
    assert result_list == [2, 3]


@parametrize_legacy_dataset
@pytest.mark.xfail(
    # different error with use_legacy_dataset because result_df is no longer
    # categorical
    raises=(TypeError, AssertionError),
    reason='Loss of type information in creation of categoricals.'
)
def test_filters_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    date_keys = [
        datetime.date(2018, 4, 9),
        datetime.date(2018, 4, 10),
        datetime.date(2018, 4, 11),
        datetime.date(2018, 4, 12),
        datetime.date(2018, 4, 13)
    ]
    partition_spec = [
        ['dates', date_keys]
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'dates': np.array(date_keys, dtype='datetime64'),
    }, columns=['index', 'dates'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[
            ('dates', '<', "2018-04-12"),
            ('dates', '>', "2018-04-10")
        ],
        use_legacy_dataset=use_legacy_dataset
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    expected = pd.Categorical(
        np.array([datetime.date(2018, 4, 11)], dtype='datetime64'),
        categories=np.array(date_keys, dtype='datetime64'))

    assert result_df['dates'].values == expected


def test_filters_inclusive_datetime(tempdir):
    path = tempdir / 'timestamps.parquet'

    pd.DataFrame({
        "dates": pd.date_range("2020-01-01", periods=10, freq="D"),
        "id": range(10)
    }).to_parquet(path, use_deprecated_int96_timestamps=True)

    table = pq.read_table(path, filters=[
        ("dates", "<=", datetime.datetime(2020, 1, 5))
    ])

    assert table.column('id').to_pylist() == [0, 1, 2, 3, 4]


@parametrize_legacy_dataset
def test_filters_inclusive_integer(tempdir, use_legacy_dataset):
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[
            ('integers', '<=', 3),
            ('integers', '>=', 2),
        ],
        use_legacy_dataset=use_legacy_dataset
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    result_list = [int(x) for x in map(int, result_df['integers'].values)]
    assert result_list == [2, 3]


@parametrize_legacy_dataset
def test_filters_inclusive_set(tempdir, use_legacy_dataset):
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1]
    string_keys = ['a', 'b', 'c']
    boolean_keys = [True, False]
    partition_spec = [
        ['integer', integer_keys],
        ['string', string_keys],
        ['boolean', boolean_keys]
    ]

    df = pd.DataFrame({
        'integer': np.array(integer_keys, dtype='i4').repeat(15),
        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
                           3),
    }, columns=['integer', 'string', 'boolean'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[('string', 'in', 'ab')],
        use_legacy_dataset=use_legacy_dataset
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 'a' in result_df['string'].values
    assert 'b' in result_df['string'].values
    assert 'c' not in result_df['string'].values

    dataset = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[('integer', 'in', [1]), ('string', 'in', ('a', 'b')),
                 ('boolean', 'not in', {False})],
        use_legacy_dataset=use_legacy_dataset
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 0 not in result_df['integer'].values
    assert 'c' not in result_df['string'].values
    assert False not in result_df['boolean'].values


@parametrize_legacy_dataset
def test_filters_invalid_pred_op(tempdir, use_legacy_dataset):
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    with pytest.raises(TypeError):
        pq.ParquetDataset(base_path,
                          filesystem=fs,
                          filters=[('integers', 'in', 3), ],
                          use_legacy_dataset=use_legacy_dataset)

    with pytest.raises(ValueError):
        pq.ParquetDataset(base_path,
                          filesystem=fs,
                          filters=[('integers', '=<', 3), ],
                          use_legacy_dataset=use_legacy_dataset)

    if use_legacy_dataset:
        with pytest.raises(ValueError):
            pq.ParquetDataset(base_path,
                              filesystem=fs,
                              filters=[('integers', 'in', set()), ],
                              use_legacy_dataset=use_legacy_dataset)
    else:
        # Dataset API returns empty table instead
        dataset = pq.ParquetDataset(base_path,
                                    filesystem=fs,
                                    filters=[('integers', 'in', set()), ],
                                    use_legacy_dataset=use_legacy_dataset)
        assert dataset.read().num_rows == 0

    if use_legacy_dataset:
        with pytest.raises(ValueError):
            pq.ParquetDataset(base_path,
                              filesystem=fs,
                              filters=[('integers', '!=', {3})],
                              use_legacy_dataset=use_legacy_dataset)
    else:
        dataset = pq.ParquetDataset(base_path,
                                    filesystem=fs,
                                    filters=[('integers', '!=', {3})],
                                    use_legacy_dataset=use_legacy_dataset)
        with pytest.raises(NotImplementedError):
            assert dataset.read().num_rows == 0


@parametrize_legacy_dataset_fixed
def test_filters_invalid_column(tempdir, use_legacy_dataset):
    # ARROW-5572 - raise error on invalid name in filter specification
    # works with new dataset / xfail with legacy implementation
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [['integers', integer_keys]]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    msg = r"No match for FieldRef.Name\(non_existent_column\)"
    with pytest.raises(ValueError, match=msg):
        pq.ParquetDataset(base_path, filesystem=fs,
                          filters=[('non_existent_column', '<', 3), ],
                          use_legacy_dataset=use_legacy_dataset).read()


@parametrize_legacy_dataset
def test_filters_read_table(tempdir, use_legacy_dataset):
    # test that filters keyword is passed through in read_table
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    table = pq.read_table(
        base_path, filesystem=fs, filters=[('integers', '<', 3)],
        use_legacy_dataset=use_legacy_dataset)
    assert table.num_rows == 3

    table = pq.read_table(
        base_path, filesystem=fs, filters=[[('integers', '<', 3)]],
        use_legacy_dataset=use_legacy_dataset)
    assert table.num_rows == 3

    table = pq.read_pandas(
        base_path, filters=[('integers', '<', 3)],
        use_legacy_dataset=use_legacy_dataset)
    assert table.num_rows == 3


@parametrize_legacy_dataset_fixed
def test_partition_keys_with_underscores(tempdir, use_legacy_dataset):
    # ARROW-5666 - partition field values with underscores preserve underscores
    # xfail with legacy dataset -> they get interpreted as integers
    fs = LocalFileSystem._get_instance()
    base_path = tempdir

    string_keys = ["2019_2", "2019_3"]
    partition_spec = [
        ['year_week', string_keys],
    ]
    N = 2

    df = pd.DataFrame({
        'index': np.arange(N),
        'year_week': np.array(string_keys, dtype='object'),
    }, columns=['index', 'year_week'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, use_legacy_dataset=use_legacy_dataset)
    result = dataset.read()
    assert result.column("year_week").to_pylist() == string_keys


@parametrize_legacy_dataset
def test_read_s3fs(s3_example_s3fs, use_legacy_dataset):
    fs, path = s3_example_s3fs
    path = path + "/test.parquet"
    table = pa.table({"a": [1, 2, 3]})
    _write_table(table, path, filesystem=fs)

    result = _read_table(
        path, filesystem=fs, use_legacy_dataset=use_legacy_dataset
    )
    assert result.equals(table)


@parametrize_legacy_dataset
def test_read_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
    fs, directory = s3_example_s3fs
    path = directory + "/test.parquet"
    table = pa.table({"a": [1, 2, 3]})
    _write_table(table, path, filesystem=fs)

    result = _read_table(
        directory, filesystem=fs, use_legacy_dataset=use_legacy_dataset
    )
    assert result.equals(table)


@parametrize_legacy_dataset
def test_read_partitioned_directory_s3fs_wrapper(
        s3_example_s3fs, use_legacy_dataset
):
    import s3fs

    from pyarrow.filesystem import S3FSWrapper

    if Version(s3fs.__version__) >= Version("0.5"):
        pytest.skip("S3FSWrapper no longer working for s3fs 0.5+")

    fs, path = s3_example_s3fs
    with pytest.warns(FutureWarning):
        wrapper = S3FSWrapper(fs)
    _partition_test_for_filesystem(wrapper, path)

    # Check that we can auto-wrap
    dataset = pq.ParquetDataset(
        path, filesystem=fs, use_legacy_dataset=use_legacy_dataset
    )
    dataset.read()


@parametrize_legacy_dataset
def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
    fs, path = s3_example_s3fs
    _partition_test_for_filesystem(
        fs, path, use_legacy_dataset=use_legacy_dataset
    )


def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True):
    foo_keys = [0, 1]
    bar_keys = ['a', 'b', 'c']
    partition_spec = [
        ['foo', foo_keys],
        ['bar', bar_keys]
    ]
    N = 30

    df = pd.DataFrame({
        'index': np.arange(N),
        'foo': np.array(foo_keys, dtype='i4').repeat(15),
        'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
        'values': np.random.randn(N)
    }, columns=['index', 'foo', 'bar', 'values'])

    _generate_partition_directories(fs, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=fs, use_legacy_dataset=use_legacy_dataset)
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    expected_df = (df.sort_values(by='index')
                   .reset_index(drop=True)
                   .reindex(columns=result_df.columns))

    expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
    expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)

    assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()

    tm.assert_frame_equal(result_df, expected_df)


def _generate_partition_directories(fs, base_dir, partition_spec, df):
    # partition_spec : list of lists, e.g. [['foo', [0, 1, 2]],
    #                                       ['bar', ['a', 'b', 'c']]]
    # part_table : a pyarrow.Table to write to each partition
    DEPTH = len(partition_spec)

    pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/"))

    def _visit_level(base_dir, level, part_keys):
        name, values = partition_spec[level]
        for value in values:
            this_part_keys = part_keys + [(name, value)]

            level_dir = pathsep.join([
                str(base_dir),
                '{}={}'.format(name, value)
            ])
            fs.mkdir(level_dir)

            if level == DEPTH - 1:
                # Generate example data
                file_path = pathsep.join([level_dir, guid()])
                filtered_df = _filter_partition(df, this_part_keys)
                part_table = pa.Table.from_pandas(filtered_df)
                with fs.open(file_path, 'wb') as f:
                    _write_table(part_table, f)
                assert fs.exists(file_path)

                file_success = pathsep.join([level_dir, '_SUCCESS'])
                with fs.open(file_success, 'wb') as f:
                    pass
            else:
                _visit_level(level_dir, level + 1, this_part_keys)
                file_success = pathsep.join([level_dir, '_SUCCESS'])
                with fs.open(file_success, 'wb') as f:
                    pass

    _visit_level(base_dir, 0, [])
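

# For illustration, a spec like [['foo', [0, 1]], ['bar', ['a', 'b', 'c']]]
# makes _generate_partition_directories produce a hive-style tree roughly
# like this (file names are random guids):
#
#     base_dir/foo=0/_SUCCESS
#     base_dir/foo=0/bar=a/<guid>    <- parquet file with the matching rows
#     base_dir/foo=0/bar=b/<guid>
#     ...
#     base_dir/foo=1/bar=c/<guid>
#
# which is the layout the partitioned-read tests above consume.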


def _test_read_common_metadata_files(fs, base_path):
    import pyarrow.parquet as pq

    N = 100
    df = pd.DataFrame({
        'index': np.arange(N),
        'values': np.random.randn(N)
    }, columns=['index', 'values'])

    base_path = str(base_path)
    data_path = os.path.join(base_path, 'data.parquet')

    table = pa.Table.from_pandas(df)

    with fs.open(data_path, 'wb') as f:
        _write_table(table, f)

    metadata_path = os.path.join(base_path, '_common_metadata')
    with fs.open(metadata_path, 'wb') as f:
        pq.write_metadata(table.schema, f)

    dataset = pq.ParquetDataset(base_path, filesystem=fs)
    assert dataset.common_metadata_path == str(metadata_path)

    with fs.open(data_path) as f:
        common_schema = pq.read_metadata(f).schema
    assert dataset.schema.equals(common_schema)

    # handle list of one directory
    dataset2 = pq.ParquetDataset([base_path], filesystem=fs)
    assert dataset2.schema.equals(dataset.schema)


def test_read_common_metadata_files(tempdir):
    fs = LocalFileSystem._get_instance()
    _test_read_common_metadata_files(fs, tempdir)


def test_read_metadata_files(tempdir):
    fs = LocalFileSystem._get_instance()

    N = 100
    df = pd.DataFrame({
        'index': np.arange(N),
        'values': np.random.randn(N)
    }, columns=['index', 'values'])

    data_path = tempdir / 'data.parquet'

    table = pa.Table.from_pandas(df)

    with fs.open(data_path, 'wb') as f:
        _write_table(table, f)

    metadata_path = tempdir / '_metadata'
    with fs.open(metadata_path, 'wb') as f:
        pq.write_metadata(table.schema, f)

    dataset = pq.ParquetDataset(tempdir, filesystem=fs)
    assert dataset.metadata_path == str(metadata_path)

    with fs.open(data_path) as f:
        metadata_schema = pq.read_metadata(f).schema
    assert dataset.schema.equals(metadata_schema)


def _filter_partition(df, part_keys):
    predicate = np.ones(len(df), dtype=bool)

    to_drop = []
    for name, value in part_keys:
        to_drop.append(name)

        # to avoid pandas warning
        if isinstance(value, (datetime.date, datetime.datetime)):
            value = pd.Timestamp(value)

        predicate &= df[name] == value

    return df[predicate].drop(to_drop, axis=1)
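

# Example: _filter_partition(df, [('foo', 0), ('bar', 'a')]) keeps only the
# rows where df['foo'] == 0 and df['bar'] == 'a' and drops both partition
# columns, i.e. exactly the rows/columns stored in the foo=0/bar=a directory.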


@parametrize_legacy_dataset
def test_filter_before_validate_schema(tempdir, use_legacy_dataset):
    # ARROW-4076 apply filter before schema validation
    # to avoid checking unneeded schemas

    # create partitioned dataset with mismatching schemas which would
    # otherwise raise if validating all schemas first
    dir1 = tempdir / 'A=0'
    dir1.mkdir()
    table1 = pa.Table.from_pandas(pd.DataFrame({'B': [1, 2, 3]}))
    pq.write_table(table1, dir1 / 'data.parquet')

    dir2 = tempdir / 'A=1'
    dir2.mkdir()
    table2 = pa.Table.from_pandas(pd.DataFrame({'B': ['a', 'b', 'c']}))
    pq.write_table(table2, dir2 / 'data.parquet')

    # read single file using filter
    table = pq.read_table(tempdir, filters=[[('A', '==', 0)]],
                          use_legacy_dataset=use_legacy_dataset)
    assert table.column('B').equals(pa.chunked_array([[1, 2, 3]]))


@parametrize_legacy_dataset
def test_read_multiple_files(tempdir, use_legacy_dataset):
    nfiles = 10
    size = 5

    dirpath = tempdir / guid()
    dirpath.mkdir()

    test_data = []
    paths = []
    for i in range(nfiles):
        df = _test_dataframe(size, seed=i)

        # Hack so that we don't have a dtype cast in v1 files
        df['uint32'] = df['uint32'].astype(np.int64)

        path = dirpath / '{}.parquet'.format(i)

        table = pa.Table.from_pandas(df)
        _write_table(table, path)

        test_data.append(table)
        paths.append(path)

    # Write a _SUCCESS.crc file
    (dirpath / '_SUCCESS.crc').touch()

    def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
        dataset = pq.ParquetDataset(
            paths, use_legacy_dataset=use_legacy_dataset, **kwargs)
        return dataset.read(columns=columns, use_threads=use_threads)

    result = read_multiple_files(paths)
    expected = pa.concat_tables(test_data)

    assert result.equals(expected)

    # Read with provided metadata
    # TODO(dataset) specifying metadata not yet supported
    metadata = pq.read_metadata(paths[0])
    if use_legacy_dataset:
        result2 = read_multiple_files(paths, metadata=metadata)
        assert result2.equals(expected)

        result3 = pq.ParquetDataset(dirpath, schema=metadata.schema).read()
        assert result3.equals(expected)
    else:
        with pytest.raises(ValueError, match="no longer supported"):
            pq.read_table(paths, metadata=metadata, use_legacy_dataset=False)

    # Read a column subset
    to_read = [0, 2, 6, result.num_columns - 1]

    col_names = [result.field(i).name for i in to_read]
    out = pq.read_table(
        dirpath, columns=col_names, use_legacy_dataset=use_legacy_dataset
    )
    expected = pa.Table.from_arrays([result.column(i) for i in to_read],
                                    names=col_names,
                                    metadata=result.schema.metadata)
    assert out.equals(expected)

    # Read with multiple threads
    pq.read_table(
        dirpath, use_threads=True, use_legacy_dataset=use_legacy_dataset
    )

    # Test failure modes with non-uniform metadata
    bad_apple = _test_dataframe(size, seed=i).iloc[:, :4]
    bad_apple_path = tempdir / '{}.parquet'.format(guid())

    t = pa.Table.from_pandas(bad_apple)
    _write_table(t, bad_apple_path)

    if not use_legacy_dataset:
        # TODO(dataset) Dataset API skips bad files
        return

    bad_meta = pq.read_metadata(bad_apple_path)

    with pytest.raises(ValueError):
        read_multiple_files(paths + [bad_apple_path])

    with pytest.raises(ValueError):
        read_multiple_files(paths, metadata=bad_meta)

    mixed_paths = [bad_apple_path, paths[0]]

    with pytest.raises(ValueError):
        read_multiple_files(mixed_paths, schema=bad_meta.schema)

    with pytest.raises(ValueError):
        read_multiple_files(mixed_paths)


@parametrize_legacy_dataset
def test_dataset_read_pandas(tempdir, use_legacy_dataset):
    nfiles = 5
    size = 5

    dirpath = tempdir / guid()
    dirpath.mkdir()

    test_data = []
    frames = []
    for i in range(nfiles):
        df = _test_dataframe(size, seed=i)
        df.index = np.arange(i * size, (i + 1) * size)
        df.index.name = 'index'

        path = dirpath / '{}.parquet'.format(i)

        table = pa.Table.from_pandas(df)
        _write_table(table, path)
        test_data.append(table)
        frames.append(df)

    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
    columns = ['uint8', 'strings']
    result = dataset.read_pandas(columns=columns).to_pandas()
    expected = pd.concat([x[columns] for x in frames])

    tm.assert_frame_equal(result, expected)

    # also be able to pass the columns as a set (ARROW-12314)
    result = dataset.read_pandas(columns=set(columns)).to_pandas()
    assert result.shape == expected.shape
    # column order can be different because of using a set
    tm.assert_frame_equal(result.reindex(columns=expected.columns), expected)


@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
@parametrize_legacy_dataset
def test_dataset_memory_map(tempdir, use_legacy_dataset):
    # ARROW-2627: Check that we can use ParquetDataset with memory-mapping
    dirpath = tempdir / guid()
    dirpath.mkdir()

    df = _test_dataframe(10, seed=0)
    path = dirpath / '{}.parquet'.format(0)
    table = pa.Table.from_pandas(df)
    _write_table(table, path, version='2.6')

    dataset = pq.ParquetDataset(
        dirpath, memory_map=True, use_legacy_dataset=use_legacy_dataset)
    assert dataset.read().equals(table)
    if use_legacy_dataset:
        assert dataset.pieces[0].read().equals(table)


@parametrize_legacy_dataset
def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
    dirpath = tempdir / guid()
    dirpath.mkdir()

    df = _test_dataframe(10, seed=0)
    path = dirpath / '{}.parquet'.format(0)
    table = pa.Table.from_pandas(df)
    _write_table(table, path, version='2.6')

    with pytest.raises(ValueError):
        pq.ParquetDataset(
            dirpath, buffer_size=-64,
            use_legacy_dataset=use_legacy_dataset)

    for buffer_size in [128, 1024]:
        dataset = pq.ParquetDataset(
            dirpath, buffer_size=buffer_size,
            use_legacy_dataset=use_legacy_dataset)
        assert dataset.read().equals(table)


@parametrize_legacy_dataset
def test_dataset_enable_pre_buffer(tempdir, use_legacy_dataset):
    dirpath = tempdir / guid()
    dirpath.mkdir()

    df = _test_dataframe(10, seed=0)
    path = dirpath / '{}.parquet'.format(0)
    table = pa.Table.from_pandas(df)
    _write_table(table, path, version='2.6')

    for pre_buffer in (True, False):
        dataset = pq.ParquetDataset(
            dirpath, pre_buffer=pre_buffer,
            use_legacy_dataset=use_legacy_dataset)
        assert dataset.read().equals(table)
        actual = pq.read_table(dirpath, pre_buffer=pre_buffer,
                               use_legacy_dataset=use_legacy_dataset)
        assert actual.equals(table)
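

# Background for the test above: pre_buffer coalesces the byte ranges of the
# column chunks being read and fetches them in larger reads, which mainly
# helps on high-latency filesystems such as S3. On a local file it should not
# change the result, hence the equality assertions for both settings.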


def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
    test_data = []
    paths = []
    for i in range(nfiles):
        df = _test_dataframe(file_nrows, seed=i)
        path = base_path / '{}.parquet'.format(i)

        test_data.append(_write_table(df, path))
        paths.append(path)
    return paths


def _assert_dataset_paths(dataset, paths, use_legacy_dataset):
    if use_legacy_dataset:
        assert set(map(str, paths)) == {x.path for x in dataset._pieces}
    else:
        paths = [str(path.as_posix()) for path in paths]
        assert set(paths) == set(dataset._dataset.files)


@parametrize_legacy_dataset
@pytest.mark.parametrize('dir_prefix', ['_', '.'])
def test_ignore_private_directories(tempdir, dir_prefix, use_legacy_dataset):
    dirpath = tempdir / guid()
    dirpath.mkdir()

    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
                                            file_nrows=5)

    # add a private directory that should be ignored
    (dirpath / '{}staging'.format(dir_prefix)).mkdir()

    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)

    _assert_dataset_paths(dataset, paths, use_legacy_dataset)


@parametrize_legacy_dataset
def test_ignore_hidden_files_dot(tempdir, use_legacy_dataset):
    dirpath = tempdir / guid()
    dirpath.mkdir()

    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
                                            file_nrows=5)

    with (dirpath / '.DS_Store').open('wb') as f:
        f.write(b'gibberish')

    with (dirpath / '.private').open('wb') as f:
        f.write(b'gibberish')

    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)

    _assert_dataset_paths(dataset, paths, use_legacy_dataset)


@parametrize_legacy_dataset
def test_ignore_hidden_files_underscore(tempdir, use_legacy_dataset):
    dirpath = tempdir / guid()
    dirpath.mkdir()

    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
                                            file_nrows=5)

    with (dirpath / '_committed_123').open('wb') as f:
        f.write(b'abcd')

    with (dirpath / '_started_321').open('wb') as f:
        f.write(b'abcd')

    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)

    _assert_dataset_paths(dataset, paths, use_legacy_dataset)


@parametrize_legacy_dataset
@pytest.mark.parametrize('dir_prefix', ['_', '.'])
def test_ignore_no_private_directories_in_base_path(
        tempdir, dir_prefix, use_legacy_dataset
):
    # ARROW-8427 - don't ignore explicitly listed files if parent directory
    # is a private directory
    dirpath = tempdir / "{0}data".format(dir_prefix) / guid()
    dirpath.mkdir(parents=True)

    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
                                            file_nrows=5)

    dataset = pq.ParquetDataset(paths, use_legacy_dataset=use_legacy_dataset)
    _assert_dataset_paths(dataset, paths, use_legacy_dataset)

    # ARROW-9644 - don't ignore full directory with underscore in base path
    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
    _assert_dataset_paths(dataset, paths, use_legacy_dataset)


@parametrize_legacy_dataset_fixed
def test_ignore_custom_prefixes(tempdir, use_legacy_dataset):
    # ARROW-9573 - allow override of default ignore_prefixes
    part = ["xxx"] * 3 + ["yyy"] * 3
    table = pa.table([
        pa.array(range(len(part))),
        pa.array(part).dictionary_encode(),
    ], names=['index', '_part'])

    # TODO use_legacy_dataset ARROW-10247
    pq.write_to_dataset(table, str(tempdir), partition_cols=['_part'])

    private_duplicate = tempdir / '_private_duplicate'
    private_duplicate.mkdir()
    pq.write_to_dataset(table, str(private_duplicate),
                        partition_cols=['_part'])

    read = pq.read_table(
        tempdir, use_legacy_dataset=use_legacy_dataset,
        ignore_prefixes=['_private'])

    assert read.equals(table)


@parametrize_legacy_dataset_fixed
def test_empty_directory(tempdir, use_legacy_dataset):
    # ARROW-5310 - reading empty directory
    # fails with legacy implementation
    empty_dir = tempdir / 'dataset'
    empty_dir.mkdir()

    dataset = pq.ParquetDataset(
        empty_dir, use_legacy_dataset=use_legacy_dataset)
    result = dataset.read()
    assert result.num_rows == 0
    assert result.num_columns == 0


def _test_write_to_dataset_with_partitions(base_path,
                                           use_legacy_dataset=True,
                                           filesystem=None,
                                           schema=None,
                                           index_name=None):
    import pandas.testing as tm

    import pyarrow.parquet as pq

    output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
                              'group2': list('eefeffgeee'),
                              'num': list(range(10)),
                              'nan': [np.nan] * 10,
                              'date': np.arange('2017-01-01', '2017-01-11',
                                                dtype='datetime64[D]')})
    cols = output_df.columns.tolist()
    partition_by = ['group1', 'group2']
    output_table = pa.Table.from_pandas(output_df, schema=schema, safe=False,
                                        preserve_index=False)
    pq.write_to_dataset(output_table, base_path, partition_by,
                        filesystem=filesystem,
                        use_legacy_dataset=use_legacy_dataset)

    metadata_path = os.path.join(str(base_path), '_common_metadata')

    if filesystem is not None:
        with filesystem.open(metadata_path, 'wb') as f:
            pq.write_metadata(output_table.schema, f)
    else:
        pq.write_metadata(output_table.schema, metadata_path)

    # ARROW-2891: Ensure the output_schema is preserved when writing a
    # partitioned dataset
    dataset = pq.ParquetDataset(base_path,
                                filesystem=filesystem,
                                validate_schema=True,
                                use_legacy_dataset=use_legacy_dataset)
    # ARROW-2209: Ensure the dataset schema also includes the partition columns
    if use_legacy_dataset:
        dataset_cols = set(dataset.schema.to_arrow_schema().names)
    else:
        # NB schema property is an arrow and not parquet schema
        dataset_cols = set(dataset.schema.names)

    assert dataset_cols == set(output_table.schema.names)

    input_table = dataset.read()
    input_df = input_table.to_pandas()

    # Read data back in and compare with original DataFrame
    # Partitioned columns added to the end of the DataFrame when read
    input_df_cols = input_df.columns.tolist()
    assert partition_by == input_df_cols[-1 * len(partition_by):]

    input_df = input_df[cols]
    # Partitioned columns become 'categorical' dtypes
    for col in partition_by:
        output_df[col] = output_df[col].astype('category')
    tm.assert_frame_equal(output_df, input_df)


def _test_write_to_dataset_no_partitions(base_path,
                                         use_legacy_dataset=True,
                                         filesystem=None):
    import pyarrow.parquet as pq

    output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
                              'group2': list('eefeffgeee'),
                              'num': list(range(10)),
                              'date': np.arange('2017-01-01', '2017-01-11',
                                                dtype='datetime64[D]')})
    cols = output_df.columns.tolist()
    output_table = pa.Table.from_pandas(output_df)

    if filesystem is None:
        filesystem = LocalFileSystem._get_instance()

    # Without partitions, append files to root_path
    n = 5
    for i in range(n):
        pq.write_to_dataset(output_table, base_path,
                            filesystem=filesystem)
    output_files = [file for file in filesystem.ls(str(base_path))
                    if file.endswith(".parquet")]
    assert len(output_files) == n

    # Deduplicated incoming DataFrame should match
    # original outgoing Dataframe
    input_table = pq.ParquetDataset(
        base_path, filesystem=filesystem,
        use_legacy_dataset=use_legacy_dataset
    ).read()
    input_df = input_table.to_pandas()
    input_df = input_df.drop_duplicates()
    input_df = input_df[cols]
    assert output_df.equals(input_df)


@parametrize_legacy_dataset
def test_write_to_dataset_with_partitions(tempdir, use_legacy_dataset):
    _test_write_to_dataset_with_partitions(str(tempdir), use_legacy_dataset)


@parametrize_legacy_dataset
def test_write_to_dataset_with_partitions_and_schema(
        tempdir, use_legacy_dataset):
    schema = pa.schema([pa.field('group1', type=pa.string()),
                        pa.field('group2', type=pa.string()),
                        pa.field('num', type=pa.int64()),
                        pa.field('nan', type=pa.int32()),
                        pa.field('date', type=pa.timestamp(unit='us'))])
    _test_write_to_dataset_with_partitions(
        str(tempdir), use_legacy_dataset, schema=schema)


@parametrize_legacy_dataset
def test_write_to_dataset_with_partitions_and_index_name(
        tempdir, use_legacy_dataset):
    _test_write_to_dataset_with_partitions(
        str(tempdir), use_legacy_dataset, index_name='index_name')


@parametrize_legacy_dataset
def test_write_to_dataset_no_partitions(tempdir, use_legacy_dataset):
    _test_write_to_dataset_no_partitions(str(tempdir), use_legacy_dataset)


@parametrize_legacy_dataset
def test_write_to_dataset_pathlib(tempdir, use_legacy_dataset):
    _test_write_to_dataset_with_partitions(
        tempdir / "test1", use_legacy_dataset)
    _test_write_to_dataset_no_partitions(
        tempdir / "test2", use_legacy_dataset)


@parametrize_legacy_dataset
def test_write_to_dataset_pathlib_nonlocal(
        tempdir, s3_example_s3fs, use_legacy_dataset):
    # pathlib paths are only accepted for local files
    fs, _ = s3_example_s3fs

    with pytest.raises(TypeError, match="path-like objects are only allowed"):
        _test_write_to_dataset_with_partitions(
            tempdir / "test1", use_legacy_dataset, filesystem=fs)

    with pytest.raises(TypeError, match="path-like objects are only allowed"):
        _test_write_to_dataset_no_partitions(
            tempdir / "test2", use_legacy_dataset, filesystem=fs)


@parametrize_legacy_dataset
def test_write_to_dataset_with_partitions_s3fs(
        s3_example_s3fs, use_legacy_dataset):
    fs, path = s3_example_s3fs

    _test_write_to_dataset_with_partitions(
        path, use_legacy_dataset, filesystem=fs)


@parametrize_legacy_dataset
def test_write_to_dataset_no_partitions_s3fs(
        s3_example_s3fs, use_legacy_dataset):
    fs, path = s3_example_s3fs

    _test_write_to_dataset_no_partitions(
        path, use_legacy_dataset, filesystem=fs)


@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
@parametrize_legacy_dataset_not_supported
def test_write_to_dataset_with_partitions_and_custom_filenames(
        tempdir, use_legacy_dataset):
    output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
                              'group2': list('eefeffgeee'),
                              'num': list(range(10)),
                              'nan': [np.nan] * 10,
                              'date': np.arange('2017-01-01', '2017-01-11',
                                                dtype='datetime64[D]')})
    partition_by = ['group1', 'group2']
    output_table = pa.Table.from_pandas(output_df)
    path = str(tempdir)

    def partition_filename_callback(keys):
        return "{}-{}.parquet".format(*keys)

    pq.write_to_dataset(output_table, path,
                        partition_by, partition_filename_callback,
                        use_legacy_dataset=use_legacy_dataset)

    dataset = pq.ParquetDataset(path)

    # ARROW-3538: Ensure partition filenames match the given pattern
    # defined in the local function partition_filename_callback
    expected_basenames = [
        'a-e.parquet', 'a-f.parquet',
        'b-e.parquet', 'b-f.parquet',
        'b-g.parquet', 'c-e.parquet'
    ]
    output_basenames = [os.path.basename(p.path) for p in dataset.pieces]

    assert sorted(expected_basenames) == sorted(output_basenames)


@pytest.mark.dataset
def test_write_to_dataset_filesystem(tempdir):
    df = pd.DataFrame({'A': [1, 2, 3]})
    table = pa.Table.from_pandas(df)
    path = str(tempdir)

    pq.write_to_dataset(table, path, filesystem=fs.LocalFileSystem())
    result = pq.read_table(path)
    assert result.equals(table)


# TODO(dataset) support pickling
def _make_dataset_for_pickling(tempdir, N=100):
    path = tempdir / 'data.parquet'
    fs = LocalFileSystem._get_instance()

    df = pd.DataFrame({
        'index': np.arange(N),
        'values': np.random.randn(N)
    }, columns=['index', 'values'])
    table = pa.Table.from_pandas(df)

    num_groups = 3
    with pq.ParquetWriter(path, table.schema) as writer:
        for i in range(num_groups):
            writer.write_table(table)

    reader = pq.ParquetFile(path)
    assert reader.metadata.num_row_groups == num_groups

    metadata_path = tempdir / '_metadata'
    with fs.open(metadata_path, 'wb') as f:
        pq.write_metadata(table.schema, f)

    dataset = pq.ParquetDataset(tempdir, filesystem=fs)
    assert dataset.metadata_path == str(metadata_path)

    return dataset


def _assert_dataset_is_picklable(dataset, pickler):
    def is_pickleable(obj):
        return obj == pickler.loads(pickler.dumps(obj))

    assert is_pickleable(dataset)
    assert is_pickleable(dataset.metadata)
    assert is_pickleable(dataset.metadata.schema)
    assert len(dataset.metadata.schema)
    for column in dataset.metadata.schema:
        assert is_pickleable(column)

    for piece in dataset._pieces:
        assert is_pickleable(piece)
        metadata = piece.get_metadata()
        assert metadata.num_row_groups
        for i in range(metadata.num_row_groups):
            assert is_pickleable(metadata.row_group(i))


def test_builtin_pickle_dataset(tempdir, datadir):
    dataset = _make_dataset_for_pickling(tempdir)
    _assert_dataset_is_picklable(dataset, pickler=pickle)


def test_cloudpickle_dataset(tempdir, datadir):
    cp = pytest.importorskip('cloudpickle')
    dataset = _make_dataset_for_pickling(tempdir)
    _assert_dataset_is_picklable(dataset, pickler=cp)


@parametrize_legacy_dataset
def test_partitioned_dataset(tempdir, use_legacy_dataset):
    # ARROW-3208: Segmentation fault when reading a Parquet partitioned
    # dataset and then writing it back out to a Parquet file
    path = tempdir / "ARROW-3208"
    df = pd.DataFrame({
        'one': [-1, 10, 2.5, 100, 1000, 1, 29.2],
        'two': [-1, 10, 2, 100, 1000, 1, 11],
        'three': [0, 0, 0, 0, 0, 0, 0]
    })
    table = pa.Table.from_pandas(df)
    pq.write_to_dataset(table, root_path=str(path),
                        partition_cols=['one', 'two'])
    table = pq.ParquetDataset(
        path, use_legacy_dataset=use_legacy_dataset).read()
    pq.write_table(table, path / "output.parquet")


@parametrize_legacy_dataset
def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
    path = tempdir / "ARROW-3325-dataset"
    t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
    t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
    # TODO pass use_legacy_dataset (need to fix unique names)
    pq.write_to_dataset(t1, root_path=str(path))
    pq.write_to_dataset(t2, root_path=str(path))

    result = pq.ParquetDataset(
        path, read_dictionary=['f0'],
        use_legacy_dataset=use_legacy_dataset).read()

    # The order of the chunks is non-deterministic
    ex_chunks = [t1[0].chunk(0).dictionary_encode(),
                 t2[0].chunk(0).dictionary_encode()]

    assert result[0].num_chunks == 2
    c0, c1 = result[0].chunk(0), result[0].chunk(1)
    if c0.equals(ex_chunks[0]):
        assert c1.equals(ex_chunks[1])
    else:
        assert c0.equals(ex_chunks[1])
        assert c1.equals(ex_chunks[0])


@pytest.mark.dataset
def test_dataset_unsupported_keywords():
    with pytest.raises(ValueError, match="not yet supported with the new"):
        pq.ParquetDataset("", use_legacy_dataset=False, schema=pa.schema([]))

    with pytest.raises(ValueError, match="not yet supported with the new"):
        pq.ParquetDataset("", use_legacy_dataset=False, metadata=pa.schema([]))

    with pytest.raises(ValueError, match="not yet supported with the new"):
        pq.ParquetDataset("", use_legacy_dataset=False, validate_schema=False)

    with pytest.raises(ValueError, match="not yet supported with the new"):
        pq.ParquetDataset("", use_legacy_dataset=False, split_row_groups=True)

    with pytest.raises(ValueError, match="not yet supported with the new"):
        pq.ParquetDataset("", use_legacy_dataset=False, metadata_nthreads=4)

    with pytest.raises(ValueError, match="no longer supported"):
        pq.read_table("", use_legacy_dataset=False, metadata=pa.schema([]))


@pytest.mark.dataset
def test_dataset_partitioning(tempdir):
    import pyarrow.dataset as ds

    # create small dataset with directory partitioning
    root_path = tempdir / "test_partitioning"
    (root_path / "2012" / "10" / "01").mkdir(parents=True)

    table = pa.table({'a': [1, 2, 3]})
    pq.write_table(
        table, str(root_path / "2012" / "10" / "01" / "data.parquet"))

    # This works with new dataset API
    part = ds.partitioning(field_names=["year", "month", "day"])
    result = pq.read_table(
        str(root_path), partitioning=part, use_legacy_dataset=False)
    assert result.column_names == ["a", "year", "month", "day"]

    result = pq.ParquetDataset(
        str(root_path), partitioning=part, use_legacy_dataset=False).read()
    assert result.column_names == ["a", "year", "month", "day"]

    # This raises an error for legacy dataset
    with pytest.raises(ValueError):
        pq.read_table(
            str(root_path), partitioning=part, use_legacy_dataset=True)

    with pytest.raises(ValueError):
        pq.ParquetDataset(
            str(root_path), partitioning=part, use_legacy_dataset=True)


@pytest.mark.dataset
def test_parquet_dataset_new_filesystem(tempdir):
    # Ensure we can pass new FileSystem object to ParquetDataset
    # (use new implementation automatically without specifying
    #  use_legacy_dataset=False)
    table = pa.table({'a': [1, 2, 3]})
    pq.write_table(table, tempdir / 'data.parquet')
    # don't use simple LocalFileSystem (as that gets mapped to legacy one)
    filesystem = fs.SubTreeFileSystem(str(tempdir), fs.LocalFileSystem())
    dataset = pq.ParquetDataset('.', filesystem=filesystem)
    result = dataset.read()
    assert result.equals(table)


@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
def test_parquet_dataset_partitions_piece_path_with_fsspec(tempdir):
    # ARROW-10462 ensure that on Windows we properly use posix-style paths
    fsspec = pytest.importorskip("fsspec")
    filesystem = fsspec.filesystem('file')
    table = pa.table({'a': [1, 2, 3]})
    pq.write_table(table, tempdir / 'data.parquet')

    # pass a posix-style path (using "/" also on Windows)
    path = str(tempdir).replace("\\", "/")
    dataset = pq.ParquetDataset(path, filesystem=filesystem)
    # ensure the piece path is also posix-style
    expected = path + "/data.parquet"
    assert dataset.pieces[0].path == expected


@pytest.mark.dataset
def test_parquet_dataset_deprecated_properties(tempdir):
    table = pa.table({'a': [1, 2, 3]})
    path = tempdir / 'data.parquet'
    pq.write_table(table, path)
    dataset = pq.ParquetDataset(path)

    with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
        dataset.pieces

    with pytest.warns(DeprecationWarning, match="'ParquetDataset.partitions"):
        dataset.partitions

    with pytest.warns(DeprecationWarning, match="'ParquetDataset.memory_map"):
        dataset.memory_map

    with pytest.warns(DeprecationWarning, match="'ParquetDataset.read_dictio"):
        dataset.read_dictionary

    with pytest.warns(DeprecationWarning, match="'ParquetDataset.buffer_size"):
        dataset.buffer_size

    with pytest.warns(DeprecationWarning, match="'ParquetDataset.fs"):
        dataset.fs

    dataset2 = pq.ParquetDataset(path, use_legacy_dataset=False)

    with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
        dataset2.pieces