1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18import warnings
19
20
21cdef class ChunkedArray(_PandasConvertible):
22 """
23 An array-like composed from a (possibly empty) collection of pyarrow.Arrays
24
25 Warnings
26 --------
27 Do not call this class's constructor directly.
28 """
29
30 def __cinit__(self):
31 self.chunked_array = NULL
32
33 def __init__(self):
34 raise TypeError("Do not call ChunkedArray's constructor directly, use "
35 "`chunked_array` function instead.")
36
37 cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array):
38 self.sp_chunked_array = chunked_array
39 self.chunked_array = chunked_array.get()
40
41 def __reduce__(self):
42 return chunked_array, (self.chunks, self.type)
43
44 @property
45 def data(self):
46 import warnings
47 warnings.warn("Calling .data on ChunkedArray is provided for "
48 "compatibility after Column was removed, simply drop "
49 "this attribute", FutureWarning)
50 return self
51
52 @property
53 def type(self):
54 return pyarrow_wrap_data_type(self.sp_chunked_array.get().type())
55
56 def length(self):
57 return self.chunked_array.length()
58
59 def __len__(self):
60 return self.length()
61
62 def __repr__(self):
63 type_format = object.__repr__(self)
64 return '{0}\n{1}'.format(type_format, str(self))
65
66 def to_string(self, *, int indent=0, int window=10,
67 c_bool skip_new_lines=False):
68 """
69 Render a "pretty-printed" string representation of the ChunkedArray
70
71 Parameters
72 ----------
73 indent : int
74 How much to indent the content of the array to the right,
75 by default ``0``.
76 window : int
77 How many items to preview at the beginning and end
78 of the array when the array is bigger than the window.
79 The other elements will be replaced with an ellipsis.
80 skip_new_lines : bool
81 If True, render the array on a single line of text;
82 otherwise each element is printed on its own line.
83 """
84 cdef:
85 c_string result
86 PrettyPrintOptions options
87
88 with nogil:
89 options = PrettyPrintOptions(indent, window)
90 options.skip_new_lines = skip_new_lines
91 check_status(
92 PrettyPrint(
93 deref(self.chunked_array),
94 options,
95 &result
96 )
97 )
98
99 return frombytes(result, safe=True)
100
101 def format(self, **kwargs):
102 import warnings
103 warnings.warn('ChunkedArray.format is deprecated, '
104 'use ChunkedArray.to_string')
105 return self.to_string(**kwargs)
106
107 def __str__(self):
108 return self.to_string()
109
110 def validate(self, *, full=False):
111 """
112 Perform validation checks. An exception is raised if validation fails.
113
114 By default only cheap validation checks are run. Pass `full=True`
115 for thorough validation checks (potentially O(n)).
116
117 Parameters
118 ----------
119 full: bool, default False
120 If True, run expensive checks, otherwise cheap checks only.
121
122 Raises
123 ------
124 ArrowInvalid
125 """
126 if full:
127 with nogil:
128 check_status(self.sp_chunked_array.get().ValidateFull())
129 else:
130 with nogil:
131 check_status(self.sp_chunked_array.get().Validate())
132
133 @property
134 def null_count(self):
135 """
136 Number of null entries
137
138 Returns
139 -------
140 int
141 """
142 return self.chunked_array.null_count()
143
144 @property
145 def nbytes(self):
146 """
147 Total number of bytes consumed by the elements of the chunked array.
148 """
149 size = 0
150 for chunk in self.iterchunks():
151 size += chunk.nbytes
152 return size
153
154 def __sizeof__(self):
155 return super(ChunkedArray, self).__sizeof__() + self.nbytes
156
157 def __iter__(self):
158 for chunk in self.iterchunks():
159 for item in chunk:
160 yield item
161
162 def __getitem__(self, key):
163 """
164 Slice or return value at given index
165
166 Parameters
167 ----------
168 key : integer or slice
169 Slices with step not equal to 1 (or None) will produce a copy
170 rather than a zero-copy view
171
172 Returns
173 -------
174 value : Scalar (index) or ChunkedArray (slice)
175 """
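# A minimal illustrative sketch of indexing: an integer key returns a
# Scalar, a slice returns a ChunkedArray (the small array below is
# hypothetical).
#   >>> import pyarrow as pa
#   >>> arr = pa.chunked_array([[1, 2], [3, None]])
#   >>> arr[0].as_py()
#   1
#   >>> arr[1:3].to_pylist()
#   [2, 3]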
176 if isinstance(key, slice):
177 return _normalize_slice(self, key)
178
179 return self.getitem(_normalize_index(key, self.chunked_array.length()))
180
181 cdef getitem(self, int64_t index):
182 cdef int j
183
184 for j in range(self.num_chunks):
185 if index < self.chunked_array.chunk(j).get().length():
186 return self.chunk(j)[index]
187 else:
188 index -= self.chunked_array.chunk(j).get().length()
189
190 def is_null(self, *, nan_is_null=False):
191 """
192 Return boolean array indicating the null values.
193
194 Parameters
195 ----------
196 nan_is_null : bool (optional, default False)
197 Whether floating-point NaN values should also be considered null.
198
199 Returns
200 -------
201 array : boolean Array or ChunkedArray
202 """
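# Illustrative sketch (hypothetical data) showing the effect of the
# nan_is_null option on a float column containing a null and a NaN.
#   >>> import pyarrow as pa
#   >>> arr = pa.chunked_array([[1.0, None], [float("nan")]])
#   >>> arr.is_null().to_pylist()
#   [False, True, False]
#   >>> arr.is_null(nan_is_null=True).to_pylist()
#   [False, True, True]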
203 options = _pc().NullOptions(nan_is_null=nan_is_null)
204 return _pc().call_function('is_null', [self], options)
205
206 def is_valid(self):
207 """
208 Return boolean array indicating the non-null values.
209 """
210 return _pc().is_valid(self)
211
212 def __eq__(self, other):
213 try:
214 return self.equals(other)
215 except TypeError:
216 return NotImplemented
217
218 def fill_null(self, fill_value):
219 """
220 See pyarrow.compute.fill_null docstring for usage.
221 """
222 return _pc().fill_null(self, fill_value)
223
224 def equals(self, ChunkedArray other):
225 """
226 Return whether the contents of two chunked arrays are equal.
227
228 Parameters
229 ----------
230 other : pyarrow.ChunkedArray
231 Chunked array to compare against.
232
233 Returns
234 -------
235 are_equal : bool
236 """
237 if other is None:
238 return False
239
240 cdef:
241 CChunkedArray* this_arr = self.chunked_array
242 CChunkedArray* other_arr = other.chunked_array
243 c_bool result
244
245 with nogil:
246 result = this_arr.Equals(deref(other_arr))
247
248 return result
249
250 def _to_pandas(self, options, **kwargs):
251 return _array_like_to_pandas(self, options)
252
253 def to_numpy(self):
254 """
255 Return a NumPy copy of this array (experimental).
256
257 Returns
258 -------
259 array : numpy.ndarray
260 """
261 cdef:
262 PyObject* out
263 PandasOptions c_options
264 object values
265
266 if self.type.id == _Type_EXTENSION:
267 storage_array = chunked_array(
268 [chunk.storage for chunk in self.iterchunks()],
269 type=self.type.storage_type
270 )
271 return storage_array.to_numpy()
272
273 with nogil:
274 check_status(
275 ConvertChunkedArrayToPandas(
276 c_options,
277 self.sp_chunked_array,
278 self,
279 &out
280 )
281 )
282
283 # wrap_array_output uses pandas to convert to Categorical, here
284 # always convert to numpy array
285 values = PyObject_to_object(out)
286
287 if isinstance(values, dict):
288 values = np.take(values['dictionary'], values['indices'])
289
290 return values
291
292 def __array__(self, dtype=None):
293 values = self.to_numpy()
294 if dtype is None:
295 return values
296 return values.astype(dtype)
297
298 def cast(self, object target_type, safe=True):
299 """
300 Cast array values to another data type
301
302 See pyarrow.compute.cast for usage
303 """
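# Illustrative sketch (hypothetical array): casting an int64 chunked
# array to float64.
#   >>> import pyarrow as pa
#   >>> arr = pa.chunked_array([[1, 2], [3]])
#   >>> print(arr.cast(pa.float64()).type)
#   double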
304 return _pc().cast(self, target_type, safe=safe)
305
306 def dictionary_encode(self, null_encoding='mask'):
307 """
308 Compute dictionary-encoded representation of array
309
310 Returns
311 -------
312 pyarrow.ChunkedArray
313 Same chunking as the input, all chunks share a common dictionary.
314 """
315 options = _pc().DictionaryEncodeOptions(null_encoding)
316 return _pc().call_function('dictionary_encode', [self], options)
317
318 def flatten(self, MemoryPool memory_pool=None):
319 """
320 Flatten this ChunkedArray. If it has a struct type, the column is
321 flattened into one array per struct field.
322
323 Parameters
324 ----------
325 memory_pool : MemoryPool, default None
326 For memory allocations, if required, otherwise use default pool
327
328 Returns
329 -------
330 result : List[ChunkedArray]
331 """
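# Illustrative sketch with a hypothetical struct-typed column: flatten
# yields one ChunkedArray per struct field.
#   >>> import pyarrow as pa
#   >>> ty = pa.struct([('x', pa.int64()), ('y', pa.int64())])
#   >>> arr = pa.chunked_array([pa.array([{'x': 1, 'y': 2}], type=ty)])
#   >>> [c.to_pylist() for c in arr.flatten()]
#   [[1], [2]]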
332 cdef:
333 vector[shared_ptr[CChunkedArray]] flattened
334 CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
335
336 with nogil:
337 flattened = GetResultValue(self.chunked_array.Flatten(pool))
338
339 return [pyarrow_wrap_chunked_array(col) for col in flattened]
340
341 def combine_chunks(self, MemoryPool memory_pool=None):
342 """
343 Flatten this ChunkedArray into a single non-chunked array.
344
345 Parameters
346 ----------
347 memory_pool : MemoryPool, default None
348 For memory allocations, if required, otherwise use default pool
349
350 Returns
351 -------
352 result : Array
353 """
354 return concat_arrays(self.chunks)
355
356 def unique(self):
357 """
358 Compute distinct elements in array
359
360 Returns
361 -------
362 pyarrow.Array
363 """
364 return _pc().call_function('unique', [self])
365
366 def value_counts(self):
367 """
368 Compute counts of unique elements in array.
369
370 Returns
371 -------
372 An array of structs with fields "values" (same type as the input) and "counts" (int64)
373 """
374 return _pc().call_function('value_counts', [self])
375
376 def slice(self, offset=0, length=None):
377 """
378 Compute zero-copy slice of this ChunkedArray
379
380 Parameters
381 ----------
382 offset : int, default 0
383 Offset from start of array to slice
384 length : int, default None
385 Length of slice (default is until end of batch starting from
386 offset)
387
388 Returns
389 -------
390 sliced : ChunkedArray
391 """
392 cdef shared_ptr[CChunkedArray] result
393
394 if offset < 0:
395 raise IndexError('Offset must be non-negative')
396
397 offset = min(len(self), offset)
398 if length is None:
399 result = self.chunked_array.Slice(offset)
400 else:
401 result = self.chunked_array.Slice(offset, length)
402
403 return pyarrow_wrap_chunked_array(result)
404
405 def filter(self, mask, object null_selection_behavior="drop"):
406 """
407 Select values from a chunked array. See pyarrow.compute.filter for full
408 usage.
409 """
410 return _pc().filter(self, mask, null_selection_behavior)
411
412 def index(self, value, start=None, end=None, *, memory_pool=None):
413 """
414 Find the first index of a value.
415
416 See pyarrow.compute.index for full usage.
417 """
418 return _pc().index(self, value, start, end, memory_pool=memory_pool)
419
420 def take(self, object indices):
421 """
422 Select values from a chunked array. See pyarrow.compute.take for full
423 usage.
424 """
425 return _pc().take(self, indices)
426
427 def drop_null(self):
428 """
429 Remove missing values from a chunked array.
430 See pyarrow.compute.drop_null for full description.
431 """
432 return _pc().drop_null(self)
433
434 def unify_dictionaries(self, MemoryPool memory_pool=None):
435 """
436 Unify dictionaries across all chunks.
437
438 This method returns an equivalent chunked array, but where all
439 chunks share the same dictionary values. Dictionary indices are
440 transposed accordingly.
441
442 If there are no dictionaries in the chunked array, it is returned
443 unchanged.
444
445 Parameters
446 ----------
447 memory_pool : MemoryPool, default None
448 For memory allocations, if required, otherwise use default pool
449
450 Returns
451 -------
452 result : ChunkedArray
453 """
454 cdef:
455 CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
456 shared_ptr[CChunkedArray] c_result
457
458 with nogil:
459 c_result = GetResultValue(CDictionaryUnifier.UnifyChunkedArray(
460 self.sp_chunked_array, pool))
461
462 return pyarrow_wrap_chunked_array(c_result)
463
464 @property
465 def num_chunks(self):
466 """
467 Number of underlying chunks
468
469 Returns
470 -------
471 int
472 """
473 return self.chunked_array.num_chunks()
474
475 def chunk(self, i):
476 """
477 Select a chunk by its index
478
479 Parameters
480 ----------
481 i : int
482
483 Returns
484 -------
485 pyarrow.Array
486 """
487 if i >= self.num_chunks or i < 0:
488 raise IndexError('Chunk index out of range.')
489
490 return pyarrow_wrap_array(self.chunked_array.chunk(i))
491
492 @property
493 def chunks(self):
494 return list(self.iterchunks())
495
496 def iterchunks(self):
497 for i in range(self.num_chunks):
498 yield self.chunk(i)
499
500 def to_pylist(self):
501 """
502 Convert to a list of native Python objects.
503 """
504 result = []
505 for i in range(self.num_chunks):
506 result += self.chunk(i).to_pylist()
507 return result
508
509
510def chunked_array(arrays, type=None):
511 """
512 Construct chunked array from list of array-like objects
513
514 Parameters
515 ----------
516 arrays : Array, list of Array, or values coercible to arrays
517 Must all be the same data type. Can be empty only if type also passed.
518 type : DataType or string coercible to DataType
519
520 Returns
521 -------
522 ChunkedArray
523 """
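# Illustrative sketch (hypothetical chunks): two list-of-values chunks,
# and an empty chunked array which requires an explicit type.
#   >>> import pyarrow as pa
#   >>> pa.chunked_array([[1, 2], [3, 4]]).num_chunks
#   2
#   >>> pa.chunked_array([], type=pa.string()).num_chunks
#   0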
524 cdef:
525 Array arr
526 vector[shared_ptr[CArray]] c_arrays
527 shared_ptr[CChunkedArray] sp_chunked_array
528
529 type = ensure_type(type, allow_none=True)
530
531 if isinstance(arrays, Array):
532 arrays = [arrays]
533
534 for x in arrays:
535 arr = x if isinstance(x, Array) else array(x, type=type)
536
537 if type is None:
538 # coercing subsequent arrays to the type inferred from the first chunk
539 # allows more flexible chunked array construction; it also spares the
540 # inference overhead after the first chunk
541 type = arr.type
542 else:
543 if arr.type != type:
544 raise TypeError(
545 "All array chunks must have type {}".format(type)
546 )
547
548 c_arrays.push_back(arr.sp_array)
549
550 if c_arrays.size() == 0 and type is None:
551 raise ValueError("When passing an empty collection of arrays "
552 "you must also pass the data type")
553
554 sp_chunked_array.reset(
555 new CChunkedArray(c_arrays, pyarrow_unwrap_data_type(type))
556 )
557 with nogil:
558 check_status(sp_chunked_array.get().Validate())
559
560 return pyarrow_wrap_chunked_array(sp_chunked_array)
561
562
563cdef _schema_from_arrays(arrays, names, metadata, shared_ptr[CSchema]* schema):
564 cdef:
565 Py_ssize_t K = len(arrays)
566 c_string c_name
567 shared_ptr[CDataType] c_type
568 shared_ptr[const CKeyValueMetadata] c_meta
569 vector[shared_ptr[CField]] c_fields
570
571 if metadata is not None:
572 c_meta = KeyValueMetadata(metadata).unwrap()
573
574 if K == 0:
575 if names is None or len(names) == 0:
576 schema.reset(new CSchema(c_fields, c_meta))
577 return arrays
578 else:
579 raise ValueError('Length of names ({}) does not match '
580 'length of arrays ({})'.format(len(names), K))
581
582 c_fields.resize(K)
583
584 if names is None:
585 raise ValueError('Must pass names or schema when constructing '
586 'Table or RecordBatch.')
587
588 if len(names) != K:
589 raise ValueError('Length of names ({}) does not match '
590 'length of arrays ({})'.format(len(names), K))
591
592 converted_arrays = []
593 for i in range(K):
594 val = arrays[i]
595 if not isinstance(val, (Array, ChunkedArray)):
596 val = array(val)
597
598 c_type = (<DataType> val.type).sp_type
599
600 if names[i] is None:
601 c_name = b'None'
602 else:
603 c_name = tobytes(names[i])
604 c_fields[i].reset(new CField(c_name, c_type, True))
605 converted_arrays.append(val)
606
607 schema.reset(new CSchema(c_fields, c_meta))
608 return converted_arrays
609
610
611cdef _sanitize_arrays(arrays, names, schema, metadata,
612 shared_ptr[CSchema]* c_schema):
613 cdef Schema cy_schema
614 if schema is None:
615 converted_arrays = _schema_from_arrays(arrays, names, metadata,
616 c_schema)
617 else:
618 if names is not None:
619 raise ValueError('Cannot pass both schema and names')
620 if metadata is not None:
621 raise ValueError('Cannot pass both schema and metadata')
622 cy_schema = schema
623
624 if len(schema) != len(arrays):
625 raise ValueError('Schema and number of arrays unequal')
626
627 c_schema[0] = cy_schema.sp_schema
628 converted_arrays = []
629 for i, item in enumerate(arrays):
630 item = asarray(item, type=schema[i].type)
631 converted_arrays.append(item)
632 return converted_arrays
633
634
635cdef class RecordBatch(_PandasConvertible):
636 """
637 Batch of rows of columns of equal length
638
639 Warnings
640 --------
641 Do not call this class's constructor directly, use one of the
642 ``RecordBatch.from_*`` functions instead.
643 """
644
645 def __cinit__(self):
646 self.batch = NULL
647 self._schema = None
648
649 def __init__(self):
650 raise TypeError("Do not call RecordBatch's constructor directly, use "
651 "one of the `RecordBatch.from_*` functions instead.")
652
653 cdef void init(self, const shared_ptr[CRecordBatch]& batch):
654 self.sp_batch = batch
655 self.batch = batch.get()
656
657 @staticmethod
658 def from_pydict(mapping, schema=None, metadata=None):
659 """
660 Construct a RecordBatch from Arrow arrays or columns.
661
662 Parameters
663 ----------
664 mapping : dict or Mapping
665 A mapping of strings to Arrays or Python lists.
666 schema : Schema, default None
667 If not passed, will be inferred from the Mapping values.
668 metadata : dict or Mapping, default None
669 Optional metadata for the schema (if inferred).
670
671 Returns
672 -------
673 RecordBatch
674 """
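# Illustrative sketch (hypothetical mapping of column names to lists).
#   >>> import pyarrow as pa
#   >>> batch = pa.RecordBatch.from_pydict({'a': [1, 2], 'b': ['x', 'y']})
#   >>> batch.num_rows
#   2
#   >>> batch.schema.names
#   ['a', 'b']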
675
676 return _from_pydict(cls=RecordBatch,
677 mapping=mapping,
678 schema=schema,
679 metadata=metadata)
680
681 def __reduce__(self):
682 return _reconstruct_record_batch, (self.columns, self.schema)
683
684 def __len__(self):
685 return self.batch.num_rows()
686
687 def __eq__(self, other):
688 try:
689 return self.equals(other)
690 except TypeError:
691 return NotImplemented
692
693 def to_string(self, show_metadata=False):
694 # Use less verbose schema output.
695 schema_as_string = self.schema.to_string(
696 show_field_metadata=show_metadata,
697 show_schema_metadata=show_metadata
698 )
699 return 'pyarrow.{}\n{}'.format(type(self).__name__, schema_as_string)
700
701 def __repr__(self):
702 return self.to_string()
703
704 def validate(self, *, full=False):
705 """
706 Perform validation checks. An exception is raised if validation fails.
707
708 By default only cheap validation checks are run. Pass `full=True`
709 for thorough validation checks (potentially O(n)).
710
711 Parameters
712 ----------
713 full: bool, default False
714 If True, run expensive checks, otherwise cheap checks only.
715
716 Raises
717 ------
718 ArrowInvalid
719 """
720 if full:
721 with nogil:
722 check_status(self.batch.ValidateFull())
723 else:
724 with nogil:
725 check_status(self.batch.Validate())
726
727 def replace_schema_metadata(self, metadata=None):
728 """
729 Create a shallow copy of the record batch, replacing its schema
730 key-value metadata with the indicated new metadata (which may be
731 None, in which case any existing metadata is deleted).
732
733 Parameters
734 ----------
735 metadata : dict, default None
736
737 Returns
738 -------
739 shallow_copy : RecordBatch
740 """
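# Illustrative sketch (hypothetical batch and metadata key).
#   >>> import pyarrow as pa
#   >>> batch = pa.record_batch([pa.array([1, 2])], names=['a'])
#   >>> updated = batch.replace_schema_metadata({'origin': 'example'})
#   >>> b'origin' in updated.schema.metadata
#   True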
741 cdef:
742 shared_ptr[const CKeyValueMetadata] c_meta
743 shared_ptr[CRecordBatch] c_batch
744
745 metadata = ensure_metadata(metadata, allow_none=True)
746 c_meta = pyarrow_unwrap_metadata(metadata)
747 with nogil:
748 c_batch = self.batch.ReplaceSchemaMetadata(c_meta)
749
750 return pyarrow_wrap_batch(c_batch)
751
752 @property
753 def num_columns(self):
754 """
755 Number of columns
756
757 Returns
758 -------
759 int
760 """
761 return self.batch.num_columns()
762
763 @property
764 def num_rows(self):
765 """
766 Number of rows
767
768 Due to the definition of a RecordBatch, all columns have the same
769 number of rows.
770
771 Returns
772 -------
773 int
774 """
775 return len(self)
776
777 @property
778 def schema(self):
779 """
780 Schema of the RecordBatch and its columns
781
782 Returns
783 -------
784 pyarrow.Schema
785 """
786 if self._schema is None:
787 self._schema = pyarrow_wrap_schema(self.batch.schema())
788
789 return self._schema
790
791 def field(self, i):
792 """
793 Select a schema field by its column name or numeric index
794
795 Parameters
796 ----------
797 i : int or string
798 The index or name of the field to retrieve
799
800 Returns
801 -------
802 pyarrow.Field
803 """
804 return self.schema.field(i)
805
806 @property
807 def columns(self):
808 """
809 List of all columns in numerical order
810
811 Returns
812 -------
813 list of pa.Array
814 """
815 return [self.column(i) for i in range(self.num_columns)]
816
817 def _ensure_integer_index(self, i):
818 """
819 Ensure integer index (convert string column name to integer if needed).
820 """
821 if isinstance(i, (bytes, str)):
822 field_indices = self.schema.get_all_field_indices(i)
823
824 if len(field_indices) == 0:
825 raise KeyError(
826 "Field \"{}\" does not exist in record batch schema"
827 .format(i))
828 elif len(field_indices) > 1:
829 raise KeyError(
830 "Field \"{}\" exists {} times in record batch schema"
831 .format(i, len(field_indices)))
832 else:
833 return field_indices[0]
834 elif isinstance(i, int):
835 return i
836 else:
837 raise TypeError("Index must either be string or integer")
838
839 def column(self, i):
840 """
841 Select single column from record batch
842
843 Parameters
844 ----------
845 i : int or string
846 The index or name of the column to retrieve.
847
848 Returns
849 -------
850 column : pyarrow.Array
851 """
852 return self._column(self._ensure_integer_index(i))
853
854 def _column(self, int i):
855 """
856 Select single column from record batch by its numeric index.
857
858 Parameters
859 ----------
860 i : int
861 The index of the column to retrieve.
862
863 Returns
864 -------
865 column : pyarrow.Array
866 """
867 cdef int index = <int> _normalize_index(i, self.num_columns)
868 cdef Array result = pyarrow_wrap_array(self.batch.column(index))
869 result._name = self.schema[index].name
870 return result
871
872 @property
873 def nbytes(self):
874 """
875 Total number of bytes consumed by the elements of the record batch.
876 """
877 size = 0
878 for i in range(self.num_columns):
879 size += self.column(i).nbytes
880 return size
881
882 def __sizeof__(self):
883 return super(RecordBatch, self).__sizeof__() + self.nbytes
884
885 def __getitem__(self, key):
886 """
887 Slice or return column at given index or column name
888
889 Parameters
890 ----------
891 key : integer, str, or slice
892 Slices with step not equal to 1 (or None) will produce a copy
893 rather than a zero-copy view
894
895 Returns
896 -------
897 value : Array (index/column) or RecordBatch (slice)
898 """
899 if isinstance(key, slice):
900 return _normalize_slice(self, key)
901 else:
902 return self.column(key)
903
904 def serialize(self, memory_pool=None):
905 """
906 Write RecordBatch to Buffer as encapsulated IPC message.
907
908 Parameters
909 ----------
910 memory_pool : MemoryPool, default None
911 Uses default memory pool if not specified
912
913 Returns
914 -------
915 serialized : Buffer
916 """
917 cdef shared_ptr[CBuffer] buffer
918 cdef CIpcWriteOptions options = CIpcWriteOptions.Defaults()
919 options.memory_pool = maybe_unbox_memory_pool(memory_pool)
920
921 with nogil:
922 buffer = GetResultValue(
923 SerializeRecordBatch(deref(self.batch), options))
924 return pyarrow_wrap_buffer(buffer)
925
926 def slice(self, offset=0, length=None):
927 """
928 Compute zero-copy slice of this RecordBatch
929
930 Parameters
931 ----------
932 offset : int, default 0
933 Offset from start of record batch to slice
934 length : int, default None
935 Length of slice (default is until end of batch starting from
936 offset)
937
938 Returns
939 -------
940 sliced : RecordBatch
941 """
942 cdef shared_ptr[CRecordBatch] result
943
944 if offset < 0:
945 raise IndexError('Offset must be non-negative')
946
947 offset = min(len(self), offset)
948 if length is None:
949 result = self.batch.Slice(offset)
950 else:
951 result = self.batch.Slice(offset, length)
952
953 return pyarrow_wrap_batch(result)
954
955 def filter(self, mask, object null_selection_behavior="drop"):
956 """
957 Select records from a record batch. See pyarrow.compute.filter for full
958 usage.
959 """
960 return _pc().filter(self, mask, null_selection_behavior)
961
962 def equals(self, object other, bint check_metadata=False):
963 """
964 Check if contents of two record batches are equal.
965
966 Parameters
967 ----------
968 other : pyarrow.RecordBatch
969 RecordBatch to compare against.
970 check_metadata : bool, default False
971 Whether schema metadata equality should be checked as well.
972
973 Returns
974 -------
975 are_equal : bool
976 """
977 cdef:
978 CRecordBatch* this_batch = self.batch
979 shared_ptr[CRecordBatch] other_batch = pyarrow_unwrap_batch(other)
980 c_bool result
981
982 if not other_batch:
983 return False
984
985 with nogil:
986 result = this_batch.Equals(deref(other_batch), check_metadata)
987
988 return result
989
990 def take(self, object indices):
991 """
992 Select records from a RecordBatch. See pyarrow.compute.take for full
993 usage.
994 """
995 return _pc().take(self, indices)
996
997 def drop_null(self):
998 """
999 Remove missing values from a RecordBatch.
1000 See pyarrow.compute.drop_null for full usage.
1001 """
1002 return _pc().drop_null(self)
1003
1004 def to_pydict(self):
1005 """
1006 Convert the RecordBatch to a dict or OrderedDict.
1007
1008 Returns
1009 -------
1010 dict
1011 """
1012 entries = []
1013 for i in range(self.batch.num_columns()):
1014 name = bytes(self.batch.column_name(i)).decode('utf8')
1015 column = self[i].to_pylist()
1016 entries.append((name, column))
1017 return ordered_dict(entries)
1018
1019 def _to_pandas(self, options, **kwargs):
1020 return Table.from_batches([self])._to_pandas(options, **kwargs)
1021
1022 @classmethod
1023 def from_pandas(cls, df, Schema schema=None, preserve_index=None,
1024 nthreads=None, columns=None):
1025 """
1026 Convert pandas.DataFrame to an Arrow RecordBatch
1027
1028 Parameters
1029 ----------
1030 df : pandas.DataFrame
1031 schema : pyarrow.Schema, optional
1032 The expected schema of the RecordBatch. This can be used to
1033 indicate the type of columns if we cannot infer it automatically.
1034 If passed, the output will have exactly this schema. Columns
1035 specified in the schema that are not found in the DataFrame columns
1036 or its index will raise an error. Additional columns or index
1037 levels in the DataFrame which are not specified in the schema will
1038 be ignored.
1039 preserve_index : bool, optional
1040 Whether to store the index as an additional column in the resulting
1041 ``RecordBatch``. The default of None will store the index as a
1042 column, except for RangeIndex which is stored as metadata only. Use
1043 ``preserve_index=True`` to force it to be stored as a column.
1044 nthreads : int, default None (may use up to system CPU count threads)
1045 If greater than 1, convert columns to Arrow in parallel using
1046 indicated number of threads
1047 columns : list, optional
1048 List of columns to be converted. If None, use all columns.
1049
1050 Returns
1051 -------
1052 pyarrow.RecordBatch
1053 """
1054 from pyarrow.pandas_compat import dataframe_to_arrays
1055 arrays, schema = dataframe_to_arrays(
1056 df, schema, preserve_index, nthreads=nthreads, columns=columns
1057 )
1058 return cls.from_arrays(arrays, schema=schema)
1059
1060 @staticmethod
1061 def from_arrays(list arrays, names=None, schema=None, metadata=None):
1062 """
1063 Construct a RecordBatch from multiple pyarrow.Arrays
1064
1065 Parameters
1066 ----------
1067 arrays : list of pyarrow.Array
1068 One for each field in RecordBatch
1069 names : list of str, optional
1070 Names for the batch fields. If not passed, schema must be passed
1071 schema : Schema, default None
1072 Schema for the created batch. If not passed, names must be passed
1073 metadata : dict or Mapping, default None
1074 Optional metadata for the schema (if inferred).
1075
1076 Returns
1077 -------
1078 pyarrow.RecordBatch
1079 """
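# Illustrative sketch (hypothetical arrays and names).
#   >>> import pyarrow as pa
#   >>> batch = pa.RecordBatch.from_arrays(
#   ...     [pa.array([1, 2]), pa.array(['x', 'y'])], names=['a', 'b'])
#   >>> batch.num_columns
#   2
#   >>> batch.num_rows
#   2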
1080 cdef:
1081 Array arr
1082 shared_ptr[CSchema] c_schema
1083 vector[shared_ptr[CArray]] c_arrays
1084 int64_t num_rows
1085
1086 if len(arrays) > 0:
1087 num_rows = len(arrays[0])
1088 else:
1089 num_rows = 0
1090
1091 if isinstance(names, Schema):
1092 import warnings
1093 warnings.warn("Schema passed to names= option, please "
1094 "pass schema= explicitly. "
1095 "Will raise exception in future", FutureWarning)
1096 schema = names
1097 names = None
1098
1099 converted_arrays = _sanitize_arrays(arrays, names, schema, metadata,
1100 &c_schema)
1101
1102 c_arrays.reserve(len(arrays))
1103 for arr in converted_arrays:
1104 if len(arr) != num_rows:
1105 raise ValueError('Arrays were not all the same length: '
1106 '{0} vs {1}'.format(len(arr), num_rows))
1107 c_arrays.push_back(arr.sp_array)
1108
1109 result = pyarrow_wrap_batch(CRecordBatch.Make(c_schema, num_rows,
1110 c_arrays))
1111 result.validate()
1112 return result
1113
1114 @staticmethod
1115 def from_struct_array(StructArray struct_array):
1116 """
1117 Construct a RecordBatch from a StructArray.
1118
1119 Each field in the StructArray will become a column in the resulting
1120 ``RecordBatch``.
1121
1122 Parameters
1123 ----------
1124 struct_array : StructArray
1125 Array to construct the record batch from.
1126
1127 Returns
1128 -------
1129 pyarrow.RecordBatch
1130 """
1131 cdef:
1132 shared_ptr[CRecordBatch] c_record_batch
1133 with nogil:
1134 c_record_batch = GetResultValue(
1135 CRecordBatch.FromStructArray(struct_array.sp_array))
1136 return pyarrow_wrap_batch(c_record_batch)
1137
1138 def _export_to_c(self, uintptr_t out_ptr, uintptr_t out_schema_ptr=0):
1139 """
1140 Export to a C ArrowArray struct, given its pointer.
1141
1142 If a C ArrowSchema struct pointer is also given, the record batch
1143 schema is exported to it at the same time.
1144
1145 Parameters
1146 ----------
1147 out_ptr: int
1148 The raw pointer to a C ArrowArray struct.
1149 out_schema_ptr: int (optional)
1150 The raw pointer to a C ArrowSchema struct.
1151
1152 Be careful: if you don't pass the ArrowArray struct to a consumer,
1153 array memory will leak. This is a low-level function intended for
1154 expert users.
1155 """
1156 with nogil:
1157 check_status(ExportRecordBatch(deref(self.sp_batch),
1158 <ArrowArray*> out_ptr,
1159 <ArrowSchema*> out_schema_ptr))
1160
1161 @staticmethod
1162 def _import_from_c(uintptr_t in_ptr, schema):
1163 """
1164 Import RecordBatch from a C ArrowArray struct, given its pointer
1165 and the imported schema.
1166
1167 Parameters
1168 ----------
1169 in_ptr: int
1170 The raw pointer to a C ArrowArray struct.
1171 schema: Schema or int
1172 Either a Schema object, or the raw pointer to a C ArrowSchema
1173 struct.
1174
1175 This is a low-level function intended for expert users.
1176 """
1177 cdef:
1178 shared_ptr[CRecordBatch] c_batch
1179
1180 c_schema = pyarrow_unwrap_schema(schema)
1181 if c_schema == nullptr:
1182 # Not a Schema object, perhaps a raw ArrowSchema pointer
1183 schema_ptr = <uintptr_t> schema
1184 with nogil:
1185 c_batch = GetResultValue(ImportRecordBatch(
1186 <ArrowArray*> in_ptr, <ArrowSchema*> schema_ptr))
1187 else:
1188 with nogil:
1189 c_batch = GetResultValue(ImportRecordBatch(
1190 <ArrowArray*> in_ptr, c_schema))
1191 return pyarrow_wrap_batch(c_batch)
1192
1193
1194def _reconstruct_record_batch(columns, schema):
1195 """
1196 Internal: reconstruct RecordBatch from pickled components.
1197 """
1198 return RecordBatch.from_arrays(columns, schema=schema)
1199
1200
1201def table_to_blocks(options, Table table, categories, extension_columns):
1202 cdef:
1203 PyObject* result_obj
1204 shared_ptr[CTable] c_table
1205 CMemoryPool* pool
1206 PandasOptions c_options = _convert_pandas_options(options)
1207
1208 if categories is not None:
1209 c_options.categorical_columns = {tobytes(cat) for cat in categories}
1210 if extension_columns is not None:
1211 c_options.extension_columns = {tobytes(col)
1212 for col in extension_columns}
1213
1214 # ARROW-3789(wesm): Convert date/timestamp types to datetime64[ns]
1215 c_options.coerce_temporal_nanoseconds = True
1216
1217 if c_options.self_destruct:
1218 # Move the shared_ptr, table is now unsafe to use further
1219 c_table = move(table.sp_table)
1220 table.table = NULL
1221 else:
1222 c_table = table.sp_table
1223
1224 with nogil:
1225 check_status(
1226 libarrow.ConvertTableToPandas(c_options, move(c_table),
1227 &result_obj)
1228 )
1229
1230 return PyObject_to_object(result_obj)
1231
1232
1233cdef class Table(_PandasConvertible):
1234 """
1235 A collection of top-level named, equal length Arrow arrays.
1236
1237 Warning
1238 -------
1239 Do not call this class's constructor directly, use one of the ``from_*``
1240 methods instead.
1241 """
1242
1243 def __cinit__(self):
1244 self.table = NULL
1245
1246 def __init__(self):
1247 raise TypeError("Do not call Table's constructor directly, use one of "
1248 "the `Table.from_*` functions instead.")
1249
1250 def to_string(self, *, show_metadata=False, preview_cols=0):
1251 """
1252 Return human-readable string representation of Table.
1253
1254 Parameters
1255 ----------
1256 show_metadata : bool, default False
1257 Display Field-level and Schema-level KeyValueMetadata.
1258 preview_cols : int, default 0
1259 Display the values of the first N columns.
1260
1261 Returns
1262 -------
1263 str
1264 """
1265 # Use less verbose schema output.
1266 schema_as_string = self.schema.to_string(
1267 show_field_metadata=show_metadata,
1268 show_schema_metadata=show_metadata
1269 )
1270 title = 'pyarrow.{}\n{}'.format(type(self).__name__, schema_as_string)
1271 pieces = [title]
1272 if preview_cols:
1273 pieces.append('----')
1274 for i in range(min(self.num_columns, preview_cols)):
1275 pieces.append('{}: {}'.format(
1276 self.field(i).name,
1277 self.column(i).to_string(indent=0, skip_new_lines=True)
1278 ))
1279 if preview_cols < self.num_columns:
1280 pieces.append('...')
1281 return '\n'.join(pieces)
1282
1283 def __repr__(self):
1284 if self.table == NULL:
1285 raise ValueError("Table's internal pointer is NULL, do not use "
1286 "any methods or attributes on this object")
1287 return self.to_string(preview_cols=10)
1288
1289 cdef void init(self, const shared_ptr[CTable]& table):
1290 self.sp_table = table
1291 self.table = table.get()
1292
1293 def validate(self, *, full=False):
1294 """
1295 Perform validation checks. An exception is raised if validation fails.
1296
1297 By default only cheap validation checks are run. Pass `full=True`
1298 for thorough validation checks (potentially O(n)).
1299
1300 Parameters
1301 ----------
1302 full: bool, default False
1303 If True, run expensive checks, otherwise cheap checks only.
1304
1305 Raises
1306 ------
1307 ArrowInvalid
1308 """
1309 if full:
1310 with nogil:
1311 check_status(self.table.ValidateFull())
1312 else:
1313 with nogil:
1314 check_status(self.table.Validate())
1315
1316 def __reduce__(self):
1317 # Reduce the columns as ChunkedArrays to avoid serializing schema
1318 # data twice
1319 columns = [col for col in self.columns]
1320 return _reconstruct_table, (columns, self.schema)
1321
1322 def __getitem__(self, key):
1323 """
1324 Slice or return column at given index or column name.
1325
1326 Parameters
1327 ----------
1328 key : integer, str, or slice
1329 Slices with step not equal to 1 (or None) will produce a copy
1330 rather than a zero-copy view.
1331
1332 Returns
1333 -------
1334 ChunkedArray (index/column) or Table (slice)
1335 """
1336 if isinstance(key, slice):
1337 return _normalize_slice(self, key)
1338 else:
1339 return self.column(key)
1340
1341 def slice(self, offset=0, length=None):
1342 """
1343 Compute zero-copy slice of this Table.
1344
1345 Parameters
1346 ----------
1347 offset : int, default 0
1348 Offset from start of table to slice.
1349 length : int, default None
1350 Length of slice (default is until end of table starting from
1351 offset).
1352
1353 Returns
1354 -------
1355 Table
1356 """
1357 cdef shared_ptr[CTable] result
1358
1359 if offset < 0:
1360 raise IndexError('Offset must be non-negative')
1361
1362 offset = min(len(self), offset)
1363 if length is None:
1364 result = self.table.Slice(offset)
1365 else:
1366 result = self.table.Slice(offset, length)
1367
1368 return pyarrow_wrap_table(result)
1369
1370 def filter(self, mask, object null_selection_behavior="drop"):
1371 """
1372 Select records from a Table. See :func:`pyarrow.compute.filter` for
1373 full usage.
1374 """
1375 return _pc().filter(self, mask, null_selection_behavior)
1376
1377 def take(self, object indices):
1378 """
1379 Select records from a Table. See :func:`pyarrow.compute.take` for full
1380 usage.
1381 """
1382 return _pc().take(self, indices)
1383
1384 def drop_null(self):
1385 """
1386 Remove missing values from a Table.
1387 See :func:`pyarrow.compute.drop_null` for full usage.
1388 """
1389 return _pc().drop_null(self)
1390
1391 def select(self, object columns):
1392 """
1393 Select columns of the Table.
1394
1395 Returns a new Table with the specified columns, and metadata
1396 preserved.
1397
1398 Parameters
1399 ----------
1400 columns : list-like
1401 The column names or integer indices to select.
1402
1403 Returns
1404 -------
1405 Table
1406 """
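# Illustrative sketch (hypothetical table): columns can be selected by
# name or by integer index.
#   >>> import pyarrow as pa
#   >>> t = pa.table({'a': [1, 2], 'b': ['x', 'y'], 'c': [True, False]})
#   >>> t.select(['a', 'c']).column_names
#   ['a', 'c']
#   >>> t.select([0]).column_names
#   ['a']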
1407 cdef:
1408 shared_ptr[CTable] c_table
1409 vector[int] c_indices
1410
1411 for idx in columns:
1412 idx = self._ensure_integer_index(idx)
1413 idx = _normalize_index(idx, self.num_columns)
1414 c_indices.push_back(<int> idx)
1415
1416 with nogil:
1417 c_table = GetResultValue(self.table.SelectColumns(move(c_indices)))
1418
1419 return pyarrow_wrap_table(c_table)
1420
1421 def replace_schema_metadata(self, metadata=None):
1422 """
1423 Create a shallow copy of the table, replacing its schema
1424 key-value metadata with the indicated new metadata (which may be
1425 None, in which case any existing metadata is deleted).
1426
1427 Parameters
1428 ----------
1429 metadata : dict, default None
1430
1431 Returns
1432 -------
1433 Table
1434 """
1435 cdef:
1436 shared_ptr[const CKeyValueMetadata] c_meta
1437 shared_ptr[CTable] c_table
1438
1439 metadata = ensure_metadata(metadata, allow_none=True)
1440 c_meta = pyarrow_unwrap_metadata(metadata)
1441 with nogil:
1442 c_table = self.table.ReplaceSchemaMetadata(c_meta)
1443
1444 return pyarrow_wrap_table(c_table)
1445
1446 def flatten(self, MemoryPool memory_pool=None):
1447 """
1448 Flatten this Table.
1449
1450 Each column with a struct type is flattened
1451 into one column per struct field. Other columns are left unchanged.
1452
1453 Parameters
1454 ----------
1455 memory_pool : MemoryPool, default None
1456 For memory allocations, if required, otherwise use default pool
1457
1458 Returns
1459 -------
1460 Table
1461 """
1462 cdef:
1463 shared_ptr[CTable] flattened
1464 CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
1465
1466 with nogil:
1467 flattened = GetResultValue(self.table.Flatten(pool))
1468
1469 return pyarrow_wrap_table(flattened)
1470
1471 def combine_chunks(self, MemoryPool memory_pool=None):
1472 """
1473 Make a new table by combining the chunks this table has.
1474
1475 All the underlying chunks in the ChunkedArray of each column are
1476 concatenated into zero or one chunk.
1477
1478 Parameters
1479 ----------
1480 memory_pool : MemoryPool, default None
1481 For memory allocations, if required, otherwise use default pool.
1482
1483 Returns
1484 -------
1485 Table
1486 """
1487 cdef:
1488 shared_ptr[CTable] combined
1489 CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
1490
1491 with nogil:
1492 combined = GetResultValue(self.table.CombineChunks(pool))
1493
1494 return pyarrow_wrap_table(combined)
1495
1496 def unify_dictionaries(self, MemoryPool memory_pool=None):
1497 """
1498 Unify dictionaries across all chunks.
1499
1500 This method returns an equivalent table, but where all chunks of
1501 each column share the same dictionary values. Dictionary indices
1502 are transposed accordingly.
1503
1504 Columns without dictionaries are returned unchanged.
1505
1506 Parameters
1507 ----------
1508 memory_pool : MemoryPool, default None
1509 For memory allocations, if required, otherwise use default pool
1510
1511 Returns
1512 -------
1513 Table
1514 """
1515 cdef:
1516 CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
1517 shared_ptr[CTable] c_result
1518
1519 with nogil:
1520 c_result = GetResultValue(CDictionaryUnifier.UnifyTable(
1521 deref(self.table), pool))
1522
1523 return pyarrow_wrap_table(c_result)
1524
1525 def __eq__(self, other):
1526 try:
1527 return self.equals(other)
1528 except TypeError:
1529 return NotImplemented
1530
1531 def equals(self, Table other, bint check_metadata=False):
1532 """
1533 Check if contents of two tables are equal.
1534
1535 Parameters
1536 ----------
1537 other : pyarrow.Table
1538 Table to compare against.
1539 check_metadata : bool, default False
1540 Whether schema metadata equality should be checked as well.
1541
1542 Returns
1543 -------
1544 bool
1545 """
1546 if other is None:
1547 return False
1548
1549 cdef:
1550 CTable* this_table = self.table
1551 CTable* other_table = other.table
1552 c_bool result
1553
1554 with nogil:
1555 result = this_table.Equals(deref(other_table), check_metadata)
1556
1557 return result
1558
1559 def cast(self, Schema target_schema, bint safe=True):
1560 """
1561 Cast table values to another schema.
1562
1563 Parameters
1564 ----------
1565 target_schema : Schema
1566 Schema to cast to, the names and order of fields must match.
1567 safe : bool, default True
1568 Check for overflows or other unsafe conversions.
1569
1570 Returns
1571 -------
1572 Table
1573 """
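# Illustrative sketch (hypothetical table and target schema).
#   >>> import pyarrow as pa
#   >>> t = pa.table({'a': [1, 2]})
#   >>> casted = t.cast(pa.schema([('a', pa.float64())]))
#   >>> print(casted['a'].type)
#   double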
1574 cdef:
1575 ChunkedArray column, casted
1576 Field field
1577 list newcols = []
1578
1579 if self.schema.names != target_schema.names:
1580 raise ValueError("Target schema's field names do not match "
1581 "the table's field names: {!r}, {!r}"
1582 .format(self.schema.names, target_schema.names))
1583
1584 for column, field in zip(self.itercolumns(), target_schema):
1585 casted = column.cast(field.type, safe=safe)
1586 newcols.append(casted)
1587
1588 return Table.from_arrays(newcols, schema=target_schema)
1589
1590 @classmethod
1591 def from_pandas(cls, df, Schema schema=None, preserve_index=None,
1592 nthreads=None, columns=None, bint safe=True):
1593 """
1594 Convert pandas.DataFrame to an Arrow Table.
1595
1596 The column types in the resulting Arrow Table are inferred from the
1597 dtypes of the pandas.Series in the DataFrame. In the case of non-object
1598 Series, the NumPy dtype is translated to its Arrow equivalent. In the
1599 case of `object`, we need to guess the datatype by looking at the
1600 Python objects in this Series.
1601
1602 Be aware that Series of the `object` dtype don't carry enough
1603 information to always lead to a meaningful Arrow type. In the case that
1604 we cannot infer a type, e.g. because the DataFrame is of length 0 or
1605 the Series only contains None/nan objects, the type is set to
1606 null. This behavior can be avoided by constructing an explicit schema
1607 and passing it to this function.
1608
1609 Parameters
1610 ----------
1611 df : pandas.DataFrame
1612 schema : pyarrow.Schema, optional
1613 The expected schema of the Arrow Table. This can be used to
1614 indicate the type of columns if we cannot infer it automatically.
1615 If passed, the output will have exactly this schema. Columns
1616 specified in the schema that are not found in the DataFrame columns
1617 or its index will raise an error. Additional columns or index
1618 levels in the DataFrame which are not specified in the schema will
1619 be ignored.
1620 preserve_index : bool, optional
1621 Whether to store the index as an additional column in the resulting
1622 ``Table``. The default of None will store the index as a column,
1623 except for RangeIndex which is stored as metadata only. Use
1624 ``preserve_index=True`` to force it to be stored as a column.
1625 nthreads : int, default None (may use up to system CPU count threads)
1626 If greater than 1, convert columns to Arrow in parallel using
1627 indicated number of threads.
1628 columns : list, optional
1629 List of columns to be converted. If None, use all columns.
1630 safe : bool, default True
1631 Check for overflows or other unsafe conversions.
1632
1633 Returns
1634 -------
1635 Table
1636
1637 Examples
1638 --------
1639
1640 >>> import pandas as pd
1641 >>> import pyarrow as pa
1642 >>> df = pd.DataFrame({
1643 ... 'int': [1, 2],
1644 ... 'str': ['a', 'b']
1645 ... })
1646 >>> pa.Table.from_pandas(df)
1647 <pyarrow.lib.Table object at 0x7f05d1fb1b40>
1648 """
1649 from pyarrow.pandas_compat import dataframe_to_arrays
1650 arrays, schema = dataframe_to_arrays(
1651 df,
1652 schema=schema,
1653 preserve_index=preserve_index,
1654 nthreads=nthreads,
1655 columns=columns,
1656 safe=safe
1657 )
1658 return cls.from_arrays(arrays, schema=schema)
1659
1660 @staticmethod
1661 def from_arrays(arrays, names=None, schema=None, metadata=None):
1662 """
1663 Construct a Table from Arrow arrays.
1664
1665 Parameters
1666 ----------
1667 arrays : list of pyarrow.Array or pyarrow.ChunkedArray
1668 Equal-length arrays that should form the table.
1669 names : list of str, optional
1670 Names for the table columns. If not passed, schema must be passed.
1671 schema : Schema, default None
1672 Schema for the created table. If not passed, names must be passed.
1673 metadata : dict or Mapping, default None
1674 Optional metadata for the schema (if inferred).
1675
1676 Returns
1677 -------
1678 Table
1679 """
1680 cdef:
1681 vector[shared_ptr[CChunkedArray]] columns
1682 shared_ptr[CSchema] c_schema
1683 int i, K = <int> len(arrays)
1684
1685 converted_arrays = _sanitize_arrays(arrays, names, schema, metadata,
1686 &c_schema)
1687
1688 columns.reserve(K)
1689 for item in converted_arrays:
1690 if isinstance(item, Array):
1691 columns.push_back(
1692 make_shared[CChunkedArray](
1693 (<Array> item).sp_array
1694 )
1695 )
1696 elif isinstance(item, ChunkedArray):
1697 columns.push_back((<ChunkedArray> item).sp_chunked_array)
1698 else:
1699 raise TypeError(type(item))
1700
1701 result = pyarrow_wrap_table(CTable.Make(c_schema, columns))
1702 result.validate()
1703 return result
1704
1705 @staticmethod
1706 def from_pydict(mapping, schema=None, metadata=None):
1707 """
1708 Construct a Table from Arrow arrays or columns.
1709
1710 Parameters
1711 ----------
1712 mapping : dict or Mapping
1713 A mapping of strings to Arrays or Python lists.
1714 schema : Schema, default None
1715 If not passed, will be inferred from the Mapping values.
1716 metadata : dict or Mapping, default None
1717 Optional metadata for the schema (if inferred).
1718
1719 Returns
1720 -------
1721 Table
1722 """
1723
1724 return _from_pydict(cls=Table,
1725 mapping=mapping,
1726 schema=schema,
1727 metadata=metadata)
1728
1729 @staticmethod
1730 def from_batches(batches, Schema schema=None):
1731 """
1732 Construct a Table from a sequence or iterator of Arrow RecordBatches.
1733
1734 Parameters
1735 ----------
1736 batches : sequence or iterator of RecordBatch
1737 Sequence of RecordBatch to be converted, all schemas must be equal.
1738 schema : Schema, default None
1739 If not passed, will be inferred from the first RecordBatch.
1740
1741 Returns
1742 -------
1743 Table
1744 """
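# Illustrative sketch (hypothetical record batch repeated twice).
#   >>> import pyarrow as pa
#   >>> batch = pa.record_batch([pa.array([1, 2])], names=['a'])
#   >>> pa.Table.from_batches([batch, batch]).num_rows
#   4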
1745 cdef:
1746 vector[shared_ptr[CRecordBatch]] c_batches
1747 shared_ptr[CTable] c_table
1748 shared_ptr[CSchema] c_schema
1749 RecordBatch batch
1750
1751 for batch in batches:
1752 c_batches.push_back(batch.sp_batch)
1753
1754 if schema is None:
1755 if c_batches.size() == 0:
1756 raise ValueError('Must pass schema, or at least '
1757 'one RecordBatch')
1758 c_schema = c_batches[0].get().schema()
1759 else:
1760 c_schema = schema.sp_schema
1761
1762 with nogil:
1763 c_table = GetResultValue(
1764 CTable.FromRecordBatches(c_schema, move(c_batches)))
1765
1766 return pyarrow_wrap_table(c_table)
1767
1768 def to_batches(self, max_chunksize=None, **kwargs):
1769 """
1770 Convert Table to list of (contiguous) RecordBatch objects.
1771
1772 Parameters
1773 ----------
1774 max_chunksize : int, default None
1775 Maximum size for RecordBatch chunks. Individual chunks may be
1776 smaller depending on the chunk layout of individual columns.
1777
1778 Returns
1779 -------
1780 list of RecordBatch
1781 """
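# Illustrative sketch (hypothetical single-chunk table split into
# batches of at most two rows).
#   >>> import pyarrow as pa
#   >>> t = pa.table({'a': [1, 2, 3, 4]})
#   >>> [b.num_rows for b in t.to_batches(max_chunksize=2)]
#   [2, 2]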
1782 cdef:
1783 unique_ptr[TableBatchReader] reader
1784 int64_t c_max_chunksize
1785 list result = []
1786 shared_ptr[CRecordBatch] batch
1787
1788 reader.reset(new TableBatchReader(deref(self.table)))
1789
1790 if 'chunksize' in kwargs:
1791 max_chunksize = kwargs['chunksize']
1792 msg = ('The parameter chunksize is deprecated for '
1793 'pyarrow.Table.to_batches as of 0.15, please use '
1794 'the parameter max_chunksize instead')
1795 warnings.warn(msg, FutureWarning)
1796
1797 if max_chunksize is not None:
1798 c_max_chunksize = max_chunksize
1799 reader.get().set_chunksize(c_max_chunksize)
1800
1801 while True:
1802 with nogil:
1803 check_status(reader.get().ReadNext(&batch))
1804
1805 if batch.get() == NULL:
1806 break
1807
1808 result.append(pyarrow_wrap_batch(batch))
1809
1810 return result
1811
1812 def _to_pandas(self, options, categories=None, ignore_metadata=False,
1813 types_mapper=None):
1814 from pyarrow.pandas_compat import table_to_blockmanager
1815 mgr = table_to_blockmanager(
1816 options, self, categories,
1817 ignore_metadata=ignore_metadata,
1818 types_mapper=types_mapper)
1819 return pandas_api.data_frame(mgr)
1820
1821 def to_pydict(self):
1822 """
1823 Convert the Table to a dict or OrderedDict.
1824
1825 Returns
1826 -------
1827 dict
1828 """
1829 cdef:
1830 size_t i
1831 size_t num_columns = self.table.num_columns()
1832 list entries = []
1833 ChunkedArray column
1834
1835 for i in range(num_columns):
1836 column = self.column(i)
1837 entries.append((self.field(i).name, column.to_pylist()))
1838
1839 return ordered_dict(entries)
1840
1841 @property
1842 def schema(self):
1843 """
1844 Schema of the table and its columns.
1845
1846 Returns
1847 -------
1848 Schema
1849 """
1850 return pyarrow_wrap_schema(self.table.schema())
1851
1852 def field(self, i):
1853 """
1854 Select a schema field by its column name or numeric index.
1855
1856 Parameters
1857 ----------
1858 i : int or string
1859 The index or name of the field to retrieve.
1860
1861 Returns
1862 -------
1863 Field
1864 """
1865 return self.schema.field(i)
1866
1867 def _ensure_integer_index(self, i):
1868 """
1869 Ensure integer index (convert string column name to integer if needed).
1870 """
1871 if isinstance(i, (bytes, str)):
1872 field_indices = self.schema.get_all_field_indices(i)
1873
1874 if len(field_indices) == 0:
1875 raise KeyError("Field \"{}\" does not exist in table schema"
1876 .format(i))
1877 elif len(field_indices) > 1:
1878 raise KeyError("Field \"{}\" exists {} times in table schema"
1879 .format(i, len(field_indices)))
1880 else:
1881 return field_indices[0]
1882 elif isinstance(i, int):
1883 return i
1884 else:
1885 raise TypeError("Index must either be string or integer")
1886
1887 def column(self, i):
1888 """
1889 Select a column by its column name, or numeric index.
1890
1891 Parameters
1892 ----------
1893 i : int or string
1894 The index or name of the column to retrieve.
1895
1896 Returns
1897 -------
1898 ChunkedArray
1899 """
1900 return self._column(self._ensure_integer_index(i))
1901
1902 def _column(self, int i):
1903 """
1904 Select a column by its numeric index.
1905
1906 Parameters
1907 ----------
1908 i : int
1909 The index of the column to retrieve.
1910
1911 Returns
1912 -------
1913 ChunkedArray
1914 """
1915 cdef int index = <int> _normalize_index(i, self.num_columns)
1916 cdef ChunkedArray result = pyarrow_wrap_chunked_array(
1917 self.table.column(index))
1918 result._name = self.schema[index].name
1919 return result
1920
1921 def itercolumns(self):
1922 """
1923 Iterator over all columns in their numerical order.
1924
1925 Yields
1926 ------
1927 ChunkedArray
1928 """
1929 for i in range(self.num_columns):
1930 yield self._column(i)
1931
1932 @property
1933 def columns(self):
1934 """
1935 List of all columns in numerical order.
1936
1937 Returns
1938 -------
1939 list of ChunkedArray
1940 """
1941 return [self._column(i) for i in range(self.num_columns)]
1942
1943 @property
1944 def num_columns(self):
1945 """
1946 Number of columns in this table.
1947
1948 Returns
1949 -------
1950 int
1951 """
1952 return self.table.num_columns()
1953
1954 @property
1955 def num_rows(self):
1956 """
1957 Number of rows in this table.
1958
1959 Due to the definition of a table, all columns have the same number of
1960 rows.
1961
1962 Returns
1963 -------
1964 int
1965 """
1966 return self.table.num_rows()
1967
1968 def __len__(self):
1969 return self.num_rows
1970
1971 @property
1972 def shape(self):
1973 """
1974 Dimensions of the table: (#rows, #columns).
1975
1976 Returns
1977 -------
1978 (int, int)
1979 Number of rows and number of columns.
1980 """
1981 return (self.num_rows, self.num_columns)
1982
1983 @property
1984 def nbytes(self):
1985 """
1986 Total number of bytes consumed by the elements of the table.
1987
1988 Returns
1989 -------
1990 int
1991 """
1992 size = 0
1993 for column in self.itercolumns():
1994 size += column.nbytes
1995 return size
1996
1997 def __sizeof__(self):
1998 return super(Table, self).__sizeof__() + self.nbytes
1999
2000 def add_column(self, int i, field_, column):
2001 """
2002 Add column to Table at position.
2003
2004 A new table is returned with the column added, the original table
2005 object is left unchanged.
2006
2007 Parameters
2008 ----------
2009 i : int
2010 Index to place the column at.
2011 field_ : str or Field
2012 If a string is passed then the type is deduced from the column
2013 data.
2014 column : Array, list of Array, or values coercible to arrays
2015 Column data.
2016
2017 Returns
2018 -------
2019 Table
2020 New table with the passed column added.
2021 """
2022 cdef:
2023 shared_ptr[CTable] c_table
2024 Field c_field
2025 ChunkedArray c_arr
2026
2027 if isinstance(column, ChunkedArray):
2028 c_arr = column
2029 else:
2030 c_arr = chunked_array(column)
2031
2032 if isinstance(field_, Field):
2033 c_field = field_
2034 else:
2035 c_field = field(field_, c_arr.type)
2036
2037 with nogil:
2038 c_table = GetResultValue(self.table.AddColumn(
2039 i, c_field.sp_field, c_arr.sp_chunked_array))
2040
2041 return pyarrow_wrap_table(c_table)
2042
2043 def append_column(self, field_, column):
2044 """
2045 Append column at end of columns.
2046
2047 Parameters
2048 ----------
2049 field_ : str or Field
2050 If a string is passed then the type is deduced from the column
2051 data.
2052 column : Array, list of Array, or values coercible to arrays
2053 Column data.
2054
2055 Returns
2056 -------
2057 Table
2058 New table with the passed column added.
2059 """
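# Illustrative sketch (hypothetical table and column data).
#   >>> import pyarrow as pa
#   >>> t = pa.table({'a': [1, 2]})
#   >>> t.append_column('b', [['x', 'y']]).column_names
#   ['a', 'b']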
2060 return self.add_column(self.num_columns, field_, column)
2061
2062 def remove_column(self, int i):
2063 """
2064 Create new Table with the indicated column removed.
2065
2066 Parameters
2067 ----------
2068 i : int
2069 Index of column to remove.
2070
2071 Returns
2072 -------
2073 Table
2074 New table without the column.
2075 """
2076 cdef shared_ptr[CTable] c_table
2077
2078 with nogil:
2079 c_table = GetResultValue(self.table.RemoveColumn(i))
2080
2081 return pyarrow_wrap_table(c_table)
2082
2083 def set_column(self, int i, field_, column):
2084 """
2085 Replace column in Table at position.
2086
2087 Parameters
2088 ----------
2089 i : int
2090 Index to place the column at.
2091 field_ : str or Field
2092 If a string is passed then the type is deduced from the column
2093 data.
2094 column : Array, list of Array, or values coercible to arrays
2095 Column data.
2096
2097 Returns
2098 -------
2099 Table
2100 New table with the passed column set.
2101 """
2102 cdef:
2103 shared_ptr[CTable] c_table
2104 Field c_field
2105 ChunkedArray c_arr
2106
2107 if isinstance(column, ChunkedArray):
2108 c_arr = column
2109 else:
2110 c_arr = chunked_array(column)
2111
2112 if isinstance(field_, Field):
2113 c_field = field_
2114 else:
2115 c_field = field(field_, c_arr.type)
2116
2117 with nogil:
2118 c_table = GetResultValue(self.table.SetColumn(
2119 i, c_field.sp_field, c_arr.sp_chunked_array))
2120
2121 return pyarrow_wrap_table(c_table)
2122
2123 @property
2124 def column_names(self):
2125 """
2126 Names of the table's columns.
2127
2128 Returns
2129 -------
2130 list of str
2131 """
2132 names = self.table.ColumnNames()
2133 return [frombytes(name) for name in names]
2134
2135 def rename_columns(self, names):
2136 """
2137         Create a new table with the columns renamed to the provided names.
2138
2139 Parameters
2140 ----------
2141 names : list of str
2142 List of new column names.
2143
2144 Returns
2145 -------
2146 Table
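
        Examples
        --------
        Illustrative sketch (data made up); the new names are applied
        positionally:

        >>> import pyarrow as pa
        >>> t = pa.table({'a': [1, 2], 'b': [3, 4]})
        >>> t.rename_columns(['x', 'y']).column_names
        ['x', 'y']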
2147 """
2148 cdef:
2149 shared_ptr[CTable] c_table
2150 vector[c_string] c_names
2151
2152 for name in names:
2153 c_names.push_back(tobytes(name))
2154
2155 with nogil:
2156 c_table = GetResultValue(self.table.RenameColumns(move(c_names)))
2157
2158 return pyarrow_wrap_table(c_table)
2159
2160 def drop(self, columns):
2161 """
2162 Drop one or more columns and return a new table.
2163
2164 Parameters
2165 ----------
2166 columns : list of str
2167 List of field names referencing existing columns.
2168
2169 Raises
2170 ------
2171 KeyError
2172             If any of the passed column names do not exist.
2173
2174 Returns
2175 -------
2176 Table
2177 New table without the columns.
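
        Examples
        --------
        Illustrative sketch (data made up); a name not present in the table
        raises ``KeyError``:

        >>> import pyarrow as pa
        >>> t = pa.table({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})
        >>> t.drop(['a', 'c']).column_names
        ['b']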
2178 """
2179 indices = []
2180 for col in columns:
2181 idx = self.schema.get_field_index(col)
2182 if idx == -1:
2183 raise KeyError("Column {!r} not found".format(col))
2184 indices.append(idx)
2185
2186 indices.sort()
2187 indices.reverse()
2188
2189 table = self
2190 for idx in indices:
2191 table = table.remove_column(idx)
2192
2193 return table
2194
2195
2196def _reconstruct_table(arrays, schema):
2197 """
2198 Internal: reconstruct pa.Table from pickled components.
2199 """
2200 return Table.from_arrays(arrays, schema=schema)
2201
2202
2203def record_batch(data, names=None, schema=None, metadata=None):
2204 """
2205 Create a pyarrow.RecordBatch from another Python data structure or sequence
2206 of arrays.
2207
2208 Parameters
2209 ----------
2210 data : pandas.DataFrame, list
2211 A DataFrame or list of arrays or chunked arrays.
2212 names : list, default None
2213 Column names if list of arrays passed as data. Mutually exclusive with
2214 'schema' argument.
2215 schema : Schema, default None
2216 The expected schema of the RecordBatch. If not passed, will be inferred
2217 from the data. Mutually exclusive with 'names' argument.
2218 metadata : dict or Mapping, default None
2219 Optional metadata for the schema (if schema not passed).
2220
2221 Returns
2222 -------
2223 RecordBatch
2224
2225 See Also
2226 --------
2227 RecordBatch.from_arrays, RecordBatch.from_pandas, table
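
    Examples
    --------
    Illustrative sketch of the list-of-arrays form (values made up):

    >>> import pyarrow as pa
    >>> batch = pa.record_batch([pa.array([1, 2, 3]),
    ...                          pa.array(['x', 'y', 'z'])],
    ...                         names=['ints', 'strs'])
    >>> batch.num_rows
    3
    >>> batch.schema.names
    ['ints', 'strs']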
2228 """
2229 # accept schema as first argument for backwards compatibility / usability
2230 if isinstance(names, Schema) and schema is None:
2231 schema = names
2232 names = None
2233
2234 if isinstance(data, (list, tuple)):
2235 return RecordBatch.from_arrays(data, names=names, schema=schema,
2236 metadata=metadata)
2237 elif _pandas_api.is_data_frame(data):
2238 return RecordBatch.from_pandas(data, schema=schema)
2239 else:
2240 raise TypeError("Expected pandas DataFrame or list of arrays")
2241
2242
2243def table(data, names=None, schema=None, metadata=None, nthreads=None):
2244 """
2245 Create a pyarrow.Table from a Python data structure or sequence of arrays.
2246
2247 Parameters
2248 ----------
2249 data : pandas.DataFrame, dict, list
2250 A DataFrame, mapping of strings to Arrays or Python lists, or list of
2251 arrays or chunked arrays.
2252 names : list, default None
2253 Column names if list of arrays passed as data. Mutually exclusive with
2254 'schema' argument.
2255 schema : Schema, default None
2256 The expected schema of the Arrow Table. If not passed, will be inferred
2257 from the data. Mutually exclusive with 'names' argument.
2258         If passed, the output will have exactly this schema: an error is
2259         raised for columns not found in the data, and data not specified
2260         in the schema is ignored (when data is a dict or DataFrame).
2261 metadata : dict or Mapping, default None
2262 Optional metadata for the schema (if schema not passed).
2263 nthreads : int, default None (may use up to system CPU count threads)
2264 For pandas.DataFrame inputs: if greater than 1, convert columns to
2265 Arrow in parallel using indicated number of threads.
2266
2267 Returns
2268 -------
2269 Table
2270
2271 See Also
2272 --------
2273 Table.from_arrays, Table.from_pandas, Table.from_pydict
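
    Examples
    --------
    Illustrative sketches of the dict and list-of-arrays forms (values
    made up):

    >>> import pyarrow as pa
    >>> pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']}).column_names
    ['a', 'b']
    >>> pa.table([pa.array([1, 2, 3])], names=['a']).num_columns
    1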
2274 """
2275 # accept schema as first argument for backwards compatibility / usability
2276 if isinstance(names, Schema) and schema is None:
2277 schema = names
2278 names = None
2279
2280 if isinstance(data, (list, tuple)):
2281 return Table.from_arrays(data, names=names, schema=schema,
2282 metadata=metadata)
2283 elif isinstance(data, dict):
2284 if names is not None:
2285 raise ValueError(
2286 "The 'names' argument is not valid when passing a dictionary")
2287 return Table.from_pydict(data, schema=schema, metadata=metadata)
2288 elif _pandas_api.is_data_frame(data):
2289 if names is not None or metadata is not None:
2290 raise ValueError(
2291 "The 'names' and 'metadata' arguments are not valid when "
2292 "passing a pandas DataFrame")
2293 return Table.from_pandas(data, schema=schema, nthreads=nthreads)
2294 else:
2295 raise TypeError(
2296 "Expected pandas DataFrame, python dictionary or list of arrays")
2297
2298
2299def concat_tables(tables, c_bool promote=False, MemoryPool memory_pool=None):
2300 """
2301 Concatenate pyarrow.Table objects.
2302
2303 If promote==False, a zero-copy concatenation will be performed. The schemas
2304     of all the Tables must be the same (except the metadata); otherwise an
2305 exception will be raised. The result Table will share the metadata with the
2306 first table.
2307
2308     If promote==True, any null type arrays will be cast to the type of other
2309 arrays in the column of the same name. If a table is missing a particular
2310 field, null values of the appropriate type will be generated to take the
2311 place of the missing field. The new schema will share the metadata with the
2312 first table. Each field in the new schema will share the metadata with the
2313 first table which has the field defined. Note that type promotions may
2314 involve additional allocations on the given ``memory_pool``.
2315
2316 Parameters
2317 ----------
2318 tables : iterable of pyarrow.Table objects
2319 Pyarrow tables to concatenate into a single Table.
2320 promote : bool, default False
2321 If True, concatenate tables with null-filling and null type promotion.
2322 memory_pool : MemoryPool, default None
2323 For memory allocations, if required, otherwise use default pool.
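
    Examples
    --------
    Illustrative sketch (data made up); with ``promote=True`` the column
    missing from the first table is null-filled:

    >>> import pyarrow as pa
    >>> t1 = pa.table({'a': [1, 2]})
    >>> t2 = pa.table({'a': [3, 4], 'b': ['x', 'y']})
    >>> pa.concat_tables([t1, t1]).num_rows
    4
    >>> pa.concat_tables([t1, t2], promote=True).column_names
    ['a', 'b']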
2324 """
2325 cdef:
2326 vector[shared_ptr[CTable]] c_tables
2327 shared_ptr[CTable] c_result_table
2328 CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
2329 Table table
2330 CConcatenateTablesOptions options = (
2331 CConcatenateTablesOptions.Defaults())
2332
2333 for table in tables:
2334 c_tables.push_back(table.sp_table)
2335
2336 with nogil:
2337 options.unify_schemas = promote
2338 c_result_table = GetResultValue(
2339 ConcatenateTables(c_tables, options, pool))
2340
2341 return pyarrow_wrap_table(c_result_table)
2342
2343
2344def _from_pydict(cls, mapping, schema, metadata):
2345 """
2346     Construct a Table/RecordBatch from a mapping of strings to Arrow
         arrays or Python lists.
2347
2348 Parameters
2349 ----------
2350     cls : Table or RecordBatch class
2351 mapping : dict or Mapping
2352 A mapping of strings to Arrays or Python lists.
2353 schema : Schema, default None
2354 If not passed, will be inferred from the Mapping values.
2355 metadata : dict or Mapping, default None
2356 Optional metadata for the schema (if inferred).
2357
2358 Returns
2359 -------
2360 Table/RecordBatch
2361 """
2362
2363 arrays = []
2364 if schema is None:
2365 names = []
2366 for k, v in mapping.items():
2367 names.append(k)
2368 arrays.append(asarray(v))
2369 return cls.from_arrays(arrays, names, metadata=metadata)
2370 elif isinstance(schema, Schema):
2371 for field in schema:
2372 try:
2373 v = mapping[field.name]
2374 except KeyError:
2375 try:
2376 v = mapping[tobytes(field.name)]
2377 except KeyError:
2378 present = mapping.keys()
2379 missing = [n for n in schema.names if n not in present]
2380 raise KeyError(
2381 "The passed mapping doesn't contain the "
2382 "following field(s) of the schema: {}".
2383 format(', '.join(missing))
2384 )
2385 arrays.append(asarray(v, type=field.type))
2386 # Will raise if metadata is not None
2387 return cls.from_arrays(arrays, schema=schema, metadata=metadata)
2388 else:
2389 raise TypeError('Schema must be an instance of pyarrow.Schema')