# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Dataset is currently unstable. APIs subject to change without notice."""

import pyarrow as pa
from pyarrow.util import _is_iterable, _stringify_path, _is_path_like

from pyarrow._dataset import (  # noqa
    CsvFileFormat,
    CsvFragmentScanOptions,
    Expression,
    Dataset,
    DatasetFactory,
    DirectoryPartitioning,
    FileFormat,
    FileFragment,
    FileSystemDataset,
    FileSystemDatasetFactory,
    FileSystemFactoryOptions,
    FileWriteOptions,
    Fragment,
    HivePartitioning,
    IpcFileFormat,
    IpcFileWriteOptions,
    InMemoryDataset,
    ParquetDatasetFactory,
    ParquetFactoryOptions,
    ParquetFileFormat,
    ParquetFileFragment,
    ParquetFileWriteOptions,
    ParquetFragmentScanOptions,
    ParquetReadOptions,
    Partitioning,
    PartitioningFactory,
    RowGroupInfo,
    Scanner,
    TaggedRecordBatch,
    UnionDataset,
    UnionDatasetFactory,
    _get_partition_keys,
    _filesystemdataset_write,
)

_orc_available = False
_orc_msg = (
    "The pyarrow installation is not built with support for the ORC file "
    "format."
)

try:
    from pyarrow._dataset_orc import OrcFileFormat
    _orc_available = True
except ImportError:
    pass


def __getattr__(name):
    if name == "OrcFileFormat" and not _orc_available:
        raise ImportError(_orc_msg)

    raise AttributeError(
        "module 'pyarrow.dataset' has no attribute '{0}'".format(name)
    )


def field(name):
    """Reference a named column of the dataset.

    Stores only the field's name. Type and other information is known only
    when the expression is bound to a dataset having an explicit schema.

    Parameters
    ----------
    name : string
        The name of the field the expression references.

    Returns
    -------
    field_expr : Expression
    """
    return Expression._field(name)
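
# Illustrative usage sketch (not part of the original module): ``field()`` is
# typically combined with comparison operators to build filter expressions.
# The dataset path "data/" and the column name "year" are hypothetical.
#
#   import pyarrow.dataset as ds
#   expr = ds.field("year") >= 2020           # Expression referencing "year"
#   table = ds.dataset("data/", format="parquet").to_table(filter=expr)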


def scalar(value):
    """Expression representing a scalar value.

    Parameters
    ----------
    value : bool, int, float or string
        Python value of the scalar. Note that only a subset of types are
        currently supported.

    Returns
    -------
    scalar_expr : Expression
    """
    return Expression._scalar(value)
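
# Illustrative usage sketch (not part of the original module): ``scalar()``
# wraps a plain Python value so it can participate in expressions. Comparing a
# field against a Python literal converts it implicitly, so the two forms
# below are equivalent. The column name "month" is hypothetical.
#
#   import pyarrow.dataset as ds
#   ds.field("month") == ds.scalar(11)
#   ds.field("month") == 11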


def partitioning(schema=None, field_names=None, flavor=None,
                 dictionaries=None):
    """
    Specify a partitioning scheme.

    The supported schemes include:

    - "DirectoryPartitioning": this scheme expects one segment in the file
      path for each field in the specified schema (all fields are required
      to be present). For example given schema<year:int16, month:int8> the
      path "/2009/11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
    - "HivePartitioning": a scheme for "/$key=$value/" nested directories as
      found in Apache Hive. This is a multi-level, directory based
      partitioning scheme. Data is partitioned by static values of a
      particular column in the schema. Partition keys are represented in the
      form $key=$value in directory names. Field order is ignored, as are
      missing or unrecognized field names.
      For example, given schema<year:int16, month:int8, day:int8>, a possible
      path would be "/year=2009/month=11/day=15" (but the field order does
      not need to match).

    Parameters
    ----------
    schema : pyarrow.Schema, default None
        The schema that describes the partitions present in the file path.
        If not specified, and `field_names` and/or `flavor` are specified,
        the schema will be inferred from the file path (and a
        PartitioningFactory is returned).
    field_names : list of str, default None
        A list of strings (field names). If specified, the schema's types are
        inferred from the file paths (only valid for DirectoryPartitioning).
    flavor : str, default None
        The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
        a HivePartitioning.
    dictionaries : Dict[str, Array]
        If the type of any field of `schema` is a dictionary type, the
        corresponding entry of `dictionaries` must be an array containing
        every value which may be taken by the corresponding column or an
        error will be raised in parsing. Alternatively, pass `infer` to have
        Arrow discover the dictionary values, in which case a
        PartitioningFactory is returned.

    Returns
    -------
    Partitioning or PartitioningFactory

    Examples
    --------

    Specify the Schema for paths like "/2009/June":

    >>> partitioning(pa.schema([("year", pa.int16()), ("month", pa.string())]))

    or let the types be inferred by only specifying the field names:

    >>> partitioning(field_names=["year", "month"])

    For paths like "/2009/June", the year will be inferred as int32 while
    month will be inferred as string.

    Specify a Schema with dictionary encoding, providing dictionary values:

    >>> partitioning(
    ...     pa.schema([
    ...         ("year", pa.int16()),
    ...         ("month", pa.dictionary(pa.int8(), pa.string()))
    ...     ]),
    ...     dictionaries={
    ...         "month": pa.array(["January", "February", "March"]),
    ...     })

    Alternatively, specify a Schema with dictionary encoding, but have Arrow
    infer the dictionary values:

    >>> partitioning(
    ...     pa.schema([
    ...         ("year", pa.int16()),
    ...         ("month", pa.dictionary(pa.int8(), pa.string()))
    ...     ]),
    ...     dictionaries="infer")

    Create a Hive scheme for a path like "/year=2009/month=11":

    >>> partitioning(
    ...     pa.schema([("year", pa.int16()), ("month", pa.int8())]),
    ...     flavor="hive")

    A Hive scheme can also be discovered from the directory structure (and
    types will be inferred):

    >>> partitioning(flavor="hive")

    """
    if flavor is None:
        # default flavor
        if schema is not None:
            if field_names is not None:
                raise ValueError(
                    "Cannot specify both 'schema' and 'field_names'")
            if dictionaries == 'infer':
                return DirectoryPartitioning.discover(schema=schema)
            return DirectoryPartitioning(schema, dictionaries)
        elif field_names is not None:
            if isinstance(field_names, list):
                return DirectoryPartitioning.discover(field_names)
            else:
                raise ValueError(
                    "Expected list of field names, got {}".format(
                        type(field_names)))
        else:
            raise ValueError(
                "For the default directory flavor, need to specify "
                "a Schema or a list of field names")
    elif flavor == 'hive':
        if field_names is not None:
            raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
        elif schema is not None:
            if isinstance(schema, pa.Schema):
                if dictionaries == 'infer':
                    return HivePartitioning.discover(schema=schema)
                return HivePartitioning(schema, dictionaries)
            else:
                raise ValueError(
                    "Expected Schema for 'schema', got {}".format(
                        type(schema)))
        else:
            return HivePartitioning.discover()
    else:
        raise ValueError("Unsupported flavor")
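
# Illustrative usage sketch (not part of the original module): a Partitioning
# (or PartitioningFactory) built here is typically passed to ``dataset()`` so
# that directory segments are exposed as columns. The path "data/" and the
# schema below are hypothetical.
#
#   import pyarrow as pa
#   import pyarrow.dataset as ds
#   part = ds.partitioning(
#       pa.schema([("year", pa.int16()), ("month", pa.int8())]),
#       flavor="hive")
#   dset = ds.dataset("data/", format="parquet", partitioning=part)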


def _ensure_partitioning(scheme):
    """
    Validate input and return a Partitioning(Factory).

    It passes None through if no partitioning scheme is defined.
    """
    if scheme is None:
        pass
    elif isinstance(scheme, str):
        scheme = partitioning(flavor=scheme)
    elif isinstance(scheme, list):
        scheme = partitioning(field_names=scheme)
    elif isinstance(scheme, (Partitioning, PartitioningFactory)):
        pass
    else:
        raise ValueError(
            "Expected Partitioning or PartitioningFactory, got {}".format(
                type(scheme)))
    return scheme


def _ensure_format(obj):
    if isinstance(obj, FileFormat):
        return obj
    elif obj == "parquet":
        return ParquetFileFormat()
    elif obj in {"ipc", "arrow", "feather"}:
        return IpcFileFormat()
    elif obj == "csv":
        return CsvFileFormat()
    elif obj == "orc":
        if not _orc_available:
            raise ValueError(_orc_msg)
        return OrcFileFormat()
    else:
        raise ValueError("format '{}' is not supported".format(obj))


def _ensure_multiple_sources(paths, filesystem=None):
    """
    Treat a list of paths as files belonging to a single file system.

    If the file system is local, this also validates that all paths reference
    existing *files*; otherwise (for example on a remote filesystem) any
    non-file paths will be silently skipped.

    Parameters
    ----------
    paths : list of path-like
        Note that URIs are not allowed.
    filesystem : FileSystem or str, optional
        If a URI is passed, then its path component will act as a prefix for
        the file paths.

    Returns
    -------
    (FileSystem, list of str)
        File system object and a list of normalized paths.

    Raises
    ------
    TypeError
        If the passed filesystem has the wrong type.
    IOError
        If the file system is local and a referenced path is not available or
        not a file.
    """
    from pyarrow.fs import (
        LocalFileSystem, SubTreeFileSystem, _MockFileSystem, FileType,
        _ensure_filesystem
    )

    if filesystem is None:
        # fall back to local file system as the default
        filesystem = LocalFileSystem()
    else:
        # construct a filesystem if it is a valid URI
        filesystem = _ensure_filesystem(filesystem)

    is_local = (
        isinstance(filesystem, (LocalFileSystem, _MockFileSystem)) or
        (isinstance(filesystem, SubTreeFileSystem) and
         isinstance(filesystem.base_fs, LocalFileSystem))
    )

    # allow normalizing irregular paths such as Windows local paths
    paths = [filesystem.normalize_path(_stringify_path(p)) for p in paths]

    # validate that all of the paths are pointing to existing *files*
    # possible improvement is to group the file_infos by type and raise for
    # multiple paths per error category
    if is_local:
        for info in filesystem.get_file_info(paths):
            file_type = info.type
            if file_type == FileType.File:
                continue
            elif file_type == FileType.NotFound:
                raise FileNotFoundError(info.path)
            elif file_type == FileType.Directory:
                raise IsADirectoryError(
                    'Path {} points to a directory, but only file paths are '
                    'supported. To construct a nested or union dataset pass '
                    'a list of dataset objects instead.'.format(info.path)
                )
            else:
                raise IOError(
                    'Path {} exists but its type is unknown (could be a '
                    'special file such as a Unix socket or character device, '
                    'or Windows NUL / CON / ...)'.format(info.path)
                )

    return filesystem, paths


def _ensure_single_source(path, filesystem=None):
    """
    Treat path as either a recursively traversable directory or a single file.

    Parameters
    ----------
    path : path-like
    filesystem : FileSystem or str, optional
        If a URI is passed, then its path component will act as a prefix for
        the file paths.

    Returns
    -------
    (FileSystem, list of str or fs.Selector)
        File system object and either a single-item list pointing to a file
        or an fs.Selector object pointing to a directory.

    Raises
    ------
    TypeError
        If the passed filesystem has the wrong type.
    FileNotFoundError
        If the referenced file or directory doesn't exist.
    """
    from pyarrow.fs import FileType, FileSelector, _resolve_filesystem_and_path

    # at this point we already checked that `path` is a path-like
    filesystem, path = _resolve_filesystem_and_path(path, filesystem)

    # ensure that the path is normalized before passing to dataset discovery
    path = filesystem.normalize_path(path)

    # retrieve the file descriptor
    file_info = filesystem.get_file_info(path)

    # depending on the path type either return with a recursive
    # directory selector or as a list containing a single file
    if file_info.type == FileType.Directory:
        paths_or_selector = FileSelector(path, recursive=True)
    elif file_info.type == FileType.File:
        paths_or_selector = [path]
    else:
        raise FileNotFoundError(path)

    return filesystem, paths_or_selector


def _filesystem_dataset(source, schema=None, filesystem=None,
                        partitioning=None, format=None,
                        partition_base_dir=None, exclude_invalid_files=None,
                        selector_ignore_prefixes=None):
    """
    Create a FileSystemDataset which can be used to build a Dataset.

    Parameters are documented in the dataset function.

    Returns
    -------
    FileSystemDataset
    """
    format = _ensure_format(format or 'parquet')
    partitioning = _ensure_partitioning(partitioning)

    if isinstance(source, (list, tuple)):
        fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
    else:
        fs, paths_or_selector = _ensure_single_source(source, filesystem)

    options = FileSystemFactoryOptions(
        partitioning=partitioning,
        partition_base_dir=partition_base_dir,
        exclude_invalid_files=exclude_invalid_files,
        selector_ignore_prefixes=selector_ignore_prefixes
    )
    factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)

    return factory.finish(schema)


def _in_memory_dataset(source, schema=None, **kwargs):
    if any(v is not None for v in kwargs.values()):
        raise ValueError(
            "For in-memory datasets, you cannot pass any additional arguments")
    return InMemoryDataset(source, schema)


def _union_dataset(children, schema=None, **kwargs):
    if any(v is not None for v in kwargs.values()):
        raise ValueError(
            "When passing a list of Datasets, you cannot pass any additional "
            "arguments"
        )

    if schema is None:
        # unify the children datasets' schemas
        schema = pa.unify_schemas([child.schema for child in children])

    # create datasets with the requested schema
    children = [child.replace_schema(schema) for child in children]

    return UnionDataset(schema, children)


def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None,
                    partitioning=None, partition_base_dir=None):
    """
    Create a FileSystemDataset from a `_metadata` file created via
    `pyarrow.parquet.write_metadata`.

    Parameters
    ----------
    metadata_path : path
        Path pointing to a single-file Parquet metadata file.
    schema : Schema, optional
        Optionally provide the Schema for the Dataset, in which case it will
        not be inferred from the source.
    filesystem : FileSystem or URI string, default None
        If a single path is given as source and filesystem is None, then the
        filesystem will be inferred from the path.
        If a URI string is passed, then a filesystem object is constructed
        using the URI's optional path component as a directory prefix. See
        the examples in ``dataset()``.
        Note that the URIs on Windows must follow 'file:///C:...' or
        'file:/C:...' patterns.
    format : ParquetFileFormat
        An instance of a ParquetFileFormat if special options need to be
        passed.
    partitioning : Partitioning, PartitioningFactory, str, list of str
        The partitioning scheme specified with the ``partitioning()``
        function. A flavor string can be used as shortcut, and with a list of
        field names a DirectoryPartitioning will be inferred.
    partition_base_dir : str, optional
        For the purposes of applying the partitioning, paths will be
        stripped of the partition_base_dir. Files not matching the
        partition_base_dir prefix will be skipped for partitioning discovery.
        The ignored files will still be part of the Dataset, but will not
        have partition information.

    Returns
    -------
    FileSystemDataset
    """
    from pyarrow.fs import LocalFileSystem, _ensure_filesystem

    if format is None:
        format = ParquetFileFormat()
    elif not isinstance(format, ParquetFileFormat):
        raise ValueError("format argument must be a ParquetFileFormat")

    if filesystem is None:
        filesystem = LocalFileSystem()
    else:
        filesystem = _ensure_filesystem(filesystem)

    metadata_path = filesystem.normalize_path(_stringify_path(metadata_path))
    options = ParquetFactoryOptions(
        partition_base_dir=partition_base_dir,
        partitioning=_ensure_partitioning(partitioning)
    )

    factory = ParquetDatasetFactory(
        metadata_path, filesystem, format, options=options)
    return factory.finish(schema)
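
# Illustrative usage sketch (not part of the original module): pairing a
# ``_metadata`` file written with ``pyarrow.parquet.write_metadata`` with
# ``parquet_dataset()``. The names "schema", "metadata_list" (row-group
# metadata collected while writing the data files, e.g. via a
# metadata_collector) and the "data/" directory are hypothetical.
#
#   import pyarrow.parquet as pq
#   import pyarrow.dataset as ds
#   pq.write_metadata(schema, "data/_metadata",
#                     metadata_collector=metadata_list)
#   dset = ds.parquet_dataset("data/_metadata")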


def dataset(source, schema=None, format=None, filesystem=None,
            partitioning=None, partition_base_dir=None,
            exclude_invalid_files=None, ignore_prefixes=None):
    """
    Open a dataset.

    Datasets provide functionality to efficiently work with tabular,
    potentially larger-than-memory, and multi-file datasets.

    - A unified interface for different sources, like Parquet and Feather
    - Discovery of sources (crawling directories, handling directory-based
      partitioned datasets, basic schema normalization)
    - Optimized reading with predicate pushdown (filtering rows), projection
      (selecting columns), parallel reading or fine-grained managing of tasks.

    Note that this is the high-level API; to have more control over the
    dataset construction use the low-level API classes (FileSystemDataset,
    FileSystemDatasetFactory, etc.)

    Parameters
    ----------
    source : path, list of paths, dataset, list of datasets, (list of) batches\
or tables, iterable of batches, RecordBatchReader, or URI
        Path pointing to a single file:
            Open a FileSystemDataset from a single file.
        Path pointing to a directory:
            The directory gets discovered recursively according to a
            partitioning scheme if given.
        List of file paths:
            Create a FileSystemDataset from explicitly given files. The files
            must be located on the same filesystem given by the filesystem
            parameter.
            Note that, in contrast to construction from a single file,
            passing URIs as paths is not allowed.
        List of datasets:
            A nested UnionDataset gets constructed; it allows arbitrary
            composition of other datasets.
            Note that additional keyword arguments are not allowed.
        (List of) batches or tables, iterable of batches, or RecordBatchReader:
            Create an InMemoryDataset. If an iterable or empty list is given,
            a schema must also be given. If an iterable or RecordBatchReader
            is given, the resulting dataset can only be scanned once; further
            attempts will raise an error.
    schema : Schema, optional
        Optionally provide the Schema for the Dataset, in which case it will
        not be inferred from the source.
    format : FileFormat or str
        Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For
        Feather, only version 2 files are supported.
    filesystem : FileSystem or URI string, default None
        If a single path is given as source and filesystem is None, then the
        filesystem will be inferred from the path.
        If a URI string is passed, then a filesystem object is constructed
        using the URI's optional path component as a directory prefix. See the
        examples below.
        Note that the URIs on Windows must follow 'file:///C:...' or
        'file:/C:...' patterns.
    partitioning : Partitioning, PartitioningFactory, str, list of str
        The partitioning scheme specified with the ``partitioning()``
        function. A flavor string can be used as shortcut, and with a list of
        field names a DirectoryPartitioning will be inferred.
    partition_base_dir : str, optional
        For the purposes of applying the partitioning, paths will be
        stripped of the partition_base_dir. Files not matching the
        partition_base_dir prefix will be skipped for partitioning discovery.
        The ignored files will still be part of the Dataset, but will not
        have partition information.
    exclude_invalid_files : bool, optional (default True)
        If True, invalid files will be excluded (file format specific check).
        This will incur IO for each file in a serial and single-threaded
        fashion. Disabling this feature will skip the IO, but unsupported
        files may be present in the Dataset (resulting in an error at scan
        time).
    ignore_prefixes : list, optional
        Files matching any of these prefixes will be ignored by the
        discovery process. This is matched to the basename of a path.
        By default this is ['.', '_'].
        Note that discovery happens only if a directory is passed as source.

    Returns
    -------
    dataset : Dataset
        Either a FileSystemDataset or a UnionDataset depending on the source
        parameter.

    Examples
    --------
    Opening a single file:

    >>> dataset("path/to/file.parquet", format="parquet")

    Opening a single file with an explicit schema:

    >>> dataset("path/to/file.parquet", schema=myschema, format="parquet")

    Opening a dataset for a single directory:

    >>> dataset("path/to/nyc-taxi/", format="parquet")
    >>> dataset("s3://mybucket/nyc-taxi/", format="parquet")

    Opening a dataset from a list of relative local paths:

    >>> dataset([
    ...     "part0/data.parquet",
    ...     "part1/data.parquet",
    ...     "part3/data.parquet",
    ... ], format='parquet')

    With filesystem provided:

    >>> paths = [
    ...     'part0/data.parquet',
    ...     'part1/data.parquet',
    ...     'part3/data.parquet',
    ... ]
    >>> dataset(paths, filesystem='file:///directory/prefix', format='parquet')

    Which is equivalent to:

    >>> fs = SubTreeFileSystem("/directory/prefix", LocalFileSystem())
    >>> dataset(paths, filesystem=fs, format='parquet')

    With a remote filesystem URI:

    >>> paths = [
    ...     'nested/directory/part0/data.parquet',
    ...     'nested/directory/part1/data.parquet',
    ...     'nested/directory/part3/data.parquet',
    ... ]
    >>> dataset(paths, filesystem='s3://bucket/', format='parquet')

    Similarly to the local example, the directory prefix may be included in
    the filesystem URI:

    >>> dataset(paths, filesystem='s3://bucket/nested/directory',
    ...         format='parquet')

    Construction of a nested dataset:

    >>> dataset([
    ...     dataset("s3://old-taxi-data", format="parquet"),
    ...     dataset("local/path/to/data", format="ipc")
    ... ])
    """
    # collect the keyword arguments for later reuse
    kwargs = dict(
        schema=schema,
        filesystem=filesystem,
        partitioning=partitioning,
        format=format,
        partition_base_dir=partition_base_dir,
        exclude_invalid_files=exclude_invalid_files,
        selector_ignore_prefixes=ignore_prefixes
    )

    if _is_path_like(source):
        return _filesystem_dataset(source, **kwargs)
    elif isinstance(source, (tuple, list)):
        if all(_is_path_like(elem) for elem in source):
            return _filesystem_dataset(source, **kwargs)
        elif all(isinstance(elem, Dataset) for elem in source):
            return _union_dataset(source, **kwargs)
        elif all(isinstance(elem, (pa.RecordBatch, pa.Table))
                 for elem in source):
            return _in_memory_dataset(source, **kwargs)
        else:
            unique_types = set(type(elem).__name__ for elem in source)
            type_names = ', '.join('{}'.format(t) for t in unique_types)
            raise TypeError(
                'Expected a list of path-like or dataset objects, or a list '
                'of batches or tables. The given list contains the following '
                'types: {}'.format(type_names)
            )
    elif isinstance(source, (pa.RecordBatch, pa.Table)):
        return _in_memory_dataset(source, **kwargs)
    else:
        raise TypeError(
            'Expected a path-like, list of path-likes or a list of Datasets '
            'instead of the given type: {}'.format(type(source).__name__)
        )
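
# Illustrative usage sketch (not part of the original module): combining
# ``dataset()`` with column projection and a filter at scan time. The path
# "data/" and the column names are hypothetical.
#
#   import pyarrow.dataset as ds
#   dset = ds.dataset("data/", format="parquet", partitioning="hive")
#   table = dset.to_table(
#       columns=["passenger_count", "fare_amount"],
#       filter=ds.field("year") == 2019)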


def _ensure_write_partitioning(part, schema, flavor):
    if isinstance(part, PartitioningFactory):
        raise ValueError("A PartitioningFactory cannot be used. "
                         "Did you call the partitioning function "
                         "without supplying a schema?")

    if isinstance(part, Partitioning) and flavor:
        raise ValueError(
            "Providing a partitioning_flavor with "
            "a Partitioning object is not supported"
        )
    elif isinstance(part, (tuple, list)):
        # Names of fields were provided instead of a partitioning object.
        # Create a partitioning factory with those field names.
        part = partitioning(
            schema=pa.schema([schema.field(f) for f in part]),
            flavor=flavor
        )
    elif part is None:
        part = partitioning(pa.schema([]), flavor=flavor)

    if not isinstance(part, Partitioning):
        raise ValueError(
            "partitioning must be a Partitioning object or "
            "a list of column names"
        )

    return part


def write_dataset(data, base_dir, basename_template=None, format=None,
                  partitioning=None, partitioning_flavor=None, schema=None,
                  filesystem=None, file_options=None, use_threads=True,
                  max_partitions=None, file_visitor=None,
                  existing_data_behavior='error'):
    """
    Write a dataset to a given format and partitioning.

    Parameters
    ----------
    data : Dataset, Table/RecordBatch, RecordBatchReader, list of
           Table/RecordBatch, or iterable of RecordBatch
        The data to write. This can be a Dataset instance or
        in-memory Arrow data. If an iterable is given, the schema must
        also be given.
    base_dir : str
        The root directory where to write the dataset.
    basename_template : str, optional
        A template string used to generate basenames of written data files.
        The token '{i}' will be replaced with an automatically incremented
        integer. If not specified, it defaults to
        "part-{i}." + format.default_extname
    format : FileFormat or str
        The format in which to write the dataset. Currently supported:
        "parquet", "ipc"/"feather". If a FileSystemDataset is being written
        and `format` is not specified, it defaults to the same format as the
        specified FileSystemDataset. When writing a Table or RecordBatch,
        this keyword is required.
    partitioning : Partitioning or list[str], optional
        The partitioning scheme specified with the ``partitioning()``
        function or a list of field names. When providing a list of
        field names, you can use ``partitioning_flavor`` to drive which
        partitioning type should be used.
    partitioning_flavor : str, optional
        One of the partitioning flavors supported by
        ``pyarrow.dataset.partitioning``. If omitted will use the
        default of ``partitioning()`` which is directory partitioning.
    schema : Schema, optional
    filesystem : FileSystem, optional
    file_options : FileWriteOptions, optional
        FileFormat specific write options, created using the
        ``FileFormat.make_write_options()`` function.
    use_threads : bool, default True
        Write files in parallel. If enabled, maximum parallelism will be
        used, as determined by the number of available CPU cores.
    max_partitions : int, default 1024
        Maximum number of partitions any batch may be written into.
    file_visitor : Function
        If set, this function will be called with a WrittenFile instance
        for each file created during the call. This object will have both
        a path attribute and a metadata attribute.

        The path attribute will be a string containing the path to
        the created file.

        The metadata attribute will be the parquet metadata of the file.
        This metadata will have the file path attribute set and can be used
        to build a _metadata file. The metadata attribute will be None if
        the format is not parquet.

        Example visitor which simply collects the filenames created::

            visited_paths = []

            def file_visitor(written_file):
                visited_paths.append(written_file.path)
    existing_data_behavior : 'error' | 'overwrite_or_ignore' | \
'delete_matching'
        Controls how the dataset will handle data that already exists in
        the destination. The default behavior ('error') is to raise an error
        if any data exists in the destination.

        'overwrite_or_ignore' will ignore any existing data and will
        overwrite files with the same name as an output file. Other
        existing files will be ignored. This behavior, in combination
        with a unique basename_template for each write, will allow for
        an append workflow.

        'delete_matching' is useful when you are writing a partitioned
        dataset. The first time each partition directory is encountered
        the entire directory will be deleted. This allows you to overwrite
        old partitions completely.
    """
    from pyarrow.fs import _resolve_filesystem_and_path

    if isinstance(data, (list, tuple)):
        schema = schema or data[0].schema
        data = InMemoryDataset(data, schema=schema)
    elif isinstance(data, (pa.RecordBatch, pa.Table)):
        schema = schema or data.schema
        data = InMemoryDataset(data, schema=schema)
    elif isinstance(data, pa.ipc.RecordBatchReader) or _is_iterable(data):
        data = Scanner.from_batches(data, schema=schema, use_async=True)
        schema = None
    elif not isinstance(data, (Dataset, Scanner)):
        raise ValueError(
            "Only Dataset, Scanner, Table/RecordBatch, RecordBatchReader, "
            "a list of Tables/RecordBatches, or iterable of batches are "
            "supported."
        )

    if format is None and isinstance(data, FileSystemDataset):
        format = data.format
    else:
        format = _ensure_format(format)

    if file_options is None:
        file_options = format.make_write_options()

    if format != file_options.format:
        raise TypeError("Supplied FileWriteOptions have format {}, "
                        "which doesn't match supplied FileFormat {}".format(
                            file_options.format, format))

    if basename_template is None:
        basename_template = "part-{i}." + format.default_extname

    if max_partitions is None:
        max_partitions = 1024

    # at this point data is a Scanner or a Dataset, anything else
    # was converted to one of those two. So we can grab the schema
    # to build the partitioning object from Dataset.
    if isinstance(data, Scanner):
        partitioning_schema = data.dataset_schema
    else:
        partitioning_schema = data.schema
    partitioning = _ensure_write_partitioning(partitioning,
                                              schema=partitioning_schema,
                                              flavor=partitioning_flavor)

    filesystem, base_dir = _resolve_filesystem_and_path(base_dir, filesystem)

    if isinstance(data, Dataset):
        scanner = data.scanner(use_threads=use_threads, use_async=True)
    else:
        # scanner was passed directly by the user, in which case a schema
        # cannot be passed
        if schema is not None:
            raise ValueError("Cannot specify a schema when writing a Scanner")
        scanner = data

    _filesystemdataset_write(
        scanner, base_dir, basename_template, filesystem, partitioning,
        file_options, max_partitions, file_visitor, existing_data_behavior
    )
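
# Illustrative usage sketch (not part of the original module): writing an
# in-memory table as a Hive-partitioned Parquet dataset. The table contents
# and the output directory "out/" are hypothetical.
#
#   import pyarrow as pa
#   import pyarrow.dataset as ds
#   table = pa.table({"year": [2020, 2021], "n": [1, 2]})
#   ds.write_dataset(table, "out/", format="parquet",
#                    partitioning=["year"], partitioning_flavor="hive")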