.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. currentmodule:: pyarrow.dataset

.. _dataset:

Tabular Datasets
================

.. warning::

    The ``pyarrow.dataset`` module is experimental (specifically the classes),
    and a stable API is not yet guaranteed.

The ``pyarrow.dataset`` module provides functionality to efficiently work with
tabular, potentially larger than memory, and multi-file datasets. This includes:

* A unified interface that supports different sources and file formats
  (Parquet, ORC, Feather / Arrow IPC, and CSV files) and different file systems
  (local, cloud).
* Discovery of sources (crawling directories, handling directory-based partitioned
  datasets, basic schema normalization, ...).
* Optimized reading with predicate pushdown (filtering rows), projection
  (selecting and deriving columns), and optionally parallel reading.

Currently, only Parquet, ORC, Feather / Arrow IPC, and CSV files are
supported. The goal is to expand this in the future to other file formats and
data sources (e.g. database connections).

For those familiar with the existing :class:`pyarrow.parquet.ParquetDataset` for
reading Parquet datasets: ``pyarrow.dataset``'s goal is similar but not specific
to the Parquet format and not tied to Python: the same datasets API is exposed
in the R bindings for Arrow. In addition, ``pyarrow.dataset`` offers improved
performance and new features (e.g. filtering within files rather than only on
partition keys).


Reading Datasets
----------------

.. TODO Full blown example with NYC taxi data to show off, afterwards explain all parts:

For the examples below, let's create a small dataset consisting
of a directory with two parquet files:

.. ipython:: python

    import tempfile
    import pathlib
    import pyarrow as pa
    import pyarrow.parquet as pq
    import numpy as np

    base = pathlib.Path(tempfile.gettempdir())
    (base / "parquet_dataset").mkdir(exist_ok=True)

    # creating an Arrow Table
    table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})

    # writing it into two parquet files
    pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")
    pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")

Dataset discovery
~~~~~~~~~~~~~~~~~

A :class:`Dataset` object can be created with the :func:`dataset` function. We
can pass it the path to the directory containing the data files:

.. ipython:: python

    import pyarrow.dataset as ds
    dataset = ds.dataset(base / "parquet_dataset", format="parquet")
    dataset

In addition to searching a base directory, :func:`dataset` accepts a path to a
single file or a list of file paths.
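
For example, the same dataset could also be constructed by listing the two files
explicitly (a minimal sketch reusing the paths created above):

.. code-block:: python

    # Equivalent construction from an explicit list of file paths
    # (the paths reuse the files written in the example above).
    dataset = ds.dataset(
        [str(base / "parquet_dataset" / "data1.parquet"),
         str(base / "parquet_dataset" / "data2.parquet")],
        format="parquet",
    )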

Creating a :class:`Dataset` object does not read any data yet. It only crawls
the directory, if needed, to find all the files:

.. ipython:: python

    dataset.files

... and infers the dataset's schema (by default from the first file):

.. ipython:: python

    print(dataset.schema.to_string(show_field_metadata=False))

Using the :meth:`Dataset.to_table` method we can read the dataset (or a portion
of it) into a pyarrow Table (note that depending on the size of your dataset
this can require a lot of memory, see below on filtering / iterative loading):

.. ipython:: python

    dataset.to_table()
    # converting to pandas to see the contents of the scanned table
    dataset.to_table().to_pandas()

Reading different file formats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above examples use Parquet files as dataset sources but the Dataset API
provides a consistent interface across multiple file formats and filesystems.
Currently, Parquet, ORC, Feather / Arrow IPC, and CSV file formats are
supported; more formats are planned in the future.

If we save the table as Feather files instead of Parquet files:

.. ipython:: python

    import pyarrow.feather as feather

    feather.write_feather(table, base / "data.feather")

…then we can read the Feather file using the same function, this time
specifying ``format="feather"``:

.. ipython:: python

    dataset = ds.dataset(base / "data.feather", format="feather")
    dataset.to_table().to_pandas().head()

Customizing file formats
~~~~~~~~~~~~~~~~~~~~~~~~

The format name as a string, like::

    ds.dataset(..., format="parquet")

is shorthand for a default constructed :class:`ParquetFileFormat`::

    ds.dataset(..., format=ds.ParquetFileFormat())

The :class:`FileFormat` objects can be customized using keywords. For example,
this will configure column ``"a"`` to be dictionary encoded on scan::

    parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
    ds.dataset(..., format=parquet_format)

Filtering data
--------------

To avoid reading all data when only needing a subset, the ``columns`` and
``filter`` keywords can be used.

The ``columns`` keyword can be used to only read the specified columns:

.. ipython:: python

    dataset = ds.dataset(base / "parquet_dataset", format="parquet")
    dataset.to_table(columns=['a', 'b']).to_pandas()

With the ``filter`` keyword, rows which do not match the filter predicate will
not be included in the returned table. The keyword expects a boolean
:class:`Expression` referencing at least one of the columns:

.. ipython:: python

    dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
    dataset.to_table(filter=ds.field('c') == 2).to_pandas()

The easiest way to construct those :class:`Expression` objects is by using the
:func:`field` helper function. Any column - not just partition columns - can be
referenced using the :func:`field` function (which creates a
:class:`FieldExpression`). Operator overloads are provided to compose filters
including the comparisons (equal, larger/less than, etc.), set membership
testing, and boolean combinations (``&``, ``|``, ``~``):

.. ipython:: python

    ds.field('a') != 3
    ds.field('a').isin([1, 2, 3])
    (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)

Note that :class:`Expression` objects cannot be combined with the Python
logical operators ``and``, ``or`` and ``not``; use ``&``, ``|`` and ``~``
instead.

Projecting columns
------------------

The ``columns`` keyword can be used to read a subset of the columns of the
dataset by passing it a list of column names. The keyword can also be used
for more complex projections in combination with expressions.

In this case, we pass it a dictionary with the keys being the resulting
column names and the values being the expressions used to construct the
column values:

.. ipython:: python

    projection = {
        "a_renamed": ds.field("a"),
        "b_as_float32": ds.field("b").cast("float32"),
        "c_1": ds.field("c") == 1,
    }
    dataset.to_table(columns=projection).to_pandas().head()

The dictionary also determines the column selection (only the keys in the
dictionary will be present as columns in the resulting table). If you want
to include a derived column in *addition* to the existing columns, you can
build up the dictionary from the dataset schema:

.. ipython:: python

    projection = {col: ds.field(col) for col in dataset.schema.names}
    projection.update({"b_large": ds.field("b") > 1})
    dataset.to_table(columns=projection).to_pandas().head()


Reading partitioned data
------------------------

Above, a dataset consisting of a flat directory with files was shown. However, a
dataset can exploit a nested directory structure defining a partitioned dataset,
where the sub-directory names hold information about which subset of the data is
stored in that directory.

For example, a dataset partitioned by year and month may look like this on disk:

.. code-block:: text

   dataset_name/
     year=2007/
       month=01/
          data0.parquet
          data1.parquet
          ...
       month=02/
          data0.parquet
          data1.parquet
          ...
       month=03/
       ...
     year=2008/
       month=01/
       ...
     ...

The above partitioning scheme uses "/key=value/" directory names, as found
in Apache Hive.

Let's create a small partitioned dataset. The :func:`~pyarrow.parquet.write_to_dataset`
function can write such hive-like partitioned datasets.

.. ipython:: python

    table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
                      'part': ['a'] * 5 + ['b'] * 5})
    pq.write_to_dataset(table, str(base / "parquet_dataset_partitioned"),
                        partition_cols=['part'])

The above created a directory with two subdirectories ("part=a" and "part=b"),
and the Parquet files written in those directories no longer include the "part"
column.

Reading this dataset with :func:`dataset`, we now specify that the dataset
should use a hive-like partitioning scheme with the ``partitioning`` keyword:

.. ipython:: python

    dataset = ds.dataset(str(base / "parquet_dataset_partitioned"), format="parquet",
                         partitioning="hive")
    dataset.files

Although the partition fields are not included in the actual Parquet files,
they will be added back to the resulting table when scanning this dataset:

.. ipython:: python

    dataset.to_table().to_pandas().head(3)

We can now filter on the partition keys, which avoids loading files
altogether if they do not match the filter:

.. ipython:: python

    dataset.to_table(filter=ds.field("part") == "b").to_pandas()


Different partitioning schemes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above example uses a hive-like directory scheme, such as "/year=2009/month=11/day=15".
We specified this by passing the ``partitioning="hive"`` keyword. In this case,
the types of the partition keys are inferred from the file paths.

It is also possible to explicitly define the schema of the partition keys
using the :func:`partitioning` function. For example:

.. code-block:: python

    part = ds.partitioning(
        pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
        flavor="hive"
    )
    dataset = ds.dataset(..., partitioning=part)

"Directory partitioning" is also supported, where the segments in the file path
represent the values of the partition keys without including the name (the
field names are implicit in the segment's index). For example, given field names
"year", "month", and "day", one path might be "/2019/11/15".

Since the names are not included in the file paths, these must be specified
when constructing a directory partitioning:

.. code-block:: python

    part = ds.partitioning(field_names=["year", "month", "day"])

Directory partitioning also supports providing a full schema rather than inferring
types from file paths.
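
A minimal sketch of a directory partitioning defined from a full schema (the field
types here are illustrative assumptions):

.. code-block:: python

    # Directory partitioning defined from a schema: the field names come from the
    # schema and the value types are fixed instead of being inferred from the paths.
    part = ds.partitioning(
        pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int8())])
    )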


Reading from cloud storage
--------------------------

In addition to local files, pyarrow also supports reading from cloud storage.
Currently, :class:`HDFS <pyarrow.fs.HadoopFileSystem>` and
:class:`Amazon S3-compatible storage <pyarrow.fs.S3FileSystem>` are supported.

When passing a file URI, the file system will be inferred. For example,
specifying an S3 path:

.. code-block:: python

    dataset = ds.dataset("s3://ursa-labs-taxi-data/", partitioning=["year", "month"])

Typically, you will want to customize the connection parameters, and then
a file system object can be created and passed to the ``filesystem`` keyword:

.. code-block:: python

    from pyarrow import fs

    s3 = fs.S3FileSystem(region="us-east-2")
    dataset = ds.dataset("ursa-labs-taxi-data/", filesystem=s3,
                         partitioning=["year", "month"])

The currently available classes are :class:`~pyarrow.fs.S3FileSystem` and
:class:`~pyarrow.fs.HadoopFileSystem`. See the :ref:`filesystem` docs for more
details.


Reading from MinIO
------------------

In addition to cloud storage, pyarrow also supports reading from a
`MinIO <https://github.com/minio/minio>`_ object storage instance emulating S3
APIs. Paired with `toxiproxy <https://github.com/shopify/toxiproxy>`_, this is
useful for testing or benchmarking.

.. code-block:: python

    from pyarrow import fs

    # By default, MinIO will listen for unencrypted HTTP traffic.
    minio = fs.S3FileSystem(scheme="http", endpoint_override="localhost:9000")
    dataset = ds.dataset("ursa-labs-taxi-data/", filesystem=minio,
                         partitioning=["year", "month"])


Working with Parquet Datasets
-----------------------------

While the Datasets API provides a unified interface to different file formats,
some specific methods exist for Parquet Datasets.

Some processing frameworks such as Dask (optionally) use a ``_metadata`` file
with partitioned datasets which includes information about the schema and the
row group metadata of the full dataset. Using such a file can make creating a
Parquet Dataset more efficient, since it avoids inferring the schema and
crawling the directories for all Parquet files (this is especially the case
for filesystems where accessing files is expensive). The
:func:`parquet_dataset` function allows us to create a Dataset from a partitioned
dataset with a ``_metadata`` file:

.. code-block:: python

    dataset = ds.parquet_dataset("/path/to/dir/_metadata")

By default, the constructed :class:`Dataset` object for Parquet datasets maps
each fragment to a single Parquet file. If you want fragments mapping to each
row group of a Parquet file, you can use the ``split_by_row_group()`` method of
the fragments:

.. code-block:: python

    fragments = list(dataset.get_fragments())
    fragments[0].split_by_row_group()

This method returns a list of new Fragments mapping to each row group of
the original Fragment (Parquet file). Both ``get_fragments()`` and
``split_by_row_group()`` accept an optional filter expression to get a
filtered list of fragments.
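
For example, a rough sketch of retrieving only the fragments (and row groups) that can
match a filter, assuming a hypothetical partition field named ``year``:

.. code-block:: python

    # Hypothetical illustration: only fragments whose partition information can
    # satisfy the filter are returned, and the same filter can be used when
    # splitting a fragment into per-row-group fragments.
    fragments = list(dataset.get_fragments(filter=ds.field("year") == 2019))
    row_groups = fragments[0].split_by_row_group(filter=ds.field("year") == 2019)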


Manual specification of the Dataset
-----------------------------------

The :func:`dataset` function allows easy creation of a Dataset viewing a directory,
crawling all subdirectories for files and partitioning information. However,
sometimes discovery is not required and the dataset's files and partitions
are already known (for example, when this information is stored in metadata).
In this case it is possible to create a Dataset explicitly without any
automatic discovery or inference.

For the example here, we are going to use a dataset where the file names contain
additional partitioning information:

.. ipython:: python

    # creating a dummy dataset: directory with two files
    table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})
    (base / "parquet_dataset_manual").mkdir(exist_ok=True)
    pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")
    pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")

To create a Dataset from a list of files, we need to specify the paths, schema,
format, filesystem, and partition expressions manually:

.. ipython:: python

    from pyarrow import fs

    schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

    dataset = ds.FileSystemDataset.from_paths(
        ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
        filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
        partitions=[ds.field('year') == 2018, ds.field('year') == 2019])

Since we specified the "partition expressions" for our files, this information
is materialized as columns when reading the data and can be used for filtering:

.. ipython:: python

    dataset.to_table().to_pandas()
    dataset.to_table(filter=ds.field('year') == 2019).to_pandas()

Another benefit of manually listing the files is that the order of the files
controls the order of the data. When performing an ordered read (or a read to
a table), the rows returned will match the order of the files given. This
only applies when the dataset is constructed with a list of files. There
are no order guarantees given when the files are instead discovered by scanning
a directory.

Iterative (out of core or streaming) reads
-------------------------------------------

The previous examples have demonstrated how to read the data into a table using
:meth:`~Dataset.to_table`. This is useful if the dataset is small or there is only
a small amount of data that needs to be read. The dataset API contains additional
methods to read and process large amounts of data in a streaming fashion.

The easiest way to do this is to use the method :meth:`Dataset.to_batches`. This
method returns an iterator of record batches. For example, we can use this method to
calculate the average of a column without loading the entire column into memory:

.. ipython:: python

    import pyarrow.compute as pc

    col2_sum = 0
    count = 0
    for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
        col2_sum += pc.sum(batch.column("col2")).as_py()
        count += batch.num_rows
    mean_col2 = col2_sum / count

Customizing the batch size
~~~~~~~~~~~~~~~~~~~~~~~~~~

An iterative read of a dataset is often called a "scan" of the dataset and pyarrow
uses an object called a :class:`Scanner` to do this. A Scanner is created for you
automatically by the :meth:`~Dataset.to_table` and :meth:`~Dataset.to_batches` methods
of the dataset. Any arguments you pass to these methods will be passed on to the
Scanner constructor.

One of those parameters is the ``batch_size``. This controls the maximum size of the
batches returned by the scanner. Batches can still be smaller than the ``batch_size``
if the dataset consists of small files or those files themselves consist of small
row groups. For example, a parquet file with 10,000 rows per row group will yield
batches with, at most, 10,000 rows unless the ``batch_size`` is set to a smaller value.

The default batch size is one million rows and this is typically a good default but
you may want to customize it if you are reading a large number of columns. A short
example is sketched below.
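
For example, a minimal sketch that scans the sample dataset in batches of at most
1,000 rows (the value is only an illustration):

.. code-block:: python

    # Request batches of at most 1,000 rows; actual batches may be smaller,
    # depending on the file and row group sizes.
    for batch in dataset.to_batches(batch_size=1000):
        print(batch.num_rows)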

Writing Datasets
----------------

The dataset API also simplifies writing data to a dataset using :func:`write_dataset`.
This can be useful when you want to partition your data or you need to write a large
amount of data. A basic dataset write is similar to writing a table except that you
specify a directory instead of a filename.

.. ipython:: python

    base = pathlib.Path(tempfile.gettempdir())
    dataset_root = base / "sample_dataset"
    dataset_root.mkdir(exist_ok=True)

    table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})
    ds.write_dataset(table, dataset_root, format="parquet")

The above example will create a single file named part-0.parquet in our sample_dataset
directory.

.. warning::

    If you run the example again it will replace the existing part-0.parquet file.
    Appending files to an existing dataset requires specifying a new
    ``basename_template`` for each call to ``ds.write_dataset`` to avoid
    overwriting existing files.
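
For example, a sketch of a second write that uses a different (hypothetical)
``basename_template`` so it does not overwrite the file written above:

.. code-block:: python

    # Files from this write are named part2-0.parquet, part2-1.parquet, ...
    # and therefore do not collide with the existing part-0.parquet.
    ds.write_dataset(table, dataset_root, format="parquet",
                     basename_template="part2-{i}.parquet")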

Writing partitioned data
~~~~~~~~~~~~~~~~~~~~~~~~

A partitioning object can be used to specify how your output data should be partitioned.
This uses the same kind of partitioning objects we used for reading datasets. To write
our above data out to a partitioned directory we only need to specify how we want the
dataset to be partitioned. For example:

.. ipython:: python

    part = ds.partitioning(
        pa.schema([("c", pa.int16())]), flavor="hive"
    )
    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part)

This will create two files. Half our data will be in the dataset_root/c=1 directory and
the other half will be in the dataset_root/c=2 directory.

Writing large amounts of data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above examples wrote data from a table. If you are writing a large amount of data
you may not be able to load everything into a single in-memory table. Fortunately, the
:func:`write_dataset` function also accepts an iterable of record batches. This makes it
simple, for example, to repartition a large dataset without loading the entire dataset
into memory:

.. ipython:: python

    old_part = ds.partitioning(
        pa.schema([("c", pa.int16())]), flavor="hive"
    )
    new_part = ds.partitioning(
        pa.schema([("c", pa.int16())]), flavor=None
    )
    input_dataset = ds.dataset(dataset_root, partitioning=old_part)
    new_root = base / "repartitioned_dataset"
    # A scanner can act as an iterator of record batches but you could also receive
    # data from the network (e.g. via flight), from your own scanning, or from any
    # other method that yields record batches. In addition, you can pass a dataset
    # into write_dataset directly but this method is useful if you want to customize
    # the scanner (e.g. to filter the input dataset or set a maximum batch size)
    scanner = input_dataset.scanner(use_async=True)

    ds.write_dataset(scanner, new_root, format="parquet", partitioning=new_part)

After the above example runs our data will be in the new_root/1 and new_root/2
directories. In this simple example we are not changing the structure of the data
(only the directory naming scheme) but you could also use this mechanism to change
which columns are used to partition the dataset. This is useful when you expect to
query your data in specific ways and you can utilize partitioning to reduce the
amount of data you need to read.

Customizing & inspecting written files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

By default the dataset API will create files named "part-i.format" where "i" is an integer
generated during the write and "format" is the file format specified in the write_dataset
call. For simple datasets it may be possible to know which files will be created but for
larger or partitioned datasets it is not so easy. The ``file_visitor`` keyword can be used
to supply a visitor that will be called as each file is created:

.. ipython:: python

    def file_visitor(written_file):
        print(f"path={written_file.path}")
        print(f"metadata={written_file.metadata}")

.. ipython:: python

    ds.write_dataset(table, base / "dataset_visited", format="parquet", partitioning=part,
                     file_visitor=file_visitor)

This will allow you to collect the filenames that belong to the dataset and store them
elsewhere, which can be useful when you want to avoid scanning directories the next time
you need to read the data. It can also be used to generate the ``_metadata`` index file
used by other tools such as Dask or Spark to create an index of the dataset.
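
For example, a rough sketch of collecting the per-file Parquet metadata during a write
and combining it into a ``_metadata`` file afterwards (the output directory name and the
helper function are illustrative assumptions, not a complete recipe):

.. code-block:: python

    import os
    import pyarrow.parquet as pq

    metadata_root = base / "dataset_with_metadata"  # hypothetical output directory
    metadata_collector = []

    def collect_metadata(written_file):
        # written_file.metadata is the Parquet FileMetaData of the file just written;
        # store its path relative to the dataset root before collecting it.
        written_file.metadata.set_file_path(
            os.path.relpath(written_file.path, metadata_root))
        metadata_collector.append(written_file.metadata)

    ds.write_dataset(table, metadata_root, format="parquet",
                     file_visitor=collect_metadata)

    # Combine the collected metadata into a single _metadata index file.
    pq.write_metadata(table.schema, metadata_root / "_metadata",
                      metadata_collector=metadata_collector)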

Configuring format-specific parameters during a write
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to the common options shared by all formats there are also format-specific
options that are unique to a particular format. For example, to allow truncated
timestamps while writing Parquet files:

.. ipython:: python

    dataset_root = base / "sample_dataset2"
    dataset_root.mkdir(exist_ok=True)

    parquet_format = ds.ParquetFileFormat()
    write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)
    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part,
                     file_options=write_options)