]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_s3select_private.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_s3select_private.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3 //
4 #pragma once
5
6 #include <errno.h>
7 #include <array>
8 #include <string.h>
9 #include <string_view>
10
11 #include "common/ceph_crypto.h"
12 #include "common/split.h"
13 #include "common/Formatter.h"
14 #include "common/utf8.h"
15 #include "common/ceph_json.h"
16 #include "common/safe_io.h"
17 #include "common/errno.h"
18 #include "auth/Crypto.h"
19 #include <boost/algorithm/string.hpp>
20 #include <boost/algorithm/string/replace.hpp>
21 #include <boost/tokenizer.hpp>
22 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
23 #ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
24 #pragma clang diagnostic push
25 #pragma clang diagnostic ignored "-Wimplicit-const-int-float-conversion"
26 #endif
27 #ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
28 #pragma clang diagnostic pop
29 #endif
30 #undef BOOST_BIND_GLOBAL_PLACEHOLDERS
31
32 #include <liboath/oath.h>
33
34
35 #include <s3select/include/s3select.h>
36 #include "rgw_rest_s3.h"
37 #include "rgw_s3select.h"
38
39 class aws_response_handler
40 {
41
42 private:
43 std::string sql_result;
44 struct req_state* s;
45 uint32_t header_size;
46 // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
47 boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32;
48 RGWOp* m_rgwop;
49 std::string m_buff_header;
50 uint64_t total_bytes_returned;
51 uint64_t processed_size;
52
53 enum class header_name_En {
54 EVENT_TYPE,
55 CONTENT_TYPE,
56 MESSAGE_TYPE,
57 ERROR_CODE,
58 ERROR_MESSAGE
59 };
60
61 enum class header_value_En {
62 RECORDS,
63 OCTET_STREAM,
64 EVENT,
65 CONT,
66 PROGRESS,
67 END,
68 XML,
69 STATS,
70 ENGINE_ERROR,
71 ERROR_TYPE
72 };
73
74 const char* PAYLOAD_LINE= "\n<Payload>\n<Records>\n<Payload>\n";
75 const char* END_PAYLOAD_LINE= "\n</Payload></Records></Payload>";
76 const char* header_name_str[5] = {":event-type", ":content-type", ":message-type", ":error-code", ":error-message"};
77 const char* header_value_str[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error", "error"};
78 static constexpr size_t header_crc_size = 12;
79
80 void push_header(const char* header_name, const char* header_value);
81
82 int create_message(u_int32_t header_len);
83
84 public:
85 aws_response_handler(struct req_state* ps, RGWOp* rgwop) : s(ps), m_rgwop(rgwop), total_bytes_returned{0}, processed_size{0}
86 {}
87
88 aws_response_handler() : s(nullptr), m_rgwop(nullptr), total_bytes_returned{0}, processed_size{0}
89 {}
90
91 bool is_set()
92 {
93 if(s==nullptr || m_rgwop == nullptr){
94 return false;
95 }
96 return true;
97 }
98
99 void set(struct req_state* ps, RGWOp* rgwop)
100 {
101 s = ps;
102 m_rgwop = rgwop;
103 }
104
105 std::string& get_sql_result();
106
107 uint64_t get_processed_size();
108
109 void update_processed_size(uint64_t value);
110
111 uint64_t get_total_bytes_returned();
112
113 void update_total_bytes_returned(uint64_t value);
114
115 int create_header_records();
116
117 int create_header_continuation();
118
119 int create_header_progress();
120
121 int create_header_stats();
122
123 int create_header_end();
124
125 int create_error_header_records(const char* error_message);
126
127 void init_response();
128
129 void init_success_response();
130
131 void send_continuation_response();
132
133 void init_progress_response();
134
135 void init_end_response();
136
137 void init_stats_response();
138
139 void init_error_response(const char* error_message);
140
141 void send_success_response();
142
143 void send_progress_response();
144
145 void send_stats_response();
146
147 void send_error_response(const char* error_code,
148 const char* error_message,
149 const char* resource_id);
150
151 }; //end class aws_response_handler
152
153 class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
154 {
155
156 private:
157 s3selectEngine::s3select s3select_syntax;
158 std::string m_s3select_query;
159 std::string m_s3select_input;
160 std::string m_s3select_output;
161 s3selectEngine::csv_object m_s3_csv_object;
162 #ifdef _ARROW_EXIST
163 s3selectEngine::parquet_object m_s3_parquet_object;
164 #endif
165 std::string m_column_delimiter;
166 std::string m_quot;
167 std::string m_row_delimiter;
168 std::string m_compression_type;
169 std::string m_escape_char;
170 std::unique_ptr<char[]> m_buff_header;
171 std::string m_header_info;
172 std::string m_sql_query;
173 std::string m_enable_progress;
174 std::string output_column_delimiter;
175 std::string output_quot;
176 std::string output_escape_char;
177 std::string output_quote_fields;
178 std::string output_row_delimiter;
179 aws_response_handler m_aws_response_handler;
180 bool enable_progress;
181
182 //parquet request
183 bool m_parquet_type;
184 #ifdef _ARROW_EXIST
185 s3selectEngine::rgw_s3select_api m_rgw_api;
186 #endif
187 //a request for range may statisfy by several calls to send_response_date;
188 size_t m_request_range;
189 std::string requested_buffer;
190 std::string range_req_str;
191 std::function<int(std::string&)> fp_result_header_format;
192 std::function<int(std::string&)> fp_s3select_result_format;
193 int m_header_size;
194
195 public:
196 unsigned int chunk_number;
197
198 RGWSelectObj_ObjStore_S3();
199 virtual ~RGWSelectObj_ObjStore_S3();
200
201 virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) override;
202
203 virtual int get_params(optional_yield y) override;
204
205 virtual void execute(optional_yield) override;
206
207 private:
208
209 int csv_processing(bufferlist& bl, off_t ofs, off_t len);
210
211 int parquet_processing(bufferlist& bl, off_t ofs, off_t len);
212
213 int run_s3select(const char* query, const char* input, size_t input_length);
214
215 int run_s3select_on_parquet(const char* query);
216
217 int extract_by_tag(std::string input, std::string tag_name, std::string& result);
218
219 void convert_escape_seq(std::string& esc);
220
221 int handle_aws_cli_parameters(std::string& sql_query);
222
223 int range_request(int64_t start, int64_t len, void*, optional_yield);
224
225 size_t get_obj_size();
226 std::function<int(int64_t, int64_t, void*, optional_yield*)> fp_range_req;
227 std::function<size_t(void)> fp_get_obj_size;
228
229 };
230