1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "arrow/io/buffered.h"
19 #include "arrow/io/file.h"
20 #include "arrow/testing/gtest_util.h"
21 #include "arrow/util/io_util.h"
22 #include "arrow/util/logging.h"
23 #include "arrow/util/windows_compatibility.h"
25 #include "benchmark/benchmark.h"
// Returns the path that the *WritesToNull benchmarks pass to
// io::FileOutputStream::Open().
// NOTE(review): the function body is not visible in this excerpt -- confirm
// the exact path(s) returned.
48 std::string
GetNullFile() {
// Write-size patterns consumed by BenchmarkStreamingWrites(): each element is
// the byte count of one Write() call, and the whole sequence is replayed in
// order on every benchmark iteration.
56 const std::valarray
<int64_t> small_sizes
= {8, 24, 33, 1, 32, 192, 16, 40};
57 const std::valarray
<int64_t> large_sizes
= {8192, 100000};
// Buffer size handed to io::BufferedOutputStream::Create() by the
// BufferedOutputStream* benchmarks.
59 constexpr int64_t kBufferSize
= 4096;
// Windows flavour of the background reader. A worker thread waits on two
// handles -- the OS handle behind `fd` and a stop event -- and reads from the
// descriptor into a scratch buffer whenever the file handle is signalled.
63 class BackgroundReader
{
64 // A class that reads data in the background from a file descriptor
65 // (Windows implementation)
// Builds a reader for `fd` and starts the worker thread on LoopReading().
// The lambda captures `reader` by copy ([=]), so the thread's callable holds
// its own shared_ptr reference to the reader.
68 static std::shared_ptr
<BackgroundReader
> StartReader(int fd
) {
69 std::shared_ptr
<BackgroundReader
> reader(new BackgroundReader(fd
));
70 reader
->worker_
.reset(new std::thread([=] { reader
->LoopReading(); }));
// Stop() signals event_, which is handles[1] in the wait performed by the
// read loop; Join() blocks until the worker thread has finished.
73 void Stop() { ARROW_CHECK(SetEvent(event_
)); }
74 void Join() { worker_
->join(); }
// Teardown: closes the file descriptor and the event handle.
77 ABORT_NOT_OK(internal::FileClose(fd_
));
78 ARROW_CHECK(CloseHandle(event_
));
// Stores `fd`, zeroes the byte counter, and resolves the OS handle behind the
// descriptor so it can be waited on.
82 explicit BackgroundReader(int fd
) : fd_(fd
), total_bytes_(0) {
83 file_handle_
= reinterpret_cast<HANDLE
>(_get_osfhandle(fd
));
84 ARROW_CHECK_NE(file_handle_
, INVALID_HANDLE_VALUE
);
// The event is created as manual-reset and initially non-signalled (see the
// inline argument comments).
// NOTE(review): the Win32 documentation states CreateEvent returns NULL on
// failure, not INVALID_HANDLE_VALUE -- confirm the check on event_ below can
// actually detect a failed creation.
86 CreateEvent(nullptr, /* bManualReset=*/TRUE
, /* bInitialState=*/FALSE
, nullptr);
87 ARROW_CHECK_NE(event_
, INVALID_HANDLE_VALUE
);
// Read loop: index 0 of `handles` is the file handle, index 1 the stop event.
91 const HANDLE handles
[] = {file_handle_
, event_
};
// bWaitAll=FALSE: the wait returns as soon as either handle is signalled;
// INFINITE means no timeout.
93 DWORD ret
= WaitForMultipleObjects(2, handles
, /* bWaitAll=*/FALSE
, INFINITE
);
94 ARROW_CHECK_NE(ret
, WAIT_FAILED
);
// WAIT_OBJECT_0 + 1 identifies handles[1], i.e. event_ (set by Stop()).
// NOTE(review): the statements of this branch are not visible in this
// excerpt -- presumably the loop exits here; confirm.
95 if (ret
== WAIT_OBJECT_0
+ 1) {
98 } else if (ret
== WAIT_OBJECT_0
) {
99 // File ready for reading
100 total_bytes_
+= *internal::FileRead(fd_
, buffer_
, buffer_size_
);
// Any other return value is treated as fatal.
102 ARROW_LOG(FATAL
) << "Unexpected WaitForMultipleObjects return value " << ret
;
// State: the two waitable handles, the running count of bytes read, a fixed
// 16384-byte scratch buffer that each read overwrites, and the worker thread.
108 HANDLE file_handle_
, event_
;
109 int64_t total_bytes_
;
111 static const int64_t buffer_size_
= 16384;
112 uint8_t buffer_
[buffer_size_
];
114 std::unique_ptr
<std::thread
> worker_
;
// Unix flavour of the background reader. A worker thread poll()s the data
// descriptor together with the read end of an internal wakeup pipe and reads
// from the data descriptor into a scratch buffer when it is readable.
119 class BackgroundReader
{
120 // A class that reads data in the background from a file descriptor
121 // (Unix implementation)
// Builds a reader for `fd` and starts the worker thread on LoopReading().
// The lambda captures `reader` by copy ([=]), so the thread's callable holds
// its own shared_ptr reference to the reader.
124 static std::shared_ptr
<BackgroundReader
> StartReader(int fd
) {
125 std::shared_ptr
<BackgroundReader
> reader(new BackgroundReader(fd
));
126 reader
->worker_
.reset(new std::thread([=] { reader
->LoopReading(); }));
// Wake-up: a single byte is written to wakeup_w_, which makes wakeup_r_
// (pollfds[1] in the read loop) readable.
130 const uint8_t data
[] = "x";
131 ABORT_NOT_OK(internal::FileWrite(wakeup_w_
, data
, 1));
// Blocks until the worker thread has finished.
133 void Join() { worker_
->join(); }
// Closes the data descriptor and both ends of the wakeup pipe.
135 ~BackgroundReader() {
136 for (int fd
: {fd_
, wakeup_r_
, wakeup_w_
}) {
137 ABORT_NOT_OK(internal::FileClose(fd
));
// Stores `fd`, zeroes the byte counter, and creates the wakeup pipe.
142 explicit BackgroundReader(int fd
) : fd_(fd
), total_bytes_(0) {
143 // Prepare self-pipe trick
144 auto pipe
= *internal::CreatePipe();
145 wakeup_r_
= pipe
.rfd
;
146 wakeup_w_
= pipe
.wfd
;
147 // Put fd in non-blocking mode
// NOTE(review): F_SETFL replaces the file status flags with O_NONBLOCK alone
// instead of OR-ing it into the F_GETFL value, and the fcntl() return value is
// not checked -- confirm both are acceptable for this benchmark.
148 fcntl(fd
, F_SETFL
, O_NONBLOCK
);
// Read loop: pollfds[0] is the data descriptor, pollfds[1] the wakeup pipe.
// NOTE(review): the assignment of pollfds[0].fd is not visible in this excerpt.
152 struct pollfd pollfds
[2];
154 pollfds
[0].events
= POLLIN
;
155 pollfds
[1].fd
= wakeup_r_
;
156 pollfds
[1].events
= POLLIN
;
// A timeout of -1 makes poll() block until one of the descriptors has an event.
158 int ret
= poll(pollfds
, 2, -1 /* timeout */);
// Error report for a failed poll(); the guarding condition is not visible here.
160 std::cerr
<< "poll() failed with code " << ret
<< "\n";
// The wakeup pipe became readable, i.e. the one-byte wake-up was written.
// NOTE(review): the branch body is not visible -- presumably the loop exits.
163 if (pollfds
[1].revents
& POLLIN
) {
// Nothing to read on the data descriptor for this wakeup.
// NOTE(review): the branch body is not visible in this excerpt.
167 if (!(pollfds
[0].revents
& POLLIN
)) {
170 auto result
= internal::FileRead(fd_
, buffer_
, buffer_size_
);
171 // There could be a spurious wakeup followed by EAGAIN
// NOTE(review): the check that guards this dereference of `result` is not
// visible in this excerpt.
173 total_bytes_
+= *result
;
// State: the data descriptor and both wakeup-pipe ends, the running count of
// bytes read, a fixed 16384-byte scratch buffer that each read overwrites, and
// the worker thread.
178 int fd_
, wakeup_r_
, wakeup_w_
;
179 int64_t total_bytes_
;
181 static const int64_t buffer_size_
= 16384;
182 uint8_t buffer_
[buffer_size_
];
184 std::unique_ptr
<std::thread
> worker_
;
189 // Set up a pipe with an OutputStream at one end and a BackgroundReader at
// Output parameters:
//   stream - receives a FileOutputStream opened on the pipe's write fd (pipe.wfd)
//   reader - receives a BackgroundReader started on the pipe's read fd (pipe.rfd)
// The results of CreatePipe() and Open() are dereferenced directly, without a
// separate status check.
191 static void SetupPipeWriter(std::shared_ptr
<io::OutputStream
>* stream
,
192 std::shared_ptr
<BackgroundReader
>* reader
) {
193 auto pipe
= *internal::CreatePipe();
194 *stream
= *io::FileOutputStream::Open(pipe
.wfd
);
195 *reader
= BackgroundReader::StartReader(pipe
.rfd
);
// Shared benchmark driver: on every benchmark iteration, issues one Write() per
// element of `sizes` (in order) on `stream`, then closes the stream and reports
// the total number of bytes written.
//
//   state  - Google Benchmark state driving the iteration loop
//   sizes  - byte count of each Write() call within one iteration
//   stream - destination stream; it is closed by this function
//   reader - optional background reader on the far end of a pipe (may be null)
//
// NOTE(review): `sizes` is taken by value, so the valarray is copied on each
// call; std::max_element is dereferenced unconditionally, so `sizes` must be
// non-empty (the visible callers pass the non-empty small_sizes / large_sizes).
198 static void BenchmarkStreamingWrites(benchmark::State
& state
,
199 std::valarray
<int64_t> sizes
,
200 io::OutputStream
* stream
,
201 BackgroundReader
* reader
= nullptr) {
// One source buffer of 'x' bytes as long as the largest requested write, so
// every Write() can be served from its start.
202 const std::string
datastr(*std::max_element(std::begin(sizes
), std::end(sizes
)), 'x');
203 const void* data
= datastr
.data();
// Bytes written per benchmark iteration.
204 const int64_t sum_sizes
= sizes
.sum();
206 while (state
.KeepRunning()) {
207 for (const int64_t size
: sizes
) {
208 ABORT_NOT_OK(stream
->Write(data
, size
));
211 // For Windows: need to close writer before joining reader thread.
212 ABORT_NOT_OK(stream
->Close());
// Throughput accounting: iterations * bytes-per-iteration.
214 const int64_t total_bytes
= static_cast<int64_t>(state
.iterations()) * sum_sizes
;
215 state
.SetBytesProcessed(total_bytes
);
// Reader handling applies only when a reader was supplied.
// NOTE(review): the body of this branch is not visible in this excerpt.
217 if (reader
!= nullptr) {
224 // Benchmark writing to /dev/null
226 // This situation is unrealistic as the kernel likely doesn't
227 // copy the data at all, so we only measure small writes.
// Unbuffered case: replays the small_sizes write pattern directly on a
// FileOutputStream opened on the GetNullFile() path. No background reader.
229 static void FileOutputStreamSmallWritesToNull(
230 benchmark::State
& state
) { // NOLINT non-const reference
231 auto stream
= *io::FileOutputStream::Open(GetNullFile());
233 BenchmarkStreamingWrites(state
, small_sizes
, stream
.get());
// Buffered case: replays the small_sizes write pattern through a
// BufferedOutputStream (kBufferSize bytes, default memory pool) layered over a
// FileOutputStream opened on the GetNullFile() path. No background reader.
236 static void BufferedOutputStreamSmallWritesToNull(
237 benchmark::State
& state
) { // NOLINT non-const reference
238 auto file
= *io::FileOutputStream::Open(GetNullFile());
// NOTE(review): the declaration that receives this Create() result
// (`buffered_file`, used below) is not visible in this excerpt.
241 *io::BufferedOutputStream::Create(kBufferSize
, default_memory_pool(), file
);
242 BenchmarkStreamingWrites(state
, small_sizes
, buffered_file
.get());
245 // Benchmark writing to a pipe
247 // This is slightly more realistic than the above
// Unbuffered case over a pipe: SetupPipeWriter() provides the output stream and
// a BackgroundReader on the other end; the small_sizes pattern is written
// directly to the stream.
249 static void FileOutputStreamSmallWritesToPipe(
250 benchmark::State
& state
) { // NOLINT non-const reference
251 std::shared_ptr
<io::OutputStream
> stream
;
252 std::shared_ptr
<BackgroundReader
> reader
;
253 SetupPipeWriter(&stream
, &reader
);
255 BenchmarkStreamingWrites(state
, small_sizes
, stream
.get(), reader
.get());
// Unbuffered case over a pipe: SetupPipeWriter() provides the output stream and
// a BackgroundReader on the other end; the large_sizes pattern is written
// directly to the stream.
258 static void FileOutputStreamLargeWritesToPipe(
259 benchmark::State
& state
) { // NOLINT non-const reference
260 std::shared_ptr
<io::OutputStream
> stream
;
261 std::shared_ptr
<BackgroundReader
> reader
;
262 SetupPipeWriter(&stream
, &reader
);
264 BenchmarkStreamingWrites(state
, large_sizes
, stream
.get(), reader
.get());
// Buffered case over a pipe: the stream from SetupPipeWriter() is wrapped in a
// BufferedOutputStream (kBufferSize bytes, default memory pool) and the
// small_sizes pattern is written through the wrapper.
267 static void BufferedOutputStreamSmallWritesToPipe(
268 benchmark::State
& state
) { // NOLINT non-const reference
269 std::shared_ptr
<io::OutputStream
> stream
;
270 std::shared_ptr
<BackgroundReader
> reader
;
271 SetupPipeWriter(&stream
, &reader
);
273 auto buffered_stream
=
274 *io::BufferedOutputStream::Create(kBufferSize
, default_memory_pool(), stream
);
275 BenchmarkStreamingWrites(state
, small_sizes
, buffered_stream
.get(), reader
.get());
// Buffered case over a pipe: the stream from SetupPipeWriter() is wrapped in a
// BufferedOutputStream (kBufferSize bytes, default memory pool) and the
// large_sizes pattern is written through the wrapper.
278 static void BufferedOutputStreamLargeWritesToPipe(
279 benchmark::State
& state
) { // NOLINT non-const reference
280 std::shared_ptr
<io::OutputStream
> stream
;
281 std::shared_ptr
<BackgroundReader
> reader
;
282 SetupPipeWriter(&stream
, &reader
);
284 auto buffered_stream
=
285 *io::BufferedOutputStream::Create(kBufferSize
, default_memory_pool(), stream
);
287 BenchmarkStreamingWrites(state
, large_sizes
, buffered_stream
.get(), reader
.get());
290 // We use real time as we don't want to count CPU time spent in the
291 // BackgroundReader thread
// Unbuffered FileOutputStream benchmarks.
293 BENCHMARK(FileOutputStreamSmallWritesToNull
)->UseRealTime();
294 BENCHMARK(FileOutputStreamSmallWritesToPipe
)->UseRealTime();
295 BENCHMARK(FileOutputStreamLargeWritesToPipe
)->UseRealTime();
// BufferedOutputStream benchmarks.
297 BENCHMARK(BufferedOutputStreamSmallWritesToNull
)->UseRealTime();
298 BENCHMARK(BufferedOutputStreamSmallWritesToPipe
)->UseRealTime();
299 BENCHMARK(BufferedOutputStreamLargeWritesToPipe
)->UseRealTime();