// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cassert>
#include <fstream>
#include <iostream>
#include <memory>
#include <iomanip>
#include <algorithm>
#include "boost/date_time/gregorian/gregorian.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
#include <stdio.h>
#include <time.h> // for time(), used to seed rand() below
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>


#include <arrow/io/file.h>
#include <arrow/util/logging.h>

#include <parquet/api/reader.h>
#include <parquet/api/writer.h>

using parquet::ConvertedType;
using parquet::Repetition;
using parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::PrimitiveNode;

/*
 * This example describes writing and reading Parquet files in C++ and serves as a
 * reference to the API.
 * The converter below writes every CSV field as the BYTE_ARRAY physical type.
 * It uses the RowGroupWriter API, which supports closing a RowGroup once it
 * reaches a certain size.
 **/

/* Parquet is a structured columnar file format
 * Parquet File = "Parquet data" + "Parquet Metadata"
 * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
 * columnar layout
 * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their
 * Columns
 * The "file schema" is a tree where each node is either a primitive type (leaf nodes) or a
 * complex (nested) type (internal nodes)
 * For specific details, please refer to the format documentation:
 * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
 **/

#include <string>
#include <boost/tokenizer.hpp>
using namespace boost;
using namespace std;

//constexpr int NUM_ROWS = 10000000;
constexpr int NUM_ROWS = 10000;

//constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024; // 16 MB
constexpr int64_t ROW_GROUP_SIZE = 1024 * 1024; // 1 MB

const char PARQUET_FILENAME[] = "csv_converted.parquet";

// Builds a flat Parquet schema consisting of num_of_columns OPTIONAL BYTE_ARRAY
// (string) columns named column_0 .. column_{n-1}.
static std::shared_ptr<GroupNode> column_string_2(uint32_t num_of_columns) {

  parquet::schema::NodeVector fields;

  for(uint32_t i=0;i<num_of_columns;i++)
  {
    std::string column_name = "column_" + to_string(i);
    fields.push_back(PrimitiveNode::Make(column_name, Repetition::OPTIONAL, Type::BYTE_ARRAY,
          ConvertedType::NONE));
  }

  return std::static_pointer_cast<GroupNode>(
      GroupNode::Make("schema", Repetition::REQUIRED, fields));
}

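/* Illustrative only (an assumption for exposition; not used by this converter):
 * the "file schema" tree described earlier can also contain nested group nodes,
 * not just the flat string schema built by column_string_2() above. A minimal
 * sketch with one OPTIONAL group holding a single REQUIRED INT64 leaf:
 */
static std::shared_ptr<GroupNode> nested_schema_sketch() {

  parquet::schema::NodeVector inner;
  inner.push_back(PrimitiveNode::Make("id", Repetition::REQUIRED, Type::INT64,
        ConvertedType::NONE));

  parquet::schema::NodeVector fields;
  fields.push_back(GroupNode::Make("record", Repetition::OPTIONAL, inner));

  return std::static_pointer_cast<GroupNode>(
      GroupNode::Make("schema", Repetition::REQUIRED, fields));
}
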
// A minimal CSV tokenizer: get_token() returns the next field, treating both
// ',' and '\n' as delimiters; is_last() reports when the input is exhausted.
class tokenize {

  public:
    const char *s;
    std::string input;
    const char *p;
    bool last_token;

    tokenize(std::string& in):s(0),input(in),p(input.c_str()),last_token(false)
    {
    }

    void get_token(std::string& token)
    {
      if(!*p)
      {
        token = "";
        last_token = true;
        return;
      }

      s=p;
      while(*p && *p != ',' && *p != '\n') p++;

      token = std::string(s,p);
      if(*p) p++; // skip the delimiter, but never advance past the terminating NUL
    }

    bool is_last()
    {
      return last_token == true;
    }
};

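/* A usage sketch for the tokenizer above (illustrative only; not called from
 * main): walks a short CSV line and prints one token per iteration.
 */
static void tokenize_usage_example()
{
  std::string csv("a,b,c\n");
  tokenize csv_tokens(csv);

  while(true)
  {
    std::string token;
    csv_tokens.get_token(token);
    if(csv_tokens.is_last()) break;
    std::cout << token << std::endl; // prints "a", then "b", then "c"
  }
}
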
void generate_rand_columns_csv_datetime(std::string& out, size_t size) {
  std::stringstream ss;
  auto year = [](){return rand()%100 + 1900;};
  auto month = [](){return 1 + rand()%12;};
  auto day = [](){return 1 + rand()%28;};
  auto hours = [](){return rand()%24;};
  auto minutes = [](){return rand()%60;};
  auto seconds = [](){return rand()%60;};

  for (auto i = 0U; i < size; ++i) {
    ss << year() << "-" << std::setw(2) << std::setfill('0') << month()
       << "-" << std::setw(2) << std::setfill('0') << day()
       << "T" << std::setw(2) << std::setfill('0') << hours()
       << ":" << std::setw(2) << std::setfill('0') << minutes()
       << ":" << std::setw(2) << std::setfill('0') << seconds()
       << "Z" << "," << std::endl;
  }
  out = ss.str();
}

void generate_columns_csv(std::string& out, size_t size) {
  std::stringstream ss;

  for (auto i = 0U; i < size; ++i) {
    ss << i << "," << i+1 << "," << i << "," << i << "," << i << "," << i << "," << i << "," << i << "," << i << "," << i << std::endl;
  }
  out = ss.str();
}

void generate_rand_columns_csv_with_null(std::string& out, size_t size) {
  std::stringstream ss;
  auto r = [](){ int x=rand()%1000; if (x<100) return std::string(""); else return std::to_string(x);};

  for (auto i = 0U; i < size; ++i) {
    ss << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << std::endl;
  }
  out = ss.str();
}

void generate_fix_columns_csv(std::string& out, size_t size) {
  std::stringstream ss;
  for (auto i = 0U; i < size; ++i) {
    ss << 1 << "," << 2 << "," << 3 << "," << 4 << "," << 5 << std::endl;
  }
  out = ss.str();
}

void generate_rand_csv_datetime_to_string(std::string& out, std::string& result, size_t size, bool const_frmt = true) {

  std::stringstream ss_out, ss_res;
  std::string format = "yyyysMMMMMdddSSSSSSSSSSSMMMM HHa:m -:-";
  std::string months[12] = {"January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December"};
  auto year = [](){return rand()%100 + 1900;};
  auto month = [](){return 1 + rand()%12;};
  auto day = [](){return 1 + rand()%28;};
  auto hours = [](){return rand()%24;};
  auto minutes = [](){return rand()%60;};
  auto seconds = [](){return rand()%60;};
  auto fraction_sec = [](){return rand()%1000000;};

  for (auto i = 0U; i < size; ++i)
  {
    auto yr = year();
    auto mnth = month();
    auto dy = day();
    auto hr = hours();
    auto mint = minutes();
    auto sec = seconds();
    auto frac_sec = fraction_sec();

    if (const_frmt)
    {
      ss_out << yr << "-" << std::setw(2) << std::setfill('0') << mnth << "-" << std::setw(2) << std::setfill('0') << dy << "T" << std::setw(2) << std::setfill('0') << hr << ":" << std::setw(2) << std::setfill('0') << mint << ":" << std::setw(2) << std::setfill('0') << sec << "." << frac_sec << "Z" << "," << std::endl;

      ss_res << yr << sec << months[mnth-1].substr(0, 1) << std::setw(2) << std::setfill('0') << dy << dy << frac_sec << std::string(11 - std::to_string(frac_sec).length(), '0') << months[mnth-1] << " " << std::setw(2) << std::setfill('0') << hr << (hr < 12 ? "AM" : "PM") << ":" << mint << " -:-" << "," << std::endl;
    }
    else
    {
      switch(rand()%5)
      {
        case 0:
          format = "yyyysMMMMMdddSSSSSSSSSSSMMMM HHa:m -:-";
          ss_res << yr << sec << months[mnth-1].substr(0, 1) << std::setw(2) << std::setfill('0') << dy << dy << frac_sec << std::string(11 - std::to_string(frac_sec).length(), '0') << months[mnth-1] << " " << std::setw(2) << std::setfill('0') << hr << (hr < 12 ? "AM" : "PM") << ":" << mint << " -:-" << "," << std::endl;
          break;
        case 1:
          format = "aMMhh";
          ss_res << (hr < 12 ? "AM" : "PM") << std::setw(2) << std::setfill('0') << mnth << std::setw(2) << std::setfill('0') << (hr%12 == 0 ? 12 : hr%12) << "," << std::endl;
          break;
        case 2:
          format = "y M d ABCDEF";
          ss_res << yr << " " << mnth << " " << dy << " ABCDEF" << "," << std::endl;
          break;
        case 3:
          format = "W h:MMMM";
          ss_res << "W " << (hr%12 == 0 ? 12 : hr%12) << ":" << months[mnth-1] << "," << std::endl;
          break;
        case 4:
          format = "H:m:s";
          ss_res << hr << ":" << mint << ":" << sec << "," << std::endl;
          break;
      }

      ss_out << yr << "-" << std::setw(2) << std::setfill('0') << mnth << "-" << std::setw(2) << std::setfill('0') << dy << "T" << std::setw(2) << std::setfill('0') << hr << ":" << std::setw(2) << std::setfill('0') << mint << ":" << std::setw(2) << std::setfill('0') << sec << "." << frac_sec << "Z" << "," << format << "," << std::endl;
    }
  }
  out = ss_out.str();
  result = ss_res.str();
}

void generate_rand_columns_csv(std::string& out, size_t size) {
  std::stringstream ss;
  auto r = [](){return rand()%1000;};

  for (auto i = 0U; i < size; ++i) {
    ss << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << "," << r() << std::endl;
  }
  out = ss.str();
}

int csv_to_parquet(std::string & csv_object)
{

  auto csv_num_of_columns = std::count(csv_object.begin(), csv_object.begin() + csv_object.find('\n'), ',') + 1;
  auto csv_num_of_rows = std::count(csv_object.begin(), csv_object.end(), '\n');

  tokenize csv_tokens(csv_object);

  try {
    // Create a local file output stream instance.

    using FileClass = ::arrow::io::FileOutputStream;
    std::shared_ptr<FileClass> out_file;
    PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));

    // Setup the parquet schema
    std::shared_ptr<GroupNode> schema = column_string_2(csv_num_of_columns);

    // Add writer properties
    parquet::WriterProperties::Builder builder;
    // builder.compression(parquet::Compression::SNAPPY);
    std::shared_ptr<parquet::WriterProperties> props = builder.build();

    // Create a ParquetFileWriter instance
    std::shared_ptr<parquet::ParquetFileWriter> file_writer =
      parquet::ParquetFileWriter::Open(out_file, schema, props);

    // Append a BufferedRowGroup to keep the RowGroup open until it reaches a certain size
    parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();

    int num_columns = file_writer->num_columns();
    std::vector<int64_t> buffered_values_estimate(num_columns, 0);

    for (int i = 0; !csv_tokens.is_last() && i < csv_num_of_rows; i++) {
      int64_t estimated_bytes = 0;
      // Get the estimated size of the values that are not written to a page yet
      for (int n = 0; n < num_columns; n++) {
        estimated_bytes += buffered_values_estimate[n];
      }

      // We need to consider the compressed pages
      // as well as the values that are not compressed yet
      if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
            estimated_bytes) > ROW_GROUP_SIZE) {
        rg_writer->Close();
        std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
        rg_writer = file_writer->AppendBufferedRowGroup();
      }

      int col_id;
      for(col_id = 0; col_id < num_columns && !csv_tokens.is_last(); col_id++)
      {
        // Write the byte-array column
        parquet::ByteArrayWriter* ba_writer =
          static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
        parquet::ByteArray ba_value;

        std::string token;
        csv_tokens.get_token(token);
        if(token.size() == 0)
        { // null column: definition level 0 marks the OPTIONAL value as absent
          int16_t definition_level = 0;
          ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
        }
        else
        {
          int16_t definition_level = 1;
          ba_value.ptr = (uint8_t*)(token.data());
          ba_value.len = token.size();
          ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
        }

        buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();

      } // end-for columns

      if(csv_tokens.is_last() && col_id < num_columns)
      { // the CSV ended mid-row; pad the remaining columns with nulls
        for(; col_id < num_columns; col_id++)
        {
          parquet::ByteArrayWriter* ba_writer =
            static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));

          int16_t definition_level = 0;
          ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);

          buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
        }
      }

    } // end-for rows

    // Close the RowGroupWriter
    rg_writer->Close();
    // Close the ParquetFileWriter
    file_writer->Close();

    // Write the bytes to file
    DCHECK(out_file->Close().ok());

  } catch (const std::exception& e) {
    std::cerr << "Parquet write error: " << e.what() << std::endl;
    return -1;
  }

  return 0;
}

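/* A minimal read-back sketch (illustrative only; not called from main). It
 * assumes only the reader API from parquet/api/reader.h, which is already
 * included above, and prints the row/row-group/column counts of the file
 * that csv_to_parquet() just wrote.
 */
static void dump_parquet_metadata()
{
  try {
    std::unique_ptr<parquet::ParquetFileReader> reader =
      parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);

    std::shared_ptr<parquet::FileMetaData> file_metadata = reader->metadata();
    std::cout << "rows = " << file_metadata->num_rows()
      << ", row-groups = " << file_metadata->num_row_groups()
      << ", columns = " << file_metadata->num_columns() << std::endl;
  } catch (const std::exception& e) {
    std::cerr << "Parquet read error: " << e.what() << std::endl;
  }
}
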
static int csv_file_to_parquet(int argc, char **argv)
{
  // open a CSV file, load it into a std::string, and convert it to parquet (saved to the local FS)

  if (argc<2) exit(-1);

  FILE* fp;
  struct stat l_buf;
  int st = lstat(argv[1], &l_buf);
  if(st<0) exit(-1);

  printf("input csv file size = %ld\n", l_buf.st_size);

  char * buffer = new char[ l_buf.st_size ];
  fp = fopen(argv[1], "r");

  if(!fp) exit(-1);

  size_t read_sz = fread(buffer, 1, l_buf.st_size, fp);
  fclose(fp);

  std::string csv_obj;
  csv_obj.append(buffer, read_sz);
  delete[] buffer;

  return csv_to_parquet(csv_obj);
}

int csv_object_to_parquet(int argc, char **argv)
{
  srand(time(0));

  std::string csv_obj;
  std::string expected_result;
  generate_rand_columns_csv(csv_obj, 128);
  //generate_rand_csv_datetime_to_string(csv_obj, expected_result, 10000);
  //generate_rand_columns_csv_with_null(csv_obj, 10000);
  //generate_columns_csv(csv_obj, 128);
  //generate_rand_columns_csv_datetime(csv_obj, 10000);
  generate_fix_columns_csv(csv_obj, 128);
  FILE *fp = fopen("10k.csv", "w");

  if(fp)
  {
    fwrite(csv_obj.data(), csv_obj.size(), 1, fp);
    fclose(fp);
  }
  else
  {
    exit(-1);
  }

  //csv_obj="1,2,3,4,5,6,7,8,9,10\n10,20,30,40,50,60,70,80,90,100\n";
  csv_obj = "1,2,3,4\n";

  return csv_to_parquet(csv_obj);
}

int main(int argc, char **argv)
{
  return csv_file_to_parquet(argc, argv);
}