]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/julia/Arrow/src/utils.jl
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / julia / Arrow / src / utils.jl
1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
# Determines the total number of bytes needed to store `n` bytes with padding, i.e. `n`
# rounded up to a multiple of `alignment`.
# Note that the Arrow standard requires buffers to be aligned to 8-byte boundaries.
function padding(n::Integer, alignment)
    blocks = div(n + alignment - 1, alignment)
    return blocks * alignment
end

# Number of padding bytes that have to follow `n` bytes so that the total equals
# `padding(n, alignment)`.
function paddinglength(n::Integer, alignment)
    padded = padding(n, alignment)
    return padded - n
end
22
# Write `n` zero bytes to `io`, one byte per `write` call, and return the total number
# of bytes written (0 when the range `1:n` is empty).
function writezeros(io::IO, n::Integer)
    return mapfoldl(_ -> Base.write(io, 0x00), +, 1:n; init=0)
end
30
# efficient writing of arrays: strip `Missing` from the column's element type and
# forward to the typed `writearray(io, T, col)` method
function writearray(io, col)
    T = maybemissing(eltype(col))
    return writearray(io, T, col)
end
33
# Write the values of `col` to `io` as raw data of element type `T` and return the
# number of bytes written.
#
# * `Vector{T}`: written directly.
# * isbits `T` held in a `Vector{Union{T, Missing}}` or a `SentinelVector`: only the
#   leading `sizeof(T) * length(col)` bytes are written (the non-selector bytes).
# * `ChainedVector`: each of `col.arrays` is written in turn and the counts are summed.
# * anything else: every element (with `missing` replaced by `ArrowTypes.default(T)`)
#   is serialized into an in-memory buffer, which is then written to `io` in one call.
function writearray(io::IO, ::Type{T}, col) where {T}
    if col isa Vector{T}
        n = Base.write(io, col)
    elseif isbitstype(T) && (col isa Vector{Union{T, Missing}} || col isa SentinelVector{T, T, Missing, Vector{T}})
        # need to write the non-selector bytes of isbits Union Arrays;
        # `col` must stay rooted for as long as its raw pointer is in use
        n = GC.@preserve col Base.unsafe_write(io, pointer(col), sizeof(T) * length(col))
    elseif col isa ChainedVector
        n = 0
        for A in col.arrays
            n += writearray(io, T, A)
        end
    else
        data = Vector{UInt8}(undef, sizeof(col))
        buf = IOBuffer(data; write=true)
        for x in col
            Base.write(buf, coalesce(x, ArrowTypes.default(T)))
        end
        # the byte count reported to the caller is what actually reached `io`
        n = Base.write(io, take!(buf))
    end
    return n
end
56
# Return `true` when the bit selected by the mask `0x02^(n - 1)` is set in `v`
# (`n = 1` selects the least significant bit).
function getbit(v::UInt8, n::Integer)
    mask = 0x02^(n - 1)
    return (v & mask) != 0x00
end
58
# Return a copy of `v` in which the bit selected by the mask `0x02^(n - 1)` is set
# (`b == true`) or cleared (`b == false`).
function setbit(v::UInt8, b::Bool, n::Integer)
    mask = 0x02^(n - 1)
    return b ? (v | mask) : (v & ~mask)
end
66
# Determines the number of bytes used by `n` bits, optionally with padding: the bit
# count is rounded up to whole bytes, then the padding for `alignment` is added.
function bitpackedbytes(n::Integer, alignment)
    nbytes = cld(n, 8)
    pad = paddinglength(nbytes, alignment)
    return nbytes + pad
end
72
# count # of missing elements in an iterable
function nullcount(col)
    return count(x -> x === missing, col)
end
75
# like startswith for strings, but on byte buffers: returns `true` when the bytes of
# `a` beginning at index `pos` are equal to `b`.
# An empty `b` always matches. A window that does not fit inside `a` never matches;
# it is rejected up front instead of being read out of bounds.
function _startswith(a::AbstractVector{UInt8}, pos::Integer, b::AbstractVector{UInt8})
    isempty(b) && return true
    # the compared window is a[pos:(pos + length(b) - 1)]; it has to lie inside `a`
    # because the loop below skips bounds checks
    (firstindex(a) <= pos && pos + length(b) - 1 <= lastindex(a)) || return false
    for (i, byte) in enumerate(b)
        @inbounds matched = a[pos + i - 1] == byte
        matched || return false
    end
    return true
end
84
# like endswith for strings, but on byte buffers: returns `true` when the bytes of `a`
# that end at index `endpos` are equal to `b`.
# An empty `b` always matches. A window that does not fit inside `a` never matches;
# it is rejected up front instead of being read out of bounds.
function _endswith(a::AbstractVector{UInt8}, endpos::Integer, b::AbstractVector{UInt8})
    isempty(b) && return true
    # the compared window is a[start:endpos]; it has to lie inside `a` because the
    # loop below skips bounds checks
    start = endpos - length(b) + 1
    (firstindex(a) <= start && endpos <= lastindex(a)) || return false
    for (i, byte) in enumerate(b)
        @inbounds matched = a[start + i - 1] == byte
        matched || return false
    end
    return true
end
94
# read a single element of type `T` from the byte vector `t`, starting at byte index
# `pos` (adapted from read(::IOBuffer, T) in Base)
# Throws a `BoundsError` when the `sizeof(T)` bytes starting at `pos` do not all lie
# inside `t`.
function readbuffer(t::AbstractVector{UInt8}, pos::Integer, ::Type{T}) where {T}
    # the raw load below is unchecked, so validate the whole byte range up front
    checkbounds(t, pos:(pos + sizeof(T) - 1))
    # keep `t` rooted while its raw pointer is dereferenced
    return GC.@preserve t unsafe_load(convert(Ptr{T}, pointer(t, pos)))
end
103
# given a number of unique values; what dict encoding _index_ type is most appropriate:
# the first of Int8, Int16, Int32 for which `n < div(typemax(T), 2)`, otherwise Int64
function encodingtype(n)
    for T in (Int8, Int16, Int32)
        n < div(typemax(T), 2) && return T
    end
    return Int64
end
106
# lazily apply `ArrowTypes.arrowconvert(T, x)` on getindex for each x in data
struct Converter{T, A} <: AbstractVector{T}
    data::A
end

# wrap `x` in a `Converter`; the element type is `Union{T, Missing}` when the element
# type of `x` admits `missing`, and plain `T` otherwise
function converter(::Type{T}, x::A) where {T, A}
    S = eltype(A) >: Missing ? Union{T, Missing} : T
    return Converter{S, A}(x)
end

# chained data: wrap each underlying array separately and chain the results again
function converter(::Type{T}, x::ChainedVector{A}) where {T, A}
    return ChainedVector([converter(T, arr) for arr in x.arrays])
end

# AbstractVector interface: linear indexing over the wrapped data
function Base.IndexStyle(::Type{<:Converter})
    return Base.IndexLinear()
end

function Base.size(x::Converter)
    return (length(x.data),)
end

function Base.eltype(x::Converter{T, A}) where {T, A}
    return T
end

# the conversion happens here, one element per access
function Base.getindex(x::Converter{T}, i::Int) where {T}
    return ArrowTypes.arrowconvert(T, getindex(x.data, i))
end
119
# Strip `Missing` from the type `T`; `Missing` itself is returned unchanged.
function maybemissing(::Type{T}) where {T}
    if T === Missing
        return Missing
    end
    return Base.nonmissingtype(T)
end
121
# Parse the `Meta.Footer` flatbuffer out of `filebytes`.
# An Int32 length `len` is read at byte index `length(filebytes) - 9`; the footer is
# taken from the `len` bytes immediately preceding the final 10 bytes of `filebytes`.
function getfooter(filebytes)
    len = readbuffer(filebytes, length(filebytes) - 9, Int32)
    lastidx = lastindex(filebytes)
    footerbytes = filebytes[(lastidx - (9 + len)):(lastidx - 10)]
    return FlatBuffers.getrootas(Meta.Footer, footerbytes, 0)
end
126
# Return a copy of the bytes of `filebytes` that start right after the offset of the
# first record batch entry listed in the footer.
# NOTE(review): both slice endpoints are inclusive, so the result holds
# `metaDataLength + 1` bytes — confirm whether the extra trailing byte is intended.
function getrb(filebytes)
    footer = getfooter(filebytes)
    block = footer.recordBatches[1]
    start = block.offset + 1
    return filebytes[start:(start + block.metaDataLength)]
end
133
# Parse the `Meta.Message` flatbuffer found 8 bytes past byte index `off` of
# `filebytes`, after asserting that the UInt32 stored at `off` is the 0xFFFFFFFF marker.
# The Int32 stored at `off + 4` (presumably the message length — verify against the
# format) is not needed to parse the message and is therefore not read.
function readmessage(filebytes, off=9)
    @assert readbuffer(filebytes, off, UInt32) === 0xFFFFFFFF
    return FlatBuffers.getrootas(Meta.Message, filebytes, off + 8)
end
140
# a custom Channel type that only allows put!-ing objects in a specific, monotonically increasing order
struct OrderedChannel{T}
    # underlying channel that stores the objects
    chan::Channel{T}
    # condition used to lock the channel state and to wait for / signal a task's turn
    cond::Threads.Condition
    # current position in the put! order; concretely typed so that `ch.i[]` is inferrable
    i::Base.RefValue{Int}
end

# create an ordered channel whose underlying `Channel{T}` is built with size `sz`;
# the order counter starts at 1
function OrderedChannel{T}(sz) where {T}
    return OrderedChannel{T}(Channel{T}(sz), Threads.Condition(), Ref(1))
end

# iteration is delegated to the underlying channel
Base.iterate(ch::OrderedChannel, st...) = iterate(ch.chan, st...)
150
# `@lock obj expr`: acquire `obj` with `lock`, evaluate `expr`, and always release the
# lock again with `unlock`, even when `expr` throws. The value of `expr` is returned.
# `obj` is evaluated exactly once, so the object that gets unlocked is guaranteed to
# be the one that was locked, even if `obj` is an expression with side effects.
macro lock(obj, expr)
    return quote
        lk = $(esc(obj))
        lock(lk)
        try
            $(esc(expr))
        finally
            unlock(lk)
        end
    end
end
161
# when put!-ing an object, operation may have to wait until other tasks have put their
# objects to ensure the channel is ordered correctly
#
# `x` is put on the underlying channel once the channel's counter is at least `i`; with
# `incr=true` the counter is then advanced by one. Tasks waiting on the condition are
# notified afterwards. Returns `nothing`.
function Base.put!(ch::OrderedChannel{T}, x::T, i::Integer, incr::Bool=false) where {T}
    lock(ch.cond)
    try
        # channel index too early, need to wait for other tasks to put their objects first
        while ch.i[] < i
            wait(ch.cond)
        end
        # now it's our turn
        put!(ch.chan, x)
        incr && (ch.i[] += 1)
        # wake up tasks that may be waiting to put their objects
        notify(ch.cond)
    finally
        unlock(ch.cond)
    end
    return nothing
end
180
# Close the underlying channel while holding the channel's condition lock.
# As long as `Base.n_waiters` reports tasks waiting on the condition, this call waits
# on the condition too, so that tasks still waiting to put their objects get a chance
# to do so first. Returns `nothing`.
function Base.close(ch::OrderedChannel)
    lock(ch.cond)
    try
        while Base.n_waiters(ch.cond) > 0
            wait(ch.cond)
        end
        close(ch.chan)
    finally
        unlock(ch.cond)
    end
    return nothing
end
191
# pairs a value `x` with a `ReentrantLock`; `lock`/`unlock` on the wrapper operate on
# that lock
struct Lockable{T}
    x::T
    lock::ReentrantLock
end

# wrap `x` together with a freshly created lock
function Lockable(x::T) where {T}
    return Lockable{T}(x, ReentrantLock())
end

function Base.lock(x::Lockable)
    return lock(x.lock)
end

function Base.unlock(x::Lockable)
    return unlock(x.lock)
end