// ceph/src/s3select/example/csv_to_parquet.cpp
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "boost/date_time/gregorian/gregorian.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#include <arrow/io/file.h>
#include <arrow/util/logging.h>

#include <parquet/api/reader.h>
#include <parquet/api/writer.h>

using parquet::ConvertedType;
using parquet::Repetition;
using parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::PrimitiveNode;

/**
 * This example describes writing and reading Parquet Files in C++ and serves as a
 * reference to the API.
 * The file contains all the physical data types supported by Parquet.
 * This example uses the RowGroupWriter API that supports writing RowGroups based on a
 * certain size.
 **/

/* Parquet is a structured columnar file format
 * Parquet File = "Parquet data" + "Parquet Metadata"
 * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
 * columnar layout
 * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their
 * Columns
 * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a
 * complex (nested) type (internal nodes)
 * For specific details, please refer to the format here:
 * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
 **/
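
// As a concrete sketch of that tree: for a 3-column CSV this converter builds a
// flat schema, one REQUIRED group whose leaves are all OPTIONAL BYTE_ARRAY
// (see column_string_2() below):
//
//   schema (REQUIRED group)
//    |-- column_0 (OPTIONAL BYTE_ARRAY)
//    |-- column_1 (OPTIONAL BYTE_ARRAY)
//    |-- column_2 (OPTIONAL BYTE_ARRAY)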

#include <boost/tokenizer.hpp>
using namespace boost;

//constexpr int NUM_ROWS = 10000000;
constexpr int NUM_ROWS = 10000;

//constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024;  // 16 MB
constexpr int64_t ROW_GROUP_SIZE = 1024 * 1024;  // 1 MB

const char PARQUET_FILENAME[] = "csv_converted.parquet";

static std::shared_ptr<GroupNode> column_string_2(uint32_t num_of_columns)
{
  parquet::schema::NodeVector fields;

  // one nullable string leaf per CSV column: column_0 .. column_{n-1}
  for (uint32_t i = 0; i < num_of_columns; i++)
  {
    std::string column_name = "column_" + std::to_string(i);
    fields.push_back(PrimitiveNode::Make(column_name, Repetition::OPTIONAL, Type::BYTE_ARRAY,
          ConvertedType::NONE));
  }

  return std::static_pointer_cast<GroupNode>(
      GroupNode::Make("schema", Repetition::REQUIRED, fields));
}
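
// Because every leaf is OPTIONAL, each value written in csv_to_parquet() carries
// a definition level: 1 when the string is present, 0 when it is null. That is
// what lets empty CSV fields round-trip as nulls rather than empty strings.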

class tokenize {
public:
  const char* s;       // start of the current token
  std::string input;   // private copy of the CSV payload
  const char* p;       // read cursor
  bool last_token;

  tokenize(std::string& in) : s(nullptr), input(in), p(input.c_str()), last_token(false)
  {}

  void get_token(std::string& token)
  {
    if (!*p)
    { // end of input
      token = "";
      last_token = true;
      return;
    }

    s = p;
    // a token ends at the next comma, newline, or end of input;
    // commas and newlines are treated alike, so rows are not delimited here
    while (*p && *p != ',' && *p != '\n') p++;

    token = std::string(s, p);
    if (*p) p++;  // skip the delimiter
  }

  bool is_last()
  {
    return last_token == true;
  }
};
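
// Usage sketch for the tokenizer above (the input literal is illustrative only):
//
//   std::string in{"a,b\nc\n"};
//   tokenize t(in);
//   std::string tok;
//   t.get_token(tok);  // "a"
//   t.get_token(tok);  // "b"   -- '\n' is consumed like any other delimiter
//   t.get_token(tok);  // "c"
//   t.get_token(tok);  // ""    -- t.is_last() is now true
//
// Rows are not distinguished here; csv_to_parquet() relies on the row and
// column counts it derives from the raw buffer.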

void generate_rand_columns_csv_datetime(std::string& out, size_t size)
{
  std::stringstream ss;
  auto year =    [](){ return rand() % 100 + 1900; };
  auto month =   [](){ return 1 + rand() % 12; };
  auto day =     [](){ return 1 + rand() % 28; };
  auto hours =   [](){ return rand() % 24; };
  auto minutes = [](){ return rand() % 60; };
  auto seconds = [](){ return rand() % 60; };

  for (auto i = 0U; i < size; ++i)
  {
    // one ISO-8601-style timestamp per row, e.g. "1977-03-05T07:08:09Z,"
    ss << year() << "-"
       << std::setw(2) << std::setfill('0') << month() << "-"
       << std::setw(2) << std::setfill('0') << day() << "T"
       << std::setw(2) << std::setfill('0') << hours() << ":"
       << std::setw(2) << std::setfill('0') << minutes() << ":"
       << std::setw(2) << std::setfill('0') << seconds() << "Z"
       << "," << std::endl;
  }

  out = ss.str();
}

void generate_columns_csv(std::string& out, size_t size)
{
  std::stringstream ss;

  for (auto i = 0U; i < size; ++i)
  {
    ss << i << "," << i+1 << "," << i << "," << i << "," << i << ","
       << i << "," << i << "," << i << "," << i << "," << i << std::endl;
  }

  out = ss.str();
}

void generate_rand_columns_csv_with_null(std::string& out, size_t size)
{
  std::stringstream ss;
  // roughly 10% of the values come out empty; the converter stores those as nulls
  auto r = [](){ int x = rand() % 1000; if (x < 100) return std::string(""); else return std::to_string(x); };

  for (auto i = 0U; i < size; ++i)
  {
    ss << r() << "," << r() << "," << r() << "," << r() << "," << r() << ","
       << r() << "," << r() << "," << r() << "," << r() << "," << r() << std::endl;
  }

  out = ss.str();
}

void generate_fix_columns_csv(std::string& out, size_t size)
{
  std::stringstream ss;

  for (auto i = 0U; i < size; ++i)
  {
    ss << 1 << "," << 2 << "," << 3 << "," << 4 << "," << 5 << std::endl;
  }

  out = ss.str();
}

void generate_rand_csv_datetime_to_string(std::string& out, std::string& result, size_t size, bool const_frmt = true)
{
  std::stringstream ss_out, ss_res;
  std::string format = "yyyysMMMMMdddSSSSSSSSSSSMMMM HHa:m -:-";
  std::string months[12] = {"January", "February", "March", "April", "May", "June",
                            "July", "August", "September", "October", "November", "December"};
  auto year =    [](){ return rand() % 100 + 1900; };
  auto month =   [](){ return 1 + rand() % 12; };
  auto day =     [](){ return 1 + rand() % 28; };
  auto hours =   [](){ return rand() % 24; };
  auto minutes = [](){ return rand() % 60; };
  auto seconds = [](){ return rand() % 60; };
  auto fraction_sec = [](){ return rand() % 1000000; };

  for (auto i = 0U; i < size; ++i)
  {
    auto yr = year();
    auto mnth = month();
    auto dy = day();
    auto hr = hours();
    auto mint = minutes();
    auto sec = seconds();
    auto frac_sec = fraction_sec();

    if (const_frmt)
    {
      ss_out << yr << "-" << std::setw(2) << std::setfill('0') << mnth
             << "-" << std::setw(2) << std::setfill('0') << dy
             << "T" << std::setw(2) << std::setfill('0') << hr
             << ":" << std::setw(2) << std::setfill('0') << mint
             << ":" << std::setw(2) << std::setfill('0') << sec
             << "." << frac_sec << "Z" << "," << std::endl;

      ss_res << yr << sec << months[mnth-1].substr(0, 1)
             << std::setw(2) << std::setfill('0') << dy << dy
             << frac_sec << std::string(11 - std::to_string(frac_sec).length(), '0')
             << months[mnth-1] << " " << std::setw(2) << std::setfill('0') << hr
             << (hr < 12 ? "AM" : "PM") << ":" << mint << " -:-" << "," << std::endl;
    }
    else
    {
      // pick one of several date_format() cases; the expected output for each
      // is appended to ss_res, and the format string used is appended to ss_out
      switch (rand() % 5)
      {
        case 0:
          format = "yyyysMMMMMdddSSSSSSSSSSSMMMM HHa:m -:-";
          ss_res << yr << sec << months[mnth-1].substr(0, 1)
                 << std::setw(2) << std::setfill('0') << dy << dy
                 << frac_sec << std::string(11 - std::to_string(frac_sec).length(), '0')
                 << months[mnth-1] << " " << std::setw(2) << std::setfill('0') << hr
                 << (hr < 12 ? "AM" : "PM") << ":" << mint << " -:-" << "," << std::endl;
          break;
        case 1:
          // format string for this case not preserved in this listing; the
          // expected output is AM/PM marker, 2-digit month, 2-digit 12-hour clock
          ss_res << (hr < 12 ? "AM" : "PM") << std::setw(2) << std::setfill('0') << mnth
                 << std::setw(2) << std::setfill('0') << (hr % 12 == 0 ? 12 : hr % 12)
                 << "," << std::endl;
          break;
        case 2:
          format = "y M d ABCDEF";
          ss_res << yr << " " << mnth << " " << dy << " ABCDEF" << "," << std::endl;
          break;
        case 3:
          // format string for this case not preserved in this listing
          ss_res << "W " << (hr % 12 == 0 ? 12 : hr % 12) << ":" << months[mnth-1]
                 << "," << std::endl;
          break;
        case 4:
          // format string for this case not preserved in this listing
          ss_res << hr << ":" << mint << ":" << sec << "," << std::endl;
          break;
      }

      ss_out << yr << "-" << std::setw(2) << std::setfill('0') << mnth
             << "-" << std::setw(2) << std::setfill('0') << dy
             << "T" << std::setw(2) << std::setfill('0') << hr
             << ":" << std::setw(2) << std::setfill('0') << mint
             << ":" << std::setw(2) << std::setfill('0') << sec
             << "." << frac_sec << "Z" << "," << format << "," << std::endl;
    }
  }

  out = ss_out.str();
  result = ss_res.str();
}

void generate_rand_columns_csv(std::string& out, size_t size)
{
  std::stringstream ss;
  auto r = [](){ return rand() % 1000; };

  for (auto i = 0U; i < size; ++i)
  {
    ss << r() << "," << r() << "," << r() << "," << r() << "," << r() << ","
       << r() << "," << r() << "," << r() << "," << r() << "," << r() << std::endl;
  }

  out = ss.str();
}

int csv_to_parquet(std::string& csv_object)
{
  // Column count is inferred from the first row, row count from the newline count.
  // This assumes every row ends with '\n' and has the same number of fields.
  auto csv_num_of_columns = std::count(csv_object.begin(), csv_object.begin() + csv_object.find('\n'), ',') + 1;
  auto csv_num_of_rows = std::count(csv_object.begin(), csv_object.end(), '\n');

  tokenize csv_tokens(csv_object);

  try {
    // Create a local file output stream instance.
    using FileClass = ::arrow::io::FileOutputStream;
    std::shared_ptr<FileClass> out_file;
    PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));

    // Setup the parquet schema
    std::shared_ptr<GroupNode> schema = column_string_2(csv_num_of_columns);

    // Add writer properties
    parquet::WriterProperties::Builder builder;
    // builder.compression(parquet::Compression::SNAPPY);
    std::shared_ptr<parquet::WriterProperties> props = builder.build();

    // Create a ParquetFileWriter instance
    std::shared_ptr<parquet::ParquetFileWriter> file_writer =
        parquet::ParquetFileWriter::Open(out_file, schema, props);

    // Append a BufferedRowGroup to keep the RowGroup open until a certain size
    parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();

    int num_columns = file_writer->num_columns();
    std::vector<int64_t> buffered_values_estimate(num_columns, 0);

    for (int i = 0; !csv_tokens.is_last() && i < csv_num_of_rows; i++) {
      int64_t estimated_bytes = 0;
      // Get the estimated size of the values that are not written to a page yet
      for (int n = 0; n < num_columns; n++) {
        estimated_bytes += buffered_values_estimate[n];
      }

      // We need to consider the compressed pages
      // as well as the values that are not compressed yet
      if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
           estimated_bytes) > ROW_GROUP_SIZE) {
        rg_writer->Close();
        std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
        rg_writer = file_writer->AppendBufferedRowGroup();
      }

      int col_id;
      for (col_id = 0; col_id < num_columns && !csv_tokens.is_last(); col_id++)
      {
        // Write the byte-array column
        parquet::ByteArrayWriter* ba_writer =
            static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
        parquet::ByteArray ba_value;

        std::string token;
        csv_tokens.get_token(token);

        if (token.size() == 0)
        { // an empty CSV field becomes a null (definition level 0)
          int16_t definition_level = 0;
          ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
        }
        else
        {
          int16_t definition_level = 1;
          ba_value.ptr = (uint8_t*)(token.data());
          ba_value.len = token.size();
          ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
        }

        buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
      } // end of column

      // If the input ran out mid-row, pad the remaining columns with nulls.
      if (csv_tokens.is_last() && col_id < num_columns)
      {
        for (; col_id < num_columns; col_id++)
        {
          parquet::ByteArrayWriter* ba_writer =
              static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));

          int16_t definition_level = 0;
          ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);

          buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
        }
      }
    } // end of row

    // Close the RowGroupWriter
    rg_writer->Close();
    // Close the ParquetFileWriter
    file_writer->Close();

    // Write the bytes to file
    DCHECK(out_file->Close().ok());

  } catch (const std::exception& e) {
    std::cerr << "Parquet write error: " << e.what() << std::endl;
    return -1;
  }

  return 0;
}
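
// A minimal read-back sketch for verifying the output (not part of the original
// flow; uses the reader API already included above):
//
//   std::unique_ptr<parquet::ParquetFileReader> reader =
//       parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);
//   std::shared_ptr<parquet::FileMetaData> md = reader->metadata();
//   std::cout << md->num_rows() << " rows in " << md->num_row_groups()
//             << " row group(s)" << std::endl;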

static int csv_file_to_parquet(int argc, char **argv)
{
  // open a CSV file, load it into a std::string, convert to parquet (saved to the FS)
  if (argc < 2) exit(-1);

  struct stat l_buf;
  int st = lstat(argv[1], &l_buf);
  if (st < 0) exit(-1);

  printf("input csv file size = %ld\n", l_buf.st_size);

  char* buffer = new char[l_buf.st_size];
  FILE* fp = fopen(argv[1], "r");
  if (!fp) exit(-1);

  size_t read_sz = fread(buffer, 1, l_buf.st_size, fp);
  fclose(fp);

  std::string csv_obj;
  csv_obj.append(buffer, read_sz);
  delete[] buffer;

  return csv_to_parquet(csv_obj);
}

int csv_object_to_parquet(int argc, char **argv)
{
  std::string csv_obj;
  std::string expected_result;
  generate_rand_columns_csv(csv_obj, 128);
  //generate_rand_csv_datetime_to_string(csv_obj, expected_result, 10000);
  //generate_rand_columns_csv_with_null(csv_obj, 10000);
  //generate_columns_csv(csv_obj, 128);
  //generate_rand_columns_csv_datetime(csv_obj, 10000);
  generate_fix_columns_csv(csv_obj, 128);  // note: overwrites the random CSV above

  FILE* fp = fopen("10k.csv", "w");
  if (fp)
  {
    fwrite(csv_obj.data(), csv_obj.size(), 1, fp);
    fclose(fp);
  }

  //csv_obj="1,2,3,4,5,6,7,8,9,10\n10,20,30,40,50,60,70,80,90,100\n";

  return csv_to_parquet(csv_obj);
}

int main(int argc, char **argv)
{
  return csv_file_to_parquet(argc, argv);
}
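
// Build-and-run sketch (flags are illustrative; this file normally builds as
// part of the s3select examples rather than standalone):
//
//   g++ -std=c++17 csv_to_parquet.cpp -larrow -lparquet -o csv_to_parquet
//   ./csv_to_parquet input.csv    # writes csv_converted.parquet to the CWD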