]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_s3select.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_s3select.cc
CommitLineData
20effc67
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include "rgw_s3select_private.h"
5
// Factory entry point: instantiates the S3-Select GET-object operation.
// Ownership of the returned RGWOp passes to the caller (the op dispatcher).
namespace rgw::s3select {
RGWOp* create_s3select_op()
{
  return new RGWSelectObj_ObjStore_S3();
}
};
12
13using namespace s3selectEngine;
14
// Mutable accessor for the in-progress response buffer; the init_*/send_*
// helpers and create_message() assemble the AWS event-stream message in it.
std::string& aws_response_handler::get_sql_result()
{
  return sql_result;
}
19
// Bytes of object input processed so far (accumulated via update_processed_size()).
uint64_t aws_response_handler::get_processed_size()
{
  return processed_size;
}
24
// Add `value` bytes to the running count of processed input bytes.
void aws_response_handler::update_processed_size(uint64_t value)
{
  processed_size += value;
}
29
// Bytes of SQL-result payload returned to the client so far.
uint64_t aws_response_handler::get_total_bytes_returned()
{
  return total_bytes_returned;
}
34
// Add `value` bytes to the running count of result bytes returned to the client.
void aws_response_handler::update_total_bytes_returned(uint64_t value)
{
  total_bytes_returned += value;
}
39
40void aws_response_handler::push_header(const char* header_name, const char* header_value)
41{
42 char x;
43 short s;
44 x = char(strlen(header_name));
45 m_buff_header.append(&x, sizeof(x));
46 m_buff_header.append(header_name);
47 x = char(7);
48 m_buff_header.append(&x, sizeof(x));
49 s = htons(uint16_t(strlen(header_value)));
50 m_buff_header.append(reinterpret_cast<char*>(&s), sizeof(s));
51 m_buff_header.append(header_value);
52}
53
//convert a scoped-enum header index into an array subscript
#define IDX( x ) static_cast<int>( x )
55
56int aws_response_handler::create_header_records()
57{
58 //headers description(AWS)
59 //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
60 //1
61 push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::RECORDS)]);
62 //2
63 push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]);
64 //3
65 push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
66 return m_buff_header.size();
67}
68
69int aws_response_handler::create_header_continuation()
70{
71 //headers description(AWS)
72 //1
73 push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]);
74 //2
75 push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
76 return m_buff_header.size();
77}
78
79int aws_response_handler::create_header_progress()
80{
81 //headers description(AWS)
82 //1
83 push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::PROGRESS)]);
84 //2
85 push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
86 //3
87 push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
88 return m_buff_header.size();
89}
90
91int aws_response_handler::create_header_stats()
92{
93 //headers description(AWS)
94 //1
95 push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::STATS)]);
96 //2
97 push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
98 //3
99 push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
100 return m_buff_header.size();
101}
102
103int aws_response_handler::create_header_end()
104{
105 //headers description(AWS)
106 //1
107 push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]);
108 //2
109 push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
110 return m_buff_header.size();
111}
112
113int aws_response_handler::create_error_header_records(const char* error_message)
114{
115 //headers description(AWS)
116 //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
117 //1
118 push_header(header_name_str[IDX(header_name_En::ERROR_CODE)], header_value_str[IDX(header_value_En::ENGINE_ERROR)]);
119 //2
120 push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message);
121 //3
122 push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]);
123 return m_buff_header.size();
124}
125
// Finalize the event-stream message held in sql_result: fill in the prelude
// (total length, header length, prelude CRC) reserved by init_response(),
// then append the whole-message CRC. Returns the final message length.
int aws_response_handler::create_message(u_int32_t header_len)
{
  //message description(AWS):
  //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
  //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC
  //are created later to the produced SQL result, and actually wrapping the payload.
  //overwrites a big-endian u32 into the prelude slot at byte-offset `pos`
  auto push_encode_int = [&](u_int32_t s, int pos) {
    u_int32_t x = htonl(s);
    sql_result.replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
  };
  u_int32_t total_byte_len = 0;
  u_int32_t preload_crc = 0;
  u_int32_t message_crc = 0;
  total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
  push_encode_int(total_byte_len, 0);
  push_encode_int(header_len, 4);
  crc32.reset();
  crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes
  preload_crc = crc32();
  push_encode_int(preload_crc, 8);
  crc32.reset();
  crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
  message_crc = crc32();
  u_int32_t x = htonl(message_crc); //trailing message CRC, big-endian
  sql_result.append(reinterpret_cast<char*>(&x), sizeof(x));
  return sql_result.size();
}
153
// Reserve the event-stream prelude (total-length, header-length, prelude-CRC)
// at the front of sql_result; create_message() fills the slots in later.
void aws_response_handler::init_response()
{
  //12 positions for header-crc
  sql_result.resize(header_crc_size, '\0');
}
159
// Begin a "Records" event: append the record headers and the payload prefix
// on top of the prelude bytes laid down by init_response().
void aws_response_handler::init_success_response()
{
  m_buff_header.clear();
  header_size = create_header_records();
  sql_result.append(m_buff_header.c_str(), header_size); //explicit size: header bytes may contain NULs
  sql_result.append(PAYLOAD_LINE);
}
167
// Build and flush a complete "Cont" (keep-alive) event with an empty payload.
void aws_response_handler::send_continuation_response()
{
  sql_result.resize(header_crc_size, '\0'); //restart the buffer: prelude placeholder only
  m_buff_header.clear();
  header_size = create_header_continuation();
  sql_result.append(m_buff_header.c_str(), header_size);
  int buff_len = create_message(header_size);
  s->formatter->write_bin_data(sql_result.data(), buff_len);
  rgw_flush_formatter_and_reset(s, s->formatter);
}
178
// Begin a "Progress" event: reset the buffer to the prelude placeholder and
// append the progress headers; send_progress_response() adds the XML payload.
void aws_response_handler::init_progress_response()
{
  sql_result.resize(header_crc_size, '\0');
  m_buff_header.clear();
  header_size = create_header_progress();
  sql_result.append(m_buff_header.c_str(), header_size);
}
186
// Begin a "Stats" event: reset the buffer to the prelude placeholder and
// append the stats headers; send_stats_response() adds the XML payload.
void aws_response_handler::init_stats_response()
{
  sql_result.resize(header_crc_size, '\0');
  m_buff_header.clear();
  header_size = create_header_stats();
  sql_result.append(m_buff_header.c_str(), header_size);
}
194
// Build and flush the terminal "End" event (empty payload), signalling the
// successful end of the event stream to the client.
void aws_response_handler::init_end_response()
{
  sql_result.resize(header_crc_size, '\0');
  m_buff_header.clear();
  header_size = create_header_end();
  sql_result.append(m_buff_header.c_str(), header_size);
  int buff_len = create_message(header_size);
  s->formatter->write_bin_data(sql_result.data(), buff_len);
  rgw_flush_formatter_and_reset(s, s->formatter);
}
205
// Begin an error event carrying `error_message` in its headers.
void aws_response_handler::init_error_response(const char* error_message)
{
  //currently not in use. the headers in the case of error, are not extracted by AWS-cli.
  m_buff_header.clear();
  header_size = create_error_header_records(error_message);
  sql_result.append(m_buff_header.c_str(), header_size);
}
213
// Close the current "Records" event: append the payload terminator, fill in
// the prelude and CRCs via create_message(), and flush it to the client.
void aws_response_handler::send_success_response()
{
  sql_result.append(END_PAYLOAD_LINE);
  int buff_len = create_message(header_size);
  s->formatter->write_bin_data(sql_result.data(), buff_len);
  rgw_flush_formatter_and_reset(s, s->formatter);
}
221
// Emit a complete XML <Error> document with HTTP status 400 and flush it.
// NOTE(review): `resource_id` is written into <RequestId>, while <Resource>
// is the hard-coded placeholder "#Resource#" — confirm this is intentional.
void aws_response_handler::send_error_response(const char* error_code,
    const char* error_message,
    const char* resource_id)
{
  set_req_state_err(s, 0);
  dump_errno(s, 400);
  end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING);
  dump_start(s);
  s->formatter->open_object_section("Error");
  s->formatter->dump_string("Code", error_code);
  s->formatter->dump_string("Message", error_message);
  s->formatter->dump_string("Resource", "#Resource#");
  s->formatter->dump_string("RequestId", resource_id);
  s->formatter->close_section();
  rgw_flush_formatter_and_reset(s, s->formatter);
}
238
// Build and flush a "Progress" event with the XML progress payload.
// NOTE(review): BytesScanned and BytesProcessed both report
// get_processed_size(); they coincide here — confirm intentional (no
// compression support, so scanned == processed).
void aws_response_handler::send_progress_response()
{
  std::string progress_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Progress><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Progress>"
                                 , get_processed_size(), get_processed_size(), get_total_bytes_returned());
  sql_result.append(progress_payload);
  int buff_len = create_message(header_size);
  s->formatter->write_bin_data(sql_result.data(), buff_len);
  rgw_flush_formatter_and_reset(s, s->formatter);
}
248
// Build and flush a "Stats" event with the XML stats payload.
// NOTE(review): BytesScanned and BytesProcessed both report
// get_processed_size() — confirm intentional (scanned == processed when
// compression is unsupported).
void aws_response_handler::send_stats_response()
{
  std::string stats_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Stats><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Stats>"
                              , get_processed_size(), get_processed_size(), get_total_bytes_returned());
  sql_result.append(stats_payload);
  int buff_len = create_message(header_size);
  s->formatter->write_bin_data(sql_result.data(), buff_len);
  rgw_flush_formatter_and_reset(s, s->formatter);
}
258
// Constructor: wires up the callbacks that the s3select engine (and, when
// built with Arrow, the parquet reader) uses to pull object bytes and to
// frame/flush result events.
RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
  m_buff_header(std::make_unique<char[]>(1000)),
  m_parquet_type(false),
  chunk_number(0)
{
  set_get_data(true);
  //object-size provider for the parquet reader
  fp_get_obj_size = [&]() {
    return get_obj_size();
  };
  //synchronous byte-range fetch used by the parquet reader (see range_request())
  fp_range_req = [&](int64_t start, int64_t len, void* buff, optional_yield* y) {
    ldout(s->cct, 10) << "S3select: range-request start: " << start << " length: " << len << dendl;
    auto status = range_request(start, len, buff, *y);
    return status;
  };
#ifdef _ARROW_EXIST
  m_rgw_api.set_get_size_api(fp_get_obj_size);
  m_rgw_api.set_range_req_api(fp_range_req);
#endif
  //starts a new "Records" event in the response buffer
  fp_result_header_format = [this](std::string& result) {
    m_aws_response_handler.init_response();
    m_aws_response_handler.init_success_response();
    return 0;
  };
  //finalizes and flushes the current "Records" event
  fp_s3select_result_format = [this](std::string& result) {
    m_aws_response_handler.send_success_response();
    return 0;
  };
}
287
288RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
289{}
290
// Parse the S3-Select request: detect parquet objects by a ".parquet" name
// suffix, read the XML request body (SQL expression plus serialization
// settings), extract the parameters, then delegate to the base-class
// get_params(). Returns 0 on success, <0 on failure.
int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
{
  if(m_s3select_query.empty() == false) {
    //query already retrieved for this request; nothing to do
    return 0;
  }
  if(s->object->get_name().find(".parquet") != std::string::npos) { //aws cli is missing the parquet
#ifdef _ARROW_EXIST
    m_parquet_type = true;
#else
    ldpp_dout(this, 10) << "arrow library is not installed" << dendl;
#endif
  }
  //retrieve s3-select query from payload
  bufferlist data;
  int ret;
  int max_size = 4096; //upper bound on the request-body size accepted here
  std::tie(ret, data) = read_all_input(s, max_size, false);
  if (ret != 0) {
    ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
    return ret;
  }
  m_s3select_query = data.to_str();
  if (m_s3select_query.length() > 0) {
    ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl;
  } else {
    ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
    return -1; //NOTE(review): generic -1 rather than a specific -ERR_* code — confirm
  }
  int status = handle_aws_cli_parameters(m_sql_query);
  if (status<0) {
    return status;
  }
  return RGWGetObj_ObjStore_S3::get_params(y);
}
325
// Run the SQL query over one chunk of CSV input and stream the result to the
// client as AWS event-stream messages: a "Records" event when output was
// produced for this chunk, a "Cont" (keep-alive) event otherwise, plus an
// optional "Progress" event when the client asked for progress reports.
// On syntax or processing errors an XML <Error> response is sent and -1 is
// returned; otherwise the engine status is returned.
int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
{
  int status = 0;
  uint32_t length_before_processing, length_post_processing;
  csv_object::csv_defintions csv;
  const char* s3select_syntax_error = "s3select-Syntax-Error";
  const char* s3select_resource_id = "resourcse-id";
  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";

  s3select_syntax.parse_query(query);
  //map the request's input/output serialization parameters onto the engine's
  //CSV definitions; empty strings keep the engine defaults
  if (m_row_delimiter.size()) {
    csv.row_delimiter = *m_row_delimiter.c_str();
  }
  if (m_column_delimiter.size()) {
    csv.column_delimiter = *m_column_delimiter.c_str();
  }
  if (m_quot.size()) {
    csv.quot_char = *m_quot.c_str();
  }
  if (m_escape_char.size()) {
    csv.escape_char = *m_escape_char.c_str();
  }
  if (m_enable_progress.compare("true")==0) {
    enable_progress = true;
  } else {
    enable_progress = false;
  }
  if (output_row_delimiter.size()) {
    csv.output_row_delimiter = *output_row_delimiter.c_str();
  }
  if (output_column_delimiter.size()) {
    csv.output_column_delimiter = *output_column_delimiter.c_str();
  }
  if (output_quot.size()) {
    csv.output_quot_char = *output_quot.c_str();
  }
  if (output_escape_char.size()) {
    csv.output_escape_char = *output_escape_char.c_str();
  }
  if(output_quote_fields.compare("ALWAYS") == 0) {
    csv.quote_fields_always = true;
  } else if(output_quote_fields.compare("ASNEEDED") == 0) {
    csv.quote_fields_asneeded = true;
  }
  if(m_header_info.compare("IGNORE")==0) {
    csv.ignore_header_info=true;
  } else if(m_header_info.compare("USE")==0) {
    csv.use_header_info=true;
  }
  m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
  m_aws_response_handler.init_response();
  if (s3select_syntax.get_error_description().empty() == false) {
    //error-flow (syntax-error)
    m_aws_response_handler.send_error_response(s3select_syntax_error,
        s3select_syntax.get_error_description().c_str(),
        s3select_resource_id);
    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
    return -1;
  } else {
    if (input == nullptr) {
      input = ""; //empty object: run the query with no input rows
    }
    m_aws_response_handler.init_success_response();
    //the difference in sql_result size before/after the engine run is the
    //number of payload bytes this chunk produced
    length_before_processing = (m_aws_response_handler.get_sql_result()).size();
    //query is correct(syntax), processing is starting.
    status = m_s3_csv_object.run_s3select_on_stream(m_aws_response_handler.get_sql_result(), input, input_length, s->obj_size);
    length_post_processing = (m_aws_response_handler.get_sql_result()).size();
    m_aws_response_handler.update_total_bytes_returned(length_post_processing-length_before_processing);
    if (status < 0) {
      //error flow(processing-time)
      m_aws_response_handler.send_error_response(s3select_processTime_error,
          m_s3_csv_object.get_error_description().c_str(),
          s3select_resource_id);
      ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
      return -1;
    }
    if (chunk_number == 0) {
      //success flow: the HTTP status line and headers go out once, before the
      //first event of the stream
      if (op_ret < 0) {
        set_req_state_err(s, op_ret);
      }
      dump_errno(s);
      // Explicitly use chunked transfer encoding so that we can stream the result
      // to the user without having to wait for the full length of it.
      end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
    }
    chunk_number++;
  }
  //both lengths were assigned on the success path above (error paths returned)
  if (length_post_processing-length_before_processing != 0) {
    m_aws_response_handler.send_success_response();
  } else {
    m_aws_response_handler.send_continuation_response();
  }
  if (enable_progress == true) {
    m_aws_response_handler.init_progress_response();
    m_aws_response_handler.send_progress_response();
  }
  return status;
}
425
// Run the SQL query over a parquet object via the Arrow-backed reader.
// Construction, syntax and execution errors are shipped back to the client as
// a "Records" payload carrying the error text. When built without Arrow
// support this function is a no-op returning 0.
int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
{
  int status = 0;
#ifdef _ARROW_EXIST
  if (!m_s3_parquet_object.is_set()) {
    //one-time setup of the parquet reader for this request
    s3select_syntax.parse_query(m_sql_query.c_str());
    try {
      m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
    } catch(base_s3select_exception& e) {
      ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
      fp_result_header_format(m_aws_response_handler.get_sql_result());
      m_aws_response_handler.get_sql_result().append(e.what());
      fp_s3select_result_format(m_aws_response_handler.get_sql_result());
      return -1;
    }
  }
  if (s3select_syntax.get_error_description().empty() == false) {
    //syntax error: send the parser's error description as the payload
    fp_result_header_format(m_aws_response_handler.get_sql_result());
    m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
    fp_s3select_result_format(m_aws_response_handler.get_sql_result());
    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
    status = -1;
  } else {
    fp_result_header_format(m_aws_response_handler.get_sql_result());
    status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
    if (status < 0) {
      //execution error: append the engine's error description and flush
      m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
      fp_s3select_result_format(m_aws_response_handler.get_sql_result());
      ldout(s->cct, 10) << "S3select: failure while execution" << m_s3_parquet_object.get_error_description() << dendl;
    }
  }
#endif
  return status;
}
460
461int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
462{
463 std::string input_tag{"InputSerialization"};
464 std::string output_tag{"OutputSerialization"};
465 if(chunk_number !=0) {
466 return 0;
467 }
468#define GT "&gt;"
469#define LT "&lt;"
470 if (m_s3select_query.find(GT) != std::string::npos) {
471 boost::replace_all(m_s3select_query, GT, ">");
472 }
473 if (m_s3select_query.find(LT) != std::string::npos) {
474 boost::replace_all(m_s3select_query, LT, "<");
475 }
476 //AWS cli s3select parameters
477 extract_by_tag(m_s3select_query, "Expression", sql_query);
478 extract_by_tag(m_s3select_query, "Enabled", m_enable_progress);
479 size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0);
480 size_t _qe = m_s3select_query.find("</" + input_tag + ">", _qi);
481 m_s3select_input = m_s3select_query.substr(_qi + input_tag.size() + 2, _qe - (_qi + input_tag.size() + 2));
482 extract_by_tag(m_s3select_input, "FieldDelimiter", m_column_delimiter);
483 extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot);
484 extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter);
485 extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info);
486 if (m_row_delimiter.size()==0) {
487 m_row_delimiter='\n';
488 } else if(m_row_delimiter.compare("&#10;") == 0) {
489 //presto change
490 m_row_delimiter='\n';
491 }
492 extract_by_tag(m_s3select_input, "QuoteEscapeCharacter", m_escape_char);
493 extract_by_tag(m_s3select_input, "CompressionType", m_compression_type);
494 size_t _qo = m_s3select_query.find("<" + output_tag + ">", 0);
495 size_t _qs = m_s3select_query.find("</" + output_tag + ">", _qi);
496 m_s3select_output = m_s3select_query.substr(_qo + output_tag.size() + 2, _qs - (_qo + output_tag.size() + 2));
497 extract_by_tag(m_s3select_output, "FieldDelimiter", output_column_delimiter);
498 extract_by_tag(m_s3select_output, "QuoteCharacter", output_quot);
499 extract_by_tag(m_s3select_output, "QuoteEscapeCharacter", output_escape_char);
500 extract_by_tag(m_s3select_output, "QuoteFields", output_quote_fields);
501 extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter);
502 if (output_row_delimiter.size()==0) {
503 output_row_delimiter='\n';
504 } else if(output_row_delimiter.compare("&#10;") == 0) {
505 //presto change
506 output_row_delimiter='\n';
507 }
508 if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) {
509 ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl;
510 return -1;
511 }
512 return 0;
513}
514
515int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string input, std::string tag_name, std::string& result)
516{
517 result = "";
518 size_t _qs = input.find("<" + tag_name + ">", 0);
519 size_t qs_input = _qs + tag_name.size() + 2;
520 if (_qs == std::string::npos) {
521 return -1;
522 }
523 size_t _qe = input.find("</" + tag_name + ">", qs_input);
524 if (_qe == std::string::npos) {
525 return -1;
526 }
527 result = input.substr(qs_input, _qe - qs_input);
528 return 0;
529}
530
// Object-size callback handed to the parquet reader (see fp_get_obj_size).
size_t RGWSelectObj_ObjStore_S3::get_obj_size()
{
  return s->obj_size;
}
535
// Fetch [ofs, ofs+len) of the object synchronously into `buff` by re-driving
// RGWGetObj::execute() with a crafted Range header; parquet_processing()
// accumulates the delivered data into requested_buffer.
int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff, optional_yield y)
{
  //purpose: implementation for arrow::ReadAt, this may take several async calls.
  //send_response_date(call_back) accumulate buffer, upon completion control is back to ReadAt.
  range_req_str = "bytes=" + std::to_string(ofs) + "-" + std::to_string(ofs+len-1);
  range_str = range_req_str.c_str();
  range_parsed = false;
  RGWGetObj::parse_range();
  requested_buffer.clear();
  m_request_range = len; //target size parquet_processing() compares against
  ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl;
  RGWGetObj::execute(y);
  //NOTE(review): assumes execute() delivered at least `len` bytes into
  //requested_buffer; a short read would make this memcpy read stale or
  //undersized data — confirm against the callback path.
  memcpy(buff, requested_buffer.data(), len);
  ldout(s->cct, 10) << "S3select: done waiting, buffer is complete buffer-size:" << requested_buffer.size() << dendl;
  return len;
}
552
553void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
554{
555 int status = 0;
556 char parquet_magic[4];
557 static constexpr uint8_t parquet_magic1[4] = {'P', 'A', 'R', '1'};
558 static constexpr uint8_t parquet_magicE[4] = {'P', 'A', 'R', 'E'};
559 get_params(y);
560#ifdef _ARROW_EXIST
561 m_rgw_api.m_y = &y;
562#endif
563 if (m_parquet_type) {
564 //parquet processing
565 range_request(0, 4, parquet_magic, y);
566 if(memcmp(parquet_magic, parquet_magic1, 4) && memcmp(parquet_magic, parquet_magicE, 4)) {
567 ldout(s->cct, 10) << s->object->get_name() << " does not contain parquet magic" << dendl;
568 op_ret = -ERR_INVALID_REQUEST;
569 return;
570 }
571 s3select_syntax.parse_query(m_sql_query.c_str());
572 status = run_s3select_on_parquet(m_sql_query.c_str());
573 if (status) {
574 ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl;
575 op_ret = -ERR_INVALID_REQUEST;
576 } else {
577 ldout(s->cct, 10) << "S3select: complete query with success " << dendl;
578 }
579 } else {
580 //CSV processing
581 RGWGetObj::execute(y);
582 }
583}
584
// RGWGetObj data callback for the parquet path: on the first chunk emit the
// HTTP status line and chunked-encoding header, then accumulate incoming
// segments into requested_buffer until the byte range requested by
// range_request() (m_request_range) is complete.
int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len)
{
  if (chunk_number == 0) {
    if (op_ret < 0) {
      set_req_state_err(s, op_ret);
    }
    dump_errno(s);
  }
  // Explicitly use chunked transfer encoding so that we can stream the result
  // to the user without having to wait for the full length of it.
  if (chunk_number == 0) {
    end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
  }
  chunk_number++;
  size_t append_in_callback = 0;
  int part_no = 1;
  //concat the requested buffer
  for (auto& it : bl.buffers()) {
    if(it.length() == 0) {
      ldout(s->cct, 10) << "S3select: get zero-buffer while appending request-buffer " << dendl;
    }
    append_in_callback += it.length();
    ldout(s->cct, 10) << "S3select: part " << part_no++ << " it.length() = " << it.length() << dendl;
    //NOTE(review): appends `len` bytes at offset `ofs` from EVERY segment;
    //for a multi-segment bufferlist this looks like it should append
    //it.length() bytes per segment instead — confirm against callers.
    requested_buffer.append(&(it)[0]+ofs, len);
  }
  ldout(s->cct, 10) << "S3select:append_in_callback = " << append_in_callback << dendl;
  if (requested_buffer.size() < m_request_range) {
    ldout(s->cct, 10) << "S3select: need another round buffe-size: " << requested_buffer.size() << " request range length:" << m_request_range << dendl;
    return 0;
  } else {//buffer is complete
    ldout(s->cct, 10) << "S3select: buffer is complete " << requested_buffer.size() << " request range length:" << m_request_range << dendl;
    m_request_range = 0;
  }
  return 0;
}
620
// RGWGetObj data callback for the CSV path: feed each contiguous segment of
// the incoming bufferlist through run_s3select(); once the whole object has
// been processed, emit the trailing "Stats" and "End" events.
int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
{
  int status = 0;

  if (s->obj_size == 0) {
    //empty object: run the query once with no input rows
    status = run_s3select(m_sql_query.c_str(), nullptr, 0);
  } else {
    auto bl_len = bl.get_num_buffers();
    int i=0;
    for(auto& it : bl.buffers()) {
      ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
                          << " len " << len << " obj-size " << s->obj_size << dendl;
      if(it.length() == 0) {
        ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
                            << " obj-size " << s->obj_size << dendl;
        continue;
      }
      m_aws_response_handler.update_processed_size(it.length());
      status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
      if(status<0) {
        break;
      }
      i++;
    }
  }
  //all object bytes were processed: close the event stream
  if (m_aws_response_handler.get_processed_size() == s->obj_size) {
    if (status >=0) {
      m_aws_response_handler.init_stats_response();
      m_aws_response_handler.send_stats_response();
      m_aws_response_handler.init_end_response();
    }
  }
  return status;
}
655
656int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
657{
658 if (!m_aws_response_handler.is_set()) {
659 m_aws_response_handler.set(s, this);
660 }
661 if(len == 0 && s->obj_size != 0) {
662 return 0;
663 }
664 if (m_parquet_type) {
665 return parquet_processing(bl,ofs,len);
666 }
667 return csv_processing(bl,ofs,len);
668}
669