]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/python/pyarrow/tests/test_cffi.py
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / python / pyarrow / tests / test_cffi.py
CommitLineData
1d09f67e
TL
1# -*- coding: utf-8 -*-
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18
19import gc
20
21import pyarrow as pa
22try:
23 from pyarrow.cffi import ffi
24except ImportError:
25 ffi = None
26
27import pytest
28
29try:
30 import pandas as pd
31 import pandas.testing as tm
32except ImportError:
33 pd = tm = None
34
35
# Skip marker for tests that require the optional cffi package.
needs_cffi = pytest.mark.skipif(
    ffi is None, reason="test needs cffi package installed")


# Reusable pytest.raises() context managers asserting that importing from
# an already-released C Data Interface struct fails with the expected error.
assert_schema_released = pytest.raises(
    ValueError, match="Cannot import released ArrowSchema")
assert_array_released = pytest.raises(
    ValueError, match="Cannot import released ArrowArray")
assert_stream_released = pytest.raises(
    ValueError, match="Cannot import released ArrowArrayStream")
49
class ParamExtType(pa.PyExtensionType):
    """Example parameterized extension type over fixed-size binary storage.

    The single parameter is the byte width of the storage type.
    """

    def __init__(self, width):
        self._width = width
        # Storage type is a fixed-width binary of the given byte width.
        pa.PyExtensionType.__init__(self, pa.binary(width))

    @property
    def width(self):
        return self._width

    def __reduce__(self):
        # Pickling support; PyExtensionType relies on this to round-trip
        # the type parameters.
        return ParamExtType, (self.width,)
63
def make_schema():
    """Return a one-column list<int32> schema with key/value metadata."""
    fields = [('ints', pa.list_(pa.int32()))]
    return pa.schema(fields, metadata={b'key1': b'value1'})
67
68
def make_extension_schema():
    """Return a schema holding a single extension-typed column."""
    fields = [('ext', ParamExtType(3))]
    return pa.schema(fields, metadata={b'key1': b'value1'})
72
73
def make_batch():
    """Return a one-column record batch matching make_schema()."""
    return pa.record_batch([[[1], [2, 42]]], make_schema())
76
77
def make_extension_batch():
    """Return a record batch with a single extension-typed column."""
    schema = make_extension_schema()
    # Build the storage array, then wrap it into the extension type.
    storage = pa.array([b"foo", b"bar"], type=pa.binary(3))
    ext_col = schema[0].type.wrap_array(storage)
    return pa.record_batch([ext_col], schema)
83
84
def make_batches():
    """Return two record batches sharing the schema from make_schema()."""
    schema = make_schema()
    columns = [
        [[1], [2, 42]],
        [None, [], [5, 6]],
    ]
    return [pa.record_batch([col], schema) for col in columns]
91
92
def make_serialized(schema, batches):
    """Serialize *batches* to an Arrow IPC stream and return the buffer."""
    with pa.BufferOutputStream() as sink:
        with pa.ipc.new_stream(sink, schema) as writer:
            for batch in batches:
                writer.write(batch)
        return sink.getvalue()
99
100
@needs_cffi
def test_export_import_type():
    """Round-trip a plain DataType through the C Data Interface."""
    schema_struct = ffi.new("struct ArrowSchema*")
    schema_ptr = int(ffi.cast("uintptr_t", schema_struct))

    # Make sure no Arrow data dangles in a ref cycle before taking
    # the allocation baseline.
    gc.collect()
    baseline = pa.total_allocated_bytes()

    exported = pa.list_(pa.int32())
    exported._export_to_c(schema_ptr)
    assert pa.total_allocated_bytes() > baseline
    # Drop the Python object; the exported struct keeps the data alive.
    del exported
    assert pa.total_allocated_bytes() > baseline
    # Recreate the C++ object from the exported pointer.
    imported = pa.DataType._import_from_c(schema_ptr)
    assert imported == pa.list_(pa.int32())
    assert pa.total_allocated_bytes() == baseline
    # Importing a second time must fail: the struct was released.
    with assert_schema_released:
        pa.DataType._import_from_c(schema_ptr)

    # An invalid format string must be rejected on import.
    pa.int32()._export_to_c(schema_ptr)
    bad_format = ffi.new("char[]", b"zzz")
    schema_struct.format = bad_format
    with pytest.raises(ValueError,
                       match="Invalid or unsupported format string"):
        pa.DataType._import_from_c(schema_ptr)
    # The failed import still released the struct.
    with assert_schema_released:
        pa.DataType._import_from_c(schema_ptr)
132
133
@needs_cffi
def test_export_import_field():
    """Round-trip a Field through the C Data Interface."""
    schema_struct = ffi.new("struct ArrowSchema*")
    schema_ptr = int(ffi.cast("uintptr_t", schema_struct))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    baseline = pa.total_allocated_bytes()

    exported = pa.field("test", pa.list_(pa.int32()), nullable=True)
    exported._export_to_c(schema_ptr)
    assert pa.total_allocated_bytes() > baseline
    # Drop the Python object; the exported struct keeps the data alive.
    del exported
    assert pa.total_allocated_bytes() > baseline

    # Recreate the C++ object from the exported pointer.
    imported = pa.Field._import_from_c(schema_ptr)
    assert imported == pa.field("test", pa.list_(pa.int32()), nullable=True)
    assert pa.total_allocated_bytes() == baseline

    # Importing again must fail: the struct was released.
    with assert_schema_released:
        pa.Field._import_from_c(schema_ptr)
156
157
@needs_cffi
def test_export_import_array():
    """Round-trip an Array through the C Data Interface."""
    schema_struct = ffi.new("struct ArrowSchema*")
    schema_ptr = int(ffi.cast("uintptr_t", schema_struct))
    array_struct = ffi.new("struct ArrowArray*")
    array_ptr = int(ffi.cast("uintptr_t", array_struct))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    baseline = pa.total_allocated_bytes()

    # Case 1: the type is known up front, only the array is exported.
    typ = pa.list_(pa.int32())
    arr = pa.array([[1], [2, 42]], type=typ)
    expected = arr.to_pylist()
    arr._export_to_c(array_ptr)
    assert pa.total_allocated_bytes() > baseline
    # Drop the Python object, then rebuild it from the exported pointer.
    del arr
    imported = pa.Array._import_from_c(array_ptr, typ)
    assert imported.to_pylist() == expected
    assert imported.type == pa.list_(pa.int32())
    assert pa.total_allocated_bytes() > baseline
    del imported, typ
    assert pa.total_allocated_bytes() == baseline
    # Importing again must fail: the struct was released.
    with assert_array_released:
        pa.Array._import_from_c(array_ptr, pa.list_(pa.int32()))

    # Case 2: type and array are exported/imported at the same time.
    arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32()))
    expected = arr.to_pylist()
    arr._export_to_c(array_ptr, schema_ptr)
    del arr
    imported = pa.Array._import_from_c(array_ptr, schema_ptr)
    assert imported.to_pylist() == expected
    assert imported.type == pa.list_(pa.int32())
    assert pa.total_allocated_bytes() > baseline
    del imported
    assert pa.total_allocated_bytes() == baseline
    # Both structs were released by the import.
    with assert_schema_released:
        pa.Array._import_from_c(array_ptr, schema_ptr)
201
202
def check_export_import_schema(schema_factory):
    """Round-trip a Schema produced by *schema_factory* through the C
    Data Interface and verify release semantics."""
    schema_struct = ffi.new("struct ArrowSchema*")
    schema_ptr = int(ffi.cast("uintptr_t", schema_struct))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    baseline = pa.total_allocated_bytes()

    schema_factory()._export_to_c(schema_ptr)
    assert pa.total_allocated_bytes() > baseline
    # Recreate the C++ object from the exported pointer.
    imported = pa.Schema._import_from_c(schema_ptr)
    assert imported == schema_factory()
    assert pa.total_allocated_bytes() == baseline
    del imported
    assert pa.total_allocated_bytes() == baseline
    # Importing again must fail: the struct was released.
    with assert_schema_released:
        pa.Schema._import_from_c(schema_ptr)

    # A non-struct type cannot be imported as a Schema.
    pa.int32()._export_to_c(schema_ptr)
    with pytest.raises(ValueError,
                       match="ArrowSchema describes non-struct type"):
        pa.Schema._import_from_c(schema_ptr)
    # The failed import still released the struct.
    with assert_schema_released:
        pa.Schema._import_from_c(schema_ptr)
230
231
@needs_cffi
def test_export_import_schema():
    """A plain schema round-trips through the C Data Interface."""
    check_export_import_schema(make_schema)
235
236
@needs_cffi
def test_export_import_schema_with_extension():
    """A schema with an extension type round-trips through C."""
    check_export_import_schema(make_extension_schema)
240
241
def check_export_import_batch(batch_factory):
    """Round-trip a RecordBatch produced by *batch_factory* through the
    C Data Interface and verify release semantics."""
    schema_struct = ffi.new("struct ArrowSchema*")
    schema_ptr = int(ffi.cast("uintptr_t", schema_struct))
    array_struct = ffi.new("struct ArrowArray*")
    array_ptr = int(ffi.cast("uintptr_t", array_struct))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    baseline = pa.total_allocated_bytes()

    # Case 1: the schema is known up front, only the batch is exported.
    batch = batch_factory()
    schema = batch.schema
    expected = batch.to_pydict()
    batch._export_to_c(array_ptr)
    assert pa.total_allocated_bytes() > baseline
    # Drop the Python object, then rebuild it from the exported pointer.
    del batch
    imported = pa.RecordBatch._import_from_c(array_ptr, schema)
    assert imported.to_pydict() == expected
    assert imported.schema == schema
    assert pa.total_allocated_bytes() > baseline
    del imported, schema
    assert pa.total_allocated_bytes() == baseline
    # Importing again must fail: the struct was released.
    with assert_array_released:
        pa.RecordBatch._import_from_c(array_ptr, make_schema())

    # Case 2: schema and batch are exported/imported at the same time.
    batch = batch_factory()
    expected = batch.to_pydict()
    batch._export_to_c(array_ptr, schema_ptr)
    del batch
    imported = pa.RecordBatch._import_from_c(array_ptr, schema_ptr)
    assert imported.to_pydict() == expected
    assert imported.schema == batch_factory().schema
    assert pa.total_allocated_bytes() > baseline
    del imported
    assert pa.total_allocated_bytes() == baseline
    # Both structs were released by the import.
    with assert_schema_released:
        pa.RecordBatch._import_from_c(array_ptr, schema_ptr)

    # A non-struct schema cannot back a RecordBatch import.
    pa.int32()._export_to_c(schema_ptr)
    batch_factory()._export_to_c(array_ptr)
    with pytest.raises(ValueError,
                       match="ArrowSchema describes non-struct type"):
        pa.RecordBatch._import_from_c(array_ptr, schema_ptr)
    # The failed import still released the schema struct.
    with assert_schema_released:
        pa.RecordBatch._import_from_c(array_ptr, schema_ptr)
294
295
@needs_cffi
def test_export_import_batch():
    """A plain record batch round-trips through the C Data Interface."""
    check_export_import_batch(make_batch)
299
300
@needs_cffi
def test_export_import_batch_with_extension():
    """A batch with an extension-typed column round-trips through C."""
    check_export_import_batch(make_extension_batch)
304
305
def _export_import_batch_reader(ptr_stream, reader_factory):
    """Export a reader through *ptr_stream*, re-import it and check that
    the re-imported stream yields the original contents."""
    batches = make_batches()
    schema = batches[0].schema

    reader = reader_factory(schema, batches)
    reader._export_to_c(ptr_stream)
    # Drop the Python objects, then rebuild from the exported pointer.
    del reader, batches

    imported = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
    assert imported.schema == schema
    consumed = list(imported)
    del imported
    assert consumed == make_batches()

    # Also exercise read_pandas() when pandas is available.
    if pd is not None:
        batches = make_batches()
        schema = batches[0].schema
        expected_df = pa.Table.from_batches(batches).to_pandas()

        reader = reader_factory(schema, batches)
        reader._export_to_c(ptr_stream)
        del reader, batches

        imported = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
        got_df = imported.read_pandas()
        del imported
        tm.assert_frame_equal(expected_df, got_df)
336
337
def make_ipc_stream_reader(schema, batches):
    """Return a reader backed by a serialized IPC stream."""
    return pa.ipc.open_stream(make_serialized(schema, batches))
340
341
def make_py_record_batch_reader(schema, batches):
    """Return a reader wrapping the in-memory batches directly."""
    return pa.ipc.RecordBatchReader.from_batches(schema, batches)
344
345
@needs_cffi
@pytest.mark.parametrize('reader_factory',
                         [make_ipc_stream_reader,
                          make_py_record_batch_reader])
def test_export_import_batch_reader(reader_factory):
    """Round-trip a RecordBatchReader through the C Stream Interface."""
    stream_struct = ffi.new("struct ArrowArrayStream*")
    stream_ptr = int(ffi.cast("uintptr_t", stream_struct))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    baseline = pa.total_allocated_bytes()

    _export_import_batch_reader(stream_ptr, reader_factory)

    # Everything imported above has been consumed and freed.
    assert pa.total_allocated_bytes() == baseline

    # Importing again must fail: the stream was released.
    with assert_stream_released:
        pa.ipc.RecordBatchReader._import_from_c(stream_ptr)
364
365
@needs_cffi
def test_imported_batch_reader_error():
    """Errors from a truncated stream surface through the imported reader."""
    stream_struct = ffi.new("struct ArrowArrayStream*")
    stream_ptr = int(ffi.cast("uintptr_t", stream_struct))

    schema = pa.schema([('foo', pa.int32())])
    batches = [pa.record_batch([[1, 2, 3]], schema=schema),
               pa.record_batch([[4, 5, 6]], schema=schema)]
    buf = make_serialized(schema, batches)

    # Open a corrupt/incomplete stream (last 16 bytes cut off) and export it.
    reader = pa.ipc.open_stream(buf[:-16])
    reader._export_to_c(stream_ptr)
    del reader

    imported = pa.ipc.RecordBatchReader._import_from_c(stream_ptr)
    # The first batch is intact; reading the second one must fail.
    batch = imported.read_next_batch()
    assert batch == batches[0]
    with pytest.raises(OSError,
                       match="Expected to be able to read 16 bytes "
                             "for message body, got 8"):
        imported.read_next_batch()

    # Same scenario, but consuming through read_all().
    reader = pa.ipc.open_stream(buf[:-16])
    reader._export_to_c(stream_ptr)
    del reader

    imported = pa.ipc.RecordBatchReader._import_from_c(stream_ptr)
    with pytest.raises(OSError,
                       match="Expected to be able to read 16 bytes "
                             "for message body, got 8"):
        imported.read_all()