]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_s3select_private.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_s3select_private.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3 //
4 #pragma once
5
6 #include <errno.h>
7 #include <array>
8 #include <string.h>
9 #include <string_view>
10
11 #include "common/ceph_crypto.h"
12 #include "common/split.h"
13 #include "common/Formatter.h"
14 #include "common/utf8.h"
15 #include "common/ceph_json.h"
16 #include "common/safe_io.h"
17 #include "common/errno.h"
18 #include "auth/Crypto.h"
19 #include <boost/algorithm/string.hpp>
20 #include <boost/algorithm/string/replace.hpp>
21 #include <boost/tokenizer.hpp>
22 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
23 #ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
24 #pragma clang diagnostic push
25 #pragma clang diagnostic ignored "-Wimplicit-const-int-float-conversion"
26 #endif
27 #ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
28 #pragma clang diagnostic pop
29 #endif
30 #undef BOOST_BIND_GLOBAL_PLACEHOLDERS
31
32 #include <liboath/oath.h>
33
34
35 #pragma GCC diagnostic push
36 #pragma clang diagnostic push
37 #pragma GCC diagnostic ignored "-Wdeprecated"
38 #pragma clang diagnostic ignored "-Wdeprecated"
39 #include <s3select/include/s3select.h>
40 #pragma GCC diagnostic pop
41 #pragma clang diagnostic pop
42
43 #include "rgw_rest_s3.h"
44 #include "rgw_s3select.h"
45
46 class aws_response_handler
47 {
48
49 private:
50 std::string sql_result;
51 req_state* s;
52 uint32_t header_size;
53 // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
54 boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32;
55 RGWOp* m_rgwop;
56 std::string m_buff_header;
57 uint64_t total_bytes_returned;
58 uint64_t processed_size;
59
60 enum class header_name_En {
61 EVENT_TYPE,
62 CONTENT_TYPE,
63 MESSAGE_TYPE,
64 ERROR_CODE,
65 ERROR_MESSAGE
66 };
67
68 enum class header_value_En {
69 RECORDS,
70 OCTET_STREAM,
71 EVENT,
72 CONT,
73 PROGRESS,
74 END,
75 XML,
76 STATS,
77 ENGINE_ERROR,
78 ERROR_TYPE
79 };
80
81 const char* PAYLOAD_LINE= "\n<Payload>\n<Records>\n<Payload>\n";
82 const char* END_PAYLOAD_LINE= "\n</Payload></Records></Payload>";
83 const char* header_name_str[5] = {":event-type", ":content-type", ":message-type", ":error-code", ":error-message"};
84 const char* header_value_str[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error", "error"};
85 static constexpr size_t header_crc_size = 12;
86
87 void push_header(const char* header_name, const char* header_value);
88
89 int create_message(u_int32_t header_len);
90
91 public:
92 aws_response_handler(req_state* ps, RGWOp* rgwop) : s(ps), m_rgwop(rgwop), total_bytes_returned{0}, processed_size{0}
93 {}
94
95 aws_response_handler() : s(nullptr), m_rgwop(nullptr), total_bytes_returned{0}, processed_size{0}
96 {}
97
98 bool is_set()
99 {
100 if(s==nullptr || m_rgwop == nullptr){
101 return false;
102 }
103 return true;
104 }
105
106 void set(req_state* ps, RGWOp* rgwop)
107 {
108 s = ps;
109 m_rgwop = rgwop;
110 }
111
112 std::string& get_sql_result();
113
114 uint64_t get_processed_size();
115
116 void update_processed_size(uint64_t value);
117
118 uint64_t get_total_bytes_returned();
119
120 void update_total_bytes_returned(uint64_t value);
121
122 int create_header_records();
123
124 int create_header_continuation();
125
126 int create_header_progress();
127
128 int create_header_stats();
129
130 int create_header_end();
131
132 int create_error_header_records(const char* error_message);
133
134 void init_response();
135
136 void init_success_response();
137
138 void send_continuation_response();
139
140 void init_progress_response();
141
142 void init_end_response();
143
144 void init_stats_response();
145
146 void init_error_response(const char* error_message);
147
148 void send_success_response();
149
150 void send_progress_response();
151
152 void send_stats_response();
153
154 void send_error_response(const char* error_code,
155 const char* error_message,
156 const char* resource_id);
157
158 }; //end class aws_response_handler
159
160 class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
161 {
162
163 private:
164 s3selectEngine::s3select s3select_syntax;
165 std::string m_s3select_query;
166 std::string m_s3select_input;
167 std::string m_s3select_output;
168 s3selectEngine::csv_object m_s3_csv_object;
169 #ifdef _ARROW_EXIST
170 s3selectEngine::parquet_object m_s3_parquet_object;
171 #endif
172 s3selectEngine::json_object m_s3_json_object;
173 std::string m_column_delimiter;
174 std::string m_quot;
175 std::string m_row_delimiter;
176 std::string m_compression_type;
177 std::string m_escape_char;
178 std::unique_ptr<char[]> m_buff_header;
179 std::string m_header_info;
180 std::string m_sql_query;
181 std::string m_enable_progress;
182 std::string output_column_delimiter;
183 std::string output_quot;
184 std::string output_escape_char;
185 std::string output_quote_fields;
186 std::string output_row_delimiter;
187 std::string m_start_scan;
188 std::string m_end_scan;
189 bool m_scan_range_ind;
190 int64_t m_start_scan_sz;
191 int64_t m_end_scan_sz;
192 int64_t m_object_size_for_processing;
193 aws_response_handler m_aws_response_handler;
194 bool enable_progress;
195
196 //parquet request
197 bool m_parquet_type;
198 //json request
199 std::string m_json_datatype;
200 bool m_json_type;
201 #ifdef _ARROW_EXIST
202 s3selectEngine::rgw_s3select_api m_rgw_api;
203 #endif
204 //a request for range may statisfy by several calls to send_response_date;
205 size_t m_request_range;
206 std::string requested_buffer;
207 std::string range_req_str;
208 std::function<int(std::string&)> fp_result_header_format;
209 std::function<int(std::string&)> fp_s3select_result_format;
210 std::function<void(const char*)> fp_debug_mesg;
211 std::function<void(void)> fp_chunked_transfer_encoding;
212 int m_header_size;
213
214 public:
215 unsigned int chunk_number;
216 size_t m_requested_range;
217 size_t m_scan_offset;
218 bool m_skip_next_chunk;
219 bool m_is_trino_request;
220
221 RGWSelectObj_ObjStore_S3();
222 virtual ~RGWSelectObj_ObjStore_S3();
223
224 virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) override;
225
226 virtual int get_params(optional_yield y) override;
227
228 virtual void execute(optional_yield) override;
229
230 private:
231
232 int csv_processing(bufferlist& bl, off_t ofs, off_t len);
233
234 int parquet_processing(bufferlist& bl, off_t ofs, off_t len);
235
236 int json_processing(bufferlist& bl, off_t ofs, off_t len);
237
238 int run_s3select_on_csv(const char* query, const char* input, size_t input_length);
239
240 int run_s3select_on_parquet(const char* query);
241
242 int run_s3select_on_json(const char* query, const char* input, size_t input_length);
243
244 int extract_by_tag(std::string input, std::string tag_name, std::string& result);
245
246 void convert_escape_seq(std::string& esc);
247
248 int handle_aws_cli_parameters(std::string& sql_query);
249
250 int range_request(int64_t start, int64_t len, void*, optional_yield);
251
252 size_t get_obj_size();
253 std::function<int(int64_t, int64_t, void*, optional_yield*)> fp_range_req;
254 std::function<size_t(void)> fp_get_obj_size;
255
256 void shape_chunk_per_trino_requests(const char*, off_t& ofs, off_t& len);
257 };
258