1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
19 from collections
import defaultdict
20 from concurrent
import futures
21 from functools
import partial
, reduce
24 from collections
.abc
import Collection
33 import pyarrow
.lib
as lib
34 import pyarrow
._parquet
as _parquet
36 from pyarrow
._parquet
import (ParquetReader
, Statistics
, # noqa
37 FileMetaData
, RowGroupMetaData
,
39 ParquetSchema
, ColumnSchema
)
40 from pyarrow
.fs
import (LocalFileSystem
, FileSystem
,
41 _resolve_filesystem_and_path
, _ensure_filesystem
)
42 from pyarrow
import filesystem
as legacyfs
43 from pyarrow
.util
import guid
, _is_path_like
, _stringify_path
45 _URI_STRIP_SCHEMES
= ('hdfs',)
49 path
= _stringify_path(path
)
50 parsed_uri
= urllib
.parse
.urlparse(path
)
51 if parsed_uri
.scheme
in _URI_STRIP_SCHEMES
:
52 return parsed_uri
.path
54 # ARROW-4073: On Windows returning the path with the scheme
55 # stripped removes the drive letter, if any
59 def _get_filesystem_and_path(passed_filesystem
, path
):
60 if passed_filesystem
is None:
61 return legacyfs
.resolve_filesystem_and_path(path
, passed_filesystem
)
63 passed_filesystem
= legacyfs
._ensure
_filesystem
(passed_filesystem
)
64 parsed_path
= _parse_uri(path
)
65 return passed_filesystem
, parsed_path
68 def _check_contains_null(val
):
69 if isinstance(val
, bytes
):
71 if isinstance(byte
, bytes
):
75 if byte
== compare_to
:
77 elif isinstance(val
, str):
82 def _check_filters(filters
, check_null_strings
=True):
84 Check if filters are well-formed.
86 if filters
is not None:
87 if len(filters
) == 0 or any(len(f
) == 0 for f
in filters
):
88 raise ValueError("Malformed filters")
89 if isinstance(filters
[0][0], str):
90 # We have encountered the situation where we have one nesting level
92 # We have [(,,), ..] instead of [[(,,), ..]]
94 if check_null_strings
:
95 for conjunction
in filters
:
96 for col
, op
, val
in conjunction
:
98 isinstance(val
, list) and
99 all(_check_contains_null(v
) for v
in val
) or
100 _check_contains_null(val
)
102 raise NotImplementedError(
103 "Null-terminated binary strings are not supported "
109 _DNF_filter_doc
= """Predicates are expressed in disjunctive normal form (DNF), like
110 ``[[('x', '=', 0), ...], ...]``. DNF allows arbitrary boolean logical
111 combinations of single column predicates. The innermost tuples each
112 describe a single column predicate. The list of inner predicates is
113 interpreted as a conjunction (AND), forming a more selective and
114 multiple column predicate. Finally, the most outer list combines these
115 filters as a disjunction (OR).
117 Predicates may also be passed as List[Tuple]. This form is interpreted
118 as a single conjunction. To express OR in predicates, one must
119 use the (preferred) List[List[Tuple]] notation.
121 Each tuple has format: (``key``, ``op``, ``value``) and compares the
122 ``key`` with the ``value``.
123 The supported ``op`` are: ``=`` or ``==``, ``!=``, ``<``, ``>``, ``<=``,
124 ``>=``, ``in`` and ``not in``. If the ``op`` is ``in`` or ``not in``, the
125 ``value`` must be a collection such as a ``list``, a ``set`` or a
130 .. code-block:: python
133 ('y', 'in', ['a', 'b', 'c'])
134 ('z', 'not in', {'a','b'})
139 def _filters_to_expression(filters
):
141 Check if filters are well-formed.
143 See _DNF_filter_doc above for more details.
145 import pyarrow
.dataset
as ds
147 if isinstance(filters
, ds
.Expression
):
150 filters
= _check_filters(filters
, check_null_strings
=False)
152 def convert_single_predicate(col
, op
, val
):
153 field
= ds
.field(col
)
155 if op
== "=" or op
== "==":
168 return field
.isin(val
)
170 return ~field
.isin(val
)
173 '"{0}" is not a valid operator in predicates.'.format(
176 disjunction_members
= []
178 for conjunction
in filters
:
179 conjunction_members
= [
180 convert_single_predicate(col
, op
, val
)
181 for col
, op
, val
in conjunction
184 disjunction_members
.append(reduce(operator
.and_
, conjunction_members
))
186 return reduce(operator
.or_
, disjunction_members
)
189 # ----------------------------------------------------------------------
190 # Reading a single Parquet file
195 Reader interface for a single Parquet file.
199 source : str, pathlib.Path, pyarrow.NativeFile, or file-like object
200 Readable source. For passing bytes or buffer-like file containing a
201 Parquet file, use pyarrow.BufferReader.
202 metadata : FileMetaData, default None
203 Use existing metadata object, rather than reading from file.
204 common_metadata : FileMetaData, default None
205 Will be used in reads for pandas schema metadata if not found in the
206 main file's metadata, no other uses at the moment.
207 memory_map : bool, default False
208 If the source is a file path, use a memory map to read file, which can
209 improve performance in some environments.
210 buffer_size : int, default 0
211 If positive, perform read buffering when deserializing individual
212 column chunks. Otherwise IO calls are unbuffered.
213 pre_buffer : bool, default False
214 Coalesce and issue file reads in parallel to improve performance on
215 high-latency filesystems (e.g. S3). If True, Arrow will use a
216 background I/O thread pool.
217 read_dictionary : list
218 List of column names to read directly as DictionaryArray.
219 coerce_int96_timestamp_unit : str, default None.
220 Cast timestamps that are stored in INT96 format to a particular
221 resolution (e.g. 'ms'). Setting to None is equivalent to 'ns'
222 and therefore INT96 timestamps will be infered as timestamps
226 def __init__(self
, source
, metadata
=None, common_metadata
=None,
227 read_dictionary
=None, memory_map
=False, buffer_size
=0,
228 pre_buffer
=False, coerce_int96_timestamp_unit
=None):
229 self
.reader
= ParquetReader()
231 source
, use_memory_map
=memory_map
,
232 buffer_size
=buffer_size
, pre_buffer
=pre_buffer
,
233 read_dictionary
=read_dictionary
, metadata
=metadata
,
234 coerce_int96_timestamp_unit
=coerce_int96_timestamp_unit
236 self
.common_metadata
= common_metadata
237 self
._nested
_paths
_by
_prefix
= self
._build
_nested
_paths
()
239 def _build_nested_paths(self
):
240 paths
= self
.reader
.column_paths
242 result
= defaultdict(list)
244 for i
, path
in enumerate(paths
):
248 result
[key
].append(i
)
253 key
= '.'.join((key
, rest
[0]))
260 return self
.reader
.metadata
265 Return the Parquet schema, unconverted to Arrow types
267 return self
.metadata
.schema
270 def schema_arrow(self
):
272 Return the inferred Arrow schema, converted from the whole Parquet
275 return self
.reader
.schema_arrow
278 def num_row_groups(self
):
279 return self
.reader
.num_row_groups
281 def read_row_group(self
, i
, columns
=None, use_threads
=True,
282 use_pandas_metadata
=False):
284 Read a single row group from a Parquet file.
289 Index of the individual row group that we want to read.
291 If not None, only these columns will be read from the row group. A
292 column name may be a prefix of a nested field, e.g. 'a' will select
293 'a.b', 'a.c', and 'a.d.e'.
294 use_threads : bool, default True
295 Perform multi-threaded column reads.
296 use_pandas_metadata : bool, default False
297 If True and file has custom pandas schema metadata, ensure that
298 index columns are also loaded.
303 Content of the row group as a table (of columns)
305 column_indices
= self
._get
_column
_indices
(
306 columns
, use_pandas_metadata
=use_pandas_metadata
)
307 return self
.reader
.read_row_group(i
, column_indices
=column_indices
,
308 use_threads
=use_threads
)
310 def read_row_groups(self
, row_groups
, columns
=None, use_threads
=True,
311 use_pandas_metadata
=False):
313 Read a multiple row groups from a Parquet file.
318 Only these row groups will be read from the file.
320 If not None, only these columns will be read from the row group. A
321 column name may be a prefix of a nested field, e.g. 'a' will select
322 'a.b', 'a.c', and 'a.d.e'.
323 use_threads : bool, default True
324 Perform multi-threaded column reads.
325 use_pandas_metadata : bool, default False
326 If True and file has custom pandas schema metadata, ensure that
327 index columns are also loaded.
332 Content of the row groups as a table (of columns).
334 column_indices
= self
._get
_column
_indices
(
335 columns
, use_pandas_metadata
=use_pandas_metadata
)
336 return self
.reader
.read_row_groups(row_groups
,
337 column_indices
=column_indices
,
338 use_threads
=use_threads
)
340 def iter_batches(self
, batch_size
=65536, row_groups
=None, columns
=None,
341 use_threads
=True, use_pandas_metadata
=False):
343 Read streaming batches from a Parquet file
347 batch_size : int, default 64K
348 Maximum number of records to yield per batch. Batches may be
349 smaller if there aren't enough rows in the file.
351 Only these row groups will be read from the file.
353 If not None, only these columns will be read from the file. A
354 column name may be a prefix of a nested field, e.g. 'a' will select
355 'a.b', 'a.c', and 'a.d.e'.
356 use_threads : boolean, default True
357 Perform multi-threaded column reads.
358 use_pandas_metadata : boolean, default False
359 If True and file has custom pandas schema metadata, ensure that
360 index columns are also loaded.
364 iterator of pyarrow.RecordBatch
365 Contents of each batch as a record batch
367 if row_groups
is None:
368 row_groups
= range(0, self
.metadata
.num_row_groups
)
369 column_indices
= self
._get
_column
_indices
(
370 columns
, use_pandas_metadata
=use_pandas_metadata
)
372 batches
= self
.reader
.iter_batches(batch_size
,
373 row_groups
=row_groups
,
374 column_indices
=column_indices
,
375 use_threads
=use_threads
)
378 def read(self
, columns
=None, use_threads
=True, use_pandas_metadata
=False):
380 Read a Table from Parquet format,
385 If not None, only these columns will be read from the file. A
386 column name may be a prefix of a nested field, e.g. 'a' will select
387 'a.b', 'a.c', and 'a.d.e'.
388 use_threads : bool, default True
389 Perform multi-threaded column reads.
390 use_pandas_metadata : bool, default False
391 If True and file has custom pandas schema metadata, ensure that
392 index columns are also loaded.
397 Content of the file as a table (of columns).
399 column_indices
= self
._get
_column
_indices
(
400 columns
, use_pandas_metadata
=use_pandas_metadata
)
401 return self
.reader
.read_all(column_indices
=column_indices
,
402 use_threads
=use_threads
)
404 def scan_contents(self
, columns
=None, batch_size
=65536):
406 Read contents of file for the given columns and batch size.
410 This function's primary purpose is benchmarking.
411 The scan is executed on a single thread.
415 columns : list of integers, default None
416 Select columns to read, if None scan all columns.
417 batch_size : int, default 64K
418 Number of rows to read at a time internally.
422 num_rows : number of rows in file
424 column_indices
= self
._get
_column
_indices
(columns
)
425 return self
.reader
.scan_contents(column_indices
,
426 batch_size
=batch_size
)
428 def _get_column_indices(self
, column_names
, use_pandas_metadata
=False):
429 if column_names
is None:
434 for name
in column_names
:
435 if name
in self
._nested
_paths
_by
_prefix
:
436 indices
.extend(self
._nested
_paths
_by
_prefix
[name
])
438 if use_pandas_metadata
:
439 file_keyvalues
= self
.metadata
.metadata
440 common_keyvalues
= (self
.common_metadata
.metadata
441 if self
.common_metadata
is not None
444 if file_keyvalues
and b
'pandas' in file_keyvalues
:
445 index_columns
= _get_pandas_index_columns(file_keyvalues
)
446 elif common_keyvalues
and b
'pandas' in common_keyvalues
:
447 index_columns
= _get_pandas_index_columns(common_keyvalues
)
451 if indices
is not None and index_columns
:
452 indices
+= [self
.reader
.column_name_idx(descr
)
453 for descr
in index_columns
454 if not isinstance(descr
, dict)]
459 _SPARK_DISALLOWED_CHARS
= re
.compile('[ ,;{}()\n\t=]')
462 def _sanitized_spark_field_name(name
):
463 return _SPARK_DISALLOWED_CHARS
.sub('_', name
)
466 def _sanitize_schema(schema
, flavor
):
467 if 'spark' in flavor
:
468 sanitized_fields
= []
470 schema_changed
= False
474 sanitized_name
= _sanitized_spark_field_name(name
)
476 if sanitized_name
!= name
:
477 schema_changed
= True
478 sanitized_field
= pa
.field(sanitized_name
, field
.type,
479 field
.nullable
, field
.metadata
)
480 sanitized_fields
.append(sanitized_field
)
482 sanitized_fields
.append(field
)
484 new_schema
= pa
.schema(sanitized_fields
, metadata
=schema
.metadata
)
485 return new_schema
, schema_changed
490 def _sanitize_table(table
, new_schema
, flavor
):
491 # TODO: This will not handle prohibited characters in nested field names
492 if 'spark' in flavor
:
493 column_data
= [table
[i
] for i
in range(table
.num_columns
)]
494 return pa
.Table
.from_arrays(column_data
, schema
=new_schema
)
499 _parquet_writer_arg_docs
= """version : {"1.0", "2.4", "2.6"}, default "1.0"
500 Determine which Parquet logical types are available for use, whether the
501 reduced set from the Parquet 1.x.x format or the expanded logical types
502 added in later format versions.
503 Files written with version='2.4' or '2.6' may not be readable in all
504 Parquet implementations, so version='1.0' is likely the choice that
505 maximizes file compatibility.
506 UINT32 and some logical types are only available with version '2.4'.
507 Nanosecond timestamps are only available with version '2.6'.
508 Other features such as compression algorithms or the new serialized
509 data page format must be enabled separately (see 'compression' and
510 'data_page_version').
511 use_dictionary : bool or list
512 Specify if we should use dictionary encoding in general or only for
514 use_deprecated_int96_timestamps : bool, default None
515 Write timestamps to INT96 Parquet format. Defaults to False unless enabled
516 by flavor argument. This take priority over the coerce_timestamps option.
517 coerce_timestamps : str, default None
518 Cast timestamps to a particular resolution. If omitted, defaults are chosen
519 depending on `version`. By default, for ``version='1.0'`` (the default)
520 and ``version='2.4'``, nanoseconds are cast to microseconds ('us'), while
521 for other `version` values, they are written natively without loss
522 of resolution. Seconds are always cast to milliseconds ('ms') by default,
523 as Parquet does not have any temporal type with seconds resolution.
524 If the casting results in loss of data, it will raise an exception
525 unless ``allow_truncated_timestamps=True`` is given.
526 Valid values: {None, 'ms', 'us'}
527 data_page_size : int, default None
528 Set a target threshold for the approximate encoded size of data
529 pages within a column chunk (in bytes). If None, use the default data page
531 allow_truncated_timestamps : bool, default False
532 Allow loss of data when coercing timestamps to a particular
533 resolution. E.g. if microsecond or nanosecond data is lost when coercing to
534 'ms', do not raise an exception. Passing ``allow_truncated_timestamp=True``
535 will NOT result in the truncation exception being ignored unless
536 ``coerce_timestamps`` is not None.
537 compression : str or dict
538 Specify the compression codec, either on a general basis or per-column.
539 Valid values: {'NONE', 'SNAPPY', 'GZIP', 'BROTLI', 'LZ4', 'ZSTD'}.
540 write_statistics : bool or list
541 Specify if we should write statistics in general (default is True) or only
543 flavor : {'spark'}, default None
544 Sanitize schema or set other compatibility options to work with
545 various target systems.
546 filesystem : FileSystem, default None
547 If nothing passed, will be inferred from `where` if path-like, else
548 `where` is already a file-like object so no filesystem is needed.
549 compression_level : int or dict, default None
550 Specify the compression level for a codec, either on a general basis or
551 per-column. If None is passed, arrow selects the compression level for
552 the compression codec in use. The compression level has a different
553 meaning for each codec, so you have to read the documentation of the
555 An exception is thrown if the compression codec does not allow specifying
557 use_byte_stream_split : bool or list, default False
558 Specify if the byte_stream_split encoding should be used in general or
559 only for some columns. If both dictionary and byte_stream_stream are
560 enabled, then dictionary is preferred.
561 The byte_stream_split encoding is valid only for floating-point data types
562 and should be combined with a compression codec.
563 data_page_version : {"1.0", "2.0"}, default "1.0"
564 The serialized Parquet data page format version to write, defaults to
565 1.0. This does not impact the file schema logical types and Arrow to
566 Parquet type casting behavior; for that use the "version" option.
567 use_compliant_nested_type : bool, default False
568 Whether to write compliant Parquet nested type (lists) as defined
569 `here <https://github.com/apache/parquet-format/blob/master/
570 LogicalTypes.md#nested-types>`_, defaults to ``False``.
571 For ``use_compliant_nested_type=True``, this will write into a list
572 with 3-level structure where the middle level, named ``list``,
573 is a repeated group with a single field named ``element``::
575 <list-repetition> group <name> (LIST) {
576 repeated group list {
577 <element-repetition> <element-type> element;
581 For ``use_compliant_nested_type=False``, this will also write into a list
582 with 3-level structure, where the name of the single field of the middle
583 level ``list`` is taken from the element name for nested columns in Arrow,
584 which defaults to ``item``::
586 <list-repetition> group <name> (LIST) {
587 repeated group list {
588 <element-repetition> <element-type> item;
597 Class for incrementally building a Parquet file for Arrow tables.
601 where : path or file-like object
602 schema : arrow Schema
604 writer_engine_version : unused
606 If options contains a key `metadata_collector` then the
607 corresponding value is assumed to be a list (or any object with
608 `.append` method) that will be filled with the file metadata instance
610 """.format(_parquet_writer_arg_docs
)
612 def __init__(self
, where
, schema
, filesystem
=None,
616 compression
='snappy',
617 write_statistics
=True,
618 use_deprecated_int96_timestamps
=None,
619 compression_level
=None,
620 use_byte_stream_split
=False,
621 writer_engine_version
=None,
622 data_page_version
='1.0',
623 use_compliant_nested_type
=False,
625 if use_deprecated_int96_timestamps
is None:
626 # Use int96 timestamps for Spark
627 if flavor
is not None and 'spark' in flavor
:
628 use_deprecated_int96_timestamps
= True
630 use_deprecated_int96_timestamps
= False
633 if flavor
is not None:
634 schema
, self
.schema_changed
= _sanitize_schema(schema
, flavor
)
636 self
.schema_changed
= False
641 # If we open a file using a filesystem, store file handle so we can be
642 # sure to close it when `self.close` is called.
643 self
.file_handle
= None
645 filesystem
, path
= _resolve_filesystem_and_path(
646 where
, filesystem
, allow_legacy_filesystem
=True
648 if filesystem
is not None:
649 if isinstance(filesystem
, legacyfs
.FileSystem
):
650 # legacy filesystem (eg custom subclass)
652 sink
= self
.file_handle
= filesystem
.open(path
, 'wb')
654 # ARROW-10480: do not auto-detect compression. While
655 # a filename like foo.parquet.gz is nonconforming, it
656 # shouldn't implicitly apply compression.
657 sink
= self
.file_handle
= filesystem
.open_output_stream(
658 path
, compression
=None)
661 self
._metadata
_collector
= options
.pop('metadata_collector', None)
662 engine_version
= 'V2'
663 self
.writer
= _parquet
.ParquetWriter(
666 compression
=compression
,
667 use_dictionary
=use_dictionary
,
668 write_statistics
=write_statistics
,
669 use_deprecated_int96_timestamps
=use_deprecated_int96_timestamps
,
670 compression_level
=compression_level
,
671 use_byte_stream_split
=use_byte_stream_split
,
672 writer_engine_version
=engine_version
,
673 data_page_version
=data_page_version
,
674 use_compliant_nested_type
=use_compliant_nested_type
,
679 if getattr(self
, 'is_open', False):
685 def __exit__(self
, *args
, **kwargs
):
687 # return false since we want to propagate exceptions
690 def write_table(self
, table
, row_group_size
=None):
691 if self
.schema_changed
:
692 table
= _sanitize_table(table
, self
.schema
, self
.flavor
)
695 if not table
.schema
.equals(self
.schema
, check_metadata
=False):
696 msg
= ('Table schema does not match schema used to create file: '
697 '\ntable:\n{!s} vs. \nfile:\n{!s}'
698 .format(table
.schema
, self
.schema
))
699 raise ValueError(msg
)
701 self
.writer
.write_table(table
, row_group_size
=row_group_size
)
707 if self
._metadata
_collector
is not None:
708 self
._metadata
_collector
.append(self
.writer
.metadata
)
709 if self
.file_handle
is not None:
710 self
.file_handle
.close()
713 def _get_pandas_index_columns(keyvalues
):
714 return (json
.loads(keyvalues
[b
'pandas'].decode('utf8'))
718 # ----------------------------------------------------------------------
719 # Metadata container providing instructions about reading a single Parquet
720 # file, possibly part of a partitioned dataset
723 class ParquetDatasetPiece
:
725 DEPRECATED: A single chunk of a potentially larger Parquet dataset to read.
727 The arguments will indicate to read either a single row group or all row
728 groups, and whether to add partition keys to the resulting pyarrow.Table.
731 Directly constructing a ``ParquetDatasetPiece`` is deprecated, as well
732 as accessing the pieces of a ``ParquetDataset`` object. Specify
733 ``use_legacy_dataset=False`` when constructing the ``ParquetDataset``
734 and use the ``ParquetDataset.fragments`` attribute instead.
738 path : str or pathlib.Path
739 Path to file in the file system where this piece is located.
740 open_file_func : callable
741 Function to use for obtaining file handle to dataset piece.
742 partition_keys : list of tuples
743 Two-element tuples of ``(column name, ordinal index)``.
744 row_group : int, default None
745 Row group to load. By default, reads all row groups.
750 def __init__(self
, path
, open_file_func
=partial(open, mode
='rb'),
751 file_options
=None, row_group
=None, partition_keys
=None):
753 "ParquetDatasetPiece is deprecated as of pyarrow 5.0.0 and will "
754 "be removed in a future version.",
755 DeprecationWarning, stacklevel
=2)
757 path
, open_file_func
, file_options
, row_group
, partition_keys
)
760 def _create(path
, open_file_func
=partial(open, mode
='rb'),
761 file_options
=None, row_group
=None, partition_keys
=None):
762 self
= ParquetDatasetPiece
.__new
__(ParquetDatasetPiece
)
764 path
, open_file_func
, file_options
, row_group
, partition_keys
)
767 def _init(self
, path
, open_file_func
, file_options
, row_group
,
769 self
.path
= _stringify_path(path
)
770 self
.open_file_func
= open_file_func
771 self
.row_group
= row_group
772 self
.partition_keys
= partition_keys
or []
773 self
.file_options
= file_options
or {}
775 def __eq__(self
, other
):
776 if not isinstance(other
, ParquetDatasetPiece
):
778 return (self
.path
== other
.path
and
779 self
.row_group
== other
.row_group
and
780 self
.partition_keys
== other
.partition_keys
)
783 return ('{}({!r}, row_group={!r}, partition_keys={!r})'
784 .format(type(self
).__name
__, self
.path
,
786 self
.partition_keys
))
791 if len(self
.partition_keys
) > 0:
792 partition_str
= ', '.join('{}={}'.format(name
, index
)
793 for name
, index
in self
.partition_keys
)
794 result
+= 'partition[{}] '.format(partition_str
)
798 if self
.row_group
is not None:
799 result
+= ' | row_group={}'.format(self
.row_group
)
803 def get_metadata(self
):
805 Return the file's metadata.
809 metadata : FileMetaData
816 Return instance of ParquetFile.
818 reader
= self
.open_file_func(self
.path
)
819 if not isinstance(reader
, ParquetFile
):
820 reader
= ParquetFile(reader
, **self
.file_options
)
823 def read(self
, columns
=None, use_threads
=True, partitions
=None,
824 file=None, use_pandas_metadata
=False):
826 Read this piece as a pyarrow.Table.
830 columns : list of column names, default None
831 use_threads : bool, default True
832 Perform multi-threaded column reads.
833 partitions : ParquetPartitions, default None
834 file : file-like object
835 Passed to ParquetFile.
836 use_pandas_metadata : bool
837 If pandas metadata should be used or not.
841 table : pyarrow.Table
843 if self
.open_file_func
is not None:
845 elif file is not None:
846 reader
= ParquetFile(file, **self
.file_options
)
848 # try to read the local path
849 reader
= ParquetFile(self
.path
, **self
.file_options
)
851 options
= dict(columns
=columns
,
852 use_threads
=use_threads
,
853 use_pandas_metadata
=use_pandas_metadata
)
855 if self
.row_group
is not None:
856 table
= reader
.read_row_group(self
.row_group
, **options
)
858 table
= reader
.read(**options
)
860 if len(self
.partition_keys
) > 0:
861 if partitions
is None:
862 raise ValueError('Must pass partition sets')
864 # Here, the index is the categorical code of the partition where
865 # this piece is located. Suppose we had
871 # Then we assign a=0, b=1, c=2. And the resulting Table pieces will
872 # have a DictionaryArray column named foo having the constant index
873 # value as indicated. The distinct categories of the partition have
874 # been computed in the ParquetManifest
875 for i
, (name
, index
) in enumerate(self
.partition_keys
):
876 # The partition code is the same for all values in this piece
877 indices
= np
.full(len(table
), index
, dtype
='i4')
879 # This is set of all partition values, computed as part of the
880 # manifest, so ['a', 'b', 'c'] as in our example above.
881 dictionary
= partitions
.levels
[i
].dictionary
883 arr
= pa
.DictionaryArray
.from_arrays(indices
, dictionary
)
884 table
= table
.append_column(name
, arr
)
891 A data structure for cataloguing the observed Parquet partitions at a
892 particular level. So if we have
901 Then we have two partition sets, one for foo, another for bar. As we visit
902 levels of the partition hierarchy, a PartitionSet tracks the distinct
903 values and assigns categorical codes to use when reading the pieces
908 Name of the partition set. Under which key to collect all values.
910 All possible values that have been collected for that partition set.
913 def __init__(self
, name
, keys
=None):
915 self
.keys
= keys
or []
916 self
.key_indices
= {k
: i
for i
, k
in enumerate(self
.keys
)}
917 self
._dictionary
= None
919 def get_index(self
, key
):
921 Get the index of the partition value if it is known, otherwise assign
926 key : The value for which we want to known the index.
928 if key
in self
.key_indices
:
929 return self
.key_indices
[key
]
931 index
= len(self
.key_indices
)
932 self
.keys
.append(key
)
933 self
.key_indices
[key
] = index
937 def dictionary(self
):
938 if self
._dictionary
is not None:
939 return self
._dictionary
941 if len(self
.keys
) == 0:
942 raise ValueError('No known partition keys')
944 # Only integer and string partition types are supported right now
946 integer_keys
= [int(x
) for x
in self
.keys
]
947 dictionary
= lib
.array(integer_keys
)
949 dictionary
= lib
.array(self
.keys
)
951 self
._dictionary
= dictionary
956 return list(self
.keys
) == sorted(self
.keys
)
959 class ParquetPartitions
:
963 self
.partition_names
= set()
966 return len(self
.levels
)
968 def __getitem__(self
, i
):
969 return self
.levels
[i
]
971 def equals(self
, other
):
972 if not isinstance(other
, ParquetPartitions
):
973 raise TypeError('`other` must be an instance of ParquetPartitions')
975 return (self
.levels
== other
.levels
and
976 self
.partition_names
== other
.partition_names
)
978 def __eq__(self
, other
):
980 return self
.equals(other
)
982 return NotImplemented
984 def get_index(self
, level
, name
, key
):
986 Record a partition value at a particular level, returning the distinct
987 code for that value at that level.
991 partitions.get_index(1, 'foo', 'a') returns 0
992 partitions.get_index(1, 'foo', 'b') returns 1
993 partitions.get_index(1, 'foo', 'c') returns 2
994 partitions.get_index(1, 'foo', 'a') returns 0
999 The nesting level of the partition we are observing
1005 if level
== len(self
.levels
):
1006 if name
in self
.partition_names
:
1007 raise ValueError('{} was the name of the partition in '
1008 'another level'.format(name
))
1010 part_set
= PartitionSet(name
)
1011 self
.levels
.append(part_set
)
1012 self
.partition_names
.add(name
)
1014 return self
.levels
[level
].get_index(key
)
1016 def filter_accepts_partition(self
, part_key
, filter, level
):
1017 p_column
, p_value_index
= part_key
1018 f_column
, op
, f_value
= filter
1019 if p_column
!= f_column
:
1022 f_type
= type(f_value
)
1024 if op
in {'in', 'not in'}:
1025 if not isinstance(f_value
, Collection
):
1027 "'%s' object is not a collection", f_type
.__name
__)
1029 raise ValueError("Cannot use empty collection as filter value")
1030 if len({type(item
) for item
in f_value
}) != 1:
1031 raise ValueError("All elements of the collection '%s' must be"
1032 " of same type", f_value
)
1033 f_type
= type(next(iter(f_value
)))
1035 elif not isinstance(f_value
, str) and isinstance(f_value
, Collection
):
1037 "Op '%s' not supported with a collection value", op
)
1039 p_value
= f_type(self
.levels
[level
]
1040 .dictionary
[p_value_index
].as_py())
1042 if op
== "=" or op
== "==":
1043 return p_value
== f_value
1045 return p_value
!= f_value
1047 return p_value
< f_value
1049 return p_value
> f_value
1051 return p_value
<= f_value
1053 return p_value
>= f_value
1055 return p_value
in f_value
1056 elif op
== 'not in':
1057 return p_value
not in f_value
1059 raise ValueError("'%s' is not a valid operator in predicates.",
1063 class ParquetManifest
:
1065 def __init__(self
, dirpath
, open_file_func
=None, filesystem
=None,
1066 pathsep
='/', partition_scheme
='hive', metadata_nthreads
=1):
1067 filesystem
, dirpath
= _get_filesystem_and_path(filesystem
, dirpath
)
1068 self
.filesystem
= filesystem
1069 self
.open_file_func
= open_file_func
1070 self
.pathsep
= pathsep
1071 self
.dirpath
= _stringify_path(dirpath
)
1072 self
.partition_scheme
= partition_scheme
1073 self
.partitions
= ParquetPartitions()
1075 self
._metadata
_nthreads
= metadata_nthreads
1076 self
._thread
_pool
= futures
.ThreadPoolExecutor(
1077 max_workers
=metadata_nthreads
)
1079 self
.common_metadata_path
= None
1080 self
.metadata_path
= None
1082 self
._visit
_level
(0, self
.dirpath
, [])
1084 # Due to concurrency, pieces will potentially by out of order if the
1085 # dataset is partitioned so we sort them to yield stable results
1086 self
.pieces
.sort(key
=lambda piece
: piece
.path
)
1088 if self
.common_metadata_path
is None:
1089 # _common_metadata is a subset of _metadata
1090 self
.common_metadata_path
= self
.metadata_path
1092 self
._thread
_pool
.shutdown()
1094 def _visit_level(self
, level
, base_path
, part_keys
):
1095 fs
= self
.filesystem
1097 _
, directories
, files
= next(fs
.walk(base_path
))
1101 full_path
= self
.pathsep
.join((base_path
, path
))
1102 if path
.endswith('_common_metadata'):
1103 self
.common_metadata_path
= full_path
1104 elif path
.endswith('_metadata'):
1105 self
.metadata_path
= full_path
1106 elif self
._should
_silently
_exclude
(path
):
1109 filtered_files
.append(full_path
)
1111 # ARROW-1079: Filter out "private" directories starting with underscore
1112 filtered_directories
= [self
.pathsep
.join((base_path
, x
))
1113 for x
in directories
1114 if not _is_private_directory(x
)]
1116 filtered_files
.sort()
1117 filtered_directories
.sort()
1119 if len(filtered_files
) > 0 and len(filtered_directories
) > 0:
1120 raise ValueError('Found files in an intermediate '
1121 'directory: {}'.format(base_path
))
1122 elif len(filtered_directories
) > 0:
1123 self
._visit
_directories
(level
, filtered_directories
, part_keys
)
1125 self
._push
_pieces
(filtered_files
, part_keys
)
1127 def _should_silently_exclude(self
, file_name
):
1128 return (file_name
.endswith('.crc') or # Checksums
1129 file_name
.endswith('_$folder$') or # HDFS directories in S3
1130 file_name
.startswith('.') or # Hidden files starting with .
1131 file_name
.startswith('_') or # Hidden files starting with _
1132 file_name
in EXCLUDED_PARQUET_PATHS
)
1134 def _visit_directories(self
, level
, directories
, part_keys
):
1136 for path
in directories
:
1137 head
, tail
= _path_split(path
, self
.pathsep
)
1138 name
, key
= _parse_hive_partition(tail
)
1140 index
= self
.partitions
.get_index(level
, name
, key
)
1141 dir_part_keys
= part_keys
+ [(name
, index
)]
1142 # If you have less threads than levels, the wait call will block
1143 # indefinitely due to multiple waits within a thread.
1144 if level
< self
._metadata
_nthreads
:
1145 future
= self
._thread
_pool
.submit(self
._visit
_level
,
1149 futures_list
.append(future
)
1151 self
._visit
_level
(level
+ 1, path
, dir_part_keys
)
1153 futures
.wait(futures_list
)
1155 def _parse_partition(self
, dirname
):
1156 if self
.partition_scheme
== 'hive':
1157 return _parse_hive_partition(dirname
)
1159 raise NotImplementedError('partition schema: {}'
1160 .format(self
.partition_scheme
))
1162 def _push_pieces(self
, files
, part_keys
):
1163 self
.pieces
.extend([
1164 ParquetDatasetPiece
._create
(path
, partition_keys
=part_keys
,
1165 open_file_func
=self
.open_file_func
)
1170 def _parse_hive_partition(value
):
1171 if '=' not in value
:
1172 raise ValueError('Directory name did not appear to be a '
1173 'partition: {}'.format(value
))
1174 return value
.split('=', 1)
1177 def _is_private_directory(x
):
1178 _
, tail
= os
.path
.split(x
)
1179 return (tail
.startswith('_') or tail
.startswith('.')) and '=' not in tail
1182 def _path_split(path
, sep
):
1183 i
= path
.rfind(sep
) + 1
1184 head
, tail
= path
[:i
], path
[i
:]
1185 head
= head
.rstrip(sep
)
1189 EXCLUDED_PARQUET_PATHS
= {'_SUCCESS'}
1192 class _ParquetDatasetMetadata
:
1193 __slots__
= ('fs', 'memory_map', 'read_dictionary', 'common_metadata',
1197 def _open_dataset_file(dataset
, path
, meta
=None):
1198 if (dataset
.fs
is not None and
1199 not isinstance(dataset
.fs
, legacyfs
.LocalFileSystem
)):
1200 path
= dataset
.fs
.open(path
, mode
='rb')
1204 memory_map
=dataset
.memory_map
,
1205 read_dictionary
=dataset
.read_dictionary
,
1206 common_metadata
=dataset
.common_metadata
,
1207 buffer_size
=dataset
.buffer_size
1212 "'{}' attribute is deprecated as of pyarrow 5.0.0 and will be removed "
1213 "in a future version.{}"
1217 _read_docstring_common
= """\
1218 read_dictionary : list, default None
1219 List of names or column paths (for nested types) to read directly
1220 as DictionaryArray. Only supported for BYTE_ARRAY storage. To read
1221 a flat column as dictionary-encoded pass the column name. For
1222 nested types, you must pass the full column "path", which could be
1223 something like level1.level2.list.item. Refer to the Parquet
1224 file's schema to obtain the paths.
1225 memory_map : bool, default False
1226 If the source is a file path, use a memory map to read file, which can
1227 improve performance in some environments.
1228 buffer_size : int, default 0
1229 If positive, perform read buffering when deserializing individual
1230 column chunks. Otherwise IO calls are unbuffered.
1231 partitioning : Partitioning or str or list of str, default "hive"
1232 The partitioning scheme for a partitioned dataset. The default of "hive"
1233 assumes directory names with key=value pairs like "/year=2009/month=11".
1234 In addition, a scheme like "/2009/11" is also supported, in which case
1235 you need to specify the field names or a full schema. See the
1236 ``pyarrow.dataset.partitioning()`` function for more details."""
1239 class ParquetDataset
:
1242 Encapsulates details of reading a complete Parquet dataset possibly
1243 consisting of multiple files and partitions in subdirectories.
1247 path_or_paths : str or List[str]
1248 A directory name, single file name, or list of file names.
1249 filesystem : FileSystem, default None
1250 If nothing passed, paths assumed to be found in the local on-disk
1252 metadata : pyarrow.parquet.FileMetaData
1253 Use metadata obtained elsewhere to validate file schemas.
1254 schema : pyarrow.parquet.Schema
1255 Use schema obtained elsewhere to validate file schemas. Alternative to
1257 split_row_groups : bool, default False
1258 Divide files into pieces for each row group in the file.
1259 validate_schema : bool, default True
1260 Check that individual file schemas are all the same / compatible.
1261 filters : List[Tuple] or List[List[Tuple]] or None (default)
1262 Rows which do not match the filter predicate will be removed from scanned
1263 data. Partition keys embedded in a nested directory structure will be
1264 exploited to avoid loading files at all if they contain no matching rows.
1265 If `use_legacy_dataset` is True, filters can only reference partition
1266 keys and only a hive-style directory structure is supported. When
1267 setting `use_legacy_dataset` to False, also within-file level filtering
1268 and different partitioning schemes are supported.
1271 metadata_nthreads : int, default 1
1272 How many threads to allow the thread pool which is used to read the
1273 dataset metadata. Increasing this is helpful to read partitioned
1276 use_legacy_dataset : bool, default True
1277 Set to False to enable the new code path (experimental, using the
1278 new Arrow Dataset API). Among other things, this allows to pass
1279 `filters` for all columns and not only the partition keys, enables
1280 different partitioning schemes, etc.
1281 pre_buffer : bool, default True
1282 Coalesce and issue file reads in parallel to improve performance on
1283 high-latency filesystems (e.g. S3). If True, Arrow will use a
1284 background I/O thread pool. This option is only supported for
1285 use_legacy_dataset=False. If using a filesystem layer that itself
1286 performs readahead (e.g. fsspec's S3FS), disable readahead for best
1288 coerce_int96_timestamp_unit : str, default None.
1289 Cast timestamps that are stored in INT96 format to a particular resolution
1290 (e.g. 'ms'). Setting to None is equivalent to 'ns' and therefore INT96
1291 timestamps will be infered as timestamps in nanoseconds.
1292 """.format(_read_docstring_common
, _DNF_filter_doc
)
1294 def __new__(cls
, path_or_paths
=None, filesystem
=None, schema
=None,
1295 metadata
=None, split_row_groups
=False, validate_schema
=True,
1296 filters
=None, metadata_nthreads
=1, read_dictionary
=None,
1297 memory_map
=False, buffer_size
=0, partitioning
="hive",
1298 use_legacy_dataset
=None, pre_buffer
=True,
1299 coerce_int96_timestamp_unit
=None):
1300 if use_legacy_dataset
is None:
1301 # if a new filesystem is passed -> default to new implementation
1302 if isinstance(filesystem
, FileSystem
):
1303 use_legacy_dataset
= False
1304 # otherwise the default is still True
1306 use_legacy_dataset
= True
1308 if not use_legacy_dataset
:
1309 return _ParquetDatasetV2(
1310 path_or_paths
, filesystem
=filesystem
,
1312 partitioning
=partitioning
,
1313 read_dictionary
=read_dictionary
,
1314 memory_map
=memory_map
,
1315 buffer_size
=buffer_size
,
1316 pre_buffer
=pre_buffer
,
1317 coerce_int96_timestamp_unit
=coerce_int96_timestamp_unit
,
1318 # unsupported keywords
1319 schema
=schema
, metadata
=metadata
,
1320 split_row_groups
=split_row_groups
,
1321 validate_schema
=validate_schema
,
1322 metadata_nthreads
=metadata_nthreads
1324 self
= object.__new
__(cls
)
1327 def __init__(self
, path_or_paths
, filesystem
=None, schema
=None,
1328 metadata
=None, split_row_groups
=False, validate_schema
=True,
1329 filters
=None, metadata_nthreads
=1, read_dictionary
=None,
1330 memory_map
=False, buffer_size
=0, partitioning
="hive",
1331 use_legacy_dataset
=True, pre_buffer
=True,
1332 coerce_int96_timestamp_unit
=None):
1333 if partitioning
!= "hive":
1335 'Only "hive" for hive-like partitioning is supported when '
1336 'using use_legacy_dataset=True')
1337 self
._metadata
= _ParquetDatasetMetadata()
1338 a_path
= path_or_paths
1339 if isinstance(a_path
, list):
1342 self
._metadata
.fs
, _
= _get_filesystem_and_path(filesystem
, a_path
)
1343 if isinstance(path_or_paths
, list):
1344 self
.paths
= [_parse_uri(path
) for path
in path_or_paths
]
1346 self
.paths
= _parse_uri(path_or_paths
)
1348 self
._metadata
.read_dictionary
= read_dictionary
1349 self
._metadata
.memory_map
= memory_map
1350 self
._metadata
.buffer_size
= buffer_size
1354 self
.common_metadata_path
,
1355 self
.metadata_path
) = _make_manifest(
1356 path_or_paths
, self
._fs
, metadata_nthreads
=metadata_nthreads
,
1357 open_file_func
=partial(_open_dataset_file
, self
._metadata
)
1360 if self
.common_metadata_path
is not None:
1361 with self
._fs
.open(self
.common_metadata_path
) as f
:
1362 self
._metadata
.common_metadata
= read_metadata(
1364 memory_map
=memory_map
1367 self
._metadata
.common_metadata
= None
1369 if metadata
is None and self
.metadata_path
is not None:
1370 with self
._fs
.open(self
.metadata_path
) as f
:
1371 self
.metadata
= read_metadata(f
, memory_map
=memory_map
)
1373 self
.metadata
= metadata
1375 self
.schema
= schema
1377 self
.split_row_groups
= split_row_groups
1379 if split_row_groups
:
1380 raise NotImplementedError("split_row_groups not yet implemented")
1382 if filters
is not None:
1383 filters
= _check_filters(filters
)
1384 self
._filter
(filters
)
1387 self
.validate_schemas()
1389 def equals(self
, other
):
1390 if not isinstance(other
, ParquetDataset
):
1391 raise TypeError('`other` must be an instance of ParquetDataset')
1393 if self
._fs
.__class
__ != other
._fs
.__class
__:
1395 for prop
in ('paths', '_pieces', '_partitions',
1396 'common_metadata_path', 'metadata_path',
1397 'common_metadata', 'metadata', 'schema',
1398 'split_row_groups'):
1399 if getattr(self
, prop
) != getattr(other
, prop
):
1401 for prop
in ('memory_map', 'buffer_size'):
1402 if getattr(self
._metadata
, prop
) != getattr(other
._metadata
, prop
):
1407 def __eq__(self
, other
):
1409 return self
.equals(other
)
1411 return NotImplemented
1413 def validate_schemas(self
):
1414 if self
.metadata
is None and self
.schema
is None:
1415 if self
.common_metadata
is not None:
1416 self
.schema
= self
.common_metadata
.schema
1418 self
.schema
= self
._pieces
[0].get_metadata().schema
1419 elif self
.schema
is None:
1420 self
.schema
= self
.metadata
.schema
1422 # Verify schemas are all compatible
1423 dataset_schema
= self
.schema
.to_arrow_schema()
1424 # Exclude the partition columns from the schema, they are provided
1425 # by the path, not the DatasetPiece
1426 if self
._partitions
is not None:
1427 for partition_name
in self
._partitions
.partition_names
:
1428 if dataset_schema
.get_field_index(partition_name
) != -1:
1429 field_idx
= dataset_schema
.get_field_index(partition_name
)
1430 dataset_schema
= dataset_schema
.remove(field_idx
)
1432 for piece
in self
._pieces
:
1433 file_metadata
= piece
.get_metadata()
1434 file_schema
= file_metadata
.schema
.to_arrow_schema()
1435 if not dataset_schema
.equals(file_schema
, check_metadata
=False):
1436 raise ValueError('Schema in {!s} was different. \n'
1437 '{!s}\n\nvs\n\n{!s}'
1438 .format(piece
, file_schema
,
1441 def read(self
, columns
=None, use_threads
=True, use_pandas_metadata
=False):
1443 Read multiple Parquet files as a single pyarrow.Table.
1448 Names of columns to read from the file.
1449 use_threads : bool, default True
1450 Perform multi-threaded column reads
1451 use_pandas_metadata : bool, default False
1452 Passed through to each dataset piece.
1457 Content of the file as a table (of columns).
1460 for piece
in self
._pieces
:
1461 table
= piece
.read(columns
=columns
, use_threads
=use_threads
,
1462 partitions
=self
._partitions
,
1463 use_pandas_metadata
=use_pandas_metadata
)
1464 tables
.append(table
)
1466 all_data
= lib
.concat_tables(tables
)
1468 if use_pandas_metadata
:
1469 # We need to ensure that this metadata is set in the Table's schema
1470 # so that Table.to_pandas will construct pandas.DataFrame with the
1472 common_metadata
= self
._get
_common
_pandas
_metadata
()
1473 current_metadata
= all_data
.schema
.metadata
or {}
1475 if common_metadata
and b
'pandas' not in current_metadata
:
1476 all_data
= all_data
.replace_schema_metadata({
1477 b
'pandas': common_metadata
})
1481 def read_pandas(self
, **kwargs
):
1483 Read dataset including pandas metadata, if any. Other arguments passed
1484 through to ParquetDataset.read, see docstring for further details.
1489 All additional options to pass to the reader.
1494 Content of the file as a table (of columns).
1496 return self
.read(use_pandas_metadata
=True, **kwargs
)
1498 def _get_common_pandas_metadata(self
):
1499 if self
.common_metadata
is None:
1502 keyvalues
= self
.common_metadata
.metadata
1503 return keyvalues
.get(b
'pandas', None)
1505 def _filter(self
, filters
):
1506 accepts_filter
= self
._partitions
.filter_accepts_partition
1508 def one_filter_accepts(piece
, filter):
1509 return all(accepts_filter(part_key
, filter, level
)
1510 for level
, part_key
in enumerate(piece
.partition_keys
))
1512 def all_filters_accept(piece
):
1513 return any(all(one_filter_accepts(piece
, f
) for f
in conjunction
)
1514 for conjunction
in filters
)
1516 self
._pieces
= [p
for p
in self
._pieces
if all_filters_accept(p
)]
1522 "ParquetDataset.pieces",
1523 " Specify 'use_legacy_dataset=False' while constructing the "
1524 "ParquetDataset, and then use the '.fragments' attribute "
1526 DeprecationWarning, stacklevel
=2)
1530 def partitions(self
):
1533 "ParquetDataset.partitions",
1534 " Specify 'use_legacy_dataset=False' while constructing the "
1535 "ParquetDataset, and then use the '.partitioning' attribute "
1537 DeprecationWarning, stacklevel
=2)
1538 return self
._partitions
1541 def memory_map(self
):
1543 _DEPR_MSG
.format("ParquetDataset.memory_map", ""),
1544 DeprecationWarning, stacklevel
=2)
1545 return self
._metadata
.memory_map
1548 def read_dictionary(self
):
1550 _DEPR_MSG
.format("ParquetDataset.read_dictionary", ""),
1551 DeprecationWarning, stacklevel
=2)
1552 return self
._metadata
.read_dictionary
1555 def buffer_size(self
):
1557 _DEPR_MSG
.format("ParquetDataset.buffer_size", ""),
1558 DeprecationWarning, stacklevel
=2)
1559 return self
._metadata
.buffer_size
1562 operator
.attrgetter('_metadata.fs')
1569 "ParquetDataset.fs",
1570 " Specify 'use_legacy_dataset=False' while constructing the "
1571 "ParquetDataset, and then use the '.filesystem' attribute "
1573 DeprecationWarning, stacklevel
=2)
1574 return self
._metadata
.fs
1576 common_metadata
= property(
1577 operator
.attrgetter('_metadata.common_metadata')
1581 def _make_manifest(path_or_paths
, fs
, pathsep
='/', metadata_nthreads
=1,
1582 open_file_func
=None):
1584 common_metadata_path
= None
1585 metadata_path
= None
1587 if isinstance(path_or_paths
, list) and len(path_or_paths
) == 1:
1588 # Dask passes a directory as a list of length 1
1589 path_or_paths
= path_or_paths
[0]
1591 if _is_path_like(path_or_paths
) and fs
.isdir(path_or_paths
):
1592 manifest
= ParquetManifest(path_or_paths
, filesystem
=fs
,
1593 open_file_func
=open_file_func
,
1594 pathsep
=getattr(fs
, "pathsep", "/"),
1595 metadata_nthreads
=metadata_nthreads
)
1596 common_metadata_path
= manifest
.common_metadata_path
1597 metadata_path
= manifest
.metadata_path
1598 pieces
= manifest
.pieces
1599 partitions
= manifest
.partitions
1601 if not isinstance(path_or_paths
, list):
1602 path_or_paths
= [path_or_paths
]
1605 if len(path_or_paths
) == 0:
1606 raise ValueError('Must pass at least one file path')
1609 for path
in path_or_paths
:
1610 if not fs
.isfile(path
):
1611 raise OSError('Passed non-file path: {}'
1613 piece
= ParquetDatasetPiece
._create
(
1614 path
, open_file_func
=open_file_func
)
1615 pieces
.append(piece
)
1617 return pieces
, partitions
, common_metadata_path
, metadata_path
1620 def _is_local_file_system(fs
):
1621 return isinstance(fs
, LocalFileSystem
) or isinstance(
1622 fs
, legacyfs
.LocalFileSystem
1626 class _ParquetDatasetV2
:
1628 ParquetDataset shim using the Dataset API under the hood.
1631 def __init__(self
, path_or_paths
, filesystem
=None, filters
=None,
1632 partitioning
="hive", read_dictionary
=None, buffer_size
=None,
1633 memory_map
=False, ignore_prefixes
=None, pre_buffer
=True,
1634 coerce_int96_timestamp_unit
=None, **kwargs
):
1635 import pyarrow
.dataset
as ds
1637 # Raise error for not supported keywords
1638 for keyword
, default
in [
1639 ("schema", None), ("metadata", None),
1640 ("split_row_groups", False), ("validate_schema", True),
1641 ("metadata_nthreads", 1)]:
1642 if keyword
in kwargs
and kwargs
[keyword
] is not default
:
1644 "Keyword '{0}' is not yet supported with the new "
1645 "Dataset API".format(keyword
))
1647 # map format arguments
1649 "pre_buffer": pre_buffer
,
1650 "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit
1653 read_options
.update(use_buffered_stream
=True,
1654 buffer_size
=buffer_size
)
1655 if read_dictionary
is not None:
1656 read_options
.update(dictionary_columns
=read_dictionary
)
1658 # map filters to Expressions
1659 self
._filters
= filters
1660 self
._filter
_expression
= filters
and _filters_to_expression(filters
)
1662 # map old filesystems to new one
1663 if filesystem
is not None:
1664 filesystem
= _ensure_filesystem(
1665 filesystem
, use_mmap
=memory_map
)
1666 elif filesystem
is None and memory_map
:
1667 # if memory_map is specified, assume local file system (string
1668 # path can in principle be URI for any filesystem)
1669 filesystem
= LocalFileSystem(use_mmap
=memory_map
)
1671 # This needs to be checked after _ensure_filesystem, because that
1672 # handles the case of an fsspec LocalFileSystem
1674 hasattr(path_or_paths
, "__fspath__") and
1675 filesystem
is not None and
1676 not _is_local_file_system(filesystem
)
1679 "Path-like objects with __fspath__ must only be used with "
1680 f
"local file systems, not {type(filesystem)}"
1683 # check for single fragment dataset
1685 if isinstance(path_or_paths
, list):
1686 if len(path_or_paths
) == 1:
1687 single_file
= path_or_paths
[0]
1689 if _is_path_like(path_or_paths
):
1690 path_or_paths
= _stringify_path(path_or_paths
)
1691 if filesystem
is None:
1692 # path might be a URI describing the FileSystem as well
1694 filesystem
, path_or_paths
= FileSystem
.from_uri(
1697 filesystem
= LocalFileSystem(use_mmap
=memory_map
)
1698 if filesystem
.get_file_info(path_or_paths
).is_file
:
1699 single_file
= path_or_paths
1701 single_file
= path_or_paths
1703 if single_file
is not None:
1704 self
._enable
_parallel
_column
_conversion
= True
1705 read_options
.update(enable_parallel_column_conversion
=True)
1707 parquet_format
= ds
.ParquetFileFormat(**read_options
)
1708 fragment
= parquet_format
.make_fragment(single_file
, filesystem
)
1710 self
._dataset
= ds
.FileSystemDataset(
1711 [fragment
], schema
=fragment
.physical_schema
,
1712 format
=parquet_format
,
1713 filesystem
=fragment
.filesystem
1717 self
._enable
_parallel
_column
_conversion
= False
1719 parquet_format
= ds
.ParquetFileFormat(**read_options
)
1721 # check partitioning to enable dictionary encoding
1722 if partitioning
== "hive":
1723 partitioning
= ds
.HivePartitioning
.discover(
1724 infer_dictionary
=True)
1726 self
._dataset
= ds
.dataset(path_or_paths
, filesystem
=filesystem
,
1727 format
=parquet_format
,
1728 partitioning
=partitioning
,
1729 ignore_prefixes
=ignore_prefixes
)
1733 return self
._dataset
.schema
1735 def read(self
, columns
=None, use_threads
=True, use_pandas_metadata
=False):
1737 Read (multiple) Parquet files as a single pyarrow.Table.
1742 Names of columns to read from the dataset. The partition fields
1743 are not automatically included (in contrast to when setting
1744 ``use_legacy_dataset=True``).
1745 use_threads : bool, default True
1746 Perform multi-threaded column reads.
1747 use_pandas_metadata : bool, default False
1748 If True and file has custom pandas schema metadata, ensure that
1749 index columns are also loaded.
1754 Content of the file as a table (of columns).
1756 # if use_pandas_metadata, we need to include index columns in the
1757 # column selection, to be able to restore those in the pandas DataFrame
1758 metadata
= self
.schema
.metadata
1759 if columns
is not None and use_pandas_metadata
:
1760 if metadata
and b
'pandas' in metadata
:
1761 # RangeIndex can be represented as dict instead of column name
1763 col
for col
in _get_pandas_index_columns(metadata
)
1764 if not isinstance(col
, dict)
1767 list(columns
) + list(set(index_columns
) - set(columns
))
1770 if self
._enable
_parallel
_column
_conversion
:
1772 # Allow per-column parallelism; would otherwise cause
1773 # contention in the presence of per-file parallelism.
1776 table
= self
._dataset
.to_table(
1777 columns
=columns
, filter=self
._filter
_expression
,
1778 use_threads
=use_threads
1781 # if use_pandas_metadata, restore the pandas metadata (which gets
1782 # lost if doing a specific `columns` selection in to_table)
1783 if use_pandas_metadata
:
1784 if metadata
and b
"pandas" in metadata
:
1785 new_metadata
= table
.schema
.metadata
or {}
1786 new_metadata
.update({b
"pandas": metadata
[b
"pandas"]})
1787 table
= table
.replace_schema_metadata(new_metadata
)
1791 def read_pandas(self
, **kwargs
):
1793 Read dataset including pandas metadata, if any. Other arguments passed
1794 through to ParquetDataset.read, see docstring for further details.
1796 return self
.read(use_pandas_metadata
=True, **kwargs
)
1801 _DEPR_MSG
.format("ParquetDataset.pieces",
1802 " Use the '.fragments' attribute instead"),
1803 DeprecationWarning, stacklevel
=2)
1804 return list(self
._dataset
.get_fragments())
1807 def fragments(self
):
1808 return list(self
._dataset
.get_fragments())
1812 return self
._dataset
.files
1815 def filesystem(self
):
1816 return self
._dataset
.filesystem
1819 def partitioning(self
):
1821 The partitioning of the Dataset source, if discovered.
1823 return self
._dataset
.partitioning


_read_table_docstring = """
{0}

Parameters
----------
source : str, pyarrow.NativeFile, or file-like object
    If a string passed, can be a single file name or directory name. For
    file-like objects, only read a single file. Use pyarrow.BufferReader to
    read a file contained in a bytes or buffer-like object.
columns : list
    If not None, only these columns will be read from the file. A column
    name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
    'a.c', and 'a.d.e'. If empty, no columns will be read. Note
    that the table will still have the correct num_rows set despite having
    no columns.
use_threads : bool, default True
    Perform multi-threaded column reads.
metadata : FileMetaData
    If separately computed
{1}
use_legacy_dataset : bool, default False
    By default, `read_table` uses the new Arrow Datasets API since
    pyarrow 1.0.0. Among other things, this allows passing `filters`
    for all columns and not only the partition keys, and enables
    different partitioning schemes.
    Set to True to use the legacy behaviour.
ignore_prefixes : list, optional
    Files matching any of these prefixes will be ignored by the
    discovery process if use_legacy_dataset=False.
    This is matched to the basename of a path.
    By default this is ['.', '_'].
    Note that discovery happens only if a directory is passed as source.
filesystem : FileSystem, default None
    If nothing passed, paths assumed to be found in the local on-disk
    filesystem.
filters : List[Tuple] or List[List[Tuple]] or None (default)
    Rows which do not match the filter predicate will be removed from scanned
    data. Partition keys embedded in a nested directory structure will be
    exploited to avoid loading files at all if they contain no matching rows.
    If `use_legacy_dataset` is True, filters can only reference partition
    keys and only a hive-style directory structure is supported. When
    setting `use_legacy_dataset` to False, also within-file level filtering
    and different partitioning schemes are supported.

    {3}
pre_buffer : bool, default True
    Coalesce and issue file reads in parallel to improve performance on
    high-latency filesystems (e.g. S3). If True, Arrow will use a
    background I/O thread pool. This option is only supported for
    use_legacy_dataset=False. If using a filesystem layer that itself
    performs readahead (e.g. fsspec's S3FS), disable readahead for best
    results.
coerce_int96_timestamp_unit : str, default None
    Cast timestamps that are stored in INT96 format to a particular
    resolution (e.g. 'ms'). Setting to None is equivalent to 'ns'
    and therefore INT96 timestamps will be inferred as timestamps
    in nanoseconds.

Returns
-------
{2}
"""


def read_table(source, columns=None, use_threads=True, metadata=None,
               use_pandas_metadata=False, memory_map=False,
               read_dictionary=None, filesystem=None, filters=None,
               buffer_size=0, partitioning="hive", use_legacy_dataset=False,
               ignore_prefixes=None, pre_buffer=True,
               coerce_int96_timestamp_unit=None):
    if not use_legacy_dataset:
        if metadata is not None:
            raise ValueError(
                "The 'metadata' keyword is no longer supported with the new "
                "datasets-based implementation. Specify "
                "'use_legacy_dataset=True' to temporarily recover the old "
                "behaviour."
            )
        try:
            dataset = _ParquetDatasetV2(
                source,
                filesystem=filesystem,
                partitioning=partitioning,
                memory_map=memory_map,
                read_dictionary=read_dictionary,
                buffer_size=buffer_size,
                filters=filters,
                ignore_prefixes=ignore_prefixes,
                pre_buffer=pre_buffer,
                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
            )
        except ImportError:
            # fall back on ParquetFile for simple cases when pyarrow.dataset
            # module is not available
            if filters is not None:
                raise ValueError(
                    "the 'filters' keyword is not supported when the "
                    "pyarrow.dataset module is not available"
                )
            if partitioning != "hive":
                raise ValueError(
                    "the 'partitioning' keyword is not supported when the "
                    "pyarrow.dataset module is not available"
                )
            filesystem, path = _resolve_filesystem_and_path(source, filesystem)
            if filesystem is not None:
                source = filesystem.open_input_file(path)
            # TODO test that source is not a directory or a list
            dataset = ParquetFile(
                source, metadata=metadata, read_dictionary=read_dictionary,
                memory_map=memory_map, buffer_size=buffer_size,
                pre_buffer=pre_buffer,
                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
            )

        return dataset.read(columns=columns, use_threads=use_threads,
                            use_pandas_metadata=use_pandas_metadata)

    if ignore_prefixes is not None:
        raise ValueError(
            "The 'ignore_prefixes' keyword is only supported when "
            "use_legacy_dataset=False")

    if _is_path_like(source):
        pf = ParquetDataset(
            source, metadata=metadata, memory_map=memory_map,
            read_dictionary=read_dictionary,
            buffer_size=buffer_size,
            filesystem=filesystem, filters=filters,
            partitioning=partitioning,
            coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
        )
    else:
        pf = ParquetFile(
            source, metadata=metadata,
            read_dictionary=read_dictionary,
            memory_map=memory_map,
            buffer_size=buffer_size,
            coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
        )
    return pf.read(columns=columns, use_threads=use_threads,
                   use_pandas_metadata=use_pandas_metadata)


read_table.__doc__ = _read_table_docstring.format(
    """Read a Table from Parquet format

Note: starting with pyarrow 1.0, the default for `use_legacy_dataset` is
switched to False.""",
    "\n".join((_read_docstring_common,
               """use_pandas_metadata : bool, default False
    If True and file has custom pandas schema metadata, ensure that
    index columns are also loaded.""")),
    """pyarrow.Table
    Content of the file as a table (of columns)""",
    _DNF_filter_doc)
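

# Illustrative usage sketch (editor note, not part of the pyarrow API): the
# docstring above describes how `columns` and DNF-style `filters` interact
# with the datasets-based read path. The helper below shows one way to call
# read_table with both; the path "example.parquet" and the column names are
# hypothetical placeholders used only for illustration.
def _example_read_table_usage(path="example.parquet"):
    # Select two columns and keep only rows where 'x' is positive. With the
    # default use_legacy_dataset=False, the filter may reference any column,
    # not just partition keys.
    return read_table(path, columns=["x", "y"], filters=[("x", ">", 0)])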


def read_pandas(source, columns=None, **kwargs):
    return read_table(
        source, columns=columns, use_pandas_metadata=True, **kwargs
    )


read_pandas.__doc__ = _read_table_docstring.format(
    'Read a Table from Parquet format, also reading DataFrame\n'
    'index values if known in the file metadata',
    "\n".join((_read_docstring_common,
               """**kwargs : additional options for :func:`read_table`""")),
    """pyarrow.Table
    Content of the file as a Table of Columns, including DataFrame
    indexes as columns""",
    _DNF_filter_doc)
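

# Illustrative usage sketch (editor note, not part of the pyarrow API):
# read_pandas is read_table with use_pandas_metadata=True, so any index
# columns recorded in the file's pandas metadata are loaded as well. The
# file name below is a hypothetical placeholder.
def _example_read_pandas_usage(path="example.parquet"):
    # Returns a pandas DataFrame; the original index is restored if it was
    # stored in the file's pandas metadata.
    return read_pandas(path).to_pandas()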


def write_table(table, where, row_group_size=None, version='1.0',
                use_dictionary=True, compression='snappy',
                write_statistics=True,
                use_deprecated_int96_timestamps=None,
                coerce_timestamps=None,
                allow_truncated_timestamps=False,
                data_page_size=None, flavor=None,
                filesystem=None,
                compression_level=None,
                use_byte_stream_split=False,
                data_page_version='1.0',
                use_compliant_nested_type=False,
                **kwargs):
    row_group_size = kwargs.pop('chunk_size', row_group_size)
    use_int96 = use_deprecated_int96_timestamps
    try:
        with ParquetWriter(
                where, table.schema,
                filesystem=filesystem,
                version=version,
                flavor=flavor,
                use_dictionary=use_dictionary,
                write_statistics=write_statistics,
                coerce_timestamps=coerce_timestamps,
                data_page_size=data_page_size,
                allow_truncated_timestamps=allow_truncated_timestamps,
                compression=compression,
                use_deprecated_int96_timestamps=use_int96,
                compression_level=compression_level,
                use_byte_stream_split=use_byte_stream_split,
                data_page_version=data_page_version,
                use_compliant_nested_type=use_compliant_nested_type,
                **kwargs) as writer:
            writer.write_table(table, row_group_size=row_group_size)
    except Exception:
        if _is_path_like(where):
            try:
                os.remove(_stringify_path(where))
            except os.error:
                pass
        raise


write_table.__doc__ = """
Write a Table to Parquet format.

Parameters
----------
table : pyarrow.Table
where : string or pyarrow.NativeFile
row_group_size : int
    The number of rows per rowgroup
{}
**kwargs : optional
    Additional options for ParquetWriter
""".format(_parquet_writer_arg_docs)
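

# Illustrative usage sketch (editor note, not part of the pyarrow API): a
# minimal round trip through write_table, assuming a small in-memory table.
# The output path, row group size, and compression choice are illustrative
# assumptions.
def _example_write_table_usage(path="example.parquet"):
    table = pa.table({"x": [1, 2, 3], "y": ["a", "b", "c"]})
    # row_group_size caps the number of rows per row group; compression is
    # applied per column chunk.
    write_table(table, path, row_group_size=2, compression="snappy")
    return read_table(path)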


def _mkdir_if_not_exists(fs, path):
    if fs._isfilestore() and not fs.exists(path):
        try:
            fs.mkdir(path)
        except OSError:
            assert fs.exists(path)


def write_to_dataset(table, root_path, partition_cols=None,
                     partition_filename_cb=None, filesystem=None,
                     use_legacy_dataset=None, **kwargs):
    """Wrapper around parquet.write_table for writing a Table to
    Parquet format by partitions.
    For each combination of partition columns and values,
    subdirectories are created in the following manner:

    root_dir/
      group1=value1
        group2=value1
          <uuid>.parquet
        group2=value2
          <uuid>.parquet
      group1=valueN
        group2=value1
          <uuid>.parquet
        group2=valueN
          <uuid>.parquet

    Parameters
    ----------
    table : pyarrow.Table
    root_path : str, pathlib.Path
        The root directory of the dataset
    filesystem : FileSystem, default None
        If nothing passed, paths assumed to be found in the local on-disk
        filesystem
    partition_cols : list,
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.
    partition_filename_cb : callable,
        A callback function that takes the partition key(s) as an argument
        and allows you to override the partition filename. If nothing is
        passed, the filename will consist of a uuid.
    use_legacy_dataset : bool
        Default is True unless a ``pyarrow.fs`` filesystem is passed.
        Set to False to enable the new code path (experimental, using the
        new Arrow Dataset API). This is more efficient when using partition
        columns, but does not (yet) support `partition_filename_cb` and
        `metadata_collector` keywords.
    **kwargs : dict,
        Additional kwargs for write_table function. See docstring for
        `write_table` or `ParquetWriter` for more information.
        Using `metadata_collector` in kwargs allows one to collect the
        file metadata instances of dataset pieces. The file paths in the
        ColumnChunkMetaData will be set relative to `root_path`.
    """
    if use_legacy_dataset is None:
        # if a new filesystem is passed -> default to new implementation
        if isinstance(filesystem, FileSystem):
            use_legacy_dataset = False
        # otherwise the default is still True
        else:
            use_legacy_dataset = True

    if not use_legacy_dataset:
        import pyarrow.dataset as ds

        # extract non-file format options
        schema = kwargs.pop("schema", None)
        use_threads = kwargs.pop("use_threads", True)

        # raise for unsupported keywords
        msg = (
            "The '{}' argument is not supported with the new dataset "
            "implementation."
        )
        metadata_collector = kwargs.pop('metadata_collector', None)
        file_visitor = None
        if metadata_collector is not None:
            def file_visitor(written_file):
                metadata_collector.append(written_file.metadata)
        if partition_filename_cb is not None:
            raise ValueError(msg.format("partition_filename_cb"))

        # map format arguments
        parquet_format = ds.ParquetFileFormat()
        write_options = parquet_format.make_write_options(**kwargs)

        # map old filesystems to new one
        if filesystem is not None:
            filesystem = _ensure_filesystem(filesystem)

        partitioning = None
        if partition_cols:
            part_schema = table.select(partition_cols).schema
            partitioning = ds.partitioning(part_schema, flavor="hive")

        ds.write_dataset(
            table, root_path, filesystem=filesystem,
            format=parquet_format, file_options=write_options, schema=schema,
            partitioning=partitioning, use_threads=use_threads,
            file_visitor=file_visitor)
        return

    fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem)

    _mkdir_if_not_exists(fs, root_path)

    metadata_collector = kwargs.pop('metadata_collector', None)

    if partition_cols is not None and len(partition_cols) > 0:
        df = table.to_pandas()
        partition_keys = [df[col] for col in partition_cols]
        data_df = df.drop(partition_cols, axis='columns')
        data_cols = df.columns.drop(partition_cols)
        if len(data_cols) == 0:
            raise ValueError('No data left to save outside partition columns')

        subschema = table.schema

        # ARROW-2891: Ensure the output_schema is preserved when writing a
        # partitioned dataset
        for col in table.schema.names:
            if col in partition_cols:
                subschema = subschema.remove(subschema.get_field_index(col))

        for keys, subgroup in data_df.groupby(partition_keys):
            if not isinstance(keys, tuple):
                keys = (keys,)
            subdir = '/'.join(
                ['{colname}={value}'.format(colname=name, value=val)
                 for name, val in zip(partition_cols, keys)])
            subtable = pa.Table.from_pandas(subgroup, schema=subschema,
                                            safe=False)
            _mkdir_if_not_exists(fs, '/'.join([root_path, subdir]))
            if partition_filename_cb:
                outfile = partition_filename_cb(keys)
            else:
                outfile = guid() + '.parquet'
            relative_path = '/'.join([subdir, outfile])
            full_path = '/'.join([root_path, relative_path])
            with fs.open(full_path, 'wb') as f:
                write_table(subtable, f, metadata_collector=metadata_collector,
                            **kwargs)
            if metadata_collector is not None:
                metadata_collector[-1].set_file_path(relative_path)
    else:
        if partition_filename_cb:
            outfile = partition_filename_cb(None)
        else:
            outfile = guid() + '.parquet'
        full_path = '/'.join([root_path, outfile])
        with fs.open(full_path, 'wb') as f:
            write_table(table, f, metadata_collector=metadata_collector,
                        **kwargs)
        if metadata_collector is not None:
            metadata_collector[-1].set_file_path(outfile)
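

# Illustrative usage sketch (editor note, not part of the pyarrow API):
# writing a partitioned dataset as described in the docstring above. The root
# directory name and column names are illustrative assumptions.
def _example_write_to_dataset_usage(root_path="example_dataset"):
    table = pa.table({"year": [2020, 2020, 2021], "value": [1.0, 2.0, 3.0]})
    # With the default (legacy) code path and no pyarrow.fs filesystem, one
    # file per distinct 'year' value is written under
    # <root_path>/year=<value>/<uuid>.parquet.
    write_to_dataset(table, root_path, partition_cols=["year"])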


def write_metadata(schema, where, metadata_collector=None, **kwargs):
    """
    Write metadata-only Parquet file from schema. This can be used with
    `write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar
    files.

    Parameters
    ----------
    schema : pyarrow.Schema
    where : string or pyarrow.NativeFile
    metadata_collector : list
        where to collect metadata information.
    **kwargs : dict,
        Additional kwargs for ParquetWriter class. See docstring for
        `ParquetWriter` for more information.

    Examples
    --------

    Write a dataset and collect metadata information.

    >>> metadata_collector = []
    >>> write_to_dataset(
    ...     table, root_path,
    ...     metadata_collector=metadata_collector, **writer_kwargs)

    Write the `_common_metadata` parquet file without row groups statistics.

    >>> write_metadata(
    ...     table.schema, root_path / '_common_metadata', **writer_kwargs)

    Write the `_metadata` parquet file with row groups statistics.

    >>> write_metadata(
    ...     table.schema, root_path / '_metadata',
    ...     metadata_collector=metadata_collector, **writer_kwargs)
    """
    writer = ParquetWriter(where, schema, **kwargs)
    writer.close()

    if metadata_collector is not None:
        # ParquetWriter doesn't expose the metadata until it's written. Write
        # it and read it again.
        metadata = read_metadata(where)
        for m in metadata_collector:
            metadata.append_row_groups(m)
        metadata.write_metadata_file(where)


def read_metadata(where, memory_map=False):
    """
    Read FileMetadata from footer of a single Parquet file.

    Parameters
    ----------
    where : str (filepath) or file-like object
    memory_map : bool, default False
        Create memory map when the source is a file path.

    Returns
    -------
    metadata : FileMetadata
    """
    return ParquetFile(where, memory_map=memory_map).metadata


def read_schema(where, memory_map=False):
    """
    Read effective Arrow schema from Parquet file metadata.

    Parameters
    ----------
    where : str (filepath) or file-like object
    memory_map : bool, default False
        Create memory map when the source is a file path.

    Returns
    -------
    schema : pyarrow.Schema
    """
    return ParquetFile(where, memory_map=memory_map).schema.to_arrow_schema()
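

# Illustrative usage sketch (editor note, not part of the pyarrow API):
# inspecting a file's footer without reading any data. The path is a
# hypothetical placeholder.
def _example_inspect_parquet_file(path="example.parquet"):
    meta = read_metadata(path)    # FileMetaData: num_rows, row groups, ...
    schema = read_schema(path)    # Arrow schema reconstructed from the footer
    return meta.num_row_groups, schema.names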