]> git.proxmox.com Git - ceph.git/blob - ceph/src/s3select/include/s3select_json_parser.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / s3select / include / s3select_json_parser.h
1 #ifndef S3SELECT_JSON_PARSER_H
2 #define S3SELECT_JSON_PARSER_H
3
4 //TODO add __FILE__ __LINE__ message
5 #define RAPIDJSON_ASSERT(x) s3select_json_parse_error(x)
6 bool s3select_json_parse_error(bool b);
7 bool s3select_json_parse_error(const char* error);
8
9 #include "rapidjson/reader.h"
10 #include "rapidjson/writer.h"
11 #include "rapidjson/filereadstream.h"
12 #include "rapidjson/filewritestream.h"
13 #include "rapidjson/error/en.h"
14 #include "rapidjson/document.h"
15 #include <cassert>
16 #include <sstream>
17 #include <vector>
18 #include <iostream>
19 #include <functional>
20 #include <boost/spirit/include/classic_core.hpp>
21 #include <boost/algorithm/string/predicate.hpp>
22 #include "s3select_oper.h"//class value
23 #include <boost/algorithm/string/predicate.hpp>
24
25 #define JSON_PROCESSING_LIMIT_REACHED 2
26
27 //TODO missing s3selectEngine namespace
28
29 bool s3select_json_parse_error(bool b)
30 {
31 if(!b)
32 {
33 const char* error_str = "failure while processing JSON document";
34 throw s3selectEngine::base_s3select_exception(error_str, s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL);
35 }
36 return false;
37 }
38
39 bool s3select_json_parse_error(const char* error)
40 {
41 if(!error)
42 {
43 const char* error_str = "failure while processing JSON document";
44 throw s3selectEngine::base_s3select_exception(error_str, s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL);
45 }
46 return false;
47 }
48
49 static auto iequal_predicate = [](std::string& it1, std::string& it2)
50 {
51 return boost::iequals(it1,it2);
52 };
53
54
55 class ChunksStreamer : public rapidjson::MemoryStream {
56
57 //purpose: adding a method `resetBuffer` that enables to parse chunk after chunk
58 //per each new chunk it reset internal data members
59 public:
60
61 std::string internal_buffer;
62 const Ch* next_src_;
63 size_t next_size_;
64
65 ChunksStreamer():rapidjson::MemoryStream(0,0){next_src_=0;next_size_=0;}
66
67 ChunksStreamer(const Ch *src, size_t size) : rapidjson::MemoryStream(src,size){next_src_=0;next_size_=0;}
68
69 //override Peek methode
70 Ch Peek() //const
71 {
72 if(RAPIDJSON_UNLIKELY(src_ == end_))
73 {
74 if(next_src_)//next chunk exist
75 {//upon reaching to end of current buffer, to switch with next one
76 src_ = next_src_;
77 begin_ = src_;
78 size_ =next_size_;
79 end_ = src_ + size_;
80
81 next_src_ = 0;
82 next_size_ = 0;
83 return *src_;
84 }
85 else return 0;
86 }
87 return *src_;
88 }
89
90 //override Take method
91 Ch Take()
92 {
93 if(RAPIDJSON_UNLIKELY(src_ == end_))
94 {
95 if(next_src_)//next chunk exist
96 {//upon reaching to end of current buffer, to switch with next one
97 src_ = next_src_;
98 begin_ = src_;
99 size_ = next_size_;
100 end_ = src_ + size_;
101
102 next_src_ = 0;
103 next_size_ = 0;
104 return *src_;
105 }
106 else return 0;
107 }
108 return *src_++;
109 }
110
111 void resetBuffer(char* buff, size_t size)
112 {
113 if(!src_)
114 {//first time calling
115 begin_ = buff;
116 src_ = buff;
117 size_ = size;
118 end_= src_ + size_;
119 return;
120 }
121
122 if(!next_src_)
123 {//save the next-chunk that will be used upon parser reaches end of current buffer
124 next_src_ = buff;
125 next_size_ = size;
126 }
127 else
128 {// should not happen
129 std::cout << "can not replace pointers!!!" << std::endl;//TODO exception
130 return;
131 }
132 }
133
134 void saveRemainingBytes()
135 {//this routine called per each new chunk
136 //save the remaining bytes, before its overriden by the next-chunk.
137 size_t copy_left_sz = getBytesLeft(); //should be very small
138 internal_buffer.assign(src_,copy_left_sz);
139
140 src_ = internal_buffer.data();
141 begin_ = src_;
142 size_ = copy_left_sz;
143 end_= src_ + copy_left_sz;
144 }
145
146 size_t getBytesLeft() { return end_ - src_; }
147
148 };
149
// Whether the JSON handler is currently inside a "row", and which kind of
// container opened it.
enum class row_state : int
{
  NA = 0,                // not inside a row
  OBJECT_START_ROW = 1,  // the row was opened by an object
  ARRAY_START_ROW = 2    // the row was opened by an array
};
156
157 class json_variable_access {
158 //purpose: a state-machine for json-variables.
159 //upon the syntax-parser accepts a variable (projection / where-clause) it create this object.
160 //this object get events (key,start-array ... etc) as the JSON reader scans the input,
161 //these events are advancing the states until it reaches to the last one, result with pushing value into scratch-area.
162
163 private:
164
165 // to set the following.
166 std::vector<std::string>* from_clause;
167 std::vector<std::string>* key_path;
168 int* m_current_depth;
169 std::function <int(s3selectEngine::value&,int)>* m_exact_match_cb;
170 // a state number : (_1).a.b.c[ 17 ].d.e (a.b)=1 (c[)=2 (17)=3 (.d.e)=4
171 size_t current_state;//contain the current state of the state machine for searching-expression (each JSON variable in SQL statement has a searching expression)
172 int nested_array_level;//in the case of array within array it contain the nesting level
173
174 struct variable_state_md {
175 std::vector<std::string> required_path;//set by the syntax-parser. in the case of array its empty
176 int required_array_entry_no;//set by the syntax-parser, in the case of object-key its -1.
177 int actual_array_entry_no;//upon scanning the JSON input, this value increased by 1 each new element
178 int required_depth_size;// depth of state, is aggregated (include the previous). it's the summary of key-elements and array-operator's.
179 int required_key_depth_size;// same as the above, not including the array-operators.
180 int last_array_start;//it actually mark the nested-array-level (array within array)
181 };
182
183 std::vector<struct variable_state_md> variable_states;//vector is populated upon syntax phase.
184
185 public:
186
187 json_variable_access():from_clause(nullptr),key_path(nullptr),m_current_depth(nullptr),m_exact_match_cb(nullptr),current_state(-1),nested_array_level(0)
188 {}
189
190 void init(
191 std::vector<std::string>* reader_from_clause,
192 std::vector<std::string>* reader_key_path,
193 int* reader_current_depth,
194 std::function <int(s3selectEngine::value&,int)>* excat_match_cb)
195 {//this routine should be called before scanning the JSON input
196 from_clause = reader_from_clause;
197 key_path = reader_key_path;
198 m_exact_match_cb = excat_match_cb;
199 m_current_depth = reader_current_depth;
200 current_state = 0;
201
202 //loop on variable_states compute required_depth_size
203 }
204
205 void clear()
206 {
207 variable_states.clear();
208 }
209
210 void debug_info()
211 {
212 auto f = [](std::vector<std::string> x){std::string res;for(auto i : x){res.append(i);res.append(".");};return res;};
213
214 std::cout << "m_current_depth=" << *m_current_depth << " required_depth_size= " << reader_position_state().required_depth_size << " ";
215 std::cout << "variable_states[ current_state ].last_array_start=" << reader_position_state().last_array_start;
216 std::cout << " current_state=" << current_state << " key_path=" << f(*key_path) << std::endl;
217 }
218 #define DBG {std::cout << "event=" << __FUNCTION__ << std::endl; debug_info();}
219 #undef DBG
220 #define DBG
221
222 void compile_state_machine()
223 {
224 size_t aggregated_required_depth_size = 0;
225 size_t aggregated_required_key_depth_size = 0;
226 for(auto& v : variable_states)
227 {
228 if(v.required_path.size())
229 {
230 v.required_depth_size = aggregated_required_depth_size + v.required_path.size();//depth size in general, including array
231 v.required_key_depth_size = aggregated_required_key_depth_size;//depth include ONLY key parts
232 aggregated_required_key_depth_size += v.required_path.size();
233 }
234 else
235 {
236 v.required_depth_size = aggregated_required_depth_size + 1;
237 }
238 aggregated_required_depth_size = v.required_depth_size;
239 }
240 }
241
242 void push_variable_state(std::vector<std::string>& required_path,int required_array_entry_no)
243 {
244 struct variable_state_md new_state={required_path,required_array_entry_no,-1,0,0,-1};
245 variable_states.push_back(new_state);
246 //TODO required_path.size() > 0 or required_path,required_array_entry_no>=0 : not both
247 compile_state_machine();
248 }
249
250 struct variable_state_md& reader_position_state()
251 {
252 if (current_state>=variable_states.size())
253 {
254 const char* out_of_range = "\nJSON reader failed due to array-out-of-range\n";
255 throw s3selectEngine::base_s3select_exception(out_of_range,s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL);
256 }
257
258 return variable_states[ current_state ];
259 }
260
261 bool is_array_state()
262 {
263 return (reader_position_state().required_array_entry_no>=0);
264 }
265
266 bool is_reader_located_on_required_depth()
267 {
268 return (*m_current_depth == reader_position_state().required_depth_size);
269 }
270
271 bool is_on_final_state()
272 {
273 return ((size_t)current_state == (variable_states.size()));
274 //&& *m_current_depth == variable_states[ current_state -1 ].required_depth_size);
275
276 // NOTE: by ignoring the current-depth, the matcher gives precedence to key-path match, while not ignoring accessing using array
277 // meaning, upon requeting a.b[12] , the [12] is not ignored, the a<-->b distance should be calculated as key distance, i.e. not counting array/object with *no keys*.
278 // user may request 'select _1.phonearray.num'; the reader will traverse `num` exist in `phonearray`
279 }
280
281 bool is_reader_reached_required_array_entry()
282 {
283 return (reader_position_state().actual_array_entry_no == reader_position_state().required_array_entry_no);
284 }
285
286 bool is_reader_passed_required_array_entry()
287 {
288 return (reader_position_state().actual_array_entry_no > reader_position_state().required_array_entry_no);
289 }
290
291 bool is_reader_located_on_array_according_to_current_state()
292 {
293 return (nested_array_level == reader_position_state().last_array_start);
294 }
295
296 bool is_reader_position_depth_lower_than_required()
297 {
298 return (*m_current_depth < reader_position_state().required_depth_size);
299 }
300
301 bool is_reader_located_on_array_entry_according_to_current_state()
302 {
303 return (reader_position_state().actual_array_entry_no == reader_position_state().required_array_entry_no);
304 }
305
306 void increase_current_state()
307 {
308 DBG
309
310 if((size_t)current_state >= (variable_states.size())) return;
311 current_state ++;
312 }
313
314 void decrease_current_state()
315 {
316 DBG
317
318 if(current_state == 0) return;
319 current_state --;
320 }
321
322 void key()
323 {
324 DBG
325
326 if(reader_position_state().required_path.size())//state has a key
327 {// key should match
328 std::vector<std::string>* filter = &reader_position_state().required_path;
329 auto required_key_depth_size = reader_position_state().required_key_depth_size;
330 if(std::equal((*key_path).begin()+(*from_clause).size() + required_key_depth_size, //key-path-start-point + from-clause-depth-size + key-depth
331 (*key_path).end(),
332 (*filter).begin(),
333 (*filter).end(), iequal_predicate))
334 {
335 increase_current_state();//key match, advancing to next
336 }
337 }
338 }
339
340 void increase_array_index()
341 {
342 if(is_reader_located_on_required_depth() && is_array_state())//TODO && is_array_state(). is it necessary? what about nesting level
343 {
344 DBG
345 reader_position_state().actual_array_entry_no++;
346 }
347 }
348
349 void dec_key()
350 {
351 DBG
352
353 if(is_reader_position_depth_lower_than_required())
354 {//actual key-path is shorter than required
355 decrease_current_state();
356 return;
357 }
358
359 if(is_reader_located_on_required_depth() && is_array_state())//TODO && is_array_state(). is it necessary?; json_element_state.back() != ARRAY_STATE)
360 {//key-path-depth matches, and it an array
361 if(is_reader_reached_required_array_entry())
362 {//we reached the required array entry
363 increase_current_state();
364 }
365 else if(is_reader_passed_required_array_entry())
366 {//had passed the array entry
367 decrease_current_state();
368 }
369 }
370 }
371
372 void new_value(s3selectEngine::value& v,size_t json_index)
373 {
374 DBG
375
376 if(is_on_final_state())
377 {
378 (*m_exact_match_cb)(v, json_index);
379 decrease_current_state();//TODO why decrease? the state-machine reached its final destination, and it should be only one result
380 }
381 increase_array_index();//next-value in array
382 }
383
384 void end_object()
385 {
386 increase_array_index();
387 }
388
389 void end_array()
390 {
391 //init the correct array index
392 DBG
393
394 if(is_reader_located_on_array_according_to_current_state())
395 {//it reached end of required array
396 reader_position_state().actual_array_entry_no = 0;
397 decrease_current_state();
398 }
399 nested_array_level --;
400
401 // option 1. move out of one array, and enter a new one; option-2. enter an object
402 increase_array_index();//increase only upon correct array //TODO move it into dec_key()?
403 dec_key();
404 }
405
406 void start_array()
407 {
408 DBG
409
410 nested_array_level++;
411 if(is_reader_located_on_required_depth())
412 {//reader entered an array required by JSON variable
413 reader_position_state().actual_array_entry_no = 0;
414 reader_position_state().last_array_start = nested_array_level;
415
416 if(is_reader_located_on_array_entry_according_to_current_state())
417 {//we reached the required array entry -> next state
418 increase_current_state();
419 }
420 }
421 }
422
423 }; //json_variable_access
424
425 class json_variables_operations {
426
427 public:
428
429 std::vector<std::pair<json_variable_access*,size_t>> json_statement_variables{};
430
431 void init(std::vector<std::pair<json_variable_access*,size_t>>& jsv, //TODO init upon construction?
432 std::vector <std::string>* from_clause,
433 std::vector<std::string>* key_path,
434 int* current_depth,
435 std::function <int(s3selectEngine::value&,int)>* exact_match_cb)
436 {
437 json_statement_variables = jsv;
438
439 for(auto& var : json_statement_variables)
440 {
441 var.first->init(from_clause,
442 key_path,
443 current_depth,
444 exact_match_cb);
445 }
446 }
447
448 void start_array()
449 {
450 for(auto& j : json_statement_variables)
451 {
452 j.first->start_array();
453 }
454 }
455 void end_array()
456 {
457 for(auto& j : json_statement_variables)
458 {
459 j.first->end_array();
460 }
461 }
462 void dec_key()
463 {
464 for(auto& j : json_statement_variables)
465 {
466 j.first->dec_key();
467 }
468 }
469 void end_object()
470 {
471 for(auto& j : json_statement_variables)
472 {
473 j.first->end_object();
474 }
475 }
476 void key()
477 {
478 for(auto& j : json_statement_variables)
479 {
480 j.first->key();
481 }
482 }
483 void new_value(s3selectEngine::value& v)
484 {
485 for(auto& j : json_statement_variables)
486 {
487 j.first->new_value(v,j.second);
488 }
489 }
490 };//json_variables_operations
491
492 class JsonParserHandler : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, JsonParserHandler> {
493
494 public:
495
496 typedef enum {OBJECT_STATE,ARRAY_STATE} en_json_elm_state_t;
497 typedef std::pair<std::vector<std::string>, s3selectEngine::value> json_key_value_t;
498
499 row_state state = row_state::NA;
500 std::function <int(s3selectEngine::value&,int)> m_exact_match_cb;
501 std::function <int(s3selectEngine::scratch_area::json_key_value_t&)> m_star_operation_cb;
502
503 json_variables_operations variable_match_operations;
504 int row_count{};
505 std::vector <std::string> from_clause{};
506 bool prefix_match{};
507 s3selectEngine::value var_value;
508 ChunksStreamer stream_buffer;
509 bool init_buffer_stream;
510 rapidjson::Reader reader;
511 std::vector<en_json_elm_state_t> json_element_state;
512 std::vector<std::string> key_path;
513 std::function<int(void)> m_s3select_processing;
514 int m_start_row_depth;
515 int m_current_depth;
516 bool m_star_operation;
517 int m_sql_processing_status;
518
519 JsonParserHandler() : prefix_match(false),init_buffer_stream(false),m_start_row_depth(-1),m_current_depth(0),m_star_operation(false),m_sql_processing_status(0)
520 {
521 }
522
523 std::string get_key_path()
524 {//for debug
525 std::string res;
526 for(const auto & i: key_path)
527 {
528 res.append(i);
529 res.append(std::string("/"));
530 }
531 return res;
532 }
533
534 void dec_key_path()
535 {
536 if (json_element_state.size()) {
537 if(json_element_state.back() != ARRAY_STATE) {
538 if(key_path.size() != 0) {
539 key_path.pop_back();
540 }
541 }
542 }
543
544 variable_match_operations.dec_key();
545
546 //TODO m_current_depth-- should done here
547 if(m_start_row_depth > m_current_depth)
548 {
549 prefix_match = false;
550 } else
551 if (prefix_match) {
552 if (state == row_state::ARRAY_START_ROW && m_start_row_depth == m_current_depth) {
553 m_sql_processing_status = m_s3select_processing(); //per each element in array
554 ++row_count;
555 }
556 }
557 }
558
559 void push_new_key_value(s3selectEngine::value& v)
560 {
561 if (m_star_operation && prefix_match)
562 {
563 json_key_value_t key_value(key_path,v);
564 m_star_operation_cb(key_value);
565 }
566 if (prefix_match)
567 variable_match_operations.new_value(v);
568
569 dec_key_path();
570 }
571
572 bool Null() {
573 var_value.setnull();
574 push_new_key_value(var_value);
575 return true; }
576
577 bool Bool(bool b) {
578 var_value = b;
579 push_new_key_value(var_value);
580 return true; }
581
582 bool Int(int i) {
583 var_value = i;
584 push_new_key_value(var_value);
585 return true; }
586
587 bool Uint(unsigned u) {
588 var_value = u;
589 push_new_key_value(var_value);
590 return true; }
591
592 bool Int64(int64_t i) {
593 var_value = i;
594 push_new_key_value(var_value);
595 return true; }
596
597 bool Uint64(uint64_t u) {
598 var_value = u;
599 push_new_key_value(var_value);
600 return true; }
601
602 bool Double(double d) {
603 var_value = d;
604 push_new_key_value(var_value);
605 return true; }
606
607 bool String(const char* str, rapidjson::SizeType length, bool copy) {
608 //TODO use copy
609 var_value = str;
610 push_new_key_value(var_value);
611 return true;
612 }
613
614 bool Key(const char* str, rapidjson::SizeType length, bool copy) {
615 key_path.push_back(std::string(str));
616
617 if(from_clause.size() == 0 || std::equal(key_path.begin(), key_path.end(), from_clause.begin(), from_clause.end(), iequal_predicate)) {
618 prefix_match = true;
619 }
620
621 variable_match_operations.key();
622
623 return true;
624 }
625
626 bool is_already_row_started()
627 {
628 if(state == row_state::OBJECT_START_ROW || state == row_state::ARRAY_START_ROW)
629 return true;
630 else
631 return false;
632 }
633
634 bool StartObject() {
635 json_element_state.push_back(OBJECT_STATE);
636 m_current_depth++;
637 if (prefix_match && !is_already_row_started()) {
638 state = row_state::OBJECT_START_ROW;
639 m_start_row_depth = m_current_depth;
640 ++row_count;
641 }
642
643 return true;
644 }
645
646 bool EndObject(rapidjson::SizeType memberCount) {
647 json_element_state.pop_back();
648 m_current_depth --;
649
650 variable_match_operations.end_object();
651
652 dec_key_path();
653 if (state == row_state::OBJECT_START_ROW && (m_start_row_depth > m_current_depth)) {
654 m_sql_processing_status = m_s3select_processing();
655 state = row_state::NA;
656 }
657 return true;
658 }
659
660 bool StartArray() {
661 json_element_state.push_back(ARRAY_STATE);
662 m_current_depth++;
663 if (prefix_match && !is_already_row_started()) {
664 state = row_state::ARRAY_START_ROW;
665 m_start_row_depth = m_current_depth;
666 }
667
668 variable_match_operations.start_array();
669
670 return true;
671 }
672
673 bool EndArray(rapidjson::SizeType elementCount) {
674 json_element_state.pop_back();
675 m_current_depth--;
676 dec_key_path();
677
678 if (state == row_state::ARRAY_START_ROW && (m_start_row_depth > m_current_depth)) {
679 state = row_state::NA;
680 }
681
682 variable_match_operations.end_array();
683
684 return true;
685 }
686
687 void set_prefix_match(std::vector<std::string>& requested_prefix_match)
688 {//purpose: set the filter according to SQL statement(from clause)
689 from_clause = requested_prefix_match;
690 if(from_clause.size() ==0)
691 {
692 prefix_match = true;
693 m_start_row_depth = m_current_depth;
694 }
695 }
696
697 void set_statement_json_variables(std::vector<std::pair<json_variable_access*,size_t>>& statement_variables)
698 {//purpose: set the json variables extracted from the SQL statement(projection columns, predicates columns)
699 variable_match_operations.init(
700 statement_variables,
701 &from_clause,
702 &key_path,
703 &m_current_depth,
704 &m_exact_match_cb);
705 }
706
707 void set_exact_match_callback(std::function<int(s3selectEngine::value&, int)> f)
708 {//purpose: upon key is matching one of the exact filters, the callback is called.
709 m_exact_match_cb = f;
710 }
711
712 void set_s3select_processing_callback(std::function<int(void)>& f)
713 {//purpose: execute s3select statement on matching row (according to filters)
714 m_s3select_processing = f;
715 }
716
717 void set_push_per_star_operation_callback( std::function <int(s3selectEngine::scratch_area::json_key_value_t&)> cb)
718 {
719 m_star_operation_cb = cb;
720 }
721
722 void set_star_operation()
723 {
724 m_star_operation = true;
725 }
726
727 int process_json_buffer(char* json_buffer,size_t json_buffer_sz, bool end_of_stream=false)
728 {//user keeps calling with buffers, the method is not aware of the object size.
729
730
731 try{
732 if(!init_buffer_stream)
733 {
734 //set the memoryStreamer
735 reader.IterativeParseInit();
736 init_buffer_stream = true;
737
738 }
739
740 //the non-processed bytes plus the next chunk are copy into main processing buffer
741 if(!end_of_stream)
742 stream_buffer.resetBuffer(json_buffer, json_buffer_sz);
743
744 while (!reader.IterativeParseComplete()) {
745 reader.IterativeParseNext<rapidjson::kParseDefaultFlags>(stream_buffer, *this);
746
747 //once all key-values move into s3select(for further filtering and processing), it should be cleared
748
749 //TODO in the case the chunk is too small or some value in input is too big, the parsing will fail.
750 if (!end_of_stream && stream_buffer.next_src_==0 && stream_buffer.getBytesLeft() < 2048)
751 {//the non processed bytes will be processed on next fetched chunk
752 //TODO save remaining-bytes to internal buffer (or caller will use 2 sets of buffer)
753 stream_buffer.saveRemainingBytes();
754 return 0;
755 }
756 if(m_sql_processing_status == JSON_PROCESSING_LIMIT_REACHED)//return status(int) from callback
757 {
758 return JSON_PROCESSING_LIMIT_REACHED;
759 }
760
761 // error message
762 if(reader.HasParseError()) {
763 rapidjson::ParseErrorCode c = reader.GetParseErrorCode();
764 size_t ofs = reader.GetErrorOffset();
765 std::stringstream error_str;
766 error_str << "parsing error. code:" << c << " position: " << ofs << std::endl;
767 throw s3selectEngine::base_s3select_exception(error_str.str(), s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL);
768 return -1;
769 }
770 }//while reader.IterativeParseComplete
771 }
772 catch(std::exception &e){
773 std::stringstream error_str;
774 error_str << "failed to process JSON : " << e.what() << std::endl;
775 throw s3selectEngine::base_s3select_exception(error_str.str(), s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL);
776 return -1;
777 }
778 return 0;
779 }
780 };
781
782
783 #endif
784