]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/io/file_benchmark.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / io / file_benchmark.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
#include "arrow/io/buffered.h"
#include "arrow/io/file.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/windows_compatibility.h"

#include "benchmark/benchmark.h"

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <valarray>

#ifdef _WIN32

#include <io.h>

#else

#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

#endif
45
46 namespace arrow {
47
// Returns the path of the platform's null device: "NUL" on Windows,
// "/dev/null" everywhere else.
std::string GetNullFile() {
#ifdef _WIN32
  const char* const null_device = "NUL";
#else
  const char* const null_device = "/dev/null";
#endif
  return std::string(null_device);
}
55
// Chunk sizes, in bytes, written once each per benchmark iteration:
// a mix of tiny writes, and a pair of large writes.
const std::valarray<int64_t> small_sizes{8, 24, 33, 1, 32, 192, 16, 40};
const std::valarray<int64_t> large_sizes{8192, 100000};

// Capacity of the BufferedOutputStream used by the buffered benchmarks.
constexpr int64_t kBufferSize = 4096;
60
61 #ifdef _WIN32
62
63 class BackgroundReader {
64 // A class that reads data in the background from a file descriptor
65 // (Windows implementation)
66
67 public:
68 static std::shared_ptr<BackgroundReader> StartReader(int fd) {
69 std::shared_ptr<BackgroundReader> reader(new BackgroundReader(fd));
70 reader->worker_.reset(new std::thread([=] { reader->LoopReading(); }));
71 return reader;
72 }
73 void Stop() { ARROW_CHECK(SetEvent(event_)); }
74 void Join() { worker_->join(); }
75
76 ~BackgroundReader() {
77 ABORT_NOT_OK(internal::FileClose(fd_));
78 ARROW_CHECK(CloseHandle(event_));
79 }
80
81 protected:
82 explicit BackgroundReader(int fd) : fd_(fd), total_bytes_(0) {
83 file_handle_ = reinterpret_cast<HANDLE>(_get_osfhandle(fd));
84 ARROW_CHECK_NE(file_handle_, INVALID_HANDLE_VALUE);
85 event_ =
86 CreateEvent(nullptr, /* bManualReset=*/TRUE, /* bInitialState=*/FALSE, nullptr);
87 ARROW_CHECK_NE(event_, INVALID_HANDLE_VALUE);
88 }
89
90 void LoopReading() {
91 const HANDLE handles[] = {file_handle_, event_};
92 while (true) {
93 DWORD ret = WaitForMultipleObjects(2, handles, /* bWaitAll=*/FALSE, INFINITE);
94 ARROW_CHECK_NE(ret, WAIT_FAILED);
95 if (ret == WAIT_OBJECT_0 + 1) {
96 // Got stop request
97 break;
98 } else if (ret == WAIT_OBJECT_0) {
99 // File ready for reading
100 total_bytes_ += *internal::FileRead(fd_, buffer_, buffer_size_);
101 } else {
102 ARROW_LOG(FATAL) << "Unexpected WaitForMultipleObjects return value " << ret;
103 }
104 }
105 }
106
107 int fd_;
108 HANDLE file_handle_, event_;
109 int64_t total_bytes_;
110
111 static const int64_t buffer_size_ = 16384;
112 uint8_t buffer_[buffer_size_];
113
114 std::unique_ptr<std::thread> worker_;
115 };
116
117 #else
118
119 class BackgroundReader {
120 // A class that reads data in the background from a file descriptor
121 // (Unix implementation)
122
123 public:
124 static std::shared_ptr<BackgroundReader> StartReader(int fd) {
125 std::shared_ptr<BackgroundReader> reader(new BackgroundReader(fd));
126 reader->worker_.reset(new std::thread([=] { reader->LoopReading(); }));
127 return reader;
128 }
129 void Stop() {
130 const uint8_t data[] = "x";
131 ABORT_NOT_OK(internal::FileWrite(wakeup_w_, data, 1));
132 }
133 void Join() { worker_->join(); }
134
135 ~BackgroundReader() {
136 for (int fd : {fd_, wakeup_r_, wakeup_w_}) {
137 ABORT_NOT_OK(internal::FileClose(fd));
138 }
139 }
140
141 protected:
142 explicit BackgroundReader(int fd) : fd_(fd), total_bytes_(0) {
143 // Prepare self-pipe trick
144 auto pipe = *internal::CreatePipe();
145 wakeup_r_ = pipe.rfd;
146 wakeup_w_ = pipe.wfd;
147 // Put fd in non-blocking mode
148 fcntl(fd, F_SETFL, O_NONBLOCK);
149 }
150
151 void LoopReading() {
152 struct pollfd pollfds[2];
153 pollfds[0].fd = fd_;
154 pollfds[0].events = POLLIN;
155 pollfds[1].fd = wakeup_r_;
156 pollfds[1].events = POLLIN;
157 while (true) {
158 int ret = poll(pollfds, 2, -1 /* timeout */);
159 if (ret < 1) {
160 std::cerr << "poll() failed with code " << ret << "\n";
161 abort();
162 }
163 if (pollfds[1].revents & POLLIN) {
164 // We're done
165 break;
166 }
167 if (!(pollfds[0].revents & POLLIN)) {
168 continue;
169 }
170 auto result = internal::FileRead(fd_, buffer_, buffer_size_);
171 // There could be a spurious wakeup followed by EAGAIN
172 if (result.ok()) {
173 total_bytes_ += *result;
174 }
175 }
176 }
177
178 int fd_, wakeup_r_, wakeup_w_;
179 int64_t total_bytes_;
180
181 static const int64_t buffer_size_ = 16384;
182 uint8_t buffer_[buffer_size_];
183
184 std::unique_ptr<std::thread> worker_;
185 };
186
187 #endif
188
189 // Set up a pipe with an OutputStream at one end and a BackgroundReader at
190 // the other end.
191 static void SetupPipeWriter(std::shared_ptr<io::OutputStream>* stream,
192 std::shared_ptr<BackgroundReader>* reader) {
193 auto pipe = *internal::CreatePipe();
194 *stream = *io::FileOutputStream::Open(pipe.wfd);
195 *reader = BackgroundReader::StartReader(pipe.rfd);
196 }
197
198 static void BenchmarkStreamingWrites(benchmark::State& state,
199 std::valarray<int64_t> sizes,
200 io::OutputStream* stream,
201 BackgroundReader* reader = nullptr) {
202 const std::string datastr(*std::max_element(std::begin(sizes), std::end(sizes)), 'x');
203 const void* data = datastr.data();
204 const int64_t sum_sizes = sizes.sum();
205
206 while (state.KeepRunning()) {
207 for (const int64_t size : sizes) {
208 ABORT_NOT_OK(stream->Write(data, size));
209 }
210 }
211 // For Windows: need to close writer before joining reader thread.
212 ABORT_NOT_OK(stream->Close());
213
214 const int64_t total_bytes = static_cast<int64_t>(state.iterations()) * sum_sizes;
215 state.SetBytesProcessed(total_bytes);
216
217 if (reader != nullptr) {
218 // Wake up and stop
219 reader->Stop();
220 reader->Join();
221 }
222 }
223
224 // Benchmark writing to /dev/null
225 //
// This situation is unrealistic, as the kernel likely doesn't
// copy the data at all, so we only measure small writes.
228
229 static void FileOutputStreamSmallWritesToNull(
230 benchmark::State& state) { // NOLINT non-const reference
231 auto stream = *io::FileOutputStream::Open(GetNullFile());
232
233 BenchmarkStreamingWrites(state, small_sizes, stream.get());
234 }
235
236 static void BufferedOutputStreamSmallWritesToNull(
237 benchmark::State& state) { // NOLINT non-const reference
238 auto file = *io::FileOutputStream::Open(GetNullFile());
239
240 auto buffered_file =
241 *io::BufferedOutputStream::Create(kBufferSize, default_memory_pool(), file);
242 BenchmarkStreamingWrites(state, small_sizes, buffered_file.get());
243 }
244
// Benchmark writing to a pipe
246 //
247 // This is slightly more realistic than the above
248
249 static void FileOutputStreamSmallWritesToPipe(
250 benchmark::State& state) { // NOLINT non-const reference
251 std::shared_ptr<io::OutputStream> stream;
252 std::shared_ptr<BackgroundReader> reader;
253 SetupPipeWriter(&stream, &reader);
254
255 BenchmarkStreamingWrites(state, small_sizes, stream.get(), reader.get());
256 }
257
258 static void FileOutputStreamLargeWritesToPipe(
259 benchmark::State& state) { // NOLINT non-const reference
260 std::shared_ptr<io::OutputStream> stream;
261 std::shared_ptr<BackgroundReader> reader;
262 SetupPipeWriter(&stream, &reader);
263
264 BenchmarkStreamingWrites(state, large_sizes, stream.get(), reader.get());
265 }
266
267 static void BufferedOutputStreamSmallWritesToPipe(
268 benchmark::State& state) { // NOLINT non-const reference
269 std::shared_ptr<io::OutputStream> stream;
270 std::shared_ptr<BackgroundReader> reader;
271 SetupPipeWriter(&stream, &reader);
272
273 auto buffered_stream =
274 *io::BufferedOutputStream::Create(kBufferSize, default_memory_pool(), stream);
275 BenchmarkStreamingWrites(state, small_sizes, buffered_stream.get(), reader.get());
276 }
277
278 static void BufferedOutputStreamLargeWritesToPipe(
279 benchmark::State& state) { // NOLINT non-const reference
280 std::shared_ptr<io::OutputStream> stream;
281 std::shared_ptr<BackgroundReader> reader;
282 SetupPipeWriter(&stream, &reader);
283
284 auto buffered_stream =
285 *io::BufferedOutputStream::Create(kBufferSize, default_memory_pool(), stream);
286
287 BenchmarkStreamingWrites(state, large_sizes, buffered_stream.get(), reader.get());
288 }
289
290 // We use real time as we don't want to count CPU time spent in the
291 // BackgroundReader thread
292
293 BENCHMARK(FileOutputStreamSmallWritesToNull)->UseRealTime();
294 BENCHMARK(FileOutputStreamSmallWritesToPipe)->UseRealTime();
295 BENCHMARK(FileOutputStreamLargeWritesToPipe)->UseRealTime();
296
297 BENCHMARK(BufferedOutputStreamSmallWritesToNull)->UseRealTime();
298 BENCHMARK(BufferedOutputStreamSmallWritesToPipe)->UseRealTime();
299 BENCHMARK(BufferedOutputStreamLargeWritesToPipe)->UseRealTime();
300
301 } // namespace arrow