# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#' Read a Parquet file
#'
#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
#' This function enables you to read Parquet files into R.
#'
#' @inheritParams read_feather
#' @param props [ParquetArrowReaderProperties]
#' @param ... Additional arguments passed to `ParquetFileReader$create()`
#'
#' @return An [arrow::Table][Table], or a `data.frame` if `as_data_frame` is
#' `TRUE` (the default).
#' @examplesIf arrow_with_parquet()
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write_parquet(mtcars, tf)
#' df <- read_parquet(tf, col_select = starts_with("d"))
#' head(df)
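#'
#' # A further sketch: col_select also accepts column names directly
#' df2 <- read_parquet(tf, col_select = c("cyl", "hp"))
#' head(df2)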
#' @export
read_parquet <- function(file,
                         col_select = NULL,
                         as_data_frame = TRUE,
                         props = ParquetArrowReaderProperties$create(),
                         ...) {
  if (is.string(file)) {
    file <- make_readable_file(file)
    on.exit(file$close())
  }
  reader <- ParquetFileReader$create(file, props = props, ...)

  col_select <- enquo(col_select)
  if (!quo_is_null(col_select)) {
    # infer which columns to keep from schema
    schema <- reader$GetSchema()
    names <- names(schema)
    indices <- match(vars_select(names, !!col_select), names) - 1L
    tab <- tryCatch(
      reader$ReadTable(indices),
      error = read_compressed_error
    )
  } else {
    # read all columns
    tab <- tryCatch(
      reader$ReadTable(),
      error = read_compressed_error
    )
  }

  if (as_data_frame) {
    tab <- as.data.frame(tab)
  }
  tab
}

#' Write Parquet file to disk
#'
#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
#' This function enables you to write Parquet files from R.
#'
#' Due to features of the format, Parquet files cannot be appended to.
#' If you want to use the Parquet format but also want the ability to extend
#' your dataset, you can write additional Parquet files and then treat
#' the whole directory of files as a [Dataset] you can query.
#' See `vignette("dataset", package = "arrow")` for examples of this.
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path, URI, or [OutputStream], or path in a file
#' system (`SubTreeFileSystem`)
#' @param chunk_size chunk size in number of rows. If NULL, the total number of
#' rows is used.
#' @param version parquet version, "1.0" or "2.0". Default "1.0". Numeric values
#' are coerced to character.
#' @param compression compression algorithm. Default "snappy". See details.
#' @param compression_level compression level. Meaning depends on compression
#' algorithm
#' @param use_dictionary Specify if we should use dictionary encoding. Default `TRUE`
#' @param write_statistics Specify if we should write statistics. Default `TRUE`
#' @param data_page_size Set a target threshold for the approximate encoded
#' size of data pages within a column chunk (in bytes). Default 1 MiB.
#' @param use_deprecated_int96_timestamps Write timestamps to INT96 Parquet format.
#' Default `FALSE`.
#' @param coerce_timestamps Cast timestamps to a particular resolution. Can be
#' `NULL`, "ms" or "us". Default `NULL` (no casting)
#' @param allow_truncated_timestamps Allow loss of data when coercing timestamps to a
#' particular resolution. E.g. if microsecond or nanosecond data is lost when coercing
#' to "ms", do not raise an exception
#' @param properties A `ParquetWriterProperties` object, used instead of the options
#' enumerated in this function's signature. Providing `properties` as an argument
#' is deprecated; if you need to assemble `ParquetWriterProperties` outside
#' of `write_parquet()`, use `ParquetFileWriter` instead.
#' @param arrow_properties A `ParquetArrowWriterProperties` object. Like
#' `properties`, this argument is deprecated.
#'
#' @details The parameters `compression`, `compression_level`, `use_dictionary` and
#' `write_statistics` support various patterns:
#'
#' - The default `NULL` leaves the parameter unspecified, and the C++ library
#' uses an appropriate default for each column (defaults listed above)
#' - A single, unnamed, value (e.g. a single string for `compression`) applies to
#' all columns
#' - An unnamed vector, of the same size as the number of columns, to specify a
#' value for each column, in positional order
#' - A named vector, to specify the value for the named columns; the default
#' value for the setting is used when not supplied
#'
#' The `compression` argument can be any of the following (case insensitive):
#' "uncompressed", "snappy", "gzip", "brotli", "zstd", "lz4", "lzo" or "bz2".
#' Only "uncompressed" is guaranteed to be available, but "snappy" and "gzip"
#' are almost always included. See [codec_is_available()].
#' The default "snappy" is used if available, otherwise "uncompressed". To
#' disable compression, set `compression = "uncompressed"`.
#' Note that "uncompressed" columns may still have dictionary encoding.
#'
#' @return the input `x` invisibly.
#'
#' @examplesIf arrow_with_parquet()
#' tf1 <- tempfile(fileext = ".parquet")
#' write_parquet(data.frame(x = 1:5), tf1)
#'
#' # using compression
#' if (codec_is_available("gzip")) {
#'   tf2 <- tempfile(fileext = ".gz.parquet")
#'   write_parquet(data.frame(x = 1:5), tf2, compression = "gzip", compression_level = 5)
#' }
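#'
#' # a sketch of per-column settings via a named vector; columns not named
#' # fall back to the default for that setting
#' tf3 <- tempfile(fileext = ".parquet")
#' write_parquet(data.frame(x = 1:5, y = letters[1:5]), tf3,
#'   use_dictionary = c(y = TRUE)
#' )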
#' @export
write_parquet <- function(x,
                          sink,
                          chunk_size = NULL,
                          # writer properties
                          version = NULL,
                          compression = default_parquet_compression(),
                          compression_level = NULL,
                          use_dictionary = NULL,
                          write_statistics = NULL,
                          data_page_size = NULL,
                          # arrow writer properties
                          use_deprecated_int96_timestamps = FALSE,
                          coerce_timestamps = NULL,
                          allow_truncated_timestamps = FALSE,
                          properties = NULL,
                          arrow_properties = NULL) {
  x_out <- x

  if (is.data.frame(x) || inherits(x, "RecordBatch")) {
    x <- Table$create(x)
  }

  assert_that(is_writable_table(x))

  if (!inherits(sink, "OutputStream")) {
    sink <- make_output_stream(sink)
    on.exit(sink$close())
  }

  # Deprecation warnings
  if (!is.null(properties)) {
    warning(
      "Providing 'properties' is deprecated. If you need to assemble properties outside ",
      "this function, use ParquetFileWriter instead."
    )
  }
  if (!is.null(arrow_properties)) {
    warning(
      "Providing 'arrow_properties' is deprecated. If you need to assemble arrow_properties ",
      "outside this function, use ParquetFileWriter instead."
    )
  }

  writer <- ParquetFileWriter$create(
    x$schema,
    sink,
    properties = properties %||% ParquetWriterProperties$create(
      x,
      version = version,
      compression = compression,
      compression_level = compression_level,
      use_dictionary = use_dictionary,
      write_statistics = write_statistics,
      data_page_size = data_page_size
    ),
    arrow_properties = arrow_properties %||% ParquetArrowWriterProperties$create(
      use_deprecated_int96_timestamps = use_deprecated_int96_timestamps,
      coerce_timestamps = coerce_timestamps,
      allow_truncated_timestamps = allow_truncated_timestamps
    )
  )
  writer$WriteTable(x, chunk_size = chunk_size %||% x$num_rows)
  writer$Close()

  invisible(x_out)
}

default_parquet_compression <- function() {
  # Match the pyarrow default (overriding the C++ default)
  if (codec_is_available("snappy")) {
    "snappy"
  } else {
    NULL
  }
}

ParquetArrowWriterProperties <- R6Class("ParquetArrowWriterProperties", inherit = ArrowObject)
ParquetArrowWriterProperties$create <- function(use_deprecated_int96_timestamps = FALSE,
                                                coerce_timestamps = NULL,
                                                allow_truncated_timestamps = FALSE,
                                                ...) {
  if (is.null(coerce_timestamps)) {
    timestamp_unit <- -1L # null sentinel value
  } else {
    timestamp_unit <- make_valid_time_unit(
      coerce_timestamps,
      c("ms" = TimeUnit$MILLI, "us" = TimeUnit$MICRO)
    )
  }
  parquet___ArrowWriterProperties___create(
    use_deprecated_int96_timestamps = isTRUE(use_deprecated_int96_timestamps),
    timestamp_unit = timestamp_unit,
    allow_truncated_timestamps = isTRUE(allow_truncated_timestamps)
  )
}

valid_parquet_version <- c(
  "1.0" = ParquetVersionType$PARQUET_1_0,
  "2.0" = ParquetVersionType$PARQUET_2_0
)

make_valid_version <- function(version, valid_versions = valid_parquet_version) {
  if (is_integerish(version)) {
    version <- as.character(version)
  }
  tryCatch(
    valid_versions[[match.arg(version, choices = names(valid_versions))]],
    error = function(cond) {
      stop('"version" should be one of ', oxford_paste(names(valid_versions), "or"),
        call. = FALSE
      )
    }
  )
}
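
# A commented sketch of make_valid_version()'s coercion (comments only, since
# top-level code in a package source file would execute at build time):
# make_valid_version("1.0")  # ParquetVersionType$PARQUET_1_0
# make_valid_version(2)      # integer-ish input is first coerced to "2.0"
# make_valid_version("3.0")  # error: "version" should be one of "1.0" or "2.0"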
#' @title ParquetWriterProperties class
#' @rdname ParquetWriterProperties
#' @name ParquetWriterProperties
#' @docType class
#' @usage NULL
#' @format NULL
#' @description This class holds settings to control how a Parquet file is
#' written by [ParquetFileWriter].
#'
#' @section Factory:
#'
#' The `ParquetWriterProperties$create()` factory method instantiates the object
#' and takes the following arguments:
#'
#' - `table`: table to write (required)
#' - `version`: Parquet version, "1.0" or "2.0". Default "1.0"
#' - `compression`: Compression type. Default `"uncompressed"`
#' - `compression_level`: Compression level; meaning depends on compression algorithm
#' - `use_dictionary`: Specify if we should use dictionary encoding. Default `TRUE`
#' - `write_statistics`: Specify if we should write statistics. Default `TRUE`
#' - `data_page_size`: Set a target threshold for the approximate encoded
#' size of data pages within a column chunk (in bytes). Default 1 MiB.
#'
#' @details The parameters `compression`, `compression_level`, `use_dictionary`
#' and `write_statistics` support various patterns:
#'
#' - The default `NULL` leaves the parameter unspecified, and the C++ library
#' uses an appropriate default for each column (defaults listed above)
#' - A single, unnamed, value (e.g. a single string for `compression`) applies to
#' all columns
#' - An unnamed vector, of the same size as the number of columns, to specify a
#' value for each column, in positional order
#' - A named vector, to specify the value for the named columns; the default
#' value for the setting is used when not supplied
#'
#' Unlike the high-level [write_parquet], `ParquetWriterProperties` arguments
#' use the C++ defaults. Currently this means "uncompressed" rather than
#' "snappy" for the `compression` argument.
#'
#' @seealso [write_parquet]
#' @seealso [Schema] for information about schemas and metadata handling.
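#'
#' @examplesIf arrow_with_parquet()
#' # A sketch: build properties for a table and pass them to ParquetFileWriter;
#' # write_parquet() normally assembles these for you
#' tab <- Table$create(data.frame(x = 1:5))
#' props <- ParquetWriterProperties$create(tab, write_statistics = FALSE)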
#' @export
ParquetWriterProperties <- R6Class("ParquetWriterProperties", inherit = ArrowObject)

ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
  inherit = ArrowObject,
  public = list(
    set_version = function(version) {
      parquet___WriterProperties___Builder__version(self, make_valid_version(version))
    },
    set_compression = function(table, compression) {
      compression <- compression_from_name(compression)
      assert_that(is.integer(compression))
      private$.set(
        table, compression,
        parquet___ArrowWriterProperties___Builder__set_compressions
      )
    },
    set_compression_level = function(table, compression_level) {
      # cast to integer but keep names
      compression_level <- set_names(as.integer(compression_level), names(compression_level))
      private$.set(
        table, compression_level,
        parquet___ArrowWriterProperties___Builder__set_compression_levels
      )
    },
    set_dictionary = function(table, use_dictionary) {
      assert_that(is.logical(use_dictionary))
      private$.set(
        table, use_dictionary,
        parquet___ArrowWriterProperties___Builder__set_use_dictionary
      )
    },
    set_write_statistics = function(table, write_statistics) {
      assert_that(is.logical(write_statistics))
      private$.set(
        table, write_statistics,
        parquet___ArrowWriterProperties___Builder__set_write_statistics
      )
    },
    set_data_page_size = function(data_page_size) {
      parquet___ArrowWriterProperties___Builder__data_page_size(self, data_page_size)
    }
  ),
  private = list(
    .set = function(table, value, FUN) {
      msg <- paste0("unsupported ", substitute(value), "= specification")
      column_names <- names(table)
      given_names <- names(value)
      if (is.null(given_names)) {
        if (length(value) %in% c(1L, length(column_names))) {
          # If there's a single, unnamed value, FUN will set it globally
          # If there are values for all columns, send them along with the names
          FUN(self, column_names, value)
        } else {
          abort(msg)
        }
      } else if (all(given_names %in% column_names)) {
        # Use the given names
        FUN(self, given_names, value)
      } else {
        abort(msg)
      }
    }
  )
)

ParquetWriterProperties$create <- function(table,
                                           version = NULL,
                                           compression = default_parquet_compression(),
                                           compression_level = NULL,
                                           use_dictionary = NULL,
                                           write_statistics = NULL,
                                           data_page_size = NULL,
                                           ...) {
  builder <- parquet___WriterProperties___Builder__create()
  if (!is.null(version)) {
    builder$set_version(version)
  }
  if (!is.null(compression)) {
    builder$set_compression(table, compression = compression)
  }
  if (!is.null(compression_level)) {
    builder$set_compression_level(table, compression_level = compression_level)
  }
  if (!is.null(use_dictionary)) {
    builder$set_dictionary(table, use_dictionary)
  }
  if (!is.null(write_statistics)) {
    builder$set_write_statistics(table, write_statistics)
  }
  if (!is.null(data_page_size)) {
    builder$set_data_page_size(data_page_size)
  }
  parquet___WriterProperties___Builder__build(builder)
}

#' @title ParquetFileWriter class
#' @rdname ParquetFileWriter
#' @name ParquetFileWriter
#' @docType class
#' @usage NULL
#' @format NULL
#' @description This class enables you to interact with Parquet files.
#'
#' @section Factory:
#'
#' The `ParquetFileWriter$create()` factory method instantiates the object and
#' takes the following arguments:
#'
#' - `schema` A [Schema]
#' - `sink` An [arrow::io::OutputStream][OutputStream]
#' - `properties` An instance of [ParquetWriterProperties]
#' - `arrow_properties` An instance of `ParquetArrowWriterProperties`
#'
#' @section Methods:
#'
#' - `WriteTable` Write a [Table] to `sink`
#' - `Close` Close the writer. Note: does not close the `sink`.
#' [arrow::io::OutputStream][OutputStream] has its own `close()` method.
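#'
#' @examplesIf arrow_with_parquet()
#' # A sketch of direct writer use; write_parquet() is the high-level interface
#' tab <- Table$create(data.frame(x = 1:5))
#' sink <- FileOutputStream$create(tempfile(fileext = ".parquet"))
#' writer <- ParquetFileWriter$create(tab$schema, sink)
#' writer$WriteTable(tab, chunk_size = tab$num_rows)
#' writer$Close()
#' sink$close() # Close() does not close the sink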
#' @export
#' @include arrow-package.R
ParquetFileWriter <- R6Class("ParquetFileWriter",
  inherit = ArrowObject,
  public = list(
    WriteTable = function(table, chunk_size) {
      parquet___arrow___FileWriter__WriteTable(self, table, chunk_size)
    },
    Close = function() parquet___arrow___FileWriter__Close(self)
  )
)
ParquetFileWriter$create <- function(schema,
                                     sink,
                                     properties = ParquetWriterProperties$create(),
                                     arrow_properties = ParquetArrowWriterProperties$create()) {
  assert_is(sink, "OutputStream")
  parquet___arrow___ParquetFileWriter__Open(schema, sink, properties, arrow_properties)
}

#' @title ParquetFileReader class
#' @rdname ParquetFileReader
#' @name ParquetFileReader
#' @docType class
#' @usage NULL
#' @format NULL
#' @description This class enables you to interact with Parquet files.
#'
#' @section Factory:
#'
#' The `ParquetFileReader$create()` factory method instantiates the object and
#' takes the following arguments:
#'
#' - `file` A character file name, raw vector, or Arrow file connection object
#' (e.g. `RandomAccessFile`).
#' - `props` Optional [ParquetArrowReaderProperties]
#' - `mmap` Logical: whether to memory-map the file (default `TRUE`)
#' - `...` Additional arguments, currently ignored
#'
#' @section Methods:
#'
#' - `$ReadTable(column_indices)`: get an `arrow::Table` from the file. The optional
#' `column_indices=` argument is a 0-based integer vector indicating which columns to retain.
#' - `$ReadRowGroup(i, column_indices)`: get an `arrow::Table` by reading the `i`th row group (0-based).
#' The optional `column_indices=` argument is a 0-based integer vector indicating which columns to retain.
#' - `$ReadRowGroups(row_groups, column_indices)`: get an `arrow::Table` by reading several row
#' groups (0-based integers).
#' The optional `column_indices=` argument is a 0-based integer vector indicating which columns to retain.
#' - `$GetSchema()`: get the `arrow::Schema` of the data in the file
#' - `$ReadColumn(i)`: read the `i`th column (0-based) as a [ChunkedArray].
#'
#' @section Active bindings:
#'
#' - `$num_rows`: number of rows.
#' - `$num_columns`: number of columns.
#' - `$num_row_groups`: number of row groups.
#'
#' @export
#' @examplesIf arrow_with_parquet()
#' f <- system.file("v0.7.1.parquet", package = "arrow")
#' pq <- ParquetFileReader$create(f)
#' pq$GetSchema()
#' if (codec_is_available("snappy")) {
#'   # This file has compressed data columns
#'   tab <- pq$ReadTable()
#'   tab$schema
#' }
#' @include arrow-package.R
ParquetFileReader <- R6Class("ParquetFileReader",
  inherit = ArrowObject,
  active = list(
    num_rows = function() {
      as.integer(parquet___arrow___FileReader__num_rows(self))
    },
    num_columns = function() {
      parquet___arrow___FileReader__num_columns(self)
    },
    num_row_groups = function() {
      parquet___arrow___FileReader__num_row_groups(self)
    }
  ),
  public = list(
    ReadTable = function(column_indices = NULL) {
      if (is.null(column_indices)) {
        parquet___arrow___FileReader__ReadTable1(self)
      } else {
        column_indices <- vec_cast(column_indices, integer())
        parquet___arrow___FileReader__ReadTable2(self, column_indices)
      }
    },
    ReadRowGroup = function(i, column_indices = NULL) {
      i <- vec_cast(i, integer())
      if (is.null(column_indices)) {
        parquet___arrow___FileReader__ReadRowGroup1(self, i)
      } else {
        column_indices <- vec_cast(column_indices, integer())
        parquet___arrow___FileReader__ReadRowGroup2(self, i, column_indices)
      }
    },
    ReadRowGroups = function(row_groups, column_indices = NULL) {
      row_groups <- vec_cast(row_groups, integer())
      if (is.null(column_indices)) {
        parquet___arrow___FileReader__ReadRowGroups1(self, row_groups)
      } else {
        column_indices <- vec_cast(column_indices, integer())
        parquet___arrow___FileReader__ReadRowGroups2(self, row_groups, column_indices)
      }
    },
    ReadColumn = function(i) {
      i <- vec_cast(i, integer())
      parquet___arrow___FileReader__ReadColumn(self, i)
    },
    GetSchema = function() {
      parquet___arrow___FileReader__GetSchema(self)
    }
  )
)

ParquetFileReader$create <- function(file,
                                     props = ParquetArrowReaderProperties$create(),
                                     mmap = TRUE,
                                     ...) {
  file <- make_readable_file(file, mmap)
  assert_is(props, "ParquetArrowReaderProperties")
  parquet___arrow___FileReader__OpenFile(file, props)
}

#' @title ParquetArrowReaderProperties class
#' @rdname ParquetArrowReaderProperties
#' @name ParquetArrowReaderProperties
#' @docType class
#' @usage NULL
#' @format NULL
#' @description This class holds settings to control how a Parquet file is read
#' by [ParquetFileReader].
#'
#' @section Factory:
#'
#' The `ParquetArrowReaderProperties$create()` factory method instantiates the
#' object and takes the following arguments:
#'
#' - `use_threads` Logical: whether to use multithreading (default `TRUE`)
#'
#' @section Methods:
#'
#' - `$read_dictionary(column_index)`
#' - `$set_read_dictionary(column_index, read_dict)`
#' - `$use_threads(use_threads)`
#'
#' @export
ParquetArrowReaderProperties <- R6Class("ParquetArrowReaderProperties",
  inherit = ArrowObject,
  public = list(
    read_dictionary = function(column_index) {
      parquet___arrow___ArrowReaderProperties__get_read_dictionary(self, column_index)
    },
    set_read_dictionary = function(column_index, read_dict) {
      parquet___arrow___ArrowReaderProperties__set_read_dictionary(self, column_index, read_dict)
    }
  ),
  active = list(
    use_threads = function(use_threads) {
      if (missing(use_threads)) {
        parquet___arrow___ArrowReaderProperties__get_use_threads(self)
      } else {
        parquet___arrow___ArrowReaderProperties__set_use_threads(self, use_threads)
      }
    }
  )
)

ParquetArrowReaderProperties$create <- function(use_threads = option_use_threads()) {
  parquet___arrow___ArrowReaderProperties__Make(isTRUE(use_threads))
}
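
# A commented usage sketch for ParquetArrowReaderProperties (comments only so
# nothing executes at load; "data.parquet" is a placeholder path):
# props <- ParquetArrowReaderProperties$create(use_threads = FALSE)
# props$use_threads                # active binding: query the current setting
# read_parquet("data.parquet", props = props)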