]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/julia/Arrow/test/arrowjson.jl
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / julia / Arrow / test / arrowjson.jl
diff --git a/ceph/src/arrow/julia/Arrow/test/arrowjson.jl b/ceph/src/arrow/julia/Arrow/test/arrowjson.jl
new file mode 100644 (file)
index 0000000..6e9bccf
--- /dev/null
@@ -0,0 +1,611 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+module ArrowJSON
+
+using Mmap
+using StructTypes, JSON3, Tables, SentinelArrays, Arrow
+
+# read json files as "table"
+# write to arrow stream/file
+# read arrow stream/file back
+
+abstract type Type end
+Type() = Null("null")
+StructTypes.StructType(::Base.Type{Type}) = StructTypes.AbstractType()
+
+children(::Base.Type{T}) where {T} = Field[]
+
+mutable struct Int <: Type
+    name::String
+    bitWidth::Int64
+    isSigned::Base.Bool
+end
+
+Int() = Int("", 0, true)
+Type(::Base.Type{T}) where {T <: Integer} = Int("int", 8 * sizeof(T), T <: Signed)
+StructTypes.StructType(::Base.Type{Int}) = StructTypes.Mutable()
+function juliatype(f, x::Int)
+    T = x.bitWidth == 8 ? Int8 : x.bitWidth == 16 ? Int16 :
+        x.bitWidth == 32 ? Int32 : x.bitWidth == 64 ? Int64 : Int128
+    return x.isSigned ? T : unsigned(T)
+end
+
+struct FloatingPoint <: Type
+    name::String
+    precision::String
+end
+
+Type(::Base.Type{T}) where {T <: AbstractFloat} = FloatingPoint("floatingpoint", T == Float16 ? "HALF" : T == Float32 ? "SINGLE" : "DOUBLE")
+StructTypes.StructType(::Base.Type{FloatingPoint}) = StructTypes.Struct()
+juliatype(f, x::FloatingPoint) = x.precision == "HALF" ? Float16 : x.precision == "SINGLE" ? Float32 : Float64
+
+struct FixedSizeBinary <: Type
+    name::String
+    byteWidth::Int64
+end
+
+Type(::Base.Type{NTuple{N, UInt8}}) where {N} = FixedSizeBinary("fixedsizebinary", N)
+children(::Base.Type{NTuple{N, UInt8}}) where {N} = Field[]
+StructTypes.StructType(::Base.Type{FixedSizeBinary}) = StructTypes.Struct()
+juliatype(f, x::FixedSizeBinary) = NTuple{x.byteWidth, UInt8}
+
+struct Decimal <: Type
+    name::String
+    precision::Int32
+    scale::Int32
+end
+
+Type(::Base.Type{Arrow.Decimal{P, S, T}}) where {P, S, T} = Decimal("decimal", P, S)
+StructTypes.StructType(::Base.Type{Decimal}) = StructTypes.Struct()
+juliatype(f, x::Decimal) = Arrow.Decimal{x.precision, x.scale, Int128}
+
+mutable struct Timestamp <: Type
+    name::String
+    unit::String
+    timezone::Union{Nothing ,String}
+end
+
+Timestamp() = Timestamp("", "", nothing)
+unit(U) = U == Arrow.Meta.TimeUnit.SECOND ? "SECOND" :
+          U == Arrow.Meta.TimeUnit.MILLISECOND ? "MILLISECOND" :
+          U == Arrow.Meta.TimeUnit.MICROSECOND ? "MICROSECOND" : "NANOSECOND"
+Type(::Base.Type{Arrow.Timestamp{U, TZ}}) where {U, TZ} = Timestamp("timestamp", unit(U), TZ === nothing ? nothing : String(TZ))
+StructTypes.StructType(::Base.Type{Timestamp}) = StructTypes.Mutable()
+unitT(u) = u == "SECOND" ? Arrow.Meta.TimeUnit.SECOND :
+           u == "MILLISECOND" ? Arrow.Meta.TimeUnit.MILLISECOND :
+           u == "MICROSECOND" ? Arrow.Meta.TimeUnit.MICROSECOND : Arrow.Meta.TimeUnit.NANOSECOND
+juliatype(f, x::Timestamp) = Arrow.Timestamp{unitT(x.unit), x.timezone === nothing ? nothing : Symbol(x.timezone)}
+
+struct Duration <: Type
+    name::String
+    unit::String
+end
+
+Type(::Base.Type{Arrow.Duration{U}}) where {U} = Duration("duration", unit(U))
+StructTypes.StructType(::Base.Type{Duration}) = StructTypes.Struct()
+juliatype(f, x::Duration) = Arrow.Duration{unit%(x.unit)}
+
+struct Date <: Type
+    name::String
+    unit::String
+end
+
+Type(::Base.Type{Arrow.Date{U, T}}) where {U, T} = Date("date", U == Arrow.Meta.DateUnit.DAY ? "DAY" : "MILLISECOND")
+StructTypes.StructType(::Base.Type{Date}) = StructTypes.Struct()
+juliatype(f, x::Date) = Arrow.Date{x.unit == "DAY" ? Arrow.Meta.DateUnit.DAY : Arrow.Meta.DateUnit.MILLISECOND, x.unit == "DAY" ? Int32 : Int64}
+
+struct Time <: Type
+    name::String
+    unit::String
+    bitWidth::Int64
+end
+
+Type(::Base.Type{Arrow.Time{U, T}}) where {U, T} = Time("time", unit(U), 8 * sizeof(T))
+StructTypes.StructType(::Base.Type{Time}) = StructTypes.Struct()
+juliatype(f, x::Time) = Arrow.Time{unitT(x.unit), x.unit == "SECOND" || x.unit == "MILLISECOND" ? Int32 : Int64}
+
+struct Interval <: Type
+    name::String
+    unit::String
+end
+
+Type(::Base.Type{Arrow.Interval{U, T}}) where {U, T} = Interval("interval", U == Arrow.Meta.IntervalUnit.YEAR_MONTH ? "YEAR_MONTH" : "DAY_TIME")
+StructTypes.StructType(::Base.Type{Interval}) = StructTypes.Struct()
+juliatype(f, x::Interval) = Arrow.Interval{x.unit == "YEAR_MONTH" ? Arrow.Meta.IntervalUnit.YEAR_MONTH : Arrow.Meta.IntervalUnit.DAY_TIME, x.unit == "YEAR_MONTH" ? Int32 : Int64}
+
+struct UnionT <: Type
+    name::String
+    mode::String
+    typIds::Vector{Int64}
+end
+
+Type(::Base.Type{Arrow.UnionT{T, typeIds, U}}) where {T, typeIds, U} = UnionT("union", T == Arrow.Meta.UnionMode.Dense ? "DENSE" : "SPARSE", collect(typeIds))
+children(::Base.Type{Arrow.UnionT{T, typeIds, U}}) where {T, typeIds, U} = Field[Field("", fieldtype(U, i), nothing) for i = 1:fieldcount(U)]
+StructTypes.StructType(::Base.Type{UnionT}) = StructTypes.Struct()
+juliatype(f, x::UnionT) = Arrow.UnionT{x.mode == "DENSE" ? Arrow.Meta.UnionMode.DENSE : Arrow.Meta.UnionMode.SPARSE, Tuple(x.typeIds), Tuple{(juliatype(y) for y in f.children)...}}
+
+struct List <: Type
+    name::String
+end
+
+Type(::Base.Type{Vector{T}}) where {T} = List("list")
+children(::Base.Type{Vector{T}}) where {T} = [Field("item", T, nothing)]
+StructTypes.StructType(::Base.Type{List}) = StructTypes.Struct()
+juliatype(f, x::List) = Vector{juliatype(f.children[1])}
+
+struct LargeList <: Type
+    name::String
+end
+
+StructTypes.StructType(::Base.Type{LargeList}) = StructTypes.Struct()
+juliatype(f, x::LargeList) = Vector{juliatype(f.children[1])}
+
+struct FixedSizeList <: Type
+    name::String
+    listSize::Int64
+end
+
+Type(::Base.Type{NTuple{N, T}}) where {N, T} = FixedSizeList("fixedsizelist", N)
+children(::Base.Type{NTuple{N, T}}) where {N, T} = [Field("item", T, nothing)]
+StructTypes.StructType(::Base.Type{FixedSizeList}) = StructTypes.Struct()
+juliatype(f, x::FixedSizeList) = NTuple{x.listSize, juliatype(f.children[1])}
+
+struct Struct <: Type
+    name::String
+end
+
+Type(::Base.Type{NamedTuple{names, types}}) where {names, types} = Struct("struct")
+children(::Base.Type{NamedTuple{names, types}}) where {names, types} = [Field(names[i], fieldtype(types, i), nothing) for i = 1:length(names)]
+StructTypes.StructType(::Base.Type{Struct}) = StructTypes.Struct()
+juliatype(f, x::Struct) = NamedTuple{Tuple(Symbol(x.name) for x in f.children), Tuple{(juliatype(y) for y in f.children)...}}
+
+struct Map <: Type
+    name::String
+    keysSorted::Base.Bool
+end
+
+Type(::Base.Type{Dict{K, V}}) where {K, V} = Map("map", false)
+children(::Base.Type{Dict{K, V}}) where {K, V} = [Field("entries", Arrow.KeyValue{K, V}, nothing)]
+StructTypes.StructType(::Base.Type{Map}) = StructTypes.Struct()
+juliatype(f, x::Map) = Dict{juliatype(f.children[1].children[1]), juliatype(f.children[1].children[2])}
+
+Type(::Base.Type{Arrow.KeyValue{K, V}}) where {K, V} = Struct("struct")
+children(::Base.Type{Arrow.KeyValue{K, V}}) where {K, V} = [Field("key", K, nothing), Field("value", V, nothing)]
+
+struct Null <: Type
+    name::String
+end
+
+Type(::Base.Type{Missing}) = Null("null")
+StructTypes.StructType(::Base.Type{Null}) = StructTypes.Struct()
+juliatype(f, x::Null) = Missing
+
+struct Utf8 <: Type
+    name::String
+end
+
+Type(::Base.Type{<:String}) = Utf8("utf8")
+StructTypes.StructType(::Base.Type{Utf8}) = StructTypes.Struct()
+juliatype(f, x::Utf8) = String
+
+struct LargeUtf8 <: Type
+    name::String
+end
+
+StructTypes.StructType(::Base.Type{LargeUtf8}) = StructTypes.Struct()
+juliatype(f, x::LargeUtf8) = String
+
+struct Binary <: Type
+    name::String
+end
+
+Type(::Base.Type{Vector{UInt8}}) = Binary("binary")
+children(::Base.Type{Vector{UInt8}}) = Field[]
+StructTypes.StructType(::Base.Type{Binary}) = StructTypes.Struct()
+juliatype(f, x::Binary) = Vector{UInt8}
+
+struct LargeBinary <: Type
+    name::String
+end
+
+StructTypes.StructType(::Base.Type{LargeBinary}) = StructTypes.Struct()
+juliatype(f, x::LargeBinary) = Vector{UInt8}
+
+struct Bool <: Type
+    name::String
+end
+
+Type(::Base.Type{Base.Bool}) = Bool("bool")
+StructTypes.StructType(::Base.Type{Bool}) = StructTypes.Struct()
+juliatype(f, x::Bool) = Base.Bool
+
+StructTypes.subtypekey(::Base.Type{Type}) = :name
+
+const SUBTYPES = @eval (
+    int=Int,
+    floatingpoint=FloatingPoint,
+    fixedsizebinary=FixedSizeBinary,
+    decimal=Decimal,
+    timestamp=Timestamp,
+    duration=Duration,
+    date=Date,
+    time=Time,
+    interval=Interval,
+    union=UnionT,
+    list=List,
+    largelist=LargeList,
+    fixedsizelist=FixedSizeList,
+    $(Symbol("struct"))=Struct,
+    map=Map,
+    null=Null,
+    utf8=Utf8,
+    largeutf8=LargeUtf8,
+    binary=Binary,
+    largebinary=LargeBinary,
+    bool=Bool
+)
+
+StructTypes.subtypes(::Base.Type{Type}) = SUBTYPES
+
+const Metadata = Union{Nothing, Vector{NamedTuple{(:key, :value), Tuple{String, String}}}}
+Metadata() = nothing
+
+mutable struct DictEncoding
+    id::Int64
+    indexType::Type
+    isOrdered::Base.Bool
+end
+
+DictEncoding() = DictEncoding(0, Type(), false)
+StructTypes.StructType(::Base.Type{DictEncoding}) = StructTypes.Mutable()
+
+mutable struct Field
+    name::String
+    nullable::Base.Bool
+    type::Type
+    children::Vector{Field}
+    dictionary::Union{DictEncoding, Nothing}
+    metadata::Metadata
+end
+
+Field() = Field("", true, Type(), Field[], nothing, Metadata())
+StructTypes.StructType(::Base.Type{Field}) = StructTypes.Mutable()
+Base.copy(f::Field) = Field(f.name, f.nullable, f.type, f.children, f.dictionary, f.metadata)
+
+function juliatype(f::Field)
+    T = juliatype(f, f.type)
+    return f.nullable ? Union{T, Missing} : T
+end
+
+function Field(nm, ::Base.Type{T}, dictencodings) where {T}
+    S = Arrow.maybemissing(T)
+    type = Type(S)
+    ch = children(S)
+    if dictencodings !== nothing && haskey(dictencodings, nm)
+        dict = dictencodings[nm]
+    else
+        dict = nothing
+    end
+    return Field(nm, T !== S, type, ch, dict, nothing)
+end
+
+mutable struct Schema
+    fields::Vector{Field}
+    metadata::Metadata
+end
+
+Schema() = Schema(Field[], Metadata())
+StructTypes.StructType(::Base.Type{Schema}) = StructTypes.Mutable()
+
+struct Offsets{T} <: AbstractVector{T}
+    data::Vector{T}
+end
+
+Base.size(x::Offsets) = size(x.data)
+Base.getindex(x::Offsets, i::Base.Int) = getindex(x.data, i)
+
+mutable struct FieldData
+    name::String
+    count::Int64
+    VALIDITY::Union{Nothing, Vector{Int8}}
+    OFFSET::Union{Nothing, Offsets}
+    TYPE_ID::Union{Nothing, Vector{Int8}}
+    DATA::Union{Nothing, Vector{Any}}
+    children::Vector{FieldData}
+end
+
+FieldData() = FieldData("", 0, nothing, nothing, nothing, nothing, FieldData[])
+StructTypes.StructType(::Base.Type{FieldData}) = StructTypes.Mutable()
+
+function FieldData(nm, ::Base.Type{T}, col, dictencodings) where {T}
+    if dictencodings !== nothing && haskey(dictencodings, nm)
+        refvals = DataAPI.refarray(col.data)
+        if refvals !== col.data
+            IT = eltype(refvals)
+            col = (x - one(T) for x in refvals)
+        else
+            _, de = dictencodings[nm]
+            IT = de.indexType
+            vals = unique(col)
+            col = Arrow.DictEncoder(col, vals, Arrow.encodingtype(length(vals)))
+        end
+        return FieldData(nm, IT, col, nothing)
+    end
+    S = Arrow.maybemissing(T)
+    len = Arrow._length(col)
+    VALIDITY = OFFSET = TYPE_ID = DATA = nothing
+    children = FieldData[]
+    if S <: Pair
+        return FieldData(nm, Vector{Arrow.KeyValue{Arrow._keytype(S), Arrow._valtype(S)}}, (Arrow.KeyValue(k, v) for (k, v) in pairs(col)))
+    elseif S !== Missing
+        # VALIDITY
+        VALIDITY = Int8[!ismissing(x) for x in col]
+        # OFFSET
+        if S <: Vector || S == String
+            lenfun = S == String ? x->ismissing(x) ? 0 : sizeof(x) : x->ismissing(x) ? 0 : length(x)
+            tot = sum(lenfun, col)
+            if tot > 2147483647
+                OFFSET = String[String(lenfun(x)) for x in col]
+                pushfirst!(OFFSET, "0")
+            else
+                OFFSET = Int32[ismissing(x) ? 0 : lenfun(x) for x in col]
+                pushfirst!(OFFSET, 0)
+            end
+            OFFSET = Offsets(OFFSET)
+            push!(children, FieldData("item", eltype(S), Arrow.flatten(skipmissing(col)), dictencodings))
+        elseif S <: NTuple
+            if Arrow.ArrowTypes.gettype(S) == UInt8
+                DATA = [ismissing(x) ? Arrow.ArrowTypes.default(S) : String(collect(x)) for x in col]
+            else
+                push!(children, FieldData("item", Arrow.ArrowTypes.gettype(S), Arrow.flatten(coalesce(x, Arrow.ArrowTypes.default(S)) for x in col), dictencodings))
+            end
+        elseif S <: NamedTuple
+            for (nm, typ) in zip(fieldnames(S), fieldtypes(S))
+                push!(children, FieldData(String(nm), typ, (getfield(x, nm) for x in col), dictencodings))
+            end
+        elseif S <: Arrow.UnionT
+            U = eltype(S)
+            tids = Arrow.typeids(S) === nothing ? (0:fieldcount(U)) : Arrow.typeids(S)
+            TYPE_ID = [x === missing ? 0 : tids[Arrow.isatypeid(x, U)] for x in col]
+            if Arrow.unionmode(S) == Arrow.Meta.UnionMode.Dense
+                offs = zeros(Int32, fieldcount(U))
+                OFFSET = Int32[]
+                for x in col
+                    idx = x === missing ? 1 : Arrow.isatypeid(x, U)
+                    push!(OFFSET, offs[idx])
+                    offs[idx] += 1
+                end
+                for i = 1:fieldcount(U)
+                    SS = fieldtype(U, i)
+                    push!(children, FieldData("$i", SS, Arrow.filtered(i == 1 ? Union{SS, Missing} : Arrow.maybemissing(SS), col), dictencodings))
+                end
+            else
+                for i = 1:fieldcount(U)
+                    SS = fieldtype(U, i)
+                    push!(children, FieldData("$i", SS, Arrow.replaced(SS, col), dictencodings))
+                end
+            end
+        elseif S <: KeyValue
+            push!(children, FieldData("key", Arrow.keyvalueK(S), (x.key for x in col), dictencodings))
+            push!(children, FieldData("value", Arrow.keyvalueV(S), (x.value for x in col), dictencodings))
+        end
+    end
+    return FieldData(nm, len, VALIDITY, OFFSET, TYPE_ID, DATA, children)
+end
+
+mutable struct RecordBatch
+    count::Int64
+    columns::Vector{FieldData}
+end
+
+RecordBatch() = RecordBatch(0, FieldData[])
+StructTypes.StructType(::Base.Type{RecordBatch}) = StructTypes.Mutable()
+
+mutable struct DictionaryBatch
+    id::Int64
+    data::RecordBatch
+end
+
+DictionaryBatch() = DictionaryBatch(0, RecordBatch())
+StructTypes.StructType(::Base.Type{DictionaryBatch}) = StructTypes.Mutable()
+
+mutable struct DataFile <: Tables.AbstractColumns
+    schema::Schema
+    batches::Vector{RecordBatch}
+    dictionaries::Vector{DictionaryBatch}
+end
+
+Base.propertynames(x::DataFile) = (:schema, :batches, :dictionaries)
+
+function Base.getproperty(df::DataFile, nm::Symbol)
+    if nm === :schema
+        return getfield(df, :schema)
+    elseif nm === :batches
+        return getfield(df, :batches)
+    elseif nm === :dictionaries
+        return getfield(df, :dictionaries)
+    end
+    return Tables.getcolumn(df, nm)
+end
+
+DataFile() = DataFile(Schema(), RecordBatch[], DictionaryBatch[])
+StructTypes.StructType(::Base.Type{DataFile}) = StructTypes.Mutable()
+
+parsefile(file) = JSON3.read(Mmap.mmap(file), DataFile)
+
+# make DataFile satisfy Tables.jl interface
+function Tables.partitions(x::DataFile)
+    if isempty(x.batches)
+        # special case empty batches by producing a single DataFile w/ schema
+        return (DataFile(x.schema, RecordBatch[], x.dictionaries),)
+    else
+        return (DataFile(x.schema, [x.batches[i]], x.dictionaries) for i = 1:length(x.batches))
+    end
+end
+
+Tables.columns(x::DataFile) = x
+
+function Tables.schema(x::DataFile)
+    names = map(x -> x.name, x.schema.fields)
+    types = map(x -> juliatype(x), x.schema.fields)
+    return Tables.Schema(names, types)
+end
+
+Tables.columnnames(x::DataFile) =  map(x -> Symbol(x.name), x.schema.fields)
+
+function Tables.getcolumn(x::DataFile, i::Base.Int)
+    field = x.schema.fields[i]
+    type = juliatype(field)
+    return ChainedVector(ArrowArray{type}[ArrowArray{type}(field, length(x.batches) > 0 ? x.batches[j].columns[i] : FieldData(), x.dictionaries) for j = 1:length(x.batches)])
+end
+
+function Tables.getcolumn(x::DataFile, nm::Symbol)
+    i = findfirst(x -> x.name == String(nm), x.schema.fields)
+    return Tables.getcolumn(x, i)
+end
+
+struct ArrowArray{T} <: AbstractVector{T}
+    field::Field
+    fielddata::FieldData
+    dictionaries::Vector{DictionaryBatch}
+end
+ArrowArray(f::Field, fd::FieldData, d) = ArrowArray{juliatype(f)}(f, fd, d)
+Base.size(x::ArrowArray) = (x.fielddata.count,)
+
+function Base.getindex(x::ArrowArray{T}, i::Base.Int) where {T}
+    @boundscheck checkbounds(x, i)
+    S = Base.nonmissingtype(T)
+    if x.field.dictionary !== nothing
+        fielddata = x.dictionaries[findfirst(y -> y.id == x.field.dictionary.id, x.dictionaries)].data.columns[1]
+        field = copy(x.field)
+        field.dictionary = nothing
+        idx = x.fielddata.DATA[i] + 1
+        return ArrowArray(field, fielddata, x.dictionaries)[idx]
+    end
+    if T === Missing
+        return missing
+    elseif S <: UnionT
+        U = eltype(S)
+        tids = Arrow.typeids(S) === nothing ? (0:fieldcount(U)) : Arrow.typeids(S)
+        typeid = tids[x.fielddata.TYPE_ID[i]]
+        if Arrow.unionmode(S) == Arrow.Meta.UnionMode.DENSE
+            off = x.fielddata.OFFSET[i]
+            return ArrowArray(x.field.children[typeid+1], x.fielddata.children[typeid+1], x.dictionaries)[off]
+        else
+            return ArrowArray(x.field.children[typeid+1], x.fielddata.children[typeid+1], x.dictionaries)[i]
+        end
+    end
+    x.fielddata.VALIDITY[i] == 0 && return missing
+    if S <: Vector{UInt8}
+        return copy(codeunits(x.fielddata.DATA[i]))
+    elseif S <: String
+        return x.fielddata.DATA[i]
+    elseif S <: Vector
+        offs = x.fielddata.OFFSET
+        A = ArrowArray{eltype(S)}(x.field.children[1], x.fielddata.children[1], x.dictionaries)
+        return A[(offs[i] + 1):offs[i + 1]]
+    elseif S <: Dict
+        offs = x.fielddata.OFFSET
+        A = ArrowArray(x.field.children[1], x.fielddata.children[1], x.dictionaries)
+        return Dict(y.key => y.value for y in A[(offs[i] + 1):offs[i + 1]])
+    elseif S <: Tuple
+        if Arrow.ArrowTypes.gettype(S) == UInt8
+            A = x.fielddata.DATA
+            return Tuple(map(UInt8, collect(A[i][1:x.field.type.byteWidth])))
+        else
+            sz = x.field.type.listSize
+            A = ArrowArray{Arrow.ArrowTypes.gettype(S)}(x.field.children[1], x.fielddata.children[1], x.dictionaries)
+            off = (i - 1) * sz + 1
+            return Tuple(A[off:(off + sz - 1)])
+        end
+    elseif S <: NamedTuple
+        data = (ArrowArray(x.field.children[j], x.fielddata.children[j], x.dictionaries)[i] for j = 1:length(x.field.children))
+        return NamedTuple{fieldnames(S)}(Tuple(data))
+    elseif S == Int64 || S == UInt64
+        return parse(S, x.fielddata.DATA[i])
+    elseif S <: Arrow.Decimal
+        str = x.fielddata.DATA[i]
+        return S(parse(Int128, str))
+    elseif S <: Arrow.Date || S <: Arrow.Time
+        val = x.fielddata.DATA[i]
+        return Arrow.storagetype(S) == Int32 ? S(val) : S(parse(Int64, val))
+    elseif S <: Arrow.Timestamp
+        return S(parse(Int64, x.fielddata.DATA[i]))
+    else
+        return S(x.fielddata.DATA[i])
+    end
+end
+
+# take any Tables.jl source and write out arrow json datafile
+function DataFile(source)
+    fields = Field[]
+    metadata = nothing # TODO?
+    batches = RecordBatch[]
+    dictionaries = DictionaryBatch[]
+    dictencodings = Dict{String, Tuple{Base.Type, DictEncoding}}()
+    dictid = Ref(0)
+    for (i, tbl1) in Tables.partitions(source)
+        tbl = Arrow.toarrowtable(tbl1)
+        if i == 1
+            sch = Tables.schema(tbl)
+            for (nm, T, col) in zip(sch.names, sch.types, Tables.Columns(tbl))
+                if col isa Arrow.DictEncode
+                    id = dictid[]
+                    dictid[] += 1
+                    codes = DataAPI.refarray(col.data)
+                    if codes !== col.data
+                        IT = Type(eltype(codes))
+                    else
+                        IT = Type(Arrow.encodingtype(length(unique(col))))
+                    end
+                    dictencodings[String(nm)] = (T, DictEncoding(id, IT, false))
+                end
+                push!(fields, Field(String(nm), T, dictencodings))
+            end
+        end
+        # build record batch
+        len = Tables.rowcount(tbl)
+        columns = FieldData[]
+        for (nm, T, col) in zip(sch.names, sch.types, Tables.Columns(tbl))
+            push!(columns, FieldData(String(nm), T, col, dictencodings))
+        end
+        push!(batches, RecordBatch(len, columns))
+        # build dictionaries
+        for (nm, (T, dictencoding)) in dictencodings
+            column = FieldData(nm, T, Tables.getcolumn(tbl, nm), nothing)
+            recordbatch = RecordBatch(len, [column])
+            push!(dictionaries, DictionaryBatch(dictencoding.id, recordbatch))
+        end
+    end
+    schema = Schema(fields, metadata)
+    return DataFile(schema, batches, dictionaries)
+end
+
+function Base.isequal(df::DataFile, tbl::Arrow.Table)
+    Tables.schema(df) == Tables.schema(tbl) || return false
+    i = 1
+    for (col1, col2) in zip(Tables.Columns(df), Tables.Columns(tbl))
+        if !isequal(col1, col2)
+            @show i
+            return false
+        end
+        i += 1
+    end
+    return true
+end
+
+end