]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/go/arrow/array/record.go
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / arrow / array / record.go
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 package array
18
19 import (
20 "fmt"
21 "strings"
22 "sync/atomic"
23
24 "github.com/apache/arrow/go/v6/arrow"
25 "github.com/apache/arrow/go/v6/arrow/internal/debug"
26 "github.com/apache/arrow/go/v6/arrow/memory"
27 )
28
29 // RecordReader reads a stream of records.
30 type RecordReader interface {
31 Retain()
32 Release()
33
34 Schema() *arrow.Schema
35
36 Next() bool
37 Record() Record
38 }
39
40 // simpleRecords is a simple iterator over a collection of records.
41 type simpleRecords struct {
42 refCount int64
43
44 schema *arrow.Schema
45 recs []Record
46 cur Record
47 }
48
49 // NewRecordReader returns a simple iterator over the given slice of records.
50 func NewRecordReader(schema *arrow.Schema, recs []Record) (*simpleRecords, error) {
51 rs := &simpleRecords{
52 refCount: 1,
53 schema: schema,
54 recs: recs,
55 cur: nil,
56 }
57
58 for _, rec := range rs.recs {
59 rec.Retain()
60 }
61
62 for _, rec := range recs {
63 if !rec.Schema().Equal(rs.schema) {
64 rs.Release()
65 return nil, fmt.Errorf("arrow/array: mismatch schema")
66 }
67 }
68
69 return rs, nil
70 }
71
72 // Retain increases the reference count by 1.
73 // Retain may be called simultaneously from multiple goroutines.
74 func (rs *simpleRecords) Retain() {
75 atomic.AddInt64(&rs.refCount, 1)
76 }
77
78 // Release decreases the reference count by 1.
79 // When the reference count goes to zero, the memory is freed.
80 // Release may be called simultaneously from multiple goroutines.
81 func (rs *simpleRecords) Release() {
82 debug.Assert(atomic.LoadInt64(&rs.refCount) > 0, "too many releases")
83
84 if atomic.AddInt64(&rs.refCount, -1) == 0 {
85 if rs.cur != nil {
86 rs.cur.Release()
87 }
88 for _, rec := range rs.recs {
89 rec.Release()
90 }
91 rs.recs = nil
92 }
93 }
94
95 func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema }
96 func (rs *simpleRecords) Record() Record { return rs.cur }
97 func (rs *simpleRecords) Next() bool {
98 if len(rs.recs) == 0 {
99 return false
100 }
101 if rs.cur != nil {
102 rs.cur.Release()
103 }
104 rs.cur = rs.recs[0]
105 rs.recs = rs.recs[1:]
106 return true
107 }
108
109 // Record is a collection of equal-length arrays
110 // matching a particular Schema.
111 type Record interface {
112 Release()
113 Retain()
114
115 Schema() *arrow.Schema
116
117 NumRows() int64
118 NumCols() int64
119
120 Columns() []Interface
121 Column(i int) Interface
122 ColumnName(i int) string
123
124 // NewSlice constructs a zero-copy slice of the record with the indicated
125 // indices i and j, corresponding to array[i:j].
126 // The returned record must be Release()'d after use.
127 //
128 // NewSlice panics if the slice is outside the valid range of the record array.
129 // NewSlice panics if j < i.
130 NewSlice(i, j int64) Record
131 }
132
133 // simpleRecord is a basic, non-lazy in-memory record batch.
134 type simpleRecord struct {
135 refCount int64
136
137 schema *arrow.Schema
138
139 rows int64
140 arrs []Interface
141 }
142
143 // NewRecord returns a basic, non-lazy in-memory record batch.
144 //
145 // NewRecord panics if the columns and schema are inconsistent.
146 // NewRecord panics if rows is larger than the height of the columns.
147 func NewRecord(schema *arrow.Schema, cols []Interface, nrows int64) *simpleRecord {
148 rec := &simpleRecord{
149 refCount: 1,
150 schema: schema,
151 rows: nrows,
152 arrs: make([]Interface, len(cols)),
153 }
154 copy(rec.arrs, cols)
155 for _, arr := range rec.arrs {
156 arr.Retain()
157 }
158
159 if rec.rows < 0 {
160 switch len(rec.arrs) {
161 case 0:
162 rec.rows = 0
163 default:
164 rec.rows = int64(rec.arrs[0].Len())
165 }
166 }
167
168 err := rec.validate()
169 if err != nil {
170 rec.Release()
171 panic(err)
172 }
173
174 return rec
175 }
176
177 func (rec *simpleRecord) validate() error {
178 if len(rec.arrs) != len(rec.schema.Fields()) {
179 return fmt.Errorf("arrow/array: number of columns/fields mismatch")
180 }
181
182 for i, arr := range rec.arrs {
183 f := rec.schema.Field(i)
184 if int64(arr.Len()) < rec.rows {
185 return fmt.Errorf("arrow/array: mismatch number of rows in column %q: got=%d, want=%d",
186 f.Name,
187 arr.Len(), rec.rows,
188 )
189 }
190 if !arrow.TypeEqual(f.Type, arr.DataType()) {
191 return fmt.Errorf("arrow/array: column %q type mismatch: got=%v, want=%v",
192 f.Name,
193 arr.DataType(), f.Type,
194 )
195 }
196 }
197 return nil
198 }
199
200 // Retain increases the reference count by 1.
201 // Retain may be called simultaneously from multiple goroutines.
202 func (rec *simpleRecord) Retain() {
203 atomic.AddInt64(&rec.refCount, 1)
204 }
205
206 // Release decreases the reference count by 1.
207 // When the reference count goes to zero, the memory is freed.
208 // Release may be called simultaneously from multiple goroutines.
209 func (rec *simpleRecord) Release() {
210 debug.Assert(atomic.LoadInt64(&rec.refCount) > 0, "too many releases")
211
212 if atomic.AddInt64(&rec.refCount, -1) == 0 {
213 for _, arr := range rec.arrs {
214 arr.Release()
215 }
216 rec.arrs = nil
217 }
218 }
219
220 func (rec *simpleRecord) Schema() *arrow.Schema { return rec.schema }
221 func (rec *simpleRecord) NumRows() int64 { return rec.rows }
222 func (rec *simpleRecord) NumCols() int64 { return int64(len(rec.arrs)) }
223 func (rec *simpleRecord) Columns() []Interface { return rec.arrs }
224 func (rec *simpleRecord) Column(i int) Interface { return rec.arrs[i] }
225 func (rec *simpleRecord) ColumnName(i int) string { return rec.schema.Field(i).Name }
226
227 // NewSlice constructs a zero-copy slice of the record with the indicated
228 // indices i and j, corresponding to array[i:j].
229 // The returned record must be Release()'d after use.
230 //
231 // NewSlice panics if the slice is outside the valid range of the record array.
232 // NewSlice panics if j < i.
233 func (rec *simpleRecord) NewSlice(i, j int64) Record {
234 arrs := make([]Interface, len(rec.arrs))
235 for ii, arr := range rec.arrs {
236 arrs[ii] = NewSlice(arr, i, j)
237 }
238 defer func() {
239 for _, arr := range arrs {
240 arr.Release()
241 }
242 }()
243 return NewRecord(rec.schema, arrs, j-i)
244 }
245
246 func (rec *simpleRecord) String() string {
247 o := new(strings.Builder)
248 fmt.Fprintf(o, "record:\n %v\n", rec.schema)
249 fmt.Fprintf(o, " rows: %d\n", rec.rows)
250 for i, col := range rec.arrs {
251 fmt.Fprintf(o, " col[%d][%s]: %v\n", i, rec.schema.Field(i).Name, col)
252 }
253
254 return o.String()
255 }
256
257 // RecordBuilder eases the process of building a Record, iteratively, from
258 // a known Schema.
259 type RecordBuilder struct {
260 refCount int64
261 mem memory.Allocator
262 schema *arrow.Schema
263 fields []Builder
264 }
265
266 // NewRecordBuilder returns a builder, using the provided memory allocator and a schema.
267 func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema) *RecordBuilder {
268 b := &RecordBuilder{
269 refCount: 1,
270 mem: mem,
271 schema: schema,
272 fields: make([]Builder, len(schema.Fields())),
273 }
274
275 for i, f := range schema.Fields() {
276 b.fields[i] = NewBuilder(b.mem, f.Type)
277 }
278
279 return b
280 }
281
282 // Retain increases the reference count by 1.
283 // Retain may be called simultaneously from multiple goroutines.
284 func (b *RecordBuilder) Retain() {
285 atomic.AddInt64(&b.refCount, 1)
286 }
287
288 // Release decreases the reference count by 1.
289 func (b *RecordBuilder) Release() {
290 debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
291
292 if atomic.AddInt64(&b.refCount, -1) == 0 {
293 for _, f := range b.fields {
294 f.Release()
295 }
296 b.fields = nil
297 }
298 }
299
300 func (b *RecordBuilder) Schema() *arrow.Schema { return b.schema }
301 func (b *RecordBuilder) Fields() []Builder { return b.fields }
302 func (b *RecordBuilder) Field(i int) Builder { return b.fields[i] }
303
304 func (b *RecordBuilder) Reserve(size int) {
305 for _, f := range b.fields {
306 f.Reserve(size)
307 }
308 }
309
310 // NewRecord creates a new record from the memory buffers and resets the
311 // RecordBuilder so it can be used to build a new record.
312 //
313 // The returned Record must be Release()'d after use.
314 //
315 // NewRecord panics if the fields' builder do not have the same length.
316 func (b *RecordBuilder) NewRecord() Record {
317 cols := make([]Interface, len(b.fields))
318 rows := int64(0)
319
320 defer func(cols []Interface) {
321 for _, col := range cols {
322 if col == nil {
323 continue
324 }
325 col.Release()
326 }
327 }(cols)
328
329 for i, f := range b.fields {
330 cols[i] = f.NewArray()
331 irow := int64(cols[i].Len())
332 if i > 0 && irow != rows {
333 panic(fmt.Errorf("arrow/array: field %d has %d rows. want=%d", i, irow, rows))
334 }
335 rows = irow
336 }
337
338 return NewRecord(b.schema, cols, rows)
339 }
340
341 var (
342 _ Record = (*simpleRecord)(nil)
343 _ RecordReader = (*simpleRecords)(nil)
344 )