2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
22 #include <seastar/core/fstream.hh>
23 #include <seastar/core/align.hh>
24 #include <seastar/core/circular_buffer.hh>
25 #include <seastar/core/semaphore.hh>
26 #include <seastar/core/reactor.hh>
27 #include <seastar/core/when_all.hh>
28 #include <seastar/core/io_intent.hh>
29 #include <fmt/format.h>
30 #include <fmt/ostream.h>
// Compile-time guarantees: each stream building block below must be
// nothrow default-constructible and nothrow move-constructible.
36 static_assert(std::is_nothrow_constructible_v
<data_source
>);
37 static_assert(std::is_nothrow_move_constructible_v
<data_source
>);
39 static_assert(std::is_nothrow_constructible_v
<data_sink
>);
40 static_assert(std::is_nothrow_move_constructible_v
<data_sink
>);
42 static_assert(std::is_nothrow_constructible_v
<temporary_buffer
<char>>);
43 static_assert(std::is_nothrow_move_constructible_v
<temporary_buffer
<char>>);
45 static_assert(std::is_nothrow_constructible_v
<input_stream
<char>>);
46 static_assert(std::is_nothrow_move_constructible_v
<input_stream
<char>>);
48 static_assert(std::is_nothrow_constructible_v
<output_stream
<char>>);
49 static_assert(std::is_nothrow_move_constructible_v
<output_stream
<char>>);
51 // The buffer size must not be greater than the limit; when capping it,
52 // we make it a power of two (2^n) to better utilize the memory allocated for buffers.
54 static inline T
select_buffer_size(T configured_value
, T maximum_value
) noexcept
{
// A configured size that fits within the limit is used unchanged.
55 if (configured_value
<= maximum_value
) {
56 return configured_value
;
// Otherwise cap at 2^log2floor(maximum_value); presumably this is the largest
// power of two not exceeding maximum_value (log2floor is defined elsewhere).
58 return T(1) << log2floor(maximum_value
);
// data_source that reads the byte range [offset, offset + len) of a file.
// It keeps a queue of issued DMA reads (_read_buffers). The queue depth
// (_current_read_ahead) and read size (_current_buffer_size) adapt at run
// time, using the history in _options.dynamic_adjustments when it is set.
62 class file_data_source_impl
: public data_source_impl
{
// issued_read: one queued read. It holds the file position (_pos), the number
// of bytes it is expected to deliver (_size), and the future buffer (_ready).
66 future
<temporary_buffer
<char>> _ready
;
68 issued_read(uint64_t pos
, uint64_t size
, future
<temporary_buffer
<char>> f
)
69 : _pos(pos
), _size(size
), _ready(std::move(f
)) { }
// Reactor whose _io_stats counters are updated by get(), skip() and close().
72 reactor
& _reactor
= engine();
// Copy of the caller's options; buffer_size is capped in the constructor.
74 file_input_stream_options _options
;
// Reads issued ahead of the consumer, oldest first; get() and skip() pop the front.
77 circular_buffer
<issued_read
> _read_buffers
;
// DMA reads issued but not yet completed; the destructor asserts this is 0.
78 unsigned _reads_in_progress
= 0;
// Number of reads issue_read_aheads() tries to keep queued.
79 unsigned _current_read_ahead
;
// Chain of discarded reads that were still pending (see ignore_read_future());
// close() returns it so callers can wait for them.
80 future
<> _dropped_reads
= make_ready_future
<>();
// Used by close() to wait for in-progress reads. The read completion callback
// checks it when _reads_in_progress drops to 0. (The lines that engage and
// fulfil it are not visible in this view.)
81 std::optional
<promise
<>> _done
;
// Size of the next read to issue. It adapts between minimal_buffer_size()
// and _options.buffer_size.
82 size_t _current_buffer_size
;
// While true, try_increase_read_ahead() does not grow the read-ahead depth.
83 bool _in_slow_start
= false;
// Target bound on the fraction of read bytes that end up unused: 25/100.
85 using unused_ratio_target
= std::ratio
<25, 100>;
// Smallest buffer size the adaptation may pick: buffer_size / 4, but at
// least 8192 bytes, and never more than buffer_size itself.
87 size_t minimal_buffer_size() const {
88 return std::min(std::max(_options
.buffer_size
/ 4, size_t(8192)), _options
.buffer_size
);
91 void try_increase_read_ahead() {
92 // Read-ahead can be increased up to user-specified limit if the
93 // consumer has to wait for a buffer and we are not in a slow start
// phase. The grown depth is also recorded, as a running maximum, in the
// dynamic_adjustments history when one is configured.
95 if (_current_read_ahead
< _options
.read_ahead
&& !_in_slow_start
) {
96 _current_read_ahead
++;
97 if (_options
.dynamic_adjustments
) {
98 auto& h
= *_options
.dynamic_adjustments
;
99 h
.read_ahead
= std::max(h
.read_ahead
, _current_read_ahead
);
// Initial read-ahead depth. With a dynamic_adjustments history, this is the
// recorded depth capped at _options.read_ahead. Otherwise it is 1 if
// read_ahead is non-zero, else 0.
103 unsigned get_initial_read_ahead() const {
104 return _options
.dynamic_adjustments
105 ? std::min(_options
.dynamic_adjustments
->read_ahead
, _options
.read_ahead
)
106 : !!_options
.read_ahead
;
109 void update_history(uint64_t unused
, uint64_t total
) {
110 // We are maintaining two windows each no larger than window_size.
111 // Dynamic adjustment logic uses data from both of them, which
112 // essentially means that the actual window size is variable and
113 // in the range [window_size, 2*window_size].
// Precondition: _options.dynamic_adjustments is engaged (it is dereferenced
// unconditionally).
114 auto& h
= *_options
.dynamic_adjustments
;
115 h
.current_window
.total_read
+= total
;
116 h
.current_window
.unused_read
+= unused
;
// Once the current window is full, it becomes the previous window and a
// fresh, empty window starts.
117 if (h
.current_window
.total_read
>= h
.window_size
) {
118 h
.previous_window
= h
.current_window
;
119 h
.current_window
= { };
// True when unused / total is strictly below unused_ratio_target. It uses
// integer cross-multiplication instead of division.
122 static bool below_target(uint64_t unused
, uint64_t total
) {
123 return unused
* unused_ratio_target::den
< total
* unused_ratio_target::num
;
// Records `bytes` as consumed (read and used) in the history, then tries to
// grow the buffer size.
// NOTE(review): the lines after the two guards below are not visible.
// Presumably both return early, so this does nothing without a
// dynamic_adjustments history, and growth happens only during slow start.
125 void update_history_consumed(uint64_t bytes
) {
126 if (!_options
.dynamic_adjustments
) {
129 update_history(0, bytes
);
130 if (!_in_slow_start
) {
// Candidate size: double the current one, capped at _options.buffer_size.
// NOTE(review): new_size is `unsigned` while both operands are size_t, so
// the result is narrowed. This is harmless only while buffer_size fits in
// 32 bits.
133 unsigned new_size
= std::min(_current_buffer_size
* 2, _options
.buffer_size
);
134 auto& h
= *_options
.dynamic_adjustments
;
// Windowed totals, as if a new_size read were issued and left entirely unused.
135 auto total
= h
.current_window
.total_read
+ h
.previous_window
.total_read
+ new_size
;
136 auto unused
= h
.current_window
.unused_read
+ h
.previous_window
.unused_read
+ new_size
;
137 // Check whether we can safely increase the buffer size to new_size
138 // and still be below unused_ratio_target even if it is entirely dropped.
140 if (below_target(unused
, total
)) {
141 _current_buffer_size
= new_size
;
// Slow start ends once the configured maximum buffer size is reached.
142 _in_slow_start
= _current_buffer_size
< _options
.buffer_size
;
146 using after_skip
= bool_class
<class after_skip_tag
>;
// May shrink _current_buffer_size, based on the history, so that the unused
// ratio stays below target even if subsequent reads are dropped. Shrinking
// re-enters slow start.
// NOTE(review): the early-return line after the dynamic_adjustments guard is
// not visible.
147 void set_new_buffer_size(after_skip skip
) {
148 if (!_options
.dynamic_adjustments
) {
151 auto& h
= *_options
.dynamic_adjustments
;
152 int64_t total
= h
.current_window
.total_read
+ h
.previous_window
.total_read
;
153 int64_t unused
= h
.current_window
.unused_read
+ h
.previous_window
.unused_read
;
154 if (skip
== after_skip::yes
&& below_target(unused
, total
)) {
155 // Do not attempt to shrink buffer size if we are still below the
156 // target. Otherwise, we could get a bad interaction with
157 // update_history_consumed() which tries to increase the buffer
158 // size as much as possible so that after a single drop we are
159 // still below the target.
162 // Calculate the maximum buffer size that would guarantee that we are
163 // still below unused_ratio_target even if the subsequent reads are
164 // dropped. If it is larger than or equal to the current buffer size do
165 // nothing. If it is smaller then we are back in the slow start phase.
// new_target solves (unused + x) / (total + x) == num / den for x. It is
// negative when the history is already above target; the max() below then
// clamps it to minimal_buffer_size().
166 auto new_target
= (unused_ratio_target::num
* total
- unused_ratio_target::den
* unused
) / (unused_ratio_target::den
- unused_ratio_target::num
);
167 uint64_t new_size
= std::max(new_target
, int64_t(minimal_buffer_size()));
// Use 2^log2floor(new_size), a power of two, but not below minimal_buffer_size().
168 new_size
= std::max(uint64_t(1) << log2floor(new_size
), uint64_t(minimal_buffer_size()));
169 if (new_size
>= _current_buffer_size
) {
// Shrink: restart slow start and cap the read-ahead depth at 1.
172 _in_slow_start
= true;
173 _current_read_ahead
= std::min(_current_read_ahead
, 1u);
174 _current_buffer_size
= new_size
;
// Records `bytes` of read-ahead data as read but never used, then lets
// set_new_buffer_size() shrink the buffer size if needed.
// NOTE(review): the early-return line after the guard is not visible.
176 void update_history_unused(uint64_t bytes
) {
177 if (!_options
.dynamic_adjustments
) {
180 update_history(bytes
, bytes
);
181 set_new_buffer_size(after_skip::yes
);
183 // Safely ignores read future even if it is not resolved yet.
// A ready future is discarded on the spot. A pending one has its result
// ignored and is chained onto _dropped_reads, so close() can wait for it.
184 void ignore_read_future(future
<temporary_buffer
<char>> read_future
) {
185 if (read_future
.available()) {
186 read_future
.ignore_ready_future();
189 auto f
= read_future
.then_wrapped([] (auto f
) { f
.ignore_ready_future(); });
190 _dropped_reads
= _dropped_reads
.then([f
= std::move(f
)] () mutable { return std::move(f
); });
// Reads up to `len` bytes of `f` starting at `offset`. buffer_size is capped
// to the file's disk_read_max_length() via select_buffer_size().
193 file_data_source_impl(file f
, uint64_t offset
, uint64_t len
, file_input_stream_options options
)
194 : _file(std::move(f
)), _options(options
), _pos(offset
), _remain(len
), _current_read_ahead(get_initial_read_ahead())
196 _options
.buffer_size
= select_buffer_size(_options
.buffer_size
, _file
.disk_read_max_length());
197 _current_buffer_size
= _options
.buffer_size
;
198 // Seed the adaptive buffer size from any recorded history (not after a skip).
199 set_new_buffer_size(after_skip::no
);
// prevent wraparounds: clamp _remain so that _pos + _remain fits in uint64_t.
200 _remain
= std::min(std::numeric_limits
<uint64_t>::max() - _pos
, _remain
);
202 virtual ~file_data_source_impl() override
{
203 // If the data source hasn't been closed, we risk having reads in progress
204 // that will try to access freed memory.
205 assert(_reads_in_progress
== 0);
// Returns the next buffer of the range. If the consumer would have to wait
// for the front read, the read-ahead depth is grown first. Also updates the
// history and the reactor's fstream_* read statistics.
207 virtual future
<temporary_buffer
<char>> get() override
{
208 if (!_read_buffers
.empty() && !_read_buffers
.front()._ready
.available()) {
209 try_increase_read_ahead();
// Top up the queue, with one extra slot for the entry about to be returned.
211 issue_read_aheads(1);
212 auto ret
= std::move(_read_buffers
.front());
213 _read_buffers
.pop_front();
214 update_history_consumed(ret
._size
);
215 _reactor
._io_stats
.fstream_reads
+= 1;
216 _reactor
._io_stats
.fstream_read_bytes
+= ret
._size
;
// Reads whose data was not yet available at hand-off are counted as "blocked".
217 if (!ret
._ready
.available()) {
218 _reactor
._io_stats
.fstream_reads_blocked
+= 1;
219 _reactor
._io_stats
.fstream_read_bytes_blocked
+= ret
._size
;
221 return std::move(ret
._ready
);
// Skips the next `n` bytes. Queued reads that are skipped entirely are
// discarded and counted as dropped read-ahead. A front read that is only
// partially skipped has its buffer trimmed instead (the trimming lambda body
// is not visible here). The visible return yields an empty buffer.
223 virtual future
<temporary_buffer
<char>> skip(uint64_t n
) override
{
224 uint64_t dropped
= 0;
// No queued reads: `n` must lie within the remaining range. The code that
// advances past it is not visible in this view.
226 if (_read_buffers
.empty()) {
227 assert(n
<= _remain
);
232 auto& front
= _read_buffers
.front();
233 if (n
< front
._size
) {
236 front
._ready
= front
._ready
.then([n
] (temporary_buffer
<char> buf
) {
// A read that is skipped entirely is discarded without waiting for it to complete.
242 ignore_read_future(std::move(front
._ready
));
244 dropped
+= front
._size
;
245 _reactor
._io_stats
.fstream_read_aheads_discarded
+= 1;
246 _reactor
._io_stats
.fstream_read_ahead_discarded_bytes
+= front
._size
;
247 _read_buffers
.pop_front();
250 update_history_unused(dropped
);
251 return make_ready_future
<temporary_buffer
<char>>();
// Waits for in-progress reads (through _done), then discards all queued reads
// and counts them as discarded read-ahead. It returns _dropped_reads so the
// caller also waits for discarded futures that were still pending.
// NOTE(review): the visible lines never add to `dropped` inside the loop.
// Confirm the hidden line accumulates c._size; otherwise
// update_history_unused() records nothing.
253 virtual future
<> close() override
{
255 if (!_reads_in_progress
) {
259 return _done
->get_future().then([this] {
260 uint64_t dropped
= 0;
261 for (auto&& c
: _read_buffers
) {
262 _reactor
._io_stats
.fstream_read_aheads_discarded
+= 1;
263 _reactor
._io_stats
.fstream_read_ahead_discarded_bytes
+= c
._size
;
265 ignore_read_future(std::move(c
._ready
));
267 update_history_unused(dropped
);
268 return std::move(_dropped_reads
);
// Tops up _read_buffers until it holds _current_read_ahead + additional
// entries. Each new read covers [start, end), where start is _pos rounded
// down to the DMA alignment and end is
// min(align_up(start + _current_buffer_size), _pos + _remain).
// Its completion trims the buffer to [pos, pos + remain).
// NOTE(review): several guard lines of this function are not visible in
// this view.
272 void issue_read_aheads(unsigned additional
= 0) {
276 auto ra
= _current_read_ahead
+ additional
;
277 _read_buffers
.reserve(ra
); // prevent push_back() failure
278 while (_read_buffers
.size() < ra
) {
// Presumably reached when the range is exhausted. Once `additional` entries
// exist the loop is left (line not visible). Otherwise an empty, ready
// placeholder is queued.
280 if (_read_buffers
.size() >= additional
) {
283 _read_buffers
.emplace_back(_pos
, 0, make_ready_future
<temporary_buffer
<char>>());
286 ++_reads_in_progress
;
287 // if _pos is not dma-aligned, we'll get a short read. Account for that.
288 // Also avoid reading beyond _remain.
289 uint64_t align
= _file
.disk_read_dma_alignment();
290 auto start
= align_down(_pos
, align
);
291 auto end
= std::min(align_up(start
+ _current_buffer_size
, align
), _pos
+ _remain
);
292 auto len
= end
- start
;
293 auto actual_size
= std::min(end
- _pos
, _remain
);
294 _read_buffers
.emplace_back(_pos
, actual_size
, futurize_invoke([&] {
295 return _file
.dma_read_bulk
<char>(start
, len
, _options
.io_priority_class
, &_intent
);
297 [this, start
, pos
= _pos
, remain
= _remain
] (future
<temporary_buffer
<char>> ret
) {
// Completion: one fewer read in flight. If close() is waiting and this was
// the last read, _done is presumably fulfilled here (line not visible).
298 --_reads_in_progress
;
299 if (_done
&& !_reads_in_progress
) {
306 // first or last buffer, need trimming
307 auto tmp
= ret
.get0();
308 auto real_end
= start
+ tmp
.size();
// The data ended at or before the requested position: nothing usable.
309 if (real_end
<= pos
) {
310 return make_ready_future
<temporary_buffer
<char>>();
// Drop the bytes beyond the end of the requested range...
312 if (real_end
> pos
+ remain
) {
313 tmp
.trim(pos
+ remain
- start
);
// ...and the alignment padding before `pos`.
316 tmp
.trim_front(pos
- start
);
318 return make_ready_future
<temporary_buffer
<char>>(std::move(tmp
));
// Account for the part of the range now covered by the queued read.
321 _remain
-= end
- _pos
;
// data_source that owns a heap-allocated file_data_source_impl reading
// [offset, offset + len) of `f`.
327 class file_data_source
: public data_source
{
329 file_data_source(file f
, uint64_t offset
, uint64_t len
, file_input_stream_options options
)
330 : data_source(std::make_unique
<file_data_source_impl
>(
331 std::move(f
), offset
, len
, options
)) {}
// Creates a char input_stream that reads `len` bytes of `f`, starting at `offset`.
335 input_stream
<char> make_file_input_stream(
336 file f
, uint64_t offset
, uint64_t len
, file_input_stream_options options
) {
337 return input_stream
<char>(file_data_source(std::move(f
), offset
, len
, std::move(options
)));
// Creates a char input_stream that reads `f` from `offset` with no length
// limit. The length is passed as numeric_limits<uint64_t>::max(), and the
// data source clamps it to avoid wraparound.
340 input_stream
<char> make_file_input_stream(
341 file f
, uint64_t offset
, file_input_stream_options options
) {
342 return make_file_input_stream(std::move(f
), offset
, std::numeric_limits
<uint64_t>::max(), std::move(options
));
// Creates a char input_stream that reads `f` from offset 0 with no length limit.
345 input_stream
<char> make_file_input_stream(
346 file f
, file_input_stream_options options
) {
347 return make_file_input_stream(std::move(f
), 0, std::move(options
));
// data_sink that writes buffers sequentially to a file with DMA writes. It
// optionally keeps up to _options.write_behind writes in flight.
351 class file_data_sink_impl
: public data_sink_impl
{
353 file_output_stream_options _options
;
// One unit per allowed in-flight (write-behind) write.
355 semaphore _write_behind_sem
= { _options
.write_behind
};
// Merged result of background writes; the first error is preferred.
356 future
<> _background_writes_done
= make_ready_future
<>();
357 bool _failed
= false;
// buffer_size is capped to the file's disk_write_max_length().
359 file_data_sink_impl(file f
, file_output_stream_options options
)
360 : _file(std::move(f
)), _options(options
) {
361 _options
.buffer_size
= select_buffer_size
<unsigned>(_options
.buffer_size
, _file
.disk_write_max_length());
362 _write_behind_sem
.ensure_space_for_waiters(1); // So that wait() doesn't throw
// Packet-based put() is not supported by this sink.
364 future
<> put(net::packet data
) override
{ abort(); }
// Buffers are allocated with the file's memory DMA alignment.
365 virtual temporary_buffer
<char> allocate_buffer(size_t size
) override
{
366 return temporary_buffer
<char>::aligned(_file
.memory_dma_alignment(), size
);
368 using data_sink_impl::put
;
// Writes `buf` at file offset `pos`.
// NOTE(review): the lines that compute `pos` (and presumably advance _pos)
// are not visible in this view.
369 virtual future
<> put(temporary_buffer
<char> buf
) override
{
// No write-behind: the returned future is the write itself.
372 if (!_options
.write_behind
) {
373 return do_put(pos
, std::move(buf
));
375 // Write behind strategy:
377 // 1. Issue N writes in parallel, using a semaphore to limit to N
378 // 2. Collect results in _background_writes_done, merging exception futures
379 // 3. If we've already seen a failure, don't issue more writes.
380 return _write_behind_sem
.wait().then([this, pos
, buf
= std::move(buf
)] () mutable {
// NOTE(review): the condition guarding this early exit is not visible
// (presumably a check of _failed). It releases the unit and takes the
// accumulated result.
382 _write_behind_sem
.signal();
383 auto ret
= std::move(_background_writes_done
);
384 _background_writes_done
= make_ready_future
<>();
// The write-behind unit is released when this write completes, whatever the outcome.
387 auto this_write_done
= do_put(pos
, std::move(buf
)).finally([this] {
388 _write_behind_sem
.signal();
390 _background_writes_done
= when_all(std::move(_background_writes_done
), std::move(this_write_done
))
391 .then([this] (std::tuple
<future
<>, future
<>> possible_errors
) {
392 // merge the two errors, preferring the first
393 auto& e1
= std::get
<0>(possible_errors
);
394 auto& e2
= std::get
<1>(possible_errors
);
396 e2
.ignore_ready_future();
397 return std::move(e1
);
402 return std::move(e2
);
// The write continues in the background; put() itself completes now.
405 return make_ready_future
<>();
// Writes `buf` at file offset `pos`, which must be disk-write DMA aligned.
// An unaligned tail is copied into a zero-padded aligned buffer. Short
// writes are retried recursively for the remainder. Because the function is
// noexcept, exceptions are reported as an exceptional future.
409 future
<> do_put(uint64_t pos
, temporary_buffer
<char> buf
) noexcept
{
411 // put() must usually be of chunks multiple of file::dma_alignment.
412 // Only the last part can have an unaligned length. If put() was
413 // called again with an unaligned pos, we have a bug in the caller.
414 assert(!(pos
& (_file
.disk_write_dma_alignment() - 1)));
415 bool truncate
= false;
416 auto p
= static_cast<const char*>(buf
.get());
417 size_t buf_size
= buf
.size();
419 if ((buf
.size() & (_file
.disk_write_dma_alignment() - 1)) != 0) {
420 // If buf size isn't aligned, copy its content into a new aligned buf.
421 // This should only happen when the user calls output_stream::flush().
422 auto tmp
= allocate_buffer(align_up(buf
.size(), _file
.disk_write_dma_alignment()));
423 ::memcpy(tmp
.get_write(), buf
.get(), buf
.size());
424 ::memset(tmp
.get_write() + buf
.size(), 0, tmp
.size() - buf
.size());
425 buf
= std::move(tmp
);
// NOTE(review): `p` was taken from the original buffer, which has just been
// replaced. The line that re-points `p` at the new aligned buffer, and the
// one that sets `truncate`, are not visible in this view; confirm both exist.
427 buf_size
= buf
.size();
// When `truncate` is set, the zero padding is removed afterwards by
// truncating the file to _pos, presumably the logical end of written data.
431 return _file
.dma_write(pos
, p
, buf_size
, _options
.io_priority_class
).then(
432 [this, pos
, buf
= std::move(buf
), truncate
, buf_size
] (size_t size
) mutable {
433 // short write handling
434 if (size
< buf_size
) {
435 buf
.trim_front(size
);
436 return do_put(pos
+ size
, std::move(buf
)).then([this, truncate
] {
438 return _file
.truncate(_pos
);
440 return make_ready_future
<>();
444 return _file
.truncate(_pos
);
446 return make_ready_future
<>();
449 return make_exception_future
<>(std::current_exception());
452 future
<> wait() noexcept
{
453 // restore to pristine state; for flush() + close() sequence
454 // (we allow either flush, or close, or both)
// Taking all write_behind units waits for every in-flight background write.
// The accumulated result is then returned and reset, and the units are
// released again (the line attaching that release is not visible).
455 return _write_behind_sem
.wait(_options
.write_behind
).then([this] {
456 return std::exchange(_background_writes_done
, make_ready_future
<>());
458 _write_behind_sem
.signal(_options
.write_behind
);
// Waits for background writes, then flushes the file.
462 virtual future
<> flush() override
{
463 return wait().then([this] {
464 return _file
.flush();
// Waits for background writes, then closes the file whatever their outcome.
467 virtual future
<> close() noexcept override
{
468 return wait().finally([this] {
469 return _file
.close();
// Effective buffer size (possibly capped in the constructor).
472 virtual size_t buffer_size() const noexcept override
{ return _options
.buffer_size
; }
// API v2 (synchronous) factories for file-backed sinks and output streams.
// These functions are not declared noexcept, so construction failures
// propagate as exceptions.
475 SEASTAR_INCLUDE_API_V2
namespace api_v2
{
// Wraps a new file_data_sink_impl for `f` in a data_sink.
477 data_sink
make_file_data_sink(file f
, file_output_stream_options options
) {
478 return data_sink(std::make_unique
<file_data_sink_impl
>(std::move(f
), options
));
// Convenience overload: default options apart from buffer_size.
481 output_stream
<char> make_file_output_stream(file f
, size_t buffer_size
) {
482 file_output_stream_options options
;
483 options
.buffer_size
= buffer_size
;
484 // Don't generate a deprecation warning for the unsafe functions calling each other.
485 #pragma GCC diagnostic push
486 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
487 return api_v2::make_file_output_stream(std::move(f
), options
);
488 #pragma GCC diagnostic pop
// Builds an output_stream over a file data_sink created with `options`.
491 output_stream
<char> make_file_output_stream(file f
, file_output_stream_options options
) {
492 // Don't generate a deprecation warning for the unsafe functions calling each other.
493 #pragma GCC diagnostic push
494 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
495 return output_stream
<char>(api_v2::make_file_data_sink(std::move(f
), options
));
496 #pragma GCC diagnostic pop
// API v3 factories. They are noexcept and report failures through the
// returned future.
501 SEASTAR_INCLUDE_API_V3
namespace api_v3
{
502 inline namespace and_newer
{
// Creates a data_sink for `f`. If constructing the sink throws, the file is
// closed and the construction exception is propagated. A failure of close()
// itself is reported with std::throw_with_nested (the surrounding try/catch
// lines are not visible here).
504 future
<data_sink
> make_file_data_sink(file f
, file_output_stream_options options
) noexcept
{
// `f` is copied rather than moved so that it is still available to close()
// on failure.
506 return make_ready_future
<data_sink
>(std::make_unique
<file_data_sink_impl
>(f
, options
));
508 return f
.close().then_wrapped([ex
= std::current_exception(), f
] (future
<> fut
) mutable {
511 std::rethrow_exception(std::move(ex
));
513 std::throw_with_nested(std::runtime_error(fmt::format("While handling failed construction of data_sink, caught exception: {}",
514 fut
.get_exception())));
517 return make_exception_future
<data_sink
>(std::move(ex
));
// Convenience overload: default options apart from buffer_size.
522 future
<output_stream
<char>> make_file_output_stream(file f
, size_t buffer_size
) noexcept
{
523 file_output_stream_options options
;
524 options
.buffer_size
= buffer_size
;
525 return api_v3::and_newer::make_file_output_stream(std::move(f
), options
);
// Creates the data_sink asynchronously and wraps it in an output_stream.
528 future
<output_stream
<char>> make_file_output_stream(file f
, file_output_stream_options options
) noexcept
{
529 return api_v3::and_newer::make_file_data_sink(std::move(f
), options
).then([] (data_sink
&& ds
) {
530 return output_stream
<char>(std::move(ds
));
538 * Explicit template instantiations; the definitions are in iostream-impl.hh
// Explicit instantiations of the char stream-copy helpers.
540 template struct internal::stream_copy_consumer
<char>;
541 template future
<> copy
<char>(input_stream
<char>&, output_stream
<char>&);