]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/go/arrow/internal/arrdata/ioutil.go
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / arrow / internal / arrdata / ioutil.go
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package arrdata // import "github.com/apache/arrow/go/v6/arrow/internal/arrdata"
18
19import (
20 "fmt"
21 "io"
22 "os"
23 "sync"
24 "testing"
25
26 "github.com/apache/arrow/go/v6/arrow"
27 "github.com/apache/arrow/go/v6/arrow/array"
28 "github.com/apache/arrow/go/v6/arrow/internal/flatbuf"
29 "github.com/apache/arrow/go/v6/arrow/ipc"
30 "github.com/apache/arrow/go/v6/arrow/memory"
31)
32
33// CheckArrowFile checks whether a given ARROW file contains the expected list of records.
34func CheckArrowFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
35 t.Helper()
36
37 _, err := f.Seek(0, io.SeekStart)
38 if err != nil {
39 t.Fatal(err)
40 }
41
42 r, err := ipc.NewFileReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
43 if err != nil {
44 t.Fatal(err)
45 }
46 defer r.Close()
47
48 for i := 0; i < r.NumRecords(); i++ {
49 rec, err := r.Record(i)
50 if err != nil {
51 t.Fatalf("could not read record %d: %v", i, err)
52 }
53 if !array.RecordEqual(rec, recs[i]) {
54 t.Fatalf("records[%d] differ", i)
55 }
56 }
57
58 err = r.Close()
59 if err != nil {
60 t.Fatal(err)
61 }
62
63}
64
65func CheckArrowConcurrentFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
66 t.Helper()
67
68 _, err := f.Seek(0, io.SeekStart)
69 if err != nil {
70 t.Fatal(err)
71 }
72
73 r, err := ipc.NewFileReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
74 if err != nil {
75 t.Fatal(err)
76 }
77 defer r.Close()
78
79 var g sync.WaitGroup
80 errs := make(chan error, r.NumRecords())
81 checkRecord := func(i int) {
82 defer g.Done()
83 rec, err := r.RecordAt(i)
84 if err != nil {
85 errs <- fmt.Errorf("could not read record %d: %v", i, err)
86 return
87 }
88 if !array.RecordEqual(rec, recs[i]) {
89 errs <- fmt.Errorf("records[%d] differ", i)
90 }
91 }
92
93 for i := 0; i < r.NumRecords(); i++ {
94 g.Add(1)
95 go checkRecord(i)
96 }
97
98 g.Wait()
99 close(errs)
100
101 for err := range errs {
102 if err != nil {
103 t.Fatal(err)
104 }
105 }
106
107 err = r.Close()
108 if err != nil {
109 t.Fatal(err)
110 }
111}
112
113// CheckArrowStream checks whether a given ARROW stream contains the expected list of records.
114func CheckArrowStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
115 t.Helper()
116
117 _, err := f.Seek(0, io.SeekStart)
118 if err != nil {
119 t.Fatal(err)
120 }
121
122 r, err := ipc.NewReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
123 if err != nil {
124 t.Fatal(err)
125 }
126 defer r.Release()
127
128 n := 0
129 for r.Next() {
130 rec := r.Record()
131 if !array.RecordEqual(rec, recs[n]) {
132 t.Fatalf("records[%d] differ, got: %s, expected %s", n, rec, recs[n])
133 }
134 n++
135 }
136
137 if len(recs) != n {
138 t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
139
140 }
141}
142
143// WriteFile writes a list of records to the given file descriptor, as an ARROW file.
144func WriteFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
145 t.Helper()
146
147 w, err := ipc.NewFileWriter(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
148 if err != nil {
149 t.Fatal(err)
150 }
151 defer w.Close()
152
153 for i, rec := range recs {
154 err = w.Write(rec)
155 if err != nil {
156 t.Fatalf("could not write record[%d]: %v", i, err)
157 }
158 }
159
160 err = w.Close()
161 if err != nil {
162 t.Fatal(err)
163 }
164
165 err = f.Sync()
166 if err != nil {
167 t.Fatalf("could not sync data to disk: %v", err)
168 }
169
170 // put the cursor back at the start of the file before returning rather than
171 // leaving it at the end so the reader can just start reading from the handle
172 // immediately for the test.
173 _, err = f.Seek(0, io.SeekStart)
174 if err != nil {
175 t.Fatalf("could not seek to start: %v", err)
176 }
177}
178
179// WriteFile writes a list of records to the given file descriptor, as an ARROW file.
180func WriteFileCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record, codec flatbuf.CompressionType) {
181 t.Helper()
182
183 opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem)}
184 switch codec {
185 case flatbuf.CompressionTypeLZ4_FRAME:
186 opts = append(opts, ipc.WithLZ4())
187 case flatbuf.CompressionTypeZSTD:
188 opts = append(opts, ipc.WithZstd())
189 default:
190 t.Fatalf("invalid compression codec %v, only LZ4_FRAME or ZSTD is allowed", codec)
191 }
192
193 w, err := ipc.NewFileWriter(f, opts...)
194 if err != nil {
195 t.Fatal(err)
196 }
197 defer w.Close()
198
199 for i, rec := range recs {
200 err = w.Write(rec)
201 if err != nil {
202 t.Fatalf("could not write record[%d]: %v", i, err)
203 }
204 }
205
206 err = w.Close()
207 if err != nil {
208 t.Fatal(err)
209 }
210
211 err = f.Sync()
212 if err != nil {
213 t.Fatalf("could not sync data to disk: %v", err)
214 }
215
216 // put the cursor back at the start of the file before returning rather than
217 // leaving it at the end so the reader can just start reading from the handle
218 // immediately for the test.
219 _, err = f.Seek(0, io.SeekStart)
220 if err != nil {
221 t.Fatalf("could not seek to start: %v", err)
222 }
223}
224
225// WriteStream writes a list of records to the given file descriptor, as an ARROW stream.
226func WriteStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
227 t.Helper()
228
229 w := ipc.NewWriter(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
230 defer w.Close()
231
232 for i, rec := range recs {
233 err := w.Write(rec)
234 if err != nil {
235 t.Fatalf("could not write record[%d]: %v", i, err)
236 }
237 }
238
239 err := w.Close()
240 if err != nil {
241 t.Fatal(err)
242 }
243}
244
245// WriteStreamCompressed writes a list of records to the given file descriptor as an ARROW stream
246// using the provided compression type.
247func WriteStreamCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record, codec flatbuf.CompressionType, np int) {
248 t.Helper()
249
250 opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem), ipc.WithCompressConcurrency(np)}
251 switch codec {
252 case flatbuf.CompressionTypeLZ4_FRAME:
253 opts = append(opts, ipc.WithLZ4())
254 case flatbuf.CompressionTypeZSTD:
255 opts = append(opts, ipc.WithZstd())
256 default:
257 t.Fatalf("invalid compression codec %v, only LZ4_FRAME or ZSTD is allowed", codec)
258 }
259
260 w := ipc.NewWriter(f, opts...)
261 defer w.Close()
262
263 for i, rec := range recs {
264 err := w.Write(rec)
265 if err != nil {
266 t.Fatalf("could not write record[%d]: %v", i, err)
267 }
268 }
269
270 err := w.Close()
271 if err != nil {
272 t.Fatal(err)
273 }
274}