]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/go/arrow/array/record.go
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / arrow / array / record.go
diff --git a/ceph/src/arrow/go/arrow/array/record.go b/ceph/src/arrow/go/arrow/array/record.go
new file mode 100644 (file)
index 0000000..6543677
--- /dev/null
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package array
+
+import (
+       "fmt"
+       "strings"
+       "sync/atomic"
+
+       "github.com/apache/arrow/go/v6/arrow"
+       "github.com/apache/arrow/go/v6/arrow/internal/debug"
+       "github.com/apache/arrow/go/v6/arrow/memory"
+)
+
+// RecordReader reads a stream of records.
+type RecordReader interface {
+       Retain()
+       Release()
+
+       Schema() *arrow.Schema
+
+       Next() bool
+       Record() Record
+}
+
+// simpleRecords is a simple iterator over a collection of records.
+type simpleRecords struct {
+       refCount int64
+
+       schema *arrow.Schema
+       recs   []Record
+       cur    Record
+}
+
+// NewRecordReader returns a simple iterator over the given slice of records.
+func NewRecordReader(schema *arrow.Schema, recs []Record) (*simpleRecords, error) {
+       rs := &simpleRecords{
+               refCount: 1,
+               schema:   schema,
+               recs:     recs,
+               cur:      nil,
+       }
+
+       for _, rec := range rs.recs {
+               rec.Retain()
+       }
+
+       for _, rec := range recs {
+               if !rec.Schema().Equal(rs.schema) {
+                       rs.Release()
+                       return nil, fmt.Errorf("arrow/array: mismatch schema")
+               }
+       }
+
+       return rs, nil
+}
+
+// Retain increases the reference count by 1.
+// Retain may be called simultaneously from multiple goroutines.
+func (rs *simpleRecords) Retain() {
+       atomic.AddInt64(&rs.refCount, 1)
+}
+
+// Release decreases the reference count by 1.
+// When the reference count goes to zero, the memory is freed.
+// Release may be called simultaneously from multiple goroutines.
+func (rs *simpleRecords) Release() {
+       debug.Assert(atomic.LoadInt64(&rs.refCount) > 0, "too many releases")
+
+       if atomic.AddInt64(&rs.refCount, -1) == 0 {
+               if rs.cur != nil {
+                       rs.cur.Release()
+               }
+               for _, rec := range rs.recs {
+                       rec.Release()
+               }
+               rs.recs = nil
+       }
+}
+
+func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema }
+func (rs *simpleRecords) Record() Record        { return rs.cur }
+func (rs *simpleRecords) Next() bool {
+       if len(rs.recs) == 0 {
+               return false
+       }
+       if rs.cur != nil {
+               rs.cur.Release()
+       }
+       rs.cur = rs.recs[0]
+       rs.recs = rs.recs[1:]
+       return true
+}
+
+// Record is a collection of equal-length arrays
+// matching a particular Schema.
+type Record interface {
+       Release()
+       Retain()
+
+       Schema() *arrow.Schema
+
+       NumRows() int64
+       NumCols() int64
+
+       Columns() []Interface
+       Column(i int) Interface
+       ColumnName(i int) string
+
+       // NewSlice constructs a zero-copy slice of the record with the indicated
+       // indices i and j, corresponding to array[i:j].
+       // The returned record must be Release()'d after use.
+       //
+       // NewSlice panics if the slice is outside the valid range of the record array.
+       // NewSlice panics if j < i.
+       NewSlice(i, j int64) Record
+}
+
+// simpleRecord is a basic, non-lazy in-memory record batch.
+type simpleRecord struct {
+       refCount int64
+
+       schema *arrow.Schema
+
+       rows int64
+       arrs []Interface
+}
+
+// NewRecord returns a basic, non-lazy in-memory record batch.
+//
+// NewRecord panics if the columns and schema are inconsistent.
+// NewRecord panics if rows is larger than the height of the columns.
+func NewRecord(schema *arrow.Schema, cols []Interface, nrows int64) *simpleRecord {
+       rec := &simpleRecord{
+               refCount: 1,
+               schema:   schema,
+               rows:     nrows,
+               arrs:     make([]Interface, len(cols)),
+       }
+       copy(rec.arrs, cols)
+       for _, arr := range rec.arrs {
+               arr.Retain()
+       }
+
+       if rec.rows < 0 {
+               switch len(rec.arrs) {
+               case 0:
+                       rec.rows = 0
+               default:
+                       rec.rows = int64(rec.arrs[0].Len())
+               }
+       }
+
+       err := rec.validate()
+       if err != nil {
+               rec.Release()
+               panic(err)
+       }
+
+       return rec
+}
+
+func (rec *simpleRecord) validate() error {
+       if len(rec.arrs) != len(rec.schema.Fields()) {
+               return fmt.Errorf("arrow/array: number of columns/fields mismatch")
+       }
+
+       for i, arr := range rec.arrs {
+               f := rec.schema.Field(i)
+               if int64(arr.Len()) < rec.rows {
+                       return fmt.Errorf("arrow/array: mismatch number of rows in column %q: got=%d, want=%d",
+                               f.Name,
+                               arr.Len(), rec.rows,
+                       )
+               }
+               if !arrow.TypeEqual(f.Type, arr.DataType()) {
+                       return fmt.Errorf("arrow/array: column %q type mismatch: got=%v, want=%v",
+                               f.Name,
+                               arr.DataType(), f.Type,
+                       )
+               }
+       }
+       return nil
+}
+
+// Retain increases the reference count by 1.
+// Retain may be called simultaneously from multiple goroutines.
+func (rec *simpleRecord) Retain() {
+       atomic.AddInt64(&rec.refCount, 1)
+}
+
+// Release decreases the reference count by 1.
+// When the reference count goes to zero, the memory is freed.
+// Release may be called simultaneously from multiple goroutines.
+func (rec *simpleRecord) Release() {
+       debug.Assert(atomic.LoadInt64(&rec.refCount) > 0, "too many releases")
+
+       if atomic.AddInt64(&rec.refCount, -1) == 0 {
+               for _, arr := range rec.arrs {
+                       arr.Release()
+               }
+               rec.arrs = nil
+       }
+}
+
+func (rec *simpleRecord) Schema() *arrow.Schema   { return rec.schema }
+func (rec *simpleRecord) NumRows() int64          { return rec.rows }
+func (rec *simpleRecord) NumCols() int64          { return int64(len(rec.arrs)) }
+func (rec *simpleRecord) Columns() []Interface    { return rec.arrs }
+func (rec *simpleRecord) Column(i int) Interface  { return rec.arrs[i] }
+func (rec *simpleRecord) ColumnName(i int) string { return rec.schema.Field(i).Name }
+
+// NewSlice constructs a zero-copy slice of the record with the indicated
+// indices i and j, corresponding to array[i:j].
+// The returned record must be Release()'d after use.
+//
+// NewSlice panics if the slice is outside the valid range of the record array.
+// NewSlice panics if j < i.
+func (rec *simpleRecord) NewSlice(i, j int64) Record {
+       arrs := make([]Interface, len(rec.arrs))
+       for ii, arr := range rec.arrs {
+               arrs[ii] = NewSlice(arr, i, j)
+       }
+       defer func() {
+               for _, arr := range arrs {
+                       arr.Release()
+               }
+       }()
+       return NewRecord(rec.schema, arrs, j-i)
+}
+
+func (rec *simpleRecord) String() string {
+       o := new(strings.Builder)
+       fmt.Fprintf(o, "record:\n  %v\n", rec.schema)
+       fmt.Fprintf(o, "  rows: %d\n", rec.rows)
+       for i, col := range rec.arrs {
+               fmt.Fprintf(o, "  col[%d][%s]: %v\n", i, rec.schema.Field(i).Name, col)
+       }
+
+       return o.String()
+}
+
+// RecordBuilder eases the process of building a Record, iteratively, from
+// a known Schema.
+type RecordBuilder struct {
+       refCount int64
+       mem      memory.Allocator
+       schema   *arrow.Schema
+       fields   []Builder
+}
+
+// NewRecordBuilder returns a builder, using the provided memory allocator and a schema.
+func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema) *RecordBuilder {
+       b := &RecordBuilder{
+               refCount: 1,
+               mem:      mem,
+               schema:   schema,
+               fields:   make([]Builder, len(schema.Fields())),
+       }
+
+       for i, f := range schema.Fields() {
+               b.fields[i] = NewBuilder(b.mem, f.Type)
+       }
+
+       return b
+}
+
+// Retain increases the reference count by 1.
+// Retain may be called simultaneously from multiple goroutines.
+func (b *RecordBuilder) Retain() {
+       atomic.AddInt64(&b.refCount, 1)
+}
+
+// Release decreases the reference count by 1.
+func (b *RecordBuilder) Release() {
+       debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+
+       if atomic.AddInt64(&b.refCount, -1) == 0 {
+               for _, f := range b.fields {
+                       f.Release()
+               }
+               b.fields = nil
+       }
+}
+
+func (b *RecordBuilder) Schema() *arrow.Schema { return b.schema }
+func (b *RecordBuilder) Fields() []Builder     { return b.fields }
+func (b *RecordBuilder) Field(i int) Builder   { return b.fields[i] }
+
+func (b *RecordBuilder) Reserve(size int) {
+       for _, f := range b.fields {
+               f.Reserve(size)
+       }
+}
+
+// NewRecord creates a new record from the memory buffers and resets the
+// RecordBuilder so it can be used to build a new record.
+//
+// The returned Record must be Release()'d after use.
+//
+// NewRecord panics if the fields' builder do not have the same length.
+func (b *RecordBuilder) NewRecord() Record {
+       cols := make([]Interface, len(b.fields))
+       rows := int64(0)
+
+       defer func(cols []Interface) {
+               for _, col := range cols {
+                       if col == nil {
+                               continue
+                       }
+                       col.Release()
+               }
+       }(cols)
+
+       for i, f := range b.fields {
+               cols[i] = f.NewArray()
+               irow := int64(cols[i].Len())
+               if i > 0 && irow != rows {
+                       panic(fmt.Errorf("arrow/array: field %d has %d rows. want=%d", i, irow, rows))
+               }
+               rows = irow
+       }
+
+       return NewRecord(b.schema, cols, rows)
+}
+
+var (
+       _ Record       = (*simpleRecord)(nil)
+       _ RecordReader = (*simpleRecords)(nil)
+)