// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package array

import (
	"errors"
	"fmt"
	"math"
	"sync/atomic"

	"github.com/apache/arrow/go/v6/arrow"
	"github.com/apache/arrow/go/v6/arrow/internal/debug"
)

// Table represents a logical sequence of chunked arrays.
type Table interface {
	Schema() *arrow.Schema
	NumRows() int64
	NumCols() int64
	Column(i int) *Column

	Retain()
	Release()
}
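
// Concrete, non-lazy in-memory tables are created with NewTable or
// NewTableFromRecords below; a TableReader iterates over a Table as a
// sequence of Records.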

// Column is an immutable column data structure consisting of
// a field (type metadata) and a chunked data array.
type Column struct {
	field arrow.Field
	data  *Chunked
}

// NewColumn returns a column from a field and a chunked data array.
//
// NewColumn panics if the field's data type is inconsistent with the data type
// of the chunked data array.
func NewColumn(field arrow.Field, chunks *Chunked) *Column {
	col := Column{
		field: field,
		data:  chunks,
	}
	col.data.Retain()

	if !arrow.TypeEqual(col.data.DataType(), col.field.Type) {
		col.data.Release()
		panic("arrow/array: inconsistent data type")
	}

	return &col
}
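
// Illustrative sketch only: building a column from a chunked int64 array.
// The arrays a1 and a2 below are assumed to have been built elsewhere and are
// not part of this file.
//
//	fld := arrow.Field{Name: "f1", Type: arrow.PrimitiveTypes.Int64}
//	chunked := array.NewChunked(arrow.PrimitiveTypes.Int64, []array.Interface{a1, a2})
//	col := array.NewColumn(fld, chunked)
//	defer col.Release()
//	chunked.Release() // NewColumn retained the chunked data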

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (col *Column) Retain() {
	col.data.Retain()
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (col *Column) Release() {
	col.data.Release()
}

func (col *Column) Len() int                 { return col.data.Len() }
func (col *Column) NullN() int               { return col.data.NullN() }
func (col *Column) Data() *Chunked           { return col.data }
func (col *Column) Field() arrow.Field       { return col.field }
func (col *Column) Name() string             { return col.field.Name }
func (col *Column) DataType() arrow.DataType { return col.field.Type }

// NewSlice returns a new zero-copy slice of the column with the indicated
// indices i and j, corresponding to the column's array[i:j].
// The returned column must be Release()'d after use.
//
// NewSlice panics if the slice is outside the valid range of the column's array.
// NewSlice panics if j < i.
func (col *Column) NewSlice(i, j int64) *Column {
	return &Column{
		field: col.field,
		data:  col.data.NewSlice(i, j),
	}
}

// Chunked manages a collection of primitive arrays as one logical large array.
type Chunked struct {
	refCount int64 // refCount must be first in the struct for 64 bit alignment and sync/atomic (https://github.com/golang/go/issues/37262)

	chunks []Interface

	length int
	nulls  int
	dtype  arrow.DataType
}

// NewChunked returns a new chunked array from the slice of arrays.
//
// NewChunked panics if the chunks do not have the same data type.
func NewChunked(dtype arrow.DataType, chunks []Interface) *Chunked {
	arr := &Chunked{
		chunks:   make([]Interface, len(chunks)),
		refCount: 1,
		dtype:    dtype,
	}
	for i, chunk := range chunks {
		if !arrow.TypeEqual(chunk.DataType(), dtype) {
			panic("arrow/array: mismatch data type")
		}
		chunk.Retain()
		arr.chunks[i] = chunk
		arr.length += chunk.Len()
		arr.nulls += chunk.NullN()
	}
	return arr
}
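
// Illustrative sketch only: assembling a Chunked from two int64 arrays.
// The builder and allocator here are assumptions, not defined in this file.
//
//	bldr := array.NewInt64Builder(memory.DefaultAllocator)
//	defer bldr.Release()
//	bldr.AppendValues([]int64{1, 2, 3}, nil)
//	a1 := bldr.NewInt64Array()
//	bldr.AppendValues([]int64{4, 5}, nil)
//	a2 := bldr.NewInt64Array()
//	chunked := array.NewChunked(arrow.PrimitiveTypes.Int64, []array.Interface{a1, a2})
//	defer chunked.Release()
//	a1.Release() // NewChunked retained its chunks
//	a2.Release()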

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (a *Chunked) Retain() {
	atomic.AddInt64(&a.refCount, 1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (a *Chunked) Release() {
	debug.Assert(atomic.LoadInt64(&a.refCount) > 0, "too many releases")

	if atomic.AddInt64(&a.refCount, -1) == 0 {
		for _, arr := range a.chunks {
			arr.Release()
		}
		a.chunks = nil
		a.length = 0
		a.nulls = 0
	}
}

func (a *Chunked) Len() int                 { return a.length }
func (a *Chunked) NullN() int               { return a.nulls }
func (a *Chunked) DataType() arrow.DataType { return a.dtype }
func (a *Chunked) Chunks() []Interface      { return a.chunks }
func (a *Chunked) Chunk(i int) Interface    { return a.chunks[i] }

// NewSlice constructs a zero-copy slice of the chunked array with the indicated
// indices i and j, corresponding to array[i:j].
// The returned chunked array must be Release()'d after use.
//
// NewSlice panics if the slice is outside the valid range of the input array.
// NewSlice panics if j < i.
func (a *Chunked) NewSlice(i, j int64) *Chunked {
	if j > int64(a.length) || i > j || i > int64(a.length) {
		panic("arrow/array: index out of range")
	}

	var (
		cur    = 0
		beg    = i
		sz     = j - i
		chunks = make([]Interface, 0, len(a.chunks))
	)

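	// Skip the chunks that end before the slice start, leaving beg as an
	// offset into the first chunk that overlaps [i, j).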
	for cur < len(a.chunks) && beg >= int64(a.chunks[cur].Len()) {
		beg -= int64(a.chunks[cur].Len())
		cur++
	}

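	// Take zero-copy slices of the overlapping chunks until sz rows have been
	// collected.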
	for cur < len(a.chunks) && sz > 0 {
		arr := a.chunks[cur]
		end := beg + sz
		if end > int64(arr.Len()) {
			end = int64(arr.Len())
		}
		chunks = append(chunks, NewSlice(arr, beg, end))
		sz -= int64(arr.Len()) - beg
		beg = 0
		cur++
	}
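	// Trim excess capacity, then hand the chunks to a fresh Chunked.
	// NewChunked retains every chunk it is given, so the references created
	// by NewSlice above are dropped by the deferred loop when this function
	// returns.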
	chunks = chunks[:len(chunks):len(chunks)]
	defer func() {
		for _, chunk := range chunks {
			chunk.Release()
		}
	}()

	return NewChunked(a.dtype, chunks)
}

// simpleTable is a basic, non-lazy in-memory table.
type simpleTable struct {
	refCount int64

	rows int64
	cols []Column

	schema *arrow.Schema
}

// NewTable returns a new basic, non-lazy in-memory table.
// If rows is negative, the number of rows will be inferred from the height
// of the columns.
//
// NewTable panics if the columns and schema are inconsistent.
// NewTable panics if rows is larger than the height of the columns.
func NewTable(schema *arrow.Schema, cols []Column, rows int64) *simpleTable {
	tbl := simpleTable{
		refCount: 1,
		rows:     rows,
		cols:     cols,
		schema:   schema,
	}

	if tbl.rows < 0 {
		switch len(tbl.cols) {
		case 0:
			tbl.rows = 0
		default:
			tbl.rows = int64(tbl.cols[0].Len())
		}
	}

	// Validate the table and its constituents.
	// Note that the columns are retained only after the table has been
	// validated, so that a panic during validation does not leak a ref-count
	// on the columns.
	tbl.validate()

	for i := range tbl.cols {
		tbl.cols[i].Retain()
	}

	return &tbl
}
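
// Illustrative sketch only: assembling a table from a single column built
// elsewhere (the schema and col below are assumptions for the example).
//
//	schema := arrow.NewSchema([]arrow.Field{{Name: "f1", Type: arrow.PrimitiveTypes.Int64}}, nil)
//	tbl := array.NewTable(schema, []array.Column{*col}, -1) // -1 infers the row count from the columns
//	defer tbl.Release()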

// NewTableFromRecords returns a new basic, non-lazy in-memory table.
//
// NewTableFromRecords panics if the records and schema are inconsistent.
func NewTableFromRecords(schema *arrow.Schema, recs []Record) *simpleTable {
	arrs := make([]Interface, len(recs))
	cols := make([]Column, len(schema.Fields()))

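	// NewTable (called at the end) retains the columns it keeps; the deferred
	// loop releases this function's own references, even if validation inside
	// NewTable panics.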
	defer func(cols []Column) {
		for i := range cols {
			cols[i].Release()
		}
	}(cols)

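	// Build one column per schema field by gathering the i-th array of every
	// record into a single chunked array.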
	for i := range cols {
		field := schema.Field(i)
		for j, rec := range recs {
			arrs[j] = rec.Column(i)
		}
		chunk := NewChunked(field.Type, arrs)
		cols[i] = *NewColumn(field, chunk)
		chunk.Release()
	}

	return NewTable(schema, cols, -1)
}

func (tbl *simpleTable) Schema() *arrow.Schema { return tbl.schema }
func (tbl *simpleTable) NumRows() int64        { return tbl.rows }
func (tbl *simpleTable) NumCols() int64        { return int64(len(tbl.cols)) }
func (tbl *simpleTable) Column(i int) *Column  { return &tbl.cols[i] }

func (tbl *simpleTable) validate() {
	if len(tbl.cols) != len(tbl.schema.Fields()) {
		panic(errors.New("arrow/array: table schema mismatch"))
	}
	for i, col := range tbl.cols {
		if !col.field.Equal(tbl.schema.Field(i)) {
			panic(fmt.Errorf("arrow/array: column field %q is inconsistent with schema", col.Name()))
		}

		if int64(col.Len()) < tbl.rows {
			panic(fmt.Errorf("arrow/array: column %q expected length >= %d but got length %d", col.Name(), tbl.rows, col.Len()))
		}
	}
}

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (tbl *simpleTable) Retain() {
	atomic.AddInt64(&tbl.refCount, 1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (tbl *simpleTable) Release() {
	debug.Assert(atomic.LoadInt64(&tbl.refCount) > 0, "too many releases")

	if atomic.AddInt64(&tbl.refCount, -1) == 0 {
		for i := range tbl.cols {
			tbl.cols[i].Release()
		}
		tbl.cols = nil
	}
}

// TableReader is a Record iterator over a (possibly chunked) Table.
type TableReader struct {
	refCount int64

	tbl   Table
	cur   int64  // current row
	max   int64  // total number of rows
	rec   Record // current Record
	chksz int64  // chunk size

	chunks  []*Chunked
	slots   []int   // chunk indices
	offsets []int64 // chunk offsets
}

// NewTableReader returns a new TableReader to iterate over the (possibly chunked) Table.
// If chunkSize is <= 0, the biggest possible chunk will be selected.
func NewTableReader(tbl Table, chunkSize int64) *TableReader {
	ncols := tbl.NumCols()
	tr := &TableReader{
		refCount: 1,
		tbl:      tbl,
		cur:      0,
		max:      int64(tbl.NumRows()),
		chksz:    chunkSize,
		chunks:   make([]*Chunked, ncols),
		slots:    make([]int, ncols),
		offsets:  make([]int64, ncols),
	}
	tr.tbl.Retain()

	if tr.chksz <= 0 {
		tr.chksz = math.MaxInt64
	}

	for i := range tr.chunks {
		col := tr.tbl.Column(i)
		tr.chunks[i] = col.Data()
		tr.chunks[i].Retain()
	}
	return tr
}
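
// A typical iteration, sketched under the assumption that tbl is a previously
// built Table (error handling omitted):
//
//	tr := array.NewTableReader(tbl, 64) // records of at most 64 rows
//	defer tr.Release()
//	for tr.Next() {
//		rec := tr.Record() // valid only until the next call to Next or Release
//		// ... use rec ...
//	}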

func (tr *TableReader) Schema() *arrow.Schema { return tr.tbl.Schema() }
func (tr *TableReader) Record() Record        { return tr.rec }

func (tr *TableReader) Next() bool {
	if tr.cur >= tr.max {
		return false
	}

	if tr.rec != nil {
		tr.rec.Release()
	}

	// determine the minimum contiguous slice across all columns
	chunksz := imin64(tr.max, tr.chksz)
	chunks := make([]Interface, len(tr.chunks))
	for i := range chunks {
		j := tr.slots[i]
		chunk := tr.chunks[i].Chunk(j)
		remain := int64(chunk.Len()) - tr.offsets[i]
		if remain < chunksz {
			chunksz = remain
		}

		chunks[i] = chunk
	}

	// slice the chunks, advance each chunk slot as appropriate.
	batch := make([]Interface, len(tr.chunks))
	for i, chunk := range chunks {
		var slice Interface
		offset := tr.offsets[i]
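		// If the rows remaining in this chunk are exactly chunksz, the chunk
		// is exhausted by this batch: advance to the next chunk. Otherwise
		// keep the current chunk and bump its offset.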
		switch int64(chunk.Len()) - offset {
		case chunksz:
			tr.slots[i]++
			tr.offsets[i] = 0
			if offset > 0 {
				// need to slice
				slice = NewSlice(chunk, offset, offset+chunksz)
			} else {
				// no need to slice
				slice = chunk
				slice.Retain()
			}
		default:
			tr.offsets[i] += chunksz
			slice = NewSlice(chunk, offset, offset+chunksz)
		}
		batch[i] = slice
	}

	tr.cur += chunksz
	tr.rec = NewRecord(tr.tbl.Schema(), batch, chunksz)

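	// NewRecord retained the batch arrays, so drop the references created in
	// this call.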
	for _, arr := range batch {
		arr.Release()
	}

	return true
}

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (tr *TableReader) Retain() {
	atomic.AddInt64(&tr.refCount, 1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (tr *TableReader) Release() {
	debug.Assert(atomic.LoadInt64(&tr.refCount) > 0, "too many releases")

	if atomic.AddInt64(&tr.refCount, -1) == 0 {
		tr.tbl.Release()
		for _, chk := range tr.chunks {
			chk.Release()
		}
		if tr.rec != nil {
			tr.rec.Release()
		}
		tr.tbl = nil
		tr.chunks = nil
		tr.slots = nil
		tr.offsets = nil
	}
}

func imin64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

var (
	_ Table        = (*simpleTable)(nil)
	_ RecordReader = (*TableReader)(nil)
)