[ceph.git, quincy 17.2.0] ceph/src/arrow/python/pyarrow/_parquet.pyx
1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
16 # under the License.
17
18 # cython: profile=False
19 # distutils: language = c++
20
21 import io
22 from textwrap import indent
23 import warnings
24
25 import numpy as np
26
27 from cython.operator cimport dereference as deref
28 from pyarrow.includes.common cimport *
29 from pyarrow.includes.libarrow cimport *
30 from pyarrow.lib cimport (_Weakrefable, Buffer, Array, Schema,
31 check_status,
32 MemoryPool, maybe_unbox_memory_pool,
33 Table, NativeFile,
34 pyarrow_wrap_chunked_array,
35 pyarrow_wrap_schema,
36 pyarrow_wrap_table,
37 pyarrow_wrap_buffer,
38 pyarrow_wrap_batch,
39 get_reader, get_writer,
40 string_to_timeunit)
41
42 from pyarrow.lib import (ArrowException, NativeFile, BufferOutputStream,
43 _stringify_path, _datetime_from_int,
44 tobytes, frombytes)
45
46 cimport cpython as cp
47
48
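# Column-chunk statistics exposed to Python. Wraps the underlying CStatistics
# object and surfaces min/max (raw and logically typed), null_count,
# distinct_count, num_values and the column's physical/logical/converted types.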
49 cdef class Statistics(_Weakrefable):
50 def __cinit__(self):
51 pass
52
53 def __repr__(self):
54 return """{}
55 has_min_max: {}
56 min: {}
57 max: {}
58 null_count: {}
59 distinct_count: {}
60 num_values: {}
61 physical_type: {}
62 logical_type: {}
63 converted_type (legacy): {}""".format(object.__repr__(self),
64 self.has_min_max,
65 self.min,
66 self.max,
67 self.null_count,
68 self.distinct_count,
69 self.num_values,
70 self.physical_type,
71 str(self.logical_type),
72 self.converted_type)
73
74 def to_dict(self):
75 d = dict(
76 has_min_max=self.has_min_max,
77 min=self.min,
78 max=self.max,
79 null_count=self.null_count,
80 distinct_count=self.distinct_count,
81 num_values=self.num_values,
82 physical_type=self.physical_type
83 )
84 return d
85
86 def __eq__(self, other):
87 try:
88 return self.equals(other)
89 except TypeError:
90 return NotImplemented
91
92 def equals(self, Statistics other):
93 return self.statistics.get().Equals(deref(other.statistics.get()))
94
95 @property
96 def has_min_max(self):
97 return self.statistics.get().HasMinMax()
98
99 @property
100 def has_null_count(self):
101 return self.statistics.get().HasNullCount()
102
103 @property
104 def has_distinct_count(self):
105 return self.statistics.get().HasDistinctCount()
106
107 @property
108 def min_raw(self):
109 if self.has_min_max:
110 return _cast_statistic_raw_min(self.statistics.get())
111 else:
112 return None
113
114 @property
115 def max_raw(self):
116 if self.has_min_max:
117 return _cast_statistic_raw_max(self.statistics.get())
118 else:
119 return None
120
121 @property
122 def min(self):
123 if self.has_min_max:
124 return _cast_statistic_min(self.statistics.get())
125 else:
126 return None
127
128 @property
129 def max(self):
130 if self.has_min_max:
131 return _cast_statistic_max(self.statistics.get())
132 else:
133 return None
134
135 @property
136 def null_count(self):
137 return self.statistics.get().null_count()
138
139 @property
140 def distinct_count(self):
141 return self.statistics.get().distinct_count()
142
143 @property
144 def num_values(self):
145 return self.statistics.get().num_values()
146
147 @property
148 def physical_type(self):
149 raw_physical_type = self.statistics.get().physical_type()
150 return physical_type_name_from_enum(raw_physical_type)
151
152 @property
153 def logical_type(self):
154 return wrap_logical_type(self.statistics.get().descr().logical_type())
155
156 @property
157 def converted_type(self):
158 raw_converted_type = self.statistics.get().descr().converted_type()
159 return converted_type_name_from_enum(raw_converted_type)
160
161
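# Thin wrapper around a Parquet logical type annotation; provides its string
# and JSON representations and the logical type name via the `type` property.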
162 cdef class ParquetLogicalType(_Weakrefable):
163 cdef:
164 shared_ptr[const CParquetLogicalType] type
165
166 def __cinit__(self):
167 pass
168
169 cdef init(self, const shared_ptr[const CParquetLogicalType]& type):
170 self.type = type
171
172 def __str__(self):
173 return frombytes(self.type.get().ToString(), safe=True)
174
175 def to_json(self):
176 return frombytes(self.type.get().ToJSON())
177
178 @property
179 def type(self):
180 return logical_type_name_from_enum(self.type.get().type())
181
182
183 cdef wrap_logical_type(const shared_ptr[const CParquetLogicalType]& type):
184 cdef ParquetLogicalType out = ParquetLogicalType()
185 out.init(type)
186 return out
187
188
189 cdef _cast_statistic_raw_min(CStatistics* statistics):
190 cdef ParquetType physical_type = statistics.physical_type()
191 cdef uint32_t type_length = statistics.descr().type_length()
192 if physical_type == ParquetType_BOOLEAN:
193 return (<CBoolStatistics*> statistics).min()
194 elif physical_type == ParquetType_INT32:
195 return (<CInt32Statistics*> statistics).min()
196 elif physical_type == ParquetType_INT64:
197 return (<CInt64Statistics*> statistics).min()
198 elif physical_type == ParquetType_FLOAT:
199 return (<CFloatStatistics*> statistics).min()
200 elif physical_type == ParquetType_DOUBLE:
201 return (<CDoubleStatistics*> statistics).min()
202 elif physical_type == ParquetType_BYTE_ARRAY:
203 return _box_byte_array((<CByteArrayStatistics*> statistics).min())
204 elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
205 return _box_flba((<CFLBAStatistics*> statistics).min(), type_length)
206
207
208 cdef _cast_statistic_raw_max(CStatistics* statistics):
209 cdef ParquetType physical_type = statistics.physical_type()
210 cdef uint32_t type_length = statistics.descr().type_length()
211 if physical_type == ParquetType_BOOLEAN:
212 return (<CBoolStatistics*> statistics).max()
213 elif physical_type == ParquetType_INT32:
214 return (<CInt32Statistics*> statistics).max()
215 elif physical_type == ParquetType_INT64:
216 return (<CInt64Statistics*> statistics).max()
217 elif physical_type == ParquetType_FLOAT:
218 return (<CFloatStatistics*> statistics).max()
219 elif physical_type == ParquetType_DOUBLE:
220 return (<CDoubleStatistics*> statistics).max()
221 elif physical_type == ParquetType_BYTE_ARRAY:
222 return _box_byte_array((<CByteArrayStatistics*> statistics).max())
223 elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
224 return _box_flba((<CFLBAStatistics*> statistics).max(), type_length)
225
226
227 cdef _cast_statistic_min(CStatistics* statistics):
228 min_raw = _cast_statistic_raw_min(statistics)
229 return _box_logical_type_value(min_raw, statistics.descr())
230
231
232 cdef _cast_statistic_max(CStatistics* statistics):
233 max_raw = _cast_statistic_raw_max(statistics)
234 return _box_logical_type_value(max_raw, statistics.descr())
235
236
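# Convert a raw statistics value into a Python object according to the column's
# logical type: UTF-8 decoding for strings, time/timestamp conversion (honoring
# the time unit and UTC flag), and unsigned reinterpretation for INT types.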
237 cdef _box_logical_type_value(object value, const ColumnDescriptor* descr):
238 cdef:
239 const CParquetLogicalType* ltype = descr.logical_type().get()
240 ParquetTimeUnit time_unit
241 const CParquetIntType* itype
242 const CParquetTimestampType* ts_type
243
244 if ltype.type() == ParquetLogicalType_STRING:
245 return value.decode('utf8')
246 elif ltype.type() == ParquetLogicalType_TIME:
247 time_unit = (<const CParquetTimeType*> ltype).time_unit()
248 if time_unit == ParquetTimeUnit_MILLIS:
249 return _datetime_from_int(value, unit=TimeUnit_MILLI).time()
250 else:
251 return _datetime_from_int(value, unit=TimeUnit_MICRO).time()
252 elif ltype.type() == ParquetLogicalType_TIMESTAMP:
253 ts_type = <const CParquetTimestampType*> ltype
254 time_unit = ts_type.time_unit()
255 if ts_type.is_adjusted_to_utc():
256 import pytz
257 tzinfo = pytz.utc
258 else:
259 tzinfo = None
260 if time_unit == ParquetTimeUnit_MILLIS:
261 return _datetime_from_int(value, unit=TimeUnit_MILLI,
262 tzinfo=tzinfo)
263 elif time_unit == ParquetTimeUnit_MICROS:
264 return _datetime_from_int(value, unit=TimeUnit_MICRO,
265 tzinfo=tzinfo)
266 elif time_unit == ParquetTimeUnit_NANOS:
267 return _datetime_from_int(value, unit=TimeUnit_NANO,
268 tzinfo=tzinfo)
269 else:
270 raise ValueError("Unsupported time unit")
271 elif ltype.type() == ParquetLogicalType_INT:
272 itype = <const CParquetIntType*> ltype
273 if not itype.is_signed() and itype.bit_width() == 32:
274 return int(np.int32(value).view(np.uint32))
275 elif not itype.is_signed() and itype.bit_width() == 64:
276 return int(np.int64(value).view(np.uint64))
277 else:
278 return value
279 else:
280 # No logical boxing defined
281 return value
282
283
284 cdef _box_byte_array(ParquetByteArray val):
285 return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> val.len)
286
287
288 cdef _box_flba(ParquetFLBA val, uint32_t len):
289 return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> len)
290
291
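# Metadata for a single column chunk within a row group: file offsets, value
# counts, compression, encodings, page offsets, sizes and (if present) the
# per-chunk Statistics.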
292 cdef class ColumnChunkMetaData(_Weakrefable):
293 def __cinit__(self):
294 pass
295
296 def __repr__(self):
297 statistics = indent(repr(self.statistics), 4 * ' ')
298 return """{0}
299 file_offset: {1}
300 file_path: {2}
301 physical_type: {3}
302 num_values: {4}
303 path_in_schema: {5}
304 is_stats_set: {6}
305 statistics:
306 {7}
307 compression: {8}
308 encodings: {9}
309 has_dictionary_page: {10}
310 dictionary_page_offset: {11}
311 data_page_offset: {12}
312 total_compressed_size: {13}
313 total_uncompressed_size: {14}""".format(object.__repr__(self),
314 self.file_offset,
315 self.file_path,
316 self.physical_type,
317 self.num_values,
318 self.path_in_schema,
319 self.is_stats_set,
320 statistics,
321 self.compression,
322 self.encodings,
323 self.has_dictionary_page,
324 self.dictionary_page_offset,
325 self.data_page_offset,
326 self.total_compressed_size,
327 self.total_uncompressed_size)
328
329 def to_dict(self):
330 statistics = self.statistics.to_dict() if self.is_stats_set else None
331 d = dict(
332 file_offset=self.file_offset,
333 file_path=self.file_path,
334 physical_type=self.physical_type,
335 num_values=self.num_values,
336 path_in_schema=self.path_in_schema,
337 is_stats_set=self.is_stats_set,
338 statistics=statistics,
339 compression=self.compression,
340 encodings=self.encodings,
341 has_dictionary_page=self.has_dictionary_page,
342 dictionary_page_offset=self.dictionary_page_offset,
343 data_page_offset=self.data_page_offset,
344 total_compressed_size=self.total_compressed_size,
345 total_uncompressed_size=self.total_uncompressed_size
346 )
347 return d
348
349 def __eq__(self, other):
350 try:
351 return self.equals(other)
352 except TypeError:
353 return NotImplemented
354
355 def equals(self, ColumnChunkMetaData other):
356 return self.metadata.Equals(deref(other.metadata))
357
358 @property
359 def file_offset(self):
360 return self.metadata.file_offset()
361
362 @property
363 def file_path(self):
364 return frombytes(self.metadata.file_path())
365
366 @property
367 def physical_type(self):
368 return physical_type_name_from_enum(self.metadata.type())
369
370 @property
371 def num_values(self):
372 return self.metadata.num_values()
373
374 @property
375 def path_in_schema(self):
376 path = self.metadata.path_in_schema().get().ToDotString()
377 return frombytes(path)
378
379 @property
380 def is_stats_set(self):
381 return self.metadata.is_stats_set()
382
383 @property
384 def statistics(self):
385 if not self.metadata.is_stats_set():
386 return None
387 statistics = Statistics()
388 statistics.init(self.metadata.statistics(), self)
389 return statistics
390
391 @property
392 def compression(self):
393 return compression_name_from_enum(self.metadata.compression())
394
395 @property
396 def encodings(self):
397 return tuple(map(encoding_name_from_enum, self.metadata.encodings()))
398
399 @property
400 def has_dictionary_page(self):
401 return bool(self.metadata.has_dictionary_page())
402
403 @property
404 def dictionary_page_offset(self):
405 if self.has_dictionary_page:
406 return self.metadata.dictionary_page_offset()
407 else:
408 return None
409
410 @property
411 def data_page_offset(self):
412 return self.metadata.data_page_offset()
413
414 @property
415 def has_index_page(self):
416 raise NotImplementedError('not supported in parquet-cpp')
417
418 @property
419 def index_page_offset(self):
420 raise NotImplementedError("parquet-cpp doesn't return valid values")
421
422 @property
423 def total_compressed_size(self):
424 return self.metadata.total_compressed_size()
425
426 @property
427 def total_uncompressed_size(self):
428 return self.metadata.total_uncompressed_size()
429
430
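# Metadata for one row group, constructed from its parent FileMetaData;
# per-column ColumnChunkMetaData is available through column(i).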
431 cdef class RowGroupMetaData(_Weakrefable):
432 def __cinit__(self, FileMetaData parent, int index):
433 if index < 0 or index >= parent.num_row_groups:
434 raise IndexError('{0} out of bounds'.format(index))
435 self.up_metadata = parent._metadata.RowGroup(index)
436 self.metadata = self.up_metadata.get()
437 self.parent = parent
438 self.index = index
439
440 def __reduce__(self):
441 return RowGroupMetaData, (self.parent, self.index)
442
443 def __eq__(self, other):
444 try:
445 return self.equals(other)
446 except TypeError:
447 return NotImplemented
448
449 def equals(self, RowGroupMetaData other):
450 return self.metadata.Equals(deref(other.metadata))
451
452 def column(self, int i):
453 if i < 0 or i >= self.num_columns:
454 raise IndexError('{0} out of bounds'.format(i))
455 chunk = ColumnChunkMetaData()
456 chunk.init(self, i)
457 return chunk
458
459 def __repr__(self):
460 return """{0}
461 num_columns: {1}
462 num_rows: {2}
463 total_byte_size: {3}""".format(object.__repr__(self),
464 self.num_columns,
465 self.num_rows,
466 self.total_byte_size)
467
468 def to_dict(self):
469 columns = []
470 d = dict(
471 num_columns=self.num_columns,
472 num_rows=self.num_rows,
473 total_byte_size=self.total_byte_size,
474 columns=columns,
475 )
476 for i in range(self.num_columns):
477 columns.append(self.column(i).to_dict())
478 return d
479
480 @property
481 def num_columns(self):
482 return self.metadata.num_columns()
483
484 @property
485 def num_rows(self):
486 return self.metadata.num_rows()
487
488 @property
489 def total_byte_size(self):
490 return self.metadata.total_byte_size()
491
492
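# Pickle helper: rebuild a FileMetaData object from its serialized buffer,
# the counterpart of FileMetaData.__reduce__ below.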
493 def _reconstruct_filemetadata(Buffer serialized):
494 cdef:
495 FileMetaData metadata = FileMetaData.__new__(FileMetaData)
496 CBuffer *buffer = serialized.buffer.get()
497 uint32_t metadata_len = <uint32_t>buffer.size()
498
499 metadata.init(CFileMetaData_Make(buffer.data(), &metadata_len))
500
501 return metadata
502
503
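# File-level Parquet metadata: schema, row group access, key-value metadata and
# format version, plus set_file_path / append_row_groups / write_metadata_file
# helpers for assembling metadata-only files. Picklable: __reduce__ serializes
# the metadata to a buffer and _reconstruct_filemetadata rebuilds it.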
504 cdef class FileMetaData(_Weakrefable):
505 def __cinit__(self):
506 pass
507
508 def __reduce__(self):
509 cdef:
510 NativeFile sink = BufferOutputStream()
511 COutputStream* c_sink = sink.get_output_stream().get()
512 with nogil:
513 self._metadata.WriteTo(c_sink)
514
515 cdef Buffer buffer = sink.getvalue()
516 return _reconstruct_filemetadata, (buffer,)
517
518 def __repr__(self):
519 return """{0}
520 created_by: {1}
521 num_columns: {2}
522 num_rows: {3}
523 num_row_groups: {4}
524 format_version: {5}
525 serialized_size: {6}""".format(object.__repr__(self),
526 self.created_by, self.num_columns,
527 self.num_rows, self.num_row_groups,
528 self.format_version,
529 self.serialized_size)
530
531 def to_dict(self):
532 row_groups = []
533 d = dict(
534 created_by=self.created_by,
535 num_columns=self.num_columns,
536 num_rows=self.num_rows,
537 num_row_groups=self.num_row_groups,
538 row_groups=row_groups,
539 format_version=self.format_version,
540 serialized_size=self.serialized_size
541 )
542 for i in range(self.num_row_groups):
543 row_groups.append(self.row_group(i).to_dict())
544 return d
545
546 def __eq__(self, other):
547 try:
548 return self.equals(other)
549 except TypeError:
550 return NotImplemented
551
552 def equals(self, FileMetaData other):
553 return self._metadata.Equals(deref(other._metadata))
554
555 @property
556 def schema(self):
557 if self._schema is None:
558 self._schema = ParquetSchema(self)
559 return self._schema
560
561 @property
562 def serialized_size(self):
563 return self._metadata.size()
564
565 @property
566 def num_columns(self):
567 return self._metadata.num_columns()
568
569 @property
570 def num_rows(self):
571 return self._metadata.num_rows()
572
573 @property
574 def num_row_groups(self):
575 return self._metadata.num_row_groups()
576
577 @property
578 def format_version(self):
579 cdef ParquetVersion version = self._metadata.version()
580 if version == ParquetVersion_V1:
581 return '1.0'
582 elif version == ParquetVersion_V2_0:
583 return 'pseudo-2.0'
584 elif version == ParquetVersion_V2_4:
585 return '2.4'
586 elif version == ParquetVersion_V2_6:
587 return '2.6'
588 else:
589 warnings.warn('Unrecognized file version, assuming 1.0: {}'
590 .format(version))
591 return '1.0'
592
593 @property
594 def created_by(self):
595 return frombytes(self._metadata.created_by())
596
597 @property
598 def metadata(self):
599 cdef:
600 unordered_map[c_string, c_string] metadata
601 const CKeyValueMetadata* underlying_metadata
602 underlying_metadata = self._metadata.key_value_metadata().get()
603 if underlying_metadata != NULL:
604 underlying_metadata.ToUnorderedMap(&metadata)
605 return metadata
606 else:
607 return None
608
609 def row_group(self, int i):
610 return RowGroupMetaData(self, i)
611
612 def set_file_path(self, path):
613 """
614 Modify the file_path field of each ColumnChunk in the
615 FileMetaData to be a particular value
616 """
617 cdef:
618 c_string c_path = tobytes(path)
619 self._metadata.set_file_path(c_path)
620
621 def append_row_groups(self, FileMetaData other):
622 """
623 Append the row groups from another FileMetaData object.
624 """
625 cdef shared_ptr[CFileMetaData] c_metadata
626
627 c_metadata = other.sp_metadata
628 self._metadata.AppendRowGroups(deref(c_metadata))
629
630 def write_metadata_file(self, where):
631 """
632 Write the metadata object to a metadata-only file
633 """
634 cdef:
635 shared_ptr[COutputStream] sink
636 c_string c_where
637
638 try:
639 where = _stringify_path(where)
640 except TypeError:
641 get_writer(where, &sink)
642 else:
643 c_where = tobytes(where)
644 with nogil:
645 sink = GetResultValue(FileOutputStream.Open(c_where))
646
647 with nogil:
648 check_status(
649 WriteMetaDataFile(deref(self._metadata), sink.get()))
650
651
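# The Parquet (not Arrow) schema of a file. Indexable by column position and
# convertible to an Arrow schema with to_arrow_schema().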
652 cdef class ParquetSchema(_Weakrefable):
653 def __cinit__(self, FileMetaData container):
654 self.parent = container
655 self.schema = container._metadata.schema()
656
657 def __repr__(self):
658 return "{0}\n{1}".format(
659 object.__repr__(self),
660 frombytes(self.schema.ToString(), safe=True))
661
662 def __reduce__(self):
663 return ParquetSchema, (self.parent,)
664
665 def __len__(self):
666 return self.schema.num_columns()
667
668 def __getitem__(self, i):
669 return self.column(i)
670
671 @property
672 def names(self):
673 return [self[i].name for i in range(len(self))]
674
675 def to_arrow_schema(self):
676 """
677 Convert Parquet schema to effective Arrow schema
678
679 Returns
680 -------
681 schema : pyarrow.Schema
682 """
683 cdef shared_ptr[CSchema] sp_arrow_schema
684
685 with nogil:
686 check_status(FromParquetSchema(
687 self.schema, default_arrow_reader_properties(),
688 self.parent._metadata.key_value_metadata(),
689 &sp_arrow_schema))
690
691 return pyarrow_wrap_schema(sp_arrow_schema)
692
693 def __eq__(self, other):
694 try:
695 return self.equals(other)
696 except TypeError:
697 return NotImplemented
698
699 def equals(self, ParquetSchema other):
700 """
701 Returns True if the Parquet schemas are equal
702 """
703 return self.schema.Equals(deref(other.schema))
704
705 def column(self, i):
706 if i < 0 or i >= len(self):
707 raise IndexError('{0} out of bounds'.format(i))
708
709 return ColumnSchema(self, i)
710
711
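# Schema information for a single leaf column: name, dotted path, max
# definition/repetition levels and physical/logical/converted types, plus
# length/precision/scale for fixed-length and decimal columns.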
712 cdef class ColumnSchema(_Weakrefable):
713 cdef:
714 int index
715 ParquetSchema parent
716 const ColumnDescriptor* descr
717
718 def __cinit__(self, ParquetSchema schema, int index):
719 self.parent = schema
720 self.index = index # for pickling support
721 self.descr = schema.schema.Column(index)
722
723 def __eq__(self, other):
724 try:
725 return self.equals(other)
726 except TypeError:
727 return NotImplemented
728
729 def __reduce__(self):
730 return ColumnSchema, (self.parent, self.index)
731
732 def equals(self, ColumnSchema other):
733 """
734 Returns True if the column schemas are equal
735 """
736 return self.descr.Equals(deref(other.descr))
737
738 def __repr__(self):
739 physical_type = self.physical_type
740 converted_type = self.converted_type
741 if converted_type == 'DECIMAL':
742 converted_type = 'DECIMAL({0}, {1})'.format(self.precision,
743 self.scale)
744 elif physical_type == 'FIXED_LEN_BYTE_ARRAY':
745 converted_type = ('FIXED_LEN_BYTE_ARRAY(length={0})'
746 .format(self.length))
747
748 return """<ParquetColumnSchema>
749 name: {0}
750 path: {1}
751 max_definition_level: {2}
752 max_repetition_level: {3}
753 physical_type: {4}
754 logical_type: {5}
755 converted_type (legacy): {6}""".format(self.name, self.path,
756 self.max_definition_level,
757 self.max_repetition_level,
758 physical_type,
759 str(self.logical_type),
760 converted_type)
761
762 @property
763 def name(self):
764 return frombytes(self.descr.name())
765
766 @property
767 def path(self):
768 return frombytes(self.descr.path().get().ToDotString())
769
770 @property
771 def max_definition_level(self):
772 return self.descr.max_definition_level()
773
774 @property
775 def max_repetition_level(self):
776 return self.descr.max_repetition_level()
777
778 @property
779 def physical_type(self):
780 return physical_type_name_from_enum(self.descr.physical_type())
781
782 @property
783 def logical_type(self):
784 return wrap_logical_type(self.descr.logical_type())
785
786 @property
787 def converted_type(self):
788 return converted_type_name_from_enum(self.descr.converted_type())
789
794 # FIXED_LEN_BYTE_ARRAY attribute
795 @property
796 def length(self):
797 return self.descr.type_length()
798
799 # Decimal attributes
800 @property
801 def precision(self):
802 return self.descr.type_precision()
803
804 @property
805 def scale(self):
806 return self.descr.type_scale()
807
808
809 cdef physical_type_name_from_enum(ParquetType type_):
810 return {
811 ParquetType_BOOLEAN: 'BOOLEAN',
812 ParquetType_INT32: 'INT32',
813 ParquetType_INT64: 'INT64',
814 ParquetType_INT96: 'INT96',
815 ParquetType_FLOAT: 'FLOAT',
816 ParquetType_DOUBLE: 'DOUBLE',
817 ParquetType_BYTE_ARRAY: 'BYTE_ARRAY',
818 ParquetType_FIXED_LEN_BYTE_ARRAY: 'FIXED_LEN_BYTE_ARRAY',
819 }.get(type_, 'UNKNOWN')
820
821
822 cdef logical_type_name_from_enum(ParquetLogicalTypeId type_):
823 return {
824 ParquetLogicalType_UNDEFINED: 'UNDEFINED',
825 ParquetLogicalType_STRING: 'STRING',
826 ParquetLogicalType_MAP: 'MAP',
827 ParquetLogicalType_LIST: 'LIST',
828 ParquetLogicalType_ENUM: 'ENUM',
829 ParquetLogicalType_DECIMAL: 'DECIMAL',
830 ParquetLogicalType_DATE: 'DATE',
831 ParquetLogicalType_TIME: 'TIME',
832 ParquetLogicalType_TIMESTAMP: 'TIMESTAMP',
833 ParquetLogicalType_INT: 'INT',
834 ParquetLogicalType_JSON: 'JSON',
835 ParquetLogicalType_BSON: 'BSON',
836 ParquetLogicalType_UUID: 'UUID',
837 ParquetLogicalType_NONE: 'NONE',
838 }.get(type_, 'UNKNOWN')
839
840
841 cdef converted_type_name_from_enum(ParquetConvertedType type_):
842 return {
843 ParquetConvertedType_NONE: 'NONE',
844 ParquetConvertedType_UTF8: 'UTF8',
845 ParquetConvertedType_MAP: 'MAP',
846 ParquetConvertedType_MAP_KEY_VALUE: 'MAP_KEY_VALUE',
847 ParquetConvertedType_LIST: 'LIST',
848 ParquetConvertedType_ENUM: 'ENUM',
849 ParquetConvertedType_DECIMAL: 'DECIMAL',
850 ParquetConvertedType_DATE: 'DATE',
851 ParquetConvertedType_TIME_MILLIS: 'TIME_MILLIS',
852 ParquetConvertedType_TIME_MICROS: 'TIME_MICROS',
853 ParquetConvertedType_TIMESTAMP_MILLIS: 'TIMESTAMP_MILLIS',
854 ParquetConvertedType_TIMESTAMP_MICROS: 'TIMESTAMP_MICROS',
855 ParquetConvertedType_UINT_8: 'UINT_8',
856 ParquetConvertedType_UINT_16: 'UINT_16',
857 ParquetConvertedType_UINT_32: 'UINT_32',
858 ParquetConvertedType_UINT_64: 'UINT_64',
859 ParquetConvertedType_INT_8: 'INT_8',
860 ParquetConvertedType_INT_16: 'INT_16',
861 ParquetConvertedType_INT_32: 'INT_32',
862 ParquetConvertedType_INT_64: 'INT_64',
863 ParquetConvertedType_JSON: 'JSON',
864 ParquetConvertedType_BSON: 'BSON',
865 ParquetConvertedType_INTERVAL: 'INTERVAL',
866 }.get(type_, 'UNKNOWN')
867
868
869 cdef encoding_name_from_enum(ParquetEncoding encoding_):
870 return {
871 ParquetEncoding_PLAIN: 'PLAIN',
872 ParquetEncoding_PLAIN_DICTIONARY: 'PLAIN_DICTIONARY',
873 ParquetEncoding_RLE: 'RLE',
874 ParquetEncoding_BIT_PACKED: 'BIT_PACKED',
875 ParquetEncoding_DELTA_BINARY_PACKED: 'DELTA_BINARY_PACKED',
876 ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY: 'DELTA_LENGTH_BYTE_ARRAY',
877 ParquetEncoding_DELTA_BYTE_ARRAY: 'DELTA_BYTE_ARRAY',
878 ParquetEncoding_RLE_DICTIONARY: 'RLE_DICTIONARY',
879 ParquetEncoding_BYTE_STREAM_SPLIT: 'BYTE_STREAM_SPLIT',
880 }.get(encoding_, 'UNKNOWN')
881
882
883 cdef compression_name_from_enum(ParquetCompression compression_):
884 return {
885 ParquetCompression_UNCOMPRESSED: 'UNCOMPRESSED',
886 ParquetCompression_SNAPPY: 'SNAPPY',
887 ParquetCompression_GZIP: 'GZIP',
888 ParquetCompression_LZO: 'LZO',
889 ParquetCompression_BROTLI: 'BROTLI',
890 ParquetCompression_LZ4: 'LZ4',
891 ParquetCompression_ZSTD: 'ZSTD',
892 }.get(compression_, 'UNKNOWN')
893
894
895 cdef int check_compression_name(name) except -1:
896 if name.upper() not in {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4',
897 'ZSTD'}:
898 raise ArrowException("Unsupported compression: " + name)
899 return 0
900
901
902 cdef ParquetCompression compression_from_name(name):
903 name = name.upper()
904 if name == 'SNAPPY':
905 return ParquetCompression_SNAPPY
906 elif name == 'GZIP':
907 return ParquetCompression_GZIP
908 elif name == 'LZO':
909 return ParquetCompression_LZO
910 elif name == 'BROTLI':
911 return ParquetCompression_BROTLI
912 elif name == 'LZ4':
913 return ParquetCompression_LZ4
914 elif name == 'ZSTD':
915 return ParquetCompression_ZSTD
916 else:
917 return ParquetCompression_UNCOMPRESSED
918
919
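# Low-level reader around the C++ FileReader. open() configures reader and
# Arrow reader properties; data can then be read as whole tables, row groups,
# single columns/fields, or streamed as record batches via iter_batches().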
920 cdef class ParquetReader(_Weakrefable):
921 cdef:
922 object source
923 CMemoryPool* pool
924 unique_ptr[FileReader] reader
925 FileMetaData _metadata
926
927 cdef public:
928 _column_idx_map
929
930 def __cinit__(self, MemoryPool memory_pool=None):
931 self.pool = maybe_unbox_memory_pool(memory_pool)
932 self._metadata = None
933
934 def open(self, object source not None, bint use_memory_map=True,
935 read_dictionary=None, FileMetaData metadata=None,
936 int buffer_size=0, bint pre_buffer=False,
937 coerce_int96_timestamp_unit=None):
938 cdef:
939 shared_ptr[CRandomAccessFile] rd_handle
940 shared_ptr[CFileMetaData] c_metadata
941 CReaderProperties properties = default_reader_properties()
942 ArrowReaderProperties arrow_props = (
943 default_arrow_reader_properties())
944 c_string path
945 FileReaderBuilder builder
946 TimeUnit int96_timestamp_unit_code
947
948 if metadata is not None:
949 c_metadata = metadata.sp_metadata
950
951 if buffer_size > 0:
952 properties.enable_buffered_stream()
953 properties.set_buffer_size(buffer_size)
954 elif buffer_size == 0:
955 properties.disable_buffered_stream()
956 else:
957 raise ValueError('Buffer size must not be negative')
958
959 arrow_props.set_pre_buffer(pre_buffer)
960
961 if coerce_int96_timestamp_unit is None:
962 # use the default defined in default_arrow_reader_properties()
963 pass
964 else:
965 arrow_props.set_coerce_int96_timestamp_unit(
966 string_to_timeunit(coerce_int96_timestamp_unit))
967
968 self.source = source
969
970 get_reader(source, use_memory_map, &rd_handle)
971 with nogil:
972 check_status(builder.Open(rd_handle, properties, c_metadata))
973
974 # Set up metadata
975 with nogil:
976 c_metadata = builder.raw_reader().metadata()
977 self._metadata = result = FileMetaData()
978 result.init(c_metadata)
979
980 if read_dictionary is not None:
981 self._set_read_dictionary(read_dictionary, &arrow_props)
982
983 with nogil:
984 check_status(builder.memory_pool(self.pool)
985 .properties(arrow_props)
986 .Build(&self.reader))
987
988 cdef _set_read_dictionary(self, read_dictionary,
989 ArrowReaderProperties* props):
990 for column in read_dictionary:
991 if not isinstance(column, int):
992 column = self.column_name_idx(column)
993 props.set_read_dictionary(column, True)
994
995 @property
996 def column_paths(self):
997 cdef:
998 FileMetaData container = self.metadata
999 const CFileMetaData* metadata = container._metadata
1000 vector[c_string] path
1001 int i = 0
1002
1003 paths = []
1004 for i in range(0, metadata.num_columns()):
1005 path = (metadata.schema().Column(i)
1006 .path().get().ToDotVector())
1007 paths.append([frombytes(x) for x in path])
1008
1009 return paths
1010
1011 @property
1012 def metadata(self):
1013 return self._metadata
1014
1015 @property
1016 def schema_arrow(self):
1017 cdef shared_ptr[CSchema] out
1018 with nogil:
1019 check_status(self.reader.get().GetSchema(&out))
1020 return pyarrow_wrap_schema(out)
1021
1022 @property
1023 def num_row_groups(self):
1024 return self.reader.get().num_row_groups()
1025
1026 def set_use_threads(self, bint use_threads):
1027 self.reader.get().set_use_threads(use_threads)
1028
1029 def set_batch_size(self, int64_t batch_size):
1030 self.reader.get().set_batch_size(batch_size)
1031
1032 def iter_batches(self, int64_t batch_size, row_groups, column_indices=None,
1033 bint use_threads=True):
1034 cdef:
1035 vector[int] c_row_groups
1036 vector[int] c_column_indices
1037 shared_ptr[CRecordBatch] record_batch
1038 shared_ptr[TableBatchReader] batch_reader
1039 unique_ptr[CRecordBatchReader] recordbatchreader
1040
1041 self.set_batch_size(batch_size)
1042
1043 if use_threads:
1044 self.set_use_threads(use_threads)
1045
1046 for row_group in row_groups:
1047 c_row_groups.push_back(row_group)
1048
1049 if column_indices is not None:
1050 for index in column_indices:
1051 c_column_indices.push_back(index)
1052 with nogil:
1053 check_status(
1054 self.reader.get().GetRecordBatchReader(
1055 c_row_groups, c_column_indices, &recordbatchreader
1056 )
1057 )
1058 else:
1059 with nogil:
1060 check_status(
1061 self.reader.get().GetRecordBatchReader(
1062 c_row_groups, &recordbatchreader
1063 )
1064 )
1065
1066 while True:
1067 with nogil:
1068 check_status(
1069 recordbatchreader.get().ReadNext(&record_batch)
1070 )
1071
1072 if record_batch.get() == NULL:
1073 break
1074
1075 yield pyarrow_wrap_batch(record_batch)
1076
1077 def read_row_group(self, int i, column_indices=None,
1078 bint use_threads=True):
1079 return self.read_row_groups([i], column_indices, use_threads)
1080
1081 def read_row_groups(self, row_groups not None, column_indices=None,
1082 bint use_threads=True):
1083 cdef:
1084 shared_ptr[CTable] ctable
1085 vector[int] c_row_groups
1086 vector[int] c_column_indices
1087
1088 self.set_use_threads(use_threads)
1089
1090 for row_group in row_groups:
1091 c_row_groups.push_back(row_group)
1092
1093 if column_indices is not None:
1094 for index in column_indices:
1095 c_column_indices.push_back(index)
1096
1097 with nogil:
1098 check_status(self.reader.get()
1099 .ReadRowGroups(c_row_groups, c_column_indices,
1100 &ctable))
1101 else:
1102 # Read all columns
1103 with nogil:
1104 check_status(self.reader.get()
1105 .ReadRowGroups(c_row_groups, &ctable))
1106 return pyarrow_wrap_table(ctable)
1107
1108 def read_all(self, column_indices=None, bint use_threads=True):
1109 cdef:
1110 shared_ptr[CTable] ctable
1111 vector[int] c_column_indices
1112
1113 self.set_use_threads(use_threads)
1114
1115 if column_indices is not None:
1116 for index in column_indices:
1117 c_column_indices.push_back(index)
1118
1119 with nogil:
1120 check_status(self.reader.get()
1121 .ReadTable(c_column_indices, &ctable))
1122 else:
1123 # Read all columns
1124 with nogil:
1125 check_status(self.reader.get()
1126 .ReadTable(&ctable))
1127 return pyarrow_wrap_table(ctable)
1128
1129 def scan_contents(self, column_indices=None, batch_size=65536):
1130 cdef:
1131 vector[int] c_column_indices
1132 int32_t c_batch_size
1133 int64_t c_num_rows
1134
1135 if column_indices is not None:
1136 for index in column_indices:
1137 c_column_indices.push_back(index)
1138
1139 c_batch_size = batch_size
1140
1141 with nogil:
1142 check_status(self.reader.get()
1143 .ScanContents(c_column_indices, c_batch_size,
1144 &c_num_rows))
1145
1146 return c_num_rows
1147
1148 def column_name_idx(self, column_name):
1149 """
1150 Find the matching index of a column in the schema.
1151
1152 Parameters
1153 ----------
1154 column_name : str
1155 Name of the column, separation of nesting levels is done via ".".
1156
1157 Returns
1158 -------
1159 column_idx: int
1160 Integer index of the position of the column
1161 """
1162 cdef:
1163 FileMetaData container = self.metadata
1164 const CFileMetaData* metadata = container._metadata
1165 int i = 0
1166
1167 if self._column_idx_map is None:
1168 self._column_idx_map = {}
1169 for i in range(0, metadata.num_columns()):
1170 col_bytes = tobytes(metadata.schema().Column(i)
1171 .path().get().ToDotString())
1172 self._column_idx_map[col_bytes] = i
1173
1174 return self._column_idx_map[tobytes(column_name)]
1175
1176 def read_column(self, int column_index):
1177 cdef shared_ptr[CChunkedArray] out
1178 with nogil:
1179 check_status(self.reader.get()
1180 .ReadColumn(column_index, &out))
1181 return pyarrow_wrap_chunked_array(out)
1182
1183 def read_schema_field(self, int field_index):
1184 cdef shared_ptr[CChunkedArray] out
1185 with nogil:
1186 check_status(self.reader.get()
1187 .ReadSchemaField(field_index, &out))
1188 return pyarrow_wrap_chunked_array(out)
1189
1190
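# Translate Python-level writer options (format version, compression,
# dictionary/statistics toggles, BYTE_STREAM_SPLIT encoding, page size and
# data page version) into a C++ WriterProperties via WriterProperties.Builder.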
1191 cdef shared_ptr[WriterProperties] _create_writer_properties(
1192 use_dictionary=None,
1193 compression=None,
1194 version=None,
1195 write_statistics=None,
1196 data_page_size=None,
1197 compression_level=None,
1198 use_byte_stream_split=False,
1199 data_page_version=None) except *:
1200 """General writer properties"""
1201 cdef:
1202 shared_ptr[WriterProperties] properties
1203 WriterProperties.Builder props
1204
1205 # data_page_version
1206
1207 if data_page_version is not None:
1208 if data_page_version == "1.0":
1209 props.data_page_version(ParquetDataPageVersion_V1)
1210 elif data_page_version == "2.0":
1211 props.data_page_version(ParquetDataPageVersion_V2)
1212 else:
1213 raise ValueError("Unsupported Parquet data page version: {0}"
1214 .format(data_page_version))
1215
1216 # version
1217
1218 if version is not None:
1219 if version == "1.0":
1220 props.version(ParquetVersion_V1)
1221 elif version in ("2.0", "pseudo-2.0"):
1222 warnings.warn(
1223 "Parquet format '2.0' pseudo version is deprecated, use "
1224 "'2.4' or '2.6' for fine-grained feature selection",
1225 FutureWarning, stacklevel=2)
1226 props.version(ParquetVersion_V2_0)
1227 elif version == "2.4":
1228 props.version(ParquetVersion_V2_4)
1229 elif version == "2.6":
1230 props.version(ParquetVersion_V2_6)
1231 else:
1232 raise ValueError("Unsupported Parquet format version: {0}"
1233 .format(version))
1234
1235 # compression
1236
1237 if isinstance(compression, basestring):
1238 check_compression_name(compression)
1239 props.compression(compression_from_name(compression))
1240 elif compression is not None:
1241 for column, codec in compression.items():
1242 check_compression_name(codec)
1243 props.compression(tobytes(column), compression_from_name(codec))
1244
1245 if isinstance(compression_level, int):
1246 props.compression_level(compression_level)
1247 elif compression_level is not None:
1248 for column, level in compression_level.items():
1249 props.compression_level(tobytes(column), level)
1250
1251 # use_dictionary
1252
1253 if isinstance(use_dictionary, bool):
1254 if use_dictionary:
1255 props.enable_dictionary()
1256 else:
1257 props.disable_dictionary()
1258 elif use_dictionary is not None:
1259 # Deactivate dictionary encoding by default
1260 props.disable_dictionary()
1261 for column in use_dictionary:
1262 props.enable_dictionary(tobytes(column))
1263
1264 # write_statistics
1265
1266 if isinstance(write_statistics, bool):
1267 if write_statistics:
1268 props.enable_statistics()
1269 else:
1270 props.disable_statistics()
1271 elif write_statistics is not None:
1272 # Deactivate statistics by default and enable for specified columns
1273 props.disable_statistics()
1274 for column in write_statistics:
1275 props.enable_statistics(tobytes(column))
1276
1277 # use_byte_stream_split
1278
1279 if isinstance(use_byte_stream_split, bool):
1280 if use_byte_stream_split:
1281 props.encoding(ParquetEncoding_BYTE_STREAM_SPLIT)
1282 elif use_byte_stream_split is not None:
1283 for column in use_byte_stream_split:
1284 props.encoding(tobytes(column),
1285 ParquetEncoding_BYTE_STREAM_SPLIT)
1286
1287 if data_page_size is not None:
1288 props.data_pagesize(data_page_size)
1289
1290 properties = props.build()
1291
1292 return properties
1293
1294
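# Translate Arrow-specific writer options (deprecated INT96 timestamps,
# timestamp coercion, compliant nested types, engine version) into
# ArrowWriterProperties.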
1295 cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
1296 use_deprecated_int96_timestamps=False,
1297 coerce_timestamps=None,
1298 allow_truncated_timestamps=False,
1299 writer_engine_version=None,
1300 use_compliant_nested_type=False) except *:
1301 """Arrow writer properties"""
1302 cdef:
1303 shared_ptr[ArrowWriterProperties] arrow_properties
1304 ArrowWriterProperties.Builder arrow_props
1305
1306 # Store the original Arrow schema so things like dictionary types can
1307 # be automatically reconstructed
1308 arrow_props.store_schema()
1309
1310 # int96 support
1311
1312 if use_deprecated_int96_timestamps:
1313 arrow_props.enable_deprecated_int96_timestamps()
1314 else:
1315 arrow_props.disable_deprecated_int96_timestamps()
1316
1317 # coerce_timestamps
1318
1319 if coerce_timestamps == 'ms':
1320 arrow_props.coerce_timestamps(TimeUnit_MILLI)
1321 elif coerce_timestamps == 'us':
1322 arrow_props.coerce_timestamps(TimeUnit_MICRO)
1323 elif coerce_timestamps is not None:
1324 raise ValueError('Invalid value for coerce_timestamps: {0}'
1325 .format(coerce_timestamps))
1326
1327 # allow_truncated_timestamps
1328
1329 if allow_truncated_timestamps:
1330 arrow_props.allow_truncated_timestamps()
1331 else:
1332 arrow_props.disallow_truncated_timestamps()
1333
1334 # use_compliant_nested_type
1335
1336 if use_compliant_nested_type:
1337 arrow_props.enable_compliant_nested_types()
1338 else:
1339 arrow_props.disable_compliant_nested_types()
1340
1341 # writer_engine_version
1342
1343 if writer_engine_version == "V1":
1344 warnings.warn("V1 parquet writer engine is a no-op. Use V2.")
1345 arrow_props.set_engine_version(ArrowWriterEngineVersion.V1)
1346 elif writer_engine_version != "V2":
1347 raise ValueError("Unsupported Writer Engine Version: {0}"
1348 .format(writer_engine_version))
1349
1350 arrow_properties = arrow_props.build()
1351
1352 return arrow_properties
1353
1354
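# Low-level writer around the C++ FileWriter: opens or adopts the output sink,
# builds WriterProperties / ArrowWriterProperties from the constructor
# arguments, and writes Arrow Tables as Parquet row groups via write_table().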
1355 cdef class ParquetWriter(_Weakrefable):
1356 cdef:
1357 unique_ptr[FileWriter] writer
1358 shared_ptr[COutputStream] sink
1359 bint own_sink
1360
1361 cdef readonly:
1362 object use_dictionary
1363 object use_deprecated_int96_timestamps
1364 object use_byte_stream_split
1365 object coerce_timestamps
1366 object allow_truncated_timestamps
1367 object compression
1368 object compression_level
1369 object data_page_version
1370 object use_compliant_nested_type
1371 object version
1372 object write_statistics
1373 object writer_engine_version
1374 int row_group_size
1375 int64_t data_page_size
1376
1377 def __cinit__(self, where, Schema schema, use_dictionary=None,
1378 compression=None, version=None,
1379 write_statistics=None,
1380 MemoryPool memory_pool=None,
1381 use_deprecated_int96_timestamps=False,
1382 coerce_timestamps=None,
1383 data_page_size=None,
1384 allow_truncated_timestamps=False,
1385 compression_level=None,
1386 use_byte_stream_split=False,
1387 writer_engine_version=None,
1388 data_page_version=None,
1389 use_compliant_nested_type=False):
1390 cdef:
1391 shared_ptr[WriterProperties] properties
1392 shared_ptr[ArrowWriterProperties] arrow_properties
1393 c_string c_where
1394 CMemoryPool* pool
1395
1396 try:
1397 where = _stringify_path(where)
1398 except TypeError:
1399 get_writer(where, &self.sink)
1400 self.own_sink = False
1401 else:
1402 c_where = tobytes(where)
1403 with nogil:
1404 self.sink = GetResultValue(FileOutputStream.Open(c_where))
1405 self.own_sink = True
1406
1407 properties = _create_writer_properties(
1408 use_dictionary=use_dictionary,
1409 compression=compression,
1410 version=version,
1411 write_statistics=write_statistics,
1412 data_page_size=data_page_size,
1413 compression_level=compression_level,
1414 use_byte_stream_split=use_byte_stream_split,
1415 data_page_version=data_page_version
1416 )
1417 arrow_properties = _create_arrow_writer_properties(
1418 use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
1419 coerce_timestamps=coerce_timestamps,
1420 allow_truncated_timestamps=allow_truncated_timestamps,
1421 writer_engine_version=writer_engine_version,
1422 use_compliant_nested_type=use_compliant_nested_type
1423 )
1424
1425 pool = maybe_unbox_memory_pool(memory_pool)
1426 with nogil:
1427 check_status(
1428 FileWriter.Open(deref(schema.schema), pool,
1429 self.sink, properties, arrow_properties,
1430 &self.writer))
1431
1432 def close(self):
1433 with nogil:
1434 check_status(self.writer.get().Close())
1435 if self.own_sink:
1436 check_status(self.sink.get().Close())
1437
1438 def write_table(self, Table table, row_group_size=None):
1439 cdef:
1440 CTable* ctable = table.table
1441 int64_t c_row_group_size
1442
1443 if row_group_size is None or row_group_size == -1:
1444 c_row_group_size = ctable.num_rows()
1445 elif row_group_size == 0:
1446 raise ValueError('Row group size cannot be 0')
1447 else:
1448 c_row_group_size = row_group_size
1449
1450 with nogil:
1451 check_status(self.writer.get()
1452 .WriteTable(deref(ctable), c_row_group_size))
1453
1454 @property
1455 def metadata(self):
1456 cdef:
1457 shared_ptr[CFileMetaData] metadata
1458 FileMetaData result
1459 with nogil:
1460 metadata = self.writer.get().metadata()
1461 if metadata:
1462 result = FileMetaData()
1463 result.init(metadata)
1464 return result
1465 raise RuntimeError(
1466 'file metadata is only available after writer close')