.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements.  See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership.  The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied.  See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. currentmodule:: pyarrow

.. _ipc:

Streaming, Serialization, and IPC
=================================

Writing and Reading Streams
---------------------------

Arrow defines two types of binary formats for serializing record batches:

* **Streaming format**: for sending an arbitrary-length sequence of record
  batches. The format must be processed from start to end, and does not
  support random access

* **File or Random Access format**: for serializing a fixed number of record
  batches. Supports random access, and is thus very useful when used with
  memory maps

To follow this section, make sure to first read the section on :ref:`Memory and
IO <io>`.

Using streams
~~~~~~~~~~~~~

First, let's create a small record batch:

.. ipython:: python

    import pyarrow as pa

    data = [
        pa.array([1, 2, 3, 4]),
        pa.array(['foo', 'bar', 'baz', None]),
        pa.array([True, None, False, True])
    ]

    batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
    batch.num_rows
    batch.num_columns

Now, we can begin writing a stream containing some number of these batches. For
this we use :class:`~pyarrow.RecordBatchStreamWriter`, which can write to a
writeable ``NativeFile`` object or a writeable Python object. For convenience,
it can be created with :func:`~pyarrow.ipc.new_stream`:

.. ipython:: python

    sink = pa.BufferOutputStream()

    with pa.ipc.new_stream(sink, batch.schema) as writer:
        for i in range(5):
            writer.write_batch(batch)

Here we used an in-memory Arrow buffer stream (``sink``),
but this could have been a socket or some other IO sink.

When creating the ``StreamWriter``, we pass the schema, since the schema
(column names and types) must be the same for all of the batches sent in this
particular stream. Now we can do:

.. ipython:: python

    buf = sink.getvalue()
    buf.size

Now ``buf`` contains the complete stream as an in-memory byte buffer. We can
read such a stream with :class:`~pyarrow.RecordBatchStreamReader` or the
convenience function ``pyarrow.ipc.open_stream``:

.. ipython:: python

    with pa.ipc.open_stream(buf) as reader:
        schema = reader.schema
        batches = [b for b in reader]

    schema
    len(batches)

We can check the returned batches are the same as the original input:

.. ipython:: python

    batches[0].equals(batch)

An important point is that if the input source supports zero-copy reads
(e.g. a memory map, or ``pyarrow.BufferReader``), then the returned
batches are also zero-copy and do not allocate any new memory on read.

Writing and Reading Random Access Files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The :class:`~pyarrow.RecordBatchFileWriter` has the same API as
:class:`~pyarrow.RecordBatchStreamWriter`. You can create one with
:func:`~pyarrow.ipc.new_file`:

.. ipython:: python

    sink = pa.BufferOutputStream()

    with pa.ipc.new_file(sink, batch.schema) as writer:
        for i in range(10):
            writer.write_batch(batch)

    buf = sink.getvalue()
    buf.size

The difference between :class:`~pyarrow.RecordBatchFileReader` and
:class:`~pyarrow.RecordBatchStreamReader` is that the input source must have a
``seek`` method for random access. The stream reader only requires read
operations. We can also use the :func:`~pyarrow.ipc.open_file` method to open
a file:

.. ipython:: python

    with pa.ipc.open_file(buf) as reader:
        num_record_batches = reader.num_record_batches
        b = reader.get_batch(3)

Because we have access to the entire payload, we know the number of record
batches in the file, and can read any at random.

.. ipython:: python

    num_record_batches
    b.equals(batch)

Reading from Stream and File Format for pandas
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The stream and file reader classes have a special ``read_pandas`` method to
simplify reading multiple record batches and converting them to a single
DataFrame output:

.. ipython:: python

    with pa.ipc.open_file(buf) as reader:
        df = reader.read_pandas()

    df[:5]

Efficiently Writing and Reading Arrow Data
------------------------------------------

Being optimized for zero-copy and memory-mapped data, Arrow makes it easy to
read and write arrays while consuming the minimum amount of resident memory.

When writing and reading raw Arrow data, we can use the Arrow File Format
or the Arrow Streaming Format.

To dump an array to a file, you can use :func:`~pyarrow.ipc.new_file`,
which will provide a new :class:`~pyarrow.ipc.RecordBatchFileWriter` instance
that can be used to write batches of data to that file.

For example, to write an array of 10M integers, we could write it in 1000
chunks of 10000 entries:

.. ipython:: python

    BATCH_SIZE = 10000
    NUM_BATCHES = 1000

    schema = pa.schema([pa.field('nums', pa.int32())])

    with pa.OSFile('bigfile.arrow', 'wb') as sink:
        with pa.ipc.new_file(sink, schema) as writer:
            for row in range(NUM_BATCHES):
                batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
                writer.write(batch)

Record batches support multiple columns, so in practice we always write the
equivalent of a :class:`~pyarrow.Table`.

Writing in batches is effective because in theory we need to keep in memory
only the current batch we are writing. But when reading back, we can be even
more effective by directly mapping the data from disk and avoiding any new
memory allocation on read.

Under normal conditions, reading back our file will consume a few hundred
megabytes of memory:

.. ipython:: python

    with pa.OSFile('bigfile.arrow', 'rb') as source:
        loaded_array = pa.ipc.open_file(source).read_all()

    print("LEN:", len(loaded_array))
    print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))

To more efficiently read big data from disk, we can memory-map the file, so
that Arrow can directly reference the data mapped from disk and avoid having
to allocate its own memory.
In that case the operating system will be able to page in the mapped memory
lazily and page it out without any write-back cost when under pressure,
allowing us to more easily read arrays bigger than the total memory.

.. ipython:: python

    with pa.memory_map('bigfile.arrow', 'rb') as source:
        loaded_array = pa.ipc.open_file(source).read_all()

    print("LEN:", len(loaded_array))
    print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))

.. note::

    Other high level APIs like :func:`~pyarrow.parquet.read_table` also provide a
    ``memory_map`` option. But in those cases, the memory mapping can't help with
    reducing resident memory consumption. See :ref:`parquet_mmap` for details.

Arbitrary Object Serialization
------------------------------

.. warning::

    The custom serialization functionality is deprecated in pyarrow 2.0, and
    will be removed in a future version.

    While the serialization functions in this section utilize the Arrow stream
    protocol internally, they do not produce data that is compatible with the
    above ``ipc.open_file`` and ``ipc.open_stream`` functions.

    For arbitrary objects, you can use the standard library ``pickle``
    functionality instead. For pyarrow objects, you can use the IPC
    serialization format through the ``pyarrow.ipc`` module, as explained
    above.

    PyArrow serialization was originally meant to provide a higher-performance
    alternative to ``pickle`` thanks to zero-copy semantics. However,
    ``pickle`` protocol 5 gained support for zero-copy using out-of-band
    buffers, and can be used instead for similar benefits.

In ``pyarrow`` we are able to serialize and deserialize many kinds of Python
objects. As an example, consider a dictionary containing NumPy arrays:

.. ipython:: python

    import numpy as np

    data = {
        i: np.random.randn(500, 500)
        for i in range(100)
    }

We use the ``pyarrow.serialize`` function to convert this data to a byte
buffer:

.. ipython:: python
    :okwarning:

    buf = pa.serialize(data).to_buffer()
    type(buf)
    buf.size

``pyarrow.serialize`` creates an intermediate object which can be converted to
a buffer (the ``to_buffer`` method) or written directly to an output stream.

``pyarrow.deserialize`` converts a buffer-like object back to the original
Python object:

.. ipython:: python
    :okwarning:

    restored_data = pa.deserialize(buf)
    restored_data[0]


Serializing Custom Data Types
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If an unrecognized data type is encountered when serializing an object,
``pyarrow`` will fall back on using ``pickle`` for converting that type to a
byte string. There may be a more efficient way, though.

Consider a class with two members, one of which is a NumPy array:

.. code-block:: python

    class MyData:
        def __init__(self, name, data):
            self.name = name
            self.data = data

We write functions to convert this to and from a dictionary with simpler types:

.. code-block:: python

    def _serialize_MyData(val):
        return {'name': val.name, 'data': val.data}

    def _deserialize_MyData(data):
        return MyData(data['name'], data['data'])

Then, we must register these functions in a ``SerializationContext`` so that
``MyData`` can be recognized:

.. code-block:: python

    context = pa.SerializationContext()
    context.register_type(MyData, 'MyData',
                          custom_serializer=_serialize_MyData,
                          custom_deserializer=_deserialize_MyData)

Lastly, we use this context as an additional argument to ``pyarrow.serialize``:

.. code-block:: python

    buf = pa.serialize(val, context=context).to_buffer()
    restored_val = pa.deserialize(buf, context=context)

The ``SerializationContext`` also has convenience methods ``serialize`` and
``deserialize``, so these are equivalent statements:

.. code-block:: python

    buf = context.serialize(val).to_buffer()
    restored_val = context.deserialize(buf)

Component-based Serialization
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For serializing Python objects containing some number of NumPy arrays, Arrow
buffers, or other data types, it may be desirable to transport their serialized
representation without having to produce an intermediate copy using the
``to_buffer`` method. To motivate this, suppose we have a list of NumPy arrays:

.. ipython:: python

    import numpy as np
    data = [np.random.randn(10, 10) for i in range(5)]

The call ``pa.serialize(data)`` does not copy the memory inside each of these
NumPy arrays. This serialized representation can then be decomposed into a
dictionary containing a sequence of ``pyarrow.Buffer`` objects containing
metadata for each array and references to the memory inside the arrays. To do
this, use the ``to_components`` method:

.. ipython:: python
    :okwarning:

    serialized = pa.serialize(data)
    components = serialized.to_components()

The particular details of the output of ``to_components`` are not too
important. The objects in the ``'data'`` field are ``pyarrow.Buffer`` objects,
which are zero-copy convertible to Python ``memoryview`` objects:

.. ipython:: python

    memoryview(components['data'][0])

A memoryview can be converted back to an Arrow ``Buffer`` with
``pyarrow.py_buffer``:

.. ipython:: python

    mv = memoryview(components['data'][0])
    buf = pa.py_buffer(mv)

An object can be reconstructed from its component-based representation using
``deserialize_components``:

.. ipython:: python
    :okwarning:

    restored_data = pa.deserialize_components(components)
    restored_data[0]

``deserialize_components`` is also available as a method on
``SerializationContext`` objects.
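
For instance, continuing the ``MyData`` example above, one might write the
following sketch, assuming the ``context`` registered earlier:

.. code-block:: python

    # Use the SerializationContext from the custom-type example above so
    # that registered types are recognized during reconstruction.
    serialized = pa.serialize(data, context=context)
    components = serialized.to_components()
    restored_data = context.deserialize_components(components)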