]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/core/fstream.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / src / core / fstream.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22 #include <seastar/core/fstream.hh>
23 #include <seastar/core/align.hh>
24 #include <seastar/core/circular_buffer.hh>
25 #include <seastar/core/semaphore.hh>
26 #include <seastar/core/reactor.hh>
27 #include <seastar/core/when_all.hh>
28 #include <seastar/core/io_intent.hh>
29 #include <fmt/format.h>
30 #include <fmt/ostream.h>
31 #include <malloc.h>
32 #include <string.h>
33
34 namespace seastar {
35
36 static_assert(std::is_nothrow_constructible_v<data_source>);
37 static_assert(std::is_nothrow_move_constructible_v<data_source>);
38
39 static_assert(std::is_nothrow_constructible_v<data_sink>);
40 static_assert(std::is_nothrow_move_constructible_v<data_sink>);
41
42 static_assert(std::is_nothrow_constructible_v<temporary_buffer<char>>);
43 static_assert(std::is_nothrow_move_constructible_v<temporary_buffer<char>>);
44
45 static_assert(std::is_nothrow_constructible_v<input_stream<char>>);
46 static_assert(std::is_nothrow_move_constructible_v<input_stream<char>>);
47
48 static_assert(std::is_nothrow_constructible_v<output_stream<char>>);
49 static_assert(std::is_nothrow_move_constructible_v<output_stream<char>>);
50
// Cap a configured buffer size at `maximum_value`. A value within the limit is
// returned unchanged. When capping is needed, the result is
// 2^log2floor(maximum_value), a power of two, which makes better use of the
// memory allocated for buffers.
template <typename T>
static inline T select_buffer_size(T configured_value, T maximum_value) noexcept {
    if (configured_value > maximum_value) {
        const auto exponent = log2floor(maximum_value);
        return T(1) << exponent;
    }
    return configured_value;
}
61
62 class file_data_source_impl : public data_source_impl {
63 struct issued_read {
64 uint64_t _pos;
65 uint64_t _size;
66 future<temporary_buffer<char>> _ready;
67
68 issued_read(uint64_t pos, uint64_t size, future<temporary_buffer<char>> f)
69 : _pos(pos), _size(size), _ready(std::move(f)) { }
70 };
71
72 reactor& _reactor = engine();
73 file _file;
74 file_input_stream_options _options;
75 uint64_t _pos;
76 uint64_t _remain;
77 circular_buffer<issued_read> _read_buffers;
78 unsigned _reads_in_progress = 0;
79 unsigned _current_read_ahead;
80 future<> _dropped_reads = make_ready_future<>();
81 std::optional<promise<>> _done;
82 size_t _current_buffer_size;
83 bool _in_slow_start = false;
84 io_intent _intent;
85 using unused_ratio_target = std::ratio<25, 100>;
86 private:
87 size_t minimal_buffer_size() const {
88 return std::min(std::max(_options.buffer_size / 4, size_t(8192)), _options.buffer_size);
89 }
90
91 void try_increase_read_ahead() {
92 // Read-ahead can be increased up to user-specified limit if the
93 // consumer has to wait for a buffer and we are not in a slow start
94 // phase.
95 if (_current_read_ahead < _options.read_ahead && !_in_slow_start) {
96 _current_read_ahead++;
97 if (_options.dynamic_adjustments) {
98 auto& h = *_options.dynamic_adjustments;
99 h.read_ahead = std::max(h.read_ahead, _current_read_ahead);
100 }
101 }
102 }
103 unsigned get_initial_read_ahead() const {
104 return _options.dynamic_adjustments
105 ? std::min(_options.dynamic_adjustments->read_ahead, _options.read_ahead)
106 : !!_options.read_ahead;
107 }
108
109 void update_history(uint64_t unused, uint64_t total) {
110 // We are maintaining two windows each no larger than window_size.
111 // Dynamic adjustment logic uses data from both of them, which
112 // essentially means that the actual window size is variable and
113 // in the range [window_size, 2*window_size].
114 auto& h = *_options.dynamic_adjustments;
115 h.current_window.total_read += total;
116 h.current_window.unused_read += unused;
117 if (h.current_window.total_read >= h.window_size) {
118 h.previous_window = h.current_window;
119 h.current_window = { };
120 }
121 }
122 static bool below_target(uint64_t unused, uint64_t total) {
123 return unused * unused_ratio_target::den < total * unused_ratio_target::num;
124 }
125 void update_history_consumed(uint64_t bytes) {
126 if (!_options.dynamic_adjustments) {
127 return;
128 }
129 update_history(0, bytes);
130 if (!_in_slow_start) {
131 return;
132 }
133 unsigned new_size = std::min(_current_buffer_size * 2, _options.buffer_size);
134 auto& h = *_options.dynamic_adjustments;
135 auto total = h.current_window.total_read + h.previous_window.total_read + new_size;
136 auto unused = h.current_window.unused_read + h.previous_window.unused_read + new_size;
137 // Check whether we can safely increase the buffer size to new_size
138 // and still be below unused_ratio_target even if it is entirely
139 // dropped.
140 if (below_target(unused, total)) {
141 _current_buffer_size = new_size;
142 _in_slow_start = _current_buffer_size < _options.buffer_size;
143 }
144 }
145
146 using after_skip = bool_class<class after_skip_tag>;
147 void set_new_buffer_size(after_skip skip) {
148 if (!_options.dynamic_adjustments) {
149 return;
150 }
151 auto& h = *_options.dynamic_adjustments;
152 int64_t total = h.current_window.total_read + h.previous_window.total_read;
153 int64_t unused = h.current_window.unused_read + h.previous_window.unused_read;
154 if (skip == after_skip::yes && below_target(unused, total)) {
155 // Do not attempt to shrink buffer size if we are still below the
156 // target. Otherwise, we could get a bad interaction with
157 // update_history_consumed() which tries to increase the buffer
158 // size as much as possible so that after a single drop we are
159 // still below the target.
160 return;
161 }
162 // Calculate the maximum buffer size that would guarantee that we are
163 // still below unused_ratio_target even if the subsequent reads are
164 // dropped. If it is larger than or equal to the current buffer size do
165 // nothing. If it is smaller then we are back in the slow start phase.
166 auto new_target = (unused_ratio_target::num * total - unused_ratio_target::den * unused) / (unused_ratio_target::den - unused_ratio_target::num);
167 uint64_t new_size = std::max(new_target, int64_t(minimal_buffer_size()));
168 new_size = std::max(uint64_t(1) << log2floor(new_size), uint64_t(minimal_buffer_size()));
169 if (new_size >= _current_buffer_size) {
170 return;
171 }
172 _in_slow_start = true;
173 _current_read_ahead = std::min(_current_read_ahead, 1u);
174 _current_buffer_size = new_size;
175 }
176 void update_history_unused(uint64_t bytes) {
177 if (!_options.dynamic_adjustments) {
178 return;
179 }
180 update_history(bytes, bytes);
181 set_new_buffer_size(after_skip::yes);
182 }
183 // Safely ignores read future even if it is not resolved yet.
184 void ignore_read_future(future<temporary_buffer<char>> read_future) {
185 if (read_future.available()) {
186 read_future.ignore_ready_future();
187 return;
188 }
189 auto f = read_future.then_wrapped([] (auto f) { f.ignore_ready_future(); });
190 _dropped_reads = _dropped_reads.then([f = std::move(f)] () mutable { return std::move(f); });
191 }
192 public:
193 file_data_source_impl(file f, uint64_t offset, uint64_t len, file_input_stream_options options)
194 : _file(std::move(f)), _options(options), _pos(offset), _remain(len), _current_read_ahead(get_initial_read_ahead())
195 {
196 _options.buffer_size = select_buffer_size(_options.buffer_size, _file.disk_read_max_length());
197 _current_buffer_size = _options.buffer_size;
198 // prevent wraparounds
199 set_new_buffer_size(after_skip::no);
200 _remain = std::min(std::numeric_limits<uint64_t>::max() - _pos, _remain);
201 }
202 virtual ~file_data_source_impl() override {
203 // If the data source hasn't been closed, we risk having reads in progress
204 // that will try to access freed memory.
205 assert(_reads_in_progress == 0);
206 }
207 virtual future<temporary_buffer<char>> get() override {
208 if (!_read_buffers.empty() && !_read_buffers.front()._ready.available()) {
209 try_increase_read_ahead();
210 }
211 issue_read_aheads(1);
212 auto ret = std::move(_read_buffers.front());
213 _read_buffers.pop_front();
214 update_history_consumed(ret._size);
215 _reactor._io_stats.fstream_reads += 1;
216 _reactor._io_stats.fstream_read_bytes += ret._size;
217 if (!ret._ready.available()) {
218 _reactor._io_stats.fstream_reads_blocked += 1;
219 _reactor._io_stats.fstream_read_bytes_blocked += ret._size;
220 }
221 return std::move(ret._ready);
222 }
223 virtual future<temporary_buffer<char>> skip(uint64_t n) override {
224 uint64_t dropped = 0;
225 while (n) {
226 if (_read_buffers.empty()) {
227 assert(n <= _remain);
228 _pos += n;
229 _remain -= n;
230 break;
231 }
232 auto& front = _read_buffers.front();
233 if (n < front._size) {
234 front._size -= n;
235 front._pos += n;
236 front._ready = front._ready.then([n] (temporary_buffer<char> buf) {
237 buf.trim_front(n);
238 return buf;
239 });
240 break;
241 } else {
242 ignore_read_future(std::move(front._ready));
243 n -= front._size;
244 dropped += front._size;
245 _reactor._io_stats.fstream_read_aheads_discarded += 1;
246 _reactor._io_stats.fstream_read_ahead_discarded_bytes += front._size;
247 _read_buffers.pop_front();
248 }
249 }
250 update_history_unused(dropped);
251 return make_ready_future<temporary_buffer<char>>();
252 }
253 virtual future<> close() override {
254 _done.emplace();
255 if (!_reads_in_progress) {
256 _done->set_value();
257 }
258 _intent.cancel();
259 return _done->get_future().then([this] {
260 uint64_t dropped = 0;
261 for (auto&& c : _read_buffers) {
262 _reactor._io_stats.fstream_read_aheads_discarded += 1;
263 _reactor._io_stats.fstream_read_ahead_discarded_bytes += c._size;
264 dropped += c._size;
265 ignore_read_future(std::move(c._ready));
266 }
267 update_history_unused(dropped);
268 return std::move(_dropped_reads);
269 });
270 }
271 private:
272 void issue_read_aheads(unsigned additional = 0) {
273 if (_done) {
274 return;
275 }
276 auto ra = _current_read_ahead + additional;
277 _read_buffers.reserve(ra); // prevent push_back() failure
278 while (_read_buffers.size() < ra) {
279 if (!_remain) {
280 if (_read_buffers.size() >= additional) {
281 return;
282 }
283 _read_buffers.emplace_back(_pos, 0, make_ready_future<temporary_buffer<char>>());
284 continue;
285 }
286 ++_reads_in_progress;
287 // if _pos is not dma-aligned, we'll get a short read. Account for that.
288 // Also avoid reading beyond _remain.
289 uint64_t align = _file.disk_read_dma_alignment();
290 auto start = align_down(_pos, align);
291 auto end = std::min(align_up(start + _current_buffer_size, align), _pos + _remain);
292 auto len = end - start;
293 auto actual_size = std::min(end - _pos, _remain);
294 _read_buffers.emplace_back(_pos, actual_size, futurize_invoke([&] {
295 return _file.dma_read_bulk<char>(start, len, _options.io_priority_class, &_intent);
296 }).then_wrapped(
297 [this, start, pos = _pos, remain = _remain] (future<temporary_buffer<char>> ret) {
298 --_reads_in_progress;
299 if (_done && !_reads_in_progress) {
300 _done->set_value();
301 }
302 if (ret.failed()) {
303 // no games needed
304 return ret;
305 } else {
306 // first or last buffer, need trimming
307 auto tmp = ret.get0();
308 auto real_end = start + tmp.size();
309 if (real_end <= pos) {
310 return make_ready_future<temporary_buffer<char>>();
311 }
312 if (real_end > pos + remain) {
313 tmp.trim(pos + remain - start);
314 }
315 if (start < pos) {
316 tmp.trim_front(pos - start);
317 }
318 return make_ready_future<temporary_buffer<char>>(std::move(tmp));
319 }
320 }));
321 _remain -= end - _pos;
322 _pos = end;
323 };
324 }
325 };
326
327 class file_data_source : public data_source {
328 public:
329 file_data_source(file f, uint64_t offset, uint64_t len, file_input_stream_options options)
330 : data_source(std::make_unique<file_data_source_impl>(
331 std::move(f), offset, len, options)) {}
332 };
333
334
335 input_stream<char> make_file_input_stream(
336 file f, uint64_t offset, uint64_t len, file_input_stream_options options) {
337 return input_stream<char>(file_data_source(std::move(f), offset, len, std::move(options)));
338 }
339
340 input_stream<char> make_file_input_stream(
341 file f, uint64_t offset, file_input_stream_options options) {
342 return make_file_input_stream(std::move(f), offset, std::numeric_limits<uint64_t>::max(), std::move(options));
343 }
344
345 input_stream<char> make_file_input_stream(
346 file f, file_input_stream_options options) {
347 return make_file_input_stream(std::move(f), 0, std::move(options));
348 }
349
350
351 class file_data_sink_impl : public data_sink_impl {
352 file _file;
353 file_output_stream_options _options;
354 uint64_t _pos = 0;
355 semaphore _write_behind_sem = { _options.write_behind };
356 future<> _background_writes_done = make_ready_future<>();
357 bool _failed = false;
358 public:
359 file_data_sink_impl(file f, file_output_stream_options options)
360 : _file(std::move(f)), _options(options) {
361 _options.buffer_size = select_buffer_size<unsigned>(_options.buffer_size, _file.disk_write_max_length());
362 _write_behind_sem.ensure_space_for_waiters(1); // So that wait() doesn't throw
363 }
364 future<> put(net::packet data) override { abort(); }
365 virtual temporary_buffer<char> allocate_buffer(size_t size) override {
366 return temporary_buffer<char>::aligned(_file.memory_dma_alignment(), size);
367 }
368 using data_sink_impl::put;
369 virtual future<> put(temporary_buffer<char> buf) override {
370 uint64_t pos = _pos;
371 _pos += buf.size();
372 if (!_options.write_behind) {
373 return do_put(pos, std::move(buf));
374 }
375 // Write behind strategy:
376 //
377 // 1. Issue N writes in parallel, using a semaphore to limit to N
378 // 2. Collect results in _background_writes_done, merging exception futures
379 // 3. If we've already seen a failure, don't issue more writes.
380 return _write_behind_sem.wait().then([this, pos, buf = std::move(buf)] () mutable {
381 if (_failed) {
382 _write_behind_sem.signal();
383 auto ret = std::move(_background_writes_done);
384 _background_writes_done = make_ready_future<>();
385 return ret;
386 }
387 auto this_write_done = do_put(pos, std::move(buf)).finally([this] {
388 _write_behind_sem.signal();
389 });
390 _background_writes_done = when_all(std::move(_background_writes_done), std::move(this_write_done))
391 .then([this] (std::tuple<future<>, future<>> possible_errors) {
392 // merge the two errors, preferring the first
393 auto& e1 = std::get<0>(possible_errors);
394 auto& e2 = std::get<1>(possible_errors);
395 if (e1.failed()) {
396 e2.ignore_ready_future();
397 return std::move(e1);
398 } else {
399 if (e2.failed()) {
400 _failed = true;
401 }
402 return std::move(e2);
403 }
404 });
405 return make_ready_future<>();
406 });
407 }
408 private:
409 future<> do_put(uint64_t pos, temporary_buffer<char> buf) noexcept {
410 try {
411 // put() must usually be of chunks multiple of file::dma_alignment.
412 // Only the last part can have an unaligned length. If put() was
413 // called again with an unaligned pos, we have a bug in the caller.
414 assert(!(pos & (_file.disk_write_dma_alignment() - 1)));
415 bool truncate = false;
416 auto p = static_cast<const char*>(buf.get());
417 size_t buf_size = buf.size();
418
419 if ((buf.size() & (_file.disk_write_dma_alignment() - 1)) != 0) {
420 // If buf size isn't aligned, copy its content into a new aligned buf.
421 // This should only happen when the user calls output_stream::flush().
422 auto tmp = allocate_buffer(align_up(buf.size(), _file.disk_write_dma_alignment()));
423 ::memcpy(tmp.get_write(), buf.get(), buf.size());
424 ::memset(tmp.get_write() + buf.size(), 0, tmp.size() - buf.size());
425 buf = std::move(tmp);
426 p = buf.get();
427 buf_size = buf.size();
428 truncate = true;
429 }
430
431 return _file.dma_write(pos, p, buf_size, _options.io_priority_class).then(
432 [this, pos, buf = std::move(buf), truncate, buf_size] (size_t size) mutable {
433 // short write handling
434 if (size < buf_size) {
435 buf.trim_front(size);
436 return do_put(pos + size, std::move(buf)).then([this, truncate] {
437 if (truncate) {
438 return _file.truncate(_pos);
439 }
440 return make_ready_future<>();
441 });
442 }
443 if (truncate) {
444 return _file.truncate(_pos);
445 }
446 return make_ready_future<>();
447 });
448 } catch (...) {
449 return make_exception_future<>(std::current_exception());
450 }
451 }
452 future<> wait() noexcept {
453 // restore to pristine state; for flush() + close() sequence
454 // (we allow either flush, or close, or both)
455 return _write_behind_sem.wait(_options.write_behind).then([this] {
456 return std::exchange(_background_writes_done, make_ready_future<>());
457 }).finally([this] {
458 _write_behind_sem.signal(_options.write_behind);
459 });
460 }
461 public:
462 virtual future<> flush() override {
463 return wait().then([this] {
464 return _file.flush();
465 });
466 }
467 virtual future<> close() noexcept override {
468 return wait().finally([this] {
469 return _file.close();
470 });
471 }
472 virtual size_t buffer_size() const noexcept override { return _options.buffer_size; }
473 };
474
475 SEASTAR_INCLUDE_API_V2 namespace api_v2 {
476
477 data_sink make_file_data_sink(file f, file_output_stream_options options) {
478 return data_sink(std::make_unique<file_data_sink_impl>(std::move(f), options));
479 }
480
481 output_stream<char> make_file_output_stream(file f, size_t buffer_size) {
482 file_output_stream_options options;
483 options.buffer_size = buffer_size;
484 // Don't generate a deprecation warning for the unsafe functions calling each other.
485 #pragma GCC diagnostic push
486 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
487 return api_v2::make_file_output_stream(std::move(f), options);
488 #pragma GCC diagnostic pop
489 }
490
491 output_stream<char> make_file_output_stream(file f, file_output_stream_options options) {
492 // Don't generate a deprecation warning for the unsafe functions calling each other.
493 #pragma GCC diagnostic push
494 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
495 return output_stream<char>(api_v2::make_file_data_sink(std::move(f), options));
496 #pragma GCC diagnostic pop
497 }
498
499 }
500
501 SEASTAR_INCLUDE_API_V3 namespace api_v3 {
502 inline namespace and_newer {
503
504 future<data_sink> make_file_data_sink(file f, file_output_stream_options options) noexcept {
505 try {
506 return make_ready_future<data_sink>(std::make_unique<file_data_sink_impl>(f, options));
507 } catch (...) {
508 return f.close().then_wrapped([ex = std::current_exception(), f] (future<> fut) mutable {
509 if (fut.failed()) {
510 try {
511 std::rethrow_exception(std::move(ex));
512 } catch (...) {
513 std::throw_with_nested(std::runtime_error(fmt::format("While handling failed construction of data_sink, caught exception: {}",
514 fut.get_exception())));
515 }
516 }
517 return make_exception_future<data_sink>(std::move(ex));
518 });
519 }
520 }
521
522 future<output_stream<char>> make_file_output_stream(file f, size_t buffer_size) noexcept {
523 file_output_stream_options options;
524 options.buffer_size = buffer_size;
525 return api_v3::and_newer::make_file_output_stream(std::move(f), options);
526 }
527
528 future<output_stream<char>> make_file_output_stream(file f, file_output_stream_options options) noexcept {
529 return api_v3::and_newer::make_file_data_sink(std::move(f), options).then([] (data_sink&& ds) {
530 return output_stream<char>(std::move(ds));
531 });
532 }
533
534 }
535 }
536
/*
 * Explicit template instantiations for char streams; the template
 * definitions live in iostream-impl.hh.
 */
template struct internal::stream_copy_consumer<char>;
template future<> copy<char>(input_stream<char>&, output_stream<char>&);
542
543 }
544