diff --git a/ceph/src/arrow/go/arrow/ipc/metadata.go b/ceph/src/arrow/go/arrow/ipc/metadata.go
new file mode 100644 (file)
index 0000000..9bacd26
--- /dev/null
@@ -0,0 +1,1151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ipc // import "github.com/apache/arrow/go/v6/arrow/ipc"
+
+import (
+       "encoding/binary"
+       "io"
+       "sort"
+
+       "github.com/apache/arrow/go/v6/arrow"
+       "github.com/apache/arrow/go/v6/arrow/internal/flatbuf"
+       "github.com/apache/arrow/go/v6/arrow/memory"
+       flatbuffers "github.com/google/flatbuffers/go"
+       "golang.org/x/xerrors"
+)
+
+// Magic string identifying an Apache Arrow file.
+var Magic = []byte("ARROW1")
+
+const (
+       currentMetadataVersion = MetadataV5
+       minMetadataVersion     = MetadataV4
+
+       // Metadata keys for extension types: the registered type name, and any
+       // serialized extension metadata to be passed to Deserialize.
+       ExtensionTypeKeyName     = "ARROW:extension:name"
+       ExtensionMetadataKeyName = "ARROW:extension:metadata"
+
+       // ARROW-109: this limit is set arbitrarily to help catch user mistakes.
+       // For deeply nested schemas, the user is expected to indicate the maximum
+       // allowed recursion depth explicitly.
+       kMaxNestingDepth = 64
+)
+
+type startVecFunc func(b *flatbuffers.Builder, n int) flatbuffers.UOffsetT
+
+type fieldMetadata struct {
+       Len    int64
+       Nulls  int64
+       Offset int64
+}
+
+type bufferMetadata struct {
+       Offset int64 // relative offset into the memory page to the starting byte of the buffer
+       Len    int64 // absolute length in bytes of the buffer
+}
+
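+// fileBlock describes a block in an Arrow file: its offset in the file, the
+// length in bytes of its metadata, and the length in bytes of its body.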
+type fileBlock struct {
+       Offset int64
+       Meta   int32
+       Body   int64
+
+       r io.ReaderAt
+}
+
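+// fileBlocksToFB serializes the file blocks into the builder as a flatbuffers
+// vector; vectors are built back to front, hence the reverse iteration.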
+func fileBlocksToFB(b *flatbuffers.Builder, blocks []fileBlock, start startVecFunc) flatbuffers.UOffsetT {
+       start(b, len(blocks))
+       for i := len(blocks) - 1; i >= 0; i-- {
+               blk := blocks[i]
+               flatbuf.CreateBlock(b, blk.Offset, blk.Meta, blk.Body)
+       }
+
+       return b.EndVector(len(blocks))
+}
+
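+// NewMessage reads the block's metadata and body from the underlying reader
+// and returns them wrapped in a Message.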
+func (blk fileBlock) NewMessage() (*Message, error) {
+       var (
+               err error
+               buf []byte
+               r   = blk.section()
+       )
+
+       buf = make([]byte, blk.Meta)
+       _, err = io.ReadFull(r, buf)
+       if err != nil {
+               return nil, xerrors.Errorf("arrow/ipc: could not read message metadata: %w", err)
+       }
+
+       prefix := 0
+       switch binary.LittleEndian.Uint32(buf) {
+       case 0:
+       case kIPCContToken:
+               prefix = 8
+       default:
+               // ARROW-6314: backwards compatibility for reading old IPC
+               // messages produced prior to version 0.15.0
+               prefix = 4
+       }
+
+       meta := memory.NewBufferBytes(buf[prefix:]) // skip the size prefix; the metadata length is already known from blk.Meta
+
+       buf = make([]byte, blk.Body)
+       _, err = io.ReadFull(r, buf)
+       if err != nil {
+               return nil, xerrors.Errorf("arrow/ipc: could not read message body: %w", err)
+       }
+       body := memory.NewBufferBytes(buf)
+
+       return NewMessage(meta, body), nil
+}
+
+func (blk fileBlock) section() io.Reader {
+       return io.NewSectionReader(blk.r, blk.Offset, int64(blk.Meta)+blk.Body)
+}
+
+func unitFromFB(unit flatbuf.TimeUnit) arrow.TimeUnit {
+       switch unit {
+       case flatbuf.TimeUnitSECOND:
+               return arrow.Second
+       case flatbuf.TimeUnitMILLISECOND:
+               return arrow.Millisecond
+       case flatbuf.TimeUnitMICROSECOND:
+               return arrow.Microsecond
+       case flatbuf.TimeUnitNANOSECOND:
+               return arrow.Nanosecond
+       default:
+               panic(xerrors.Errorf("arrow/ipc: invalid flatbuf.TimeUnit(%d) value", unit))
+       }
+}
+
+func unitToFB(unit arrow.TimeUnit) flatbuf.TimeUnit {
+       switch unit {
+       case arrow.Second:
+               return flatbuf.TimeUnitSECOND
+       case arrow.Millisecond:
+               return flatbuf.TimeUnitMILLISECOND
+       case arrow.Microsecond:
+               return flatbuf.TimeUnitMICROSECOND
+       case arrow.Nanosecond:
+               return flatbuf.TimeUnitNANOSECOND
+       default:
+               panic(xerrors.Errorf("arrow/ipc: invalid arrow.TimeUnit(%d) value", unit))
+       }
+}
+
+// initFB is a helper function to handle flatbuffers' polymorphism.
+func initFB(t interface {
+       Table() flatbuffers.Table
+       Init([]byte, flatbuffers.UOffsetT)
+}, f func(tbl *flatbuffers.Table) bool) {
+       tbl := t.Table()
+       if !f(&tbl) {
+               panic(xerrors.Errorf("arrow/ipc: could not initialize %T from flatbuffer", t))
+       }
+       t.Init(tbl.Bytes, tbl.Pos)
+}
+
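+// fieldFromFB converts a flatbuffers Field into an arrow.Field, recursively
+// converting its children.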
+func fieldFromFB(field *flatbuf.Field, memo *dictMemo) (arrow.Field, error) {
+       var (
+               err error
+               o   arrow.Field
+       )
+
+       o.Name = string(field.Name())
+       o.Nullable = field.Nullable()
+       o.Metadata, err = metadataFromFB(field)
+       if err != nil {
+               return o, err
+       }
+
+       encoding := field.Dictionary(nil)
+       switch encoding {
+       case nil:
+               n := field.ChildrenLength()
+               children := make([]arrow.Field, n)
+               for i := range children {
+                       var childFB flatbuf.Field
+                       if !field.Children(&childFB, i) {
+                               return o, xerrors.Errorf("arrow/ipc: could not load field child %d", i)
+                       }
+                       child, err := fieldFromFB(&childFB, memo)
+                       if err != nil {
+                               return o, xerrors.Errorf("arrow/ipc: could not convert field child %d: %w", i, err)
+                       }
+                       children[i] = child
+               }
+
+               o.Type, err = typeFromFB(field, children, &o.Metadata)
+               if err != nil {
+                       return o, xerrors.Errorf("arrow/ipc: could not convert field type: %w", err)
+               }
+       default:
+               panic("not implemented") // FIXME(sbinet)
+       }
+
+       return o, nil
+}
+
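+// fieldToFB serializes an arrow.Field into the builder and returns the
+// offset of the resulting flatbuffers Field table.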
+func fieldToFB(b *flatbuffers.Builder, field arrow.Field, memo *dictMemo) flatbuffers.UOffsetT {
+       var visitor = fieldVisitor{b: b, memo: memo, meta: make(map[string]string)}
+       return visitor.result(field)
+}
+
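+// fieldVisitor serializes the data type of an arrow.Field to flatbuffers,
+// recording the type enum, the offset of the type table, the offsets of any
+// child fields, and any extension-type metadata collected along the way.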
+type fieldVisitor struct {
+       b      *flatbuffers.Builder
+       memo   *dictMemo
+       dtype  flatbuf.Type
+       offset flatbuffers.UOffsetT
+       kids   []flatbuffers.UOffsetT
+       meta   map[string]string
+}
+
+func (fv *fieldVisitor) visit(field arrow.Field) {
+       dt := field.Type
+       switch dt := dt.(type) {
+       case *arrow.NullType:
+               fv.dtype = flatbuf.TypeNull
+               flatbuf.NullStart(fv.b)
+               fv.offset = flatbuf.NullEnd(fv.b)
+
+       case *arrow.BooleanType:
+               fv.dtype = flatbuf.TypeBool
+               flatbuf.BoolStart(fv.b)
+               fv.offset = flatbuf.BoolEnd(fv.b)
+
+       case *arrow.Uint8Type:
+               fv.dtype = flatbuf.TypeInt
+               fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false)
+
+       case *arrow.Uint16Type:
+               fv.dtype = flatbuf.TypeInt
+               fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false)
+
+       case *arrow.Uint32Type:
+               fv.dtype = flatbuf.TypeInt
+               fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false)
+
+       case *arrow.Uint64Type:
+               fv.dtype = flatbuf.TypeInt
+               fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false)
+
+       case *arrow.Int8Type:
+               fv.dtype = flatbuf.TypeInt
+               fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true)
+
+       case *arrow.Int16Type:
+               fv.dtype = flatbuf.TypeInt
+               fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true)
+
+       case *arrow.Int32Type:
+               fv.dtype = flatbuf.TypeInt
+               fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true)
+
+       case *arrow.Int64Type:
+               fv.dtype = flatbuf.TypeInt
+               fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true)
+
+       case *arrow.Float16Type:
+               fv.dtype = flatbuf.TypeFloatingPoint
+               fv.offset = floatToFB(fv.b, int32(dt.BitWidth()))
+
+       case *arrow.Float32Type:
+               fv.dtype = flatbuf.TypeFloatingPoint
+               fv.offset = floatToFB(fv.b, int32(dt.BitWidth()))
+
+       case *arrow.Float64Type:
+               fv.dtype = flatbuf.TypeFloatingPoint
+               fv.offset = floatToFB(fv.b, int32(dt.BitWidth()))
+
+       case *arrow.Decimal128Type:
+               fv.dtype = flatbuf.TypeDecimal
+               flatbuf.DecimalStart(fv.b)
+               flatbuf.DecimalAddPrecision(fv.b, dt.Precision)
+               flatbuf.DecimalAddScale(fv.b, dt.Scale)
+               fv.offset = flatbuf.DecimalEnd(fv.b)
+
+       case *arrow.FixedSizeBinaryType:
+               fv.dtype = flatbuf.TypeFixedSizeBinary
+               flatbuf.FixedSizeBinaryStart(fv.b)
+               flatbuf.FixedSizeBinaryAddByteWidth(fv.b, int32(dt.ByteWidth))
+               fv.offset = flatbuf.FixedSizeBinaryEnd(fv.b)
+
+       case *arrow.BinaryType:
+               fv.dtype = flatbuf.TypeBinary
+               flatbuf.BinaryStart(fv.b)
+               fv.offset = flatbuf.BinaryEnd(fv.b)
+
+       case *arrow.StringType:
+               fv.dtype = flatbuf.TypeUtf8
+               flatbuf.Utf8Start(fv.b)
+               fv.offset = flatbuf.Utf8End(fv.b)
+
+       case *arrow.Date32Type:
+               fv.dtype = flatbuf.TypeDate
+               flatbuf.DateStart(fv.b)
+               flatbuf.DateAddUnit(fv.b, flatbuf.DateUnitDAY)
+               fv.offset = flatbuf.DateEnd(fv.b)
+
+       case *arrow.Date64Type:
+               fv.dtype = flatbuf.TypeDate
+               flatbuf.DateStart(fv.b)
+               flatbuf.DateAddUnit(fv.b, flatbuf.DateUnitMILLISECOND)
+               fv.offset = flatbuf.DateEnd(fv.b)
+
+       case *arrow.Time32Type:
+               fv.dtype = flatbuf.TypeTime
+               flatbuf.TimeStart(fv.b)
+               flatbuf.TimeAddUnit(fv.b, unitToFB(dt.Unit))
+               flatbuf.TimeAddBitWidth(fv.b, 32)
+               fv.offset = flatbuf.TimeEnd(fv.b)
+
+       case *arrow.Time64Type:
+               fv.dtype = flatbuf.TypeTime
+               flatbuf.TimeStart(fv.b)
+               flatbuf.TimeAddUnit(fv.b, unitToFB(dt.Unit))
+               flatbuf.TimeAddBitWidth(fv.b, 64)
+               fv.offset = flatbuf.TimeEnd(fv.b)
+
+       case *arrow.TimestampType:
+               fv.dtype = flatbuf.TypeTimestamp
+               unit := unitToFB(dt.Unit)
+               var tz flatbuffers.UOffsetT
+               if dt.TimeZone != "" {
+                       tz = fv.b.CreateString(dt.TimeZone)
+               }
+               flatbuf.TimestampStart(fv.b)
+               flatbuf.TimestampAddUnit(fv.b, unit)
+               flatbuf.TimestampAddTimezone(fv.b, tz)
+               fv.offset = flatbuf.TimestampEnd(fv.b)
+
+       case *arrow.StructType:
+               fv.dtype = flatbuf.TypeStruct_
+               offsets := make([]flatbuffers.UOffsetT, len(dt.Fields()))
+               for i, field := range dt.Fields() {
+                       offsets[i] = fieldToFB(fv.b, field, fv.memo)
+               }
+               flatbuf.Struct_Start(fv.b)
+               for i := len(offsets) - 1; i >= 0; i-- {
+                       fv.b.PrependUOffsetT(offsets[i])
+               }
+               fv.offset = flatbuf.Struct_End(fv.b)
+               fv.kids = append(fv.kids, offsets...)
+
+       case *arrow.ListType:
+               fv.dtype = flatbuf.TypeList
+               fv.kids = append(fv.kids, fieldToFB(fv.b, dt.ElemField(), fv.memo))
+               flatbuf.ListStart(fv.b)
+               fv.offset = flatbuf.ListEnd(fv.b)
+
+       case *arrow.FixedSizeListType:
+               fv.dtype = flatbuf.TypeFixedSizeList
+               fv.kids = append(fv.kids, fieldToFB(fv.b, dt.ElemField(), fv.memo))
+               flatbuf.FixedSizeListStart(fv.b)
+               flatbuf.FixedSizeListAddListSize(fv.b, dt.Len())
+               fv.offset = flatbuf.FixedSizeListEnd(fv.b)
+
+       case *arrow.MonthIntervalType:
+               fv.dtype = flatbuf.TypeInterval
+               flatbuf.IntervalStart(fv.b)
+               flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitYEAR_MONTH)
+               fv.offset = flatbuf.IntervalEnd(fv.b)
+
+       case *arrow.DayTimeIntervalType:
+               fv.dtype = flatbuf.TypeInterval
+               flatbuf.IntervalStart(fv.b)
+               flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitDAY_TIME)
+               fv.offset = flatbuf.IntervalEnd(fv.b)
+
+       case *arrow.MonthDayNanoIntervalType:
+               fv.dtype = flatbuf.TypeInterval
+               flatbuf.IntervalStart(fv.b)
+               flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitMONTH_DAY_NANO)
+               fv.offset = flatbuf.IntervalEnd(fv.b)
+
+       case *arrow.DurationType:
+               fv.dtype = flatbuf.TypeDuration
+               unit := unitToFB(dt.Unit)
+               flatbuf.DurationStart(fv.b)
+               flatbuf.DurationAddUnit(fv.b, unit)
+               fv.offset = flatbuf.DurationEnd(fv.b)
+
+       case *arrow.MapType:
+               fv.dtype = flatbuf.TypeMap
+               fv.kids = append(fv.kids, fieldToFB(fv.b, dt.ValueField(), fv.memo))
+               flatbuf.MapStart(fv.b)
+               flatbuf.MapAddKeysSorted(fv.b, dt.KeysSorted)
+               fv.offset = flatbuf.MapEnd(fv.b)
+
+       case arrow.ExtensionType:
+               field.Type = dt.StorageType()
+               fv.visit(field)
+               fv.meta[ExtensionTypeKeyName] = dt.ExtensionName()
+               fv.meta[ExtensionMetadataKeyName] = string(dt.Serialize())
+
+       default:
+               err := xerrors.Errorf("arrow/ipc: invalid data type %v", dt)
+               panic(err) // FIXME(sbinet): implement all data-types.
+       }
+}
+
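+// result visits the field and assembles the final flatbuffers Field table
+// from its name, type, children, dictionary and custom metadata (including
+// any extension-type keys collected during the visit).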
+func (fv *fieldVisitor) result(field arrow.Field) flatbuffers.UOffsetT {
+       nameFB := fv.b.CreateString(field.Name)
+
+       fv.visit(field)
+
+       flatbuf.FieldStartChildrenVector(fv.b, len(fv.kids))
+       for i := len(fv.kids) - 1; i >= 0; i-- {
+               fv.b.PrependUOffsetT(fv.kids[i])
+       }
+       kidsFB := fv.b.EndVector(len(fv.kids))
+
+       var dictFB flatbuffers.UOffsetT
+       if field.Type.ID() == arrow.DICTIONARY {
+               panic("not implemented") // FIXME(sbinet)
+       }
+
+       var (
+               metaFB flatbuffers.UOffsetT
+               kvs    []flatbuffers.UOffsetT
+       )
+       for i, k := range field.Metadata.Keys() {
+               v := field.Metadata.Values()[i]
+               kk := fv.b.CreateString(k)
+               vv := fv.b.CreateString(v)
+               flatbuf.KeyValueStart(fv.b)
+               flatbuf.KeyValueAddKey(fv.b, kk)
+               flatbuf.KeyValueAddValue(fv.b, vv)
+               kvs = append(kvs, flatbuf.KeyValueEnd(fv.b))
+       }
+       {
+               keys := make([]string, 0, len(fv.meta))
+               for k := range fv.meta {
+                       keys = append(keys, k)
+               }
+               sort.Strings(keys)
+               for _, k := range keys {
+                       v := fv.meta[k]
+                       kk := fv.b.CreateString(k)
+                       vv := fv.b.CreateString(v)
+                       flatbuf.KeyValueStart(fv.b)
+                       flatbuf.KeyValueAddKey(fv.b, kk)
+                       flatbuf.KeyValueAddValue(fv.b, vv)
+                       kvs = append(kvs, flatbuf.KeyValueEnd(fv.b))
+               }
+       }
+       if len(kvs) > 0 {
+               flatbuf.FieldStartCustomMetadataVector(fv.b, len(kvs))
+               for i := len(kvs) - 1; i >= 0; i-- {
+                       fv.b.PrependUOffsetT(kvs[i])
+               }
+               metaFB = fv.b.EndVector(len(kvs))
+       }
+
+       flatbuf.FieldStart(fv.b)
+       flatbuf.FieldAddName(fv.b, nameFB)
+       flatbuf.FieldAddNullable(fv.b, field.Nullable)
+       flatbuf.FieldAddTypeType(fv.b, fv.dtype)
+       flatbuf.FieldAddType(fv.b, fv.offset)
+       flatbuf.FieldAddDictionary(fv.b, dictFB)
+       flatbuf.FieldAddChildren(fv.b, kidsFB)
+       flatbuf.FieldAddCustomMetadata(fv.b, metaFB)
+
+       offset := flatbuf.FieldEnd(fv.b)
+
+       return offset
+}
+
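+// fieldFromFBDict constructs the arrow.Field for a dictionary-encoded field,
+// using a fresh memo; any DictionaryEncoding on the field itself is ignored.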
+func fieldFromFBDict(field *flatbuf.Field) (arrow.Field, error) {
+       var (
+               o = arrow.Field{
+                       Name:     string(field.Name()),
+                       Nullable: field.Nullable(),
+               }
+               err  error
+               memo = newMemo()
+       )
+
+       // any DictionaryEncoding set is ignored here.
+
+       kids := make([]arrow.Field, field.ChildrenLength())
+       for i := range kids {
+               var kid flatbuf.Field
+               if !field.Children(&kid, i) {
+                       return o, xerrors.Errorf("arrow/ipc: could not load field child %d", i)
+               }
+               kids[i], err = fieldFromFB(&kid, &memo)
+               if err != nil {
+                       return o, xerrors.Errorf("arrow/ipc: field from dict: %w", err)
+               }
+       }
+
+       meta, err := metadataFromFB(field)
+       if err != nil {
+               return o, xerrors.Errorf("arrow/ipc: metadata for field from dict: %w", err)
+       }
+
+       o.Type, err = typeFromFB(field, kids, &meta)
+       if err != nil {
+               return o, xerrors.Errorf("arrow/ipc: type for field from dict: %w", err)
+       }
+
+       return o, nil
+}
+
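+// typeFromFB reconstructs the arrow.DataType of a flatbuffers Field. If the
+// custom metadata names a registered extension type, the storage type is
+// deserialized into it and the extension keys are stripped from md.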
+func typeFromFB(field *flatbuf.Field, children []arrow.Field, md *arrow.Metadata) (arrow.DataType, error) {
+       var data flatbuffers.Table
+       if !field.Type(&data) {
+               return nil, xerrors.Errorf("arrow/ipc: could not load field type data")
+       }
+
+       dt, err := concreteTypeFromFB(field.TypeType(), data, children)
+       if err != nil {
+               return dt, err
+       }
+
+       // look for extension metadata in custom metadata field.
+       if md.Len() > 0 {
+               i := md.FindKey(ExtensionTypeKeyName)
+               if i < 0 {
+                       return dt, err
+               }
+
+               extType := arrow.GetExtensionType(md.Values()[i])
+               if extType == nil {
+                       // if the extension type is unknown, we do not error here.
+                       // simply return the storage type.
+                       return dt, err
+               }
+
+               var (
+                       data    string
+                       dataIdx int
+               )
+
+               if dataIdx = md.FindKey(ExtensionMetadataKeyName); dataIdx >= 0 {
+                       data = md.Values()[dataIdx]
+               }
+
+               dt, err = extType.Deserialize(dt, data)
+               if err != nil {
+                       return dt, err
+               }
+
+               mdkeys := md.Keys()
+               mdvals := md.Values()
+               if dataIdx < 0 {
+                       // if there was no extension metadata, just the name, we only have to
+                       // remove the extension name metadata key/value to ensure roundtrip
+                       // metadata consistency
+                       *md = arrow.NewMetadata(append(mdkeys[:i], mdkeys[i+1:]...), append(mdvals[:i], mdvals[i+1:]...))
+               } else {
+                       // if there was extension metadata, we need to remove both the type name
+                       // and the extension metadata keys and values.
+                       newkeys := make([]string, 0, md.Len()-2)
+                       newvals := make([]string, 0, md.Len()-2)
+                       for j := range mdkeys {
+                               if j != i && j != dataIdx { // copy everything except the extension metadata keys/values
+                                       newkeys = append(newkeys, mdkeys[j])
+                                       newvals = append(newvals, mdvals[j])
+                               }
+                       }
+                       *md = arrow.NewMetadata(newkeys, newvals)
+               }
+       }
+
+       return dt, err
+}
+
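+// concreteTypeFromFB maps a flatbuffers type enum and its type table to the
+// corresponding arrow.DataType, validating child fields where required.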
+func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arrow.Field) (arrow.DataType, error) {
+       switch typ {
+       case flatbuf.TypeNONE:
+               return nil, xerrors.Errorf("arrow/ipc: Type metadata cannot be none")
+
+       case flatbuf.TypeNull:
+               return arrow.Null, nil
+
+       case flatbuf.TypeInt:
+               var dt flatbuf.Int
+               dt.Init(data.Bytes, data.Pos)
+               return intFromFB(dt)
+
+       case flatbuf.TypeFloatingPoint:
+               var dt flatbuf.FloatingPoint
+               dt.Init(data.Bytes, data.Pos)
+               return floatFromFB(dt)
+
+       case flatbuf.TypeDecimal:
+               var dt flatbuf.Decimal
+               dt.Init(data.Bytes, data.Pos)
+               return decimalFromFB(dt)
+
+       case flatbuf.TypeBinary:
+               return arrow.BinaryTypes.Binary, nil
+
+       case flatbuf.TypeFixedSizeBinary:
+               var dt flatbuf.FixedSizeBinary
+               dt.Init(data.Bytes, data.Pos)
+               return &arrow.FixedSizeBinaryType{ByteWidth: int(dt.ByteWidth())}, nil
+
+       case flatbuf.TypeUtf8:
+               return arrow.BinaryTypes.String, nil
+
+       case flatbuf.TypeBool:
+               return arrow.FixedWidthTypes.Boolean, nil
+
+       case flatbuf.TypeList:
+               if len(children) != 1 {
+                       return nil, xerrors.Errorf("arrow/ipc: List must have exactly 1 child field (got=%d)", len(children))
+               }
+               dt := arrow.ListOfField(children[0])
+               return dt, nil
+
+       case flatbuf.TypeFixedSizeList:
+               var dt flatbuf.FixedSizeList
+               dt.Init(data.Bytes, data.Pos)
+               if len(children) != 1 {
+                       return nil, xerrors.Errorf("arrow/ipc: FixedSizeList must have exactly 1 child field (got=%d)", len(children))
+               }
+               ret := arrow.FixedSizeListOfField(dt.ListSize(), children[0])
+               return ret, nil
+
+       case flatbuf.TypeStruct_:
+               return arrow.StructOf(children...), nil
+
+       case flatbuf.TypeTime:
+               var dt flatbuf.Time
+               dt.Init(data.Bytes, data.Pos)
+               return timeFromFB(dt)
+
+       case flatbuf.TypeTimestamp:
+               var dt flatbuf.Timestamp
+               dt.Init(data.Bytes, data.Pos)
+               return timestampFromFB(dt)
+
+       case flatbuf.TypeDate:
+               var dt flatbuf.Date
+               dt.Init(data.Bytes, data.Pos)
+               return dateFromFB(dt)
+
+       case flatbuf.TypeInterval:
+               var dt flatbuf.Interval
+               dt.Init(data.Bytes, data.Pos)
+               return intervalFromFB(dt)
+
+       case flatbuf.TypeDuration:
+               var dt flatbuf.Duration
+               dt.Init(data.Bytes, data.Pos)
+               return durationFromFB(dt)
+
+       case flatbuf.TypeMap:
+               if len(children) != 1 {
+                       return nil, xerrors.Errorf("arrow/ipc: Map must have exactly 1 child field")
+               }
+
+               if children[0].Nullable || children[0].Type.ID() != arrow.STRUCT || len(children[0].Type.(*arrow.StructType).Fields()) != 2 {
+                       return nil, xerrors.Errorf("arrow/ipc: Map's key-item pairs must be non-nullable structs")
+               }
+
+               pairType := children[0].Type.(*arrow.StructType)
+               if pairType.Field(0).Nullable {
+                       return nil, xerrors.Errorf("arrow/ipc: Map's keys must be non-nullable")
+               }
+
+               var dt flatbuf.Map
+               dt.Init(data.Bytes, data.Pos)
+               ret := arrow.MapOf(pairType.Field(0).Type, pairType.Field(1).Type)
+               ret.KeysSorted = dt.KeysSorted()
+               return ret, nil
+
+       default:
+               // FIXME(sbinet): implement all the other types.
+               panic(xerrors.Errorf("arrow/ipc: type %v not implemented", flatbuf.EnumNamesType[typ]))
+       }
+}
+
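+// intFromFB maps a flatbuffers Int type to the matching fixed-width arrow
+// integer type, based on its bit width and signedness.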
+func intFromFB(data flatbuf.Int) (arrow.DataType, error) {
+       bw := data.BitWidth()
+       if bw > 64 {
+               return nil, xerrors.Errorf("arrow/ipc: integers with more than 64 bits not implemented (bits=%d)", bw)
+       }
+       if bw < 8 {
+               return nil, xerrors.Errorf("arrow/ipc: integers with less than 8 bits not implemented (bits=%d)", bw)
+       }
+
+       switch bw {
+       case 8:
+               if !data.IsSigned() {
+                       return arrow.PrimitiveTypes.Uint8, nil
+               }
+               return arrow.PrimitiveTypes.Int8, nil
+
+       case 16:
+               if !data.IsSigned() {
+                       return arrow.PrimitiveTypes.Uint16, nil
+               }
+               return arrow.PrimitiveTypes.Int16, nil
+
+       case 32:
+               if !data.IsSigned() {
+                       return arrow.PrimitiveTypes.Uint32, nil
+               }
+               return arrow.PrimitiveTypes.Int32, nil
+
+       case 64:
+               if !data.IsSigned() {
+                       return arrow.PrimitiveTypes.Uint64, nil
+               }
+               return arrow.PrimitiveTypes.Int64, nil
+       default:
+               return nil, xerrors.Errorf("arrow/ipc: integers not in cstdint are not implemented")
+       }
+}
+
+func intToFB(b *flatbuffers.Builder, bw int32, isSigned bool) flatbuffers.UOffsetT {
+       flatbuf.IntStart(b)
+       flatbuf.IntAddBitWidth(b, bw)
+       flatbuf.IntAddIsSigned(b, isSigned)
+       return flatbuf.IntEnd(b)
+}
+
+func floatFromFB(data flatbuf.FloatingPoint) (arrow.DataType, error) {
+       switch p := data.Precision(); p {
+       case flatbuf.PrecisionHALF:
+               return arrow.FixedWidthTypes.Float16, nil
+       case flatbuf.PrecisionSINGLE:
+               return arrow.PrimitiveTypes.Float32, nil
+       case flatbuf.PrecisionDOUBLE:
+               return arrow.PrimitiveTypes.Float64, nil
+       default:
+               return nil, xerrors.Errorf("arrow/ipc: floating point type with %d precision not implemented", p)
+       }
+}
+
+func floatToFB(b *flatbuffers.Builder, bw int32) flatbuffers.UOffsetT {
+       switch bw {
+       case 16:
+               flatbuf.FloatingPointStart(b)
+               flatbuf.FloatingPointAddPrecision(b, flatbuf.PrecisionHALF)
+               return flatbuf.FloatingPointEnd(b)
+       case 32:
+               flatbuf.FloatingPointStart(b)
+               flatbuf.FloatingPointAddPrecision(b, flatbuf.PrecisionSINGLE)
+               return flatbuf.FloatingPointEnd(b)
+       case 64:
+               flatbuf.FloatingPointStart(b)
+               flatbuf.FloatingPointAddPrecision(b, flatbuf.PrecisionDOUBLE)
+               return flatbuf.FloatingPointEnd(b)
+       default:
+               panic(xerrors.Errorf("arrow/ipc: invalid floating point precision %d-bits", bw))
+       }
+}
+
+func decimalFromFB(data flatbuf.Decimal) (arrow.DataType, error) {
+       return &arrow.Decimal128Type{Precision: data.Precision(), Scale: data.Scale()}, nil
+}
+
+func timeFromFB(data flatbuf.Time) (arrow.DataType, error) {
+       bw := data.BitWidth()
+       unit := unitFromFB(data.Unit())
+
+       switch bw {
+       case 32:
+               switch unit {
+               case arrow.Millisecond:
+                       return arrow.FixedWidthTypes.Time32ms, nil
+               case arrow.Second:
+                       return arrow.FixedWidthTypes.Time32s, nil
+               default:
+                       return nil, xerrors.Errorf("arrow/ipc: Time32 type with %v unit not implemented", unit)
+               }
+       case 64:
+               switch unit {
+               case arrow.Nanosecond:
+                       return arrow.FixedWidthTypes.Time64ns, nil
+               case arrow.Microsecond:
+                       return arrow.FixedWidthTypes.Time64us, nil
+               default:
+                       return nil, xerrors.Errorf("arrow/ipc: Time64 type with %v unit not implemented", unit)
+               }
+       default:
+               return nil, xerrors.Errorf("arrow/ipc: Time type with %d bitwidth not implemented", bw)
+       }
+}
+
+func timestampFromFB(data flatbuf.Timestamp) (arrow.DataType, error) {
+       unit := unitFromFB(data.Unit())
+       tz := string(data.Timezone())
+       return &arrow.TimestampType{Unit: unit, TimeZone: tz}, nil
+}
+
+func dateFromFB(data flatbuf.Date) (arrow.DataType, error) {
+       switch data.Unit() {
+       case flatbuf.DateUnitDAY:
+               return arrow.FixedWidthTypes.Date32, nil
+       case flatbuf.DateUnitMILLISECOND:
+               return arrow.FixedWidthTypes.Date64, nil
+       }
+       return nil, xerrors.Errorf("arrow/ipc: Date type with %d unit not implemented", data.Unit())
+}
+
+func intervalFromFB(data flatbuf.Interval) (arrow.DataType, error) {
+       switch data.Unit() {
+       case flatbuf.IntervalUnitYEAR_MONTH:
+               return arrow.FixedWidthTypes.MonthInterval, nil
+       case flatbuf.IntervalUnitDAY_TIME:
+               return arrow.FixedWidthTypes.DayTimeInterval, nil
+       case flatbuf.IntervalUnitMONTH_DAY_NANO:
+               return arrow.FixedWidthTypes.MonthDayNanoInterval, nil
+       }
+       return nil, xerrors.Errorf("arrow/ipc: Interval type with %d unit not implemented", data.Unit())
+}
+
+func durationFromFB(data flatbuf.Duration) (arrow.DataType, error) {
+       switch data.Unit() {
+       case flatbuf.TimeUnitSECOND:
+               return arrow.FixedWidthTypes.Duration_s, nil
+       case flatbuf.TimeUnitMILLISECOND:
+               return arrow.FixedWidthTypes.Duration_ms, nil
+       case flatbuf.TimeUnitMICROSECOND:
+               return arrow.FixedWidthTypes.Duration_us, nil
+       case flatbuf.TimeUnitNANOSECOND:
+               return arrow.FixedWidthTypes.Duration_ns, nil
+       }
+       return nil, xerrors.Errorf("arrow/ipc: Duration type with %d unit not implemented", data.Unit())
+}
+
+type customMetadataer interface {
+       CustomMetadataLength() int
+       CustomMetadata(*flatbuf.KeyValue, int) bool
+}
+
+func metadataFromFB(md customMetadataer) (arrow.Metadata, error) {
+       var (
+               keys = make([]string, md.CustomMetadataLength())
+               vals = make([]string, md.CustomMetadataLength())
+       )
+
+       for i := range keys {
+               var kv flatbuf.KeyValue
+               if !md.CustomMetadata(&kv, i) {
+                       return arrow.Metadata{}, xerrors.Errorf("arrow/ipc: could not read key-value %d from flatbuffer", i)
+               }
+               keys[i] = string(kv.Key())
+               vals[i] = string(kv.Value())
+       }
+
+       return arrow.NewMetadata(keys, vals), nil
+}
+
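+// metadataToFB serializes arrow.Metadata into the builder as a vector of
+// KeyValue tables, returning 0 when the metadata is empty.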
+func metadataToFB(b *flatbuffers.Builder, meta arrow.Metadata, start startVecFunc) flatbuffers.UOffsetT {
+       if meta.Len() == 0 {
+               return 0
+       }
+
+       n := meta.Len()
+       kvs := make([]flatbuffers.UOffsetT, n)
+       for i := range kvs {
+               k := b.CreateString(meta.Keys()[i])
+               v := b.CreateString(meta.Values()[i])
+               flatbuf.KeyValueStart(b)
+               flatbuf.KeyValueAddKey(b, k)
+               flatbuf.KeyValueAddValue(b, v)
+               kvs[i] = flatbuf.KeyValueEnd(b)
+       }
+
+       start(b, n)
+       for i := n - 1; i >= 0; i-- {
+               b.PrependUOffsetT(kvs[i])
+       }
+       return b.EndVector(n)
+}
+
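+// schemaFromFB converts a flatbuffers Schema into an arrow.Schema, including
+// its custom metadata.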
+func schemaFromFB(schema *flatbuf.Schema, memo *dictMemo) (*arrow.Schema, error) {
+       var (
+               err    error
+               fields = make([]arrow.Field, schema.FieldsLength())
+       )
+
+       for i := range fields {
+               var field flatbuf.Field
+               if !schema.Fields(&field, i) {
+                       return nil, xerrors.Errorf("arrow/ipc: could not read field %d from schema", i)
+               }
+
+               fields[i], err = fieldFromFB(&field, memo)
+               if err != nil {
+                       return nil, xerrors.Errorf("arrow/ipc: could not convert field %d from flatbuf: %w", i, err)
+               }
+       }
+
+       md, err := metadataFromFB(schema)
+       if err != nil {
+               return nil, xerrors.Errorf("arrow/ipc: could not convert schema metadata from flatbuf: %w", err)
+       }
+
+       return arrow.NewSchema(fields, &md), nil
+}
+
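+// schemaToFB serializes an arrow.Schema (fields, endianness and custom
+// metadata) into the builder and returns the offset of the Schema table.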
+func schemaToFB(b *flatbuffers.Builder, schema *arrow.Schema, memo *dictMemo) flatbuffers.UOffsetT {
+       fields := make([]flatbuffers.UOffsetT, len(schema.Fields()))
+       for i, field := range schema.Fields() {
+               fields[i] = fieldToFB(b, field, memo)
+       }
+
+       flatbuf.SchemaStartFieldsVector(b, len(fields))
+       for i := len(fields) - 1; i >= 0; i-- {
+               b.PrependUOffsetT(fields[i])
+       }
+       fieldsFB := b.EndVector(len(fields))
+
+       metaFB := metadataToFB(b, schema.Metadata(), flatbuf.SchemaStartCustomMetadataVector)
+
+       flatbuf.SchemaStart(b)
+       flatbuf.SchemaAddEndianness(b, flatbuf.EndiannessLittle)
+       flatbuf.SchemaAddFields(b, fieldsFB)
+       flatbuf.SchemaAddCustomMetadata(b, metaFB)
+       offset := flatbuf.SchemaEnd(b)
+
+       return offset
+}
+
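+// dictTypesFromFB collects the dictionary-encoded fields of a schema, keyed
+// by dictionary id.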
+func dictTypesFromFB(schema *flatbuf.Schema) (dictTypeMap, error) {
+       var (
+               err    error
+               fields = make(dictTypeMap, schema.FieldsLength())
+       )
+       for i := 0; i < schema.FieldsLength(); i++ {
+               var field flatbuf.Field
+               if !schema.Fields(&field, i) {
+                       return nil, xerrors.Errorf("arrow/ipc: could not load field %d from schema", i)
+               }
+               fields, err = visitField(&field, fields)
+               if err != nil {
+                       return nil, xerrors.Errorf("arrow/ipc: could not visit field %d from schema: %w", i, err)
+               }
+       }
+       return fields, err
+}
+
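+// visitField records in dict the arrow.Field for every dictionary-encoded
+// field found in the given field or its descendants.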
+func visitField(field *flatbuf.Field, dict dictTypeMap) (dictTypeMap, error) {
+       var err error
+       meta := field.Dictionary(nil)
+       switch meta {
+       case nil:
+               // field is not dictionary encoded.
+               // => visit children.
+               for i := 0; i < field.ChildrenLength(); i++ {
+                       var child flatbuf.Field
+                       if !field.Children(&child, i) {
+                               return nil, xerrors.Errorf("arrow/ipc: could not visit child %d from field", i)
+                       }
+                       dict, err = visitField(&child, dict)
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       default:
+               // field is dictionary encoded.
+               // construct the data type for the dictionary: no descendants can be dict-encoded.
+               dfield, err := fieldFromFBDict(field)
+               if err != nil {
+                       return nil, xerrors.Errorf("arrow/ipc: could not create data type for dictionary: %w", err)
+               }
+               dict[meta.Id()] = dfield
+       }
+       return dict, err
+}
+
+// payloadsFromSchema returns a slice of payloads corresponding to the given schema.
+// Callers of payloadsFromSchema will need to call Release after use.
+func payloadsFromSchema(schema *arrow.Schema, mem memory.Allocator, memo *dictMemo) payloads {
+       dict := newMemo()
+
+       ps := make(payloads, 1, dict.Len()+1)
+       ps[0].msg = MessageSchema
+       ps[0].meta = writeSchemaMessage(schema, mem, &dict)
+
+       // append dictionaries.
+       if dict.Len() > 0 {
+               panic("payloads-from-schema: not-implemented")
+               //              for id, arr := range dict.id2dict {
+               //                      // GetSchemaPayloads: writer.cc:535
+               //              }
+       }
+
+       if memo != nil {
+               *memo = dict
+       }
+
+       return ps
+}
+
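+// writeFBBuilder copies the builder's finished flatbuffer bytes into a new
+// buffer allocated from mem.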
+func writeFBBuilder(b *flatbuffers.Builder, mem memory.Allocator) *memory.Buffer {
+       raw := b.FinishedBytes()
+       buf := memory.NewResizableBuffer(mem)
+       buf.Resize(len(raw))
+       copy(buf.Bytes(), raw)
+       return buf
+}
+
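+// writeMessageFB wraps an already-built header table in a Message table
+// (version, header type, header offset, body length) and returns the
+// finished flatbuffer as a buffer.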
+func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64) *memory.Buffer {
+
+       flatbuf.MessageStart(b)
+       flatbuf.MessageAddVersion(b, flatbuf.MetadataVersion(currentMetadataVersion))
+       flatbuf.MessageAddHeaderType(b, hdrType)
+       flatbuf.MessageAddHeader(b, hdr)
+       flatbuf.MessageAddBodyLength(b, bodyLen)
+       msg := flatbuf.MessageEnd(b)
+       b.Finish(msg)
+
+       return writeFBBuilder(b, mem)
+}
+
+func writeSchemaMessage(schema *arrow.Schema, mem memory.Allocator, dict *dictMemo) *memory.Buffer {
+       b := flatbuffers.NewBuilder(1024)
+       schemaFB := schemaToFB(b, schema, dict)
+       return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0)
+}
+
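+// writeFileFooter serializes the file footer (schema, dictionary blocks and
+// record-batch blocks) and writes it to w.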
+func writeFileFooter(schema *arrow.Schema, dicts, recs []fileBlock, w io.Writer) error {
+       var (
+               b    = flatbuffers.NewBuilder(1024)
+               memo = newMemo()
+       )
+
+       schemaFB := schemaToFB(b, schema, &memo)
+       dictsFB := fileBlocksToFB(b, dicts, flatbuf.FooterStartDictionariesVector)
+       recsFB := fileBlocksToFB(b, recs, flatbuf.FooterStartRecordBatchesVector)
+
+       flatbuf.FooterStart(b)
+       flatbuf.FooterAddVersion(b, flatbuf.MetadataVersion(currentMetadataVersion))
+       flatbuf.FooterAddSchema(b, schemaFB)
+       flatbuf.FooterAddDictionaries(b, dictsFB)
+       flatbuf.FooterAddRecordBatches(b, recsFB)
+       footer := flatbuf.FooterEnd(b)
+
+       b.Finish(footer)
+
+       _, err := w.Write(b.FinishedBytes())
+       return err
+}
+
+func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType) *memory.Buffer {
+       b := flatbuffers.NewBuilder(0)
+       recFB := recordToFB(b, size, bodyLength, fields, meta, codec)
+       return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength)
+}
+
+func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType) flatbuffers.UOffsetT {
+       fieldsFB := writeFieldNodes(b, fields, flatbuf.RecordBatchStartNodesVector)
+       metaFB := writeBuffers(b, meta, flatbuf.RecordBatchStartBuffersVector)
+       var bodyCompressFB flatbuffers.UOffsetT
+       if codec != -1 {
+               bodyCompressFB = writeBodyCompression(b, codec)
+       }
+
+       flatbuf.RecordBatchStart(b)
+       flatbuf.RecordBatchAddLength(b, size)
+       flatbuf.RecordBatchAddNodes(b, fieldsFB)
+       flatbuf.RecordBatchAddBuffers(b, metaFB)
+       if codec != -1 {
+               flatbuf.RecordBatchAddCompression(b, bodyCompressFB)
+       }
+
+       return flatbuf.RecordBatchEnd(b)
+}
+
+func writeFieldNodes(b *flatbuffers.Builder, fields []fieldMetadata, start startVecFunc) flatbuffers.UOffsetT {
+
+       start(b, len(fields))
+       for i := len(fields) - 1; i >= 0; i-- {
+               field := fields[i]
+               if field.Offset != 0 {
+                       panic(xerrors.Errorf("arrow/ipc: field metadata for IPC must have offset 0"))
+               }
+               flatbuf.CreateFieldNode(b, field.Len, field.Nulls)
+       }
+
+       return b.EndVector(len(fields))
+}
+
+func writeBuffers(b *flatbuffers.Builder, buffers []bufferMetadata, start startVecFunc) flatbuffers.UOffsetT {
+       start(b, len(buffers))
+       for i := len(buffers) - 1; i >= 0; i-- {
+               buffer := buffers[i]
+               flatbuf.CreateBuffer(b, buffer.Offset, buffer.Len)
+       }
+       return b.EndVector(len(buffers))
+}
+
+func writeBodyCompression(b *flatbuffers.Builder, codec flatbuf.CompressionType) flatbuffers.UOffsetT {
+       flatbuf.BodyCompressionStart(b)
+       flatbuf.BodyCompressionAddCodec(b, codec)
+       flatbuf.BodyCompressionAddMethod(b, flatbuf.BodyCompressionMethodBUFFER)
+       return flatbuf.BodyCompressionEnd(b)
+}
+
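+// writeMessage writes msg to w as an encapsulated IPC message: a 4-byte
+// continuation indicator, a 4-byte little-endian metadata size (covering
+// the flatbuffer and its padding), the flatbuffer itself, and zero padding
+// up to the requested alignment. It returns the total number of bytes written.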
+func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error) {
+       var (
+               n   int
+               err error
+       )
+
+       // ARROW-3212: we make no assumption about whether the output stream is aligned.
+       paddedMsgLen := int32(msg.Len()) + 8
+       remainder := paddedMsgLen % alignment
+       if remainder != 0 {
+               paddedMsgLen += alignment - remainder
+       }
+
+       tmp := make([]byte, 4)
+
+       // write the continuation indicator, to satisfy the 8-byte alignment requirement of FlatBuffers.
+       binary.LittleEndian.PutUint32(tmp, kIPCContToken)
+       _, err = w.Write(tmp)
+       if err != nil {
+               return 0, xerrors.Errorf("arrow/ipc: could not write continuation bit indicator: %w", err)
+       }
+
+       // the returned message size includes the length prefix, the flatbuffer, and padding
+       n = int(paddedMsgLen)
+
+       // write the flatbuffer size prefix (the size accounts for the padding)
+       sizeFB := paddedMsgLen - 8
+       binary.LittleEndian.PutUint32(tmp, uint32(sizeFB))
+       _, err = w.Write(tmp)
+       if err != nil {
+               return n, xerrors.Errorf("arrow/ipc: could not write message flatbuffer size prefix: %w", err)
+       }
+
+       // write the flatbuffer
+       _, err = w.Write(msg.Bytes())
+       if err != nil {
+               return n, xerrors.Errorf("arrow/ipc: could not write message flatbuffer: %w", err)
+       }
+
+       // write any padding
+       padding := paddedMsgLen - int32(msg.Len()) - 8
+       if padding > 0 {
+               _, err = w.Write(paddingBytes[:padding])
+               if err != nil {
+                       return n, xerrors.Errorf("arrow/ipc: could not write message padding bytes: %w", err)
+               }
+       }
+
+       return n, err
+}