# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#' Read a Parquet file
#'
#' '[Parquet](https://parquet.apache.org/)' is a columnar storage file format.
#' This function enables you to read Parquet files into R.
#'
#' @inheritParams read_feather
#' @param props [ParquetArrowReaderProperties]
#' @param ... Additional arguments passed to `ParquetFileReader$create()`
#'
#' @return An [arrow::Table][Table], or a `data.frame` if `as_data_frame` is
#' `TRUE` (the default).
#' @examplesIf arrow_with_parquet()
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write_parquet(mtcars, tf)
#' df <- read_parquet(tf, col_select = starts_with("d"))
#' head(df)
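#'
#' # as_data_frame = FALSE returns an Arrow Table instead of a data.frame
#' tab <- read_parquet(tf, as_data_frame = FALSE)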
#' @export
read_parquet <- function(file,
                         col_select = NULL,
                         as_data_frame = TRUE,
                         props = ParquetArrowReaderProperties$create(),
                         ...) {
  if (is.string(file)) {
    file <- make_readable_file(file)
    on.exit(file$close())
  }
  reader <- ParquetFileReader$create(file, props = props, ...)

  col_select <- enquo(col_select)
  if (!quo_is_null(col_select)) {
    # infer which columns to keep from the schema; subtract 1 because
    # Parquet column indices are 0-based
    schema <- reader$GetSchema()
    names <- names(schema)
    indices <- match(vars_select(names, !!col_select), names) - 1L
    tab <- tryCatch(
      reader$ReadTable(indices),
      error = read_compressed_error
    )
  } else {
    # read all columns
    tab <- tryCatch(
      reader$ReadTable(),
      error = read_compressed_error
    )
  }

  if (as_data_frame) {
    tab <- as.data.frame(tab)
  }
  tab
}

#' Write Parquet file to disk
#'
#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
#' This function enables you to write Parquet files from R.
#'
#' Due to features of the format, Parquet files cannot be appended to.
#' If you want to use the Parquet format but also want the ability to extend
#' your dataset, you can write to additional Parquet files and then treat
#' the whole directory of files as a [Dataset] you can query.
#' See `vignette("dataset", package = "arrow")` for examples of this.
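#'
#' A minimal sketch of that multi-file pattern (the directory name is
#' illustrative; `open_dataset()` requires the dataset feature):
#'
#' ```r
#' d <- "my_dataset_dir"
#' dir.create(d)
#' write_parquet(data.frame(x = 1:5), file.path(d, "part-1.parquet"))
#' write_parquet(data.frame(x = 6:10), file.path(d, "part-2.parquet"))
#' ds <- open_dataset(d) # query all parts as one Dataset
#' ```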
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path, URI, or [OutputStream], or path in a file
#' system (`SubTreeFileSystem`)
#' @param chunk_size chunk size in number of rows. If NULL, the total number of rows is used.
#' @param version Parquet version, "1.0" or "2.0". Default "1.0". Numeric values
#' are coerced to character.
#' @param compression compression algorithm. Default "snappy". See details.
#' @param compression_level compression level. Meaning depends on compression algorithm
#' @param use_dictionary Specify if we should use dictionary encoding. Default `TRUE`
#' @param write_statistics Specify if we should write statistics. Default `TRUE`
#' @param data_page_size Set a target threshold for the approximate encoded
#' size of data pages within a column chunk (in bytes). Default 1 MiB.
#' @param use_deprecated_int96_timestamps Write timestamps to INT96 Parquet format. Default `FALSE`.
#' @param coerce_timestamps Cast timestamps to a particular resolution. Can be
#' `NULL`, "ms" or "us". Default `NULL` (no casting)
#' @param allow_truncated_timestamps Allow loss of data when coercing timestamps to a
#' particular resolution. E.g. if microsecond or nanosecond data is lost when coercing
#' to "ms", do not raise an exception
#' @param properties A `ParquetWriterProperties` object, used instead of the options
#' enumerated in this function's signature. Providing `properties` as an argument
#' is deprecated; if you need to assemble `ParquetWriterProperties` outside
#' of `write_parquet()`, use `ParquetFileWriter` instead.
#' @param arrow_properties A `ParquetArrowWriterProperties` object. Like
#' `properties`, this argument is deprecated.
#'
#' @details The parameters `compression`, `compression_level`, `use_dictionary` and
#' `write_statistics` support various patterns:
#'
#' - The default `NULL` leaves the parameter unspecified, and the C++ library
#' uses an appropriate default for each column (defaults listed above)
#' - A single, unnamed, value (e.g. a single string for `compression`) applies to all columns
#' - An unnamed vector, of the same size as the number of columns, to specify a
#' value for each column, in positional order
#' - A named vector, to specify the value for the named columns; the default
#' value for the setting is used for columns not named
#'
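#' For example, a hedged sketch of these patterns (assumes `df` is a
#' data.frame with columns `x` and `y`, and `tf` is a file path):
#'
#' ```r
#' write_parquet(df, tf, compression = "snappy")          # one value: all columns
#' write_parquet(df, tf, use_dictionary = c(TRUE, FALSE)) # positional, per column
#' write_parquet(df, tf, write_statistics = c(x = FALSE)) # named; others use defaults
#' ```
#'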
#' The `compression` argument can be any of the following (case insensitive):
#' "uncompressed", "snappy", "gzip", "brotli", "zstd", "lz4", "lzo" or "bz2".
#' Only "uncompressed" is guaranteed to be available, but "snappy" and "gzip"
#' are almost always included. See [codec_is_available()].
#' The default "snappy" is used if available, otherwise "uncompressed". To
#' disable compression, set `compression = "uncompressed"`.
#' Note that "uncompressed" columns may still have dictionary encoding.
#'
#' @return the input `x` invisibly.
#'
#' @examplesIf arrow_with_parquet()
#' tf1 <- tempfile(fileext = ".parquet")
#' write_parquet(data.frame(x = 1:5), tf1)
#'
#' # using compression
#' if (codec_is_available("gzip")) {
#'   tf2 <- tempfile(fileext = ".gz.parquet")
#'   write_parquet(data.frame(x = 1:5), tf2, compression = "gzip", compression_level = 5)
#' }
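#'
#' # codec availability varies by build; check before requesting one
#' if (codec_is_available("zstd")) {
#'   tf3 <- tempfile(fileext = ".parquet")
#'   write_parquet(data.frame(x = 1:5), tf3, compression = "zstd")
#' }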
#' @export
write_parquet <- function(x,
                          sink,
                          chunk_size = NULL,
                          # writer properties
                          version = NULL,
                          compression = default_parquet_compression(),
                          compression_level = NULL,
                          use_dictionary = NULL,
                          write_statistics = NULL,
                          data_page_size = NULL,
                          # arrow writer properties
                          use_deprecated_int96_timestamps = FALSE,
                          coerce_timestamps = NULL,
                          allow_truncated_timestamps = FALSE,
                          properties = NULL,
                          arrow_properties = NULL) {
  x_out <- x

  if (is.data.frame(x) || inherits(x, "RecordBatch")) {
    x <- Table$create(x)
  }

  assert_that(is_writable_table(x))

  if (!inherits(sink, "OutputStream")) {
    sink <- make_output_stream(sink)
    on.exit(sink$close())
  }

  # Deprecation warnings
  if (!is.null(properties)) {
    warning(
      "Providing 'properties' is deprecated. If you need to assemble properties outside ",
      "this function, use ParquetFileWriter instead."
    )
  }
  if (!is.null(arrow_properties)) {
    warning(
      "Providing 'arrow_properties' is deprecated. If you need to assemble arrow_properties ",
      "outside this function, use ParquetFileWriter instead."
    )
  }

  writer <- ParquetFileWriter$create(
    x$schema,
    sink,
    properties = properties %||% ParquetWriterProperties$create(
      x,
      version = version,
      compression = compression,
      compression_level = compression_level,
      use_dictionary = use_dictionary,
      write_statistics = write_statistics,
      data_page_size = data_page_size
    ),
    arrow_properties = arrow_properties %||% ParquetArrowWriterProperties$create(
      use_deprecated_int96_timestamps = use_deprecated_int96_timestamps,
      coerce_timestamps = coerce_timestamps,
      allow_truncated_timestamps = allow_truncated_timestamps
    )
  )
  writer$WriteTable(x, chunk_size = chunk_size %||% x$num_rows)
  writer$Close()

  invisible(x_out)
}

default_parquet_compression <- function() {
  # Match the pyarrow default (overriding the C++ default)
  if (codec_is_available("snappy")) {
    "snappy"
  } else {
    NULL
  }
}

ParquetArrowWriterProperties <- R6Class("ParquetArrowWriterProperties", inherit = ArrowObject)
ParquetArrowWriterProperties$create <- function(use_deprecated_int96_timestamps = FALSE,
                                                coerce_timestamps = NULL,
                                                allow_truncated_timestamps = FALSE,
                                                ...) {
  if (is.null(coerce_timestamps)) {
    timestamp_unit <- -1L # null sentinel value
  } else {
    timestamp_unit <- make_valid_time_unit(
      coerce_timestamps,
      c("ms" = TimeUnit$MILLI, "us" = TimeUnit$MICRO)
    )
  }
  parquet___ArrowWriterProperties___create(
    use_deprecated_int96_timestamps = isTRUE(use_deprecated_int96_timestamps),
    timestamp_unit = timestamp_unit,
    allow_truncated_timestamps = isTRUE(allow_truncated_timestamps)
  )
}

valid_parquet_version <- c(
  "1.0" = ParquetVersionType$PARQUET_1_0,
  "2.0" = ParquetVersionType$PARQUET_2_0
)

make_valid_version <- function(version, valid_versions = valid_parquet_version) {
  if (is_integerish(version)) {
    version <- as.character(version)
  }
  tryCatch(
    valid_versions[[match.arg(version, choices = names(valid_versions))]],
    error = function(cond) {
      stop('"version" should be one of ', oxford_paste(names(valid_versions), "or"), call. = FALSE)
    }
  )
}

#' @title ParquetWriterProperties class
#' @rdname ParquetWriterProperties
#' @name ParquetWriterProperties
#' @docType class
#' @usage NULL
#' @format NULL
#' @description This class holds settings to control how a Parquet file is
#' written by [ParquetFileWriter].
#'
#' @section Factory:
#'
#' The `ParquetWriterProperties$create()` factory method instantiates the object
#' and takes the following arguments:
#'
#' - `table`: table to write (required)
#' - `version`: Parquet version, "1.0" or "2.0". Default "1.0"
#' - `compression`: Compression type. Default `"uncompressed"`
#' - `compression_level`: Compression level; meaning depends on compression algorithm
#' - `use_dictionary`: Specify if we should use dictionary encoding. Default `TRUE`
#' - `write_statistics`: Specify if we should write statistics. Default `TRUE`
#' - `data_page_size`: Set a target threshold for the approximate encoded
#' size of data pages within a column chunk (in bytes). Default 1 MiB.
#'
#' @details The parameters `compression`, `compression_level`, `use_dictionary`
#' and `write_statistics` support various patterns:
#'
#' - The default `NULL` leaves the parameter unspecified, and the C++ library
#' uses an appropriate default for each column (defaults listed above)
#' - A single, unnamed, value (e.g. a single string for `compression`) applies to all columns
#' - An unnamed vector, of the same size as the number of columns, to specify a
#' value for each column, in positional order
#' - A named vector, to specify the value for the named columns; the default
#' value for the setting is used for columns not named
#'
#' Unlike the high-level [write_parquet], `ParquetWriterProperties` arguments
#' use the C++ defaults. Currently this means "uncompressed" rather than
#' "snappy" for the `compression` argument.
#'
#' @seealso [write_parquet]
#' @seealso [Schema] for information about schemas and metadata handling.
#'
#' @export
ParquetWriterProperties <- R6Class("ParquetWriterProperties", inherit = ArrowObject)
ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
  inherit = ArrowObject,
  public = list(
    set_version = function(version) {
      parquet___WriterProperties___Builder__version(self, make_valid_version(version))
    },
    set_compression = function(table, compression) {
      compression <- compression_from_name(compression)
      assert_that(is.integer(compression))
      private$.set(
        table, compression,
        parquet___ArrowWriterProperties___Builder__set_compressions
      )
    },
    set_compression_level = function(table, compression_level) {
      # cast to integer but keep names
      compression_level <- set_names(as.integer(compression_level), names(compression_level))
      private$.set(
        table, compression_level,
        parquet___ArrowWriterProperties___Builder__set_compression_levels
      )
    },
    set_dictionary = function(table, use_dictionary) {
      assert_that(is.logical(use_dictionary))
      private$.set(
        table, use_dictionary,
        parquet___ArrowWriterProperties___Builder__set_use_dictionary
      )
    },
    set_write_statistics = function(table, write_statistics) {
      assert_that(is.logical(write_statistics))
      private$.set(
        table, write_statistics,
        parquet___ArrowWriterProperties___Builder__set_write_statistics
      )
    },
    set_data_page_size = function(data_page_size) {
      parquet___ArrowWriterProperties___Builder__data_page_size(self, data_page_size)
    }
  ),
  private = list(
    .set = function(table, value, FUN) {
      msg <- paste0("unsupported ", substitute(value), "= specification")
      column_names <- names(table)
      given_names <- names(value)
      if (is.null(given_names)) {
        if (length(value) %in% c(1L, length(column_names))) {
          # If there's a single, unnamed value, FUN will set it globally
          # If there are values for all columns, send them along with the names
          FUN(self, column_names, value)
        } else {
          abort(msg)
        }
      } else if (all(given_names %in% column_names)) {
        # Use the given names
        FUN(self, given_names, value)
      } else {
        abort(msg)
      }
    }
  )
)

ParquetWriterProperties$create <- function(table,
                                           version = NULL,
                                           compression = default_parquet_compression(),
                                           compression_level = NULL,
                                           use_dictionary = NULL,
                                           write_statistics = NULL,
                                           data_page_size = NULL,
                                           ...) {
  builder <- parquet___WriterProperties___Builder__create()
  if (!is.null(version)) {
    builder$set_version(version)
  }
  if (!is.null(compression)) {
    builder$set_compression(table, compression = compression)
  }
  if (!is.null(compression_level)) {
    builder$set_compression_level(table, compression_level = compression_level)
  }
  if (!is.null(use_dictionary)) {
    builder$set_dictionary(table, use_dictionary)
  }
  if (!is.null(write_statistics)) {
    builder$set_write_statistics(table, write_statistics)
  }
  if (!is.null(data_page_size)) {
    builder$set_data_page_size(data_page_size)
  }
  parquet___WriterProperties___Builder__build(builder)
}

#' @title ParquetFileWriter class
#' @rdname ParquetFileWriter
#' @name ParquetFileWriter
#' @docType class
#' @usage NULL
#' @format NULL
#' @description This class enables you to write Parquet files.
#'
#' @section Factory:
#'
#' The `ParquetFileWriter$create()` factory method instantiates the object and
#' takes the following arguments:
#'
#' - `schema` A [Schema]
#' - `sink` An [arrow::io::OutputStream][OutputStream]
#' - `properties` An instance of [ParquetWriterProperties]
#' - `arrow_properties` An instance of `ParquetArrowWriterProperties`
#'
#' @section Methods:
#'
#' - `WriteTable` Write a [Table] to `sink`
#' - `Close` Close the writer. Note: does not close the `sink`.
#' [arrow::io::OutputStream][OutputStream] has its own `close()` method.
#'
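#' For example, a minimal sketch of the low-level write path (assumes `tab`
#' is a [Table]):
#'
#' ```r
#' sink <- FileOutputStream$create(tempfile(fileext = ".parquet"))
#' writer <- ParquetFileWriter$create(tab$schema, sink)
#' writer$WriteTable(tab, chunk_size = tab$num_rows)
#' writer$Close()
#' sink$close() # the writer does not close the sink
#' ```
#'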
#' @export
#' @include arrow-package.R
ParquetFileWriter <- R6Class("ParquetFileWriter",
  inherit = ArrowObject,
  public = list(
    WriteTable = function(table, chunk_size) {
      parquet___arrow___FileWriter__WriteTable(self, table, chunk_size)
    },
    Close = function() parquet___arrow___FileWriter__Close(self)
  )
)
ParquetFileWriter$create <- function(schema,
                                     sink,
                                     properties = ParquetWriterProperties$create(),
                                     arrow_properties = ParquetArrowWriterProperties$create()) {
  assert_is(sink, "OutputStream")
  parquet___arrow___ParquetFileWriter__Open(schema, sink, properties, arrow_properties)
}


#' @title ParquetFileReader class
#' @rdname ParquetFileReader
#' @name ParquetFileReader
#' @docType class
#' @usage NULL
#' @format NULL
#' @description This class enables you to read Parquet files.
#'
#' @section Factory:
#'
#' The `ParquetFileReader$create()` factory method instantiates the object and
#' takes the following arguments:
#'
#' - `file` A character file name, raw vector, or Arrow file connection object
#' (e.g. `RandomAccessFile`).
#' - `props` Optional [ParquetArrowReaderProperties]
#' - `mmap` Logical: whether to memory-map the file (default `TRUE`)
#' - `...` Additional arguments, currently ignored
#'
#' @section Methods:
#'
#' - `$ReadTable(column_indices)`: get an `arrow::Table` from the file. The optional
#' `column_indices=` argument is a 0-based integer vector indicating which columns to retain.
#' - `$ReadRowGroup(i, column_indices)`: get an `arrow::Table` by reading the `i`th row group (0-based).
#' The optional `column_indices=` argument is a 0-based integer vector indicating which columns to retain.
#' - `$ReadRowGroups(row_groups, column_indices)`: get an `arrow::Table` by reading several row
#' groups (0-based integers).
#' The optional `column_indices=` argument is a 0-based integer vector indicating which columns to retain.
#' - `$GetSchema()`: get the `arrow::Schema` of the data in the file
#' - `$ReadColumn(i)`: read the `i`th column (0-based) as a [ChunkedArray].
#'
#' @section Active bindings:
#'
#' - `$num_rows`: number of rows.
#' - `$num_columns`: number of columns.
#' - `$num_row_groups`: number of row groups.
#'
#' @export
#' @examplesIf arrow_with_parquet()
#' f <- system.file("v0.7.1.parquet", package = "arrow")
#' pq <- ParquetFileReader$create(f)
#' pq$GetSchema()
#' if (codec_is_available("snappy")) {
#'   # This file has compressed data columns
#'   tab <- pq$ReadTable()
#'   tab$schema
#' }
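#'
#' # row groups and single columns can also be read individually
#' pq$num_row_groups
#' if (codec_is_available("snappy")) {
#'   rg <- pq$ReadRowGroup(0)
#' }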
#' @include arrow-package.R
ParquetFileReader <- R6Class("ParquetFileReader",
  inherit = ArrowObject,
  active = list(
    num_rows = function() {
      as.integer(parquet___arrow___FileReader__num_rows(self))
    },
    num_columns = function() {
      parquet___arrow___FileReader__num_columns(self)
    },
    num_row_groups = function() {
      parquet___arrow___FileReader__num_row_groups(self)
    }
  ),
  public = list(
    ReadTable = function(column_indices = NULL) {
      if (is.null(column_indices)) {
        parquet___arrow___FileReader__ReadTable1(self)
      } else {
        column_indices <- vec_cast(column_indices, integer())
        parquet___arrow___FileReader__ReadTable2(self, column_indices)
      }
    },
    ReadRowGroup = function(i, column_indices = NULL) {
      i <- vec_cast(i, integer())
      if (is.null(column_indices)) {
        parquet___arrow___FileReader__ReadRowGroup1(self, i)
      } else {
        column_indices <- vec_cast(column_indices, integer())
        parquet___arrow___FileReader__ReadRowGroup2(self, i, column_indices)
      }
    },
    ReadRowGroups = function(row_groups, column_indices = NULL) {
      row_groups <- vec_cast(row_groups, integer())
      if (is.null(column_indices)) {
        parquet___arrow___FileReader__ReadRowGroups1(self, row_groups)
      } else {
        column_indices <- vec_cast(column_indices, integer())
        parquet___arrow___FileReader__ReadRowGroups2(self, row_groups, column_indices)
      }
    },
    ReadColumn = function(i) {
      i <- vec_cast(i, integer())
      parquet___arrow___FileReader__ReadColumn(self, i)
    },
    GetSchema = function() {
      parquet___arrow___FileReader__GetSchema(self)
    }
  )
)

ParquetFileReader$create <- function(file,
                                     props = ParquetArrowReaderProperties$create(),
                                     mmap = TRUE,
                                     ...) {
  file <- make_readable_file(file, mmap)
  assert_is(props, "ParquetArrowReaderProperties")

  parquet___arrow___FileReader__OpenFile(file, props)
}

#' @title ParquetArrowReaderProperties class
#' @rdname ParquetArrowReaderProperties
#' @name ParquetArrowReaderProperties
#' @docType class
#' @usage NULL
#' @format NULL
#' @description This class holds settings to control how a Parquet file is read
#' by [ParquetFileReader].
#'
#' @section Factory:
#'
#' The `ParquetArrowReaderProperties$create()` factory method instantiates the object
#' and takes the following arguments:
#'
#' - `use_threads` Logical: whether to use multithreading (default `TRUE`)
#'
#' @section Methods:
#'
#' - `$read_dictionary(column_index)`
#' - `$set_read_dictionary(column_index, read_dict)`
#' - `$use_threads(use_threads)`
#'
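#' For example, a minimal sketch of single-threaded reading (the file path
#' is illustrative):
#'
#' ```r
#' props <- ParquetArrowReaderProperties$create(use_threads = FALSE)
#' tab <- read_parquet("data.parquet", props = props, as_data_frame = FALSE)
#' ```
#'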
#' @export
ParquetArrowReaderProperties <- R6Class("ParquetArrowReaderProperties",
  inherit = ArrowObject,
  public = list(
    read_dictionary = function(column_index) {
      parquet___arrow___ArrowReaderProperties__get_read_dictionary(self, column_index)
    },
    set_read_dictionary = function(column_index, read_dict) {
      parquet___arrow___ArrowReaderProperties__set_read_dictionary(self, column_index, read_dict)
    }
  ),
  active = list(
    use_threads = function(use_threads) {
      if (missing(use_threads)) {
        parquet___arrow___ArrowReaderProperties__get_use_threads(self)
      } else {
        parquet___arrow___ArrowReaderProperties__set_use_threads(self, use_threads)
      }
    }
  )
)

ParquetArrowReaderProperties$create <- function(use_threads = option_use_threads()) {
  parquet___arrow___ArrowReaderProperties__Make(isTRUE(use_threads))
}