1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
17 package ipc // import "github.com/apache/arrow/go/v6/arrow/ipc"
22 "github.com/apache/arrow/go/v6/arrow"
23 "github.com/apache/arrow/go/v6/arrow/arrio"
24 "github.com/apache/arrow/go/v6/arrow/internal/flatbuf"
25 "github.com/apache/arrow/go/v6/arrow/memory"
// Package-level error values. Each is an errString, which provides the
// Error method, carrying a fixed "arrow/ipc: ..." message.
29 errNotArrowFile = errString("arrow/ipc: not an Arrow file")
30 errInconsistentFileMetadata = errString("arrow/ipc: file is smaller than indicated metadata size")
31 errInconsistentSchema = errString("arrow/ipc: tried to write record batch with different schema")
32 errMaxRecursion = errString("arrow/ipc: max recursion depth reached")
33 errBigArray = errString("arrow/ipc: array larger than 2^31-1 in length")
35 kArrowAlignment = 64 // buffers are padded to 64-byte boundaries (for SIMD)
36 kTensorAlignment = 64 // tensors are padded to 64-byte boundaries
37 kArrowIPCAlignment = 8 // align on 8-byte boundaries in IPC
41 paddingBytes [kArrowAlignment]byte // kArrowAlignment bytes; presumably the source of padding written after buffers (confirm at call sites)
42 kEOS = [8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0} // end of stream message: four 0xFF bytes followed by four zero bytes
43 kIPCContToken uint32 = 0xFFFFFFFF // 32-bit continuation indicator for FlatBuffers 8-byte alignment
// paddedLength rounds nbytes up to the next multiple of alignment and
// returns the result. A value that is already a multiple of alignment is
// returned unchanged.
//
// alignment must be non-zero: the computation divides by it. The rounding
// description above holds for nbytes >= 0 and alignment > 0.
func paddedLength(nbytes int64, alignment int32) int64 {
	step := int64(alignment)
	// Number of whole alignment-sized units needed to hold nbytes.
	units := (nbytes + step - 1) / step
	return units * step
}
53 func (s errString) Error() string {
57 type ReadAtSeeker interface {
64 alloc memory.Allocator
69 codec flatbuf.CompressionType
73 func newConfig(opts ...Option) *config {
75 alloc: memory.NewGoAllocator(),
76 codec: -1, // uncompressed
79 for _, opt := range opts {
86 // Option is a functional option to configure opening or creating Arrow files
88 type Option func(*config)
90 // WithFooterOffset specifies the Arrow footer position in bytes.
91 func WithFooterOffset(offset int64) Option {
92 return func(cfg *config) {
93 cfg.footer.offset = offset
97 // WithAllocator specifies the Arrow memory allocator used while building records.
98 func WithAllocator(mem memory.Allocator) Option {
99 return func(cfg *config) {
104 // WithSchema specifies the Arrow schema to be used for reading or writing.
105 func WithSchema(schema *arrow.Schema) Option {
106 return func(cfg *config) {
111 // WithLZ4 tells the writer to use LZ4 Frame compression on the data
112 // buffers before writing. Requires >= Arrow 1.0.0 to read/decompress
113 func WithLZ4() Option {
114 return func(cfg *config) {
115 cfg.codec = flatbuf.CompressionTypeLZ4_FRAME
119 // WithZstd tells the writer to use ZSTD compression on the data
120 // buffers before writing. Requires >= Arrow 1.0.0 to read/decompress
121 func WithZstd() Option {
122 return func(cfg *config) {
123 cfg.codec = flatbuf.CompressionTypeZSTD
127 // WithCompressConcurrency specifies a number of goroutines to spin up for
128 // concurrent compression of the body buffers when writing compressed IPC records.
129 // If n <= 1 then compression will be done serially without goroutine
130 // parallelization. Default is 0.
131 func WithCompressConcurrency(n int) Option {
132 return func(cfg *config) {
// Compile-time checks: each assignment to the blank identifier only
// type-checks if the pointer type on the right implements the arrio
// interface on the left.
138 _ arrio.Reader = (*Reader)(nil)
139 _ arrio.Writer = (*Writer)(nil)
140 _ arrio.Reader = (*FileReader)(nil)
141 _ arrio.Writer = (*FileWriter)(nil)
143 _ arrio.ReaderAt = (*FileReader)(nil)