1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
24 "github.com/apache/arrow/go/v6/arrow"
25 "github.com/apache/arrow/go/v6/arrow/internal/debug"
26 "github.com/apache/arrow/go/v6/arrow/memory"
29 // RecordReader reads a stream of records.
30 type RecordReader interface {
34 Schema() *arrow.Schema
40 // simpleRecords is a simple iterator over a collection of records.
41 type simpleRecords struct {
49 // NewRecordReader returns a simple iterator over the given slice of records.
50 func NewRecordReader(schema *arrow.Schema, recs []Record) (*simpleRecords, error) {
58 for _, rec := range rs.recs {
62 for _, rec := range recs {
63 if !rec.Schema().Equal(rs.schema) {
65 return nil, fmt.Errorf("arrow/array: mismatch schema")
72 // Retain increases the reference count by 1.
73 // Retain may be called simultaneously from multiple goroutines.
74 func (rs *simpleRecords) Retain() {
75 atomic.AddInt64(&rs.refCount, 1)
78 // Release decreases the reference count by 1.
79 // When the reference count goes to zero, the memory is freed.
80 // Release may be called simultaneously from multiple goroutines.
81 func (rs *simpleRecords) Release() {
82 debug.Assert(atomic.LoadInt64(&rs.refCount) > 0, "too many releases")
84 if atomic.AddInt64(&rs.refCount, -1) == 0 {
88 for _, rec := range rs.recs {
95 func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema }
96 func (rs *simpleRecords) Record() Record { return rs.cur }
97 func (rs *simpleRecords) Next() bool {
98 if len(rs.recs) == 0 {
105 rs.recs = rs.recs[1:]
109 // Record is a collection of equal-length arrays
110 // matching a particular Schema.
111 type Record interface {
115 Schema() *arrow.Schema
120 Columns() []Interface
121 Column(i int) Interface
122 ColumnName(i int) string
124 // NewSlice constructs a zero-copy slice of the record with the indicated
125 // indices i and j, corresponding to array[i:j].
126 // The returned record must be Release()'d after use.
128 // NewSlice panics if the slice is outside the valid range of the record array.
129 // NewSlice panics if j < i.
130 NewSlice(i, j int64) Record
133 // simpleRecord is a basic, non-lazy in-memory record batch.
134 type simpleRecord struct {
143 // NewRecord returns a basic, non-lazy in-memory record batch.
145 // NewRecord panics if the columns and schema are inconsistent.
146 // NewRecord panics if rows is larger than the height of the columns.
147 func NewRecord(schema *arrow.Schema, cols []Interface, nrows int64) *simpleRecord {
148 rec := &simpleRecord{
152 arrs: make([]Interface, len(cols)),
155 for _, arr := range rec.arrs {
160 switch len(rec.arrs) {
164 rec.rows = int64(rec.arrs[0].Len())
168 err := rec.validate()
177 func (rec *simpleRecord) validate() error {
178 if len(rec.arrs) != len(rec.schema.Fields()) {
179 return fmt.Errorf("arrow/array: number of columns/fields mismatch")
182 for i, arr := range rec.arrs {
183 f := rec.schema.Field(i)
184 if int64(arr.Len()) < rec.rows {
185 return fmt.Errorf("arrow/array: mismatch number of rows in column %q: got=%d, want=%d",
190 if !arrow.TypeEqual(f.Type, arr.DataType()) {
191 return fmt.Errorf("arrow/array: column %q type mismatch: got=%v, want=%v",
193 arr.DataType(), f.Type,
200 // Retain increases the reference count by 1.
201 // Retain may be called simultaneously from multiple goroutines.
202 func (rec *simpleRecord) Retain() {
203 atomic.AddInt64(&rec.refCount, 1)
206 // Release decreases the reference count by 1.
207 // When the reference count goes to zero, the memory is freed.
208 // Release may be called simultaneously from multiple goroutines.
209 func (rec *simpleRecord) Release() {
210 debug.Assert(atomic.LoadInt64(&rec.refCount) > 0, "too many releases")
212 if atomic.AddInt64(&rec.refCount, -1) == 0 {
213 for _, arr := range rec.arrs {
220 func (rec *simpleRecord) Schema() *arrow.Schema { return rec.schema }
221 func (rec *simpleRecord) NumRows() int64 { return rec.rows }
222 func (rec *simpleRecord) NumCols() int64 { return int64(len(rec.arrs)) }
223 func (rec *simpleRecord) Columns() []Interface { return rec.arrs }
224 func (rec *simpleRecord) Column(i int) Interface { return rec.arrs[i] }
225 func (rec *simpleRecord) ColumnName(i int) string { return rec.schema.Field(i).Name }
227 // NewSlice constructs a zero-copy slice of the record with the indicated
228 // indices i and j, corresponding to array[i:j].
229 // The returned record must be Release()'d after use.
231 // NewSlice panics if the slice is outside the valid range of the record array.
232 // NewSlice panics if j < i.
233 func (rec *simpleRecord) NewSlice(i, j int64) Record {
234 arrs := make([]Interface, len(rec.arrs))
235 for ii, arr := range rec.arrs {
236 arrs[ii] = NewSlice(arr, i, j)
239 for _, arr := range arrs {
243 return NewRecord(rec.schema, arrs, j-i)
246 func (rec *simpleRecord) String() string {
247 o := new(strings.Builder)
248 fmt.Fprintf(o, "record:\n %v\n", rec.schema)
249 fmt.Fprintf(o, " rows: %d\n", rec.rows)
250 for i, col := range rec.arrs {
251 fmt.Fprintf(o, " col[%d][%s]: %v\n", i, rec.schema.Field(i).Name, col)
257 // RecordBuilder eases the process of building a Record, iteratively, from
259 type RecordBuilder struct {
266 // NewRecordBuilder returns a builder, using the provided memory allocator and a schema.
267 func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema) *RecordBuilder {
272 fields: make([]Builder, len(schema.Fields())),
275 for i, f := range schema.Fields() {
276 b.fields[i] = NewBuilder(b.mem, f.Type)
282 // Retain increases the reference count by 1.
283 // Retain may be called simultaneously from multiple goroutines.
284 func (b *RecordBuilder) Retain() {
285 atomic.AddInt64(&b.refCount, 1)
288 // Release decreases the reference count by 1.
289 func (b *RecordBuilder) Release() {
290 debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
292 if atomic.AddInt64(&b.refCount, -1) == 0 {
293 for _, f := range b.fields {
300 func (b *RecordBuilder) Schema() *arrow.Schema { return b.schema }
301 func (b *RecordBuilder) Fields() []Builder { return b.fields }
302 func (b *RecordBuilder) Field(i int) Builder { return b.fields[i] }
304 func (b *RecordBuilder) Reserve(size int) {
305 for _, f := range b.fields {
310 // NewRecord creates a new record from the memory buffers and resets the
311 // RecordBuilder so it can be used to build a new record.
313 // The returned Record must be Release()'d after use.
315 // NewRecord panics if the fields' builder do not have the same length.
316 func (b *RecordBuilder) NewRecord() Record {
317 cols := make([]Interface, len(b.fields))
320 defer func(cols []Interface) {
321 for _, col := range cols {
329 for i, f := range b.fields {
330 cols[i] = f.NewArray()
331 irow := int64(cols[i].Len())
332 if i > 0 && irow != rows {
333 panic(fmt.Errorf("arrow/array: field %d has %d rows. want=%d", i, irow, rows))
338 return NewRecord(b.schema, cols, rows)
342 _ Record = (*simpleRecord)(nil)
343 _ RecordReader = (*simpleRecords)(nil)