/*
 * /usr/include/boost/bind.hpp:36:1: note: '#pragma message: The practice of declaring the Bind placeholders (_1, _2, ...) in the global namespace is deprecated. Please use <boost/bind/bind.hpp> + using namespace boost::placeholders, or define BOOST_BIND_GLOBAL_PLACEHOLDERS to retain the current behavior.'
 */
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#include "gtest/gtest.h"
#include "s3select.h"

#include <string>
#include <iostream>
#include <fstream>
#include <sstream>
#include <functional>
#include <cstring>
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <sys/stat.h>

#include <boost/algorithm/string/replace.hpp>
#include "boost/date_time/gregorian/gregorian.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
using namespace s3selectEngine;
// ============================================================ //

#ifdef _ARROW_EXIST
#include <arrow/io/file.h>
#include <arrow/util/logging.h>

#include <parquet/api/reader.h>
#include <parquet/api/writer.h>
using parquet::ConvertedType;
using parquet::Repetition;
using parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::PrimitiveNode;
constexpr int NUM_ROWS = 100000;
constexpr int64_t ROW_GROUP_SIZE = 1024 * 1024;
const char PARQUET_FILENAME[] = "/tmp/csv_converted.parquet";
#define JSON_NO_RUN "no_run"
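// Tests pass JSON_NO_RUN as the json_query argument of run_s3select() below
// to skip the JSON flow for a particular query.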
class tokenize {
public:
  const char* s;
  std::string input;
  const char* p;
  bool last_token;

  tokenize(std::string& in) : s(0), input(in), p(input.c_str()), last_token(false)
  {
  }

  void get_token(std::string& token)
  {
    if (!*p)
    {
      token = "";
      last_token = true;
      return;
    }

    s = p;
    while (*p && *p != ',' && *p != '\n') p++;

    token = std::string(s, p);
    p++;

    if (!*p) last_token = true;
  }

  bool is_last()
  {
    return last_token == true;
  }
};
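// Build a Parquet schema of num_of_columns optional BYTE_ARRAY columns
// (column_0, column_1, ...) that holds the CSV fields as strings.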
static std::shared_ptr<GroupNode> column_string_2(uint32_t num_of_columns)
{
  parquet::schema::NodeVector fields;

  for (uint32_t i = 0; i < num_of_columns; i++)
  {
    std::string column_name = "column_" + std::to_string(i);
    fields.push_back(PrimitiveNode::Make(column_name, Repetition::OPTIONAL, Type::BYTE_ARRAY,
                                         ConvertedType::NONE));
  }

  return std::static_pointer_cast<GroupNode>(
      GroupNode::Make("schema", Repetition::REQUIRED, fields));
}
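// Convert an in-memory CSV object into a Parquet file at PARQUET_FILENAME,
// so the same query can be exercised through both the CSV and Parquet readers.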
int csv_to_parquet(std::string& csv_object)
{
  auto csv_num_of_columns = std::count(csv_object.begin(), csv_object.begin() + csv_object.find('\n'), ',') + 1;
  auto csv_num_of_rows = std::count(csv_object.begin(), csv_object.end(), '\n');

  tokenize csv_tokens(csv_object);

  try {
    // Create a local file output stream instance.
    using FileClass = ::arrow::io::FileOutputStream;
    std::shared_ptr<FileClass> out_file;
    PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));

    // Setup the parquet schema
    std::shared_ptr<GroupNode> schema = column_string_2(csv_num_of_columns);

    // Add writer properties
    parquet::WriterProperties::Builder builder;
    // builder.compression(parquet::Compression::SNAPPY);
    std::shared_ptr<parquet::WriterProperties> props = builder.build();

    // Create a ParquetFileWriter instance
    std::shared_ptr<parquet::ParquetFileWriter> file_writer =
        parquet::ParquetFileWriter::Open(out_file, schema, props);

    // Append a BufferedRowGroup to keep the RowGroup open until a certain size
    parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();

    int num_columns = file_writer->num_columns();
    std::vector<int64_t> buffered_values_estimate(num_columns, 0);

    for (int i = 0; !csv_tokens.is_last() && i < csv_num_of_rows; i++) {
      int64_t estimated_bytes = 0;
      // Get the estimated size of the values that are not written to a page yet
      for (int n = 0; n < num_columns; n++) {
        estimated_bytes += buffered_values_estimate[n];
      }

      // We need to consider the compressed pages
      // as well as the values that are not compressed yet
      if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
           estimated_bytes) > ROW_GROUP_SIZE) {
        rg_writer->Close();
        std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
        rg_writer = file_writer->AppendBufferedRowGroup();
      }

      int col_id;
      for (col_id = 0; col_id < num_columns && !csv_tokens.is_last(); col_id++)
      {
        // Write the byte-array column
        parquet::ByteArrayWriter* ba_writer =
            static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
        parquet::ByteArray ba_value;

        std::string token;
        csv_tokens.get_token(token);
        if (token.size() == 0)
        { // empty token: write a null (definition level 0)
          int16_t definition_level = 0;
          ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
        }
        else
        {
          int16_t definition_level = 1;
          ba_value.ptr = (uint8_t*)(token.data());
          ba_value.len = token.size();
          ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
        }

        buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
      }

      // the CSV ended mid-row: pad the remaining columns with nulls
      if (csv_tokens.is_last() && col_id < num_columns)
      {
        for (; col_id < num_columns; col_id++)
        {
          parquet::ByteArrayWriter* ba_writer =
              static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));

          int16_t definition_level = 0;
          ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);

          buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
        }
      }
    }

    // Close the RowGroupWriter
    rg_writer->Close();
    // Close the ParquetFileWriter
    file_writer->Close();

    // Write the bytes to file
    DCHECK(out_file->Close().ok());
  } catch (const std::exception& e) {
    std::cerr << "Parquet write error: " << e.what() << std::endl;
    return -1;
  }

  return 0;
}
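// Parse and execute input_query against a Parquet file; file access is
// provided to the engine through the rgw_s3select_api callbacks set up below.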
int run_query_on_parquet_file(const char* input_query, const char* input_file, std::string& result)
{
  int status;
  s3select s3select_syntax;

  status = s3select_syntax.parse_query(input_query);
  if (status != 0)
  {
    std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
    return -1;
  }

  FILE* fp = fopen(input_file, "r");
  if (!fp)
  {
    std::cout << "can not open " << input_file << std::endl;
    return -1;
  }

  // object-size callback handed to the engine
  std::function<int(void)> fp_get_size = [&]()
  {
    struct stat l_buf;
    lstat(input_file, &l_buf);
    return l_buf.st_size;
  };

  // range-request callback: read length bytes at offset start into buff
  std::function<size_t(int64_t, int64_t, void*, optional_yield*)> fp_range_req =
      [&](int64_t start, int64_t length, void* buff, optional_yield* y)
  {
    fseek(fp, start, SEEK_SET);
    size_t read_sz = fread(buff, 1, length, fp);
    return read_sz;
  };

  rgw_s3select_api rgw;
  rgw.set_get_size_api(fp_get_size);
  rgw.set_range_req_api(fp_range_req);

  std::function<int(std::string&)> fp_s3select_result_format = [](std::string& result){return 0;};//append
  std::function<int(std::string&)> fp_s3select_header_format = [](std::string& result){return 0;};//append

  parquet_object parquet_processor(input_file, &s3select_syntax, &rgw);

  //std::string result;

  try
  {
    status = parquet_processor.run_s3select_on_object(result, fp_s3select_result_format, fp_s3select_header_format);
  }
  catch (base_s3select_exception& e)
  {
    if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
    {
      fclose(fp);
      return -1;
    }
  }

  fclose(fp);
  return status;
}
// ============================================================ //

#else

// stub used when the tests are built without Arrow/Parquet support
int run_query_on_parquet_file(const char* input_query, const char* input_file, std::string& result)
{
  return 0;
}

#endif //_ARROW_EXIST
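// Convert a CSV stream into the JSON document layout the JSON processor
// expects: {"root" : [{"c1":<v1>,"c2":<v2>, ...}, ...]}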
std::string convert_to_json(const char* csv_stream, size_t stream_length)
{
  char row_delimiter('\n');
  char column_delimiter(',');

  char* m_stream = (char*)csv_stream;
  char* m_end_stream = (char*)csv_stream + stream_length;
  std::stringstream ss;

  ss << "{\"root\" : [";

  while (m_stream < m_end_stream) {
    int counter = 0;
    bool previous = true; // positioned at the start of a value
    ss << "{";

    while (*m_stream && (*m_stream != row_delimiter)) {
      if (*m_stream != column_delimiter && previous) {
        // first character of a value: emit its key, then the character
        ss << "\"c" << ++counter << "\"" << ":";
        ss << *m_stream;
        previous = false;
      } else if (*m_stream != column_delimiter) {
        ss << *m_stream;
      } else if (*m_stream == column_delimiter) {
        if (previous) {
          // empty value: emit its key with a null
          ss << "\"c" << ++counter << "\"" << ":" << "null";
        }
        ss << ",";
        previous = true;
      }
      m_stream++;
    }
    m_stream++; // skip the row delimiter

    if (previous) {
      // the row ended on a column delimiter: overwrite the dangling separator
      ss.seekp(-1, std::ios_base::end);
    }
    ss << "}" << ',' << std::endl;
  }

  // overwrite the trailing ",\n" of the last row with the closing brackets
  ss.seekp(-2, std::ios_base::end);
  ss << "]" << "}";

  return ss.str();
}
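// Rewrite a CSV query so it addresses the JSON document produced by
// convert_to_json(): the from-clause becomes s3object[*].root, and each
// positional column _N becomes the member _1.cN.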
const char* convert_query(std::string& expression)
{
  std::string from_clause = "s3object";
  boost::replace_all(expression, from_clause, "s3object[*].root");

  std::string from_clause_1 = "stdin";
  boost::replace_all(expression, from_clause_1, "s3object[*].root");

  std::string col_1 = "_1";
  boost::replace_all(expression, col_1, "_1.c1");

  std::string col_2 = "_2";
  boost::replace_all(expression, col_2, "_1.c2");

  std::string col_3 = "_3";
  boost::replace_all(expression, col_3, "_1.c3");

  std::string col_4 = "_4";
  boost::replace_all(expression, col_4, "_1.c4");

  std::string col_5 = "_5";
  boost::replace_all(expression, col_5, "_1.c5");

  std::string col_9 = "_9";
  boost::replace_all(expression, col_9, "_1.c9");

  return expression.c_str();
}
std::string run_expression_in_C_prog(const char* expression)
{
  //purpose: per use-case a c-file is generated, compiled, and finally executed.

  // side note: it is possible to do the following: cat test_hello.c | gcc -pipe -x c - -o /dev/stdout > ./1
  // gcc can read and write from/to a pipe (use pipe2()), i.e. not touching the file-system, BUT it should also run the gcc output from memory

  const int C_FILE_SIZE = (1024 * 1024);
  std::string c_test_file = std::string("/tmp/test_s3.c");
  std::string c_run_file = std::string("/tmp/s3test");

  FILE* fp_c_file = fopen(c_test_file.c_str(), "w");

  //contain return result
  char result_buff[100];

  if (!fp_c_file)
  {
    return std::string("#ERROR#");
  }

  char* prog_c = (char*)malloc(C_FILE_SIZE);

  size_t sz = sprintf(prog_c, "#include <stdio.h>\n \
#include <float.h>\n \
int main() \
{ \
printf(\"%%.*e\\n\",DECIMAL_DIG,(double)(%s));\
return 0; \
}", expression);

  fwrite(prog_c, 1, sz, fp_c_file);
  fclose(fp_c_file);

  std::string gcc_and_run_cmd = std::string("gcc ") + c_test_file + " -o " + c_run_file + " -Wall && " + c_run_file;

  FILE* fp_build = popen(gcc_and_run_cmd.c_str(), "r"); //TODO read stderr from pipe
  if (!fp_build)
  {
    free(prog_c);
    return std::string("#ERROR#");
  }

  char* res = fgets(result_buff, sizeof(result_buff), fp_build);
  if (!res)
  {
    free(prog_c);
    pclose(fp_build);
    return std::string("#ERROR#");
  }

  unlink(c_run_file.c_str());
  unlink(c_test_file.c_str());
  pclose(fp_build);
  free(prog_c);

  return std::string(result_buff);
}
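// Random arithmetic-expression generator: generate() builds a chain of
// <num>.0 terms joined by random operators and (balanced) parentheses. Tests
// cross-check the engine's arithmetic against the same expression evaluated
// by run_expression_in_C_prog(). NOTE: the wrapper class name gen_expr is
// assumed for illustration.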
#define OPER oper[ rand() % oper.size() ]

class gen_expr
{
private:
  int open = 0;
  std::string oper = {"+-+*/*"};

  std::string gexpr()
  {
    return std::to_string(rand() % 1000) + ".0" + OPER + std::to_string(rand() % 1000) + ".0";
  }

  std::string g_openp()
  {
    if ((rand() % 3) == 0)
    {
      open += 1;
      return std::string("(");
    }
    return std::string("");
  }

  std::string g_closep()
  {
    if ((rand() % 2) == 0 && open > 0)
    {
      open -= 1;
      return std::string(")");
    }
    return std::string("");
  }

public:
  std::string generate()
  {
    std::string exp = "";
    open = 0;

    for (int i = 0; i < 10; i++)
    {
      exp = (exp.size() > 0 ? exp + OPER : std::string("")) + g_openp() + gexpr() + OPER + gexpr() + g_closep();
    }

    // balance any parentheses still left open
    while (open > 0)
    {
      exp += std::string(")");
      open -= 1;
    }

    return exp;
  }
};
const std::string failure_sign("#failure#");
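// Wrap every integer token of s in quot characters, one token per line.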
std::string string_to_quot(std::string& s, char quot = '"')
{
  std::string result = "";
  std::stringstream str_strm;
  str_strm << s;
  std::string temp_str;
  int temp_int;

  while (!str_strm.eof()) {
    str_strm >> temp_str;
    if (std::stringstream(temp_str) >> temp_int) {
      std::stringstream s1;
      s1 << temp_int;
      result += quot + s1.str() + quot + "\n";
    }
  }
  return result;
}
void parquet_csv_report_error(std::string parquet_result, std::string csv_result)
{
  ASSERT_EQ(parquet_result, csv_result);
}
void json_csv_report_error(std::string json_result, std::string csv_result)
{
  ASSERT_EQ(json_result, csv_result);
}
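// Run a query over the single row "1,1,1,1" and return its first projection;
// with Arrow support, the same query also runs against a Parquet conversion
// of that row and both results are asserted equal.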
std::string run_s3select(std::string expression)
{//purpose: run query on single row and return result(single projections).
  s3select s3select_syntax;

  int status = s3select_syntax.parse_query(expression.c_str());
  if (status != 0)
  {
    return failure_sign;
  }

  std::string s3select_result;
  s3selectEngine::csv_object s3_csv_object(&s3select_syntax);
  std::string in = "1,1,1,1\n";
  std::string csv_obj = in;
  std::string parquet_result;

  s3_csv_object.run_s3select_on_object(s3select_result, in.c_str(), in.size(), false, false, true);

  s3select_result = s3select_result.substr(0, s3select_result.find_first_of(","));
  s3select_result = s3select_result.substr(0, s3select_result.find_first_of("\n"));//remove last \n

#ifdef _ARROW_EXIST
  csv_to_parquet(csv_obj);
  run_query_on_parquet_file(expression.c_str(), PARQUET_FILENAME, parquet_result);
  parquet_result = parquet_result.substr(0, parquet_result.find_first_of(","));
  parquet_result = parquet_result.substr(0, parquet_result.find_first_of("\n"));//remove last \n

  parquet_csv_report_error(parquet_result, s3select_result);
#endif //_ARROW_EXIST

  return s3select_result;
}
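// Serialization round trip: run the query with custom output delimiters, feed
// the serialized result back through the engine with the same delimiters, and
// assert that the second pass reproduces the first.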
void run_s3select_test_opserialization(std::string expression, std::string input, char* row_delimiter, char* column_delimiter)
{//purpose: run query on multiple rows and return result(multiple projections).
  s3select s3select_syntax;

  int status = s3select_syntax.parse_query(expression.c_str());
  if (status != 0)
  {
    return;
  }

  std::string s3select_result;
  csv_object::csv_defintions csv;
  csv.redundant_column = false;

  csv.output_row_delimiter = *row_delimiter;
  csv.output_column_delimiter = *column_delimiter;

  s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv);

  s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true);

  std::string s3select_result1 = s3select_result;

  csv.row_delimiter = *row_delimiter;
  csv.column_delimiter = *column_delimiter;
  csv.output_row_delimiter = *row_delimiter;
  csv.output_column_delimiter = *column_delimiter;
  csv.redundant_column = false;
  std::string s3select_result_second_phase;

  s3selectEngine::csv_object s3_csv_object_second(&s3select_syntax, csv);

  s3_csv_object_second.run_s3select_on_object(s3select_result_second_phase, s3select_result.c_str(), s3select_result.size(), false, false, true);

  ASSERT_EQ(s3select_result_second_phase, s3select_result1);
}
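// Variant of the above that exercises output quoting (quote_fields_always
// and a custom quote character) and returns the serialized result.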
std::string run_s3select_opserialization_quot(std::string expression, std::string input, bool quot_always = false, char quot_char = '"')
{//purpose: run query on multiple rows and return result(multiple projections).
  s3select s3select_syntax;

  int status = s3select_syntax.parse_query(expression.c_str());
  if (status != 0)
  {
    return failure_sign;
  }

  std::string s3select_result;
  csv_object::csv_defintions csv;

  csv.redundant_column = false;
  csv.quote_fields_always = quot_always;
  csv.output_quot_char = quot_char;

  s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv);

  s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true);

  return s3select_result;
}
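// Parse and run a JSON query over a single in-memory chunk; a second,
// zero-length call retrieves whatever the processor still buffers.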
int run_json_query(const char* json_query, std::string& json_input, std::string& result)
{//purpose: run single-chunk json queries
  s3select s3select_syntax;
  int status = s3select_syntax.parse_query(json_query);
  if (status != 0)
  {
    std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
    return -1;
  }

  json_object json_query_processor(&s3select_syntax);
  status = json_query_processor.run_s3select_on_stream(result, json_input.data(), json_input.size(), json_input.size());
  std::string prev_result = result;

  status = json_query_processor.run_s3select_on_stream(result, 0, 0, json_input.size());

  result = prev_result + result;

  return status;
}
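// Run one query through every available flow (CSV, Parquet when built with
// Arrow, and JSON) and assert that all of them agree on the result.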
std::string run_s3select(std::string expression, std::string input, const char* json_query = "")
{//purpose: run query on multiple rows and return result(multiple projections).
  s3select s3select_syntax;
  std::string parquet_input = input;

  std::string js = convert_to_json(input.c_str(), input.size());

  int status = s3select_syntax.parse_query(expression.c_str());
  if (status != 0)
  {
    return failure_sign;
  }

  std::string s3select_result;
  std::string json_result;
  s3selectEngine::csv_object s3_csv_object(&s3select_syntax);
  s3_csv_object.m_csv_defintion.redundant_column = false;

  s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true);

#ifdef _ARROW_EXIST
  static int file_no = 1;
  csv_to_parquet(parquet_input);
  std::string parquet_result;
  run_query_on_parquet_file(expression.c_str(), PARQUET_FILENAME, parquet_result);

  if (strcmp(parquet_result.c_str(), s3select_result.c_str()))
  {
    std::cout << "failed on query " << expression << std::endl;
    std::cout << "input for query resides in " << "./failed_test_input_" << std::to_string(file_no) << ".[csv|parquet]" << std::endl;

    // keep the parquet and csv inputs of the failed test for offline debugging
    std::string buffer;

    std::ifstream f(PARQUET_FILENAME);
    f.seekg(0, std::ios::end);
    buffer.resize(f.tellg());
    f.seekg(0);
    f.read(buffer.data(), buffer.size());

    std::string fn = std::string("./failed_test_input_") + std::to_string(file_no) + std::string(".parquet");
    std::ofstream fw(fn.c_str());
    fw.write(buffer.data(), buffer.size());

    fn = std::string("./failed_test_input_") + std::to_string(file_no++) + std::string(".csv");
    std::ofstream fw2(fn.c_str());
    fw2.write(parquet_input.data(), parquet_input.size());
  }

  parquet_csv_report_error(parquet_result, s3select_result);
#endif //_ARROW_EXIST

  if (strlen(json_query) == 0) {
    json_query = convert_query(expression);
  }

  if (strcmp(json_query, JSON_NO_RUN)) {
    run_json_query(json_query, js, json_result);
    json_csv_report_error(json_result, s3select_result);
  }

  return s3select_result;
}
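// Hypothetical usage sketch (not part of the suite above): the helpers are
// typically driven from gtest cases along these lines.
TEST(TestS3selectSketch, run_query_on_all_flows)
{
  std::string input = "1,2,3,4\n4,3,2,1\n";
  // run_s3select() itself asserts that the CSV, Parquet, and JSON flows agree.
  std::string res = run_s3select(std::string("select _1,_2 from s3object;"), input);
  ASSERT_NE(res, failure_sign);
}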