]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/go/arrow/array/record_test.go
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / arrow / array / record_test.go
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 package array_test
18
19 import (
20 "fmt"
21 "reflect"
22 "testing"
23
24 "github.com/apache/arrow/go/v6/arrow"
25 "github.com/apache/arrow/go/v6/arrow/array"
26 "github.com/apache/arrow/go/v6/arrow/memory"
27 )
28
29 func TestRecord(t *testing.T) {
30 mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
31 defer mem.AssertSize(t, 0)
32
33 schema := arrow.NewSchema(
34 []arrow.Field{
35 arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
36 arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
37 },
38 nil,
39 )
40 col1 := func() array.Interface {
41 ib := array.NewInt32Builder(mem)
42 defer ib.Release()
43
44 ib.AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
45 return ib.NewInt32Array()
46 }()
47 defer col1.Release()
48
49 col2 := func() array.Interface {
50 b := array.NewFloat64Builder(mem)
51 defer b.Release()
52
53 b.AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
54 return b.NewFloat64Array()
55 }()
56 defer col2.Release()
57
58 cols := []array.Interface{col1, col2}
59 rec := array.NewRecord(schema, cols, -1)
60 defer rec.Release()
61
62 rec.Retain()
63 rec.Release()
64
65 if got, want := rec.Schema(), schema; !got.Equal(want) {
66 t.Fatalf("invalid schema: got=%#v, want=%#v", got, want)
67 }
68
69 if got, want := rec.NumRows(), int64(10); got != want {
70 t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
71 }
72 if got, want := rec.NumCols(), int64(2); got != want {
73 t.Fatalf("invalid number of columns: got=%d, want=%d", got, want)
74 }
75 if got, want := rec.Columns()[0], cols[0]; got != want {
76 t.Fatalf("invalid column: got=%q, want=%q", got, want)
77 }
78 if got, want := rec.Column(0), cols[0]; got != want {
79 t.Fatalf("invalid column: got=%q, want=%q", got, want)
80 }
81 if got, want := rec.ColumnName(0), schema.Field(0).Name; got != want {
82 t.Fatalf("invalid column name: got=%q, want=%q", got, want)
83 }
84
85 for _, tc := range []struct {
86 i, j int64
87 err error
88 }{
89 {i: 0, j: 10, err: nil},
90 {i: 1, j: 10, err: nil},
91 {i: 1, j: 9, err: nil},
92 {i: 0, j: 0, err: nil},
93 {i: 1, j: 1, err: nil},
94 {i: 10, j: 10, err: nil},
95 {i: 1, j: 0, err: fmt.Errorf("arrow/array: index out of range")},
96 {i: 1, j: 11, err: fmt.Errorf("arrow/array: index out of range")},
97 } {
98 t.Run(fmt.Sprintf("slice-%02d-%02d", tc.i, tc.j), func(t *testing.T) {
99 if tc.err != nil {
100 defer func() {
101 e := recover()
102 if e == nil {
103 t.Fatalf("expected an error %q", tc.err)
104 }
105 switch err := e.(type) {
106 case string:
107 if err != tc.err.Error() {
108 t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
109 }
110 case error:
111 if err.Error() != tc.err.Error() {
112 t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
113 }
114 default:
115 t.Fatalf("invalid type for panic message: %T (err=%v)", err, err)
116 }
117 }()
118 }
119 sub := rec.NewSlice(tc.i, tc.j)
120 defer sub.Release()
121
122 if got, want := sub.NumRows(), tc.j-tc.i; got != want {
123 t.Fatalf("invalid rec-slice number of rows: got=%d, want=%d", got, want)
124 }
125 })
126 }
127
128 for _, tc := range []struct {
129 schema *arrow.Schema
130 cols []array.Interface
131 rows int64
132 err error
133 }{
134 {
135 schema: schema,
136 cols: nil,
137 rows: -1,
138 err: fmt.Errorf("arrow/array: number of columns/fields mismatch"),
139 },
140 {
141 schema: schema,
142 cols: cols[:1],
143 rows: 0,
144 err: fmt.Errorf("arrow/array: number of columns/fields mismatch"),
145 },
146 {
147 schema: arrow.NewSchema(
148 []arrow.Field{
149 arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
150 },
151 nil,
152 ),
153 cols: cols,
154 rows: 0,
155 err: fmt.Errorf("arrow/array: number of columns/fields mismatch"),
156 },
157 {
158 schema: arrow.NewSchema(
159 []arrow.Field{
160 arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
161 arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Int32},
162 },
163 nil,
164 ),
165 cols: cols,
166 rows: 0,
167 err: fmt.Errorf(`arrow/array: column "f2-f64" type mismatch: got=float64, want=int32`),
168 },
169 {
170 schema: schema,
171 cols: cols,
172 rows: 11,
173 err: fmt.Errorf(`arrow/array: mismatch number of rows in column "f1-i32": got=10, want=11`),
174 },
175 {
176 schema: schema,
177 cols: cols,
178 rows: 10,
179 err: nil,
180 },
181 {
182 schema: schema,
183 cols: cols,
184 rows: 3,
185 err: nil,
186 },
187 {
188 schema: schema,
189 cols: cols,
190 rows: 0,
191 err: nil,
192 },
193 } {
194 t.Run("", func(t *testing.T) {
195 if tc.err != nil {
196 defer func() {
197 e := recover()
198 if e == nil {
199 t.Fatalf("expected an error %q", tc.err)
200 }
201 switch err := e.(type) {
202 case string:
203 if err != tc.err.Error() {
204 t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
205 }
206 case error:
207 if err.Error() != tc.err.Error() {
208 t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
209 }
210 default:
211 t.Fatalf("invalid type for panic message: %T (err=%v)", err, err)
212 }
213 }()
214 }
215 rec := array.NewRecord(tc.schema, tc.cols, tc.rows)
216 defer rec.Release()
217 if got, want := rec.NumRows(), tc.rows; got != want {
218 t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
219 }
220 })
221 }
222 }
223
224 func TestRecordReader(t *testing.T) {
225 mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
226 defer mem.AssertSize(t, 0)
227
228 schema := arrow.NewSchema(
229 []arrow.Field{
230 arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
231 arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
232 },
233 nil,
234 )
235 rec1 := func() array.Record {
236 col1 := func() array.Interface {
237 ib := array.NewInt32Builder(mem)
238 defer ib.Release()
239
240 ib.AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
241 return ib.NewInt32Array()
242 }()
243 defer col1.Release()
244
245 col2 := func() array.Interface {
246 b := array.NewFloat64Builder(mem)
247 defer b.Release()
248
249 b.AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
250 return b.NewFloat64Array()
251 }()
252 defer col2.Release()
253
254 cols := []array.Interface{col1, col2}
255 return array.NewRecord(schema, cols, -1)
256 }()
257 defer rec1.Release()
258
259 rec2 := func() array.Record {
260 col1 := func() array.Interface {
261 ib := array.NewInt32Builder(mem)
262 defer ib.Release()
263
264 ib.AppendValues([]int32{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil)
265 return ib.NewInt32Array()
266 }()
267 defer col1.Release()
268
269 col2 := func() array.Interface {
270 b := array.NewFloat64Builder(mem)
271 defer b.Release()
272
273 b.AppendValues([]float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil)
274 return b.NewFloat64Array()
275 }()
276 defer col2.Release()
277
278 cols := []array.Interface{col1, col2}
279 return array.NewRecord(schema, cols, -1)
280 }()
281 defer rec2.Release()
282
283 recs := []array.Record{rec1, rec2}
284 itr, err := array.NewRecordReader(schema, recs)
285 if err != nil {
286 t.Fatal(err)
287 }
288 defer itr.Release()
289
290 itr.Retain()
291 itr.Release()
292
293 if got, want := itr.Schema(), schema; !got.Equal(want) {
294 t.Fatalf("invalid schema. got=%#v, want=%#v", got, want)
295 }
296
297 n := 0
298 for itr.Next() {
299 n++
300 if got, want := itr.Record(), recs[n-1]; !reflect.DeepEqual(got, want) {
301 t.Fatalf("itr[%d], invalid record. got=%#v, want=%#v", n-1, got, want)
302 }
303 }
304
305 if n != len(recs) {
306 t.Fatalf("invalid number of iterations. got=%d, want=%d", n, len(recs))
307 }
308
309 for _, tc := range []struct {
310 name string
311 schema *arrow.Schema
312 err error
313 }{
314 {
315 name: "mismatch-name",
316 schema: arrow.NewSchema(
317 []arrow.Field{
318 arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
319 arrow.Field{Name: "f2-XXX", Type: arrow.PrimitiveTypes.Float64},
320 },
321 nil,
322 ),
323 err: fmt.Errorf("arrow/array: mismatch schema"),
324 },
325 {
326 name: "mismatch-type",
327 schema: arrow.NewSchema(
328 []arrow.Field{
329 arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
330 arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Int64},
331 },
332 nil,
333 ),
334 err: fmt.Errorf("arrow/array: mismatch schema"),
335 },
336 } {
337 t.Run(tc.name, func(t *testing.T) {
338 itr, err := array.NewRecordReader(tc.schema, recs)
339 if itr != nil {
340 itr.Release()
341 }
342 if err == nil {
343 t.Fatalf("expected an error: %v", tc.err)
344 }
345 if !reflect.DeepEqual(tc.err, err) {
346 t.Fatalf("invalid error: got=%v, want=%v", err, tc.err)
347 }
348 })
349 }
350 }
351
352 func TestRecordBuilder(t *testing.T) {
353 mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
354 defer mem.AssertSize(t, 0)
355
356 schema := arrow.NewSchema(
357 []arrow.Field{
358 arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
359 arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
360 },
361 nil,
362 )
363
364 b := array.NewRecordBuilder(mem, schema)
365 defer b.Release()
366
367 b.Retain()
368 b.Release()
369
370 b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6}, nil)
371 b.Field(0).(*array.Int32Builder).AppendValues([]int32{7, 8, 9, 10}, nil)
372 b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
373
374 rec := b.NewRecord()
375 defer rec.Release()
376
377 if got, want := rec.Schema(), schema; !got.Equal(want) {
378 t.Fatalf("invalid schema: got=%#v, want=%#v", got, want)
379 }
380
381 if got, want := rec.NumRows(), int64(10); got != want {
382 t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
383 }
384 if got, want := rec.NumCols(), int64(2); got != want {
385 t.Fatalf("invalid number of columns: got=%d, want=%d", got, want)
386 }
387 if got, want := rec.ColumnName(0), schema.Field(0).Name; got != want {
388 t.Fatalf("invalid column name: got=%q, want=%q", got, want)
389 }
390 }
391
392 type testMessage struct {
393 Foo *testMessageFoo
394 Bars []*testMessageBar
395 }
396
397 func (m *testMessage) Reset() { *m = testMessage{} }
398
399 func (m *testMessage) GetFoo() *testMessageFoo {
400 if m != nil {
401 return m.Foo
402 }
403 return nil
404 }
405
406 func (m *testMessage) GetBars() []*testMessageBar {
407 if m != nil {
408 return m.Bars
409 }
410 return nil
411 }
412
// testMessageFoo is the payload stored in testMessage.Foo: a scalar and a
// list of unsigned 32-bit values.
type testMessageFoo struct {
	A int32
	B []uint32
}

// Reset overwrites the message with its zero value.
func (m *testMessageFoo) Reset() {
	var zero testMessageFoo
	*m = zero
}

// GetA returns the A field, or 0 when the receiver is nil.
func (m *testMessageFoo) GetA() int32 {
	if m == nil {
		return 0
	}
	return m.A
}

// GetB returns the B field, or nil when the receiver is nil.
func (m *testMessageFoo) GetB() []uint32 {
	if m == nil {
		return nil
	}
	return m.B
}
433
// testMessageBar is the element type of testMessage.Bars: a scalar and a
// list of unsigned 64-bit values.
type testMessageBar struct {
	C int64
	D []uint64
}

// Reset overwrites the message with its zero value.
func (m *testMessageBar) Reset() {
	var zero testMessageBar
	*m = zero
}

// GetC returns the C field, or 0 when the receiver is nil.
func (m *testMessageBar) GetC() int64 {
	if m == nil {
		return 0
	}
	return m.C
}

// GetD returns the D field, or nil when the receiver is nil.
func (m *testMessageBar) GetD() []uint64 {
	if m == nil {
		return nil
	}
	return m.D
}
454
455 var testMessageSchema = arrow.NewSchema(
456 []arrow.Field{
457 arrow.Field{Name: "foo", Type: arrow.StructOf(
458 arrow.Field{Name: "a", Type: arrow.PrimitiveTypes.Int32},
459 arrow.Field{Name: "b", Type: arrow.ListOf(
460 arrow.PrimitiveTypes.Uint32,
461 )},
462 )},
463 arrow.Field{Name: "bars", Type: arrow.ListOf(
464 arrow.StructOf(
465 arrow.Field{Name: "c", Type: arrow.PrimitiveTypes.Int64},
466 arrow.Field{Name: "d", Type: arrow.ListOf(
467 arrow.PrimitiveTypes.Uint64,
468 )},
469 ),
470 )},
471 },
472 nil,
473 )
474
475 func (m *testMessage) Fill(rec array.Record, row int) error {
476 m.Reset()
477
478 // foo
479 if 0 < rec.NumCols() {
480 src0 := rec.Column(0).Data()
481 typedSrc0 := array.NewStructData(src0)
482 defer typedSrc0.Release()
483 if typedSrc0.IsValid(row) {
484 m0 := &testMessageFoo{}
485 {
486
487 // a
488 if 0 < typedSrc0.NumField() {
489 src0_0 := typedSrc0.Field(0).Data()
490 typedSrc0_0 := array.NewInt32Data(src0_0)
491 defer typedSrc0_0.Release()
492 m0.A = typedSrc0_0.Value(row)
493 }
494
495 // b
496 if 1 < typedSrc0.NumField() {
497 src0_1 := typedSrc0.Field(1).Data()
498 listSrc0_1 := array.NewListData(src0_1)
499 defer listSrc0_1.Release()
500 if listSrc0_1.IsValid(row) {
501 typedSrc0_1 := array.NewUint32Data(listSrc0_1.ListValues().Data())
502 typedSrc0_1.Release()
503 start0_1 := int(listSrc0_1.Offsets()[row])
504 end0_1 := int(listSrc0_1.Offsets()[row+1])
505 for row := start0_1; row < end0_1; row++ {
506 m0.B = append(m0.B, typedSrc0_1.Value(row))
507 }
508 }
509 }
510 }
511 m.Foo = m0
512 }
513 }
514
515 // bars
516 if 1 < rec.NumCols() {
517 src1 := rec.Column(1).Data()
518 listSrc1 := array.NewListData(src1)
519 defer listSrc1.Release()
520 if listSrc1.IsValid(row) {
521 typedSrc1 := array.NewStructData(listSrc1.ListValues().Data())
522 defer typedSrc1.Release()
523 start1 := int(listSrc1.Offsets()[row])
524 end1 := int(listSrc1.Offsets()[row+1])
525 for row := start1; row < end1; row++ {
526 if typedSrc1.IsValid(row) {
527 m1 := &testMessageBar{}
528 {
529
530 // c
531 if 0 < typedSrc1.NumField() {
532 src1_0 := typedSrc1.Field(0).Data()
533 typedSrc1_0 := array.NewInt64Data(src1_0)
534 defer typedSrc1_0.Release()
535 m1.C = typedSrc1_0.Value(row)
536 }
537
538 // d
539 if 1 < typedSrc1.NumField() {
540 src1_1 := typedSrc1.Field(1).Data()
541 listSrc1_1 := array.NewListData(src1_1)
542 defer listSrc1_1.Release()
543 if listSrc1_1.IsValid(row) {
544 typedSrc1_1 := array.NewUint64Data(listSrc1_1.ListValues().Data())
545 defer typedSrc1_1.Release()
546 start1_1 := int(listSrc1_1.Offsets()[row])
547 end1_1 := int(listSrc1_1.Offsets()[row+1])
548 for row := start1_1; row < end1_1; row++ {
549 m1.D = append(m1.D, typedSrc1_1.Value(row))
550 }
551 }
552 }
553 }
554 m.Bars = append(m.Bars, m1)
555 } else {
556 m.Bars = append(m.Bars, nil)
557 }
558 }
559 }
560 }
561 return nil
562 }
563
564 func newTestMessageArrowRecordBuilder(mem memory.Allocator) *testMessageArrowRecordBuilder {
565 return &testMessageArrowRecordBuilder{
566 rb: array.NewRecordBuilder(mem, testMessageSchema),
567 }
568 }
569
570 type testMessageArrowRecordBuilder struct {
571 rb *array.RecordBuilder
572 }
573
574 func (b *testMessageArrowRecordBuilder) Build() array.Record {
575 return b.rb.NewRecord()
576 }
577
578 func (b *testMessageArrowRecordBuilder) Release() {
579 b.rb.Release()
580 }
581
582 func (b *testMessageArrowRecordBuilder) Append(m *testMessage) {
583
584 // foo
585 {
586 builder0 := b.rb.Field(0)
587 v0 := m.GetFoo()
588 valueBuilder0 := builder0.(*array.StructBuilder)
589 if v0 == nil {
590 valueBuilder0.AppendNull()
591 } else {
592 valueBuilder0.Append(true)
593
594 // a
595 {
596 v0_0 := v0.GetA()
597 builder0_0 := valueBuilder0.FieldBuilder(0)
598 valueBuilder0_0 := builder0_0.(*array.Int32Builder)
599 valueBuilder0_0.Append(v0_0)
600 }
601
602 // b
603 {
604 v0_1 := v0.GetB()
605 builder0_1 := valueBuilder0.FieldBuilder(1)
606 listBuilder0_1 := builder0_1.(*array.ListBuilder)
607 if len(v0_1) == 0 {
608 listBuilder0_1.AppendNull()
609 } else {
610 listBuilder0_1.Append(true)
611 valueBuilder0_1 := listBuilder0_1.ValueBuilder().(*array.Uint32Builder)
612 for _, item := range v0_1 {
613 valueBuilder0_1.Append(item)
614 }
615 }
616 }
617 }
618 }
619
620 // bars
621 {
622 builder1 := b.rb.Field(1)
623 v1 := m.GetBars()
624 listBuilder1 := builder1.(*array.ListBuilder)
625 if len(v1) == 0 {
626 listBuilder1.AppendNull()
627 } else {
628 listBuilder1.Append(true)
629 valueBuilder1 := listBuilder1.ValueBuilder().(*array.StructBuilder)
630 for _, item := range v1 {
631 if item == nil {
632 valueBuilder1.AppendNull()
633 } else {
634 valueBuilder1.Append(true)
635
636 // c
637 {
638 v1_0 := item.GetC()
639 builder1_0 := valueBuilder1.FieldBuilder(0)
640 valueBuilder1_0 := builder1_0.(*array.Int64Builder)
641 valueBuilder1_0.Append(v1_0)
642 }
643
644 // d
645 {
646 v1_1 := item.GetD()
647 builder1_1 := valueBuilder1.FieldBuilder(1)
648 listBuilder1_1 := builder1_1.(*array.ListBuilder)
649 if len(v1_1) == 0 {
650 listBuilder1_1.AppendNull()
651 } else {
652 listBuilder1_1.Append(true)
653 valueBuilder1_1 := listBuilder1_1.ValueBuilder().(*array.Uint64Builder)
654 for _, item := range v1_1 {
655 valueBuilder1_1.Append(item)
656 }
657 }
658 }
659 }
660 }
661 }
662 }
663 }
664
665 func TestRecordBuilderMessages(t *testing.T) {
666 mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
667 defer mem.AssertSize(t, 0)
668
669 b := newTestMessageArrowRecordBuilder(mem)
670 defer b.Release()
671
672 var msgs []*testMessage
673 for i := 0; i < 1000; i++ {
674 msg := &testMessage{
675 Foo: &testMessageFoo{
676 A: int32(i),
677 B: []uint32{2, 3, 4, 5, 6, 7, 8, 9},
678 },
679 Bars: []*testMessageBar{
680 &testMessageBar{
681 C: 11,
682 D: []uint64{12, 13, 14},
683 },
684 &testMessageBar{
685 C: 15,
686 D: []uint64{16, 17, 18, 19},
687 },
688 nil,
689 &testMessageBar{
690 C: 20,
691 D: []uint64{21},
692 },
693 },
694 }
695 msgs = append(msgs, msg)
696 b.Append(msg)
697 }
698
699 rec := b.Build()
700 defer rec.Release()
701
702 var got testMessage
703 for i := 0; i < 1000; i++ {
704 got.Fill(rec, i)
705 if !reflect.DeepEqual(&got, msgs[i]) {
706 t.Fatalf("row[%d], invalid record. got=%#v, want=%#v", i, &got, msgs[i])
707 }
708 }
709 }