// s3select test utilities: CSV / JSON / Parquet query harness.
1 | /* |
2 | * /usr/include/boost/bind.hpp:36:1: note: ‘#pragma message: The practice of declaring the Bind placeholders (_1, _2, ...) in the global namespace is deprecated. Please use <boost/bind/bind.hpp> + using namespace boost::placeholders, or define BOOST_BIND_GLOBAL_PLACEHOLDERS to retain the current behavior.’ | |
3 | */ | |
4 | #define BOOST_BIND_GLOBAL_PLACEHOLDERS | |
5 | ||
6 | #include "s3select.h" | |
7 | #include "gtest/gtest.h" | |
8 | #include <string> | |
9 | #include <fstream> | |
10 | #include <iomanip> | |
11 | #include <algorithm> | |
12 | #include "boost/date_time/gregorian/gregorian.hpp" | |
13 | #include "boost/date_time/posix_time/posix_time.hpp" | |
14 | ||
15 | using namespace s3selectEngine; | |
16 | ||
17 | // parquet conversion | |
18 | // ============================================================ // | |
19 | #include <cassert> | |
20 | #include <fstream> | |
21 | #include <iostream> | |
22 | #include <memory> | |
23 | ||
24 | #ifdef _ARROW_EXIST | |
25 | ||
26 | #include <arrow/io/file.h> | |
27 | #include <arrow/util/logging.h> | |
28 | ||
29 | #include <parquet/api/reader.h> | |
30 | #include <parquet/api/writer.h> | |
31 | ||
32 | using parquet::ConvertedType; | |
33 | using parquet::Repetition; | |
34 | using parquet::Type; | |
35 | using parquet::schema::GroupNode; | |
36 | using parquet::schema::PrimitiveNode; | |
37 | ||
38 | #endif | |
39 | ||
40 | constexpr int NUM_ROWS = 100000; | |
41 | constexpr int64_t ROW_GROUP_SIZE = 1024 * 1024; | |
42 | const char PARQUET_FILENAME[] = "/tmp/csv_converted.parquet"; | |
43 | #define JSON_NO_RUN "no_run" | |
44 | ||
45 | class tokenize { | |
46 | ||
47 | public: | |
48 | const char *s; | |
49 | std::string input; | |
50 | const char *p; | |
51 | bool last_token; | |
52 | ||
53 | tokenize(std::string& in):s(0),input(in),p(input.c_str()),last_token(false) | |
54 | { | |
55 | }; | |
56 | ||
57 | void get_token(std::string& token) | |
58 | { | |
59 | if(!*p) | |
60 | { | |
61 | token = ""; | |
62 | last_token = true; | |
63 | return; | |
64 | } | |
65 | ||
66 | s=p; | |
67 | while(*p && *p != ',' && *p != '\n') p++; | |
68 | ||
69 | token = std::string(s,p); | |
70 | p++; | |
71 | } | |
72 | ||
73 | bool is_last() | |
74 | { | |
75 | return last_token == true; | |
76 | } | |
77 | }; | |
78 | ||
79 | #ifdef _ARROW_EXIST | |
80 | ||
81 | static std::shared_ptr<GroupNode> column_string_2(uint32_t num_of_columns) { | |
82 | ||
83 | parquet::schema::NodeVector fields; | |
84 | ||
85 | for(uint32_t i=0;i<num_of_columns;i++) | |
86 | { | |
87 | std::string column_name = "column_" + std::to_string(i) ; | |
88 | fields.push_back(PrimitiveNode::Make(column_name, Repetition::OPTIONAL, Type::BYTE_ARRAY, | |
89 | ConvertedType::NONE)); | |
90 | } | |
91 | ||
92 | return std::static_pointer_cast<GroupNode>( | |
93 | GroupNode::Make("schema", Repetition::REQUIRED, fields)); | |
94 | } | |
95 | ||
96 | int csv_to_parquet(std::string & csv_object) | |
97 | { | |
98 | ||
99 | auto csv_num_of_columns = std::count( csv_object.begin(),csv_object.begin() + csv_object.find('\n'),',')+1; | |
100 | auto csv_num_of_rows = std::count(csv_object.begin(),csv_object.end(),'\n'); | |
101 | ||
102 | tokenize csv_tokens(csv_object); | |
103 | ||
104 | try { | |
105 | // Create a local file output stream instance. | |
106 | ||
107 | using FileClass = ::arrow::io::FileOutputStream; | |
108 | std::shared_ptr<FileClass> out_file; | |
109 | PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME)); | |
110 | ||
111 | // Setup the parquet schema | |
112 | std::shared_ptr<GroupNode> schema = column_string_2(csv_num_of_columns); | |
113 | ||
114 | // Add writer properties | |
115 | parquet::WriterProperties::Builder builder; | |
116 | // builder.compression(parquet::Compression::SNAPPY); | |
117 | std::shared_ptr<parquet::WriterProperties> props = builder.build(); | |
118 | ||
119 | // Create a ParquetFileWriter instance | |
120 | std::shared_ptr<parquet::ParquetFileWriter> file_writer = | |
121 | parquet::ParquetFileWriter::Open(out_file, schema, props); | |
122 | ||
123 | // Append a BufferedRowGroup to keep the RowGroup open until a certain size | |
124 | parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup(); | |
125 | ||
126 | int num_columns = file_writer->num_columns(); | |
127 | std::vector<int64_t> buffered_values_estimate(num_columns, 0); | |
128 | ||
129 | for (int i = 0; !csv_tokens.is_last() && i<csv_num_of_rows; i++) { | |
130 | int64_t estimated_bytes = 0; | |
131 | // Get the estimated size of the values that are not written to a page yet | |
132 | for (int n = 0; n < num_columns; n++) { | |
133 | estimated_bytes += buffered_values_estimate[n]; | |
134 | } | |
135 | ||
136 | // We need to consider the compressed pages | |
137 | // as well as the values that are not compressed yet | |
138 | if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() + | |
139 | estimated_bytes) > ROW_GROUP_SIZE) { | |
140 | rg_writer->Close(); | |
141 | std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0); | |
142 | rg_writer = file_writer->AppendBufferedRowGroup(); | |
143 | } | |
144 | ||
145 | ||
146 | int col_id; | |
147 | for(col_id=0;col_id<num_columns && !csv_tokens.is_last();col_id++) | |
148 | { | |
149 | // Write the byte-array column | |
150 | parquet::ByteArrayWriter* ba_writer = | |
151 | static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id)); | |
152 | parquet::ByteArray ba_value; | |
153 | ||
154 | std::string token; | |
155 | csv_tokens.get_token(token); | |
156 | if(token.size() == 0) | |
157 | {//null column | |
158 | int16_t definition_level = 0; | |
159 | ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); | |
160 | } | |
161 | else | |
162 | { | |
163 | int16_t definition_level = 1; | |
164 | ba_value.ptr = (uint8_t*)(token.data()); | |
165 | ba_value.len = token.size(); | |
166 | ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value); | |
167 | } | |
168 | ||
169 | buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes(); | |
170 | ||
171 | ||
172 | } //end-for columns | |
173 | ||
174 | if(csv_tokens.is_last() && col_id<num_columns) | |
175 | { | |
176 | for(;col_id<num_columns;col_id++) | |
177 | { | |
178 | parquet::ByteArrayWriter* ba_writer = | |
179 | static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id)); | |
180 | ||
181 | int16_t definition_level = 0; | |
182 | ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); | |
183 | ||
184 | buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes(); | |
185 | } | |
186 | ||
187 | } | |
188 | ||
189 | } // end-for rows | |
190 | ||
191 | // Close the RowGroupWriter | |
192 | rg_writer->Close(); | |
193 | // Close the ParquetFileWriter | |
194 | file_writer->Close(); | |
195 | ||
196 | // Write the bytes to file | |
197 | DCHECK(out_file->Close().ok()); | |
198 | ||
199 | } catch (const std::exception& e) { | |
200 | std::cerr << "Parquet write error: " << e.what() << std::endl; | |
201 | return -1; | |
202 | } | |
203 | ||
204 | return 0; | |
205 | } | |
206 | ||
207 | int run_query_on_parquet_file(const char* input_query, const char* input_file, std::string &result) | |
208 | { | |
209 | int status; | |
210 | s3select s3select_syntax; | |
211 | result.clear(); | |
212 | ||
213 | status = s3select_syntax.parse_query(input_query); | |
214 | if (status != 0) | |
215 | { | |
216 | std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl; | |
217 | return -1; | |
218 | } | |
219 | ||
220 | FILE *fp=nullptr; | |
221 | ||
222 | fp=fopen(input_file,"r"); | |
223 | ||
224 | if(!fp){ | |
225 | std::cout << "can not open " << input_file << std::endl; | |
226 | return -1; | |
227 | } | |
228 | ||
229 | std::function<int(void)> fp_get_size=[&]() | |
230 | { | |
231 | struct stat l_buf; | |
232 | lstat(input_file,&l_buf); | |
233 | return l_buf.st_size; | |
234 | }; | |
235 | ||
236 | std::function<size_t(int64_t,int64_t,void*,optional_yield*)> fp_range_req=[&](int64_t start,int64_t length,void *buff,optional_yield*y) | |
237 | { | |
238 | fseek(fp,start,SEEK_SET); | |
239 | size_t read_sz = fread(buff, 1, length, fp); | |
240 | return read_sz; | |
241 | }; | |
242 | ||
243 | rgw_s3select_api rgw; | |
244 | rgw.set_get_size_api(fp_get_size); | |
245 | rgw.set_range_req_api(fp_range_req); | |
246 | ||
247 | std::function<int(std::string&)> fp_s3select_result_format = [](std::string& result){return 0;};//append | |
248 | std::function<int(std::string&)> fp_s3select_header_format = [](std::string& result){return 0;};//append | |
249 | ||
250 | parquet_object parquet_processor(input_file,&s3select_syntax,&rgw); | |
251 | ||
252 | //std::string result; | |
253 | ||
254 | do | |
255 | { | |
256 | try | |
257 | { | |
258 | status = parquet_processor.run_s3select_on_object(result,fp_s3select_result_format,fp_s3select_header_format); | |
259 | } | |
260 | catch (base_s3select_exception &e) | |
261 | { | |
262 | if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution | |
263 | { | |
264 | if(fp){ | |
265 | fclose(fp); | |
266 | } | |
267 | return -1; | |
268 | } | |
269 | } | |
270 | ||
271 | if (status < 0) | |
272 | break; | |
273 | ||
274 | } while (0); | |
275 | ||
276 | if(fp){ | |
277 | fclose(fp); | |
278 | } | |
279 | return 0; | |
280 | }// ============================================================ // | |
281 | #else | |
282 | int run_query_on_parquet_file(const char* input_query, const char* input_file, std::string &result) | |
283 | { | |
284 | return 0; | |
285 | } | |
286 | #endif //_ARROW_EXIST | |
287 | ||
288 | std::string convert_to_json(const char* csv_stream, size_t stream_length) | |
289 | { | |
290 | char* m_stream; | |
291 | char* m_end_stream; | |
292 | char row_delimiter('\n'); | |
293 | char column_delimiter(','); | |
294 | bool previous{true}; | |
295 | ||
296 | m_stream = (char*)csv_stream; | |
297 | m_end_stream = (char*)csv_stream + stream_length; | |
298 | std::stringstream ss; | |
299 | ss << std::endl; | |
300 | ss << "{\"root\" : ["; | |
301 | ss << std::endl; | |
302 | while (m_stream < m_end_stream) { | |
303 | int counter{}; | |
304 | ss << "{"; | |
305 | while( *m_stream && (*m_stream != row_delimiter) ) { | |
306 | if (*m_stream != column_delimiter && previous) { | |
307 | ss << "\"c" << ++counter << "\"" << ":"; | |
308 | ss << "\""; | |
309 | ss << *m_stream; | |
310 | previous = false; | |
311 | } else if (*m_stream != column_delimiter) { | |
312 | ss << *m_stream; | |
313 | } else if (*m_stream == column_delimiter) { | |
314 | if (previous) { | |
315 | ss << "\"c" << ++counter << "\"" << ":"; | |
316 | ss << "null"; | |
317 | } else { | |
318 | ss << "\""; | |
319 | } | |
320 | ss << ","; | |
321 | previous = true; | |
322 | } | |
323 | m_stream++; | |
324 | } | |
325 | if(previous) { | |
326 | ss.seekp(-1, std::ios_base::end); | |
327 | } else { | |
328 | ss << "\""; | |
329 | } | |
330 | previous = true; | |
331 | ss << "}" << ',' << std::endl; | |
332 | m_stream++; | |
333 | } | |
334 | ss.seekp(-2, std::ios_base::end); | |
335 | ss << std::endl; | |
336 | ss << "]" << "}"; | |
337 | return ss.str(); | |
338 | } | |
339 | ||
340 | const char* convert_query(std::string& expression) | |
341 | { | |
342 | std::string from_clause = "s3object"; | |
343 | boost::replace_all(expression, from_clause, "s3object[*].root"); | |
344 | ||
345 | std::string from_clause_1 = "stdin"; | |
346 | boost::replace_all(expression, from_clause_1, "s3object[*].root"); | |
347 | ||
348 | std::string col_1 = "_1"; | |
349 | boost::replace_all(expression, col_1, "_1.c1"); | |
350 | ||
351 | std::string col_2 = "_2"; | |
352 | boost::replace_all(expression, col_2, "_1.c2"); | |
353 | ||
354 | std::string col_3 = "_3"; | |
355 | boost::replace_all(expression, col_3, "_1.c3"); | |
356 | ||
357 | std::string col_4 = "_4"; | |
358 | boost::replace_all(expression, col_4, "_1.c4"); | |
359 | ||
360 | std::string col_5 = "_5"; | |
361 | boost::replace_all(expression, col_5, "_1.c5"); | |
362 | ||
363 | std::string col_9 = "_9"; | |
364 | boost::replace_all(expression, col_9, "_1.c9"); | |
365 | ||
366 | return expression.c_str(); | |
367 | } | |
368 | ||
369 | ||
370 | std::string run_expression_in_C_prog(const char* expression) | |
371 | { | |
372 | //purpose: per use-case a c-file is generated, compiles , and finally executed. | |
373 | ||
374 | // side note: its possible to do the following: cat test_hello.c | gcc -pipe -x c - -o /dev/stdout > ./1 | |
375 | // gcc can read and write from/to pipe (use pipe2()) i.e. not using file-system , BUT should also run gcc-output from memory | |
376 | ||
377 | const int C_FILE_SIZE=(1024*1024); | |
378 | std::string c_test_file = std::string("/tmp/test_s3.c"); | |
379 | std::string c_run_file = std::string("/tmp/s3test"); | |
380 | ||
381 | FILE* fp_c_file = fopen(c_test_file.c_str(), "w"); | |
382 | ||
383 | //contain return result | |
384 | char result_buff[100]; | |
385 | ||
386 | char* prog_c = 0; | |
387 | ||
388 | if(fp_c_file) | |
389 | { | |
390 | prog_c = (char*)malloc(C_FILE_SIZE); | |
391 | ||
392 | size_t sz=sprintf(prog_c,"#include <stdio.h>\n \ | |
393 | #include <float.h>\n \ | |
394 | int main() \ | |
395 | {\ | |
396 | printf(\"%%.*e\\n\",DECIMAL_DIG,(double)(%s));\ | |
397 | } ", expression); | |
398 | ||
399 | fwrite(prog_c, 1, sz, fp_c_file); | |
400 | fclose(fp_c_file); | |
401 | } | |
402 | ||
403 | std::string gcc_and_run_cmd = std::string("gcc ") + c_test_file + " -o " + c_run_file + " -Wall && " + c_run_file; | |
404 | ||
405 | FILE* fp_build = popen(gcc_and_run_cmd.c_str(), "r"); //TODO read stderr from pipe | |
406 | ||
407 | if(!fp_build) | |
408 | { | |
409 | if(prog_c) | |
410 | free(prog_c); | |
411 | ||
412 | return std::string("#ERROR#"); | |
413 | } | |
414 | ||
415 | char * res = fgets(result_buff, sizeof(result_buff), fp_build); | |
416 | ||
417 | if(!res) | |
418 | { | |
419 | if(prog_c) | |
420 | free(prog_c); | |
421 | ||
422 | fclose(fp_build); | |
423 | return std::string("#ERROR#"); | |
424 | } | |
425 | ||
426 | unlink(c_run_file.c_str()); | |
427 | unlink(c_test_file.c_str()); | |
428 | fclose(fp_build); | |
429 | ||
430 | if(prog_c) | |
431 | free(prog_c); | |
432 | ||
433 | return std::string(result_buff); | |
434 | } | |
435 | ||
436 | #define OPER oper[ rand() % oper.size() ] | |
437 | ||
438 | class gen_expr | |
439 | { | |
440 | ||
441 | private: | |
442 | ||
443 | int open = 0; | |
444 | std::string oper= {"+-+*/*"}; | |
445 | ||
446 | std::string gexpr() | |
447 | { | |
448 | return std::to_string(rand() % 1000) + ".0" + OPER + std::to_string(rand() % 1000) + ".0"; | |
449 | } | |
450 | ||
451 | std::string g_openp() | |
452 | { | |
453 | if ((rand() % 3) == 0) | |
454 | { | |
455 | open++; | |
456 | return std::string("("); | |
457 | } | |
458 | return std::string(""); | |
459 | } | |
460 | ||
461 | std::string g_closep() | |
462 | { | |
463 | if ((rand() % 2) == 0 && open > 0) | |
464 | { | |
465 | open--; | |
466 | return std::string(")"); | |
467 | } | |
468 | return std::string(""); | |
469 | } | |
470 | ||
471 | public: | |
472 | ||
473 | std::string generate() | |
474 | { | |
475 | std::string exp = ""; | |
476 | open = 0; | |
477 | ||
478 | for (int i = 0; i < 10; i++) | |
479 | { | |
480 | exp = (exp.size() > 0 ? exp + OPER : std::string("")) + g_openp() + gexpr() + OPER + gexpr() + g_closep(); | |
481 | } | |
482 | ||
483 | if (open) | |
484 | for (; open--;) | |
485 | { | |
486 | exp += ")"; | |
487 | } | |
488 | ||
489 | return exp; | |
490 | } | |
491 | }; | |
492 | ||
493 | const std::string failure_sign("#failure#"); | |
494 | ||
495 | std::string string_to_quot(std::string& s, char quot = '"') | |
496 | { | |
497 | std::string result = ""; | |
498 | std::stringstream str_strm; | |
499 | str_strm << s; | |
500 | std::string temp_str; | |
501 | int temp_int; | |
502 | while(!str_strm.eof()) { | |
503 | str_strm >> temp_str; | |
504 | if(std::stringstream(temp_str) >> temp_int) { | |
505 | std::stringstream s1; | |
506 | s1 << temp_int; | |
507 | result += quot + s1.str() + quot + "\n"; | |
508 | } | |
509 | temp_str = ""; | |
510 | } | |
511 | return result; | |
512 | } | |
513 | ||
514 | void parquet_csv_report_error(std::string parquet_result, std::string csv_result) | |
515 | { | |
516 | #ifdef _ARROW_EXIST | |
517 | ASSERT_EQ(parquet_result,csv_result); | |
518 | #else | |
519 | ASSERT_EQ(0,0); | |
520 | #endif | |
521 | } | |
522 | ||
523 | void json_csv_report_error(std::string json_result, std::string csv_result) | |
524 | { | |
525 | ASSERT_EQ(json_result, csv_result); | |
526 | } | |
527 | ||
528 | std::string run_s3select(std::string expression) | |
529 | {//purpose: run query on single row and return result(single projections). | |
530 | s3select s3select_syntax; | |
531 | ||
532 | int status = s3select_syntax.parse_query(expression.c_str()); | |
533 | ||
534 | if(status) | |
535 | return failure_sign; | |
536 | ||
537 | std::string s3select_result; | |
538 | s3selectEngine::csv_object s3_csv_object(&s3select_syntax); | |
539 | std::string in = "1,1,1,1\n"; | |
540 | std::string csv_obj = in; | |
541 | std::string parquet_result; | |
542 | ||
543 | s3_csv_object.run_s3select_on_object(s3select_result, in.c_str(), in.size(), false, false, true); | |
544 | ||
545 | s3select_result = s3select_result.substr(0, s3select_result.find_first_of(",")); | |
546 | s3select_result = s3select_result.substr(0, s3select_result.find_first_of("\n"));//remove last \n | |
547 | ||
548 | #ifdef _ARROW_EXIST | |
549 | csv_to_parquet(csv_obj); | |
550 | run_query_on_parquet_file(expression.c_str(),PARQUET_FILENAME,parquet_result); | |
551 | parquet_result = parquet_result.substr(0, parquet_result.find_first_of(",")); | |
552 | parquet_result = parquet_result.substr(0, parquet_result.find_first_of("\n"));//remove last \n | |
553 | ||
554 | parquet_csv_report_error(parquet_result,s3select_result); | |
555 | #endif | |
556 | ||
557 | return s3select_result; | |
558 | } | |
559 | ||
560 | void run_s3select_test_opserialization(std::string expression,std::string input, char *row_delimiter, char *column_delimiter) | |
561 | {//purpose: run query on multiple rows and return result(multiple projections). | |
562 | s3select s3select_syntax; | |
563 | ||
564 | int status = s3select_syntax.parse_query(expression.c_str()); | |
565 | ||
566 | if(status) | |
567 | return; | |
568 | ||
569 | std::string s3select_result; | |
570 | csv_object::csv_defintions csv; | |
571 | csv.redundant_column = false; | |
572 | ||
573 | csv.output_row_delimiter = *row_delimiter; | |
574 | csv.output_column_delimiter = *column_delimiter; | |
575 | ||
576 | s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); | |
577 | ||
578 | s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true); | |
579 | ||
580 | std::string s3select_result1 = s3select_result; | |
581 | ||
582 | csv.row_delimiter = *row_delimiter; | |
583 | csv.column_delimiter = *column_delimiter; | |
584 | csv.output_row_delimiter = *row_delimiter; | |
585 | csv.output_column_delimiter = *column_delimiter; | |
586 | csv.redundant_column = false; | |
587 | std::string s3select_result_second_phase; | |
588 | ||
589 | s3selectEngine::csv_object s3_csv_object_second(&s3select_syntax, csv); | |
590 | ||
591 | s3_csv_object_second.run_s3select_on_object(s3select_result_second_phase, s3select_result.c_str(), s3select_result.size(), false, false, true); | |
592 | ||
593 | ASSERT_EQ(s3select_result_second_phase, s3select_result1); | |
594 | } | |
595 | ||
596 | std::string run_s3select_opserialization_quot(std::string expression,std::string input, bool quot_always = false, char quot_char = '"') | |
597 | {//purpose: run query on multiple rows and return result(multiple projections). | |
598 | s3select s3select_syntax; | |
599 | ||
600 | int status = s3select_syntax.parse_query(expression.c_str()); | |
601 | ||
602 | if(status) | |
603 | return failure_sign; | |
604 | ||
605 | std::string s3select_result; | |
606 | csv_object::csv_defintions csv; | |
607 | ||
608 | csv.redundant_column = false; | |
609 | csv.quote_fields_always = quot_always; | |
610 | csv.output_quot_char = quot_char; | |
611 | ||
612 | s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); | |
613 | ||
614 | s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true); | |
615 | ||
616 | return s3select_result; | |
617 | } | |
618 | ||
619 | // JSON tests API's | |
620 | int run_json_query(const char* json_query, std::string& json_input,std::string& result) | |
621 | {//purpose: run single-chunk json queries | |
622 | ||
623 | s3select s3select_syntax; | |
624 | int status = s3select_syntax.parse_query(json_query); | |
625 | if (status != 0) | |
626 | { | |
627 | std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl; | |
628 | return -1; | |
629 | } | |
630 | ||
631 | json_object json_query_processor(&s3select_syntax); | |
632 | result.clear(); | |
633 | status = json_query_processor.run_s3select_on_stream(result, json_input.data(), json_input.size(), json_input.size()); | |
634 | std::string prev_result = result; | |
635 | result.clear(); | |
636 | status = json_query_processor.run_s3select_on_stream(result, 0, 0, json_input.size()); | |
637 | ||
638 | result = prev_result + result; | |
639 | ||
640 | return status; | |
641 | } | |
642 | ||
643 | std::string run_s3select(std::string expression,std::string input, const char* json_query = "") | |
644 | {//purpose: run query on multiple rows and return result(multiple projections). | |
645 | s3select s3select_syntax; | |
646 | std::string parquet_input = input; | |
647 | ||
648 | std::string js = convert_to_json(input.c_str(), input.size()); | |
649 | ||
650 | int status = s3select_syntax.parse_query(expression.c_str()); | |
651 | ||
652 | if(status) | |
653 | return failure_sign; | |
654 | ||
655 | std::string s3select_result; | |
656 | std::string json_result; | |
657 | s3selectEngine::csv_object s3_csv_object(&s3select_syntax); | |
658 | s3_csv_object.m_csv_defintion.redundant_column = false; | |
659 | ||
660 | s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true); | |
661 | ||
662 | #ifdef _ARROW_EXIST | |
663 | static int file_no = 1; | |
664 | csv_to_parquet(parquet_input); | |
665 | std::string parquet_result; | |
666 | run_query_on_parquet_file(expression.c_str(),PARQUET_FILENAME,parquet_result); | |
667 | ||
668 | if (strcmp(parquet_result.c_str(),s3select_result.c_str())) | |
669 | { | |
670 | std::cout << "failed on query " << expression << std::endl; | |
671 | std::cout << "input for query reside on" << "./failed_test_input" << std::to_string(file_no) << ".[csv|parquet]" << std::endl; | |
672 | ||
673 | { | |
674 | std::string buffer; | |
675 | ||
676 | std::ifstream f(PARQUET_FILENAME); | |
677 | f.seekg(0, std::ios::end); | |
678 | buffer.resize(f.tellg()); | |
679 | f.seekg(0); | |
680 | f.read(buffer.data(), buffer.size()); | |
681 | ||
682 | std::string fn = std::string("./failed_test_input_") + std::to_string(file_no) + std::string(".parquet"); | |
683 | std::ofstream fw(fn.c_str()); | |
684 | fw.write(buffer.data(), buffer.size()); | |
685 | ||
686 | fn = std::string("./failed_test_input_") + std::to_string(file_no++) + std::string(".csv"); | |
687 | std::ofstream fw2(fn.c_str()); | |
688 | fw2.write(parquet_input.data(), parquet_input.size()); | |
689 | ||
690 | } | |
691 | } | |
692 | ||
693 | parquet_csv_report_error(parquet_result,s3select_result); | |
694 | #endif //_ARROW_EXIST | |
695 | ||
696 | if(strlen(json_query) == 0) { | |
697 | json_query = convert_query(expression); | |
698 | } | |
699 | ||
700 | if(strcmp(json_query,JSON_NO_RUN)) { | |
701 | run_json_query(json_query, js, json_result); | |
702 | json_csv_report_error(json_result, s3select_result); | |
703 | } | |
704 | ||
705 | return s3select_result; | |
706 | } | |
707 | ||
708 | ||
709 |