1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "benchmark/benchmark.h"
24 #include "arrow/io/file.h"
25 #include "arrow/io/memory.h"
26 #include "arrow/ipc/api.h"
27 #include "arrow/record_batch.h"
28 #include "arrow/testing/gtest_util.h"
29 #include "arrow/testing/random.h"
30 #include "arrow/type.h"
// Builds a RecordBatch of `num_fields` int64 columns for the benchmarks below.
//
// total_size: value divided by `num_fields` and by sizeof(int64_t) to get the
//             row count, so it acts as the approximate payload size in bytes
//             (the integer divisions truncate).
// num_fields: number of columns; used as a divisor, so it must be non-zero.
//
// Returns the result of RecordBatch::Make(schema, length, arrays).
34 std::shared_ptr
<RecordBatch
> MakeRecordBatch(int64_t total_size
, int64_t num_fields
) {
// Rows per column.
35 int64_t length
= total_size
/ num_fields
/ sizeof(int64_t);
// The generator is constructed from a fixed constant (presumably a seed, so
// every run sees the same data -- confirm against RandomArrayGenerator).
36 random::RandomArrayGenerator
rand(0x4f32a908);
37 auto type
= arrow::int64();
40 std::vector
<std::shared_ptr
<Field
>> fields
;
// One field and one random array are appended per column.
41 for (int64_t i
= 0; i
< num_fields
; ++i
) {
44 fields
.push_back(field(ss
.str(), type
));
// NOTE(review): the arguments after `length` are presumably min, max and
// null probability -- confirm against RandomArrayGenerator::Int64.
45 arrays
.push_back(rand
.Int64(length
, 0, 100, 0.1));
48 auto schema
= std::make_shared
<Schema
>(fields
);
49 return RecordBatch::Make(schema
, length
, arrays
);
// Benchmark: calls ipc::WriteRecordBatch on one pre-built record batch in
// every iteration, writing into a BufferOutputStream over `buffer`.
//
// state.range(0) is forwarded to MakeRecordBatch as the number of fields.
// Throughput is reported as iterations * kTotalSize bytes.
52 static void WriteRecordBatch(benchmark::State
& state
) { // NOLINT non-const reference
// 1 << 20 bytes handed to MakeRecordBatch as the total batch size.
54 constexpr int64_t kTotalSize
= 1 << 20;
55 auto options
= ipc::IpcWriteOptions::Defaults();
// The buffer and the batch are created once, outside the timed loop.
57 std::shared_ptr
<ResizableBuffer
> buffer
= *AllocateResizableBuffer(1024);
58 auto record_batch
= MakeRecordBatch(kTotalSize
, state
.range(0));
60 while (state
.KeepRunning()) {
// A fresh output stream over the same buffer is constructed each iteration.
61 io::BufferOutputStream
stream(buffer
);
62 int32_t metadata_length
;
64 ABORT_NOT_OK(ipc::WriteRecordBatch(*record_batch
, 0, &stream
, &metadata_length
,
65 &body_length
, options
));
67 state
.SetBytesProcessed(int64_t(state
.iterations()) * kTotalSize
);
// Benchmark: writes one record batch into `buffer` once, then calls
// ipc::ReadRecordBatch on that buffer in every iteration.
//
// state.range(0) is forwarded to MakeRecordBatch as the number of fields.
// Throughput is reported as iterations * kTotalSize bytes.
70 static void ReadRecordBatch(benchmark::State
& state
) { // NOLINT non-const reference
// 1 << 20 bytes handed to MakeRecordBatch as the total batch size.
72 constexpr int64_t kTotalSize
= 1 << 20;
73 auto options
= ipc::IpcWriteOptions::Defaults();
75 std::shared_ptr
<ResizableBuffer
> buffer
= *AllocateResizableBuffer(1024);
76 auto record_batch
= MakeRecordBatch(kTotalSize
, state
.range(0));
// Setup (not timed): serialize the batch into `buffer` a single time.
78 io::BufferOutputStream
stream(buffer
);
80 int32_t metadata_length
;
82 ABORT_NOT_OK(ipc::WriteRecordBatch(*record_batch
, 0, &stream
, &metadata_length
,
83 &body_length
, options
));
// Dictionary memo passed to every read call; nothing in this function adds
// entries to it.
85 ipc::DictionaryMemo empty_memo
;
86 while (state
.KeepRunning()) {
// A new reader over `buffer` is constructed for every iteration.
87 io::BufferReader
reader(buffer
);
88 ABORT_NOT_OK(ipc::ReadRecordBatch(record_batch
->schema(), &empty_memo
,
89 ipc::IpcReadOptions::Defaults(), &reader
));
91 state
.SetBytesProcessed(int64_t(state
.iterations()) * kTotalSize
);
// Benchmark: writes an IPC stream containing one record batch into `buffer`
// once, then in every iteration opens a RecordBatchStreamReader over that
// buffer and reads from it with ReadNext.
//
// state.range(0) is forwarded to MakeRecordBatch as the number of fields.
// Throughput is reported as iterations * kTotalSize bytes.
94 static void ReadStream(benchmark::State
& state
) { // NOLINT non-const reference
// 1 << 20 bytes handed to MakeRecordBatch as the total batch size.
96 constexpr int64_t kTotalSize
= 1 << 20;
97 auto options
= ipc::IpcWriteOptions::Defaults();
99 std::shared_ptr
<ResizableBuffer
> buffer
= *AllocateResizableBuffer(1024);
101 // Make Arrow IPC stream
102 auto record_batch
= MakeRecordBatch(kTotalSize
, state
.range(0));
104 io::BufferOutputStream
stream(buffer
);
// The writer result is checked before it is dereferenced.
106 auto writer_result
= ipc::MakeStreamWriter(&stream
, record_batch
->schema(), options
);
107 ABORT_NOT_OK(writer_result
);
108 auto writer
= *writer_result
;
109 ABORT_NOT_OK(writer
->WriteRecordBatch(*record_batch
));
// Both the writer and the underlying stream are closed before `buffer` is read.
110 ABORT_NOT_OK(writer
->Close());
111 ABORT_NOT_OK(stream
.Close());
// NOTE(review): `empty_memo` is not referenced again in the visible lines of
// this function -- confirm whether it is still needed.
114 ipc::DictionaryMemo empty_memo
;
115 while (state
.KeepRunning()) {
// A new reader over `buffer` is constructed for every iteration.
116 io::BufferReader
input(buffer
);
118 ipc::RecordBatchStreamReader::Open(&input
, ipc::IpcReadOptions::Defaults());
119 ABORT_NOT_OK(reader_result
);
120 auto reader
= *reader_result
;
122 std::shared_ptr
<RecordBatch
> batch
;
123 ABORT_NOT_OK(reader
->ReadNext(&batch
));
// A null batch after ReadNext is handled separately.
124 if (batch
.get() == nullptr) {
129 state
.SetBytesProcessed(int64_t(state
.iterations()) * kTotalSize
);
// Benchmark: writes an IPC stream containing one record batch into `buffer`
// once, then in every iteration feeds the whole buffer to an
// ipc::StreamDecoder through Consume().
//
// state.range(0) is forwarded to MakeRecordBatch as the number of fields.
// Throughput is reported as iterations * kTotalSize bytes.
132 static void DecodeStream(benchmark::State
& state
) { // NOLINT non-const reference
// 1 << 20 bytes handed to MakeRecordBatch as the total batch size.
134 constexpr int64_t kTotalSize
= 1 << 20;
135 auto options
= ipc::IpcWriteOptions::Defaults();
137 std::shared_ptr
<ResizableBuffer
> buffer
= *AllocateResizableBuffer(1024);
138 auto record_batch
= MakeRecordBatch(kTotalSize
, state
.range(0));
140 io::BufferOutputStream
stream(buffer
);
// The writer result is checked before it is dereferenced.
142 auto writer_result
= ipc::MakeStreamWriter(&stream
, record_batch
->schema(), options
);
143 ABORT_NOT_OK(writer_result
);
144 auto writer
= *writer_result
;
145 ABORT_NOT_OK(writer
->WriteRecordBatch(*record_batch
));
// NOTE(review): unlike ReadStream, no stream.Close() call is visible here
// before `buffer` is consumed -- confirm that this is intentional.
146 ABORT_NOT_OK(writer
->Close());
// NOTE(review): `empty_memo` is not referenced again in the visible lines of
// this function -- confirm whether it is still needed.
148 ipc::DictionaryMemo empty_memo
;
149 while (state
.KeepRunning()) {
// Function-local listener type handed to the decoder below.
150 class NullListener
: public ipc::Listener
{
151 Status
OnRecordBatchDecoded(std::shared_ptr
<RecordBatch
> batch
) override
{
// The shared_ptr wraps the address of `listener` with a deleter that does
// nothing, so the decoder never deletes the listener object.
155 ipc::StreamDecoder
decoder(std::shared_ptr
<NullListener
>(&listener
, [](void*) {}),
156 ipc::IpcReadOptions::Defaults());
157 ABORT_NOT_OK(decoder
.Consume(buffer
));
159 state
.SetBytesProcessed(int64_t(state
.iterations()) * kTotalSize
);
// Expands to statements that write kBatches (16) copies of one generated
// record batch, in the IPC file format with a ZSTD codec set on the write
// options, into the in-memory `buffer`.
//
// Expects a benchmark state object named `state` in the enclosing scope;
// state.range(0) is forwarded to MakeRecordBatch as the number of fields.
// Declares `kBatchSize`, `kBatches`, `options` and `buffer`.
162 #define GENERATE_COMPRESSED_DATA_IN_MEMORY() \
163 constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \
164 constexpr int64_t kBatches = 16; \
165 auto options = ipc::IpcWriteOptions::Defaults(); \
166 ASSIGN_OR_ABORT(options.codec, \
167 arrow::util::Codec::Create(arrow::Compression::type::ZSTD)); \
168 std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024); \
170 auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \
171 io::BufferOutputStream stream(buffer); \
172 auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); \
173 for (int i = 0; i < kBatches; i++) { \
174 ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
176 ABORT_NOT_OK(writer->Close()); \
177 ABORT_NOT_OK(stream.Close()); \
// Expands to statements that write a single generated record batch
// (kBatches is 1), in the IPC file format with default write options, into
// the in-memory `buffer`.
//
// Expects a benchmark state object named `state` in the enclosing scope;
// state.range(0) is forwarded to MakeRecordBatch as the number of fields.
// Declares `kBatchSize`, `kBatches`, `options` and `buffer`.
180 #define GENERATE_DATA_IN_MEMORY() \
181 constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \
182 constexpr int64_t kBatches = 1; \
183 auto options = ipc::IpcWriteOptions::Defaults(); \
184 std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024); \
186 auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \
187 io::BufferOutputStream stream(buffer); \
188 auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); \
189 ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
190 ABORT_NOT_OK(writer->Close()); \
191 ABORT_NOT_OK(stream.Close()); \
// Expands to statements that write kBatches copies of one generated record
// batch, in the Arrow IPC file format with default write options, to
// "/tmp/benchmark.arrow".
//
// Expects a benchmark state object named `state` in the enclosing scope;
// state.range(0) is forwarded to MakeRecordBatch as the number of fields.
// After expansion `kBatchSize`, `kBatches`, `options` and `sink` (already
// closed) remain in scope; the batch and the writer are confined to an inner
// block so they are released before the file is read back.
//
// The batch is written kBatches times, as GENERATE_COMPRESSED_DATA_IN_MEMORY
// does, so that the file contents agree with the `kBatchSize * kBatches`
// figure the read benchmarks pass to SetBytesProcessed. Writing it only once
// would overstate the reported throughput by a factor of kBatches.
#define GENERATE_DATA_TEMP_FILE()                                                 \
  constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */                              \
  constexpr int64_t kBatches = 16;                                                \
  auto options = ipc::IpcWriteOptions::Defaults();                                \
  ASSIGN_OR_ABORT(auto sink, io::FileOutputStream::Open("/tmp/benchmark.arrow")); \
  {                                                                               \
    auto record_batch = MakeRecordBatch(kBatchSize, state.range(0));              \
    auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options);    \
    for (int i = 0; i < kBatches; i++) {                                          \
      ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch));                      \
    }                                                                             \
    ABORT_NOT_OK(writer->Close());                                                \
    ABORT_NOT_OK(sink->Close());                                                  \
  }
// Each READ_DATA_* macro declares a local named `input` for the reader loops.
// This one wraps the in-memory `buffer` (which must already be in scope) in a
// shared io::BufferReader.
207 #define READ_DATA_IN_MEMORY() auto input = std::make_shared<io::BufferReader>(buffer);
// Declares `input` by opening "/tmp/benchmark.arrow" as an io::ReadableFile.
208 #define READ_DATA_TEMP_FILE() \
209 ASSIGN_OR_ABORT(auto input, io::ReadableFile::Open("/tmp/benchmark.arrow"));
// Declares `input` by opening "/tmp/benchmark.arrow" as an
// io::MemoryMappedFile in READ mode.
210 #define READ_DATA_MMAP_FILE() \
211 ASSIGN_OR_ABORT(auto input, io::MemoryMappedFile::Open("/tmp/benchmark.arrow", \
212 io::FileMode::type::READ));
// Defines a benchmark function called NAME and registers it with
// BENCHMARK(), using RangeMultiplier(4), Range(1, 1 << 13) and real time.
//
// In every iteration the function opens an ipc::RecordBatchFileReader on
// `input` and reads each record batch by index with ReadRecordBatch(i).
// Throughput is reported as iterations * kBatchSize * kBatches bytes.
//
// NOTE(review): `input`, `kBatchSize` and `kBatches` are not declared in the
// lines shown here; presumably they come from expanding GENERATE and READ
// inside the function -- confirm.
214 #define READ_SYNC(NAME, GENERATE, READ) \
215 static void NAME(benchmark::State& state) { \
217 for (auto _ : state) { \
219 auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \
220 ipc::IpcReadOptions::Defaults()); \
221 const int num_batches = reader->num_record_batches(); \
222 for (int i = 0; i < num_batches; ++i) { \
223 auto batch = *reader->ReadRecordBatch(i); \
226 state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \
228 BENCHMARK(NAME)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
// Defines a benchmark function called NAME##Async and registers it with
// BENCHMARK(), using RangeMultiplier(4), Range(1, 1 << 13) and real time.
//
// In every iteration the function opens an ipc::RecordBatchFileReader on
// `input`, obtains a generator from GetRecordBatchGenerator(), and calls
// generator().result() once per record batch in the file.
// Throughput is reported as iterations * kBatchSize * kBatches bytes.
//
// NOTE(review): `input`, `kBatchSize` and `kBatches` are not declared in the
// lines shown here; presumably they come from expanding GENERATE and READ
// inside the function -- confirm.
230 #define READ_ASYNC(NAME, GENERATE, READ) \
231 static void NAME##Async(benchmark::State& state) { \
233 for (auto _ : state) { \
235 auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \
236 ipc::IpcReadOptions::Defaults()); \
237 ASSIGN_OR_ABORT(auto generator, reader->GetRecordBatchGenerator()); \
238 const int num_batches = reader->num_record_batches(); \
239 for (int i = 0; i < num_batches; ++i) { \
240 auto batch = *generator().result(); \
243 state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \
245 BENCHMARK(NAME##Async)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
// Expands to both READ_SYNC and READ_ASYNC with the same arguments, so one
// invocation yields the NAME and NAME##Async benchmarks.
247 #define READ_BENCHMARK(NAME, GENERATE, READ) \
248 READ_SYNC(NAME, GENERATE, READ); \
249 READ_ASYNC(NAME, GENERATE, READ);
// IPC file read benchmarks. Each invocation passes a benchmark name, a data
// generator macro and an input-opening macro, and produces the named
// benchmark plus its "Async" counterpart.
251 READ_BENCHMARK(ReadFile
, GENERATE_DATA_IN_MEMORY
, READ_DATA_IN_MEMORY
);
252 READ_BENCHMARK(ReadTempFile
, GENERATE_DATA_TEMP_FILE
, READ_DATA_TEMP_FILE
);
253 READ_BENCHMARK(ReadMmapFile
, GENERATE_DATA_TEMP_FILE
, READ_DATA_MMAP_FILE
);
254 READ_BENCHMARK(ReadCompressedFile
, GENERATE_COMPRESSED_DATA_IN_MEMORY
,
255 READ_DATA_IN_MEMORY
);
// Registers the hand-written benchmarks. All four use RangeMultiplier(4),
// Range(1, 1 << 13) and real time; each function reads the range argument via
// state.range(0) and passes it to MakeRecordBatch as the number of fields.
257 BENCHMARK(WriteRecordBatch
)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
258 BENCHMARK(ReadRecordBatch
)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
259 BENCHMARK(ReadStream
)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
260 BENCHMARK(DecodeStream
)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();