# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
18 """Dataset is currently unstable. APIs subject to change without notice."""
21 from pyarrow
.util
import _is_iterable
, _stringify_path
, _is_path_like
23 from pyarrow
._dataset
import ( # noqa
25 CsvFragmentScanOptions
,
29 DirectoryPartitioning
,
33 FileSystemDatasetFactory
,
34 FileSystemFactoryOptions
,
41 ParquetDatasetFactory
,
42 ParquetFactoryOptions
,
45 ParquetFileWriteOptions
,
46 ParquetFragmentScanOptions
,
56 _filesystemdataset_write
,
59 _orc_available
= False
61 "The pyarrow installation is not built with support for the ORC file "
66 from pyarrow
._dataset
_orc
import OrcFileFormat
def __getattr__(name):
    """Module-level attribute hook (PEP 562).

    Gives a clearer ImportError for ``OrcFileFormat`` when ORC support is
    not compiled into this pyarrow build; any other unknown attribute
    raises a plain AttributeError.
    """
    if name == "OrcFileFormat" and not _orc_available:
        raise ImportError(_orc_msg)

    raise AttributeError(
        "module 'pyarrow.dataset' has no attribute '{0}'".format(name)
    )
82 """Reference a named column of the dataset.
84 Stores only the field's name. Type and other information is known only when
85 the expression is bound to a dataset having an explicit scheme.
90 The name of the field the expression references to.
94 field_expr : Expression
96 return Expression
._field
(name
)
100 """Expression representing a scalar value.
104 value : bool, int, float or string
105 Python value of the scalar. Note that only a subset of types are
110 scalar_expr : Expression
112 return Expression
._scalar
(value
)
def partitioning(schema=None, field_names=None, flavor=None,
                 dictionaries=None):
    """
    Specify a partitioning scheme.

    The supported schemes include:

    - "DirectoryPartitioning": this scheme expects one segment in the file path
      for each field in the specified schema (all fields are required to be
      present). For example given schema<year:int16, month:int8> the path
      "/2009/11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
    - "HivePartitioning": a scheme for "/$key=$value/" nested directories as
      found in Apache Hive. This is a multi-level, directory based partitioning
      scheme. Data is partitioned by static values of a particular column in
      the schema. Partition keys are represented in the form $key=$value in
      directory names. Field order is ignored, as are missing or unrecognized
      field names.
      For example, given schema<year:int16, month:int8, day:int8>, a possible
      path would be "/year=2009/month=11/day=15" (but the field order does not
      need to match).

    Parameters
    ----------
    schema : pyarrow.Schema, default None
        The schema that describes the partitions present in the file path.
        If not specified, and `field_names` and/or `flavor` are specified,
        the schema will be inferred from the file path (and a
        PartitioningFactory is returned).
    field_names :  list of str, default None
        A list of strings (field names). If specified, the schema's types are
        inferred from the file paths (only valid for DirectoryPartitioning).
    flavor : str, default None
        The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
        a HivePartitioning.
    dictionaries : Dict[str, Array]
        If the type of any field of `schema` is a dictionary type, the
        corresponding entry of `dictionaries` must be an array containing
        every value which may be taken by the corresponding column or an
        error will be raised in parsing. Alternatively, pass `infer` to have
        Arrow discover the dictionary values, in which case a
        PartitioningFactory is returned.

    Returns
    -------
    Partitioning or PartitioningFactory

    Examples
    --------

    Specify the Schema for paths like "/2009/June":

    >>> partitioning(pa.schema([("year", pa.int16()), ("month", pa.string())]))

    or let the types be inferred by only specifying the field names:

    >>> partitioning(field_names=["year", "month"])

    For paths like "/2009/June", the year will be inferred as int32 while month
    will be inferred as string.

    Specify a Schema with dictionary encoding, providing dictionary values:

    >>> partitioning(
    ...     pa.schema([
    ...         ("year", pa.int16()),
    ...         ("month", pa.dictionary(pa.int8(), pa.string()))
    ...     ]),
    ...     dictionaries={
    ...         "month": pa.array(["January", "February", "March"]),
    ...     })

    Alternatively, specify a Schema with dictionary encoding, but have Arrow
    infer the dictionary values:

    >>> partitioning(
    ...     pa.schema([
    ...         ("year", pa.int16()),
    ...         ("month", pa.dictionary(pa.int8(), pa.string()))
    ...     ]),
    ...     dictionaries="infer")

    Create a Hive scheme for a path like "/year=2009/month=11":

    >>> partitioning(
    ...     pa.schema([("year", pa.int16()), ("month", pa.int8())]),
    ...     flavor="hive")

    A Hive scheme can also be discovered from the directory structure (and
    types will be inferred):

    >>> partitioning(flavor="hive")
    """
    if flavor is None:
        # default flavor: DirectoryPartitioning
        if schema is not None:
            if field_names is not None:
                raise ValueError(
                    "Cannot specify both 'schema' and 'field_names'")
            if dictionaries == 'infer':
                return DirectoryPartitioning.discover(schema=schema)
            return DirectoryPartitioning(schema, dictionaries)
        elif field_names is not None:
            if isinstance(field_names, list):
                return DirectoryPartitioning.discover(field_names)
            else:
                raise ValueError(
                    "Expected list of field names, got {}".format(
                        type(field_names)))
        else:
            raise ValueError(
                "For the default directory flavor, need to specify "
                "a Schema or a list of field names")
    elif flavor == 'hive':
        if field_names is not None:
            raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
        elif schema is not None:
            if isinstance(schema, pa.Schema):
                if dictionaries == 'infer':
                    return HivePartitioning.discover(schema=schema)
                return HivePartitioning(schema, dictionaries)
            else:
                raise ValueError(
                    "Expected Schema for 'schema', got {}".format(
                        type(schema)))
        else:
            return HivePartitioning.discover()
    else:
        raise ValueError("Unsupported flavor")
246 def _ensure_partitioning(scheme
):
248 Validate input and return a Partitioning(Factory).
250 It passes None through if no partitioning scheme is defined.
254 elif isinstance(scheme
, str):
255 scheme
= partitioning(flavor
=scheme
)
256 elif isinstance(scheme
, list):
257 scheme
= partitioning(field_names
=scheme
)
258 elif isinstance(scheme
, (Partitioning
, PartitioningFactory
)):
261 ValueError("Expected Partitioning or PartitioningFactory, got {}"
262 .format(type(scheme
)))
def _ensure_format(obj):
    """Coerce a FileFormat instance or a short format name to a FileFormat.

    Accepts an existing FileFormat (returned unchanged) or one of the
    strings "parquet", "ipc"/"arrow"/"feather", "csv", "orc".
    Raises ValueError for anything else, or for "orc" when ORC support
    is not compiled in.
    """
    if isinstance(obj, FileFormat):
        return obj
    elif obj == "parquet":
        return ParquetFileFormat()
    elif obj in {"ipc", "arrow", "feather"}:
        return IpcFileFormat()
    elif obj == "csv":
        return CsvFileFormat()
    elif obj == "orc":
        if not _orc_available:
            raise ValueError(_orc_msg)
        return OrcFileFormat()
    else:
        raise ValueError("format '{}' is not supported".format(obj))
def _ensure_multiple_sources(paths, filesystem=None):
    """
    Treat a list of paths as files belonging to a single file system

    If the file system is local then also validates that all paths
    are referencing existing *files* otherwise any non-file paths will be
    silently skipped (for example on a remote filesystem).

    Parameters
    ----------
    paths : list of path-like
        Note that URIs are not allowed.
    filesystem : FileSystem or str, optional
        If an URI is passed, then its path component will act as a prefix for
        the file paths.

    Returns
    -------
    (FileSystem, list of str)
        File system object and a list of normalized paths.

    Raises
    ------
    TypeError
        If the passed filesystem has wrong type.
    IOError
        If the file system is local and a referenced path is not available or
        not a file.
    """
    from pyarrow.fs import (
        LocalFileSystem, SubTreeFileSystem, _MockFileSystem, FileType,
        _ensure_filesystem
    )

    if filesystem is None:
        # fall back to local file system as the default
        filesystem = LocalFileSystem()
    else:
        # construct a filesystem if it is a valid URI
        filesystem = _ensure_filesystem(filesystem)

    is_local = (
        isinstance(filesystem, (LocalFileSystem, _MockFileSystem)) or
        (isinstance(filesystem, SubTreeFileSystem) and
         isinstance(filesystem.base_fs, LocalFileSystem))
    )

    # allow normalizing irregular paths such as Windows local paths
    paths = [filesystem.normalize_path(_stringify_path(p)) for p in paths]

    # validate that all of the paths are pointing to existing *files*
    # possible improvement is to group the file_infos by type and raise for
    # multiple paths per error category
    if is_local:
        for info in filesystem.get_file_info(paths):
            file_type = info.type
            if file_type == FileType.File:
                continue
            elif file_type == FileType.NotFound:
                raise FileNotFoundError(info.path)
            elif file_type == FileType.Directory:
                raise IsADirectoryError(
                    'Path {} points to a directory, but only file paths are '
                    'supported. To construct a nested or union dataset pass '
                    'a list of dataset objects instead.'.format(info.path)
                )
            else:
                raise IOError(
                    'Path {} exists but its type is unknown (could be a '
                    'special file such as a Unix socket or character device, '
                    'or Windows NUL / CON / ...)'.format(info.path)
                )

    return filesystem, paths
def _ensure_single_source(path, filesystem=None):
    """
    Treat path as either a recursively traversable directory or a single file.

    Parameters
    ----------
    path : path-like
    filesystem : FileSystem or str, optional
        If an URI is passed, then its path component will act as a prefix for
        the file paths.

    Returns
    -------
    (FileSystem, list of str or fs.Selector)
        File system object and either a single item list pointing to a file or
        an fs.Selector object pointing to a directory.

    Raises
    ------
    TypeError
        If the passed filesystem has wrong type.
    FileNotFoundError
        If the referenced file or directory doesn't exist.
    """
    from pyarrow.fs import FileType, FileSelector, _resolve_filesystem_and_path

    # at this point we already checked that `path` is a path-like
    filesystem, path = _resolve_filesystem_and_path(path, filesystem)

    # ensure that the path is normalized before passing to dataset discovery
    path = filesystem.normalize_path(path)

    # retrieve the file descriptor
    file_info = filesystem.get_file_info(path)

    # depending on the path type either return with a recursive
    # directory selector or as a list containing a single file
    if file_info.type == FileType.Directory:
        paths_or_selector = FileSelector(path, recursive=True)
    elif file_info.type == FileType.File:
        paths_or_selector = [path]
    else:
        raise FileNotFoundError(path)

    return filesystem, paths_or_selector
def _filesystem_dataset(source, schema=None, filesystem=None,
                        partitioning=None, format=None,
                        partition_base_dir=None, exclude_invalid_files=None,
                        selector_ignore_prefixes=None):
    """
    Create a FileSystemDataset which can be used to build a Dataset.

    Parameters are documented in the dataset function.

    Returns
    -------
    FileSystemDataset
    """
    # default to parquet when no explicit format is requested
    format = _ensure_format(format or 'parquet')
    partitioning = _ensure_partitioning(partitioning)

    if isinstance(source, (list, tuple)):
        fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
    else:
        fs, paths_or_selector = _ensure_single_source(source, filesystem)

    options = FileSystemFactoryOptions(
        partitioning=partitioning,
        partition_base_dir=partition_base_dir,
        exclude_invalid_files=exclude_invalid_files,
        selector_ignore_prefixes=selector_ignore_prefixes
    )
    factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)

    return factory.finish(schema)
438 def _in_memory_dataset(source
, schema
=None, **kwargs
):
439 if any(v
is not None for v
in kwargs
.values()):
441 "For in-memory datasets, you cannot pass any additional arguments")
442 return InMemoryDataset(source
, schema
)
445 def _union_dataset(children
, schema
=None, **kwargs
):
446 if any(v
is not None for v
in kwargs
.values()):
448 "When passing a list of Datasets, you cannot pass any additional "
453 # unify the children datasets' schemas
454 schema
= pa
.unify_schemas([child
.schema
for child
in children
])
456 # create datasets with the requested schema
457 children
= [child
.replace_schema(schema
) for child
in children
]
459 return UnionDataset(schema
, children
)
def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None,
                    partitioning=None, partition_base_dir=None):
    """
    Create a FileSystemDataset from a `_metadata` file created via
    `pyarrow.parquet.write_metadata`.

    Parameters
    ----------
    metadata_path : path,
        Path pointing to a single file parquet metadata file
    schema : Schema, optional
        Optionally provide the Schema for the Dataset, in which case it will
        not be inferred from the source.
    filesystem : FileSystem or URI string, default None
        If a single path is given as source and filesystem is None, then the
        filesystem will be inferred from the path.
        If an URI string is passed, then a filesystem object is constructed
        using the URI's optional path component as a directory prefix. See the
        examples below.
        Note that the URIs on Windows must follow 'file:///C:...' or
        'file:/C:...' patterns.
    format : ParquetFileFormat
        An instance of a ParquetFileFormat if special options needs to be
        passed.
    partitioning : Partitioning, PartitioningFactory, str, list of str
        The partitioning scheme specified with the ``partitioning()``
        function. A flavor string can be used as shortcut, and with a list of
        field names a DirectoryPartitioning will be inferred.
    partition_base_dir : str, optional
        For the purposes of applying the partitioning, paths will be
        stripped of the partition_base_dir. Files not matching the
        partition_base_dir prefix will be skipped for partitioning discovery.
        The ignored files will still be part of the Dataset, but will not
        have partition information.

    Returns
    -------
    FileSystemDataset
    """
    from pyarrow.fs import LocalFileSystem, _ensure_filesystem

    if format is None:
        format = ParquetFileFormat()
    elif not isinstance(format, ParquetFileFormat):
        raise ValueError("format argument must be a ParquetFileFormat")

    if filesystem is None:
        filesystem = LocalFileSystem()
    else:
        filesystem = _ensure_filesystem(filesystem)

    metadata_path = filesystem.normalize_path(_stringify_path(metadata_path))
    options = ParquetFactoryOptions(
        partition_base_dir=partition_base_dir,
        partitioning=_ensure_partitioning(partitioning)
    )

    factory = ParquetDatasetFactory(
        metadata_path, filesystem, format, options=options)
    return factory.finish(schema)
def dataset(source, schema=None, format=None, filesystem=None,
            partitioning=None, partition_base_dir=None,
            exclude_invalid_files=None, ignore_prefixes=None):
    """
    Open a dataset.

    Datasets provides functionality to efficiently work with tabular,
    potentially larger than memory and multi-file dataset.

    - A unified interface for different sources, like Parquet and Feather
    - Discovery of sources (crawling directories, handle directory-based
      partitioned datasets, basic schema normalization)
    - Optimized reading with predicate pushdown (filtering rows), projection
      (selecting columns), parallel reading or fine-grained managing of tasks.

    Note that this is the high-level API, to have more control over the dataset
    construction use the low-level API classes (FileSystemDataset,
    FilesystemDatasetFactory, etc.)

    Parameters
    ----------
    source : path, list of paths, dataset, list of datasets, (list of) batches\
or tables, iterable of batches, RecordBatchReader, or URI
        Path pointing to a single file:
            Open a FileSystemDataset from a single file.
        Path pointing to a directory:
            The directory gets discovered recursively according to a
            partitioning scheme if given.
        List of file paths:
            Create a FileSystemDataset from explicitly given files. The files
            must be located on the same filesystem given by the filesystem
            parameter.
            Note that in contrary of construction from a single file, passing
            URIs as paths is not allowed.
        List of datasets:
            A nested UnionDataset gets constructed, it allows arbitrary
            composition of other datasets.
            Note that additional keyword arguments are not allowed.
        (List of) batches or tables, iterable of batches, or RecordBatchReader:
            Create an InMemoryDataset. If an iterable or empty list is given,
            a schema must also be given. If an iterable or RecordBatchReader
            is given, the resulting dataset can only be scanned once; further
            attempts will raise an error.
    schema : Schema, optional
        Optionally provide the Schema for the Dataset, in which case it will
        not be inferred from the source.
    format : FileFormat or str
        Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For
        Feather, only version 2 files are supported.
    filesystem : FileSystem or URI string, default None
        If a single path is given as source and filesystem is None, then the
        filesystem will be inferred from the path.
        If an URI string is passed, then a filesystem object is constructed
        using the URI's optional path component as a directory prefix. See the
        examples below.
        Note that the URIs on Windows must follow 'file:///C:...' or
        'file:/C:...' patterns.
    partitioning : Partitioning, PartitioningFactory, str, list of str
        The partitioning scheme specified with the ``partitioning()``
        function. A flavor string can be used as shortcut, and with a list of
        field names a DirectoryPartitioning will be inferred.
    partition_base_dir : str, optional
        For the purposes of applying the partitioning, paths will be
        stripped of the partition_base_dir. Files not matching the
        partition_base_dir prefix will be skipped for partitioning discovery.
        The ignored files will still be part of the Dataset, but will not
        have partition information.
    exclude_invalid_files : bool, optional (default True)
        If True, invalid files will be excluded (file format specific check).
        This will incur IO for each files in a serial and single threaded
        fashion. Disabling this feature will skip the IO, but unsupported
        files may be present in the Dataset (resulting in an error at scan
        time).
    ignore_prefixes : list, optional
        Files matching any of these prefixes will be ignored by the
        discovery process. This is matched to the basename of a path.
        By default this is ['.', '_'].
        Note that discovery happens only if a directory is passed as source.

    Returns
    -------
    dataset : Dataset
        Either a FileSystemDataset or a UnionDataset depending on the source
        parameter.

    Examples
    --------
    Opening a single file:

    >>> dataset("path/to/file.parquet", format="parquet")

    Opening a single file with an explicit schema:

    >>> dataset("path/to/file.parquet", schema=myschema, format="parquet")

    Opening a dataset for a single directory:

    >>> dataset("path/to/nyc-taxi/", format="parquet")
    >>> dataset("s3://mybucket/nyc-taxi/", format="parquet")

    Opening a dataset from a list of relatives local paths:

    >>> dataset([
    ...     "part0/data.parquet",
    ...     "part1/data.parquet",
    ...     "part3/data.parquet",
    ... ], format='parquet')

    With filesystem provided:

    >>> paths = [
    ...     'part0/data.parquet',
    ...     'part1/data.parquet',
    ...     'part3/data.parquet',
    ... ]
    >>> dataset(paths, filesystem='file:///directory/prefix', format='parquet')

    Which is equivalent with:

    >>> fs = SubTreeFileSystem("/directory/prefix", LocalFileSystem())
    >>> dataset(paths, filesystem=fs, format='parquet')

    With a remote filesystem URI:

    >>> paths = [
    ...     'nested/directory/part0/data.parquet',
    ...     'nested/directory/part1/data.parquet',
    ...     'nested/directory/part3/data.parquet',
    ... ]
    >>> dataset(paths, filesystem='s3://bucket/', format='parquet')

    Similarly to the local example, the directory prefix may be included in the
    filesystem URI:

    >>> dataset(paths, filesystem='s3://bucket/nested/directory',
    ...         format='parquet')

    Construction of a nested dataset:

    >>> dataset([
    ...     dataset("s3://old-taxi-data", format="parquet"),
    ...     dataset("local/path/to/data", format="ipc")
    ... ])
    """
    # collect the keyword arguments for later reuse
    kwargs = dict(
        schema=schema,
        filesystem=filesystem,
        partitioning=partitioning,
        format=format,
        partition_base_dir=partition_base_dir,
        exclude_invalid_files=exclude_invalid_files,
        selector_ignore_prefixes=ignore_prefixes
    )

    if _is_path_like(source):
        return _filesystem_dataset(source, **kwargs)
    elif isinstance(source, (tuple, list)):
        if all(_is_path_like(elem) for elem in source):
            return _filesystem_dataset(source, **kwargs)
        elif all(isinstance(elem, Dataset) for elem in source):
            return _union_dataset(source, **kwargs)
        elif all(isinstance(elem, (pa.RecordBatch, pa.Table))
                 for elem in source):
            return _in_memory_dataset(source, **kwargs)
        else:
            unique_types = set(type(elem).__name__ for elem in source)
            type_names = ', '.join('{}'.format(t) for t in unique_types)
            raise TypeError(
                'Expected a list of path-like or dataset objects, or a list '
                'of batches or tables. The given list contains the following '
                'types: {}'.format(type_names)
            )
    elif isinstance(source, (pa.RecordBatch, pa.Table)):
        return _in_memory_dataset(source, **kwargs)
    else:
        raise TypeError(
            'Expected a path-like, list of path-likes or a list of Datasets '
            'instead of the given type: {}'.format(type(source).__name__)
        )
def _ensure_write_partitioning(part, schema, flavor):
    """Coerce the write_dataset ``partitioning`` argument to a Partitioning.

    Accepts an existing Partitioning, a list of field names (resolved
    against `schema` with the given `flavor`), or None (empty-schema
    partitioning, i.e. no partitioning).
    """
    if isinstance(part, PartitioningFactory):
        raise ValueError("A PartitioningFactory cannot be used. "
                         "Did you call the partitioning function "
                         "without supplying a schema?")

    if isinstance(part, Partitioning) and flavor:
        raise ValueError(
            "Providing a partitioning_flavor with "
            "a Partitioning object is not supported"
        )
    elif isinstance(part, (tuple, list)):
        # Name of fields were provided instead of a partitioning object.
        # Create a partitioning factory with those field names.
        part = partitioning(
            schema=pa.schema([schema.field(f) for f in part]),
            flavor=flavor
        )
    elif part is None:
        part = partitioning(pa.schema([]), flavor=flavor)

    if not isinstance(part, Partitioning):
        raise ValueError(
            "partitioning must be a Partitioning object or "
            "a list of column names"
        )

    return part
def write_dataset(data, base_dir, basename_template=None, format=None,
                  partitioning=None, partitioning_flavor=None, schema=None,
                  filesystem=None, file_options=None, use_threads=True,
                  max_partitions=None, file_visitor=None,
                  existing_data_behavior='error'):
    """
    Write a dataset to a given format and partitioning.

    Parameters
    ----------
    data : Dataset, Table/RecordBatch, RecordBatchReader, list of
           Table/RecordBatch, or iterable of RecordBatch
        The data to write. This can be a Dataset instance or
        in-memory Arrow data. If an iterable is given, the schema must
        also be given.
    base_dir : str
        The root directory where to write the dataset.
    basename_template : str, optional
        A template string used to generate basenames of written data files.
        The token '{i}' will be replaced with an automatically incremented
        integer. If not specified, it defaults to
        "part-{i}." + format.default_extname
    format : FileFormat or str
        The format in which to write the dataset. Currently supported:
        "parquet", "ipc"/"feather". If a FileSystemDataset is being written
        and `format` is not specified, it defaults to the same format as the
        specified FileSystemDataset. When writing a Table or RecordBatch, this
        keyword is required.
    partitioning : Partitioning or list[str], optional
        The partitioning scheme specified with the ``partitioning()``
        function or a list of field names. When providing a list of
        field names, you can use ``partitioning_flavor`` to drive which
        partitioning type should be used.
    partitioning_flavor : str, optional
        One of the partitioning flavors supported by
        ``pyarrow.dataset.partitioning``. If omitted will use the
        default of ``partitioning()`` which is directory partitioning.
    schema : Schema, optional
    filesystem : FileSystem, optional
    file_options : FileWriteOptions, optional
        FileFormat specific write options, created using the
        ``FileFormat.make_write_options()`` function.
    use_threads : bool, default True
        Write files in parallel. If enabled, then maximum parallelism will be
        used determined by the number of available CPU cores.
    max_partitions : int, default 1024
        Maximum number of partitions any batch may be written into.
    file_visitor : Function
        If set, this function will be called with a WrittenFile instance
        for each file created during the call. This object will have both
        a path attribute and a metadata attribute.

        The path attribute will be a string containing the path to
        the created file.

        The metadata attribute will be the parquet metadata of the file.
        This metadata will have the file path attribute set and can be used
        to build a _metadata file. The metadata attribute will be None if
        the format is not parquet.

        Example visitor which simple collects the filenames created::

            visited_paths = []

            def file_visitor(written_file):
                visited_paths.append(written_file.path)
    existing_data_behavior : 'error' | 'overwrite_or_ignore' | \
'delete_matching'
        Controls how the dataset will handle data that already exists in
        the destination. The default behavior ('error') is to raise an error
        if any data exists in the destination.

        'overwrite_or_ignore' will ignore any existing data and will
        overwrite files with the same name as an output file. Other
        existing files will be ignored. This behavior, in combination
        with a unique basename_template for each write, will allow for
        an append workflow.

        'delete_matching' is useful when you are writing a partitioned
        dataset. The first time each partition directory is encountered
        the entire directory will be deleted. This allows you to overwrite
        old partitions completely.
    """
    from pyarrow.fs import _resolve_filesystem_and_path

    # normalize in-memory inputs to a Dataset or Scanner
    if isinstance(data, (list, tuple)):
        schema = schema or data[0].schema
        data = InMemoryDataset(data, schema=schema)
    elif isinstance(data, (pa.RecordBatch, pa.Table)):
        schema = schema or data.schema
        data = InMemoryDataset(data, schema=schema)
    elif isinstance(data, pa.ipc.RecordBatchReader) or _is_iterable(data):
        data = Scanner.from_batches(data, schema=schema, use_async=True)
        # the schema has been consumed by the Scanner; clear it so the
        # Scanner-specific schema check below does not trip
        schema = None
    elif not isinstance(data, (Dataset, Scanner)):
        raise ValueError(
            "Only Dataset, Scanner, Table/RecordBatch, RecordBatchReader, "
            "a list of Tables/RecordBatches, or iterable of batches are "
            "supported."
        )

    if format is None and isinstance(data, FileSystemDataset):
        # inherit the format from the dataset being rewritten
        format = data.format
    else:
        format = _ensure_format(format)

    if file_options is None:
        file_options = format.make_write_options()

    if format != file_options.format:
        raise TypeError("Supplied FileWriteOptions have format {}, "
                        "which doesn't match supplied FileFormat {}".format(
                            format, file_options))

    if basename_template is None:
        basename_template = "part-{i}." + format.default_extname

    if max_partitions is None:
        max_partitions = 1024

    # at this point data is a Scanner or a Dataset, anything else
    # was converted to one of those two. So we can grab the schema
    # to build the partitioning object from Dataset.
    if isinstance(data, Scanner):
        partitioning_schema = data.dataset_schema
    else:
        partitioning_schema = data.schema
    partitioning = _ensure_write_partitioning(partitioning,
                                              schema=partitioning_schema,
                                              flavor=partitioning_flavor)

    filesystem, base_dir = _resolve_filesystem_and_path(base_dir, filesystem)

    if isinstance(data, Dataset):
        scanner = data.scanner(use_threads=use_threads, use_async=True)
    else:
        # scanner was passed directly by the user, in which case a schema
        # cannot be passed as well
        if schema is not None:
            raise ValueError("Cannot specify a schema when writing a Scanner")
        scanner = data

    _filesystemdataset_write(
        scanner, base_dir, basename_template, filesystem, partitioning,
        file_options, max_partitions, file_visitor, existing_data_behavior
    )