]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/s3select/include/s3select_oper.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / s3select / include / s3select_oper.h
index 591e5f9da245514e92e606038895667343fb4052..0c5181cba31518e751a05acd2aadd2115ac296cd 100644 (file)
 #include <list>
 #include <map>
 #include <vector>
-#include <string.h>
-#include <math.h>
+#include <algorithm>
+#include <cstring>
+#include <cmath>
 
 #include <boost/lexical_cast.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/bind.hpp>
+#include "s3select_parquet_intrf.h" //NOTE: should include first (c++11 std::string_view)
+
 namespace bsc = BOOST_SPIRIT_CLASSIC_NS;
 
 namespace s3selectEngine
 {
 
+//=== stl allocator definition
+//this allocator is fit for placement new (no calls to heap)
+
+class chunkalloc_out_of_mem
+{
+};
+
+template <typename T, size_t pool_sz>
+class ChunkAllocator : public std::allocator<T>
+{
+public:
+  typedef size_t size_type;
+  typedef T* pointer;
+  size_t buffer_capacity;
+  char* buffer_ptr;
+
+  //only ONE pool,not allocated dynamically; main assumption, caller knows in advance its memory limitations.
+  char buffer[pool_sz];
+
+  template <typename _Tp1>
+  struct rebind
+  {
+    typedef ChunkAllocator<_Tp1, pool_sz> other;
+  };
+
+  //==================================
+  inline T* _Allocate(size_t num_of_element, T*)
+  {
+    // allocate storage for _Count elements of type T
+
+    pointer res = (pointer)(buffer_ptr + buffer_capacity);
+
+    buffer_capacity+= sizeof(T) * num_of_element;
+    
+    size_t addr_alignment = (buffer_capacity % sizeof(char*));
+    buffer_capacity += addr_alignment != 0 ? sizeof(char*) - addr_alignment : 0;
+
+    if (buffer_capacity> sizeof(buffer))
+    {
+      throw chunkalloc_out_of_mem();
+    }
+
+    return res;
+  }
+
+  //==================================
+  inline pointer allocate(size_type n,  [[maybe_unused]] const void* hint = 0)
+  {
+    return (_Allocate(n, (pointer)0));
+  }
+
+  //==================================
+  inline void deallocate(pointer p, size_type n)
+  {
+  }
+
+  //==================================
+  ChunkAllocator() noexcept : std::allocator<T>()
+  {
+    // alloc from main-buffer
+    buffer_capacity = 0;
+    memset( &buffer[0], 0, sizeof(buffer));
+    buffer_ptr = &buffer[0];
+  }
+
+  //==================================
+  ChunkAllocator(const ChunkAllocator& other) noexcept : std::allocator<T>(other)
+  {
+    // copy const
+    buffer_capacity = 0;
+    buffer_ptr = &buffer[0];
+  }
+
+  //==================================
+  ~ChunkAllocator() noexcept
+  {
+    //do nothing
+  }
+};
+
+class base_statement;
+//typedef std::vector<base_statement *> bs_stmt_vec_t; //without specific allocator
+
+//ChunkAllocator, prevent allocation from heap.
+typedef std::vector<base_statement*, ChunkAllocator<base_statement*, 256> > bs_stmt_vec_t;
+
 class base_s3select_exception
 {
 
@@ -33,7 +122,7 @@ private:
 
 public:
   std::string _msg;
-  base_s3select_exception(const char* n) : m_severity(s3select_exp_en_t::NONE)
+  explicit base_s3select_exception(const char* n) : m_severity(s3select_exp_en_t::NONE)
   {
     _msg.assign(n);
   }
@@ -56,33 +145,20 @@ public:
     return m_severity;
   }
 
-  virtual ~base_s3select_exception() {}
+  virtual ~base_s3select_exception() = default;
 };
 
 
-// pointer to dynamic allocated buffer , which used for placement new.
-static __thread char* _s3select_buff_ptr =0;
 
 class s3select_allocator //s3select is the "owner"
 {
 private:
 
   std::vector<char*> list_of_buff;
+  std::vector<char*> list_of_ptr;
   u_int32_t m_idx;
 
-public:
-#define __S3_ALLOCATION_BUFF__ (8*1024)
-  s3select_allocator():m_idx(0)
-  {
-    list_of_buff.push_back((char*)malloc(__S3_ALLOCATION_BUFF__));
-  }
-
-  void set_global_buff()
-  {
-    char* buff = list_of_buff.back();
-    _s3select_buff_ptr = &buff[ m_idx ];
-  }
-
+#define __S3_ALLOCATION_BUFF__ (24*1024)
   void check_capacity(size_t sz)
   {
     if (sz>__S3_ALLOCATION_BUFF__)
@@ -103,10 +179,28 @@ public:
     m_idx += sizeof(char*) - (m_idx % sizeof(char*)); //alignment
   }
 
-  void zero()
+public:
+  s3select_allocator():m_idx(0)
+  {
+    list_of_buff.push_back((char*)malloc(__S3_ALLOCATION_BUFF__));
+  }
+
+  void *alloc(size_t sz)
   {
-    //not a must, its for safty.
-    _s3select_buff_ptr=0;
+    check_capacity(sz);
+
+    char* buff = list_of_buff.back();
+
+    u_int32_t idx = m_idx;
+   
+    inc(sz);
+    return &buff[ idx ];
+  }
+
+  void push_for_delete(void *p)
+  {//in case of using S3SELECT_NO_PLACEMENT_NEW
+    list_of_ptr.push_back((char*)p);
   }
 
   virtual ~s3select_allocator()
@@ -115,99 +209,70 @@ public:
     {
       free(b);
     }
-  }
-};
-
-class __clt_allocator
-{
-public:
-  s3select_allocator* m_s3select_allocator;
-
-public:
-
-  __clt_allocator():m_s3select_allocator(0) {}
 
-  void set(s3select_allocator* a)
-  {
-    m_s3select_allocator = a;
+    for(auto b : list_of_ptr)
+    {//in case of using S3SELECT_NO_PLACEMENT_NEW
+      delete(b);
+    }
   }
 };
 
 // placement new for allocation of all s3select objects on single(or few) buffers, deallocation of those objects is by releasing the buffer.
-#define S3SELECT_NEW( type , ... ) [=]() \
+#define S3SELECT_NEW(self, type , ... ) [=]() \
         {   \
-            m_s3select_allocator->check_capacity(sizeof( type )); \
-            m_s3select_allocator->set_global_buff(); \
-            auto res=new (_s3select_buff_ptr) type(__VA_ARGS__); \
-            m_s3select_allocator->inc(sizeof( type )); \
-            m_s3select_allocator->zero(); \
+            auto res=new (self->getAllocator()->alloc(sizeof(type))) type(__VA_ARGS__); \
             return res; \
         }();
 
-class scratch_area
-{
+// no placement new; actually, its an oridinary new with additional functionality for deleting the AST nodes.
+// (this changes, is for verifying the valgrind report on leak)
+#define S3SELECT_NO_PLACEMENT_NEW(self, type , ... ) [=]() \
+        {   \
+            auto res=new type(__VA_ARGS__); \
+           self->getAllocator()->push_for_delete(res); \
+            return res; \
+        }();
 
-private:
-  std::vector<std::string_view> m_columns{128};
-  int m_upper_bound;
+class s3select_reserved_word
+{
+  public:
 
-  std::vector<std::pair<std::string, int >> m_column_name_pos;
+  enum class reserve_word_en_t
+  {
+    NA,
+    S3S_NULL,//TODO check AWS defintions for reserve words, its a long list , what about functions-names? 
+    S3S_NAN,
+    S3S_TRUE,
+    S3S_FALSE
+  } ;
 
-public:
+  using reserved_words = std::map<std::string,reserve_word_en_t>;
 
-  void set_column_pos(const char* n, int pos)//TODO use std::string
+  const reserved_words m_reserved_words=
   {
-    m_column_name_pos.push_back( std::pair<const char*, int>(n, pos));
-  }
+    {"null",reserve_word_en_t::S3S_NULL},{"NULL",reserve_word_en_t::S3S_NULL},
+    {"nan",reserve_word_en_t::S3S_NAN},{"NaN",reserve_word_en_t::S3S_NAN},
+    {"true",reserve_word_en_t::S3S_TRUE},{"TRUE",reserve_word_en_t::S3S_TRUE},
+    {"false",reserve_word_en_t::S3S_FALSE},{"FALSE",reserve_word_en_t::S3S_FALSE}
+  };
 
-  void update(std::vector<char*> tokens, size_t num_of_tokens)
+  bool is_reserved_word(std::string & token)
   {
-    size_t i=0;
-    for(auto s : tokens)
-    {
-      if (i>=num_of_tokens)
-      {
-        break;
-      }
-
-      m_columns[i++] = s;
-    }
-    m_upper_bound = i;
-
+    return m_reserved_words.find(token) != m_reserved_words.end() ;
   }
 
-  int get_column_pos(const char* n)
+  reserve_word_en_t get_reserved_word(std::string & token)
   {
-    //done only upon building the AST , not on "runtime"
-
-    std::vector<std::pair<std::string, int >>::iterator iter;
-
-    for( auto iter : m_column_name_pos)
+    if (is_reserved_word(token)==true)
     {
-      if (!strcmp(iter.first.c_str(), n))
-      {
-        return iter.second;
-      }
+      return m_reserved_words.find(token)->second;
     }
-
-    return -1;
-  }
-
-  std::string_view get_column_value(int column_pos)
-  {
-
-    if ((column_pos >= m_upper_bound) || column_pos < 0)
+    else
     {
-      throw base_s3select_exception("column_position_is_wrong", base_s3select_exception::s3select_exp_en_t::ERROR);
+      return reserve_word_en_t::NA;
     }
-
-    return m_columns[column_pos];
   }
 
-  int get_num_of_columns()
-  {
-    return m_upper_bound;
-  }
 };
 
 class base_statement;
@@ -260,7 +325,7 @@ struct binop_plus
 {
   double operator()(double a, double b)
   {
-    return a+b;
+    return a + b;
   }
 };
 
@@ -268,7 +333,7 @@ struct binop_minus
 {
   double operator()(double a, double b)
   {
-    return a-b;
+    return a - b;
   }
 };
 
@@ -278,13 +343,21 @@ struct binop_mult
   {
     return a * b;
   }
-};
+};   
 
 struct binop_div
 {
   double operator()(double a, double b)
   {
-    return a / b;
+    if (b == 0) {
+      if( isnan(a)) {
+        return a;
+      } else {
+        throw base_s3select_exception("division by zero is not allowed");
+      } 
+    } else {
+      return a / b;
+    }
   }
 };
 
@@ -296,6 +369,37 @@ struct binop_pow
   }
 };
 
+struct binop_modulo
+{
+  int64_t operator()(int64_t a, int64_t b)
+  {
+    if (b == 0)
+    {
+      throw base_s3select_exception("Mod zero is not allowed");
+    } else {
+      return a % b;
+    }
+  }
+};
+
+typedef std::tuple<boost::posix_time::ptime, boost::posix_time::time_duration, bool> timestamp_t;
+
+class value;
+class multi_values
+{
+  public:
+  std::vector<value*> values;
+
+  public:
+  void push_value(value* v);
+
+  void clear()
+  {
+    values.clear();
+  }
+
+};
+
 class value
 {
 
@@ -303,15 +407,19 @@ public:
   typedef union
   {
     int64_t num;
-    char* str;//TODO consider string_view
+    char* str;//TODO consider string_view(save copy)
     double dbl;
-    boost::posix_time::ptime* timestamp;
+    timestamp_t* timestamp;
   } value_t;
 
+  multi_values multiple_values;
+
 private:
   value_t __val;
-  std::string m_to_string;
-  std::string m_str_value;
+  //std::string m_to_string;
+  std::basic_string<char,std::char_traits<char>,ChunkAllocator<char,256>> m_to_string;
+  //std::string m_str_value;
+  std::basic_string<char,std::char_traits<char>,ChunkAllocator<char,256>> m_str_value;
 
 public:
   enum class value_En_t
@@ -320,37 +428,46 @@ public:
     FLOAT,
     STRING,
     TIMESTAMP,
+    S3NULL,
+    S3NAN,
+    BOOL,
+    MULTIPLE_VALUES,
     NA
   } ;
   value_En_t type;
 
-  value(int64_t n) : type(value_En_t::DECIMAL)
+  explicit value(int64_t n) : type(value_En_t::DECIMAL)
   {
     __val.num = n;
   }
-  value(int n) : type(value_En_t::DECIMAL)
+  explicit value(int n) : type(value_En_t::DECIMAL)
   {
     __val.num = n;
   }
-  value(bool b) : type(value_En_t::DECIMAL)
+  explicit value(bool b) : type(value_En_t::DECIMAL)
   {
     __val.num = (int64_t)b;
   }
-  value(double d) : type(value_En_t::FLOAT)
+  explicit value(double d) : type(value_En_t::FLOAT)
   {
     __val.dbl = d;
   }
-  value(boost::posix_time::ptime* timestamp) : type(value_En_t::TIMESTAMP)
+  explicit value(timestamp_t* timestamp) : type(value_En_t::TIMESTAMP)
   {
     __val.timestamp = timestamp;
   }
 
-  value(const char* s) : type(value_En_t::STRING)
+  explicit value(const char* s) : type(value_En_t::STRING)
   {
     m_str_value.assign(s);
     __val.str = m_str_value.data();
   }
 
+  ~value()
+  {//TODO should be a part of the cleanup routine(__function::push_for_cleanup)
+    multiple_values.values.clear();
+  }
+
   value():type(value_En_t::NA)
   {
     __val.num=0;
@@ -375,8 +492,53 @@ public:
     return type == value_En_t::TIMESTAMP;
   }
 
+  bool is_bool() const
+  {
+    return type == value_En_t::BOOL;
+  }
+
+  bool is_null() const
+  {
+    return type == value_En_t::S3NULL;
+  }
+
+  bool is_nan() const
+  {
+    if (type == value_En_t::FLOAT) {
+      return std::isnan(this->__val.dbl);
+    }
+    return type == value_En_t::S3NAN; 
+  }
+
+  bool is_true()
+  {
+    return (i64()!=0 && !is_null());
+  }
+
+  void set_nan() 
+  {
+    __val.dbl = NAN;
+    type = value_En_t::FLOAT;
+  }
+
+  void set_true() 
+  {
+    __val.num = 1;
+    type = value_En_t::BOOL;
+  }
+
+  void set_false() 
+  {
+    __val.num = 0;
+    type = value_En_t::BOOL;
+  }
+
+  void setnull()
+  {
+    type = value_En_t::S3NULL;
+  }
 
-  std::string& to_string()  //TODO very intensive , must improve this
+  const char* to_string()  //TODO very intensive , must improve this
   {
 
     if (type != value_En_t::STRING)
@@ -385,13 +547,51 @@ public:
       {
         m_to_string.assign( boost::lexical_cast<std::string>(__val.num) );
       }
+      if (type == value_En_t::BOOL)
+      {
+        if(__val.num == 0)
+        {
+          m_to_string.assign("false");
+        }
+        else
+        {
+          m_to_string.assign("true");
+        }
+      }
       else if(type == value_En_t::FLOAT)
       {
         m_to_string = boost::lexical_cast<std::string>(__val.dbl);
       }
-      else
+      else if (type == value_En_t::TIMESTAMP)
+      {
+        boost::posix_time::ptime new_ptime;
+        boost::posix_time::time_duration td;
+        bool flag;
+
+       std::tie(new_ptime, td, flag) = *__val.timestamp;
+
+       if (flag)
+       {
+          m_to_string =  to_iso_extended_string(new_ptime) + "Z";
+       }
+        else
+       {
+          std::string tz_hour = std::to_string(std::abs(td.hours()));
+          std::string tz_mint = std::to_string(std::abs(td.minutes()));
+         std::string sign;
+          if (td.is_negative())
+            sign = "-";
+         else
+            sign = "+";
+
+          m_to_string =  to_iso_extended_string(new_ptime) + sign +
+                        std::string(2 - tz_hour.length(), '0') +  tz_hour + ":"
+                        + std::string(2 - tz_mint.length(), '0') +  tz_mint;
+       }
+      }
+      else if (type == value_En_t::S3NULL)
       {
-        m_to_string =  to_simple_string( *__val.timestamp );
+        m_to_string.assign("null");
       }
     }
     else
@@ -399,13 +599,13 @@ public:
       m_to_string.assign( __val.str );
     }
 
-    return m_to_string;
+    return  m_to_string.c_str();
   }
 
 
   value& operator=(value& o)
   {
-    if(this->type == value_En_t::STRING)
+    if(o.type == value_En_t::STRING)
     {
       m_str_value.assign(o.str());
       __val.str = m_str_value.data();
@@ -448,12 +648,12 @@ public:
   value& operator=(bool b)
   {
     this->__val.num = (int64_t)b;
-    this->type = value_En_t::DECIMAL;
+    this->type = value_En_t::BOOL;
 
     return *this;
   }
 
-  value& operator=(boost::posix_time::ptime* p)
+  value& operator=(timestamp_t* p)
   {
     this->__val.timestamp = p;
     this->type = value_En_t::TIMESTAMP;
@@ -476,13 +676,13 @@ public:
     return __val.dbl;
   }
 
-  boost::posix_time::ptime* timestamp() const
+  timestamp_t* timestamp() const
   {
     return __val.timestamp;
   }
 
   bool operator<(const value& v)//basic compare operator , most itensive runtime operation
-  {
+  { 
     //TODO NA possible?
     if (is_string() && v.is_string())
     {
@@ -522,6 +722,11 @@ public:
       return *timestamp() < *(v.timestamp());
     }
 
+    if(is_nan() || v.is_nan())
+    {
+      return false;
+    } 
+
     throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
   }
 
@@ -566,6 +771,11 @@ public:
       return *timestamp() > *(v.timestamp());
     }
 
+    if(is_nan() || v.is_nan())
+    {
+      return false;
+    }
+
     throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
   }
 
@@ -611,21 +821,51 @@ public:
       return *timestamp() == *(v.timestamp());
     }
 
+    if(
+    (is_bool() && v.is_bool())
+    ||
+    (is_number() && v.is_bool())
+    ||
+    (is_bool() && v.is_number())
+    )
+    {
+      return __val.num == v.__val.num;
+    }
+
+    if (is_nan() || v.is_nan())
+    {
+      return false;
+    }  
+
     throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
   }
   bool operator<=(const value& v)
-  {
-    return !(*this>v);
-  }
+  { 
+    if (is_nan() || v.is_nan()) {
+      return false;
+    } else { 
+      return !(*this>v);
+    } 
+  }
+  
   bool operator>=(const value& v)
-  {
-    return !(*this<v);
-  }
+  { 
+    if (is_nan() || v.is_nan()) {
+      return false;
+    } else { 
+      return !(*this<v);
+    } 
+  }
+  
   bool operator!=(const value& v)
-  {
-    return !(*this == v);
+  { 
+    if (is_nan() || v.is_nan()) {
+      return true;
+    } else { 
+      return !(*this == v);
+    }
   }
-
+  
   template<typename binop> //conversion rules for arithmetical binary operations
   value& compute(value& l, const value& r) //left should be this, it contain the result
   {
@@ -635,8 +875,14 @@ public:
     {
       throw base_s3select_exception("illegal binary operation with string");
     }
+    if (l.is_bool() || r.is_bool())
+    {
+      throw base_s3select_exception("illegal binary operation with bool type");
+    }
 
-    if (l.type != r.type)
+    if (l.is_number() && r.is_number())
+    {
+      if (l.type != r.type)
     {
       //conversion
 
@@ -666,6 +912,14 @@ public:
         l.type = value_En_t::FLOAT;
       }
     }
+  }
+    
+    if (l.is_null() || r.is_null()) 
+    {
+      l.setnull();
+    } else if(l.is_nan() || r.is_nan()) {
+      l.set_nan();
+    }
 
     return l;
   }
@@ -675,6 +929,12 @@ public:
     return compute<binop_plus>(*this, v);
   }
 
+  value operator++(int)
+  {
+    *this = *this + static_cast<value>(1);
+    return *this;
+  }
+    
   value& operator-(const value& v)
   {
     return compute<binop_minus>(*this, v);
@@ -684,74 +944,336 @@ public:
   {
     return compute<binop_mult>(*this, v);
   }
-
-  value& operator/(const value& v)  // TODO  handle division by zero
+  
+  value& operator/(value& v)
   {
-    return compute<binop_div>(*this, v);
+    if (v.is_null() || this->is_null()) {
+      v.setnull();
+      return v;
+    } else {
+      return compute<binop_div>(*this, v);
+    }
   }
-
+  
   value& operator^(const value& v)
   {
     return compute<binop_pow>(*this, v);
   }
 
+  value & operator%(const value &v)
+  {
+    if(v.type == value_En_t::DECIMAL) {
+      return compute<binop_modulo>(*this,v);
+    } else {
+      throw base_s3select_exception("wrong use of modulo operation!");
+    }
+  }
 };
 
-class base_statement
+void multi_values::push_value(value *v)
 {
+  //v could be single or multiple values
+  if (v->type == value::value_En_t::MULTIPLE_VALUES)
+  {
+    for (auto sv : v->multiple_values.values)
+    {
+      values.push_back(sv);
+    }
+  }
+  else
+  {
+    values.push_back(v);
+  }
+}
 
-protected:
 
-  scratch_area* m_scratch;
-  projection_alias* m_aliases;
-  bool is_last_call; //valid only for aggregation functions
-  bool m_is_cache_result;
-  value m_alias_result;
-  base_statement* m_projection_alias;
-  int m_eval_stack_depth;
+class scratch_area
+{
+
+private:
+  std::vector<std::string_view> m_columns{128};//TODO not correct
+  std::vector<value> *m_schema_values; //values got a type
+  int m_upper_bound;
 
+  std::vector<std::pair<std::string, int >> m_column_name_pos;
+  bool parquet_type;
+  char str_buff[4096];
+  uint16_t buff_loc;
 public:
-  base_statement():m_scratch(0), is_last_call(false), m_is_cache_result(false), m_projection_alias(0), m_eval_stack_depth(0) {}
-  virtual value& eval() =0;
-  virtual base_statement* left()
-  {
-    return 0;
-  }
-  virtual base_statement* right()
+
+  scratch_area():m_upper_bound(-1),parquet_type(false),buff_loc(0)
   {
-    return 0;
+    m_schema_values = new std::vector<value>(128,value(""));
   }
-  virtual std::string print(int ident) =0;//TODO complete it, one option to use level parametr in interface ,
-  virtual bool semantic() =0;//done once , post syntax , traverse all nodes and validate semantics.
 
-  virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa)
+  ~scratch_area()
   {
-    m_scratch = sa;
-    m_aliases = pa;
-    if (left())
-    {
-      left()->traverse_and_apply(m_scratch, m_aliases);
-    }
-    if (right())
-    {
-      right()->traverse_and_apply(m_scratch, m_aliases);
-    }
+    delete m_schema_values;
   }
 
-  virtual bool is_aggregate()
+  void set_column_pos(const char* n, int pos)//TODO use std::string
   {
-    return false;
+    m_column_name_pos.push_back( std::pair<const char*, int>(n, pos));
   }
-  virtual bool is_column()
+
+  void update(std::vector<char*>& tokens, size_t num_of_tokens)
   {
-    return false;
+    size_t i=0;
+    for(auto s : tokens)
+    {
+      if (i>=num_of_tokens)
+      {
+        break;
+      }
+
+      m_columns[i++] = s;//TODO no need for copy, could use reference to tokens
+    }
+    m_upper_bound = i;
+
+  }
+
+  int get_column_pos(const char* n)
+  {
+    //done only upon building the AST, not on "runtime"
+
+    for( auto iter : m_column_name_pos)
+    {
+      if (!strcmp(iter.first.c_str(), n))
+      {
+        return iter.second;
+      }
+    }
+
+    return -1;
+  }
+
+  void set_parquet_type()
+  {
+    parquet_type = true;
+  }
+
+  void get_column_value(uint16_t column_pos, value &v)
+  {
+
+    if (parquet_type == false)
+    {
+      if (column_pos >= m_upper_bound) //|| column_pos < 0)
+      {
+        throw base_s3select_exception("column_position_is_wrong", base_s3select_exception::s3select_exp_en_t::ERROR);
+      }
+      
+      v = m_columns[ column_pos ].data();
+    }
+    else
+    {
+      v = (*m_schema_values)[ column_pos ];
+    }
+    
+  }
+
+
+  std::string_view get_column_value(int column_pos)//TODO reuse it
+  {
+    if ((column_pos >= m_upper_bound) || column_pos < 0)
+    {
+      throw base_s3select_exception("column_position_is_wrong", base_s3select_exception::s3select_exp_en_t::ERROR);
+    }
+
+     if (parquet_type == false)
+     {
+      return m_columns[column_pos];
+     }
+     else
+     {
+       return  (*m_schema_values)[ column_pos ].to_string();
+     }
+  }
+
+
+  int get_num_of_columns()
+  {
+    return m_upper_bound;
+  }
+
+  void init_string_buff() //TODO temporary
+  {
+    buff_loc=0;
+  }
+
+#ifdef _ARROW_EXIST
+  int update(std::vector<parquet_file_parser::parquet_value_t> &parquet_row_value, parquet_file_parser::column_pos_t &column_positions)
+  {
+    //TODO no need for copy , possible to save referece (its save last row for calculation)
+
+    parquet_file_parser::column_pos_t::iterator column_pos_iter = column_positions.begin();
+    m_upper_bound =0;
+    buff_loc=0;
+
+    for(auto v : parquet_row_value)
+    {
+      //TODO (parquet_value_t) --> (value) , or better get it as value (i.e. parquet reader know class-value)
+      //TODO temporary 
+      switch( v.type )
+      {
+        case  parquet_file_parser::parquet_type::INT32:
+              //TODO waste of CPU
+              (*m_schema_values)[ *column_pos_iter ] = value( v.num ).i64();
+              break;
+
+        case  parquet_file_parser::parquet_type::INT64:
+              //TODO waste of CPU
+              (*m_schema_values)[ *column_pos_iter ] = value( v.num ).i64();
+              break;
+
+        case  parquet_file_parser::parquet_type::DOUBLE:
+              //TODO waste of CPU
+              (*m_schema_values)[ *column_pos_iter ] = value( v.dbl ).dbl();
+              break;
+
+        case  parquet_file_parser::parquet_type::STRING:
+              //TODO waste of CPU
+              //TODO value need to present string with char* and length
+
+              memcpy(str_buff+buff_loc, v.str, v.str_len);
+              str_buff[buff_loc+v.str_len] = 0;
+              (*m_schema_values)[ *column_pos_iter ] = str_buff+buff_loc;
+              buff_loc += v.str_len+1;
+              break;
+
+        case  parquet_file_parser::parquet_type::PARQUET_NULL:
+             
+              (*m_schema_values)[ *column_pos_iter ].setnull();
+             break;
+
+        default:
+        return -1;
+      }
+      m_upper_bound++;
+      column_pos_iter ++;
+    }
+    return 0;
   }
+#endif // _ARROW_EXIST
+
+};
 
-  bool is_function();
-  bool is_aggregate_exist_in_expression(base_statement* e);//TODO obsolete ?
-  base_statement* get_aggregate();
-  bool is_nested_aggregate(base_statement* e);
-  bool is_binop_aggregate_and_column(base_statement* skip);
+class base_statement
+{
+
+protected:
+
+  scratch_area* m_scratch;
+  projection_alias* m_aliases;
+  bool is_last_call; //valid only for aggregation functions
+  bool m_is_cache_result;
+  value m_alias_result;
+  base_statement* m_projection_alias;
+  int m_eval_stack_depth;
+  bool m_skip_non_aggregate_op;
+  value value_na;
+
+public:
+  base_statement():m_scratch(nullptr), is_last_call(false), m_is_cache_result(false),
+  m_projection_alias(nullptr), m_eval_stack_depth(0), m_skip_non_aggregate_op(false) {}
+
+  virtual value& eval()
+  {
+    //purpose: on aggregation flow to run only the correct subtree(aggregation subtree)
+     
+    if (m_skip_non_aggregate_op == false)
+      return eval_internal();//not skipping this node.
+    else
+    {
+    //skipping this node.
+    //in case execution should skip a node, it will traverse (left and right) 
+    //and search for subtree to execute.   
+    //example: sum( ... ) - sum( ... ) ; the minus operand is skipped while sum() operand is not.
+    if(left())
+      left()->eval_internal();
+    
+    if(right())
+      right()->eval_internal();
+    
+    }
+
+    return value_na;
+  }
+
+  virtual value& eval_internal() = 0;
+  
+  bool parquet_type; //TODO enum switch (csv,json,parquet,other)
+
+public:
+  virtual base_statement* left() const
+  {
+    return 0;
+  }
+  virtual base_statement* right() const
+  {
+    return 0;
+  }
+  virtual std::string print(int ident) =0;//TODO complete it, one option to use level parametr in interface ,
+  virtual bool semantic() =0;//done once , post syntax , traverse all nodes and validate semantics.
+
+  virtual void traverse_and_apply(scratch_area* sa, projection_alias* pa)
+  {
+    m_scratch = sa;
+    m_aliases = pa;
+    if (left())
+    {
+      left()->traverse_and_apply(m_scratch, m_aliases);
+    }
+    if (right())
+    {
+      right()->traverse_and_apply(m_scratch, m_aliases);
+    }
+  }
+
+  virtual void set_skip_non_aggregate(bool skip_non_aggregate_op)
+  {
+    m_skip_non_aggregate_op = skip_non_aggregate_op;
+
+    if (left())
+    {
+      left()->set_skip_non_aggregate(m_skip_non_aggregate_op);
+    }
+    if (right())
+    {
+      right()->set_skip_non_aggregate(m_skip_non_aggregate_op);
+    }
+  }
+
+  virtual bool is_aggregate() const
+  {
+    return false;
+  }
+
+  virtual bool is_column() const
+  {
+    return false;
+  }
+
+  virtual void resolve_node()
+  {//part of semantic analysis(TODO maybe semantic method should handle this)
+    if (left())
+    {
+      left()->resolve_node();
+    }
+    if (right())
+    {
+      right()->resolve_node();
+    }
+  }
+
+  bool is_function() const;
+  const base_statement* get_aggregate() const;
+  bool is_nested_aggregate(bool&) const;
+  bool is_column_reference() const;
+  bool mark_aggreagtion_subtree_to_execute();
+
+#ifdef _ARROW_EXIST
+  void extract_columns(parquet_file_parser::column_pos_t &cols,const uint16_t max_columns);
+#endif  
 
   virtual void set_last_call()
   {
@@ -805,6 +1327,21 @@ public:
 
   virtual ~base_statement() {}
 
+  void dtor()
+  {
+    this->~base_statement();
+  }
+
+  scratch_area* getScratchArea()
+  {
+    return m_scratch;
+  }
+
+  projection_alias* getAlias()
+  {
+    return m_aliases;
+  }
+
 };
 
 class variable : public base_statement
@@ -828,7 +1365,8 @@ private:
   int column_pos;
   value var_value;
   std::string m_star_op_result;
-  char m_star_op_result_charc[4096]; //TODO should be dynamic
+  char m_star_op_result_charc[4096]; //TODO cause larger allocations for other objects containing variable (dynamic is one solution)
+  value star_operation_values[16];//TODO cause larger allocations for other objects containing variable (dynamic is one solution)
 
   const int undefined_column_pos = -1;
   const int column_alias = -2;
@@ -836,13 +1374,13 @@ private:
 public:
   variable():m_var_type(var_t::NA), _name(""), column_pos(-1) {}
 
-  variable(int64_t i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {}
+  explicit variable(int64_t i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {}
 
-  variable(double d) : m_var_type(var_t::COL_VALUE), _name("#"), column_pos(-1), var_value(d) {}
+  explicit variable(double d) : m_var_type(var_t::COL_VALUE), _name("#"), column_pos(-1), var_value(d) {}
 
-  variable(int i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {}
+  explicit variable(int i) : m_var_type(var_t::COL_VALUE), column_pos(-1), var_value(i) {}
 
-  variable(const std::string& n) : m_var_type(var_t::VAR), _name(n), column_pos(-1) {}
+  explicit variable(const std::string& n) : m_var_type(var_t::VAR), _name(n), column_pos(-1) {}
 
   variable(const std::string& n,  var_t tp) : m_var_type(var_t::NA)
   {
@@ -868,6 +1406,40 @@ public:
     }
   }
 
+  explicit variable(s3select_reserved_word::reserve_word_en_t reserve_word)
+  {
+    if (reserve_word == s3select_reserved_word::reserve_word_en_t::S3S_NULL)
+    {
+      m_var_type = variable::var_t::COL_VALUE;
+      column_pos = undefined_column_pos;
+      var_value.type = value::value_En_t::S3NULL;//TODO use set_null
+    }
+    else if (reserve_word == s3select_reserved_word::reserve_word_en_t::S3S_NAN)
+    {
+      m_var_type = variable::var_t::COL_VALUE;
+      column_pos = undefined_column_pos;
+      var_value.set_nan();
+    }
+    else if (reserve_word == s3select_reserved_word::reserve_word_en_t::S3S_TRUE)
+    {
+      m_var_type = variable::var_t::COL_VALUE;
+      column_pos = -1;
+      var_value.set_true();
+    }
+    else if (reserve_word == s3select_reserved_word::reserve_word_en_t::S3S_FALSE)
+    {
+      m_var_type = variable::var_t::COL_VALUE;
+      column_pos = -1;
+      var_value.set_false();
+    }
+    else 
+    {
+      _name = "#";
+      m_var_type = var_t::NA;
+      column_pos = undefined_column_pos;
+    }
+  }
+
   void operator=(value& v)
   {
     var_value = v;
@@ -888,16 +1460,27 @@ public:
     var_value = i;
   }
 
-  void set_value(boost::posix_time::ptime* p)
+  void set_value(timestamp_t* p)
   {
     var_value = p;
   }
 
+  void set_value(bool b)
+  {
+       var_value = b;
+    var_value.type = value::value_En_t::BOOL;
+  }
+
+  void set_null()
+  {
+    var_value.setnull();
+  }
+
   virtual ~variable() {}
 
-  virtual bool is_column()  //is reference to column.
+  virtual bool is_column() const //is reference to column.
   {
-    if(m_var_type == var_t::VAR || m_var_type == var_t::POS)
+    if(m_var_type == var_t::VAR || m_var_type == var_t::POS || m_var_type == var_t::STAR_OPERATION)
     {
       return true;
     }
@@ -908,6 +1491,17 @@ public:
   {
     return var_value; //TODO is it correct
   }
+
+  std::string get_name()
+  {
+    return _name;
+  }
+
+  int get_column_pos()
+  {
+    return column_pos;
+  }
+
   virtual value::value_En_t get_value_type()
   {
     return var_value.type;
@@ -917,11 +1511,17 @@ public:
   value& star_operation()   //purpose return content of all columns in a input stream
   {
 
-
-    int i;
+    
     size_t pos=0;
-    int num_of_columns = m_scratch->get_num_of_columns();
-    for(i=0; i<num_of_columns-1; i++)
+    size_t num_of_columns = m_scratch->get_num_of_columns();
+    var_value.multiple_values.clear(); //TODO var_value.clear()??
+
+    if(sizeof(star_operation_values)/sizeof(value) < num_of_columns)
+    {
+        throw base_s3select_exception(std::string("not enough memory for star-operation"), base_s3select_exception::s3select_exp_en_t::FATAL);
+    }
+
+    for(size_t i=0; i<num_of_columns; i++)
     {
       size_t len = m_scratch->get_column_value(i).size();
       if((pos+len)>sizeof(m_star_op_result_charc))
@@ -929,26 +1529,23 @@ public:
         throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL);
       }
 
-      memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len);
+      memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len);//TODO using string_view will avoid copy
+      m_star_op_result_charc[ pos + len ] = 0;
+
+      star_operation_values[i] = &m_star_op_result_charc[pos];//set string value
+      var_value.multiple_values.push_value( &star_operation_values[i] );
+
       pos += len;
-      m_star_op_result_charc[ pos ] = ',';//TODO need for another abstraction (per file type)
       pos ++;
 
     }
 
-    size_t len = m_scratch->get_column_value(i).size();
-    if((pos+len)>sizeof(m_star_op_result_charc))
-    {
-      throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL);
-    }
+    var_value.type = value::value_En_t::MULTIPLE_VALUES;
 
-    memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len);
-    m_star_op_result_charc[ pos + len ] = 0;
-    var_value = (char*)&m_star_op_result_charc[0];
     return var_value;
   }
 
-  virtual value& eval()
+  virtual value& eval_internal()
   {
     if (m_var_type == var_t::COL_VALUE)
     {
@@ -1005,7 +1602,10 @@ public:
     }
     else
     {
-      var_value = (char*)m_scratch->get_column_value(column_pos).data();  //no allocation. returning pointer of allocated space
+      m_scratch->get_column_value(column_pos,var_value);
+      //in the case of successive column-delimiter {1,some_data,,3}=> third column is NULL 
+      if (var_value.is_string() && (var_value.str()== 0 || (var_value.str() && *var_value.str()==0)))
+          var_value.setnull();
     }
 
     return var_value;
@@ -1038,7 +1638,8 @@ private:
 
   cmp_t _cmp;
   value var_value;
-
+  bool negation_result;//false: dont negate ; upon NOT operator(unary) its true
+  
 public:
 
   virtual bool semantic()
@@ -1046,11 +1647,11 @@ public:
     return true;
   }
 
-  virtual base_statement* left()
+  base_statement* left() const override
   {
     return l;
   }
-  virtual base_statement* right()
+  base_statement* right() const override
   {
     return r;
   }
@@ -1062,33 +1663,40 @@ public:
     return std::string("#");//TBD
   }
 
-  virtual value& eval()
+  virtual value& eval_internal()
   {
-
+    if ((l->eval()).is_null()) {
+        var_value.setnull();
+        return var_value;
+      } else if((r->eval()).is_null()) {
+        var_value.setnull();
+        return var_value;
+      }
+    
     switch (_cmp)
     {
     case cmp_t::EQ:
-      return var_value = (l->eval() == r->eval());
+      return var_value =  bool( (l->eval() == r->eval()) ^ negation_result );
       break;
 
     case cmp_t::LE:
-      return var_value = (l->eval() <= r->eval());
+      return var_value = bool( (l->eval() <= r->eval()) ^ negation_result );
       break;
 
     case cmp_t::GE:
-      return var_value = (l->eval() >= r->eval());
+      return var_value = bool( (l->eval() >= r->eval()) ^ negation_result );
       break;
 
     case cmp_t::NE:
-      return var_value = (l->eval() != r->eval());
+      return var_value = bool( (l->eval() != r->eval()) ^ negation_result );
       break;
 
     case cmp_t::GT:
-      return var_value = (l->eval() > r->eval());
+      return var_value = bool( (l->eval() > r->eval()) ^ negation_result );
       break;
 
     case cmp_t::LT:
-      return var_value = (l->eval() < r->eval());
+      return var_value = bool( (l->eval() < r->eval()) ^ negation_result );
       break;
 
     default:
@@ -1097,7 +1705,16 @@ public:
     }
   }
 
-  arithmetic_operand(base_statement* _l, cmp_t c, base_statement* _r):l(_l), r(_r), _cmp(c) {}
+  arithmetic_operand(base_statement* _l, cmp_t c, base_statement* _r):l(_l), r(_r), _cmp(c),negation_result(false) {}
+  
+  explicit arithmetic_operand(base_statement* p)//NOT operator 
+  {
+    l = dynamic_cast<arithmetic_operand*>(p)->l;
+    r = dynamic_cast<arithmetic_operand*>(p)->r;
+    _cmp = dynamic_cast<arithmetic_operand*>(p)->_cmp;
+    // not( not ( logical expression )) == ( logical expression ); there is no limitation for number of NOT.
+    negation_result = ! dynamic_cast<arithmetic_operand*>(p)->negation_result;
+  }
 
   virtual ~arithmetic_operand() {}
 };
@@ -1115,14 +1732,15 @@ private:
 
   oplog_t _oplog;
   value var_value;
+  bool negation_result;//false: dont negate ; upon NOT operator(unary) its true
 
 public:
 
-  virtual base_statement* left()
+  base_statement* left() const override
   {
     return l;
   }
-  virtual base_statement* right()
+  base_statement* right() const override
   {
     return r;
   }
@@ -1132,7 +1750,16 @@ public:
     return true;
   }
 
-  logical_operand(base_statement* _l, oplog_t _o, base_statement* _r):l(_l), r(_r), _oplog(_o) {}
+  logical_operand(base_statement* _l, oplog_t _o, base_statement* _r):l(_l), r(_r), _oplog(_o),negation_result(false) {}
+
+  explicit logical_operand(base_statement * p)//NOT operator
+  {
+    l = dynamic_cast<logical_operand*>(p)->l;
+    r = dynamic_cast<logical_operand*>(p)->r;
+    _oplog = dynamic_cast<logical_operand*>(p)->_oplog;
+    // not( not ( logical expression )) == ( logical expression ); there is no limitation for number of NOT.
+    negation_result = ! dynamic_cast<logical_operand*>(p)->negation_result; 
+  }
 
   virtual ~logical_operand() {}
 
@@ -1142,26 +1769,54 @@ public:
     //return out;
     return std::string("#");//TBD
   }
-  virtual value& eval()
+  virtual value& eval_internal()
   {
+    if (!l || !r)
+    {
+      throw base_s3select_exception("missing operand for logical ", base_s3select_exception::s3select_exp_en_t::FATAL);
+    }
+    value a = l->eval();
     if (_oplog == oplog_t::AND)
     {
-      if (!l || !r)
-      {
-        throw base_s3select_exception("missing operand for logical and", base_s3select_exception::s3select_exp_en_t::FATAL);
-      }
-      return var_value =  (l->eval().i64() && r->eval().i64());
+      if (!a.is_null() && a.i64() == false) {
+        bool res = false ^ negation_result;
+        return var_value = res;
+      } 
+      value b = r->eval();
+      if(!b.is_null() && b.i64() == false) {
+        bool res = false ^ negation_result;
+        return var_value = res;
+      } else {
+        if (a.is_null() || b.is_null()) {
+          var_value.setnull();
+          return var_value;
+        } else {
+          bool res =  true ^ negation_result ;
+          return var_value =res;
+        }
+      }   
     }
     else
     {
-      if (!l || !r)
-      {
-        throw base_s3select_exception("missing operand for logical or", base_s3select_exception::s3select_exp_en_t::FATAL);
+      if (a.is_true()) {
+        bool res = true ^ negation_result;
+        return var_value = res;
+      } 
+      value b = r->eval();
+      if(b.is_true() == true) {
+        bool res = true ^ negation_result;
+        return var_value = res;
+      } else {
+        if (a.is_null() || b.is_null()) {
+          var_value.setnull();
+          return var_value;
+        } else {
+          bool res =  false ^ negation_result ;
+          return var_value =res;
+        }
       }
-      return var_value =  (l->eval().i64() || r->eval().i64());
     }
   }
-
 };
 
 class mulldiv_operation : public base_statement
@@ -1169,7 +1824,7 @@ class mulldiv_operation : public base_statement
 
 public:
 
-  enum class muldiv_t {NA, MULL, DIV, POW} ;
+  enum class muldiv_t {NA, MULL, DIV, POW, MOD} ;
 
 private:
   base_statement* l;
@@ -1177,14 +1832,15 @@ private:
 
   muldiv_t _mulldiv;
   value var_value;
+  value tmp_value;
 
 public:
 
-  virtual base_statement* left()
+  base_statement* left() const override
   {
     return l;
   }
-  virtual base_statement* right()
+  base_statement* right() const override
   {
     return r;
   }
@@ -1201,20 +1857,28 @@ public:
     return std::string("#");//TBD
   }
 
-  virtual value& eval()
+  virtual value& eval_internal()
   {
     switch (_mulldiv)
     {
     case muldiv_t::MULL:
-      return var_value = l->eval() * r->eval();
+      tmp_value = l->eval();
+      return var_value = tmp_value * r->eval();
       break;
 
     case muldiv_t::DIV:
-      return var_value = l->eval() / r->eval();
+      tmp_value = l->eval();
+      return var_value = tmp_value / r->eval();
       break;
 
     case muldiv_t::POW:
-      return var_value = l->eval() ^ r->eval();
+      tmp_value = l->eval();
+      return var_value = tmp_value ^ r->eval();
+      break;
+
+    case muldiv_t::MOD:
+      tmp_value = l->eval();
+      return var_value = tmp_value % r->eval();
       break;
 
     default:
@@ -1241,14 +1905,15 @@ private:
 
   addsub_op_t _op;
   value var_value;
+  value tmp_value;
 
 public:
 
-  virtual base_statement* left()
+  base_statement* left() const override
   {
     return l;
   }
-  virtual base_statement* right()
+  base_statement* right() const override
   {
     return r;
   }
@@ -1268,7 +1933,7 @@ public:
     return std::string("#");//TBD
   }
 
-  virtual value& eval()
+  virtual value& eval_internal()
   {
     if (_op == addsub_op_t::NA) // -num , +num , unary-operation on number
     {
@@ -1282,18 +1947,67 @@ public:
       }
     }
     else if (_op == addsub_op_t::ADD)
-    {
-      return var_value = (l->eval() + r->eval());
+    {tmp_value=l->eval();
+      return var_value = (tmp_value + r->eval());
     }
     else
-    {
-      return var_value = (l->eval() - r->eval());
+    {tmp_value=l->eval();
+      return var_value = (tmp_value - r->eval());
     }
 
     return var_value;
   }
 };
 
+class negate_function_operation : public base_statement
+{
+  //purpose: some functions (between,like,in) are participating in where-clause as predicates; thus NOT unary-operator may operate on them.
+
+  private:
+  
+  base_statement* function_to_negate;
+  value res;
+  
+  public:
+
+  explicit negate_function_operation(base_statement *f):function_to_negate(f){}
+
+  virtual std::string print(int ident)
+  {
+    return std::string("#");//TBD
+  }
+
+  virtual bool semantic()
+  {
+    return true;
+  }
+
+  base_statement* left() const override
+  {
+    return function_to_negate;
+  }
+
+  virtual value& eval_internal()
+  {
+    res = function_to_negate->eval();
+
+    if (res.is_number() || res.is_bool())//TODO is integer type
+    {
+      if (res.is_true())
+      {
+        res = (bool)0;
+      }
+      else
+      {
+        res = (bool)1;
+      }
+    }
+
+    return res;
+  }
+
+};
+
 class base_function
 {
 
@@ -1303,17 +2017,664 @@ protected:
 public:
   //TODO add semantic to base-function , it operate once on function creation
   // validate semantic on creation instead on run-time
-  virtual bool operator()(std::vector<base_statement*>* args, variable* result) = 0;
+  virtual bool operator()(bs_stmt_vec_t* args, variable* result) = 0;
   base_function() : aggregate(false) {}
-  bool is_aggregate()
+  bool is_aggregate() const
   {
     return aggregate == true;
   }
   virtual void get_aggregate_result(variable*) {}
 
-  virtual ~base_function() {}
+  virtual ~base_function() = default;
+  
+  virtual void dtor()
+  {//release function-body implementation 
+    this->~base_function();
+  }
+
 };
 
+class base_date_extract : public base_function
+{
+  protected:
+    value val_timestamp;
+    boost::posix_time::ptime new_ptime;
+    boost::posix_time::time_duration td;
+    bool flag;
+
+  public:
+    void param_validation(bs_stmt_vec_t*& args)
+    {
+      auto iter = args->begin();
+      int args_size = args->size();
+
+      if (args_size < 1)
+      {
+        throw base_s3select_exception("to_timestamp should have 2 parameters");
+      }
+
+      base_statement* ts = *iter;
+      val_timestamp = ts->eval();
+      if(val_timestamp.is_timestamp()== false)
+      {
+        throw base_s3select_exception("second parameter is not timestamp");
+      }
+
+      std::tie(new_ptime, td, flag) = *val_timestamp.timestamp();
+    }
+
+};
+
+class base_date_diff : public base_function
+{
+  protected:
+    boost::posix_time::ptime ptime1;
+    boost::posix_time::ptime ptime2;
+
+  public:
+    void param_validation(bs_stmt_vec_t*& args)
+    {
+      auto iter = args->begin();
+      int args_size = args->size();
+
+      if (args_size < 2)
+      {
+        throw base_s3select_exception("datediff need 3 parameters");
+      }
+
+      base_statement* dt1_param = *iter;
+      value val_ts1 = dt1_param->eval();
+
+      if (val_ts1.is_timestamp() == false)
+      {
+        throw base_s3select_exception("second parameter should be timestamp");
+      }
+
+      iter++;
+      base_statement* dt2_param = *iter;
+      value val_ts2 = dt2_param->eval();
+
+      if (val_ts2.is_timestamp() == false)
+      {
+        throw base_s3select_exception("third parameter should be timestamp");
+      }
+
+      boost::posix_time::ptime ts1_ptime;
+      boost::posix_time::time_duration ts1_td;
+      boost::posix_time::ptime ts2_ptime;
+      boost::posix_time::time_duration ts2_td;
+
+      std::tie(ts1_ptime, ts1_td, std::ignore) = *val_ts1.timestamp();
+      std::tie(ts2_ptime, ts2_td, std::ignore) = *val_ts2.timestamp();
+
+      ptime1 = ts1_ptime + boost::posix_time::hours(ts1_td.hours() * -1);
+      ptime1 += boost::posix_time::minutes(ts1_td.minutes() * -1);
+      ptime2 = ts2_ptime + boost::posix_time::hours(ts2_td.hours() * -1);
+      ptime2 += boost::posix_time::minutes(ts2_td.minutes() * -1);
+    }
+
+};
+
+class base_date_add : public base_function
+{
+  protected:
+    value val_quantity;
+    boost::posix_time::ptime new_ptime;
+    boost::posix_time::time_duration td;
+    bool flag;
+    timestamp_t new_tmstmp;
+
+  public:
+    void param_validation(bs_stmt_vec_t*& args)
+    {
+      auto iter = args->begin();
+      int args_size = args->size();
+
+      if (args_size < 2)
+      {
+        throw base_s3select_exception("add_to_timestamp should have 3 parameters");
+      }
+
+      base_statement* quan = *iter;
+      val_quantity = quan->eval();
+
+      if (val_quantity.is_number() == false)
+      {
+        throw base_s3select_exception("second parameter should be number");  //TODO what about double?
+      }
+
+      iter++;
+      base_statement* ts = *iter;
+      value val_ts = ts->eval();
+
+      if(val_ts.is_timestamp() == false)
+      {
+        throw base_s3select_exception("third parameter should be time-stamp");
+      }
+
+      std::tie(new_ptime, td, flag) = *val_ts.timestamp();
+    }
+
+};
+
+class base_time_to_string
+{
+  protected:
+    std::vector<std::string> months = { "January", "February", "March","April",
+                        "May", "June", "July", "August", "September",
+                        "October", "November", "December"};
+  public:
+    virtual std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param) = 0;
+    virtual ~base_time_to_string() = default;
+};
+
+class derive_yyyy : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t yr = new_ptime.date().year();
+      return std::string(param - 4, '0') + std::to_string(yr);
+    }
+} yyyy_to_string;
+
+class derive_yy : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t yr = new_ptime.date().year();
+      return std::string(2 - std::to_string(yr%100).length(), '0') + std::to_string(yr%100);
+    }
+} yy_to_string;
+
+class derive_y : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t yr = new_ptime.date().year();
+      return std::to_string(yr);
+    }
+} y_to_string;
+
+class derive_mmmmm_month : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t mnth = new_ptime.date().month();
+      return (months[mnth - 1]).substr(0, 1);
+    }
+} mmmmm_month_to_string;
+
+class derive_mmmm_month : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t mnth = new_ptime.date().month();
+      return months[mnth - 1];
+    }
+} mmmm_month_to_string;
+
+class derive_mmm_month : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t mnth = new_ptime.date().month();
+      return (months[mnth - 1]).substr(0, 3);
+    }
+} mmm_month_to_string;
+
+class derive_mm_month : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t mnth = new_ptime.date().month();
+      std::string mnth_str = std::to_string(mnth);
+      return std::string(2 - mnth_str.length(), '0') + mnth_str;
+    }
+} mm_month_to_string;
+
+class derive_m_month : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t mnth = new_ptime.date().month();
+      return std::to_string(mnth);
+    }
+} m_month_to_string;
+
+class derive_dd : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      std::string day = std::to_string(new_ptime.date().day());
+      return std::string(2 - day.length(), '0') + day;
+    }
+} dd_to_string;
+
+class derive_d : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      std::string day = std::to_string(new_ptime.date().day());
+      return day;
+    }
+} d_to_string;
+
+class derive_a : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t hr = new_ptime.time_of_day().hours();
+      std::string meridiem = (hr < 12 ? "AM" : "PM");
+      return meridiem;
+    }
+} a_to_string;
+
+class derive_hh : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t hr = new_ptime.time_of_day().hours();
+      std::string hr_12 = std::to_string(hr%12 == 0 ? 12 : hr%12);
+      return std::string(2 - hr_12.length(), '0') + hr_12;
+    }
+} hh_to_string;
+
+class derive_h : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t hr = new_ptime.time_of_day().hours();
+      std::string hr_12 = std::to_string(hr%12 == 0 ? 12 : hr%12);
+      return hr_12;
+    }
+} h_to_string;
+
+class derive_h2 : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t hr = new_ptime.time_of_day().hours();
+      std::string hr_24 = std::to_string(hr);
+      return std::string(2 - hr_24.length(), '0') + hr_24;
+    }
+} h2_to_string;
+
+class derive_h1 : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int64_t hr = new_ptime.time_of_day().hours();
+      return std::to_string(hr);
+    }
+} h1_to_string;
+
+class derive_mm : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      std::string mint = std::to_string(new_ptime.time_of_day().minutes());
+      return std::string(2 - mint.length(), '0') + mint;
+    }
+} mm_to_string;
+
+class derive_m : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      std::string mint = std::to_string(new_ptime.time_of_day().minutes());
+      return mint;
+    }
+} m_to_string;
+
+class derive_ss : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      std::string sec =  std::to_string(new_ptime.time_of_day().seconds());
+      return std::string(2 - sec.length(), '0') + sec;
+    }
+} ss_to_string;
+
+class derive_s : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      std::string sec =  std::to_string(new_ptime.time_of_day().seconds());
+      return sec;
+    }
+} s_to_string;
+
+class derive_frac_sec : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      std::string frac_seconds = std::to_string(new_ptime.time_of_day().fractional_seconds());
+      #if BOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
+        frac_seconds = std::string(9 - frac_seconds.length(), '0') + frac_seconds;
+      #else
+        frac_seconds = std::string(6 - frac_seconds.length(), '0') + frac_seconds;
+      #endif
+      if (param >= frac_seconds.length())
+      {
+        return frac_seconds + std::string(param - frac_seconds.length(), '0');
+      }
+      else
+      {
+        return frac_seconds.substr(0, param);
+      }
+    }
+} frac_sec_to_string;
+
+class derive_n : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int frac_seconds = new_ptime.time_of_day().fractional_seconds();
+
+      if(frac_seconds == 0)
+        return std::to_string(frac_seconds);
+      else
+      {
+        #if BOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
+          return std::to_string(frac_seconds);
+        #else
+          return std::to_string(frac_seconds) + std::string(3, '0');
+        #endif
+      }
+    }
+} n_to_string;
+
+class derive_x1 : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int tz_hour = td.hours();
+      int tz_minute = td.minutes();
+      if (tz_hour == 0 && tz_minute == 0)
+      {
+        return "Z";
+      }
+      else if (tz_minute == 0)
+      {
+        std::string tz_hr = std::to_string(std::abs(tz_hour));
+        return (td.is_negative() ? "-" : "+") + std::string(2 - tz_hr.length(), '0') + tz_hr;
+      }
+      else
+      {
+        std::string tz_hr = std::to_string(std::abs(tz_hour));
+       std::string tz_mn = std::to_string(std::abs(tz_minute));
+        return (td.is_negative() ? "-" : "+") + std::string(2 - tz_hr.length(), '0') + tz_hr + std::string(2 - tz_mn.length(), '0') + tz_mn;
+      }
+    }
+} x1_to_string;
+
+class derive_x2 : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int tz_hour = td.hours();
+      int tz_minute = td.minutes();
+      if (tz_hour == 0 && tz_minute == 0)
+      {
+        return "Z";
+      }
+      else
+      {
+        std::string tz_hr = std::to_string(std::abs(tz_hour));
+       std::string tz_mn = std::to_string(std::abs(tz_minute));
+        return (td.is_negative() ? "-" : "+") + std::string(2 - tz_hr.length(), '0') + tz_hr + std::string(2 - tz_mn.length(), '0') + tz_mn;
+      }
+    }
+} x2_to_string;
+
+class derive_x3 : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int tz_hour = td.hours();
+      int tz_minute = td.minutes();
+      if (tz_hour == 0 && tz_minute == 0)
+      {
+        return "Z";
+      }
+      else
+      {
+        std::string tz_hr = std::to_string(std::abs(tz_hour));
+        std::string tz_mn = std::to_string(std::abs(tz_minute));
+        return (td.is_negative() ? "-" : "+") + std::string(2 - tz_hr.length(), '0') + tz_hr + ":" + std::string(2 - tz_mn.length(), '0') + tz_mn;
+      }
+    }
+} x3_to_string;
+
+class derive_x : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      int tz_minute = td.minutes();
+      std::string tz_hr = std::to_string(std::abs(td.hours()));
+      if (tz_minute == 0)
+      {
+        return (td.is_negative() ? "-" : "+") + std::string(2 - tz_hr.length(), '0') + tz_hr;
+      }
+      else
+      {
+        std::string tz_mn = std::to_string(std::abs(tz_minute));
+        return (td.is_negative() ? "-" : "+") + std::string(2 - tz_hr.length(), '0') + tz_hr + std::string(2 - tz_mn.length(), '0') + tz_mn;
+      }
+    }
+} x_to_string;
+
+class derive_xx : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      std::string tz_hr = std::to_string(std::abs(td.hours()));
+      std::string tz_mn = std::to_string(std::abs(td.minutes()));
+      return (td.is_negative() ? "-" : "+") + std::string(2 - tz_hr.length(), '0') + tz_hr + std::string(2 - tz_mn.length(), '0') + tz_mn;
+    }
+} xx_to_string;
+
+class derive_xxx : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      std::string tz_hr = std::to_string(std::abs(td.hours()));
+      std::string tz_mn = std::to_string(std::abs(td.minutes()));
+      return (td.is_negative() ? "-" : "+") + std::string(2 - tz_hr.length(), '0') + tz_hr + ":" + std::string(2 - tz_mn.length(), '0') + tz_mn;
+    }
+} xxx_to_string;
+
+class derive_delimiter : public base_time_to_string
+{
+  public:
+    std::string print_time(boost::posix_time::ptime& new_ptime, boost::posix_time::time_duration& td, uint32_t param)
+    {
+      char ch = param;
+      return std::string(1, ch);
+    }
+} delimiter_to_string;
+
+class base_timestamp_to_string : public base_function
+{
+  protected:
+    boost::posix_time::ptime new_ptime;
+    boost::posix_time::time_duration td;
+    bool flag;
+    std::string format;
+    std::vector<char> m_metachar {'y', 'M', 'd', 'a', 'h', 'H', 'm', 's', 'S', 'n', 'X', 'x'};
+    std::vector<std::string> m_metaword_vec {"yyy", "yy", "y", "MMMMM", "MMMM", "MMM", "MM", "M",
+                                    "dd", "d", "a", "hh", "h", "HH", "H", "mm", "m", "ss", "s", "n",
+                                    "XXXXX", "XXXX", "XXX", "XX", "X", "xxxxx", "xxxx", "xxx", "xx",
+                                    "x"};
+    std::vector<base_time_to_string*> print_vector;
+    std::vector<uint32_t> para;
+    bool initialized = false;
+
+    using to_string_lib_t = std::map<std::string,base_time_to_string* >;
+
+    const to_string_lib_t time_to_string_functions =
+    {
+      {"yyyy+", &yyyy_to_string},
+      {"yyy", &y_to_string},
+      {"yy", &yy_to_string},
+      {"y", &y_to_string},
+      {"MMMMM", &mmmmm_month_to_string},
+      {"MMMM", &mmmm_month_to_string},
+      {"MMM", &mmm_month_to_string},
+      {"MM", &mm_month_to_string},
+      {"M", &m_month_to_string},
+      {"dd", &dd_to_string },
+      {"d", &d_to_string },
+      {"a", &a_to_string },
+      {"hh", &hh_to_string},
+      {"h", &h_to_string},
+      {"HH", &h2_to_string},
+      {"H", &h1_to_string},
+      {"mm", &mm_to_string},
+      {"m", &m_to_string},
+      {"ss", &ss_to_string},
+      {"s", &s_to_string},
+      {"S+", &frac_sec_to_string},
+      {"n", &n_to_string},
+      {"XXXXX", &x3_to_string},
+      {"XXXX", &x2_to_string},
+      {"XXX", &x3_to_string},
+      {"XX", &x2_to_string},
+      {"X", &x1_to_string},
+      {"xxxxx", &xxx_to_string},
+      {"xxxx", &xx_to_string},
+      {"xxx", &xxx_to_string},
+      {"xx", &xx_to_string},
+      {"x", &x_to_string},
+      {"delimiter", &delimiter_to_string}
+    };
+
+  public:
+    void param_validation(bs_stmt_vec_t*& args)
+    {
+      auto iter = args->begin();
+      int args_size = args->size();
+
+      if (args_size < 2)
+      {
+        throw base_s3select_exception("to_string need 2 parameters");
+      }
+
+      base_statement* dt1_param = *iter;
+      value val_timestamp = dt1_param->eval();
+
+      if (val_timestamp.is_timestamp() == false)
+      {
+        throw base_s3select_exception("first parameter should be timestamp");
+      }
+
+      iter++;
+      base_statement* frmt = *iter;
+      value val_format = frmt->eval();
+
+      if (val_format.is_string() == false)
+      {
+        throw base_s3select_exception("second parameter should be string");
+      }
+
+      std::tie(new_ptime, td, flag) = *val_timestamp.timestamp();
+      format = val_format.to_string();
+    }
+
+    uint32_t length_same_char_str(std::string str, char ch)
+    {
+      int i = 0;
+      while(str[i] == ch)
+        i++;
+      return i;
+    }
+
+    void prepare_to_string_vector(std::vector<base_time_to_string*>& print_vector, std::vector<uint32_t>& para)
+    {
+      for (uint32_t i = 0; i < format.length(); i++)
+      {
+        if (std::find(m_metachar.begin(), m_metachar.end() , format[i]) != m_metachar.end())
+        {
+          if (format.substr(i, 4).compare("yyyy") == 0)
+          {
+            uint32_t len = length_same_char_str(format.substr(i), 'y');
+            auto it = time_to_string_functions.find("yyyy+");
+            print_vector.push_back( it->second);
+            para.push_back(len);
+            i += len - 1;
+            continue;
+          }
+          else if (format[i] == 'S')
+          {
+            uint32_t len = length_same_char_str(format.substr(i), 'S');
+            auto it = time_to_string_functions.find("S+");
+            print_vector.push_back( it->second);
+            para.push_back(len);
+            i += len - 1;
+            continue;
+          }
+
+          for (auto word : m_metaword_vec)
+          {
+            if (format.substr(i, word.length()).compare(word) == 0)
+            {
+              auto it = time_to_string_functions.find(word.c_str());
+              print_vector.push_back( it->second);
+              para.push_back('\0');
+              i += word.length() - 1;
+              break;
+            }
+          }
+        }
+        else
+        {
+          auto it = time_to_string_functions.find("delimiter");
+          print_vector.push_back( it->second );
+          para.push_back(format[i]);
+        }
+      }
+    }
+
+    std::string execute_to_string(std::vector<base_time_to_string*>& print_vector, std::vector<uint32_t>& para)
+    {
+      std::string res;
+      int temp = 0;
+      for(auto p : print_vector)
+      {
+        res += p->print_time(new_ptime, td, para.at(temp));
+        temp++;
+      }
+      return res;
+    }
+
+};
 
 };//namespace