]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, software | |
12 | // distributed under the License is distributed on an "AS IS" BASIS, | |
13 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | // See the License for the specific language governing permissions and | |
15 | // limitations under the License. | |
16 | ||
17 | package array | |
18 | ||
19 | import ( | |
20 | "errors" | |
21 | "fmt" | |
22 | "math" | |
23 | "sync/atomic" | |
24 | ||
25 | "github.com/apache/arrow/go/v6/arrow" | |
26 | "github.com/apache/arrow/go/v6/arrow/internal/debug" | |
27 | ) | |
28 | ||
// Table represents a logical sequence of chunked arrays.
type Table interface {
	// Schema returns the table's schema.
	Schema() *arrow.Schema
	// NumRows returns the number of rows in the table.
	NumRows() int64
	// NumCols returns the number of columns in the table.
	NumCols() int64
	// Column returns the i-th column of the table.
	Column(i int) *Column

	// Retain increases the reference count by 1.
	Retain()
	// Release decreases the reference count by 1.
	Release()
}
39 | ||
// Column is an immutable column data structure consisting of
// a field (type metadata) and a chunked data array.
type Column struct {
	field arrow.Field // name, data type and metadata of the column
	data  *Chunked    // the column's values, possibly split across chunks
}
46 | ||
47 | // NewColumn returns a column from a field and a chunked data array. | |
48 | // | |
49 | // NewColumn panics if the field's data type is inconsistent with the data type | |
50 | // of the chunked data array. | |
51 | func NewColumn(field arrow.Field, chunks *Chunked) *Column { | |
52 | col := Column{ | |
53 | field: field, | |
54 | data: chunks, | |
55 | } | |
56 | col.data.Retain() | |
57 | ||
58 | if !arrow.TypeEqual(col.data.DataType(), col.field.Type) { | |
59 | col.data.Release() | |
60 | panic("arrow/array: inconsistent data type") | |
61 | } | |
62 | ||
63 | return &col | |
64 | } | |
65 | ||
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (col *Column) Retain() {
	// The column's lifetime is tied to its chunked data; delegate.
	col.data.Retain()
}
71 | ||
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (col *Column) Release() {
	// The column's lifetime is tied to its chunked data; delegate.
	col.data.Release()
}
78 | ||
// Len returns the total number of elements in the column.
func (col *Column) Len() int { return col.data.Len() }

// NullN returns the number of null values in the column.
func (col *Column) NullN() int { return col.data.NullN() }

// Data returns the column's underlying chunked array (no reference is taken).
func (col *Column) Data() *Chunked { return col.data }

// Field returns the column's field (type metadata).
func (col *Column) Field() arrow.Field { return col.field }

// Name returns the column's name.
func (col *Column) Name() string { return col.field.Name }

// DataType returns the column's data type.
func (col *Column) DataType() arrow.DataType { return col.field.Type }
85 | ||
86 | // NewSlice returns a new zero-copy slice of the column with the indicated | |
87 | // indices i and j, corresponding to the column's array[i:j]. | |
88 | // The returned column must be Release()'d after use. | |
89 | // | |
90 | // NewSlice panics if the slice is outside the valid range of the column's array. | |
91 | // NewSlice panics if j < i. | |
92 | func (col *Column) NewSlice(i, j int64) *Column { | |
93 | return &Column{ | |
94 | field: col.field, | |
95 | data: col.data.NewSlice(i, j), | |
96 | } | |
97 | } | |
98 | ||
// Chunked manages a collection of primitives arrays as one logical large array.
type Chunked struct {
	refCount int64 // refCount must be first in the struct for 64 bit alignment and sync/atomic (https://github.com/golang/go/issues/37262)

	chunks []Interface // the constituent arrays, in logical order

	length int            // total number of elements across all chunks
	nulls  int            // total number of null values across all chunks
	dtype  arrow.DataType // data type shared by every chunk
}
109 | ||
110 | // NewChunked returns a new chunked array from the slice of arrays. | |
111 | // | |
112 | // NewChunked panics if the chunks do not have the same data type. | |
113 | func NewChunked(dtype arrow.DataType, chunks []Interface) *Chunked { | |
114 | arr := &Chunked{ | |
115 | chunks: make([]Interface, len(chunks)), | |
116 | refCount: 1, | |
117 | dtype: dtype, | |
118 | } | |
119 | for i, chunk := range chunks { | |
120 | if !arrow.TypeEqual(chunk.DataType(), dtype) { | |
121 | panic("arrow/array: mismatch data type") | |
122 | } | |
123 | chunk.Retain() | |
124 | arr.chunks[i] = chunk | |
125 | arr.length += chunk.Len() | |
126 | arr.nulls += chunk.NullN() | |
127 | } | |
128 | return arr | |
129 | } | |
130 | ||
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (a *Chunked) Retain() {
	atomic.AddInt64(&a.refCount, 1)
}
136 | ||
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (a *Chunked) Release() {
	debug.Assert(atomic.LoadInt64(&a.refCount) > 0, "too many releases")

	if atomic.AddInt64(&a.refCount, -1) == 0 {
		// Last reference dropped: release every chunk and reset the
		// aggregate state so further use is obviously invalid.
		for _, arr := range a.chunks {
			arr.Release()
		}
		a.chunks = nil
		a.length = 0
		a.nulls = 0
	}
}
152 | ||
// Len returns the total number of elements across all chunks.
func (a *Chunked) Len() int { return a.length }

// NullN returns the total number of null values across all chunks.
func (a *Chunked) NullN() int { return a.nulls }

// DataType returns the data type shared by every chunk.
func (a *Chunked) DataType() arrow.DataType { return a.dtype }

// Chunks returns the constituent arrays (no references are taken).
func (a *Chunked) Chunks() []Interface { return a.chunks }

// Chunk returns the i-th constituent array (no reference is taken).
func (a *Chunked) Chunk(i int) Interface { return a.chunks[i] }
158 | ||
// NewSlice constructs a zero-copy slice of the chunked array with the indicated
// indices i and j, corresponding to array[i:j].
// The returned chunked array must be Release()'d after use.
//
// NewSlice panics if the slice is outside the valid range of the input array.
// NewSlice panics if j < i.
func (a *Chunked) NewSlice(i, j int64) *Chunked {
	if j > int64(a.length) || i > j || i > int64(a.length) {
		panic("arrow/array: index out of range")
	}

	var (
		cur    = 0     // index of the chunk currently being considered
		beg    = i     // offset of the slice start within chunk cur
		sz     = j - i // number of elements still to be collected
		chunks = make([]Interface, 0, len(a.chunks))
	)

	// Skip whole chunks that lie entirely before the slice start.
	for cur < len(a.chunks) && beg >= int64(a.chunks[cur].Len()) {
		beg -= int64(a.chunks[cur].Len())
		cur++
	}

	// Take a (possibly partial) zero-copy slice of each chunk until the
	// requested number of elements has been collected.
	for cur < len(a.chunks) && sz > 0 {
		arr := a.chunks[cur]
		end := beg + sz
		if end > int64(arr.Len()) {
			end = int64(arr.Len())
		}
		chunks = append(chunks, NewSlice(arr, beg, end))
		sz -= int64(arr.Len()) - beg
		beg = 0 // every chunk after the first is consumed from its start
		cur++
	}
	// Full-slice expression pins capacity so the result cannot alias appends.
	chunks = chunks[:len(chunks):len(chunks)]
	// NewChunked below retains each chunk; release the references taken by
	// NewSlice above so the result owns exactly one reference per chunk.
	defer func() {
		for _, chunk := range chunks {
			chunk.Release()
		}
	}()

	return NewChunked(a.dtype, chunks)
}
202 | ||
// simpleTable is a basic, non-lazy in-memory table.
type simpleTable struct {
	refCount int64 // must stay first for 64-bit alignment of sync/atomic use

	rows int64    // number of rows exposed by the table
	cols []Column // one column per schema field

	schema *arrow.Schema
}
212 | ||
213 | // NewTable returns a new basic, non-lazy in-memory table. | |
214 | // If rows is negative, the number of rows will be inferred from the height | |
215 | // of the columns. | |
216 | // | |
217 | // NewTable panics if the columns and schema are inconsistent. | |
218 | // NewTable panics if rows is larger than the height of the columns. | |
219 | func NewTable(schema *arrow.Schema, cols []Column, rows int64) *simpleTable { | |
220 | tbl := simpleTable{ | |
221 | refCount: 1, | |
222 | rows: rows, | |
223 | cols: cols, | |
224 | schema: schema, | |
225 | } | |
226 | ||
227 | if tbl.rows < 0 { | |
228 | switch len(tbl.cols) { | |
229 | case 0: | |
230 | tbl.rows = 0 | |
231 | default: | |
232 | tbl.rows = int64(tbl.cols[0].Len()) | |
233 | } | |
234 | } | |
235 | ||
236 | // validate the table and its constituents. | |
237 | // note we retain the columns after having validated the table | |
238 | // in case the validation fails and panics (and would otherwise leak | |
239 | // a ref-count on the columns.) | |
240 | tbl.validate() | |
241 | ||
242 | for i := range tbl.cols { | |
243 | tbl.cols[i].Retain() | |
244 | } | |
245 | ||
246 | return &tbl | |
247 | } | |
248 | ||
// NewTableFromRecords returns a new basic, non-lazy in-memory table.
//
// NewTableFromRecords panics if the records and schema are inconsistent.
func NewTableFromRecords(schema *arrow.Schema, recs []Record) *simpleTable {
	arrs := make([]Interface, len(recs)) // scratch slice, reused per column
	cols := make([]Column, len(schema.Fields()))

	// NewTable retains the columns it keeps; drop this function's own
	// references once the table has been built (also runs on panic).
	defer func(cols []Column) {
		for i := range cols {
			cols[i].Release()
		}
	}(cols)

	for i := range cols {
		field := schema.Field(i)
		// Gather column i of every record into one chunked array.
		for j, rec := range recs {
			arrs[j] = rec.Column(i)
		}
		chunk := NewChunked(field.Type, arrs)
		cols[i] = *NewColumn(field, chunk)
		chunk.Release() // the column holds its own reference now
	}

	return NewTable(schema, cols, -1)
}
274 | ||
// Schema returns the table's schema.
func (tbl *simpleTable) Schema() *arrow.Schema { return tbl.schema }

// NumRows returns the number of rows in the table.
func (tbl *simpleTable) NumRows() int64 { return tbl.rows }

// NumCols returns the number of columns in the table.
func (tbl *simpleTable) NumCols() int64 { return int64(len(tbl.cols)) }

// Column returns the i-th column of the table (no reference is taken).
func (tbl *simpleTable) Column(i int) *Column { return &tbl.cols[i] }
279 | ||
280 | func (tbl *simpleTable) validate() { | |
281 | if len(tbl.cols) != len(tbl.schema.Fields()) { | |
282 | panic(errors.New("arrow/array: table schema mismatch")) | |
283 | } | |
284 | for i, col := range tbl.cols { | |
285 | if !col.field.Equal(tbl.schema.Field(i)) { | |
286 | panic(fmt.Errorf("arrow/array: column field %q is inconsistent with schema", col.Name())) | |
287 | } | |
288 | ||
289 | if int64(col.Len()) < tbl.rows { | |
290 | panic(fmt.Errorf("arrow/array: column %q expected length >= %d but got length %d", col.Name(), tbl.rows, col.Len())) | |
291 | } | |
292 | } | |
293 | } | |
294 | ||
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (tbl *simpleTable) Retain() {
	atomic.AddInt64(&tbl.refCount, 1)
}
300 | ||
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (tbl *simpleTable) Release() {
	debug.Assert(atomic.LoadInt64(&tbl.refCount) > 0, "too many releases")

	if atomic.AddInt64(&tbl.refCount, -1) == 0 {
		// Last reference dropped: release the columns retained by NewTable.
		for i := range tbl.cols {
			tbl.cols[i].Release()
		}
		tbl.cols = nil
	}
}
314 | ||
// TableReader is a Record iterator over a (possibly chunked) Table
type TableReader struct {
	refCount int64 // must stay first for 64-bit alignment of sync/atomic use

	tbl   Table
	cur   int64  // current row
	max   int64  // total number of rows
	rec   Record // current Record
	chksz int64  // chunk size

	chunks  []*Chunked // per-column chunked data, retained by the reader
	slots   []int      // chunk indices
	offsets []int64    // chunk offsets
}
329 | ||
330 | // NewTableReader returns a new TableReader to iterate over the (possibly chunked) Table. | |
331 | // if chunkSize is <= 0, the biggest possible chunk will be selected. | |
332 | func NewTableReader(tbl Table, chunkSize int64) *TableReader { | |
333 | ncols := tbl.NumCols() | |
334 | tr := &TableReader{ | |
335 | refCount: 1, | |
336 | tbl: tbl, | |
337 | cur: 0, | |
338 | max: int64(tbl.NumRows()), | |
339 | chksz: chunkSize, | |
340 | chunks: make([]*Chunked, ncols), | |
341 | slots: make([]int, ncols), | |
342 | offsets: make([]int64, ncols), | |
343 | } | |
344 | tr.tbl.Retain() | |
345 | ||
346 | if tr.chksz <= 0 { | |
347 | tr.chksz = math.MaxInt64 | |
348 | } | |
349 | ||
350 | for i := range tr.chunks { | |
351 | col := tr.tbl.Column(i) | |
352 | tr.chunks[i] = col.Data() | |
353 | tr.chunks[i].Retain() | |
354 | } | |
355 | return tr | |
356 | } | |
357 | ||
// Schema returns the schema of the underlying table.
func (tr *TableReader) Schema() *arrow.Schema { return tr.tbl.Schema() }

// Record returns the current record; it is valid until the next call to Next.
func (tr *TableReader) Record() Record { return tr.rec }
360 | ||
// Next advances the reader to the next record, returning false once all rows
// have been consumed. The produced record is owned by the reader; callers
// wanting to keep it past the next call to Next must Retain it.
func (tr *TableReader) Next() bool {
	if tr.cur >= tr.max {
		return false
	}

	// Drop the previously produced record.
	if tr.rec != nil {
		tr.rec.Release()
	}

	// determine the minimum contiguous slice across all columns
	chunksz := imin64(tr.max, tr.chksz)
	chunks := make([]Interface, len(tr.chunks))
	for i := range chunks {
		j := tr.slots[i]
		chunk := tr.chunks[i].Chunk(j)
		remain := int64(chunk.Len()) - tr.offsets[i]
		if remain < chunksz {
			chunksz = remain
		}

		chunks[i] = chunk
	}
	// NOTE(review): a zero-length chunk would force chunksz to 0, so Next
	// would return true without advancing tr.cur — confirm empty chunks
	// cannot occur in the source table.

	// slice the chunks, advance each chunk slot as appropriate.
	batch := make([]Interface, len(tr.chunks))
	for i, chunk := range chunks {
		var slice Interface
		offset := tr.offsets[i]
		switch int64(chunk.Len()) - offset {
		case chunksz:
			// This chunk is fully consumed by the current batch; move on
			// to the next chunk of this column.
			tr.slots[i]++
			tr.offsets[i] = 0
			if offset > 0 {
				// need to slice
				slice = NewSlice(chunk, offset, offset+chunksz)
			} else {
				// no need to slice
				slice = chunk
				slice.Retain()
			}
		default:
			// Partial consumption: stay on this chunk, advance its offset.
			tr.offsets[i] += chunksz
			slice = NewSlice(chunk, offset, offset+chunksz)
		}
		batch[i] = slice
	}

	tr.cur += chunksz
	tr.rec = NewRecord(tr.tbl.Schema(), batch, chunksz)

	// NewRecord retained the arrays; drop this function's references.
	for _, arr := range batch {
		arr.Release()
	}

	return true
}
417 | ||
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (tr *TableReader) Retain() {
	atomic.AddInt64(&tr.refCount, 1)
}
423 | ||
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (tr *TableReader) Release() {
	debug.Assert(atomic.LoadInt64(&tr.refCount) > 0, "too many releases")

	if atomic.AddInt64(&tr.refCount, -1) == 0 {
		// Drop every reference taken in NewTableReader and Next.
		tr.tbl.Release()
		for _, chk := range tr.chunks {
			chk.Release()
		}
		if tr.rec != nil {
			tr.rec.Release()
		}
		tr.tbl = nil
		tr.chunks = nil
		tr.slots = nil
		tr.offsets = nil
	}
}
444 | ||
// imin64 returns the smaller of a and b.
func imin64(a, b int64) int64 {
	if a > b {
		return b
	}
	return a
}
451 | ||
// Compile-time checks that the concrete types satisfy their interfaces.
var (
	_ Table        = (*simpleTable)(nil)
	_ RecordReader = (*TableReader)(nil)
)