]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, software | |
12 | // distributed under the License is distributed on an "AS IS" BASIS, | |
13 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | // See the License for the specific language governing permissions and | |
15 | // limitations under the License. | |
16 | ||
17 | package ipc // import "github.com/apache/arrow/go/v6/arrow/ipc" | |
18 | ||
19 | import ( | |
20 | "bytes" | |
21 | "encoding/binary" | |
22 | "io" | |
23 | ||
24 | "github.com/apache/arrow/go/v6/arrow" | |
25 | "github.com/apache/arrow/go/v6/arrow/array" | |
26 | "github.com/apache/arrow/go/v6/arrow/bitutil" | |
27 | "github.com/apache/arrow/go/v6/arrow/internal/flatbuf" | |
28 | "github.com/apache/arrow/go/v6/arrow/memory" | |
29 | "golang.org/x/xerrors" | |
30 | ) | |
31 | ||
// FileReader is an Arrow file reader.
type FileReader struct {
	r ReadAtSeeker // random-access source the file is read from

	// footer holds the decoded file footer: its absolute end offset in
	// the stream, the raw flatbuffer bytes, and the decoded table.
	footer struct {
		offset int64
		buffer *memory.Buffer
		data   *flatbuf.Footer
	}

	fields dictTypeMap // dictionary-encoded field types, keyed by dictionary ID
	memo   dictMemo    // loaded dictionary arrays, keyed by dictionary ID

	schema *arrow.Schema
	record array.Record // last record handed out by Record/Read; released on the next call

	irec int   // current record index. used for the arrio.Reader interface
	err  error // last error
}
51 | ||
52 | // NewFileReader opens an Arrow file using the provided reader r. | |
53 | func NewFileReader(r ReadAtSeeker, opts ...Option) (*FileReader, error) { | |
54 | var ( | |
55 | cfg = newConfig(opts...) | |
56 | err error | |
57 | ||
58 | f = FileReader{ | |
59 | r: r, | |
60 | fields: make(dictTypeMap), | |
61 | memo: newMemo(), | |
62 | } | |
63 | ) | |
64 | ||
65 | if cfg.footer.offset <= 0 { | |
66 | cfg.footer.offset, err = f.r.Seek(0, io.SeekEnd) | |
67 | if err != nil { | |
68 | return nil, xerrors.Errorf("arrow/ipc: could retrieve footer offset: %w", err) | |
69 | } | |
70 | } | |
71 | f.footer.offset = cfg.footer.offset | |
72 | ||
73 | err = f.readFooter() | |
74 | if err != nil { | |
75 | return nil, xerrors.Errorf("arrow/ipc: could not decode footer: %w", err) | |
76 | } | |
77 | ||
78 | err = f.readSchema() | |
79 | if err != nil { | |
80 | return nil, xerrors.Errorf("arrow/ipc: could not decode schema: %w", err) | |
81 | } | |
82 | ||
83 | if cfg.schema != nil && !cfg.schema.Equal(f.schema) { | |
84 | return nil, xerrors.Errorf("arrow/ipc: inconsistent schema for reading (got: %v, want: %v)", f.schema, cfg.schema) | |
85 | } | |
86 | ||
87 | return &f, err | |
88 | } | |
89 | ||
90 | func (f *FileReader) readFooter() error { | |
91 | var err error | |
92 | ||
93 | if f.footer.offset <= int64(len(Magic)*2+4) { | |
94 | return xerrors.Errorf("arrow/ipc: file too small (size=%d)", f.footer.offset) | |
95 | } | |
96 | ||
97 | eof := int64(len(Magic) + 4) | |
98 | buf := make([]byte, eof) | |
99 | n, err := f.r.ReadAt(buf, f.footer.offset-eof) | |
100 | if err != nil { | |
101 | return xerrors.Errorf("arrow/ipc: could not read footer: %w", err) | |
102 | } | |
103 | if n != len(buf) { | |
104 | return xerrors.Errorf("arrow/ipc: could not read %d bytes from end of file", len(buf)) | |
105 | } | |
106 | ||
107 | if !bytes.Equal(buf[4:], Magic) { | |
108 | return errNotArrowFile | |
109 | } | |
110 | ||
111 | size := int64(binary.LittleEndian.Uint32(buf[:4])) | |
112 | if size <= 0 || size+int64(len(Magic)*2+4) > f.footer.offset { | |
113 | return errInconsistentFileMetadata | |
114 | } | |
115 | ||
116 | buf = make([]byte, size) | |
117 | n, err = f.r.ReadAt(buf, f.footer.offset-size-eof) | |
118 | if err != nil { | |
119 | return xerrors.Errorf("arrow/ipc: could not read footer data: %w", err) | |
120 | } | |
121 | if n != len(buf) { | |
122 | return xerrors.Errorf("arrow/ipc: could not read %d bytes from footer data", len(buf)) | |
123 | } | |
124 | ||
125 | f.footer.buffer = memory.NewBufferBytes(buf) | |
126 | f.footer.data = flatbuf.GetRootAsFooter(buf, 0) | |
127 | return err | |
128 | } | |
129 | ||
130 | func (f *FileReader) readSchema() error { | |
131 | var err error | |
132 | f.fields, err = dictTypesFromFB(f.footer.data.Schema(nil)) | |
133 | if err != nil { | |
134 | return xerrors.Errorf("arrow/ipc: could not load dictionary types from file: %w", err) | |
135 | } | |
136 | ||
137 | for i := 0; i < f.NumDictionaries(); i++ { | |
138 | blk, err := f.dict(i) | |
139 | if err != nil { | |
140 | return xerrors.Errorf("arrow/ipc: could read dictionary[%d]: %w", i, err) | |
141 | } | |
142 | switch { | |
143 | case !bitutil.IsMultipleOf8(blk.Offset): | |
144 | return xerrors.Errorf("arrow/ipc: invalid file offset=%d for dictionary %d", blk.Offset, i) | |
145 | case !bitutil.IsMultipleOf8(int64(blk.Meta)): | |
146 | return xerrors.Errorf("arrow/ipc: invalid file metadata=%d position for dictionary %d", blk.Meta, i) | |
147 | case !bitutil.IsMultipleOf8(blk.Body): | |
148 | return xerrors.Errorf("arrow/ipc: invalid file body=%d position for dictionary %d", blk.Body, i) | |
149 | } | |
150 | ||
151 | msg, err := blk.NewMessage() | |
152 | if err != nil { | |
153 | return err | |
154 | } | |
155 | ||
156 | id, dict, err := readDictionary(msg.meta, f.fields, f.r) | |
157 | msg.Release() | |
158 | if err != nil { | |
159 | return xerrors.Errorf("arrow/ipc: could not read dictionary %d from file: %w", i, err) | |
160 | } | |
161 | f.memo.Add(id, dict) | |
162 | dict.Release() // memo.Add increases ref-count of dict. | |
163 | } | |
164 | ||
165 | schema := f.footer.data.Schema(nil) | |
166 | if schema == nil { | |
167 | return xerrors.Errorf("arrow/ipc: could not load schema from flatbuffer data") | |
168 | } | |
169 | f.schema, err = schemaFromFB(schema, &f.memo) | |
170 | if err != nil { | |
171 | return xerrors.Errorf("arrow/ipc: could not read schema: %w", err) | |
172 | } | |
173 | ||
174 | return err | |
175 | } | |
176 | ||
177 | func (f *FileReader) block(i int) (fileBlock, error) { | |
178 | var blk flatbuf.Block | |
179 | if !f.footer.data.RecordBatches(&blk, i) { | |
180 | return fileBlock{}, xerrors.Errorf("arrow/ipc: could not extract file block %d", i) | |
181 | } | |
182 | ||
183 | return fileBlock{ | |
184 | Offset: blk.Offset(), | |
185 | Meta: blk.MetaDataLength(), | |
186 | Body: blk.BodyLength(), | |
187 | r: f.r, | |
188 | }, nil | |
189 | } | |
190 | ||
191 | func (f *FileReader) dict(i int) (fileBlock, error) { | |
192 | var blk flatbuf.Block | |
193 | if !f.footer.data.Dictionaries(&blk, i) { | |
194 | return fileBlock{}, xerrors.Errorf("arrow/ipc: could not extract dictionary block %d", i) | |
195 | } | |
196 | ||
197 | return fileBlock{ | |
198 | Offset: blk.Offset(), | |
199 | Meta: blk.MetaDataLength(), | |
200 | Body: blk.BodyLength(), | |
201 | r: f.r, | |
202 | }, nil | |
203 | } | |
204 | ||
// Schema returns the schema of the underlying Arrow file.
func (f *FileReader) Schema() *arrow.Schema {
	return f.schema
}
208 | ||
209 | func (f *FileReader) NumDictionaries() int { | |
210 | if f.footer.data == nil { | |
211 | return 0 | |
212 | } | |
213 | return f.footer.data.DictionariesLength() | |
214 | } | |
215 | ||
// NumRecords returns the number of record batches in the file.
//
// NOTE(review): unlike NumDictionaries, this does not nil-check
// f.footer.data, so calling it after Close panics — confirm intended.
func (f *FileReader) NumRecords() int {
	return f.footer.data.RecordBatchesLength()
}
219 | ||
// Version returns the Arrow metadata version recorded in the file footer.
func (f *FileReader) Version() MetadataVersion {
	return MetadataVersion(f.footer.data.Version())
}
223 | ||
224 | // Close cleans up resources used by the File. | |
225 | // Close does not close the underlying reader. | |
226 | func (f *FileReader) Close() error { | |
227 | if f.footer.data != nil { | |
228 | f.footer.data = nil | |
229 | } | |
230 | ||
231 | if f.footer.buffer != nil { | |
232 | f.footer.buffer.Release() | |
233 | f.footer.buffer = nil | |
234 | } | |
235 | ||
236 | if f.record != nil { | |
237 | f.record.Release() | |
238 | f.record = nil | |
239 | } | |
240 | return nil | |
241 | } | |
242 | ||
243 | // Record returns the i-th record from the file. | |
244 | // The returned value is valid until the next call to Record. | |
245 | // Users need to call Retain on that Record to keep it valid for longer. | |
246 | func (f *FileReader) Record(i int) (array.Record, error) { | |
247 | record, err := f.RecordAt(i) | |
248 | if err != nil { | |
249 | return nil, err | |
250 | } | |
251 | ||
252 | if f.record != nil { | |
253 | f.record.Release() | |
254 | } | |
255 | ||
256 | f.record = record | |
257 | return record, nil | |
258 | } | |
259 | ||
260 | // Record returns the i-th record from the file. Ownership is transferred to the | |
261 | // caller and must call Release() to free the memory. This method is safe to | |
262 | // call concurrently. | |
263 | func (f *FileReader) RecordAt(i int) (array.Record, error) { | |
264 | if i < 0 || i > f.NumRecords() { | |
265 | panic("arrow/ipc: record index out of bounds") | |
266 | } | |
267 | ||
268 | blk, err := f.block(i) | |
269 | if err != nil { | |
270 | return nil, err | |
271 | } | |
272 | switch { | |
273 | case !bitutil.IsMultipleOf8(blk.Offset): | |
274 | return nil, xerrors.Errorf("arrow/ipc: invalid file offset=%d for record %d", blk.Offset, i) | |
275 | case !bitutil.IsMultipleOf8(int64(blk.Meta)): | |
276 | return nil, xerrors.Errorf("arrow/ipc: invalid file metadata=%d position for record %d", blk.Meta, i) | |
277 | case !bitutil.IsMultipleOf8(blk.Body): | |
278 | return nil, xerrors.Errorf("arrow/ipc: invalid file body=%d position for record %d", blk.Body, i) | |
279 | } | |
280 | ||
281 | msg, err := blk.NewMessage() | |
282 | if err != nil { | |
283 | return nil, err | |
284 | } | |
285 | defer msg.Release() | |
286 | ||
287 | if msg.Type() != MessageRecordBatch { | |
288 | return nil, xerrors.Errorf("arrow/ipc: message %d is not a Record", i) | |
289 | } | |
290 | ||
291 | return newRecord(f.schema, msg.meta, bytes.NewReader(msg.body.Bytes())), nil | |
292 | } | |
293 | ||
294 | // Read reads the current record from the underlying stream and an error, if any. | |
295 | // When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF). | |
296 | // | |
297 | // The returned record value is valid until the next call to Read. | |
298 | // Users need to call Retain on that Record to keep it valid for longer. | |
299 | func (f *FileReader) Read() (rec array.Record, err error) { | |
300 | if f.irec == f.NumRecords() { | |
301 | return nil, io.EOF | |
302 | } | |
303 | rec, f.err = f.Record(f.irec) | |
304 | f.irec++ | |
305 | return rec, f.err | |
306 | } | |
307 | ||
// ReadAt reads the i-th record from the underlying stream and an error, if any.
//
// It delegates to Record, so the returned record is cached on the reader
// and stays valid only until the next call to Read, ReadAt or Record.
func (f *FileReader) ReadAt(i int64) (array.Record, error) {
	return f.Record(int(i))
}
312 | ||
313 | func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) array.Record { | |
314 | var ( | |
315 | msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0) | |
316 | md flatbuf.RecordBatch | |
317 | codec decompressor | |
318 | ) | |
319 | initFB(&md, msg.Header) | |
320 | rows := md.Length() | |
321 | ||
322 | bodyCompress := md.Compression(nil) | |
323 | if bodyCompress != nil { | |
324 | codec = getDecompressor(bodyCompress.Codec()) | |
325 | } | |
326 | ||
327 | ctx := &arrayLoaderContext{ | |
328 | src: ipcSource{ | |
329 | meta: &md, | |
330 | r: body, | |
331 | codec: codec, | |
332 | }, | |
333 | max: kMaxNestingDepth, | |
334 | } | |
335 | ||
336 | cols := make([]array.Interface, len(schema.Fields())) | |
337 | for i, field := range schema.Fields() { | |
338 | cols[i] = ctx.loadArray(field.Type) | |
339 | } | |
340 | ||
341 | return array.NewRecord(schema, cols, rows) | |
342 | } | |
343 | ||
// ipcSource reads the buffers of a single record batch out of the file
// body, decompressing them when a codec is present.
type ipcSource struct {
	meta  *flatbuf.RecordBatch
	r     ReadAtSeeker
	codec decompressor // nil when the batch body is not compressed
}
349 | ||
350 | func (src *ipcSource) buffer(i int) *memory.Buffer { | |
351 | var buf flatbuf.Buffer | |
352 | if !src.meta.Buffers(&buf, i) { | |
353 | panic("buffer index out of bound") | |
354 | } | |
355 | if buf.Length() == 0 { | |
356 | return memory.NewBufferBytes(nil) | |
357 | } | |
358 | ||
359 | var raw []byte | |
360 | if src.codec == nil { | |
361 | raw = make([]byte, buf.Length()) | |
362 | _, err := src.r.ReadAt(raw, buf.Offset()) | |
363 | if err != nil { | |
364 | panic(err) | |
365 | } | |
366 | } else { | |
367 | sr := io.NewSectionReader(src.r, buf.Offset(), buf.Length()) | |
368 | var uncompressedSize uint64 | |
369 | ||
370 | err := binary.Read(sr, binary.LittleEndian, &uncompressedSize) | |
371 | if err != nil { | |
372 | panic(err) | |
373 | } | |
374 | ||
375 | var r io.Reader = sr | |
376 | // check for an uncompressed buffer | |
377 | if int64(uncompressedSize) != -1 { | |
378 | raw = make([]byte, uncompressedSize) | |
379 | src.codec.Reset(sr) | |
380 | r = src.codec | |
381 | } else { | |
382 | raw = make([]byte, buf.Length()) | |
383 | } | |
384 | ||
385 | if _, err = io.ReadFull(r, raw); err != nil { | |
386 | panic(err) | |
387 | } | |
388 | } | |
389 | ||
390 | return memory.NewBufferBytes(raw) | |
391 | } | |
392 | ||
393 | func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode { | |
394 | var node flatbuf.FieldNode | |
395 | if !src.meta.Nodes(&node, i) { | |
396 | panic("field metadata out of bound") | |
397 | } | |
398 | return &node | |
399 | } | |
400 | ||
// arrayLoaderContext walks the flat lists of field nodes and buffers of a
// record batch while arrays are reconstructed recursively.
type arrayLoaderContext struct {
	src     ipcSource
	ifield  int // index of the next field node to consume
	ibuffer int // index of the next buffer to consume
	max     int // remaining allowed nesting depth
}
407 | ||
408 | func (ctx *arrayLoaderContext) field() *flatbuf.FieldNode { | |
409 | field := ctx.src.fieldMetadata(ctx.ifield) | |
410 | ctx.ifield++ | |
411 | return field | |
412 | } | |
413 | ||
414 | func (ctx *arrayLoaderContext) buffer() *memory.Buffer { | |
415 | buf := ctx.src.buffer(ctx.ibuffer) | |
416 | ctx.ibuffer++ | |
417 | return buf | |
418 | } | |
419 | ||
// loadArray reconstructs a single array of type dt, consuming the field
// nodes and buffers it requires and recursing into child types for nested
// arrays. It panics on unsupported types.
func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) array.Interface {
	switch dt := dt.(type) {
	case *arrow.NullType:
		return ctx.loadNull()

	// All fixed-width types share the primitive layout: an optional
	// validity bitmap plus a single data buffer.
	case *arrow.BooleanType,
		*arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type,
		*arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type,
		*arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type,
		*arrow.Decimal128Type,
		*arrow.Time32Type, *arrow.Time64Type,
		*arrow.TimestampType,
		*arrow.Date32Type, *arrow.Date64Type,
		*arrow.MonthIntervalType, *arrow.DayTimeIntervalType, *arrow.MonthDayNanoIntervalType,
		*arrow.DurationType:
		return ctx.loadPrimitive(dt)

	case *arrow.BinaryType, *arrow.StringType:
		return ctx.loadBinary(dt)

	case *arrow.FixedSizeBinaryType:
		return ctx.loadFixedSizeBinary(dt)

	case *arrow.ListType:
		return ctx.loadList(dt)

	case *arrow.FixedSizeListType:
		return ctx.loadFixedSizeList(dt)

	case *arrow.StructType:
		return ctx.loadStruct(dt)

	case *arrow.MapType:
		return ctx.loadMap(dt)

	case arrow.ExtensionType:
		// Extension arrays wrap a storage array: load the storage and
		// re-wrap it with the extension type.
		storage := ctx.loadArray(dt.StorageType())
		defer storage.Release()
		return array.NewExtensionArrayWithStorage(dt, storage)

	default:
		panic(xerrors.Errorf("array type %T not handled yet", dt))
	}
}
464 | ||
465 | func (ctx *arrayLoaderContext) loadCommon(nbufs int) (*flatbuf.FieldNode, []*memory.Buffer) { | |
466 | buffers := make([]*memory.Buffer, 0, nbufs) | |
467 | field := ctx.field() | |
468 | ||
469 | var buf *memory.Buffer | |
470 | switch field.NullCount() { | |
471 | case 0: | |
472 | ctx.ibuffer++ | |
473 | default: | |
474 | buf = ctx.buffer() | |
475 | } | |
476 | buffers = append(buffers, buf) | |
477 | ||
478 | return field, buffers | |
479 | } | |
480 | ||
481 | func (ctx *arrayLoaderContext) loadChild(dt arrow.DataType) array.Interface { | |
482 | if ctx.max == 0 { | |
483 | panic("arrow/ipc: nested type limit reached") | |
484 | } | |
485 | ctx.max-- | |
486 | sub := ctx.loadArray(dt) | |
487 | ctx.max++ | |
488 | return sub | |
489 | } | |
490 | ||
491 | func (ctx *arrayLoaderContext) loadNull() array.Interface { | |
492 | field := ctx.field() | |
493 | data := array.NewData(arrow.Null, int(field.Length()), nil, nil, int(field.NullCount()), 0) | |
494 | defer data.Release() | |
495 | ||
496 | return array.MakeFromData(data) | |
497 | } | |
498 | ||
499 | func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType) array.Interface { | |
500 | field, buffers := ctx.loadCommon(2) | |
501 | ||
502 | switch field.Length() { | |
503 | case 0: | |
504 | buffers = append(buffers, nil) | |
505 | ctx.ibuffer++ | |
506 | default: | |
507 | buffers = append(buffers, ctx.buffer()) | |
508 | } | |
509 | ||
510 | data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0) | |
511 | defer data.Release() | |
512 | ||
513 | return array.MakeFromData(data) | |
514 | } | |
515 | ||
516 | func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface { | |
517 | field, buffers := ctx.loadCommon(3) | |
518 | buffers = append(buffers, ctx.buffer(), ctx.buffer()) | |
519 | ||
520 | data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0) | |
521 | defer data.Release() | |
522 | ||
523 | return array.MakeFromData(data) | |
524 | } | |
525 | ||
526 | func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType) array.Interface { | |
527 | field, buffers := ctx.loadCommon(2) | |
528 | buffers = append(buffers, ctx.buffer()) | |
529 | ||
530 | data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0) | |
531 | defer data.Release() | |
532 | ||
533 | return array.MakeFromData(data) | |
534 | } | |
535 | ||
536 | func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) array.Interface { | |
537 | field, buffers := ctx.loadCommon(2) | |
538 | buffers = append(buffers, ctx.buffer()) | |
539 | ||
540 | sub := ctx.loadChild(dt.ValueType()) | |
541 | defer sub.Release() | |
542 | ||
543 | data := array.NewData(dt, int(field.Length()), buffers, []*array.Data{sub.Data()}, int(field.NullCount()), 0) | |
544 | defer data.Release() | |
545 | ||
546 | return array.NewMapData(data) | |
547 | } | |
548 | ||
549 | func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface { | |
550 | field, buffers := ctx.loadCommon(2) | |
551 | buffers = append(buffers, ctx.buffer()) | |
552 | ||
553 | sub := ctx.loadChild(dt.Elem()) | |
554 | defer sub.Release() | |
555 | ||
556 | data := array.NewData(dt, int(field.Length()), buffers, []*array.Data{sub.Data()}, int(field.NullCount()), 0) | |
557 | defer data.Release() | |
558 | ||
559 | return array.NewListData(data) | |
560 | } | |
561 | ||
562 | func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) array.Interface { | |
563 | field, buffers := ctx.loadCommon(1) | |
564 | ||
565 | sub := ctx.loadChild(dt.Elem()) | |
566 | defer sub.Release() | |
567 | ||
568 | data := array.NewData(dt, int(field.Length()), buffers, []*array.Data{sub.Data()}, int(field.NullCount()), 0) | |
569 | defer data.Release() | |
570 | ||
571 | return array.NewFixedSizeListData(data) | |
572 | } | |
573 | ||
574 | func (ctx *arrayLoaderContext) loadStruct(dt *arrow.StructType) array.Interface { | |
575 | field, buffers := ctx.loadCommon(1) | |
576 | ||
577 | arrs := make([]array.Interface, len(dt.Fields())) | |
578 | subs := make([]*array.Data, len(dt.Fields())) | |
579 | for i, f := range dt.Fields() { | |
580 | arrs[i] = ctx.loadChild(f.Type) | |
581 | subs[i] = arrs[i].Data() | |
582 | } | |
583 | defer func() { | |
584 | for i := range arrs { | |
585 | arrs[i].Release() | |
586 | } | |
587 | }() | |
588 | ||
589 | data := array.NewData(dt, int(field.Length()), buffers, subs, int(field.NullCount()), 0) | |
590 | defer data.Release() | |
591 | ||
592 | return array.NewStructData(data) | |
593 | } | |
594 | ||
// readDictionary decodes a dictionary batch message into the dictionary ID
// and its values array.
//
// NOTE(review): not implemented; the commented-out sketch below outlines
// the intended approach — embed the dictionary in a single-column record
// batch and reuse the array loader. The sketch references `md` and `rows`,
// which are never defined, so it does not compile as-is.
func readDictionary(meta *memory.Buffer, types dictTypeMap, r ReadAtSeeker) (int64, array.Interface, error) {
	// msg := flatbuf.GetRootAsMessage(meta.Bytes(), 0)
	// var dictBatch flatbuf.DictionaryBatch
	// initFB(&dictBatch, msg.Header)
	//
	// id := dictBatch.Id()
	// v, ok := types[id]
	// if !ok {
	// 	return id, nil, errors.Errorf("arrow/ipc: no type metadata for dictionary with ID=%d", id)
	// }
	//
	// fields := []arrow.Field{v}
	//
	// // we need a schema for the record batch.
	// schema := arrow.NewSchema(fields, nil)
	//
	// // the dictionary is embedded in a record batch with a single column.
	// recBatch := dictBatch.Data(nil)
	//
	// var (
	// 	batchMeta *memory.Buffer
	// 	body      *memory.Buffer
	// )
	//
	// ctx := &arrayLoaderContext{
	// 	src: ipcSource{
	// 		meta: &md,
	// 		r:    bytes.NewReader(body.Bytes()),
	// 	},
	// 	max: kMaxNestingDepth,
	// }
	//
	// cols := make([]array.Interface, len(schema.Fields()))
	// for i, field := range schema.Fields() {
	// 	cols[i] = ctx.loadArray(field.Type)
	// }
	//
	// batch := array.NewRecord(schema, cols, rows)

	panic("not implemented")
}