]> git.proxmox.com Git - ceph.git/blob - ceph/src/s3select/include/s3select.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / s3select / include / s3select.h
1 #ifndef __S3SELECT__
2 #define __S3SELECT__
3 #define BOOST_SPIRIT_THREADSAFE
4
5 #pragma once
6 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <string>
#include <vector>

#include <boost/algorithm/string.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/spirit/include/classic_core.hpp>

#include "s3select_oper.h"
#include "s3select_functions.h"
#include "s3select_csv_parser.h"
18
19
20 #define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;}
21
22 namespace s3selectEngine
23 {
24
25 /// AST builder
26
27 class s3select_projections
28 {
29
30 private:
31 std::vector<base_statement*> m_projections;
32
33 public:
34
35 std::vector<base_statement*>* get()
36 {
37 return &m_projections;
38 }
39
40 };
41
42 static s3select_reserved_word g_s3select_reserve_word;//read-only
43
44 struct actionQ
45 {
46 // upon parser is accepting a token (lets say some number),
47 // it push it into dedicated queue, later those tokens are poped out to build some "higher" contruct (lets say 1 + 2)
48 // those containers are used only for parsing phase and not for runtime.
49
50 std::vector<mulldiv_operation::muldiv_t> muldivQ;
51 std::vector<addsub_operation::addsub_op_t> addsubQ;
52 std::vector<arithmetic_operand::cmp_t> arithmetic_compareQ;
53 std::vector<logical_operand::oplog_t> logical_compareQ;
54 std::vector<base_statement*> exprQ;
55 std::vector<base_statement*> funcQ;
56 std::vector<base_statement*> whenThenQ;
57 std::vector<base_statement*> inPredicateQ;
58 base_statement* inMainArg;
59 std::vector<std::string> dataTypeQ;
60 std::vector<std::string> trimTypeQ;
61 std::vector<std::string> datePartQ;
62 std::vector<base_statement*> caseValueQ;
63 projection_alias alias_map;
64 std::string from_clause;
65 std::string column_prefix;
66 std::string table_alias;
67 s3select_projections projections;
68
69 bool projection_or_predicate_state; //true->projection false->predicate(where-clause statement)
70 std::vector<base_statement*> predicate_columns;
71 std::vector<base_statement*> projections_columns;
72
73 size_t when_then_count;
74
75 actionQ(): inMainArg(0),from_clause("##"),column_prefix("##"),table_alias("##"),projection_or_predicate_state(true),when_then_count(0){}//TODO remove when_then_count
76
77 std::map<const void*,std::vector<const char*> *> x_map;
78
79 ~actionQ()
80 {
81 for(auto m : x_map)
82 delete m.second;
83 }
84
85 bool is_already_scanned(const void *th,const char *a)
86 {
87 //purpose: caller get indication in the case a specific builder is scan more than once the same text(pointer)
88 auto t = x_map.find(th);
89
90 if(t == x_map.end())
91 {
92 auto v = new std::vector<const char*>;//TODO delete
93 x_map.insert(std::pair<const void*,std::vector<const char*> *>(th,v));
94 v->push_back(a);
95 }
96 else
97 {
98 for(auto& c : *(t->second))
99 {
100 if( strcmp(c,a) == 0)
101 return true;
102 }
103 t->second->push_back(a);
104 }
105 return false;
106 }
107
108 };
109
class s3select;

/// Interface shared by every semantic action attached to the s3select grammar.
/// The parser invokes operator(), which skips input positions this builder has
/// already handled and otherwise forwards to the concrete builder().
struct base_ast_builder
{
  /// Polymorphic base: destroyed through base pointers/references.
  virtual ~base_ast_builder() = default;

  /// Parser entry point for the matched text [a, b) (defined out of class).
  void operator()(s3select* self, const char* a, const char* b) const;

  /// Concrete AST construction for the matched text [a, b).
  virtual void builder(s3select* self, const char* a, const char* b) const = 0;
};
120
121 struct push_from_clause : public base_ast_builder
122 {
123 void builder(s3select* self, const char* a, const char* b) const;
124 };
125 static push_from_clause g_push_from_clause;
126
127 struct push_number : public base_ast_builder
128 {
129 void builder(s3select* self, const char* a, const char* b) const;
130 };
131 static push_number g_push_number;
132
133 struct push_float_number : public base_ast_builder
134 {
135 void builder(s3select* self, const char* a, const char* b) const;
136 };
137 static push_float_number g_push_float_number;
138
139 struct push_string : public base_ast_builder
140 {
141 void builder(s3select* self, const char* a, const char* b) const;
142 };
143 static push_string g_push_string;
144
145 struct push_variable : public base_ast_builder
146 {
147 void builder(s3select* self, const char* a, const char* b) const;
148 };
149 static push_variable g_push_variable;
150
151 /////////////////////////arithmetic unit /////////////////
152 struct push_addsub : public base_ast_builder
153 {
154 void builder(s3select* self, const char* a, const char* b) const;
155 };
156 static push_addsub g_push_addsub;
157
158 struct push_mulop : public base_ast_builder
159 {
160 void builder(s3select* self, const char* a, const char* b) const;
161 };
162 static push_mulop g_push_mulop;
163
164 struct push_addsub_binop : public base_ast_builder
165 {
166 void builder(s3select* self, const char* a, const char* b) const;
167 };
168 static push_addsub_binop g_push_addsub_binop;
169
170 struct push_mulldiv_binop : public base_ast_builder
171 {
172 void builder(s3select* self, const char* a, const char* b) const;
173 };
174 static push_mulldiv_binop g_push_mulldiv_binop;
175
176 struct push_function_arg : public base_ast_builder
177 {
178 void builder(s3select* self, const char* a, const char* b) const;
179 };
180 static push_function_arg g_push_function_arg;
181
182 struct push_function_name : public base_ast_builder
183 {
184 void builder(s3select* self, const char* a, const char* b) const;
185 };
186 static push_function_name g_push_function_name;
187
188 struct push_function_expr : public base_ast_builder
189 {
190 void builder(s3select* self, const char* a, const char* b) const;
191 };
192 static push_function_expr g_push_function_expr;
193
194 struct push_cast_expr : public base_ast_builder
195 {
196 void builder(s3select* self, const char* a, const char* b) const;
197 };
198 static push_cast_expr g_push_cast_expr;
199
200 struct push_data_type : public base_ast_builder
201 {
202 void builder(s3select* self, const char* a, const char* b) const;
203 };
204 static push_data_type g_push_data_type;
205
206 ////////////////////// logical unit ////////////////////////
207
208 struct push_compare_operator : public base_ast_builder
209 {
210 void builder(s3select* self, const char* a, const char* b) const;
211
212 };
213 static push_compare_operator g_push_compare_operator;
214
215 struct push_logical_operator : public base_ast_builder
216 {
217 void builder(s3select* self, const char* a, const char* b) const;
218
219 };
220 static push_logical_operator g_push_logical_operator;
221
222 struct push_arithmetic_predicate : public base_ast_builder
223 {
224 void builder(s3select* self, const char* a, const char* b) const;
225
226 };
227 static push_arithmetic_predicate g_push_arithmetic_predicate;
228
229 struct push_logical_predicate : public base_ast_builder
230 {
231 void builder(s3select* self, const char* a, const char* b) const;
232 };
233 static push_logical_predicate g_push_logical_predicate;
234
235 struct push_negation : public base_ast_builder
236 {
237 void builder(s3select* self, const char* a, const char* b) const;
238 };
239 static push_negation g_push_negation;
240
241 struct push_column_pos : public base_ast_builder
242 {
243 void builder(s3select* self, const char* a, const char* b) const;
244 };
245 static push_column_pos g_push_column_pos;
246
247 struct push_projection : public base_ast_builder
248 {
249 void builder(s3select* self, const char* a, const char* b) const;
250 };
251 static push_projection g_push_projection;
252
253 struct push_alias_projection : public base_ast_builder
254 {
255 void builder(s3select* self, const char* a, const char* b) const;
256 };
257 static push_alias_projection g_push_alias_projection;
258
259 struct push_between_filter : public base_ast_builder
260 {
261 void builder(s3select* self, const char* a, const char* b) const;
262 };
263 static push_between_filter g_push_between_filter;
264
265 struct push_in_predicate : public base_ast_builder
266 {
267 void builder(s3select* self, const char* a, const char* b) const;
268 };
269 static push_in_predicate g_push_in_predicate;
270
271 struct push_in_predicate_arguments : public base_ast_builder
272 {
273 void builder(s3select* self, const char* a, const char* b) const;
274 };
275 static push_in_predicate_arguments g_push_in_predicate_arguments;
276
277 struct push_in_predicate_first_arg : public base_ast_builder
278 {
279 void builder(s3select* self, const char* a, const char* b) const;
280 };
281 static push_in_predicate_first_arg g_push_in_predicate_first_arg;
282
283 struct push_like_predicate_escape : public base_ast_builder
284 {
285 void builder(s3select* self, const char* a, const char* b) const;
286 };
287 static push_like_predicate_escape g_push_like_predicate_escape;
288
289 struct push_like_predicate_no_escape : public base_ast_builder
290 {
291 void builder(s3select* self, const char* a, const char* b) const;
292 };
293 static push_like_predicate_no_escape g_push_like_predicate_no_escape;
294
295 struct push_is_null_predicate : public base_ast_builder
296 {
297 void builder(s3select* self, const char* a, const char* b) const;
298 };
299 static push_is_null_predicate g_push_is_null_predicate;
300
301 struct push_case_when_else : public base_ast_builder
302 {
303 void builder(s3select* self, const char* a, const char* b) const;
304 };
305 static push_case_when_else g_push_case_when_else;
306
307 struct push_when_condition_then : public base_ast_builder
308 {
309 void builder(s3select* self, const char* a, const char* b) const;
310 };
311 static push_when_condition_then g_push_when_condition_then;
312
313 struct push_when_value_then : public base_ast_builder
314 {
315 void builder(s3select* self, const char* a, const char* b) const;
316 };
317 static push_when_value_then g_push_when_value_then;
318
319 struct push_case_value : public base_ast_builder
320 {
321 void builder(s3select* self, const char* a, const char* b) const;
322 };
323 static push_case_value g_push_case_value;
324
325 struct push_substr_from : public base_ast_builder
326 {
327 void builder(s3select* self, const char* a, const char* b) const;
328 };
329 static push_substr_from g_push_substr_from;
330
331 struct push_substr_from_for : public base_ast_builder
332 {
333 void builder(s3select* self, const char* a, const char* b) const;
334 };
335 static push_substr_from_for g_push_substr_from_for;
336
337 struct push_trim_type : public base_ast_builder
338 {
339 void builder(s3select* self, const char* a, const char* b) const;
340 };
341 static push_trim_type g_push_trim_type;
342
343 struct push_trim_whitespace_both : public base_ast_builder
344 {
345 void builder(s3select* self, const char* a, const char* b) const;
346 };
347 static push_trim_whitespace_both g_push_trim_whitespace_both;
348
349 struct push_trim_expr_one_side_whitespace : public base_ast_builder
350 {
351 void builder(s3select* self, const char* a, const char* b) const;
352 };
353 static push_trim_expr_one_side_whitespace g_push_trim_expr_one_side_whitespace;
354
355 struct push_trim_expr_anychar_anyside : public base_ast_builder
356 {
357 void builder(s3select* self, const char* a, const char* b) const;
358 };
359 static push_trim_expr_anychar_anyside g_push_trim_expr_anychar_anyside;
360
361 struct push_datediff : public base_ast_builder
362 {
363 void builder(s3select* self, const char* a, const char* b) const;
364 };
365 static push_datediff g_push_datediff;
366
367 struct push_dateadd : public base_ast_builder
368 {
369 void builder(s3select* self, const char* a, const char* b) const;
370 };
371 static push_dateadd g_push_dateadd;
372
373 struct push_extract : public base_ast_builder
374 {
375 void builder(s3select* self, const char* a, const char* b) const;
376 };
377 static push_extract g_push_extract;
378
379 struct push_date_part : public base_ast_builder
380 {
381 void builder(s3select* self, const char* a, const char* b) const;
382 };
383 static push_date_part g_push_date_part;
384
385 struct push_time_to_string_constant : public base_ast_builder
386 {
387 void builder(s3select* self, const char* a, const char* b) const;
388 };
389 static push_time_to_string_constant g_push_time_to_string_constant;
390
391 struct push_time_to_string_dynamic : public base_ast_builder
392 {
393 void builder(s3select* self, const char* a, const char* b) const;
394 };
395 static push_time_to_string_dynamic g_push_time_to_string_dynamic;
396
397 struct s3select : public bsc::grammar<s3select>
398 {
399 private:
400
401 actionQ m_actionQ;
402
403 scratch_area m_sca;
404
405 s3select_functions m_s3select_functions;
406
407 std::string error_description;
408
409 s3select_allocator m_s3select_allocator;
410
411 bool aggr_flow;
412
413 #define BOOST_BIND_ACTION( push_name ) boost::bind( &push_name::operator(), g_ ## push_name, const_cast<s3select*>(&self), _1, _2)
414
415 public:
416
417 actionQ* getAction()
418 {
419 return &m_actionQ;
420 }
421
422 s3select_allocator* getAllocator()
423 {
424 return &m_s3select_allocator;
425 }
426
427 s3select_functions* getS3F()
428 {
429 return &m_s3select_functions;
430 }
431
432 int semantic()
433 {
434 for (const auto &e : get_projections_list())
435 {
436 e->resolve_node();
437 //upon validate there is no aggregation-function nested calls, it validates legit aggregation call.
438 if (e->is_nested_aggregate(aggr_flow))
439 {
440 error_description = "nested aggregation function is illegal i.e. sum(...sum ...)";
441 throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL);
442 }
443 }
444
445 if (aggr_flow == true)
446 {// atleast one projection column contain aggregation function
447 for (const auto &e : get_projections_list())
448 {
449 auto aggregate_expr = e->get_aggregate();
450
451 if (aggregate_expr)
452 {
453 //per each column, subtree is mark to skip except for the aggregation function subtree.
454 //for an example: substring( ... , sum() , count() ) :: the substring is mark to skip execution, while sum and count not.
455 e->set_skip_non_aggregate(true);
456 e->mark_aggreagtion_subtree_to_execute();
457 }
458 else
459 {
460 //in case projection column is not aggregate, the projection column must *not* contain reference to columns.
461 if(e->is_column_reference())
462 {
463 error_description = "illegal query; projection contains aggregation function is not allowed with projection contains column reference";
464 throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL);
465 }
466 }
467
468 }
469 }
470 return 0;
471 }
472
473 int parse_query(const char* input_query)
474 {
475 if(get_projections_list().empty() == false)
476 {
477 return 0; //already parsed
478 }
479
480
481 error_description.clear();
482 aggr_flow = false;
483
484 try
485 {
486 bsc::parse_info<> info = bsc::parse(input_query, *this, bsc::space_p);
487 auto query_parse_position = info.stop;
488
489 if (!info.full)
490 {
491 error_description = std::string("failure -->") + query_parse_position + std::string("<---");
492 return -1;
493 }
494
495 semantic();
496 }
497 catch (base_s3select_exception& e)
498 {
499 error_description.assign(e.what());
500 if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
501 {
502 return -1;
503 }
504 }
505
506 return 0;
507 }
508
509 std::string get_error_description()
510 {
511 return error_description;
512 }
513
514 s3select()
515 {
516 m_s3select_functions.setAllocator(&m_s3select_allocator);
517 }
518
519 bool is_semantic()//TBD traverse and validate semantics per all nodes
520 {
521 base_statement* cond = m_actionQ.exprQ.back();
522
523 return cond->semantic();
524 }
525
526 std::string get_from_clause() const
527 {
528 return m_actionQ.from_clause;
529 }
530
531 void load_schema(std::vector< std::string>& scm)
532 {
533 int i = 0;
534 for (auto& c : scm)
535 {
536 m_sca.set_column_pos(c.c_str(), i++);
537 }
538 }
539
540 base_statement* get_filter()
541 {
542 if(m_actionQ.exprQ.empty())
543 {
544 return nullptr;
545 }
546
547 return m_actionQ.exprQ.back();
548 }
549
550 std::vector<base_statement*> get_projections_list()
551 {
552 return *m_actionQ.projections.get(); //TODO return COPY(?) or to return evalaution results (list of class value{}) / return reference(?)
553 }
554
555 scratch_area* get_scratch_area()
556 {
557 return &m_sca;
558 }
559
560 projection_alias* get_aliases()
561 {
562 return &m_actionQ.alias_map;
563 }
564
565 bool is_aggregate_query() const
566 {
567 return aggr_flow == true;
568 }
569
570 ~s3select()
571 {
572 m_s3select_functions.clean();
573 }
574
575 //the input is converted to lower case
576 #define S3SELECT_KW( reserve_word ) bsc::as_lower_d[ reserve_word ]
577
578 template <typename ScannerT>
579 struct definition
580 {
581 explicit definition(s3select const& self)
582 {
583 ///// s3select syntax rules and actions for building AST
584
585 select_expr = (select_expr_base >> ';') | select_expr_base;
586
587 select_expr_base = S3SELECT_KW("select") >> projections >> S3SELECT_KW("from") >> (from_expression)[BOOST_BIND_ACTION(push_from_clause)] >> !where_clause ;
588
589 projections = projection_expression >> *( ',' >> projection_expression) ;
590
591 projection_expression = (when_case_else_projection|when_case_value_when) [BOOST_BIND_ACTION(push_projection)] |
592 (arithmetic_expression >> S3SELECT_KW("as") >> alias_name)[BOOST_BIND_ACTION(push_alias_projection)] |
593 (arithmetic_expression)[BOOST_BIND_ACTION(push_projection)] |
594 (arithmetic_predicate >> S3SELECT_KW("as") >> alias_name)[BOOST_BIND_ACTION(push_alias_projection)] |
595 (arithmetic_predicate)[BOOST_BIND_ACTION(push_projection)] ;
596
597 alias_name = bsc::lexeme_d[(+bsc::alpha_p >> *bsc::digit_p)] ;
598
599 when_case_else_projection = (S3SELECT_KW("case") >> (+when_stmt) >> S3SELECT_KW("else") >> arithmetic_expression >> S3SELECT_KW("end")) [BOOST_BIND_ACTION(push_case_when_else)];
600
601 when_stmt = (S3SELECT_KW("when") >> condition_expression >> S3SELECT_KW("then") >> arithmetic_expression)[BOOST_BIND_ACTION(push_when_condition_then)];
602
603 when_case_value_when = (S3SELECT_KW("case") >> arithmetic_expression[BOOST_BIND_ACTION(push_case_value)] >>
604 (+when_value_then) >> S3SELECT_KW("else") >> arithmetic_expression >> S3SELECT_KW("end")) [BOOST_BIND_ACTION(push_case_when_else)];
605
606 when_value_then = (S3SELECT_KW("when") >> arithmetic_expression >> S3SELECT_KW("then") >> arithmetic_expression)[BOOST_BIND_ACTION(push_when_value_then)];
607
608 from_expression = (s3_object >> (variable - S3SELECT_KW("where"))) | s3_object;
609
610 //the stdin and object_path are for debug purposes(not part of the specs)
611 s3_object = S3SELECT_KW("stdin") | S3SELECT_KW("s3object") | object_path ;
612
613 object_path = "/" >> *( fs_type >> "/") >> fs_type;
614
615 fs_type = bsc::lexeme_d[+( bsc::alnum_p | bsc::str_p(".") | bsc::str_p("_")) ];
616
617 where_clause = S3SELECT_KW("where") >> condition_expression;
618
619 condition_expression = arithmetic_predicate;
620
621 arithmetic_predicate = (S3SELECT_KW("not") >> logical_predicate)[BOOST_BIND_ACTION(push_negation)] | logical_predicate;
622
623 logical_predicate = (logical_and) >> *(or_op[BOOST_BIND_ACTION(push_logical_operator)] >> (logical_and)[BOOST_BIND_ACTION(push_logical_predicate)]);
624
625 logical_and = (cmp_operand) >> *(and_op[BOOST_BIND_ACTION(push_logical_operator)] >> (cmp_operand)[BOOST_BIND_ACTION(push_logical_predicate)]);
626
627 cmp_operand = special_predicates | (factor) >> *(arith_cmp[BOOST_BIND_ACTION(push_compare_operator)] >> (factor)[BOOST_BIND_ACTION(push_arithmetic_predicate)]);
628
629 special_predicates = (is_null) | (is_not_null) | (between_predicate) | (in_predicate) | (like_predicate);
630
631 is_null = ((factor) >> S3SELECT_KW("is") >> S3SELECT_KW("null"))[BOOST_BIND_ACTION(push_is_null_predicate)];
632
633 is_not_null = ((factor) >> S3SELECT_KW("is") >> S3SELECT_KW("not") >> S3SELECT_KW("null"))[BOOST_BIND_ACTION(push_is_null_predicate)];
634
635 between_predicate = (arithmetic_expression >> S3SELECT_KW("between") >> arithmetic_expression >> S3SELECT_KW("and") >> arithmetic_expression)[BOOST_BIND_ACTION(push_between_filter)];
636
637 in_predicate = (arithmetic_expression >> S3SELECT_KW("in") >> '(' >> arithmetic_expression[BOOST_BIND_ACTION(push_in_predicate_first_arg)] >> *(',' >> arithmetic_expression[BOOST_BIND_ACTION(push_in_predicate_arguments)]) >> ')')[BOOST_BIND_ACTION(push_in_predicate)];
638
639 like_predicate = (like_predicate_escape) |(like_predicate_no_escape);
640
641 like_predicate_no_escape = (arithmetic_expression >> S3SELECT_KW("like") >> arithmetic_expression)[BOOST_BIND_ACTION(push_like_predicate_no_escape)];
642
643 like_predicate_escape = (arithmetic_expression >> S3SELECT_KW("like") >> arithmetic_expression >> S3SELECT_KW("escape") >> arithmetic_expression)[BOOST_BIND_ACTION(push_like_predicate_escape)];
644
645 factor = arithmetic_expression | ( '(' >> arithmetic_predicate >> ')' ) ;
646
647 arithmetic_expression = (addsub_operand >> *(addsubop_operator[BOOST_BIND_ACTION(push_addsub)] >> addsub_operand[BOOST_BIND_ACTION(push_addsub_binop)] ));
648
649 addsub_operand = (mulldiv_operand >> *(muldiv_operator[BOOST_BIND_ACTION(push_mulop)] >> mulldiv_operand[BOOST_BIND_ACTION(push_mulldiv_binop)] ));// this non-terminal gives precedense to mull/div
650
651 mulldiv_operand = arithmetic_argument | ('(' >> (arithmetic_expression) >> ')') ;
652
653 list_of_function_arguments = (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)] >> *(',' >> (arithmetic_expression)[BOOST_BIND_ACTION(push_function_arg)]);
654
655 function = ((variable >> '(' )[BOOST_BIND_ACTION(push_function_name)] >> !list_of_function_arguments >> ')')[BOOST_BIND_ACTION(push_function_expr)];
656
657 arithmetic_argument = (float_number)[BOOST_BIND_ACTION(push_float_number)] | (number)[BOOST_BIND_ACTION(push_number)] | (column_pos)[BOOST_BIND_ACTION(push_column_pos)] |
658 (string)[BOOST_BIND_ACTION(push_string)] | (datediff) | (dateadd) | (extract) | (time_to_string_constant) | (time_to_string_dynamic) |
659 (cast) | (substr) | (trim) |
660 (function) | (variable)[BOOST_BIND_ACTION(push_variable)]; //function is pushed by right-term
661
662 cast = (S3SELECT_KW("cast") >> '(' >> arithmetic_expression >> S3SELECT_KW("as") >> (data_type)[BOOST_BIND_ACTION(push_data_type)] >> ')') [BOOST_BIND_ACTION(push_cast_expr)];
663
664 data_type = (S3SELECT_KW("int") | S3SELECT_KW("float") | S3SELECT_KW("string") | S3SELECT_KW("timestamp") | S3SELECT_KW("bool") );
665
666 substr = (substr_from) | (substr_from_for);
667
668 substr_from = (S3SELECT_KW("substring") >> '(' >> (arithmetic_expression >> S3SELECT_KW("from") >> arithmetic_expression) >> ')') [BOOST_BIND_ACTION(push_substr_from)];
669
670 substr_from_for = (S3SELECT_KW("substring") >> '(' >> (arithmetic_expression >> S3SELECT_KW("from") >> arithmetic_expression >> S3SELECT_KW("for") >> arithmetic_expression) >> ')') [BOOST_BIND_ACTION(push_substr_from_for)];
671
672 trim = (trim_whitespace_both) | (trim_one_side_whitespace) | (trim_anychar_anyside);
673
674 trim_one_side_whitespace = (S3SELECT_KW("trim") >> '(' >> (trim_type)[BOOST_BIND_ACTION(push_trim_type)] >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_trim_expr_one_side_whitespace)];
675
676 trim_whitespace_both = (S3SELECT_KW("trim") >> '(' >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_trim_whitespace_both)];
677
678 trim_anychar_anyside = (S3SELECT_KW("trim") >> '(' >> ((trim_remove_type)[BOOST_BIND_ACTION(push_trim_type)] >> arithmetic_expression >> S3SELECT_KW("from") >> arithmetic_expression) >> ')') [BOOST_BIND_ACTION(push_trim_expr_anychar_anyside)];
679
680 trim_type = ((S3SELECT_KW("leading") >> S3SELECT_KW("from")) | ( S3SELECT_KW("trailing") >> S3SELECT_KW("from")) | (S3SELECT_KW("both") >> S3SELECT_KW("from")) | S3SELECT_KW("from") );
681
682 trim_remove_type = (S3SELECT_KW("leading") | S3SELECT_KW("trailing") | S3SELECT_KW("both") );
683
684 datediff = (S3SELECT_KW("date_diff") >> '(' >> date_part >> ',' >> arithmetic_expression >> ',' >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_datediff)];
685
686 dateadd = (S3SELECT_KW("date_add") >> '(' >> date_part >> ',' >> arithmetic_expression >> ',' >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_dateadd)];
687
688 extract = (S3SELECT_KW("extract") >> '(' >> (date_part_extract)[BOOST_BIND_ACTION(push_date_part)] >> S3SELECT_KW("from") >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_extract)];
689
690 date_part = (S3SELECT_KW("year") | S3SELECT_KW("month") | S3SELECT_KW("day") | S3SELECT_KW("hour") | S3SELECT_KW("minute") | S3SELECT_KW("second")) [BOOST_BIND_ACTION(push_date_part)];
691
692 date_part_extract = ((date_part) | S3SELECT_KW("week") | S3SELECT_KW("timezone_hour") | S3SELECT_KW("timezone_minute"));
693
694 time_to_string_constant = (S3SELECT_KW("to_string") >> '(' >> arithmetic_expression >> ',' >> (string)[BOOST_BIND_ACTION(push_string)] >> ')') [BOOST_BIND_ACTION(push_time_to_string_constant)];
695
696 time_to_string_dynamic = (S3SELECT_KW("to_string") >> '(' >> arithmetic_expression >> ',' >> arithmetic_expression >> ')') [BOOST_BIND_ACTION(push_time_to_string_dynamic)];
697
698 number = bsc::int_p;
699
700 float_number = bsc::real_p;
701
702 string = (bsc::str_p("\"") >> *( bsc::anychar_p - bsc::str_p("\"") ) >> bsc::str_p("\"")) | (bsc::str_p("\'") >> *( bsc::anychar_p - bsc::str_p("\'") ) >> bsc::str_p("\'")) ;
703
704 column_pos = (variable_name >> "." >> column_pos_name) | column_pos_name; //TODO what about space
705
706 column_pos_name = ('_'>>+(bsc::digit_p) ) | '*' ;
707
708 muldiv_operator = bsc::str_p("*") | bsc::str_p("/") | bsc::str_p("^") | bsc::str_p("%");// got precedense
709
710 addsubop_operator = bsc::str_p("+") | bsc::str_p("-");
711
712 arith_cmp = bsc::str_p(">=") | bsc::str_p("<=") | bsc::str_p("=") | bsc::str_p("<") | bsc::str_p(">") | bsc::str_p("!=");
713
714 and_op = S3SELECT_KW("and");
715
716 or_op = S3SELECT_KW("or");
717
718 variable_name = bsc::lexeme_d[(+bsc::alpha_p >> *( bsc::alpha_p | bsc::digit_p | '_') ) - S3SELECT_KW("not")];
719
720 variable = (variable_name >> "." >> variable_name) | variable_name;
721 }
722
723
724 bsc::rule<ScannerT> cast, data_type, variable, variable_name, select_expr, select_expr_base, s3_object, where_clause, number, float_number, string, from_expression;
725 bsc::rule<ScannerT> cmp_operand, arith_cmp, condition_expression, arithmetic_predicate, logical_predicate, factor;
726 bsc::rule<ScannerT> trim, trim_whitespace_both, trim_one_side_whitespace, trim_anychar_anyside, trim_type, trim_remove_type, substr, substr_from, substr_from_for;
727 bsc::rule<ScannerT> datediff, dateadd, extract, date_part, date_part_extract, time_to_string_constant, time_to_string_dynamic;
728 bsc::rule<ScannerT> special_predicates, between_predicate, in_predicate, like_predicate, like_predicate_escape, like_predicate_no_escape, is_null, is_not_null;
729 bsc::rule<ScannerT> muldiv_operator, addsubop_operator, function, arithmetic_expression, addsub_operand, list_of_function_arguments, arithmetic_argument, mulldiv_operand;
730 bsc::rule<ScannerT> fs_type, object_path;
731 bsc::rule<ScannerT> projections, projection_expression, alias_name, column_pos,column_pos_name;
732 bsc::rule<ScannerT> when_case_else_projection, when_case_value_when, when_stmt, when_value_then;
733 bsc::rule<ScannerT> logical_and,and_op,or_op;
734 bsc::rule<ScannerT> const& start() const
735 {
736 return select_expr ;
737 }
738 };
739 };
740
741 void base_ast_builder::operator()(s3select *self, const char *a, const char *b) const
742 {
743 //the purpose of the following procedure is to bypass boost::spirit rescan (calling to bind-action more than once per the same text)
744 //which cause wrong AST creation (and later false execution).
745 if (self->getAction()->is_already_scanned((void *)(this), const_cast<char *>(a)))
746 return;
747
748 builder(self, a, b);
749 }
750
751 void push_from_clause::builder(s3select* self, const char* a, const char* b) const
752 {
753 std::string token(a, b),table_name,alias_name;
754
755 //should search for generic space
756 if(token.find(' ') != std::string::npos)
757 {
758 size_t pos = token.find(' ');
759 table_name = token.substr(0,pos);
760
761 pos = token.rfind(' ');
762 alias_name = token.substr(pos+1,token.size());
763
764 self->getAction()->table_alias = alias_name;
765
766 if(self->getAction()->column_prefix != "##" && self->getAction()->table_alias != self->getAction()->column_prefix)
767 {
768 throw base_s3select_exception(std::string("query can not contain more then a single table-alias"), base_s3select_exception::s3select_exp_en_t::FATAL);
769 }
770
771 token = table_name;
772 }
773
774 self->getAction()->from_clause = token; //TODO add table alias
775
776 self->getAction()->exprQ.clear();
777
778 }
779
780 void push_number::builder(s3select* self, const char* a, const char* b) const
781 {
782 std::string token(a, b);
783
784 variable* v = S3SELECT_NEW(self, variable, atoi(token.c_str()));
785
786 self->getAction()->exprQ.push_back(v);
787 }
788
789 void push_float_number::builder(s3select* self, const char* a, const char* b) const
790 {
791 std::string token(a, b);
792
793 //the parser for float(real_p) is accepting also integers, thus "blocking" integer acceptence and all are float.
794 bsc::parse_info<> info = bsc::parse(token.c_str(), bsc::int_p, bsc::space_p);
795
796 if (!info.full)
797 {
798 char* perr;
799 double d = strtod(token.c_str(), &perr);
800 variable* v = S3SELECT_NEW(self, variable, d);
801
802 self->getAction()->exprQ.push_back(v);
803 }
804 else
805 {
806 variable* v = S3SELECT_NEW(self, variable, atoi(token.c_str()));
807
808 self->getAction()->exprQ.push_back(v);
809 }
810 }
811
812 void push_string::builder(s3select* self, const char* a, const char* b) const
813 {
814 a++;
815 b--; // remove double quotes
816 std::string token(a, b);
817
818 variable* v = S3SELECT_NEW(self, variable, token, variable::var_t::COL_VALUE);
819
820 self->getAction()->exprQ.push_back(v);
821 }
822
823 void push_variable::builder(s3select* self, const char* a, const char* b) const
824 {
825 std::string token(a, b);
826
827 variable* v = nullptr;
828
829 if (g_s3select_reserve_word.is_reserved_word(token))
830 {
831 if (g_s3select_reserve_word.get_reserved_word(token) == s3select_reserved_word::reserve_word_en_t::S3S_NULL)
832 {
833 v = S3SELECT_NEW(self, variable, s3select_reserved_word::reserve_word_en_t::S3S_NULL);
834 }
835 else if (g_s3select_reserve_word.get_reserved_word(token) == s3select_reserved_word::reserve_word_en_t::S3S_NAN)
836 {
837 v = S3SELECT_NEW(self, variable, s3select_reserved_word::reserve_word_en_t::S3S_NAN);
838 }
839 else if (g_s3select_reserve_word.get_reserved_word(token) == s3select_reserved_word::reserve_word_en_t::S3S_FALSE)
840 {
841 v = S3SELECT_NEW(self, variable, s3select_reserved_word::reserve_word_en_t::S3S_FALSE);
842 }
843 else if (g_s3select_reserve_word.get_reserved_word(token) == s3select_reserved_word::reserve_word_en_t::S3S_TRUE)
844 {
845 v = S3SELECT_NEW(self, variable, s3select_reserved_word::reserve_word_en_t::S3S_TRUE);
846 }
847 else
848 {
849 v = S3SELECT_NEW(self, variable, s3select_reserved_word::reserve_word_en_t::NA);
850 }
851
852 }
853 else
854 {
855 size_t pos = token.find('.');
856 std::string alias_name;
857 if(pos != std::string::npos)
858 {
859 alias_name = token.substr(0,pos);
860 pos ++;
861 token = token.substr(pos,token.size());
862
863 if(self->getAction()->column_prefix != "##" && alias_name != self->getAction()->column_prefix)
864 {
865 throw base_s3select_exception(std::string("query can not contain more then a single table-alias"), base_s3select_exception::s3select_exp_en_t::FATAL);
866 }
867
868 self->getAction()->column_prefix = alias_name;
869 }
870 v = S3SELECT_NEW(self, variable, token);
871 }
872
873 self->getAction()->exprQ.push_back(v);
874 }
875
876 void push_addsub::builder(s3select* self, const char* a, const char* b) const
877 {
878 std::string token(a, b);
879
880 if (token == "+")
881 {
882 self->getAction()->addsubQ.push_back(addsub_operation::addsub_op_t::ADD);
883 }
884 else
885 {
886 self->getAction()->addsubQ.push_back(addsub_operation::addsub_op_t::SUB);
887 }
888 }
889
890 void push_mulop::builder(s3select* self, const char* a, const char* b) const
891 {
892 std::string token(a, b);
893
894 if (token == "*")
895 {
896 self->getAction()->muldivQ.push_back(mulldiv_operation::muldiv_t::MULL);
897 }
898 else if (token == "/")
899 {
900 self->getAction()->muldivQ.push_back(mulldiv_operation::muldiv_t::DIV);
901 }
902 else if(token == "^")
903 {
904 self->getAction()->muldivQ.push_back(mulldiv_operation::muldiv_t::POW);
905 }
906 else
907 {
908 self->getAction()->muldivQ.push_back(mulldiv_operation::muldiv_t::MOD);
909 }
910 }
911
912 void push_addsub_binop::builder(s3select* self, [[maybe_unused]] const char* a,[[maybe_unused]] const char* b) const
913 {
914 base_statement* l = nullptr, *r = nullptr;
915
916 r = self->getAction()->exprQ.back();
917 self->getAction()->exprQ.pop_back();
918 l = self->getAction()->exprQ.back();
919 self->getAction()->exprQ.pop_back();
920 addsub_operation::addsub_op_t o = self->getAction()->addsubQ.back();
921 self->getAction()->addsubQ.pop_back();
922 addsub_operation* as = S3SELECT_NEW(self, addsub_operation, l, o, r);
923 self->getAction()->exprQ.push_back(as);
924 }
925
926 void push_mulldiv_binop::builder(s3select* self, [[maybe_unused]] const char* a, [[maybe_unused]] const char* b) const
927 {
928 base_statement* vl = nullptr, *vr = nullptr;
929
930 vr = self->getAction()->exprQ.back();
931 self->getAction()->exprQ.pop_back();
932 vl = self->getAction()->exprQ.back();
933 self->getAction()->exprQ.pop_back();
934 mulldiv_operation::muldiv_t o = self->getAction()->muldivQ.back();
935 self->getAction()->muldivQ.pop_back();
936 mulldiv_operation* f = S3SELECT_NEW(self, mulldiv_operation, vl, o, vr);
937 self->getAction()->exprQ.push_back(f);
938 }
939
940 void push_function_arg::builder(s3select* self, const char* a, const char* b) const
941 {
942 std::string token(a, b);
943
944 base_statement* be = self->getAction()->exprQ.back();
945 self->getAction()->exprQ.pop_back();
946 base_statement* f = self->getAction()->funcQ.back();
947
948 if (dynamic_cast<__function*>(f))
949 {
950 dynamic_cast<__function*>(f)->push_argument(be);
951 }
952 }
953
954 void push_function_name::builder(s3select* self, const char* a, const char* b) const
955 {
956 b--;
957 while (*b == '(' || *b == ' ')
958 {
959 b--; //point to function-name
960 }
961
962 std::string fn;
963 fn.assign(a, b - a + 1);
964
965 __function* func = S3SELECT_NEW(self, __function, fn.c_str(), self->getS3F());
966 self->getAction()->funcQ.push_back(func);
967 }
968
969 void push_function_expr::builder(s3select* self, const char* a, const char* b) const
970 {
971 std::string token(a, b);
972
973 base_statement* func = self->getAction()->funcQ.back();
974 self->getAction()->funcQ.pop_back();
975
976 self->getAction()->exprQ.push_back(func);
977 }
978
979 void push_compare_operator::builder(s3select* self, const char* a, const char* b) const
980 {
981 std::string token(a, b);
982 arithmetic_operand::cmp_t c = arithmetic_operand::cmp_t::NA;
983
984 if (token == "=")
985 {
986 c = arithmetic_operand::cmp_t::EQ;
987 }
988 else if (token == "!=")
989 {
990 c = arithmetic_operand::cmp_t::NE;
991 }
992 else if (token == ">=")
993 {
994 c = arithmetic_operand::cmp_t::GE;
995 }
996 else if (token == "<=")
997 {
998 c = arithmetic_operand::cmp_t::LE;
999 }
1000 else if (token == ">")
1001 {
1002 c = arithmetic_operand::cmp_t::GT;
1003 }
1004 else if (token == "<")
1005 {
1006 c = arithmetic_operand::cmp_t::LT;
1007 }
1008
1009 self->getAction()->arithmetic_compareQ.push_back(c);
1010 }
1011
1012 void push_logical_operator::builder(s3select* self, const char* a, const char* b) const
1013 {
1014 std::string token(a, b);
1015 logical_operand::oplog_t l = logical_operand::oplog_t::NA;
1016
1017 if (token == "and")
1018 {
1019 l = logical_operand::oplog_t::AND;
1020 }
1021 else if (token == "or")
1022 {
1023 l = logical_operand::oplog_t::OR;
1024 }
1025
1026 self->getAction()->logical_compareQ.push_back(l);
1027 }
1028
1029 void push_arithmetic_predicate::builder(s3select* self, const char* a, const char* b) const
1030 {
1031 std::string token(a, b);
1032
1033 base_statement* vr, *vl;
1034 arithmetic_operand::cmp_t c = self->getAction()->arithmetic_compareQ.back();
1035 self->getAction()->arithmetic_compareQ.pop_back();
1036
1037 if (!self->getAction()->exprQ.empty())
1038 {
1039 vr = self->getAction()->exprQ.back();
1040 self->getAction()->exprQ.pop_back();
1041 }
1042 else
1043 {
1044 throw base_s3select_exception(std::string("missing right operand for arithmetic-comparision expression"), base_s3select_exception::s3select_exp_en_t::FATAL);
1045 }
1046
1047 if (!self->getAction()->exprQ.empty())
1048 {
1049 vl = self->getAction()->exprQ.back();
1050 self->getAction()->exprQ.pop_back();
1051 }
1052 else
1053 {
1054 throw base_s3select_exception(std::string("missing left operand for arithmetic-comparision expression"), base_s3select_exception::s3select_exp_en_t::FATAL);
1055 }
1056
1057 arithmetic_operand* t = S3SELECT_NEW(self, arithmetic_operand, vl, c, vr);
1058
1059 self->getAction()->exprQ.push_back(t);
1060 }
1061
1062 void push_logical_predicate::builder(s3select* self, const char* a, const char* b) const
1063 {
1064 std::string token(a, b);
1065
1066 base_statement* tl = nullptr, *tr = nullptr;
1067 logical_operand::oplog_t oplog = self->getAction()->logical_compareQ.back();
1068 self->getAction()->logical_compareQ.pop_back();
1069
1070 if (self->getAction()->exprQ.empty() == false)
1071 {
1072 tr = self->getAction()->exprQ.back();
1073 self->getAction()->exprQ.pop_back();
1074 }
1075 else
1076 {//should reject by syntax parser
1077 throw base_s3select_exception(std::string("missing right operand for logical expression"), base_s3select_exception::s3select_exp_en_t::FATAL);
1078 }
1079
1080 if (self->getAction()->exprQ.empty() == false)
1081 {
1082 tl = self->getAction()->exprQ.back();
1083 self->getAction()->exprQ.pop_back();
1084 }
1085 else
1086 {//should reject by syntax parser
1087 throw base_s3select_exception(std::string("missing left operand for logical expression"), base_s3select_exception::s3select_exp_en_t::FATAL);
1088 }
1089
1090 logical_operand* f = S3SELECT_NEW(self, logical_operand, tl, oplog, tr);
1091
1092 self->getAction()->exprQ.push_back(f);
1093 }
1094
1095 void push_negation::builder(s3select* self, const char* a, const char* b) const
1096 {
1097 std::string token(a, b);
1098 base_statement* pred = nullptr;
1099
1100 if (self->getAction()->exprQ.empty() == false)
1101 {
1102 pred = self->getAction()->exprQ.back();
1103 self->getAction()->exprQ.pop_back();
1104 }
1105 else
1106 {
1107 throw base_s3select_exception(std::string("failed to create AST for NOT operator"), base_s3select_exception::s3select_exp_en_t::FATAL);
1108 }
1109
1110 //upon NOT operator, the logical and arithmetical operators are "tagged" to negate result.
1111 if (dynamic_cast<logical_operand*>(pred))
1112 {
1113 logical_operand* f = S3SELECT_NEW(self, logical_operand, pred);
1114 self->getAction()->exprQ.push_back(f);
1115 }
1116 else if (dynamic_cast<__function*>(pred) || dynamic_cast<negate_function_operation*>(pred) || dynamic_cast<variable*>(pred))
1117 {
1118 negate_function_operation* nf = S3SELECT_NEW(self, negate_function_operation, pred);
1119 self->getAction()->exprQ.push_back(nf);
1120 }
1121 else if(dynamic_cast<arithmetic_operand*>(pred))
1122 {
1123 arithmetic_operand* f = S3SELECT_NEW(self, arithmetic_operand, pred);
1124 self->getAction()->exprQ.push_back(f);
1125 }
1126 else
1127 {
1128 throw base_s3select_exception(std::string("failed to create AST for NOT operator"), base_s3select_exception::s3select_exp_en_t::FATAL);
1129 }
1130 }
1131
1132 void push_column_pos::builder(s3select* self, const char* a, const char* b) const
1133 {
1134 std::string token(a, b);
1135 std::string alias_name;
1136 variable* v;
1137
1138 if (token == "*" || token == "* ") //TODO space should skip in boost::spirit
1139 {
1140 v = S3SELECT_NEW(self, variable, token, variable::var_t::STAR_OPERATION);
1141
1142 //NOTE: variable may leak upon star-operation(multi_value object is not destruct entirly, it contain stl-vactor which is allocated on heap).
1143 //TODO: find a generic way for such use-cases, one possible solution is to push all-nodes(upon AST is complete) into cleanup-container.
1144 self->getS3F()->push_for_cleanup(v);
1145 }
1146 else
1147 {
1148 size_t pos = token.find('.');
1149 if(pos != std::string::npos)
1150 {
1151 alias_name = token.substr(0,pos);
1152
1153 pos ++;
1154 token = token.substr(pos,token.size());
1155
1156 if(self->getAction()->column_prefix != "##" && self->getAction()->column_prefix != alias_name)
1157 {
1158 throw base_s3select_exception(std::string("query can not contain more then a single table-alias"), base_s3select_exception::s3select_exp_en_t::FATAL);
1159 }
1160
1161 self->getAction()->column_prefix = alias_name;
1162 }
1163 v = S3SELECT_NEW(self, variable, token, variable::var_t::POS);
1164 }
1165
1166 self->getAction()->exprQ.push_back(v);
1167 }
1168
1169 void push_projection::builder(s3select* self, const char* a, const char* b) const
1170 {
1171 std::string token(a, b);
1172
1173 self->getAction()->projections.get()->push_back(self->getAction()->exprQ.back());
1174 self->getAction()->exprQ.pop_back();
1175 }
1176
1177 void push_alias_projection::builder(s3select* self, const char* a, const char* b) const
1178 {
1179 std::string token(a, b);
1180 //extract alias name
1181 const char* p = b;
1182 while (*(--p) != ' ')
1183 ;
1184 std::string alias_name(p + 1, b);
1185 base_statement* bs = self->getAction()->exprQ.back();
1186
1187 //mapping alias name to base-statement
1188 bool res = self->getAction()->alias_map.insert_new_entry(alias_name, bs);
1189 if (res == false)
1190 {
1191 throw base_s3select_exception(std::string("alias <") + alias_name + std::string("> is already been used in query"), base_s3select_exception::s3select_exp_en_t::FATAL);
1192 }
1193
1194 self->getAction()->projections.get()->push_back(bs);
1195 self->getAction()->exprQ.pop_back();
1196 }
1197
1198 void push_between_filter::builder(s3select* self, const char* a, const char* b) const
1199 {
1200 std::string token(a, b);
1201
1202 std::string between_function("#between#");
1203
1204 __function* func = S3SELECT_NEW(self, __function, between_function.c_str(), self->getS3F());
1205
1206 base_statement* second_expr = self->getAction()->exprQ.back();
1207 self->getAction()->exprQ.pop_back();
1208 func->push_argument(second_expr);
1209
1210 base_statement* first_expr = self->getAction()->exprQ.back();
1211 self->getAction()->exprQ.pop_back();
1212 func->push_argument(first_expr);
1213
1214 base_statement* main_expr = self->getAction()->exprQ.back();
1215 self->getAction()->exprQ.pop_back();
1216 func->push_argument(main_expr);
1217
1218 self->getAction()->exprQ.push_back(func);
1219 }
1220
1221 void push_in_predicate_first_arg::builder(s3select* self, const char* a, const char* b) const
1222 {
1223 std::string token(a, b);
1224
1225 if(self->getAction()->exprQ.empty())
1226 {
1227 throw base_s3select_exception("failed to create AST for in predicate", base_s3select_exception::s3select_exp_en_t::FATAL);
1228 }
1229
1230 self->getAction()->inPredicateQ.push_back( self->getAction()->exprQ.back() );
1231 self->getAction()->exprQ.pop_back();
1232
1233 if(self->getAction()->exprQ.empty())
1234 {
1235 throw base_s3select_exception("failed to create AST for in predicate", base_s3select_exception::s3select_exp_en_t::FATAL);
1236 }
1237
1238 self->getAction()->inMainArg = self->getAction()->exprQ.back();
1239 self->getAction()->exprQ.pop_back();
1240
1241
1242 }
1243
1244 void push_in_predicate_arguments::builder(s3select* self, const char* a, const char* b) const
1245 {
1246 std::string token(a, b);
1247
1248 if(self->getAction()->exprQ.empty())
1249 {
1250 throw base_s3select_exception("failed to create AST for in predicate", base_s3select_exception::s3select_exp_en_t::FATAL);
1251 }
1252
1253 self->getAction()->inPredicateQ.push_back( self->getAction()->exprQ.back() );
1254
1255 self->getAction()->exprQ.pop_back();
1256
1257 }
1258
1259 void push_in_predicate::builder(s3select* self, const char* a, const char* b) const
1260 {
1261 // expr in (e1,e2,e3 ...)
1262 std::string token(a, b);
1263
1264 std::string in_function("#in_predicate#");
1265
1266 __function* func = S3SELECT_NEW(self, __function, in_function.c_str(), self->getS3F());
1267
1268 while(!self->getAction()->inPredicateQ.empty())
1269 {
1270 base_statement* ei = self->getAction()->inPredicateQ.back();
1271
1272 self->getAction()->inPredicateQ.pop_back();
1273
1274 func->push_argument(ei);
1275
1276 }
1277
1278 func->push_argument( self->getAction()->inMainArg );
1279
1280 self->getAction()->exprQ.push_back(func);
1281
1282 self->getAction()->inPredicateQ.clear();
1283
1284 self->getAction()->inMainArg = 0;
1285 }
1286
1287 void push_like_predicate_no_escape::builder(s3select* self, const char* a, const char* b) const
1288 {
1289
1290 std::string token(a, b);
1291 std::string in_function("#like_predicate#");
1292
1293 __function* func = S3SELECT_NEW(self, __function, in_function.c_str(), self->getS3F());
1294
1295 variable* v = S3SELECT_NEW(self, variable, "\\",variable::var_t::COL_VALUE);
1296 func->push_argument(v);
1297
1298 // experimenting valgrind-issue happens only on teuthology
1299 //self->getS3F()->push_for_cleanup(v);
1300
1301 base_statement* like_expr = self->getAction()->exprQ.back();
1302 self->getAction()->exprQ.pop_back();
1303 func->push_argument(like_expr);
1304
1305 base_statement* expr = self->getAction()->exprQ.back();
1306 self->getAction()->exprQ.pop_back();
1307
1308 func->push_argument(expr);
1309
1310 self->getAction()->exprQ.push_back(func);
1311 }
1312
1313 void push_like_predicate_escape::builder(s3select* self, const char* a, const char* b) const
1314 {
1315 std::string token(a, b);
1316 std::string in_function("#like_predicate#");
1317
1318 __function* func = S3SELECT_NEW(self, __function, in_function.c_str(), self->getS3F());
1319
1320 base_statement* expr = self->getAction()->exprQ.back();
1321 self->getAction()->exprQ.pop_back();
1322
1323 func->push_argument(expr);
1324
1325 base_statement* main_expr = self->getAction()->exprQ.back();
1326 self->getAction()->exprQ.pop_back();
1327 func->push_argument(main_expr);
1328
1329 base_statement* escape_expr = self->getAction()->exprQ.back();
1330 self->getAction()->exprQ.pop_back();
1331 func->push_argument(escape_expr);
1332
1333 self->getAction()->exprQ.push_back(func);
1334 }
1335
1336 void push_is_null_predicate::builder(s3select* self, const char* a, const char* b) const
1337 {
1338 //expression is null, is not null
1339 std::string token(a, b);
1340 bool is_null = true;
1341
1342 for(size_t i=0;i<token.size();i++)
1343 {//TODO use other scan rules
1344 bsc::parse_info<> info = bsc::parse(token.c_str()+i, (bsc::str_p("is") >> bsc::str_p("not") >> bsc::str_p("null")) , bsc::space_p);
1345 if (info.full)
1346 is_null = false;
1347 }
1348
1349 std::string in_function("#is_null#");
1350
1351 if (is_null == false)
1352 {
1353 in_function = "#is_not_null#";
1354 }
1355
1356 __function* func = S3SELECT_NEW(self, __function, in_function.c_str(), self->getS3F());
1357
1358 if (!self->getAction()->exprQ.empty())
1359 {
1360 base_statement* expr = self->getAction()->exprQ.back();
1361 self->getAction()->exprQ.pop_back();
1362 func->push_argument(expr);
1363 }
1364
1365 self->getAction()->exprQ.push_back(func);
1366 }
1367
1368 void push_when_condition_then::builder(s3select* self, const char* a, const char* b) const
1369 {
1370 std::string token(a, b);
1371
1372 __function* func = S3SELECT_NEW(self, __function, "#when-then#", self->getS3F());
1373
1374 base_statement* then_expr = self->getAction()->exprQ.back();
1375 self->getAction()->exprQ.pop_back();
1376
1377 base_statement* when_expr = self->getAction()->exprQ.back();
1378 self->getAction()->exprQ.pop_back();
1379
1380 func->push_argument(then_expr);
1381 func->push_argument(when_expr);
1382
1383 self->getAction()->whenThenQ.push_back(func);
1384
1385 self->getAction()->when_then_count ++;
1386 }
1387
1388 void push_case_when_else::builder(s3select* self, const char* a, const char* b) const
1389 {
1390 std::string token(a, b);
1391
1392 base_statement* else_expr = self->getAction()->exprQ.back();
1393 self->getAction()->exprQ.pop_back();
1394
1395 __function* func = S3SELECT_NEW(self, __function, "#case-when-else#", self->getS3F());
1396
1397 func->push_argument(else_expr);
1398
1399 while(self->getAction()->when_then_count)
1400 {
1401 base_statement* when_then_func = self->getAction()->whenThenQ.back();
1402 self->getAction()->whenThenQ.pop_back();
1403
1404 func->push_argument(when_then_func);
1405
1406 self->getAction()->when_then_count--;
1407 }
1408
1409 // condQ is cleared explicitly, because of "leftover", due to double scanning upon accepting
1410 // the following rule '(' condition-expression ')' , i.e. (3*3 == 12)
1411 // Because of the double-scan (bug in spirit?defintion?), a sub-tree for the left side is created, twice.
1412 // thus, it causes wrong calculation.
1413
1414 self->getAction()->exprQ.clear();
1415
1416 self->getAction()->exprQ.push_back(func);
1417 }
1418
1419 void push_case_value::builder(s3select* self, const char* a, const char* b) const
1420 {
1421 std::string token(a, b);
1422
1423 base_statement* case_value = self->getAction()->exprQ.back();
1424 self->getAction()->exprQ.pop_back();
1425
1426 self->getAction()->caseValueQ.push_back(case_value);
1427 }
1428
1429 void push_when_value_then::builder(s3select* self, const char* a, const char* b) const
1430 {
1431 std::string token(a, b);
1432
1433 __function* func = S3SELECT_NEW(self, __function, "#when-value-then#", self->getS3F());
1434
1435 base_statement* then_expr = self->getAction()->exprQ.back();
1436 self->getAction()->exprQ.pop_back();
1437
1438 base_statement* when_expr = self->getAction()->exprQ.back();
1439 self->getAction()->exprQ.pop_back();
1440
1441 base_statement* case_expr = self->getAction()->caseValueQ.back();
1442
1443 func->push_argument(then_expr);
1444 func->push_argument(when_expr);
1445 func->push_argument(case_expr);
1446
1447 self->getAction()->whenThenQ.push_back(func);
1448
1449 self->getAction()->when_then_count ++;
1450 }
1451
1452 void push_cast_expr::builder(s3select* self, const char* a, const char* b) const
1453 {
1454 //cast(expression as int/float/string/timestamp) --> new function "int/float/string/timestamp" ( args = expression )
1455 std::string token(a, b);
1456
1457 std::string cast_function;
1458
1459 cast_function = self->getAction()->dataTypeQ.back();
1460 self->getAction()->dataTypeQ.pop_back();
1461
1462 __function* func = S3SELECT_NEW(self, __function, cast_function.c_str(), self->getS3F());
1463
1464 base_statement* expr = self->getAction()->exprQ.back();
1465 self->getAction()->exprQ.pop_back();
1466 func->push_argument(expr);
1467
1468 self->getAction()->exprQ.push_back(func);
1469 }
1470
1471 void push_data_type::builder(s3select* self, const char* a, const char* b) const
1472 {
1473 std::string token(a, b);
1474
1475 auto cast_operator = [&](const char *s){return strncmp(a,s,strlen(s))==0;};
1476
1477 if(cast_operator("int"))
1478 {
1479 self->getAction()->dataTypeQ.push_back("int");
1480 }else if(cast_operator("float"))
1481 {
1482 self->getAction()->dataTypeQ.push_back("float");
1483 }else if(cast_operator("string"))
1484 {
1485 self->getAction()->dataTypeQ.push_back("string");
1486 }else if(cast_operator("timestamp"))
1487 {
1488 self->getAction()->dataTypeQ.push_back("to_timestamp");
1489 }else if(cast_operator("bool"))
1490 {
1491 self->getAction()->dataTypeQ.push_back("to_bool");
1492 }
1493 }
1494
1495 void push_trim_whitespace_both::builder(s3select* self, const char* a, const char* b) const
1496 {
1497 std::string token(a, b);
1498
1499 __function* func = S3SELECT_NEW(self, __function, "#trim#", self->getS3F());
1500
1501 base_statement* expr = self->getAction()->exprQ.back();
1502 self->getAction()->exprQ.pop_back();
1503 func->push_argument(expr);
1504
1505 self->getAction()->exprQ.push_back(func);
1506 }
1507
1508 void push_trim_expr_one_side_whitespace::builder(s3select* self, const char* a, const char* b) const
1509 {
1510 std::string token(a, b);
1511
1512 std::string trim_function;
1513
1514 trim_function = self->getAction()->trimTypeQ.back();
1515 self->getAction()->trimTypeQ.pop_back();
1516
1517 __function* func = S3SELECT_NEW(self, __function, trim_function.c_str(), self->getS3F());
1518
1519 base_statement* inp_expr = self->getAction()->exprQ.back();
1520 self->getAction()->exprQ.pop_back();
1521 func->push_argument(inp_expr);
1522
1523 self->getAction()->exprQ.push_back(func);
1524 }
1525
1526 void push_trim_expr_anychar_anyside::builder(s3select* self, const char* a, const char* b) const
1527 {
1528 std::string token(a, b);
1529
1530 std::string trim_function;
1531
1532 trim_function = self->getAction()->trimTypeQ.back();
1533 self->getAction()->trimTypeQ.pop_back();
1534
1535 __function* func = S3SELECT_NEW(self, __function, trim_function.c_str(), self->getS3F());
1536
1537 base_statement* expr = self->getAction()->exprQ.back();
1538 self->getAction()->exprQ.pop_back();
1539 func->push_argument(expr);
1540
1541 base_statement* inp_expr = self->getAction()->exprQ.back();
1542 self->getAction()->exprQ.pop_back();
1543 func->push_argument(inp_expr);
1544
1545 self->getAction()->exprQ.push_back(func);
1546 }
1547
1548 void push_trim_type::builder(s3select* self, const char* a, const char* b) const
1549 {
1550 std::string token(a, b);
1551
1552 auto trim_option = [&](const char *s){return strncmp(a,s,strlen(s))==0;};
1553
1554 if(trim_option("leading"))
1555 {
1556 self->getAction()->trimTypeQ.push_back("#leading#");
1557 }else if(trim_option("trailing"))
1558 {
1559 self->getAction()->trimTypeQ.push_back("#trailing#");
1560 }else
1561 {
1562 self->getAction()->trimTypeQ.push_back("#trim#");
1563 }
1564 }
1565
1566 void push_substr_from::builder(s3select* self, const char* a, const char* b) const
1567 {
1568 std::string token(a, b);
1569
1570 __function* func = S3SELECT_NEW(self, __function, "substring", self->getS3F());
1571
1572 base_statement* expr = self->getAction()->exprQ.back();
1573 self->getAction()->exprQ.pop_back();
1574
1575 base_statement* start_position = self->getAction()->exprQ.back();
1576
1577 self->getAction()->exprQ.pop_back();
1578 func->push_argument(start_position);
1579 func->push_argument(expr);
1580
1581 self->getAction()->exprQ.push_back(func);
1582 }
1583
1584 void push_substr_from_for::builder(s3select* self, const char* a, const char* b) const
1585 {
1586 std::string token(a, b);
1587
1588 __function* func = S3SELECT_NEW(self, __function, "substring", self->getS3F());
1589
1590 base_statement* expr = self->getAction()->exprQ.back();
1591 self->getAction()->exprQ.pop_back();
1592
1593 base_statement* start_position = self->getAction()->exprQ.back();
1594 self->getAction()->exprQ.pop_back();
1595
1596 base_statement* end_position = self->getAction()->exprQ.back();
1597 self->getAction()->exprQ.pop_back();
1598
1599 func->push_argument(end_position);
1600 func->push_argument(start_position);
1601 func->push_argument(expr);
1602
1603 self->getAction()->exprQ.push_back(func);
1604 }
1605
1606 void push_datediff::builder(s3select* self, const char* a, const char* b) const
1607 {
1608 std::string token(a, b);
1609
1610 std::string date_op;
1611
1612 date_op = self->getAction()->datePartQ.back();
1613 self->getAction()->datePartQ.pop_back();
1614
1615 std::string date_function = "#datediff_" + date_op + "#";
1616
1617 __function* func = S3SELECT_NEW(self, __function, date_function.c_str(), self->getS3F());
1618
1619 base_statement* expr = self->getAction()->exprQ.back();
1620 self->getAction()->exprQ.pop_back();
1621
1622 base_statement* start_position = self->getAction()->exprQ.back();
1623 self->getAction()->exprQ.pop_back();
1624
1625 func->push_argument(start_position);
1626 func->push_argument(expr);
1627
1628 self->getAction()->exprQ.push_back(func);
1629 }
1630
1631 void push_dateadd::builder(s3select* self, const char* a, const char* b) const
1632 {
1633 std::string token(a, b);
1634
1635 std::string date_op;
1636
1637 date_op = self->getAction()->datePartQ.back();
1638 self->getAction()->datePartQ.pop_back();
1639
1640 std::string date_function = "#dateadd_" + date_op + "#";
1641
1642 __function* func = S3SELECT_NEW(self, __function, date_function.c_str(), self->getS3F());
1643
1644 base_statement* expr = self->getAction()->exprQ.back();
1645 self->getAction()->exprQ.pop_back();
1646
1647 base_statement* start_position = self->getAction()->exprQ.back();
1648 self->getAction()->exprQ.pop_back();
1649
1650 func->push_argument(start_position);
1651 func->push_argument(expr);
1652
1653 self->getAction()->exprQ.push_back(func);
1654 }
1655
1656 void push_extract::builder(s3select* self, const char* a, const char* b) const
1657 {
1658 std::string token(a, b);
1659
1660 std::string date_op;
1661
1662 date_op = self->getAction()->datePartQ.back();
1663 self->getAction()->datePartQ.pop_back();
1664
1665 std::string date_function = "#extract_" + date_op + "#";
1666
1667 __function* func = S3SELECT_NEW(self, __function, date_function.c_str(), self->getS3F());
1668
1669 base_statement* expr = self->getAction()->exprQ.back();
1670 self->getAction()->exprQ.pop_back();
1671
1672 func->push_argument(expr);
1673
1674 self->getAction()->exprQ.push_back(func);
1675 }
1676
1677 void push_date_part::builder(s3select* self, const char* a, const char* b) const
1678 {
1679 std::string token(a, b);
1680
1681 self->getAction()->datePartQ.push_back(token);
1682 }
1683
1684 void push_time_to_string_constant::builder(s3select* self, const char* a, const char* b) const
1685 {
1686 std::string token(a, b);
1687
1688 __function* func = S3SELECT_NEW(self, __function, "#to_string_constant#", self->getS3F());
1689
1690 base_statement* expr = self->getAction()->exprQ.back();
1691 self->getAction()->exprQ.pop_back();
1692
1693 base_statement* frmt = self->getAction()->exprQ.back();
1694 self->getAction()->exprQ.pop_back();
1695
1696 func->push_argument(frmt);
1697 func->push_argument(expr);
1698
1699 self->getAction()->exprQ.push_back(func);
1700
1701 }
1702
1703 void push_time_to_string_dynamic::builder(s3select* self, const char* a, const char* b) const
1704 {
1705 std::string token(a, b);
1706
1707 __function* func = S3SELECT_NEW(self, __function, "#to_string_dynamic#", self->getS3F());
1708
1709 base_statement* expr = self->getAction()->exprQ.back();
1710 self->getAction()->exprQ.pop_back();
1711
1712 base_statement* frmt = self->getAction()->exprQ.back();
1713 self->getAction()->exprQ.pop_back();
1714
1715 func->push_argument(frmt);
1716 func->push_argument(expr);
1717
1718 self->getAction()->exprQ.push_back(func);
1719 }
1720
1721 /////// handling different object types
1722 class base_s3object
1723 {
1724
1725 protected:
1726 scratch_area* m_sa;
1727 std::string m_obj_name;
1728
1729 public:
1730 explicit base_s3object(scratch_area* m) : m_sa(m){}
1731
1732 void set(scratch_area* m)
1733 {
1734 m_sa = m;
1735 }
1736
1737 virtual ~base_s3object() = default;
1738 };
1739
1740
1741 class csv_object : public base_s3object
1742 {
1743
1744 public:
1745 struct csv_defintions
1746 {
1747 char row_delimiter;
1748 char column_delimiter;
1749 char output_row_delimiter;
1750 char output_column_delimiter;
1751 char escape_char;
1752 char output_escape_char;
1753 char output_quot_char;
1754 char quot_char;
1755 bool use_header_info;
1756 bool ignore_header_info;//skip first line
1757 bool quote_fields_always;
1758 bool quote_fields_asneeded;
1759 bool redundant_column;
1760
1761 csv_defintions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false) {}
1762
1763 } m_csv_defintion;
1764
1765 explicit csv_object(s3select* s3_query) :
1766 base_s3object(s3_query->get_scratch_area()),
1767 m_skip_last_line(false),
1768 m_s3_select(nullptr),
1769 m_error_count(0),
1770 m_extract_csv_header_info(false),
1771 m_previous_line(false),
1772 m_skip_first_line(false),
1773 m_processed_bytes(0)
1774 {
1775 set(s3_query);
1776 csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char);
1777 }
1778
1779 csv_object(s3select* s3_query, struct csv_defintions csv) :
1780 base_s3object(s3_query->get_scratch_area()),
1781 m_skip_last_line(false),
1782 m_s3_select(nullptr),
1783 m_error_count(0),
1784 m_extract_csv_header_info(false),
1785 m_previous_line(false),
1786 m_skip_first_line(false),
1787 m_processed_bytes(0)
1788 {
1789 set(s3_query);
1790 m_csv_defintion = csv;
1791 csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char);
1792 }
1793
1794 csv_object():
1795 base_s3object(nullptr),
1796 m_skip_last_line(false),
1797 m_s3_select(nullptr),
1798 m_error_count(0),
1799 m_extract_csv_header_info(false),
1800 m_previous_line(false),
1801 m_skip_first_line(false),
1802 m_processed_bytes(0)
1803 {
1804 csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char);
1805 }
1806
1807 private:
1808 base_statement* m_where_clause;
1809 std::vector<base_statement*> m_projections;
1810 bool m_aggr_flow = false; //TODO once per query
1811 bool m_is_to_aggregate;
1812 bool m_skip_last_line;
1813 std::string m_error_description;
1814 char* m_stream;
1815 char* m_end_stream;
1816 std::vector<char*> m_row_tokens{128};
1817 s3select* m_s3_select;
1818 csvParser csv_parser;
1819 size_t m_error_count;
1820 bool m_extract_csv_header_info;
1821 std::vector<std::string> m_csv_schema{128};
1822
1823 //handling arbitrary chunks (rows cut in the middle)
1824 bool m_previous_line;
1825 bool m_skip_first_line;
1826 std::string merge_line;
1827 std::string m_last_line;
1828 size_t m_processed_bytes;
1829
1830 int getNextRow()
1831 {
1832 size_t num_of_tokens=0;
1833
1834 if(m_stream>=m_end_stream)
1835 {
1836 return -1;
1837 }
1838
1839 if(csv_parser.parse(m_stream, m_end_stream, &m_row_tokens, &num_of_tokens)<0)
1840 {
1841 throw base_s3select_exception("failed to parse csv stream", base_s3select_exception::s3select_exp_en_t::FATAL);
1842 }
1843
1844 m_stream = (char*)csv_parser.currentLoc();
1845
1846 if (m_skip_last_line && m_stream >= m_end_stream)
1847 {
1848 return -1;
1849 }
1850
1851 return num_of_tokens;
1852
1853 }
1854
1855 public:
1856
1857 void set(s3select* s3_query)
1858 {
1859 m_s3_select = s3_query;
1860 base_s3object::set(m_s3_select->get_scratch_area());
1861
1862 m_projections = m_s3_select->get_projections_list();
1863 m_where_clause = m_s3_select->get_filter();
1864
1865 if (m_where_clause)
1866 {
1867 m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases());
1868 }
1869
1870 for (auto& p : m_projections)
1871 {
1872 p->traverse_and_apply(m_sa, m_s3_select->get_aliases());
1873 }
1874
1875 m_aggr_flow = m_s3_select->is_aggregate_query();
1876 }
1877
1878 void set_csv_query(s3select* s3_query,struct csv_defintions csv)
1879 {
1880 if(m_s3_select != nullptr)
1881 {
1882 return;
1883 }
1884
1885 set(s3_query);
1886 m_csv_defintion = csv;
1887 csv_parser.set(m_csv_defintion.row_delimiter, m_csv_defintion.column_delimiter, m_csv_defintion.quot_char, m_csv_defintion.escape_char);
1888 }
1889
1890 std::string get_error_description()
1891 {
1892 return m_error_description;
1893 }
1894
// Defaulted destructor; declared virtual so objects of classes derived from
// csv_object are destroyed correctly through a csv_object pointer.
virtual ~csv_object() = default;
1896
1897 public:
1898
1899 void result_values_to_string(multi_values& projections_resuls, std::string& result)
1900 {
1901 size_t i = 0;
1902 std::string output_delimiter(1,m_csv_defintion.output_column_delimiter);
1903
1904 for(auto res : projections_resuls.values)
1905 {
1906 if (m_csv_defintion.quote_fields_always) {
1907 std::ostringstream quoted_result;
1908 quoted_result << std::quoted(res->to_string(),m_csv_defintion.output_quot_char, m_csv_defintion.escape_char);
1909 result.append(quoted_result.str());
1910 }//TODO to add asneeded
1911 else
1912 {
1913 result.append( res->to_string() );
1914 }
1915
1916 if(!m_csv_defintion.redundant_column) {
1917 if(++i < projections_resuls.values.size()) {
1918 result.append(output_delimiter);
1919 }
1920 }
1921 else {
1922 result.append(output_delimiter);
1923 }
1924 }
1925 }
1926
// Produces output for the current chunk.
// Aggregation query (m_aggr_flow): consumes every remaining row of the chunk,
// evaluating the projections on rows that pass the where-clause. At end of chunk,
// if m_is_to_aggregate is set, it finalizes each projection (set_last_call) and
// appends the aggregated values to `result` (no row delimiter is appended).
// It then returns the negative value from getNextRow().
// Non-aggregation query: skips rows until one satisfies the where-clause (or the
// chunk ends, returning the negative value), then appends that row's projected
// values plus output_row_delimiter and returns the row's token count.
// Throws FATAL if rows are streamed after the aggregation was already finalized.
int getMatchRow( std::string& result) //TODO virtual ? getResult
{
  int number_of_tokens = 0;
  std::string output_delimiter(1,m_csv_defintion.output_row_delimiter);
  multi_values projections_resuls;



  if (m_aggr_flow == true)
  {
    do
    {

      number_of_tokens = getNextRow();
      if (number_of_tokens < 0) //end of stream
      {
        projections_resuls.clear();
        // only the final chunk (m_is_to_aggregate) emits the aggregated values
        if (m_is_to_aggregate)
          for (auto& i : m_projections)
          {
            i->set_last_call();
            i->set_skip_non_aggregate(false);//projection column is set to be runnable

            projections_resuls.push_value( &(i->eval()) );
          }

        result_values_to_string(projections_resuls,result);
        return number_of_tokens;
      }

      // NOTE(review): assumes m_projections is non-empty (dereferences begin()) — presumably guaranteed by the parser; confirm
      if ((*m_projections.begin())->is_set_last_call())
      {
        //should validate while query execution , no update upon nodes are marked with set_last_call
        throw base_s3select_exception("on aggregation query , can not stream row data post do-aggregate call", base_s3select_exception::s3select_exp_en_t::FATAL);
      }

      // load the row into the scratch area and drop alias results cached for the previous row
      m_sa->update(m_row_tokens, number_of_tokens);
      for (auto& a : *m_s3_select->get_aliases()->get())
      {
        a.second->invalidate_cache_result();
      }

      // evaluating the projections on a matching row feeds the aggregation; the value itself is not emitted here
      if (!m_where_clause || m_where_clause->eval().is_true())
        for (auto i : m_projections)
        {
          i->eval();
        }

    }
    while (true);
  }
  else
  {

    // advance until a row satisfies the where-clause (every row matches when there is none)
    do
    {

      number_of_tokens = getNextRow();
      if (number_of_tokens < 0)
      {
        return number_of_tokens;
      }

      m_sa->update(m_row_tokens, number_of_tokens);
      for (auto& a : *m_s3_select->get_aliases()->get())
      {
        a.second->invalidate_cache_result();
      }

    }
    while (m_where_clause && !m_where_clause->eval().is_true());

    projections_resuls.clear();
    for (auto& i : m_projections)
    {
      projections_resuls.push_value( &(i->eval()) );
    }
    result_values_to_string(projections_resuls,result);
    result.append(output_delimiter);
  }

  return number_of_tokens; //TODO wrong
}
2010
2011 int extract_csv_header_info()
2012 {
2013
2014 if (m_csv_defintion.ignore_header_info == true)
2015 {
2016 while(*m_stream && (*m_stream != m_csv_defintion.row_delimiter ))
2017 {
2018 m_stream++;
2019 }
2020 m_stream++;
2021 }
2022 else if(m_csv_defintion.use_header_info == true)
2023 {
2024 size_t num_of_tokens = getNextRow();//TODO validate number of tokens
2025
2026 for(size_t i=0; i<num_of_tokens; i++)
2027 {
2028 m_csv_schema[i].assign(m_row_tokens[i]);
2029 }
2030
2031 m_s3_select->load_schema(m_csv_schema);
2032 }
2033
2034 m_extract_csv_header_info = true;
2035
2036 return 0;
2037 }
2038
2039
2040 int run_s3select_on_stream(std::string& result, const char* csv_stream, size_t stream_length, size_t obj_size)
2041 {
2042 int status=0;
2043 try{
2044 status = run_s3select_on_stream_internal(result,csv_stream,stream_length,obj_size);
2045 }
2046 catch(base_s3select_exception& e)
2047 {
2048 m_error_description = e.what();
2049 m_error_count ++;
2050 if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count>100)//abort query execution
2051 {
2052 return -1;
2053 }
2054 }
2055 catch(chunkalloc_out_of_mem)
2056 {
2057 m_error_description = "out of memory";
2058 return -1;
2059 }
2060
2061 return status;
2062 }
2063
2064 private:
2065 int run_s3select_on_stream_internal(std::string& result, const char* csv_stream, size_t stream_length, size_t obj_size)
2066 {
2067 //purpose: the cv data is "streaming", it may "cut" rows in the middle, in that case the "broken-line" is stores
2068 //for later, upon next chunk of data is streaming, the stored-line is merge with current broken-line, and processed.
2069 std::string tmp_buff;
2070 m_processed_bytes += stream_length;
2071
2072 m_skip_first_line = false;
2073
2074 if (m_previous_line)
2075 {
2076 //if previous broken line exist , merge it to current chunk
2077 char* p_obj_chunk = (char*)csv_stream;
2078 while (*p_obj_chunk != m_csv_defintion.row_delimiter && p_obj_chunk<(csv_stream+stream_length))
2079 {
2080 p_obj_chunk++;
2081 }
2082
2083 tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream));
2084 merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter;
2085 m_previous_line = false;
2086 m_skip_first_line = true;
2087
2088 run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
2089 }
2090
2091 if (csv_stream[stream_length - 1] != m_csv_defintion.row_delimiter)
2092 {
2093 //in case of "broken" last line
2094 char* p_obj_chunk = (char*)&(csv_stream[stream_length - 1]);
2095 while (*p_obj_chunk != m_csv_defintion.row_delimiter && p_obj_chunk>csv_stream)
2096 {
2097 p_obj_chunk--; //scan until end-of previous line in chunk
2098 }
2099
2100 u_int32_t skip_last_bytes = (&(csv_stream[stream_length - 1]) - p_obj_chunk);
2101 m_last_line.assign(p_obj_chunk + 1, p_obj_chunk + 1 + skip_last_bytes); //save it for next chunk
2102
2103 m_previous_line = true;//it means to skip last line
2104
2105 }
2106
2107 return run_s3select_on_object(result, csv_stream, stream_length, m_skip_first_line, m_previous_line, (m_processed_bytes >= obj_size));
2108
2109 }
2110
2111 public:
2112 int run_s3select_on_object(std::string& result, const char* csv_stream, size_t stream_length, bool skip_first_line, bool skip_last_line, bool do_aggregate)
2113 {
2114
2115
2116 m_stream = (char*)csv_stream;
2117 m_end_stream = (char*)csv_stream + stream_length;
2118 m_is_to_aggregate = do_aggregate;
2119 m_skip_last_line = skip_last_line;
2120
2121 if(m_extract_csv_header_info == false)
2122 {
2123 extract_csv_header_info();
2124 }
2125
2126 if(skip_first_line)
2127 {
2128 while(*m_stream && (*m_stream != m_csv_defintion.row_delimiter ))
2129 {
2130 m_stream++;
2131 }
2132 m_stream++;//TODO nicer
2133 }
2134
2135 do
2136 {
2137
2138 int num = 0;
2139 try
2140 {
2141 num = getMatchRow(result);
2142 }
2143 catch (base_s3select_exception& e)
2144 {
2145 m_error_description = e.what();
2146 m_error_count ++;
2147 if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count>100 || (m_stream>=m_end_stream))//abort query execution
2148 {
2149 return -1;
2150 }
2151 }
2152
2153 if (num < 0)
2154 {
2155 break;
2156 }
2157
2158 }
2159 while (true);
2160
2161 return 0;
2162 }
2163 };
2164
2165 #ifdef _ARROW_EXIST
2166 class parquet_object : public base_s3object
2167 {
2168
2169 private:
2170 base_statement *m_where_clause;
2171 std::vector<base_statement *> m_projections;
2172 bool m_aggr_flow = false; //TODO once per query
2173 bool m_is_to_aggregate;
2174 std::string m_error_description;
2175 s3select *m_s3_select;
2176 size_t m_error_count;
2177 parquet_file_parser* object_reader;
2178 parquet_file_parser::column_pos_t m_where_clause_columns;
2179 parquet_file_parser::column_pos_t m_projections_columns;
2180 std::vector<parquet_file_parser::parquet_value_t> m_predicate_values;
2181 std::vector<parquet_file_parser::parquet_value_t> m_projections_values;
2182
2183 public:
2184
2185 void result_values_to_string(multi_values& projections_resuls, std::string& result)
2186 {
2187 size_t i = 0;
2188
2189 for(auto res : projections_resuls.values)
2190 {
2191 std::ostringstream quoted_result;
2192 //quoted_result << std::quoted(res->to_string(),'"','\\');
2193 quoted_result << res->to_string();
2194 if(++i < projections_resuls.values.size()) {
2195 quoted_result << ',';//TODO to use output serialization?
2196 }
2197 result.append(quoted_result.str());
2198 }
2199 }
2200
2201 parquet_object(std::string parquet_file_name, s3select *s3_query,s3selectEngine::rgw_s3select_api* rgw) : base_s3object(s3_query->get_scratch_area()),object_reader(nullptr)
2202 {
2203 try{
2204
2205 object_reader = new parquet_file_parser(parquet_file_name,rgw); //TODO uniq ptr
2206 } catch(std::exception &e)
2207 {
2208 throw base_s3select_exception(std::string("failure while processing parquet meta-data ") + std::string(e.what()) ,base_s3select_exception::s3select_exp_en_t::FATAL);
2209 }
2210
2211 set(s3_query);
2212
2213 s3_query->get_scratch_area()->set_parquet_type();
2214
2215 load_meta_data_into_scratch_area();
2216
2217 for(auto x : m_s3_select->get_projections_list())
2218 {
2219 x->extract_columns(m_projections_columns,object_reader->get_num_of_columns());
2220 }
2221
2222 if(m_s3_select->get_filter())
2223 m_s3_select->get_filter()->extract_columns(m_where_clause_columns,object_reader->get_num_of_columns());
2224 }
2225
2226 parquet_object() : base_s3object(nullptr),m_s3_select(nullptr),object_reader(nullptr)
2227 {}
2228
2229 ~parquet_object()
2230 {
2231 if(object_reader != nullptr)
2232 {
2233 delete object_reader;
2234 }
2235
2236 }
2237
2238 std::string get_error_description()
2239 {
2240 return m_error_description;
2241 }
2242
2243 bool is_set()
2244 {
2245 return m_s3_select != nullptr;
2246 }
2247
2248 void set_parquet_object(std::string parquet_file_name, s3select *s3_query,s3selectEngine::rgw_s3select_api* rgw) //TODO duplicate code
2249 {
2250 try{
2251
2252 object_reader = new parquet_file_parser(parquet_file_name,rgw); //TODO uniq ptr
2253 } catch(std::exception &e)
2254 {
2255 throw base_s3select_exception(std::string("failure while processing parquet meta-data ") + std::string(e.what()) ,base_s3select_exception::s3select_exp_en_t::FATAL);
2256 }
2257
2258 set(s3_query);
2259
2260 m_sa = s3_query->get_scratch_area();
2261
2262 s3_query->get_scratch_area()->set_parquet_type();
2263
2264 load_meta_data_into_scratch_area();
2265
2266 for(auto x : m_s3_select->get_projections_list())
2267 {
2268 x->extract_columns(m_projections_columns,object_reader->get_num_of_columns());
2269 }
2270
2271 if(m_s3_select->get_filter())
2272 m_s3_select->get_filter()->extract_columns(m_where_clause_columns,object_reader->get_num_of_columns());
2273 }
2274
2275
2276 int run_s3select_on_object(std::string &result,
2277 std::function<int(std::string&)> fp_s3select_result_format,
2278 std::function<int(std::string&)> fp_s3select_header_format)
2279 {
2280 int status = 0;
2281
2282 do
2283 {
2284 try
2285 {
2286 status = getMatchRow(result);
2287 }
2288 catch (base_s3select_exception &e)
2289 {
2290 m_error_description = e.what();
2291 m_error_count++;
2292 if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count > 100) //abort query execution
2293 {
2294 return -1;
2295 }
2296 }
2297 catch (std::exception &e)
2298 {
2299 m_error_description = e.what();
2300 m_error_count++;
2301 if (m_error_count > 100) //abort query execution
2302 {
2303 return -1;
2304 }
2305 }
2306
2307 #define S3SELECT_RESPONSE_SIZE_LIMIT (4 * 1024 * 1024)
2308 if (result.size() > S3SELECT_RESPONSE_SIZE_LIMIT)
2309 {//AWS-cli limits response size the following callbacks send response upon some threshold
2310 fp_s3select_result_format(result);
2311
2312 if (!is_end_of_stream())
2313 {
2314 fp_s3select_header_format(result);
2315 }
2316 }
2317 else
2318 {
2319 if (is_end_of_stream())
2320 {
2321 fp_s3select_result_format(result);
2322 }
2323 }
2324
2325 if (status < 0 || is_end_of_stream())
2326 {
2327 break;
2328 }
2329
2330 } while (1);
2331
2332 return status;
2333 }
2334
2335 void load_meta_data_into_scratch_area()
2336 {
2337 int i=0;
2338 for(auto x : object_reader->get_schema())
2339 {
2340 m_s3_select->get_scratch_area()->set_column_pos(x.first.c_str(),i++);
2341 }
2342 }
2343
2344 void set(s3select* s3_query) //TODO reuse code on base
2345 {
2346 m_s3_select = s3_query;
2347 base_s3object::set(m_s3_select->get_scratch_area());
2348
2349 m_projections = m_s3_select->get_projections_list();
2350 m_where_clause = m_s3_select->get_filter();
2351
2352 if (m_where_clause)
2353 {
2354 m_where_clause->traverse_and_apply(m_sa, m_s3_select->get_aliases());
2355 }
2356
2357 for (auto p : m_projections)
2358 {
2359 p->traverse_and_apply(m_sa, m_s3_select->get_aliases());
2360 }
2361
2362 m_aggr_flow = m_s3_select->is_aggregate_query();
2363 }
2364
2365 bool is_end_of_stream()
2366 {
2367 return object_reader->end_of_stream();
2368 }
2369
2370 int getMatchRow(std::string &result) //TODO virtual ? getResult
2371 {
2372
2373 // get all column-references from where-clause
2374 // call parquet-reader(predicate-column-positions ,&row-values)
2375 // update scrach area with row-values
2376 // run where (if exist) in-case its true --> parquet-reader(projections-column-positions ,&row-values)
2377
2378 bool next_rownum_status = true;
2379 multi_values projections_resuls;
2380
2381 if (m_aggr_flow == true)
2382 {
2383 do
2384 {
2385 if (is_end_of_stream())
2386 {
2387 if (true) //(m_is_to_aggregate)
2388 {
2389 for (auto i : m_projections)
2390 {
2391 i->set_last_call();
2392 i->set_skip_non_aggregate(false);//projection column is set to be runnable
2393 projections_resuls.push_value( &(i->eval()) );
2394 }
2395 result_values_to_string(projections_resuls,result);
2396 }
2397
2398 return 0;
2399 }
2400
2401 if ((*m_projections.begin())->is_set_last_call())
2402 {
2403 //should validate while query execution , no update upon nodes are marked with set_last_call
2404 throw base_s3select_exception("on aggregation query , can not stream row data post do-aggregate call", base_s3select_exception::s3select_exp_en_t::FATAL);
2405 }
2406
2407 //TODO if (m_where_clause)
2408 object_reader->get_column_values_by_positions(m_where_clause_columns, m_predicate_values); //TODO status should indicate error/end-of-stream/success
2409
2410 m_sa->update(m_predicate_values, m_where_clause_columns);
2411
2412 for (auto a : *m_s3_select->get_aliases()->get())
2413 {
2414 a.second->invalidate_cache_result();
2415 }
2416
2417 if (!m_where_clause || m_where_clause->eval().is_true())
2418 {
2419 object_reader->get_column_values_by_positions(m_projections_columns, m_projections_values);
2420 m_sa->update(m_projections_values, m_projections_columns);
2421 for (auto i : m_projections)
2422 {
2423 i->eval();
2424 }
2425 }
2426
2427 object_reader->increase_rownum();
2428
2429 } while (1);
2430 }
2431 else
2432 {
2433 if (m_where_clause)
2434 {
2435 do
2436 {
2437
2438 for (auto a : *m_s3_select->get_aliases()->get())
2439 {
2440 a.second->invalidate_cache_result();
2441 }
2442
2443 object_reader->get_column_values_by_positions(m_where_clause_columns, m_predicate_values); //TODO status should indicate error/end-of-stream/success
2444
2445 m_sa->update(m_predicate_values, m_where_clause_columns);
2446
2447 if (m_where_clause->eval().is_true())
2448 break;
2449 else
2450 next_rownum_status = object_reader->increase_rownum();
2451
2452 } while (next_rownum_status);
2453
2454 if (next_rownum_status == false)
2455 return 1;
2456 }
2457 else
2458 {
2459 for (auto a : *m_s3_select->get_aliases()->get())
2460 {
2461 a.second->invalidate_cache_result();
2462 }
2463 }
2464
2465 object_reader->get_column_values_by_positions(m_projections_columns, m_projections_values);
2466 m_sa->update(m_projections_values, m_projections_columns);
2467
2468 for (auto i : m_projections)
2469 {
2470 projections_resuls.push_value( &(i->eval()) );
2471 }
2472 result_values_to_string(projections_resuls,result);
2473 result.append("\n");//TODO not generic
2474
2475 object_reader->increase_rownum();
2476
2477 if (is_end_of_stream())
2478 {
2479 return 0;
2480 }
2481 }
2482
2483 return 1; //1>0
2484 }
2485 };
2486 #endif //_ARROW_EXIST
2487
2488 }; // namespace s3selectEngine
2489
2490 #endif