1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
11 #include "common/ceph_crypto.h"
12 #include "common/split.h"
13 #include "common/Formatter.h"
14 #include "common/utf8.h"
15 #include "common/ceph_json.h"
16 #include "common/safe_io.h"
17 #include "common/errno.h"
18 #include "auth/Crypto.h"
19 #include <boost/algorithm/string.hpp>
20 #include <boost/algorithm/string/replace.hpp>
21 #include <boost/tokenizer.hpp>
22 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
23 #ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
24 #pragma clang diagnostic push
25 #pragma clang diagnostic ignored "-Wimplicit-const-int-float-conversion"
27 #ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
28 #pragma clang diagnostic pop
30 #undef BOOST_BIND_GLOBAL_PLACEHOLDERS
32 #include <liboath/oath.h>
35 #include <s3select/include/s3select.h>
36 #include "rgw_rest_s3.h"
37 #include "rgw_s3select.h"
// Declares the state (sql_result, crc32, m_buff_header, byte counters) and the
// create_header_* / init_* / send_* routines of the response handler.
// NOTE(review): presumably implements the AWS event-stream message framing for
// s3select replies — confirm against the member definitions, which are not visible here.
39 class aws_response_handler
43 std::string sql_result
;
46 // the parameters follow the CRC-32 algorithm and are aligned with the AWS-cli checksum
47 boost::crc_optimal
<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32
;
49 std::string m_buff_header
;
50 uint64_t total_bytes_returned
;
51 uint64_t processed_size
;
53 enum class header_name_En
{
61 enum class header_value_En
{
// literal text placed before/after the payload
// NOTE(review): exact placement in the message is decided by the definitions — confirm there
74 const char* PAYLOAD_LINE
= "\n<Payload>\n<Records>\n<Payload>\n";
75 const char* END_PAYLOAD_LINE
= "\n</Payload></Records></Payload>";
// header-name strings (5 entries); presumably indexed by header_name_En — TODO confirm ordering
76 const char* header_name_str
[5] = {":event-type", ":content-type", ":message-type", ":error-code", ":error-message"};
// header-value strings (10 entries); presumably indexed by header_value_En — TODO confirm ordering
77 const char* header_value_str
[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error", "error"};
// NOTE(review): 12 looks like the byte count reserved for the length/CRC prelude — confirm
78 static constexpr size_t header_crc_size
= 12;
80 void push_header(const char* header_name
, const char* header_value
);
82 int create_message(u_int32_t header_len
);
// constructs a handler bound to the given request state and op; both counters start at zero
85 aws_response_handler(struct req_state
* ps
, RGWOp
* rgwop
) : s(ps
), m_rgwop(rgwop
), total_bytes_returned
{0}, processed_size
{0}
// default constructor: no request state or op attached (both nullptr); both counters start at zero
88 aws_response_handler() : s(nullptr), m_rgwop(nullptr), total_bytes_returned
{0}, processed_size
{0}
// guard on the handler having both a request state and an op attached
93 if(s
==nullptr || m_rgwop
== nullptr){
// takes a request state and an op; presumably (re)binds them on an existing handler — body not visible
99 void set(struct req_state
* ps
, RGWOp
* rgwop
)
105 std::string
& get_sql_result();
107 uint64_t get_processed_size();
109 void update_processed_size(uint64_t value
);
111 uint64_t get_total_bytes_returned();
113 void update_total_bytes_returned(uint64_t value
);
// header builders, one per message kind; each returns an int
// NOTE(review): presumably the header length, as consumed by create_message — confirm
115 int create_header_records();
117 int create_header_continuation();
119 int create_header_progress();
121 int create_header_stats();
123 int create_header_end();
125 int create_error_header_records(const char* error_message
);
// init_* / send_* entry points for the individual response kinds
127 void init_response();
129 void init_success_response();
131 void send_continuation_response();
133 void init_progress_response();
135 void init_end_response();
137 void init_stats_response();
139 void init_error_response(const char* error_message
);
141 void send_success_response();
143 void send_progress_response();
145 void send_stats_response();
147 void send_error_response(const char* error_code
,
148 const char* error_message
,
149 const char* resource_id
);
151 }; //end class aws_response_handler
153 class RGWSelectObj_ObjStore_S3
: public RGWGetObj_ObjStore_S3
157 s3selectEngine::s3select s3select_syntax
;
158 std::string m_s3select_query
;
159 std::string m_s3select_input
;
160 std::string m_s3select_output
;
161 s3selectEngine::csv_object m_s3_csv_object
;
163 s3selectEngine::parquet_object m_s3_parquet_object
;
165 std::string m_column_delimiter
;
167 std::string m_row_delimiter
;
168 std::string m_compression_type
;
169 std::string m_escape_char
;
170 std::unique_ptr
<char[]> m_buff_header
;
171 std::string m_header_info
;
172 std::string m_sql_query
;
173 std::string m_enable_progress
;
174 std::string output_column_delimiter
;
175 std::string output_quot
;
176 std::string output_escape_char
;
177 std::string output_quote_fields
;
178 std::string output_row_delimiter
;
179 aws_response_handler m_aws_response_handler
;
180 bool enable_progress
;
185 s3selectEngine::rgw_s3select_api m_rgw_api
;
187 //a request for a range may be satisfied by several calls to send_response_data;
188 size_t m_request_range
;
189 std::string requested_buffer
;
190 std::string range_req_str
;
191 std::function
<int(std::string
&)> fp_result_header_format
;
192 std::function
<int(std::string
&)> fp_s3select_result_format
;
196 unsigned int chunk_number
;
198 RGWSelectObj_ObjStore_S3();
199 virtual ~RGWSelectObj_ObjStore_S3();
201 virtual int send_response_data(bufferlist
& bl
, off_t ofs
, off_t len
) override
;
203 virtual int get_params(optional_yield y
) override
;
205 virtual void execute(optional_yield
) override
;
209 int csv_processing(bufferlist
& bl
, off_t ofs
, off_t len
);
211 int parquet_processing(bufferlist
& bl
, off_t ofs
, off_t len
);
213 int run_s3select(const char* query
, const char* input
, size_t input_length
);
215 int run_s3select_on_parquet(const char* query
);
217 int extract_by_tag(std::string input
, std::string tag_name
, std::string
& result
);
219 void convert_escape_seq(std::string
& esc
);
221 int handle_aws_cli_parameters(std::string
& sql_query
);
223 int range_request(int64_t start
, int64_t len
, void*, optional_yield
);
225 size_t get_obj_size();
226 std::function
<int(int64_t, int64_t, void*, optional_yield
*)> fp_range_req
;
227 std::function
<size_t(void)> fp_get_obj_size
;