};
static push_cast_expr g_push_cast_expr;
+/// AST-builder action bound (through BOOST_BIND_ACTION) to the
+/// `cast_as_decimal_expr` grammar rule, i.e. cast(<expression> as decimal(x,y)).
+/// builder() is defined out of line.
+struct push_cast_decimal_expr : base_ast_builder
+{
+  void builder(s3select* self, const char* a, const char* b) const;
+};
+/// Instance referenced by BOOST_BIND_ACTION(push_cast_decimal_expr).
+static push_cast_decimal_expr g_push_cast_decimal_expr;
+
+/// AST-builder action bound (through BOOST_BIND_ACTION) to the
+/// `decimal_operator` grammar rule, i.e. decimal(<number>,<number>).
+/// builder() is defined out of line.
+struct push_decimal_operator : base_ast_builder
+{
+  void builder(s3select* self, const char* a, const char* b) const;
+};
+/// Instance referenced by BOOST_BIND_ACTION(push_decimal_operator).
+static push_decimal_operator g_push_decimal_operator;
+
/// AST-builder action bound (through BOOST_BIND_ACTION) to the `data_type`
/// grammar rule (int / float / string / timestamp / bool).
/// Only the declaration of builder() appears here.
struct push_data_type : base_ast_builder
{
  void builder(s3select* self, const char* a, const char* b) const;
};
static push_time_to_string_dynamic g_push_time_to_string_dynamic;
+/// AST-builder action bound (through BOOST_BIND_ACTION) to the
+/// `backtick_string` grammar rule (a literal enclosed in backticks).
+/// builder() is defined out of line.
+struct push_string_to_time_constant : base_ast_builder
+{
+  void builder(s3select* self, const char* a, const char* b) const;
+};
+/// Instance referenced by BOOST_BIND_ACTION(push_string_to_time_constant).
+static push_string_to_time_constant g_push_string_to_time_constant;
+
struct push_array_number : public base_ast_builder
{
void builder(s3select* self, const char* a, const char* b) const;
s3select_functions m_s3select_functions;
std::string error_description;
s3select_allocator m_s3select_allocator;
- bool aggr_flow;
- bool m_json_query;
+ bool aggr_flow = false;
+ bool m_json_query = false;
std::set<base_statement*> m_ast_nodes_to_delete;
+ base_function* m_to_timestamp_for_clean = nullptr;
#define BOOST_BIND_ACTION( push_name ) boost::bind( &push_name::operator(), g_ ## push_name, const_cast<s3select*>(&self), _1, _2)
public:
+ /// Gives mutable access to m_ast_nodes_to_delete.
+ /// push_string_to_time_constant::builder hands this set to push_for_cleanup()
+ /// for the variables it allocates.
+ std::set<base_statement*>& get_ast_nodes_to_delete()
+ {
+   return this->m_ast_nodes_to_delete;
+ }
+
+ /// Gives access to the m_to_timestamp_for_clean pointer itself (a reference to
+ /// the pointer), so the caller can assign a new function object through it.
+ base_function*& get_to_timestamp_for_clean()
+ {
+   return this->m_to_timestamp_for_clean;
+ }
+
actionQ* getAction()
{
return &m_actionQ;
{//the json_variable_access object is allocated by S3SELECT_NEW. this object contains stl-vector that should be free
x.first->~json_variable_access();
}
+ if(m_to_timestamp_for_clean)
+ {
+ m_to_timestamp_for_clean->dtor();
+ }
}
#define JSON_ROOT_OBJECT "s3object[*]"
arithmetic_argument = (float_number)[BOOST_BIND_ACTION(push_float_number)] | (number)[BOOST_BIND_ACTION(push_number)] | (json_variable_name)[BOOST_BIND_ACTION(push_json_variable)] |
(column_pos)[BOOST_BIND_ACTION(push_column_pos)] |
- (string)[BOOST_BIND_ACTION(push_string)] | (datediff) | (dateadd) | (extract) | (time_to_string_constant) | (time_to_string_dynamic) |
+ (string)[BOOST_BIND_ACTION(push_string)] | (backtick_string) | (datediff) | (dateadd) | (extract) | (time_to_string_constant) | (time_to_string_dynamic) |
(cast) | (substr) | (trim) | (when_case_value_when) | (when_case_else_projection) |
(function) | (variable)[BOOST_BIND_ACTION(push_variable)]; //function is pushed by right-term
- cast = (S3SELECT_KW("cast") >> '(' >> factor >> S3SELECT_KW("as") >> (data_type)[BOOST_BIND_ACTION(push_data_type)] >> ')') [BOOST_BIND_ACTION(push_cast_expr)];
+ cast = cast_as_data_type | cast_as_decimal_expr ;
+
+ cast_as_data_type = (S3SELECT_KW("cast") >> '(' >> factor >> S3SELECT_KW("as") >> (data_type) >> ')') [BOOST_BIND_ACTION(push_cast_expr)];
+
+ cast_as_decimal_expr = (S3SELECT_KW("cast") >> '(' >> factor >> S3SELECT_KW("as") >> decimal_operator >> ')') [BOOST_BIND_ACTION(push_cast_decimal_expr)];
- data_type = (S3SELECT_KW("int") | S3SELECT_KW("float") | S3SELECT_KW("string") | S3SELECT_KW("timestamp") | S3SELECT_KW("bool") );
+ decimal_operator = (S3SELECT_KW("decimal") >> '(' >> (number)[BOOST_BIND_ACTION(push_number)] >> ',' >> (number)[BOOST_BIND_ACTION(push_number)] >> ')')
+ [BOOST_BIND_ACTION(push_decimal_operator)];
+
+ data_type = (S3SELECT_KW("int") | S3SELECT_KW("float") | S3SELECT_KW("string") | S3SELECT_KW("timestamp") | S3SELECT_KW("bool"))[BOOST_BIND_ACTION(push_data_type)];
substr = (substr_from) | (substr_from_for);
float_number = bsc::real_p;
- string = (bsc::str_p("\"") >> *( bsc::anychar_p - bsc::str_p("\"") ) >> bsc::str_p("\"")) | (bsc::str_p("\'") >> *( bsc::anychar_p - bsc::str_p("\'") ) >> bsc::str_p("\'")) ;
+ string = (bsc::str_p("\"") >> *( bsc::anychar_p - bsc::str_p("\"") ) >> bsc::str_p("\"")) | (bsc::str_p("\'") >> *( bsc::anychar_p - bsc::str_p("\'") ) >> bsc::str_p("\'"));
+
+ backtick_string = (bsc::str_p("`") >> *( bsc::anychar_p - bsc::str_p("`") ) >> bsc::str_p("`")) [BOOST_BIND_ACTION(push_string_to_time_constant)];
column_pos = (variable_name >> "." >> column_pos_name) | column_pos_name; //TODO what about space
}
- bsc::rule<ScannerT> cast, data_type, variable, json_variable_name, variable_name, select_expr, select_expr_base, select_expr_base_, s3_object, where_clause, limit_number, number, float_number, string, from_expression;
+ bsc::rule<ScannerT> cast, data_type, variable, json_variable_name, variable_name, select_expr, select_expr_base, select_expr_base_, s3_object, where_clause, limit_number;
+ bsc::rule<ScannerT> number, float_number, string, backtick_string, from_expression, cast_as_data_type, cast_as_decimal_expr, decimal_operator;
bsc::rule<ScannerT> cmp_operand, arith_cmp, condition_expression, arithmetic_predicate, logical_predicate, factor;
bsc::rule<ScannerT> trim, trim_whitespace_both, trim_one_side_whitespace, trim_anychar_anyside, trim_type, trim_remove_type, substr, substr_from, substr_from_for;
bsc::rule<ScannerT> datediff, dateadd, extract, date_part, date_part_extract, time_to_string_constant, time_to_string_dynamic;
self->getAction()->whenThenQ.push_back(func);
}
+/// Semantic action for decimal(<number>,<number>).
+///
+/// Removes the two most recently pushed expressions from exprQ, wraps them in a
+/// new "#decimal_operator#" function node and pushes that node onto exprQ.
+///
+/// @param self  the s3select object providing the action queues and S3SELECT_NEW.
+/// @param a     start of the matched token (the token text is not needed here).
+/// @param b     end of the matched token (the token text is not needed here).
+void push_decimal_operator::builder(s3select* self, const char* a, const char* b) const
+{//decimal(integer,integer)
+  (void)a; //the matched text is not inspected by this action
+  (void)b;
+
+  auto& expr_queue = self->getAction()->exprQ;
+
+  //takes the most recent expression off the queue; yields nullptr when the queue is empty
+  auto pop_operand = [&expr_queue]() -> base_statement*
+  {
+    if (expr_queue.empty())
+    {
+      return nullptr;
+    }
+    base_statement* top = expr_queue.back();
+    expr_queue.pop_back();
+    return top;
+  };
+
+  //the operands come off the queue in reverse order: the second number first
+  base_statement* rhs = pop_operand();
+  base_statement* lhs = pop_operand();
+
+  __function* func = S3SELECT_NEW(self, __function, "#decimal_operator#", self->getS3F());
+
+  //NOTE(review): an operand is nullptr if the queue held fewer than two entries - confirm push_argument tolerates that
+  func->push_argument(rhs);
+  func->push_argument(lhs);
+
+  expr_queue.push_back(func);
+}
+
+/// Semantic action for cast(<expression> as decimal(x,y)).
+///
+/// Removes the two most recently pushed expressions from exprQ (first the decimal
+/// operator, then the expression being cast), wraps them in a new
+/// "#cast_as_decimal#" function node and pushes that node onto exprQ.
+///
+/// @param self  the s3select object providing the action queues and S3SELECT_NEW.
+/// @param a     start of the matched token (the token text is not needed here).
+/// @param b     end of the matched token (the token text is not needed here).
+void push_cast_decimal_expr::builder(s3select* self, const char* a, const char* b) const
+{
+  //cast(expression as decimal(x,y))
+  (void)a; //the matched text is not inspected by this action
+  (void)b;
+
+  auto& expr_queue = self->getAction()->exprQ;
+
+  //takes the most recent expression off the queue; yields nullptr when the queue is empty
+  auto pop_operand = [&expr_queue]() -> base_statement*
+  {
+    if (expr_queue.empty())
+    {
+      return nullptr;
+    }
+    base_statement* top = expr_queue.back();
+    expr_queue.pop_back();
+    return top;
+  };
+
+  base_statement* rhs = pop_operand(); //right side (decimal operator)
+  base_statement* lhs = pop_operand(); //left side - expression
+
+  __function* func = S3SELECT_NEW(self, __function, "#cast_as_decimal#", self->getS3F());
+
+  //NOTE(review): an operand is nullptr if the queue held fewer than two entries - confirm push_argument tolerates that
+  func->push_argument(rhs);
+  func->push_argument(lhs);
+
+  expr_queue.push_back(func);
+}
+
void push_cast_expr::builder(s3select* self, const char* a, const char* b) const
{
//cast(expression as int/float/string/timestamp) --> new function "int/float/string/timestamp" ( args = expression )
{
std::string token(a, b);
- auto cast_operator = [&](const char *s){return strncmp(a,s,strlen(s))==0;};
+ auto cast_operator = [&](const char *s){return strncasecmp(a,s,strlen(s))==0;};
if(cast_operator("int"))
{
self->getAction()->exprQ.push_back(func);
}
+/// Semantic action for a backtick-enclosed literal.
+///
+/// The text between the backticks is handed to a _fn_to_timestamp functor while
+/// the query is being parsed. If the call completes, the variable that received
+/// the result is pushed onto exprQ; if it throws a std::exception, the literal is
+/// pushed as a plain string variable instead. Converting at parse time rather
+/// than at processing time is the stated purpose (Trino uses this approach).
+///
+/// @param self  the s3select object providing the action queues, the cleanup
+///              containers and S3SELECT_NEW.
+/// @param a     start of the matched token (the opening backtick).
+/// @param b     end of the matched token (just past the closing backtick).
+void push_string_to_time_constant::builder(s3select* self, const char* a, const char* b) const
+{
+  //the literal without its first and last character (the enclosing backticks)
+  std::string token(a + 1, b - 1);
+
+  _fn_to_timestamp* to_timestamp = S3SELECT_NEW(self, _fn_to_timestamp);//TODO the _fn_to_timestamp should release the memory (cleanup)
+  bs_stmt_vec_t args;
+
+  //both variables start out holding the literal text; `timestamp` is the one given to the functor as its result
+  variable* var_string = S3SELECT_NEW(self, variable, token, variable::var_t::COLUMN_VALUE);
+  variable* timestamp = S3SELECT_NEW(self, variable, token, variable::var_t::COLUMN_VALUE);
+
+  //NOTE(review): a single pointer is kept for cleanup and it is overwritten on every call,
+  //so with more than one backtick literal only the last functor stays registered - confirm this is intended
+  self->get_to_timestamp_for_clean() = to_timestamp;
+  var_string->push_for_cleanup(self->get_ast_nodes_to_delete());
+  timestamp->push_for_cleanup(self->get_ast_nodes_to_delete());
+
+  args.push_back(var_string);
+
+  //assume the literal is a timestamp until the conversion says otherwise
+  variable* outcome = timestamp;
+  try
+  {
+    (*to_timestamp)(&args, timestamp);
+  }
+  catch(const std::exception&)
+  {
+    //it is not a timestamp, it is a string
+    outcome = var_string;
+  }
+
+  self->getAction()->exprQ.push_back(outcome);
+}
+
struct s3select_csv_definitions //TODO
{
char row_delimiter;
}
else
{
+ //save the where-clause evaluation result (performance perspective)
+ bool where_clause_result = false;
do
{
row_fetch_data();
a.second->invalidate_cache_result();
}
}
- while (multiple_row_processing() && m_where_clause && !m_where_clause->eval().is_true() && !(m_is_limit_on && m_processed_rows == m_limit));
+ while (multiple_row_processing() && m_where_clause && !(where_clause_result = m_where_clause->eval().is_true()) && !(m_is_limit_on && m_processed_rows == m_limit));
+
+ // in the case of JSON, the where-clause needs to be evaluated here (for the first time)
+ if(!multiple_row_processing() && m_where_clause){
+ where_clause_result = m_where_clause->eval().is_true();
+ }
- if(m_where_clause && !m_where_clause->eval().is_true() && m_is_limit_on && m_processed_rows == m_limit)
+ if(m_where_clause && ! where_clause_result && m_is_limit_on && m_processed_rows == m_limit)
{
return m_sql_processing_status = Status::LIMIT_REACHED;
}
if(!multiple_row_processing())
{
- found = !m_where_clause || m_where_clause->eval().is_true();
+ found = !m_where_clause || where_clause_result;
}
if(found)
}; //base_s3object
//TODO config / default-value
-#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (4 * 1024)
+#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
class csv_object : public base_s3object
{
std::string m_last_line;
size_t m_processed_bytes;
int64_t m_number_of_tokens;
+ size_t m_skip_x_first_bytes=0;
std::function<int(std::string&)> fp_s3select_result_format=nullptr;
std::function<int(std::string&)> fp_s3select_header_format=nullptr;
merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter;
m_previous_line = false;
m_skip_first_line = true;
+ m_skip_x_first_bytes = tmp_buff.size()+1;
//processing the merged row (previous broken row)
run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
m_is_to_aggregate = do_aggregate;
m_skip_last_line = skip_last_line;
+ if(skip_first_line)
+ {
+ //the stream may start in the middle of a row (maybe in the middle of a quote).
+ //at this point the stream should skip the first row(broken row).
+ //the csv_parser should be init with the fixed stream position.
+ m_stream += m_skip_x_first_bytes;
+ m_skip_x_first_bytes=0;
+ }
+
CSVParser _csv_parser("csv", m_stream, m_end_stream);
csv_parser = &_csv_parser;
csv_parser->set_csv_def( m_csv_defintion.row_delimiter,
{
extract_csv_header_info();
}
-
- if(skip_first_line)
- {
- csv_parser->next_line();
- }
-
do
{
m_sql_processing_status = Status::INITIAL_STAT;
if(m_init_json_processor_ind)
return;
+ m_init_json_processor_ind = true;
std::function<int(void)> f_sql = [this](void){auto res = sql_execution_on_row_cb();return res;};
std::function<int(s3selectEngine::value&, int)>
f_push_to_scratch = [this](s3selectEngine::value& value,int json_var_idx){return push_into_scratch_area_cb(value,json_var_idx);};
//upon star-operation(in statement) the callback pushes the key-path and value into scratch-area
JsonHandler.set_push_per_star_operation_callback(f_push_key_value_into_scratch_area_per_star_operation);
+ //the json-from-clause is unique and should exist. otherwise it's a failure.
+ if(query->getAction()->json_from_clause.empty())
+ {
+ JsonHandler.m_fatal_initialization_ind = true;
+ JsonHandler.m_fatal_initialization_description = "the SQL statement is not align with the correct syntax of JSON statement. from-clause is missing.";
+ return;
+ }
+
//setting the from clause path
if(query->getAction()->json_from_clause[0] == JSON_ROOT_OBJECT)
{
}
m_sa->set_parquet_type();//TODO json type
- m_init_json_processor_ind = true;
}
json_object(s3select* query):base_s3object(query),m_processed_bytes(0),m_end_of_stream(false),m_row_count(0),star_operation_ind(false),m_init_json_processor_ind(false)
m_processed_bytes += stream_length;
set_sql_result(result);
+ if(JsonHandler.is_fatal_initialization())
+ {
+ throw base_s3select_exception(JsonHandler.m_fatal_initialization_description, base_s3select_exception::s3select_exp_en_t::FATAL);
+ }
+
if(!stream_length || !json_stream)//TODO m_processed_bytes(?)
{//last processing cycle
JsonHandler.process_json_buffer(0, 0, true);//TODO end-of-stream = end-of-row