--- /dev/null
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from collections import namedtuple
+import warnings
+
+
+cpdef enum MetadataVersion:
+ V1 = <char> CMetadataVersion_V1
+ V2 = <char> CMetadataVersion_V2
+ V3 = <char> CMetadataVersion_V3
+ V4 = <char> CMetadataVersion_V4
+ V5 = <char> CMetadataVersion_V5
+
+
+cdef object _wrap_metadata_version(CMetadataVersion version):
+ return MetadataVersion(<char> version)
+
+
+cdef CMetadataVersion _unwrap_metadata_version(
+ MetadataVersion version) except *:
+ if version == MetadataVersion.V1:
+ return CMetadataVersion_V1
+ elif version == MetadataVersion.V2:
+ return CMetadataVersion_V2
+ elif version == MetadataVersion.V3:
+ return CMetadataVersion_V3
+ elif version == MetadataVersion.V4:
+ return CMetadataVersion_V4
+ elif version == MetadataVersion.V5:
+ return CMetadataVersion_V5
+ raise ValueError("Not a metadata version: " + repr(version))
+
+
+_WriteStats = namedtuple(
+ 'WriteStats',
+ ('num_messages', 'num_record_batches', 'num_dictionary_batches',
+ 'num_dictionary_deltas', 'num_replaced_dictionaries'))
+
+
+class WriteStats(_WriteStats):
+ """IPC write statistics
+
+ Parameters
+ ----------
+ num_messages : number of messages.
+ num_record_batches : number of record batches.
+ num_dictionary_batches : number of dictionary batches.
+ num_dictionary_deltas : delta of dictionaries.
+ num_replaced_dictionaries : number of replaced dictionaries.
+ """
+ __slots__ = ()
+
+
+@staticmethod
+cdef _wrap_write_stats(CIpcWriteStats c):
+ return WriteStats(c.num_messages, c.num_record_batches,
+ c.num_dictionary_batches, c.num_dictionary_deltas,
+ c.num_replaced_dictionaries)
+
+
+_ReadStats = namedtuple(
+ 'ReadStats',
+ ('num_messages', 'num_record_batches', 'num_dictionary_batches',
+ 'num_dictionary_deltas', 'num_replaced_dictionaries'))
+
+
+class ReadStats(_ReadStats):
+ """IPC read statistics
+
+ Parameters
+ ----------
+ num_messages : number of messages.
+ num_record_batches : number of record batches.
+ num_dictionary_batches : number of dictionary batches.
+ num_dictionary_deltas : delta of dictionaries.
+ num_replaced_dictionaries : number of replaced dictionaries.
+ """
+ __slots__ = ()
+
+
+@staticmethod
+cdef _wrap_read_stats(CIpcReadStats c):
+ return ReadStats(c.num_messages, c.num_record_batches,
+ c.num_dictionary_batches, c.num_dictionary_deltas,
+ c.num_replaced_dictionaries)
+
+
+cdef class IpcWriteOptions(_Weakrefable):
+ """
+ Serialization options for the IPC format.
+
+ Parameters
+ ----------
+ metadata_version : MetadataVersion, default MetadataVersion.V5
+ The metadata version to write. V5 is the current and latest,
+ V4 is the pre-1.0 metadata version (with incompatible Union layout).
+ allow_64bit : bool, default False
+ If true, allow field lengths that don't fit in a signed 32-bit int.
+ use_legacy_format : bool, default False
+ Whether to use the pre-Arrow 0.15 IPC format.
+ compression : str, Codec, or None
+ compression codec to use for record batch buffers.
+ If None then batch buffers will be uncompressed.
+ Must be "lz4", "zstd" or None.
+ To specify a compression_level use `pyarrow.Codec`
+ use_threads : bool
+ Whether to use the global CPU thread pool to parallelize any
+ computational tasks like compression.
+ emit_dictionary_deltas : bool
+ Whether to emit dictionary deltas. Default is false for maximum
+ stream compatibility.
+ """
+ __slots__ = ()
+
+ # cdef block is in lib.pxd
+
+ def __init__(self, *, metadata_version=MetadataVersion.V5,
+ bint allow_64bit=False, use_legacy_format=False,
+ compression=None, bint use_threads=True,
+ bint emit_dictionary_deltas=False):
+ self.c_options = CIpcWriteOptions.Defaults()
+ self.allow_64bit = allow_64bit
+ self.use_legacy_format = use_legacy_format
+ self.metadata_version = metadata_version
+ if compression is not None:
+ self.compression = compression
+ self.use_threads = use_threads
+ self.emit_dictionary_deltas = emit_dictionary_deltas
+
+ @property
+ def allow_64bit(self):
+ return self.c_options.allow_64bit
+
+ @allow_64bit.setter
+ def allow_64bit(self, bint value):
+ self.c_options.allow_64bit = value
+
+ @property
+ def use_legacy_format(self):
+ return self.c_options.write_legacy_ipc_format
+
+ @use_legacy_format.setter
+ def use_legacy_format(self, bint value):
+ self.c_options.write_legacy_ipc_format = value
+
+ @property
+ def metadata_version(self):
+ return _wrap_metadata_version(self.c_options.metadata_version)
+
+ @metadata_version.setter
+ def metadata_version(self, value):
+ self.c_options.metadata_version = _unwrap_metadata_version(value)
+
+ @property
+ def compression(self):
+ if self.c_options.codec == nullptr:
+ return None
+ else:
+ return frombytes(self.c_options.codec.get().name())
+
+ @compression.setter
+ def compression(self, value):
+ if value is None:
+ self.c_options.codec.reset()
+ elif isinstance(value, str):
+ self.c_options.codec = shared_ptr[CCodec](GetResultValue(
+ CCodec.Create(_ensure_compression(value))).release())
+ elif isinstance(value, Codec):
+ self.c_options.codec = (<Codec>value).wrapped
+ else:
+ raise TypeError(
+ "Property `compression` must be None, str, or pyarrow.Codec")
+
+ @property
+ def use_threads(self):
+ return self.c_options.use_threads
+
+ @use_threads.setter
+ def use_threads(self, bint value):
+ self.c_options.use_threads = value
+
+ @property
+ def emit_dictionary_deltas(self):
+ return self.c_options.emit_dictionary_deltas
+
+ @emit_dictionary_deltas.setter
+ def emit_dictionary_deltas(self, bint value):
+ self.c_options.emit_dictionary_deltas = value
+
+
+cdef class Message(_Weakrefable):
+ """
+ Container for an Arrow IPC message with metadata and optional body
+ """
+
+ def __cinit__(self):
+ pass
+
+ def __init__(self):
+ raise TypeError("Do not call {}'s constructor directly, use "
+ "`pyarrow.ipc.read_message` function instead."
+ .format(self.__class__.__name__))
+
+ @property
+ def type(self):
+ return frombytes(FormatMessageType(self.message.get().type()))
+
+ @property
+ def metadata(self):
+ return pyarrow_wrap_buffer(self.message.get().metadata())
+
+ @property
+ def metadata_version(self):
+ return _wrap_metadata_version(self.message.get().metadata_version())
+
+ @property
+ def body(self):
+ cdef shared_ptr[CBuffer] body = self.message.get().body()
+ if body.get() == NULL:
+ return None
+ else:
+ return pyarrow_wrap_buffer(body)
+
+ def equals(self, Message other):
+ """
+ Returns True if the message contents (metadata and body) are identical
+
+ Parameters
+ ----------
+ other : Message
+
+ Returns
+ -------
+ are_equal : bool
+ """
+ cdef c_bool result
+ with nogil:
+ result = self.message.get().Equals(deref(other.message.get()))
+ return result
+
+ def serialize_to(self, NativeFile sink, alignment=8, memory_pool=None):
+ """
+ Write message to generic OutputStream
+
+ Parameters
+ ----------
+ sink : NativeFile
+ alignment : int, default 8
+ Byte alignment for metadata and body
+ memory_pool : MemoryPool, default None
+ Uses default memory pool if not specified
+ """
+ cdef:
+ int64_t output_length = 0
+ COutputStream* out
+ CIpcWriteOptions options
+
+ options.alignment = alignment
+ out = sink.get_output_stream().get()
+ with nogil:
+ check_status(self.message.get()
+ .SerializeTo(out, options, &output_length))
+
+ def serialize(self, alignment=8, memory_pool=None):
+ """
+ Write message as encapsulated IPC message
+
+ Parameters
+ ----------
+ alignment : int, default 8
+ Byte alignment for metadata and body
+ memory_pool : MemoryPool, default None
+ Uses default memory pool if not specified
+
+ Returns
+ -------
+ serialized : Buffer
+ """
+ stream = BufferOutputStream(memory_pool)
+ self.serialize_to(stream, alignment=alignment, memory_pool=memory_pool)
+ return stream.getvalue()
+
+ def __repr__(self):
+ if self.message == nullptr:
+ return """pyarrow.Message(uninitialized)"""
+
+ metadata_len = self.metadata.size
+ body = self.body
+ body_len = 0 if body is None else body.size
+
+ return """pyarrow.Message
+type: {0}
+metadata length: {1}
+body length: {2}""".format(self.type, metadata_len, body_len)
+
+
+cdef class MessageReader(_Weakrefable):
+ """
+ Interface for reading Message objects from some source (like an
+ InputStream)
+ """
+ cdef:
+ unique_ptr[CMessageReader] reader
+
+ def __cinit__(self):
+ pass
+
+ def __init__(self):
+ raise TypeError("Do not call {}'s constructor directly, use "
+ "`pyarrow.ipc.MessageReader.open_stream` function "
+ "instead.".format(self.__class__.__name__))
+
+ @staticmethod
+ def open_stream(source):
+ """
+ Open stream from source.
+
+ Parameters
+ ----------
+ source : a readable source, like an InputStream
+ """
+ cdef:
+ MessageReader result = MessageReader.__new__(MessageReader)
+ shared_ptr[CInputStream] in_stream
+ unique_ptr[CMessageReader] reader
+
+ _get_input_stream(source, &in_stream)
+ with nogil:
+ reader = CMessageReader.Open(in_stream)
+ result.reader.reset(reader.release())
+
+ return result
+
+ def __iter__(self):
+ while True:
+ yield self.read_next_message()
+
+ def read_next_message(self):
+ """
+ Read next Message from the stream.
+
+ Raises
+ ------
+ StopIteration : at end of stream
+ """
+ cdef Message result = Message.__new__(Message)
+
+ with nogil:
+ result.message = move(GetResultValue(self.reader.get()
+ .ReadNextMessage()))
+
+ if result.message.get() == NULL:
+ raise StopIteration
+
+ return result
+
+# ----------------------------------------------------------------------
+# File and stream readers and writers
+
+cdef class _CRecordBatchWriter(_Weakrefable):
+ """The base RecordBatchWriter wrapper.
+
+ Provides common implementations of convenience methods. Should not
+ be instantiated directly by user code.
+ """
+
+ # cdef block is in lib.pxd
+
+ def write(self, table_or_batch):
+ """
+ Write RecordBatch or Table to stream.
+
+ Parameters
+ ----------
+ table_or_batch : {RecordBatch, Table}
+ """
+ if isinstance(table_or_batch, RecordBatch):
+ self.write_batch(table_or_batch)
+ elif isinstance(table_or_batch, Table):
+ self.write_table(table_or_batch)
+ else:
+ raise ValueError(type(table_or_batch))
+
+ def write_batch(self, RecordBatch batch):
+ """
+ Write RecordBatch to stream.
+
+ Parameters
+ ----------
+ batch : RecordBatch
+ """
+ with nogil:
+ check_status(self.writer.get()
+ .WriteRecordBatch(deref(batch.batch)))
+
+ def write_table(self, Table table, max_chunksize=None, **kwargs):
+ """
+ Write Table to stream in (contiguous) RecordBatch objects.
+
+ Parameters
+ ----------
+ table : Table
+ max_chunksize : int, default None
+ Maximum size for RecordBatch chunks. Individual chunks may be
+ smaller depending on the chunk layout of individual columns.
+ """
+ cdef:
+ # max_chunksize must be > 0 to have any impact
+ int64_t c_max_chunksize = -1
+
+ if 'chunksize' in kwargs:
+ max_chunksize = kwargs['chunksize']
+ msg = ('The parameter chunksize is deprecated for the write_table '
+ 'methods as of 0.15, please use parameter '
+ 'max_chunksize instead')
+ warnings.warn(msg, FutureWarning)
+
+ if max_chunksize is not None:
+ c_max_chunksize = max_chunksize
+
+ with nogil:
+ check_status(self.writer.get().WriteTable(table.table[0],
+ c_max_chunksize))
+
+ def close(self):
+ """
+ Close stream and write end-of-stream 0 marker.
+ """
+ with nogil:
+ check_status(self.writer.get().Close())
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ @property
+ def stats(self):
+ """
+ Current IPC write statistics.
+ """
+ if not self.writer:
+ raise ValueError("Operation on closed writer")
+ return _wrap_write_stats(self.writer.get().stats())
+
+
+cdef class _RecordBatchStreamWriter(_CRecordBatchWriter):
+ cdef:
+ CIpcWriteOptions options
+ bint closed
+
+ def __cinit__(self):
+ pass
+
+ def __dealloc__(self):
+ pass
+
+ @property
+ def _use_legacy_format(self):
+ # For testing (see test_ipc.py)
+ return self.options.write_legacy_ipc_format
+
+ @property
+ def _metadata_version(self):
+ # For testing (see test_ipc.py)
+ return _wrap_metadata_version(self.options.metadata_version)
+
+ def _open(self, sink, Schema schema not None,
+ IpcWriteOptions options=IpcWriteOptions()):
+ cdef:
+ shared_ptr[COutputStream] c_sink
+
+ self.options = options.c_options
+ get_writer(sink, &c_sink)
+ with nogil:
+ self.writer = GetResultValue(
+ MakeStreamWriter(c_sink, schema.sp_schema,
+ self.options))
+
+
+cdef _get_input_stream(object source, shared_ptr[CInputStream]* out):
+ try:
+ source = as_buffer(source)
+ except TypeError:
+ # Non-buffer-like
+ pass
+
+ get_input_stream(source, True, out)
+
+
+class _ReadPandasMixin:
+
+ def read_pandas(self, **options):
+ """
+ Read contents of stream to a pandas.DataFrame.
+
+ Read all record batches as a pyarrow.Table then convert it to a
+ pandas.DataFrame using Table.to_pandas.
+
+ Parameters
+ ----------
+ **options : arguments to forward to Table.to_pandas
+
+ Returns
+ -------
+ df : pandas.DataFrame
+ """
+ table = self.read_all()
+ return table.to_pandas(**options)
+
+
+cdef class RecordBatchReader(_Weakrefable):
+ """Base class for reading stream of record batches.
+
+ Provides common implementations of convenience methods. Should not
+ be instantiated directly by user code.
+ """
+
+ # cdef block is in lib.pxd
+
+ def __iter__(self):
+ while True:
+ try:
+ yield self.read_next_batch()
+ except StopIteration:
+ return
+
+ @property
+ def schema(self):
+ """
+ Shared schema of the record batches in the stream.
+ """
+ cdef shared_ptr[CSchema] c_schema
+
+ with nogil:
+ c_schema = self.reader.get().schema()
+
+ return pyarrow_wrap_schema(c_schema)
+
+ def get_next_batch(self):
+ import warnings
+ warnings.warn('Please use read_next_batch instead of '
+ 'get_next_batch', FutureWarning)
+ return self.read_next_batch()
+
+ def read_next_batch(self):
+ """
+ Read next RecordBatch from the stream.
+
+ Raises
+ ------
+ StopIteration:
+ At end of stream.
+ """
+ cdef shared_ptr[CRecordBatch] batch
+
+ with nogil:
+ check_status(self.reader.get().ReadNext(&batch))
+
+ if batch.get() == NULL:
+ raise StopIteration
+
+ return pyarrow_wrap_batch(batch)
+
+ def read_all(self):
+ """
+ Read all record batches as a pyarrow.Table.
+ """
+ cdef shared_ptr[CTable] table
+ with nogil:
+ check_status(self.reader.get().ReadAll(&table))
+ return pyarrow_wrap_table(table)
+
+ read_pandas = _ReadPandasMixin.read_pandas
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ def _export_to_c(self, uintptr_t out_ptr):
+ """
+ Export to a C ArrowArrayStream struct, given its pointer.
+
+ Parameters
+ ----------
+ out_ptr: int
+ The raw pointer to a C ArrowArrayStream struct.
+
+ Be careful: if you don't pass the ArrowArrayStream struct to a
+ consumer, array memory will leak. This is a low-level function
+ intended for expert users.
+ """
+ with nogil:
+ check_status(ExportRecordBatchReader(
+ self.reader, <ArrowArrayStream*> out_ptr))
+
+ @staticmethod
+ def _import_from_c(uintptr_t in_ptr):
+ """
+ Import RecordBatchReader from a C ArrowArrayStream struct,
+ given its pointer.
+
+ Parameters
+ ----------
+ in_ptr: int
+ The raw pointer to a C ArrowArrayStream struct.
+
+ This is a low-level function intended for expert users.
+ """
+ cdef:
+ shared_ptr[CRecordBatchReader] c_reader
+ RecordBatchReader self
+
+ with nogil:
+ c_reader = GetResultValue(ImportRecordBatchReader(
+ <ArrowArrayStream*> in_ptr))
+
+ self = RecordBatchReader.__new__(RecordBatchReader)
+ self.reader = c_reader
+ return self
+
+ @staticmethod
+ def from_batches(schema, batches):
+ """
+ Create RecordBatchReader from an iterable of batches.
+
+ Parameters
+ ----------
+ schema : Schema
+ The shared schema of the record batches
+ batches : Iterable[RecordBatch]
+ The batches that this reader will return.
+
+ Returns
+ -------
+ reader : RecordBatchReader
+ """
+ cdef:
+ shared_ptr[CSchema] c_schema
+ shared_ptr[CRecordBatchReader] c_reader
+ RecordBatchReader self
+
+ c_schema = pyarrow_unwrap_schema(schema)
+ c_reader = GetResultValue(CPyRecordBatchReader.Make(
+ c_schema, batches))
+
+ self = RecordBatchReader.__new__(RecordBatchReader)
+ self.reader = c_reader
+ return self
+
+
+cdef class _RecordBatchStreamReader(RecordBatchReader):
+ cdef:
+ shared_ptr[CInputStream] in_stream
+ CIpcReadOptions options
+ CRecordBatchStreamReader* stream_reader
+
+ def __cinit__(self):
+ pass
+
+ def _open(self, source):
+ _get_input_stream(source, &self.in_stream)
+ with nogil:
+ self.reader = GetResultValue(CRecordBatchStreamReader.Open(
+ self.in_stream, self.options))
+ self.stream_reader = <CRecordBatchStreamReader*> self.reader.get()
+
+ @property
+ def stats(self):
+ """
+ Current IPC read statistics.
+ """
+ if not self.reader:
+ raise ValueError("Operation on closed reader")
+ return _wrap_read_stats(self.stream_reader.stats())
+
+
+cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
+
+ def _open(self, sink, Schema schema not None,
+ IpcWriteOptions options=IpcWriteOptions()):
+ cdef:
+ shared_ptr[COutputStream] c_sink
+
+ self.options = options.c_options
+ get_writer(sink, &c_sink)
+ with nogil:
+ self.writer = GetResultValue(
+ MakeFileWriter(c_sink, schema.sp_schema, self.options))
+
+
+cdef class _RecordBatchFileReader(_Weakrefable):
+ cdef:
+ shared_ptr[CRecordBatchFileReader] reader
+ shared_ptr[CRandomAccessFile] file
+ CIpcReadOptions options
+
+ cdef readonly:
+ Schema schema
+
+ def __cinit__(self):
+ pass
+
+ def _open(self, source, footer_offset=None):
+ try:
+ source = as_buffer(source)
+ except TypeError:
+ pass
+
+ get_reader(source, True, &self.file)
+
+ cdef int64_t offset = 0
+ if footer_offset is not None:
+ offset = footer_offset
+
+ with nogil:
+ if offset != 0:
+ self.reader = GetResultValue(
+ CRecordBatchFileReader.Open2(self.file.get(), offset,
+ self.options))
+
+ else:
+ self.reader = GetResultValue(
+ CRecordBatchFileReader.Open(self.file.get(),
+ self.options))
+
+ self.schema = pyarrow_wrap_schema(self.reader.get().schema())
+
+ @property
+ def num_record_batches(self):
+ return self.reader.get().num_record_batches()
+
+ def get_batch(self, int i):
+ cdef shared_ptr[CRecordBatch] batch
+
+ if i < 0 or i >= self.num_record_batches:
+ raise ValueError('Batch number {0} out of range'.format(i))
+
+ with nogil:
+ batch = GetResultValue(self.reader.get().ReadRecordBatch(i))
+
+ return pyarrow_wrap_batch(batch)
+
+ # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
+ # time has passed
+ get_record_batch = get_batch
+
+ def read_all(self):
+ """
+ Read all record batches as a pyarrow.Table
+ """
+ cdef:
+ vector[shared_ptr[CRecordBatch]] batches
+ shared_ptr[CTable] table
+ int i, nbatches
+
+ nbatches = self.num_record_batches
+
+ batches.resize(nbatches)
+ with nogil:
+ for i in range(nbatches):
+ batches[i] = GetResultValue(self.reader.get()
+ .ReadRecordBatch(i))
+ table = GetResultValue(
+ CTable.FromRecordBatches(self.schema.sp_schema, move(batches)))
+
+ return pyarrow_wrap_table(table)
+
+ read_pandas = _ReadPandasMixin.read_pandas
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
+
+ @property
+ def stats(self):
+ """
+ Current IPC read statistics.
+ """
+ if not self.reader:
+ raise ValueError("Operation on closed reader")
+ return _wrap_read_stats(self.reader.get().stats())
+
+
+def get_tensor_size(Tensor tensor):
+ """
+ Return total size of serialized Tensor including metadata and padding.
+
+ Parameters
+ ----------
+ tensor : Tensor
+ The tensor for which we want to known the size.
+ """
+ cdef int64_t size
+ with nogil:
+ check_status(GetTensorSize(deref(tensor.tp), &size))
+ return size
+
+
+def get_record_batch_size(RecordBatch batch):
+ """
+ Return total size of serialized RecordBatch including metadata and padding.
+
+ Parameters
+ ----------
+ batch : RecordBatch
+ The recordbatch for which we want to know the size.
+ """
+ cdef int64_t size
+ with nogil:
+ check_status(GetRecordBatchSize(deref(batch.batch), &size))
+ return size
+
+
+def write_tensor(Tensor tensor, NativeFile dest):
+ """
+ Write pyarrow.Tensor to pyarrow.NativeFile object its current position.
+
+ Parameters
+ ----------
+ tensor : pyarrow.Tensor
+ dest : pyarrow.NativeFile
+
+ Returns
+ -------
+ bytes_written : int
+ Total number of bytes written to the file
+ """
+ cdef:
+ int32_t metadata_length
+ int64_t body_length
+
+ handle = dest.get_output_stream()
+
+ with nogil:
+ check_status(
+ WriteTensor(deref(tensor.tp), handle.get(),
+ &metadata_length, &body_length))
+
+ return metadata_length + body_length
+
+
+cdef NativeFile as_native_file(source):
+ if not isinstance(source, NativeFile):
+ if hasattr(source, 'read'):
+ source = PythonFile(source)
+ else:
+ source = BufferReader(source)
+
+ if not isinstance(source, NativeFile):
+ raise ValueError('Unable to read message from object with type: {0}'
+ .format(type(source)))
+ return source
+
+
+def read_tensor(source):
+ """Read pyarrow.Tensor from pyarrow.NativeFile object from current
+ position. If the file source supports zero copy (e.g. a memory map), then
+ this operation does not allocate any memory. This function not assume that
+ the stream is aligned
+
+ Parameters
+ ----------
+ source : pyarrow.NativeFile
+
+ Returns
+ -------
+ tensor : Tensor
+
+ """
+ cdef:
+ shared_ptr[CTensor] sp_tensor
+ CInputStream* c_stream
+ NativeFile nf = as_native_file(source)
+
+ c_stream = nf.get_input_stream().get()
+ with nogil:
+ sp_tensor = GetResultValue(ReadTensor(c_stream))
+ return pyarrow_wrap_tensor(sp_tensor)
+
+
+def read_message(source):
+ """
+ Read length-prefixed message from file or buffer-like object
+
+ Parameters
+ ----------
+ source : pyarrow.NativeFile, file-like object, or buffer-like object
+
+ Returns
+ -------
+ message : Message
+ """
+ cdef:
+ Message result = Message.__new__(Message)
+ CInputStream* c_stream
+
+ cdef NativeFile nf = as_native_file(source)
+ c_stream = nf.get_input_stream().get()
+
+ with nogil:
+ result.message = move(
+ GetResultValue(ReadMessage(c_stream, c_default_memory_pool())))
+
+ if result.message == nullptr:
+ raise EOFError("End of Arrow stream")
+
+ return result
+
+
+def read_schema(obj, DictionaryMemo dictionary_memo=None):
+ """
+ Read Schema from message or buffer
+
+ Parameters
+ ----------
+ obj : buffer or Message
+ dictionary_memo : DictionaryMemo, optional
+ Needed to be able to reconstruct dictionary-encoded fields
+ with read_record_batch
+
+ Returns
+ -------
+ schema : Schema
+ """
+ cdef:
+ shared_ptr[CSchema] result
+ shared_ptr[CRandomAccessFile] cpp_file
+ CDictionaryMemo temp_memo
+ CDictionaryMemo* arg_dict_memo
+
+ if isinstance(obj, Message):
+ raise NotImplementedError(type(obj))
+
+ get_reader(obj, True, &cpp_file)
+
+ if dictionary_memo is not None:
+ arg_dict_memo = dictionary_memo.memo
+ else:
+ arg_dict_memo = &temp_memo
+
+ with nogil:
+ result = GetResultValue(ReadSchema(cpp_file.get(), arg_dict_memo))
+
+ return pyarrow_wrap_schema(result)
+
+
+def read_record_batch(obj, Schema schema,
+ DictionaryMemo dictionary_memo=None):
+ """
+ Read RecordBatch from message, given a known schema. If reading data from a
+ complete IPC stream, use ipc.open_stream instead
+
+ Parameters
+ ----------
+ obj : Message or Buffer-like
+ schema : Schema
+ dictionary_memo : DictionaryMemo, optional
+ If message contains dictionaries, must pass a populated
+ DictionaryMemo
+
+ Returns
+ -------
+ batch : RecordBatch
+ """
+ cdef:
+ shared_ptr[CRecordBatch] result
+ Message message
+ CDictionaryMemo temp_memo
+ CDictionaryMemo* arg_dict_memo
+
+ if isinstance(obj, Message):
+ message = obj
+ else:
+ message = read_message(obj)
+
+ if dictionary_memo is not None:
+ arg_dict_memo = dictionary_memo.memo
+ else:
+ arg_dict_memo = &temp_memo
+
+ with nogil:
+ result = GetResultValue(
+ ReadRecordBatch(deref(message.message.get()),
+ schema.sp_schema,
+ arg_dict_memo,
+ CIpcReadOptions.Defaults()))
+
+ return pyarrow_wrap_batch(result)