]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/python/pyarrow/ipc.pxi
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / python / pyarrow / ipc.pxi
CommitLineData
1d09f67e
TL
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18from collections import namedtuple
19import warnings
20
21
# Python-visible IPC metadata (format) versions.  Each value mirrors the
# corresponding C++ MetadataVersion constant; the <char> cast keeps the
# cpdef enum values in the range of the underlying representation.
cpdef enum MetadataVersion:
    V1 = <char> CMetadataVersion_V1
    V2 = <char> CMetadataVersion_V2
    V3 = <char> CMetadataVersion_V3
    V4 = <char> CMetadataVersion_V4
    V5 = <char> CMetadataVersion_V5
28
29
cdef object _wrap_metadata_version(CMetadataVersion version):
    # Convert a C++ metadata version into the Python-level enum value.
    return MetadataVersion(<char> version)
32
33
cdef CMetadataVersion _unwrap_metadata_version(
        MetadataVersion version) except *:
    # Convert the Python-level enum back into the C++ enum value.
    # `except *` allows the ValueError below to propagate even though the
    # function has a C (non-object) return type.
    if version == MetadataVersion.V1:
        return CMetadataVersion_V1
    elif version == MetadataVersion.V2:
        return CMetadataVersion_V2
    elif version == MetadataVersion.V3:
        return CMetadataVersion_V3
    elif version == MetadataVersion.V4:
        return CMetadataVersion_V4
    elif version == MetadataVersion.V5:
        return CMetadataVersion_V5
    raise ValueError("Not a metadata version: " + repr(version))
47
48
# Tuple base for WriteStats; the public class below only adds a docstring.
_WriteStats = namedtuple(
    'WriteStats',
    ('num_messages', 'num_record_batches', 'num_dictionary_batches',
     'num_dictionary_deltas', 'num_replaced_dictionaries'))
53
54
class WriteStats(_WriteStats):
    """IPC write statistics

    Parameters
    ----------
    num_messages : number of messages.
    num_record_batches : number of record batches.
    num_dictionary_batches : number of dictionary batches.
    num_dictionary_deltas : delta of dictionaries.
    num_replaced_dictionaries : number of replaced dictionaries.
    """
    # Empty __slots__ keeps instances tuple-sized (no per-instance __dict__).
    __slots__ = ()
67
68
cdef _wrap_write_stats(CIpcWriteStats c):
    """Convert a C++ CIpcWriteStats struct into a Python WriteStats tuple."""
    # A stray @staticmethod decorator was removed here: this is a
    # module-level function, not a class method, so the decorator served
    # no purpose and was misleading.
    return WriteStats(c.num_messages, c.num_record_batches,
                      c.num_dictionary_batches, c.num_dictionary_deltas,
                      c.num_replaced_dictionaries)
74
75
# Tuple base for ReadStats; the public class below only adds a docstring.
_ReadStats = namedtuple(
    'ReadStats',
    ('num_messages', 'num_record_batches', 'num_dictionary_batches',
     'num_dictionary_deltas', 'num_replaced_dictionaries'))
80
81
class ReadStats(_ReadStats):
    """IPC read statistics

    Parameters
    ----------
    num_messages : number of messages.
    num_record_batches : number of record batches.
    num_dictionary_batches : number of dictionary batches.
    num_dictionary_deltas : delta of dictionaries.
    num_replaced_dictionaries : number of replaced dictionaries.
    """
    # Empty __slots__ keeps instances tuple-sized (no per-instance __dict__).
    __slots__ = ()
94
95
cdef _wrap_read_stats(CIpcReadStats c):
    """Convert a C++ CIpcReadStats struct into a Python ReadStats tuple."""
    # A stray @staticmethod decorator was removed here: this is a
    # module-level function, not a class method, so the decorator served
    # no purpose and was misleading.
    return ReadStats(c.num_messages, c.num_record_batches,
                     c.num_dictionary_batches, c.num_dictionary_deltas,
                     c.num_replaced_dictionaries)
101
102
cdef class IpcWriteOptions(_Weakrefable):
    """
    Serialization options for the IPC format.

    Parameters
    ----------
    metadata_version : MetadataVersion, default MetadataVersion.V5
        The metadata version to write. V5 is the current and latest,
        V4 is the pre-1.0 metadata version (with incompatible Union layout).
    allow_64bit : bool, default False
        If true, allow field lengths that don't fit in a signed 32-bit int.
    use_legacy_format : bool, default False
        Whether to use the pre-Arrow 0.15 IPC format.
    compression : str, Codec, or None
        compression codec to use for record batch buffers.
        If None then batch buffers will be uncompressed.
        Must be "lz4", "zstd" or None.
        To specify a compression_level use `pyarrow.Codec`
    use_threads : bool
        Whether to use the global CPU thread pool to parallelize any
        computational tasks like compression.
    emit_dictionary_deltas : bool
        Whether to emit dictionary deltas. Default is false for maximum
        stream compatibility.
    """
    __slots__ = ()

    # cdef block is in lib.pxd

    def __init__(self, *, metadata_version=MetadataVersion.V5,
                 bint allow_64bit=False, use_legacy_format=False,
                 compression=None, bint use_threads=True,
                 bint emit_dictionary_deltas=False):
        # Start from the C++ defaults, then route every keyword through the
        # corresponding property setter so all validation lives in one place.
        self.c_options = CIpcWriteOptions.Defaults()
        self.allow_64bit = allow_64bit
        self.use_legacy_format = use_legacy_format
        self.metadata_version = metadata_version
        if compression is not None:
            self.compression = compression
        self.use_threads = use_threads
        self.emit_dictionary_deltas = emit_dictionary_deltas

    @property
    def allow_64bit(self):
        return self.c_options.allow_64bit

    @allow_64bit.setter
    def allow_64bit(self, bint value):
        self.c_options.allow_64bit = value

    @property
    def use_legacy_format(self):
        return self.c_options.write_legacy_ipc_format

    @use_legacy_format.setter
    def use_legacy_format(self, bint value):
        self.c_options.write_legacy_ipc_format = value

    @property
    def metadata_version(self):
        # Wrap the C++ enum into the Python-level MetadataVersion enum
        return _wrap_metadata_version(self.c_options.metadata_version)

    @metadata_version.setter
    def metadata_version(self, value):
        # Raises ValueError for values that are not a MetadataVersion
        self.c_options.metadata_version = _unwrap_metadata_version(value)

    @property
    def compression(self):
        # Codec name as a str (e.g. "lz4"), or None when uncompressed
        if self.c_options.codec == nullptr:
            return None
        else:
            return frombytes(self.c_options.codec.get().name())

    @compression.setter
    def compression(self, value):
        if value is None:
            self.c_options.codec.reset()
        elif isinstance(value, str):
            # Create a codec from its name; ownership of the released raw
            # pointer moves into the options' shared_ptr.
            self.c_options.codec = shared_ptr[CCodec](GetResultValue(
                CCodec.Create(_ensure_compression(value))).release())
        elif isinstance(value, Codec):
            self.c_options.codec = (<Codec>value).wrapped
        else:
            raise TypeError(
                "Property `compression` must be None, str, or pyarrow.Codec")

    @property
    def use_threads(self):
        return self.c_options.use_threads

    @use_threads.setter
    def use_threads(self, bint value):
        self.c_options.use_threads = value

    @property
    def emit_dictionary_deltas(self):
        return self.c_options.emit_dictionary_deltas

    @emit_dictionary_deltas.setter
    def emit_dictionary_deltas(self, bint value):
        self.c_options.emit_dictionary_deltas = value
204
205
cdef class Message(_Weakrefable):
    """
    Container for an Arrow IPC message with metadata and optional body
    """

    def __cinit__(self):
        pass

    def __init__(self):
        # Instances are only created internally (e.g. by read_message);
        # direct construction is disallowed.
        raise TypeError("Do not call {}'s constructor directly, use "
                        "`pyarrow.ipc.read_message` function instead."
                        .format(self.__class__.__name__))

    @property
    def type(self):
        """Message type, formatted as a str."""
        return frombytes(FormatMessageType(self.message.get().type()))

    @property
    def metadata(self):
        """Message metadata as a Buffer."""
        return pyarrow_wrap_buffer(self.message.get().metadata())

    @property
    def metadata_version(self):
        """Metadata version of the message, as a MetadataVersion enum value."""
        return _wrap_metadata_version(self.message.get().metadata_version())

    @property
    def body(self):
        """Message body as a Buffer, or None if the message has no body."""
        cdef shared_ptr[CBuffer] body = self.message.get().body()
        if body.get() == NULL:
            return None
        else:
            return pyarrow_wrap_buffer(body)

    def equals(self, Message other):
        """
        Returns True if the message contents (metadata and body) are identical

        Parameters
        ----------
        other : Message

        Returns
        -------
        are_equal : bool
        """
        cdef c_bool result
        with nogil:
            result = self.message.get().Equals(deref(other.message.get()))
        return result

    def serialize_to(self, NativeFile sink, alignment=8, memory_pool=None):
        """
        Write message to generic OutputStream

        Parameters
        ----------
        sink : NativeFile
        alignment : int, default 8
            Byte alignment for metadata and body
        memory_pool : MemoryPool, default None
            Uses default memory pool if not specified
        """
        cdef:
            int64_t output_length = 0
            COutputStream* out
            CIpcWriteOptions options

        # NOTE(review): memory_pool is accepted but never referenced in this
        # implementation; only the alignment option is forwarded.
        options.alignment = alignment
        out = sink.get_output_stream().get()
        with nogil:
            check_status(self.message.get()
                         .SerializeTo(out, options, &output_length))

    def serialize(self, alignment=8, memory_pool=None):
        """
        Write message as encapsulated IPC message

        Parameters
        ----------
        alignment : int, default 8
            Byte alignment for metadata and body
        memory_pool : MemoryPool, default None
            Uses default memory pool if not specified

        Returns
        -------
        serialized : Buffer
        """
        # Serialize into an in-memory stream, then hand back its buffer.
        stream = BufferOutputStream(memory_pool)
        self.serialize_to(stream, alignment=alignment, memory_pool=memory_pool)
        return stream.getvalue()

    def __repr__(self):
        # An uninitialized message holds a null C++ pointer
        if self.message == nullptr:
            return """pyarrow.Message(uninitialized)"""

        metadata_len = self.metadata.size
        body = self.body
        body_len = 0 if body is None else body.size

        return """pyarrow.Message
type: {0}
metadata length: {1}
body length: {2}""".format(self.type, metadata_len, body_len)
310
311
cdef class MessageReader(_Weakrefable):
    """
    Interface for reading Message objects from some source (like an
    InputStream)
    """
    cdef:
        # Owning pointer to the underlying C++ message reader
        unique_ptr[CMessageReader] reader

    def __cinit__(self):
        pass

    def __init__(self):
        # Instances must be created through MessageReader.open_stream
        raise TypeError("Do not call {}'s constructor directly, use "
                        "`pyarrow.ipc.MessageReader.open_stream` function "
                        "instead.".format(self.__class__.__name__))

    @staticmethod
    def open_stream(source):
        """
        Open stream from source.

        Parameters
        ----------
        source : a readable source, like an InputStream

        Returns
        -------
        MessageReader
        """
        cdef:
            MessageReader result = MessageReader.__new__(MessageReader)
            shared_ptr[CInputStream] in_stream
            unique_ptr[CMessageReader] reader

        # Coerce buffer-like objects into an InputStream first
        _get_input_stream(source, &in_stream)
        with nogil:
            reader = CMessageReader.Open(in_stream)
            result.reader.reset(reader.release())

        return result

    def __iter__(self):
        # Yields messages until read_next_message raises StopIteration
        # at end of stream.
        while True:
            yield self.read_next_message()

    def read_next_message(self):
        """
        Read next Message from the stream.

        Raises
        ------
        StopIteration : at end of stream
        """
        cdef Message result = Message.__new__(Message)

        with nogil:
            result.message = move(GetResultValue(self.reader.get()
                                                 .ReadNextMessage()))

        # A null message signals end-of-stream
        if result.message.get() == NULL:
            raise StopIteration

        return result
371
372# ----------------------------------------------------------------------
373# File and stream readers and writers
374
cdef class _CRecordBatchWriter(_Weakrefable):
    """The base RecordBatchWriter wrapper.

    Provides common implementations of convenience methods. Should not
    be instantiated directly by user code.
    """

    # cdef block is in lib.pxd

    def write(self, table_or_batch):
        """
        Write RecordBatch or Table to stream.

        Parameters
        ----------
        table_or_batch : {RecordBatch, Table}
        """
        # Dispatch on the concrete type; anything else is rejected
        if isinstance(table_or_batch, RecordBatch):
            self.write_batch(table_or_batch)
        elif isinstance(table_or_batch, Table):
            self.write_table(table_or_batch)
        else:
            raise ValueError(type(table_or_batch))

    def write_batch(self, RecordBatch batch):
        """
        Write RecordBatch to stream.

        Parameters
        ----------
        batch : RecordBatch
        """
        with nogil:
            check_status(self.writer.get()
                         .WriteRecordBatch(deref(batch.batch)))

    def write_table(self, Table table, max_chunksize=None, **kwargs):
        """
        Write Table to stream in (contiguous) RecordBatch objects.

        Parameters
        ----------
        table : Table
        max_chunksize : int, default None
            Maximum size for RecordBatch chunks. Individual chunks may be
            smaller depending on the chunk layout of individual columns.
        """
        cdef:
            # max_chunksize must be > 0 to have any impact
            int64_t c_max_chunksize = -1

        # Backwards compatibility: `chunksize` was renamed to `max_chunksize`
        # in pyarrow 0.15 and still works, with a deprecation warning.
        if 'chunksize' in kwargs:
            max_chunksize = kwargs['chunksize']
            msg = ('The parameter chunksize is deprecated for the write_table '
                   'methods as of 0.15, please use parameter '
                   'max_chunksize instead')
            warnings.warn(msg, FutureWarning)

        if max_chunksize is not None:
            c_max_chunksize = max_chunksize

        with nogil:
            check_status(self.writer.get().WriteTable(table.table[0],
                                                      c_max_chunksize))

    def close(self):
        """
        Close stream and write end-of-stream 0 marker.
        """
        with nogil:
            check_status(self.writer.get().Close())

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close unconditionally, even when the body raised
        self.close()

    @property
    def stats(self):
        """
        Current IPC write statistics.
        """
        if not self.writer:
            raise ValueError("Operation on closed writer")
        return _wrap_write_stats(self.writer.get().stats())
461
462
cdef class _RecordBatchStreamWriter(_CRecordBatchWriter):
    # Writer producing the IPC *streaming* format.
    cdef:
        CIpcWriteOptions options
        bint closed

    def __cinit__(self):
        pass

    def __dealloc__(self):
        pass

    @property
    def _use_legacy_format(self):
        # For testing (see test_ipc.py)
        return self.options.write_legacy_ipc_format

    @property
    def _metadata_version(self):
        # For testing (see test_ipc.py)
        return _wrap_metadata_version(self.options.metadata_version)

    def _open(self, sink, Schema schema not None,
              IpcWriteOptions options=IpcWriteOptions()):
        # Open a stream-format writer over `sink` for the given schema,
        # capturing a copy of the C++ write options.
        cdef:
            shared_ptr[COutputStream] c_sink

        self.options = options.c_options
        get_writer(sink, &c_sink)
        with nogil:
            self.writer = GetResultValue(
                MakeStreamWriter(c_sink, schema.sp_schema,
                                 self.options))
495
496
cdef _get_input_stream(object source, shared_ptr[CInputStream]* out):
    # Resolve `source` into a C++ InputStream written to *out.
    # Buffer-like objects (bytes, memoryview, ...) are wrapped as a Buffer
    # first; anything else is passed through to get_input_stream unchanged.
    try:
        source = as_buffer(source)
    except TypeError:
        # Non-buffer-like
        pass

    get_input_stream(source, True, out)
505
506
class _ReadPandasMixin:

    def read_pandas(self, **options):
        """
        Read contents of stream to a pandas.DataFrame.

        Read all record batches as a pyarrow.Table then convert it to a
        pandas.DataFrame using Table.to_pandas.

        Parameters
        ----------
        **options : arguments to forward to Table.to_pandas

        Returns
        -------
        df : pandas.DataFrame
        """
        # Drain the stream into a Table, then let the Table do the conversion.
        return self.read_all().to_pandas(**options)
526
527
cdef class RecordBatchReader(_Weakrefable):
    """Base class for reading stream of record batches.

    Provides common implementations of convenience methods. Should not
    be instantiated directly by user code.
    """

    # cdef block is in lib.pxd

    def __iter__(self):
        # Yield batches until read_next_batch signals end-of-stream
        while True:
            try:
                yield self.read_next_batch()
            except StopIteration:
                return

    @property
    def schema(self):
        """
        Shared schema of the record batches in the stream.
        """
        cdef shared_ptr[CSchema] c_schema

        with nogil:
            c_schema = self.reader.get().schema()

        return pyarrow_wrap_schema(c_schema)

    def get_next_batch(self):
        # Deprecated alias of read_next_batch, kept for compatibility
        import warnings
        warnings.warn('Please use read_next_batch instead of '
                      'get_next_batch', FutureWarning)
        return self.read_next_batch()

    def read_next_batch(self):
        """
        Read next RecordBatch from the stream.

        Raises
        ------
        StopIteration:
            At end of stream.
        """
        cdef shared_ptr[CRecordBatch] batch

        with nogil:
            check_status(self.reader.get().ReadNext(&batch))

        # A null batch signals end-of-stream
        if batch.get() == NULL:
            raise StopIteration

        return pyarrow_wrap_batch(batch)

    def read_all(self):
        """
        Read all record batches as a pyarrow.Table.
        """
        cdef shared_ptr[CTable] table
        with nogil:
            check_status(self.reader.get().ReadAll(&table))
        return pyarrow_wrap_table(table)

    # Reuse the pandas conversion helper defined on the mixin
    read_pandas = _ReadPandasMixin.read_pandas

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def _export_to_c(self, uintptr_t out_ptr):
        """
        Export to a C ArrowArrayStream struct, given its pointer.

        Parameters
        ----------
        out_ptr: int
            The raw pointer to a C ArrowArrayStream struct.

        Be careful: if you don't pass the ArrowArrayStream struct to a
        consumer, array memory will leak. This is a low-level function
        intended for expert users.
        """
        with nogil:
            check_status(ExportRecordBatchReader(
                self.reader, <ArrowArrayStream*> out_ptr))

    @staticmethod
    def _import_from_c(uintptr_t in_ptr):
        """
        Import RecordBatchReader from a C ArrowArrayStream struct,
        given its pointer.

        Parameters
        ----------
        in_ptr: int
            The raw pointer to a C ArrowArrayStream struct.

        This is a low-level function intended for expert users.
        """
        cdef:
            shared_ptr[CRecordBatchReader] c_reader
            RecordBatchReader self

        with nogil:
            c_reader = GetResultValue(ImportRecordBatchReader(
                <ArrowArrayStream*> in_ptr))

        # Bypass __init__: the instance is populated from the C++ reader
        self = RecordBatchReader.__new__(RecordBatchReader)
        self.reader = c_reader
        return self

    @staticmethod
    def from_batches(schema, batches):
        """
        Create RecordBatchReader from an iterable of batches.

        Parameters
        ----------
        schema : Schema
            The shared schema of the record batches
        batches : Iterable[RecordBatch]
            The batches that this reader will return.

        Returns
        -------
        reader : RecordBatchReader
        """
        cdef:
            shared_ptr[CSchema] c_schema
            shared_ptr[CRecordBatchReader] c_reader
            RecordBatchReader self

        c_schema = pyarrow_unwrap_schema(schema)
        c_reader = GetResultValue(CPyRecordBatchReader.Make(
            c_schema, batches))

        # Bypass __init__: the instance is populated from the C++ reader
        self = RecordBatchReader.__new__(RecordBatchReader)
        self.reader = c_reader
        return self
668
669
cdef class _RecordBatchStreamReader(RecordBatchReader):
    # Reader for the IPC *streaming* format.
    cdef:
        shared_ptr[CInputStream] in_stream
        CIpcReadOptions options
        # Raw (non-owning) pointer used for stream-specific APIs; aliases
        # self.reader, so it stays valid as long as self.reader does.
        CRecordBatchStreamReader* stream_reader

    def __cinit__(self):
        pass

    def _open(self, source):
        # Wrap `source` (buffer-like or file-like) into an InputStream and
        # open the stream reader over it.
        _get_input_stream(source, &self.in_stream)
        with nogil:
            self.reader = GetResultValue(CRecordBatchStreamReader.Open(
                self.in_stream, self.options))
        self.stream_reader = <CRecordBatchStreamReader*> self.reader.get()

    @property
    def stats(self):
        """
        Current IPC read statistics.
        """
        if not self.reader:
            raise ValueError("Operation on closed reader")
        return _wrap_read_stats(self.stream_reader.stats())
694
695
cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
    # Same as the stream writer, but produces the IPC *file* format
    # via MakeFileWriter instead of MakeStreamWriter.

    def _open(self, sink, Schema schema not None,
              IpcWriteOptions options=IpcWriteOptions()):
        cdef:
            shared_ptr[COutputStream] c_sink

        self.options = options.c_options
        get_writer(sink, &c_sink)
        with nogil:
            self.writer = GetResultValue(
                MakeFileWriter(c_sink, schema.sp_schema, self.options))
708
709
cdef class _RecordBatchFileReader(_Weakrefable):
    # Reader for the IPC *file* format (random access by batch index).
    cdef:
        shared_ptr[CRecordBatchFileReader] reader
        shared_ptr[CRandomAccessFile] file
        CIpcReadOptions options

    cdef readonly:
        # Shared schema of all record batches in the file
        Schema schema

    def __cinit__(self):
        pass

    def _open(self, source, footer_offset=None):
        # Coerce buffer-like objects to a Buffer; others pass through
        try:
            source = as_buffer(source)
        except TypeError:
            pass

        get_reader(source, True, &self.file)

        cdef int64_t offset = 0
        if footer_offset is not None:
            offset = footer_offset

        with nogil:
            # Open2 takes an explicit footer offset (e.g. for IPC files
            # embedded in a larger file); Open locates the footer itself.
            if offset != 0:
                self.reader = GetResultValue(
                    CRecordBatchFileReader.Open2(self.file.get(), offset,
                                                 self.options))

            else:
                self.reader = GetResultValue(
                    CRecordBatchFileReader.Open(self.file.get(),
                                                self.options))

        self.schema = pyarrow_wrap_schema(self.reader.get().schema())

    @property
    def num_record_batches(self):
        # Number of record batches contained in the file
        return self.reader.get().num_record_batches()

    def get_batch(self, int i):
        """
        Read the i-th record batch from the file.

        Parameters
        ----------
        i : int
            Batch index; must be in [0, num_record_batches).

        Raises
        ------
        ValueError : if i is out of range.
        """
        cdef shared_ptr[CRecordBatch] batch

        if i < 0 or i >= self.num_record_batches:
            raise ValueError('Batch number {0} out of range'.format(i))

        with nogil:
            batch = GetResultValue(self.reader.get().ReadRecordBatch(i))

        return pyarrow_wrap_batch(batch)

    # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
    # time has passed
    get_record_batch = get_batch

    def read_all(self):
        """
        Read all record batches as a pyarrow.Table
        """
        cdef:
            vector[shared_ptr[CRecordBatch]] batches
            shared_ptr[CTable] table
            int i, nbatches

        nbatches = self.num_record_batches

        # Read every batch, then assemble them into a single Table
        batches.resize(nbatches)
        with nogil:
            for i in range(nbatches):
                batches[i] = GetResultValue(self.reader.get()
                                            .ReadRecordBatch(i))
            table = GetResultValue(
                CTable.FromRecordBatches(self.schema.sp_schema, move(batches)))

        return pyarrow_wrap_table(table)

    # Reuse the pandas conversion helper defined on the mixin
    read_pandas = _ReadPandasMixin.read_pandas

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    @property
    def stats(self):
        """
        Current IPC read statistics.
        """
        if not self.reader:
            raise ValueError("Operation on closed reader")
        return _wrap_read_stats(self.reader.get().stats())
803
804
def get_tensor_size(Tensor tensor):
    """
    Return total size of serialized Tensor including metadata and padding.

    Parameters
    ----------
    tensor : Tensor
        The tensor for which we want to know the size.
    """
    cdef int64_t total_bytes
    with nogil:
        check_status(GetTensorSize(deref(tensor.tp), &total_bytes))
    return total_bytes
818
819
def get_record_batch_size(RecordBatch batch):
    """
    Return total size of serialized RecordBatch including metadata and padding.

    Parameters
    ----------
    batch : RecordBatch
        The recordbatch for which we want to know the size.
    """
    cdef int64_t total_bytes
    with nogil:
        check_status(GetRecordBatchSize(deref(batch.batch), &total_bytes))
    return total_bytes
833
834
def write_tensor(Tensor tensor, NativeFile dest):
    """
    Write pyarrow.Tensor to a pyarrow.NativeFile object at its current
    position.

    Parameters
    ----------
    tensor : pyarrow.Tensor
    dest : pyarrow.NativeFile

    Returns
    -------
    bytes_written : int
        Total number of bytes written to the file
    """
    cdef:
        int32_t c_metadata_length
        int64_t c_body_length

    out_stream = dest.get_output_stream()

    with nogil:
        check_status(
            WriteTensor(deref(tensor.tp), out_stream.get(),
                        &c_metadata_length, &c_body_length))

    # Total bytes written = metadata section + body section
    return c_metadata_length + c_body_length
861
862
cdef NativeFile as_native_file(source):
    # Wrap `source` in a NativeFile if it is not one already: file-like
    # objects (with a read method) become PythonFile, everything else is
    # treated as a buffer.
    if isinstance(source, NativeFile):
        return source

    if hasattr(source, 'read'):
        wrapped = PythonFile(source)
    else:
        wrapped = BufferReader(source)

    if not isinstance(wrapped, NativeFile):
        raise ValueError('Unable to read message from object with type: {0}'
                         .format(type(wrapped)))
    return wrapped
874
875
def read_tensor(source):
    """Read pyarrow.Tensor from pyarrow.NativeFile object from current
    position. If the file source supports zero copy (e.g. a memory map), then
    this operation does not allocate any memory. This function does not
    assume that the stream is aligned.

    Parameters
    ----------
    source : pyarrow.NativeFile

    Returns
    -------
    tensor : Tensor

    """
    cdef:
        shared_ptr[CTensor] c_tensor
        CInputStream* stream
        NativeFile native_source = as_native_file(source)

    stream = native_source.get_input_stream().get()
    with nogil:
        c_tensor = GetResultValue(ReadTensor(stream))
    return pyarrow_wrap_tensor(c_tensor)
900
901
def read_message(source):
    """
    Read length-prefixed message from file or buffer-like object

    Parameters
    ----------
    source : pyarrow.NativeFile, file-like object, or buffer-like object

    Returns
    -------
    message : Message
    """
    cdef:
        Message message = Message.__new__(Message)
        NativeFile native_source = as_native_file(source)
        CInputStream* stream = native_source.get_input_stream().get()

    with nogil:
        message.message = move(
            GetResultValue(ReadMessage(stream, c_default_memory_pool())))

    # A null message means the stream ended where a message was expected
    if message.message == nullptr:
        raise EOFError("End of Arrow stream")

    return message
929
930
def read_schema(obj, DictionaryMemo dictionary_memo=None):
    """
    Read Schema from message or buffer

    Parameters
    ----------
    obj : buffer or Message
    dictionary_memo : DictionaryMemo, optional
        Needed to be able to reconstruct dictionary-encoded fields
        with read_record_batch

    Returns
    -------
    schema : Schema
    """
    cdef:
        shared_ptr[CSchema] c_schema
        shared_ptr[CRandomAccessFile] c_file
        CDictionaryMemo local_memo
        CDictionaryMemo* memo_ptr

    # Reading the schema out of a Message object is not supported yet
    if isinstance(obj, Message):
        raise NotImplementedError(type(obj))

    get_reader(obj, True, &c_file)

    # Use the caller-supplied memo when given, otherwise a throwaway one
    if dictionary_memo is None:
        memo_ptr = &local_memo
    else:
        memo_ptr = dictionary_memo.memo

    with nogil:
        c_schema = GetResultValue(ReadSchema(c_file.get(), memo_ptr))

    return pyarrow_wrap_schema(c_schema)
966
967
def read_record_batch(obj, Schema schema,
                      DictionaryMemo dictionary_memo=None):
    """
    Read RecordBatch from message, given a known schema. If reading data from a
    complete IPC stream, use ipc.open_stream instead

    Parameters
    ----------
    obj : Message or Buffer-like
    schema : Schema
    dictionary_memo : DictionaryMemo, optional
        If message contains dictionaries, must pass a populated
        DictionaryMemo

    Returns
    -------
    batch : RecordBatch
    """
    cdef:
        shared_ptr[CRecordBatch] c_batch
        Message message
        CDictionaryMemo local_memo
        CDictionaryMemo* memo_ptr

    # Accept either a ready-made Message or anything read_message handles
    message = obj if isinstance(obj, Message) else read_message(obj)

    # Use the caller-supplied memo when given, otherwise a throwaway one
    if dictionary_memo is None:
        memo_ptr = &local_memo
    else:
        memo_ptr = dictionary_memo.memo

    with nogil:
        c_batch = GetResultValue(
            ReadRecordBatch(deref(message.message.get()),
                            schema.sp_schema,
                            memo_ptr,
                            CIpcReadOptions.Defaults()))

    return pyarrow_wrap_batch(c_batch)