]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/s3select/include/s3select.h
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / s3select / include / s3select.h
index 2ff410f6aa05883880a2ab0b79754d2dfb7b2fc6..3ac11135130eabdac7ad96e5f9f8bfd1d2bb68a7 100644 (file)
@@ -227,6 +227,18 @@ struct push_cast_expr : public base_ast_builder
 };
 static push_cast_expr g_push_cast_expr;
 
+struct push_cast_decimal_expr : public base_ast_builder
+{
+  void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_cast_decimal_expr g_push_cast_decimal_expr;
+
+struct push_decimal_operator : public base_ast_builder
+{
+  void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_decimal_operator g_push_decimal_operator;
+
 struct push_data_type : public base_ast_builder
 {
   void builder(s3select* self, const char* a, const char* b) const;
@@ -430,6 +442,12 @@ struct push_time_to_string_dynamic : public base_ast_builder
 };
 static push_time_to_string_dynamic g_push_time_to_string_dynamic;
 
+struct push_string_to_time_constant : public base_ast_builder
+{
+  void builder(s3select* self, const char* a, const char* b) const;
+};
+static push_string_to_time_constant g_push_string_to_time_constant;
+
 struct push_array_number :  public base_ast_builder
 {
   void builder(s3select* self, const char* a, const char* b) const;
@@ -457,14 +475,25 @@ private:
   s3select_functions m_s3select_functions;
   std::string error_description;
   s3select_allocator m_s3select_allocator;
-  bool aggr_flow;
-  bool m_json_query;
+  bool aggr_flow = false;
+  bool m_json_query = false;
   std::set<base_statement*> m_ast_nodes_to_delete;
+  base_function* m_to_timestamp_for_clean = nullptr;
 
 #define BOOST_BIND_ACTION( push_name ) boost::bind( &push_name::operator(), g_ ## push_name, const_cast<s3select*>(&self), _1, _2)
 
 public:
 
+  std::set<base_statement*>& get_ast_nodes_to_delete()
+  {
+    return m_ast_nodes_to_delete;
+  }
+
+  base_function* & get_to_timestamp_for_clean()
+  {
+    return m_to_timestamp_for_clean;
+  }
+
   actionQ* getAction()
   {
     return &m_actionQ;
@@ -664,6 +693,10 @@ public:
        {//the json_variable_access object is allocated by S3SELECT_NEW. this object contains stl-vector that should be free 
                x.first->~json_variable_access();
        }
+  if(m_to_timestamp_for_clean)
+  { 
+    m_to_timestamp_for_clean->dtor();
+  }
   }
 
 #define JSON_ROOT_OBJECT "s3object[*]"
@@ -764,13 +797,20 @@ public:
 
       arithmetic_argument = (float_number)[BOOST_BIND_ACTION(push_float_number)] |  (number)[BOOST_BIND_ACTION(push_number)] | (json_variable_name)[BOOST_BIND_ACTION(push_json_variable)] |
                            (column_pos)[BOOST_BIND_ACTION(push_column_pos)] |
-                            (string)[BOOST_BIND_ACTION(push_string)] | (datediff) | (dateadd) | (extract) | (time_to_string_constant) | (time_to_string_dynamic) |
+                            (string)[BOOST_BIND_ACTION(push_string)] | (backtick_string) | (datediff) | (dateadd) | (extract) | (time_to_string_constant) | (time_to_string_dynamic) |
                             (cast) | (substr) | (trim) | (when_case_value_when) | (when_case_else_projection) |
                             (function) | (variable)[BOOST_BIND_ACTION(push_variable)]; //function is pushed by right-term
 
-      cast = (S3SELECT_KW("cast") >> '(' >> factor >> S3SELECT_KW("as") >> (data_type)[BOOST_BIND_ACTION(push_data_type)] >> ')') [BOOST_BIND_ACTION(push_cast_expr)];
+      cast = cast_as_data_type | cast_as_decimal_expr ;
+
+      cast_as_data_type = (S3SELECT_KW("cast") >> '(' >> factor >> S3SELECT_KW("as") >> (data_type) >> ')') [BOOST_BIND_ACTION(push_cast_expr)];
+
+      cast_as_decimal_expr = (S3SELECT_KW("cast") >> '(' >> factor >> S3SELECT_KW("as") >> decimal_operator >> ')') [BOOST_BIND_ACTION(push_cast_decimal_expr)];
 
-      data_type = (S3SELECT_KW("int") | S3SELECT_KW("float") | S3SELECT_KW("string") |  S3SELECT_KW("timestamp") | S3SELECT_KW("bool") );
+      decimal_operator = (S3SELECT_KW("decimal") >> '(' >> (number)[BOOST_BIND_ACTION(push_number)] >> ',' >> (number)[BOOST_BIND_ACTION(push_number)] >> ')')
+                                       [BOOST_BIND_ACTION(push_decimal_operator)];
+
+      data_type = (S3SELECT_KW("int") | S3SELECT_KW("float") | S3SELECT_KW("string") |  S3SELECT_KW("timestamp") | S3SELECT_KW("bool"))[BOOST_BIND_ACTION(push_data_type)];
      
       substr = (substr_from) | (substr_from_for);
       
@@ -808,7 +848,9 @@ public:
 
       float_number = bsc::real_p;
 
-      string = (bsc::str_p("\"") >> *( bsc::anychar_p - bsc::str_p("\"") ) >> bsc::str_p("\"")) | (bsc::str_p("\'") >> *( bsc::anychar_p - bsc::str_p("\'") ) >> bsc::str_p("\'")) ;
+      string = (bsc::str_p("\"") >> *( bsc::anychar_p - bsc::str_p("\"") ) >> bsc::str_p("\"")) | (bsc::str_p("\'") >> *( bsc::anychar_p - bsc::str_p("\'") ) >> bsc::str_p("\'"));
+
+      backtick_string = (bsc::str_p("`") >> *( bsc::anychar_p - bsc::str_p("`") ) >> bsc::str_p("`")) [BOOST_BIND_ACTION(push_string_to_time_constant)];
 
       column_pos = (variable_name >> "." >> column_pos_name) | column_pos_name; //TODO what about space
 
@@ -836,7 +878,8 @@ public:
     }
 
 
-    bsc::rule<ScannerT> cast, data_type, variable, json_variable_name, variable_name, select_expr, select_expr_base, select_expr_base_, s3_object, where_clause, limit_number, number, float_number, string, from_expression;
+    bsc::rule<ScannerT> cast, data_type, variable, json_variable_name, variable_name, select_expr, select_expr_base, select_expr_base_, s3_object, where_clause, limit_number;
+    bsc::rule<ScannerT> number, float_number, string, backtick_string, from_expression, cast_as_data_type, cast_as_decimal_expr, decimal_operator;
     bsc::rule<ScannerT> cmp_operand, arith_cmp, condition_expression, arithmetic_predicate, logical_predicate, factor; 
     bsc::rule<ScannerT> trim, trim_whitespace_both, trim_one_side_whitespace, trim_anychar_anyside, trim_type, trim_remove_type, substr, substr_from, substr_from_for;
     bsc::rule<ScannerT> datediff, dateadd, extract, date_part, date_part_extract, time_to_string_constant, time_to_string_dynamic;
@@ -1734,6 +1777,65 @@ void push_when_value_then::builder(s3select* self, const char* a, const char* b)
  self->getAction()->whenThenQ.push_back(func);
 }
 
+void push_decimal_operator::builder(s3select* self, const char* a, const char* b) const
+{//decimal(integer,integer)
+  std::string token(a, b);
+
+  base_statement* lhs = nullptr;
+  base_statement* rhs = nullptr;
+
+  //right side (decimal operator)
+  if (self->getAction()->exprQ.empty() == false)
+  {
+    rhs = self->getAction()->exprQ.back();
+    self->getAction()->exprQ.pop_back();
+  }
+
+  //left side (decimal operator)
+  if (self->getAction()->exprQ.empty() == false)
+  {
+    lhs = self->getAction()->exprQ.back();
+    self->getAction()->exprQ.pop_back();
+  }
+
+  __function* func = S3SELECT_NEW(self, __function, "#decimal_operator#", self->getS3F());
+
+  func->push_argument(rhs);
+  func->push_argument(lhs);
+
+  self->getAction()->exprQ.push_back(func);
+}
+
+void push_cast_decimal_expr::builder(s3select* self, const char* a, const char* b) const
+{
+  //cast(expression as decimal(x,y))
+  std::string token(a, b);
+
+  base_statement* lhs = nullptr;
+  base_statement* rhs = nullptr;
+
+  //right side (decimal operator)
+  if (self->getAction()->exprQ.empty() == false)
+  {
+    rhs = self->getAction()->exprQ.back();
+    self->getAction()->exprQ.pop_back();
+  }
+
+  //left side - expression
+  if (self->getAction()->exprQ.empty() == false)
+  {
+    lhs = self->getAction()->exprQ.back();
+    self->getAction()->exprQ.pop_back();
+  }
+
+  __function* func = S3SELECT_NEW(self, __function, "#cast_as_decimal#", self->getS3F());
+
+  func->push_argument(rhs);
+  func->push_argument(lhs);
+
+  self->getAction()->exprQ.push_back(func);
+}
+
 void push_cast_expr::builder(s3select* self, const char* a, const char* b) const
 {
   //cast(expression as int/float/string/timestamp) --> new function "int/float/string/timestamp" ( args = expression )
@@ -1757,7 +1859,7 @@ void push_data_type::builder(s3select* self, const char* a, const char* b) const
 {
   std::string token(a, b);
 
-  auto cast_operator = [&](const char *s){return strncmp(a,s,strlen(s))==0;};
+  auto cast_operator = [&](const char *s){return strncasecmp(a,s,strlen(s))==0;};
 
   if(cast_operator("int"))
   {
@@ -2003,6 +2105,41 @@ void push_time_to_string_dynamic::builder(s3select* self, const char* a, const c
   self->getAction()->exprQ.push_back(func);
 }
 
+void push_string_to_time_constant::builder(s3select* self, const char* a, const char* b) const
+{
+  //token could be a string or a timestamp, we need to check it
+  //upon it is a timestamp format, we need to push the variable as timestamp or else, it as a string
+  //the purpose is to use backticks to convert the string to timestamp in parsing time instead of processing time(Trino uses this approach)
+  
+  a++; //remove the first quote
+  b--;
+  std::string token(a, b);
+
+  _fn_to_timestamp* to_timestamp = S3SELECT_NEW(self, _fn_to_timestamp);//TODO the _fn_to_timestamp should release the memory (cleanup)
+  bs_stmt_vec_t args;
+
+  variable* var_string = S3SELECT_NEW(self, variable, token, variable::var_t::COLUMN_VALUE);
+  variable* timestamp = S3SELECT_NEW(self, variable, token, variable::var_t::COLUMN_VALUE);
+
+  (self->get_to_timestamp_for_clean()) = to_timestamp;
+  var_string->push_for_cleanup(self->get_ast_nodes_to_delete());
+  timestamp->push_for_cleanup(self->get_ast_nodes_to_delete());
+  
+  args.push_back(var_string);
+
+  try {
+    (*to_timestamp)(&args, timestamp);
+  }
+  catch(std::exception& e)
+  {
+    //it is not a timestamp, it is a string
+    self->getAction()->exprQ.push_back(var_string);
+    return;
+  }
+
+  self->getAction()->exprQ.push_back(timestamp);
+}
+
 struct s3select_csv_definitions //TODO 
 {
     char row_delimiter;
@@ -2240,6 +2377,8 @@ public:
     }
     else
     {
+      //save the where-clause evaluation result (performance perspective)
+      bool where_clause_result = false;
       do
       {
        row_fetch_data();
@@ -2256,9 +2395,14 @@ public:
           a.second->invalidate_cache_result();
         }
       }
-      while (multiple_row_processing() && m_where_clause && !m_where_clause->eval().is_true() && !(m_is_limit_on && m_processed_rows == m_limit));
+      while (multiple_row_processing() && m_where_clause && !(where_clause_result = m_where_clause->eval().is_true()) && !(m_is_limit_on && m_processed_rows == m_limit));
+
+       // in the of JSON it needs to evaluate the where-clause(for the first time)
+      if(!multiple_row_processing() && m_where_clause){
+       where_clause_result = m_where_clause->eval().is_true();
+      }
 
-      if(m_where_clause && !m_where_clause->eval().is_true() && m_is_limit_on && m_processed_rows == m_limit)
+      if(m_where_clause && ! where_clause_result && m_is_limit_on && m_processed_rows == m_limit)
       {
           return m_sql_processing_status = Status::LIMIT_REACHED;
       }
@@ -2267,7 +2411,7 @@ public:
 
       if(!multiple_row_processing())
       {
-               found = !m_where_clause || m_where_clause->eval().is_true();    
+               found = !m_where_clause || where_clause_result; 
       }
   
       if(found)
@@ -2291,7 +2435,7 @@ public:
 }; //base_s3object
 
 //TODO config / default-value
-#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (4 * 1024)
+#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
 class csv_object : public base_s3object
 {
 
@@ -2355,6 +2499,7 @@ private:
   std::string m_last_line;
   size_t m_processed_bytes;
   int64_t m_number_of_tokens;
+  size_t m_skip_x_first_bytes=0;
 
   std::function<int(std::string&)> fp_s3select_result_format=nullptr;
   std::function<int(std::string&)> fp_s3select_header_format=nullptr;
@@ -2507,6 +2652,7 @@ private:
       merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter;
       m_previous_line = false;
       m_skip_first_line = true;
+      m_skip_x_first_bytes = tmp_buff.size()+1;
 
       //processing the merged row (previous broken row)
       run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
@@ -2541,6 +2687,15 @@ public:
     m_is_to_aggregate = do_aggregate;
     m_skip_last_line = skip_last_line;
 
+    if(skip_first_line)
+    {
+      //the stream may start in the middle of a row (maybe in the middle of a quote).
+      //at this point the stream should skip the first row(broken row).
+      //the csv_parser should be init with the fixed stream position. 
+      m_stream += m_skip_x_first_bytes;
+      m_skip_x_first_bytes=0;
+    }
+
     CSVParser _csv_parser("csv", m_stream, m_end_stream);
     csv_parser = &_csv_parser;
     csv_parser->set_csv_def(   m_csv_defintion.row_delimiter, 
@@ -2556,12 +2711,6 @@ public:
     {
       extract_csv_header_info();
     }
-
-    if(skip_first_line)
-    {
-      csv_parser->next_line();
-    }
-
     do
     {
       m_sql_processing_status = Status::INITIAL_STAT;
@@ -2809,6 +2958,7 @@ public:
     if(m_init_json_processor_ind)
            return;
 
+    m_init_json_processor_ind = true;
     std::function<int(void)> f_sql = [this](void){auto res = sql_execution_on_row_cb();return res;};
     std::function<int(s3selectEngine::value&, int)> 
       f_push_to_scratch = [this](s3selectEngine::value& value,int json_var_idx){return push_into_scratch_area_cb(value,json_var_idx);};
@@ -2827,6 +2977,14 @@ public:
     //upon star-operation(in statemenet) the callback pushes the key-path and value into scratch-area
     JsonHandler.set_push_per_star_operation_callback(f_push_key_value_into_scratch_area_per_star_operation);
 
+    //the json-from-clause is unique and should exist. otherwise it's a failure. 
+    if(query->getAction()->json_from_clause.empty())
+    {
+       JsonHandler.m_fatal_initialization_ind = true;
+       JsonHandler.m_fatal_initialization_description = "the SQL statement is not align with the correct syntax of JSON statement. from-clause is missing.";
+       return;
+    }
+
     //setting the from clause path 
     if(query->getAction()->json_from_clause[0] == JSON_ROOT_OBJECT)
     {
@@ -2852,7 +3010,6 @@ public:
     }
 
     m_sa->set_parquet_type();//TODO json type
-    m_init_json_processor_ind = true;
   }
     
   json_object(s3select* query):base_s3object(query),m_processed_bytes(0),m_end_of_stream(false),m_row_count(0),star_operation_ind(false),m_init_json_processor_ind(false)
@@ -2944,6 +3101,11 @@ public:
     m_processed_bytes += stream_length;
     set_sql_result(result);
 
+    if(JsonHandler.is_fatal_initialization())
+    {
+      throw base_s3select_exception(JsonHandler.m_fatal_initialization_description, base_s3select_exception::s3select_exp_en_t::FATAL);
+    }
+
     if(!stream_length || !json_stream)//TODO m_processed_bytes(?)
     {//last processing cycle
       JsonHandler.process_json_buffer(0, 0, true);//TODO end-of-stream = end-of-row