# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Cython wrappers for IO interfaces defined in arrow::io and messaging in
# arrow::ipc

from libc.stdlib cimport malloc, free

import codecs
import re
import sys
import threading
import time
import warnings
from io import BufferedIOBase, IOBase, TextIOBase, UnsupportedOperation
from queue import Queue, Empty as QueueEmpty

from pyarrow.util import _is_path_like, _stringify_path


# 64K
DEFAULT_BUFFER_SIZE = 2 ** 16


# To let us get a PyObject* and avoid Cython auto-ref-counting
cdef extern from "Python.h":
    PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
        char *v, Py_ssize_t len) except NULL


def io_thread_count():
    """
    Return the number of threads to use for I/O operations.

    Many operations, such as scanning a dataset, will implicitly make
    use of this pool. The number of threads is set to a fixed value at
    startup. It can be modified at runtime by calling
    :func:`set_io_thread_count()`.

    See Also
    --------
    set_io_thread_count : Modify the size of this pool.
    cpu_count : The analogous function for the CPU thread pool.
    """
    return GetIOThreadPoolCapacity()


def set_io_thread_count(int count):
    """
    Set the number of threads to use for I/O operations.

    Many operations, such as scanning a dataset, will implicitly make
    use of this pool.

    Parameters
    ----------
    count : int
        The max number of threads that may be used for I/O.
        Must be positive.

    See Also
    --------
    io_thread_count : Get the size of this pool.
    set_cpu_count : The analogous function for the CPU thread pool.
    """
    if count < 1:
        raise ValueError("IO thread count must be strictly positive")
    check_status(SetIOThreadPoolCapacity(count))

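# Usage sketch (illustrative, not part of the original source): querying and
# resizing the I/O thread pool from Python.
#
#   import pyarrow as pa
#   n = pa.io_thread_count()    # fixed value chosen at startup, e.g. 8
#   pa.set_io_thread_count(n * 2)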

cdef class NativeFile(_Weakrefable):
    """
    The base class for all Arrow streams.

    Streams are either readable, writable, or both.
    They optionally support seeking.

    While this class exposes methods to read or write data from Python, the
    primary intent of using an Arrow stream is to pass it to other Arrow
    facilities that will make use of it, such as Arrow IPC routines.

    Be aware that there are subtle differences with regular Python files,
    e.g. destroying a writable Arrow stream without closing it explicitly
    will not flush any pending data.
    """

    def __cinit__(self):
        self.own_file = False
        self.is_readable = False
        self.is_writable = False
        self.is_seekable = False

    def __dealloc__(self):
        if self.own_file:
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()

    @property
    def mode(self):
        """
        The file mode. Currently instances of NativeFile may support:

        * rb: binary read
        * wb: binary write
        * rb+: binary read and write
        """
        # Emulate built-in file modes
        if self.is_readable and self.is_writable:
            return 'rb+'
        elif self.is_readable:
            return 'rb'
        elif self.is_writable:
            return 'wb'
        else:
            raise ValueError('File object is malformed, has no mode')

    def readable(self):
        self._assert_open()
        return self.is_readable

    def writable(self):
        self._assert_open()
        return self.is_writable

    def seekable(self):
        self._assert_open()
        return self.is_seekable

    def isatty(self):
        self._assert_open()
        return False

    def fileno(self):
        """
        NOT IMPLEMENTED
        """
        raise UnsupportedOperation()

    @property
    def closed(self):
        if self.is_readable:
            return self.input_stream.get().closed()
        elif self.is_writable:
            return self.output_stream.get().closed()
        else:
            return True

    def close(self):
        if not self.closed:
            with nogil:
                if self.is_readable:
                    check_status(self.input_stream.get().Close())
                else:
                    check_status(self.output_stream.get().Close())

    cdef set_random_access_file(self, shared_ptr[CRandomAccessFile] handle):
        self.input_stream = <shared_ptr[CInputStream]> handle
        self.random_access = handle
        self.is_seekable = True

    cdef set_input_stream(self, shared_ptr[CInputStream] handle):
        self.input_stream = handle
        self.random_access.reset()
        self.is_seekable = False

    cdef set_output_stream(self, shared_ptr[COutputStream] handle):
        self.output_stream = handle

    cdef shared_ptr[CRandomAccessFile] get_random_access_file(self) except *:
        self._assert_readable()
        self._assert_seekable()
        return self.random_access

    cdef shared_ptr[CInputStream] get_input_stream(self) except *:
        self._assert_readable()
        return self.input_stream

    cdef shared_ptr[COutputStream] get_output_stream(self) except *:
        self._assert_writable()
        return self.output_stream

    def _assert_open(self):
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def _assert_readable(self):
        self._assert_open()
        if not self.is_readable:
            # XXX UnsupportedOperation
            raise IOError("only valid on readable files")

    def _assert_writable(self):
        self._assert_open()
        if not self.is_writable:
            raise IOError("only valid on writable files")

    def _assert_seekable(self):
        self._assert_open()
        if not self.is_seekable:
            raise IOError("only valid on seekable files")

    def size(self):
        """
        Return file size
        """
        cdef int64_t size

        handle = self.get_random_access_file()
        with nogil:
            size = GetResultValue(handle.get().GetSize())

        return size

    def metadata(self):
        """
        Return file metadata
        """
        cdef:
            shared_ptr[const CKeyValueMetadata] c_metadata

        handle = self.get_input_stream()
        with nogil:
            c_metadata = GetResultValue(handle.get().ReadMetadata())

        metadata = {}
        if c_metadata.get() != nullptr:
            for i in range(c_metadata.get().size()):
                metadata[frombytes(c_metadata.get().key(i))] = \
                    c_metadata.get().value(i)
        return metadata

    def tell(self):
        """
        Return current stream position
        """
        cdef int64_t position

        if self.is_readable:
            rd_handle = self.get_random_access_file()
            with nogil:
                position = GetResultValue(rd_handle.get().Tell())
        else:
            wr_handle = self.get_output_stream()
            with nogil:
                position = GetResultValue(wr_handle.get().Tell())

        return position

    def seek(self, int64_t position, int whence=0):
        """
        Change current file stream position

        Parameters
        ----------
        position : int
            Byte offset, interpreted relative to value of whence argument
        whence : int, default 0
            Point of reference for seek offset

        Notes
        -----
        Values of whence:
        * 0 -- start of stream (the default); offset should be zero or positive
        * 1 -- current stream position; offset may be negative
        * 2 -- end of stream; offset is usually negative

        Returns
        -------
        new_position : the new absolute stream position
        """
        cdef int64_t offset
        handle = self.get_random_access_file()

        with nogil:
            if whence == 0:
                offset = position
            elif whence == 1:
                offset = GetResultValue(handle.get().Tell())
                offset = offset + position
            elif whence == 2:
                offset = GetResultValue(handle.get().GetSize())
                offset = offset + position
            else:
                with gil:
                    raise ValueError("Invalid value of whence: {0}"
                                     .format(whence))
            check_status(handle.get().Seek(offset))

        return self.tell()

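    # Usage sketch (illustrative, not part of the original source): seek()
    # follows the io module's whence convention described in the docstring
    # above.
    #
    #   import pyarrow as pa
    #   f = pa.BufferReader(b"abcdef")
    #   f.seek(2)      # -> 2 (from the start)
    #   f.seek(1, 1)   # -> 3 (relative to the current position)
    #   f.seek(-2, 2)  # -> 4 (relative to the end)
    #   f.read()       # -> b"ef"
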
    def flush(self):
        """
        Flush the stream, if applicable.

        An error is raised if stream is not writable.
        """
        self._assert_open()
        # For IOBase compatibility, flush() on an input stream is a no-op
        if self.is_writable:
            handle = self.get_output_stream()
            with nogil:
                check_status(handle.get().Flush())

    def write(self, data):
        """
        Write data from any object implementing the buffer protocol
        (bytes, bytearray, ndarray, pyarrow.Buffer).

        Parameters
        ----------
        data : bytes-like object or exporter of buffer protocol

        Returns
        -------
        nbytes : number of bytes written
        """
        self._assert_writable()
        handle = self.get_output_stream()

        cdef shared_ptr[CBuffer] buf = as_c_buffer(data)

        with nogil:
            check_status(handle.get().WriteBuffer(buf))
        return buf.get().size()

    def read(self, nbytes=None):
        """
        Read indicated number of bytes from file, or read all remaining bytes
        if no argument passed

        Parameters
        ----------
        nbytes : int, default None

        Returns
        -------
        data : bytes
        """
        cdef:
            int64_t c_nbytes
            int64_t bytes_read = 0
            PyObject* obj

        if nbytes is None:
            if not self.is_seekable:
                # Cannot get file size => read chunkwise
                bs = 16384
                chunks = []
                while True:
                    chunk = self.read(bs)
                    if not chunk:
                        break
                    chunks.append(chunk)
                return b"".join(chunks)

            c_nbytes = self.size() - self.tell()
        else:
            c_nbytes = nbytes

        handle = self.get_input_stream()

        # Allocate empty write space
        obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)

        cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
        with nogil:
            bytes_read = GetResultValue(handle.get().Read(c_nbytes, buf))

        if bytes_read < c_nbytes:
            cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)

        return PyObject_to_object(obj)

    def read_at(self, nbytes, offset):
        """
        Read indicated number of bytes at offset from the file

        Parameters
        ----------
        nbytes : int
        offset : int

        Returns
        -------
        data : bytes
        """
        cdef:
            int64_t c_nbytes
            int64_t c_offset
            int64_t bytes_read = 0
            PyObject* obj

        c_nbytes = nbytes

        c_offset = offset

        handle = self.get_random_access_file()

        # Allocate empty write space
        obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)

        cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
        with nogil:
            bytes_read = GetResultValue(handle.get().
                                        ReadAt(c_offset, c_nbytes, buf))

        if bytes_read < c_nbytes:
            cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)

        return PyObject_to_object(obj)

    def read1(self, nbytes=None):
        """Read and return up to n bytes.

        Alias for read, needed to match the IOBase interface."""
        # Pass nbytes through; hardcoding None here would ignore the argument
        return self.read(nbytes)

    def readall(self):
        return self.read()

    def readinto(self, b):
441 """
442 Read into the supplied buffer
443
444 Parameters
445 -----------
446 b: any python object supporting buffer interface
447
448 Returns
449 --------
450 number of bytes written
451 """

        cdef:
            int64_t bytes_read
            uint8_t* buf
            Buffer py_buf
            int64_t buf_len

        handle = self.get_input_stream()

        py_buf = py_buffer(b)
        buf_len = py_buf.size
        buf = py_buf.buffer.get().mutable_data()

        with nogil:
            bytes_read = GetResultValue(handle.get().Read(buf_len, buf))

        return bytes_read

    def readline(self, size=None):
        """NOT IMPLEMENTED. Read and return a line of bytes from the file.

        If size is specified, read at most size bytes.

        Line terminator is always b"\\n".
        """

        raise UnsupportedOperation()

    def readlines(self, hint=None):
481 """NOT IMPLEMENTED. Read lines of the file
482
483 Parameters
484 -----------
485
486 hint: int maximum number of bytes read until we stop
487 """

        raise UnsupportedOperation()

    def __iter__(self):
        self._assert_readable()
        return self

    def __next__(self):
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def read_buffer(self, nbytes=None):
        cdef:
            int64_t c_nbytes
            int64_t bytes_read = 0
            shared_ptr[CBuffer] output

        handle = self.get_input_stream()

        if nbytes is None:
            if not self.is_seekable:
                # Cannot get file size => read chunkwise
                return py_buffer(self.read())
            c_nbytes = self.size() - self.tell()
        else:
            c_nbytes = nbytes

        with nogil:
            output = GetResultValue(handle.get().ReadBuffer(c_nbytes))

        return pyarrow_wrap_buffer(output)

    def truncate(self):
        """
        NOT IMPLEMENTED
        """
        raise UnsupportedOperation()

    def writelines(self, lines):
        self._assert_writable()

        for line in lines:
            self.write(line)

    def download(self, stream_or_path, buffer_size=None):
        """
        Read file completely to local path (rather than reading completely into
        memory). First seeks to the beginning of the file.
        """
        cdef:
            int64_t bytes_read = 0
            uint8_t* buf

        handle = self.get_input_stream()

        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE

        write_queue = Queue(50)

        if not hasattr(stream_or_path, 'read'):
            stream = open(stream_or_path, 'wb')

            def cleanup():
                stream.close()
        else:
            stream = stream_or_path

            def cleanup():
                pass

        done = False
        exc_info = None

        def bg_write():
            # Propagate the captured exception info to the enclosing scope;
            # without `nonlocal`, the assignment below would only create a
            # local variable and the error check after join() would never fire.
            nonlocal exc_info
            try:
                while not done or write_queue.qsize() > 0:
                    try:
                        buf = write_queue.get(timeout=0.01)
                    except QueueEmpty:
                        continue
                    stream.write(buf)
            except Exception:
                exc_info = sys.exc_info()
            finally:
                cleanup()

        self.seek(0)

        writer_thread = threading.Thread(target=bg_write)

        # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
        # the passed buffer, so it's hard for us to avoid doubling the memory
        buf = <uint8_t*> malloc(buffer_size)
        if buf == NULL:
            raise MemoryError("Failed to allocate {0} bytes"
                              .format(buffer_size))

        writer_thread.start()

        cdef int64_t total_bytes = 0
        cdef int32_t c_buffer_size = buffer_size

        try:
            while True:
                with nogil:
                    bytes_read = GetResultValue(
                        handle.get().Read(c_buffer_size, buf))

                total_bytes += bytes_read

                # EOF
                if bytes_read == 0:
                    break

                pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
                                                     bytes_read)

                if writer_thread.is_alive():
                    while write_queue.full():
                        time.sleep(0.01)
                else:
                    break

                write_queue.put_nowait(pybuf)
        finally:
            free(buf)
            done = True

        writer_thread.join()
        if exc_info is not None:
            raise exc_info[1].with_traceback(exc_info[2])

    def upload(self, stream, buffer_size=None):
        """
        Pipe file-like object to file
        """
        write_queue = Queue(50)
        self._assert_writable()

        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE

        done = False
        exc_info = None

        def bg_write():
            # As in download(), `nonlocal` is needed so the exception info
            # assigned here is visible to the check after the thread joins.
            nonlocal exc_info
            try:
                while not done or write_queue.qsize() > 0:
                    try:
                        buf = write_queue.get(timeout=0.01)
                    except QueueEmpty:
                        continue

                    self.write(buf)

            except Exception:
                exc_info = sys.exc_info()

        writer_thread = threading.Thread(target=bg_write)
        writer_thread.start()

        try:
            while True:
                buf = stream.read(buffer_size)
                if not buf:
                    break

                if writer_thread.is_alive():
                    while write_queue.full():
                        time.sleep(0.01)
                else:
                    break

                write_queue.put_nowait(buf)
        finally:
            done = True

        writer_thread.join()
        if exc_info is not None:
            raise exc_info[1].with_traceback(exc_info[2])

BufferedIOBase.register(NativeFile)

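# Usage sketch (illustrative, not part of the original source): download()
# copies a readable stream to a local file, pumping chunks through the
# background writer thread shown above. The path is a placeholder.
#
#   import pyarrow as pa
#   src = pa.BufferReader(b"some payload")
#   src.download("/tmp/payload.bin")   # or pass an open binary file object
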
# ----------------------------------------------------------------------
# Python file-like objects


cdef class PythonFile(NativeFile):
    """
    A stream backed by a Python file object.

    This class allows using Python file objects with arbitrary Arrow
    functions, including functions written in another language than Python.

    As a downside, there is a non-zero redirection cost in translating
    Arrow stream calls to Python method calls. Furthermore, Python's
    Global Interpreter Lock may limit parallelism in some situations.
    """
    cdef:
        object handle

    def __cinit__(self, handle, mode=None):
        self.handle = handle

        if mode is None:
            try:
                inferred_mode = handle.mode
            except AttributeError:
                # Not all file-like objects have a mode attribute
                # (e.g. BytesIO)
                try:
                    inferred_mode = 'w' if handle.writable() else 'r'
                except AttributeError:
                    raise ValueError("could not infer open mode for file-like "
                                     "object %r, please pass it explicitly"
                                     % (handle,))
        else:
            inferred_mode = mode

        if inferred_mode.startswith('w'):
            kind = 'w'
        elif inferred_mode.startswith('r'):
            kind = 'r'
        else:
            raise ValueError('Invalid file mode: {0}'.format(mode))

        # If mode was given, check it matches the given file
        if mode is not None:
            if isinstance(handle, IOBase):
                # Python 3 IO object
                if kind == 'r':
                    if not handle.readable():
                        raise TypeError("readable file expected")
                else:
                    if not handle.writable():
                        raise TypeError("writable file expected")
            # (other duck-typed file-like objects are possible)

        # If possible, check the file is a binary file
        if isinstance(handle, TextIOBase):
            raise TypeError("binary file expected, got text file")

        if kind == 'r':
            self.set_random_access_file(
                shared_ptr[CRandomAccessFile](new PyReadableFile(handle)))
            self.is_readable = True
        else:
            self.set_output_stream(
                shared_ptr[COutputStream](new PyOutputStream(handle)))
            self.is_writable = True

    def truncate(self, pos=None):
        self.handle.truncate(pos)

    def readline(self, size=None):
        return self.handle.readline(size)

    def readlines(self, hint=None):
        return self.handle.readlines(hint)

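# Usage sketch (illustrative, not part of the original source): wrapping an
# in-memory binary file object so Arrow facilities can consume it.
#
#   import io
#   import pyarrow as pa
#   source = pa.PythonFile(io.BytesIO(b"hello"), mode='r')
#   source.read(3)       # b'hel'
#   sink = pa.PythonFile(io.BytesIO(), mode='w')
#   sink.write(b"abc")   # 3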

cdef class MemoryMappedFile(NativeFile):
    """
    A stream that represents a memory-mapped file.

    Supports 'r', 'r+', 'w' modes.
    """
    cdef:
        shared_ptr[CMemoryMappedFile] handle
        object path

    @staticmethod
    def create(path, size):
        """
        Create a MemoryMappedFile

        Parameters
        ----------
        path : str
            Where to create the file.
        size : int
            Size of the memory mapped file.
        """
        cdef:
            shared_ptr[CMemoryMappedFile] handle
            c_string c_path = encode_file_path(path)
            int64_t c_size = size

        with nogil:
            handle = GetResultValue(CMemoryMappedFile.Create(c_path, c_size))

        cdef MemoryMappedFile result = MemoryMappedFile()
        result.path = path
        result.is_readable = True
        result.is_writable = True
        result.set_output_stream(<shared_ptr[COutputStream]> handle)
        result.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle)
        result.handle = handle

        return result

    def _open(self, path, mode='r'):
        self.path = path

        cdef:
            FileMode c_mode
            shared_ptr[CMemoryMappedFile] handle
            c_string c_path = encode_file_path(path)

        if mode in ('r', 'rb'):
            c_mode = FileMode_READ
            self.is_readable = True
        elif mode in ('w', 'wb'):
            c_mode = FileMode_WRITE
            self.is_writable = True
        elif mode in ('r+', 'r+b', 'rb+'):
            c_mode = FileMode_READWRITE
            self.is_readable = True
            self.is_writable = True
        else:
            raise ValueError('Invalid file mode: {0}'.format(mode))

        with nogil:
            handle = GetResultValue(CMemoryMappedFile.Open(c_path, c_mode))

        self.set_output_stream(<shared_ptr[COutputStream]> handle)
        self.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle)
        self.handle = handle

    def resize(self, new_size):
        """
        Resize the map and underlying file.

        Parameters
        ----------
        new_size : int
            New size in bytes.
825 """
826 check_status(self.handle.get().Resize(new_size))
827
828 def fileno(self):
829 self._assert_open()
830 return self.handle.get().file_descriptor()
831
832
833 def memory_map(path, mode='r'):
834 """
835 Open memory map at file path. Size of the memory map cannot change.
836
837 Parameters
838 ----------
839 path : str
    mode : {'r', 'r+', 'w'}, default 'r'
        Whether the file is opened for reading ('r'), writing ('w'),
        or both ('r+').

    Returns
    -------
    mmap : MemoryMappedFile
    """
    _check_is_file(path)

    cdef MemoryMappedFile mmap = MemoryMappedFile()
    mmap._open(path, mode)
    return mmap


cdef _check_is_file(path):
    if os.path.isdir(path):
        raise IOError("Expected file path, but {0} is a directory"
                      .format(path))


def create_memory_map(path, size):
    """
    Create a file of the given size and memory-map it.

    Parameters
    ----------
    path : str
        The file path to create, on the local filesystem.
    size : int
        The file size to create.

    Returns
    -------
    mmap : MemoryMappedFile
    """
    return MemoryMappedFile.create(path, size)

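# Usage sketch (illustrative, not part of the original source): the created
# map is opened readable and writable; the path below is a placeholder.
#
#   import pyarrow as pa
#   mm = pa.create_memory_map("/tmp/scratch.bin", 4096)
#   mm.write(b"header")
#   mm.seek(0)
#   mm.read(6)   # b'header'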

cdef class OSFile(NativeFile):
    """
    A stream backed by a regular file descriptor.
    """
    cdef:
        object path

    def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
        _check_is_file(path)
        self.path = path

        cdef:
            FileMode c_mode
            shared_ptr[Readable] handle
            c_string c_path = encode_file_path(path)

        if mode in ('r', 'rb'):
            self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
        elif mode in ('w', 'wb'):
            self._open_writable(c_path)
        else:
            raise ValueError('Invalid file mode: {0}'.format(mode))

    cdef _open_readable(self, c_string path, CMemoryPool* pool):
        cdef shared_ptr[ReadableFile] handle

        with nogil:
            handle = GetResultValue(ReadableFile.Open(path, pool))

        self.is_readable = True
        self.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle)

    cdef _open_writable(self, c_string path):
        with nogil:
            self.output_stream = GetResultValue(FileOutputStream.Open(path))
        self.is_writable = True

    def fileno(self):
        # The underlying C++ file handle is not retained on this class
        # (only `path` is stored), so `self.handle.file_descriptor()` would
        # raise AttributeError at runtime; signal the unsupported operation
        # explicitly instead, as the base class does.
        self._assert_open()
        raise UnsupportedOperation()


cdef class FixedSizeBufferWriter(NativeFile):
    """
    A stream writing to an Arrow buffer.
924 """
925
926 def __cinit__(self, Buffer buffer):
927 self.output_stream.reset(new CFixedSizeBufferWriter(buffer.buffer))
928 self.is_writable = True
929
930 def set_memcopy_threads(self, int num_threads):
931 cdef CFixedSizeBufferWriter* writer = \
932 <CFixedSizeBufferWriter*> self.output_stream.get()
933 writer.set_memcopy_threads(num_threads)
934
935 def set_memcopy_blocksize(self, int64_t blocksize):
936 cdef CFixedSizeBufferWriter* writer = \
937 <CFixedSizeBufferWriter*> self.output_stream.get()
938 writer.set_memcopy_blocksize(blocksize)
939
940 def set_memcopy_threshold(self, int64_t threshold):
941 cdef CFixedSizeBufferWriter* writer = \
942 <CFixedSizeBufferWriter*> self.output_stream.get()
943 writer.set_memcopy_threshold(threshold)
944
945
946 # ----------------------------------------------------------------------
947 # Arrow buffers
948
949
950 cdef class Buffer(_Weakrefable):
951 """
952 The base class for all Arrow buffers.
953
954 A buffer represents a contiguous memory area. Many buffers will own
955 their memory, though not all of them do.
956 """
957
958 def __cinit__(self):
959 pass
960
961 def __init__(self):
962 raise TypeError("Do not call Buffer's constructor directly, use "
963 "`pyarrow.py_buffer` function instead.")
964
965 cdef void init(self, const shared_ptr[CBuffer]& buffer):
966 self.buffer = buffer
967 self.shape[0] = self.size
968 self.strides[0] = <Py_ssize_t>(1)
969
970 def __len__(self):
971 return self.size
972
973 @property
974 def size(self):
975 """
976 The buffer size in bytes.
977 """
978 return self.buffer.get().size()
979
980 @property
981 def address(self):
982 """
983 The buffer's address, as an integer.
984
985 The returned address may point to CPU or device memory.
986 Use `is_cpu()` to disambiguate.
987 """
988 return self.buffer.get().address()
989
990 def hex(self):
991 """
992 Compute hexadecimal representation of the buffer.
993
994 Returns
995 -------
996 : bytes
997 """
998 return self.buffer.get().ToHexString()
999
1000 @property
1001 def is_mutable(self):
1002 """
1003 Whether the buffer is mutable.
1004 """
1005 return self.buffer.get().is_mutable()
1006
1007 @property
1008 def is_cpu(self):
1009 """
1010 Whether the buffer is CPU-accessible.
1011 """
1012 return self.buffer.get().is_cpu()
1013
1014 @property
1015 def parent(self):
1016 cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()
1017
1018 if parent_buf.get() == NULL:
1019 return None
1020 else:
1021 return pyarrow_wrap_buffer(parent_buf)
1022
1023 def __getitem__(self, key):
1024 if PySlice_Check(key):
1025 if (key.step or 1) != 1:
1026 raise IndexError('only slices with step 1 supported')
1027 return _normalize_slice(self, key)
1028
1029 return self.getitem(_normalize_index(key, self.size))
1030
1031 cdef getitem(self, int64_t i):
1032 return self.buffer.get().data()[i]
1033
1034 def slice(self, offset=0, length=None):
1035 """
1036 Slice this buffer. Memory is not copied.
1037
1038 You can also use the Python slice notation ``buffer[start:stop]``.
1039
1040 Parameters
1041 ----------
1042 offset : int, default 0
1043 Offset from start of buffer to slice.
1044 length : int, default None
1045 Length of slice (default is until end of Buffer starting from
1046 offset).
1047
1048 Returns
1049 -------
1050 sliced : Buffer
1051 A logical view over this buffer.
1052 """
1053 cdef shared_ptr[CBuffer] result
1054
1055 if offset < 0:
1056 raise IndexError('Offset must be non-negative')
1057
1058 if length is None:
1059 result = SliceBuffer(self.buffer, offset)
1060 else:
1061 result = SliceBuffer(self.buffer, offset, max(length, 0))
1062
1063 return pyarrow_wrap_buffer(result)
1064
1065 def equals(self, Buffer other):
1066 """
1067 Determine if two buffers contain exactly the same data.
1068
1069 Parameters
1070 ----------
1071 other : Buffer
1072
1073 Returns
1074 -------
1075 are_equal : True if buffer contents and size are equal
1076 """
1077 cdef c_bool result = False
1078 with nogil:
1079 result = self.buffer.get().Equals(deref(other.buffer.get()))
1080 return result
1081
1082 def __eq__(self, other):
1083 if isinstance(other, Buffer):
1084 return self.equals(other)
1085 else:
1086 return self.equals(py_buffer(other))
1087
1088 def __reduce_ex__(self, protocol):
1089 if protocol >= 5:
1090 return py_buffer, (builtin_pickle.PickleBuffer(self),)
1091 else:
1092 return py_buffer, (self.to_pybytes(),)
1093
1094 def to_pybytes(self):
1095 """
1096 Return this buffer as a Python bytes object. Memory is copied.
1097 """
1098 return cp.PyBytes_FromStringAndSize(
1099 <const char*>self.buffer.get().data(),
1100 self.buffer.get().size())
1101
1102 def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
1103 if self.buffer.get().is_mutable():
1104 buffer.readonly = 0
1105 else:
1106 if flags & cp.PyBUF_WRITABLE:
1107 raise BufferError("Writable buffer requested but Arrow "
1108 "buffer was not mutable")
1109 buffer.readonly = 1
1110 buffer.buf = <char *>self.buffer.get().data()
1111 buffer.format = 'b'
1112 buffer.internal = NULL
1113 buffer.itemsize = 1
1114 buffer.len = self.size
1115 buffer.ndim = 1
1116 buffer.obj = self
1117 buffer.shape = self.shape
1118 buffer.strides = self.strides
1119 buffer.suboffsets = NULL
1120
1121 def __getsegcount__(self, Py_ssize_t *len_out):
1122 if len_out != NULL:
1123 len_out[0] = <Py_ssize_t>self.size
1124 return 1
1125
1126 def __getreadbuffer__(self, Py_ssize_t idx, void **p):
1127 if idx != 0:
1128 raise SystemError("accessing non-existent buffer segment")
1129 if p != NULL:
1130 p[0] = <void*> self.buffer.get().data()
1131 return self.size
1132
1133 def __getwritebuffer__(self, Py_ssize_t idx, void **p):
1134 if not self.buffer.get().is_mutable():
1135 raise SystemError("trying to write an immutable buffer")
1136 if idx != 0:
1137 raise SystemError("accessing non-existent buffer segment")
1138 if p != NULL:
1139 p[0] = <void*> self.buffer.get().data()
1140 return self.size
1141
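# Usage sketch (illustrative, not part of the original source): Buffer
# instances support zero-copy slicing and the buffer protocol.
#
#   import pyarrow as pa
#   buf = pa.py_buffer(b"hello world")
#   buf[:5].to_pybytes()        # b'hello' (the slice is a zero-copy view)
#   bytes(memoryview(buf))      # b'hello world'
#   buf.equals(pa.py_buffer(b"hello world"))   # True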

cdef class ResizableBuffer(Buffer):
    """
    A base class for buffers that can be resized.
    """

    cdef void init_rz(self, const shared_ptr[CResizableBuffer]& buffer):
        self.init(<shared_ptr[CBuffer]> buffer)

    def resize(self, int64_t new_size, shrink_to_fit=False):
        """
        Resize buffer to indicated size.

        Parameters
        ----------
        new_size : int
            New size of buffer (padding may be added internally).
        shrink_to_fit : bool, default False
            If this is true, the buffer is shrunk when new_size is less
            than the current size.
            If this is false, the buffer is never shrunk.
        """
        cdef c_bool c_shrink_to_fit = shrink_to_fit
        with nogil:
            check_status((<CResizableBuffer*> self.buffer.get())
                         .Resize(new_size, c_shrink_to_fit))


cdef shared_ptr[CResizableBuffer] _allocate_buffer(CMemoryPool* pool) except *:
    with nogil:
        return to_shared(GetResultValue(AllocateResizableBuffer(0, pool)))


def allocate_buffer(int64_t size, MemoryPool memory_pool=None,
                    resizable=False):
    """
    Allocate a mutable buffer.

    Parameters
    ----------
    size : int
        Number of bytes to allocate (plus internal padding)
    memory_pool : MemoryPool, optional
        The pool to allocate memory from.
        If not given, the default memory pool is used.
    resizable : bool, default False
        If true, the returned buffer is resizable.

    Returns
    -------
    buffer : Buffer or ResizableBuffer
    """
    cdef:
        CMemoryPool* cpool = maybe_unbox_memory_pool(memory_pool)
        shared_ptr[CResizableBuffer] c_rz_buffer
        shared_ptr[CBuffer] c_buffer

    if resizable:
        with nogil:
            c_rz_buffer = to_shared(GetResultValue(
                AllocateResizableBuffer(size, cpool)))
        return pyarrow_wrap_resizable_buffer(c_rz_buffer)
    else:
        with nogil:
            c_buffer = to_shared(GetResultValue(AllocateBuffer(size, cpool)))
        return pyarrow_wrap_buffer(c_buffer)

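# Usage sketch (illustrative, not part of the original source):
#
#   import pyarrow as pa
#   buf = pa.allocate_buffer(1024, resizable=True)
#   buf.size        # 1024
#   buf.resize(64)  # shrink_to_fit=False, so capacity is retained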

cdef class BufferOutputStream(NativeFile):

    cdef:
        shared_ptr[CResizableBuffer] buffer

    def __cinit__(self, MemoryPool memory_pool=None):
        self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool))
        self.output_stream.reset(new CBufferOutputStream(
            <shared_ptr[CResizableBuffer]> self.buffer))
        self.is_writable = True

    def getvalue(self):
        """
        Finalize output stream and return result as pyarrow.Buffer.

        Returns
        -------
        value : Buffer
        """
        with nogil:
            check_status(self.output_stream.get().Close())
        return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer)

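# Usage sketch (illustrative, not part of the original source): an in-memory
# sink, analogous to io.BytesIO for Arrow streams.
#
#   import pyarrow as pa
#   sink = pa.BufferOutputStream()
#   sink.write(b"hello ")
#   sink.write(b"world")
#   sink.getvalue().to_pybytes()   # b'hello world' (also closes the stream)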

cdef class MockOutputStream(NativeFile):

    def __cinit__(self):
        self.output_stream.reset(new CMockOutputStream())
        self.is_writable = True

    def size(self):
        handle = <CMockOutputStream*> self.output_stream.get()
        return handle.GetExtentBytesWritten()


cdef class BufferReader(NativeFile):
    """
    Zero-copy reader from objects convertible to Arrow buffer.

    Parameters
    ----------
    obj : Python bytes or pyarrow.Buffer
    """
    cdef:
        Buffer buffer

    def __cinit__(self, object obj):
        self.buffer = as_buffer(obj)
        self.set_random_access_file(shared_ptr[CRandomAccessFile](
            new CBufferReader(self.buffer.buffer)))
        self.is_readable = True

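# Usage sketch (illustrative, not part of the original source): BufferReader
# gives seekable, zero-copy reads over in-memory data.
#
#   import pyarrow as pa
#   reader = pa.BufferReader(b"0123456789")
#   reader.read_buffer(4)   # zero-copy pa.Buffer over b'0123'
#   reader.read()           # b'456789'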

cdef class CompressedInputStream(NativeFile):
    """
    An input stream wrapper which decompresses data on the fly.

    Parameters
    ----------
    stream : string, path, pa.NativeFile, or file-like object
        Input stream object to wrap with the compression.
    compression : str
        The compression type ("bz2", "brotli", "gzip", "lz4" or "zstd").
    """

    def __init__(self, object stream, str compression not None):
        cdef:
            NativeFile nf
            Codec codec = Codec(compression)
            shared_ptr[CInputStream] c_reader
            shared_ptr[CCompressedInputStream] compressed_stream
        nf = get_native_file(stream, False)
        c_reader = nf.get_input_stream()
        compressed_stream = GetResultValue(
            CCompressedInputStream.Make(codec.unwrap(), c_reader)
        )
        self.set_input_stream(<shared_ptr[CInputStream]> compressed_stream)
        self.is_readable = True


cdef class CompressedOutputStream(NativeFile):
    """
    An output stream wrapper which compresses data on the fly.

    Parameters
    ----------
    stream : string, path, pa.NativeFile, or file-like object
        Output stream object to wrap with the compression.
    compression : str
        The compression type ("bz2", "brotli", "gzip", "lz4" or "zstd").
    """

    def __init__(self, object stream, str compression not None):
        cdef:
            Codec codec = Codec(compression)
            shared_ptr[COutputStream] c_writer
            shared_ptr[CCompressedOutputStream] compressed_stream
        get_writer(stream, &c_writer)
        compressed_stream = GetResultValue(
            CCompressedOutputStream.Make(codec.unwrap(), c_writer)
        )
        self.set_output_stream(<shared_ptr[COutputStream]> compressed_stream)
        self.is_writable = True

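# Usage sketch (illustrative, not part of the original source): a gzip
# round trip through the two wrappers above, staged in memory.
#
#   import pyarrow as pa
#   raw = pa.BufferOutputStream()
#   with pa.CompressedOutputStream(raw, "gzip") as out:
#       out.write(b"data" * 1000)
#   compressed = raw.getvalue()
#   pa.CompressedInputStream(pa.BufferReader(compressed), "gzip").read()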

ctypedef CBufferedInputStream* _CBufferedInputStreamPtr
ctypedef CBufferedOutputStream* _CBufferedOutputStreamPtr
ctypedef CRandomAccessFile* _RandomAccessFilePtr


cdef class BufferedInputStream(NativeFile):
    """
    An input stream that performs buffered reads from
    an unbuffered input stream, which can mitigate the overhead
    of many small reads in some cases.

    Parameters
    ----------
    stream : NativeFile
        The input stream to wrap with the buffer
    buffer_size : int
        Size of the temporary read buffer.
    memory_pool : MemoryPool
        The memory pool used to allocate the buffer.
    """

    def __init__(self, NativeFile stream, int buffer_size,
                 MemoryPool memory_pool=None):
        cdef shared_ptr[CBufferedInputStream] buffered_stream

        if buffer_size <= 0:
            raise ValueError('Buffer size must be larger than zero')
        buffered_stream = GetResultValue(CBufferedInputStream.Create(
            buffer_size, maybe_unbox_memory_pool(memory_pool),
            stream.get_input_stream()))

        self.set_input_stream(<shared_ptr[CInputStream]> buffered_stream)
        self.is_readable = True

    def detach(self):
        """
        Release the raw InputStream.
        Further operations on this stream are invalid.

        Returns
        -------
        raw : NativeFile
            The underlying raw input stream
        """
        cdef:
            shared_ptr[CInputStream] c_raw
            _CBufferedInputStreamPtr buffered
            NativeFile raw

        buffered = dynamic_cast[_CBufferedInputStreamPtr](
            self.input_stream.get())
        assert buffered != nullptr

        with nogil:
            c_raw = GetResultValue(buffered.Detach())

        raw = NativeFile()
        raw.is_readable = True
        # Find out whether the raw stream is a RandomAccessFile
        # or a mere InputStream. This helps us support seek() etc.
        # selectively.
        if dynamic_cast[_RandomAccessFilePtr](c_raw.get()) != nullptr:
            raw.set_random_access_file(
                static_pointer_cast[CRandomAccessFile, CInputStream](c_raw))
        else:
            raw.set_input_stream(c_raw)
        return raw


cdef class BufferedOutputStream(NativeFile):
    """
    An output stream that performs buffered writes to
    an unbuffered output stream, which can mitigate the overhead
    of many small writes in some cases.

    Parameters
    ----------
    stream : NativeFile
        The writable output stream to wrap with the buffer
    buffer_size : int
        Size of the buffer that should be added.
    memory_pool : MemoryPool
        The memory pool used to allocate the buffer.
    """

    def __init__(self, NativeFile stream, int buffer_size,
                 MemoryPool memory_pool=None):
        cdef shared_ptr[CBufferedOutputStream] buffered_stream

        if buffer_size <= 0:
            raise ValueError('Buffer size must be larger than zero')
        buffered_stream = GetResultValue(CBufferedOutputStream.Create(
            buffer_size, maybe_unbox_memory_pool(memory_pool),
            stream.get_output_stream()))

        self.set_output_stream(<shared_ptr[COutputStream]> buffered_stream)
        self.is_writable = True

    def detach(self):
        """
        Flush any buffered writes and release the raw OutputStream.
        Further operations on this stream are invalid.

        Returns
        -------
        raw : NativeFile
            The underlying raw output stream.
        """
        cdef:
            shared_ptr[COutputStream] c_raw
            _CBufferedOutputStreamPtr buffered
            NativeFile raw

        buffered = dynamic_cast[_CBufferedOutputStreamPtr](
            self.output_stream.get())
        assert buffered != nullptr

        with nogil:
            c_raw = GetResultValue(buffered.Detach())

        raw = NativeFile()
        raw.is_writable = True
        raw.set_output_stream(c_raw)
        return raw


cdef void _cb_transform(transform_func, const shared_ptr[CBuffer]& src,
                        shared_ptr[CBuffer]* dest) except *:
    py_dest = transform_func(pyarrow_wrap_buffer(src))
    dest[0] = pyarrow_unwrap_buffer(py_buffer(py_dest))


cdef class TransformInputStream(NativeFile):
    """
    Transform an input stream.

    Parameters
    ----------
    stream : NativeFile
        The stream to transform.
    transform_func : callable
        The transformation to apply.
    """

    def __init__(self, NativeFile stream, transform_func):
        self.set_input_stream(TransformInputStream.make_native(
            stream.get_input_stream(), transform_func))
        self.is_readable = True

    @staticmethod
    cdef shared_ptr[CInputStream] make_native(
            shared_ptr[CInputStream] stream, transform_func) except *:
        cdef:
            shared_ptr[CInputStream] transform_stream
            CTransformInputStreamVTable vtable

        vtable.transform = _cb_transform
        return MakeTransformInputStream(stream, move(vtable),
                                        transform_func)


class Transcoder:

    def __init__(self, decoder, encoder):
        self._decoder = decoder
        self._encoder = encoder

    def __call__(self, buf):
        final = len(buf) == 0
        return self._encoder.encode(self._decoder.decode(buf, final), final)


def transcoding_input_stream(stream, src_encoding, dest_encoding):
    """
    Add a transcoding transformation to the stream.
    Incoming data will be decoded according to ``src_encoding`` and
    then re-encoded according to ``dest_encoding``.

    Parameters
    ----------
    stream : NativeFile
        The stream to which the transformation should be applied.
    src_encoding : str
        The codec to use when reading data.
    dest_encoding : str
        The codec to use for emitted data.
    """
    src_codec = codecs.lookup(src_encoding)
    dest_codec = codecs.lookup(dest_encoding)
    if src_codec.name == dest_codec.name:
        # Avoid losing performance on no-op transcoding
        # (encoding errors won't be detected)
        return stream
    return TransformInputStream(stream,
                                Transcoder(src_codec.incrementaldecoder(),
                                           dest_codec.incrementalencoder()))

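# Usage sketch (illustrative, not part of the original source): re-encoding
# a UTF-16 byte stream as UTF-8 on the fly.
#
#   import pyarrow as pa
#   data = "h\u00e9llo".encode("utf-16")
#   src = pa.BufferReader(data)
#   pa.transcoding_input_stream(src, "utf-16", "utf-8").read()
#   # -> b'h\xc3\xa9llo'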

cdef shared_ptr[CInputStream] native_transcoding_input_stream(
        shared_ptr[CInputStream] stream, src_encoding,
        dest_encoding) except *:
    src_codec = codecs.lookup(src_encoding)
    dest_codec = codecs.lookup(dest_encoding)
    if src_codec.name == dest_codec.name:
        # Avoid losing performance on no-op transcoding
        # (encoding errors won't be detected)
        return stream
    return TransformInputStream.make_native(
        stream, Transcoder(src_codec.incrementaldecoder(),
                           dest_codec.incrementalencoder()))


def py_buffer(object obj):
    """
    Construct an Arrow buffer from a Python bytes-like or buffer-like object

    Parameters
    ----------
    obj : object
        the object from which the buffer should be constructed.
    """
    cdef shared_ptr[CBuffer] buf
    buf = GetResultValue(PyBuffer.FromPyObject(obj))
    return pyarrow_wrap_buffer(buf)


def foreign_buffer(address, size, base=None):
    """
    Construct an Arrow buffer with the given *address* and *size*.

    The buffer will be optionally backed by the Python *base* object, if given.
    The *base* object will be kept alive as long as this buffer is alive,
    including across language boundaries (for example if the buffer is
    referenced by C++ code).

    Parameters
    ----------
    address : int
        The starting address of the buffer. The address can
        refer to either device or host memory, but it must be
        accessible from the device after mapping it with the
        `get_device_address` method.
    size : int
        The size of the buffer, in bytes.
    base : {None, object}
        Object that owns the referenced memory.
    """
    cdef:
        intptr_t c_addr = address
        int64_t c_size = size
        shared_ptr[CBuffer] buf

    check_status(PyForeignBuffer.Make(<uint8_t*> c_addr, c_size,
                                      base, &buf))
    return pyarrow_wrap_buffer(buf)

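# Usage sketch (illustrative, not part of the original source): wrapping
# memory owned by a NumPy array without copying; `base=arr` keeps the
# array alive for the buffer's lifetime.
#
#   import numpy as np
#   import pyarrow as pa
#   arr = np.arange(10, dtype=np.uint8)
#   buf = pa.foreign_buffer(arr.ctypes.data, arr.nbytes, base=arr)
#   buf.to_pybytes()   # b'\x00\x01...\t'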

def as_buffer(object o):
    if isinstance(o, Buffer):
        return o
    return py_buffer(o)


cdef shared_ptr[CBuffer] as_c_buffer(object o) except *:
    cdef shared_ptr[CBuffer] buf
    if isinstance(o, Buffer):
        buf = (<Buffer> o).buffer
        if buf == nullptr:
            raise ValueError("got null buffer")
    else:
        buf = GetResultValue(PyBuffer.FromPyObject(o))
    return buf


cdef NativeFile get_native_file(object source, c_bool use_memory_map):
    try:
        source_path = _stringify_path(source)
    except TypeError:
        if isinstance(source, Buffer):
            source = BufferReader(source)
        elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
            # Optimistically hope this is file-like
            source = PythonFile(source, mode='r')
    else:
        if use_memory_map:
            source = memory_map(source_path, mode='r')
        else:
            source = OSFile(source_path, mode='r')

    return source


cdef get_reader(object source, c_bool use_memory_map,
                shared_ptr[CRandomAccessFile]* reader):
    cdef NativeFile nf

    nf = get_native_file(source, use_memory_map)
    reader[0] = nf.get_random_access_file()


cdef get_input_stream(object source, c_bool use_memory_map,
                      shared_ptr[CInputStream]* out):
    """
    Like get_reader(), but can automatically decompress, and returns
    an InputStream.
    """
    cdef:
        NativeFile nf
        Codec codec
        shared_ptr[CInputStream] input_stream

    try:
        codec = Codec.detect(source)
    except TypeError:
        codec = None

    nf = get_native_file(source, use_memory_map)
    input_stream = nf.get_input_stream()

    # codec is None if compression can't be detected
    if codec is not None:
        input_stream = <shared_ptr[CInputStream]> GetResultValue(
            CCompressedInputStream.Make(codec.unwrap(), input_stream)
        )

    out[0] = input_stream


cdef get_writer(object source, shared_ptr[COutputStream]* writer):
    cdef NativeFile nf

    try:
        source_path = _stringify_path(source)
    except TypeError:
        if not isinstance(source, NativeFile) and hasattr(source, 'write'):
            # Optimistically hope this is file-like
            source = PythonFile(source, mode='w')
    else:
        source = OSFile(source_path, mode='w')

    if isinstance(source, NativeFile):
        nf = source
        writer[0] = nf.get_output_stream()
    else:
        raise TypeError('Unable to write to object of type: {0}'
                        .format(type(source)))


# ---------------------------------------------------------------------


def _detect_compression(path):
    if isinstance(path, str):
        if path.endswith('.bz2'):
            return 'bz2'
        elif path.endswith('.gz'):
            return 'gzip'
        elif path.endswith('.lz4'):
            return 'lz4'
        elif path.endswith('.zst'):
            return 'zstd'


cdef CCompressionType _ensure_compression(str name) except *:
    uppercase = name.upper()
    if uppercase == 'BZ2':
        return CCompressionType_BZ2
    elif uppercase == 'GZIP':
        return CCompressionType_GZIP
    elif uppercase == 'BROTLI':
        return CCompressionType_BROTLI
    elif uppercase == 'LZ4' or uppercase == 'LZ4_FRAME':
        return CCompressionType_LZ4_FRAME
    elif uppercase == 'LZ4_RAW':
        return CCompressionType_LZ4
    elif uppercase == 'SNAPPY':
        return CCompressionType_SNAPPY
    elif uppercase == 'ZSTD':
        return CCompressionType_ZSTD
    else:
        raise ValueError('Invalid value for compression: {!r}'.format(name))


cdef class Codec(_Weakrefable):
    """
    Compression codec.

    Parameters
    ----------
    compression : str
        Type of compression codec to initialize, valid values are: 'gzip',
        'bz2', 'brotli', 'lz4' (or 'lz4_frame'), 'lz4_raw', 'zstd' and
        'snappy'.
    compression_level : int, None
        Optional parameter specifying how aggressively to compress. The
        possible ranges and effect of this parameter depend on the specific
        codec chosen. Higher values compress more but typically use more
        resources (CPU/RAM). Some codecs support negative values.

        gzip
            The compression_level maps to the memlevel parameter of
            deflateInit2. Higher levels use more RAM but are faster
            and should have higher compression ratios.

        bz2
            The compression level maps to the blockSize100k parameter of
            the BZ2_bzCompressInit function. Higher levels use more RAM
            but are faster and should have higher compression ratios.

        brotli
            The compression level maps to the BROTLI_PARAM_QUALITY
            parameter. Higher values are slower and should have higher
            compression ratios.

        lz4/lz4_frame/lz4_raw
            The compression level parameter is not supported and must
            be None

        zstd
            The compression level maps to the compressionLevel parameter
            of ZSTD_initCStream. Negative values are supported. Higher
            values are slower and should have higher compression ratios.

        snappy
            The compression level parameter is not supported and must
            be None


    Raises
    ------
    ValueError
        If invalid compression value is passed.
    """

    def __init__(self, str compression not None, compression_level=None):
        cdef CCompressionType typ = _ensure_compression(compression)
        if compression_level is not None:
            self.wrapped = shared_ptr[CCodec](move(GetResultValue(
                CCodec.CreateWithLevel(typ, compression_level))))
        else:
            self.wrapped = shared_ptr[CCodec](move(GetResultValue(
                CCodec.Create(typ))))

    cdef inline CCodec* unwrap(self) nogil:
        return self.wrapped.get()

    @staticmethod
    def detect(path):
        """
        Detect and instantiate compression codec based on file extension.

        Parameters
        ----------
        path : str, path-like
            File-path to detect compression from.

        Raises
        ------
        TypeError
            If the passed value is not path-like.
        ValueError
            If the compression can't be detected from the path.

        Returns
        -------
        Codec
        """
        return Codec(_detect_compression(_stringify_path(path)))

    @staticmethod
    def is_available(str compression not None):
        """
        Returns whether the compression support has been built and enabled.

        Parameters
        ----------
        compression : str
            Type of compression codec,
            refer to Codec docstring for a list of supported ones.

        Returns
        -------
        bool
        """
        cdef CCompressionType typ = _ensure_compression(compression)
        return CCodec.IsAvailable(typ)

    @staticmethod
    def supports_compression_level(str compression not None):
        """
        Returns true if the compression level parameter is supported
        for the given codec.

        Parameters
        ----------
        compression : str
            Type of compression codec,
            refer to Codec docstring for a list of supported ones.
        """
        cdef CCompressionType typ = _ensure_compression(compression)
        return CCodec.SupportsCompressionLevel(typ)

    @staticmethod
    def default_compression_level(str compression not None):
        """
        Returns the compression level that Arrow will use for the codec if
        None is specified.

        Parameters
        ----------
        compression : str
            Type of compression codec,
            refer to Codec docstring for a list of supported ones.
        """
        cdef CCompressionType typ = _ensure_compression(compression)
        return GetResultValue(CCodec.DefaultCompressionLevel(typ))

    @staticmethod
    def minimum_compression_level(str compression not None):
        """
        Returns the smallest valid value for the compression level

        Parameters
        ----------
        compression : str
            Type of compression codec,
            refer to Codec docstring for a list of supported ones.
        """
        cdef CCompressionType typ = _ensure_compression(compression)
        return GetResultValue(CCodec.MinimumCompressionLevel(typ))

    @staticmethod
    def maximum_compression_level(str compression not None):
        """
        Returns the largest valid value for the compression level

        Parameters
        ----------
        compression : str
            Type of compression codec,
            refer to Codec docstring for a list of supported ones.
        """
        cdef CCompressionType typ = _ensure_compression(compression)
        return GetResultValue(CCodec.MaximumCompressionLevel(typ))

    @property
    def name(self):
        """Returns the name of the codec"""
        return frombytes(self.unwrap().name())

    @property
    def compression_level(self):
        """Returns the compression level parameter of the codec"""
        # compression_level() returns an int, so no bytes decoding is needed
        return self.unwrap().compression_level()

    def compress(self, object buf, asbytes=False, memory_pool=None):
        """
        Compress data from buffer-like object.

        Parameters
        ----------
        buf : pyarrow.Buffer, bytes, or other object supporting buffer protocol
        asbytes : bool, default False
            Return result as Python bytes object, otherwise Buffer
        memory_pool : MemoryPool, default None
            Memory pool to use for buffer allocations, if any

        Returns
        -------
        compressed : pyarrow.Buffer or bytes (if asbytes=True)
        """
        cdef:
            shared_ptr[CBuffer] owned_buf
            CBuffer* c_buf
            PyObject* pyobj
            ResizableBuffer out_buf
            int64_t max_output_size
            int64_t output_length
            uint8_t* output_buffer = NULL

        owned_buf = as_c_buffer(buf)
        c_buf = owned_buf.get()

        max_output_size = self.wrapped.get().MaxCompressedLen(
            c_buf.size(), c_buf.data()
        )

        if asbytes:
            pyobj = PyBytes_FromStringAndSizeNative(NULL, max_output_size)
            output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(<object> pyobj)
        else:
            out_buf = allocate_buffer(
                max_output_size, memory_pool=memory_pool, resizable=True
            )
            output_buffer = out_buf.buffer.get().mutable_data()

        with nogil:
            output_length = GetResultValue(
                self.unwrap().Compress(
                    c_buf.size(),
                    c_buf.data(),
                    max_output_size,
                    output_buffer
                )
            )

        if asbytes:
            cp._PyBytes_Resize(&pyobj, <Py_ssize_t> output_length)
            return PyObject_to_object(pyobj)
        else:
            out_buf.resize(output_length)
            return out_buf

    def decompress(self, object buf, decompressed_size=None, asbytes=False,
                   memory_pool=None):
        """
        Decompress data from buffer-like object.

        Parameters
        ----------
        buf : pyarrow.Buffer, bytes, or memoryview-compatible object
        decompressed_size : int64_t, default None
            If not specified, will be computed if the codec is able to
            determine the uncompressed buffer size.
        asbytes : boolean, default False
            Return result as Python bytes object, otherwise Buffer
        memory_pool : MemoryPool, default None
            Memory pool to use for buffer allocations, if any.

        Returns
        -------
        uncompressed : pyarrow.Buffer or bytes (if asbytes=True)
        """
        cdef:
            shared_ptr[CBuffer] owned_buf
            CBuffer* c_buf
            Buffer out_buf
            int64_t output_size
            uint8_t* output_buffer = NULL

        owned_buf = as_c_buffer(buf)
        c_buf = owned_buf.get()

        if decompressed_size is None:
            raise ValueError(
                "Must pass decompressed_size for {} codec".format(self)
            )

        output_size = decompressed_size

        if asbytes:
            pybuf = cp.PyBytes_FromStringAndSize(NULL, output_size)
            output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(pybuf)
        else:
            out_buf = allocate_buffer(output_size, memory_pool=memory_pool)
            output_buffer = out_buf.buffer.get().mutable_data()

        with nogil:
            GetResultValue(
                self.unwrap().Decompress(
                    c_buf.size(),
                    c_buf.data(),
                    output_size,
                    output_buffer
                )
            )

        return pybuf if asbytes else out_buf


def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
    """
    Compress data from buffer-like object.

    Parameters
    ----------
    buf : pyarrow.Buffer, bytes, or other object supporting buffer protocol
    codec : str, default 'lz4'
        Compression codec.
        Supported types: {'brotli', 'gzip', 'lz4', 'lz4_raw', 'snappy', 'zstd'}
    asbytes : bool, default False
        Return result as Python bytes object, otherwise Buffer.
    memory_pool : MemoryPool, default None
        Memory pool to use for buffer allocations, if any.

    Returns
    -------
    compressed : pyarrow.Buffer or bytes (if asbytes=True)
    """
    cdef Codec coder = Codec(codec)
    return coder.compress(buf, asbytes=asbytes, memory_pool=memory_pool)


def decompress(object buf, decompressed_size=None, codec='lz4',
               asbytes=False, memory_pool=None):
    """
    Decompress data from buffer-like object.

    Parameters
    ----------
    buf : pyarrow.Buffer, bytes, or memoryview-compatible object
        Input object to decompress data from.
    decompressed_size : int64_t, default None
        If not specified, will be computed if the codec is able to determine
        the uncompressed buffer size.
    codec : str, default 'lz4'
        Compression codec.
        Supported types: {'brotli', 'gzip', 'lz4', 'lz4_raw', 'snappy', 'zstd'}
    asbytes : bool, default False
        Return result as Python bytes object, otherwise Buffer.
    memory_pool : MemoryPool, default None
        Memory pool to use for buffer allocations, if any.

    Returns
    -------
    uncompressed : pyarrow.Buffer or bytes (if asbytes=True)
    """
    cdef Codec decoder = Codec(codec)
    return decoder.decompress(buf, asbytes=asbytes, memory_pool=memory_pool,
                              decompressed_size=decompressed_size)

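# Usage sketch (illustrative, not part of the original source): the
# decompress() wrapper requires decompressed_size, as the ValueError raised
# in Codec.decompress() above enforces.
#
#   import pyarrow as pa
#   data = b"x" * 1000
#   compressed = pa.compress(data, codec="gzip", asbytes=True)
#   round_tripped = pa.decompress(compressed, decompressed_size=len(data),
#                                 codec="gzip", asbytes=True)
#   assert round_tripped == data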

def input_stream(source, compression='detect', buffer_size=None):
    """
    Create an Arrow input stream.

    Parameters
    ----------
    source : str, Path, buffer, file-like object, ...
        The source to open for reading.
    compression : str, optional (default 'detect')
        The compression algorithm to use for on-the-fly decompression.
        If "detect" and source is a file path, then compression will be
        chosen based on the file extension.
        If None, no compression will be applied.
        Otherwise, a well-known algorithm name must be supplied (e.g. "gzip").
    buffer_size : int, default None
        If None or 0, no buffering will happen. Otherwise the size of the
        temporary read buffer.
    """
    cdef NativeFile stream

    try:
        source_path = _stringify_path(source)
    except TypeError:
        source_path = None

    if isinstance(source, NativeFile):
        stream = source
    elif source_path is not None:
        stream = OSFile(source_path, 'r')
    elif isinstance(source, (Buffer, memoryview)):
        stream = BufferReader(as_buffer(source))
    elif (hasattr(source, 'read') and
          hasattr(source, 'close') and
          hasattr(source, 'closed')):
        stream = PythonFile(source, 'r')
    else:
        raise TypeError("pa.input_stream() called with instance of '{}'"
                        .format(source.__class__))

    if compression == 'detect':
        # detect for OSFile too
        compression = _detect_compression(source_path)

    if buffer_size is not None and buffer_size != 0:
        stream = BufferedInputStream(stream, buffer_size)

    if compression is not None:
        stream = CompressedInputStream(stream, compression)

    return stream


def output_stream(source, compression='detect', buffer_size=None):
    """
    Create an Arrow output stream.

    Parameters
    ----------
    source : str, Path, buffer, file-like object, ...
        The source to open for writing.
    compression : str, optional (default 'detect')
        The compression algorithm to use for on-the-fly compression.
        If "detect" and source is a file path, then compression will be
        chosen based on the file extension.
        If None, no compression will be applied.
        Otherwise, a well-known algorithm name must be supplied (e.g. "gzip").
    buffer_size : int, default None
        If None or 0, no buffering will happen. Otherwise the size of the
        temporary write buffer.
    """
    cdef NativeFile stream

    try:
        source_path = _stringify_path(source)
    except TypeError:
        source_path = None

    if isinstance(source, NativeFile):
        stream = source
    elif source_path is not None:
        stream = OSFile(source_path, 'w')
    elif isinstance(source, (Buffer, memoryview)):
        stream = FixedSizeBufferWriter(as_buffer(source))
    elif (hasattr(source, 'write') and
          hasattr(source, 'close') and
          hasattr(source, 'closed')):
        stream = PythonFile(source, 'w')
    else:
        raise TypeError("pa.output_stream() called with instance of '{}'"
                        .format(source.__class__))

    if compression == 'detect':
        compression = _detect_compression(source_path)

    if buffer_size is not None and buffer_size != 0:
        stream = BufferedOutputStream(stream, buffer_size)

    if compression is not None:
        stream = CompressedOutputStream(stream, compression)

    return stream
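

# Usage sketch (illustrative, not part of the original source): the '.gz'
# suffix triggers compression detection in both helpers; the path is a
# placeholder.
#
#   import pyarrow as pa
#   with pa.output_stream("/tmp/data.txt.gz") as out:
#       out.write(b"hello")                 # gzip-compressed on the fly
#   with pa.input_stream("/tmp/data.txt.gz") as src:
#       src.read()                          # b'hello', decompressed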