]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/src/core/fstream.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / src / core / fstream.cc
index 902fa9ac54f82061b6c437b30e3d747f654a7ee4..3fb4988579ddfac9acbcc1a727e342322ad7c05c 100644 (file)
@@ -25,6 +25,7 @@
 #include <seastar/core/semaphore.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/when_all.hh>
+#include <seastar/core/io_intent.hh>
 #include <fmt/format.h>
 #include <fmt/ostream.h>
 #include <malloc.h>
@@ -47,6 +48,17 @@ static_assert(std::is_nothrow_move_constructible_v<input_stream<char>>);
 static_assert(std::is_nothrow_constructible_v<output_stream<char>>);
 static_assert(std::is_nothrow_move_constructible_v<output_stream<char>>);
 
+// The buffers size must not be greater than the limit, but when capping
+// it we make it 2^n to better utilize the memory allocated for buffers
+template <typename T>
+static inline T select_buffer_size(T configured_value, T maximum_value) noexcept {
+    if (configured_value <= maximum_value) {
+        return configured_value;
+    } else {
+        return T(1) << log2floor(maximum_value);
+    }
+}
+
 class file_data_source_impl : public data_source_impl {
     struct issued_read {
         uint64_t _pos;
@@ -69,6 +81,7 @@ class file_data_source_impl : public data_source_impl {
     std::optional<promise<>> _done;
     size_t _current_buffer_size;
     bool _in_slow_start = false;
+    io_intent _intent;
     using unused_ratio_target = std::ratio<25, 100>;
 private:
     size_t minimal_buffer_size() const {
@@ -179,7 +192,9 @@ private:
 public:
     file_data_source_impl(file f, uint64_t offset, uint64_t len, file_input_stream_options options)
             : _file(std::move(f)), _options(options), _pos(offset), _remain(len), _current_read_ahead(get_initial_read_ahead())
-            , _current_buffer_size(_options.buffer_size) {
+    {
+        _options.buffer_size = select_buffer_size(_options.buffer_size, _file.disk_read_max_length());
+        _current_buffer_size = _options.buffer_size;
         // prevent wraparounds
         set_new_buffer_size(after_skip::no);
         _remain = std::min(std::numeric_limits<uint64_t>::max() - _pos, _remain);
@@ -240,6 +255,7 @@ public:
         if (!_reads_in_progress) {
             _done->set_value();
         }
+        _intent.cancel();
         return _done->get_future().then([this] {
             uint64_t dropped = 0;
             for (auto&& c : _read_buffers) {
@@ -276,7 +292,7 @@ private:
             auto len = end - start;
             auto actual_size = std::min(end - _pos, _remain);
             _read_buffers.emplace_back(_pos, actual_size, futurize_invoke([&] {
-                    return _file.dma_read_bulk<char>(start, len, _options.io_priority_class);
+                    return _file.dma_read_bulk<char>(start, len, _options.io_priority_class, &_intent);
             }).then_wrapped(
                     [this, start, pos = _pos, remain = _remain] (future<temporary_buffer<char>> ret) {
                 --_reads_in_progress;
@@ -342,6 +358,7 @@ class file_data_sink_impl : public data_sink_impl {
 public:
     file_data_sink_impl(file f, file_output_stream_options options)
             : _file(std::move(f)), _options(options) {
+        _options.buffer_size = select_buffer_size<unsigned>(_options.buffer_size, _file.disk_write_max_length());
         _write_behind_sem.ensure_space_for_waiters(1); // So that wait() doesn't throw
     }
     future<> put(net::packet data) override { abort(); }
@@ -388,7 +405,7 @@ public:
             return make_ready_future<>();
         });
     }
-public:
+private:
     future<> do_put(uint64_t pos, temporary_buffer<char> buf) noexcept {
       try {
         // put() must usually be of chunks multiple of file::dma_alignment.
@@ -452,6 +469,7 @@ public:
             return _file.close();
         });
     }
+    virtual size_t buffer_size() const noexcept override { return _options.buffer_size; }
 };
 
 SEASTAR_INCLUDE_API_V2 namespace api_v2 {
@@ -474,7 +492,7 @@ output_stream<char> make_file_output_stream(file f, file_output_stream_options o
 // Don't generate a deprecation warning for the unsafe functions calling each other.
 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
-    return output_stream<char>(api_v2::make_file_data_sink(std::move(f), options), options.buffer_size, true);
+    return output_stream<char>(api_v2::make_file_data_sink(std::move(f), options));
 #pragma GCC diagnostic pop
 }
 
@@ -508,8 +526,8 @@ future<output_stream<char>> make_file_output_stream(file f, size_t buffer_size)
 }
 
 future<output_stream<char>> make_file_output_stream(file f, file_output_stream_options options) noexcept {
-    return api_v3::and_newer::make_file_data_sink(std::move(f), options).then([buffer_size = options.buffer_size] (data_sink&& ds) {
-        return output_stream<char>(std::move(ds), buffer_size, true);
+    return api_v3::and_newer::make_file_data_sink(std::move(f), options).then([] (data_sink&& ds) {
+        return output_stream<char>(std::move(ds));
     });
 }