# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


import ast
from collections.abc import Sequence
from concurrent import futures
# import threading submodule upfront to avoid partially initialized
# module bug (ARROW-11983)
import concurrent.futures.thread  # noqa
from copy import deepcopy
from itertools import zip_longest
import json
import operator
import re
import warnings

import numpy as np

import pyarrow as pa
from pyarrow.lib import _pandas_api, builtin_pickle, frombytes  # noqa


_logical_type_map = {}


def get_logical_type_map():
    global _logical_type_map

    if not _logical_type_map:
        _logical_type_map.update({
            pa.lib.Type_NA: 'empty',
            pa.lib.Type_BOOL: 'bool',
            pa.lib.Type_INT8: 'int8',
            pa.lib.Type_INT16: 'int16',
            pa.lib.Type_INT32: 'int32',
            pa.lib.Type_INT64: 'int64',
            pa.lib.Type_UINT8: 'uint8',
            pa.lib.Type_UINT16: 'uint16',
            pa.lib.Type_UINT32: 'uint32',
            pa.lib.Type_UINT64: 'uint64',
            pa.lib.Type_HALF_FLOAT: 'float16',
            pa.lib.Type_FLOAT: 'float32',
            pa.lib.Type_DOUBLE: 'float64',
            pa.lib.Type_DATE32: 'date',
            pa.lib.Type_DATE64: 'date',
            pa.lib.Type_TIME32: 'time',
            pa.lib.Type_TIME64: 'time',
            pa.lib.Type_BINARY: 'bytes',
            pa.lib.Type_FIXED_SIZE_BINARY: 'bytes',
            pa.lib.Type_STRING: 'unicode',
        })
    return _logical_type_map


def get_logical_type(arrow_type):
    logical_type_map = get_logical_type_map()

    try:
        return logical_type_map[arrow_type.id]
    except KeyError:
        if isinstance(arrow_type, pa.lib.DictionaryType):
            return 'categorical'
        elif isinstance(arrow_type, pa.lib.ListType):
            return 'list[{}]'.format(get_logical_type(arrow_type.value_type))
        elif isinstance(arrow_type, pa.lib.TimestampType):
            return 'datetimetz' if arrow_type.tz is not None else 'datetime'
        elif isinstance(arrow_type, pa.lib.Decimal128Type):
            return 'decimal'
        return 'object'
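
# Illustrative sketch (added comment, not part of the original module): the
# mapping above feeds the 'pandas_type' field of the serialized metadata.
# Doctest-style, assuming only that pyarrow is imported as pa:
#
#   >>> get_logical_type(pa.timestamp('ns', tz='UTC'))
#   'datetimetz'
#   >>> get_logical_type(pa.list_(pa.int32()))
#   'list[int32]'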


_numpy_logical_type_map = {
    np.bool_: 'bool',
    np.int8: 'int8',
    np.int16: 'int16',
    np.int32: 'int32',
    np.int64: 'int64',
    np.uint8: 'uint8',
    np.uint16: 'uint16',
    np.uint32: 'uint32',
    np.uint64: 'uint64',
    np.float32: 'float32',
    np.float64: 'float64',
    'datetime64[D]': 'date',
    np.unicode_: 'string',
    np.bytes_: 'bytes',
}


def get_logical_type_from_numpy(pandas_collection):
    try:
        return _numpy_logical_type_map[pandas_collection.dtype.type]
    except KeyError:
        if hasattr(pandas_collection.dtype, 'tz'):
            return 'datetimetz'
        # See https://github.com/pandas-dev/pandas/issues/24739
        if str(pandas_collection.dtype) == 'datetime64[ns]':
            return 'datetime64[ns]'
        result = _pandas_api.infer_dtype(pandas_collection)
        if result == 'string':
            return 'unicode'
        return result
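
# Illustrative sketch (added comment): numpy-backed dtypes are looked up
# directly; anything else falls through to pandas' dtype inference. Assuming
# pandas is imported as pd:
#
#   >>> get_logical_type_from_numpy(pd.Series([1.0], dtype='float32'))
#   'float32'
#   >>> get_logical_type_from_numpy(pd.Series(['a', 'b']))
#   'unicode'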


def get_extension_dtype_info(column):
    dtype = column.dtype
    if str(dtype) == 'category':
        cats = getattr(column, 'cat', column)
        assert cats is not None
        metadata = {
            'num_categories': len(cats.categories),
            'ordered': cats.ordered,
        }
        physical_dtype = str(cats.codes.dtype)
    elif hasattr(dtype, 'tz'):
        metadata = {'timezone': pa.lib.tzinfo_to_string(dtype.tz)}
        physical_dtype = 'datetime64[ns]'
    else:
        metadata = None
        physical_dtype = str(dtype)
    return physical_dtype, metadata
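
# Illustrative sketch (added comment): the function returns the physical
# storage dtype plus any dtype-specific metadata, e.g.:
#
#   >>> get_extension_dtype_info(pd.Series([1, 2], dtype='int64'))
#   ('int64', None)
#   >>> get_extension_dtype_info(pd.Series(['a'], dtype='category'))
#   ('int8', {'num_categories': 1, 'ordered': False})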


def get_column_metadata(column, name, arrow_type, field_name):
    """Construct the metadata for a given column

    Parameters
    ----------
    column : pandas.Series or pandas.Index
    name : str
    arrow_type : pyarrow.DataType
    field_name : str
        Equivalent to `name` when `column` is a `Series`, otherwise if
        `column` is a pandas Index then `field_name` will not be the same as
        `name`. This is the name of the field in the arrow Table's schema.

    Returns
    -------
    dict
    """
    logical_type = get_logical_type(arrow_type)

    string_dtype, extra_metadata = get_extension_dtype_info(column)
    if logical_type == 'decimal':
        extra_metadata = {
            'precision': arrow_type.precision,
            'scale': arrow_type.scale,
        }
        string_dtype = 'object'

    if name is not None and not isinstance(name, str):
        raise TypeError(
            'Column name must be a string. Got column {} of type {}'.format(
                name, type(name).__name__
            )
        )

    assert field_name is None or isinstance(field_name, str), \
        str(type(field_name))
    return {
        'name': name,
        'field_name': 'None' if field_name is None else field_name,
        'pandas_type': logical_type,
        'numpy_type': string_dtype,
        'metadata': extra_metadata,
    }
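
# Illustrative sketch (added comment): for an int64 Series named 'a' typed
# as pa.int64(), the per-column metadata would look like:
#
#   {'name': 'a', 'field_name': 'a', 'pandas_type': 'int64',
#    'numpy_type': 'int64', 'metadata': None}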


def construct_metadata(columns_to_convert, df, column_names, index_levels,
                       index_descriptors, preserve_index, types):
    """Returns a dictionary containing enough metadata to reconstruct a pandas
    DataFrame as an Arrow Table, including index columns.

    Parameters
    ----------
    columns_to_convert : list[pd.Series]
    df : pandas.DataFrame
    column_names : list[str]
    index_levels : List[pd.Index]
    index_descriptors : List[Dict]
    preserve_index : bool
    types : List[pyarrow.DataType]

    Returns
    -------
    dict
    """
    num_serialized_index_levels = len([descr for descr in index_descriptors
                                       if not isinstance(descr, dict)])
    # Use ntypes instead of Python shorthand notation [:-len(x)] as [:-0]
    # behaves differently from what we want.
    ntypes = len(types)
    df_types = types[:ntypes - num_serialized_index_levels]
    index_types = types[ntypes - num_serialized_index_levels:]

    column_metadata = []
    for col, sanitized_name, arrow_type in zip(columns_to_convert,
                                               column_names, df_types):
        metadata = get_column_metadata(col, name=sanitized_name,
                                       arrow_type=arrow_type,
                                       field_name=sanitized_name)
        column_metadata.append(metadata)

    index_column_metadata = []
    if preserve_index is not False:
        for level, arrow_type, descriptor in zip(index_levels, index_types,
                                                 index_descriptors):
            if isinstance(descriptor, dict):
                # The index is represented in a non-serialized fashion,
                # e.g. RangeIndex
                continue
            metadata = get_column_metadata(level, name=level.name,
                                           arrow_type=arrow_type,
                                           field_name=descriptor)
            index_column_metadata.append(metadata)

        column_indexes = []

        levels = getattr(df.columns, 'levels', [df.columns])
        names = getattr(df.columns, 'names', [df.columns.name])
        for level, name in zip(levels, names):
            metadata = _get_simple_index_descriptor(level, name)
            column_indexes.append(metadata)
    else:
        index_descriptors = index_column_metadata = column_indexes = []

    return {
        b'pandas': json.dumps({
            'index_columns': index_descriptors,
            'column_indexes': column_indexes,
            'columns': column_metadata + index_column_metadata,
            'creator': {
                'library': 'pyarrow',
                'version': pa.__version__
            },
            'pandas_version': _pandas_api.version
        }).encode('utf8')
    }
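
# Illustrative sketch (added comment): the result is a single-entry dict whose
# b'pandas' key holds UTF-8 encoded JSON, roughly of this shape:
#
#   {"index_columns": [{"kind": "range", ...}],
#    "column_indexes": [...],
#    "columns": [{"name": "a", "field_name": "a", ...}],
#    "creator": {"library": "pyarrow", "version": "..."},
#    "pandas_version": "..."}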


def _get_simple_index_descriptor(level, name):
    string_dtype, extra_metadata = get_extension_dtype_info(level)
    pandas_type = get_logical_type_from_numpy(level)
    if 'mixed' in pandas_type:
        warnings.warn(
            "The DataFrame has column names of mixed type. They will be "
            "converted to strings and not roundtrip correctly.",
            UserWarning, stacklevel=4)
    if pandas_type == 'unicode':
        assert not extra_metadata
        extra_metadata = {'encoding': 'UTF-8'}
    return {
        'name': name,
        'field_name': name,
        'pandas_type': pandas_type,
        'numpy_type': string_dtype,
        'metadata': extra_metadata,
    }


def _column_name_to_strings(name):
    """Convert a column name (or level) to either a string or a recursive
    collection of strings.

    Parameters
    ----------
    name : str or tuple

    Returns
    -------
    value : str or None

    Examples
    --------
    >>> name = 'foo'
    >>> _column_name_to_strings(name)
    'foo'
    >>> name = ('foo', 'bar')
    >>> _column_name_to_strings(name)
    "('foo', 'bar')"
    >>> import pandas as pd
    >>> name = (1, pd.Timestamp('2017-02-01 00:00:00'))
    >>> _column_name_to_strings(name)
    "('1', '2017-02-01 00:00:00')"
    """
    if isinstance(name, str):
        return name
    elif isinstance(name, bytes):
        # XXX: should we assume that bytes in Python 3 are UTF-8?
        return name.decode('utf8')
    elif isinstance(name, tuple):
        return str(tuple(map(_column_name_to_strings, name)))
    elif isinstance(name, Sequence):
        raise TypeError("Unsupported type for MultiIndex level")
    elif name is None:
        return None
    return str(name)


def _index_level_name(index, i, column_names):
    """Return the name of an index level or a default name if `index.name` is
    None or is already a column name.

    Parameters
    ----------
    index : pandas.Index
    i : int
    column_names : list of str
        The names of the data columns, used to avoid name clashes.

    Returns
    -------
    name : str
    """
    if index.name is not None and index.name not in column_names:
        return index.name
    else:
        return '__index_level_{:d}__'.format(i)


def _get_columns_to_convert(df, schema, preserve_index, columns):
    columns = _resolve_columns_of_interest(df, schema, columns)

    if not df.columns.is_unique:
        raise ValueError(
            'Duplicate column names found: {}'.format(list(df.columns))
        )

    if schema is not None:
        return _get_columns_to_convert_given_schema(df, schema, preserve_index)

    column_names = []

    index_levels = (
        _get_index_level_values(df.index) if preserve_index is not False
        else []
    )

    columns_to_convert = []
    convert_fields = []

    for name in columns:
        col = df[name]
        name = _column_name_to_strings(name)

        if _pandas_api.is_sparse(col):
            raise TypeError(
                "Sparse pandas data (column {}) not supported.".format(name))

        columns_to_convert.append(col)
        convert_fields.append(None)
        column_names.append(name)

    index_descriptors = []
    index_column_names = []
    for i, index_level in enumerate(index_levels):
        name = _index_level_name(index_level, i, column_names)
        if (isinstance(index_level, _pandas_api.pd.RangeIndex) and
                preserve_index is None):
            descr = _get_range_index_descriptor(index_level)
        else:
            columns_to_convert.append(index_level)
            convert_fields.append(None)
            descr = name
            index_column_names.append(name)
        index_descriptors.append(descr)

    all_names = column_names + index_column_names

    # all_names : all of the columns in the resulting table including the data
    #   columns and serialized index columns
    # column_names : the names of the data columns
    # index_column_names : the names of the serialized index columns
    # index_descriptors : descriptions of each index to be used for
    #   reconstruction
    # index_levels : the extracted index level values
    # columns_to_convert : assembled raw data (both data columns and indexes)
    #   to be converted to Arrow format
    # convert_fields : specified fields to use for coercion / casting during
    #   serialization, if a Schema was provided
    return (all_names, column_names, index_column_names, index_descriptors,
            index_levels, columns_to_convert, convert_fields)
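
# Illustrative sketch (added comment): for a default-indexed frame such as
# pd.DataFrame({'a': [1, 2]}), called with schema=None, preserve_index=None
# and columns=None, the returned tuple would roughly contain:
#
#   all_names == ['a']            # the RangeIndex is metadata-only
#   index_descriptors == [{'kind': 'range', 'name': None,
#                          'start': 0, 'stop': 2, 'step': 1}]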


def _get_columns_to_convert_given_schema(df, schema, preserve_index):
    """
    Specialized version of _get_columns_to_convert in case a Schema is
    specified.
    In that case, the Schema is used as the single source of truth for the
    table structure (types, which columns are included, order of columns, ...).
    """
    column_names = []
    columns_to_convert = []
    convert_fields = []
    index_descriptors = []
    index_column_names = []
    index_levels = []

    for name in schema.names:
        try:
            col = df[name]
            is_index = False
        except KeyError:
            try:
                col = _get_index_level(df, name)
            except (KeyError, IndexError):
                # name not found as index level
                raise KeyError(
                    "name '{}' present in the specified schema is not found "
                    "in the columns or index".format(name))
            if preserve_index is False:
                raise ValueError(
                    "name '{}' present in the specified schema corresponds "
                    "to the index, but 'preserve_index=False' was "
                    "specified".format(name))
            elif (preserve_index is None and
                    isinstance(col, _pandas_api.pd.RangeIndex)):
                raise ValueError(
                    "name '{}' is present in the schema, but it is a "
                    "RangeIndex which will not be converted as a column "
                    "in the Table, but saved as metadata-only not in "
                    "columns. Specify 'preserve_index=True' to force it "
                    "being added as a column, or remove it from the "
                    "specified schema".format(name))
            is_index = True

        name = _column_name_to_strings(name)

        if _pandas_api.is_sparse(col):
            raise TypeError(
                "Sparse pandas data (column {}) not supported.".format(name))

        field = schema.field(name)
        columns_to_convert.append(col)
        convert_fields.append(field)
        column_names.append(name)

        if is_index:
            index_column_names.append(name)
            index_descriptors.append(name)
            index_levels.append(col)

    all_names = column_names + index_column_names

    return (all_names, column_names, index_column_names, index_descriptors,
            index_levels, columns_to_convert, convert_fields)


def _get_index_level(df, name):
    """
    Get the index level of a DataFrame given 'name' (column name in an arrow
    Schema).
    """
    key = name
    if name not in df.index.names and _is_generated_index_name(name):
        # we know we have an autogenerated name => extract number and get
        # the index level positionally
        key = int(name[len("__index_level_"):-2])
    return df.index.get_level_values(key)


def _level_name(name):
    # preserve type when default serializable, otherwise str it
    try:
        json.dumps(name)
        return name
    except TypeError:
        return str(name)


def _get_range_index_descriptor(level):
    # public start/stop/step attributes added in pandas 0.25.0
    return {
        'kind': 'range',
        'name': _level_name(level.name),
        'start': _pandas_api.get_rangeindex_attribute(level, 'start'),
        'stop': _pandas_api.get_rangeindex_attribute(level, 'stop'),
        'step': _pandas_api.get_rangeindex_attribute(level, 'step')
    }
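
# Illustrative sketch (added comment): pd.RangeIndex(0, 10, 2) is described
# purely as metadata, without serializing its values:
#
#   {'kind': 'range', 'name': None, 'start': 0, 'stop': 10, 'step': 2}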


def _get_index_level_values(index):
    n = len(getattr(index, 'levels', [index]))
    return [index.get_level_values(i) for i in range(n)]


def _resolve_columns_of_interest(df, schema, columns):
    if schema is not None and columns is not None:
        raise ValueError('Schema and columns arguments are mutually '
                         'exclusive, pass only one of them')
    elif schema is not None:
        columns = schema.names
    elif columns is not None:
        columns = [c for c in columns if c in df.columns]
    else:
        columns = df.columns

    return columns


def dataframe_to_types(df, preserve_index, columns=None):
    (all_names,
     column_names,
     _,
     index_descriptors,
     index_columns,
     columns_to_convert,
     _) = _get_columns_to_convert(df, None, preserve_index, columns)

    types = []
    # If pandas knows type, skip conversion
    for c in columns_to_convert:
        values = c.values
        if _pandas_api.is_categorical(values):
            type_ = pa.array(c, from_pandas=True).type
        elif _pandas_api.is_extension_array_dtype(values):
            type_ = pa.array(c.head(0), from_pandas=True).type
        else:
            values, type_ = get_datetimetz_type(values, c.dtype, None)
            type_ = pa.lib._ndarray_to_arrow_type(values, type_)
            if type_ is None:
                type_ = pa.array(c, from_pandas=True).type
        types.append(type_)

    metadata = construct_metadata(
        columns_to_convert, df, column_names, index_columns,
        index_descriptors, preserve_index, types
    )

    return all_names, types, metadata


def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
                        safe=True):
    (all_names,
     column_names,
     index_column_names,
     index_descriptors,
     index_columns,
     columns_to_convert,
     convert_fields) = _get_columns_to_convert(df, schema, preserve_index,
                                               columns)

    # NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
    # using a thread pool is worth it. Currently the heuristic is whether the
    # nrows > 100 * ncols and ncols > 1.
    if nthreads is None:
        nrows, ncols = len(df), len(df.columns)
        if nrows > ncols * 100 and ncols > 1:
            nthreads = pa.cpu_count()
        else:
            nthreads = 1

    def convert_column(col, field):
        if field is None:
            field_nullable = True
            type_ = None
        else:
            field_nullable = field.nullable
            type_ = field.type

        try:
            result = pa.array(col, type=type_, from_pandas=True, safe=safe)
        except (pa.ArrowInvalid,
                pa.ArrowNotImplementedError,
                pa.ArrowTypeError) as e:
            e.args += ("Conversion failed for column {!s} with type {!s}"
                       .format(col.name, col.dtype),)
            raise e
        if not field_nullable and result.null_count > 0:
            raise ValueError("Field {} was non-nullable but pandas column "
                             "had {} null values".format(str(field),
                                                         result.null_count))
        return result

    def _can_definitely_zero_copy(arr):
        return (isinstance(arr, np.ndarray) and
                arr.flags.contiguous and
                issubclass(arr.dtype.type, np.integer))

    if nthreads == 1:
        arrays = [convert_column(c, f)
                  for c, f in zip(columns_to_convert, convert_fields)]
    else:
        arrays = []
        with futures.ThreadPoolExecutor(nthreads) as executor:
            for c, f in zip(columns_to_convert, convert_fields):
                if _can_definitely_zero_copy(c.values):
                    arrays.append(convert_column(c, f))
                else:
                    arrays.append(executor.submit(convert_column, c, f))

        for i, maybe_fut in enumerate(arrays):
            if isinstance(maybe_fut, futures.Future):
                arrays[i] = maybe_fut.result()

    types = [x.type for x in arrays]

    if schema is None:
        fields = []
        for name, type_ in zip(all_names, types):
            name = name if name is not None else 'None'
            fields.append(pa.field(name, type_))
        schema = pa.schema(fields)

    pandas_metadata = construct_metadata(
        columns_to_convert, df, column_names, index_columns,
        index_descriptors, preserve_index, types
    )
    metadata = deepcopy(schema.metadata) if schema.metadata else dict()
    metadata.update(pandas_metadata)
    schema = schema.with_metadata(metadata)

    return arrays, schema
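
# Illustrative sketch (added comment): this is the workhorse behind
# pa.Table.from_pandas. A rough picture of what a caller gets back:
#
#   >>> df = pd.DataFrame({'a': [1, 2, 3]})
#   >>> arrays, schema = dataframe_to_arrays(df, None, None)
#   >>> schema.names            # the RangeIndex is kept as metadata only
#   ['a']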


def get_datetimetz_type(values, dtype, type_):
    if values.dtype.type != np.datetime64:
        return values, type_

    if _pandas_api.is_datetimetz(dtype) and type_ is None:
        # If no user type passed, construct a tz-aware timestamp type
        tz = dtype.tz
        unit = dtype.unit
        type_ = pa.timestamp(unit, tz)
    elif type_ is None:
        # Trust the NumPy dtype
        type_ = pa.from_numpy_dtype(values.dtype)

    return values, type_

# ----------------------------------------------------------------------
# Converting pandas.DataFrame to a dict containing only NumPy arrays or other
# objects friendly to pyarrow.serialize


def dataframe_to_serialized_dict(frame):
    block_manager = frame._data

    blocks = []
    axes = [ax for ax in block_manager.axes]

    for block in block_manager.blocks:
        values = block.values
        block_data = {}

        if _pandas_api.is_datetimetz(values.dtype):
            block_data['timezone'] = pa.lib.tzinfo_to_string(values.tz)
            if hasattr(values, 'values'):
                values = values.values
        elif _pandas_api.is_categorical(values):
            block_data.update(dictionary=values.categories,
                              ordered=values.ordered)
            values = values.codes
        block_data.update(
            placement=block.mgr_locs.as_array,
            block=values
        )

        # If we are dealing with an object array, pickle it instead.
        if values.dtype == np.dtype(object):
            block_data['object'] = None
            block_data['block'] = builtin_pickle.dumps(
                values, protocol=builtin_pickle.HIGHEST_PROTOCOL)

        blocks.append(block_data)

    return {
        'blocks': blocks,
        'axes': axes
    }
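
# Illustrative sketch (added comment): the resulting dict mirrors the pandas
# BlockManager layout; for a one-column int64 frame it would look roughly
# like:
#
#   {'blocks': [{'placement': array([0]), 'block': array([[1, 2]])}],
#    'axes': [<column Index>, <row Index>]}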


def serialized_dict_to_dataframe(data):
    import pandas.core.internals as _int
    reconstructed_blocks = [_reconstruct_block(block)
                            for block in data['blocks']]

    block_mgr = _int.BlockManager(reconstructed_blocks, data['axes'])
    return _pandas_api.data_frame(block_mgr)


def _reconstruct_block(item, columns=None, extension_columns=None):
    """
    Construct a pandas Block from the `item` dictionary coming from pyarrow's
    serialization or returned by arrow::python::ConvertTableToPandas.

    This function takes care of converting dictionary types to pandas
    categorical, Timestamp-with-timezones to the proper pandas Block, and
    conversion to pandas ExtensionBlock

    Parameters
    ----------
    item : dict
        For basic types, this is a dictionary in the form of
        {'block': np.ndarray of values, 'placement': pandas block placement}.
        Additional keys are present for other types (dictionary, timezone,
        object).
    columns : list of str
        Column names of the table being constructed, used for extension types
    extension_columns : dict
        Dictionary of {column_name: pandas_dtype} that includes all columns
        and corresponding dtypes that will be converted to a pandas
        ExtensionBlock.

    Returns
    -------
    pandas Block

    """
    import pandas.core.internals as _int

    block_arr = item.get('block', None)
    placement = item['placement']
    if 'dictionary' in item:
        cat = _pandas_api.categorical_type.from_codes(
            block_arr, categories=item['dictionary'],
            ordered=item['ordered'])
        block = _int.make_block(cat, placement=placement)
    elif 'timezone' in item:
        dtype = make_datetimetz(item['timezone'])
        block = _int.make_block(block_arr, placement=placement,
                                klass=_int.DatetimeTZBlock,
                                dtype=dtype)
    elif 'object' in item:
        block = _int.make_block(builtin_pickle.loads(block_arr),
                                placement=placement)
    elif 'py_array' in item:
        # create ExtensionBlock
        arr = item['py_array']
        assert len(placement) == 1
        name = columns[placement[0]]
        pandas_dtype = extension_columns[name]
        if not hasattr(pandas_dtype, '__from_arrow__'):
            raise ValueError("This column does not support being converted "
                             "to a pandas ExtensionArray")
        pd_ext_arr = pandas_dtype.__from_arrow__(arr)
        block = _int.make_block(pd_ext_arr, placement=placement)
    else:
        block = _int.make_block(block_arr, placement=placement)

    return block


def make_datetimetz(tz):
    tz = pa.lib.string_to_tzinfo(tz)
    return _pandas_api.datetimetz_type('ns', tz=tz)


# ----------------------------------------------------------------------
# Converting pyarrow.Table efficiently to pandas.DataFrame


def table_to_blockmanager(options, table, categories=None,
                          ignore_metadata=False, types_mapper=None):
    from pandas.core.internals import BlockManager

    all_columns = []
    column_indexes = []
    pandas_metadata = table.schema.pandas_metadata

    if not ignore_metadata and pandas_metadata is not None:
        all_columns = pandas_metadata['columns']
        column_indexes = pandas_metadata.get('column_indexes', [])
        index_descriptors = pandas_metadata['index_columns']
        table = _add_any_metadata(table, pandas_metadata)
        table, index = _reconstruct_index(table, index_descriptors,
                                          all_columns)
        ext_columns_dtypes = _get_extension_dtypes(
            table, all_columns, types_mapper)
    else:
        index = _pandas_api.pd.RangeIndex(table.num_rows)
        ext_columns_dtypes = _get_extension_dtypes(table, [], types_mapper)

    _check_data_column_metadata_consistency(all_columns)
    columns = _deserialize_column_index(table, all_columns, column_indexes)
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)

    axes = [columns, index]
    return BlockManager(blocks, axes)
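
# Descriptive note (added comment): this is the entry point behind
# Table.to_pandas. The steps above are, in order: read the stored pandas
# metadata, rebuild the row index from it, decide which columns become pandas
# extension dtypes, convert the remaining columns to numpy blocks, and
# assemble everything into a BlockManager.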


# Set of the string repr of all numpy dtypes that can be stored in a pandas
# dataframe (complex not included since not supported by Arrow)
_pandas_supported_numpy_types = {
    str(np.dtype(typ))
    for typ in (np.sctypes['int'] + np.sctypes['uint'] + np.sctypes['float'] +
                ['object', 'bool'])
}


def _get_extension_dtypes(table, columns_metadata, types_mapper=None):
    """
    Based on the stored column pandas metadata and the extension types
    in the arrow schema, infer which columns should be converted to a
    pandas extension dtype.

    The 'numpy_type' field in the column metadata stores the string
    representation of the original pandas dtype (and, despite its name,
    not the 'pandas_type' field).
    Based on this string representation, a pandas/numpy dtype is constructed
    and then we can check if this dtype supports conversion from arrow.
    """
    ext_columns = {}

    # older pandas version that does not yet support extension dtypes
    if _pandas_api.extension_dtype is None:
        return ext_columns

    # infer the extension columns from the pandas metadata
    for col_meta in columns_metadata:
        name = col_meta['name']
        dtype = col_meta['numpy_type']
        if dtype not in _pandas_supported_numpy_types:
            # pandas_dtype is expensive, so avoid doing this for types
            # that are certainly numpy dtypes
            pandas_dtype = _pandas_api.pandas_dtype(dtype)
            if isinstance(pandas_dtype, _pandas_api.extension_dtype):
                if hasattr(pandas_dtype, "__from_arrow__"):
                    ext_columns[name] = pandas_dtype

    # infer from extension type in the schema
    for field in table.schema:
        typ = field.type
        if isinstance(typ, pa.BaseExtensionType):
            try:
                pandas_dtype = typ.to_pandas_dtype()
            except NotImplementedError:
                pass
            else:
                ext_columns[field.name] = pandas_dtype

    # use the specified mapping of built-in arrow types to pandas dtypes
    if types_mapper:
        for field in table.schema:
            typ = field.type
            pandas_dtype = types_mapper(typ)
            if pandas_dtype is not None:
                ext_columns[field.name] = pandas_dtype

    return ext_columns
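
# Illustrative sketch (added comment): a types_mapper is any callable from an
# Arrow DataType to a pandas dtype (or None). For example, to read int64
# columns back as the nullable pd.Int64Dtype():
#
#   >>> mapper = {pa.int64(): pd.Int64Dtype()}.get
#   >>> table.to_pandas(types_mapper=mapper)  # doctest: +SKIP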


def _check_data_column_metadata_consistency(all_columns):
    # It can never be the case in a released version of pyarrow that
    # c['name'] is None *and* 'field_name' is not a key in the column metadata,
    # because the change to allow c['name'] to be None and the change to add
    # 'field_name' are in the same release (0.8.0)
    assert all(
        (c['name'] is None and 'field_name' in c) or c['name'] is not None
        for c in all_columns
    )


def _deserialize_column_index(block_table, all_columns, column_indexes):
    column_strings = [frombytes(x) if isinstance(x, bytes) else x
                      for x in block_table.column_names]
    if all_columns:
        columns_name_dict = {
            c.get('field_name', _column_name_to_strings(c['name'])): c['name']
            for c in all_columns
        }
        columns_values = [
            columns_name_dict.get(name, name) for name in column_strings
        ]
    else:
        columns_values = column_strings

    # If we're passed multiple column indexes then evaluate with
    # ast.literal_eval, since the column index values show up as a list of
    # tuples
    to_pair = ast.literal_eval if len(column_indexes) > 1 else lambda x: (x,)

    # Create the column index

    # Construct the base index
    if not columns_values:
        columns = _pandas_api.pd.Index(columns_values)
    else:
        columns = _pandas_api.pd.MultiIndex.from_tuples(
            list(map(to_pair, columns_values)),
            names=[col_index['name'] for col_index in column_indexes] or None,
        )

    # if we're reconstructing the index
    if len(column_indexes) > 0:
        columns = _reconstruct_columns_from_metadata(columns, column_indexes)

    # ARROW-1751: flatten a single level column MultiIndex for pandas 0.21.0
    columns = _flatten_single_level_multiindex(columns)

    return columns


def _reconstruct_index(table, index_descriptors, all_columns):
    # 0. 'field_name' is the name of the column in the arrow Table
    # 1. 'name' is the user-facing name of the column, that is, it came from
    #    pandas
    # 2. 'field_name' and 'name' differ for index columns
    # 3. We fall back on c['name'] for backwards compatibility
    field_name_to_metadata = {
        c.get('field_name', c['name']): c
        for c in all_columns
    }

    # Build up a list of index columns and names while removing those columns
    # from the original table
    index_arrays = []
    index_names = []
    result_table = table
    for descr in index_descriptors:
        if isinstance(descr, str):
            result_table, index_level, index_name = _extract_index_level(
                table, result_table, descr, field_name_to_metadata)
            if index_level is None:
                # ARROW-1883: the serialized index column was not found
                continue
        elif descr['kind'] == 'range':
            index_name = descr['name']
            index_level = _pandas_api.pd.RangeIndex(descr['start'],
                                                    descr['stop'],
                                                    step=descr['step'],
                                                    name=index_name)
            if len(index_level) != len(table):
                # Possibly the result of munged metadata
                continue
        else:
            raise ValueError("Unrecognized index kind: {}"
                             .format(descr['kind']))
        index_arrays.append(index_level)
        index_names.append(index_name)

    pd = _pandas_api.pd

    # Reconstruct the row index
    if len(index_arrays) > 1:
        index = pd.MultiIndex.from_arrays(index_arrays, names=index_names)
    elif len(index_arrays) == 1:
        index = index_arrays[0]
        if not isinstance(index, pd.Index):
            # Box anything that wasn't boxed above
            index = pd.Index(index, name=index_names[0])
    else:
        index = pd.RangeIndex(table.num_rows)

    return result_table, index


def _extract_index_level(table, result_table, field_name,
                         field_name_to_metadata):
    logical_name = field_name_to_metadata[field_name]['name']
    index_name = _backwards_compatible_index_name(field_name, logical_name)
    i = table.schema.get_field_index(field_name)

    if i == -1:
        # The serialized index column was removed by the user
        return result_table, None, None

    pd = _pandas_api.pd

    col = table.column(i)
    values = col.to_pandas().values

    if hasattr(values, 'flags') and not values.flags.writeable:
        # ARROW-1054: in pandas 0.19.2, factorize will reject
        # non-writeable arrays when calling MultiIndex.from_arrays
        values = values.copy()

    if isinstance(col.type, pa.lib.TimestampType) and col.type.tz is not None:
        index_level = make_tz_aware(pd.Series(values), col.type.tz)
    else:
        index_level = pd.Series(values, dtype=values.dtype)
    result_table = result_table.remove_column(
        result_table.schema.get_field_index(field_name)
    )
    return result_table, index_level, index_name


def _backwards_compatible_index_name(raw_name, logical_name):
    """Compute the name of an index column that is compatible with older
    versions of :mod:`pyarrow`.

    Parameters
    ----------
    raw_name : str
    logical_name : str

    Returns
    -------
    result : str

    Notes
    -----
    * Part of :func:`~pyarrow.pandas_compat.table_to_blockmanager`
    """
    # Part of table_to_blockmanager
    if raw_name == logical_name and _is_generated_index_name(raw_name):
        return None
    else:
        return logical_name


def _is_generated_index_name(name):
    pattern = r'^__index_level_\d+__$'
    return re.match(pattern, name) is not None


_pandas_logical_type_map = {
    'date': 'datetime64[D]',
    'datetime': 'datetime64[ns]',
    'unicode': np.unicode_,
    'bytes': np.bytes_,
    'string': np.str_,
    'integer': np.int64,
    'floating': np.float64,
    'empty': np.object_,
}


def _pandas_type_to_numpy_type(pandas_type):
    """Get the numpy dtype that corresponds to a pandas type.

    Parameters
    ----------
    pandas_type : str
        The result of a call to pandas.lib.infer_dtype.

    Returns
    -------
    dtype : np.dtype
        The dtype that corresponds to `pandas_type`.
    """
    try:
        return _pandas_logical_type_map[pandas_type]
    except KeyError:
        if 'mixed' in pandas_type:
            # catching 'mixed', 'mixed-integer' and 'mixed-integer-float'
            return np.object_
        return np.dtype(pandas_type)


def _get_multiindex_codes(mi):
    # compat for pandas < 0.24 (MI labels renamed to codes).
    if isinstance(mi, _pandas_api.pd.MultiIndex):
        return mi.codes if hasattr(mi, 'codes') else mi.labels
    else:
        return None


def _reconstruct_columns_from_metadata(columns, column_indexes):
    """Construct a pandas MultiIndex from `columns` and column index metadata
    in `column_indexes`.

    Parameters
    ----------
    columns : pd.Index
        The columns coming from a pyarrow.Table
    column_indexes : List[Dict[str, str]]
        The column index metadata deserialized from the JSON schema metadata
        in a :class:`~pyarrow.Table`.

    Returns
    -------
    result : MultiIndex
        The index reconstructed using `column_indexes` metadata with levels of
        the correct type.

    Notes
    -----
    * Part of :func:`~pyarrow.pandas_compat.table_to_blockmanager`
    """
    pd = _pandas_api.pd
    # Get levels and labels, and provide sane defaults if the index has a
    # single level to avoid if/else spaghetti.
    levels = getattr(columns, 'levels', None) or [columns]
    labels = _get_multiindex_codes(columns) or [
        pd.RangeIndex(len(level)) for level in levels
    ]

    # Convert each level to the dtype provided in the metadata
    levels_dtypes = [
        (level, col_index.get('pandas_type', str(level.dtype)),
         col_index.get('numpy_type', None))
        for level, col_index in zip_longest(
            levels, column_indexes, fillvalue={}
        )
    ]

    new_levels = []
    encoder = operator.methodcaller('encode', 'UTF-8')

    for level, pandas_dtype, numpy_dtype in levels_dtypes:
        dtype = _pandas_type_to_numpy_type(pandas_dtype)
        # Since our metadata is UTF-8 encoded, Python turns things that were
        # bytes into unicode strings when json.loads-ing them. We need to
        # convert them back to bytes to preserve metadata.
        if dtype == np.bytes_:
            level = level.map(encoder)
        elif level.dtype != dtype:
            level = level.astype(dtype)
        # ARROW-9096: if original DataFrame was upcast we keep that
        if level.dtype != numpy_dtype:
            level = level.astype(numpy_dtype)

        new_levels.append(level)

    return pd.MultiIndex(new_levels, labels, names=columns.names)


def _table_to_blocks(options, block_table, categories, extension_columns):
    # Part of table_to_blockmanager

    # Convert an arrow table to Block from the internal pandas API
    columns = block_table.column_names
    result = pa.lib.table_to_blocks(options, block_table, categories,
                                    list(extension_columns.keys()))
    return [_reconstruct_block(item, columns, extension_columns)
            for item in result]


def _flatten_single_level_multiindex(index):
    pd = _pandas_api.pd
    if isinstance(index, pd.MultiIndex) and index.nlevels == 1:
        levels, = index.levels
        labels, = _get_multiindex_codes(index)
        # ARROW-9096: use levels.dtype to match cast with original DataFrame
        dtype = levels.dtype

        # Cheaply check that we do not somehow have duplicate column names
        if not index.is_unique:
            raise ValueError('Found non-unique column index')

        return pd.Index(
            [levels[_label] if _label != -1 else None for _label in labels],
            dtype=dtype,
            name=index.names[0]
        )
    return index


def _add_any_metadata(table, pandas_metadata):
    modified_columns = {}
    modified_fields = {}

    schema = table.schema

    index_columns = pandas_metadata['index_columns']
    # only take index columns into account if they are an actual table column
    index_columns = [idx_col for idx_col in index_columns
                     if isinstance(idx_col, str)]
    n_index_levels = len(index_columns)
    n_columns = len(pandas_metadata['columns']) - n_index_levels

    # Add time zones
    for i, col_meta in enumerate(pandas_metadata['columns']):

        raw_name = col_meta.get('field_name')
        if not raw_name:
            # deal with metadata written with arrow < 0.8 or fastparquet
            raw_name = col_meta['name']
            if i >= n_columns:
                # index columns
                raw_name = index_columns[i - n_columns]
            if raw_name is None:
                raw_name = 'None'

        idx = schema.get_field_index(raw_name)
        if idx != -1:
            if col_meta['pandas_type'] == 'datetimetz':
                col = table[idx]
                if not isinstance(col.type, pa.lib.TimestampType):
                    continue
                metadata = col_meta['metadata']
                if not metadata:
                    continue
                metadata_tz = metadata.get('timezone')
                if metadata_tz and metadata_tz != col.type.tz:
                    converted = col.to_pandas()
                    tz_aware_type = pa.timestamp('ns', tz=metadata_tz)
                    with_metadata = pa.Array.from_pandas(converted,
                                                         type=tz_aware_type)

                    modified_fields[idx] = pa.field(schema[idx].name,
                                                    tz_aware_type)
                    modified_columns[idx] = with_metadata

    if len(modified_columns) > 0:
        columns = []
        fields = []
        for i in range(len(table.schema)):
            if i in modified_columns:
                columns.append(modified_columns[i])
                fields.append(modified_fields[i])
            else:
                columns.append(table[i])
                fields.append(table.schema[i])
        return pa.Table.from_arrays(columns, schema=pa.schema(fields))
    else:
        return table


# ----------------------------------------------------------------------
# Helper functions used in lib


def make_tz_aware(series, tz):
    """
    Make a datetime64 Series timezone-aware for the given tz
    """
    tz = pa.lib.string_to_tzinfo(tz)
    series = (series.dt.tz_localize('utc')
                    .dt.tz_convert(tz))
    return series
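
# Illustrative sketch (added comment): a naive datetime64[ns] Series is first
# localized to UTC (Arrow stores tz-aware timestamps as UTC) and then
# converted to the target zone:
#
#   >>> s = pd.Series(pd.to_datetime(['2017-02-01']))
#   >>> make_tz_aware(s, 'Europe/Paris').dt.tz  # doctest: +SKIP
#   <DstTzInfo 'Europe/Paris' ...>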