#include <seastar/core/semaphore.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/when_all.hh>
+#include <seastar/core/io_intent.hh>
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <malloc.h>
static_assert(std::is_nothrow_constructible_v<output_stream<char>>);
static_assert(std::is_nothrow_move_constructible_v<output_stream<char>>);
+// The buffers size must not be greater than the limit, but when capping
+// it we make it 2^n to better utilize the memory allocated for buffers
+template <typename T>
+static inline T select_buffer_size(T configured_value, T maximum_value) noexcept {
+ if (configured_value <= maximum_value) {
+ return configured_value;
+ } else {
+ return T(1) << log2floor(maximum_value);
+ }
+}
+
class file_data_source_impl : public data_source_impl {
struct issued_read {
uint64_t _pos;
std::optional<promise<>> _done;
size_t _current_buffer_size;
bool _in_slow_start = false;
+ io_intent _intent;
using unused_ratio_target = std::ratio<25, 100>;
private:
size_t minimal_buffer_size() const {
public:
file_data_source_impl(file f, uint64_t offset, uint64_t len, file_input_stream_options options)
: _file(std::move(f)), _options(options), _pos(offset), _remain(len), _current_read_ahead(get_initial_read_ahead())
- , _current_buffer_size(_options.buffer_size) {
+ {
+ _options.buffer_size = select_buffer_size(_options.buffer_size, _file.disk_read_max_length());
+ _current_buffer_size = _options.buffer_size;
// prevent wraparounds
set_new_buffer_size(after_skip::no);
_remain = std::min(std::numeric_limits<uint64_t>::max() - _pos, _remain);
if (!_reads_in_progress) {
_done->set_value();
}
+ _intent.cancel();
return _done->get_future().then([this] {
uint64_t dropped = 0;
for (auto&& c : _read_buffers) {
auto len = end - start;
auto actual_size = std::min(end - _pos, _remain);
_read_buffers.emplace_back(_pos, actual_size, futurize_invoke([&] {
- return _file.dma_read_bulk<char>(start, len, _options.io_priority_class);
+ return _file.dma_read_bulk<char>(start, len, _options.io_priority_class, &_intent);
}).then_wrapped(
[this, start, pos = _pos, remain = _remain] (future<temporary_buffer<char>> ret) {
--_reads_in_progress;
public:
file_data_sink_impl(file f, file_output_stream_options options)
: _file(std::move(f)), _options(options) {
+ _options.buffer_size = select_buffer_size<unsigned>(_options.buffer_size, _file.disk_write_max_length());
_write_behind_sem.ensure_space_for_waiters(1); // So that wait() doesn't throw
}
future<> put(net::packet data) override { abort(); }
return make_ready_future<>();
});
}
-public:
+private:
future<> do_put(uint64_t pos, temporary_buffer<char> buf) noexcept {
try {
// put() must usually be of chunks multiple of file::dma_alignment.
return _file.close();
});
}
+ virtual size_t buffer_size() const noexcept override { return _options.buffer_size; }
};
SEASTAR_INCLUDE_API_V2 namespace api_v2 {
// Don't generate a deprecation warning for the unsafe functions calling each other.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- return output_stream<char>(api_v2::make_file_data_sink(std::move(f), options), options.buffer_size, true);
+ return output_stream<char>(api_v2::make_file_data_sink(std::move(f), options));
#pragma GCC diagnostic pop
}
}
future<output_stream<char>> make_file_output_stream(file f, file_output_stream_options options) noexcept {
- return api_v3::and_newer::make_file_data_sink(std::move(f), options).then([buffer_size = options.buffer_size] (data_sink&& ds) {
- return output_stream<char>(std::move(ds), buffer_size, true);
+ return api_v3::and_newer::make_file_data_sink(std::move(f), options).then([] (data_sink&& ds) {
+ return output_stream<char>(std::move(ds));
});
}