]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | #include "s3select.h" |
2 | #include <fstream> | |
3 | #include <sys/types.h> | |
4 | #include <sys/stat.h> | |
5 | #include <unistd.h> | |
20effc67 TL |
6 | #include <boost/crc.hpp> |
7 | #include <arpa/inet.h> | |
8 | #include <boost/filesystem.hpp> | |
9 | #include <boost/tokenizer.hpp> | |
f67539c2 TL |
10 | |
11 | using namespace s3selectEngine; | |
12 | using namespace BOOST_SPIRIT_CLASSIC_NS; | |
13 | ||
20effc67 TL |
14 | class awsCli_handler { |
15 | ||
16 | ||
17 | //TODO get parameter | |
18 | private: | |
19 | std::unique_ptr<s3selectEngine::s3select> s3select_syntax; | |
20 | std::string m_s3select_query; | |
21 | std::string m_result; | |
22 | std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object; | |
23 | std::string m_column_delimiter;//TODO remove | |
24 | std::string m_quot;//TODO remove | |
25 | std::string m_row_delimiter;//TODO remove | |
26 | std::string m_compression_type;//TODO remove | |
27 | std::string m_escape_char;//TODO remove | |
28 | std::unique_ptr<char[]> m_buff_header; | |
29 | std::string m_header_info; | |
30 | std::string m_sql_query; | |
31 | uint64_t m_total_object_processing_size; | |
32 | ||
33 | public: | |
34 | ||
35 | awsCli_handler(): | |
36 | s3select_syntax(std::make_unique<s3selectEngine::s3select>()), | |
37 | m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()), | |
38 | m_buff_header(std::make_unique<char[]>(1000)), | |
39 | m_total_object_processing_size(0), | |
40 | crc32(std::unique_ptr<boost::crc_32_type>()) | |
41 | { | |
42 | } | |
f67539c2 | 43 | |
20effc67 TL |
44 | enum header_name_En |
45 | { | |
46 | EVENT_TYPE, | |
47 | CONTENT_TYPE, | |
48 | MESSAGE_TYPE | |
49 | }; | |
50 | static const char* header_name_str[3]; | |
51 | ||
52 | enum header_value_En | |
53 | { | |
54 | RECORDS, | |
55 | OCTET_STREAM, | |
56 | EVENT, | |
57 | CONT | |
58 | }; | |
59 | static const char* header_value_str[4]; | |
f67539c2 | 60 | |
20effc67 | 61 | private: |
f67539c2 | 62 | |
20effc67 TL |
63 | void encode_short(char *buff, uint16_t s, int &i) |
64 | { | |
65 | short x = htons(s); | |
66 | memcpy(buff, &x, sizeof(s)); | |
67 | i += sizeof(s); | |
68 | } | |
69 | ||
70 | void encode_int(char *buff, u_int32_t s, int &i) | |
71 | { | |
72 | u_int32_t x = htonl(s); | |
73 | memcpy(buff, &x, sizeof(s)); | |
74 | i += sizeof(s); | |
75 | } | |
76 | ||
77 | int create_header_records(char* buff) | |
f67539c2 | 78 | { |
20effc67 TL |
79 | int i = 0; |
80 | ||
81 | //1 | |
82 | buff[i++] = char(strlen(header_name_str[EVENT_TYPE])); | |
83 | memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE])); | |
84 | i += strlen(header_name_str[EVENT_TYPE]); | |
85 | buff[i++] = char(7); | |
86 | encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i); | |
87 | memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS])); | |
88 | i += strlen(header_value_str[RECORDS]); | |
89 | ||
90 | //2 | |
91 | buff[i++] = char(strlen(header_name_str[CONTENT_TYPE])); | |
92 | memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE])); | |
93 | i += strlen(header_name_str[CONTENT_TYPE]); | |
94 | buff[i++] = char(7); | |
95 | encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i); | |
96 | memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM])); | |
97 | i += strlen(header_value_str[OCTET_STREAM]); | |
98 | ||
99 | //3 | |
100 | buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE])); | |
101 | memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE])); | |
102 | i += strlen(header_name_str[MESSAGE_TYPE]); | |
103 | buff[i++] = char(7); | |
104 | encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i); | |
105 | memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT])); | |
106 | i += strlen(header_value_str[EVENT]); | |
107 | ||
108 | return i; | |
109 | } | |
110 | ||
111 | std::unique_ptr<boost::crc_32_type> crc32; | |
112 | ||
113 | int create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len) | |
114 | { | |
115 | u_int32_t total_byte_len = 0; | |
116 | u_int32_t preload_crc = 0; | |
117 | u_int32_t message_crc = 0; | |
118 | int i = 0; | |
119 | char *buff = out_string.data(); | |
120 | ||
121 | if (crc32 == 0) | |
122 | { | |
123 | // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum | |
124 | crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>); | |
125 | } | |
126 | ||
127 | total_byte_len = result_len + 16; | |
128 | ||
129 | encode_int(&buff[i], total_byte_len, i); | |
130 | encode_int(&buff[i], header_len, i); | |
131 | ||
132 | crc32->reset(); | |
133 | *crc32 = std::for_each(buff, buff + 8, *crc32); | |
134 | preload_crc = (*crc32)(); | |
135 | encode_int(&buff[i], preload_crc, i); | |
136 | ||
137 | i += result_len; | |
138 | ||
139 | crc32->reset(); | |
140 | *crc32 = std::for_each(buff, buff + i, *crc32); | |
141 | message_crc = (*crc32)(); | |
142 | ||
143 | int out_encode; | |
144 | encode_int(reinterpret_cast<char*>(&out_encode), message_crc, i); | |
145 | out_string.append(reinterpret_cast<char*>(&out_encode),sizeof(out_encode)); | |
146 | ||
147 | return i; | |
f67539c2 TL |
148 | } |
149 | ||
20effc67 TL |
150 | #define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n" |
151 | #define END_PAYLOAD_LINE "\n</Payload></Records></Payload>" | |
152 | ||
153 | public: | |
154 | ||
155 | //std::string get_error_description(){} | |
156 | ||
157 | std::string get_result() | |
158 | { | |
159 | return m_result; | |
160 | } | |
161 | ||
162 | int run_s3select(const char *query, const char *input, size_t input_length, size_t object_size) | |
163 | { | |
164 | int status = 0; | |
165 | csv_object::csv_defintions csv; | |
166 | ||
167 | m_result = "012345678901"; //12 positions for header-crc | |
168 | ||
169 | int header_size = 0; | |
170 | ||
171 | if (m_s3_csv_object == 0) | |
172 | { | |
173 | s3select_syntax->parse_query(query); | |
174 | ||
175 | if (m_row_delimiter.size()) | |
176 | { | |
177 | csv.row_delimiter = *m_row_delimiter.c_str(); | |
178 | } | |
179 | ||
180 | if (m_column_delimiter.size()) | |
181 | { | |
182 | csv.column_delimiter = *m_column_delimiter.c_str(); | |
183 | } | |
184 | ||
185 | if (m_quot.size()) | |
186 | { | |
187 | csv.quot_char = *m_quot.c_str(); | |
188 | } | |
189 | ||
190 | if (m_escape_char.size()) | |
191 | { | |
192 | csv.escape_char = *m_escape_char.c_str(); | |
193 | } | |
194 | ||
195 | if (m_header_info.compare("IGNORE") == 0) | |
196 | { | |
197 | csv.ignore_header_info = true; | |
198 | } | |
199 | else if (m_header_info.compare("USE") == 0) | |
200 | { | |
201 | csv.use_header_info = true; | |
202 | } | |
203 | ||
204 | m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv)); | |
205 | } | |
206 | ||
207 | if (s3select_syntax->get_error_description().empty() == false) | |
208 | { | |
209 | header_size = create_header_records(m_buff_header.get()); | |
210 | m_result.append(m_buff_header.get(), header_size); | |
211 | m_result.append(PAYLOAD_LINE); | |
212 | m_result.append(s3select_syntax->get_error_description()); | |
213 | //ldout(s->cct, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl; | |
214 | status = -1; | |
215 | } | |
216 | else | |
217 | { | |
218 | header_size = create_header_records(m_buff_header.get()); | |
219 | m_result.append(m_buff_header.get(), header_size); | |
220 | m_result.append(PAYLOAD_LINE); | |
221 | //status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size); | |
222 | status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, object_size); | |
223 | if (status < 0) | |
224 | { | |
225 | m_result.append(m_s3_csv_object->get_error_description()); | |
226 | } | |
227 | } | |
228 | ||
229 | if (m_result.size() > strlen(PAYLOAD_LINE)) | |
230 | { | |
231 | m_result.append(END_PAYLOAD_LINE); | |
232 | create_message(m_result, m_result.size() - 12, header_size); | |
233 | //s->formatter->write_bin_data(m_result.data(), buff_len); | |
234 | //if (op_ret < 0) | |
235 | //{ | |
236 | // return op_ret; | |
237 | //} | |
238 | } | |
239 | //rgw_flush_formatter_and_reset(s, s->formatter); | |
240 | ||
241 | return status; | |
242 | } | |
243 | //int extract_by_tag(std::string tag_name, std::string& result); | |
244 | ||
245 | //void convert_escape_seq(std::string& esc); | |
246 | ||
247 | //int handle_aws_cli_parameters(std::string& sql_query); | |
248 | ||
249 | }; | |
250 | ||
251 | const char* awsCli_handler::header_name_str[3] = {":event-type", ":content-type", ":message-type"}; | |
252 | const char* awsCli_handler::header_value_str[4] = {"Records", "application/octet-stream", "event","cont"}; | |
253 | int run_on_localFile(char* input_query); | |
254 | ||
/// Differentiates between csv and parquet objects by name.
///
/// @param fn  object name (may be null)
/// @return true when `fn` ends with "parquet"; false otherwise, including
///         for null names and names shorter than the suffix.
bool is_parquet_file(const char * fn)
{
  static const char ext[] = "parquet";

  if (!fn)
  {
    return false;
  }

  const size_t fn_len = strlen(fn);
  const size_t ext_len = sizeof(ext) - 1;

  // A name shorter than the suffix (e.g. "stdin") cannot match; without
  // this check the tail pointer below would point before the buffer.
  if (fn_len < ext_len)
  {
    return false;
  }

  return strcmp(fn + fn_len - ext_len, ext) == 0;
}
266 | ||
20effc67 TL |
#ifdef _ARROW_EXIST
/// Runs `input_query` on the local parquet file `input_file` and prints the
/// result to stdout.
///
/// @return -1 when the query fails to parse, the file cannot be opened, or a
///         FATAL s3select exception is raised; 0 otherwise.
/// NOTE(review): a negative status from the processor is printed but the
/// function still returns 0 (kept as in the original) — confirm intent.
int run_query_on_parquet_file(const char* input_query, const char* input_file)
{
  int status;
  s3select s3select_syntax;

  status = s3select_syntax.parse_query(input_query);
  if (status != 0)
  {
    std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
    return -1;
  }

  // Owns the stream so it is closed on every exit path. Declared before the
  // processor, so it outlives the callbacks that read from it.
  std::unique_ptr<FILE, int (*)(FILE*)> file_guard(fopen(input_file,"r"), &fclose);
  FILE *fp = file_guard.get();

  if(!fp){
    std::cout << "can not open " << input_file << std::endl;
    return -1;
  }

  // Size callback. fstat() on the open stream reports the size of the file
  // actually being read (lstat() on the path would report the size of a
  // symlink itself). Returns 0 when the size cannot be obtained.
  // The int return type is dictated by the std::function signature passed
  // to set_get_size_api().
  std::function<int(void)> fp_get_size=[&]() -> int
  {
    struct stat l_buf;
    if (fstat(fileno(fp), &l_buf) != 0)
    {
      return 0;
    }
    return static_cast<int>(l_buf.st_size);
  };

  // Range-read callback: reads `length` bytes starting at offset `start`.
  std::function<size_t(int64_t,int64_t,void*,optional_yield*)> fp_range_req=[&](int64_t start,int64_t length,void *buff,optional_yield*y)
  {
    fseek(fp,start,SEEK_SET);
    size_t read_sz = fread(buff, 1, length, fp);
    return read_sz;
  };

  rgw_s3select_api rgw;
  rgw.set_get_size_api(fp_get_size);
  rgw.set_range_req_api(fp_range_req);

  std::function<int(std::string&)> fp_s3select_result_format = [](std::string& result){std::cout << result;result.clear();return 0;};
  std::function<int(std::string&)> fp_s3select_header_format = [](std::string& result){result="";return 0;};
  std::function<void(const char*)> fp_debug = [](const char* msg)
  {
    std::cout << "DEBUG: {" << msg << "}" << std::endl;
  };

  parquet_object parquet_processor(input_file,&s3select_syntax,&rgw);
  //parquet_processor.set_external_debug_system(fp_debug);

  std::string result;

  try
  {
    status = parquet_processor.run_s3select_on_object(result,fp_s3select_result_format,fp_s3select_header_format);
  }
  catch (base_s3select_exception &e)
  {
    std::cout << e.what() << std::endl;
    //m_error_description = e.what();
    //m_error_count++;
    if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
    {
      return -1;
    }
  }

  if(status<0)
  {
    std::cout << parquet_processor.get_error_description() << std::endl;
    return 0;
  }

  // status == 2 means the limit was reached; the output is printed either way
  std::cout << result << std::endl;

  return 0;
}
#else
/// Stub used when the build has no arrow support: prints a notice and
/// returns 0.
int run_query_on_parquet_file(const char* input_query, const char* input_file)
{
  std::cout << "arrow is not installed" << std::endl;
  return 0;
}
#endif //_ARROW_EXIST
360 | ||
1e59de90 TL |
361 | #define BUFFER_SIZE (4*1024*1024) |
362 | int process_json_query(const char* input_query,const char* fname) | |
363 | {//purpose: process json query | |
364 | ||
365 | s3select s3select_syntax; | |
366 | int status = s3select_syntax.parse_query(input_query); | |
367 | if (status != 0) | |
368 | { | |
369 | std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl; | |
370 | return -1; | |
371 | } | |
372 | ||
373 | std::ifstream input_file_stream; | |
374 | try { | |
375 | input_file_stream = std::ifstream(fname, std::ios::in | std::ios::binary); | |
376 | } | |
377 | catch( ... ) | |
378 | { | |
379 | std::cout << "failed to open file " << fname << std::endl; | |
380 | exit(-1); | |
381 | } | |
382 | ||
383 | auto object_sz = boost::filesystem::file_size(fname); | |
384 | json_object json_query_processor(&s3select_syntax); | |
385 | std::string buff(BUFFER_SIZE,0); | |
386 | std::string result; | |
387 | ||
388 | size_t read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE); | |
389 | ||
390 | while(read_sz) | |
391 | { | |
392 | std::cout << "read next chunk " << read_sz << std::endl; | |
393 | result.clear(); | |
394 | ||
395 | try{ | |
396 | status = json_query_processor.run_s3select_on_stream(result, buff.data(), read_sz, object_sz); | |
397 | } catch (base_s3select_exception &e) | |
398 | { | |
399 | std::cout << e.what() << std::endl; | |
400 | if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution | |
401 | { | |
402 | return -1; | |
403 | } | |
404 | } | |
405 | ||
406 | std::cout << result << std::endl; | |
407 | ||
408 | if(status<0) | |
409 | { | |
410 | std::cout << "failure upon processing " << std::endl; | |
411 | return -1; | |
412 | } | |
413 | if(json_query_processor.is_sql_limit_reached()) | |
414 | { | |
415 | std::cout << "json processing reached limit " << std::endl; | |
416 | break; | |
417 | } | |
418 | read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE); | |
419 | } | |
420 | try{ | |
421 | result.clear(); | |
422 | json_query_processor.run_s3select_on_stream(result, 0, 0, object_sz); | |
423 | } catch (base_s3select_exception &e) | |
424 | { | |
425 | std::cout << e.what() << std::endl; | |
426 | if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution | |
427 | { | |
428 | return -1; | |
429 | } | |
430 | } | |
431 | ||
432 | std::cout << result << std::endl; | |
433 | ||
434 | return 0; | |
435 | } | |
20effc67 | 436 | |
1e59de90 TL |
437 | int run_on_localFile(char* input_query) |
438 | { | |
20effc67 TL |
439 | //purpose: demostrate the s3select functionalities |
440 | s3select s3select_syntax; | |
f67539c2 TL |
441 | |
442 | if (!input_query) | |
443 | { | |
444 | std::cout << "type -q 'select ... from ... '" << std::endl; | |
445 | return -1; | |
446 | } | |
447 | ||
f67539c2 TL |
448 | int status = s3select_syntax.parse_query(input_query); |
449 | if (status != 0) | |
450 | { | |
451 | std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl; | |
452 | return -1; | |
453 | } | |
454 | ||
20effc67 TL |
455 | std::string object_name = s3select_syntax.get_from_clause(); |
456 | ||
457 | if (is_parquet_file(object_name.c_str())) | |
458 | { | |
459 | try { | |
460 | return run_query_on_parquet_file(input_query, object_name.c_str()); | |
461 | } | |
462 | catch (base_s3select_exception &e) | |
463 | { | |
464 | std::cout << e.what() << std::endl; | |
465 | if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution | |
466 | { | |
467 | return -1; | |
468 | } | |
469 | } | |
470 | } | |
f67539c2 | 471 | |
1e59de90 | 472 | FILE* fp = nullptr; |
f67539c2 TL |
473 | |
474 | if (object_name.compare("stdin")==0) | |
475 | { | |
476 | fp = stdin; | |
477 | } | |
478 | else | |
479 | { | |
480 | fp = fopen(object_name.c_str(), "r"); | |
481 | } | |
482 | ||
f67539c2 TL |
483 | if(!fp) |
484 | { | |
485 | std::cout << " input stream is not valid, abort;" << std::endl; | |
486 | return -1; | |
487 | } | |
488 | ||
489 | struct stat statbuf; | |
f67539c2 TL |
490 | lstat(object_name.c_str(), &statbuf); |
491 | ||
492 | std::string s3select_result; | |
493 | s3selectEngine::csv_object::csv_defintions csv; | |
494 | csv.use_header_info = false; | |
20effc67 TL |
495 | csv.quote_fields_always=false; |
496 | ||
1e59de90 TL |
497 | #define CSV_QUOT "CSV_ALWAYS_QUOT" |
498 | #define CSV_COL_DELIM "CSV_COLUMN_DELIMETER" | |
499 | #define CSV_ROW_DELIM "CSV_ROW_DELIMITER" | |
500 | #define CSV_HEADER_INFO "CSV_HEADER_INFO" | |
501 | ||
502 | if(getenv(CSV_QUOT)) | |
20effc67 TL |
503 | { |
504 | csv.quote_fields_always=true; | |
505 | } | |
1e59de90 TL |
506 | if(getenv(CSV_COL_DELIM)) |
507 | { | |
508 | csv.column_delimiter=*getenv(CSV_COL_DELIM); | |
509 | } | |
510 | if(getenv(CSV_ROW_DELIM)) | |
511 | { | |
512 | csv.row_delimiter=*getenv(CSV_ROW_DELIM); | |
513 | } | |
514 | if(getenv(CSV_HEADER_INFO)) | |
515 | { | |
516 | csv.use_header_info = true; | |
517 | } | |
20effc67 | 518 | |
f67539c2 | 519 | s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); |
f67539c2 | 520 | |
1e59de90 TL |
521 | std::function<void(const char*)> fp_debug = [](const char* msg) |
522 | { | |
523 | std::cout << "DEBUG" << msg << std::endl; | |
524 | }; | |
525 | ||
526 | //s3_csv_object.set_external_debug_system(fp_debug); | |
527 | ||
528 | #define BUFF_SIZE (1024*1024*4) //simulate 4mb parts in s3 object | |
f67539c2 TL |
529 | char* buff = (char*)malloc( BUFF_SIZE ); |
530 | while(1) | |
531 | { | |
1e59de90 | 532 | buff[0]=0; |
f67539c2 TL |
533 | size_t input_sz = fread(buff, 1, BUFF_SIZE, fp); |
534 | char* in=buff; | |
f67539c2 | 535 | |
1e59de90 | 536 | if (!input_sz) |
20effc67 | 537 | { |
1e59de90 TL |
538 | if(fp == stdin) |
539 | { | |
540 | status = s3_csv_object.run_s3select_on_stream(s3select_result, nullptr, 0, 0); | |
541 | if(s3select_result.size()>0) | |
542 | { | |
543 | std::cout << s3select_result; | |
544 | } | |
545 | } | |
546 | break; | |
20effc67 | 547 | } |
f67539c2 | 548 | |
1e59de90 | 549 | if(fp != stdin) |
20effc67 | 550 | { |
1e59de90 | 551 | status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size); |
20effc67 TL |
552 | } |
553 | else | |
554 | { | |
1e59de90 | 555 | status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, INT_MAX); |
20effc67 | 556 | } |
f67539c2 | 557 | |
f67539c2 TL |
558 | if(status<0) |
559 | { | |
20effc67 | 560 | std::cout << "failure on execution " << std::endl << s3_csv_object.get_error_description() << std::endl; |
f67539c2 TL |
561 | break; |
562 | } | |
563 | ||
1e59de90 | 564 | if(s3select_result.size()>0) |
f67539c2 TL |
565 | { |
566 | std::cout << s3select_result; | |
567 | } | |
568 | ||
1e59de90 | 569 | if(!input_sz || feof(fp) || status == 2) |
f67539c2 TL |
570 | { |
571 | break; | |
572 | } | |
573 | ||
1e59de90 TL |
574 | s3select_result.clear(); |
575 | }//end-while | |
f67539c2 | 576 | |
1e59de90 TL |
577 | free(buff); |
578 | fclose(fp); | |
f67539c2 | 579 | |
1e59de90 | 580 | return 0; |
20effc67 TL |
581 | } |
582 | ||
583 | int run_on_single_query(const char* fname, const char* query) | |
584 | { | |
585 | ||
586 | std::unique_ptr<awsCli_handler> awscli = std::make_unique<awsCli_handler>() ; | |
587 | std::ifstream input_file_stream; | |
588 | try { | |
589 | input_file_stream = std::ifstream(fname, std::ios::in | std::ios::binary); | |
590 | } | |
591 | catch( ... ) | |
592 | { | |
593 | std::cout << "failed to open file " << fname << std::endl; | |
594 | exit(-1); | |
595 | } | |
596 | ||
597 | ||
598 | if (is_parquet_file(fname)) | |
599 | { | |
600 | std::string result; | |
601 | int status = run_query_on_parquet_file(query, fname); | |
602 | return status; | |
603 | } | |
604 | ||
1e59de90 TL |
605 | s3select query_ast; |
606 | auto status = query_ast.parse_query(query); | |
607 | if(status<0) | |
608 | { | |
609 | std::cout << "failed to parse query : " << query_ast.get_error_description() << std::endl; | |
610 | return -1; | |
611 | } | |
612 | ||
613 | if(query_ast.is_json_query()) | |
614 | { | |
615 | return process_json_query(query,fname); | |
616 | } | |
617 | ||
618 | ||
20effc67 TL |
619 | auto file_sz = boost::filesystem::file_size(fname); |
620 | ||
20effc67 TL |
621 | std::string buff(BUFFER_SIZE,0); |
622 | while (1) | |
623 | { | |
624 | size_t read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE); | |
625 | ||
626 | status = awscli->run_s3select(query, buff.data(), read_sz, file_sz); | |
627 | if(status<0) | |
628 | { | |
629 | std::cout << "failure on execution " << std::endl; | |
630 | break; | |
631 | } | |
632 | else | |
633 | { | |
634 | std::cout << awscli->get_result() << std::endl; | |
635 | } | |
636 | ||
637 | if(!read_sz || input_file_stream.eof()) | |
638 | { | |
639 | break; | |
640 | } | |
641 | } | |
642 | ||
643 | return status; | |
644 | } | |
645 | ||
646 | int main(int argc,char **argv) | |
647 | { | |
20effc67 TL |
648 | char *query=0; |
649 | char *fname=0; | |
650 | char *query_file=0;//file contains many queries | |
651 | ||
652 | for (int i = 0; i < argc; i++) | |
653 | { | |
20effc67 | 654 | if (!strcmp(argv[i], "-key")) |
1e59de90 | 655 | {//object recieved as CLI parameter |
20effc67 TL |
656 | fname = argv[i + 1]; |
657 | continue; | |
658 | } | |
659 | ||
660 | if (!strcmp(argv[i], "-q")) | |
661 | { | |
662 | query = argv[i + 1]; | |
663 | continue; | |
664 | } | |
665 | ||
666 | if (!strcmp(argv[i], "-cmds")) | |
1e59de90 | 667 | {//query file contain many queries |
20effc67 TL |
668 | query_file = argv[i + 1]; |
669 | continue; | |
670 | } | |
1e59de90 TL |
671 | |
672 | if (!strcmp(argv[i], "-h") || !strcmp(argv[i], "-help")) | |
673 | { | |
674 | std::cout << "CSV_ALWAYS_QUOT= CSV_COLUMN_DELIMETER= CSV_ROW_DELIMITER= CSV_HEADER_INFO= s3select_example -q \"... query ...\" -key object-path -cmds queries-file" << std::endl; | |
675 | exit(0); | |
676 | } | |
20effc67 TL |
677 | } |
678 | ||
679 | if(fname == 0) | |
680 | {//object is in query explicitly. | |
681 | return run_on_localFile(query); | |
682 | } | |
f67539c2 | 683 | |
20effc67 TL |
684 | if(query_file) |
685 | { | |
1e59de90 | 686 | //purpose: run many queries (reside in file) on single file. |
20effc67 | 687 | std::fstream f(query_file, std::ios::in | std::ios::binary); |
20effc67 | 688 | const auto sz = boost::filesystem::file_size(query_file); |
20effc67 | 689 | std::string result(sz, '\0'); |
20effc67 | 690 | f.read(result.data(), sz); |
20effc67 TL |
691 | boost::char_separator<char> sep("\n"); |
692 | boost::tokenizer<boost::char_separator<char>> tokens(result, sep); | |
1e59de90 | 693 | |
20effc67 TL |
694 | for (const auto& t : tokens) { |
695 | std::cout << t << std::endl; | |
20effc67 | 696 | int status = run_on_single_query(fname,t.c_str()); |
20effc67 TL |
697 | std::cout << "status: " << status << std::endl; |
698 | } | |
699 | ||
700 | return(0); | |
701 | } | |
702 | ||
20effc67 | 703 | int status = run_on_single_query(fname,query); |
20effc67 | 704 | return status; |
f67539c2 | 705 | } |
20effc67 | 706 |