]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | #!/usr/bin/env python3 |
2 | # | |
3 | # Licensed to the Apache Software Foundation (ASF) under one | |
4 | # or more contributor license agreements. See the NOTICE file | |
5 | # distributed with this work for additional information | |
6 | # regarding copyright ownership. The ASF licenses this file | |
7 | # to you under the Apache License, Version 2.0 (the | |
8 | # "License"); you may not use this file except in compliance | |
9 | # with the License. You may obtain a copy of the License at | |
10 | # | |
11 | # http://www.apache.org/licenses/LICENSE-2.0 | |
12 | # | |
13 | # Unless required by applicable law or agreed to in writing, | |
14 | # software distributed under the License is distributed on an | |
15 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
16 | # KIND, either express or implied. See the License for the | |
17 | # specific language governing permissions and limitations | |
18 | # under the License. | |
19 | ||
20 | import contextlib | |
21 | import gc | |
22 | import os | |
23 | import unittest | |
24 | ||
25 | import pyarrow as pa | |
26 | from pyarrow.cffi import ffi | |
27 | ||
28 | ||
def load_cgotest():
    """Declare the cgotest C API on pyarrow's cffi instance and load it.

    The shared library is opened via a path relative to the current working
    directory: ``./cgotest.dll`` when ``os.name == 'nt'`` and
    ``./cgotest.so`` on every other platform.

    Returns:
        The cffi library handle exposing the declared functions.
    """
    # XXX macOS (Darwin) is not special-cased and also gets the 'so' suffix.
    suffix = 'dll' if os.name == 'nt' else 'so'

    # C signatures exported by the Go test library; pointers to the
    # ArrowSchema / ArrowArray structs are passed as plain uintptr_t values.
    declarations = """
        long long totalAllocated();
        void importSchema(uintptr_t ptr);
        void importRecordBatch(uintptr_t scptr, uintptr_t rbptr);
        void runGC();
        void exportSchema(uintptr_t ptr);
        void exportRecordBatch(uintptr_t schema, uintptr_t record);
        void importThenExportSchema(uintptr_t input, uintptr_t output);
        void importThenExportRecord(uintptr_t schemaIn, uintptr_t arrIn,
                                    uintptr_t schemaOut, uintptr_t arrOut);
        """
    ffi.cdef(declarations)
    return ffi.dlopen('./cgotest.' + suffix)


# Loaded once at import time; every test case below calls into this handle.
cgotest = load_cgotest()
51 | ||
class BaseTestGoPython(unittest.TestCase):
    """Shared fixtures and leak checks for the Go <-> Python C Data tests.

    Each test gets a freshly allocated ``struct ArrowSchema`` and
    ``struct ArrowArray`` (via cffi), plus their addresses as plain integers.
    The integers can be handed both to PyArrow's ``_export_to_c`` /
    ``_import_from_c`` and to the ``uintptr_t`` parameters of the cgotest
    library.
    """

    def setUp(self):
        # Keep the owning cdata objects on self: memory returned by
        # ffi.new() is released once its cdata object is collected, so the
        # integer addresses are only valid while these attributes live.
        self.c_schema = ffi.new("struct ArrowSchema*")
        self.ptr_schema = int(ffi.cast("uintptr_t", self.c_schema))
        self.c_array = ffi.new("struct ArrowArray*")
        self.ptr_array = int(ffi.cast("uintptr_t", self.c_array))

    def make_schema(self):
        """Return the reference schema: one list<int32> column plus metadata."""
        return pa.schema([('ints', pa.list_(pa.int32()))],
                         metadata={b'key1': b'value1'})

    def make_batch(self):
        """Return the reference 4-row batch (includes an empty list and a null)."""
        return pa.record_batch([[[1], [], None, [2, 42]]],
                               self.make_schema())

    def run_gc(self):
        """Run the Go and Python garbage collectors several times."""
        # Several Go GC runs can be required to run all finalizers
        for _ in range(5):
            cgotest.runGC()
            gc.collect()

    @contextlib.contextmanager
    def assert_pyarrow_memory_released(self):
        """Fail the test if the body leaks PyArrow or Go allocations.

        Both allocators are sampled after a GC pass before the body runs and
        again after it finishes. Any difference is reported as lost bytes.
        If the body raises, the exception propagates unchanged and no leak
        check is performed.
        """
        self.run_gc()
        old_allocated = pa.total_allocated_bytes()
        old_go_allocated = cgotest.totalAllocated()
        yield
        self.run_gc()
        # Sample each counter exactly once so the compared values and the
        # byte counts reported in the failure messages agree.
        new_allocated = pa.total_allocated_bytes()
        new_go_allocated = cgotest.totalAllocated()
        diff = new_allocated - old_allocated
        godiff = new_go_allocated - old_go_allocated
        self.assertEqual(
            new_allocated, old_allocated,
            f"PyArrow memory was not adequately released: {diff} bytes lost")
        self.assertEqual(
            new_go_allocated, old_go_allocated,
            f"Go memory was not properly released: {godiff} bytes lost")
88 | ||
89 | ||
class TestPythonToGo(BaseTestGoPython):
    """PyArrow exports through the C Data Interface; Go imports and checks."""

    def test_schema(self):
        schema_out = self.ptr_schema
        with self.assert_pyarrow_memory_released():
            self.make_schema()._export_to_c(schema_out)
            # The Go side panics if the imported schema is not as expected.
            cgotest.importSchema(schema_out)

    def test_record_batch(self):
        schema_out, array_out = self.ptr_schema, self.ptr_array
        with self.assert_pyarrow_memory_released():
            self.make_schema()._export_to_c(schema_out)
            self.make_batch()._export_to_c(array_out)
            # The Go side panics if the imported batch is not as expected.
            cgotest.importRecordBatch(schema_out, array_out)
104 | ||
105 | ||
class TestGoToPython(BaseTestGoPython):
    """Go exports through the C Data Interface; PyArrow imports and compares."""

    def test_get_schema(self):
        with self.assert_pyarrow_memory_released():
            cgotest.exportSchema(self.ptr_schema)

            sc = pa.Schema._import_from_c(self.ptr_schema)
            # assertEqual (unlike a bare assert) still runs under
            # ``python -O`` and reports both values on failure.
            self.assertEqual(sc, self.make_schema())
            del sc

    def test_get_batch(self):
        with self.assert_pyarrow_memory_released():
            cgotest.exportRecordBatch(self.ptr_schema, self.ptr_array)
            arrnew = pa.RecordBatch._import_from_c(self.ptr_array,
                                                   self.ptr_schema)
            self.assertEqual(arrnew, self.make_batch())
            # Drop the imported batch (whose data was exported by Go) inside
            # the block so it can be released before the leak check runs.
            del arrnew
121 | ||
class TestRoundTrip(BaseTestGoPython):
    """Python -> Go -> Python round trips through the C Data Interface."""

    def test_schema_roundtrip(self):
        with self.assert_pyarrow_memory_released():
            # make sure that Python -> Go -> Python ends up with
            # the same exact schema
            schema = self.make_schema()
            schema._export_to_c(self.ptr_schema)
            del schema

            # Separate output struct for the schema that Go re-exports.
            c_schema = ffi.new("struct ArrowSchema*")
            ptr_schema = int(ffi.cast("uintptr_t", c_schema))

            cgotest.importThenExportSchema(self.ptr_schema, ptr_schema)
            schema_new = pa.Schema._import_from_c(ptr_schema)
            # NOTE(review): ``==`` on pyarrow schemas may ignore metadata
            # (Schema.equals defaults to check_metadata=False); confirm
            # whether the b'key1' metadata should also be compared here.
            self.assertEqual(schema_new, self.make_schema())
            del schema_new
            del c_schema

    def test_batch_roundtrip(self):
        with self.assert_pyarrow_memory_released():
            # make sure that Python -> Go -> Python for record
            # batches works correctly and gets the same data in the end
            schema = self.make_schema()
            batch = self.make_batch()
            schema._export_to_c(self.ptr_schema)
            batch._export_to_c(self.ptr_array)
            # Drop the Python-side references; from here on only the
            # exported C structs refer to the data.
            del schema
            del batch

            # Separate output structs for what Go re-exports.
            c_schema = ffi.new("struct ArrowSchema*")
            c_batch = ffi.new("struct ArrowArray*")
            ptr_schema = int(ffi.cast("uintptr_t", c_schema))
            ptr_batch = int(ffi.cast("uintptr_t", c_batch))

            cgotest.importThenExportRecord(self.ptr_schema, self.ptr_array,
                                           ptr_schema, ptr_batch)
            batch_new = pa.RecordBatch._import_from_c(ptr_batch, ptr_schema)
            self.assertEqual(batch_new, self.make_batch())
            # Release the imported batch before the leak check runs.
            del batch_new
            del c_schema
            del c_batch
163 | ||
164 | ||
if __name__ == '__main__':
    # Discover and run the TestCase classes defined in this script,
    # reporting each test by name.
    unittest.main(module='__main__', verbosity=2)