]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "rgw_s3select_private.h" | |
5 | ||
6 | namespace rgw::s3select { | |
7 | RGWOp* create_s3select_op() | |
8 | { | |
9 | return new RGWSelectObj_ObjStore_S3(); | |
10 | } | |
11 | }; | |
12 | ||
13 | using namespace s3selectEngine; | |
14 | ||
//returns a mutable reference to the response buffer; the SQL engine appends
//result rows directly into it and create_message() wraps it as an AWS message
std::string& aws_response_handler::get_sql_result()
{
  return sql_result;
}
19 | ||
//number of object bytes handed to the SQL engine so far
//(accumulated by update_processed_size())
uint64_t aws_response_handler::get_processed_size()
{
  return processed_size;
}
24 | ||
//adds 'value' bytes to the processed-bytes counter; used by csv_processing()
//to detect when the whole object has been consumed
void aws_response_handler::update_processed_size(uint64_t value)
{
  processed_size += value;
}
29 | ||
//number of result bytes produced by the SQL engine so far
//(reported in the Progress/Stats XML payloads)
uint64_t aws_response_handler::get_total_bytes_returned()
{
  return total_bytes_returned;
}
34 | ||
//adds 'value' bytes to the returned-bytes counter (the per-chunk result-size delta)
void aws_response_handler::update_total_bytes_returned(uint64_t value)
{
  total_bytes_returned += value;
}
39 | ||
40 | void aws_response_handler::push_header(const char* header_name, const char* header_value) | |
41 | { | |
42 | char x; | |
43 | short s; | |
44 | x = char(strlen(header_name)); | |
45 | m_buff_header.append(&x, sizeof(x)); | |
46 | m_buff_header.append(header_name); | |
47 | x = char(7); | |
48 | m_buff_header.append(&x, sizeof(x)); | |
49 | s = htons(uint16_t(strlen(header_value))); | |
50 | m_buff_header.append(reinterpret_cast<char*>(&s), sizeof(s)); | |
51 | m_buff_header.append(header_value); | |
52 | } | |
53 | ||
54 | #define IDX( x ) static_cast<int>( x ) | |
55 | ||
//builds the event-stream headers for a "Records" event into m_buff_header
//(the caller clears m_buff_header first); returns the header-block size in bytes
int aws_response_handler::create_header_records()
{
  //headers description(AWS)
  //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
  //1
  push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::RECORDS)]);
  //2
  push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]);
  //3
  push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
  return m_buff_header.size();
}
68 | ||
//builds the headers for a "Cont" (continuation/keep-alive) event into m_buff_header;
//returns the header-block size in bytes
int aws_response_handler::create_header_continuation()
{
  //headers description(AWS)
  //1
  push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]);
  //2
  push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
  return m_buff_header.size();
}
78 | ||
//builds the headers for a "Progress" event (XML payload) into m_buff_header;
//returns the header-block size in bytes
int aws_response_handler::create_header_progress()
{
  //headers description(AWS)
  //1
  push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::PROGRESS)]);
  //2
  push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
  //3
  push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
  return m_buff_header.size();
}
90 | ||
//builds the headers for a "Stats" event (XML payload) into m_buff_header;
//returns the header-block size in bytes
int aws_response_handler::create_header_stats()
{
  //headers description(AWS)
  //1
  push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::STATS)]);
  //2
  push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
  //3
  push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
  return m_buff_header.size();
}
102 | ||
//builds the headers for an "End" event (terminates the event stream) into
//m_buff_header; returns the header-block size in bytes
int aws_response_handler::create_header_end()
{
  //headers description(AWS)
  //1
  push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]);
  //2
  push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
  return m_buff_header.size();
}
112 | ||
//builds the headers for an error event carrying 'error_message' into m_buff_header;
//returns the header-block size in bytes
int aws_response_handler::create_error_header_records(const char* error_message)
{
  //headers description(AWS)
  //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
  //1
  push_header(header_name_str[IDX(header_name_En::ERROR_CODE)], header_value_str[IDX(header_value_En::ENGINE_ERROR)]);
  //2
  push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message);
  //3
  push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]);
  return m_buff_header.size();
}
125 | ||
//finalizes sql_result as a complete AWS event-stream message and returns its size.
//expects sql_result to start with header_crc_size placeholder bytes (see
//init_response()) followed by the headers and payload.
int aws_response_handler::create_message(u_int32_t header_len)
{
  //message description(AWS):
  //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
  //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC
  //are created later to the produced SQL result, and actually wrapping the payload.
  //overwrite 4 bytes at 'pos' with 's' in network byte order
  auto push_encode_int = [&](u_int32_t s, int pos) {
    u_int32_t x = htonl(s);
    sql_result.replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
  };
  u_int32_t total_byte_len = 0;
  u_int32_t preload_crc = 0;
  u_int32_t message_crc = 0;
  total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size (the trailing message-crc)
  push_encode_int(total_byte_len, 0);
  push_encode_int(header_len, 4);
  crc32.reset();
  crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes (the prelude)
  preload_crc = crc32();
  push_encode_int(preload_crc, 8);
  crc32.reset();
  crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
  message_crc = crc32();
  //the final message-crc is appended after everything it covers
  u_int32_t x = htonl(message_crc);
  sql_result.append(reinterpret_cast<char*>(&x), sizeof(x));
  return sql_result.size();
}
153 | ||
//resets sql_result to the message-prelude placeholder:
//[total-length:4][header-length:4][prelude-crc:4], filled in later by create_message()
void aws_response_handler::init_response()
{
  //12 positions for header-crc
  sql_result.resize(header_crc_size, '\0');
}
159 | ||
//appends the "Records" event headers and the payload opening line to sql_result;
//the SQL engine's output is appended after this
void aws_response_handler::init_success_response()
{
  m_buff_header.clear();
  header_size = create_header_records();
  sql_result.append(m_buff_header.c_str(), header_size);
  sql_result.append(PAYLOAD_LINE);
}
167 | ||
//builds and sends a complete "Cont" event; emitted for chunks that produced no
//output so the client connection stays alive
void aws_response_handler::send_continuation_response()
{
  sql_result.resize(header_crc_size, '\0'); //restart the message: prelude placeholder only
  m_buff_header.clear();
  header_size = create_header_continuation();
  sql_result.append(m_buff_header.c_str(), header_size);
  int buff_len = create_message(header_size);
  s->formatter->write_bin_data(sql_result.data(), buff_len);
  rgw_flush_formatter_and_reset(s, s->formatter);
}
178 | ||
//starts a "Progress" event: prelude placeholder plus headers; the XML payload
//is appended by send_progress_response()
void aws_response_handler::init_progress_response()
{
  sql_result.resize(header_crc_size, '\0');
  m_buff_header.clear();
  header_size = create_header_progress();
  sql_result.append(m_buff_header.c_str(), header_size);
}
186 | ||
//starts a "Stats" event: prelude placeholder plus headers; the XML payload
//is appended by send_stats_response()
void aws_response_handler::init_stats_response()
{
  sql_result.resize(header_crc_size, '\0');
  m_buff_header.clear();
  header_size = create_header_stats();
  sql_result.append(m_buff_header.c_str(), header_size);
}
194 | ||
//builds and sends the "End" event, terminating the event stream
void aws_response_handler::init_end_response()
{
  sql_result.resize(header_crc_size, '\0');
  m_buff_header.clear();
  header_size = create_header_end();
  sql_result.append(m_buff_header.c_str(), header_size);
  int buff_len = create_message(header_size);
  s->formatter->write_bin_data(sql_result.data(), buff_len);
  rgw_flush_formatter_and_reset(s, s->formatter);
}
205 | ||
//appends error-event headers carrying 'error_message' to sql_result
void aws_response_handler::init_error_response(const char* error_message)
{
  //currently not in use. the headers in the case of error, are not extracted by AWS-cli.
  m_buff_header.clear();
  header_size = create_error_header_records(error_message);
  sql_result.append(m_buff_header.c_str(), header_size);
}
213 | ||
//closes the "Records" payload, finalizes the message (lengths + CRCs) and
//streams it to the client
void aws_response_handler::send_success_response()
{
  sql_result.append(END_PAYLOAD_LINE);
  int buff_len = create_message(header_size);
  s->formatter->write_bin_data(sql_result.data(), buff_len);
  rgw_flush_formatter_and_reset(s, s->formatter);
}
221 | ||
//sends a plain HTTP-400 XML error document (not an event-stream error message)
//describing 'error_code'/'error_message' to the client
void aws_response_handler::send_error_response(const char* error_code,
    const char* error_message,
    const char* resource_id)
{
  set_req_state_err(s, 0);
  dump_errno(s, 400);
  end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING);
  dump_start(s);
  s->formatter->open_object_section("Error");
  s->formatter->dump_string("Code", error_code);
  s->formatter->dump_string("Message", error_message);
  //NOTE(review): a literal "#Resource#" placeholder is emitted here; resource_id
  //is only used for RequestId below — confirm this is intended
  s->formatter->dump_string("Resource", "#Resource#");
  s->formatter->dump_string("RequestId", resource_id);
  s->formatter->close_section();
  rgw_flush_formatter_and_reset(s, s->formatter);
}
238 | ||
//appends the Progress XML payload (scanned/processed/returned byte counters)
//to the event started by init_progress_response(), then sends it
void aws_response_handler::send_progress_response()
{
  //NOTE(review): BytesScanned and BytesProcessed are both taken from
  //get_processed_size(); confirm whether a distinct scanned counter is intended
  std::string progress_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Progress><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Progress>"
                                 , get_processed_size(), get_processed_size(), get_total_bytes_returned());
  sql_result.append(progress_payload);
  int buff_len = create_message(header_size);
  s->formatter->write_bin_data(sql_result.data(), buff_len);
  rgw_flush_formatter_and_reset(s, s->formatter);
}
248 | ||
//appends the Stats XML payload (final byte counters) to the event started by
//init_stats_response(), then sends it
void aws_response_handler::send_stats_response()
{
  std::string stats_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Stats><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Stats>"
                              , get_processed_size(), get_processed_size(), get_total_bytes_returned());
  sql_result.append(stats_payload);
  int buff_len = create_message(header_size);
  s->formatter->write_bin_data(sql_result.data(), buff_len);
  rgw_flush_formatter_and_reset(s, s->formatter);
}
258 | ||
//constructor: registers this op as a data-returning GET and wires the callbacks
//the s3select engine uses to read the object and to format/flush results.
RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
  m_buff_header(std::make_unique<char[]>(1000)),
  m_parquet_type(false),
  chunk_number(0)
{
  set_get_data(true);
  //callbacks handed to the parquet reader so it can size and read the object
  fp_get_obj_size = [&]() {
    return get_obj_size();
  };
  fp_range_req = [&](int64_t start, int64_t len, void* buff, optional_yield* y) {
    ldout(s->cct, 10) << "S3select: range-request start: " << start << " length: " << len << dendl;
    auto status = range_request(start, len, buff, *y);
    return status;
  };
#ifdef _ARROW_EXIST
  m_rgw_api.set_get_size_api(fp_get_obj_size);
  m_rgw_api.set_range_req_api(fp_range_req);
#endif
  //formatting callbacks used by the parquet flow: start a Records message /
  //finalize and send the accumulated message
  fp_result_header_format = [this](std::string& result) {
    m_aws_response_handler.init_response();
    m_aws_response_handler.init_success_response();
    return 0;
  };
  fp_s3select_result_format = [this](std::string& result) {
    m_aws_response_handler.send_success_response();
    return 0;
  };
}
287 | ||
//nothing to release explicitly; members (including the unique_ptr-held
//m_buff_header) clean up via their own destructors
RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
{}
290 | ||
//reads the s3-select request body (once per request) and extracts the SQL
//expression plus serialization parameters; detects parquet objects by name
//suffix. returns 0 on success, negative on a missing/unreadable query.
int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
{
  if(m_s3select_query.empty() == false) {
    //already parsed on an earlier invocation
    return 0;
  }
  if(s->object->get_name().find(".parquet") != std::string::npos) { //aws cli is missing the parquet
#ifdef _ARROW_EXIST
    m_parquet_type = true;
#else
    ldpp_dout(this, 10) << "arrow library is not installed" << dendl;
#endif
  }
  //retrieve s3-select query from payload
  bufferlist data;
  int ret;
  int max_size = 4096; //upper bound for the request-body (XML) size
  std::tie(ret, data) = read_all_input(s, max_size, false);
  if (ret != 0) {
    ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
    return ret;
  }
  m_s3select_query = data.to_str();
  if (m_s3select_query.length() > 0) {
    ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl;
  } else {
    ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
    return -1;
  }
  //parse the XML body into the SQL expression and the CSV in/out options
  int status = handle_aws_cli_parameters(m_sql_query);
  if (status<0) {
    return status;
  }
  return RGWGetObj_ObjStore_S3::get_params(y);
}
325 | ||
//runs the SQL statement on one chunk of CSV input and streams the resulting AWS
//event messages (Records / Cont / Progress) to the client.
//returns the engine status; -1 on syntax or processing errors (an error
//response is sent before returning).
int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
{
  int status = 0;
  uint32_t length_before_processing, length_post_processing;
  csv_object::csv_defintions csv;
  const char* s3select_syntax_error = "s3select-Syntax-Error";
  const char* s3select_resource_id = "resourcse-id";
  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";

  s3select_syntax.parse_query(query);
  //map the client-provided serialization options onto the engine's CSV
  //definitions; engine defaults are kept for any option not set in the request
  if (m_row_delimiter.size()) {
    csv.row_delimiter = *m_row_delimiter.c_str();
  }
  if (m_column_delimiter.size()) {
    csv.column_delimiter = *m_column_delimiter.c_str();
  }
  if (m_quot.size()) {
    csv.quot_char = *m_quot.c_str();
  }
  if (m_escape_char.size()) {
    csv.escape_char = *m_escape_char.c_str();
  }
  if (m_enable_progress.compare("true")==0) {
    enable_progress = true;
  } else {
    enable_progress = false;
  }
  if (output_row_delimiter.size()) {
    csv.output_row_delimiter = *output_row_delimiter.c_str();
  }
  if (output_column_delimiter.size()) {
    csv.output_column_delimiter = *output_column_delimiter.c_str();
  }
  if (output_quot.size()) {
    csv.output_quot_char = *output_quot.c_str();
  }
  if (output_escape_char.size()) {
    csv.output_escape_char = *output_escape_char.c_str();
  }
  if(output_quote_fields.compare("ALWAYS") == 0) {
    csv.quote_fields_always = true;
  } else if(output_quote_fields.compare("ASNEEDED") == 0) {
    csv.quote_fields_asneeded = true;
  }
  if(m_header_info.compare("IGNORE")==0) {
    csv.ignore_header_info=true;
  } else if(m_header_info.compare("USE")==0) {
    csv.use_header_info=true;
  }
  m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
  m_aws_response_handler.init_response();
  if (s3select_syntax.get_error_description().empty() == false) {
    //error-flow (syntax-error)
    m_aws_response_handler.send_error_response(s3select_syntax_error,
        s3select_syntax.get_error_description().c_str(),
        s3select_resource_id);
    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
    return -1;
  } else {
    if (input == nullptr) {
      //zero-length objects still run the query once on empty input
      input = "";
    }
    m_aws_response_handler.init_success_response();
    length_before_processing = (m_aws_response_handler.get_sql_result()).size();
    //query is correct(syntax), processing is starting.
    status = m_s3_csv_object.run_s3select_on_stream(m_aws_response_handler.get_sql_result(), input, input_length, s->obj_size);
    //the size delta of sql_result is exactly the result bytes this chunk produced
    length_post_processing = (m_aws_response_handler.get_sql_result()).size();
    m_aws_response_handler.update_total_bytes_returned(length_post_processing-length_before_processing);
    if (status < 0) {
      //error flow(processing-time)
      m_aws_response_handler.send_error_response(s3select_processTime_error,
          m_s3_csv_object.get_error_description().c_str(),
          s3select_resource_id);
      ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
      return -1;
    }
    if (chunk_number == 0) {
      //success flow: the HTTP status line and headers go out before the first event
      if (op_ret < 0) {
        set_req_state_err(s, op_ret);
      }
      dump_errno(s);
      // Explicitly use chunked transfer encoding so that we can stream the result
      // to the user without having to wait for the full length of it.
      end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
    }
    chunk_number++;
  }
  //a chunk that produced output becomes a Records message; an empty chunk
  //becomes a Cont message so the client does not time out
  if (length_post_processing-length_before_processing != 0) {
    m_aws_response_handler.send_success_response();
  } else {
    m_aws_response_handler.send_continuation_response();
  }
  if (enable_progress == true) {
    m_aws_response_handler.init_progress_response();
    m_aws_response_handler.send_progress_response();
  }
  return status;
}
425 | ||
//runs the SQL statement on a parquet object through the arrow-backed reader.
//when arrow is not compiled in (_ARROW_EXIST undefined) this is a no-op
//returning 0. errors are reported in-stream via the formatting callbacks.
int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
{
  int status = 0;
#ifdef _ARROW_EXIST
  if (!m_s3_parquet_object.is_set()) {
    //one-time reader construction; it pulls object data through m_rgw_api
    s3select_syntax.parse_query(m_sql_query.c_str());
    try {
      m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
    } catch(base_s3select_exception& e) {
      //construction failed: send the reason as the (only) result payload
      ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
      fp_result_header_format(m_aws_response_handler.get_sql_result());
      m_aws_response_handler.get_sql_result().append(e.what());
      fp_s3select_result_format(m_aws_response_handler.get_sql_result());
      return -1;
    }
  }
  if (s3select_syntax.get_error_description().empty() == false) {
    //syntax error: send the description as the result payload
    fp_result_header_format(m_aws_response_handler.get_sql_result());
    m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
    fp_s3select_result_format(m_aws_response_handler.get_sql_result());
    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
    status = -1;
  } else {
    fp_result_header_format(m_aws_response_handler.get_sql_result());
    status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
    if (status < 0) {
      m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
      fp_s3select_result_format(m_aws_response_handler.get_sql_result());
      ldout(s->cct, 10) << "S3select: failure while execution" << m_s3_parquet_object.get_error_description() << dendl;
    }
  }
#endif
  return status;
}
460 | ||
461 | int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query) | |
462 | { | |
463 | std::string input_tag{"InputSerialization"}; | |
464 | std::string output_tag{"OutputSerialization"}; | |
465 | if(chunk_number !=0) { | |
466 | return 0; | |
467 | } | |
468 | #define GT ">" | |
469 | #define LT "<" | |
470 | if (m_s3select_query.find(GT) != std::string::npos) { | |
471 | boost::replace_all(m_s3select_query, GT, ">"); | |
472 | } | |
473 | if (m_s3select_query.find(LT) != std::string::npos) { | |
474 | boost::replace_all(m_s3select_query, LT, "<"); | |
475 | } | |
476 | //AWS cli s3select parameters | |
477 | extract_by_tag(m_s3select_query, "Expression", sql_query); | |
478 | extract_by_tag(m_s3select_query, "Enabled", m_enable_progress); | |
479 | size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0); | |
480 | size_t _qe = m_s3select_query.find("</" + input_tag + ">", _qi); | |
481 | m_s3select_input = m_s3select_query.substr(_qi + input_tag.size() + 2, _qe - (_qi + input_tag.size() + 2)); | |
482 | extract_by_tag(m_s3select_input, "FieldDelimiter", m_column_delimiter); | |
483 | extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot); | |
484 | extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter); | |
485 | extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info); | |
486 | if (m_row_delimiter.size()==0) { | |
487 | m_row_delimiter='\n'; | |
488 | } else if(m_row_delimiter.compare(" ") == 0) { | |
489 | //presto change | |
490 | m_row_delimiter='\n'; | |
491 | } | |
492 | extract_by_tag(m_s3select_input, "QuoteEscapeCharacter", m_escape_char); | |
493 | extract_by_tag(m_s3select_input, "CompressionType", m_compression_type); | |
494 | size_t _qo = m_s3select_query.find("<" + output_tag + ">", 0); | |
495 | size_t _qs = m_s3select_query.find("</" + output_tag + ">", _qi); | |
496 | m_s3select_output = m_s3select_query.substr(_qo + output_tag.size() + 2, _qs - (_qo + output_tag.size() + 2)); | |
497 | extract_by_tag(m_s3select_output, "FieldDelimiter", output_column_delimiter); | |
498 | extract_by_tag(m_s3select_output, "QuoteCharacter", output_quot); | |
499 | extract_by_tag(m_s3select_output, "QuoteEscapeCharacter", output_escape_char); | |
500 | extract_by_tag(m_s3select_output, "QuoteFields", output_quote_fields); | |
501 | extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter); | |
502 | if (output_row_delimiter.size()==0) { | |
503 | output_row_delimiter='\n'; | |
504 | } else if(output_row_delimiter.compare(" ") == 0) { | |
505 | //presto change | |
506 | output_row_delimiter='\n'; | |
507 | } | |
508 | if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) { | |
509 | ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl; | |
510 | return -1; | |
511 | } | |
512 | return 0; | |
513 | } | |
514 | ||
515 | int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string input, std::string tag_name, std::string& result) | |
516 | { | |
517 | result = ""; | |
518 | size_t _qs = input.find("<" + tag_name + ">", 0); | |
519 | size_t qs_input = _qs + tag_name.size() + 2; | |
520 | if (_qs == std::string::npos) { | |
521 | return -1; | |
522 | } | |
523 | size_t _qe = input.find("</" + tag_name + ">", qs_input); | |
524 | if (_qe == std::string::npos) { | |
525 | return -1; | |
526 | } | |
527 | result = input.substr(qs_input, _qe - qs_input); | |
528 | return 0; | |
529 | } | |
530 | ||
//callback target for the parquet reader: the size of the requested object
size_t RGWSelectObj_ObjStore_S3::get_obj_size()
{
  return s->obj_size;
}
535 | ||
//synchronous ranged read used by the parquet reader.
//returns 'len' (the number of bytes copied into 'buff').
int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff, optional_yield y)
{
  //purpose: implementation for arrow::ReadAt, this may take several async calls.
  //send_response_date(call_back) accumulate buffer, upon completion control is back to ReadAt.
  range_req_str = "bytes=" + std::to_string(ofs) + "-" + std::to_string(ofs+len-1);
  range_str = range_req_str.c_str();
  range_parsed = false;
  RGWGetObj::parse_range(); //re-parse using the synthetic Range header built above
  requested_buffer.clear();
  m_request_range = len; //parquet_processing() accumulates callbacks until this many bytes arrive
  ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl;
  RGWGetObj::execute(y);
  //NOTE(review): assumes the full 'len' bytes were accumulated; a short read
  //would copy past the end of requested_buffer's data — confirm upstream guarantees
  memcpy(buff, requested_buffer.data(), len);
  ldout(s->cct, 10) << "S3select: done waiting, buffer is complete buffer-size:" << requested_buffer.size() << dendl;
  return len;
}
552 | ||
553 | void RGWSelectObj_ObjStore_S3::execute(optional_yield y) | |
554 | { | |
555 | int status = 0; | |
556 | char parquet_magic[4]; | |
557 | static constexpr uint8_t parquet_magic1[4] = {'P', 'A', 'R', '1'}; | |
558 | static constexpr uint8_t parquet_magicE[4] = {'P', 'A', 'R', 'E'}; | |
559 | get_params(y); | |
560 | #ifdef _ARROW_EXIST | |
561 | m_rgw_api.m_y = &y; | |
562 | #endif | |
563 | if (m_parquet_type) { | |
564 | //parquet processing | |
565 | range_request(0, 4, parquet_magic, y); | |
566 | if(memcmp(parquet_magic, parquet_magic1, 4) && memcmp(parquet_magic, parquet_magicE, 4)) { | |
567 | ldout(s->cct, 10) << s->object->get_name() << " does not contain parquet magic" << dendl; | |
568 | op_ret = -ERR_INVALID_REQUEST; | |
569 | return; | |
570 | } | |
571 | s3select_syntax.parse_query(m_sql_query.c_str()); | |
572 | status = run_s3select_on_parquet(m_sql_query.c_str()); | |
573 | if (status) { | |
574 | ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl; | |
575 | op_ret = -ERR_INVALID_REQUEST; | |
576 | } else { | |
577 | ldout(s->cct, 10) << "S3select: complete query with success " << dendl; | |
578 | } | |
579 | } else { | |
580 | //CSV processing | |
581 | RGWGetObj::execute(y); | |
582 | } | |
583 | } | |
584 | ||
//GET callback for the parquet flow: accumulates incoming data into
//requested_buffer until the range requested by range_request() (m_request_range
//bytes) is complete; the blocked parquet reader then consumes the buffer.
int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len)
{
  if (chunk_number == 0) {
    if (op_ret < 0) {
      set_req_state_err(s, op_ret);
    }
    dump_errno(s);
  }
  // Explicitly use chunked transfer encoding so that we can stream the result
  // to the user without having to wait for the full length of it.
  if (chunk_number == 0) {
    end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
  }
  chunk_number++;
  size_t append_in_callback = 0;
  int part_no = 1;
  //concat the requested buffer
  for (auto& it : bl.buffers()) {
    if(it.length() == 0) {
      ldout(s->cct, 10) << "S3select: get zero-buffer while appending request-buffer " << dendl;
    }
    append_in_callback += it.length();
    ldout(s->cct, 10) << "S3select: part " << part_no++ << " it.length() = " << it.length() << dendl;
    //NOTE(review): each segment appends 'len' bytes from offset 'ofs' rather
    //than it.length() bytes from the segment itself — confirm multi-segment
    //bufferlists are handled correctly here
    requested_buffer.append(&(it)[0]+ofs, len);
  }
  ldout(s->cct, 10) << "S3select:append_in_callback = " << append_in_callback << dendl;
  if (requested_buffer.size() < m_request_range) {
    //not all requested bytes arrived yet; wait for the next callback
    ldout(s->cct, 10) << "S3select: need another round buffe-size: " << requested_buffer.size() << " request range length:" << m_request_range << dendl;
    return 0;
  } else {//buffer is complete
    ldout(s->cct, 10) << "S3select: buffer is complete " << requested_buffer.size() << " request range length:" << m_request_range << dendl;
    m_request_range = 0;
  }
  return 0;
}
620 | ||
//GET callback for the CSV flow: feeds each buffer segment of the current chunk
//into the SQL engine; once the processed-byte counter reaches the object size,
//emits the final Stats and End events.
int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
{
  int status = 0;

  if (s->obj_size == 0) {
    //empty object: run the query once on empty input (aggregates can still
    //produce a result row)
    status = run_s3select(m_sql_query.c_str(), nullptr, 0);
  } else {
    auto bl_len = bl.get_num_buffers();
    int i=0;
    for(auto& it : bl.buffers()) {
      ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
                          << " len " << len << " obj-size " << s->obj_size << dendl;
      if(it.length() == 0) {
        //skip empty segments
        ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
                            << " obj-size " << s->obj_size << dendl;
        continue;
      }
      m_aws_response_handler.update_processed_size(it.length());
      status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
      if(status<0) {
        break;
      }
      i++;
    }
  }
  //the object is fully processed once the byte counter reaches the object size
  if (m_aws_response_handler.get_processed_size() == s->obj_size) {
    if (status >=0) {
      m_aws_response_handler.init_stats_response();
      m_aws_response_handler.send_stats_response();
      m_aws_response_handler.init_end_response();
    }
  }
  return status;
}
655 | ||
656 | int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len) | |
657 | { | |
658 | if (!m_aws_response_handler.is_set()) { | |
659 | m_aws_response_handler.set(s, this); | |
660 | } | |
661 | if(len == 0 && s->obj_size != 0) { | |
662 | return 0; | |
663 | } | |
664 | if (m_parquet_type) { | |
665 | return parquet_processing(bl,ofs,len); | |
666 | } | |
667 | return csv_processing(bl,ofs,len); | |
668 | } | |
669 |