// ceph/src/s3select/example/s3select_example.cpp
#include "s3select.h"
#include <fstream>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <boost/crc.hpp>
#include <arpa/inet.h>
#include <boost/filesystem.hpp>
#include <boost/tokenizer.hpp>
// additional standard headers used directly below
// (std::cout, std::unique_ptr, std::function, std::for_each, C string routines, INT_MAX)
#include <iostream>
#include <memory>
#include <functional>
#include <algorithm>
#include <cstring>
#include <climits>

using namespace s3selectEngine;
using namespace BOOST_SPIRIT_CLASSIC_NS;

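// awsCli_handler wraps the s3select engine for a single CSV object: it parses the query,
// feeds object chunks to a csv_object, and packs the output into AWS-CLI-compatible
// event-stream messages (see create_header_records() and create_message() below).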
class awsCli_handler {

//TODO get parameter
private:
  std::unique_ptr<s3selectEngine::s3select> s3select_syntax;
  std::string m_s3select_query;
  std::string m_result;
  std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object;
  std::string m_column_delimiter;//TODO remove
  std::string m_quot;//TODO remove
  std::string m_row_delimiter;//TODO remove
  std::string m_compression_type;//TODO remove
  std::string m_escape_char;//TODO remove
  std::unique_ptr<char[]> m_buff_header;
  std::string m_header_info;
  std::string m_sql_query;
  uint64_t m_total_object_processing_size;

public:

  awsCli_handler():
    s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
    m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
    m_buff_header(std::make_unique<char[]>(1000)),
    m_total_object_processing_size(0),
    crc32(std::unique_ptr<boost::crc_32_type>())
  {
  }

  enum header_name_En
  {
    EVENT_TYPE,
    CONTENT_TYPE,
    MESSAGE_TYPE
  };
  static const char* header_name_str[3];

  enum header_value_En
  {
    RECORDS,
    OCTET_STREAM,
    EVENT,
    CONT
  };
  static const char* header_value_str[4];

private:

  // write a 16-bit value in network (big-endian) byte order and advance the offset
  void encode_short(char *buff, uint16_t s, int &i)
  {
    uint16_t x = htons(s);
    memcpy(buff, &x, sizeof(s));
    i += sizeof(s);
  }

  // write a 32-bit value in network (big-endian) byte order and advance the offset
  void encode_int(char *buff, u_int32_t s, int &i)
  {
    u_int32_t x = htonl(s);
    memcpy(buff, &x, sizeof(s));
    i += sizeof(s);
  }

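  // The output is framed as an AWS event-stream message, which is what the AWS CLI expects
  // from an S3 Select response: a prelude (total length, headers length, prelude CRC),
  // the headers, the payload, and a trailing message CRC.  Each header written below is
  // encoded as a 1-byte name length, the name, a value-type byte (7 = string), a 2-byte
  // value length, and the value.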
  int create_header_records(char* buff)
  {
    int i = 0;

    //1: ":event-type" -> "Records"
    buff[i++] = char(strlen(header_name_str[EVENT_TYPE]));
    memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE]));
    i += strlen(header_name_str[EVENT_TYPE]);
    buff[i++] = char(7);
    encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i);
    memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS]));
    i += strlen(header_value_str[RECORDS]);

    //2: ":content-type" -> "application/octet-stream"
    buff[i++] = char(strlen(header_name_str[CONTENT_TYPE]));
    memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE]));
    i += strlen(header_name_str[CONTENT_TYPE]);
    buff[i++] = char(7);
    encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i);
    memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM]));
    i += strlen(header_value_str[OCTET_STREAM]);

    //3: ":message-type" -> "event"
    buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE]));
    memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE]));
    i += strlen(header_name_str[MESSAGE_TYPE]);
    buff[i++] = char(7);
    encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i);
    memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT]));
    i += strlen(header_value_str[EVENT]);

    return i;
  }

  std::unique_ptr<boost::crc_32_type> crc32; // constructed lazily in create_message()

  int create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len)
  {
    u_int32_t total_byte_len = 0;
    u_int32_t preload_crc = 0;
    u_int32_t message_crc = 0;
    int i = 0;
    char *buff = out_string.data();

    if (crc32 == 0)
    {
      // the parameters follow the CRC-32 algorithm and are aligned with the AWS-CLI checksum
      crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>);
    }

    // total message length = headers+payload (result_len) + 12-byte prelude + 4-byte message CRC
    total_byte_len = result_len + 16;

    encode_int(&buff[i], total_byte_len, i);
    encode_int(&buff[i], header_len, i);

    // CRC over the 8-byte prelude
    crc32->reset();
    *crc32 = std::for_each(buff, buff + 8, *crc32);
    preload_crc = (*crc32)();
    encode_int(&buff[i], preload_crc, i);

    i += result_len;

    // CRC over the whole message (prelude + headers + payload)
    crc32->reset();
    *crc32 = std::for_each(buff, buff + i, *crc32);
    message_crc = (*crc32)();

    int out_encode;
    encode_int(reinterpret_cast<char*>(&out_encode), message_crc, i);
    out_string.append(reinterpret_cast<char*>(&out_encode), sizeof(out_encode));

    return i;
  }

#define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n"
#define END_PAYLOAD_LINE "\n</Payload></Records></Payload>"

public:

  //std::string get_error_description(){}

  std::string get_result()
  {
    return m_result;
  }

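  // Runs the query on one chunk of the object and appends an event-stream message to
  // m_result: 12 placeholder bytes reserved for the prelude (filled in later by
  // create_message()), the three headers, the "<Payload>" wrapper, the query output
  // (or the parser/engine error text), and the closing wrapper plus message CRC.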
  int run_s3select(const char *query, const char *input, size_t input_length, size_t object_size)
  {
    int status = 0;
    csv_object::csv_defintions csv;

    m_result = "012345678901"; //12 placeholder positions for the prelude and its crc

    int header_size = 0;

    if (m_s3_csv_object == 0)
    {
      s3select_syntax->parse_query(query);

      if (m_row_delimiter.size())
      {
        csv.row_delimiter = *m_row_delimiter.c_str();
      }

      if (m_column_delimiter.size())
      {
        csv.column_delimiter = *m_column_delimiter.c_str();
      }

      if (m_quot.size())
      {
        csv.quot_char = *m_quot.c_str();
      }

      if (m_escape_char.size())
      {
        csv.escape_char = *m_escape_char.c_str();
      }

      if (m_header_info.compare("IGNORE") == 0)
      {
        csv.ignore_header_info = true;
      }
      else if (m_header_info.compare("USE") == 0)
      {
        csv.use_header_info = true;
      }

      m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
    }

    if (s3select_syntax->get_error_description().empty() == false)
    {
      // the query failed to parse; return the error description as the payload
      header_size = create_header_records(m_buff_header.get());
      m_result.append(m_buff_header.get(), header_size);
      m_result.append(PAYLOAD_LINE);
      m_result.append(s3select_syntax->get_error_description());
      //ldout(s->cct, 10) << "s3-select query: failed to parse query; {" << s3select_syntax->get_error_description() << "}" << dendl;
      status = -1;
    }
    else
    {
      header_size = create_header_records(m_buff_header.get());
      m_result.append(m_buff_header.get(), header_size);
      m_result.append(PAYLOAD_LINE);
      //status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size);
      status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, object_size);
      if (status < 0)
      {
        m_result.append(m_s3_csv_object->get_error_description());
      }
    }

    if (m_result.size() > strlen(PAYLOAD_LINE))
    {
      m_result.append(END_PAYLOAD_LINE);
      create_message(m_result, m_result.size() - 12, header_size);
      //s->formatter->write_bin_data(m_result.data(), buff_len);
      //if (op_ret < 0)
      //{
      //  return op_ret;
      //}
    }
    //rgw_flush_formatter_and_reset(s, s->formatter);

    return status;
  }
  //int extract_by_tag(std::string tag_name, std::string& result);

  //void convert_escape_seq(std::string& esc);

  //int handle_aws_cli_parameters(std::string& sql_query);

};

const char* awsCli_handler::header_name_str[3] = {":event-type", ":content-type", ":message-type"};
const char* awsCli_handler::header_value_str[4] = {"Records", "application/octet-stream", "event", "cont"};
int run_on_localFile(char* input_query);

bool is_parquet_file(const char * fn)
{//differentiate between csv and parquet by the "parquet" filename suffix
  const char * ext = "parquet";

  if(strstr(fn+strlen(fn)-strlen(ext), ext ))
  {
    return true;
  }

  return false;
}

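// Parquet is not processed as a byte stream; the reader needs random access.  The function
// below therefore wires two callbacks into rgw_s3select_api -- one returning the object size
// and one serving arbitrary byte ranges from the local file -- mirroring the ranged-read
// interface radosgw provides to the engine.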
#ifdef _ARROW_EXIST
int run_query_on_parquet_file(const char* input_query, const char* input_file)
{
  int status;
  s3select s3select_syntax;

  status = s3select_syntax.parse_query(input_query);
  if (status != 0)
  {
    std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
    return -1;
  }

  FILE *fp;

  fp=fopen(input_file,"r");

  if(!fp){
    std::cout << "can not open " << input_file << std::endl;
    return -1;
  }

  // object-size callback: the engine needs the total size of the parquet object
  std::function<int(void)> fp_get_size=[&]()
  {
    struct stat l_buf;
    lstat(input_file,&l_buf);
    return l_buf.st_size;
  };

  // range-request callback: read `length` bytes starting at `start` into `buff`
  std::function<size_t(int64_t,int64_t,void*,optional_yield*)> fp_range_req=[&](int64_t start,int64_t length,void *buff,optional_yield*y)
  {
    fseek(fp,start,SEEK_SET);
    size_t read_sz = fread(buff, 1, length, fp);
    return read_sz;
  };

  rgw_s3select_api rgw;
  rgw.set_get_size_api(fp_get_size);
  rgw.set_range_req_api(fp_range_req);

  std::function<int(std::string&)> fp_s3select_result_format = [](std::string& result){std::cout << result;result.clear();return 0;};
  std::function<int(std::string&)> fp_s3select_header_format = [](std::string& result){result="";return 0;};
  std::function<void(const char*)> fp_debug = [](const char* msg)
  {
    std::cout << "DEBUG: {" << msg << "}" << std::endl;
  };

  parquet_object parquet_processor(input_file,&s3select_syntax,&rgw);
  //parquet_processor.set_external_debug_system(fp_debug);

  std::string result;

  do
  {
    try
    {
      status = parquet_processor.run_s3select_on_object(result,fp_s3select_result_format,fp_s3select_header_format);
    }
    catch (base_s3select_exception &e)
    {
      std::cout << e.what() << std::endl;
      //m_error_description = e.what();
      //m_error_count++;
      if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
      {
        return -1;
      }
    }

    if(status<0)
    {
      std::cout << parquet_processor.get_error_description() << std::endl;
      break;
    }

    std::cout << result << std::endl;

    if(status == 2) // limit reached
    {
      break;
    }

  } while (0);

  return 0;
}
#else
int run_query_on_parquet_file(const char* input_query, const char* input_file)
{
  std::cout << "arrow is not installed" << std::endl;
  return 0;
}
#endif //_ARROW_EXIST

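// JSON objects are processed in fixed-size chunks; after the input is exhausted the
// processor is called once more with a null buffer, which appears to act as an
// end-of-stream flush so that buffered rows and aggregation results are emitted.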
#define BUFFER_SIZE (4*1024*1024)
int process_json_query(const char* input_query,const char* fname)
{//purpose: process a JSON query

  s3select s3select_syntax;
  int status = s3select_syntax.parse_query(input_query);
  if (status != 0)
  {
    std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
    return -1;
  }

  std::ifstream input_file_stream;
  try {
    input_file_stream = std::ifstream(fname, std::ios::in | std::ios::binary);
  }
  catch( ... )
  {
    std::cout << "failed to open file " << fname << std::endl;
    exit(-1);
  }

  auto object_sz = boost::filesystem::file_size(fname);
  json_object json_query_processor(&s3select_syntax);
  std::string buff(BUFFER_SIZE,0);
  std::string result;

  size_t read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE);

  while(read_sz)
  {
    std::cout << "read next chunk " << read_sz << std::endl;
    result.clear();

    try{
      status = json_query_processor.run_s3select_on_stream(result, buff.data(), read_sz, object_sz);
    } catch (base_s3select_exception &e)
    {
      std::cout << e.what() << std::endl;
      if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
      {
        return -1;
      }
    }

    std::cout << result << std::endl;

    if(status<0)
    {
      std::cout << "failure upon processing " << std::endl;
      return -1;
    }
    if(json_query_processor.is_sql_limit_reached())
    {
      std::cout << "json processing reached limit " << std::endl;
      break;
    }
    read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE);
  }

  // final call with a null buffer: flush whatever the engine still holds
  try{
    result.clear();
    json_query_processor.run_s3select_on_stream(result, 0, 0, object_sz);
  } catch (base_s3select_exception &e)
  {
    std::cout << e.what() << std::endl;
    if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
    {
      return -1;
    }
  }

  std::cout << result << std::endl;

  return 0;
}

int run_on_localFile(char* input_query)
{
  //purpose: demonstrate the s3select functionality on a local file or stdin
  s3select s3select_syntax;

  if (!input_query)
  {
    std::cout << "type -q 'select ... from ... '" << std::endl;
    return -1;
  }

  int status = s3select_syntax.parse_query(input_query);
  if (status != 0)
  {
    std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
    return -1;
  }

  std::string object_name = s3select_syntax.get_from_clause();

  if (is_parquet_file(object_name.c_str()))
  {
    try {
      return run_query_on_parquet_file(input_query, object_name.c_str());
    }
    catch (base_s3select_exception &e)
    {
      std::cout << e.what() << std::endl;
      if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
      {
        return -1;
      }
    }
  }

  FILE* fp = nullptr;

  if (object_name.compare("stdin")==0)
  {
    fp = stdin;
  }
  else
  {
    fp = fopen(object_name.c_str(), "r");
  }

  if(!fp)
  {
    std::cout << " input stream is not valid, abort;" << std::endl;
    return -1;
  }

  struct stat statbuf;
  lstat(object_name.c_str(), &statbuf);

  std::string s3select_result;
  s3selectEngine::csv_object::csv_defintions csv;
  csv.use_header_info = false;
  csv.quote_fields_always=false;

#define CSV_QUOT "CSV_ALWAYS_QUOT"
#define CSV_COL_DELIM "CSV_COLUMN_DELIMETER"
#define CSV_ROW_DELIM "CSV_ROW_DELIMITER"
#define CSV_HEADER_INFO "CSV_HEADER_INFO"

  // CSV dialect settings are taken from environment variables (see the -h usage line)
  if(getenv(CSV_QUOT))
  {
    csv.quote_fields_always=true;
  }
  if(getenv(CSV_COL_DELIM))
  {
    csv.column_delimiter=*getenv(CSV_COL_DELIM);
  }
  if(getenv(CSV_ROW_DELIM))
  {
    csv.row_delimiter=*getenv(CSV_ROW_DELIM);
  }
  if(getenv(CSV_HEADER_INFO))
  {
    csv.use_header_info = true;
  }

  s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv);

  std::function<void(const char*)> fp_debug = [](const char* msg)
  {
    std::cout << "DEBUG" << msg << std::endl;
  };

  //s3_csv_object.set_external_debug_system(fp_debug);

#define BUFF_SIZE (1024*1024*4) //simulate 4mb parts in s3 object
  char* buff = (char*)malloc( BUFF_SIZE );
  while(1)
  {
    buff[0]=0;
    size_t input_sz = fread(buff, 1, BUFF_SIZE, fp);
    char* in=buff;

    if (!input_sz)
    {
      if(fp == stdin)
      {
        // end-of-stream on stdin: a last call with a null buffer flushes the engine
        status = s3_csv_object.run_s3select_on_stream(s3select_result, nullptr, 0, 0);
        if(s3select_result.size()>0)
        {
          std::cout << s3select_result;
        }
      }
      break;
    }

    if(fp != stdin)
    {
      status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size);
    }
    else
    {
      // the total size of stdin is unknown, so pass INT_MAX as the object size
      status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, INT_MAX);
    }

    if(status<0)
    {
      std::cout << "failure on execution " << std::endl << s3_csv_object.get_error_description() << std::endl;
      break;
    }

    if(s3select_result.size()>0)
    {
      std::cout << s3select_result;
    }

    if(!input_sz || feof(fp) || status == 2)
    {
      break;
    }

    s3select_result.clear();
  }//end-while

  free(buff);
  fclose(fp);

  return 0;
}

int run_on_single_query(const char* fname, const char* query)
{

  std::unique_ptr<awsCli_handler> awscli = std::make_unique<awsCli_handler>();
  std::ifstream input_file_stream;
  try {
    input_file_stream = std::ifstream(fname, std::ios::in | std::ios::binary);
  }
  catch( ... )
  {
    std::cout << "failed to open file " << fname << std::endl;
    exit(-1);
  }

  if (is_parquet_file(fname))
  {
    std::string result;
    int status = run_query_on_parquet_file(query, fname);
    return status;
  }

  s3select query_ast;
  auto status = query_ast.parse_query(query);
  if(status<0)
  {
    std::cout << "failed to parse query : " << query_ast.get_error_description() << std::endl;
    return -1;
  }

  if(query_ast.is_json_query())
  {
    return process_json_query(query,fname);
  }

  auto file_sz = boost::filesystem::file_size(fname);

  std::string buff(BUFFER_SIZE,0);
  while (1)
  {
    size_t read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE);

    status = awscli->run_s3select(query, buff.data(), read_sz, file_sz);
    if(status<0)
    {
      std::cout << "failure on execution " << std::endl;
      break;
    }
    else
    {
      std::cout << awscli->get_result() << std::endl;
    }

    if(!read_sz || input_file_stream.eof())
    {
      break;
    }
  }

  return status;
}

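// Command-line driver.  A rough sketch of possible invocations (paths and queries are
// only examples; columns are referenced as _1, _2, ... and the FROM clause names the
// object when -key is not given):
//   s3select_example -q "select _1,_2 from /tmp/data.csv;"
//   s3select_example -q "select count(*) from stdin;" < data.csv
//   s3select_example -key /tmp/data.csv -q "select _1 from s3object;"
//   s3select_example -key /tmp/data.csv -cmds /tmp/queries.txt      (one query per line)
//   CSV_COLUMN_DELIMETER="|" s3select_example -q "select _1 from /tmp/data.psv;"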
int main(int argc,char **argv)
{
  char *query=0;
  char *fname=0;
  char *query_file=0;//file that contains many queries

  for (int i = 0; i < argc; i++)
  {
    if (!strcmp(argv[i], "-key"))
    {//object received as CLI parameter
      fname = argv[i + 1];
      continue;
    }

    if (!strcmp(argv[i], "-q"))
    {
      query = argv[i + 1];
      continue;
    }

    if (!strcmp(argv[i], "-cmds"))
    {//query file that contains many queries
      query_file = argv[i + 1];
      continue;
    }

    if (!strcmp(argv[i], "-h") || !strcmp(argv[i], "-help"))
    {
      std::cout << "CSV_ALWAYS_QUOT= CSV_COLUMN_DELIMETER= CSV_ROW_DELIMITER= CSV_HEADER_INFO= s3select_example -q \"... query ...\" -key object-path -cmds queries-file" << std::endl;
      exit(0);
    }
  }

  if(fname == 0)
  {//object is named explicitly in the query's FROM clause
    return run_on_localFile(query);
  }

  if(query_file)
  {
    //purpose: run many queries (residing in a file) on a single object
    std::fstream f(query_file, std::ios::in | std::ios::binary);
    const auto sz = boost::filesystem::file_size(query_file);
    std::string result(sz, '\0');
    f.read(result.data(), sz);
    boost::char_separator<char> sep("\n");
    boost::tokenizer<boost::char_separator<char>> tokens(result, sep);

    for (const auto& t : tokens) {
      std::cout << t << std::endl;
      int status = run_on_single_query(fname,t.c_str());
      std::cout << "status: " << status << std::endl;
    }

    return(0);
  }

  int status = run_on_single_query(fname,query);
  return status;
}