/*
 * /usr/include/boost/bind.hpp:36:1: note: ‘#pragma message: The practice of declaring the Bind placeholders (_1, _2, ...) in the global namespace is deprecated. Please use <boost/bind/bind.hpp> + using namespace boost::placeholders, or define BOOST_BIND_GLOBAL_PLACEHOLDERS to retain the current behavior.’
 */
#define BOOST_BIND_GLOBAL_PLACEHOLDERS

#include "s3select.h"
#include "gtest/gtest.h"
#include <string>
#include <fstream>
#include <iomanip>
#include <algorithm>
#include <cstdio>     // fopen/popen/pclose, used by the helpers below
#include <cstdlib>    // malloc/free
#include <unistd.h>   // unlink
#include <sys/stat.h> // lstat
#include "boost/date_time/gregorian/gregorian.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
using namespace s3selectEngine;

// parquet conversion
// ============================================================ //
#include <cassert>
#include <iostream>
#include <memory>
#ifdef _ARROW_EXIST

#include <arrow/io/file.h>
#include <arrow/util/logging.h>

#include <parquet/api/reader.h>
#include <parquet/api/writer.h>

using parquet::ConvertedType;
using parquet::Repetition;
using parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::PrimitiveNode;

#endif

constexpr int NUM_ROWS = 100000;
constexpr int64_t ROW_GROUP_SIZE = 1024 * 1024;
const char PARQUET_FILENAME[] = "/tmp/csv_converted.parquet";
#define JSON_NO_RUN "no_run"

class tokenize {

public:
  const char *s;
  std::string input;
  const char *p;
  bool last_token;

  tokenize(std::string& in) : s(nullptr), input(in), p(input.c_str()), last_token(false)
  {
  }

  void get_token(std::string& token)
  {
    if(!*p)
    {
      token = "";
      last_token = true;
      return;
    }

    s = p;
    while(*p && *p != ',' && *p != '\n') p++;

    token = std::string(s, p);
    if(*p) p++; // skip the delimiter, but never step past the terminating NUL
  }

  bool is_last()
  {
    return last_token;
  }
};
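
// Usage sketch (illustrative, not called by the tests): walking one CSV row
// with the tokenizer above. Empty fields come back as empty tokens.
//
//   std::string row("a,b,,c\n");
//   tokenize tok(row);
//   std::string t;
//   tok.get_token(t); // t == "a"
//   tok.get_token(t); // t == "b"
//   tok.get_token(t); // t == ""  (empty field -> null column)
//   tok.get_token(t); // t == "c"
//   tok.get_token(t); // t == "", and tok.is_last() is now true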

#ifdef _ARROW_EXIST

static std::shared_ptr<GroupNode> column_string_2(uint32_t num_of_columns) {

  parquet::schema::NodeVector fields;

  for(uint32_t i = 0; i < num_of_columns; i++)
  {
    std::string column_name = "column_" + std::to_string(i);
    fields.push_back(PrimitiveNode::Make(column_name, Repetition::OPTIONAL, Type::BYTE_ARRAY,
                                         ConvertedType::NONE));
  }

  return std::static_pointer_cast<GroupNode>(
      GroupNode::Make("schema", Repetition::REQUIRED, fields));
}

int csv_to_parquet(std::string & csv_object)
{

  auto csv_num_of_columns = std::count(csv_object.begin(), csv_object.begin() + csv_object.find('\n'), ',') + 1;
  auto csv_num_of_rows = std::count(csv_object.begin(), csv_object.end(), '\n');

  tokenize csv_tokens(csv_object);

  try {
    // Create a local file output stream instance.

    using FileClass = ::arrow::io::FileOutputStream;
    std::shared_ptr<FileClass> out_file;
    PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));

    // Setup the parquet schema
    std::shared_ptr<GroupNode> schema = column_string_2(csv_num_of_columns);

    // Add writer properties
    parquet::WriterProperties::Builder builder;
    // builder.compression(parquet::Compression::SNAPPY);
    std::shared_ptr<parquet::WriterProperties> props = builder.build();

    // Create a ParquetFileWriter instance
    std::shared_ptr<parquet::ParquetFileWriter> file_writer =
        parquet::ParquetFileWriter::Open(out_file, schema, props);

    // Append a BufferedRowGroup to keep the RowGroup open until a certain size
    parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();

    int num_columns = file_writer->num_columns();
    std::vector<int64_t> buffered_values_estimate(num_columns, 0);

    for (int i = 0; !csv_tokens.is_last() && i < csv_num_of_rows; i++) {
      int64_t estimated_bytes = 0;
      // Get the estimated size of the values that are not written to a page yet
      for (int n = 0; n < num_columns; n++) {
        estimated_bytes += buffered_values_estimate[n];
      }

      // We need to consider the compressed pages
      // as well as the values that are not compressed yet
      if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
           estimated_bytes) > ROW_GROUP_SIZE) {
        rg_writer->Close();
        std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
        rg_writer = file_writer->AppendBufferedRowGroup();
      }

      int col_id;
      for(col_id = 0; col_id < num_columns && !csv_tokens.is_last(); col_id++)
      {
        // Write the byte-array column
        parquet::ByteArrayWriter* ba_writer =
            static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
        parquet::ByteArray ba_value;

        std::string token;
        csv_tokens.get_token(token);
        if(token.size() == 0)
        {//null column
          int16_t definition_level = 0;
          ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
        }
        else
        {
          int16_t definition_level = 1;
          ba_value.ptr = (uint8_t*)(token.data());
          ba_value.len = token.size();
          ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
        }

        buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();

      } //end-for columns

      if(csv_tokens.is_last() && col_id < num_columns)
      {
        for(; col_id < num_columns; col_id++)
        {
          parquet::ByteArrayWriter* ba_writer =
              static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));

          int16_t definition_level = 0;
          ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);

          buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
        }
      }

    } // end-for rows

    // Close the RowGroupWriter
    rg_writer->Close();
    // Close the ParquetFileWriter
    file_writer->Close();

    // Write the bytes to file
    DCHECK(out_file->Close().ok());

  } catch (const std::exception& e) {
    std::cerr << "Parquet write error: " << e.what() << std::endl;
    return -1;
  }

  return 0;
}
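
// Usage sketch (illustrative): converting an in-memory CSV buffer. The output
// always goes to PARQUET_FILENAME; the column count is taken from the first
// row, and a row that ends early is padded with null columns.
//
//   std::string csv = "1,2,3\n4,,6\n";
//   int rc = csv_to_parquet(csv); // 0 on success, -1 on a parquet write error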

int run_query_on_parquet_file(const char* input_query, const char* input_file, std::string &result)
{
  int status;
  s3select s3select_syntax;
  result.clear();

  status = s3select_syntax.parse_query(input_query);
  if (status != 0)
  {
    std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
    return -1;
  }

  FILE *fp = nullptr;

  fp = fopen(input_file, "r");

  if(!fp){
    std::cout << "cannot open " << input_file << std::endl;
    return -1;
  }

  std::function<int(void)> fp_get_size = [&]()
  {
    struct stat l_buf;
    lstat(input_file, &l_buf);
    return l_buf.st_size;
  };

  std::function<size_t(int64_t,int64_t,void*,optional_yield*)> fp_range_req = [&](int64_t start, int64_t length, void *buff, optional_yield* y)
  {
    fseek(fp, start, SEEK_SET);
    size_t read_sz = fread(buff, 1, length, fp);
    return read_sz;
  };

  rgw_s3select_api rgw;
  rgw.set_get_size_api(fp_get_size);
  rgw.set_range_req_api(fp_range_req);

  std::function<int(std::string&)> fp_s3select_result_format = [](std::string& result){return 0;};//append
  std::function<int(std::string&)> fp_s3select_header_format = [](std::string& result){return 0;};//append

  parquet_object parquet_processor(input_file, &s3select_syntax, &rgw);

  do
  {
    try
    {
      status = parquet_processor.run_s3select_on_object(result, fp_s3select_result_format, fp_s3select_header_format);
    }
    catch (base_s3select_exception &e)
    {
      if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
      {
        fclose(fp);
        return -1;
      }
    }

    if (status < 0)
      break;

  } while (0);

  fclose(fp);
  return 0;
}
// ============================================================ //
#else
int run_query_on_parquet_file(const char* input_query, const char* input_file, std::string &result)
{
  return 0;
}
#endif //_ARROW_EXIST
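
// Usage sketch (illustrative): running a query against the converted file.
// When Arrow is not compiled in (_ARROW_EXIST undefined), the stub above is a
// no-op that returns 0 and leaves the result empty.
//
//   std::string parquet_result;
//   run_query_on_parquet_file("select count(*) from s3object;",
//                             PARQUET_FILENAME, parquet_result);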

// convert a CSV buffer into a JSON document: each row becomes an object whose
// keys are c1..cN, and all rows are wrapped in a top-level "root" array.
std::string convert_to_json(const char* csv_stream, size_t stream_length)
{
  char* m_stream;
  char* m_end_stream;
  char row_delimiter('\n');
  char column_delimiter(',');
  bool previous{true};

  m_stream = (char*)csv_stream;
  m_end_stream = (char*)csv_stream + stream_length;
  std::stringstream ss;
  ss << std::endl;
  ss << "{\"root\" : [";
  ss << std::endl;
  while (m_stream < m_end_stream) {
    int counter{};
    ss << "{";
    while( *m_stream && (*m_stream != row_delimiter) ) {
      if (*m_stream != column_delimiter && previous) {
        ss << "\"c" << ++counter << "\"" << ":";
        ss << "\"";
        ss << *m_stream;
        previous = false;
      } else if (*m_stream != column_delimiter) {
        ss << *m_stream;
      } else if (*m_stream == column_delimiter) {
        if (previous) {
          ss << "\"c" << ++counter << "\"" << ":";
          ss << "null";
        } else {
          ss << "\"";
        }
        ss << ",";
        previous = true;
      }
      m_stream++;
    }
    if(previous) {
      ss.seekp(-1, std::ios_base::end); // drop the trailing ','
    } else {
      ss << "\"";
    }
    previous = true;
    ss << "}" << ',' << std::endl;
    m_stream++;
  }
  ss.seekp(-2, std::ios_base::end); // drop the ",\n" after the last row
  ss << std::endl;
  ss << "]" << "}";
  return ss.str();
}
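
// Example (illustrative) of the mapping produced above; an empty field in the
// middle of a row becomes a JSON null:
//
//   input:  "1,2\n1,,3\n"
//   output (roughly): {"root" : [
//                     {"c1":"1","c2":"2"},
//                     {"c1":"1","c2":null,"c3":"3"}]}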

// rewrite a CSV query so it runs against the JSON produced by convert_to_json():
// the FROM clause is redirected to the root array, and column references _N are
// mapped to _1.cN. The order matters: "_1" must be replaced first, otherwise the
// later replacements would corrupt the already-rewritten "_1.cN" references.
const char* convert_query(std::string& expression)
{
  std::string from_clause = "s3object";
  boost::replace_all(expression, from_clause, "s3object[*].root");

  std::string from_clause_1 = "stdin";
  boost::replace_all(expression, from_clause_1, "s3object[*].root");

  std::string col_1 = "_1";
  boost::replace_all(expression, col_1, "_1.c1");

  std::string col_2 = "_2";
  boost::replace_all(expression, col_2, "_1.c2");

  std::string col_3 = "_3";
  boost::replace_all(expression, col_3, "_1.c3");

  std::string col_4 = "_4";
  boost::replace_all(expression, col_4, "_1.c4");

  std::string col_5 = "_5";
  boost::replace_all(expression, col_5, "_1.c5");

  std::string col_9 = "_9";
  boost::replace_all(expression, col_9, "_1.c9");

  return expression.c_str();
}
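
// Example (illustrative) of the rewrite performed above:
//
//   "select _1,_2 from s3object;"
//     becomes
//   "select _1.c1,_1.c2 from s3object[*].root;"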

std::string run_expression_in_C_prog(const char* expression)
{
  //purpose: per use-case a C file is generated, compiled, and finally executed.

  // side note: it's possible to do the following: cat test_hello.c | gcc -pipe -x c - -o /dev/stdout > ./1
  // gcc can read and write from/to a pipe (use pipe2()), i.e. without touching the file-system, BUT the gcc output should then also run from memory

  const int C_FILE_SIZE = (1024*1024);
  std::string c_test_file = std::string("/tmp/test_s3.c");
  std::string c_run_file = std::string("/tmp/s3test");

  FILE* fp_c_file = fopen(c_test_file.c_str(), "w");

  //buffer for the program's printed result
  char result_buff[100];

  char* prog_c = nullptr;

  if(!fp_c_file)
  {
    return std::string("#ERROR#");
  }

  prog_c = (char*)malloc(C_FILE_SIZE);

  size_t sz = sprintf(prog_c,"#include <stdio.h>\n \
  #include <float.h>\n \
  int main() \
  {\
  printf(\"%%.*e\\n\",DECIMAL_DIG,(double)(%s));\
  } ", expression);

  fwrite(prog_c, 1, sz, fp_c_file);
  fclose(fp_c_file);

  std::string gcc_and_run_cmd = std::string("gcc ") + c_test_file + " -o " + c_run_file + " -Wall && " + c_run_file;

  FILE* fp_build = popen(gcc_and_run_cmd.c_str(), "r"); //TODO read stderr from pipe

  if(!fp_build)
  {
    free(prog_c);
    return std::string("#ERROR#");
  }

  char* res = fgets(result_buff, sizeof(result_buff), fp_build);

  if(!res)
  {
    free(prog_c);
    pclose(fp_build); // popen'd streams must be closed with pclose, not fclose
    return std::string("#ERROR#");
  }

  unlink(c_run_file.c_str());
  unlink(c_test_file.c_str());
  pclose(fp_build);

  free(prog_c);

  return std::string(result_buff);
}
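
// Usage sketch (illustrative): the helper lets tests cross-check s3select
// arithmetic against the C compiler's evaluation of the same expression;
// it requires gcc to be available on the test machine.
//
//   std::string c_res = run_expression_in_C_prog("1.0+2.0*3.0");
//   // c_res holds the printf'ed double, e.g. "7.00000000000000000e+00\n",
//   // or "#ERROR#" if generating/compiling/running the program failed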

#define OPER oper[ rand() % oper.size() ]

class gen_expr
{

private:

  int open = 0;
  std::string oper = {"+-+*/*"};

  std::string gexpr()
  {
    return std::to_string(rand() % 1000) + ".0" + OPER + std::to_string(rand() % 1000) + ".0";
  }

  std::string g_openp()
  {
    if ((rand() % 3) == 0)
    {
      open++;
      return std::string("(");
    }
    return std::string("");
  }

  std::string g_closep()
  {
    if ((rand() % 2) == 0 && open > 0)
    {
      open--;
      return std::string(")");
    }
    return std::string("");
  }

public:

  std::string generate()
  {
    std::string exp = "";
    open = 0;

    for (int i = 0; i < 10; i++)
    {
      exp = (exp.size() > 0 ? exp + OPER : std::string("")) + g_openp() + gexpr() + OPER + gexpr() + g_closep();
    }

    if (open)
      for (; open--;)
      {
        exp += ")";
      }

    return exp;
  }
};
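
// Usage sketch (illustrative): gen_expr plus run_expression_in_C_prog gives a
// simple fuzz loop over random arithmetic; the same expression string can also
// be wrapped in "select <expr> from stdin;" and fed to the s3select engine.
//
//   gen_expr g;
//   std::string e = g.generate();                    // random expression
//   std::string c_result = run_expression_in_C_prog(e.c_str());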

const std::string failure_sign("#failure#");

std::string string_to_quot(std::string& s, char quot = '"')
{
  std::string result = "";
  std::stringstream str_strm;
  str_strm << s;
  std::string temp_str;
  int temp_int;
  while(str_strm >> temp_str) {
    if(std::stringstream(temp_str) >> temp_int) {
      std::stringstream s1;
      s1 << temp_int;
      result += quot + s1.str() + quot + "\n";
    }
  }
  return result;
}
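
// Example (illustrative): string_to_quot() keeps only the whitespace-separated
// integer tokens of its input and quotes each on its own line:
//
//   std::string in("10 abc 20");
//   string_to_quot(in);       // returns "\"10\"\n\"20\"\n"
//   string_to_quot(in, '|');  // returns "|10|\n|20|\n"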

void parquet_csv_report_error(std::string parquet_result, std::string csv_result)
{
#ifdef _ARROW_EXIST
  ASSERT_EQ(parquet_result, csv_result);
#else
  ASSERT_EQ(0, 0);
#endif
}

void json_csv_report_error(std::string json_result, std::string csv_result)
{
  ASSERT_EQ(json_result, csv_result);
}

std::string run_s3select(std::string expression)
{//purpose: run a query on a single row and return the result (single projection).
  s3select s3select_syntax;

  int status = s3select_syntax.parse_query(expression.c_str());

  if(status)
    return failure_sign;

  std::string s3select_result;
  s3selectEngine::csv_object s3_csv_object(&s3select_syntax);
  std::string in = "1,1,1,1\n";
  std::string csv_obj = in;
  std::string parquet_result;

  s3_csv_object.run_s3select_on_object(s3select_result, in.c_str(), in.size(), false, false, true);

  s3select_result = s3select_result.substr(0, s3select_result.find_first_of(","));
  s3select_result = s3select_result.substr(0, s3select_result.find_first_of("\n"));//remove the trailing \n

#ifdef _ARROW_EXIST
  csv_to_parquet(csv_obj);
  run_query_on_parquet_file(expression.c_str(), PARQUET_FILENAME, parquet_result);
  parquet_result = parquet_result.substr(0, parquet_result.find_first_of(","));
  parquet_result = parquet_result.substr(0, parquet_result.find_first_of("\n"));//remove the trailing \n

  parquet_csv_report_error(parquet_result, s3select_result);
#endif

  return s3select_result;
}
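
// Usage sketch (illustrative): the single-row variant is handy for constant
// expressions; the row content "1,1,1,1" is just a placeholder.
//
//   std::string r = run_s3select("select 1+1 from stdin;");
//   // r == "2" on success, failure_sign on a parse error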

void run_s3select_test_opserialization(std::string expression, std::string input, char *row_delimiter, char *column_delimiter)
{//purpose: run the query with custom output delimiters, then run it again on its own output and verify the round trip.
  s3select s3select_syntax;

  int status = s3select_syntax.parse_query(expression.c_str());

  if(status)
    return;

  std::string s3select_result;
  csv_object::csv_defintions csv;
  csv.redundant_column = false;

  csv.output_row_delimiter = *row_delimiter;
  csv.output_column_delimiter = *column_delimiter;

  s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv);

  s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true);

  std::string s3select_result1 = s3select_result;

  csv.row_delimiter = *row_delimiter;
  csv.column_delimiter = *column_delimiter;
  csv.output_row_delimiter = *row_delimiter;
  csv.output_column_delimiter = *column_delimiter;
  csv.redundant_column = false;
  std::string s3select_result_second_phase;

  s3selectEngine::csv_object s3_csv_object_second(&s3select_syntax, csv);

  s3_csv_object_second.run_s3select_on_object(s3select_result_second_phase, s3select_result.c_str(), s3select_result.size(), false, false, true);

  ASSERT_EQ(s3select_result_second_phase, s3select_result1);
}

std::string run_s3select_opserialization_quot(std::string expression, std::string input, bool quot_always = false, char quot_char = '"')
{//purpose: run the query with custom quoting settings and return the serialized result.
  s3select s3select_syntax;

  int status = s3select_syntax.parse_query(expression.c_str());

  if(status)
    return failure_sign;

  std::string s3select_result;
  csv_object::csv_defintions csv;

  csv.redundant_column = false;
  csv.quote_fields_always = quot_always;
  csv.output_quot_char = quot_char;

  s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv);

  s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true);

  return s3select_result;
}

// JSON test APIs
int run_json_query(const char* json_query, std::string& json_input, std::string& result)
{//purpose: run single-chunk JSON queries

  s3select s3select_syntax;
  int status = s3select_syntax.parse_query(json_query);
  if (status != 0)
  {
    std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
    return -1;
  }

  json_object json_query_processor(&s3select_syntax);
  result.clear();
  status = json_query_processor.run_s3select_on_stream(result, json_input.data(), json_input.size(), json_input.size());
  std::string prev_result = result;
  result.clear();
  // a zero-length chunk flushes the processor at end-of-stream
  status = json_query_processor.run_s3select_on_stream(result, nullptr, 0, json_input.size());

  result = prev_result + result;

  return status;
}
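
// Usage sketch (illustrative): the input is pushed in two steps, a data chunk
// followed by the zero-length flush, to mimic chunked stream processing.
//
//   std::string json_input = R"({"root" : [{"c1":"1"}]})";
//   std::string res;
//   run_json_query("select _1.c1 from s3object[*].root;", json_input, res);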

std::string run_s3select(std::string expression, std::string input, const char* json_query = "")
{//purpose: run a query on multiple rows and return the result (multiple projections).
  s3select s3select_syntax;
  std::string parquet_input = input;

  std::string js = convert_to_json(input.c_str(), input.size());

  int status = s3select_syntax.parse_query(expression.c_str());

  if(status)
    return failure_sign;

  std::string s3select_result;
  std::string json_result;
  s3selectEngine::csv_object s3_csv_object(&s3select_syntax);
  s3_csv_object.m_csv_defintion.redundant_column = false;

  s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true);

#ifdef _ARROW_EXIST
  static int file_no = 1;
  csv_to_parquet(parquet_input);
  std::string parquet_result;
  run_query_on_parquet_file(expression.c_str(), PARQUET_FILENAME, parquet_result);

  if (strcmp(parquet_result.c_str(), s3select_result.c_str()))
  {
    std::cout << "failed on query " << expression << std::endl;
    std::cout << "input for the query resides in " << "./failed_test_input_" << std::to_string(file_no) << ".[csv|parquet]" << std::endl;

    {
      std::string buffer;

      std::ifstream f(PARQUET_FILENAME, std::ios::binary); // parquet is a binary format
      f.seekg(0, std::ios::end);
      buffer.resize(f.tellg());
      f.seekg(0);
      f.read(buffer.data(), buffer.size());

      std::string fn = std::string("./failed_test_input_") + std::to_string(file_no) + std::string(".parquet");
      std::ofstream fw(fn.c_str(), std::ios::binary);
      fw.write(buffer.data(), buffer.size());

      fn = std::string("./failed_test_input_") + std::to_string(file_no++) + std::string(".csv");
      std::ofstream fw2(fn.c_str());
      fw2.write(parquet_input.data(), parquet_input.size());
    }
  }

  parquet_csv_report_error(parquet_result, s3select_result);
#endif //_ARROW_EXIST

  if(strlen(json_query) == 0) {
    json_query = convert_query(expression);
  }

  if(strcmp(json_query, JSON_NO_RUN)) {
    run_json_query(json_query, js, json_result);
    json_csv_report_error(json_result, s3select_result);
  }

  return s3select_result;
}
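
// End-to-end usage sketch (illustrative; the test name and query below are
// hypothetical, not taken from the real suite). A typical test feeds a CSV
// buffer through run_s3select() and checks the serialized projection; when
// Arrow support is built in, the same query is cross-checked against the
// parquet and JSON paths automatically.
//
//   TEST(TestS3selectSketch, sum_over_rows)
//   {
//     std::string input = "1,2,3,4\n5,6,7,8\n";
//     std::string res = run_s3select("select sum(int(_1)) from s3object;", input);
//     ASSERT_NE(res, failure_sign);
//   }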