1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
18 # cython: profile=False
19 # distutils: language = c++
22 from textwrap import indent
27 from cython.operator cimport dereference as deref
28 from pyarrow.includes.common cimport *
29 from pyarrow.includes.libarrow cimport *
30 from pyarrow.lib cimport (_Weakrefable, Buffer, Array, Schema,
32 MemoryPool, maybe_unbox_memory_pool,
34 pyarrow_wrap_chunked_array,
39 NativeFile, get_reader, get_writer,
42 from pyarrow.lib import (ArrowException, NativeFile, BufferOutputStream,
43 _stringify_path, _datetime_from_int,
# Statistics: Python wrapper over a shared_ptr to parquet's C++
# Statistics object for one column chunk (min/max, null/distinct counts).
# NOTE(review): this listing is subsampled -- source rows between the
# embedded line numbers are missing, so bodies below are fragmentary.
49 cdef class Statistics(_Weakrefable):
# Fragment of __repr__: tail of the format template plus its arguments.
63 converted_type (legacy): {}""".format(object.__repr__(self),
71 str(self.logical_type),
76 has_min_max=self.has_min_max,
79 null_count=self.null_count,
80 distinct_count=self.distinct_count,
81 num_values=self.num_values,
82 physical_type=self.physical_type
# Equality delegates to equals(); the non-Statistics fallback branch
# is among the missing rows.
86 def __eq__(self, other):
88 return self.equals(other)
# Structural equality via C++ Statistics::Equals on the wrapped pointers.
92 def equals(self, Statistics other):
93 return self.statistics.get().Equals(deref(other.statistics.get()))
# Presence flags -- presumably @property in the full source (decorator
# rows are missing from this view; confirm against upstream).
96 def has_min_max(self):
97 return self.statistics.get().HasMinMax()
100 def has_null_count(self):
101 return self.statistics.get().HasNullCount()
104 def has_distinct_count(self):
105 return self.statistics.get().HasDistinctCount()
# Raw (physical-type) and logically-boxed min/max accessors; their
# def/property headers are among the missing rows.
110 return _cast_statistic_raw_min(self.statistics.get())
117 return _cast_statistic_raw_max(self.statistics.get())
124 return _cast_statistic_min(self.statistics.get())
131 return _cast_statistic_max(self.statistics.get())
# Counter accessors: straight pass-throughs to the C++ object.
136 def null_count(self):
137 return self.statistics.get().null_count()
140 def distinct_count(self):
141 return self.statistics.get().distinct_count()
144 def num_values(self):
145 return self.statistics.get().num_values()
# Type metadata mapped to Python-friendly names/wrappers.
148 def physical_type(self):
149 raw_physical_type = self.statistics.get().physical_type()
150 return physical_type_name_from_enum(raw_physical_type)
153 def logical_type(self):
154 return wrap_logical_type(self.statistics.get().descr().logical_type())
157 def converted_type(self):
158 raw_converted_type = self.statistics.get().descr().converted_type()
159 return converted_type_name_from_enum(raw_converted_type)
# ParquetLogicalType: thin wrapper holding a shared_ptr to the C++
# logical-type object.  The cdef: header, __cinit__ and init() body rows
# are missing from this subsampled view.
162 cdef class ParquetLogicalType(_Weakrefable):
164 shared_ptr[const CParquetLogicalType] type
169 cdef init(self, const shared_ptr[const CParquetLogicalType]& type):
# String / JSON renderings and enum name of the wrapped type; the
# def headers for these return statements are among the missing rows.
173 return frombytes(self.type.get().ToString(), safe=True)
176 return frombytes(self.type.get().ToJSON())
180 return logical_type_name_from_enum(self.type.get().type())
# Factory: box a C++ logical type into a ParquetLogicalType instance.
# NOTE(review): the rows that call out.init(type) and return `out`
# are missing from this view -- confirm against upstream.
183 cdef wrap_logical_type(const shared_ptr[const CParquetLogicalType]& type):
184 cdef ParquetLogicalType out = ParquetLogicalType()
cdef _cast_statistic_raw_min(CStatistics* statistics):
    """Return the raw (physical-type) minimum held by ``statistics``.

    Dispatches on the column's physical type and casts the generic
    statistics pointer to the matching typed subclass.  BYTE_ARRAY and
    FIXED_LEN_BYTE_ARRAY values are boxed into Python bytes; any
    physical type without a branch (e.g. INT96) falls through to None.
    """
    cdef ParquetType ptype = statistics.physical_type()
    # Fixed-length byte arrays need the declared length to box the value.
    cdef uint32_t flba_length = statistics.descr().type_length()
    if ptype == ParquetType_BOOLEAN:
        return (<CBoolStatistics*> statistics).min()
    elif ptype == ParquetType_INT32:
        return (<CInt32Statistics*> statistics).min()
    elif ptype == ParquetType_INT64:
        return (<CInt64Statistics*> statistics).min()
    elif ptype == ParquetType_FLOAT:
        return (<CFloatStatistics*> statistics).min()
    elif ptype == ParquetType_DOUBLE:
        return (<CDoubleStatistics*> statistics).min()
    elif ptype == ParquetType_BYTE_ARRAY:
        return _box_byte_array((<CByteArrayStatistics*> statistics).min())
    elif ptype == ParquetType_FIXED_LEN_BYTE_ARRAY:
        return _box_flba((<CFLBAStatistics*> statistics).min(), flba_length)
cdef _cast_statistic_raw_max(CStatistics* statistics):
    """Return the raw (physical-type) maximum held by ``statistics``.

    Mirror of _cast_statistic_raw_min: cast to the typed statistics
    subclass per physical type, boxing byte-array values into Python
    bytes.  Unhandled physical types fall through to None.
    """
    cdef ParquetType ptype = statistics.physical_type()
    # Fixed-length byte arrays need the declared length to box the value.
    cdef uint32_t flba_length = statistics.descr().type_length()
    if ptype == ParquetType_BOOLEAN:
        return (<CBoolStatistics*> statistics).max()
    elif ptype == ParquetType_INT32:
        return (<CInt32Statistics*> statistics).max()
    elif ptype == ParquetType_INT64:
        return (<CInt64Statistics*> statistics).max()
    elif ptype == ParquetType_FLOAT:
        return (<CFloatStatistics*> statistics).max()
    elif ptype == ParquetType_DOUBLE:
        return (<CDoubleStatistics*> statistics).max()
    elif ptype == ParquetType_BYTE_ARRAY:
        return _box_byte_array((<CByteArrayStatistics*> statistics).max())
    elif ptype == ParquetType_FIXED_LEN_BYTE_ARRAY:
        return _box_flba((<CFLBAStatistics*> statistics).max(), flba_length)
cdef _cast_statistic_min(CStatistics* statistics):
    """Minimum statistic converted to a logical-typed Python value."""
    return _box_logical_type_value(
        _cast_statistic_raw_min(statistics), statistics.descr())
cdef _cast_statistic_max(CStatistics* statistics):
    """Maximum statistic converted to a logical-typed Python value."""
    return _box_logical_type_value(
        _cast_statistic_raw_max(statistics), statistics.descr())
# Convert a raw physical value into a Python value according to the
# column's logical type: STRING -> str, TIME/TIMESTAMP -> datetime
# objects, unsigned INT(32/64) -> non-negative int.
# NOTE(review): several branches are fragmentary in this subsampled
# view (MICROS time guard, tz selection, continuation args of the
# _datetime_from_int calls, and the final return row are missing).
237 cdef _box_logical_type_value(object value, const ColumnDescriptor* descr):
239 const CParquetLogicalType* ltype = descr.logical_type().get()
240 ParquetTimeUnit time_unit
241 const CParquetIntType* itype
242 const CParquetTimestampType* ts_type
244 if ltype.type() == ParquetLogicalType_STRING:
245 return value.decode('utf8')
246 elif ltype.type() == ParquetLogicalType_TIME:
247 time_unit = (<const CParquetTimeType*> ltype).time_unit()
248 if time_unit == ParquetTimeUnit_MILLIS:
249 return _datetime_from_int(value, unit=TimeUnit_MILLI).time()
251 return _datetime_from_int(value, unit=TimeUnit_MICRO).time()
252 elif ltype.type() == ParquetLogicalType_TIMESTAMP:
253 ts_type = <const CParquetTimestampType*> ltype
254 time_unit = ts_type.time_unit()
# UTC-adjusted timestamps presumably attach a tzinfo; the rows that
# choose the tzinfo value (255-259) are missing -- confirm upstream.
255 if ts_type.is_adjusted_to_utc():
260 if time_unit == ParquetTimeUnit_MILLIS:
261 return _datetime_from_int(value, unit=TimeUnit_MILLI,
263 elif time_unit == ParquetTimeUnit_MICROS:
264 return _datetime_from_int(value, unit=TimeUnit_MICRO,
266 elif time_unit == ParquetTimeUnit_NANOS:
267 return _datetime_from_int(value, unit=TimeUnit_NANO,
270 raise ValueError("Unsupported time unit")
# Unsigned INT: reinterpret the signed raw value's bits as unsigned.
271 elif ltype.type() == ParquetLogicalType_INT:
272 itype = <const CParquetIntType*> ltype
273 if not itype.is_signed() and itype.bit_width() == 32:
274 return int(np.int32(value).view(np.uint32))
275 elif not itype.is_signed() and itype.bit_width() == 64:
276 return int(np.int64(value).view(np.uint64))
280 # No logical boxing defined
cdef _box_byte_array(ParquetByteArray val):
    """Copy a Parquet ByteArray value into a new Python bytes object."""
    cdef char* data = <char*> val.ptr
    cdef Py_ssize_t size = <Py_ssize_t> val.len
    return cp.PyBytes_FromStringAndSize(data, size)
cdef _box_flba(ParquetFLBA val, uint32_t len):
    """Copy a fixed-length byte-array value (``len`` bytes) into bytes."""
    cdef char* data = <char*> val.ptr
    return cp.PyBytes_FromStringAndSize(data, <Py_ssize_t> len)
# ColumnChunkMetaData: wrapper over parquet C++ column-chunk metadata
# (offsets, sizes, encodings, compression, embedded Statistics).
# NOTE(review): subsampled view -- __repr__/to_dict heads, @property
# decorator rows and several return rows are missing.
292 cdef class ColumnChunkMetaData(_Weakrefable):
# __repr__ fragment: indented statistics repr plus format-template tail.
297 statistics = indent(repr(self.statistics), 4 * ' ')
309 has_dictionary_page: {10}
310 dictionary_page_offset: {11}
311 data_page_offset: {12}
312 total_compressed_size: {13}
313 total_uncompressed_size: {14}""".format(object.__repr__(self),
323 self.has_dictionary_page,
324 self.dictionary_page_offset,
325 self.data_page_offset,
326 self.total_compressed_size,
327 self.total_uncompressed_size)
# to_dict fragment: statistics only included when stats were written.
330 statistics = self.statistics.to_dict() if self.is_stats_set else None
332 file_offset=self.file_offset,
333 file_path=self.file_path,
334 physical_type=self.physical_type,
335 num_values=self.num_values,
336 path_in_schema=self.path_in_schema,
337 is_stats_set=self.is_stats_set,
338 statistics=statistics,
339 compression=self.compression,
340 encodings=self.encodings,
341 has_dictionary_page=self.has_dictionary_page,
342 dictionary_page_offset=self.dictionary_page_offset,
343 data_page_offset=self.data_page_offset,
344 total_compressed_size=self.total_compressed_size,
345 total_uncompressed_size=self.total_uncompressed_size
# Equality delegates to equals(); falls back to NotImplemented.
349 def __eq__(self, other):
351 return self.equals(other)
353 return NotImplemented
355 def equals(self, ColumnChunkMetaData other):
356 return self.metadata.Equals(deref(other.metadata))
# Accessors below are pass-throughs to the C++ metadata object;
# presumably @property in the full source (decorator rows missing).
359 def file_offset(self):
360 return self.metadata.file_offset()
364 return frombytes(self.metadata.file_path())
367 def physical_type(self):
368 return physical_type_name_from_enum(self.metadata.type())
371 def num_values(self):
372 return self.metadata.num_values()
375 def path_in_schema(self):
376 path = self.metadata.path_in_schema().get().ToDotString()
377 return frombytes(path)
380 def is_stats_set(self):
381 return self.metadata.is_stats_set()
# statistics: None-return row (386) and final return row are missing.
384 def statistics(self):
385 if not self.metadata.is_stats_set():
387 statistics = Statistics()
388 statistics.init(self.metadata.statistics(), self)
392 def compression(self):
393 return compression_name_from_enum(self.metadata.compression())
397 return tuple(map(encoding_name_from_enum, self.metadata.encodings()))
400 def has_dictionary_page(self):
401 return bool(self.metadata.has_dictionary_page())
# Offset is only meaningful when a dictionary page exists.
404 def dictionary_page_offset(self):
405 if self.has_dictionary_page:
406 return self.metadata.dictionary_page_offset()
411 def data_page_offset(self):
412 return self.metadata.data_page_offset()
# Index pages are not exposed by parquet-cpp; both raise by design.
415 def has_index_page(self):
416 raise NotImplementedError('not supported in parquet-cpp')
419 def index_page_offset(self):
420 raise NotImplementedError("parquet-cpp doesn't return valid values")
423 def total_compressed_size(self):
424 return self.metadata.total_compressed_size()
427 def total_uncompressed_size(self):
428 return self.metadata.total_uncompressed_size()
# RowGroupMetaData: metadata for one row group of a FileMetaData parent;
# holds the owning unique_ptr plus a raw pointer for convenience.
# NOTE(review): subsampled view -- rows assigning self.parent/self.index
# in __cinit__ (437-439, implied by __reduce__) and several method
# heads/returns are missing.
431 cdef class RowGroupMetaData(_Weakrefable):
432 def __cinit__(self, FileMetaData parent, int index):
433 if index < 0 or index >= parent.num_row_groups:
434 raise IndexError('{0} out of bounds'.format(index))
435 self.up_metadata = parent._metadata.RowGroup(index)
436 self.metadata = self.up_metadata.get()
# Pickle support: rebuild from the parent FileMetaData and our index.
440 def __reduce__(self):
441 return RowGroupMetaData, (self.parent, self.index)
443 def __eq__(self, other):
445 return self.equals(other)
447 return NotImplemented
449 def equals(self, RowGroupMetaData other):
450 return self.metadata.Equals(deref(other.metadata))
# column(i): bounds-checked ColumnChunkMetaData accessor; the rows
# initialising and returning `chunk` are missing here.
452 def column(self, int i):
453 if i < 0 or i >= self.num_columns:
454 raise IndexError('{0} out of bounds'.format(i))
455 chunk = ColumnChunkMetaData()
# __repr__ / to_dict fragments.
463 total_byte_size: {3}""".format(object.__repr__(self),
466 self.total_byte_size)
471 num_columns=self.num_columns,
472 num_rows=self.num_rows,
473 total_byte_size=self.total_byte_size,
476 for i in range(self.num_columns):
477 columns.append(self.column(i).to_dict())
# Pass-through accessors (presumably @property; decorator rows missing).
481 def num_columns(self):
482 return self.metadata.num_columns()
486 return self.metadata.num_rows()
489 def total_byte_size(self):
490 return self.metadata.total_byte_size()
# Unpickle helper: rebuild a FileMetaData from its Thrift-serialized
# buffer (counterpart of FileMetaData.__reduce__).
# NOTE(review): the cdef: header row and the final `return metadata`
# row are missing from this subsampled view.
493 def _reconstruct_filemetadata(Buffer serialized):
495 FileMetaData metadata = FileMetaData.__new__(FileMetaData)
496 CBuffer *buffer = serialized.buffer.get()
497 uint32_t metadata_len = <uint32_t>buffer.size()
499 metadata.init(CFileMetaData_Make(buffer.data(), &metadata_len))
# FileMetaData: wrapper over the file-level parquet metadata: schema,
# row groups, key-value metadata, plus (de)serialization helpers.
# NOTE(review): subsampled view -- init(), several property heads,
# return rows and branch bodies are missing.
504 cdef class FileMetaData(_Weakrefable):
# Pickle support: serialize to an in-memory buffer, rebuild via
# _reconstruct_filemetadata.
508 def __reduce__(self):
510 NativeFile sink = BufferOutputStream()
511 COutputStream* c_sink = sink.get_output_stream().get()
513 self._metadata.WriteTo(c_sink)
515 cdef Buffer buffer = sink.getvalue()
516 return _reconstruct_filemetadata, (buffer,)
# __repr__ / to_dict fragments.
525 serialized_size: {6}""".format(object.__repr__(self),
526 self.created_by, self.num_columns,
527 self.num_rows, self.num_row_groups,
529 self.serialized_size)
534 created_by=self.created_by,
535 num_columns=self.num_columns,
536 num_rows=self.num_rows,
537 num_row_groups=self.num_row_groups,
538 row_groups=row_groups,
539 format_version=self.format_version,
540 serialized_size=self.serialized_size
542 for i in range(self.num_row_groups):
543 row_groups.append(self.row_group(i).to_dict())
546 def __eq__(self, other):
548 return self.equals(other)
550 return NotImplemented
552 def equals(self, FileMetaData other):
553 return self._metadata.Equals(deref(other._metadata))
# schema property: lazily constructs and caches a ParquetSchema
# (return row missing in this view).
557 if self._schema is None:
558 self._schema = ParquetSchema(self)
# Pass-through accessors (presumably @property; decorator rows missing).
562 def serialized_size(self):
563 return self._metadata.size()
566 def num_columns(self):
567 return self._metadata.num_columns()
571 return self._metadata.num_rows()
574 def num_row_groups(self):
575 return self._metadata.num_row_groups()
# format_version: maps the C++ enum to a version string; the literal
# return rows ('1.0', '2.4', ...) are among the missing lines; unknown
# versions warn and presumably fall back to '1.0'.
578 def format_version(self):
579 cdef ParquetVersion version = self._metadata.version()
580 if version == ParquetVersion_V1:
582 elif version == ParquetVersion_V2_0:
584 elif version == ParquetVersion_V2_4:
586 elif version == ParquetVersion_V2_6:
589 warnings.warn('Unrecognized file version, assuming 1.0: {}'
594 def created_by(self):
595 return frombytes(self._metadata.created_by())
# Key-value metadata copied into a Python-visible map; NULL-safe.
600 unordered_map[c_string, c_string] metadata
601 const CKeyValueMetadata* underlying_metadata
602 underlying_metadata = self._metadata.key_value_metadata().get()
603 if underlying_metadata != NULL:
604 underlying_metadata.ToUnorderedMap(&metadata)
609 def row_group(self, int i):
610 return RowGroupMetaData(self, i)
612 def set_file_path(self, path):
614 Modify the file_path field of each ColumnChunk in the
615 FileMetaData to be a particular value
618 c_string c_path = tobytes(path)
619 self._metadata.set_file_path(c_path)
621 def append_row_groups(self, FileMetaData other):
623 Append row groups of other FileMetaData object
625 cdef shared_ptr[CFileMetaData] c_metadata
627 c_metadata = other.sp_metadata
628 self._metadata.AppendRowGroups(deref(c_metadata))
# write_metadata_file: accepts either a path (string) or an open
# writable object; the branch-selection rows are missing here.
630 def write_metadata_file(self, where):
632 Write the metadata object to a metadata-only file
635 shared_ptr[COutputStream] sink
639 where = _stringify_path(where)
641 get_writer(where, &sink)
643 c_where = tobytes(where)
645 sink = GetResultValue(FileOutputStream.Open(c_where))
649 WriteMetaDataFile(deref(self._metadata), sink.get()))
# ParquetSchema: wrapper over the Parquet SchemaDescriptor owned by a
# parent FileMetaData; indexable/iterable over ColumnSchema objects.
# NOTE(review): subsampled view -- some method heads and decorator rows
# are missing.
652 cdef class ParquetSchema(_Weakrefable):
653 def __cinit__(self, FileMetaData container):
654 self.parent = container
655 self.schema = container._metadata.schema()
# __repr__ head row missing; renders the C++ schema's ToString().
658 return "{0}\n{1}".format(
659 object.__repr__(self),
660 frombytes(self.schema.ToString(), safe=True))
662 def __reduce__(self):
663 return ParquetSchema, (self.parent,)
# __len__ head row missing.
666 return self.schema.num_columns()
668 def __getitem__(self, i):
669 return self.column(i)
# names accessor head rows missing; lists dotted column paths.
673 return [self[i].name for i in range(len(self))]
675 def to_arrow_schema(self):
677 Convert Parquet schema to effective Arrow schema
681 schema : pyarrow.Schema
683 cdef shared_ptr[CSchema] sp_arrow_schema
# GIL-release / with-nogil rows may be among the missing lines.
686 check_status(FromParquetSchema(
687 self.schema, default_arrow_reader_properties(),
688 self.parent._metadata.key_value_metadata(),
691 return pyarrow_wrap_schema(sp_arrow_schema)
693 def __eq__(self, other):
695 return self.equals(other)
697 return NotImplemented
699 def equals(self, ParquetSchema other):
701 Returns True if the Parquet schemas are equal
703 return self.schema.Equals(deref(other.schema))
# column(i): bounds-checked accessor (def head row missing).
706 if i < 0 or i >= len(self):
707 raise IndexError('{0} out of bounds'.format(i))
709 return ColumnSchema(self, i)
# ColumnSchema: wrapper over one ColumnDescriptor from a ParquetSchema;
# exposes name/path/levels/types and the FLBA/decimal attributes.
# NOTE(review): subsampled view -- cdef: header, decorator rows and
# some accessor heads are missing.
712 cdef class ColumnSchema(_Weakrefable):
716 const ColumnDescriptor* descr
718 def __cinit__(self, ParquetSchema schema, int index):
720 self.index = index # for pickling support
721 self.descr = schema.schema.Column(index)
723 def __eq__(self, other):
725 return self.equals(other)
727 return NotImplemented
729 def __reduce__(self):
730 return ColumnSchema, (self.parent, self.index)
732 def equals(self, ColumnSchema other):
734 Returns True if the column schemas are equal
736 return self.descr.Equals(deref(other.descr))
# __repr__: enrich the legacy converted type for DECIMAL / FLBA.
739 physical_type = self.physical_type
740 converted_type = self.converted_type
741 if converted_type == 'DECIMAL':
742 converted_type = 'DECIMAL({0}, {1})'.format(self.precision,
744 elif physical_type == 'FIXED_LEN_BYTE_ARRAY':
745 converted_type = ('FIXED_LEN_BYTE_ARRAY(length={0})'
746 .format(self.length))
748 return """<ParquetColumnSchema>
751 max_definition_level: {2}
752 max_repetition_level: {3}
755 converted_type (legacy): {6}""".format(self.name, self.path,
756 self.max_definition_level,
757 self.max_repetition_level,
759 str(self.logical_type),
# Accessors below are pass-throughs (presumably @property; decorator
# rows missing from this view).
764 return frombytes(self.descr.name())
768 return frombytes(self.descr.path().get().ToDotString())
771 def max_definition_level(self):
772 return self.descr.max_definition_level()
775 def max_repetition_level(self):
776 return self.descr.max_repetition_level()
779 def physical_type(self):
780 return physical_type_name_from_enum(self.descr.physical_type())
783 def logical_type(self):
784 return wrap_logical_type(self.descr.logical_type())
787 def converted_type(self):
788 return converted_type_name_from_enum(self.descr.converted_type())
# NOTE(review): logical_type is defined twice in this listing (rows
# 783 and 791); the later definition shadows the earlier -- likely a
# duplication to remove; confirm against upstream.
791 def logical_type(self):
792 return wrap_logical_type(self.descr.logical_type())
794 # FIXED_LEN_BYTE_ARRAY attribute
# length/precision/scale: def-head rows missing in this view.
797 return self.descr.type_length()
802 return self.descr.type_precision()
806 return self.descr.type_scale()
# Map a parquet physical-type enum to its canonical name string,
# defaulting to 'UNKNOWN'.  NOTE(review): the `return {` row (810) is
# missing from this subsampled view.
809 cdef physical_type_name_from_enum(ParquetType type_):
811 ParquetType_BOOLEAN: 'BOOLEAN',
812 ParquetType_INT32: 'INT32',
813 ParquetType_INT64: 'INT64',
814 ParquetType_INT96: 'INT96',
815 ParquetType_FLOAT: 'FLOAT',
816 ParquetType_DOUBLE: 'DOUBLE',
817 ParquetType_BYTE_ARRAY: 'BYTE_ARRAY',
818 ParquetType_FIXED_LEN_BYTE_ARRAY: 'FIXED_LEN_BYTE_ARRAY',
819 }.get(type_, 'UNKNOWN')
# Map a parquet logical-type enum id to its name string, defaulting to
# 'UNKNOWN'.  NOTE(review): the `return {` row (823) is missing here.
822 cdef logical_type_name_from_enum(ParquetLogicalTypeId type_):
824 ParquetLogicalType_UNDEFINED: 'UNDEFINED',
825 ParquetLogicalType_STRING: 'STRING',
826 ParquetLogicalType_MAP: 'MAP',
827 ParquetLogicalType_LIST: 'LIST',
828 ParquetLogicalType_ENUM: 'ENUM',
829 ParquetLogicalType_DECIMAL: 'DECIMAL',
830 ParquetLogicalType_DATE: 'DATE',
831 ParquetLogicalType_TIME: 'TIME',
832 ParquetLogicalType_TIMESTAMP: 'TIMESTAMP',
833 ParquetLogicalType_INT: 'INT',
834 ParquetLogicalType_JSON: 'JSON',
835 ParquetLogicalType_BSON: 'BSON',
836 ParquetLogicalType_UUID: 'UUID',
837 ParquetLogicalType_NONE: 'NONE',
838 }.get(type_, 'UNKNOWN')
# Map a legacy converted-type enum to its name string, defaulting to
# 'UNKNOWN'.  NOTE(review): the `return {` row (842) is missing here.
841 cdef converted_type_name_from_enum(ParquetConvertedType type_):
843 ParquetConvertedType_NONE: 'NONE',
844 ParquetConvertedType_UTF8: 'UTF8',
845 ParquetConvertedType_MAP: 'MAP',
846 ParquetConvertedType_MAP_KEY_VALUE: 'MAP_KEY_VALUE',
847 ParquetConvertedType_LIST: 'LIST',
848 ParquetConvertedType_ENUM: 'ENUM',
849 ParquetConvertedType_DECIMAL: 'DECIMAL',
850 ParquetConvertedType_DATE: 'DATE',
851 ParquetConvertedType_TIME_MILLIS: 'TIME_MILLIS',
852 ParquetConvertedType_TIME_MICROS: 'TIME_MICROS',
853 ParquetConvertedType_TIMESTAMP_MILLIS: 'TIMESTAMP_MILLIS',
854 ParquetConvertedType_TIMESTAMP_MICROS: 'TIMESTAMP_MICROS',
855 ParquetConvertedType_UINT_8: 'UINT_8',
856 ParquetConvertedType_UINT_16: 'UINT_16',
857 ParquetConvertedType_UINT_32: 'UINT_32',
858 ParquetConvertedType_UINT_64: 'UINT_64',
859 ParquetConvertedType_INT_8: 'INT_8',
860 ParquetConvertedType_INT_16: 'INT_16',
861 ParquetConvertedType_INT_32: 'INT_32',
862 ParquetConvertedType_INT_64: 'INT_64',
863 ParquetConvertedType_JSON: 'JSON',
864 ParquetConvertedType_BSON: 'BSON',
865 ParquetConvertedType_INTERVAL: 'INTERVAL',
866 }.get(type_, 'UNKNOWN')
# Map a parquet encoding enum to its name string, defaulting to
# 'UNKNOWN'.  NOTE(review): the `return {` row (870) is missing here.
869 cdef encoding_name_from_enum(ParquetEncoding encoding_):
871 ParquetEncoding_PLAIN: 'PLAIN',
872 ParquetEncoding_PLAIN_DICTIONARY: 'PLAIN_DICTIONARY',
873 ParquetEncoding_RLE: 'RLE',
874 ParquetEncoding_BIT_PACKED: 'BIT_PACKED',
875 ParquetEncoding_DELTA_BINARY_PACKED: 'DELTA_BINARY_PACKED',
876 ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY: 'DELTA_LENGTH_BYTE_ARRAY',
877 ParquetEncoding_DELTA_BYTE_ARRAY: 'DELTA_BYTE_ARRAY',
878 ParquetEncoding_RLE_DICTIONARY: 'RLE_DICTIONARY',
879 ParquetEncoding_BYTE_STREAM_SPLIT: 'BYTE_STREAM_SPLIT',
880 }.get(encoding_, 'UNKNOWN')
# Map a parquet compression enum to its name string, defaulting to
# 'UNKNOWN'.  NOTE(review): the `return {` row (884) is missing here.
883 cdef compression_name_from_enum(ParquetCompression compression_):
885 ParquetCompression_UNCOMPRESSED: 'UNCOMPRESSED',
886 ParquetCompression_SNAPPY: 'SNAPPY',
887 ParquetCompression_GZIP: 'GZIP',
888 ParquetCompression_LZO: 'LZO',
889 ParquetCompression_BROTLI: 'BROTLI',
890 ParquetCompression_LZ4: 'LZ4',
891 ParquetCompression_ZSTD: 'ZSTD',
892 }.get(compression_, 'UNKNOWN')
# Validate a (case-insensitive) compression codec name, raising
# ArrowException for unsupported codecs; returns -1 on exception per
# the `except -1` declaration.  NOTE(review): row 897 (the tail of the
# allowed-name set, presumably including 'ZSTD') and the success-path
# `return 0` row are missing from this subsampled view.
895 cdef int check_compression_name(name) except -1:
896 if name.upper() not in {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4',
898 raise ArrowException("Unsupported compression: " + name)
# Map a codec name to the parquet compression enum, defaulting to
# UNCOMPRESSED.  NOTE(review): subsampled view -- the name-normalizing
# row and most of the `if name == ...:` condition rows are missing;
# only the return rows survive.
902 cdef ParquetCompression compression_from_name(name):
905 return ParquetCompression_SNAPPY
907 return ParquetCompression_GZIP
909 return ParquetCompression_LZO
910 elif name == 'BROTLI':
911 return ParquetCompression_BROTLI
913 return ParquetCompression_LZ4
915 return ParquetCompression_ZSTD
917 return ParquetCompression_UNCOMPRESSED
# ParquetReader: wraps the Arrow parquet::arrow::FileReader for reading
# a parquet file into Arrow tables/batches/columns.
# NOTE(review): subsampled view -- cdef: headers, with-nogil rows,
# decorator rows and several returns are missing throughout.
920 cdef class ParquetReader(_Weakrefable):
924 unique_ptr[FileReader] reader
925 FileMetaData _metadata
930 def __cinit__(self, MemoryPool memory_pool=None):
931 self.pool = maybe_unbox_memory_pool(memory_pool)
932 self._metadata = None
# open(): configure reader/arrow properties, open the source, capture
# file metadata, then build the FileReader.
934 def open(self, object source not None, bint use_memory_map=True,
935 read_dictionary=None, FileMetaData metadata=None,
936 int buffer_size=0, bint pre_buffer=False,
937 coerce_int96_timestamp_unit=None):
939 shared_ptr[CRandomAccessFile] rd_handle
940 shared_ptr[CFileMetaData] c_metadata
941 CReaderProperties properties = default_reader_properties()
942 ArrowReaderProperties arrow_props = (
943 default_arrow_reader_properties())
945 FileReaderBuilder builder
946 TimeUnit int96_timestamp_unit_code
948 if metadata is not None:
949 c_metadata = metadata.sp_metadata
# Buffered-stream config: rows 950-951 (the buffer_size > 0 guard)
# are missing; negative sizes are rejected below.
952 properties.enable_buffered_stream()
953 properties.set_buffer_size(buffer_size)
954 elif buffer_size == 0:
955 properties.disable_buffered_stream()
957 raise ValueError('Buffer size must be larger than zero')
959 arrow_props.set_pre_buffer(pre_buffer)
961 if coerce_int96_timestamp_unit is None:
962 # use the default defined in default_arrow_reader_properties()
965 arrow_props.set_coerce_int96_timestamp_unit(
966 string_to_timeunit(coerce_int96_timestamp_unit))
970 get_reader(source, use_memory_map, &rd_handle)
972 check_status(builder.Open(rd_handle, properties, c_metadata))
# Cache file metadata eagerly so self.metadata works after open().
976 c_metadata = builder.raw_reader().metadata()
977 self._metadata = result = FileMetaData()
978 result.init(c_metadata)
980 if read_dictionary is not None:
981 self._set_read_dictionary(read_dictionary, &arrow_props)
984 check_status(builder.memory_pool(self.pool)
985 .properties(arrow_props)
986 .Build(&self.reader))
# Enable dictionary decoding per column; names resolve to indices.
988 cdef _set_read_dictionary(self, read_dictionary,
989 ArrowReaderProperties* props):
990 for column in read_dictionary:
991 if not isinstance(column, int):
992 column = self.column_name_idx(column)
993 props.set_read_dictionary(column, True)
# column_paths: dotted path components per column; the `paths = []`
# init row and return row are missing here.
996 def column_paths(self):
998 FileMetaData container = self.metadata
999 const CFileMetaData* metadata = container._metadata
1000 vector[c_string] path
1004 for i in range(0, metadata.num_columns()):
1005 path = (metadata.schema().Column(i)
1006 .path().get().ToDotVector())
1007 paths.append([frombytes(x) for x in path])
# metadata accessor (property head rows missing).
1013 return self._metadata
1016 def schema_arrow(self):
1017 cdef shared_ptr[CSchema] out
1019 check_status(self.reader.get().GetSchema(&out))
1020 return pyarrow_wrap_schema(out)
1023 def num_row_groups(self):
1024 return self.reader.get().num_row_groups()
1026 def set_use_threads(self, bint use_threads):
1027 self.reader.get().set_use_threads(use_threads)
1029 def set_batch_size(self, int64_t batch_size):
1030 self.reader.get().set_batch_size(batch_size)
# iter_batches: generator yielding RecordBatches from the selected row
# groups (and optionally a column subset); check_status wrappers,
# with-nogil rows and the loop head are among the missing lines.
1032 def iter_batches(self, int64_t batch_size, row_groups, column_indices=None,
1033 bint use_threads=True):
1035 vector[int] c_row_groups
1036 vector[int] c_column_indices
1037 shared_ptr[CRecordBatch] record_batch
1038 shared_ptr[TableBatchReader] batch_reader
1039 unique_ptr[CRecordBatchReader] recordbatchreader
1041 self.set_batch_size(batch_size)
1044 self.set_use_threads(use_threads)
1046 for row_group in row_groups:
1047 c_row_groups.push_back(row_group)
1049 if column_indices is not None:
1050 for index in column_indices:
1051 c_column_indices.push_back(index)
1054 self.reader.get().GetRecordBatchReader(
1055 c_row_groups, c_column_indices, &recordbatchreader
1061 self.reader.get().GetRecordBatchReader(
1062 c_row_groups, &recordbatchreader
1069 recordbatchreader.get().ReadNext(&record_batch)
# NULL batch signals end of stream (break row missing).
1072 if record_batch.get() == NULL:
1075 yield pyarrow_wrap_batch(record_batch)
1077 def read_row_group(self, int i, column_indices=None,
1078 bint use_threads=True):
1079 return self.read_row_groups([i], column_indices, use_threads)
# read_row_groups: materialize selected row groups as a Table; the
# with-nogil rows and the all-columns branch head are missing.
1081 def read_row_groups(self, row_groups not None, column_indices=None,
1082 bint use_threads=True):
1084 shared_ptr[CTable] ctable
1085 vector[int] c_row_groups
1086 vector[int] c_column_indices
1088 self.set_use_threads(use_threads)
1090 for row_group in row_groups:
1091 c_row_groups.push_back(row_group)
1093 if column_indices is not None:
1094 for index in column_indices:
1095 c_column_indices.push_back(index)
1098 check_status(self.reader.get()
1099 .ReadRowGroups(c_row_groups, c_column_indices,
1104 check_status(self.reader.get()
1105 .ReadRowGroups(c_row_groups, &ctable))
1106 return pyarrow_wrap_table(ctable)
1108 def read_all(self, column_indices=None, bint use_threads=True):
1110 shared_ptr[CTable] ctable
1111 vector[int] c_column_indices
1113 self.set_use_threads(use_threads)
1115 if column_indices is not None:
1116 for index in column_indices:
1117 c_column_indices.push_back(index)
1120 check_status(self.reader.get()
1121 .ReadTable(c_column_indices, &ctable))
1125 check_status(self.reader.get()
1126 .ReadTable(&ctable))
1127 return pyarrow_wrap_table(ctable)
# scan_contents: count rows by scanning; the num_rows out-param and
# return rows are missing from this view.
1129 def scan_contents(self, column_indices=None, batch_size=65536):
1131 vector[int] c_column_indices
1132 int32_t c_batch_size
1135 if column_indices is not None:
1136 for index in column_indices:
1137 c_column_indices.push_back(index)
1139 c_batch_size = batch_size
1142 check_status(self.reader.get()
1143 .ScanContents(c_column_indices, c_batch_size,
# column_name_idx: dotted-name -> column index, with a lazily built
# cache keyed by the UTF-8 path bytes.
1148 def column_name_idx(self, column_name):
1150 Find the matching index of a column in the schema.
1155 Name of the column, separation of nesting levels is done via ".".
1160 Integer index of the position of the column
1163 FileMetaData container = self.metadata
1164 const CFileMetaData* metadata = container._metadata
1167 if self._column_idx_map is None:
1168 self._column_idx_map = {}
1169 for i in range(0, metadata.num_columns()):
1170 col_bytes = tobytes(metadata.schema().Column(i)
1171 .path().get().ToDotString())
1172 self._column_idx_map[col_bytes] = i
1174 return self._column_idx_map[tobytes(column_name)]
1176 def read_column(self, int column_index):
1177 cdef shared_ptr[CChunkedArray] out
1179 check_status(self.reader.get()
1180 .ReadColumn(column_index, &out))
1181 return pyarrow_wrap_chunked_array(out)
1183 def read_schema_field(self, int field_index):
1184 cdef shared_ptr[CChunkedArray] out
1186 check_status(self.reader.get()
1187 .ReadSchemaField(field_index, &out))
1188 return pyarrow_wrap_chunked_array(out)
# Build a parquet WriterProperties from user-facing keyword options.
# NOTE(review): subsampled view -- the `compression=None, version=None`
# parameter rows, cdef: header and several branch rows are missing.
1191 cdef shared_ptr[WriterProperties] _create_writer_properties(
1192 use_dictionary=None,
1195 write_statistics=None,
1196 data_page_size=None,
1197 compression_level=None,
1198 use_byte_stream_split=False,
1199 data_page_version=None) except *:
1200 """General writer properties"""
1202 shared_ptr[WriterProperties] properties
1203 WriterProperties.Builder props
# data_page_version: only '1.0' / '2.0' accepted.
1207 if data_page_version is not None:
1208 if data_page_version == "1.0":
1209 props.data_page_version(ParquetDataPageVersion_V1)
1210 elif data_page_version == "2.0":
1211 props.data_page_version(ParquetDataPageVersion_V2)
1213 raise ValueError("Unsupported Parquet data page version: {0}"
1214 .format(data_page_version))
# format version: '2.0' is a deprecated pseudo-version.
1218 if version is not None:
1219 if version == "1.0":
1220 props.version(ParquetVersion_V1)
1221 elif version in ("2.0", "pseudo-2.0"):
1223 "Parquet format '2.0' pseudo version is deprecated, use "
1224 "'2.4' or '2.6' for fine-grained feature selection",
1225 FutureWarning, stacklevel=2)
1226 props.version(ParquetVersion_V2_0)
1227 elif version == "2.4":
1228 props.version(ParquetVersion_V2_4)
1229 elif version == "2.6":
1230 props.version(ParquetVersion_V2_6)
1232 raise ValueError("Unsupported Parquet format version: {0}"
# compression: a single codec name, or a {column: codec} mapping.
# NOTE(review): `basestring` and dict.iteritems() are Python-2 idioms;
# Cython aliases basestring, but a Python-3 dict has no iteritems(),
# so the per-column branches would raise AttributeError -- upstream
# uses .items(); confirm and fix.
1237 if isinstance(compression, basestring):
1238 check_compression_name(compression)
1239 props.compression(compression_from_name(compression))
1240 elif compression is not None:
1241 for column, codec in compression.iteritems():
1242 check_compression_name(codec)
1243 props.compression(tobytes(column), compression_from_name(codec))
1245 if isinstance(compression_level, int):
1246 props.compression_level(compression_level)
1247 elif compression_level is not None:
# NOTE(review): same .iteritems() Python-3 issue as above.
1248 for column, level in compression_level.iteritems():
1249 props.compression_level(tobytes(column), level)
# use_dictionary: bool applies globally, iterable enables per column.
1253 if isinstance(use_dictionary, bool):
1255 props.enable_dictionary()
1257 props.disable_dictionary()
1258 elif use_dictionary is not None:
1259 # Deactivate dictionary encoding by default
1260 props.disable_dictionary()
1261 for column in use_dictionary:
1262 props.enable_dictionary(tobytes(column))
# write_statistics: same bool-or-iterable convention.
1266 if isinstance(write_statistics, bool):
1267 if write_statistics:
1268 props.enable_statistics()
1270 props.disable_statistics()
1271 elif write_statistics is not None:
1272 # Deactivate statistics by default and enable for specified columns
1273 props.disable_statistics()
1274 for column in write_statistics:
1275 props.enable_statistics(tobytes(column))
1277 # use_byte_stream_split
1279 if isinstance(use_byte_stream_split, bool):
1280 if use_byte_stream_split:
1281 props.encoding(ParquetEncoding_BYTE_STREAM_SPLIT)
1282 elif use_byte_stream_split is not None:
1283 for column in use_byte_stream_split:
1284 props.encoding(tobytes(column),
1285 ParquetEncoding_BYTE_STREAM_SPLIT)
1287 if data_page_size is not None:
1288 props.data_pagesize(data_page_size)
# Final build; the `return properties` row is missing from this view.
1290 properties = props.build()
# Build ArrowWriterProperties (Arrow-specific writer options) from
# user-facing keyword arguments.  NOTE(review): subsampled view --
# cdef: header and a few branch/else rows are missing.
1295 cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
1296 use_deprecated_int96_timestamps=False,
1297 coerce_timestamps=None,
1298 allow_truncated_timestamps=False,
1299 writer_engine_version=None,
1300 use_compliant_nested_type=False) except *:
1301 """Arrow writer properties"""
1303 shared_ptr[ArrowWriterProperties] arrow_properties
1304 ArrowWriterProperties.Builder arrow_props
1306 # Store the original Arrow schema so things like dictionary types can
1307 # be automatically reconstructed
1308 arrow_props.store_schema()
# int96 timestamps (deprecated legacy encoding).
1312 if use_deprecated_int96_timestamps:
1313 arrow_props.enable_deprecated_int96_timestamps()
1315 arrow_props.disable_deprecated_int96_timestamps()
# coerce_timestamps: only 'ms' / 'us' are valid.
1319 if coerce_timestamps == 'ms':
1320 arrow_props.coerce_timestamps(TimeUnit_MILLI)
1321 elif coerce_timestamps == 'us':
1322 arrow_props.coerce_timestamps(TimeUnit_MICRO)
1323 elif coerce_timestamps is not None:
1324 raise ValueError('Invalid value for coerce_timestamps: {0}'
1325 .format(coerce_timestamps))
1327 # allow_truncated_timestamps
1329 if allow_truncated_timestamps:
1330 arrow_props.allow_truncated_timestamps()
1332 arrow_props.disallow_truncated_timestamps()
1334 # use_compliant_nested_type
1336 if use_compliant_nested_type:
1337 arrow_props.enable_compliant_nested_types()
1339 arrow_props.disable_compliant_nested_types()
1341 # writer_engine_version
# Only 'V1' (no-op, warns) and 'V2' are accepted.
1343 if writer_engine_version == "V1":
1344 warnings.warn("V1 parquet writer engine is a no-op. Use V2.")
1345 arrow_props.set_engine_version(ArrowWriterEngineVersion.V1)
1346 elif writer_engine_version != "V2":
1347 raise ValueError("Unsupported Writer Engine Version: {0}"
1348 .format(writer_engine_version))
1350 arrow_properties = arrow_props.build()
1352 return arrow_properties
# ParquetWriter: wraps parquet::arrow::FileWriter; owns the output sink
# when it opened the file itself (own_sink).
# NOTE(review): subsampled view -- cdef: headers, with-nogil rows, the
# path-vs-file branch heads and this class's trailing rows are missing;
# the class may continue past the end of this listing.
1355 cdef class ParquetWriter(_Weakrefable):
1357 unique_ptr[FileWriter] writer
1358 shared_ptr[COutputStream] sink
# Retained constructor options (attribute declaration fragment).
1362 object use_dictionary
1363 object use_deprecated_int96_timestamps
1364 object use_byte_stream_split
1365 object coerce_timestamps
1366 object allow_truncated_timestamps
1368 object compression_level
1369 object data_page_version
1370 object use_compliant_nested_type
1372 object write_statistics
1373 object writer_engine_version
1375 int64_t data_page_size
1377 def __cinit__(self, where, Schema schema, use_dictionary=None,
1378 compression=None, version=None,
1379 write_statistics=None,
1380 MemoryPool memory_pool=None,
1381 use_deprecated_int96_timestamps=False,
1382 coerce_timestamps=None,
1383 data_page_size=None,
1384 allow_truncated_timestamps=False,
1385 compression_level=None,
1386 use_byte_stream_split=False,
1387 writer_engine_version=None,
1388 data_page_version=None,
1389 use_compliant_nested_type=False):
1391 shared_ptr[WriterProperties] properties
1392 shared_ptr[ArrowWriterProperties] arrow_properties
# Sink selection: writable object vs filesystem path; the branch
# condition rows are missing from this view.
1397 where = _stringify_path(where)
1399 get_writer(where, &self.sink)
1400 self.own_sink = False
1402 c_where = tobytes(where)
1404 self.sink = GetResultValue(FileOutputStream.Open(c_where))
1405 self.own_sink = True
1407 properties = _create_writer_properties(
1408 use_dictionary=use_dictionary,
1409 compression=compression,
1411 write_statistics=write_statistics,
1412 data_page_size=data_page_size,
1413 compression_level=compression_level,
1414 use_byte_stream_split=use_byte_stream_split,
1415 data_page_version=data_page_version
1417 arrow_properties = _create_arrow_writer_properties(
1418 use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
1419 coerce_timestamps=coerce_timestamps,
1420 allow_truncated_timestamps=allow_truncated_timestamps,
1421 writer_engine_version=writer_engine_version,
1422 use_compliant_nested_type=use_compliant_nested_type
1425 pool = maybe_unbox_memory_pool(memory_pool)
# FileWriter.Open builds self.writer (surrounding rows missing).
1428 FileWriter.Open(deref(schema.schema), pool,
1429 self.sink, properties, arrow_properties,
# close(): flush the writer, then close the sink only if owned
# (method head and own_sink guard rows missing).
1434 check_status(self.writer.get().Close())
1436 check_status(self.sink.get().Close())
# write_table: one row group by default (whole table); 0 is invalid.
1438 def write_table(self, Table table, row_group_size=None):
1440 CTable* ctable = table.table
1441 int64_t c_row_group_size
1443 if row_group_size is None or row_group_size == -1:
1444 c_row_group_size = ctable.num_rows()
1445 elif row_group_size == 0:
1446 raise ValueError('Row group size cannot be 0')
1448 c_row_group_size = row_group_size
1451 check_status(self.writer.get()
1452 .WriteTable(deref(ctable), c_row_group_size))
# metadata accessor fragment: only valid after close(); otherwise the
# RuntimeError below is raised (surrounding rows missing).
1457 shared_ptr[CFileMetaData] metadata
1460 metadata = self.writer.get().metadata()
1462 result = FileMetaData()
1463 result.init(metadata)
1466 'file metadata is only available after writer close')