]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | # Licensed to the Apache Software Foundation (ASF) under one |
2 | # or more contributor license agreements. See the NOTICE file | |
3 | # distributed with this work for additional information | |
4 | # regarding copyright ownership. The ASF licenses this file | |
5 | # to you under the Apache License, Version 2.0 (the | |
6 | # "License"); you may not use this file except in compliance | |
7 | # with the License. You may obtain a copy of the License at | |
8 | # | |
9 | # http://www.apache.org/licenses/LICENSE-2.0 | |
10 | # | |
11 | # Unless required by applicable law or agreed to in writing, software | |
12 | # distributed under the License is distributed on an "AS IS" BASIS, | |
13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | # See the License for the specific language governing permissions and | |
15 | # limitations under the License. | |
16 | ||
"""
    Arrow.DictEncoding

Represents the "pool" of possible values for a [`DictEncoded`](@ref)
array type. Whether the order of values is significant can be checked
by looking at the `isOrdered` boolean field.

Indexing a `DictEncoding{T}` returns the pooled value at that position,
converted to `T`.
"""
mutable struct DictEncoding{T, A} <: ArrowVector{T}
    # identifier of this dictionary; encodings are looked up by this id when columns are encoded
    id::Int64
    # the pooled values themselves
    data::A
    # whether the order of the pooled values is significant
    isOrdered::Bool
    # optional metadata attached to the encoding
    metadata::Union{Nothing, Dict{String, String}}
end
30 | ||
# The encoding has the same size as its pool of values.
function Base.size(d::DictEncoding)
    return size(d.data)
end
32 | ||
# Return the `i`-th pooled value, converted to the encoding's element type `T`.
@propagate_inbounds function Base.getindex(d::DictEncoding{T}, i::Integer) where {T}
    @boundscheck checkbounds(d, i)
    @inbounds converted = ArrowTypes.arrowconvert(T, d.data[i])
    return converted
end
37 | ||
# Marker element type: a column whose eltype is `DictEncodeType{T}` is asking to be
# dict encoded when it is written to the arrow format.
struct DictEncodeType{T} end

# Recover the wrapped element type `T` from `DictEncodeType{T}`.
function getT(::Type{DictEncodeType{T}}) where {T}
    return T
end
42 | ||
"""
    Arrow.DictEncode(::AbstractVector, id::Integer=-1)

Signals that a column/array should be dictionary encoded when serialized
to the arrow streaming/file format. An optional `id` number may be provided
to signal that multiple columns should use the same pool when being
dictionary encoded. When `id` is left at its default of `-1`, an id is
derived from the column's position at the time the column is converted.
"""
struct DictEncode{T, A} <: AbstractVector{DictEncodeType{T}}
    # dictionary id; `-1` means "no explicit id was given"
    id::Int64
    # the wrapped column/array
    data::A
end
55 | ||
# Wrap `x` for dict encoding; `id` defaults to -1 (no explicit dictionary id).
function DictEncode(x::A, id=-1) where {A}
    return DictEncode{eltype(A), A}(id, x)
end

# Array interface: everything is forwarded to the wrapped `data`.
function Base.IndexStyle(::Type{<:DictEncode})
    return Base.IndexLinear()
end

function Base.size(x::DictEncode)
    return (length(x.data),)
end

function Base.iterate(x::DictEncode, st...)
    return iterate(x.data, st...)
end

function Base.getindex(x::DictEncode, i::Int)
    return getindex(x.data, i)
end

# Columns whose eltype is a `DictEncodeType` are handled as dict encoded.
function ArrowTypes.ArrowType(::Type{<:DictEncodeType})
    return DictEncodedType()
end
62 | ||
"""
    Arrow.DictEncoded

A dictionary encoded array type (similar to a `PooledArray`). Behaves just
like a normal array in most respects; internally, possible values are stored
in the `encoding::DictEncoding` field, while the `indices::Vector{<:Integer}`
field holds the "codes" of each element for indexing into the encoding pool.
The codes are zero-based offsets: element `i` is read as `encoding[indices[i] + 1]`,
unless the validity bitmap marks it as missing.
Any column/array can be dict encoded when serializing to the arrow format
either by passing the `dictencode=true` keyword argument to [`Arrow.write`](@ref)
(which causes _all_ columns to be dict encoded), or wrapping individual columns/
arrays in [`Arrow.DictEncode(x)`](@ref).
"""
struct DictEncoded{T, S, A} <: ArrowVector{T}
    arrow::Vector{UInt8} # need to hold a reference to arrow memory blob
    # per-element validity; an element that is not valid reads back as `missing`
    validity::ValidityBitmap
    # zero-based codes into `encoding`
    indices::Vector{S}
    # the pool of possible values
    encoding::DictEncoding{T, A}
    # optional column metadata
    metadata::Union{Nothing, Dict{String, String}}
end
82 | ||
# Outer constructor: infer the type parameters from the index vector and the encoding.
function DictEncoded(
    b::Vector{UInt8},
    v::ValidityBitmap,
    inds::Vector{S},
    encoding::DictEncoding{T, A},
    meta,
) where {S, T, A}
    return DictEncoded{T, S, A}(b, v, inds, encoding, meta)
end
85 | ||
# One element per stored index code.
function Base.size(d::DictEncoded)
    return size(d.indices)
end
87 | ||
# `true` for a `DictEncoded` column or a `Compressed` wrapper around one, `false` otherwise.
function isdictencoded(::DictEncoded)
    return true
end

function isdictencoded(::Any)
    return false
end

function isdictencoded(::Compressed{Z, A}) where {Z, A <: DictEncoded}
    return true
end
91 | ||
# Map an unsigned integer type to the signed integer type of the same width.
function signedtype(::Type{UInt8})
    return Int8
end

function signedtype(::Type{UInt16})
    return Int16
end

function signedtype(::Type{UInt32})
    return Int32
end

function signedtype(::Type{UInt64})
    return Int64
end
96 | ||
# Element type `S` of the index codes of a dict encoded column.
function indtype(::DictEncoded{T, S}) where {T, S}
    return S
end

# For a compressed dict encoded column, ask the wrapped column.
function indtype(c::Compressed{Z, A}) where {Z, A <: DictEncoded}
    return indtype(c.data)
end
99 | ||
# Pack a column index, nesting level and field id into a single `Int64`:
# the nesting level is shifted left by 48 bits, the field id by 32 bits,
# and the column index occupies the low bits.
function dictencodeid(colidx, nestedlevel, fieldid)
    levelbits = Int64(nestedlevel) << 48
    fieldbits = Int64(fieldid) << 32
    return levelbits | fieldbits | Int64(colidx)
end
101 | ||
# Dictionary id of a dict encoded column (looking through a `Compressed` wrapper if needed).
function getid(d::DictEncoded)
    return d.encoding.id
end

function getid(c::Compressed{Z, A}) where {Z, A <: DictEncoded}
    return c.data.encoding.id
end
104 | ||
# A column that is already `DictEncoded` is returned unchanged.
function arrowvector(::DictEncodedType, x::DictEncoded, i, nl, fi, de, ded, meta; kw...)
    return x
end
106 | ||
# Convert a `DictEncode`-wrapped column `x` into a `DictEncoded` arrow vector.
#
# Arguments, as used by this method:
# - `x`: must be a `DictEncode`; its `id` selects the dictionary and its `data` is encoded
# - `i`, `nl`, `fi`: passed to `dictencodeid` (as column index, nested level, field id) when
#   `x.id == -1`, and forwarded to the nested `arrowvector` calls
# - `de`: maps a dictionary id to a `Lockable` wrapping the `DictEncoding` for that id
# - `ded`: collection onto which "delta" `DictEncoding`s (values new to an existing
#   dictionary) are pushed
# - `meta`: column metadata or `nothing`; combined with the encoding's metadata at the end
#
# Keywords: `dictencode` is accepted but not read in this body; the pool/delta values are
# converted with `dictencode=dictencodenested`. Remaining keywords are forwarded.
#
# Returns a `DictEncoded` whose indices are zero-based offsets into the encoding.
function arrowvector(::DictEncodedType, x, i, nl, fi, de, ded, meta; dictencode::Bool=false, dictencodenested::Bool=false, kw...)
    @assert x isa DictEncode
    # -1 means no explicit id was given, so derive one from the column's position
    id = x.id == -1 ? dictencodeid(i, nl, fi) : x.id
    # from here on `x` is the wrapped data, not the `DictEncode` wrapper
    x = x.data
    len = length(x)
    validity = ValidityBitmap(x)
    if !haskey(de, id)
        # dict encoding doesn't exist yet, so create for 1st time
        if DataAPI.refarray(x) === x
            # need to encode ourselves
            x = PooledArray(x, encodingtype(length(x)))
            # refs of the PooledArray just created above, so mutating them below is safe
            inds = DataAPI.refarray(x)
        else
            # `x` already has refs of its own; copy so the caller's array isn't modified
            inds = copy(DataAPI.refarray(x))
        end
        # adjust to "offset" instead of index
        for i = 1:length(inds)
            @inbounds inds[i] -= 1
        end
        pool = DataAPI.refpool(x)
        # horrible hack? yes. better than taking CategoricalArrays dependency? also yes.
        if typeof(pool).name.name == :CategoricalRefPool
            # unwrap each pool entry with `get` so plain values are stored
            pool = [get(pool[i]) for i = 1:length(pool)]
        end
        # the pool itself becomes an arrow vector (dict encoded only if `dictencodenested`)
        data = arrowvector(pool, i, nl, fi, de, ded, nothing; dictencode=dictencodenested, dictencodenested=dictencodenested, dictencoding=true, kw...)
        encoding = DictEncoding{eltype(data), typeof(data)}(id, data, false, getmetadata(data))
        de[id] = Lockable(encoding)
    else
        # encoding already exists
        # compute inds based on it
        # if value doesn't exist in encoding, push! it
        # also add to deltas updates
        encodinglockable = de[id]
        @lock encodinglockable begin
            encoding = encodinglockable.x
            len = length(x)
            # NOTE(review): the index type is chosen from the length of `x`, not from the size
            # of the existing pool — presumably wide enough in practice; TODO confirm
            ET = encodingtype(len)
            # value => zero-based offset for every value already in the encoding
            pool = Dict{Union{eltype(encoding), eltype(x)}, ET}(a => (b - 1) for (b, a) in enumerate(encoding))
            deltas = eltype(x)[]
            inds = Vector{ET}(undef, len)
            categorical = typeof(x).name.name == :CategoricalArray
            for (j, val) in enumerate(x)
                if categorical
                    val = get(val)
                end
                # unseen value: remember it as a delta; `length(pool)` is read before the
                # new key is stored, so it is the next free zero-based offset
                @inbounds inds[j] = get!(pool, val) do
                    push!(deltas, val)
                    length(pool)
                end
            end
            if !isempty(deltas)
                data = arrowvector(deltas, i, nl, fi, de, ded, nothing; dictencode=dictencodenested, dictencodenested=dictencodenested, dictencoding=true, kw...)
                # record the delta on its own, under the same dictionary id
                push!(ded, DictEncoding{eltype(data), typeof(data)}(id, data, false, getmetadata(data)))
                if typeof(encoding.data) <: ChainedVector
                    # pool is already chained: extend it in place
                    append!(encoding.data, data)
                else
                    # first delta for this id: chain old pool and delta into a new encoding
                    data2 = ChainedVector([encoding.data, data])
                    encoding = DictEncoding{eltype(data2), typeof(data2)}(id, data2, false, getmetadata(encoding))
                    # NOTE(review): this stores a new `Lockable` while the previous one is
                    # still held; anything already holding the old `Lockable` would presumably
                    # keep seeing the previous encoding — confirm this is intended
                    de[id] = Lockable(encoding)
                end
            end
        end
    end
    # combine column metadata with the encoding's metadata; `merge!` updates `meta` in place
    if meta !== nothing && getmetadata(encoding) !== nothing
        merge!(meta, getmetadata(encoding))
    elseif getmetadata(encoding) !== nothing
        meta = getmetadata(encoding)
    end
    # no backing arrow memory blob here, hence the empty byte vector
    return DictEncoded(UInt8[], validity, inds, encoding, meta)
end
177 | ||
# Return element `i`: `missing` when the validity bitmap says so, otherwise the pooled
# value found at the stored zero-based code.
@propagate_inbounds function Base.getindex(d::DictEncoded, i::Integer)
    @boundscheck checkbounds(d, i)
    @inbounds present = d.validity[i]
    if present
        @inbounds offset = d.indices[i]
        # codes are zero-based, the encoding is indexed from 1
        return @inbounds d.encoding[offset + 1]
    else
        return missing
    end
end
185 | ||
# Store `v` at position `i`.
#
# - `missing` clears the validity bit for `i`; the stored code is left untouched.
# - Any other value is looked up in the encoding pool (compared with `isequal`); if it is
#   not there yet it is appended to the pool. The zero-based code of the value is written
#   to `indices[i]` and the slot is marked valid again if it was previously missing.
#
# Returns `v`.
@propagate_inbounds function Base.setindex!(d::DictEncoded{T}, v, i::Integer) where {T}
    @boundscheck checkbounds(d, i)
    if v === missing
        @inbounds d.validity[i] = false
    else
        pool = d.encoding.data
        # `findfirst` takes the predicate first; `isequal` also stays a plain Bool
        # when the pool itself contains `missing`
        ix = findfirst(isequal(v), pool)
        if ix === nothing
            push!(pool, v)
            ix = length(pool)
        end
        # codes are zero-based offsets into the pool
        @inbounds d.indices[i] = ix - 1
        # a slot that used to be missing must become valid, otherwise `getindex`
        # would keep returning `missing`; only touch the bitmap when it needs to change
        @inbounds if !d.validity[i]
            d.validity[i] = true
        end
    end
    return v
end
201 | ||
# Materialize the column as a `PooledArray` that owns its own pool and refs.
# The zero-based arrow codes are shifted by one to become 1-based refs.
# NOTE(review): the validity bitmap is not consulted here.
function Base.copy(x::DictEncoded{T, S}) where {T, S}
    values = copy(x.encoding.data)
    refs = x.indices .+ one(S)
    invpool = Dict{T, S}(val => pos for (pos, val) in enumerate(values))
    return PooledArray(PooledArrays.RefArray(refs), invpool, values)
end
212 | ||
# Compress a dict encoded column: the validity bitmap and the index codes are each
# compressed and stored, in that order, as the buffers of the resulting `Compressed`.
function compress(Z::Meta.CompressionType, comp, x::A) where {A <: DictEncoded}
    nrows = length(x)
    nnull = nullcount(x)
    buffers = [compress(Z, comp, x.validity), compress(Z, comp, x.indices)]
    return Compressed{Z, A}(x, buffers, nrows, nnull, Compressed[])
end
220 | ||
# Register the field node and the two buffers (validity bitmap, then index codes) of a
# dict encoded column. Pushes onto `fieldnodes` and `fieldbuffers`, and returns the buffer
# offset that follows this column's data.
function makenodesbuffers!(col::DictEncoded{T, S}, fieldnodes, fieldbuffers, bufferoffset, alignment) where {T, S}
    nrows = length(col)
    nnull = nullcount(col)
    push!(fieldnodes, FieldNode(nrows, nnull))
    @debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"

    # validity bitmap: takes no space at all when nothing is null
    if nnull == 0
        validitybytes = 0
    else
        validitybytes = bitpackedbytes(nrows, alignment)
    end
    push!(fieldbuffers, Buffer(bufferoffset, validitybytes))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
    indexoffset = bufferoffset + validitybytes

    # index codes: one `S` per element; the running offset advances by the padded size
    indexbytes = sizeof(S) * nrows
    push!(fieldbuffers, Buffer(indexoffset, indexbytes))
    @debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
    return indexoffset + padding(indexbytes, alignment)
end
238 | ||
# Write a dict encoded column to `io`: first the validity bitmap, then the index codes
# followed by zero bytes for padding. Returns `nothing`.
function writebuffer(io, col::DictEncoded, alignment)
    @debug 1 "writebuffer: col = $(typeof(col))"
    @debug 2 col
    # validity bitmap comes first
    writebitmap(io, col, alignment)
    # then the index codes; `n` is what `writearray` reports for them
    n = writearray(io, col.indices)
    @debug 1 "writing array: col = $(typeof(col.indices)), n = $n, padded = $(padding(n, alignment))"
    # pad with zeros as computed by `paddinglength`
    writezeros(io, paddinglength(n, alignment))
    return nothing
end