.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. currentmodule:: pyarrow.dataset

.. _dataset:

Tabular Datasets
================

.. warning::

    The ``pyarrow.dataset`` module is experimental (specifically the classes),
    and a stable API is not yet guaranteed.

The ``pyarrow.dataset`` module provides functionality to efficiently work with
tabular, potentially larger than memory, and multi-file datasets. This includes:

* A unified interface that supports different sources and file formats
  (Parquet, ORC, Feather / Arrow IPC, and CSV files) and different file systems
  (local, cloud).
* Discovery of sources (crawling directories, handling directory-based partitioned
  datasets, basic schema normalization, ...).
* Optimized reading with predicate pushdown (filtering rows), projection
  (selecting and deriving columns), and optionally parallel reading.

Currently, only Parquet, ORC, Feather / Arrow IPC, and CSV files are
supported. The goal is to expand this in the future to other file formats and
data sources (e.g. database connections).

For those familiar with the existing :class:`pyarrow.parquet.ParquetDataset` for
reading Parquet datasets: ``pyarrow.dataset``'s goal is similar but not specific
to the Parquet format and not tied to Python: the same datasets API is exposed
in the R bindings for Arrow. In addition, ``pyarrow.dataset`` boasts improved
performance and new features (e.g. filtering within files rather than only on
partition keys).


Reading Datasets
----------------

.. TODO Full blown example with NYC taxi data to show off, afterwards explain all parts:

For the examples below, let's create a small dataset consisting
of a directory with two Parquet files:

.. ipython:: python

    import tempfile
    import pathlib
    import pyarrow as pa
    import pyarrow.parquet as pq
    import numpy as np

    base = pathlib.Path(tempfile.gettempdir())
    (base / "parquet_dataset").mkdir(exist_ok=True)

    # creating an Arrow Table
    table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})

    # writing it into two Parquet files
    pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")
    pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")

Dataset discovery
~~~~~~~~~~~~~~~~~

A :class:`Dataset` object can be created with the :func:`dataset` function. We
can pass it the path to the directory containing the data files:

.. ipython:: python

    import pyarrow.dataset as ds
    dataset = ds.dataset(base / "parquet_dataset", format="parquet")
    dataset

In addition to searching a base directory, :func:`dataset` accepts a path to a
single file or a list of file paths.

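For example, a minimal sketch of passing an explicit list of files (reusing the
two files created above) instead of a base directory:

.. code-block:: python

    # Build a dataset from an explicit list of file paths; no directory
    # crawling happens in this case.
    dataset = ds.dataset(
        [str(base / "parquet_dataset" / "data1.parquet"),
         str(base / "parquet_dataset" / "data2.parquet")],
        format="parquet",
    )
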
Creating a :class:`Dataset` object does not begin reading the data itself. It
only crawls the directory to find all the files (if needed):

.. ipython:: python

    dataset.files

... and infers the dataset's schema (by default from the first file):

.. ipython:: python

    print(dataset.schema.to_string(show_field_metadata=False))

Using the :meth:`Dataset.to_table` method we can read the dataset (or a portion
of it) into a pyarrow Table (note that depending on the size of your dataset
this can require a lot of memory; see below on filtering / iterative loading):

.. ipython:: python

    dataset.to_table()
    # converting to pandas to see the contents of the scanned table
    dataset.to_table().to_pandas()

Reading different file formats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above examples use Parquet files as dataset sources but the Dataset API
provides a consistent interface across multiple file formats and filesystems.
Currently, Parquet, ORC, Feather / Arrow IPC, and CSV file formats are
supported; more formats are planned in the future.

If we save the table as Feather files instead of Parquet files:

.. ipython:: python

    import pyarrow.feather as feather

    feather.write_feather(table, base / "data.feather")

…then we can read the Feather file using the same functions, but specifying
``format="feather"``:

.. ipython:: python

    dataset = ds.dataset(base / "data.feather", format="feather")
    dataset.to_table().to_pandas().head()

Customizing file formats
~~~~~~~~~~~~~~~~~~~~~~~~

The format name as a string, like::

    ds.dataset(..., format="parquet")

is shorthand for a default-constructed :class:`ParquetFileFormat`::

    ds.dataset(..., format=ds.ParquetFileFormat())

The :class:`FileFormat` objects can be customized using keywords. For example::

    parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
    ds.dataset(..., format=parquet_format)

This will configure column ``"a"`` to be dictionary encoded on scan.

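The same pattern applies to the other formats. For example, a sketch of a
:class:`CsvFileFormat` carrying custom parse options (the semicolon-delimited
files and their location are assumptions for illustration):

.. code-block:: python

    import pyarrow.csv

    # Read semicolon-delimited CSV files as a dataset.
    csv_format = ds.CsvFileFormat(
        parse_options=pyarrow.csv.ParseOptions(delimiter=";"))
    dataset = ds.dataset("/path/to/csv/dir", format=csv_format)
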
Filtering data
--------------

To avoid reading all the data when only a subset is needed, the ``columns`` and
``filter`` keywords can be used.

The ``columns`` keyword can be used to only read the specified columns:

.. ipython:: python

    dataset = ds.dataset(base / "parquet_dataset", format="parquet")
    dataset.to_table(columns=['a', 'b']).to_pandas()

With the ``filter`` keyword, rows which do not match the filter predicate will
not be included in the returned table. The keyword expects a boolean
:class:`Expression` referencing at least one of the columns:

.. ipython:: python

    dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
    dataset.to_table(filter=ds.field('c') == 2).to_pandas()

The easiest way to construct those :class:`Expression` objects is by using the
:func:`field` helper function. Any column - not just partition columns - can be
referenced using the :func:`field` function (which creates a
:class:`FieldExpression`). Operator overloads are provided to compose filters,
including comparisons (equal, greater/less than, etc.), set membership
testing, and boolean combinations (``&``, ``|``, ``~``):

.. ipython:: python

    ds.field('a') != 3
    ds.field('a').isin([1, 2, 3])
    (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)

Note that :class:`Expression` objects can **not** be combined using the Python
logical operators ``and``, ``or``, and ``not``; use ``&``, ``|``, and ``~``
instead.

Projecting columns
------------------

The ``columns`` keyword can be used to read a subset of the columns of the
dataset by passing it a list of column names. The keyword can also be used
for more complex projections in combination with expressions.

In this case, we pass it a dictionary with the keys being the resulting
column names and the values being the expressions used to construct the column
values:

.. ipython:: python

    projection = {
        "a_renamed": ds.field("a"),
        "b_as_float32": ds.field("b").cast("float32"),
        "c_1": ds.field("c") == 1,
    }
    dataset.to_table(columns=projection).to_pandas().head()

The dictionary also determines the column selection (only the keys in the
dictionary will be present as columns in the resulting table). If you want
to include a derived column in *addition* to the existing columns, you can
build up the dictionary from the dataset schema:

.. ipython:: python

    projection = {col: ds.field(col) for col in dataset.schema.names}
    projection.update({"b_large": ds.field("b") > 1})
    dataset.to_table(columns=projection).to_pandas().head()


Reading partitioned data
------------------------

Above, a dataset consisting of a flat directory with files was shown. However, a
dataset can exploit a nested directory structure defining a partitioned dataset,
where the sub-directory names hold information about which subset of the data is
stored in that directory.

For example, a dataset partitioned by year and month may look like this on disk:

.. code-block:: text

    dataset_name/
      year=2007/
        month=01/
           data0.parquet
           data1.parquet
           ...
        month=02/
           data0.parquet
           data1.parquet
           ...
        month=03/
        ...
      year=2008/
        month=01/
        ...
      ...

The above partitioning scheme is using "/key=value/" directory names, as found
in Apache Hive.

Let's create a small partitioned dataset. The :func:`~pyarrow.parquet.write_to_dataset`
function can write such hive-like partitioned datasets.

.. ipython:: python

    table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
                      'part': ['a'] * 5 + ['b'] * 5})
    pq.write_to_dataset(table, str(base / "parquet_dataset_partitioned"),
                        partition_cols=['part'])

The above created a directory with two subdirectories ("part=a" and "part=b"),
and the Parquet files written in those directories no longer include the "part"
column.

Reading this dataset with :func:`dataset`, we now specify that the dataset
should use a hive-like partitioning scheme with the ``partitioning`` keyword:

.. ipython:: python

    dataset = ds.dataset(str(base / "parquet_dataset_partitioned"), format="parquet",
                         partitioning="hive")
    dataset.files

Although the partition fields are not included in the actual Parquet files,
they will be added back to the resulting table when scanning this dataset:

.. ipython:: python

    dataset.to_table().to_pandas().head(3)

We can now filter on the partition keys, which avoids loading files
altogether if they do not match the filter:

.. ipython:: python

    dataset.to_table(filter=ds.field("part") == "b").to_pandas()


Different partitioning schemes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above example uses a hive-like directory scheme, such as "/year=2009/month=11/day=15".
We specified this by passing the ``partitioning="hive"`` keyword. In this case,
the types of the partition keys are inferred from the file paths.

It is also possible to explicitly define the schema of the partition keys
using the :func:`partitioning` function. For example:

.. code-block:: python

    part = ds.partitioning(
        pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
        flavor="hive"
    )
    dataset = ds.dataset(..., partitioning=part)

"Directory partitioning" is also supported, where the segments in the file path
represent the values of the partition keys without including the name (the
field names are implicit in the segment's index). For example, given field names
"year", "month", and "day", one path might be "/2019/11/15".

Since the names are not included in the file paths, these must be specified
when constructing a directory partitioning:

.. code-block:: python

    part = ds.partitioning(field_names=["year", "month", "day"])

Directory partitioning also supports providing a full schema rather than inferring
types from file paths.
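
For example, a minimal sketch of a directory partitioning with an explicit
schema (omitting the ``flavor`` keyword selects directory partitioning):

.. code-block:: python

    # The partition key types are taken from the schema instead of being
    # inferred from the file paths.
    part = ds.partitioning(
        pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())])
    )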


Reading from cloud storage
--------------------------

In addition to local files, pyarrow also supports reading from cloud storage.
Currently, :class:`HDFS <pyarrow.fs.HadoopFileSystem>` and
:class:`Amazon S3-compatible storage <pyarrow.fs.S3FileSystem>` are supported.

When passing a file URI, the file system will be inferred. For example,
specifying an S3 path:

.. code-block:: python

    dataset = ds.dataset("s3://ursa-labs-taxi-data/", partitioning=["year", "month"])

Typically, you will want to customize the connection parameters, and then
a file system object can be created and passed to the ``filesystem`` keyword:

.. code-block:: python

    from pyarrow import fs

    s3 = fs.S3FileSystem(region="us-east-2")
    dataset = ds.dataset("ursa-labs-taxi-data/", filesystem=s3,
                         partitioning=["year", "month"])

The currently available classes are :class:`~pyarrow.fs.S3FileSystem` and
:class:`~pyarrow.fs.HadoopFileSystem`. See the :ref:`filesystem` docs for more
details.


Reading from MinIO
------------------

In addition to cloud storage, pyarrow also supports reading from a
`MinIO <https://github.com/minio/minio>`_ object storage instance emulating S3
APIs. Paired with `toxiproxy <https://github.com/shopify/toxiproxy>`_, this is
useful for testing or benchmarking.

.. code-block:: python

    from pyarrow import fs

    # By default, MinIO will listen for unencrypted HTTP traffic.
    minio = fs.S3FileSystem(scheme="http", endpoint_override="localhost:9000")
    dataset = ds.dataset("ursa-labs-taxi-data/", filesystem=minio,
                         partitioning=["year", "month"])


Working with Parquet Datasets
-----------------------------

While the Datasets API provides a unified interface to different file formats,
some specific methods exist for Parquet Datasets.

Some processing frameworks such as Dask (optionally) use a ``_metadata`` file
with partitioned datasets, which includes information about the schema and the
row group metadata of the full dataset. Using such a file can make creating a
Parquet Dataset more efficient, since it does not need to infer the schema and
crawl the directories for all Parquet files (this is especially the case for
filesystems where accessing files is expensive). The :func:`parquet_dataset`
function allows us to create a Dataset from a partitioned dataset with a
``_metadata`` file:

.. code-block:: python

    dataset = ds.parquet_dataset("/path/to/dir/_metadata")

By default, the constructed :class:`Dataset` object for Parquet datasets maps
each fragment to a single Parquet file. If you want fragments mapping to each
row group of a Parquet file, you can use the ``split_by_row_group()`` method of
the fragments:

.. code-block:: python

    fragments = list(dataset.get_fragments())
    fragments[0].split_by_row_group()

This method returns a list of new Fragments mapping to each row group of
the original Fragment (Parquet file). Both ``get_fragments()`` and
``split_by_row_group()`` accept an optional filter expression to get a
filtered list of fragments.
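
For example, a sketch of listing only the fragments of a single partition
(assuming the dataset has a ``year`` partition field):

.. code-block:: python

    # Only fragments whose partition (or file) matches the predicate
    # are returned.
    fragments = list(dataset.get_fragments(filter=ds.field("year") == 2019))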


Manual specification of the Dataset
-----------------------------------

The :func:`dataset` function allows easy creation of a Dataset viewing a directory,
crawling all subdirectories for files and partitioning information. However,
sometimes discovery is not required and the dataset's files and partitions
are already known (for example, when this information is stored in metadata).
In this case it is possible to create a Dataset explicitly without any
automatic discovery or inference.

For the example here, we are going to use a dataset where the file names contain
additional partitioning information:

.. ipython:: python

    # creating a dummy dataset: directory with two files
    table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})
    (base / "parquet_dataset_manual").mkdir(exist_ok=True)
    pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")
    pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")

To create a Dataset from a list of files, we need to specify the paths, schema,
format, filesystem, and partition expressions manually:

.. ipython:: python

    from pyarrow import fs

    schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

    dataset = ds.FileSystemDataset.from_paths(
        ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
        filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
        partitions=[ds.field('year') == 2018, ds.field('year') == 2019])

Since we specified the "partition expressions" for our files, this information
is materialized as columns when reading the data and can be used for filtering:

.. ipython:: python

    dataset.to_table().to_pandas()
    dataset.to_table(filter=ds.field('year') == 2019).to_pandas()

Another benefit of manually listing the files is that the order of the files
controls the order of the data. When performing an ordered read (or a read to
a table), the rows returned will match the order of the files given. This
only applies when the dataset is constructed with a list of files. There
are no order guarantees given when the files are instead discovered by scanning
a directory.

Iterative (out of core or streaming) reads
------------------------------------------

The previous examples have demonstrated how to read the data into a table using
:meth:`~Dataset.to_table`. This is useful if the dataset is small or there is
only a small amount of data that needs to be read. The dataset API contains
additional methods to read and process large amounts of data in a streaming
fashion.

The easiest way to do this is to use the method :meth:`Dataset.to_batches`. This
method returns an iterator of record batches. For example, we can use this method to
calculate the average of a column without loading the entire column into memory:

.. ipython:: python

    import pyarrow.compute as pc

    col2_sum = 0
    count = 0
    for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
        col2_sum += pc.sum(batch.column("col2")).as_py()
        count += batch.num_rows
    mean_col2 = col2_sum / count

Customizing the batch size
~~~~~~~~~~~~~~~~~~~~~~~~~~

An iterative read of a dataset is often called a "scan" of the dataset and pyarrow
uses an object called a :class:`Scanner` to do this. A Scanner is created for you
automatically by the :meth:`~Dataset.to_table` and :meth:`~Dataset.to_batches` methods
of the dataset. Any arguments you pass to these methods will be passed on to the
Scanner constructor.

One of those parameters is the ``batch_size``. This controls the maximum size of the
batches returned by the scanner. Batches can still be smaller than the ``batch_size``
if the dataset consists of small files or those files themselves consist of small
row groups. For example, a parquet file with 10,000 rows per row group will yield
batches with, at most, 10,000 rows unless the ``batch_size`` is set to a smaller value.

The default batch size is one million rows and this is typically a good default, but
you may want to customize it if you are reading a large number of columns.

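For example, a minimal sketch of requesting smaller batches on a scan (the
``batch_size`` value here is arbitrary):

.. code-block:: python

    # Batches will contain at most 100,000 rows each; they can still be
    # smaller if the underlying files or row groups are smaller.
    total_rows = 0
    for batch in dataset.to_batches(batch_size=100_000):
        total_rows += batch.num_rows
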
Writing Datasets
----------------

The dataset API also simplifies writing data to a dataset using :func:`write_dataset`.
This can be useful when you want to partition your data or you need to write a large
amount of data. A basic dataset write is similar to writing a table except that you
specify a directory instead of a filename.

.. ipython:: python

    base = pathlib.Path(tempfile.gettempdir())
    dataset_root = base / "sample_dataset"
    dataset_root.mkdir(exist_ok=True)

    table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})
    ds.write_dataset(table, dataset_root, format="parquet")

The above example will create a single file named part-0.parquet in our sample_dataset
directory.

.. warning::

    If you run the example again it will replace the existing part-0.parquet file.
    Appending files to an existing dataset requires specifying a new
    ``basename_template`` for each call to ``ds.write_dataset``
    to avoid overwriting.

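For example, a minimal sketch of such an append (the template name is
illustrative; it only needs to contain the ``{i}`` placeholder):

.. code-block:: python

    # Writes part-0-append.parquet next to the existing part-0.parquet
    # instead of replacing it.
    ds.write_dataset(table, dataset_root, format="parquet",
                     basename_template="part-{i}-append.parquet")
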
Writing partitioned data
~~~~~~~~~~~~~~~~~~~~~~~~

A partitioning object can be used to specify how your output data should be partitioned.
This uses the same kind of partitioning objects we used for reading datasets. To write
our above data out to a partitioned directory we only need to specify how we want the
dataset to be partitioned. For example:

.. ipython:: python

    part = ds.partitioning(
        pa.schema([("c", pa.int16())]), flavor="hive"
    )
    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part)

This will create two files. Half our data will be in the dataset_root/c=1 directory and
the other half will be in the dataset_root/c=2 directory.

Writing large amounts of data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above examples wrote data from a table. If you are writing a large amount of data
you may not be able to load everything into a single in-memory table. Fortunately, the
:func:`write_dataset` function also accepts an iterable of record batches. This makes it
simple, for example, to repartition a large dataset without loading the entire dataset
into memory:

.. ipython:: python

    old_part = ds.partitioning(
        pa.schema([("c", pa.int16())]), flavor="hive"
    )
    new_part = ds.partitioning(
        pa.schema([("c", pa.int16())]), flavor=None
    )
    input_dataset = ds.dataset(dataset_root, partitioning=old_part)
    new_root = base / "repartitioned_dataset"
    # A scanner can act as an iterator of record batches but you could also receive
    # data from the network (e.g. via flight), from your own scanning, or from any
    # other method that yields record batches. In addition, you can pass a dataset
    # into write_dataset directly but this method is useful if you want to customize
    # the scanner (e.g. to filter the input dataset or set a maximum batch size)
    scanner = input_dataset.scanner(use_async=True)

    ds.write_dataset(scanner, new_root, format="parquet", partitioning=new_part)

After the above example runs our data will be in new_root/1 and new_root/2
directories. In this simple example we are not changing the structure of the data
(only the directory naming scheme) but you could also use this mechanism to change
which columns are used to partition the dataset. This is useful when you expect to
query your data in specific ways and you can utilize partitioning to reduce the
amount of data you need to read.

Customizing & inspecting written files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

By default the dataset API will create files named "part-i.format" where "i" is an
integer generated during the write and "format" is the file format specified in the
write_dataset call. For simple datasets it may be possible to know which files will be
created but for larger or partitioned datasets it is not so easy. The ``file_visitor``
keyword can be used to supply a visitor that will be called as each file is created:

.. ipython:: python

    def file_visitor(written_file):
        print(f"path={written_file.path}")
        print(f"metadata={written_file.metadata}")

.. ipython:: python

    ds.write_dataset(table, base / "dataset_visited", format="parquet", partitioning=part,
                     file_visitor=file_visitor)

This will allow you to collect the filenames that belong to the dataset and store them
elsewhere, which can be useful when you want to avoid scanning directories the next time
you need to read the data. It can also be used to generate the ``_metadata`` index file
used by other tools such as Dask or Spark to create an index of the dataset.
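
For example, a small sketch of using the visitor to collect the written paths
(the ``visited_paths`` name and output directory are illustrative):

.. code-block:: python

    visited_paths = []

    def file_visitor(written_file):
        # Record every file the write produced so the list can be stored
        # elsewhere (e.g. in a metadata store).
        visited_paths.append(written_file.path)

    ds.write_dataset(table, base / "dataset_visited2", format="parquet",
                     partitioning=part, file_visitor=file_visitor)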

Configuring format-specific parameters during a write
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to the common options shared by all formats there are also format-specific
options that are unique to a particular format. For example, to allow truncated
timestamps while writing Parquet files:

.. ipython:: python

    dataset_root = base / "sample_dataset2"
    dataset_root.mkdir(exist_ok=True)

    parquet_format = ds.ParquetFileFormat()
    write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)
    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part,
                     file_options=write_options)