// ceph/src/arrow/go/arrow/ipc/file_reader.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipc // import "github.com/apache/arrow/go/v6/arrow/ipc"

import (
	"bytes"
	"encoding/binary"
	"io"

	"github.com/apache/arrow/go/v6/arrow"
	"github.com/apache/arrow/go/v6/arrow/array"
	"github.com/apache/arrow/go/v6/arrow/bitutil"
	"github.com/apache/arrow/go/v6/arrow/internal/flatbuf"
	"github.com/apache/arrow/go/v6/arrow/memory"
	"golang.org/x/xerrors"
)

// FileReader is an Arrow file reader.
type FileReader struct {
	r ReadAtSeeker

	footer struct {
		offset int64
		buffer *memory.Buffer
		data   *flatbuf.Footer
	}

	fields dictTypeMap
	memo   dictMemo

	schema *arrow.Schema
	record array.Record

	irec int   // current record index. used for the arrio.Reader interface
	err  error // last error
}

// NewFileReader opens an Arrow file using the provided reader r.
func NewFileReader(r ReadAtSeeker, opts ...Option) (*FileReader, error) {
	var (
		cfg = newConfig(opts...)
		err error

		f = FileReader{
			r:      r,
			fields: make(dictTypeMap),
			memo:   newMemo(),
		}
	)

	if cfg.footer.offset <= 0 {
		cfg.footer.offset, err = f.r.Seek(0, io.SeekEnd)
		if err != nil {
			return nil, xerrors.Errorf("arrow/ipc: could not retrieve footer offset: %w", err)
		}
	}
	f.footer.offset = cfg.footer.offset

	err = f.readFooter()
	if err != nil {
		return nil, xerrors.Errorf("arrow/ipc: could not decode footer: %w", err)
	}

	err = f.readSchema()
	if err != nil {
		return nil, xerrors.Errorf("arrow/ipc: could not decode schema: %w", err)
	}

	if cfg.schema != nil && !cfg.schema.Equal(f.schema) {
		return nil, xerrors.Errorf("arrow/ipc: inconsistent schema for reading (got: %v, want: %v)", f.schema, cfg.schema)
	}

	return &f, err
}
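
// Editor's note: a minimal usage sketch, not part of the upstream file.
// It assumes an *os.File (which satisfies ReadAtSeeker) and a hypothetical
// file name:
//
//	f, err := os.Open("data.arrow") // placeholder path
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()
//
//	rdr, err := ipc.NewFileReader(f)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer rdr.Close()
//
//	for {
//		rec, err := rdr.Read()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			log.Fatal(err)
//		}
//		// rec is only valid until the next call to Read;
//		// call rec.Retain to keep it alive longer.
//		fmt.Println(rec.NumRows())
//	}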

func (f *FileReader) readFooter() error {
	var err error

	if f.footer.offset <= int64(len(Magic)*2+4) {
		return xerrors.Errorf("arrow/ipc: file too small (size=%d)", f.footer.offset)
	}

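	// An Arrow file ends with the footer flatbuffer, a 4-byte little-endian
	// footer size, and the trailing magic bytes. Read the size and magic
	// first, then seek back to read the footer itself.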
	eof := int64(len(Magic) + 4)
	buf := make([]byte, eof)
	n, err := f.r.ReadAt(buf, f.footer.offset-eof)
	if err != nil {
		return xerrors.Errorf("arrow/ipc: could not read footer: %w", err)
	}
	if n != len(buf) {
		return xerrors.Errorf("arrow/ipc: could not read %d bytes from end of file", len(buf))
	}

	if !bytes.Equal(buf[4:], Magic) {
		return errNotArrowFile
	}

	size := int64(binary.LittleEndian.Uint32(buf[:4]))
	if size <= 0 || size+int64(len(Magic)*2+4) > f.footer.offset {
		return errInconsistentFileMetadata
	}

	buf = make([]byte, size)
	n, err = f.r.ReadAt(buf, f.footer.offset-size-eof)
	if err != nil {
		return xerrors.Errorf("arrow/ipc: could not read footer data: %w", err)
	}
	if n != len(buf) {
		return xerrors.Errorf("arrow/ipc: could not read %d bytes from footer data", len(buf))
	}

	f.footer.buffer = memory.NewBufferBytes(buf)
	f.footer.data = flatbuf.GetRootAsFooter(buf, 0)
	return err
}

func (f *FileReader) readSchema() error {
	var err error
	f.fields, err = dictTypesFromFB(f.footer.data.Schema(nil))
	if err != nil {
		return xerrors.Errorf("arrow/ipc: could not load dictionary types from file: %w", err)
	}

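	// Load all dictionaries before reconstructing the schema:
	// dictionary-encoded fields reference them by ID.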
	for i := 0; i < f.NumDictionaries(); i++ {
		blk, err := f.dict(i)
		if err != nil {
			return xerrors.Errorf("arrow/ipc: could not read dictionary[%d]: %w", i, err)
		}
		switch {
		case !bitutil.IsMultipleOf8(blk.Offset):
			return xerrors.Errorf("arrow/ipc: invalid file offset=%d for dictionary %d", blk.Offset, i)
		case !bitutil.IsMultipleOf8(int64(blk.Meta)):
			return xerrors.Errorf("arrow/ipc: invalid file metadata=%d position for dictionary %d", blk.Meta, i)
		case !bitutil.IsMultipleOf8(blk.Body):
			return xerrors.Errorf("arrow/ipc: invalid file body=%d position for dictionary %d", blk.Body, i)
		}

		msg, err := blk.NewMessage()
		if err != nil {
			return err
		}

		id, dict, err := readDictionary(msg.meta, f.fields, f.r)
		msg.Release()
		if err != nil {
			return xerrors.Errorf("arrow/ipc: could not read dictionary %d from file: %w", i, err)
		}
		f.memo.Add(id, dict)
		dict.Release() // memo.Add increases the ref-count of dict.
	}

	schema := f.footer.data.Schema(nil)
	if schema == nil {
		return xerrors.Errorf("arrow/ipc: could not load schema from flatbuffer data")
	}
	f.schema, err = schemaFromFB(schema, &f.memo)
	if err != nil {
		return xerrors.Errorf("arrow/ipc: could not read schema: %w", err)
	}

	return err
}

func (f *FileReader) block(i int) (fileBlock, error) {
	var blk flatbuf.Block
	if !f.footer.data.RecordBatches(&blk, i) {
		return fileBlock{}, xerrors.Errorf("arrow/ipc: could not extract file block %d", i)
	}

	return fileBlock{
		Offset: blk.Offset(),
		Meta:   blk.MetaDataLength(),
		Body:   blk.BodyLength(),
		r:      f.r,
	}, nil
}

func (f *FileReader) dict(i int) (fileBlock, error) {
	var blk flatbuf.Block
	if !f.footer.data.Dictionaries(&blk, i) {
		return fileBlock{}, xerrors.Errorf("arrow/ipc: could not extract dictionary block %d", i)
	}

	return fileBlock{
		Offset: blk.Offset(),
		Meta:   blk.MetaDataLength(),
		Body:   blk.BodyLength(),
		r:      f.r,
	}, nil
}

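// Schema returns the schema read from the file's footer.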
func (f *FileReader) Schema() *arrow.Schema {
	return f.schema
}

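// NumDictionaries returns the number of dictionary blocks recorded in the footer.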
func (f *FileReader) NumDictionaries() int {
	if f.footer.data == nil {
		return 0
	}
	return f.footer.data.DictionariesLength()
}

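// NumRecords returns the number of record batches in the file.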
func (f *FileReader) NumRecords() int {
	return f.footer.data.RecordBatchesLength()
}

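// Version returns the Arrow metadata version of the file.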
func (f *FileReader) Version() MetadataVersion {
	return MetadataVersion(f.footer.data.Version())
}

// Close cleans up resources used by the File.
// Close does not close the underlying reader.
func (f *FileReader) Close() error {
	f.footer.data = nil

	if f.footer.buffer != nil {
		f.footer.buffer.Release()
		f.footer.buffer = nil
	}

	if f.record != nil {
		f.record.Release()
		f.record = nil
	}
	return nil
}

// Record returns the i-th record from the file.
// The returned value is valid until the next call to Record.
// Users need to call Retain on that Record to keep it valid for longer.
func (f *FileReader) Record(i int) (array.Record, error) {
	record, err := f.RecordAt(i)
	if err != nil {
		return nil, err
	}

	if f.record != nil {
		f.record.Release()
	}

	f.record = record
	return record, nil
}

// RecordAt returns the i-th record from the file. Ownership of the returned
// record is transferred to the caller, who must call Release() to free the
// memory. This method is safe to call concurrently.
func (f *FileReader) RecordAt(i int) (array.Record, error) {
	if i < 0 || i >= f.NumRecords() {
		panic("arrow/ipc: record index out of bounds")
	}

	blk, err := f.block(i)
	if err != nil {
		return nil, err
	}
	switch {
	case !bitutil.IsMultipleOf8(blk.Offset):
		return nil, xerrors.Errorf("arrow/ipc: invalid file offset=%d for record %d", blk.Offset, i)
	case !bitutil.IsMultipleOf8(int64(blk.Meta)):
		return nil, xerrors.Errorf("arrow/ipc: invalid file metadata=%d position for record %d", blk.Meta, i)
	case !bitutil.IsMultipleOf8(blk.Body):
		return nil, xerrors.Errorf("arrow/ipc: invalid file body=%d position for record %d", blk.Body, i)
	}

	msg, err := blk.NewMessage()
	if err != nil {
		return nil, err
	}
	defer msg.Release()

	if msg.Type() != MessageRecordBatch {
		return nil, xerrors.Errorf("arrow/ipc: message %d is not a Record", i)
	}

	return newRecord(f.schema, msg.meta, bytes.NewReader(msg.body.Bytes())), nil
}

// Read returns the next record from the file and an error, if any.
// When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
//
// The returned record value is valid until the next call to Read.
// Users need to call Retain on that Record to keep it valid for longer.
func (f *FileReader) Read() (rec array.Record, err error) {
	if f.irec == f.NumRecords() {
		return nil, io.EOF
	}
	rec, f.err = f.Record(f.irec)
	f.irec++
	return rec, f.err
}

// ReadAt returns the i-th record from the file and an error, if any.
func (f *FileReader) ReadAt(i int64) (array.Record, error) {
	return f.Record(int(i))
}

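// newRecord reconstructs a record batch from its flatbuffer metadata and the
// raw message body, decompressing buffers when the batch declares a codec.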
func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) array.Record {
	var (
		msg   = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
		md    flatbuf.RecordBatch
		codec decompressor
	)
	initFB(&md, msg.Header)
	rows := md.Length()

	bodyCompress := md.Compression(nil)
	if bodyCompress != nil {
		codec = getDecompressor(bodyCompress.Codec())
	}

	ctx := &arrayLoaderContext{
		src: ipcSource{
			meta:  &md,
			r:     body,
			codec: codec,
		},
		max: kMaxNestingDepth,
	}

	cols := make([]array.Interface, len(schema.Fields()))
	for i, field := range schema.Fields() {
		cols[i] = ctx.loadArray(field.Type)
	}

	return array.NewRecord(schema, cols, rows)
}

type ipcSource struct {
	meta  *flatbuf.RecordBatch
	r     ReadAtSeeker
	codec decompressor
}

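// buffer materializes the i-th body buffer described by the record-batch
// metadata, decompressing it first when a codec is present.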
func (src *ipcSource) buffer(i int) *memory.Buffer {
	var buf flatbuf.Buffer
	if !src.meta.Buffers(&buf, i) {
		panic("buffer index out of bounds")
	}
	if buf.Length() == 0 {
		return memory.NewBufferBytes(nil)
	}

	var raw []byte
	if src.codec == nil {
		raw = make([]byte, buf.Length())
		_, err := src.r.ReadAt(raw, buf.Offset())
		if err != nil {
			panic(err)
		}
	} else {
		sr := io.NewSectionReader(src.r, buf.Offset(), buf.Length())
		var uncompressedSize uint64

		err := binary.Read(sr, binary.LittleEndian, &uncompressedSize)
		if err != nil {
			panic(err)
		}

		var r io.Reader = sr
		// an uncompressed-size prefix of -1 signals that this buffer body
		// was written uncompressed despite the batch-level codec
		if int64(uncompressedSize) != -1 {
			raw = make([]byte, uncompressedSize)
			src.codec.Reset(sr)
			r = src.codec
		} else {
			raw = make([]byte, buf.Length())
		}

		if _, err = io.ReadFull(r, raw); err != nil {
			panic(err)
		}
	}

	return memory.NewBufferBytes(raw)
}

func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode {
	var node flatbuf.FieldNode
	if !src.meta.Nodes(&node, i) {
		panic("field metadata index out of bounds")
	}
	return &node
}

type arrayLoaderContext struct {
	src     ipcSource
	ifield  int
	ibuffer int
	max     int
}

func (ctx *arrayLoaderContext) field() *flatbuf.FieldNode {
	field := ctx.src.fieldMetadata(ctx.ifield)
	ctx.ifield++
	return field
}

func (ctx *arrayLoaderContext) buffer() *memory.Buffer {
	buf := ctx.src.buffer(ctx.ibuffer)
	ctx.ibuffer++
	return buf
}

func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) array.Interface {
	switch dt := dt.(type) {
	case *arrow.NullType:
		return ctx.loadNull()

	case *arrow.BooleanType,
		*arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type,
		*arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type,
		*arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type,
		*arrow.Decimal128Type,
		*arrow.Time32Type, *arrow.Time64Type,
		*arrow.TimestampType,
		*arrow.Date32Type, *arrow.Date64Type,
		*arrow.MonthIntervalType, *arrow.DayTimeIntervalType, *arrow.MonthDayNanoIntervalType,
		*arrow.DurationType:
		return ctx.loadPrimitive(dt)

	case *arrow.BinaryType, *arrow.StringType:
		return ctx.loadBinary(dt)

	case *arrow.FixedSizeBinaryType:
		return ctx.loadFixedSizeBinary(dt)

	case *arrow.ListType:
		return ctx.loadList(dt)

	case *arrow.FixedSizeListType:
		return ctx.loadFixedSizeList(dt)

	case *arrow.StructType:
		return ctx.loadStruct(dt)

	case *arrow.MapType:
		return ctx.loadMap(dt)

	case arrow.ExtensionType:
		storage := ctx.loadArray(dt.StorageType())
		defer storage.Release()
		return array.NewExtensionArrayWithStorage(dt, storage)

	default:
		panic(xerrors.Errorf("array type %T not handled yet", dt))
	}
}

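// loadCommon reads the next field node and its validity bitmap. When the
// field has no nulls, the bitmap buffer is skipped and a nil placeholder is
// recorded instead of reading it.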
func (ctx *arrayLoaderContext) loadCommon(nbufs int) (*flatbuf.FieldNode, []*memory.Buffer) {
	buffers := make([]*memory.Buffer, 0, nbufs)
	field := ctx.field()

	var buf *memory.Buffer
	switch field.NullCount() {
	case 0:
		ctx.ibuffer++
	default:
		buf = ctx.buffer()
	}
	buffers = append(buffers, buf)

	return field, buffers
}

func (ctx *arrayLoaderContext) loadChild(dt arrow.DataType) array.Interface {
	if ctx.max == 0 {
		panic("arrow/ipc: nested type limit reached")
	}
	ctx.max--
	sub := ctx.loadArray(dt)
	ctx.max++
	return sub
}

func (ctx *arrayLoaderContext) loadNull() array.Interface {
	field := ctx.field()
	data := array.NewData(arrow.Null, int(field.Length()), nil, nil, int(field.NullCount()), 0)
	defer data.Release()

	return array.MakeFromData(data)
}

func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType) array.Interface {
	field, buffers := ctx.loadCommon(2)

	switch field.Length() {
	case 0:
		buffers = append(buffers, nil)
		ctx.ibuffer++
	default:
		buffers = append(buffers, ctx.buffer())
	}

	data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
	defer data.Release()

	return array.MakeFromData(data)
}

func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface {
	field, buffers := ctx.loadCommon(3)
	buffers = append(buffers, ctx.buffer(), ctx.buffer())

	data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
	defer data.Release()

	return array.MakeFromData(data)
}

func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType) array.Interface {
	field, buffers := ctx.loadCommon(2)
	buffers = append(buffers, ctx.buffer())

	data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
	defer data.Release()

	return array.MakeFromData(data)
}

func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) array.Interface {
	field, buffers := ctx.loadCommon(2)
	buffers = append(buffers, ctx.buffer())

	sub := ctx.loadChild(dt.ValueType())
	defer sub.Release()

	data := array.NewData(dt, int(field.Length()), buffers, []*array.Data{sub.Data()}, int(field.NullCount()), 0)
	defer data.Release()

	return array.NewMapData(data)
}

func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface {
	field, buffers := ctx.loadCommon(2)
	buffers = append(buffers, ctx.buffer())

	sub := ctx.loadChild(dt.Elem())
	defer sub.Release()

	data := array.NewData(dt, int(field.Length()), buffers, []*array.Data{sub.Data()}, int(field.NullCount()), 0)
	defer data.Release()

	return array.NewListData(data)
}

func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) array.Interface {
	field, buffers := ctx.loadCommon(1)

	sub := ctx.loadChild(dt.Elem())
	defer sub.Release()

	data := array.NewData(dt, int(field.Length()), buffers, []*array.Data{sub.Data()}, int(field.NullCount()), 0)
	defer data.Release()

	return array.NewFixedSizeListData(data)
}

func (ctx *arrayLoaderContext) loadStruct(dt *arrow.StructType) array.Interface {
	field, buffers := ctx.loadCommon(1)

	arrs := make([]array.Interface, len(dt.Fields()))
	subs := make([]*array.Data, len(dt.Fields()))
	for i, f := range dt.Fields() {
		arrs[i] = ctx.loadChild(f.Type)
		subs[i] = arrs[i].Data()
	}
	defer func() {
		for i := range arrs {
			arrs[i].Release()
		}
	}()

	data := array.NewData(dt, int(field.Length()), buffers, subs, int(field.NullCount()), 0)
	defer data.Release()

	return array.NewStructData(data)
}

func readDictionary(meta *memory.Buffer, types dictTypeMap, r ReadAtSeeker) (int64, array.Interface, error) {
	// msg := flatbuf.GetRootAsMessage(meta.Bytes(), 0)
	// var dictBatch flatbuf.DictionaryBatch
	// initFB(&dictBatch, msg.Header)
	//
	// id := dictBatch.Id()
	// v, ok := types[id]
	// if !ok {
	// 	return id, nil, errors.Errorf("arrow/ipc: no type metadata for dictionary with ID=%d", id)
	// }
	//
	// fields := []arrow.Field{v}
	//
	// // we need a schema for the record batch.
	// schema := arrow.NewSchema(fields, nil)
	//
	// // the dictionary is embedded in a record batch with a single column.
	// recBatch := dictBatch.Data(nil)
	//
	// var (
	// 	batchMeta *memory.Buffer
	// 	body      *memory.Buffer
	// )
	//
	// ctx := &arrayLoaderContext{
	// 	src: ipcSource{
	// 		meta: &md,
	// 		r:    bytes.NewReader(body.Bytes()),
	// 	},
	// 	max: kMaxNestingDepth,
	// }
	//
	// cols := make([]array.Interface, len(schema.Fields()))
	// for i, field := range schema.Fields() {
	// 	cols[i] = ctx.loadArray(field.Type)
	// }
	//
	// batch := array.NewRecord(schema, cols, rows)

	panic("not implemented")
}