1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
18 # Cython wrappers for IO interfaces defined in arrow::io and messaging in
21 from libc.stdlib cimport malloc, free
29 from io import BufferedIOBase, IOBase, TextIOBase, UnsupportedOperation
30 from queue import Queue, Empty as QueueEmpty
32 from pyarrow.util import _is_path_like, _stringify_path
36 DEFAULT_BUFFER_SIZE = 2 ** 16
39 # To let us get a PyObject* and avoid Cython auto-ref-counting
40 cdef extern from "Python.h":
41 PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
42 char *v, Py_ssize_t len) except NULL
def io_thread_count():
    """
    Return the number of threads to use for I/O operations.

    Many operations, such as scanning a dataset, will implicitly make
    use of this pool. The number of threads is set to a fixed value at
    startup. It can be modified at runtime by calling
    :func:`set_io_thread_count()`.

    See Also
    --------
    set_io_thread_count : Modify the size of this pool.
    cpu_count : The analogous function for the CPU thread pool.
    """
    return GetIOThreadPoolCapacity()
def set_io_thread_count(int count):
    """
    Set the number of threads to use for I/O operations.

    Many operations, such as scanning a dataset, will implicitly make
    use of this pool.

    Parameters
    ----------
    count : int
        The max number of threads that may be used for I/O.
        Must be strictly positive.

    See Also
    --------
    io_thread_count : Get the size of this pool.
    set_cpu_count : The analogous function for the CPU thread pool.
    """
    # Reject non-positive counts before touching the C++ thread pool.
    if count < 1:
        raise ValueError("IO thread count must be strictly positive")
    check_status(SetIOThreadPoolCapacity(count))
85 cdef class NativeFile(_Weakrefable):
87 The base class for all Arrow streams.
89 Streams are either readable, writable, or both.
90 They optionally support seeking.
92 While this class exposes methods to read or write data from Python, the
93 primary intent of using a Arrow stream is to pass it to other Arrow
94 facilities that will make use of it, such as Arrow IPC routines.
96 Be aware that there are subtle differences with regular Python files,
97 e.g. destroying a writable Arrow stream without closing it explicitly
98 will not flush any pending data.
102 self.own_file = False
103 self.is_readable = False
104 self.is_writable = False
105 self.is_seekable = False
107 def __dealloc__(self):
114 def __exit__(self, exc_type, exc_value, tb):
120 The file mode. Currently instances of NativeFile may support:
124 * rb+: binary read and write
126 # Emulate built-in file modes
127 if self.is_readable and self.is_writable:
129 elif self.is_readable:
131 elif self.is_writable:
134 raise ValueError('File object is malformed, has no mode')
138 return self.is_readable
142 return self.is_writable
146 return self.is_seekable
156 raise UnsupportedOperation()
161 return self.input_stream.get().closed()
162 elif self.is_writable:
163 return self.output_stream.get().closed()
171 check_status(self.input_stream.get().Close())
173 check_status(self.output_stream.get().Close())
175 cdef set_random_access_file(self, shared_ptr[CRandomAccessFile] handle):
176 self.input_stream = <shared_ptr[CInputStream]> handle
177 self.random_access = handle
178 self.is_seekable = True
180 cdef set_input_stream(self, shared_ptr[CInputStream] handle):
181 self.input_stream = handle
182 self.random_access.reset()
183 self.is_seekable = False
185 cdef set_output_stream(self, shared_ptr[COutputStream] handle):
186 self.output_stream = handle
188 cdef shared_ptr[CRandomAccessFile] get_random_access_file(self) except *:
189 self._assert_readable()
190 self._assert_seekable()
191 return self.random_access
193 cdef shared_ptr[CInputStream] get_input_stream(self) except *:
194 self._assert_readable()
195 return self.input_stream
197 cdef shared_ptr[COutputStream] get_output_stream(self) except *:
198 self._assert_writable()
199 return self.output_stream
201 def _assert_open(self):
203 raise ValueError("I/O operation on closed file")
205 def _assert_readable(self):
207 if not self.is_readable:
208 # XXX UnsupportedOperation
209 raise IOError("only valid on readable files")
211 def _assert_writable(self):
213 if not self.is_writable:
214 raise IOError("only valid on writable files")
216 def _assert_seekable(self):
218 if not self.is_seekable:
219 raise IOError("only valid on seekable files")
227 handle = self.get_random_access_file()
229 size = GetResultValue(handle.get().GetSize())
238 shared_ptr[const CKeyValueMetadata] c_metadata
240 handle = self.get_input_stream()
242 c_metadata = GetResultValue(handle.get().ReadMetadata())
245 if c_metadata.get() != nullptr:
246 for i in range(c_metadata.get().size()):
247 metadata[frombytes(c_metadata.get().key(i))] = \
248 c_metadata.get().value(i)
253 Return current stream position
255 cdef int64_t position
258 rd_handle = self.get_random_access_file()
260 position = GetResultValue(rd_handle.get().Tell())
262 wr_handle = self.get_output_stream()
264 position = GetResultValue(wr_handle.get().Tell())
268 def seek(self, int64_t position, int whence=0):
270 Change current file stream position
275 Byte offset, interpreted relative to value of whence argument
276 whence : int, default 0
277 Point of reference for seek offset
282 * 0 -- start of stream (the default); offset should be zero or positive
283 * 1 -- current stream position; offset may be negative
284 * 2 -- end of stream; offset is usually negative
288 new_position : the new absolute stream position
291 handle = self.get_random_access_file()
297 offset = GetResultValue(handle.get().Tell())
298 offset = offset + position
300 offset = GetResultValue(handle.get().GetSize())
301 offset = offset + position
304 raise ValueError("Invalid value of whence: {0}"
306 check_status(handle.get().Seek(offset))
312 Flush the stream, if applicable.
314 An error is raised if stream is not writable.
317 # For IOBase compatibility, flush() on an input stream is a no-op
319 handle = self.get_output_stream()
321 check_status(handle.get().Flush())
323 def write(self, data):
325 Write byte from any object implementing buffer protocol (bytes,
326 bytearray, ndarray, pyarrow.Buffer)
330 data : bytes-like object or exporter of buffer protocol
334 nbytes : number of bytes written
336 self._assert_writable()
337 handle = self.get_output_stream()
339 cdef shared_ptr[CBuffer] buf = as_c_buffer(data)
342 check_status(handle.get().WriteBuffer(buf))
343 return buf.get().size()
345 def read(self, nbytes=None):
347 Read indicated number of bytes from file, or read all remaining bytes
348 if no argument passed
352 nbytes : int, default None
360 int64_t bytes_read = 0
364 if not self.is_seekable:
365 # Cannot get file size => read chunkwise
369 chunk = self.read(bs)
373 return b"".join(chunks)
375 c_nbytes = self.size() - self.tell()
379 handle = self.get_input_stream()
381 # Allocate empty write space
382 obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
384 cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
386 bytes_read = GetResultValue(handle.get().Read(c_nbytes, buf))
388 if bytes_read < c_nbytes:
389 cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
391 return PyObject_to_object(obj)
393 def read_at(self, nbytes, offset):
395 Read indicated number of bytes at offset from the file
409 int64_t bytes_read = 0
416 handle = self.get_random_access_file()
418 # Allocate empty write space
419 obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
421 cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
423 bytes_read = GetResultValue(handle.get().
424 ReadAt(c_offset, c_nbytes, buf))
426 if bytes_read < c_nbytes:
427 cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
429 return PyObject_to_object(obj)
431 def read1(self, nbytes=None):
432 """Read and return up to n bytes.
434 Alias for read, needed to match the IOBase interface."""
435 return self.read(nbytes=None)
440 def readinto(self, b):
442 Read into the supplied buffer
446 b: any python object supporting buffer interface
450 number of bytes written
459 handle = self.get_input_stream()
461 py_buf = py_buffer(b)
462 buf_len = py_buf.size
463 buf = py_buf.buffer.get().mutable_data()
466 bytes_read = GetResultValue(handle.get().Read(buf_len, buf))
470 def readline(self, size=None):
471 """NOT IMPLEMENTED. Read and return a line of bytes from the file.
473 If size is specified, read at most size bytes.
475 Line terminator is always b"\\n".
478 raise UnsupportedOperation()
480 def readlines(self, hint=None):
481 """NOT IMPLEMENTED. Read lines of the file
486 hint: int maximum number of bytes read until we stop
489 raise UnsupportedOperation()
492 self._assert_readable()
496 line = self.readline()
501 def read_buffer(self, nbytes=None):
504 int64_t bytes_read = 0
505 shared_ptr[CBuffer] output
507 handle = self.get_input_stream()
510 if not self.is_seekable:
511 # Cannot get file size => read chunkwise
512 return py_buffer(self.read())
513 c_nbytes = self.size() - self.tell()
518 output = GetResultValue(handle.get().ReadBuffer(c_nbytes))
520 return pyarrow_wrap_buffer(output)
526 raise UnsupportedOperation()
528 def writelines(self, lines):
529 self._assert_writable()
534 def download(self, stream_or_path, buffer_size=None):
536 Read file completely to local path (rather than reading completely into
537 memory). First seeks to the beginning of the file.
540 int64_t bytes_read = 0
543 handle = self.get_input_stream()
545 buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
547 write_queue = Queue(50)
549 if not hasattr(stream_or_path, 'read'):
550 stream = open(stream_or_path, 'wb')
555 stream = stream_or_path
565 while not done or write_queue.qsize() > 0:
567 buf = write_queue.get(timeout=0.01)
571 except Exception as e:
572 exc_info = sys.exc_info()
578 writer_thread = threading.Thread(target=bg_write)
580 # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
581 # the passed buffer, so it's hard for us to avoid doubling the memory
582 buf = <uint8_t*> malloc(buffer_size)
584 raise MemoryError("Failed to allocate {0} bytes"
585 .format(buffer_size))
587 writer_thread.start()
589 cdef int64_t total_bytes = 0
590 cdef int32_t c_buffer_size = buffer_size
595 bytes_read = GetResultValue(
596 handle.get().Read(c_buffer_size, buf))
598 total_bytes += bytes_read
604 pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
607 if writer_thread.is_alive():
608 while write_queue.full():
613 write_queue.put_nowait(pybuf)
619 if exc_info is not None:
620 raise exc_info[0], exc_info[1], exc_info[2]
622 def upload(self, stream, buffer_size=None):
624 Pipe file-like object to file
626 write_queue = Queue(50)
627 self._assert_writable()
629 buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
636 while not done or write_queue.qsize() > 0:
638 buf = write_queue.get(timeout=0.01)
644 except Exception as e:
645 exc_info = sys.exc_info()
647 writer_thread = threading.Thread(target=bg_write)
648 writer_thread.start()
652 buf = stream.read(buffer_size)
656 if writer_thread.is_alive():
657 while write_queue.full():
662 write_queue.put_nowait(buf)
667 if exc_info is not None:
668 raise exc_info[0], exc_info[1], exc_info[2]
670 BufferedIOBase.register(NativeFile)
672 # ----------------------------------------------------------------------
673 # Python file-like objects
676 cdef class PythonFile(NativeFile):
678 A stream backed by a Python file object.
680 This class allows using Python file objects with arbitrary Arrow
681 functions, including functions written in another language than Python.
683 As a downside, there is a non-zero redirection cost in translating
684 Arrow stream calls to Python method calls. Furthermore, Python's
685 Global Interpreter Lock may limit parallelism in some situations.
690 def __cinit__(self, handle, mode=None):
695 inferred_mode = handle.mode
696 except AttributeError:
697 # Not all file-like objects have a mode attribute
700 inferred_mode = 'w' if handle.writable() else 'r'
701 except AttributeError:
702 raise ValueError("could not infer open mode for file-like "
703 "object %r, please pass it explicitly"
708 if inferred_mode.startswith('w'):
710 elif inferred_mode.startswith('r'):
713 raise ValueError('Invalid file mode: {0}'.format(mode))
715 # If mode was given, check it matches the given file
717 if isinstance(handle, IOBase):
720 if not handle.readable():
721 raise TypeError("readable file expected")
723 if not handle.writable():
724 raise TypeError("writable file expected")
725 # (other duck-typed file-like objects are possible)
727 # If possible, check the file is a binary file
728 if isinstance(handle, TextIOBase):
729 raise TypeError("binary file expected, got text file")
732 self.set_random_access_file(
733 shared_ptr[CRandomAccessFile](new PyReadableFile(handle)))
734 self.is_readable = True
736 self.set_output_stream(
737 shared_ptr[COutputStream](new PyOutputStream(handle)))
738 self.is_writable = True
    def truncate(self, pos=None):
        # Delegate straight to the wrapped Python file object
        # (`self.handle`); semantics are those of io.IOBase.truncate.
        self.handle.truncate(pos)
    def readline(self, size=None):
        # Unlike the NativeFile base class (which raises
        # UnsupportedOperation), a PythonFile can delegate readline()
        # to the underlying Python file object.
        return self.handle.readline(size)
    def readlines(self, hint=None):
        # Delegate to the wrapped Python file object; `hint` limits the
        # number of bytes read, per the io.IOBase contract.
        return self.handle.readlines(hint)
750 cdef class MemoryMappedFile(NativeFile):
752 A stream that represents a memory-mapped file.
754 Supports 'r', 'r+', 'w' modes.
757 shared_ptr[CMemoryMappedFile] handle
761 def create(path, size):
763 Create a MemoryMappedFile
768 Where to create the file.
770 Size of the memory mapped file.
773 shared_ptr[CMemoryMappedFile] handle
774 c_string c_path = encode_file_path(path)
775 int64_t c_size = size
778 handle = GetResultValue(CMemoryMappedFile.Create(c_path, c_size))
780 cdef MemoryMappedFile result = MemoryMappedFile()
782 result.is_readable = True
783 result.is_writable = True
784 result.set_output_stream(<shared_ptr[COutputStream]> handle)
785 result.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle)
786 result.handle = handle
790 def _open(self, path, mode='r'):
795 shared_ptr[CMemoryMappedFile] handle
796 c_string c_path = encode_file_path(path)
798 if mode in ('r', 'rb'):
799 c_mode = FileMode_READ
800 self.is_readable = True
801 elif mode in ('w', 'wb'):
802 c_mode = FileMode_WRITE
803 self.is_writable = True
804 elif mode in ('r+', 'r+b', 'rb+'):
805 c_mode = FileMode_READWRITE
806 self.is_readable = True
807 self.is_writable = True
809 raise ValueError('Invalid file mode: {0}'.format(mode))
812 handle = GetResultValue(CMemoryMappedFile.Open(c_path, c_mode))
814 self.set_output_stream(<shared_ptr[COutputStream]> handle)
815 self.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle)
818 def resize(self, new_size):
820 Resize the map and underlying file.
824 new_size : new size in bytes
826 check_status(self.handle.get().Resize(new_size))
830 return self.handle.get().file_descriptor()
833 def memory_map(path, mode='r'):
835 Open memory map at file path. Size of the memory map cannot change.
840 mode : {'r', 'r+', 'w'}, default 'r'
841 Whether the file is opened for reading ('r+'), writing ('w')
846 mmap : MemoryMappedFile
850 cdef MemoryMappedFile mmap = MemoryMappedFile()
851 mmap._open(path, mode)
cdef _check_is_file(path):
    # Fail early with a clear message when given a directory, instead of
    # whatever low-level error the subsequent open would produce.
    if not os.path.isdir(path):
        return
    raise IOError("Expected file path, but {0} is a directory"
                  .format(path))
def create_memory_map(path, size):
    """
    Create a file of the given size and memory-map it.

    Parameters
    ----------
    path : str
        The file path to create, on the local filesystem.
    size : int
        The file size to create.

    Returns
    -------
    mmap : MemoryMappedFile
    """
    # Thin convenience wrapper over the static factory method.
    return MemoryMappedFile.create(path, size)
879 cdef class OSFile(NativeFile):
881 A stream backed by a regular file descriptor.
886 def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
892 shared_ptr[Readable] handle
893 c_string c_path = encode_file_path(path)
895 if mode in ('r', 'rb'):
896 self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
897 elif mode in ('w', 'wb'):
898 self._open_writable(c_path)
900 raise ValueError('Invalid file mode: {0}'.format(mode))
902 cdef _open_readable(self, c_string path, CMemoryPool* pool):
903 cdef shared_ptr[ReadableFile] handle
906 handle = GetResultValue(ReadableFile.Open(path, pool))
908 self.is_readable = True
909 self.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle)
911 cdef _open_writable(self, c_string path):
913 self.output_stream = GetResultValue(FileOutputStream.Open(path))
914 self.is_writable = True
918 return self.handle.file_descriptor()
cdef class FixedSizeBufferWriter(NativeFile):
    """
    A stream writing to an Arrow buffer.
    """

    def __cinit__(self, Buffer buffer):
        # Wrap the given Arrow buffer in a C++ fixed-size writer;
        # writes go directly into that buffer's memory.
        self.output_stream.reset(new CFixedSizeBufferWriter(buffer.buffer))
        self.is_writable = True

    def set_memcopy_threads(self, int num_threads):
        # Forward the tuning knob to the underlying C++ writer
        # (semantics defined by CFixedSizeBufferWriter).
        cdef CFixedSizeBufferWriter* writer = \
            <CFixedSizeBufferWriter*> self.output_stream.get()
        writer.set_memcopy_threads(num_threads)

    def set_memcopy_blocksize(self, int64_t blocksize):
        # Forward the tuning knob to the underlying C++ writer.
        cdef CFixedSizeBufferWriter* writer = \
            <CFixedSizeBufferWriter*> self.output_stream.get()
        writer.set_memcopy_blocksize(blocksize)

    def set_memcopy_threshold(self, int64_t threshold):
        # Forward the tuning knob to the underlying C++ writer.
        cdef CFixedSizeBufferWriter* writer = \
            <CFixedSizeBufferWriter*> self.output_stream.get()
        writer.set_memcopy_threshold(threshold)
946 # ----------------------------------------------------------------------
950 cdef class Buffer(_Weakrefable):
952 The base class for all Arrow buffers.
954 A buffer represents a contiguous memory area. Many buffers will own
955 their memory, though not all of them do.
962 raise TypeError("Do not call Buffer's constructor directly, use "
963 "`pyarrow.py_buffer` function instead.")
965 cdef void init(self, const shared_ptr[CBuffer]& buffer):
967 self.shape[0] = self.size
968 self.strides[0] = <Py_ssize_t>(1)
976 The buffer size in bytes.
978 return self.buffer.get().size()
983 The buffer's address, as an integer.
985 The returned address may point to CPU or device memory.
986 Use `is_cpu()` to disambiguate.
988 return self.buffer.get().address()
992 Compute hexadecimal representation of the buffer.
998 return self.buffer.get().ToHexString()
1001 def is_mutable(self):
1003 Whether the buffer is mutable.
1005 return self.buffer.get().is_mutable()
1010 Whether the buffer is CPU-accessible.
1012 return self.buffer.get().is_cpu()
1016 cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()
1018 if parent_buf.get() == NULL:
1021 return pyarrow_wrap_buffer(parent_buf)
1023 def __getitem__(self, key):
1024 if PySlice_Check(key):
1025 if (key.step or 1) != 1:
1026 raise IndexError('only slices with step 1 supported')
1027 return _normalize_slice(self, key)
1029 return self.getitem(_normalize_index(key, self.size))
1031 cdef getitem(self, int64_t i):
1032 return self.buffer.get().data()[i]
1034 def slice(self, offset=0, length=None):
1036 Slice this buffer. Memory is not copied.
1038 You can also use the Python slice notation ``buffer[start:stop]``.
1042 offset : int, default 0
1043 Offset from start of buffer to slice.
1044 length : int, default None
1045 Length of slice (default is until end of Buffer starting from
1051 A logical view over this buffer.
1053 cdef shared_ptr[CBuffer] result
1056 raise IndexError('Offset must be non-negative')
1059 result = SliceBuffer(self.buffer, offset)
1061 result = SliceBuffer(self.buffer, offset, max(length, 0))
1063 return pyarrow_wrap_buffer(result)
1065 def equals(self, Buffer other):
1067 Determine if two buffers contain exactly the same data.
1075 are_equal : True if buffer contents and size are equal
1077 cdef c_bool result = False
1079 result = self.buffer.get().Equals(deref(other.buffer.get()))
1082 def __eq__(self, other):
1083 if isinstance(other, Buffer):
1084 return self.equals(other)
1086 return self.equals(py_buffer(other))
1088 def __reduce_ex__(self, protocol):
1090 return py_buffer, (builtin_pickle.PickleBuffer(self),)
1092 return py_buffer, (self.to_pybytes(),)
1094 def to_pybytes(self):
1096 Return this buffer as a Python bytes object. Memory is copied.
1098 return cp.PyBytes_FromStringAndSize(
1099 <const char*>self.buffer.get().data(),
1100 self.buffer.get().size())
1102 def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
1103 if self.buffer.get().is_mutable():
1106 if flags & cp.PyBUF_WRITABLE:
1107 raise BufferError("Writable buffer requested but Arrow "
1108 "buffer was not mutable")
1110 buffer.buf = <char *>self.buffer.get().data()
1112 buffer.internal = NULL
1114 buffer.len = self.size
1117 buffer.shape = self.shape
1118 buffer.strides = self.strides
1119 buffer.suboffsets = NULL
1121 def __getsegcount__(self, Py_ssize_t *len_out):
1123 len_out[0] = <Py_ssize_t>self.size
1126 def __getreadbuffer__(self, Py_ssize_t idx, void **p):
1128 raise SystemError("accessing non-existent buffer segment")
1130 p[0] = <void*> self.buffer.get().data()
1133 def __getwritebuffer__(self, Py_ssize_t idx, void **p):
1134 if not self.buffer.get().is_mutable():
1135 raise SystemError("trying to write an immutable buffer")
1137 raise SystemError("accessing non-existent buffer segment")
1139 p[0] = <void*> self.buffer.get().data()
1143 cdef class ResizableBuffer(Buffer):
1145 A base class for buffers that can be resized.
1148 cdef void init_rz(self, const shared_ptr[CResizableBuffer]& buffer):
1149 self.init(<shared_ptr[CBuffer]> buffer)
1151 def resize(self, int64_t new_size, shrink_to_fit=False):
1153 Resize buffer to indicated size.
1158 New size of buffer (padding may be added internally).
1159 shrink_to_fit : bool, default False
1160 If this is true, the buffer is shrunk when new_size is less
1161 than the current size.
1162 If this is false, the buffer is never shrunk.
1164 cdef c_bool c_shrink_to_fit = shrink_to_fit
1166 check_status((<CResizableBuffer*> self.buffer.get())
1167 .Resize(new_size, c_shrink_to_fit))
1170 cdef shared_ptr[CResizableBuffer] _allocate_buffer(CMemoryPool* pool) except *:
1172 return to_shared(GetResultValue(AllocateResizableBuffer(0, pool)))
1175 def allocate_buffer(int64_t size, MemoryPool memory_pool=None,
1178 Allocate a mutable buffer.
1183 Number of bytes to allocate (plus internal padding)
1184 memory_pool : MemoryPool, optional
1185 The pool to allocate memory from.
1186 If not given, the default memory pool is used.
1187 resizable : bool, default False
1188 If true, the returned buffer is resizable.
1192 buffer : Buffer or ResizableBuffer
1195 CMemoryPool* cpool = maybe_unbox_memory_pool(memory_pool)
1196 shared_ptr[CResizableBuffer] c_rz_buffer
1197 shared_ptr[CBuffer] c_buffer
1201 c_rz_buffer = to_shared(GetResultValue(
1202 AllocateResizableBuffer(size, cpool)))
1203 return pyarrow_wrap_resizable_buffer(c_rz_buffer)
1206 c_buffer = to_shared(GetResultValue(AllocateBuffer(size, cpool)))
1207 return pyarrow_wrap_buffer(c_buffer)
1210 cdef class BufferOutputStream(NativeFile):
1213 shared_ptr[CResizableBuffer] buffer
1215 def __cinit__(self, MemoryPool memory_pool=None):
1216 self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool))
1217 self.output_stream.reset(new CBufferOutputStream(
1218 <shared_ptr[CResizableBuffer]> self.buffer))
1219 self.is_writable = True
1223 Finalize output stream and return result as pyarrow.Buffer.
1230 check_status(self.output_stream.get().Close())
1231 return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
cdef class MockOutputStream(NativeFile):
    """
    A write-only stream backed by CMockOutputStream that tracks the
    extent of bytes written.
    """

    def __cinit__(self):
        self.output_stream.reset(new CMockOutputStream())
        self.is_writable = True

    def size(self):
        # Report how many bytes have been written to this stream,
        # as recorded by the underlying C++ mock stream.
        c_stream = <CMockOutputStream*> self.output_stream.get()
        return c_stream.GetExtentBytesWritten()
1245 cdef class BufferReader(NativeFile):
1247 Zero-copy reader from objects convertible to Arrow buffer.
1251 obj : Python bytes or pyarrow.Buffer
1256 def __cinit__(self, object obj):
1257 self.buffer = as_buffer(obj)
1258 self.set_random_access_file(shared_ptr[CRandomAccessFile](
1259 new CBufferReader(self.buffer.buffer)))
1260 self.is_readable = True
1263 cdef class CompressedInputStream(NativeFile):
1265 An input stream wrapper which decompresses data on the fly.
1269 stream : string, path, pa.NativeFile, or file-like object
1270 Input stream object to wrap with the compression.
1272 The compression type ("bz2", "brotli", "gzip", "lz4" or "zstd").
1275 def __init__(self, object stream, str compression not None):
1278 Codec codec = Codec(compression)
1279 shared_ptr[CInputStream] c_reader
1280 shared_ptr[CCompressedInputStream] compressed_stream
1281 nf = get_native_file(stream, False)
1282 c_reader = nf.get_input_stream()
1283 compressed_stream = GetResultValue(
1284 CCompressedInputStream.Make(codec.unwrap(), c_reader)
1286 self.set_input_stream(<shared_ptr[CInputStream]> compressed_stream)
1287 self.is_readable = True
1290 cdef class CompressedOutputStream(NativeFile):
1292 An output stream wrapper which compresses data on the fly.
1296 stream : string, path, pa.NativeFile, or file-like object
1297 Input stream object to wrap with the compression.
1299 The compression type ("bz2", "brotli", "gzip", "lz4" or "zstd").
1302 def __init__(self, object stream, str compression not None):
1304 Codec codec = Codec(compression)
1305 shared_ptr[COutputStream] c_writer
1306 shared_ptr[CCompressedOutputStream] compressed_stream
1307 get_writer(stream, &c_writer)
1308 compressed_stream = GetResultValue(
1309 CCompressedOutputStream.Make(codec.unwrap(), c_writer)
1311 self.set_output_stream(<shared_ptr[COutputStream]> compressed_stream)
1312 self.is_writable = True
1315 ctypedef CBufferedInputStream* _CBufferedInputStreamPtr
1316 ctypedef CBufferedOutputStream* _CBufferedOutputStreamPtr
1317 ctypedef CRandomAccessFile* _RandomAccessFilePtr
1320 cdef class BufferedInputStream(NativeFile):
1322 An input stream that performs buffered reads from
1323 an unbuffered input stream, which can mitigate the overhead
1324 of many small reads in some cases.
1329 The input stream to wrap with the buffer
1331 Size of the temporary read buffer.
1332 memory_pool : MemoryPool
1333 The memory pool used to allocate the buffer.
1336 def __init__(self, NativeFile stream, int buffer_size,
1337 MemoryPool memory_pool=None):
1338 cdef shared_ptr[CBufferedInputStream] buffered_stream
1340 if buffer_size <= 0:
1341 raise ValueError('Buffer size must be larger than zero')
1342 buffered_stream = GetResultValue(CBufferedInputStream.Create(
1343 buffer_size, maybe_unbox_memory_pool(memory_pool),
1344 stream.get_input_stream()))
1346 self.set_input_stream(<shared_ptr[CInputStream]> buffered_stream)
1347 self.is_readable = True
1351 Release the raw InputStream.
1352 Further operations on this stream are invalid.
1357 The underlying raw input stream
1360 shared_ptr[CInputStream] c_raw
1361 _CBufferedInputStreamPtr buffered
1364 buffered = dynamic_cast[_CBufferedInputStreamPtr](
1365 self.input_stream.get())
1366 assert buffered != nullptr
1369 c_raw = GetResultValue(buffered.Detach())
1372 raw.is_readable = True
1373 # Find out whether the raw stream is a RandomAccessFile
1374 # or a mere InputStream. This helps us support seek() etc.
1376 if dynamic_cast[_RandomAccessFilePtr](c_raw.get()) != nullptr:
1377 raw.set_random_access_file(
1378 static_pointer_cast[CRandomAccessFile, CInputStream](c_raw))
1380 raw.set_input_stream(c_raw)
1384 cdef class BufferedOutputStream(NativeFile):
1386 An output stream that performs buffered reads from
1387 an unbuffered output stream, which can mitigate the overhead
1388 of many small writes in some cases.
1393 The writable output stream to wrap with the buffer
1395 Size of the buffer that should be added.
1396 memory_pool : MemoryPool
1397 The memory pool used to allocate the buffer.
1400 def __init__(self, NativeFile stream, int buffer_size,
1401 MemoryPool memory_pool=None):
1402 cdef shared_ptr[CBufferedOutputStream] buffered_stream
1404 if buffer_size <= 0:
1405 raise ValueError('Buffer size must be larger than zero')
1406 buffered_stream = GetResultValue(CBufferedOutputStream.Create(
1407 buffer_size, maybe_unbox_memory_pool(memory_pool),
1408 stream.get_output_stream()))
1410 self.set_output_stream(<shared_ptr[COutputStream]> buffered_stream)
1411 self.is_writable = True
1415 Flush any buffered writes and release the raw OutputStream.
1416 Further operations on this stream are invalid.
1421 The underlying raw output stream.
1424 shared_ptr[COutputStream] c_raw
1425 _CBufferedOutputStreamPtr buffered
1428 buffered = dynamic_cast[_CBufferedOutputStreamPtr](
1429 self.output_stream.get())
1430 assert buffered != nullptr
1433 c_raw = GetResultValue(buffered.Detach())
1436 raw.is_writable = True
1437 raw.set_output_stream(c_raw)
cdef void _cb_transform(transform_func, const shared_ptr[CBuffer]& src,
                        shared_ptr[CBuffer]* dest) except *:
    # C callback bridging the C++ transform-stream vtable to a Python
    # callable: wrap the C++ source buffer as a pyarrow Buffer, apply
    # the user-supplied transform, and unwrap the result back into
    # *dest* (coercing arbitrary buffer-like return values via
    # py_buffer()).
    py_dest = transform_func(pyarrow_wrap_buffer(src))
    dest[0] = pyarrow_unwrap_buffer(py_buffer(py_dest))
1447 cdef class TransformInputStream(NativeFile):
1449 Transform an input stream.
1454 The stream to transform.
1455 transform_func : callable
1456 The transformation to apply.
1459 def __init__(self, NativeFile stream, transform_func):
1460 self.set_input_stream(TransformInputStream.make_native(
1461 stream.get_input_stream(), transform_func))
1462 self.is_readable = True
1465 cdef shared_ptr[CInputStream] make_native(
1466 shared_ptr[CInputStream] stream, transform_func) except *:
1468 shared_ptr[CInputStream] transform_stream
1469 CTransformInputStreamVTable vtable
1471 vtable.transform = _cb_transform
1472 return MakeTransformInputStream(stream, move(vtable),
    def __init__(self, decoder, encoder):
        # Incremental decoder/encoder objects, as produced by
        # codecs.lookup(...).incrementaldecoder() /
        # .incrementalencoder() (see transcoding_input_stream()).
        self._decoder = decoder
        self._encoder = encoder
    def __call__(self, buf):
        # An empty chunk is treated as end of input: passing final=True
        # makes the incremental codecs flush any buffered partial
        # multi-byte sequences.
        final = len(buf) == 0
        return self._encoder.encode(self._decoder.decode(buf, final), final)
def transcoding_input_stream(stream, src_encoding, dest_encoding):
    """
    Add a transcoding transformation to the stream.
    Incoming data will be decoded according to ``src_encoding`` and
    then re-encoded according to ``dest_encoding``.

    Parameters
    ----------
    stream : NativeFile
        The stream to which the transformation should be applied.
    src_encoding : str
        The codec to use when reading data.
    dest_encoding : str
        The codec to use for emitted data.
    """
    src_codec = codecs.lookup(src_encoding)
    dest_codec = codecs.lookup(dest_encoding)
    if src_codec.name == dest_codec.name:
        # Identical codecs: skip the wrapper to avoid losing performance
        # on a no-op transcoding (encoding errors then go undetected).
        return stream
    transcoder = Transcoder(src_codec.incrementaldecoder(),
                            dest_codec.incrementalencoder())
    return TransformInputStream(stream, transcoder)
cdef shared_ptr[CInputStream] native_transcoding_input_stream(
        shared_ptr[CInputStream] stream, src_encoding,
        dest_encoding) except *:
    # C++-level counterpart of transcoding_input_stream(): wraps a raw
    # CInputStream rather than a NativeFile.
    src_codec = codecs.lookup(src_encoding)
    dest_codec = codecs.lookup(dest_encoding)
    if src_codec.name == dest_codec.name:
        # Identical codecs: skip the wrapper to avoid losing performance
        # on a no-op transcoding (encoding errors then go undetected).
        return stream
    transcoder = Transcoder(src_codec.incrementaldecoder(),
                            dest_codec.incrementalencoder())
    return TransformInputStream.make_native(stream, transcoder)
def py_buffer(object obj):
    """
    Construct an Arrow buffer from a Python bytes-like or buffer-like object

    Parameters
    ----------
    obj : object
        the object from which the buffer should be constructed.
    """
    # Zero-copy where possible: PyBuffer wraps the object's buffer
    # protocol view rather than copying its contents.
    cdef shared_ptr[CBuffer] buf
    buf = GetResultValue(PyBuffer.FromPyObject(obj))
    return pyarrow_wrap_buffer(buf)
def foreign_buffer(address, size, base=None):
    """
    Construct an Arrow buffer with the given *address* and *size*.

    The buffer will be optionally backed by the Python *base* object, if given.
    The *base* object will be kept alive as long as this buffer is alive,
    including across language boundaries (for example if the buffer is
    referenced by C++ code).

    Parameters
    ----------
    address : int
        The starting address of the buffer. The address can
        refer to both device or host memory but it must be
        accessible from device after mapping it with
        `get_device_address` method.
    size : int
        The size of device buffer in bytes.
    base : {None, object}
        Object that owns the referenced memory.
    """
    cdef:
        intptr_t c_addr = address
        int64_t c_size = size
        shared_ptr[CBuffer] buf

    check_status(PyForeignBuffer.Make(<uint8_t*> c_addr, c_size,
                                      base, &buf))
    return pyarrow_wrap_buffer(buf)
def as_buffer(object o):
    """
    Return *o* unchanged if it already is an Arrow Buffer, otherwise
    wrap it with py_buffer().
    """
    if isinstance(o, Buffer):
        return o
    return py_buffer(o)
cdef shared_ptr[CBuffer] as_c_buffer(object o) except *:
    # Unwrap an existing Buffer's C++ buffer, or construct one from a
    # Python buffer-protocol object.
    cdef shared_ptr[CBuffer] buf
    if isinstance(o, Buffer):
        buf = (<Buffer> o).buffer
        if buf.get() == NULL:
            raise ValueError("got null buffer")
    else:
        buf = GetResultValue(PyBuffer.FromPyObject(o))
    return buf
cdef NativeFile get_native_file(object source, c_bool use_memory_map):
    # Coerce *source* (path, Buffer, or file-like object) into a NativeFile
    # opened for reading. Memory-maps file paths when use_memory_map is set.
    try:
        source_path = _stringify_path(source)
    except TypeError:
        # Not path-like: try buffer or file-like fallbacks
        if isinstance(source, Buffer):
            source = BufferReader(source)
        elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
            # Optimistically hope this is file-like
            source = PythonFile(source, mode='r')
    else:
        if use_memory_map:
            source = memory_map(source_path, mode='r')
        else:
            source = OSFile(source_path, mode='r')

    return source
cdef get_reader(object source, c_bool use_memory_map,
                shared_ptr[CRandomAccessFile]* reader):
    # Coerce *source* to a NativeFile and extract its C++ random-access
    # file handle into *reader*.
    cdef NativeFile nf

    nf = get_native_file(source, use_memory_map)
    reader[0] = nf.get_random_access_file()
cdef get_input_stream(object source, c_bool use_memory_map,
                      shared_ptr[CInputStream]* out):
    """
    Like get_reader(), but can automatically decompress, and returns
    an InputStream.
    """
    cdef:
        NativeFile nf
        Codec codec
        shared_ptr[CInputStream] input_stream

    try:
        codec = Codec.detect(source)
    except TypeError:
        # Not path-like, so compression can't be inferred from an extension
        codec = None

    nf = get_native_file(source, use_memory_map)
    input_stream = nf.get_input_stream()

    # codec is None if compression can't be detected
    if codec is not None:
        input_stream = <shared_ptr[CInputStream]> GetResultValue(
            CCompressedInputStream.Make(codec.unwrap(), input_stream)
        )

    out[0] = input_stream
cdef get_writer(object source, shared_ptr[COutputStream]* writer):
    # Coerce *source* (path or file-like object) into a NativeFile opened
    # for writing and extract its C++ output stream into *writer*.
    cdef NativeFile nf

    try:
        source_path = _stringify_path(source)
    except TypeError:
        if not isinstance(source, NativeFile) and hasattr(source, 'write'):
            # Optimistically hope this is file-like
            source = PythonFile(source, mode='w')
    else:
        source = OSFile(source_path, mode='w')

    if isinstance(source, NativeFile):
        nf = source
        writer[0] = nf.get_output_stream()
    else:
        raise TypeError('Unable to read from object of type: {0}'
                        .format(type(source)))
1663 # ---------------------------------------------------------------------
1666 def _detect_compression(path):
1667 if isinstance(path, str):
1668 if path.endswith('.bz2'):
1670 elif path.endswith('.gz'):
1672 elif path.endswith('.lz4'):
1674 elif path.endswith('.zst'):
cdef CCompressionType _ensure_compression(str name) except *:
    # Map a user-facing codec name (case-insensitive) to the C enum value.
    uppercase = name.upper()
    if uppercase == 'BZ2':
        return CCompressionType_BZ2
    elif uppercase == 'GZIP':
        return CCompressionType_GZIP
    elif uppercase == 'BROTLI':
        return CCompressionType_BROTLI
    elif uppercase == 'LZ4' or uppercase == 'LZ4_FRAME':
        # Plain 'lz4' means the framed format
        return CCompressionType_LZ4_FRAME
    elif uppercase == 'LZ4_RAW':
        return CCompressionType_LZ4
    elif uppercase == 'SNAPPY':
        return CCompressionType_SNAPPY
    elif uppercase == 'ZSTD':
        return CCompressionType_ZSTD
    else:
        raise ValueError('Invalid value for compression: {!r}'.format(name))
cdef class Codec(_Weakrefable):
    """
    Compression codec.

    Parameters
    ----------
    compression : str
        Type of compression codec to initialize, valid values are: 'gzip',
        'bz2', 'brotli', 'lz4' (or 'lz4_frame'), 'lz4_raw', 'zstd' and
        'snappy'.
    compression_level : int, None
        Optional parameter specifying how aggressively to compress. The
        possible ranges and effect of this parameter depend on the specific
        codec chosen. Higher values compress more but typically use more
        resources (CPU/RAM). Some codecs support negative values.

        gzip
            The compression_level maps to the memlevel parameter of
            deflateInit2. Higher levels use more RAM but are faster
            and should have higher compression ratios.

        bz2
            The compression level maps to the blockSize100k parameter of
            the BZ2_bzCompressInit function. Higher levels use more RAM
            but are faster and should have higher compression ratios.

        brotli
            The compression level maps to the BROTLI_PARAM_QUALITY
            parameter. Higher values are slower and should have higher
            compression ratios.

        lz4/lz4_frame/lz4_raw
            The compression level parameter is not supported and must
            be None.

        zstd
            The compression level maps to the compressionLevel parameter
            of ZSTD_initCStream. Negative values are supported. Higher
            values are slower and should have higher compression ratios.

        snappy
            The compression level parameter is not supported and must
            be None.

    Raises
    ------
    ValueError
        If invalid compression value is passed.
    """

    def __init__(self, str compression not None, compression_level=None):
        cdef CCompressionType typ = _ensure_compression(compression)
        if compression_level is not None:
            self.wrapped = shared_ptr[CCodec](move(GetResultValue(
                CCodec.CreateWithLevel(typ, compression_level))))
        else:
            self.wrapped = shared_ptr[CCodec](move(GetResultValue(
                CCodec.Create(typ))))

    cdef inline CCodec* unwrap(self) nogil:
        return self.wrapped.get()

    @staticmethod
    def detect(path):
        """
        Detect and instantiate compression codec based on file extension.

        Parameters
        ----------
        path : str, path-like
            File-path to detect compression from.

        Raises
        ------
        TypeError
            If the passed value is not path-like.
        ValueError
            If the compression can't be detected from the path.

        Returns
        -------
        Codec
        """
        return Codec(_detect_compression(_stringify_path(path)))

    @staticmethod
    def is_available(str compression not None):
        """
        Returns whether the compression support has been built and enabled.

        Parameters
        ----------
        compression : str
            Type of compression codec,
            refer to Codec docstring for a list of supported ones.

        Returns
        -------
        bool
        """
        cdef CCompressionType typ = _ensure_compression(compression)
        return CCodec.IsAvailable(typ)

    @staticmethod
    def supports_compression_level(str compression not None):
        """
        Returns true if the compression level parameter is supported
        for the given codec.

        Parameters
        ----------
        compression : str
            Type of compression codec,
            refer to Codec docstring for a list of supported ones.
        """
        cdef CCompressionType typ = _ensure_compression(compression)
        return CCodec.SupportsCompressionLevel(typ)

    @staticmethod
    def default_compression_level(str compression not None):
        """
        Returns the compression level that Arrow will use for the codec if
        None is specified.

        Parameters
        ----------
        compression : str
            Type of compression codec,
            refer to Codec docstring for a list of supported ones.
        """
        cdef CCompressionType typ = _ensure_compression(compression)
        return GetResultValue(CCodec.DefaultCompressionLevel(typ))

    @staticmethod
    def minimum_compression_level(str compression not None):
        """
        Returns the smallest valid value for the compression level
        for the given codec.

        Parameters
        ----------
        compression : str
            Type of compression codec,
            refer to Codec docstring for a list of supported ones.
        """
        cdef CCompressionType typ = _ensure_compression(compression)
        return GetResultValue(CCodec.MinimumCompressionLevel(typ))

    @staticmethod
    def maximum_compression_level(str compression not None):
        """
        Returns the largest valid value for the compression level
        for the given codec.

        Parameters
        ----------
        compression : str
            Type of compression codec,
            refer to Codec docstring for a list of supported ones.
        """
        cdef CCompressionType typ = _ensure_compression(compression)
        return GetResultValue(CCodec.MaximumCompressionLevel(typ))

    @property
    def name(self):
        """Returns the name of the codec"""
        return frombytes(self.unwrap().name())

    @property
    def compression_level(self):
        """Returns the compression level parameter of the codec"""
        # NOTE(review): compression_level() yields an integer, so it must
        # not be passed through frombytes() (which expects bytes).
        return self.unwrap().compression_level()

    def compress(self, object buf, asbytes=False, memory_pool=None):
        """
        Compress data from buffer-like object.

        Parameters
        ----------
        buf : pyarrow.Buffer, bytes, or other object supporting buffer protocol
        asbytes : bool, default False
            Return result as Python bytes object, otherwise Buffer
        memory_pool : MemoryPool, default None
            Memory pool to use for buffer allocations, if any

        Returns
        -------
        compressed : pyarrow.Buffer or bytes (if asbytes=True)
        """
        cdef:
            shared_ptr[CBuffer] owned_buf
            CBuffer* c_buf
            PyObject* pyobj
            ResizableBuffer out_buf
            int64_t max_output_size
            int64_t output_length
            uint8_t* output_buffer = NULL

        owned_buf = as_c_buffer(buf)
        c_buf = owned_buf.get()

        # Upper bound on the compressed size, used to size the output buffer
        max_output_size = self.wrapped.get().MaxCompressedLen(
            c_buf.size(), c_buf.data()
        )

        if asbytes:
            # Write directly into an (oversized) bytes object, resized below
            pyobj = PyBytes_FromStringAndSizeNative(NULL, max_output_size)
            output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(<object> pyobj)
        else:
            out_buf = allocate_buffer(
                max_output_size, memory_pool=memory_pool, resizable=True
            )
            output_buffer = out_buf.buffer.get().mutable_data()

        output_length = GetResultValue(
            self.unwrap().Compress(
                c_buf.size(),
                c_buf.data(),
                max_output_size,
                output_buffer
            )
        )

        if asbytes:
            # Shrink the bytes object to the actual compressed length
            cp._PyBytes_Resize(&pyobj, <Py_ssize_t> output_length)
            return PyObject_to_object(pyobj)
        else:
            out_buf.resize(output_length)
            return out_buf

    def decompress(self, object buf, decompressed_size=None, asbytes=False,
                   memory_pool=None):
        """
        Decompress data from buffer-like object.

        Parameters
        ----------
        buf : pyarrow.Buffer, bytes, or memoryview-compatible object
        decompressed_size : int64_t, default None
            If not specified, will be computed if the codec is able to
            determine the uncompressed buffer size.
        asbytes : boolean, default False
            Return result as Python bytes object, otherwise Buffer
        memory_pool : MemoryPool, default None
            Memory pool to use for buffer allocations, if any.

        Returns
        -------
        uncompressed : pyarrow.Buffer or bytes (if asbytes=True)
        """
        cdef:
            shared_ptr[CBuffer] owned_buf
            CBuffer* c_buf
            Buffer out_buf
            int64_t output_size
            uint8_t* output_buffer = NULL

        owned_buf = as_c_buffer(buf)
        c_buf = owned_buf.get()

        if decompressed_size is None:
            raise ValueError(
                "Must pass decompressed_size for {} codec".format(self)
            )

        output_size = decompressed_size

        if asbytes:
            pybuf = cp.PyBytes_FromStringAndSize(NULL, output_size)
            output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(pybuf)
        else:
            out_buf = allocate_buffer(output_size, memory_pool=memory_pool)
            output_buffer = out_buf.buffer.get().mutable_data()

        GetResultValue(
            self.unwrap().Decompress(
                c_buf.size(),
                c_buf.data(),
                output_size,
                output_buffer
            )
        )

        return pybuf if asbytes else out_buf
def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
    """
    Compress data from buffer-like object.

    Parameters
    ----------
    buf : pyarrow.Buffer, bytes, or other object supporting buffer protocol
    codec : str, default 'lz4'
        Compression codec.
        Supported types: {'brotli', 'gzip', 'lz4', 'lz4_raw', 'snappy', 'zstd'}
    asbytes : bool, default False
        Return result as Python bytes object, otherwise Buffer.
    memory_pool : MemoryPool, default None
        Memory pool to use for buffer allocations, if any.

    Returns
    -------
    compressed : pyarrow.Buffer or bytes (if asbytes=True)
    """
    cdef Codec coder = Codec(codec)
    return coder.compress(buf, asbytes=asbytes, memory_pool=memory_pool)
def decompress(object buf, decompressed_size=None, codec='lz4',
               asbytes=False, memory_pool=None):
    """
    Decompress data from buffer-like object.

    Parameters
    ----------
    buf : pyarrow.Buffer, bytes, or memoryview-compatible object
        Input object to decompress data from.
    decompressed_size : int64_t, default None
        If not specified, will be computed if the codec is able to determine
        the uncompressed buffer size.
    codec : str, default 'lz4'
        Compression codec.
        Supported types: {'brotli', 'gzip', 'lz4', 'lz4_raw', 'snappy', 'zstd'}
    asbytes : bool, default False
        Return result as Python bytes object, otherwise Buffer.
    memory_pool : MemoryPool, default None
        Memory pool to use for buffer allocations, if any.

    Returns
    -------
    uncompressed : pyarrow.Buffer or bytes (if asbytes=True)
    """
    cdef Codec decoder = Codec(codec)
    return decoder.decompress(buf, asbytes=asbytes, memory_pool=memory_pool,
                              decompressed_size=decompressed_size)
def input_stream(source, compression='detect', buffer_size=None):
    """
    Create an Arrow input stream.

    Parameters
    ----------
    source : str, Path, buffer, file-like object, ...
        The source to open for reading.
    compression : str optional, default 'detect'
        The compression algorithm to use for on-the-fly decompression.
        If "detect" and source is a file path, then compression will be
        chosen based on the file extension.
        If None, no compression will be applied.
        Otherwise, a well-known algorithm name must be supplied (e.g. "gzip").
    buffer_size : int, default None
        If None or 0, no buffering will happen. Otherwise the size of the
        temporary read buffer.
    """
    cdef NativeFile stream

    try:
        source_path = _stringify_path(source)
    except TypeError:
        source_path = None

    if isinstance(source, NativeFile):
        stream = source
    elif source_path is not None:
        stream = OSFile(source_path, 'r')
    elif isinstance(source, (Buffer, memoryview)):
        stream = BufferReader(as_buffer(source))
    elif (hasattr(source, 'read') and
          hasattr(source, 'close') and
          hasattr(source, 'closed')):
        stream = PythonFile(source, 'r')
    else:
        raise TypeError("pa.input_stream() called with instance of '{}'"
                        .format(source.__class__))

    if compression == 'detect':
        # detect for OSFile too
        compression = _detect_compression(source_path)

    if buffer_size is not None and buffer_size != 0:
        stream = BufferedInputStream(stream, buffer_size)

    if compression is not None:
        stream = CompressedInputStream(stream, compression)

    return stream
2089 def output_stream(source, compression='detect', buffer_size=None):
2091 Create an Arrow output stream.
2095 source : str, Path, buffer, file-like object, ...
2096 The source to open for writing.
2097 compression : str optional, default 'detect'
2098 The compression algorithm to use for on-the-fly compression.
2099 If "detect" and source is a file path, then compression will be
2100 chosen based on the file extension.
2101 If None, no compression will be applied.
2102 Otherwise, a well-known algorithm name must be supplied (e.g. "gzip").
2103 buffer_size : int, default None
2104 If None or 0, no buffering will happen. Otherwise the size of the
2105 temporary write buffer.
2107 cdef NativeFile stream
2110 source_path = _stringify_path(source)
2114 if isinstance(source, NativeFile):
2116 elif source_path is not None:
2117 stream = OSFile(source_path, 'w')
2118 elif isinstance(source, (Buffer, memoryview)):
2119 stream = FixedSizeBufferWriter(as_buffer(source))
2120 elif (hasattr(source, 'write') and
2121 hasattr(source, 'close') and
2122 hasattr(source, 'closed')):
2123 stream = PythonFile(source, 'w')
2125 raise TypeError("pa.output_stream() called with instance of '{}'"
2126 .format(source.__class__))
2128 if compression == 'detect':
2129 compression = _detect_compression(source_path)
2131 if buffer_size is not None and buffer_size != 0:
2132 stream = BufferedOutputStream(stream, buffer_size)
2134 if compression is not None:
2135 stream = CompressedOutputStream(stream, compression)