]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/julia/Arrow/test/pyarrow_roundtrip.jl
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / julia / Arrow / test / pyarrow_roundtrip.jl
1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
# Bootstrap for the pyarrow interop checks: name the Python executable, load
# PyCall, import pyarrow, and pull in the shared test tables.
# NOTE(review): PyCall documents PYTHON as a setting read when the package is
# built; confirm this assignment has an effect without rebuilding PyCall.
ENV["PYTHON"] = "python3"
import PyCall
pa = PyCall.pyimport("pyarrow")
# `Arrow` must already be in scope when this file is included; the table
# definitions are located relative to the installed Arrow package sources.
include(joinpath(dirname(pathof(Arrow)), "..", "test", "testtables.jl"))
21
"""
    pyarrow_passthrough(pa, bytes)

Feed the Arrow IPC stream `bytes` through pyarrow and return the re-encoded
stream as a `Vector{UInt8}`.

`pa` is the imported `pyarrow` module. The input is opened with
`pa.ipc.open_stream`, every record batch is written unchanged to a
`pa.ipc.new_stream` writer that reuses the reader's schema, and the bytes held
by the output sink are copied into a Julia array.
"""
function pyarrow_passthrough(pa, bytes)
    reader = pa.ipc.open_stream(PyCall.pybytes(bytes))
    sink = pa.BufferOutputStream()
    writer = pa.ipc.new_stream(sink, reader.schema)
    for batch in reader
        writer.write_batch(batch)
    end
    writer.close()
    # `copy` materializes the reinterpreted view as a standalone Julia array.
    return copy(reinterpret(UInt8, sink.getvalue()))
end

# Write every test table with Arrow.jl, pass it through pyarrow, and parse the
# result again with Arrow.jl. No values are compared here: a table passes when
# the whole chain completes without throwing.
for (nm, t, writekw, readkw, extratests) in testtables
    # NOTE(review): the "unions" table is excluded; the reason is not visible
    # in this file.
    nm == "unions" && continue

    println("pyarrow roundtrip: $nm")
    io = IOBuffer()
    Arrow.write(io, t; writekw...)
    # `take!` returns the full buffer contents, so no `seekstart` is needed.
    Arrow.Table(pyarrow_passthrough(pa, take!(io)))

    println("pyarrow roundtrip w/ compression: $nm")
    # Exercise both codecs on every run instead of picking one at random, so
    # a codec-specific failure is reproducible.
    for codec in (:lz4, :zstd)
        io = IOBuffer()
        Arrow.write(io, t; compress=codec, writekw...)
        Arrow.Table(pyarrow_passthrough(pa, take!(io)))
    end
end
55
# Build a dense-union column directly in pyarrow, serialize it as an IPC
# stream, and check that Arrow.jl can parse the resulting bytes.
f1 = pa.field("f1", pa.float64(), true)
f2 = pa.field("f2", pa.int64(), false)
fu = pa.field("col1", pa.union([f1, f2], "dense"))
# NOTE(review): `sch` is not consumed by any code visible in this file; the
# stream below is written with `batch.schema` instead.
sch = pa.schema([fu])

xs = pa.array([2.0, 4.0, PyCall.pynothing[]], type=pa.float64())
ys = pa.array([1, 3], type=pa.int64())
# In a dense union, slot i refers to children[types[i]][offsets[i]]. The last
# slot must select child 0 (`xs`) at offset 2: selecting child 1 there would
# index past the end of the two-element `ys` and leave the third `xs` entry
# unreferenced.
types = pa.array([0, 1, 0, 1, 0], type=pa.int8())
offsets = pa.array([0, 0, 1, 1, 2], type=pa.int32())
union_arr = pa.UnionArray.from_dense(types, offsets, [xs, ys])
data = [union_arr]
batch = pa.record_batch(data, names=["col1"])

# Serialize the single batch to an in-memory IPC stream.
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, batch.schema)
writer.write_batch(batch)
writer.close()
buf = sink.getvalue()

# Copy the bytes into a Julia array and parse them with Arrow.jl.
jbytes = copy(reinterpret(UInt8, buf))
tt = Arrow.Table(jbytes)