// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipc // import "github.com/apache/arrow/go/v6/arrow/ipc"

import (
	"io"

	"github.com/apache/arrow/go/v6/arrow"
	"github.com/apache/arrow/go/v6/arrow/arrio"
	"github.com/apache/arrow/go/v6/arrow/internal/flatbuf"
	"github.com/apache/arrow/go/v6/arrow/memory"
)

const (
	errNotArrowFile             = errString("arrow/ipc: not an Arrow file")
	errInconsistentFileMetadata = errString("arrow/ipc: file is smaller than indicated metadata size")
	errInconsistentSchema       = errString("arrow/ipc: tried to write record batch with different schema")
	errMaxRecursion             = errString("arrow/ipc: max recursion depth reached")
	errBigArray                 = errString("arrow/ipc: array larger than 2^31-1 in length")

	kArrowAlignment    = 64 // buffers are padded to 64b boundaries (for SIMD)
	kTensorAlignment   = 64 // tensors are padded to 64b boundaries
	kArrowIPCAlignment = 8  // align on 8b boundaries in IPC
)

var (
	paddingBytes  [kArrowAlignment]byte
	kEOS          = [8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0} // end of stream message
	kIPCContToken uint32 = 0xFFFFFFFF                           // 32b continuation indicator for FlatBuffers 8b alignment
)

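// Note on framing (per the Arrow IPC format specification, summarized here for
// context): each encapsulated message is written as the 0xFFFFFFFF continuation
// marker, a little-endian int32 metadata size, the FlatBuffers metadata padded
// to an 8-byte boundary, and then the message body. kEOS is the marker followed
// by a zero metadata size, signalling end-of-stream.
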
// paddedLength rounds nbytes up to the next multiple of alignment.
// For example, paddedLength(5, 8) == 8 and paddedLength(16, 8) == 16.
func paddedLength(nbytes int64, alignment int32) int64 {
	align := int64(alignment)
	return ((nbytes + align - 1) / align) * align
}

// errString is a string-based error type, allowing the package errors above
// to be declared as constants.
type errString string

func (s errString) Error() string {
	return string(s)
}

// ReadAtSeeker is the interface that groups io.Reader, io.Seeker and
// io.ReaderAt, as needed for random-access reads of Arrow files.
type ReadAtSeeker interface {
	io.Reader
	io.Seeker
	io.ReaderAt
}

type config struct {
	alloc  memory.Allocator
	schema *arrow.Schema
	footer struct {
		offset int64
	}
	codec      flatbuf.CompressionType
	compressNP int
}

func newConfig(opts ...Option) *config {
	cfg := &config{
		alloc: memory.NewGoAllocator(),
		codec: -1, // uncompressed
	}

	for _, opt := range opts {
		opt(cfg)
	}

	return cfg
}

// Option is a functional option to configure opening or creating Arrow files
// and streams.
type Option func(*config)

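// A sketch of how options compose (illustrative only, not part of the original
// file). Options are normally passed to the exported constructors such as
// NewReader, NewWriter, NewFileReader and NewFileWriter defined elsewhere in
// this package, which build a config from them via newConfig:
//
//	cfg := newConfig(WithAllocator(memory.NewGoAllocator()), WithZstd())
//	_ = cfg
//
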
// WithFooterOffset specifies the Arrow footer position in bytes.
func WithFooterOffset(offset int64) Option {
	return func(cfg *config) {
		cfg.footer.offset = offset
	}
}

// WithAllocator specifies the Arrow memory allocator used while building records.
func WithAllocator(mem memory.Allocator) Option {
	return func(cfg *config) {
		cfg.alloc = mem
	}
}

// WithSchema specifies the Arrow schema to be used for reading or writing.
func WithSchema(schema *arrow.Schema) Option {
	return func(cfg *config) {
		cfg.schema = schema
	}
}

// WithLZ4 tells the writer to use LZ4 Frame compression on the data
// buffers before writing. Requires Arrow >= 1.0.0 to read/decompress.
func WithLZ4() Option {
	return func(cfg *config) {
		cfg.codec = flatbuf.CompressionTypeLZ4_FRAME
	}
}

// WithZstd tells the writer to use ZSTD compression on the data
// buffers before writing. Requires Arrow >= 1.0.0 to read/decompress.
func WithZstd() Option {
	return func(cfg *config) {
		cfg.codec = flatbuf.CompressionTypeZSTD
	}
}

// WithCompressConcurrency specifies the number of goroutines to spin up for
// concurrent compression of the body buffers when writing compressed IPC records.
// If n <= 1 then compression is done serially, without goroutine
// parallelization. Default is 0.
func WithCompressConcurrency(n int) Option {
	return func(cfg *config) {
		cfg.compressNP = n
	}
}

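// A sketch of a compressed writer configuration (illustrative only; NewWriter
// is defined elsewhere in this package, and buf and schema are assumed to
// exist at the call site):
//
//	w := NewWriter(buf, WithSchema(schema), WithZstd(), WithCompressConcurrency(4))
//	defer w.Close()
//
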
var (
	_ arrio.Reader = (*Reader)(nil)
	_ arrio.Writer = (*Writer)(nil)
	_ arrio.Reader = (*FileReader)(nil)
	_ arrio.Writer = (*FileWriter)(nil)

	_ arrio.ReaderAt = (*FileReader)(nil)
)
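
// The assertions above guarantee these types satisfy the generic interfaces in
// the arrio package, so records can be piped between readers and writers, e.g.
// (a sketch, assuming r and w were created with NewReader and NewFileWriter
// respectively):
//
//	n, err := arrio.Copy(w, r)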