// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "benchmark/benchmark.h"

#include <cstdint>
#include <sstream>
#include <string>

#include "arrow/io/file.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
#include "arrow/record_batch.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"

namespace arrow {

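// Create a record batch of `num_fields` int64 columns (with ~10% nulls) whose
// data buffers total roughly `total_size` bytes.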
std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t total_size, int64_t num_fields) {
  int64_t length = total_size / num_fields / sizeof(int64_t);
  random::RandomArrayGenerator rand(0x4f32a908);
  auto type = arrow::int64();

  ArrayVector arrays;
  std::vector<std::shared_ptr<Field>> fields;
  for (int64_t i = 0; i < num_fields; ++i) {
    std::stringstream ss;
    ss << "f" << i;
    fields.push_back(field(ss.str(), type));
    arrays.push_back(rand.Int64(length, 0, 100, 0.1));
  }

  auto schema = std::make_shared<Schema>(fields);
  return RecordBatch::Make(schema, length, arrays);
}

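// Benchmark serializing a single record batch to an in-memory IPC buffer.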
static void WriteRecordBatch(benchmark::State& state) {  // NOLINT non-const reference
  // 1MB
  constexpr int64_t kTotalSize = 1 << 20;
  auto options = ipc::IpcWriteOptions::Defaults();

  std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024);
  auto record_batch = MakeRecordBatch(kTotalSize, state.range(0));

  while (state.KeepRunning()) {
    io::BufferOutputStream stream(buffer);
    int32_t metadata_length;
    int64_t body_length;
    ABORT_NOT_OK(ipc::WriteRecordBatch(*record_batch, 0, &stream, &metadata_length,
                                       &body_length, options));
  }
  state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize);
}

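// Benchmark deserializing a single encapsulated IPC record batch message from an
// in-memory buffer.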
static void ReadRecordBatch(benchmark::State& state) {  // NOLINT non-const reference
  // 1MB
  constexpr int64_t kTotalSize = 1 << 20;
  auto options = ipc::IpcWriteOptions::Defaults();

  std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024);
  auto record_batch = MakeRecordBatch(kTotalSize, state.range(0));

  io::BufferOutputStream stream(buffer);

  int32_t metadata_length;
  int64_t body_length;
  ABORT_NOT_OK(ipc::WriteRecordBatch(*record_batch, 0, &stream, &metadata_length,
                                     &body_length, options));

  ipc::DictionaryMemo empty_memo;
  while (state.KeepRunning()) {
    io::BufferReader reader(buffer);
    ABORT_NOT_OK(ipc::ReadRecordBatch(record_batch->schema(), &empty_memo,
                                      ipc::IpcReadOptions::Defaults(), &reader));
  }
  state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize);
}

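// Benchmark reading a complete IPC stream (schema message plus one record batch)
// through RecordBatchStreamReader.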
static void ReadStream(benchmark::State& state) {  // NOLINT non-const reference
  // 1MB
  constexpr int64_t kTotalSize = 1 << 20;
  auto options = ipc::IpcWriteOptions::Defaults();

  std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024);
  {
    // Make Arrow IPC stream
    auto record_batch = MakeRecordBatch(kTotalSize, state.range(0));

    io::BufferOutputStream stream(buffer);

    auto writer_result = ipc::MakeStreamWriter(&stream, record_batch->schema(), options);
    ABORT_NOT_OK(writer_result);
    auto writer = *writer_result;
    ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch));
    ABORT_NOT_OK(writer->Close());
    ABORT_NOT_OK(stream.Close());
  }

  ipc::DictionaryMemo empty_memo;
  while (state.KeepRunning()) {
    io::BufferReader input(buffer);
    auto reader_result =
        ipc::RecordBatchStreamReader::Open(&input, ipc::IpcReadOptions::Defaults());
    ABORT_NOT_OK(reader_result);
    auto reader = *reader_result;
    while (true) {
      std::shared_ptr<RecordBatch> batch;
      ABORT_NOT_OK(reader->ReadNext(&batch));
      if (batch.get() == nullptr) {
        break;
      }
    }
  }
  state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize);
}

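// Benchmark push-based decoding of an IPC stream with ipc::StreamDecoder, using a
// listener that discards the decoded batches.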
static void DecodeStream(benchmark::State& state) {  // NOLINT non-const reference
  // 1MB
  constexpr int64_t kTotalSize = 1 << 20;
  auto options = ipc::IpcWriteOptions::Defaults();

  std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024);
  auto record_batch = MakeRecordBatch(kTotalSize, state.range(0));

  io::BufferOutputStream stream(buffer);

  auto writer_result = ipc::MakeStreamWriter(&stream, record_batch->schema(), options);
  ABORT_NOT_OK(writer_result);
  auto writer = *writer_result;
  ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch));
  ABORT_NOT_OK(writer->Close());

  ipc::DictionaryMemo empty_memo;
  while (state.KeepRunning()) {
    class NullListener : public ipc::Listener {
      Status OnRecordBatchDecoded(std::shared_ptr<RecordBatch> batch) override {
        return Status::OK();
      }
    } listener;
    ipc::StreamDecoder decoder(std::shared_ptr<NullListener>(&listener, [](void*) {}),
                               ipc::IpcReadOptions::Defaults());
    ABORT_NOT_OK(decoder.Consume(buffer));
  }
  state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize);
}

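// The GENERATE_* macros below materialize an Arrow IPC file for the file-reading
// benchmarks: ZSTD-compressed in memory, uncompressed in memory, or written to a
// temporary file on disk.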
#define GENERATE_COMPRESSED_DATA_IN_MEMORY() \
  constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \
  constexpr int64_t kBatches = 16; \
  auto options = ipc::IpcWriteOptions::Defaults(); \
  ASSIGN_OR_ABORT(options.codec, \
                  arrow::util::Codec::Create(arrow::Compression::type::ZSTD)); \
  std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024); \
  { \
    auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \
    io::BufferOutputStream stream(buffer); \
    auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); \
    for (int i = 0; i < kBatches; i++) { \
      ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
    } \
    ABORT_NOT_OK(writer->Close()); \
    ABORT_NOT_OK(stream.Close()); \
  }

#define GENERATE_DATA_IN_MEMORY() \
  constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \
  constexpr int64_t kBatches = 1; \
  auto options = ipc::IpcWriteOptions::Defaults(); \
  std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024); \
  { \
    auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \
    io::BufferOutputStream stream(buffer); \
    auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); \
    ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
    ABORT_NOT_OK(writer->Close()); \
    ABORT_NOT_OK(stream.Close()); \
  }

#define GENERATE_DATA_TEMP_FILE() \
  constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \
  constexpr int64_t kBatches = 16; \
  auto options = ipc::IpcWriteOptions::Defaults(); \
  ASSIGN_OR_ABORT(auto sink, io::FileOutputStream::Open("/tmp/benchmark.arrow")); \
  { \
    auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \
    auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options); \
    /* Write kBatches batches so SetBytesProcessed matches what is actually read. */ \
    for (int i = 0; i < kBatches; i++) { \
      ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
    } \
    ABORT_NOT_OK(writer->Close()); \
    ABORT_NOT_OK(sink->Close()); \
  }

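// The READ_DATA_* macros open the generated data through different input sources:
// an in-memory buffer, a regular file, or a memory-mapped file.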
#define READ_DATA_IN_MEMORY() auto input = std::make_shared<io::BufferReader>(buffer);
#define READ_DATA_TEMP_FILE() \
  ASSIGN_OR_ABORT(auto input, io::ReadableFile::Open("/tmp/benchmark.arrow"));
#define READ_DATA_MMAP_FILE() \
  ASSIGN_OR_ABORT(auto input, io::MemoryMappedFile::Open("/tmp/benchmark.arrow", \
                                                          io::FileMode::type::READ));

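// Synchronous read benchmark: each iteration reopens the input and reads every
// record batch through RecordBatchFileReader.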
#define READ_SYNC(NAME, GENERATE, READ) \
  static void NAME(benchmark::State& state) { \
    GENERATE(); \
    for (auto _ : state) { \
      READ(); \
      auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \
                                                      ipc::IpcReadOptions::Defaults()); \
      const int num_batches = reader->num_record_batches(); \
      for (int i = 0; i < num_batches; ++i) { \
        auto batch = *reader->ReadRecordBatch(i); \
      } \
    } \
    state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \
  } \
  BENCHMARK(NAME)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();

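// Asynchronous variant: fetch each batch through the reader's record batch
// generator, blocking on each future's result.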
#define READ_ASYNC(NAME, GENERATE, READ) \
  static void NAME##Async(benchmark::State& state) { \
    GENERATE(); \
    for (auto _ : state) { \
      READ(); \
      auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \
                                                      ipc::IpcReadOptions::Defaults()); \
      ASSIGN_OR_ABORT(auto generator, reader->GetRecordBatchGenerator()); \
      const int num_batches = reader->num_record_batches(); \
      for (int i = 0; i < num_batches; ++i) { \
        auto batch = *generator().result(); \
      } \
    } \
    state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \
  } \
  BENCHMARK(NAME##Async)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();

#define READ_BENCHMARK(NAME, GENERATE, READ) \
  READ_SYNC(NAME, GENERATE, READ); \
  READ_ASYNC(NAME, GENERATE, READ);

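// Register synchronous and asynchronous file-reading benchmarks for each
// combination of data generation and input source.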
READ_BENCHMARK(ReadFile, GENERATE_DATA_IN_MEMORY, READ_DATA_IN_MEMORY);
READ_BENCHMARK(ReadTempFile, GENERATE_DATA_TEMP_FILE, READ_DATA_TEMP_FILE);
READ_BENCHMARK(ReadMmapFile, GENERATE_DATA_TEMP_FILE, READ_DATA_MMAP_FILE);
READ_BENCHMARK(ReadCompressedFile, GENERATE_COMPRESSED_DATA_IN_MEMORY,
               READ_DATA_IN_MEMORY);

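// Register the in-memory write, read, and stream benchmarks over a range of
// field counts (1 to 8192 columns).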
BENCHMARK(WriteRecordBatch)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
BENCHMARK(ReadRecordBatch)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
BENCHMARK(ReadStream)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
BENCHMARK(DecodeStream)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();

}  // namespace arrow