]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/include/seastar/core/iostream-impl.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / iostream-impl.hh
index 97db9c38158ca0d0e5b551cff9598a0dc0844a12..1c2a9ef7aee44d8b6c22cfa69df562bb6285dba0 100644 (file)
@@ -408,21 +408,30 @@ output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
   }
 }
 
+void add_to_flush_poller(output_stream<char>& x) noexcept;
+
+template <typename CharType>
+future<> output_stream<CharType>::do_flush() noexcept {
+    if (_end) {
+        _buf.trim(_end);
+        _end = 0;
+        return _fd.put(std::move(_buf)).then([this] {
+            return _fd.flush();
+        });
+    } else if (_zc_bufs) {
+        return _fd.put(std::move(_zc_bufs)).then([this] {
+            return _fd.flush();
+        });
+    } else {
+        return make_ready_future<>();
+    }
+}
+
 template <typename CharType>
 future<>
 output_stream<CharType>::flush() noexcept {
     if (!_batch_flushes) {
-        if (_end) {
-            _buf.trim(_end);
-            _end = 0;
-            return put(std::move(_buf)).then([this] {
-                return _fd.flush();
-            });
-        } else if (_zc_bufs) {
-            return zero_copy_put(std::move(_zc_bufs)).then([this] {
-                return _fd.flush();
-            });
-        }
+        return do_flush();
     } else {
         if (_ex) {
             // flush is a good time to deliver outstanding errors
@@ -430,7 +439,7 @@ output_stream<CharType>::flush() noexcept {
         } else {
             _flush = true;
             if (!_in_batch) {
-                add_to_flush_poller(this);
+                add_to_flush_poller(*this);
                 _in_batch = promise<>();
             }
         }
@@ -438,8 +447,6 @@ output_stream<CharType>::flush() noexcept {
     return make_ready_future<>();
 }
 
-void add_to_flush_poller(output_stream<char>* x);
-
 template <typename CharType>
 future<>
 output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
@@ -466,23 +473,11 @@ output_stream<CharType>::poll_flush() noexcept {
         return;
     }
 
-    auto f = make_ready_future();
     _flush = false;
     _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
 
-    if (_end) {
-        // send whatever is in the buffer right now
-        _buf.trim(_end);
-        _end = 0;
-        f = _fd.put(std::move(_buf));
-    } else if(_zc_bufs) {
-        f = _fd.put(std::move(_zc_bufs));
-    }
-
     // FIXME: future is discarded
-    (void)f.then([this] {
-        return _fd.flush();
-    }).then_wrapped([this] (future<> f) {
+    (void)do_flush().then_wrapped([this] (future<> f) {
         try {
             f.get();
         } catch (...) {