1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "rgw_s3select_private.h"
6 namespace rgw::s3select
{
7 RGWOp
* create_s3select_op()
9 return new RGWSelectObj_ObjStore_S3();
13 using namespace s3selectEngine
;
// Accessor that hands out a mutable reference to a std::string.
// NOTE(review): the function body is not visible in this view; presumably it
// returns the sql_result buffer used by the other methods — confirm.
15 std::string
& aws_response_handler::get_sql_result()
20 uint64_t aws_response_handler::get_processed_size()
22 return processed_size
;
25 void aws_response_handler::update_processed_size(uint64_t value
)
27 processed_size
+= value
;
30 uint64_t aws_response_handler::get_total_bytes_returned()
32 return total_bytes_returned
;
35 void aws_response_handler::update_total_bytes_returned(uint64_t value
)
37 total_bytes_returned
+= value
;
40 void aws_response_handler::push_header(const char* header_name
, const char* header_value
)
44 x
= char(strlen(header_name
));
45 m_buff_header
.append(&x
, sizeof(x
));
46 m_buff_header
.append(header_name
);
48 m_buff_header
.append(&x
, sizeof(x
));
49 s
= htons(uint16_t(strlen(header_value
)));
50 m_buff_header
.append(reinterpret_cast<char*>(&s
), sizeof(s
));
51 m_buff_header
.append(header_value
);
// Converts an enumerator to the int used to index the header string tables.
#define IDX(x) static_cast<int>(x)
56 int aws_response_handler::create_header_records()
58 //headers description(AWS)
59 //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
61 push_header(header_name_str
[IDX(header_name_En::EVENT_TYPE
)], header_value_str
[IDX(header_value_En::RECORDS
)]);
63 push_header(header_name_str
[IDX(header_name_En::CONTENT_TYPE
)], header_value_str
[IDX(header_value_En::OCTET_STREAM
)]);
65 push_header(header_name_str
[IDX(header_name_En::MESSAGE_TYPE
)], header_value_str
[IDX(header_value_En::EVENT
)]);
66 return m_buff_header
.size();
69 int aws_response_handler::create_header_continuation()
71 //headers description(AWS)
73 push_header(header_name_str
[IDX(header_name_En::EVENT_TYPE
)], header_value_str
[IDX(header_value_En::CONT
)]);
75 push_header(header_name_str
[IDX(header_name_En::MESSAGE_TYPE
)], header_value_str
[IDX(header_value_En::EVENT
)]);
76 return m_buff_header
.size();
79 int aws_response_handler::create_header_progress()
81 //headers description(AWS)
83 push_header(header_name_str
[IDX(header_name_En::EVENT_TYPE
)], header_value_str
[IDX(header_value_En::PROGRESS
)]);
85 push_header(header_name_str
[IDX(header_name_En::CONTENT_TYPE
)], header_value_str
[IDX(header_value_En::XML
)]);
87 push_header(header_name_str
[IDX(header_name_En::MESSAGE_TYPE
)], header_value_str
[IDX(header_value_En::EVENT
)]);
88 return m_buff_header
.size();
91 int aws_response_handler::create_header_stats()
93 //headers description(AWS)
95 push_header(header_name_str
[IDX(header_name_En::EVENT_TYPE
)], header_value_str
[IDX(header_value_En::STATS
)]);
97 push_header(header_name_str
[IDX(header_name_En::CONTENT_TYPE
)], header_value_str
[IDX(header_value_En::XML
)]);
99 push_header(header_name_str
[IDX(header_name_En::MESSAGE_TYPE
)], header_value_str
[IDX(header_value_En::EVENT
)]);
100 return m_buff_header
.size();
103 int aws_response_handler::create_header_end()
105 //headers description(AWS)
107 push_header(header_name_str
[IDX(header_name_En::EVENT_TYPE
)], header_value_str
[IDX(header_value_En::END
)]);
109 push_header(header_name_str
[IDX(header_name_En::MESSAGE_TYPE
)], header_value_str
[IDX(header_value_En::EVENT
)]);
110 return m_buff_header
.size();
113 int aws_response_handler::create_error_header_records(const char* error_message
)
115 //headers description(AWS)
116 //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
118 push_header(header_name_str
[IDX(header_name_En::ERROR_CODE
)], header_value_str
[IDX(header_value_En::ENGINE_ERROR
)]);
120 push_header(header_name_str
[IDX(header_name_En::ERROR_MESSAGE
)], error_message
);
122 push_header(header_name_str
[IDX(header_name_En::MESSAGE_TYPE
)], header_value_str
[IDX(header_value_En::ERROR_TYPE
)]);
123 return m_buff_header
.size();
126 int aws_response_handler::create_message(u_int32_t header_len
)
128 //message description(AWS):
129 //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
130 //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC
131 //are created later to the produced SQL result, and actually wrapping the payload.
132 auto push_encode_int
= [&](u_int32_t s
, int pos
) {
133 u_int32_t x
= htonl(s
);
134 sql_result
.replace(pos
, sizeof(x
), reinterpret_cast<char*>(&x
), sizeof(x
));
136 u_int32_t total_byte_len
= 0;
137 u_int32_t preload_crc
= 0;
138 u_int32_t message_crc
= 0;
139 total_byte_len
= sql_result
.size() + 4; //the total is greater in 4 bytes than current size
140 push_encode_int(total_byte_len
, 0);
141 push_encode_int(header_len
, 4);
143 crc32
= std::for_each(sql_result
.data(), sql_result
.data() + 8, crc32
); //crc for starting 8 bytes
144 preload_crc
= crc32();
145 push_encode_int(preload_crc
, 8);
147 crc32
= std::for_each(sql_result
.begin(), sql_result
.end(), crc32
); //crc for payload + checksum
148 message_crc
= crc32();
149 u_int32_t x
= htonl(message_crc
);
150 sql_result
.append(reinterpret_cast<char*>(&x
), sizeof(x
));
151 return sql_result
.size();
154 void aws_response_handler::init_response()
156 //12 positions for header-crc
157 sql_result
.resize(header_crc_size
, '\0');
160 void aws_response_handler::init_success_response()
162 m_buff_header
.clear();
163 header_size
= create_header_records();
164 sql_result
.append(m_buff_header
.c_str(), header_size
);
165 sql_result
.append(PAYLOAD_LINE
);
168 void aws_response_handler::send_continuation_response()
170 sql_result
.resize(header_crc_size
, '\0');
171 m_buff_header
.clear();
172 header_size
= create_header_continuation();
173 sql_result
.append(m_buff_header
.c_str(), header_size
);
174 int buff_len
= create_message(header_size
);
175 s
->formatter
->write_bin_data(sql_result
.data(), buff_len
);
176 rgw_flush_formatter_and_reset(s
, s
->formatter
);
179 void aws_response_handler::init_progress_response()
181 sql_result
.resize(header_crc_size
, '\0');
182 m_buff_header
.clear();
183 header_size
= create_header_progress();
184 sql_result
.append(m_buff_header
.c_str(), header_size
);
187 void aws_response_handler::init_stats_response()
189 sql_result
.resize(header_crc_size
, '\0');
190 m_buff_header
.clear();
191 header_size
= create_header_stats();
192 sql_result
.append(m_buff_header
.c_str(), header_size
);
195 void aws_response_handler::init_end_response()
197 sql_result
.resize(header_crc_size
, '\0');
198 m_buff_header
.clear();
199 header_size
= create_header_end();
200 sql_result
.append(m_buff_header
.c_str(), header_size
);
201 int buff_len
= create_message(header_size
);
202 s
->formatter
->write_bin_data(sql_result
.data(), buff_len
);
203 rgw_flush_formatter_and_reset(s
, s
->formatter
);
206 void aws_response_handler::init_error_response(const char* error_message
)
208 //currently not in use. the headers in the case of error, are not extracted by AWS-cli.
209 m_buff_header
.clear();
210 header_size
= create_error_header_records(error_message
);
211 sql_result
.append(m_buff_header
.c_str(), header_size
);
214 void aws_response_handler::send_success_response()
216 sql_result
.append(END_PAYLOAD_LINE
);
217 int buff_len
= create_message(header_size
);
218 s
->formatter
->write_bin_data(sql_result
.data(), buff_len
);
219 rgw_flush_formatter_and_reset(s
, s
->formatter
);
// Sends an error reply through the request formatter: ends the HTTP header
// with content type "application/xml" and chunked transfer encoding, then
// dumps an "Error" section and flushes it.
//   error_code    - dumped as the "Code" element.
//   error_message - dumped as the "Message" element.
//   resource_id   - dumped as the "RequestId" element ("Resource" is the
//                   fixed placeholder "#Resource#").
// NOTE(review): the embedded source numbering skips lines 227 and 229, so
// statements around end_header() may be missing from this view — confirm
// against the upstream file before editing.
222 void aws_response_handler::send_error_response(const char* error_code
,
223 const char* error_message
,
224 const char* resource_id
)
226 set_req_state_err(s
, 0);
228 end_header(s
, m_rgwop
, "application/xml", CHUNKED_TRANSFER_ENCODING
);
230 s
->formatter
->open_object_section("Error");
231 s
->formatter
->dump_string("Code", error_code
);
232 s
->formatter
->dump_string("Message", error_message
);
233 s
->formatter
->dump_string("Resource", "#Resource#");
234 s
->formatter
->dump_string("RequestId", resource_id
);
235 s
->formatter
->close_section();
236 rgw_flush_formatter_and_reset(s
, s
->formatter
);
239 void aws_response_handler::send_progress_response()
241 std::string progress_payload
= fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Progress><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Progress>"
242 , get_processed_size(), get_processed_size(), get_total_bytes_returned());
243 sql_result
.append(progress_payload
);
244 int buff_len
= create_message(header_size
);
245 s
->formatter
->write_bin_data(sql_result
.data(), buff_len
);
246 rgw_flush_formatter_and_reset(s
, s
->formatter
);
249 void aws_response_handler::send_stats_response()
251 std::string stats_payload
= fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Stats><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Stats>"
252 , get_processed_size(), get_processed_size(), get_total_bytes_returned());
253 sql_result
.append(stats_payload
);
254 int buff_len
= create_message(header_size
);
255 s
->formatter
->write_bin_data(sql_result
.data(), buff_len
);
256 rgw_flush_formatter_and_reset(s
, s
->formatter
);
// Constructor: allocates a 1000-byte header buffer, starts with
// m_parquet_type == false, and wires the callbacks used by the s3select
// engine:
//   fp_get_obj_size           -> get_obj_size()
//   fp_range_req              -> logs and forwards to range_request()
//   fp_result_header_format   -> init_response() + init_success_response()
//   fp_s3select_result_format -> send_success_response()
// The first two are registered with m_rgw_api. All lambdas refer back to
// this object (captured via [&] or [this]).
// NOTE(review): the embedded source numbering shows gaps (e.g. 262-264,
// 267, 271-273, 280-281, 284-286); further initializers, return statements
// and closing braces are not visible in this view.
259 RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
260 m_buff_header(std::make_unique
<char[]>(1000)),
261 m_parquet_type(false),
265 fp_get_obj_size
= [&]() {
266 return get_obj_size();
268 fp_range_req
= [&](int64_t start
, int64_t len
, void* buff
, optional_yield
* y
) {
269 ldout(s
->cct
, 10) << "S3select: range-request start: " << start
<< " length: " << len
<< dendl
;
270 auto status
= range_request(start
, len
, buff
, *y
);
274 m_rgw_api
.set_get_size_api(fp_get_obj_size
);
275 m_rgw_api
.set_range_req_api(fp_range_req
);
// the `result` argument is not used by the visible statements of these two callbacks
277 fp_result_header_format
= [this](std::string
& result
) {
278 m_aws_response_handler
.init_response();
279 m_aws_response_handler
.init_success_response();
282 fp_s3select_result_format
= [this](std::string
& result
) {
283 m_aws_response_handler
.send_success_response();
// Destructor. NOTE(review): its body is not visible in this view.
288 RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
// Collects the parameters of an s3-select request.
// Visible steps: check whether m_s3select_query is already populated; mark
// the request as parquet when the object name contains ".parquet"; read the
// request payload with read_all_input() into m_s3select_query; parse it via
// handle_aws_cli_parameters(); finally delegate to
// RGWGetObj_ObjStore_S3::get_params(y) and return its result.
// NOTE(review): many lines are missing from this view (the embedded
// numbering skips 292, 294-295, 297, 299, 301-302, 304-306, 308, 310-311,
// 315, 317-318, 320-322), including branch bodies, error returns and
// presumably the preprocessor guard around the arrow/parquet path.
291 int RGWSelectObj_ObjStore_S3::get_params(optional_yield y
)
293 if(m_s3select_query
.empty() == false) {
// NOTE(review): find() matches ".parquet" anywhere in the object name, not only as a suffix
296 if(s
->object
->get_name().find(".parquet") != std::string::npos
) { //aws cli is missing the parquet
298 m_parquet_type
= true;
300 ldpp_dout(this, 10) << "arrow library is not installed" << dendl
;
303 //retrieve s3-select query from payload
307 std::tie(ret
, data
) = read_all_input(s
, max_size
, false);
309 ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret
<< dendl
;
312 m_s3select_query
= data
.to_str();
313 if (m_s3select_query
.length() > 0) {
314 ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query
<< dendl
;
316 ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl
;
319 int status
= handle_aws_cli_parameters(m_sql_query
);
323 return RGWGetObj_ObjStore_S3::get_params(y
);
// Runs the CSV s3select flow on one input chunk.
//   query        - SQL text handed to s3select_syntax.parse_query().
//   input        - chunk bytes; a nullptr input is tested for explicitly.
//   input_length - number of bytes in `input`.
// Visible steps:
//   1. parse the query and fill a csv_defintions object from the request
//      members (only the first character of each delimiter/quote/escape
//      string is used);
//   2. set the csv query and reserve the message prefix (init_response);
//   3. on a non-empty syntax error description, send an error response;
//   4. otherwise start a Records message, run the query on the chunk so the
//      result is appended to the response buffer, and account the growth of
//      that buffer as bytes returned;
//   5. on the first chunk (chunk_number == 0) emit the HTTP header with
//      chunked transfer encoding;
//   6. send the result when the buffer grew; a continuation message is sent
//      on the other visible path;
//   7. when progress is enabled, send a progress message.
// NOTE(review): numerous lines are missing from this view (declarations
// such as `status`, several `if`/`else` heads, closing braces and all
// return statements), so the exact branch structure and return values
// cannot be read from here — confirm against the upstream file.
326 int RGWSelectObj_ObjStore_S3::run_s3select(const char* query
, const char* input
, size_t input_length
)
329 uint32_t length_before_processing
, length_post_processing
;
330 csv_object::csv_defintions csv
;
331 const char* s3select_syntax_error
= "s3select-Syntax-Error";
332 const char* s3select_resource_id
= "resourcse-id";
333 const char* s3select_processTime_error
= "s3select-ProcessingTime-Error";
335 s3select_syntax
.parse_query(query
);
// input-side CSV settings: first character of each non-empty member
336 if (m_row_delimiter
.size()) {
337 csv
.row_delimiter
= *m_row_delimiter
.c_str();
339 if (m_column_delimiter
.size()) {
340 csv
.column_delimiter
= *m_column_delimiter
.c_str();
343 csv
.quot_char
= *m_quot
.c_str();
345 if (m_escape_char
.size()) {
346 csv
.escape_char
= *m_escape_char
.c_str();
348 if (m_enable_progress
.compare("true")==0) {
349 enable_progress
= true;
351 enable_progress
= false;
// output-side CSV settings
353 if (output_row_delimiter
.size()) {
354 csv
.output_row_delimiter
= *output_row_delimiter
.c_str();
356 if (output_column_delimiter
.size()) {
357 csv
.output_column_delimiter
= *output_column_delimiter
.c_str();
359 if (output_quot
.size()) {
360 csv
.output_quot_char
= *output_quot
.c_str();
362 if (output_escape_char
.size()) {
363 csv
.output_escape_char
= *output_escape_char
.c_str();
365 if(output_quote_fields
.compare("ALWAYS") == 0) {
366 csv
.quote_fields_always
= true;
367 } else if(output_quote_fields
.compare("ASNEEDED") == 0) {
368 csv
.quote_fields_asneeded
= true;
370 if(m_header_info
.compare("IGNORE")==0) {
371 csv
.ignore_header_info
=true;
372 } else if(m_header_info
.compare("USE")==0) {
373 csv
.use_header_info
=true;
375 m_s3_csv_object
.set_csv_query(&s3select_syntax
, csv
);
376 m_aws_response_handler
.init_response();
377 if (s3select_syntax
.get_error_description().empty() == false) {
378 //error-flow (syntax-error)
379 m_aws_response_handler
.send_error_response(s3select_syntax_error
,
380 s3select_syntax
.get_error_description().c_str(),
381 s3select_resource_id
);
382 ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax
.get_error_description() << "}" << dendl
;
385 if (input
== nullptr) {
388 m_aws_response_handler
.init_success_response();
// remember the buffer size so the bytes produced by this chunk can be measured
389 length_before_processing
= (m_aws_response_handler
.get_sql_result()).size();
390 //query is correct(syntax), processing is starting.
391 status
= m_s3_csv_object
.run_s3select_on_stream(m_aws_response_handler
.get_sql_result(), input
, input_length
, s
->obj_size
);
392 length_post_processing
= (m_aws_response_handler
.get_sql_result()).size();
393 m_aws_response_handler
.update_total_bytes_returned(length_post_processing
-length_before_processing
);
395 //error flow(processing-time)
396 m_aws_response_handler
.send_error_response(s3select_processTime_error
,
397 m_s3_csv_object
.get_error_description().c_str(),
398 s3select_resource_id
);
399 ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object
.get_error_description() << "}" << dendl
;
402 if (chunk_number
== 0) {
405 set_req_state_err(s
, op_ret
);
408 // Explicitly use chunked transfer encoding so that we can stream the result
409 // to the user without having to wait for the full length of it.
410 end_header(s
, this, "application/xml", CHUNKED_TRANSFER_ENCODING
);
414 if (length_post_processing
-length_before_processing
!= 0) {
415 m_aws_response_handler
.send_success_response();
417 m_aws_response_handler
.send_continuation_response();
419 if (enable_progress
== true) {
420 m_aws_response_handler
.init_progress_response();
421 m_aws_response_handler
.send_progress_response();
// Runs the s3select flow on a parquet object.
// Visible steps: when the parquet object is not yet set, parse m_sql_query
// and construct the parquet reader (a base_s3select_exception during
// construction is logged and its text sent as the result); a non-empty
// syntax error description is sent as the result; otherwise the response
// header is prepared and run_s3select_on_object() is executed with the two
// formatting callbacks; an execution error description is appended to the
// result and sent.
// NOTE(review): the visible statements use m_sql_query rather than the
// `query` parameter. Lines are missing from this view (declarations,
// `try`, branch heads, returns and presumably a preprocessor guard) —
// confirm control flow and return values against the upstream file.
426 int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query
)
430 if (!m_s3_parquet_object
.is_set()) {
431 s3select_syntax
.parse_query(m_sql_query
.c_str());
433 m_s3_parquet_object
.set_parquet_object(std::string("s3object"), &s3select_syntax
, &m_rgw_api
);
434 } catch(base_s3select_exception
& e
) {
435 ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e
.what() << dendl
;
436 fp_result_header_format(m_aws_response_handler
.get_sql_result());
437 m_aws_response_handler
.get_sql_result().append(e
.what());
438 fp_s3select_result_format(m_aws_response_handler
.get_sql_result());
442 if (s3select_syntax
.get_error_description().empty() == false) {
443 fp_result_header_format(m_aws_response_handler
.get_sql_result());
444 m_aws_response_handler
.get_sql_result().append(s3select_syntax
.get_error_description().data());
445 fp_s3select_result_format(m_aws_response_handler
.get_sql_result());
446 ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax
.get_error_description() << "}" << dendl
;
449 fp_result_header_format(m_aws_response_handler
.get_sql_result());
450 status
= m_s3_parquet_object
.run_s3select_on_object(m_aws_response_handler
.get_sql_result(), fp_s3select_result_format
, fp_result_header_format
);
452 m_aws_response_handler
.get_sql_result().append(m_s3_parquet_object
.get_error_description());
453 fp_s3select_result_format(m_aws_response_handler
.get_sql_result());
454 ldout(s
->cct
, 10) << "S3select: failure while execution" << m_s3_parquet_object
.get_error_description() << dendl
;
// Extracts the request parameters from the XML held in m_s3select_query.
//   sql_query - receives the text of the <Expression> element.
// Visible steps: a check on chunk_number != 0 (its body is not visible);
// replace every GT / LT token in the request with ">" / "<"; read
// "Expression" and "Enabled"; cut out the <InputSerialization> and
// <OutputSerialization> sections and read the CSV settings from each;
// an empty or single-space record delimiter becomes '\n' on both sides;
// a compression type other than "NONE" is logged as unsupported.
// NOTE(review): lines are missing from this view (the GT / LT definitions,
// closing braces and all return statements) — confirm return values
// against the upstream file.
461 int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string
& sql_query
)
463 std::string input_tag
{"InputSerialization"};
464 std::string output_tag
{"OutputSerialization"};
465 if(chunk_number
!=0) {
470 if (m_s3select_query
.find(GT
) != std::string::npos
) {
471 boost::replace_all(m_s3select_query
, GT
, ">");
473 if (m_s3select_query
.find(LT
) != std::string::npos
) {
474 boost::replace_all(m_s3select_query
, LT
, "<");
476 //AWS cli s3select parameters
477 extract_by_tag(m_s3select_query
, "Expression", sql_query
);
478 extract_by_tag(m_s3select_query
, "Enabled", m_enable_progress
);
// NOTE(review): neither find() result below is checked against npos; if the
// tag is absent, the offset arithmetic in substr() wraps around.
479 size_t _qi
= m_s3select_query
.find("<" + input_tag
+ ">", 0);
480 size_t _qe
= m_s3select_query
.find("</" + input_tag
+ ">", _qi
);
481 m_s3select_input
= m_s3select_query
.substr(_qi
+ input_tag
.size() + 2, _qe
- (_qi
+ input_tag
.size() + 2));
482 extract_by_tag(m_s3select_input
, "FieldDelimiter", m_column_delimiter
);
483 extract_by_tag(m_s3select_input
, "QuoteCharacter", m_quot
);
484 extract_by_tag(m_s3select_input
, "RecordDelimiter", m_row_delimiter
);
485 extract_by_tag(m_s3select_input
, "FileHeaderInfo", m_header_info
);
486 if (m_row_delimiter
.size()==0) {
487 m_row_delimiter
='\n';
488 } else if(m_row_delimiter
.compare(" ") == 0) {
490 m_row_delimiter
='\n';
492 extract_by_tag(m_s3select_input
, "QuoteEscapeCharacter", m_escape_char
);
493 extract_by_tag(m_s3select_input
, "CompressionType", m_compression_type
);
494 size_t _qo
= m_s3select_query
.find("<" + output_tag
+ ">", 0);
// NOTE(review): the closing output tag is searched starting at _qi (the
// input-section offset) rather than _qo; this looks like a copy/paste slip
// and would miss the closing tag when the output section precedes the
// input section in the request — confirm and consider using _qo.
495 size_t _qs
= m_s3select_query
.find("</" + output_tag
+ ">", _qi
);
496 m_s3select_output
= m_s3select_query
.substr(_qo
+ output_tag
.size() + 2, _qs
- (_qo
+ output_tag
.size() + 2));
497 extract_by_tag(m_s3select_output
, "FieldDelimiter", output_column_delimiter
);
498 extract_by_tag(m_s3select_output
, "QuoteCharacter", output_quot
);
499 extract_by_tag(m_s3select_output
, "QuoteEscapeCharacter", output_escape_char
);
500 extract_by_tag(m_s3select_output
, "QuoteFields", output_quote_fields
);
501 extract_by_tag(m_s3select_output
, "RecordDelimiter", output_row_delimiter
);
502 if (output_row_delimiter
.size()==0) {
503 output_row_delimiter
='\n';
504 } else if(output_row_delimiter
.compare(" ") == 0) {
506 output_row_delimiter
='\n';
508 if (m_compression_type
.length()>0 && m_compression_type
.compare("NONE") != 0) {
509 ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl
;
// Copies the text between "<tag_name>" and the following "</tag_name>" of
// `input` into `result`.
//   input    - text to search (taken by value).
//   tag_name - element name without angle brackets (taken by value).
//   result   - receives the enclosed text when both tags are found.
// NOTE(review): the bodies of the two npos checks and the final return are
// not visible in this view, so the returned codes and whether `result` is
// cleared on failure cannot be read from here — confirm.
515 int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string input
, std::string tag_name
, std::string
& result
)
518 size_t _qs
= input
.find("<" + tag_name
+ ">", 0);
// start of the element content: skips '<', the tag name and '>'.
// Computed before the npos check; with _qs == npos the sum wraps, and the
// check that follows is expected to bail out before it is used.
519 size_t qs_input
= _qs
+ tag_name
.size() + 2;
520 if (_qs
== std::string::npos
) {
523 size_t _qe
= input
.find("</" + tag_name
+ ">", qs_input
);
524 if (_qe
== std::string::npos
) {
527 result
= input
.substr(qs_input
, _qe
- qs_input
);
// Returns an object size as size_t; used by the fp_get_obj_size callback.
// NOTE(review): the function body is not visible in this view.
531 size_t RGWSelectObj_ObjStore_S3::get_obj_size()
// Reads `len` bytes starting at offset `ofs` of the object into `buff`.
//   ofs  - first byte offset of the range.
//   len  - number of bytes requested; also stored in m_request_range.
//   buff - destination; must have room for `len` bytes.
//   y    - passed through to RGWGetObj::execute().
// The range is expressed as a "bytes=<ofs>-<ofs+len-1>" string, parsed by
// RGWGetObj::parse_range(), and fetched by RGWGetObj::execute(), after
// which the accumulated requested_buffer is copied out.
// NOTE(review): the return statement is not visible in this view.
536 int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs
, int64_t len
, void* buff
, optional_yield y
)
538 //purpose: implementation for arrow::ReadAt, this may take several async calls.
539 //send_response_date(call_back) accumulate buffer, upon completion control is back to ReadAt.
540 range_req_str
= "bytes=" + std::to_string(ofs
) + "-" + std::to_string(ofs
+len
-1);
// range_str points into range_req_str, which must stay unchanged while the range is parsed
541 range_str
= range_req_str
.c_str();
542 range_parsed
= false;
543 RGWGetObj::parse_range();
544 requested_buffer
.clear();
545 m_request_range
= len
;
546 ldout(s
->cct
, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs
<< " request-length :" << len
<< " buffer size : " << requested_buffer
.size() << dendl
;
547 RGWGetObj::execute(y
);
// NOTE(review): copies `len` bytes without checking requested_buffer.size();
// if execute() gathered fewer bytes (short read or error) this reads past
// the end of the buffer — confirm and consider bounding the copy.
548 memcpy(buff
, requested_buffer
.data(), len
);
549 ldout(s
->cct
, 10) << "S3select: done waiting, buffer is complete buffer-size:" << requested_buffer
.size() << dendl
;
// Entry point of the operation.
// For parquet requests (m_parquet_type): reads the first 4 bytes of the
// object with range_request(), compares them with the magic values "PAR1"
// and "PARE", and sets op_ret to -ERR_INVALID_REQUEST when neither matches;
// then parses m_sql_query and runs run_s3select_on_parquet(), logging
// failure (op_ret = -ERR_INVALID_REQUEST) or success.
// On the other visible path the request is delegated to
// RGWGetObj::execute(y).
// NOTE(review): lines are missing from this view (declarations, branch
// heads, early returns and presumably a preprocessor guard). The return
// value of the magic-bytes range_request() is not checked in the visible
// statement — confirm against the upstream file.
553 void RGWSelectObj_ObjStore_S3::execute(optional_yield y
)
556 char parquet_magic
[4];
557 static constexpr uint8_t parquet_magic1
[4] = {'P', 'A', 'R', '1'};
558 static constexpr uint8_t parquet_magicE
[4] = {'P', 'A', 'R', 'E'};
563 if (m_parquet_type
) {
565 range_request(0, 4, parquet_magic
, y
);
// both memcmp() results are non-zero only when neither magic value matches
566 if(memcmp(parquet_magic
, parquet_magic1
, 4) && memcmp(parquet_magic
, parquet_magicE
, 4)) {
567 ldout(s
->cct
, 10) << s
->object
->get_name() << " does not contain parquet magic" << dendl
;
568 op_ret
= -ERR_INVALID_REQUEST
;
571 s3select_syntax
.parse_query(m_sql_query
.c_str());
572 status
= run_s3select_on_parquet(m_sql_query
.c_str());
574 ldout(s
->cct
, 10) << "S3select: failed to process query <" << m_sql_query
<< "> on object " << s
->object
->get_name() << dendl
;
575 op_ret
= -ERR_INVALID_REQUEST
;
577 ldout(s
->cct
, 10) << "S3select: complete query with success " << dendl
;
581 RGWGetObj::execute(y
);
// Data callback for the parquet flow: appends the bytes delivered in `bl`
// to requested_buffer until m_request_range bytes have been gathered.
//   bl  - buffer list delivered for the current range request.
//   ofs - offset applied when reading from each buffer.
//   len - number of bytes appended per buffer.
// On the first chunk (chunk_number == 0) the request error state is set
// and the HTTP header is ended with chunked transfer encoding.
// NOTE(review): lines are missing from this view (branch bodies, closing
// braces, return statements) — confirm against the upstream file.
585 int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist
& bl
, off_t ofs
, off_t len
)
587 if (chunk_number
== 0) {
589 set_req_state_err(s
, op_ret
);
593 // Explicitly use chunked transfer encoding so that we can stream the result
594 // to the user without having to wait for the full length of it.
595 if (chunk_number
== 0) {
596 end_header(s
, this, "application/xml", CHUNKED_TRANSFER_ENCODING
);
599 size_t append_in_callback
= 0;
601 //concat the requested buffer
602 for (auto& it
: bl
.buffers()) {
603 if(it
.length() == 0) {
604 ldout(s
->cct
, 10) << "S3select: get zero-buffer while appending request-buffer " << dendl
;
606 append_in_callback
+= it
.length();
607 ldout(s
->cct
, 10) << "S3select: part " << part_no
++ << " it.length() = " << it
.length() << dendl
;
// NOTE(review): the same `ofs` and `len` (which describe the whole `bl`)
// are applied to every buffer in the list; with more than one buffer, or
// with len > it.length(), this would read outside the current buffer —
// confirm the single-buffer assumption.
608 requested_buffer
.append(&(it
)[0]+ofs
, len
);
610 ldout(s
->cct
, 10) << "S3select:append_in_callback = " << append_in_callback
<< dendl
;
611 if (requested_buffer
.size() < m_request_range
) {
612 ldout(s
->cct
, 10) << "S3select: need another round buffe-size: " << requested_buffer
.size() << " request range length:" << m_request_range
<< dendl
;
614 } else {//buffer is complete
615 ldout(s
->cct
, 10) << "S3select: buffer is complete " << requested_buffer
.size() << " request range length:" << m_request_range
<< dendl
;
// Data callback for the CSV flow: feeds each buffer of `bl` to
// run_s3select() and, once the processed byte count equals the object
// size, sends the stats message followed by the end message.
//   bl  - buffer list holding the current object chunk.
//   ofs - only logged by the visible statements.
//   len - only logged by the visible statements.
// For an empty object (s->obj_size == 0) run_s3select() is called once
// with a null input.
// NOTE(review): lines are missing from this view (declarations of `status`
// and `i`, branch bodies, error handling, return statements) — confirm
// against the upstream file.
621 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist
& bl
, off_t ofs
, off_t len
)
625 if (s
->obj_size
== 0) {
626 status
= run_s3select(m_sql_query
.c_str(), nullptr, 0);
628 auto bl_len
= bl
.get_num_buffers();
630 for(auto& it
: bl
.buffers()) {
631 ldpp_dout(this, 10) << "processing segment " << i
<< " out of " << bl_len
<< " off " << ofs
632 << " len " << len
<< " obj-size " << s
->obj_size
<< dendl
;
633 if(it
.length() == 0) {
634 ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i
<< " out of " << bl_len
635 << " obj-size " << s
->obj_size
<< dendl
;
// account the segment before running the query so the progress/stats counters include it
638 m_aws_response_handler
.update_processed_size(it
.length());
639 status
= run_s3select(m_sql_query
.c_str(), &(it
)[0], it
.length());
// the whole object has been consumed: emit the Stats and End messages
646 if (m_aws_response_handler
.get_processed_size() == s
->obj_size
) {
648 m_aws_response_handler
.init_stats_response();
649 m_aws_response_handler
.send_stats_response();
650 m_aws_response_handler
.init_end_response();
// Data callback invoked with each chunk of the object: binds the response
// handler to this request on first use, then dispatches the chunk to
// parquet_processing() or csv_processing() depending on m_parquet_type.
//   bl, ofs, len - forwarded unchanged to the selected processing routine.
// A zero-length chunk of a non-empty object is tested for explicitly.
// NOTE(review): the body of that zero-length check, the closing braces and
// presumably a preprocessor guard around the parquet branch are not
// visible in this view — confirm against the upstream file.
656 int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist
& bl
, off_t ofs
, off_t len
)
658 if (!m_aws_response_handler
.is_set()) {
659 m_aws_response_handler
.set(s
, this);
661 if(len
== 0 && s
->obj_size
!= 0) {
664 if (m_parquet_type
) {
665 return parquet_processing(bl
,ofs
,len
);
667 return csv_processing(bl
,ofs
,len
);