.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. currentmodule:: pyarrow

.. _ipc:

Streaming, Serialization, and IPC
=================================

Writing and Reading Streams
---------------------------

Arrow defines two types of binary formats for serializing record batches:

* **Streaming format**: for sending an arbitrary length sequence of record
  batches. The format must be processed from start to end, and does not support
  random access

* **File or Random Access format**: for serializing a fixed number of record
  batches. Supports random access, and thus is very useful when used with
  memory maps

To follow this section, make sure to first read the section on :ref:`Memory and
IO <io>`.

Using streams
~~~~~~~~~~~~~

First, let's create a small record batch:

.. ipython:: python

    import pyarrow as pa

    data = [
        pa.array([1, 2, 3, 4]),
        pa.array(['foo', 'bar', 'baz', None]),
        pa.array([True, None, False, True])
    ]

    batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
    batch.num_rows
    batch.num_columns

Now, we can begin writing a stream containing some number of these batches. For
this we use :class:`~pyarrow.RecordBatchStreamWriter`, which can write to a
writeable ``NativeFile`` object or a writeable Python object. For convenience,
one can be created with :func:`~pyarrow.ipc.new_stream`:

.. ipython:: python

    sink = pa.BufferOutputStream()

    with pa.ipc.new_stream(sink, batch.schema) as writer:
        for i in range(5):
            writer.write_batch(batch)

Here we used an in-memory Arrow buffer stream (``sink``),
but this could have been a socket or some other IO sink.

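For instance, any writeable Python file-like object can serve as the sink. A
minimal sketch (the host, port, and the existence of a listening server are
hypothetical, not part of this example) that streams the same batches over a
TCP connection:

.. code-block:: python

    import socket

    import pyarrow as pa

    # Wrap the socket in a writeable file-like object; pa.ipc.new_stream
    # accepts such Python objects as well as NativeFile instances.
    conn = socket.create_connection(("127.0.0.1", 8888))  # hypothetical server
    with conn.makefile(mode="wb") as sock_file:
        with pa.ipc.new_stream(sock_file, batch.schema) as writer:
            for i in range(5):
                writer.write_batch(batch)
    conn.close()
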
When creating the ``StreamWriter``, we pass the schema, since the schema
(column names and types) must be the same for all of the batches sent in this
particular stream. Now we can do:

.. ipython:: python

    buf = sink.getvalue()
    buf.size

Now ``buf`` contains the complete stream as an in-memory byte buffer. We can
read such a stream with :class:`~pyarrow.RecordBatchStreamReader` or the
convenience function ``pyarrow.ipc.open_stream``:

.. ipython:: python

    with pa.ipc.open_stream(buf) as reader:
        schema = reader.schema
        batches = [b for b in reader]

    schema
    len(batches)

We can check that the returned batches are the same as the original input:

.. ipython:: python

    batches[0].equals(batch)

An important point is that if the input source supports zero-copy reads
(e.g. a memory map, or ``pyarrow.BufferReader``), then the returned
batches are also zero-copy and do not allocate any new memory on read.

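One rough way to see this in practice (a sketch, not part of the original
example) is to compare Arrow's allocation counter before and after reading the
stream back through a ``pyarrow.BufferReader``:

.. code-block:: python

    source = pa.BufferReader(buf)   # zero-copy view over the in-memory stream
    allocated_before = pa.total_allocated_bytes()

    with pa.ipc.open_stream(source) as reader:
        table = reader.read_all()   # batches reference `buf` rather than new memory

    # Expected to stay small, since the read is zero-copy.
    print(pa.total_allocated_bytes() - allocated_before)
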
Writing and Reading Random Access Files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The :class:`~pyarrow.RecordBatchFileWriter` has the same API as
:class:`~pyarrow.RecordBatchStreamWriter`. You can create one with
:func:`~pyarrow.ipc.new_file`:

.. ipython:: python

    sink = pa.BufferOutputStream()

    with pa.ipc.new_file(sink, batch.schema) as writer:
        for i in range(10):
            writer.write_batch(batch)

    buf = sink.getvalue()
    buf.size

The difference between :class:`~pyarrow.RecordBatchFileReader` and
:class:`~pyarrow.RecordBatchStreamReader` is that the input source must have a
``seek`` method for random access. The stream reader only requires read
operations. We can also use the :func:`~pyarrow.ipc.open_file` function to open a file:

.. ipython:: python

    with pa.ipc.open_file(buf) as reader:
        num_record_batches = reader.num_record_batches
        b = reader.get_batch(3)

Because we have access to the entire payload, we know the number of record
batches in the file, and can read any at random.

.. ipython:: python

    num_record_batches
    b.equals(batch)

Reading from Stream and File Format for pandas
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The stream and file reader classes have a special ``read_pandas`` method to
simplify reading multiple record batches and converting them to a single
DataFrame output:

.. ipython:: python

    with pa.ipc.open_file(buf) as reader:
        df = reader.read_pandas()

    df[:5]

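If you prefer finer control over the conversion, a roughly equivalent two-step
form (a sketch; ``read_pandas`` also forwards conversion options, so treat this
as an approximation) is to materialize a :class:`~pyarrow.Table` first and
convert it afterwards:

.. code-block:: python

    with pa.ipc.open_file(buf) as reader:
        table = reader.read_all()   # pyarrow.Table with all record batches

    df = table.to_pandas()          # comparable to read_pandas() above
    df[:5]
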
Efficiently Writing and Reading Arrow Data
------------------------------------------

Being optimized for zero-copy and memory-mapped data, Arrow makes it easy to
read and write arrays while consuming a minimum of resident memory.

When writing and reading raw Arrow data, we can use the Arrow File Format
or the Arrow Streaming Format.

To dump an array to a file, you can use :func:`~pyarrow.ipc.new_file`,
which will provide a new :class:`~pyarrow.ipc.RecordBatchFileWriter` instance
that can be used to write batches of data to that file.

For example, to write an array of 10M integers, we could write it in 1000 chunks
of 10000 entries each:

.. ipython:: python

    BATCH_SIZE = 10000
    NUM_BATCHES = 1000

    schema = pa.schema([pa.field('nums', pa.int32())])

    with pa.OSFile('bigfile.arrow', 'wb') as sink:
        with pa.ipc.new_file(sink, schema) as writer:
            for row in range(NUM_BATCHES):
                batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
                writer.write(batch)

Record batches support multiple columns, so in practice we always write the
equivalent of a :class:`~pyarrow.Table`.

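If the data already lives in a :class:`~pyarrow.Table`, you don't have to build
the record batches by hand. A sketch (the file name and chunk size here are
arbitrary) that lets the writer chunk the table for us:

.. code-block:: python

    table = pa.table({'nums': pa.array(range(100_000), type=pa.int32())})

    with pa.OSFile('bigtable.arrow', 'wb') as sink:
        with pa.ipc.new_file(sink, table.schema) as writer:
            # Split the table into record batches of at most 10,000 rows each.
            writer.write_table(table, max_chunksize=10_000)
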
Writing in batches is effective because, in theory, we only need to keep the
current batch in memory while writing. When reading back, we can be even more
efficient by mapping the data directly from disk and avoiding any new memory
allocation on read.

Under normal conditions, reading back our file will consume a few hundred megabytes
of memory:

.. ipython:: python

    with pa.OSFile('bigfile.arrow', 'rb') as source:
        loaded_array = pa.ipc.open_file(source).read_all()

    print("LEN:", len(loaded_array))
    print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))

To more efficiently read big data from disk, we can memory map the file, so that
Arrow can directly reference the data mapped from disk and avoid having to
allocate its own memory.
In that case the operating system will be able to page in the mapped memory
lazily and page it out without any write-back cost when under pressure,
making it possible to read arrays bigger than the total available memory.

.. ipython:: python

    with pa.memory_map('bigfile.arrow', 'rb') as source:
        loaded_array = pa.ipc.open_file(source).read_all()
        print("LEN:", len(loaded_array))
        print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))

.. note::

    Other high level APIs like :meth:`~pyarrow.parquet.read_table` also provide a
    ``memory_map`` option. But in those cases, the memory mapping can't help with
    reducing resident memory consumption. See :ref:`parquet_mmap` for details.

Arbitrary Object Serialization
------------------------------

.. warning::

    The custom serialization functionality is deprecated in pyarrow 2.0, and
    will be removed in a future version.

    While the serialization functions in this section utilize the Arrow stream
    protocol internally, they do not produce data that is compatible with the
    above ``ipc.open_file`` and ``ipc.open_stream`` functions.

    For arbitrary objects, you can use the standard library ``pickle``
    functionality instead. For pyarrow objects, you can use the IPC
    serialization format through the ``pyarrow.ipc`` module, as explained
    above.

    PyArrow serialization was originally meant to provide a higher-performance
    alternative to ``pickle`` thanks to zero-copy semantics. However,
    ``pickle`` protocol 5 gained support for zero-copy using out-of-band
    buffers, and can be used instead for similar benefits.

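For reference, here is a minimal sketch of the ``pickle`` protocol 5 approach
mentioned in the warning (it requires Python 3.8 or later and does not involve
pyarrow at all):

.. code-block:: python

    import pickle

    import numpy as np

    data = {i: np.random.randn(500, 500) for i in range(100)}

    # Large array payloads are handed to buffer_callback out-of-band
    # instead of being copied into the pickle byte stream.
    buffers = []
    payload = pickle.dumps(data, protocol=5, buffer_callback=buffers.append)

    # The same buffers must be supplied, in order, when loading.
    restored = pickle.loads(payload, buffers=buffers)
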
In ``pyarrow`` we are able to serialize and deserialize many kinds of Python
objects. As an example, consider a dictionary containing NumPy arrays:

.. ipython:: python

    import numpy as np

    data = {
        i: np.random.randn(500, 500)
        for i in range(100)
    }

We use the ``pyarrow.serialize`` function to convert this data to a byte
buffer:

.. ipython:: python
    :okwarning:

    buf = pa.serialize(data).to_buffer()
    type(buf)
    buf.size

``pyarrow.serialize`` creates an intermediate object which can be converted to
a buffer (the ``to_buffer`` method) or written directly to an output stream.

``pyarrow.deserialize`` converts a buffer-like object back to the original
Python object:

.. ipython:: python
    :okwarning:

    restored_data = pa.deserialize(buf)
    restored_data[0]


Serializing Custom Data Types
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If an unrecognized data type is encountered when serializing an object,
``pyarrow`` will fall back on using ``pickle`` for converting that type to a
byte string. There may be a more efficient way, though.

Consider a class with two members, one of which is a NumPy array:

.. code-block:: python

    class MyData:
        def __init__(self, name, data):
            self.name = name
            self.data = data

We write functions to convert this to and from a dictionary with simpler types:

.. code-block:: python

    def _serialize_MyData(val):
        return {'name': val.name, 'data': val.data}

    def _deserialize_MyData(data):
        return MyData(data['name'], data['data'])

Then, we must register these functions in a ``SerializationContext`` so that
``MyData`` can be recognized:

.. code-block:: python

    context = pa.SerializationContext()
    context.register_type(MyData, 'MyData',
                          custom_serializer=_serialize_MyData,
                          custom_deserializer=_deserialize_MyData)

Lastly, we use this context as an additional argument to ``pyarrow.serialize``:

.. code-block:: python

    buf = pa.serialize(val, context=context).to_buffer()
    restored_val = pa.deserialize(buf, context=context)

The ``SerializationContext`` also has convenience methods ``serialize`` and
``deserialize``, so these are equivalent statements:

.. code-block:: python

    buf = context.serialize(val).to_buffer()
    restored_val = context.deserialize(buf)

Component-based Serialization
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For serializing Python objects containing some number of NumPy arrays, Arrow
buffers, or other data types, it may be desirable to transport their serialized
representation without having to produce an intermediate copy using the
``to_buffer`` method. To motivate this, suppose we have a list of NumPy arrays:

.. ipython:: python

    import numpy as np
    data = [np.random.randn(10, 10) for i in range(5)]

The call ``pa.serialize(data)`` does not copy the memory inside each of these
NumPy arrays. This serialized representation can then be decomposed into a
dictionary containing a sequence of ``pyarrow.Buffer`` objects holding
metadata for each array and references to the memory inside the arrays. To do
this, use the ``to_components`` method:

.. ipython:: python
    :okwarning:

    serialized = pa.serialize(data)
    components = serialized.to_components()

The particular details of the output of ``to_components`` are not too
important. The objects in the ``'data'`` field are ``pyarrow.Buffer`` objects,
which are zero-copy convertible to Python ``memoryview`` objects:

.. ipython:: python

    memoryview(components['data'][0])

A memoryview can be converted back to an Arrow ``Buffer`` with
``pyarrow.py_buffer``:

.. ipython:: python

    mv = memoryview(components['data'][0])
    buf = pa.py_buffer(mv)

An object can be reconstructed from its component-based representation using
``deserialize_components``:

.. ipython:: python
    :okwarning:

    restored_data = pa.deserialize_components(components)
    restored_data[0]

``deserialize_components`` is also available as a method on
``SerializationContext`` objects.

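For instance, a brief sketch reusing the ``context`` and ``data`` defined
earlier in this section:

.. code-block:: python

    # Round trip through the SerializationContext convenience methods.
    components = context.serialize(data).to_components()
    restored_data = context.deserialize_components(components)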