1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
11 #include "common/ceph_crypto.h"
12 #include "common/split.h"
13 #include "common/Formatter.h"
14 #include "common/utf8.h"
15 #include "common/ceph_json.h"
16 #include "common/safe_io.h"
17 #include "common/errno.h"
18 #include "auth/Crypto.h"
19 #include <boost/algorithm/string.hpp>
20 #include <boost/algorithm/string/replace.hpp>
21 #include <boost/tokenizer.hpp>
22 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
23 #ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
24 #pragma clang diagnostic push
25 #pragma clang diagnostic ignored "-Wimplicit-const-int-float-conversion"
27 #ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
28 #pragma clang diagnostic pop
30 #undef BOOST_BIND_GLOBAL_PLACEHOLDERS
32 #include <liboath/oath.h>
35 #pragma GCC diagnostic push
36 #pragma clang diagnostic push
37 #pragma GCC diagnostic ignored "-Wdeprecated"
38 #pragma clang diagnostic ignored "-Wdeprecated"
39 #include <s3select/include/s3select.h>
40 #pragma GCC diagnostic pop
41 #pragma clang diagnostic pop
43 #include "rgw_rest_s3.h"
44 #include "rgw_s3select.h"
// aws_response_handler: builds the s3select response messages for one RGW
// request. The message kinds visible here are Records, Cont, Progress, Stats,
// End and error (see header_value_str and the create_header_* / init_* /
// send_* methods below). Each message is assembled from name/value headers
// (push_header) and finalized by create_message. This is presumably the AWS
// event-stream framing used by SelectObjectContent -- confirm against the
// implementation file.
//
// NOTE(review): this listing appears to be a lossy extraction, not compilable
// source. The leading integers on many lines look like original line numbers.
// Several lines are absent: the class opening brace, access specifiers, the
// enumerators of both enums, the declarations of `s` and `m_rgwop` (used by the
// constructor initializers below), and the constructor/method bodies. Restore
// the header from upstream before building.
46 class aws_response_handler
// Result text buffer; presumably the string returned by reference from
// get_sql_result() -- confirm.
50 std::string sql_result
;
53 // CRC-32 (poly 0x04C11DB7, init 0xFFFFFFFF, final xor 0xFFFFFFFF, reflected input/output) -- parameters chosen to match the checksum the AWS CLI computes.
54 boost::crc_optimal
<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32
;
// Encoded-header scratch buffer; presumably filled by push_header() -- confirm.
56 std::string m_buff_header
;
// Byte counters. Both constructors initialize them to 0. They are presumably
// read/updated through the get_* / update_* accessors declared below.
57 uint64_t total_bytes_returned
;
58 uint64_t processed_size
;
// Enumerations that presumably index header_name_str / header_value_str.
// NOTE(review): enumerator lists are missing from this listing.
60 enum class header_name_En
{
68 enum class header_value_En
{
// Wrapper text emitted around payload content: opens Payload > Records >
// Payload, and END_PAYLOAD_LINE closes them in reverse order.
81 const char* PAYLOAD_LINE
= "\n<Payload>\n<Records>\n<Payload>\n";
82 const char* END_PAYLOAD_LINE
= "\n</Payload></Records></Payload>";
// Header-name table (5 entries) and header-value table (10 entries).
// NOTE(review): presumably indexed by header_name_En / header_value_En. Keep
// the array sizes and the enum order in sync.
83 const char* header_name_str
[5] = {":event-type", ":content-type", ":message-type", ":error-code", ":error-message"};
84 const char* header_value_str
[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error", "error"};
// Fixed 12-byte framing overhead. Presumably the event-stream prelude (total
// length + headers length + prelude CRC, 4 bytes each) -- confirm in
// create_message().
85 static constexpr size_t header_crc_size
= 12;
// Appends a single name/value header to the message being built.
87 void push_header(const char* header_name
, const char* header_value
);
// Finalizes a message given the length of its encoded headers. The meaning of
// the int result is not visible here.
89 int create_message(u_int32_t header_len
);
// Binds the handler to a request state and an operation; both counters start at 0.
92 aws_response_handler(req_state
* ps
, RGWOp
* rgwop
) : s(ps
), m_rgwop(rgwop
), total_bytes_returned
{0}, processed_size
{0}
// Default constructor: unbound handler (null req_state and RGWOp), counters
// at 0. Presumably bound later through set().
95 aws_response_handler() : s(nullptr), m_rgwop(nullptr), total_bytes_returned
{0}, processed_size
{0}
// NOTE(review): fragment of a null check on s / m_rgwop. The enclosing method
// signature and the rest of its body are missing from this listing.
100 if(s
==nullptr || m_rgwop
== nullptr){
// Presumably (re)binds the handler to the given request state and operation.
106 void set(req_state
* ps
, RGWOp
* rgwop
)
// Accessors for the result buffer and the byte counters.
112 std::string
& get_sql_result();
114 uint64_t get_processed_size();
116 void update_processed_size(uint64_t value
);
118 uint64_t get_total_bytes_returned();
120 void update_total_bytes_returned(uint64_t value
);
// Header builders: one per message kind (Records, Cont, Progress, Stats,
// End, error).
122 int create_header_records();
124 int create_header_continuation();
126 int create_header_progress();
128 int create_header_stats();
130 int create_header_end();
132 int create_error_header_records(const char* error_message
);
// init_* presumably prepare a message of the named kind and send_* emit one.
// The exact division of work is not visible here -- confirm in the
// implementation.
134 void init_response();
136 void init_success_response();
138 void send_continuation_response();
140 void init_progress_response();
142 void init_end_response();
144 void init_stats_response();
146 void init_error_response(const char* error_message
);
148 void send_success_response();
150 void send_progress_response();
152 void send_stats_response();
// Emits an error response carrying the given error code, message and resource id.
154 void send_error_response(const char* error_code
,
155 const char* error_message
,
156 const char* resource_id
);
158 }; //end class aws_response_handler
// RGWSelectObj_ObjStore_S3: s3select operation for RGW, derived from
// RGWGetObj_ObjStore_S3. It runs a query through s3selectEngine over CSV,
// Parquet or JSON input (see the csv/parquet/json objects and the *_processing
// methods below). Presumably this implements S3 SelectObjectContent and
// streams results through m_aws_response_handler -- confirm.
//
// NOTE(review): this listing appears to be a lossy extraction. The leading
// integers look like original line numbers, and the class opening brace and
// access specifiers are missing. Restore the header from upstream before
// building.
160 class RGWSelectObj_ObjStore_S3
: public RGWGetObj_ObjStore_S3
// Query engine state, plus the query, input and output text.
164 s3select_syntax;
165 std::string m_s3select_query
;
166 std::string m_s3select_input
;
167 std::string m_s3select_output
;
// Per-format processing objects.
168 s3selectEngine::csv_object m_s3_csv_object
;
170 s3selectEngine::parquet_object m_s3_parquet_object
;
172 s3selectEngine::json_object m_s3_json_object
;
// Input serialization parameters (presumably parsed from the request by
// get_params()).
173 std::string m_column_delimiter
;
175 std::string m_row_delimiter
;
176 std::string m_compression_type
;
177 std::string m_escape_char
;
178 std::unique_ptr
<char[]> m_buff_header
;
179 std::string m_header_info
;
180 std::string m_sql_query
;
181 std::string m_enable_progress
;
// Output serialization parameters.
182 std::string output_column_delimiter
;
183 std::string output_quot
;
184 std::string output_escape_char
;
185 std::string output_quote_fields
;
186 std::string output_row_delimiter
;
// Scan range: textual start/end values, a flag, and their numeric forms.
187 std::string m_start_scan
;
188 std::string m_end_scan
;
189 bool m_scan_range_ind
;
190 int64_t m_start_scan_sz
;
191 int64_t m_end_scan_sz
;
192 int64_t m_object_size_for_processing
;
193 aws_response_handler m_aws_response_handler
;
194 bool enable_progress
;
199 std::string m_json_datatype
;
202 s3selectEngine::rgw_s3select_api m_rgw_api
;
204 // A single range request may be satisfied by several calls to send_response_data.
// NOTE(review): m_request_range here and m_requested_range further down have
// near-identical names. Confirm both are needed and are not a stale duplicate.
205 size_t m_request_range
;
206 std::string requested_buffer
;
207 std::string range_req_str
;
// Callback hooks: result-header formatting, result formatting, debug
// messages and chunked transfer encoding.
208 std::function
<int(std::string
&)> fp_result_header_format
;
209 std::function
<int(std::string
&)> fp_s3select_result_format
;
210 std::function
<void(const char*)> fp_debug_mesg
;
211 std::function
<void(void)> fp_chunked_transfer_encoding
;
// Chunk / range bookkeeping, including Trino-specific request handling.
215 unsigned int chunk_number
;
216 size_t m_requested_range
;
217 size_t m_scan_offset
;
218 bool m_skip_next_chunk
;
219 bool m_is_trino_request
;
221 RGWSelectObj_ObjStore_S3();
222 virtual ~RGWSelectObj_ObjStore_S3();
// Overrides from the GET-object base: data callback, parameter parsing,
// execution.
224 virtual int send_response_data(bufferlist
& bl
, off_t ofs
, off_t len
) override
;
226 virtual int get_params(optional_yield y
) override
;
228 virtual void execute(optional_yield
) override
;
// Per-format processing of a data chunk (bl, ofs, len).
232 int csv_processing(bufferlist
& bl
, off_t ofs
, off_t len
);
234 int parquet_processing(bufferlist
& bl
, off_t ofs
, off_t len
);
236 int json_processing(bufferlist
& bl
, off_t ofs
, off_t len
);
// Query runners. The Parquet variant takes no input buffer, which suggests it
// reads through the range/size callbacks below -- confirm.
238 int run_s3select_on_csv(const char* query
, const char* input
, size_t input_length
);
240 int run_s3select_on_parquet(const char* query
);
242 int run_s3select_on_json(const char* query
, const char* input
, size_t input_length
);
// Request-parsing helpers.
244 int extract_by_tag(std::string input
, std::string tag_name
, std::string
& result
);
246 void convert_escape_seq(std::string
& esc
);
248 int handle_aws_cli_parameters(std::string
& sql_query
);
// Object-range and object-size access.
// NOTE(review): range_request takes optional_yield by value, but fp_range_req
// is declared with optional_yield* (pointer). Check that the binding between
// them is intentional.
250 int range_request(int64_t start
, int64_t len
, void*, optional_yield
);
252 size_t get_obj_size();
253 std::function
<int(int64_t, int64_t, void*, optional_yield
*)> fp_range_req
;
254 std::function
<size_t(void)> fp_get_obj_size
;
// Adjusts ofs/len in place for chunks that belong to Trino-originated requests.
256 void shape_chunk_per_trino_requests(const char*, off_t
& ofs
, off_t
& len
);