]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/r/R/query-engine.R
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / r / R / query-engine.R
1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
16 # under the License.
17
# Build and run an ExecPlan for an arrow_dplyr_query and return the result.
#
# Steps, in order:
#   1. create a plan, Build() the node chain for `.data`, and Run() it;
#   2. if Run() handed back a RecordBatchReader, read it into a Table;
#   3. drop any temporary columns recorded on the final node's sort spec;
#   4. re-attach R metadata from the source schema for columns whose name
#      and field still match, dropping top-level attributes when the query
#      has an aggregation.
do_exec_plan <- function(.data) {
  exec_plan <- ExecPlan$create()
  sink_node <- exec_plan$Build(.data)
  result <- exec_plan$Run(sink_node)

  # TODO (ARROW-14289): make the head/tail methods return RBR not Table
  if (inherits(result, "RecordBatchReader")) {
    result <- result$read_table()
  }

  # arrange() may have added $temp_columns purely for sorting. They cannot be
  # removed inside the ExecPlan because sorting happens at the very end
  # (SinkNode), so they are omitted from the result here.
  helper_cols <- sink_node$sort$temp_columns
  if (length(helper_cols) > 0) {
    visible_cols <- setdiff(names(result), helper_cols)
    result <- result[, visible_cols, drop = FALSE]
  }

  # Nothing to annotate when the result has no columns
  if (ncol(result) == 0) {
    return(result)
  }

  # Apply any column metadata from the original schema, where appropriate
  # TODO: do we care about other (non-R) metadata preservation?
  # How would we know if it were meaningful?
  source_schema <- source_data(.data)$schema
  meta <- source_schema$r_metadata
  if (is.null(meta)) {
    return(result)
  }

  # Keep r_metadata$columns only for columns that match on name _and_ field
  result_schema <- result$schema
  shared_cols <- intersect(names(meta$columns), names(result))
  field_unchanged <- map_lgl(shared_cols, function(col) {
    source_schema[[col]] == result_schema[[col]]
  })
  meta$columns <- meta$columns[shared_cols[field_unchanged]]

  if (has_aggregation(.data)) {
    # dplyr drops top-level attributes if you do summarize
    meta$attributes <- NULL
  }

  result$r_metadata <- meta
  result
}
59
# ExecPlan: R6 wrapper used to turn an arrow_dplyr_query into a chain of
# ExecNodes (Scan/Build) and then evaluate that chain (Run).
ExecPlan <- R6Class("ExecPlan",
  inherit = ArrowObject,
  public = list(
    # Create the source node of the plan.
    # `dataset` may be an arrow_dplyr_query, an ArrowTabular, or a Dataset.
    Scan = function(dataset) {
      if (!inherits(dataset, "arrow_dplyr_query")) {
        # Plain data: wrap in-memory tabular data, then scan everything
        if (inherits(dataset, "ArrowTabular")) {
          dataset <- InMemoryDataset$create(dataset)
        }
        assert_is(dataset, "Dataset")
        # Defaults: keep all rows and all columns
        row_filter <- Expression$scalar(TRUE)
        needed_fields <- names(dataset)
        return(
          ExecNode_Scan(
            self, dataset, row_filter, needed_fields %||% character(0)
          )
        )
      }

      # From here on `dataset` is an arrow_dplyr_query
      if (inherits(dataset$.data, "RecordBatchReader")) {
        return(ExecNode_ReadFromRecordBatchReader(self, dataset$.data))
      }

      row_filter <- dataset$filtered_rows
      if (isTRUE(row_filter)) {
        row_filter <- Expression$scalar(TRUE)
      }
      # Every field referenced by any of the selected column expressions
      referenced <- map(dataset$selected_columns, field_names_in_expression)
      needed_fields <- unique(unlist(referenced))

      source <- dataset$.data
      assert_is(source, "Dataset")

      # ScanNode needs the filter to do predicate pushdown and skip
      # partitions, and it needs to know which fields to materialize (and
      # which are unnecessary)
      ExecNode_Scan(self, source, row_filter, needed_fields %||% character(0))
    },

    # Take an arrow_dplyr_query and chain together the ExecNodes it implies.
    # Nothing is evaluated here; that happens in Run().
    Build = function(.data) {
      grouping_cols <- dplyr::group_vars(.data)
      is_grouped <- length(grouping_cols) > 0

      # Collect the target names first because we have to add back the
      # group vars
      # NOTE(review): `target_names` is not referenced again in this method.
      target_names <- names(.data)
      .data <- ensure_group_vars(.data)
      .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns

      # A nested query is built recursively; otherwise start from a scan
      node <- if (inherits(.data$.data, "arrow_dplyr_query")) {
        self$Build(.data$.data)
      } else {
        self$Scan(.data)
      }

      # ARROW-13498: Even though Scan takes the filter, apparently we have to
      # do it again
      if (inherits(.data$filtered_rows, "Expression")) {
        node <- node$Filter(.data$filtered_rows)
      }

      if (is.null(.data$aggregations)) {
        # If any columns are derived, reordered, or renamed we need to Project.
        # We have to project at least once to eliminate some junk columns
        # that the ExecPlan adds:
        #   __fragment_index, __batch_index, __last_in_fragment
        # Presumably extraneous repeated projection of the same thing
        # (as when we've done collapse() and not projected after) is
        # cheap/no-op
        node <- node$Project(c(.data$selected_columns, .data$temp_columns))

        join_spec <- .data$join
        if (!is.null(join_spec)) {
          node <- node$Join(
            type = join_spec$type,
            right_node = self$Build(join_spec$right_data),
            by = join_spec$by,
            left_output = names(.data),
            right_output = setdiff(names(join_spec$right_data), join_spec$by)
          )
        }
      } else {
        # Project to include just the data required for each aggregation,
        # plus group_by_vars (last)
        # TODO: validate that none of names(aggregations) are the same as
        # names(group_by_vars)
        # dplyr does not error on this but the result it gives isn't great
        node <- node$Project(summarize_projection(.data))

        aggs <- .data$aggregations
        if (is_grouped) {
          # Grouped aggregations need the "hash_" prefix on the function name
          add_hash_prefix <- function(agg) {
            agg[["fun"]] <- paste0("hash_", agg[["fun"]])
            agg
          }
          aggs <- lapply(aggs, add_hash_prefix)
        }
        agg_names <- names(aggs)

        node <- node$Aggregate(
          options = map(aggs, function(agg) agg[c("fun", "options")]),
          target_names = agg_names,
          out_field_names = agg_names,
          key_names = grouping_cols
        )

        if (is_grouped) {
          # The result will have result columns first then the grouping cols.
          # dplyr orders group cols first, so adapt the result to meet that
          # expectation.
          node <- node$Project(make_field_refs(c(grouping_cols, agg_names)))

          if (getOption("arrow.summarise.sort", FALSE)) {
            # Add sorting instructions for the rows too to match dplyr
            # (see below about why sorting isn't itself a Node)
            node$sort <- list(
              names = grouping_cols,
              orders = integer(length(grouping_cols))
            )
          }
        }
      }

      # Apply sorting: this is currently not an ExecNode itself, it is a
      # sink node option.
      # TODO: handle some cases:
      # (1) arrange > summarize > arrange
      # (2) ARROW-13779: arrange then operation where order matters
      #     (e.g. cumsum)
      if (length(.data$arrange_vars) > 0) {
        node$sort <- list(
          names = names(.data$arrange_vars),
          orders = .data$arrange_desc,
          temp_columns = names(.data$temp_columns)
        )
      }

      # This is only safe because we are going to evaluate queries that end
      # with head/tail first, then evaluate any subsequent query as a new
      # query
      if (!is.null(.data$head)) {
        node$head <- .data$head
      }
      if (!is.null(.data$tail)) {
        node$tail <- .data$tail
      }

      node
    },

    # Evaluate the plan ending at `node` and return whatever ExecPlan_run
    # produces, post-processed for head/tail.
    Run = function(node) {
      assert_is(node, "ExecNode")

      # Sorting and head/tail (if sorted) are handled in the SinkNode,
      # created in ExecPlan_run
      sort_spec <- node$sort %||% list()
      top_k <- node$head %||% -1L
      is_sorted <- length(sort_spec) > 0
      wants_tail <- !is.null(node$tail)

      if (is_sorted) {
        if (wants_tail) {
          # Reverse the sort order and take the top K, then after we'll
          # reverse the resulting rows so that it is ordered as expected
          sort_spec$orders <- !sort_spec$orders
          top_k <- node$tail
        }
        sort_spec$orders <- as.integer(sort_spec$orders)
      }

      out <- ExecPlan_run(self, node, sort_spec, top_k)

      if (is_sorted) {
        if (wants_tail) {
          # Reverse the row order to get back what we expect
          # TODO: don't return Table, return RecordBatchReader
          out <- out$read_table()
          out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
        }
        return(out)
      }

      # Since ExecPlans don't scan in deterministic order, head/tail are both
      # essentially taking a random slice from somewhere in the dataset.
      # And since the head() implementation is way more efficient than
      # tail(), just use it to take the random slice
      slice_size <- node$head %||% node$tail
      if (!is.null(slice_size)) {
        # TODO (ARROW-14289): make the head methods return RBR not Table
        out <- head(out, slice_size)
      }
      # Can we now tell `self$Stop()` to StopProducing? We already have
      # everything we need for the head (but it seems to segfault:
      # ARROW-14329)
      out
    },

    # Ask the plan to stop producing
    Stop = function() {
      ExecPlan_StopProducing(self)
    }
  )
)
# Construct a new ExecPlan.
# `use_threads` is forwarded unchanged to ExecPlan_create(); when omitted it
# defaults to the value of option_use_threads().
ExecPlan$create <- function(use_threads = option_use_threads()) {
  new_plan <- ExecPlan_create(use_threads)
  new_plan
}
247
# ExecNode: R6 wrapper around a single node of an ExecPlan. Each of
# Project/Filter/Aggregate/Join creates a new node from this one and copies
# the sort/head/tail bookkeeping fields across to it.
ExecNode <- R6Class("ExecNode",
  inherit = ArrowObject,
  public = list(
    # `sort` is a slight hack to be able to keep around arrange() params,
    # which don't currently yield their own ExecNode but rather are consumed
    # in the SinkNode (in ExecPlan$Run())
    sort = NULL,
    # Similar hacks for head and tail
    head = NULL,
    tail = NULL,

    # Copy this node's sort/head/tail onto `new_node` and return `new_node`
    preserve_sort = function(new_node) {
      new_node$sort <- self$sort
      new_node$head <- self$head
      new_node$tail <- self$tail
      new_node
    },

    # Project onto `cols`, a named list of Expressions. An empty `cols`
    # projects onto no columns at all.
    Project = function(cols) {
      if (length(cols) == 0) {
        projected <- ExecNode_Project(self, character(0), character(0))
      } else {
        assert_is_list_of(cols, "Expression")
        projected <- ExecNode_Project(self, cols, names(cols))
      }
      self$preserve_sort(projected)
    },

    # Filter rows by `expr`, which must be an Expression
    Filter = function(expr) {
      assert_is(expr, "Expression")
      filtered <- ExecNode_Filter(self, expr)
      self$preserve_sort(filtered)
    },

    # Aggregate; all arguments are passed straight to ExecNode_Aggregate()
    Aggregate = function(options, target_names, out_field_names, key_names) {
      aggregated <- ExecNode_Aggregate(
        self, options, target_names, out_field_names, key_names
      )
      self$preserve_sort(aggregated)
    },

    # Join this node (left) with `right_node`. `by` is a named character
    # vector: its names are used as the left keys and its values as the
    # right keys.
    Join = function(type, right_node, by, left_output, right_output) {
      joined <- ExecNode_Join(
        self,
        type,
        right_node,
        left_keys = names(by),
        right_keys = by,
        left_output = left_output,
        right_output = right_output
      )
      self$preserve_sort(joined)
    }
  ),
  active = list(
    # Output schema of this node, as reported by ExecNode_output_schema()
    schema = function() {
      ExecNode_output_schema(self)
    }
  )
)