]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | #ifndef S3SELECT_JSON_PARSER_H |
2 | #define S3SELECT_JSON_PARSER_H | |
3 | ||
4 | //TODO add __FILE__ __LINE__ message | |
5 | #define RAPIDJSON_ASSERT(x) s3select_json_parse_error(x) | |
6 | bool s3select_json_parse_error(bool b); | |
7 | bool s3select_json_parse_error(const char* error); | |
8 | ||
9 | #include "rapidjson/reader.h" | |
10 | #include "rapidjson/writer.h" | |
11 | #include "rapidjson/filereadstream.h" | |
12 | #include "rapidjson/filewritestream.h" | |
13 | #include "rapidjson/error/en.h" | |
14 | #include "rapidjson/document.h" | |
15 | #include <cassert> | |
16 | #include <sstream> | |
17 | #include <vector> | |
18 | #include <iostream> | |
19 | #include <functional> | |
20 | #include <boost/spirit/include/classic_core.hpp> | |
21 | #include <boost/algorithm/string/predicate.hpp> | |
22 | #include "s3select_oper.h"//class value | |
23 | #include <boost/algorithm/string/predicate.hpp> | |
24 | ||
25 | #define JSON_PROCESSING_LIMIT_REACHED 2 | |
26 | ||
27 | //TODO missing s3selectEngine namespace | |
28 | ||
29 | bool s3select_json_parse_error(bool b) | |
30 | { | |
31 | if(!b) | |
32 | { | |
33 | const char* error_str = "failure while processing JSON document"; | |
34 | throw s3selectEngine::base_s3select_exception(error_str, s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL); | |
35 | } | |
36 | return false; | |
37 | } | |
38 | ||
39 | bool s3select_json_parse_error(const char* error) | |
40 | { | |
41 | if(!error) | |
42 | { | |
43 | const char* error_str = "failure while processing JSON document"; | |
44 | throw s3selectEngine::base_s3select_exception(error_str, s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL); | |
45 | } | |
46 | return false; | |
47 | } | |
48 | ||
49 | static auto iequal_predicate = [](std::string& it1, std::string& it2) | |
50 | { | |
51 | return boost::iequals(it1,it2); | |
52 | }; | |
53 | ||
54 | ||
55 | class ChunksStreamer : public rapidjson::MemoryStream { | |
56 | ||
57 | //purpose: adding a method `resetBuffer` that enables to parse chunk after chunk | |
58 | //per each new chunk it reset internal data members | |
59 | public: | |
60 | ||
61 | std::string internal_buffer; | |
62 | const Ch* next_src_; | |
63 | size_t next_size_; | |
64 | ||
65 | ChunksStreamer():rapidjson::MemoryStream(0,0){next_src_=0;next_size_=0;} | |
66 | ||
67 | ChunksStreamer(const Ch *src, size_t size) : rapidjson::MemoryStream(src,size){next_src_=0;next_size_=0;} | |
68 | ||
69 | //override Peek methode | |
70 | Ch Peek() //const | |
71 | { | |
72 | if(RAPIDJSON_UNLIKELY(src_ == end_)) | |
73 | { | |
74 | if(next_src_)//next chunk exist | |
75 | {//upon reaching to end of current buffer, to switch with next one | |
76 | src_ = next_src_; | |
77 | begin_ = src_; | |
78 | size_ =next_size_; | |
79 | end_ = src_ + size_; | |
80 | ||
81 | next_src_ = 0; | |
82 | next_size_ = 0; | |
83 | return *src_; | |
84 | } | |
85 | else return 0; | |
86 | } | |
87 | return *src_; | |
88 | } | |
89 | ||
90 | //override Take method | |
91 | Ch Take() | |
92 | { | |
93 | if(RAPIDJSON_UNLIKELY(src_ == end_)) | |
94 | { | |
95 | if(next_src_)//next chunk exist | |
96 | {//upon reaching to end of current buffer, to switch with next one | |
97 | src_ = next_src_; | |
98 | begin_ = src_; | |
99 | size_ = next_size_; | |
100 | end_ = src_ + size_; | |
101 | ||
102 | next_src_ = 0; | |
103 | next_size_ = 0; | |
104 | return *src_; | |
105 | } | |
106 | else return 0; | |
107 | } | |
108 | return *src_++; | |
109 | } | |
110 | ||
111 | void resetBuffer(char* buff, size_t size) | |
112 | { | |
113 | if(!src_) | |
114 | {//first time calling | |
115 | begin_ = buff; | |
116 | src_ = buff; | |
117 | size_ = size; | |
118 | end_= src_ + size_; | |
119 | return; | |
120 | } | |
121 | ||
122 | if(!next_src_) | |
123 | {//save the next-chunk that will be used upon parser reaches end of current buffer | |
124 | next_src_ = buff; | |
125 | next_size_ = size; | |
126 | } | |
127 | else | |
128 | {// should not happen | |
129 | std::cout << "can not replace pointers!!!" << std::endl;//TODO exception | |
130 | return; | |
131 | } | |
132 | } | |
133 | ||
134 | void saveRemainingBytes() | |
135 | {//this routine called per each new chunk | |
136 | //save the remaining bytes, before its overriden by the next-chunk. | |
137 | size_t copy_left_sz = getBytesLeft(); //should be very small | |
138 | internal_buffer.assign(src_,copy_left_sz); | |
139 | ||
140 | src_ = internal_buffer.data(); | |
141 | begin_ = src_; | |
142 | size_ = copy_left_sz; | |
143 | end_= src_ + copy_left_sz; | |
144 | } | |
145 | ||
146 | size_t getBytesLeft() { return end_ - src_; } | |
147 | ||
148 | }; | |
149 | ||
//tracks whether the reader is currently inside a "row", and which element kind opened it.
enum class row_state
{
  NA,               //no row is currently open
  OBJECT_START_ROW, //row opened by an object; the row is processed when that object closes
  ARRAY_START_ROW   //row opened by an array; each element at the row depth is processed as a row
};
156 | ||
class json_variable_access {
//purpose: a state-machine for json-variables.
//upon the syntax-parser accepts a variable (projection / where-clause) it creates this object.
//this object gets events (key, start-array ... etc) as the JSON reader scans the input;
//these events advance the states until it reaches the last one, resulting in pushing the value into the scratch-area.

private:

// to set the following: all four pointers reference state owned by the JSON reader and are wired in init().
std::vector<std::string>* from_clause;   //SQL from-clause path; key-path comparison starts after this prefix
std::vector<std::string>* key_path;      //the reader's current key-path within the scanned document
int* m_current_depth;                    //the reader's current nesting depth
std::function <int(s3selectEngine::value&,int)>* m_exact_match_cb; //invoked when a scanned value fully matches this variable's search expression
// a state number : (_1).a.b.c[ 17 ].d.e (a.b)=1 (c[)=2 (17)=3 (.d.e)=4
size_t current_state;//contain the current state of the state machine for searching-expression (each JSON variable in SQL statement has a searching expression)
int nested_array_level;//in the case of array within array it contain the nesting level

//a single state of the search expression: either a key-path segment chain or one array access.
struct variable_state_md {
  std::vector<std::string> required_path;//set by the syntax-parser. in the case of array its empty
  int required_array_entry_no;//set by the syntax-parser, in the case of object-key its -1.
  int actual_array_entry_no;//upon scanning the JSON input, this value increased by 1 each new element
  int required_depth_size;// depth of state, is aggregated (include the previous). it's the summary of key-elements and array-operator's.
  int required_key_depth_size;// same as the above, not including the array-operators.
  int last_array_start;//it actually mark the nested-array-level (array within array)
};

std::vector<struct variable_state_md> variable_states;//vector is populated upon syntax phase.

public:

//NOTE: current_state is size_t, so initializing with -1 wraps to SIZE_MAX; this makes
//reader_position_state() throw if the object is used before init() resets it to zero.
json_variable_access():from_clause(nullptr),key_path(nullptr),m_current_depth(nullptr),m_exact_match_cb(nullptr),current_state(-1),nested_array_level(0)
{}

//wire the reader-owned state (from-clause, key-path, depth) and the exact-match callback.
//this routine should be called before scanning the JSON input.
void init(
  std::vector<std::string>* reader_from_clause,
  std::vector<std::string>* reader_key_path,
  int* reader_current_depth,
  std::function <int(s3selectEngine::value&,int)>* excat_match_cb)
{
  from_clause = reader_from_clause;
  key_path = reader_key_path;
  m_exact_match_cb = excat_match_cb;
  m_current_depth = reader_current_depth;
  current_state = 0;

  //loop on variable_states compute required_depth_size
}

//drop all states (the search expression) accumulated during the syntax phase.
void clear()
{
  variable_states.clear();
}

//dump the current machine position versus the reader position (debug aid, used by DBG).
void debug_info()
{
  auto f = [](std::vector<std::string> x){std::string res;for(auto i : x){res.append(i);res.append(".");};return res;};

  std::cout << "m_current_depth=" << *m_current_depth << " required_depth_size= " << reader_position_state().required_depth_size << " ";
  std::cout << "variable_states[ current_state ].last_array_start=" << reader_position_state().last_array_start;
  std::cout << " current_state=" << current_state << " key_path=" << f(*key_path) << std::endl;
}
#define DBG {std::cout << "event=" << __FUNCTION__ << std::endl; debug_info();}
#undef DBG
#define DBG

//compute the aggregated depth values per state: required_depth_size accumulates across all
//states (keys and array operators), required_key_depth_size counts only the key segments.
//called after every push_variable_state(), so values are recomputed as states are added.
void compile_state_machine()
{
  size_t aggregated_required_depth_size = 0;
  size_t aggregated_required_key_depth_size = 0;
  for(auto& v : variable_states)
  {
    if(v.required_path.size())
    {
      v.required_depth_size = aggregated_required_depth_size + v.required_path.size();//depth size in general, including array
      v.required_key_depth_size = aggregated_required_key_depth_size;//depth include ONLY key parts
      aggregated_required_key_depth_size += v.required_path.size();
    }
    else
    {
      v.required_depth_size = aggregated_required_depth_size + 1;
    }
    aggregated_required_depth_size = v.required_depth_size;
  }
}

//append a state (called by the syntax parser): either a key path (required_array_entry_no
//== -1) or an array access (empty path, entry number >= 0), then recompile depth sizes.
void push_variable_state(std::vector<std::string>& required_path,int required_array_entry_no)
{
  struct variable_state_md new_state={required_path,required_array_entry_no,-1,0,0,-1};
  variable_states.push_back(new_state);
  //TODO required_path.size() > 0 or required_path,required_array_entry_no>=0 : not both
  compile_state_machine();
}

//the state the machine currently points at; throws when current_state is out of range
//(including the pre-init SIZE_MAX value and the case of passing the last state).
struct variable_state_md& reader_position_state()
{
  if (current_state>=variable_states.size())
  {
    const char* out_of_range = "\nJSON reader failed due to array-out-of-range\n";
    throw s3selectEngine::base_s3select_exception(out_of_range,s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL);
  }

  return variable_states[ current_state ];
}

//true when the current state is an array access (set by the syntax parser).
bool is_array_state()
{
  return (reader_position_state().required_array_entry_no>=0);
}

//true when the reader's depth equals the depth this state requires.
bool is_reader_located_on_required_depth()
{
  return (*m_current_depth == reader_position_state().required_depth_size);
}

//true when all states were matched; NOTE this calls variable_states.size() with
//current_state equal to it, i.e. one past the last state.
bool is_on_final_state()
{
  return ((size_t)current_state == (variable_states.size()));
  //&& *m_current_depth == variable_states[ current_state -1 ].required_depth_size);

  // NOTE: by ignoring the current-depth, the matcher gives precedence to key-path match, while not ignoring accessing using array
  // meaning, upon requeting a.b[12] , the [12] is not ignored, the a<-->b distance should be calculated as key distance, i.e. not counting array/object with *no keys*.
  // user may request 'select _1.phonearray.num'; the reader will traverse `num` exist in `phonearray`
}

//true when the scanned array element index equals the requested index.
bool is_reader_reached_required_array_entry()
{
  return (reader_position_state().actual_array_entry_no == reader_position_state().required_array_entry_no);
}

//true when the scanned array element index already went past the requested index.
bool is_reader_passed_required_array_entry()
{
  return (reader_position_state().actual_array_entry_no > reader_position_state().required_array_entry_no);
}

//true when the reader's array-nesting level is the one recorded by this state's start_array.
bool is_reader_located_on_array_according_to_current_state()
{
  return (nested_array_level == reader_position_state().last_array_start);
}

//true when the reader is shallower than the depth this state requires.
bool is_reader_position_depth_lower_than_required()
{
  return (*m_current_depth < reader_position_state().required_depth_size);
}

//same comparison as is_reader_reached_required_array_entry(); used upon entering an array.
bool is_reader_located_on_array_entry_according_to_current_state()
{
  return (reader_position_state().actual_array_entry_no == reader_position_state().required_array_entry_no);
}

//advance to the next state; saturates at variable_states.size() (the final-state value).
void increase_current_state()
{
  DBG

  if((size_t)current_state >= (variable_states.size())) return;
  current_state ++;
}

//go back one state; saturates at zero.
void decrease_current_state()
{
  DBG

  if(current_state == 0) return;
  current_state --;
}

//reader event: a key was scanned. if the current state expects a key path and the tail of
//the reader's key-path (past the from-clause and the already-matched key depth) equals it,
//advance to the next state.
void key()
{
  DBG

  if(reader_position_state().required_path.size())//state has a key
  {// key should match
    std::vector<std::string>* filter = &reader_position_state().required_path;
    auto required_key_depth_size = reader_position_state().required_key_depth_size;
    if(std::equal((*key_path).begin()+(*from_clause).size() + required_key_depth_size, //key-path-start-point + from-clause-depth-size + key-depth
      (*key_path).end(),
      (*filter).begin(),
      (*filter).end(), iequal_predicate))
    {
      increase_current_state();//key match, advancing to next
    }
  }
}

//count one more element in the array the current state is tracking — only when the reader
//sits exactly at the required depth and the state is an array access.
void increase_array_index()
{
  if(is_reader_located_on_required_depth() && is_array_state())//TODO && is_array_state(). is it necessary? what about nesting level
  {
    DBG
    reader_position_state().actual_array_entry_no++;
  }
}

//reader event: the key-path shrank (object/array closed). step the machine backward or
//forward according to depth and array position.
void dec_key()
{
  DBG

  if(is_reader_position_depth_lower_than_required())
  {//actual key-path is shorter than required
    decrease_current_state();
    return;
  }

  if(is_reader_located_on_required_depth() && is_array_state())//TODO && is_array_state(). is it necessary?; json_element_state.back() != ARRAY_STATE)
  {//key-path-depth matches, and it an array
    if(is_reader_reached_required_array_entry())
    {//we reached the required array entry
      increase_current_state();
    }
    else if(is_reader_passed_required_array_entry())
    {//had passed the array entry
      decrease_current_state();
    }
  }
}

//reader event: a scalar value was scanned. when the machine is on its final state the
//value matches the variable: fire the exact-match callback with the variable's index.
void new_value(s3selectEngine::value& v,size_t json_index)
{
  DBG

  if(is_on_final_state())
  {
    (*m_exact_match_cb)(v, json_index);
    decrease_current_state();//TODO why decrease? the state-machine reached its final destination, and it should be only one result
  }
  increase_array_index();//next-value in array
}

//reader event: an object closed — an object is one element when inside a tracked array.
void end_object()
{
  increase_array_index();
}

//reader event: an array closed. reset the element counter when this was the tracked array,
//unwind one nesting level, and re-evaluate the machine position.
void end_array()
{
  //init the correct array index
  DBG

  if(is_reader_located_on_array_according_to_current_state())
  {//it reached end of required array
    reader_position_state().actual_array_entry_no = 0;
    decrease_current_state();
  }
  nested_array_level --;

  // option 1. move out of one array, and enter a new one; option-2. enter an object
  increase_array_index();//increase only upon correct array //TODO move it into dec_key()?
  dec_key();
}

//reader event: an array opened. when it is the array the current state requires, start
//counting elements and record the nesting level; entry 0 may already satisfy the state.
void start_array()
{
  DBG

  nested_array_level++;
  if(is_reader_located_on_required_depth())
  {//reader entered an array required by JSON variable
    reader_position_state().actual_array_entry_no = 0;
    reader_position_state().last_array_start = nested_array_level;

    if(is_reader_located_on_array_entry_according_to_current_state())
    {//we reached the required array entry -> next state
      increase_current_state();
    }
  }
}

}; //json_variable_access
424 | ||
425 | class json_variables_operations { | |
426 | ||
427 | public: | |
428 | ||
429 | std::vector<std::pair<json_variable_access*,size_t>> json_statement_variables{}; | |
430 | ||
431 | void init(std::vector<std::pair<json_variable_access*,size_t>>& jsv, //TODO init upon construction? | |
432 | std::vector <std::string>* from_clause, | |
433 | std::vector<std::string>* key_path, | |
434 | int* current_depth, | |
435 | std::function <int(s3selectEngine::value&,int)>* exact_match_cb) | |
436 | { | |
437 | json_statement_variables = jsv; | |
438 | ||
439 | for(auto& var : json_statement_variables) | |
440 | { | |
441 | var.first->init(from_clause, | |
442 | key_path, | |
443 | current_depth, | |
444 | exact_match_cb); | |
445 | } | |
446 | } | |
447 | ||
448 | void start_array() | |
449 | { | |
450 | for(auto& j : json_statement_variables) | |
451 | { | |
452 | j.first->start_array(); | |
453 | } | |
454 | } | |
455 | void end_array() | |
456 | { | |
457 | for(auto& j : json_statement_variables) | |
458 | { | |
459 | j.first->end_array(); | |
460 | } | |
461 | } | |
462 | void dec_key() | |
463 | { | |
464 | for(auto& j : json_statement_variables) | |
465 | { | |
466 | j.first->dec_key(); | |
467 | } | |
468 | } | |
469 | void end_object() | |
470 | { | |
471 | for(auto& j : json_statement_variables) | |
472 | { | |
473 | j.first->end_object(); | |
474 | } | |
475 | } | |
476 | void key() | |
477 | { | |
478 | for(auto& j : json_statement_variables) | |
479 | { | |
480 | j.first->key(); | |
481 | } | |
482 | } | |
483 | void new_value(s3selectEngine::value& v) | |
484 | { | |
485 | for(auto& j : json_statement_variables) | |
486 | { | |
487 | j.first->new_value(v,j.second); | |
488 | } | |
489 | } | |
490 | };//json_variables_operations | |
491 | ||
class JsonParserHandler : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, JsonParserHandler> {
//purpose: rapidjson SAX handler. it maintains the current key-path and nesting depth,
//detects rows according to the SQL from-clause, forwards events to the per-variable
//state machines, and triggers SQL processing per each completed row.

public:

typedef enum {OBJECT_STATE,ARRAY_STATE} en_json_elm_state_t; //kind of element the reader is inside
typedef std::pair<std::vector<std::string>, s3selectEngine::value> json_key_value_t; //key-path plus its value (star operation)

row_state state = row_state::NA; //whether a row is open, and which element kind opened it
std::function <int(s3selectEngine::value&,int)> m_exact_match_cb; //fired when a value matches a statement variable
std::function <int(s3selectEngine::scratch_area::json_key_value_t&)> m_star_operation_cb; //fired per key/value upon star operation

json_variables_operations variable_match_operations; //fans reader events out to all statement variables
int row_count{}; //number of rows discovered so far
std::vector <std::string> from_clause{}; //prefix that must match before rows are processed
bool prefix_match{}; //true while the reader is inside the from-clause prefix
s3selectEngine::value var_value; //scratch holding the last scanned scalar value
ChunksStreamer stream_buffer; //chunk-aware input stream fed by process_json_buffer
bool init_buffer_stream; //true once the iterative parser was initialized
rapidjson::Reader reader;
std::vector<en_json_elm_state_t> json_element_state; //stack mirroring object/array nesting
std::vector<std::string> key_path; //current key-path from the document root
std::function<int(void)> m_s3select_processing; //executes the SQL statement on the current row
int m_start_row_depth; //depth at which the current row started (-1 before any row)
int m_current_depth; //current nesting depth (objects + arrays)
bool m_star_operation; //true when the statement requires the star operation
int m_sql_processing_status; //last return status of the SQL-processing callback

JsonParserHandler() : prefix_match(false),init_buffer_stream(false),m_start_row_depth(-1),m_current_depth(0),m_star_operation(false),m_sql_processing_status(0)
{
}

//render the current key-path as a '/'-separated string (debug aid).
std::string get_key_path()
{//for debug
  std::string res;
  for(const auto & i: key_path)
  {
    res.append(i);
    res.append(std::string("/"));
  }
  return res;
}

//called whenever a value/object/array at the current level completes:
//pop the last key (keys exist only inside objects), notify the variable state machines,
//and — for array-opened rows — process the row per each element at the row depth.
void dec_key_path()
{
  if (json_element_state.size()) {
    if(json_element_state.back() != ARRAY_STATE)  {
      if(key_path.size() != 0) {
        key_path.pop_back();
      }
    }
  }

  variable_match_operations.dec_key();

  //TODO m_current_depth-- should done here
  if(m_start_row_depth > m_current_depth)
  {//reader left the row's depth: the from-clause prefix no longer applies
    prefix_match = false;
  } else
  if (prefix_match) {
    if (state == row_state::ARRAY_START_ROW && m_start_row_depth == m_current_depth) {
      m_sql_processing_status = m_s3select_processing(); //per each element in array
      ++row_count;
    }
  }
}

//handle a freshly scanned scalar value: feed the star operation (when enabled), feed the
//variable state machines (when inside the from-clause prefix), then pop the key.
void push_new_key_value(s3selectEngine::value& v)
{
  if (m_star_operation && prefix_match)
  {
    json_key_value_t key_value(key_path,v);
    m_star_operation_cb(key_value);
  }
  if (prefix_match)
    variable_match_operations.new_value(v);

  dec_key_path();
}

//rapidjson SAX events for scalar values: store into var_value and route through
//push_new_key_value. returning true tells rapidjson to continue parsing.
bool Null() {
  var_value.setnull();
  push_new_key_value(var_value);
  return true; }

bool Bool(bool b) {
  var_value = b;
  push_new_key_value(var_value);
  return true; }

bool Int(int i) {
  var_value = i;
  push_new_key_value(var_value);
  return true; }

bool Uint(unsigned u) {
  var_value = u;
  push_new_key_value(var_value);
  return true; }

bool Int64(int64_t i) {
  var_value = i;
  push_new_key_value(var_value);
  return true; }

bool Uint64(uint64_t u) {
  var_value = u;
  push_new_key_value(var_value);
  return true; }

bool Double(double d) {
  var_value = d;
  push_new_key_value(var_value);
  return true; }

//NOTE(review): `length` is ignored — the assignment relies on `str` being null-terminated,
//which holds for rapidjson's default (non-insitu) parsing; confirm if insitu parsing is ever used.
bool String(const char* str, rapidjson::SizeType length, bool copy) {
  //TODO use copy
  var_value = str;
  push_new_key_value(var_value);
  return true;
}

//rapidjson SAX event: a new object key. extend the key-path, re-evaluate the from-clause
//prefix match, and notify the variable state machines.
//NOTE(review): `length` is ignored here as well (see String above).
bool Key(const char* str, rapidjson::SizeType length, bool copy) {
  key_path.push_back(std::string(str));

  if(from_clause.size() == 0 || std::equal(key_path.begin(), key_path.end(), from_clause.begin(), from_clause.end(), iequal_predicate)) {
    prefix_match = true;
  }

  variable_match_operations.key();

  return true;
}

//true when a row is currently open (opened by either an object or an array).
bool is_already_row_started()
{
  if(state == row_state::OBJECT_START_ROW || state == row_state::ARRAY_START_ROW)
    return true;
  else
    return false;
}

//rapidjson SAX event: object opened. the first object inside the from-clause prefix
//starts a new row.
bool StartObject() {
  json_element_state.push_back(OBJECT_STATE);
  m_current_depth++;
  if (prefix_match && !is_already_row_started()) {
    state = row_state::OBJECT_START_ROW;
    m_start_row_depth = m_current_depth;
    ++row_count;
  }

  return true;
}

//rapidjson SAX event: object closed. when the object that opened the row closes,
//run the SQL processing for the completed row.
bool EndObject(rapidjson::SizeType memberCount) {
  json_element_state.pop_back();
  m_current_depth --;

  variable_match_operations.end_object();

  dec_key_path();
  if (state == row_state::OBJECT_START_ROW && (m_start_row_depth > m_current_depth)) {
    m_sql_processing_status = m_s3select_processing();
    state = row_state::NA;
  }
  return true;
}

//rapidjson SAX event: array opened. the first array inside the from-clause prefix starts
//a row; its elements are then processed per-element by dec_key_path().
bool StartArray() {
  json_element_state.push_back(ARRAY_STATE);
  m_current_depth++;
  if (prefix_match && !is_already_row_started()) {
    state = row_state::ARRAY_START_ROW;
    m_start_row_depth = m_current_depth;
  }

  variable_match_operations.start_array();

  return true;
}

//rapidjson SAX event: array closed. close the row when the array that opened it ends.
bool EndArray(rapidjson::SizeType elementCount) {
  json_element_state.pop_back();
  m_current_depth--;
  dec_key_path();

  if (state == row_state::ARRAY_START_ROW && (m_start_row_depth > m_current_depth)) {
    state = row_state::NA;
  }

  variable_match_operations.end_array();

  return true;
}

//purpose: set the filter according to SQL statement (from clause).
//an empty from-clause means every key matches (prefix matching is always on).
void set_prefix_match(std::vector<std::string>& requested_prefix_match)
{
  from_clause = requested_prefix_match;
  if(from_clause.size() ==0)
  {
    prefix_match = true;
    m_start_row_depth = m_current_depth;
  }
}

//purpose: set the json variables extracted from the SQL statement (projection columns,
//predicate columns) and wire them to this handler's reader state.
void set_statement_json_variables(std::vector<std::pair<json_variable_access*,size_t>>& statement_variables)
{
  variable_match_operations.init(
    statement_variables,
    &from_clause,
    &key_path,
    &m_current_depth,
    &m_exact_match_cb);
}

//purpose: upon a key matching one of the exact filters, this callback is called.
void set_exact_match_callback(std::function<int(s3selectEngine::value&, int)> f)
{
  m_exact_match_cb = f;
}

//purpose: execute the s3select statement on a matching row (according to filters).
void set_s3select_processing_callback(std::function<int(void)>& f)
{
  m_s3select_processing = f;
}

//purpose: set the callback receiving each key/value pair when the star operation is enabled.
void set_push_per_star_operation_callback( std::function <int(s3selectEngine::scratch_area::json_key_value_t&)> cb)
{
  m_star_operation_cb = cb;
}

//enable the star operation (push every key/value of each row).
void set_star_operation()
{
  m_star_operation = true;
}

//purpose: the user keeps calling with buffers; the method is not aware of the object size.
//drives rapidjson's iterative parser over the chunked stream. returns 0 on success /
//need-more-input, JSON_PROCESSING_LIMIT_REACHED when the SQL callback says to stop;
//parse failures and callback exceptions are rethrown as FATAL s3select exceptions.
int process_json_buffer(char* json_buffer,size_t json_buffer_sz, bool end_of_stream=false)
{

  try{
    if(!init_buffer_stream)
    {
      //set the memoryStreamer
      reader.IterativeParseInit();
      init_buffer_stream = true;

    }

    //the non-processed bytes plus the next chunk are copy into main processing buffer
    if(!end_of_stream)
      stream_buffer.resetBuffer(json_buffer, json_buffer_sz);

    while (!reader.IterativeParseComplete()) {
      reader.IterativeParseNext<rapidjson::kParseDefaultFlags>(stream_buffer, *this);

      //once all key-values move into s3select(for further filtering and processing), it should be cleared

      //TODO in the case the chunk is too small or some value in input is too big, the parsing will fail.
      if (!end_of_stream && stream_buffer.next_src_==0 && stream_buffer.getBytesLeft() < 2048)
      {//the non processed bytes will be processed on next fetched chunk
	//TODO save remaining-bytes to internal buffer (or caller will use 2 sets of buffer)
	stream_buffer.saveRemainingBytes();
	return 0;
      }
      if(m_sql_processing_status == JSON_PROCESSING_LIMIT_REACHED)//return status(int) from callback
      {
	return JSON_PROCESSING_LIMIT_REACHED;
      }

      // error message
      if(reader.HasParseError())  {
	rapidjson::ParseErrorCode c = reader.GetParseErrorCode();
	size_t ofs = reader.GetErrorOffset();
	std::stringstream error_str;
	error_str << "parsing error. code:" << c << " position: " << ofs << std::endl;
	throw s3selectEngine::base_s3select_exception(error_str.str(), s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL);
	return -1; //unreachable: the throw above always exits first
      }
    }//while reader.IterativeParseComplete
  }
  catch(std::exception &e){
    std::stringstream error_str;
    error_str << "failed to process JSON : " << e.what() << std::endl;
    throw s3selectEngine::base_s3select_exception(error_str.str(), s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL);
    return -1; //unreachable: the throw above always exits first
  }
  return 0;
}
};
781 | ||
782 | ||
783 | #endif | |
784 |