1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
16 # under the License.
17
18
19 from collections import defaultdict
20 from concurrent import futures
21 from functools import partial, reduce
22
23 import json
24 from collections.abc import Collection
25 import numpy as np
26 import os
27 import re
28 import operator
29 import urllib.parse
30 import warnings
31
32 import pyarrow as pa
33 import pyarrow.lib as lib
34 import pyarrow._parquet as _parquet
35
36 from pyarrow._parquet import (ParquetReader, Statistics, # noqa
37 FileMetaData, RowGroupMetaData,
38 ColumnChunkMetaData,
39 ParquetSchema, ColumnSchema)
40 from pyarrow.fs import (LocalFileSystem, FileSystem,
41 _resolve_filesystem_and_path, _ensure_filesystem)
42 from pyarrow import filesystem as legacyfs
43 from pyarrow.util import guid, _is_path_like, _stringify_path
44
45 _URI_STRIP_SCHEMES = ('hdfs',)
46
47
48 def _parse_uri(path):
49 path = _stringify_path(path)
50 parsed_uri = urllib.parse.urlparse(path)
51 if parsed_uri.scheme in _URI_STRIP_SCHEMES:
52 return parsed_uri.path
53 else:
54 # ARROW-4073: On Windows returning the path with the scheme
55 # stripped removes the drive letter, if any
56 return path
57
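# Illustrative sketch (not part of the original module): how _parse_uri
# behaves for the schemes above. The URIs/paths used here are hypothetical.
def _example_parse_uri():
    # 'hdfs' is listed in _URI_STRIP_SCHEMES, so only the path part is kept
    assert _parse_uri('hdfs://namenode:8020/data/file.parquet') == \
        '/data/file.parquet'
    # other inputs are returned unchanged, which keeps Windows drive
    # letters intact (see ARROW-4073 above)
    assert _parse_uri('C:/data/file.parquet') == 'C:/data/file.parquet'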
58
59 def _get_filesystem_and_path(passed_filesystem, path):
60 if passed_filesystem is None:
61 return legacyfs.resolve_filesystem_and_path(path, passed_filesystem)
62 else:
63 passed_filesystem = legacyfs._ensure_filesystem(passed_filesystem)
64 parsed_path = _parse_uri(path)
65 return passed_filesystem, parsed_path
66
67
68 def _check_contains_null(val):
69 if isinstance(val, bytes):
70 for byte in val:
71 if isinstance(byte, bytes):
72 compare_to = chr(0)
73 else:
74 compare_to = 0
75 if byte == compare_to:
76 return True
77 elif isinstance(val, str):
78 return '\x00' in val
79 return False
80
81
82 def _check_filters(filters, check_null_strings=True):
83 """
84 Check if filters are well-formed.
85 """
86 if filters is not None:
87 if len(filters) == 0 or any(len(f) == 0 for f in filters):
88 raise ValueError("Malformed filters")
89 if isinstance(filters[0][0], str):
90 # We have encountered the situation where we have one nesting level
91 # too few:
92 # We have [(,,), ..] instead of [[(,,), ..]]
93 filters = [filters]
94 if check_null_strings:
95 for conjunction in filters:
96 for col, op, val in conjunction:
97 if (
98 isinstance(val, list) and
99 all(_check_contains_null(v) for v in val) or
100 _check_contains_null(val)
101 ):
102 raise NotImplementedError(
103 "Null-terminated binary strings are not supported "
104 "as filter values."
105 )
106 return filters
107
108
109 _DNF_filter_doc = """Predicates are expressed in disjunctive normal form (DNF), like
110 ``[[('x', '=', 0), ...], ...]``. DNF allows arbitrary boolean logical
111 combinations of single column predicates. The innermost tuples each
112 describe a single column predicate. The list of inner predicates is
113 interpreted as a conjunction (AND), forming a more selective and
114 multiple column predicate. Finally, the most outer list combines these
115 filters as a disjunction (OR).
116
117 Predicates may also be passed as List[Tuple]. This form is interpreted
118 as a single conjunction. To express OR in predicates, one must
119 use the (preferred) List[List[Tuple]] notation.
120
121 Each tuple has format: (``key``, ``op``, ``value``) and compares the
122 ``key`` with the ``value``.
123 The supported ``op`` are: ``=`` or ``==``, ``!=``, ``<``, ``>``, ``<=``,
124 ``>=``, ``in`` and ``not in``. If the ``op`` is ``in`` or ``not in``, the
125 ``value`` must be a collection such as a ``list``, a ``set`` or a
126 ``tuple``.
127
128 Examples:
129
130 .. code-block:: python
131
132 ('x', '=', 0)
133 ('y', 'in', ['a', 'b', 'c'])
134 ('z', 'not in', {'a','b'})
135
136 """
137
138
139 def _filters_to_expression(filters):
140 """
141 Convert a DNF filter specification into a pyarrow.dataset Expression.
142
143 See _DNF_filter_doc above for more details.
144 """
145 import pyarrow.dataset as ds
146
147 if isinstance(filters, ds.Expression):
148 return filters
149
150 filters = _check_filters(filters, check_null_strings=False)
151
152 def convert_single_predicate(col, op, val):
153 field = ds.field(col)
154
155 if op == "=" or op == "==":
156 return field == val
157 elif op == "!=":
158 return field != val
159 elif op == '<':
160 return field < val
161 elif op == '>':
162 return field > val
163 elif op == '<=':
164 return field <= val
165 elif op == '>=':
166 return field >= val
167 elif op == 'in':
168 return field.isin(val)
169 elif op == 'not in':
170 return ~field.isin(val)
171 else:
172 raise ValueError(
173 '"{0}" is not a valid operator in predicates.'.format(
174 (col, op, val)))
175
176 disjunction_members = []
177
178 for conjunction in filters:
179 conjunction_members = [
180 convert_single_predicate(col, op, val)
181 for col, op, val in conjunction
182 ]
183
184 disjunction_members.append(reduce(operator.and_, conjunction_members))
185
186 return reduce(operator.or_, disjunction_members)
187
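# Illustrative sketch (not part of the original module): converting a DNF
# filter list with the helper above. The column names 'x' and 'y' are
# hypothetical.
def _example_filters_to_expression():
    # (x > 0 AND y in ['a', 'b']) OR (x == -1)
    filters = [
        [('x', '>', 0), ('y', 'in', ['a', 'b'])],
        [('x', '=', -1)],
    ]
    # the result is a pyarrow.dataset.Expression, e.g. usable as the
    # ``filter`` argument of Dataset.to_table()
    return _filters_to_expression(filters)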
188
189 # ----------------------------------------------------------------------
190 # Reading a single Parquet file
191
192
193 class ParquetFile:
194 """
195 Reader interface for a single Parquet file.
196
197 Parameters
198 ----------
199 source : str, pathlib.Path, pyarrow.NativeFile, or file-like object
200 Readable source. To pass bytes or a buffer-like object containing a
201 Parquet file, use pyarrow.BufferReader.
202 metadata : FileMetaData, default None
203 Use existing metadata object, rather than reading from file.
204 common_metadata : FileMetaData, default None
205 Will be used in reads for pandas schema metadata if not found in the
206 main file's metadata; it has no other uses at the moment.
207 memory_map : bool, default False
208 If the source is a file path, use a memory map to read file, which can
209 improve performance in some environments.
210 buffer_size : int, default 0
211 If positive, perform read buffering when deserializing individual
212 column chunks. Otherwise IO calls are unbuffered.
213 pre_buffer : bool, default False
214 Coalesce and issue file reads in parallel to improve performance on
215 high-latency filesystems (e.g. S3). If True, Arrow will use a
216 background I/O thread pool.
217 read_dictionary : list
218 List of column names to read directly as DictionaryArray.
219 coerce_int96_timestamp_unit : str, default None.
220 Cast timestamps that are stored in INT96 format to a particular
221 resolution (e.g. 'ms'). Setting to None is equivalent to 'ns'
222 and therefore INT96 timestamps will be inferred as timestamps
223 in nanoseconds.
224 """
225
226 def __init__(self, source, metadata=None, common_metadata=None,
227 read_dictionary=None, memory_map=False, buffer_size=0,
228 pre_buffer=False, coerce_int96_timestamp_unit=None):
229 self.reader = ParquetReader()
230 self.reader.open(
231 source, use_memory_map=memory_map,
232 buffer_size=buffer_size, pre_buffer=pre_buffer,
233 read_dictionary=read_dictionary, metadata=metadata,
234 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
235 )
236 self.common_metadata = common_metadata
237 self._nested_paths_by_prefix = self._build_nested_paths()
238
239 def _build_nested_paths(self):
240 paths = self.reader.column_paths
241
242 result = defaultdict(list)
243
244 for i, path in enumerate(paths):
245 key = path[0]
246 rest = path[1:]
247 while True:
248 result[key].append(i)
249
250 if not rest:
251 break
252
253 key = '.'.join((key, rest[0]))
254 rest = rest[1:]
255
256 return result
257
258 @property
259 def metadata(self):
260 return self.reader.metadata
261
262 @property
263 def schema(self):
264 """
265 Return the Parquet schema, unconverted to Arrow types
266 """
267 return self.metadata.schema
268
269 @property
270 def schema_arrow(self):
271 """
272 Return the inferred Arrow schema, converted from the whole Parquet
273 file's schema
274 """
275 return self.reader.schema_arrow
276
277 @property
278 def num_row_groups(self):
279 return self.reader.num_row_groups
280
281 def read_row_group(self, i, columns=None, use_threads=True,
282 use_pandas_metadata=False):
283 """
284 Read a single row group from a Parquet file.
285
286 Parameters
287 ----------
288 i : int
289 Index of the individual row group that we want to read.
290 columns : list
291 If not None, only these columns will be read from the row group. A
292 column name may be a prefix of a nested field, e.g. 'a' will select
293 'a.b', 'a.c', and 'a.d.e'.
294 use_threads : bool, default True
295 Perform multi-threaded column reads.
296 use_pandas_metadata : bool, default False
297 If True and file has custom pandas schema metadata, ensure that
298 index columns are also loaded.
299
300 Returns
301 -------
302 pyarrow.table.Table
303 Content of the row group as a table (of columns)
304 """
305 column_indices = self._get_column_indices(
306 columns, use_pandas_metadata=use_pandas_metadata)
307 return self.reader.read_row_group(i, column_indices=column_indices,
308 use_threads=use_threads)
309
310 def read_row_groups(self, row_groups, columns=None, use_threads=True,
311 use_pandas_metadata=False):
312 """
313 Read multiple row groups from a Parquet file.
314
315 Parameters
316 ----------
317 row_groups : list
318 Only these row groups will be read from the file.
319 columns : list
320 If not None, only these columns will be read from the row group. A
321 column name may be a prefix of a nested field, e.g. 'a' will select
322 'a.b', 'a.c', and 'a.d.e'.
323 use_threads : bool, default True
324 Perform multi-threaded column reads.
325 use_pandas_metadata : bool, default False
326 If True and file has custom pandas schema metadata, ensure that
327 index columns are also loaded.
328
329 Returns
330 -------
331 pyarrow.table.Table
332 Content of the row groups as a table (of columns).
333 """
334 column_indices = self._get_column_indices(
335 columns, use_pandas_metadata=use_pandas_metadata)
336 return self.reader.read_row_groups(row_groups,
337 column_indices=column_indices,
338 use_threads=use_threads)
339
340 def iter_batches(self, batch_size=65536, row_groups=None, columns=None,
341 use_threads=True, use_pandas_metadata=False):
342 """
343 Read streaming batches from a Parquet file.
344
345 Parameters
346 ----------
347 batch_size : int, default 64K
348 Maximum number of records to yield per batch. Batches may be
349 smaller if there aren't enough rows in the file.
350 row_groups : list
351 Only these row groups will be read from the file.
352 columns : list
353 If not None, only these columns will be read from the file. A
354 column name may be a prefix of a nested field, e.g. 'a' will select
355 'a.b', 'a.c', and 'a.d.e'.
356 use_threads : boolean, default True
357 Perform multi-threaded column reads.
358 use_pandas_metadata : boolean, default False
359 If True and file has custom pandas schema metadata, ensure that
360 index columns are also loaded.
361
362 Returns
363 -------
364 iterator of pyarrow.RecordBatch
365 Contents of each batch as a record batch
366 """
367 if row_groups is None:
368 row_groups = range(0, self.metadata.num_row_groups)
369 column_indices = self._get_column_indices(
370 columns, use_pandas_metadata=use_pandas_metadata)
371
372 batches = self.reader.iter_batches(batch_size,
373 row_groups=row_groups,
374 column_indices=column_indices,
375 use_threads=use_threads)
376 return batches
377
378 def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
379 """
380 Read a Table from Parquet format.
381
382 Parameters
383 ----------
384 columns : list
385 If not None, only these columns will be read from the file. A
386 column name may be a prefix of a nested field, e.g. 'a' will select
387 'a.b', 'a.c', and 'a.d.e'.
388 use_threads : bool, default True
389 Perform multi-threaded column reads.
390 use_pandas_metadata : bool, default False
391 If True and file has custom pandas schema metadata, ensure that
392 index columns are also loaded.
393
394 Returns
395 -------
396 pyarrow.table.Table
397 Content of the file as a table (of columns).
398 """
399 column_indices = self._get_column_indices(
400 columns, use_pandas_metadata=use_pandas_metadata)
401 return self.reader.read_all(column_indices=column_indices,
402 use_threads=use_threads)
403
404 def scan_contents(self, columns=None, batch_size=65536):
405 """
406 Read contents of file for the given columns and batch size.
407
408 Notes
409 -----
410 This function's primary purpose is benchmarking.
411 The scan is executed on a single thread.
412
413 Parameters
414 ----------
415 columns : list of integers, default None
416 Select columns to read, if None scan all columns.
417 batch_size : int, default 64K
418 Number of rows to read at a time internally.
419
420 Returns
421 -------
422 num_rows : number of rows in file
423 """
424 column_indices = self._get_column_indices(columns)
425 return self.reader.scan_contents(column_indices,
426 batch_size=batch_size)
427
428 def _get_column_indices(self, column_names, use_pandas_metadata=False):
429 if column_names is None:
430 return None
431
432 indices = []
433
434 for name in column_names:
435 if name in self._nested_paths_by_prefix:
436 indices.extend(self._nested_paths_by_prefix[name])
437
438 if use_pandas_metadata:
439 file_keyvalues = self.metadata.metadata
440 common_keyvalues = (self.common_metadata.metadata
441 if self.common_metadata is not None
442 else None)
443
444 if file_keyvalues and b'pandas' in file_keyvalues:
445 index_columns = _get_pandas_index_columns(file_keyvalues)
446 elif common_keyvalues and b'pandas' in common_keyvalues:
447 index_columns = _get_pandas_index_columns(common_keyvalues)
448 else:
449 index_columns = []
450
451 if indices is not None and index_columns:
452 indices += [self.reader.column_name_idx(descr)
453 for descr in index_columns
454 if not isinstance(descr, dict)]
455
456 return indices
457
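# Illustrative sketch (not part of the original module): writing a small table
# to an in-memory buffer and reading it back through ParquetFile. ParquetWriter
# is defined further down in this module; nothing here runs at import time.
def _example_parquet_file_round_trip():
    table = pa.table({'x': [1, 2, 3, 4], 'y': ['a', 'b', 'c', 'd']})
    sink = pa.BufferOutputStream()
    # compression='NONE' avoids assuming any optional codec is available
    with ParquetWriter(sink, table.schema, compression='NONE') as writer:
        writer.write_table(table, row_group_size=2)
    parquet_file = ParquetFile(pa.BufferReader(sink.getvalue()))
    assert parquet_file.num_row_groups == 2
    # stream one column back in small record batches
    for batch in parquet_file.iter_batches(batch_size=2, columns=['x']):
        assert batch.num_rows <= 2
    return parquet_file.read()  # the whole file as a pyarrow.Table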
458
459 _SPARK_DISALLOWED_CHARS = re.compile('[ ,;{}()\n\t=]')
460
461
462 def _sanitized_spark_field_name(name):
463 return _SPARK_DISALLOWED_CHARS.sub('_', name)
464
465
466 def _sanitize_schema(schema, flavor):
467 if 'spark' in flavor:
468 sanitized_fields = []
469
470 schema_changed = False
471
472 for field in schema:
473 name = field.name
474 sanitized_name = _sanitized_spark_field_name(name)
475
476 if sanitized_name != name:
477 schema_changed = True
478 sanitized_field = pa.field(sanitized_name, field.type,
479 field.nullable, field.metadata)
480 sanitized_fields.append(sanitized_field)
481 else:
482 sanitized_fields.append(field)
483
484 new_schema = pa.schema(sanitized_fields, metadata=schema.metadata)
485 return new_schema, schema_changed
486 else:
487 return schema, False
488
489
490 def _sanitize_table(table, new_schema, flavor):
491 # TODO: This will not handle prohibited characters in nested field names
492 if 'spark' in flavor:
493 column_data = [table[i] for i in range(table.num_columns)]
494 return pa.Table.from_arrays(column_data, schema=new_schema)
495 else:
496 return table
497
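# Illustrative sketch (not part of the original module): what the 'spark'
# flavor sanitization above does to field names Spark cannot handle.
def _example_spark_sanitization():
    assert _sanitized_spark_field_name('a b,c') == 'a_b_c'
    schema = pa.schema([pa.field('col 1', pa.int64())])
    new_schema, changed = _sanitize_schema(schema, flavor='spark')
    assert changed and new_schema[0].name == 'col_1'
    return new_schema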
498
499 _parquet_writer_arg_docs = """version : {"1.0", "2.4", "2.6"}, default "1.0"
500 Determine which Parquet logical types are available for use, whether the
501 reduced set from the Parquet 1.x.x format or the expanded logical types
502 added in later format versions.
503 Files written with version='2.4' or '2.6' may not be readable in all
504 Parquet implementations, so version='1.0' is likely the choice that
505 maximizes file compatibility.
506 UINT32 and some logical types are only available with version '2.4'.
507 Nanosecond timestamps are only available with version '2.6'.
508 Other features such as compression algorithms or the new serialized
509 data page format must be enabled separately (see 'compression' and
510 'data_page_version').
511 use_dictionary : bool or list
512 Specify if we should use dictionary encoding in general or only for
513 some columns.
514 use_deprecated_int96_timestamps : bool, default None
515 Write timestamps to INT96 Parquet format. Defaults to False unless enabled
516 by the flavor argument. This takes priority over the coerce_timestamps option.
517 coerce_timestamps : str, default None
518 Cast timestamps to a particular resolution. If omitted, defaults are chosen
519 depending on `version`. By default, for ``version='1.0'`` (the default)
520 and ``version='2.4'``, nanoseconds are cast to microseconds ('us'), while
521 for other `version` values, they are written natively without loss
522 of resolution. Seconds are always cast to milliseconds ('ms') by default,
523 as Parquet does not have any temporal type with seconds resolution.
524 If the casting results in loss of data, it will raise an exception
525 unless ``allow_truncated_timestamps=True`` is given.
526 Valid values: {None, 'ms', 'us'}
527 data_page_size : int, default None
528 Set a target threshold for the approximate encoded size of data
529 pages within a column chunk (in bytes). If None, use the default data page
530 size of 1MByte.
531 allow_truncated_timestamps : bool, default False
532 Allow loss of data when coercing timestamps to a particular
533 resolution. E.g. if microsecond or nanosecond data is lost when coercing to
534 'ms', do not raise an exception. Passing ``allow_truncated_timestamps=True``
535 only suppresses the truncation exception when ``coerce_timestamps`` is
536 also set to a resolution.
537 compression : str or dict
538 Specify the compression codec, either on a general basis or per-column.
539 Valid values: {'NONE', 'SNAPPY', 'GZIP', 'BROTLI', 'LZ4', 'ZSTD'}.
540 write_statistics : bool or list
541 Specify if we should write statistics in general (default is True) or only
542 for some columns.
543 flavor : {'spark'}, default None
544 Sanitize schema or set other compatibility options to work with
545 various target systems.
546 filesystem : FileSystem, default None
547 If nothing passed, will be inferred from `where` if path-like, else
548 `where` is already a file-like object so no filesystem is needed.
549 compression_level : int or dict, default None
550 Specify the compression level for a codec, either on a general basis or
551 per-column. If None is passed, arrow selects the compression level for
552 the compression codec in use. The compression level has a different
553 meaning for each codec, so you have to read the documentation of the
554 codec you are using.
555 An exception is thrown if the compression codec does not allow specifying
556 a compression level.
557 use_byte_stream_split : bool or list, default False
558 Specify if the byte_stream_split encoding should be used in general or
559 only for some columns. If both dictionary and byte_stream_split are
560 enabled, then dictionary is preferred.
561 The byte_stream_split encoding is valid only for floating-point data types
562 and should be combined with a compression codec.
563 data_page_version : {"1.0", "2.0"}, default "1.0"
564 The serialized Parquet data page format version to write, defaults to
565 1.0. This does not impact the file schema logical types and Arrow to
566 Parquet type casting behavior; for that use the "version" option.
567 use_compliant_nested_type : bool, default False
568 Whether to write compliant Parquet nested type (lists) as defined
569 `here <https://github.com/apache/parquet-format/blob/master/
570 LogicalTypes.md#nested-types>`_, defaults to ``False``.
571 For ``use_compliant_nested_type=True``, this will write into a list
572 with 3-level structure where the middle level, named ``list``,
573 is a repeated group with a single field named ``element``::
574
575 <list-repetition> group <name> (LIST) {
576 repeated group list {
577 <element-repetition> <element-type> element;
578 }
579 }
580
581 For ``use_compliant_nested_type=False``, this will also write into a list
582 with 3-level structure, where the name of the single field of the middle
583 level ``list`` is taken from the element name for nested columns in Arrow,
584 which defaults to ``item``::
585
586 <list-repetition> group <name> (LIST) {
587 repeated group list {
588 <element-repetition> <element-type> item;
589 }
590 }
591 """
592
593
594 class ParquetWriter:
595
596 __doc__ = """
597 Class for incrementally building a Parquet file for Arrow tables.
598
599 Parameters
600 ----------
601 where : path or file-like object
602 schema : arrow Schema
603 {}
604 writer_engine_version : unused
605 **options : dict
606 If options contains a key `metadata_collector` then the
607 corresponding value is assumed to be a list (or any object with
608 `.append` method) that will be filled with the file metadata instance
609 of the written file.
610 """.format(_parquet_writer_arg_docs)
611
612 def __init__(self, where, schema, filesystem=None,
613 flavor=None,
614 version='1.0',
615 use_dictionary=True,
616 compression='snappy',
617 write_statistics=True,
618 use_deprecated_int96_timestamps=None,
619 compression_level=None,
620 use_byte_stream_split=False,
621 writer_engine_version=None,
622 data_page_version='1.0',
623 use_compliant_nested_type=False,
624 **options):
625 if use_deprecated_int96_timestamps is None:
626 # Use int96 timestamps for Spark
627 if flavor is not None and 'spark' in flavor:
628 use_deprecated_int96_timestamps = True
629 else:
630 use_deprecated_int96_timestamps = False
631
632 self.flavor = flavor
633 if flavor is not None:
634 schema, self.schema_changed = _sanitize_schema(schema, flavor)
635 else:
636 self.schema_changed = False
637
638 self.schema = schema
639 self.where = where
640
641 # If we open a file using a filesystem, store file handle so we can be
642 # sure to close it when `self.close` is called.
643 self.file_handle = None
644
645 filesystem, path = _resolve_filesystem_and_path(
646 where, filesystem, allow_legacy_filesystem=True
647 )
648 if filesystem is not None:
649 if isinstance(filesystem, legacyfs.FileSystem):
650 # legacy filesystem (eg custom subclass)
651 # TODO deprecate
652 sink = self.file_handle = filesystem.open(path, 'wb')
653 else:
654 # ARROW-10480: do not auto-detect compression. While
655 # a filename like foo.parquet.gz is nonconforming, it
656 # shouldn't implicitly apply compression.
657 sink = self.file_handle = filesystem.open_output_stream(
658 path, compression=None)
659 else:
660 sink = where
661 self._metadata_collector = options.pop('metadata_collector', None)
662 engine_version = 'V2'
663 self.writer = _parquet.ParquetWriter(
664 sink, schema,
665 version=version,
666 compression=compression,
667 use_dictionary=use_dictionary,
668 write_statistics=write_statistics,
669 use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
670 compression_level=compression_level,
671 use_byte_stream_split=use_byte_stream_split,
672 writer_engine_version=engine_version,
673 data_page_version=data_page_version,
674 use_compliant_nested_type=use_compliant_nested_type,
675 **options)
676 self.is_open = True
677
678 def __del__(self):
679 if getattr(self, 'is_open', False):
680 self.close()
681
682 def __enter__(self):
683 return self
684
685 def __exit__(self, *args, **kwargs):
686 self.close()
687 # return false since we want to propagate exceptions
688 return False
689
690 def write_table(self, table, row_group_size=None):
691 if self.schema_changed:
692 table = _sanitize_table(table, self.schema, self.flavor)
693 assert self.is_open
694
695 if not table.schema.equals(self.schema, check_metadata=False):
696 msg = ('Table schema does not match schema used to create file: '
697 '\ntable:\n{!s} vs. \nfile:\n{!s}'
698 .format(table.schema, self.schema))
699 raise ValueError(msg)
700
701 self.writer.write_table(table, row_group_size=row_group_size)
702
703 def close(self):
704 if self.is_open:
705 self.writer.close()
706 self.is_open = False
707 if self._metadata_collector is not None:
708 self._metadata_collector.append(self.writer.metadata)
709 if self.file_handle is not None:
710 self.file_handle.close()
711
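# Illustrative sketch (not part of the original module): using the
# ``metadata_collector`` option described in the ParquetWriter docstring to
# gather one FileMetaData per written file. In-memory buffers keep the
# example self-contained; real code would usually write to paths.
def _example_writer_metadata_collector():
    table = pa.table({'x': [1, 2, 3]})
    collected = []
    for _ in range(2):
        sink = pa.BufferOutputStream()
        with ParquetWriter(sink, table.schema, compression='NONE',
                           metadata_collector=collected) as writer:
            writer.write_table(table)
    assert len(collected) == 2  # one FileMetaData appended per close()
    return collected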
712
713 def _get_pandas_index_columns(keyvalues):
714 return (json.loads(keyvalues[b'pandas'].decode('utf8'))
715 ['index_columns'])
716
717
718 # ----------------------------------------------------------------------
719 # Metadata container providing instructions about reading a single Parquet
720 # file, possibly part of a partitioned dataset
721
722
723 class ParquetDatasetPiece:
724 """
725 DEPRECATED: A single chunk of a potentially larger Parquet dataset to read.
726
727 The arguments indicate whether to read a single row group or all row
728 groups, and whether to add partition keys to the resulting pyarrow.Table.
729
730 .. deprecated:: 5.0
731 Directly constructing a ``ParquetDatasetPiece`` is deprecated, as well
732 as accessing the pieces of a ``ParquetDataset`` object. Specify
733 ``use_legacy_dataset=False`` when constructing the ``ParquetDataset``
734 and use the ``ParquetDataset.fragments`` attribute instead.
735
736 Parameters
737 ----------
738 path : str or pathlib.Path
739 Path to file in the file system where this piece is located.
740 open_file_func : callable
741 Function to use for obtaining file handle to dataset piece.
742 partition_keys : list of tuples
743 Two-element tuples of ``(column name, ordinal index)``.
744 row_group : int, default None
745 Row group to load. By default, reads all row groups.
746 file_options : dict
747 Extra keyword arguments passed to ParquetFile when opening the piece.
748 """
749
750 def __init__(self, path, open_file_func=partial(open, mode='rb'),
751 file_options=None, row_group=None, partition_keys=None):
752 warnings.warn(
753 "ParquetDatasetPiece is deprecated as of pyarrow 5.0.0 and will "
754 "be removed in a future version.",
755 DeprecationWarning, stacklevel=2)
756 self._init(
757 path, open_file_func, file_options, row_group, partition_keys)
758
759 @staticmethod
760 def _create(path, open_file_func=partial(open, mode='rb'),
761 file_options=None, row_group=None, partition_keys=None):
762 self = ParquetDatasetPiece.__new__(ParquetDatasetPiece)
763 self._init(
764 path, open_file_func, file_options, row_group, partition_keys)
765 return self
766
767 def _init(self, path, open_file_func, file_options, row_group,
768 partition_keys):
769 self.path = _stringify_path(path)
770 self.open_file_func = open_file_func
771 self.row_group = row_group
772 self.partition_keys = partition_keys or []
773 self.file_options = file_options or {}
774
775 def __eq__(self, other):
776 if not isinstance(other, ParquetDatasetPiece):
777 return False
778 return (self.path == other.path and
779 self.row_group == other.row_group and
780 self.partition_keys == other.partition_keys)
781
782 def __repr__(self):
783 return ('{}({!r}, row_group={!r}, partition_keys={!r})'
784 .format(type(self).__name__, self.path,
785 self.row_group,
786 self.partition_keys))
787
788 def __str__(self):
789 result = ''
790
791 if len(self.partition_keys) > 0:
792 partition_str = ', '.join('{}={}'.format(name, index)
793 for name, index in self.partition_keys)
794 result += 'partition[{}] '.format(partition_str)
795
796 result += self.path
797
798 if self.row_group is not None:
799 result += ' | row_group={}'.format(self.row_group)
800
801 return result
802
803 def get_metadata(self):
804 """
805 Return the file's metadata.
806
807 Returns
808 -------
809 metadata : FileMetaData
810 """
811 f = self.open()
812 return f.metadata
813
814 def open(self):
815 """
816 Return instance of ParquetFile.
817 """
818 reader = self.open_file_func(self.path)
819 if not isinstance(reader, ParquetFile):
820 reader = ParquetFile(reader, **self.file_options)
821 return reader
822
823 def read(self, columns=None, use_threads=True, partitions=None,
824 file=None, use_pandas_metadata=False):
825 """
826 Read this piece as a pyarrow.Table.
827
828 Parameters
829 ----------
830 columns : list of column names, default None
831 use_threads : bool, default True
832 Perform multi-threaded column reads.
833 partitions : ParquetPartitions, default None
834 file : file-like object
835 Passed to ParquetFile.
836 use_pandas_metadata : bool
837 Whether pandas metadata should be used.
838
839 Returns
840 -------
841 table : pyarrow.Table
842 """
843 if self.open_file_func is not None:
844 reader = self.open()
845 elif file is not None:
846 reader = ParquetFile(file, **self.file_options)
847 else:
848 # try to read the local path
849 reader = ParquetFile(self.path, **self.file_options)
850
851 options = dict(columns=columns,
852 use_threads=use_threads,
853 use_pandas_metadata=use_pandas_metadata)
854
855 if self.row_group is not None:
856 table = reader.read_row_group(self.row_group, **options)
857 else:
858 table = reader.read(**options)
859
860 if len(self.partition_keys) > 0:
861 if partitions is None:
862 raise ValueError('Must pass partition sets')
863
864 # Here, the index is the categorical code of the partition where
865 # this piece is located. Suppose we had
866 #
867 # /foo=a/0.parq
868 # /foo=b/0.parq
869 # /foo=c/0.parq
870 #
871 # Then we assign a=0, b=1, c=2. And the resulting Table pieces will
872 # have a DictionaryArray column named foo having the constant index
873 # value as indicated. The distinct categories of the partition have
874 # been computed in the ParquetManifest
875 for i, (name, index) in enumerate(self.partition_keys):
876 # The partition code is the same for all values in this piece
877 indices = np.full(len(table), index, dtype='i4')
878
879 # This is set of all partition values, computed as part of the
880 # manifest, so ['a', 'b', 'c'] as in our example above.
881 dictionary = partitions.levels[i].dictionary
882
883 arr = pa.DictionaryArray.from_arrays(indices, dictionary)
884 table = table.append_column(name, arr)
885
886 return table
887
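# Illustrative sketch (not part of the original module): how a partition
# column is materialized as a DictionaryArray, mirroring the loop at the end
# of ParquetDatasetPiece.read above. The column name 'foo' and its values are
# hypothetical.
def _example_partition_dictionary_column():
    num_rows = 4
    # every row of a piece carries the same partition code (here 'b' -> 1)
    indices = np.full(num_rows, 1, dtype='i4')
    dictionary = pa.array(['a', 'b', 'c'])
    arr = pa.DictionaryArray.from_arrays(indices, dictionary)
    table = pa.table({'value': list(range(num_rows))})
    return table.append_column('foo', arr)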
888
889 class PartitionSet:
890 """
891 A data structure for cataloguing the observed Parquet partitions at a
892 particular level. So if we have
893
894 /foo=a/bar=0
895 /foo=a/bar=1
896 /foo=a/bar=2
897 /foo=b/bar=0
898 /foo=b/bar=1
899 /foo=b/bar=2
900
901 Then we have two partition sets, one for foo, another for bar. As we visit
902 levels of the partition hierarchy, a PartitionSet tracks the distinct
903 values and assigns categorical codes to use when reading the pieces
904
905 Parameters
906 ----------
907 name : str
908 Name of the partition set. Under which key to collect all values.
909 keys : list
910 All possible values that have been collected for that partition set.
911 """
912
913 def __init__(self, name, keys=None):
914 self.name = name
915 self.keys = keys or []
916 self.key_indices = {k: i for i, k in enumerate(self.keys)}
917 self._dictionary = None
918
919 def get_index(self, key):
920 """
921 Get the index of the partition value if it is known, otherwise assign
922 one
923
924 Parameters
925 ----------
926 key : The value for which we want to know the index.
927 """
928 if key in self.key_indices:
929 return self.key_indices[key]
930 else:
931 index = len(self.key_indices)
932 self.keys.append(key)
933 self.key_indices[key] = index
934 return index
935
936 @property
937 def dictionary(self):
938 if self._dictionary is not None:
939 return self._dictionary
940
941 if len(self.keys) == 0:
942 raise ValueError('No known partition keys')
943
944 # Only integer and string partition types are supported right now
945 try:
946 integer_keys = [int(x) for x in self.keys]
947 dictionary = lib.array(integer_keys)
948 except ValueError:
949 dictionary = lib.array(self.keys)
950
951 self._dictionary = dictionary
952 return dictionary
953
954 @property
955 def is_sorted(self):
956 return list(self.keys) == sorted(self.keys)
957
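# Illustrative sketch (not part of the original module): the categorical codes
# PartitionSet assigns while a partitioned directory tree is walked.
def _example_partition_set():
    part_set = PartitionSet('foo')
    assert part_set.get_index('a') == 0
    assert part_set.get_index('b') == 1
    assert part_set.get_index('a') == 0      # already known, same code
    assert part_set.dictionary.to_pylist() == ['a', 'b']
    return part_set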
958
959 class ParquetPartitions:
960
961 def __init__(self):
962 self.levels = []
963 self.partition_names = set()
964
965 def __len__(self):
966 return len(self.levels)
967
968 def __getitem__(self, i):
969 return self.levels[i]
970
971 def equals(self, other):
972 if not isinstance(other, ParquetPartitions):
973 raise TypeError('`other` must be an instance of ParquetPartitions')
974
975 return (self.levels == other.levels and
976 self.partition_names == other.partition_names)
977
978 def __eq__(self, other):
979 try:
980 return self.equals(other)
981 except TypeError:
982 return NotImplemented
983
984 def get_index(self, level, name, key):
985 """
986 Record a partition value at a particular level, returning the distinct
987 code for that value at that level.
988
989 Example:
990
991 partitions.get_index(1, 'foo', 'a') returns 0
992 partitions.get_index(1, 'foo', 'b') returns 1
993 partitions.get_index(1, 'foo', 'c') returns 2
994 partitions.get_index(1, 'foo', 'a') returns 0
995
996 Parameters
997 ----------
998 level : int
999 The nesting level of the partition we are observing
1000 name : str
1001 The partition name
1002 key : str or int
1003 The partition value
1004 """
1005 if level == len(self.levels):
1006 if name in self.partition_names:
1007 raise ValueError('{} is already used as a partition name '
1008 'at another level'.format(name))
1009
1010 part_set = PartitionSet(name)
1011 self.levels.append(part_set)
1012 self.partition_names.add(name)
1013
1014 return self.levels[level].get_index(key)
1015
1016 def filter_accepts_partition(self, part_key, filter, level):
1017 p_column, p_value_index = part_key
1018 f_column, op, f_value = filter
1019 if p_column != f_column:
1020 return True
1021
1022 f_type = type(f_value)
1023
1024 if op in {'in', 'not in'}:
1025 if not isinstance(f_value, Collection):
1026 raise TypeError(
1027 "'{}' object is not a collection".format(f_type.__name__))
1028 if not f_value:
1029 raise ValueError("Cannot use empty collection as filter value")
1030 if len({type(item) for item in f_value}) != 1:
1031 raise ValueError("All elements of the collection '{}' must be"
1032 " of the same type".format(f_value))
1033 f_type = type(next(iter(f_value)))
1034
1035 elif not isinstance(f_value, str) and isinstance(f_value, Collection):
1036 raise ValueError(
1037 "Op '{}' is not supported with a collection value".format(op))
1038
1039 p_value = f_type(self.levels[level]
1040 .dictionary[p_value_index].as_py())
1041
1042 if op == "=" or op == "==":
1043 return p_value == f_value
1044 elif op == "!=":
1045 return p_value != f_value
1046 elif op == '<':
1047 return p_value < f_value
1048 elif op == '>':
1049 return p_value > f_value
1050 elif op == '<=':
1051 return p_value <= f_value
1052 elif op == '>=':
1053 return p_value >= f_value
1054 elif op == 'in':
1055 return p_value in f_value
1056 elif op == 'not in':
1057 return p_value not in f_value
1058 else:
1059 raise ValueError("'{}' is not a valid operator in "
1060 "predicates.".format(filter[1]))
1061
1062
1063 class ParquetManifest:
1064
1065 def __init__(self, dirpath, open_file_func=None, filesystem=None,
1066 pathsep='/', partition_scheme='hive', metadata_nthreads=1):
1067 filesystem, dirpath = _get_filesystem_and_path(filesystem, dirpath)
1068 self.filesystem = filesystem
1069 self.open_file_func = open_file_func
1070 self.pathsep = pathsep
1071 self.dirpath = _stringify_path(dirpath)
1072 self.partition_scheme = partition_scheme
1073 self.partitions = ParquetPartitions()
1074 self.pieces = []
1075 self._metadata_nthreads = metadata_nthreads
1076 self._thread_pool = futures.ThreadPoolExecutor(
1077 max_workers=metadata_nthreads)
1078
1079 self.common_metadata_path = None
1080 self.metadata_path = None
1081
1082 self._visit_level(0, self.dirpath, [])
1083
1084 # Due to concurrency, pieces will potentially be out of order if the
1085 # dataset is partitioned, so we sort them to yield stable results
1086 self.pieces.sort(key=lambda piece: piece.path)
1087
1088 if self.common_metadata_path is None:
1089 # _common_metadata is a subset of _metadata
1090 self.common_metadata_path = self.metadata_path
1091
1092 self._thread_pool.shutdown()
1093
1094 def _visit_level(self, level, base_path, part_keys):
1095 fs = self.filesystem
1096
1097 _, directories, files = next(fs.walk(base_path))
1098
1099 filtered_files = []
1100 for path in files:
1101 full_path = self.pathsep.join((base_path, path))
1102 if path.endswith('_common_metadata'):
1103 self.common_metadata_path = full_path
1104 elif path.endswith('_metadata'):
1105 self.metadata_path = full_path
1106 elif self._should_silently_exclude(path):
1107 continue
1108 else:
1109 filtered_files.append(full_path)
1110
1111 # ARROW-1079: Filter out "private" directories starting with underscore
1112 filtered_directories = [self.pathsep.join((base_path, x))
1113 for x in directories
1114 if not _is_private_directory(x)]
1115
1116 filtered_files.sort()
1117 filtered_directories.sort()
1118
1119 if len(filtered_files) > 0 and len(filtered_directories) > 0:
1120 raise ValueError('Found files in an intermediate '
1121 'directory: {}'.format(base_path))
1122 elif len(filtered_directories) > 0:
1123 self._visit_directories(level, filtered_directories, part_keys)
1124 else:
1125 self._push_pieces(filtered_files, part_keys)
1126
1127 def _should_silently_exclude(self, file_name):
1128 return (file_name.endswith('.crc') or # Checksums
1129 file_name.endswith('_$folder$') or # HDFS directories in S3
1130 file_name.startswith('.') or # Hidden files starting with .
1131 file_name.startswith('_') or # Hidden files starting with _
1132 file_name in EXCLUDED_PARQUET_PATHS)
1133
1134 def _visit_directories(self, level, directories, part_keys):
1135 futures_list = []
1136 for path in directories:
1137 head, tail = _path_split(path, self.pathsep)
1138 name, key = _parse_hive_partition(tail)
1139
1140 index = self.partitions.get_index(level, name, key)
1141 dir_part_keys = part_keys + [(name, index)]
1142 # If you have fewer threads than levels, the wait call will block
1143 # indefinitely due to multiple waits within a thread.
1144 if level < self._metadata_nthreads:
1145 future = self._thread_pool.submit(self._visit_level,
1146 level + 1,
1147 path,
1148 dir_part_keys)
1149 futures_list.append(future)
1150 else:
1151 self._visit_level(level + 1, path, dir_part_keys)
1152 if futures_list:
1153 futures.wait(futures_list)
1154
1155 def _parse_partition(self, dirname):
1156 if self.partition_scheme == 'hive':
1157 return _parse_hive_partition(dirname)
1158 else:
1159 raise NotImplementedError('partition scheme: {}'
1160 .format(self.partition_scheme))
1161
1162 def _push_pieces(self, files, part_keys):
1163 self.pieces.extend([
1164 ParquetDatasetPiece._create(path, partition_keys=part_keys,
1165 open_file_func=self.open_file_func)
1166 for path in files
1167 ])
1168
1169
1170 def _parse_hive_partition(value):
1171 if '=' not in value:
1172 raise ValueError('Directory name did not appear to be a '
1173 'partition: {}'.format(value))
1174 return value.split('=', 1)
1175
1176
1177 def _is_private_directory(x):
1178 _, tail = os.path.split(x)
1179 return (tail.startswith('_') or tail.startswith('.')) and '=' not in tail
1180
1181
1182 def _path_split(path, sep):
1183 i = path.rfind(sep) + 1
1184 head, tail = path[:i], path[i:]
1185 head = head.rstrip(sep)
1186 return head, tail
1187
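# Illustrative sketch (not part of the original module): how the helpers above
# split paths and hive-style directory names. The paths are hypothetical.
def _example_hive_partition_parsing():
    head, tail = _path_split('/data/year=2020/part-0.parquet', '/')
    assert (head, tail) == ('/data/year=2020', 'part-0.parquet')
    assert _parse_hive_partition('year=2020') == ['year', '2020']
    assert _is_private_directory('/data/_staging')
    return head, tail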
1188
1189 EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
1190
1191
1192 class _ParquetDatasetMetadata:
1193 __slots__ = ('fs', 'memory_map', 'read_dictionary', 'common_metadata',
1194 'buffer_size')
1195
1196
1197 def _open_dataset_file(dataset, path, meta=None):
1198 if (dataset.fs is not None and
1199 not isinstance(dataset.fs, legacyfs.LocalFileSystem)):
1200 path = dataset.fs.open(path, mode='rb')
1201 return ParquetFile(
1202 path,
1203 metadata=meta,
1204 memory_map=dataset.memory_map,
1205 read_dictionary=dataset.read_dictionary,
1206 common_metadata=dataset.common_metadata,
1207 buffer_size=dataset.buffer_size
1208 )
1209
1210
1211 _DEPR_MSG = (
1212 "'{}' attribute is deprecated as of pyarrow 5.0.0 and will be removed "
1213 "in a future version.{}"
1214 )
1215
1216
1217 _read_docstring_common = """\
1218 read_dictionary : list, default None
1219 List of names or column paths (for nested types) to read directly
1220 as DictionaryArray. Only supported for BYTE_ARRAY storage. To read
1221 a flat column as dictionary-encoded pass the column name. For
1222 nested types, you must pass the full column "path", which could be
1223 something like level1.level2.list.item. Refer to the Parquet
1224 file's schema to obtain the paths.
1225 memory_map : bool, default False
1226 If the source is a file path, use a memory map to read file, which can
1227 improve performance in some environments.
1228 buffer_size : int, default 0
1229 If positive, perform read buffering when deserializing individual
1230 column chunks. Otherwise IO calls are unbuffered.
1231 partitioning : Partitioning or str or list of str, default "hive"
1232 The partitioning scheme for a partitioned dataset. The default of "hive"
1233 assumes directory names with key=value pairs like "/year=2009/month=11".
1234 In addition, a scheme like "/2009/11" is also supported, in which case
1235 you need to specify the field names or a full schema. See the
1236 ``pyarrow.dataset.partitioning()`` function for more details."""
1237
1238
1239 class ParquetDataset:
1240
1241 __doc__ = """
1242 Encapsulates details of reading a complete Parquet dataset possibly
1243 consisting of multiple files and partitions in subdirectories.
1244
1245 Parameters
1246 ----------
1247 path_or_paths : str or List[str]
1248 A directory name, single file name, or list of file names.
1249 filesystem : FileSystem, default None
1250 If nothing passed, paths assumed to be found in the local on-disk
1251 filesystem.
1252 metadata : pyarrow.parquet.FileMetaData
1253 Use metadata obtained elsewhere to validate file schemas.
1254 schema : pyarrow.parquet.Schema
1255 Use schema obtained elsewhere to validate file schemas. Alternative to
1256 metadata parameter.
1257 split_row_groups : bool, default False
1258 Divide files into pieces for each row group in the file.
1259 validate_schema : bool, default True
1260 Check that individual file schemas are all the same / compatible.
1261 filters : List[Tuple] or List[List[Tuple]] or None (default)
1262 Rows which do not match the filter predicate will be removed from scanned
1263 data. Partition keys embedded in a nested directory structure will be
1264 exploited to avoid loading files at all if they contain no matching rows.
1265 If `use_legacy_dataset` is True, filters can only reference partition
1266 keys and only a hive-style directory structure is supported. When
1267 setting `use_legacy_dataset` to False, also within-file level filtering
1268 and different partitioning schemes are supported.
1269
1270 {1}
1271 metadata_nthreads : int, default 1
1272 How many threads to use in the thread pool that reads the dataset
1273 metadata. Increasing this can speed up reading partitioned
1274 datasets.
1275 {0}
1276 use_legacy_dataset : bool, default True
1277 Set to False to enable the new code path (experimental, using the
1278 new Arrow Dataset API). Among other things, this allows to pass
1279 new Arrow Dataset API). Among other things, this allows passing
1280 different partitioning schemes, etc.
1281 pre_buffer : bool, default True
1282 Coalesce and issue file reads in parallel to improve performance on
1283 high-latency filesystems (e.g. S3). If True, Arrow will use a
1284 background I/O thread pool. This option is only supported for
1285 use_legacy_dataset=False. If using a filesystem layer that itself
1286 performs readahead (e.g. fsspec's S3FS), disable readahead for best
1287 results.
1288 coerce_int96_timestamp_unit : str, default None.
1289 Cast timestamps that are stored in INT96 format to a particular resolution
1290 (e.g. 'ms'). Setting to None is equivalent to 'ns' and therefore INT96
1291 timestamps will be inferred as timestamps in nanoseconds.
1292 """.format(_read_docstring_common, _DNF_filter_doc)
1293
1294 def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
1295 metadata=None, split_row_groups=False, validate_schema=True,
1296 filters=None, metadata_nthreads=1, read_dictionary=None,
1297 memory_map=False, buffer_size=0, partitioning="hive",
1298 use_legacy_dataset=None, pre_buffer=True,
1299 coerce_int96_timestamp_unit=None):
1300 if use_legacy_dataset is None:
1301 # if a new filesystem is passed -> default to new implementation
1302 if isinstance(filesystem, FileSystem):
1303 use_legacy_dataset = False
1304 # otherwise the default is still True
1305 else:
1306 use_legacy_dataset = True
1307
1308 if not use_legacy_dataset:
1309 return _ParquetDatasetV2(
1310 path_or_paths, filesystem=filesystem,
1311 filters=filters,
1312 partitioning=partitioning,
1313 read_dictionary=read_dictionary,
1314 memory_map=memory_map,
1315 buffer_size=buffer_size,
1316 pre_buffer=pre_buffer,
1317 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
1318 # unsupported keywords
1319 schema=schema, metadata=metadata,
1320 split_row_groups=split_row_groups,
1321 validate_schema=validate_schema,
1322 metadata_nthreads=metadata_nthreads
1323 )
1324 self = object.__new__(cls)
1325 return self
1326
1327 def __init__(self, path_or_paths, filesystem=None, schema=None,
1328 metadata=None, split_row_groups=False, validate_schema=True,
1329 filters=None, metadata_nthreads=1, read_dictionary=None,
1330 memory_map=False, buffer_size=0, partitioning="hive",
1331 use_legacy_dataset=True, pre_buffer=True,
1332 coerce_int96_timestamp_unit=None):
1333 if partitioning != "hive":
1334 raise ValueError(
1335 'Only "hive" for hive-like partitioning is supported when '
1336 'using use_legacy_dataset=True')
1337 self._metadata = _ParquetDatasetMetadata()
1338 a_path = path_or_paths
1339 if isinstance(a_path, list):
1340 a_path = a_path[0]
1341
1342 self._metadata.fs, _ = _get_filesystem_and_path(filesystem, a_path)
1343 if isinstance(path_or_paths, list):
1344 self.paths = [_parse_uri(path) for path in path_or_paths]
1345 else:
1346 self.paths = _parse_uri(path_or_paths)
1347
1348 self._metadata.read_dictionary = read_dictionary
1349 self._metadata.memory_map = memory_map
1350 self._metadata.buffer_size = buffer_size
1351
1352 (self._pieces,
1353 self._partitions,
1354 self.common_metadata_path,
1355 self.metadata_path) = _make_manifest(
1356 path_or_paths, self._fs, metadata_nthreads=metadata_nthreads,
1357 open_file_func=partial(_open_dataset_file, self._metadata)
1358 )
1359
1360 if self.common_metadata_path is not None:
1361 with self._fs.open(self.common_metadata_path) as f:
1362 self._metadata.common_metadata = read_metadata(
1363 f,
1364 memory_map=memory_map
1365 )
1366 else:
1367 self._metadata.common_metadata = None
1368
1369 if metadata is None and self.metadata_path is not None:
1370 with self._fs.open(self.metadata_path) as f:
1371 self.metadata = read_metadata(f, memory_map=memory_map)
1372 else:
1373 self.metadata = metadata
1374
1375 self.schema = schema
1376
1377 self.split_row_groups = split_row_groups
1378
1379 if split_row_groups:
1380 raise NotImplementedError("split_row_groups not yet implemented")
1381
1382 if filters is not None:
1383 filters = _check_filters(filters)
1384 self._filter(filters)
1385
1386 if validate_schema:
1387 self.validate_schemas()
1388
1389 def equals(self, other):
1390 if not isinstance(other, ParquetDataset):
1391 raise TypeError('`other` must be an instance of ParquetDataset')
1392
1393 if self._fs.__class__ != other._fs.__class__:
1394 return False
1395 for prop in ('paths', '_pieces', '_partitions',
1396 'common_metadata_path', 'metadata_path',
1397 'common_metadata', 'metadata', 'schema',
1398 'split_row_groups'):
1399 if getattr(self, prop) != getattr(other, prop):
1400 return False
1401 for prop in ('memory_map', 'buffer_size'):
1402 if getattr(self._metadata, prop) != getattr(other._metadata, prop):
1403 return False
1404
1405 return True
1406
1407 def __eq__(self, other):
1408 try:
1409 return self.equals(other)
1410 except TypeError:
1411 return NotImplemented
1412
1413 def validate_schemas(self):
1414 if self.metadata is None and self.schema is None:
1415 if self.common_metadata is not None:
1416 self.schema = self.common_metadata.schema
1417 else:
1418 self.schema = self._pieces[0].get_metadata().schema
1419 elif self.schema is None:
1420 self.schema = self.metadata.schema
1421
1422 # Verify schemas are all compatible
1423 dataset_schema = self.schema.to_arrow_schema()
1424 # Exclude the partition columns from the schema, they are provided
1425 # by the path, not the DatasetPiece
1426 if self._partitions is not None:
1427 for partition_name in self._partitions.partition_names:
1428 if dataset_schema.get_field_index(partition_name) != -1:
1429 field_idx = dataset_schema.get_field_index(partition_name)
1430 dataset_schema = dataset_schema.remove(field_idx)
1431
1432 for piece in self._pieces:
1433 file_metadata = piece.get_metadata()
1434 file_schema = file_metadata.schema.to_arrow_schema()
1435 if not dataset_schema.equals(file_schema, check_metadata=False):
1436 raise ValueError('Schema in {!s} was different. \n'
1437 '{!s}\n\nvs\n\n{!s}'
1438 .format(piece, file_schema,
1439 dataset_schema))
1440
1441 def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
1442 """
1443 Read multiple Parquet files as a single pyarrow.Table.
1444
1445 Parameters
1446 ----------
1447 columns : List[str]
1448 Names of columns to read from the file.
1449 use_threads : bool, default True
1450 Perform multi-threaded column reads
1451 use_pandas_metadata : bool, default False
1452 Passed through to each dataset piece.
1453
1454 Returns
1455 -------
1456 pyarrow.Table
1457 Content of the file as a table (of columns).
1458 """
1459 tables = []
1460 for piece in self._pieces:
1461 table = piece.read(columns=columns, use_threads=use_threads,
1462 partitions=self._partitions,
1463 use_pandas_metadata=use_pandas_metadata)
1464 tables.append(table)
1465
1466 all_data = lib.concat_tables(tables)
1467
1468 if use_pandas_metadata:
1469 # We need to ensure that this metadata is set in the Table's schema
1470 # so that Table.to_pandas will construct pandas.DataFrame with the
1471 # right index
1472 common_metadata = self._get_common_pandas_metadata()
1473 current_metadata = all_data.schema.metadata or {}
1474
1475 if common_metadata and b'pandas' not in current_metadata:
1476 all_data = all_data.replace_schema_metadata({
1477 b'pandas': common_metadata})
1478
1479 return all_data
1480
1481 def read_pandas(self, **kwargs):
1482 """
1483 Read dataset including pandas metadata, if any. Other arguments passed
1484 through to ParquetDataset.read, see docstring for further details.
1485
1486 Parameters
1487 ----------
1488 **kwargs : optional
1489 All additional options to pass to the reader.
1490
1491 Returns
1492 -------
1493 pyarrow.Table
1494 Content of the file as a table (of columns).
1495 """
1496 return self.read(use_pandas_metadata=True, **kwargs)
1497
1498 def _get_common_pandas_metadata(self):
1499 if self.common_metadata is None:
1500 return None
1501
1502 keyvalues = self.common_metadata.metadata
1503 return keyvalues.get(b'pandas', None)
1504
1505 def _filter(self, filters):
1506 accepts_filter = self._partitions.filter_accepts_partition
1507
1508 def one_filter_accepts(piece, filter):
1509 return all(accepts_filter(part_key, filter, level)
1510 for level, part_key in enumerate(piece.partition_keys))
1511
1512 def all_filters_accept(piece):
1513 return any(all(one_filter_accepts(piece, f) for f in conjunction)
1514 for conjunction in filters)
1515
1516 self._pieces = [p for p in self._pieces if all_filters_accept(p)]
1517
1518 @property
1519 def pieces(self):
1520 warnings.warn(
1521 _DEPR_MSG.format(
1522 "ParquetDataset.pieces",
1523 " Specify 'use_legacy_dataset=False' while constructing the "
1524 "ParquetDataset, and then use the '.fragments' attribute "
1525 "instead."),
1526 DeprecationWarning, stacklevel=2)
1527 return self._pieces
1528
1529 @property
1530 def partitions(self):
1531 warnings.warn(
1532 _DEPR_MSG.format(
1533 "ParquetDataset.partitions",
1534 " Specify 'use_legacy_dataset=False' while constructing the "
1535 "ParquetDataset, and then use the '.partitioning' attribute "
1536 "instead."),
1537 DeprecationWarning, stacklevel=2)
1538 return self._partitions
1539
1540 @property
1541 def memory_map(self):
1542 warnings.warn(
1543 _DEPR_MSG.format("ParquetDataset.memory_map", ""),
1544 DeprecationWarning, stacklevel=2)
1545 return self._metadata.memory_map
1546
1547 @property
1548 def read_dictionary(self):
1549 warnings.warn(
1550 _DEPR_MSG.format("ParquetDataset.read_dictionary", ""),
1551 DeprecationWarning, stacklevel=2)
1552 return self._metadata.read_dictionary
1553
1554 @property
1555 def buffer_size(self):
1556 warnings.warn(
1557 _DEPR_MSG.format("ParquetDataset.buffer_size", ""),
1558 DeprecationWarning, stacklevel=2)
1559 return self._metadata.buffer_size
1560
1561 _fs = property(
1562 operator.attrgetter('_metadata.fs')
1563 )
1564
1565 @property
1566 def fs(self):
1567 warnings.warn(
1568 _DEPR_MSG.format(
1569 "ParquetDataset.fs",
1570 " Specify 'use_legacy_dataset=False' while constructing the "
1571 "ParquetDataset, and then use the '.filesystem' attribute "
1572 "instead."),
1573 DeprecationWarning, stacklevel=2)
1574 return self._metadata.fs
1575
1576 common_metadata = property(
1577 operator.attrgetter('_metadata.common_metadata')
1578 )
1579
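# Illustrative sketch (not part of the original module): reading a small
# hive-partitioned directory through ParquetDataset with a partition filter.
# The directory layout and column names are hypothetical; the files are
# written with ParquetWriter so the sketch stays self-contained.
def _example_partitioned_dataset_filter():
    import tempfile
    base = tempfile.mkdtemp()
    for year, values in [(2019, [1, 2]), (2020, [3, 4])]:
        part_dir = os.path.join(base, 'year={}'.format(year))
        os.makedirs(part_dir)
        table = pa.table({'value': values})
        with ParquetWriter(os.path.join(part_dir, 'part-0.parquet'),
                           table.schema, compression='NONE') as writer:
            writer.write_table(table)
    dataset = ParquetDataset(base, use_legacy_dataset=False,
                             filters=[('year', '=', 2020)])
    # only rows from the year=2020 directory survive the filter
    return dataset.read()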
1580
1581 def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,
1582 open_file_func=None):
1583 partitions = None
1584 common_metadata_path = None
1585 metadata_path = None
1586
1587 if isinstance(path_or_paths, list) and len(path_or_paths) == 1:
1588 # Dask passes a directory as a list of length 1
1589 path_or_paths = path_or_paths[0]
1590
1591 if _is_path_like(path_or_paths) and fs.isdir(path_or_paths):
1592 manifest = ParquetManifest(path_or_paths, filesystem=fs,
1593 open_file_func=open_file_func,
1594 pathsep=getattr(fs, "pathsep", "/"),
1595 metadata_nthreads=metadata_nthreads)
1596 common_metadata_path = manifest.common_metadata_path
1597 metadata_path = manifest.metadata_path
1598 pieces = manifest.pieces
1599 partitions = manifest.partitions
1600 else:
1601 if not isinstance(path_or_paths, list):
1602 path_or_paths = [path_or_paths]
1603
1604 # List of paths
1605 if len(path_or_paths) == 0:
1606 raise ValueError('Must pass at least one file path')
1607
1608 pieces = []
1609 for path in path_or_paths:
1610 if not fs.isfile(path):
1611 raise OSError('Passed non-file path: {}'
1612 .format(path))
1613 piece = ParquetDatasetPiece._create(
1614 path, open_file_func=open_file_func)
1615 pieces.append(piece)
1616
1617 return pieces, partitions, common_metadata_path, metadata_path
1618
1619
1620 def _is_local_file_system(fs):
1621 return isinstance(fs, LocalFileSystem) or isinstance(
1622 fs, legacyfs.LocalFileSystem
1623 )
1624
1625
1626 class _ParquetDatasetV2:
1627 """
1628 ParquetDataset shim using the Dataset API under the hood.
1629 """
1630
1631 def __init__(self, path_or_paths, filesystem=None, filters=None,
1632 partitioning="hive", read_dictionary=None, buffer_size=None,
1633 memory_map=False, ignore_prefixes=None, pre_buffer=True,
1634 coerce_int96_timestamp_unit=None, **kwargs):
1635 import pyarrow.dataset as ds
1636
1637 # Raise error for not supported keywords
1638 for keyword, default in [
1639 ("schema", None), ("metadata", None),
1640 ("split_row_groups", False), ("validate_schema", True),
1641 ("metadata_nthreads", 1)]:
1642 if keyword in kwargs and kwargs[keyword] is not default:
1643 raise ValueError(
1644 "Keyword '{0}' is not yet supported with the new "
1645 "Dataset API".format(keyword))
1646
1647 # map format arguments
1648 read_options = {
1649 "pre_buffer": pre_buffer,
1650 "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit
1651 }
1652 if buffer_size:
1653 read_options.update(use_buffered_stream=True,
1654 buffer_size=buffer_size)
1655 if read_dictionary is not None:
1656 read_options.update(dictionary_columns=read_dictionary)
1657
1658 # map filters to Expressions
1659 self._filters = filters
1660 self._filter_expression = filters and _filters_to_expression(filters)
1661
1662 # map old filesystems to new one
1663 if filesystem is not None:
1664 filesystem = _ensure_filesystem(
1665 filesystem, use_mmap=memory_map)
1666 elif filesystem is None and memory_map:
1667 # if memory_map is specified, assume local file system (string
1668 # path can in principle be URI for any filesystem)
1669 filesystem = LocalFileSystem(use_mmap=memory_map)
1670
1671 # This needs to be checked after _ensure_filesystem, because that
1672 # handles the case of an fsspec LocalFileSystem
1673 if (
1674 hasattr(path_or_paths, "__fspath__") and
1675 filesystem is not None and
1676 not _is_local_file_system(filesystem)
1677 ):
1678 raise TypeError(
1679 "Path-like objects with __fspath__ must only be used with "
1680 f"local file systems, not {type(filesystem)}"
1681 )
1682
1683 # check for single fragment dataset
1684 single_file = None
1685 if isinstance(path_or_paths, list):
1686 if len(path_or_paths) == 1:
1687 single_file = path_or_paths[0]
1688 else:
1689 if _is_path_like(path_or_paths):
1690 path_or_paths = _stringify_path(path_or_paths)
1691 if filesystem is None:
1692 # path might be a URI describing the FileSystem as well
1693 try:
1694 filesystem, path_or_paths = FileSystem.from_uri(
1695 path_or_paths)
1696 except ValueError:
1697 filesystem = LocalFileSystem(use_mmap=memory_map)
1698 if filesystem.get_file_info(path_or_paths).is_file:
1699 single_file = path_or_paths
1700 else:
1701 single_file = path_or_paths
1702
1703 if single_file is not None:
1704 self._enable_parallel_column_conversion = True
1705 read_options.update(enable_parallel_column_conversion=True)
1706
1707 parquet_format = ds.ParquetFileFormat(**read_options)
1708 fragment = parquet_format.make_fragment(single_file, filesystem)
1709
1710 self._dataset = ds.FileSystemDataset(
1711 [fragment], schema=fragment.physical_schema,
1712 format=parquet_format,
1713 filesystem=fragment.filesystem
1714 )
1715 return
1716 else:
1717 self._enable_parallel_column_conversion = False
1718
1719 parquet_format = ds.ParquetFileFormat(**read_options)
1720
1721 # check partitioning to enable dictionary encoding
1722 if partitioning == "hive":
1723 partitioning = ds.HivePartitioning.discover(
1724 infer_dictionary=True)
1725
1726 self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
1727 format=parquet_format,
1728 partitioning=partitioning,
1729 ignore_prefixes=ignore_prefixes)
1730
1731 @property
1732 def schema(self):
1733 return self._dataset.schema
1734
1735 def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
1736 """
1737 Read (multiple) Parquet files as a single pyarrow.Table.
1738
1739 Parameters
1740 ----------
1741 columns : List[str]
1742 Names of columns to read from the dataset. The partition fields
1743 are not automatically included (in contrast to when setting
1744 ``use_legacy_dataset=True``).
1745 use_threads : bool, default True
1746 Perform multi-threaded column reads.
1747 use_pandas_metadata : bool, default False
1748 If True and file has custom pandas schema metadata, ensure that
1749 index columns are also loaded.
1750
1751 Returns
1752 -------
1753 pyarrow.Table
1754 Content of the file as a table (of columns).
1755 """
1756 # if use_pandas_metadata, we need to include index columns in the
1757 # column selection, to be able to restore those in the pandas DataFrame
1758 metadata = self.schema.metadata
1759 if columns is not None and use_pandas_metadata:
1760 if metadata and b'pandas' in metadata:
1761 # RangeIndex can be represented as dict instead of column name
1762 index_columns = [
1763 col for col in _get_pandas_index_columns(metadata)
1764 if not isinstance(col, dict)
1765 ]
1766 columns = (
1767 list(columns) + list(set(index_columns) - set(columns))
1768 )
1769
1770 if self._enable_parallel_column_conversion:
1771 if use_threads:
1772 # Allow per-column parallelism; would otherwise cause
1773 # contention in the presence of per-file parallelism.
1774 use_threads = False
1775
1776 table = self._dataset.to_table(
1777 columns=columns, filter=self._filter_expression,
1778 use_threads=use_threads
1779 )
1780
1781 # if use_pandas_metadata, restore the pandas metadata (which gets
1782 # lost if doing a specific `columns` selection in to_table)
1783 if use_pandas_metadata:
1784 if metadata and b"pandas" in metadata:
1785 new_metadata = table.schema.metadata or {}
1786 new_metadata.update({b"pandas": metadata[b"pandas"]})
1787 table = table.replace_schema_metadata(new_metadata)
1788
1789 return table
1790
1791 def read_pandas(self, **kwargs):
1792 """
1793 Read dataset including pandas metadata, if any. Other arguments are
1794 passed through to ParquetDataset.read; see its docstring for details.
1795 """
1796 return self.read(use_pandas_metadata=True, **kwargs)
1797
1798 @property
1799 def pieces(self):
1800 warnings.warn(
1801 _DEPR_MSG.format("ParquetDataset.pieces",
1802 " Use the '.fragments' attribute instead"),
1803 DeprecationWarning, stacklevel=2)
1804 return list(self._dataset.get_fragments())
1805
1806 @property
1807 def fragments(self):
1808 return list(self._dataset.get_fragments())
1809
1810 @property
1811 def files(self):
1812 return self._dataset.files
1813
1814 @property
1815 def filesystem(self):
1816 return self._dataset.filesystem
1817
1818 @property
1819 def partitioning(self):
1820 """
1821 The partitioning of the Dataset source, if discovered.
1822 """
1823 return self._dataset.partitioning
1824
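# Usage sketch (illustrative only): the shim above is normally reached through
# the public ParquetDataset constructor with use_legacy_dataset=False. The
# path, column names and filter below are placeholders.
#
#   import pyarrow.parquet as pq
#
#   dataset = pq.ParquetDataset('dataset_root/', use_legacy_dataset=False,
#                               filters=[('year', '>=', 2020)])
#   table = dataset.read(columns=['year', 'value'], use_pandas_metadata=True)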
1825
1826 _read_table_docstring = """
1827 {0}
1828
1829 Parameters
1830 ----------
1831 source : str, pyarrow.NativeFile, or file-like object
1832 If a string is passed, it can be a single file name or directory name. For
1833 file-like objects, only a single file will be read. Use pyarrow.BufferReader to
1834 read a file contained in a bytes or buffer-like object.
1835 columns : list
1836 If not None, only these columns will be read from the file. A column
1837 name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
1838 'a.c', and 'a.d.e'. If empty, no columns will be read. Note
1839 that the table will still have the correct num_rows set despite having
1840 no columns.
1841 use_threads : bool, default True
1842 Perform multi-threaded column reads.
1843 metadata : FileMetaData
1844 If separately computed
1845 {1}
1846 use_legacy_dataset : bool, default False
1847 By default, `read_table` uses the new Arrow Datasets API since
1848 pyarrow 1.0.0. Among other things, this allows passing `filters`
1849 for all columns and not only the partition keys, enables
1850 different partitioning schemes, etc.
1851 Set to True to use the legacy behaviour.
1852 ignore_prefixes : list, optional
1853 Files matching any of these prefixes will be ignored by the
1854 discovery process if use_legacy_dataset=False.
1855 This is matched to the basename of a path.
1856 By default this is ['.', '_'].
1857 Note that discovery happens only if a directory is passed as source.
1858 filesystem : FileSystem, default None
1859 If nothing is passed, paths are assumed to be found in the local on-disk
1860 filesystem.
1861 filters : List[Tuple] or List[List[Tuple]] or None (default)
1862 Rows which do not match the filter predicate will be removed from scanned
1863 data. Partition keys embedded in a nested directory structure will be
1864 exploited to avoid loading files at all if they contain no matching rows.
1865 If `use_legacy_dataset` is True, filters can only reference partition
1866 keys and only a hive-style directory structure is supported. When
1867 setting `use_legacy_dataset` to False, within-file level filtering
1868 and different partitioning schemes are also supported.
1869
1870 {3}
1871 pre_buffer : bool, default True
1872 Coalesce and issue file reads in parallel to improve performance on
1873 high-latency filesystems (e.g. S3). If True, Arrow will use a
1874 background I/O thread pool. This option is only supported for
1875 use_legacy_dataset=False. If using a filesystem layer that itself
1876 performs readahead (e.g. fsspec's S3FS), disable readahead for best
1877 results.
1878 coerce_int96_timestamp_unit : str, default None.
1879 Cast timestamps that are stored in INT96 format to a particular
1880 resolution (e.g. 'ms'). Setting to None is equivalent to 'ns'
1881 and therefore INT96 timestamps will be inferred as timestamps
1882 in nanoseconds.
1883
1884 Returns
1885 -------
1886 {2}
1887 """
1888
1889
1890 def read_table(source, columns=None, use_threads=True, metadata=None,
1891 use_pandas_metadata=False, memory_map=False,
1892 read_dictionary=None, filesystem=None, filters=None,
1893 buffer_size=0, partitioning="hive", use_legacy_dataset=False,
1894 ignore_prefixes=None, pre_buffer=True,
1895 coerce_int96_timestamp_unit=None):
1896 if not use_legacy_dataset:
1897 if metadata is not None:
1898 raise ValueError(
1899 "The 'metadata' keyword is no longer supported with the new "
1900 "datasets-based implementation. Specify "
1901 "'use_legacy_dataset=True' to temporarily recover the old "
1902 "behaviour."
1903 )
1904 try:
1905 dataset = _ParquetDatasetV2(
1906 source,
1907 filesystem=filesystem,
1908 partitioning=partitioning,
1909 memory_map=memory_map,
1910 read_dictionary=read_dictionary,
1911 buffer_size=buffer_size,
1912 filters=filters,
1913 ignore_prefixes=ignore_prefixes,
1914 pre_buffer=pre_buffer,
1915 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
1916 )
1917 except ImportError:
1918 # fall back on ParquetFile for simple cases when pyarrow.dataset
1919 # module is not available
1920 if filters is not None:
1921 raise ValueError(
1922 "the 'filters' keyword is not supported when the "
1923 "pyarrow.dataset module is not available"
1924 )
1925 if partitioning != "hive":
1926 raise ValueError(
1927 "the 'partitioning' keyword is not supported when the "
1928 "pyarrow.dataset module is not available"
1929 )
1930 filesystem, path = _resolve_filesystem_and_path(source, filesystem)
1931 if filesystem is not None:
1932 source = filesystem.open_input_file(path)
1933 # TODO test that source is not a directory or a list
1934 dataset = ParquetFile(
1935 source, metadata=metadata, read_dictionary=read_dictionary,
1936 memory_map=memory_map, buffer_size=buffer_size,
1937 pre_buffer=pre_buffer,
1938 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
1939 )
1940
1941 return dataset.read(columns=columns, use_threads=use_threads,
1942 use_pandas_metadata=use_pandas_metadata)
1943
1944 if ignore_prefixes is not None:
1945 raise ValueError(
1946 "The 'ignore_prefixes' keyword is only supported when "
1947 "use_legacy_dataset=False")
1948
1949 if _is_path_like(source):
1950 pf = ParquetDataset(
1951 source, metadata=metadata, memory_map=memory_map,
1952 read_dictionary=read_dictionary,
1953 buffer_size=buffer_size,
1954 filesystem=filesystem, filters=filters,
1955 partitioning=partitioning,
1956 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
1957 )
1958 else:
1959 pf = ParquetFile(
1960 source, metadata=metadata,
1961 read_dictionary=read_dictionary,
1962 memory_map=memory_map,
1963 buffer_size=buffer_size,
1964 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
1965 )
1966 return pf.read(columns=columns, use_threads=use_threads,
1967 use_pandas_metadata=use_pandas_metadata)
1968
1969
1970 read_table.__doc__ = _read_table_docstring.format(
1971 """Read a Table from Parquet format
1972
1973 Note: starting with pyarrow 1.0, the default for `use_legacy_dataset` is
1974 switched to False.""",
1975 "\n".join((_read_docstring_common,
1976 """use_pandas_metadata : bool, default False
1977 If True and file has custom pandas schema metadata, ensure that
1978 index columns are also loaded.""")),
1979 """pyarrow.Table
1980 Content of the file as a table (of columns)""",
1981 _DNF_filter_doc)
1982
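# Usage sketch for read_table (illustrative only; 'example.parquet' and the
# column/filter names are placeholders). Row-level filtering on arbitrary
# columns relies on the default use_legacy_dataset=False code path.
#
#   import pyarrow.parquet as pq
#
#   table = pq.read_table('example.parquet',
#                         columns=['a', 'b'],
#                         filters=[('a', '>', 0)])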
1983
1984 def read_pandas(source, columns=None, **kwargs):
1985 return read_table(
1986 source, columns=columns, use_pandas_metadata=True, **kwargs
1987 )
1988
1989
1990 read_pandas.__doc__ = _read_table_docstring.format(
1991 'Read a Table from Parquet format, also reading DataFrame\n'
1992 'index values if known in the file metadata',
1993 "\n".join((_read_docstring_common,
1994 """**kwargs : additional options for :func:`read_table`""")),
1995 """pyarrow.Table
1996 Content of the file as a Table of Columns, including DataFrame
1997 indexes as columns""",
1998 _DNF_filter_doc)
1999
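# Usage sketch for read_pandas (illustrative only; 'example.parquet' is a
# placeholder). Any pandas index stored in the file metadata is restored when
# converting back to a DataFrame.
#
#   import pyarrow.parquet as pq
#
#   df = pq.read_pandas('example.parquet', columns=['a', 'b']).to_pandas()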
2000
2001 def write_table(table, where, row_group_size=None, version='1.0',
2002 use_dictionary=True, compression='snappy',
2003 write_statistics=True,
2004 use_deprecated_int96_timestamps=None,
2005 coerce_timestamps=None,
2006 allow_truncated_timestamps=False,
2007 data_page_size=None, flavor=None,
2008 filesystem=None,
2009 compression_level=None,
2010 use_byte_stream_split=False,
2011 data_page_version='1.0',
2012 use_compliant_nested_type=False,
2013 **kwargs):
2014 row_group_size = kwargs.pop('chunk_size', row_group_size)
2015 use_int96 = use_deprecated_int96_timestamps
2016 try:
2017 with ParquetWriter(
2018 where, table.schema,
2019 filesystem=filesystem,
2020 version=version,
2021 flavor=flavor,
2022 use_dictionary=use_dictionary,
2023 write_statistics=write_statistics,
2024 coerce_timestamps=coerce_timestamps,
2025 data_page_size=data_page_size,
2026 allow_truncated_timestamps=allow_truncated_timestamps,
2027 compression=compression,
2028 use_deprecated_int96_timestamps=use_int96,
2029 compression_level=compression_level,
2030 use_byte_stream_split=use_byte_stream_split,
2031 data_page_version=data_page_version,
2032 use_compliant_nested_type=use_compliant_nested_type,
2033 **kwargs) as writer:
2034 writer.write_table(table, row_group_size=row_group_size)
2035 except Exception:
2036 if _is_path_like(where):
2037 try:
2038 os.remove(_stringify_path(where))
2039 except OSError:
2040 pass
2041 raise
2042
2043
2044 write_table.__doc__ = """
2045 Write a Table to Parquet format.
2046
2047 Parameters
2048 ----------
2049 table : pyarrow.Table
2050 where : string or pyarrow.NativeFile
2051 row_group_size : int
2052 The number of rows per row group
2053 {}
2054 **kwargs : optional
2055 Additional options for ParquetWriter
2056 """.format(_parquet_writer_arg_docs)
2057
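# Usage sketch for write_table (illustrative only; the table contents and the
# output path are placeholders).
#
#   import pyarrow as pa
#   import pyarrow.parquet as pq
#
#   table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
#   pq.write_table(table, 'example.parquet',
#                  compression='snappy',   # default codec, shown explicitly
#                  row_group_size=2)       # rows per row group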
2058
2059 def _mkdir_if_not_exists(fs, path):
2060 if fs._isfilestore() and not fs.exists(path):
2061 try:
2062 fs.mkdir(path)
2063 except OSError:
2064 assert fs.exists(path)
2065
2066
2067 def write_to_dataset(table, root_path, partition_cols=None,
2068 partition_filename_cb=None, filesystem=None,
2069 use_legacy_dataset=None, **kwargs):
2070 """Wrapper around parquet.write_table for writing a Table to
2071 Parquet format by partitions.
2072 For each combination of partition columns and values,
2073 subdirectories are created in the following
2074 manner:
2075
2076 root_dir/
2077 group1=value1
2078 group2=value1
2079 <uuid>.parquet
2080 group2=value2
2081 <uuid>.parquet
2082 group1=valueN
2083 group2=value1
2084 <uuid>.parquet
2085 group2=valueN
2086 <uuid>.parquet
2087
2088 Parameters
2089 ----------
2090 table : pyarrow.Table
2091 root_path : str, pathlib.Path
2092 The root directory of the dataset
2093 filesystem : FileSystem, default None
2094 If nothing is passed, paths are assumed to be found in the local on-disk
2095 filesystem
2096 partition_cols : list,
2097 Column names by which to partition the dataset.
2098 Columns are partitioned in the order they are given
2099 partition_filename_cb : callable,
2100 A callback function that takes the partition key(s) as an argument
2101 and allows you to override the partition filename. If nothing is
2102 passed, the filename will consist of a uuid.
2103 use_legacy_dataset : bool
2104 Default is True unless a ``pyarrow.fs`` filesystem is passed.
2105 Set to False to enable the new code path (experimental, using the
2106 new Arrow Dataset API). This is more efficient when using partition
2107 columns, but does not (yet) support the `partition_filename_cb`
2108 keyword (`metadata_collector` is supported via a file visitor).
2109 **kwargs : dict,
2110 Additional kwargs for write_table function. See docstring for
2111 `write_table` or `ParquetWriter` for more information.
2112 Using `metadata_collector` in kwargs allows one to collect the
2113 file metadata instances of dataset pieces. The file paths in the
2114 ColumnChunkMetaData will be set relative to `root_path`.
2115 """
2116 if use_legacy_dataset is None:
2117 # if a new filesystem is passed -> default to new implementation
2118 if isinstance(filesystem, FileSystem):
2119 use_legacy_dataset = False
2120 # otherwise the default is still True
2121 else:
2122 use_legacy_dataset = True
2123
2124 if not use_legacy_dataset:
2125 import pyarrow.dataset as ds
2126
2127 # extract non-file format options
2128 schema = kwargs.pop("schema", None)
2129 use_threads = kwargs.pop("use_threads", True)
2130
2131 # raise for unsupported keywords
2132 msg = (
2133 "The '{}' argument is not supported with the new dataset "
2134 "implementation."
2135 )
2136 metadata_collector = kwargs.pop('metadata_collector', None)
2137 file_visitor = None
2138 if metadata_collector is not None:
2139 def file_visitor(written_file):
2140 metadata_collector.append(written_file.metadata)
2141 if partition_filename_cb is not None:
2142 raise ValueError(msg.format("partition_filename_cb"))
2143
2144 # map format arguments
2145 parquet_format = ds.ParquetFileFormat()
2146 write_options = parquet_format.make_write_options(**kwargs)
2147
2148 # map old filesystems to new one
2149 if filesystem is not None:
2150 filesystem = _ensure_filesystem(filesystem)
2151
2152 partitioning = None
2153 if partition_cols:
2154 part_schema = table.select(partition_cols).schema
2155 partitioning = ds.partitioning(part_schema, flavor="hive")
2156
2157 ds.write_dataset(
2158 table, root_path, filesystem=filesystem,
2159 format=parquet_format, file_options=write_options, schema=schema,
2160 partitioning=partitioning, use_threads=use_threads,
2161 file_visitor=file_visitor)
2162 return
2163
2164 fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem)
2165
2166 _mkdir_if_not_exists(fs, root_path)
2167
2168 metadata_collector = kwargs.pop('metadata_collector', None)
2169
2170 if partition_cols is not None and len(partition_cols) > 0:
2171 df = table.to_pandas()
2172 partition_keys = [df[col] for col in partition_cols]
2173 data_df = df.drop(partition_cols, axis='columns')
2174 data_cols = df.columns.drop(partition_cols)
2175 if len(data_cols) == 0:
2176 raise ValueError('No data left to save outside partition columns')
2177
2178 subschema = table.schema
2179
2180 # ARROW-2891: Ensure the output_schema is preserved when writing a
2181 # partitioned dataset
2182 for col in table.schema.names:
2183 if col in partition_cols:
2184 subschema = subschema.remove(subschema.get_field_index(col))
2185
2186 for keys, subgroup in data_df.groupby(partition_keys):
2187 if not isinstance(keys, tuple):
2188 keys = (keys,)
2189 subdir = '/'.join(
2190 ['{colname}={value}'.format(colname=name, value=val)
2191 for name, val in zip(partition_cols, keys)])
2192 subtable = pa.Table.from_pandas(subgroup, schema=subschema,
2193 safe=False)
2194 _mkdir_if_not_exists(fs, '/'.join([root_path, subdir]))
2195 if partition_filename_cb:
2196 outfile = partition_filename_cb(keys)
2197 else:
2198 outfile = guid() + '.parquet'
2199 relative_path = '/'.join([subdir, outfile])
2200 full_path = '/'.join([root_path, relative_path])
2201 with fs.open(full_path, 'wb') as f:
2202 write_table(subtable, f, metadata_collector=metadata_collector,
2203 **kwargs)
2204 if metadata_collector is not None:
2205 metadata_collector[-1].set_file_path(relative_path)
2206 else:
2207 if partition_filename_cb:
2208 outfile = partition_filename_cb(None)
2209 else:
2210 outfile = guid() + '.parquet'
2211 full_path = '/'.join([root_path, outfile])
2212 with fs.open(full_path, 'wb') as f:
2213 write_table(table, f, metadata_collector=metadata_collector,
2214 **kwargs)
2215 if metadata_collector is not None:
2216 metadata_collector[-1].set_file_path(outfile)
2217
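# Usage sketch for write_to_dataset (illustrative only; 'dataset_root/' and
# the column names are placeholders). Each distinct value of 'year' becomes a
# hive-style subdirectory such as dataset_root/year=2020/<uuid>.parquet.
#
#   import pyarrow as pa
#   import pyarrow.parquet as pq
#
#   table = pa.table({'year': [2020, 2020, 2021], 'value': [1.0, 2.0, 3.0]})
#   pq.write_to_dataset(table, 'dataset_root/', partition_cols=['year'])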
2218
2219 def write_metadata(schema, where, metadata_collector=None, **kwargs):
2220 """
2221 Write metadata-only Parquet file from schema. This can be used with
2222 `write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar
2223 files.
2224
2225 Parameters
2226 ----------
2227 schema : pyarrow.Schema
2228 where : string or pyarrow.NativeFile
2229 metadata_collector : list
2230 Where to collect metadata information.
2231 **kwargs : dict,
2232 Additional kwargs for ParquetWriter class. See docstring for
2233 `ParquetWriter` for more information.
2234
2235 Examples
2236 --------
2237
2238 Write a dataset and collect metadata information.
2239
2240 >>> metadata_collector = []
2241 >>> write_to_dataset(
2242 ... table, root_path,
2243 ... metadata_collector=metadata_collector, **writer_kwargs)
2244
2245 Write the `_common_metadata` parquet file without row groups statistics.
2246
2247 >>> write_metadata(
2248 ... table.schema, root_path / '_common_metadata', **writer_kwargs)
2249
2250 Write the `_metadata` parquet file with row groups statistics.
2251
2252 >>> write_metadata(
2253 ... table.schema, root_path / '_metadata',
2254 ... metadata_collector=metadata_collector, **writer_kwargs)
2255 """
2256 writer = ParquetWriter(where, schema, **kwargs)
2257 writer.close()
2258
2259 if metadata_collector is not None:
2260 # ParquetWriter doesn't expose the metadata until it's written. Write
2261 # it and read it again.
2262 metadata = read_metadata(where)
2263 for m in metadata_collector:
2264 metadata.append_row_groups(m)
2265 metadata.write_metadata_file(where)
2266
2267
2268 def read_metadata(where, memory_map=False):
2269 """
2270 Read FileMetaData from footer of a single Parquet file.
2271
2272 Parameters
2273 ----------
2274 where : str (filepath) or file-like object
2275 memory_map : bool, default False
2276 Create memory map when the source is a file path.
2277
2278 Returns
2279 -------
2280 metadata : FileMetaData
2281 """
2282 return ParquetFile(where, memory_map=memory_map).metadata
2283
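# Usage sketch for read_metadata (illustrative only; 'example.parquet' is a
# placeholder). The returned FileMetaData exposes file-level statistics.
#
#   import pyarrow.parquet as pq
#
#   meta = pq.read_metadata('example.parquet')
#   print(meta.num_rows, meta.num_row_groups, meta.num_columns)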
2284
2285 def read_schema(where, memory_map=False):
2286 """
2287 Read effective Arrow schema from Parquet file metadata.
2288
2289 Parameters
2290 ----------
2291 where : str (filepath) or file-like object
2292 memory_map : bool, default False
2293 Create memory map when the source is a file path.
2294
2295 Returns
2296 -------
2297 schema : pyarrow.Schema
2298 """
2299 return ParquetFile(where, memory_map=memory_map).schema.to_arrow_schema()
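
# Usage sketch for read_schema (illustrative only; 'example.parquet' is a
# placeholder). The result is an ordinary pyarrow.Schema.
#
#   import pyarrow.parquet as pq
#
#   schema = pq.read_schema('example.parquet')
#   print(schema.names)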