# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Build an ExecPlan for the arrow_dplyr_query `.data`, run it, and return the
# result as a Table.
#
# After the plan runs:
# * a RecordBatchReader result is read fully into a Table;
# * temporary columns that arrange() added for sorting are dropped;
# * R metadata from the source schema is copied onto the result, restricted
#   to columns whose name *and* field still match. Top-level attributes are
#   dropped after an aggregation, mirroring dplyr's summarize().
do_exec_plan <- function(.data) {
  plan <- ExecPlan$create()
  final_node <- plan$Build(.data)
  tab <- plan$Run(final_node)

  # TODO (ARROW-14289): make the head/tail methods return RBR not Table
  if (inherits(tab, "RecordBatchReader")) {
    tab <- tab$read_table()
  }

  # If arrange() created $temp_columns, make sure to omit them from the result
  # We can't currently handle this in the ExecPlan itself because sorting
  # happens in the end (SinkNode) so nothing comes after it.
  if (length(final_node$sort$temp_columns) > 0) {
    out_cols <- setdiff(names(tab), final_node$sort$temp_columns)
    tab <- tab[, out_cols, drop = FALSE]
  }

  # Only reconcile column metadata when the result still has columns
  if (ncol(tab) > 0) {
    # Apply any column metadata from the original schema, where appropriate
    original_schema <- source_data(.data)$schema
    # TODO: do we care about other (non-R) metadata preservation?
    # How would we know if it were meaningful?
    r_meta <- original_schema$r_metadata
    if (!is.null(r_meta)) {
      # Filter r_metadata$columns on columns with name _and_ type match
      new_schema <- tab$schema
      common_names <- intersect(names(r_meta$columns), names(tab))
      keep <- common_names[
        map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]])
      ]
      r_meta$columns <- r_meta$columns[keep]
      if (has_aggregation(.data)) {
        # dplyr drops top-level attributes if you do summarize
        r_meta$attributes <- NULL
      }
      tab$r_metadata <- r_meta
    }
  }

  tab
}

# R6 wrapper around an Arrow ExecPlan.
# * Scan() creates the source node for a query.
# * Build() chains ExecNodes for an arrow_dplyr_query without evaluating them.
# * Run() executes the plan up to a given node via ExecPlan_run().
# * Stop() calls ExecPlan_StopProducing().
ExecPlan <- R6Class("ExecPlan",
  inherit = ArrowObject,
  public = list(
    # Create the source node for a query.
    # `dataset` may be:
    # * an arrow_dplyr_query over a RecordBatchReader: read from the reader
    #   directly;
    # * any other arrow_dplyr_query: its filter and the fields referenced by
    #   its selected columns are passed to the ScanNode;
    # * a Dataset, or an ArrowTabular object (wrapped in an InMemoryDataset):
    #   scanned unfiltered with all columns.
    Scan = function(dataset) {
      # Handle arrow_dplyr_query
      if (inherits(dataset, "arrow_dplyr_query")) {
        if (inherits(dataset$.data, "RecordBatchReader")) {
          return(ExecNode_ReadFromRecordBatchReader(self, dataset$.data))
        }

        filter <- dataset$filtered_rows
        if (isTRUE(filter)) {
          # ScanNode needs an Expression, so turn a literal TRUE into one
          filter <- Expression$scalar(TRUE)
        }
        # Use FieldsInExpression to find all from dataset$selected_columns
        colnames <- unique(unlist(map(
          dataset$selected_columns,
          field_names_in_expression
        )))
        dataset <- dataset$.data
        assert_is(dataset, "Dataset")
      } else {
        if (inherits(dataset, "ArrowTabular")) {
          dataset <- InMemoryDataset$create(dataset)
        }
        assert_is(dataset, "Dataset")
        # Plain dataset: no filter, materialize every column
        filter <- Expression$scalar(TRUE)
        colnames <- names(dataset)
      }
      # ScanNode needs the filter to do predicate pushdown and skip partitions,
      # and it needs to know which fields to materialize (and which are unnecessary)
      ExecNode_Scan(self, dataset, filter, colnames %||% character(0))
    },

    # Chain together the ExecNodes for the arrow_dplyr_query `.data`.
    # Nested queries and the right-hand side of a join are built recursively.
    # Nothing is evaluated here; that happens in Run().
    # Sorting and head/tail are not nodes. They are recorded on the returned
    # node's `sort`, `head` and `tail` fields, and Run() consumes them.
    Build = function(.data) {
      group_vars <- dplyr::group_vars(.data)
      grouped <- length(group_vars) > 0

      .data <- ensure_group_vars(.data)
      .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns

      if (inherits(.data$.data, "arrow_dplyr_query")) {
        # We have a nested query. Recurse.
        node <- self$Build(.data$.data)
      } else {
        node <- self$Scan(.data)
      }

      # ARROW-13498: Even though Scan takes the filter, apparently we have to do it again
      if (inherits(.data$filtered_rows, "Expression")) {
        node <- node$Filter(.data$filtered_rows)
      }

      if (!is.null(.data$aggregations)) {
        # Project to include just the data required for each aggregation,
        # plus group_by_vars (last)
        # TODO: validate that none of names(aggregations) are the same as names(group_by_vars)
        # dplyr does not error on this but the result it gives isn't great
        node <- node$Project(summarize_projection(.data))

        if (grouped) {
          # We need to prefix all of the aggregation function names with "hash_"
          .data$aggregations <- lapply(.data$aggregations, function(x) {
            x[["fun"]] <- paste0("hash_", x[["fun"]])
            x
          })
        }

        node <- node$Aggregate(
          options = map(.data$aggregations, ~ .[c("fun", "options")]),
          target_names = names(.data$aggregations),
          out_field_names = names(.data$aggregations),
          key_names = group_vars
        )

        if (grouped) {
          # The result will have result columns first then the grouping cols.
          # dplyr orders group cols first, so adapt the result to meet that expectation.
          node <- node$Project(
            make_field_refs(c(group_vars, names(.data$aggregations)))
          )
          if (getOption("arrow.summarise.sort", FALSE)) {
            # Add sorting instructions for the rows too to match dplyr
            # (see below about why sorting isn't itself a Node)
            node$sort <- list(
              names = group_vars,
              orders = rep(0L, length(group_vars))
            )
          }
        }
      } else {
        # If any columns are derived, reordered, or renamed we need to Project
        # If there are aggregations, the projection was already handled above
        # We have to project at least once to eliminate some junk columns
        # that the ExecPlan adds:
        # __fragment_index, __batch_index, __last_in_fragment
        # Presumably extraneous repeated projection of the same thing
        # (as when we've done collapse() and not projected after) is cheap/no-op
        projection <- c(.data$selected_columns, .data$temp_columns)
        node <- node$Project(projection)

        if (!is.null(.data$join)) {
          node <- node$Join(
            type = .data$join$type,
            right_node = self$Build(.data$join$right_data),
            by = .data$join$by,
            left_output = names(.data),
            right_output = setdiff(names(.data$join$right_data), .data$join$by)
          )
        }
      }

      # Apply sorting: this is currently not an ExecNode itself, it is a
      # SinkNode option that Run() passes to ExecPlan_run().
      # TODO: handle some cases:
      # (1) arrange > summarize > arrange
      # (2) ARROW-13779: arrange then operation where order matters (e.g. cumsum)
      if (length(.data$arrange_vars) > 0) {
        node$sort <- list(
          names = names(.data$arrange_vars),
          orders = .data$arrange_desc,
          temp_columns = names(.data$temp_columns)
        )
      }

      # This is only safe because we are going to evaluate queries that end
      # with head/tail first, then evaluate any subsequent query as a new query
      if (!is.null(.data$head)) {
        node$head <- .data$head
      }
      if (!is.null(.data$tail)) {
        node$tail <- .data$tail
      }

      node
    },

    # Execute the plan ending at `node` and return the result.
    # With sorting, the sort keys and a top-K limit (head, or tail with the
    # sort order inverted) go to ExecPlan_run(). Rows of a sorted tail are
    # then reversed back into the requested order.
    # Without sorting, head/tail just slice the first rows of the result.
    Run = function(node) {
      assert_is(node, "ExecNode")

      # Sorting and head/tail (if sorted) are handled in the SinkNode,
      # created in ExecPlan_run
      sorting <- node$sort %||% list()
      select_k <- node$head %||% -1L
      has_sorting <- length(sorting) > 0
      if (has_sorting) {
        if (!is.null(node$tail)) {
          # Reverse the sort order and take the top K, then after we'll reverse
          # the resulting rows so that it is ordered as expected
          sorting$orders <- !sorting$orders
          select_k <- node$tail
        }
        sorting$orders <- as.integer(sorting$orders)
      }

      out <- ExecPlan_run(self, node, sorting, select_k)

      if (!has_sorting) {
        # Since ExecPlans don't scan in deterministic order, head/tail are both
        # essentially taking a random slice from somewhere in the dataset.
        # And since the head() implementation is way more efficient than tail(),
        # just use it to take the random slice
        slice_size <- node$head %||% node$tail
        if (!is.null(slice_size)) {
          # TODO (ARROW-14289): make the head methods return RBR not Table
          out <- head(out, slice_size)
        }
        # Can we now tell `self$Stop()` to StopProducing? We already have
        # everything we need for the head (but it seems to segfault: ARROW-14329)
      } else if (!is.null(node$tail)) {
        # Reverse the row order to get back what we expect
        # TODO: don't return Table, return RecordBatchReader
        out <- out$read_table()
        out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
      }

      out
    },

    # Ask the plan to stop producing batches
    Stop = function() ExecPlan_StopProducing(self)
  )
)

# Construct a new ExecPlan.
# `use_threads` is forwarded to ExecPlan_create() and defaults to
# option_use_threads().
ExecPlan$create <- function(use_threads = option_use_threads()) {
  ExecPlan_create(use_threads)
}

248 ExecNode <- R6Class("ExecNode",
249 inherit = ArrowObject,
251 # `sort` is a slight hack to be able to keep around arrange() params,
252 # which don't currently yield their own ExecNode but rather are consumed
253 # in the SinkNode (in ExecPlan$run())
255 # Similar hacks for head and tail
258 preserve_sort = function(new_node) {
259 new_node$sort <- self$sort
260 new_node$head <- self$head
261 new_node$tail <- self$tail
264 Project = function(cols) {
266 assert_is_list_of(cols, "Expression")
267 self$preserve_sort(ExecNode_Project(self, cols, names(cols)))
269 self$preserve_sort(ExecNode_Project(self, character(0), character(0)))
272 Filter = function(expr) {
273 assert_is(expr, "Expression")
274 self$preserve_sort(ExecNode_Filter(self, expr))
276 Aggregate = function(options, target_names, out_field_names, key_names) {
278 ExecNode_Aggregate(self, options, target_names, out_field_names, key_names)
281 Join = function(type, right_node, by, left_output, right_output) {
287 left_keys = names(by),
289 left_output = left_output,
290 right_output = right_output
296 schema = function() ExecNode_output_schema(self)