1 #ifndef S3SELECT_JSON_PARSER_H
2 #define S3SELECT_JSON_PARSER_H
4 //TODO add __FILE__ __LINE__ message
5 #define RAPIDJSON_ASSERT(x) s3select_json_parse_error(x)
6 bool s3select_json_parse_error(bool b
);
7 bool s3select_json_parse_error(const char* error
);
9 #include "rapidjson/reader.h"
10 #include "rapidjson/writer.h"
11 #include "rapidjson/filereadstream.h"
12 #include "rapidjson/filewritestream.h"
13 #include "rapidjson/error/en.h"
14 #include "rapidjson/document.h"
20 #include <boost/spirit/include/classic_core.hpp>
21 #include <boost/algorithm/string/predicate.hpp>
22 #include "s3select_oper.h"//class value
23 #include <boost/algorithm/string/predicate.hpp>
25 #define JSON_PROCESSING_LIMIT_REACHED 2
27 //TODO missing s3selectEngine namespace
// Assertion hook for RapidJSON: RAPIDJSON_ASSERT(x) is defined above as
// s3select_json_parse_error(x), so this overload receives the asserted condition.
// The visible body throws a FATAL base_s3select_exception carrying the fixed text
// "failure while processing JSON document"; the condition guarding the throw and the
// returned value are not visible in this listing.
// NOTE(review): this is a non-inline function definition placed in a header; including
// the header from more than one translation unit would presumably cause duplicate-symbol
// link errors -- consider marking it `inline` (TODO confirm how the header is consumed).
29 bool s3select_json_parse_error(bool b
)
33 const char* error_str
= "failure while processing JSON document";
34 throw s3selectEngine::base_s3select_exception(error_str
, s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL
);
// Pointer flavour of the RapidJSON assertion hook (selected when RAPIDJSON_ASSERT is
// given a `const char*` expression).
// The visible body throws a FATAL base_s3select_exception with the same fixed text as the
// bool overload; the `error` argument is not part of the visible message.
// NOTE(review): like the bool overload, this is a non-inline definition in a header --
// consider `inline` to avoid multiple-definition link errors (TODO confirm).
39 bool s3select_json_parse_error(const char* error
)
43 const char* error_str
= "failure while processing JSON document";
44 throw s3selectEngine::base_s3select_exception(error_str
, s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL
);
49 static auto iequal_predicate
= [](std::string
& it1
, std::string
& it2
)
51 return boost::iequals(it1
,it2
);
55 class ChunksStreamer
: public rapidjson::MemoryStream
{
57 //purpose: adds a `resetBuffer` method that enables parsing the input chunk after chunk;
58 //for each new chunk it resets the internal data members
61 std::string internal_buffer
;
65 ChunksStreamer():rapidjson::MemoryStream(0,0){next_src_
=0;next_size_
=0;}
67 ChunksStreamer(const Ch
*src
, size_t size
) : rapidjson::MemoryStream(src
,size
){next_src_
=0;next_size_
=0;}
69 //override Peek method
72 if(RAPIDJSON_UNLIKELY(src_
== end_
))
74 if(next_src_
)//next chunk exist
75 {//upon reaching to end of current buffer, to switch with next one
90 //override Take method
93 if(RAPIDJSON_UNLIKELY(src_
== end_
))
95 if(next_src_
)//next chunk exist
96 {//upon reaching to end of current buffer, to switch with next one
111 void resetBuffer(char* buff
, size_t size
)
114 {//first time calling
123 {//save the next-chunk that will be used upon parser reaches end of current buffer
128 {// should not happen
129 std::cout
<< "can not replace pointers!!!" << std::endl
;//TODO exception
134 void saveRemainingBytes()
135 {//this routine called per each new chunk
136 //save the remaining bytes, before they are overridden by the next chunk.
137 size_t copy_left_sz
= getBytesLeft(); //should be very small
138 internal_buffer
.assign(src_
,copy_left_sz
);
140 src_
= internal_buffer
.data();
142 size_
= copy_left_sz
;
143 end_
= src_
+ copy_left_sz
;
146 size_t getBytesLeft() { return end_
- src_
; }
157 class json_variable_access
{
158 //purpose: a state machine for JSON variables.
159 //when the syntax parser accepts a variable (projection / where-clause) it creates this object.
160 //this object gets events (key, start-array, ... etc.) as the JSON reader scans the input;
161 //these events advance the states until the last one is reached, which results in pushing the value into the scratch area.
165 // to set the following.
166 std::vector
<std::string
>* from_clause
;
167 std::vector
<std::string
>* key_path
;
168 int* m_current_depth
;
169 std::function
<int(s3selectEngine::value
&,int)>* m_exact_match_cb
;
170 // a state number : (_1).a.b.c[ 17 ].d.e (a.b)=1 (c[)=2 (17)=3 (.d.e)=4
171 size_t current_state
;//contain the current state of the state machine for searching-expression (each JSON variable in SQL statement has a searching expression)
172 int nested_array_level
;//in the case of array within array it contain the nesting level
174 struct variable_state_md
{
175 std::vector
<std::string
> required_path
;//set by the syntax-parser. in the case of array its empty
176 int required_array_entry_no
;//set by the syntax-parser, in the case of object-key its -1.
177 int actual_array_entry_no
;//upon scanning the JSON input, this value increased by 1 each new element
178 int required_depth_size
;// depth of state, is aggregated (include the previous). it's the summary of key-elements and array-operator's.
179 int required_key_depth_size
;// same as the above, not including the array-operators.
180 int last_array_start
;//it actually mark the nested-array-level (array within array)
183 std::vector
<struct variable_state_md
> variable_states
;//vector is populated upon syntax phase.
187 json_variable_access():from_clause(nullptr),key_path(nullptr),m_current_depth(nullptr),m_exact_match_cb(nullptr),current_state(-1),nested_array_level(0)
191 std::vector
<std::string
>* reader_from_clause
,
192 std::vector
<std::string
>* reader_key_path
,
193 int* reader_current_depth
,
194 std::function
<int(s3selectEngine::value
&,int)>* excat_match_cb
)
195 {//this routine should be called before scanning the JSON input
196 from_clause
= reader_from_clause
;
197 key_path
= reader_key_path
;
198 m_exact_match_cb
= excat_match_cb
;
199 m_current_depth
= reader_current_depth
;
202 //loop on variable_states compute required_depth_size
207 variable_states
.clear();
212 auto f
= [](std::vector
<std::string
> x
){std::string res
;for(auto i
: x
){res
.append(i
);res
.append(".");};return res
;};
214 std::cout
<< "m_current_depth=" << *m_current_depth
<< " required_depth_size= " << reader_position_state().required_depth_size
<< " ";
215 std::cout
<< "variable_states[ current_state ].last_array_start=" << reader_position_state().last_array_start
;
216 std::cout
<< " current_state=" << current_state
<< " key_path=" << f(*key_path
) << std::endl
;
218 #define DBG {std::cout << "event=" << __FUNCTION__ << std::endl; debug_info();}
222 void compile_state_machine()
224 size_t aggregated_required_depth_size
= 0;
225 size_t aggregated_required_key_depth_size
= 0;
226 for(auto& v
: variable_states
)
228 if(v
.required_path
.size())
230 v
.required_depth_size
= aggregated_required_depth_size
+ v
.required_path
.size();//depth size in general, including array
231 v
.required_key_depth_size
= aggregated_required_key_depth_size
;//depth include ONLY key parts
232 aggregated_required_key_depth_size
+= v
.required_path
.size();
236 v
.required_depth_size
= aggregated_required_depth_size
+ 1;
238 aggregated_required_depth_size
= v
.required_depth_size
;
242 void push_variable_state(std::vector
<std::string
>& required_path
,int required_array_entry_no
)
244 struct variable_state_md new_state
={required_path
,required_array_entry_no
,-1,0,0,-1};
245 variable_states
.push_back(new_state
);
246 //TODO required_path.size() > 0 or required_path,required_array_entry_no>=0 : not both
247 compile_state_machine();
250 struct variable_state_md
& reader_position_state()
252 if (current_state
>=variable_states
.size())
254 const char* out_of_range
= "\nJSON reader failed due to array-out-of-range\n";
255 throw s3selectEngine::base_s3select_exception(out_of_range
,s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL
);
258 return variable_states
[ current_state
];
261 bool is_array_state()
263 return (reader_position_state().required_array_entry_no
>=0);
266 bool is_reader_located_on_required_depth()
268 return (*m_current_depth
== reader_position_state().required_depth_size
);
271 bool is_on_final_state()
273 return ((size_t)current_state
== (variable_states
.size()));
274 //&& *m_current_depth == variable_states[ current_state -1 ].required_depth_size);
276 // NOTE: by ignoring the current-depth, the matcher gives precedence to key-path match, while not ignoring accessing using array
277 // meaning, upon requeting a.b[12] , the [12] is not ignored, the a<-->b distance should be calculated as key distance, i.e. not counting array/object with *no keys*.
278 // user may request 'select _1.phonearray.num'; the reader will traverse `num` exist in `phonearray`
281 bool is_reader_reached_required_array_entry()
283 return (reader_position_state().actual_array_entry_no
== reader_position_state().required_array_entry_no
);
286 bool is_reader_passed_required_array_entry()
288 return (reader_position_state().actual_array_entry_no
> reader_position_state().required_array_entry_no
);
291 bool is_reader_located_on_array_according_to_current_state()
293 return (nested_array_level
== reader_position_state().last_array_start
);
296 bool is_reader_position_depth_lower_than_required()
298 return (*m_current_depth
< reader_position_state().required_depth_size
);
301 bool is_reader_located_on_array_entry_according_to_current_state()
303 return (reader_position_state().actual_array_entry_no
== reader_position_state().required_array_entry_no
);
306 void increase_current_state()
310 if((size_t)current_state
>= (variable_states
.size())) return;
314 void decrease_current_state()
318 if(current_state
== 0) return;
326 if(reader_position_state().required_path
.size())//state has a key
328 std::vector
<std::string
>* filter
= &reader_position_state().required_path
;
329 auto required_key_depth_size
= reader_position_state().required_key_depth_size
;
330 if(std::equal((*key_path
).begin()+(*from_clause
).size() + required_key_depth_size
, //key-path-start-point + from-clause-depth-size + key-depth
333 (*filter
).end(), iequal_predicate
))
335 increase_current_state();//key match, advancing to next
340 void increase_array_index()
342 if(is_reader_located_on_required_depth() && is_array_state())//TODO && is_array_state(). is it necessary? what about nesting level
345 reader_position_state().actual_array_entry_no
++;
353 if(is_reader_position_depth_lower_than_required())
354 {//actual key-path is shorter than required
355 decrease_current_state();
359 if(is_reader_located_on_required_depth() && is_array_state())//TODO && is_array_state(). is it necessary?; json_element_state.back() != ARRAY_STATE)
360 {//key-path-depth matches, and it an array
361 if(is_reader_reached_required_array_entry())
362 {//we reached the required array entry
363 increase_current_state();
365 else if(is_reader_passed_required_array_entry())
366 {//had passed the array entry
367 decrease_current_state();
372 void new_value(s3selectEngine::value
& v
,size_t json_index
)
376 if(is_on_final_state())
378 (*m_exact_match_cb
)(v
, json_index
);
379 decrease_current_state();//TODO why decrease? the state-machine reached its final destination, and it should be only one result
381 increase_array_index();//next-value in array
386 increase_array_index();
391 //init the correct array index
394 if(is_reader_located_on_array_according_to_current_state())
395 {//it reached end of required array
396 reader_position_state().actual_array_entry_no
= 0;
397 decrease_current_state();
399 nested_array_level
--;
401 // option 1. move out of one array, and enter a new one; option-2. enter an object
402 increase_array_index();//increase only upon correct array //TODO move it into dec_key()?
410 nested_array_level
++;
411 if(is_reader_located_on_required_depth())
412 {//reader entered an array required by JSON variable
413 reader_position_state().actual_array_entry_no
= 0;
414 reader_position_state().last_array_start
= nested_array_level
;
416 if(is_reader_located_on_array_entry_according_to_current_state())
417 {//we reached the required array entry -> next state
418 increase_current_state();
423 }; //json_variable_access
425 class json_variables_operations
{
429 std::vector
<std::pair
<json_variable_access
*,size_t>> json_statement_variables
{};
431 void init(std::vector
<std::pair
<json_variable_access
*,size_t>>& jsv
, //TODO init upon construction?
432 std::vector
<std::string
>* from_clause
,
433 std::vector
<std::string
>* key_path
,
435 std::function
<int(s3selectEngine::value
&,int)>* exact_match_cb
)
437 json_statement_variables
= jsv
;
439 for(auto& var
: json_statement_variables
)
441 var
.first
->init(from_clause
,
450 for(auto& j
: json_statement_variables
)
452 j
.first
->start_array();
457 for(auto& j
: json_statement_variables
)
459 j
.first
->end_array();
464 for(auto& j
: json_statement_variables
)
471 for(auto& j
: json_statement_variables
)
473 j
.first
->end_object();
478 for(auto& j
: json_statement_variables
)
483 void new_value(s3selectEngine::value
& v
)
485 for(auto& j
: json_statement_variables
)
487 j
.first
->new_value(v
,j
.second
);
490 };//json_variables_operations
492 class JsonParserHandler
: public rapidjson::BaseReaderHandler
<rapidjson::UTF8
<>, JsonParserHandler
> {
496 typedef enum {OBJECT_STATE
,ARRAY_STATE
} en_json_elm_state_t
;
497 typedef std::pair
<std::vector
<std::string
>, s3selectEngine::value
> json_key_value_t
;
499 row_state state
= row_state::NA
;
500 std::function
<int(s3selectEngine::value
&,int)> m_exact_match_cb
;
501 std::function
<int(s3selectEngine::scratch_area::json_key_value_t
&)> m_star_operation_cb
;
503 json_variables_operations variable_match_operations
;
505 std::vector
<std::string
> from_clause
{};
507 s3selectEngine::value var_value
;
508 ChunksStreamer stream_buffer
;
509 bool init_buffer_stream
;
510 rapidjson::Reader reader
;
511 std::vector
<en_json_elm_state_t
> json_element_state
;
512 std::vector
<std::string
> key_path
;
513 std::function
<int(void)> m_s3select_processing
;
514 int m_start_row_depth
;
516 bool m_star_operation
;
517 int m_sql_processing_status
;
519 JsonParserHandler() : prefix_match(false),init_buffer_stream(false),m_start_row_depth(-1),m_current_depth(0),m_star_operation(false),m_sql_processing_status(0)
523 std::string
get_key_path()
526 for(const auto & i
: key_path
)
529 res
.append(std::string("/"));
536 if (json_element_state
.size()) {
537 if(json_element_state
.back() != ARRAY_STATE
) {
538 if(key_path
.size() != 0) {
544 variable_match_operations
.dec_key();
546 //TODO m_current_depth-- should done here
547 if(m_start_row_depth
> m_current_depth
)
549 prefix_match
= false;
552 if (state
== row_state::ARRAY_START_ROW
&& m_start_row_depth
== m_current_depth
) {
553 m_sql_processing_status
= m_s3select_processing(); //per each element in array
559 void push_new_key_value(s3selectEngine::value
& v
)
561 if (m_star_operation
&& prefix_match
)
563 json_key_value_t
key_value(key_path
,v
);
564 m_star_operation_cb(key_value
);
567 variable_match_operations
.new_value(v
);
574 push_new_key_value(var_value
);
579 push_new_key_value(var_value
);
584 push_new_key_value(var_value
);
587 bool Uint(unsigned u
) {
589 push_new_key_value(var_value
);
592 bool Int64(int64_t i
) {
594 push_new_key_value(var_value
);
597 bool Uint64(uint64_t u
) {
599 push_new_key_value(var_value
);
602 bool Double(double d
) {
604 push_new_key_value(var_value
);
607 bool String(const char* str
, rapidjson::SizeType length
, bool copy
) {
610 push_new_key_value(var_value
);
// Reader callback for an object key.
// Visible steps: the key is appended to key_path; key_path is then compared with
// from_clause case-insensitively (iequal_predicate), an empty from_clause always
// satisfying the test; finally the event is forwarded to variable_match_operations.key().
// The statement executed when the comparison succeeds and the return value are not
// visible in this listing.
// NOTE(review): std::string(str) ignores `length`, so a key containing an embedded NUL
// would be truncated -- consider std::string(str, length) (TODO confirm).
614 bool Key(const char* str
, rapidjson::SizeType length
, bool copy
) {
615 key_path
.push_back(std::string(str
));
617 if(from_clause
.size() == 0 || std::equal(key_path
.begin(), key_path
.end(), from_clause
.begin(), from_clause
.end(), iequal_predicate
)) {
621 variable_match_operations
.key();
626 bool is_already_row_started()
628 if(state
== row_state::OBJECT_START_ROW
|| state
== row_state::ARRAY_START_ROW
)
635 json_element_state
.push_back(OBJECT_STATE
);
637 if (prefix_match
&& !is_already_row_started()) {
638 state
= row_state::OBJECT_START_ROW
;
639 m_start_row_depth
= m_current_depth
;
646 bool EndObject(rapidjson::SizeType memberCount
) {
647 json_element_state
.pop_back();
650 variable_match_operations
.end_object();
653 if (state
== row_state::OBJECT_START_ROW
&& (m_start_row_depth
> m_current_depth
)) {
654 m_sql_processing_status
= m_s3select_processing();
655 state
= row_state::NA
;
661 json_element_state
.push_back(ARRAY_STATE
);
663 if (prefix_match
&& !is_already_row_started()) {
664 state
= row_state::ARRAY_START_ROW
;
665 m_start_row_depth
= m_current_depth
;
668 variable_match_operations
.start_array();
673 bool EndArray(rapidjson::SizeType elementCount
) {
674 json_element_state
.pop_back();
678 if (state
== row_state::ARRAY_START_ROW
&& (m_start_row_depth
> m_current_depth
)) {
679 state
= row_state::NA
;
682 variable_match_operations
.end_array();
687 void set_prefix_match(std::vector
<std::string
>& requested_prefix_match
)
688 {//purpose: set the filter according to SQL statement(from clause)
689 from_clause
= requested_prefix_match
;
690 if(from_clause
.size() ==0)
693 m_start_row_depth
= m_current_depth
;
697 void set_statement_json_variables(std::vector
<std::pair
<json_variable_access
*,size_t>>& statement_variables
)
698 {//purpose: set the json variables extracted from the SQL statement(projection columns, predicates columns)
699 variable_match_operations
.init(
707 void set_exact_match_callback(std::function
<int(s3selectEngine::value
&, int)> f
)
708 {//purpose: upon key is matching one of the exact filters, the callback is called.
709 m_exact_match_cb
= f
;
712 void set_s3select_processing_callback(std::function
<int(void)>& f
)
713 {//purpose: execute s3select statement on matching row (according to filters)
714 m_s3select_processing
= f
;
717 void set_push_per_star_operation_callback( std::function
<int(s3selectEngine::scratch_area::json_key_value_t
&)> cb
)
719 m_star_operation_cb
= cb
;
722 void set_star_operation()
724 m_star_operation
= true;
727 int process_json_buffer(char* json_buffer
,size_t json_buffer_sz
, bool end_of_stream
=false)
728 {//user keeps calling with buffers, the method is not aware of the object size.
732 if(!init_buffer_stream
)
734 //set the memoryStreamer
735 reader
.IterativeParseInit();
736 init_buffer_stream
= true;
740 //the non-processed bytes plus the next chunk are copied into the main processing buffer
742 stream_buffer
.resetBuffer(json_buffer
, json_buffer_sz
);
744 while (!reader
.IterativeParseComplete()) {
745 reader
.IterativeParseNext
<rapidjson::kParseDefaultFlags
>(stream_buffer
, *this);
747 //once all key-values move into s3select(for further filtering and processing), it should be cleared
749 //TODO in the case the chunk is too small or some value in input is too big, the parsing will fail.
750 if (!end_of_stream
&& stream_buffer
.next_src_
==0 && stream_buffer
.getBytesLeft() < 2048)
751 {//the non processed bytes will be processed on next fetched chunk
752 //TODO save remaining-bytes to internal buffer (or caller will use 2 sets of buffer)
753 stream_buffer
.saveRemainingBytes();
756 if(m_sql_processing_status
== JSON_PROCESSING_LIMIT_REACHED
)//return status(int) from callback
758 return JSON_PROCESSING_LIMIT_REACHED
;
762 if(reader
.HasParseError()) {
763 rapidjson::ParseErrorCode c
= reader
.GetParseErrorCode();
764 size_t ofs
= reader
.GetErrorOffset();
765 std::stringstream error_str
;
766 error_str
<< "parsing error. code:" << c
<< " position: " << ofs
<< std::endl
;
767 throw s3selectEngine::base_s3select_exception(error_str
.str(), s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL
);
770 }//while reader.IterativeParseComplete
772 catch(std::exception
&e
){
773 std::stringstream error_str
;
774 error_str
<< "failed to process JSON : " << e
.what() << std::endl
;
775 throw s3selectEngine::base_s3select_exception(error_str
.str(), s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL
);