]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | # -*- coding: utf-8 -*- |
2 | # Licensed to the Apache Software Foundation (ASF) under one | |
3 | # or more contributor license agreements. See the NOTICE file | |
4 | # distributed with this work for additional information | |
5 | # regarding copyright ownership. The ASF licenses this file | |
6 | # to you under the Apache License, Version 2.0 (the | |
7 | # "License"); you may not use this file except in compliance | |
8 | # with the License. You may obtain a copy of the License at | |
9 | # | |
10 | # http://www.apache.org/licenses/LICENSE-2.0 | |
11 | # | |
12 | # Unless required by applicable law or agreed to in writing, | |
13 | # software distributed under the License is distributed on an | |
14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | # KIND, either express or implied. See the License for the | |
16 | # specific language governing permissions and limitations | |
17 | # under the License. | |
18 | ||
19 | import gc | |
20 | ||
21 | import pyarrow as pa | |
22 | try: | |
23 | from pyarrow.cffi import ffi | |
24 | except ImportError: | |
25 | ffi = None | |
26 | ||
27 | import pytest | |
28 | ||
29 | try: | |
30 | import pandas as pd | |
31 | import pandas.testing as tm | |
32 | except ImportError: | |
33 | pd = tm = None | |
34 | ||
35 | ||
36 | needs_cffi = pytest.mark.skipif(ffi is None, | |
37 | reason="test needs cffi package installed") | |
38 | ||
39 | ||
def _released_ctx(struct_name):
    # Build a pytest.raises context asserting that importing an
    # already-released Arrow C struct of the given name fails.
    return pytest.raises(
        ValueError, match="Cannot import released " + struct_name)


assert_schema_released = _released_ctx("ArrowSchema")

assert_array_released = _released_ctx("ArrowArray")

assert_stream_released = _released_ctx("ArrowArrayStream")
48 | ||
49 | ||
class ParamExtType(pa.PyExtensionType):
    """Parametrized extension type backed by fixed-size binary storage.

    The single parameter ``width`` is the byte width of the underlying
    ``pa.binary(width)`` storage type.
    """

    def __init__(self, width):
        self._width = width
        super().__init__(pa.binary(width))

    @property
    def width(self):
        """Byte width of the underlying binary storage."""
        return self._width

    def __reduce__(self):
        # Pickle/IPC support: the type is fully determined by its width.
        return ParamExtType, (self.width,)
62 | ||
63 | ||
def make_schema():
    """Return a one-column schema (list<int32>) carrying metadata."""
    fields = [pa.field('ints', pa.list_(pa.int32()))]
    return pa.schema(fields, metadata={b'key1': b'value1'})
67 | ||
68 | ||
def make_extension_schema():
    """Return a schema with a single ParamExtType(3) column and metadata."""
    return pa.schema(
        [('ext', ParamExtType(3))],
        metadata={b'key1': b'value1'},
    )
72 | ||
73 | ||
def make_batch():
    """Return a two-row record batch matching make_schema()."""
    rows = [[[1], [2, 42]]]
    return pa.record_batch(rows, make_schema())
76 | ||
77 | ||
def make_extension_batch():
    """Return a record batch with one extension-typed column."""
    schema = make_extension_schema()
    storage = pa.array([b"foo", b"bar"], type=pa.binary(3))
    # Wrap the raw binary storage in the extension type of the column.
    ext_col = schema[0].type.wrap_array(storage)
    return pa.record_batch([ext_col], schema)
83 | ||
84 | ||
def make_batches():
    """Return two record batches sharing the schema from make_schema()."""
    schema = make_schema()
    column_data = ([[1], [2, 42]], [None, [], [5, 6]])
    return [pa.record_batch([col], schema) for col in column_data]
91 | ||
92 | ||
def make_serialized(schema, batches):
    """Serialize *batches* with *schema* into an IPC stream buffer."""
    with pa.BufferOutputStream() as sink:
        # The writer must be closed (end-of-stream marker written)
        # before reading the sink's contents back.
        with pa.ipc.new_stream(sink, schema) as writer:
            for batch in batches:
                writer.write(batch)
        return sink.getvalue()
99 | ||
100 | ||
@needs_cffi
def test_export_import_type():
    """Round-trip a DataType through the Arrow C data interface.

    Exports a type into a C-level ArrowSchema struct, re-imports it, and
    checks that memory is reclaimed and that importing a released or
    malformed struct raises ValueError.
    """
    # Allocate a C ArrowSchema struct and take its raw address for the
    # _export_to_c / _import_from_c APIs.
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    typ = pa.list_(pa.int32())
    typ._export_to_c(ptr_schema)
    # Exporting keeps the type data alive in the C struct.
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del typ
    assert pa.total_allocated_bytes() > old_allocated
    typ_new = pa.DataType._import_from_c(ptr_schema)
    assert typ_new == pa.list_(pa.int32())
    # Importing consumes the struct's allocation.
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_schema_released:
        pa.DataType._import_from_c(ptr_schema)

    # Invalid format string
    pa.int32()._export_to_c(ptr_schema)
    # Overwrite the exported format with garbage to exercise the
    # importer's validation path.
    bad_format = ffi.new("char[]", b"zzz")
    c_schema.format = bad_format
    with pytest.raises(ValueError,
                       match="Invalid or unsupported format string"):
        pa.DataType._import_from_c(ptr_schema)
    # Now released
    with assert_schema_released:
        pa.DataType._import_from_c(ptr_schema)
132 | ||
133 | ||
@needs_cffi
def test_export_import_field():
    """Round-trip a Field through the Arrow C data interface.

    Exporting must allocate, importing must reclaim, and importing the
    released struct a second time must fail.
    """
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))

    # Establish an allocation baseline, with no Arrow data lingering
    # in reference cycles.
    gc.collect()
    old_allocated = pa.total_allocated_bytes()

    src_field = pa.field("test", pa.list_(pa.int32()), nullable=True)
    src_field._export_to_c(ptr_schema)
    assert pa.total_allocated_bytes() > old_allocated

    # Dropping the Python object must not free the exported data.
    del src_field
    assert pa.total_allocated_bytes() > old_allocated

    # Recreate the field from the raw pointer and compare to a fresh one.
    roundtripped = pa.Field._import_from_c(ptr_schema)
    assert roundtripped == pa.field("test", pa.list_(pa.int32()),
                                    nullable=True)
    assert pa.total_allocated_bytes() == old_allocated

    # The import consumed the struct; a second import must fail.
    with assert_schema_released:
        pa.Field._import_from_c(ptr_schema)
156 | ||
157 | ||
@needs_cffi
def test_export_import_array():
    """Round-trip an Array through the Arrow C data interface.

    Covers both import modes: type known up front (pass a DataType), and
    type transported alongside the data (pass an ArrowSchema pointer).
    """
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
    c_array = ffi.new("struct ArrowArray*")
    ptr_array = int(ffi.cast("uintptr_t", c_array))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    # Type is known up front
    typ = pa.list_(pa.int32())
    arr = pa.array([[1], [2, 42]], type=typ)
    # Snapshot the expected values before the Python object is deleted.
    py_value = arr.to_pylist()
    arr._export_to_c(ptr_array)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del arr
    arr_new = pa.Array._import_from_c(ptr_array, typ)
    assert arr_new.to_pylist() == py_value
    assert arr_new.type == pa.list_(pa.int32())
    assert pa.total_allocated_bytes() > old_allocated
    # Releasing the imported array must return to the baseline.
    del arr_new, typ
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_array_released:
        pa.Array._import_from_c(ptr_array, pa.list_(pa.int32()))

    # Type is exported and imported at the same time
    arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32()))
    py_value = arr.to_pylist()
    arr._export_to_c(ptr_array, ptr_schema)
    # Delete and recreate C++ objects from exported pointers
    del arr
    arr_new = pa.Array._import_from_c(ptr_array, ptr_schema)
    assert arr_new.to_pylist() == py_value
    assert arr_new.type == pa.list_(pa.int32())
    assert pa.total_allocated_bytes() > old_allocated
    del arr_new
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_schema_released:
        pa.Array._import_from_c(ptr_array, ptr_schema)
201 | ||
202 | ||
def check_export_import_schema(schema_factory):
    """Round-trip the schema produced by *schema_factory* through the
    Arrow C data interface, checking allocation accounting and the
    error paths (released struct, non-struct type).

    ``schema_factory`` is a zero-argument callable returning a schema;
    it is called twice so the comparison does not keep the exported
    object alive.
    """
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    schema_factory()._export_to_c(ptr_schema)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    schema_new = pa.Schema._import_from_c(ptr_schema)
    assert schema_new == schema_factory()
    # The import consumed the exported allocation.
    assert pa.total_allocated_bytes() == old_allocated
    del schema_new
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_schema_released:
        pa.Schema._import_from_c(ptr_schema)

    # Not a struct type
    # A schema must map to a struct-typed ArrowSchema; exporting a
    # plain int32 type exercises the importer's validation.
    pa.int32()._export_to_c(ptr_schema)
    with pytest.raises(ValueError,
                       match="ArrowSchema describes non-struct type"):
        pa.Schema._import_from_c(ptr_schema)
    # Now released
    with assert_schema_released:
        pa.Schema._import_from_c(ptr_schema)
230 | ||
231 | ||
@needs_cffi
def test_export_import_schema():
    """Round-trip a plain (non-extension) schema through the C interface."""
    check_export_import_schema(make_schema)
235 | ||
236 | ||
@needs_cffi
def test_export_import_schema_with_extension():
    """Round-trip a schema containing an extension type through the C interface."""
    check_export_import_schema(make_extension_schema)
240 | ||
241 | ||
def check_export_import_batch(batch_factory):
    """Round-trip the record batch produced by *batch_factory* through
    the Arrow C data interface.

    Covers both import modes (schema known up front vs. schema exported
    alongside the data), allocation accounting, and the error paths
    (released struct, non-struct schema).
    """
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
    c_array = ffi.new("struct ArrowArray*")
    ptr_array = int(ffi.cast("uintptr_t", c_array))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    # Schema is known up front
    batch = batch_factory()
    schema = batch.schema
    # Snapshot expected values before the Python object is deleted.
    py_value = batch.to_pydict()
    batch._export_to_c(ptr_array)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del batch
    batch_new = pa.RecordBatch._import_from_c(ptr_array, schema)
    assert batch_new.to_pydict() == py_value
    assert batch_new.schema == schema
    assert pa.total_allocated_bytes() > old_allocated
    # Releasing the imported batch must return to the baseline.
    del batch_new, schema
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_array_released:
        pa.RecordBatch._import_from_c(ptr_array, make_schema())

    # Type is exported and imported at the same time
    batch = batch_factory()
    py_value = batch.to_pydict()
    batch._export_to_c(ptr_array, ptr_schema)
    # Delete and recreate C++ objects from exported pointers
    del batch
    batch_new = pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
    assert batch_new.to_pydict() == py_value
    assert batch_new.schema == batch_factory().schema
    assert pa.total_allocated_bytes() > old_allocated
    del batch_new
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_schema_released:
        pa.RecordBatch._import_from_c(ptr_array, ptr_schema)

    # Not a struct type
    # A record batch requires a struct-typed ArrowSchema; an int32
    # schema must be rejected by the importer.
    pa.int32()._export_to_c(ptr_schema)
    batch_factory()._export_to_c(ptr_array)
    with pytest.raises(ValueError,
                       match="ArrowSchema describes non-struct type"):
        pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
    # Now released
    with assert_schema_released:
        pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
294 | ||
295 | ||
@needs_cffi
def test_export_import_batch():
    """Round-trip a plain record batch through the C interface."""
    check_export_import_batch(make_batch)
299 | ||
300 | ||
@needs_cffi
def test_export_import_batch_with_extension():
    """Round-trip a record batch with an extension column through the C interface."""
    check_export_import_batch(make_extension_batch)
304 | ||
305 | ||
def _export_import_batch_reader(ptr_stream, reader_factory):
    """Export a RecordBatchReader through the C stream interface at
    *ptr_stream*, re-import it, and verify the batches round-trip.

    ``reader_factory(schema, batches)`` builds the reader to export.
    Also exercises ``read_pandas()`` when pandas is available.
    """
    # Prepare input
    batches = make_batches()
    schema = batches[0].schema

    reader = reader_factory(schema, batches)
    reader._export_to_c(ptr_stream)
    # Delete and recreate C++ object from exported pointer
    del reader, batches

    reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
    assert reader_new.schema == schema
    # Drain the imported stream before releasing the reader.
    got_batches = list(reader_new)
    del reader_new
    assert got_batches == make_batches()

    # Test read_pandas()
    if pd is not None:
        batches = make_batches()
        schema = batches[0].schema
        expected_df = pa.Table.from_batches(batches).to_pandas()

        reader = reader_factory(schema, batches)
        reader._export_to_c(ptr_stream)
        del reader, batches

        reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
        got_df = reader_new.read_pandas()
        del reader_new
        tm.assert_frame_equal(expected_df, got_df)
336 | ||
337 | ||
def make_ipc_stream_reader(schema, batches):
    """Return a reader backed by a serialized IPC stream buffer."""
    serialized = make_serialized(schema, batches)
    return pa.ipc.open_stream(serialized)
340 | ||
341 | ||
def make_py_record_batch_reader(schema, batches):
    """Return a reader wrapping in-memory Python batches directly."""
    reader = pa.ipc.RecordBatchReader.from_batches(schema, batches)
    return reader
344 | ||
345 | ||
@needs_cffi
@pytest.mark.parametrize('reader_factory',
                         [make_ipc_stream_reader,
                          make_py_record_batch_reader])
def test_export_import_batch_reader(reader_factory):
    """Round-trip a RecordBatchReader through the C stream interface,
    for both IPC-backed and Python-backed readers.
    """
    c_stream = ffi.new("struct ArrowArrayStream*")
    ptr_stream = int(ffi.cast("uintptr_t", c_stream))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    _export_import_batch_reader(ptr_stream, reader_factory)

    # All data created inside the helper must have been released.
    assert pa.total_allocated_bytes() == old_allocated

    # Now released
    with assert_stream_released:
        pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
364 | ||
365 | ||
@needs_cffi
def test_imported_batch_reader_error():
    """Errors from a truncated IPC stream must propagate through the
    C stream interface, both for ``read_next_batch()`` and ``read_all()``.
    """
    c_stream = ffi.new("struct ArrowArrayStream*")
    ptr_stream = int(ffi.cast("uintptr_t", c_stream))

    schema = pa.schema([('foo', pa.int32())])
    batches = [pa.record_batch([[1, 2, 3]], schema=schema),
               pa.record_batch([[4, 5, 6]], schema=schema)]
    buf = make_serialized(schema, batches)

    # Open a corrupt/incomplete stream and export it
    # (truncating 16 bytes cuts into the second batch's message body).
    reader = pa.ipc.open_stream(buf[:-16])
    reader._export_to_c(ptr_stream)
    del reader

    reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
    # The first batch is intact; only the second read should fail.
    batch = reader_new.read_next_batch()
    assert batch == batches[0]
    with pytest.raises(OSError,
                       match="Expected to be able to read 16 bytes "
                             "for message body, got 8"):
        reader_new.read_next_batch()

    # Again, but call read_all()
    reader = pa.ipc.open_stream(buf[:-16])
    reader._export_to_c(ptr_stream)
    del reader

    reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
    with pytest.raises(OSError,
                       match="Expected to be able to read 16 bytes "
                             "for message body, got 8"):
        reader_new.read_all()