// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17 | package arrdata // import "github.com/apache/arrow/go/v6/arrow/internal/arrdata" | |
18 | ||
19 | import ( | |
20 | "fmt" | |
21 | "io" | |
22 | "os" | |
23 | "sync" | |
24 | "testing" | |
25 | ||
26 | "github.com/apache/arrow/go/v6/arrow" | |
27 | "github.com/apache/arrow/go/v6/arrow/array" | |
28 | "github.com/apache/arrow/go/v6/arrow/internal/flatbuf" | |
29 | "github.com/apache/arrow/go/v6/arrow/ipc" | |
30 | "github.com/apache/arrow/go/v6/arrow/memory" | |
31 | ) | |
32 | ||
33 | // CheckArrowFile checks whether a given ARROW file contains the expected list of records. | |
34 | func CheckArrowFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) { | |
35 | t.Helper() | |
36 | ||
37 | _, err := f.Seek(0, io.SeekStart) | |
38 | if err != nil { | |
39 | t.Fatal(err) | |
40 | } | |
41 | ||
42 | r, err := ipc.NewFileReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem)) | |
43 | if err != nil { | |
44 | t.Fatal(err) | |
45 | } | |
46 | defer r.Close() | |
47 | ||
48 | for i := 0; i < r.NumRecords(); i++ { | |
49 | rec, err := r.Record(i) | |
50 | if err != nil { | |
51 | t.Fatalf("could not read record %d: %v", i, err) | |
52 | } | |
53 | if !array.RecordEqual(rec, recs[i]) { | |
54 | t.Fatalf("records[%d] differ", i) | |
55 | } | |
56 | } | |
57 | ||
58 | err = r.Close() | |
59 | if err != nil { | |
60 | t.Fatal(err) | |
61 | } | |
62 | ||
63 | } | |
64 | ||
65 | func CheckArrowConcurrentFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) { | |
66 | t.Helper() | |
67 | ||
68 | _, err := f.Seek(0, io.SeekStart) | |
69 | if err != nil { | |
70 | t.Fatal(err) | |
71 | } | |
72 | ||
73 | r, err := ipc.NewFileReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem)) | |
74 | if err != nil { | |
75 | t.Fatal(err) | |
76 | } | |
77 | defer r.Close() | |
78 | ||
79 | var g sync.WaitGroup | |
80 | errs := make(chan error, r.NumRecords()) | |
81 | checkRecord := func(i int) { | |
82 | defer g.Done() | |
83 | rec, err := r.RecordAt(i) | |
84 | if err != nil { | |
85 | errs <- fmt.Errorf("could not read record %d: %v", i, err) | |
86 | return | |
87 | } | |
88 | if !array.RecordEqual(rec, recs[i]) { | |
89 | errs <- fmt.Errorf("records[%d] differ", i) | |
90 | } | |
91 | } | |
92 | ||
93 | for i := 0; i < r.NumRecords(); i++ { | |
94 | g.Add(1) | |
95 | go checkRecord(i) | |
96 | } | |
97 | ||
98 | g.Wait() | |
99 | close(errs) | |
100 | ||
101 | for err := range errs { | |
102 | if err != nil { | |
103 | t.Fatal(err) | |
104 | } | |
105 | } | |
106 | ||
107 | err = r.Close() | |
108 | if err != nil { | |
109 | t.Fatal(err) | |
110 | } | |
111 | } | |
112 | ||
113 | // CheckArrowStream checks whether a given ARROW stream contains the expected list of records. | |
114 | func CheckArrowStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) { | |
115 | t.Helper() | |
116 | ||
117 | _, err := f.Seek(0, io.SeekStart) | |
118 | if err != nil { | |
119 | t.Fatal(err) | |
120 | } | |
121 | ||
122 | r, err := ipc.NewReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem)) | |
123 | if err != nil { | |
124 | t.Fatal(err) | |
125 | } | |
126 | defer r.Release() | |
127 | ||
128 | n := 0 | |
129 | for r.Next() { | |
130 | rec := r.Record() | |
131 | if !array.RecordEqual(rec, recs[n]) { | |
132 | t.Fatalf("records[%d] differ, got: %s, expected %s", n, rec, recs[n]) | |
133 | } | |
134 | n++ | |
135 | } | |
136 | ||
137 | if len(recs) != n { | |
138 | t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs)) | |
139 | ||
140 | } | |
141 | } | |
142 | ||
143 | // WriteFile writes a list of records to the given file descriptor, as an ARROW file. | |
144 | func WriteFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) { | |
145 | t.Helper() | |
146 | ||
147 | w, err := ipc.NewFileWriter(f, ipc.WithSchema(schema), ipc.WithAllocator(mem)) | |
148 | if err != nil { | |
149 | t.Fatal(err) | |
150 | } | |
151 | defer w.Close() | |
152 | ||
153 | for i, rec := range recs { | |
154 | err = w.Write(rec) | |
155 | if err != nil { | |
156 | t.Fatalf("could not write record[%d]: %v", i, err) | |
157 | } | |
158 | } | |
159 | ||
160 | err = w.Close() | |
161 | if err != nil { | |
162 | t.Fatal(err) | |
163 | } | |
164 | ||
165 | err = f.Sync() | |
166 | if err != nil { | |
167 | t.Fatalf("could not sync data to disk: %v", err) | |
168 | } | |
169 | ||
170 | // put the cursor back at the start of the file before returning rather than | |
171 | // leaving it at the end so the reader can just start reading from the handle | |
172 | // immediately for the test. | |
173 | _, err = f.Seek(0, io.SeekStart) | |
174 | if err != nil { | |
175 | t.Fatalf("could not seek to start: %v", err) | |
176 | } | |
177 | } | |
178 | ||
179 | // WriteFile writes a list of records to the given file descriptor, as an ARROW file. | |
180 | func WriteFileCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record, codec flatbuf.CompressionType) { | |
181 | t.Helper() | |
182 | ||
183 | opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem)} | |
184 | switch codec { | |
185 | case flatbuf.CompressionTypeLZ4_FRAME: | |
186 | opts = append(opts, ipc.WithLZ4()) | |
187 | case flatbuf.CompressionTypeZSTD: | |
188 | opts = append(opts, ipc.WithZstd()) | |
189 | default: | |
190 | t.Fatalf("invalid compression codec %v, only LZ4_FRAME or ZSTD is allowed", codec) | |
191 | } | |
192 | ||
193 | w, err := ipc.NewFileWriter(f, opts...) | |
194 | if err != nil { | |
195 | t.Fatal(err) | |
196 | } | |
197 | defer w.Close() | |
198 | ||
199 | for i, rec := range recs { | |
200 | err = w.Write(rec) | |
201 | if err != nil { | |
202 | t.Fatalf("could not write record[%d]: %v", i, err) | |
203 | } | |
204 | } | |
205 | ||
206 | err = w.Close() | |
207 | if err != nil { | |
208 | t.Fatal(err) | |
209 | } | |
210 | ||
211 | err = f.Sync() | |
212 | if err != nil { | |
213 | t.Fatalf("could not sync data to disk: %v", err) | |
214 | } | |
215 | ||
216 | // put the cursor back at the start of the file before returning rather than | |
217 | // leaving it at the end so the reader can just start reading from the handle | |
218 | // immediately for the test. | |
219 | _, err = f.Seek(0, io.SeekStart) | |
220 | if err != nil { | |
221 | t.Fatalf("could not seek to start: %v", err) | |
222 | } | |
223 | } | |
224 | ||
225 | // WriteStream writes a list of records to the given file descriptor, as an ARROW stream. | |
226 | func WriteStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) { | |
227 | t.Helper() | |
228 | ||
229 | w := ipc.NewWriter(f, ipc.WithSchema(schema), ipc.WithAllocator(mem)) | |
230 | defer w.Close() | |
231 | ||
232 | for i, rec := range recs { | |
233 | err := w.Write(rec) | |
234 | if err != nil { | |
235 | t.Fatalf("could not write record[%d]: %v", i, err) | |
236 | } | |
237 | } | |
238 | ||
239 | err := w.Close() | |
240 | if err != nil { | |
241 | t.Fatal(err) | |
242 | } | |
243 | } | |
244 | ||
245 | // WriteStreamCompressed writes a list of records to the given file descriptor as an ARROW stream | |
246 | // using the provided compression type. | |
247 | func WriteStreamCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record, codec flatbuf.CompressionType, np int) { | |
248 | t.Helper() | |
249 | ||
250 | opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem), ipc.WithCompressConcurrency(np)} | |
251 | switch codec { | |
252 | case flatbuf.CompressionTypeLZ4_FRAME: | |
253 | opts = append(opts, ipc.WithLZ4()) | |
254 | case flatbuf.CompressionTypeZSTD: | |
255 | opts = append(opts, ipc.WithZstd()) | |
256 | default: | |
257 | t.Fatalf("invalid compression codec %v, only LZ4_FRAME or ZSTD is allowed", codec) | |
258 | } | |
259 | ||
260 | w := ipc.NewWriter(f, opts...) | |
261 | defer w.Close() | |
262 | ||
263 | for i, rec := range recs { | |
264 | err := w.Write(rec) | |
265 | if err != nil { | |
266 | t.Fatalf("could not write record[%d]: %v", i, err) | |
267 | } | |
268 | } | |
269 | ||
270 | err := w.Close() | |
271 | if err != nil { | |
272 | t.Fatal(err) | |
273 | } | |
274 | } |