]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/include/seastar/core/iostream-impl.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / core / iostream-impl.hh
index 2b3354742d3114c96dfd25755a2561714ca04b55..97db9c38158ca0d0e5b551cff9598a0dc0844a12 100644 (file)
@@ -35,6 +35,9 @@ inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
     return do_with(uint64_t(n), [this] (uint64_t& n) {
         return repeat_until_value([&] {
             return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
+                if (buffer.empty()) {
+                    return buffer;
+                }
                 if (buffer.size() >= n) {
                     buffer.trim_front(n);
                     return buffer;
@@ -48,31 +51,31 @@ inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
 
 template<typename CharType>
 inline
-future<> output_stream<CharType>::write(const char_type* buf) {
+future<> output_stream<CharType>::write(const char_type* buf) noexcept {
     return write(buf, strlen(buf));
 }
 
 template<typename CharType>
 template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
 inline
-future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) {
+future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) noexcept {
     return write(reinterpret_cast<const CharType *>(s.c_str()), s.size());
 }
 
 template<typename CharType>
 inline
-future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) {
+future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) noexcept {
     return write(s.c_str(), s.size());
 }
 
 template<typename CharType>
-future<> output_stream<CharType>::write(scattered_message<CharType> msg) {
+future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept {
     return write(std::move(msg).release());
 }
 
 template<typename CharType>
 future<>
-output_stream<CharType>::zero_copy_put(net::packet p) {
+output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
     // if flush is scheduled, disable it, so it will not try to write in parallel
     _flush = false;
     if (_flushing) {
@@ -88,7 +91,7 @@ output_stream<CharType>::zero_copy_put(net::packet p) {
 // Writes @p in chunks of _size length. The last chunk is buffered if smaller.
 template <typename CharType>
 future<>
-output_stream<CharType>::zero_copy_split_and_put(net::packet p) {
+output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
     return repeat([this, p = std::move(p)] () mutable {
         if (p.len() < _size) {
             if (p.len()) {
@@ -107,9 +110,9 @@ output_stream<CharType>::zero_copy_split_and_put(net::packet p) {
 }
 
 template<typename CharType>
-future<> output_stream<CharType>::write(net::packet p) {
+future<> output_stream<CharType>::write(net::packet p) noexcept {
     static_assert(std::is_same<CharType, char>::value, "packet works on char");
-
+  try {
     if (p.len() != 0) {
         assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
 
@@ -128,21 +131,27 @@ future<> output_stream<CharType>::write(net::packet p) {
         }
     }
     return make_ready_future<>();
+  } catch (...) {
+    return current_exception_as_future();
+  }
 }
 
 template<typename CharType>
-future<> output_stream<CharType>::write(temporary_buffer<CharType> p) {
+future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept {
+  try {
     if (p.empty()) {
         return make_ready_future<>();
     }
     assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
-
     return write(net::packet(std::move(p)));
+  } catch (...) {
+    return current_exception_as_future();
+  }
 }
 
 template <typename CharType>
 future<temporary_buffer<CharType>>
-input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) {
+input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) noexcept {
     if (available()) {
         auto now = std::min(n - completed, available());
         std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed);
@@ -157,7 +166,8 @@ input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t complete
     return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable {
         if (buf.size() == 0) {
             _eof = true;
-            return make_ready_future<tmp_buf>(std::move(buf));
+            out.trim(completed);
+            return make_ready_future<tmp_buf>(std::move(out));
         }
         _buf = std::move(buf);
         return this->read_exactly_part(n, std::move(out), completed);
@@ -166,7 +176,7 @@ input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t complete
 
 template <typename CharType>
 future<temporary_buffer<CharType>>
-input_stream<CharType>::read_exactly(size_t n) {
+input_stream<CharType>::read_exactly(size_t n) noexcept {
     if (_buf.size() == n) {
         // easy case: steal buffer, return to caller
         return make_ready_future<tmp_buf>(std::move(_buf));
@@ -186,9 +196,13 @@ input_stream<CharType>::read_exactly(size_t n) {
             return this->read_exactly(n);
         });
     } else {
+      try {
         // buffer too small: start copy/read loop
         tmp_buf b(n);
         return read_exactly_part(n, std::move(b), 0);
+      } catch (...) {
+        return current_exception_as_future<tmp_buf>();
+      }
     }
 }
 
@@ -196,7 +210,7 @@ template <typename CharType>
 template <typename Consumer>
 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
 future<>
-input_stream<CharType>::consume(Consumer&& consumer) {
+input_stream<CharType>::consume(Consumer&& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
     return repeat([consumer = std::move(consumer), this] () mutable {
         if (_buf.empty() && !_eof) {
             return _fd.get().then([this] (tmp_buf buf) {
@@ -232,13 +246,13 @@ template <typename CharType>
 template <typename Consumer>
 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
 future<>
-input_stream<CharType>::consume(Consumer& consumer) {
+input_stream<CharType>::consume(Consumer& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
     return consume(std::ref(consumer));
 }
 
 template <typename CharType>
 future<temporary_buffer<CharType>>
-input_stream<CharType>::read_up_to(size_t n) {
+input_stream<CharType>::read_up_to(size_t n) noexcept {
     using tmp_buf = temporary_buffer<CharType>;
     if (_buf.empty()) {
         if (_eof) {
@@ -254,16 +268,20 @@ input_stream<CharType>::read_up_to(size_t n) {
         // easy case: steal buffer, return to caller
         return make_ready_future<tmp_buf>(std::move(_buf));
     } else {
+      try {
         // buffer is larger than n, so share its head with a caller
         auto front = _buf.share(0, n);
         _buf.trim_front(n);
         return make_ready_future<tmp_buf>(std::move(front));
+      } catch (...) {
+        return current_exception_as_future<tmp_buf>();
+      }
     }
 }
 
 template <typename CharType>
 future<temporary_buffer<CharType>>
-input_stream<CharType>::read() {
+input_stream<CharType>::read() noexcept {
     using tmp_buf = temporary_buffer<CharType>;
     if (_eof) {
         return make_ready_future<tmp_buf>();
@@ -280,7 +298,7 @@ input_stream<CharType>::read() {
 
 template <typename CharType>
 future<>
-input_stream<CharType>::skip(uint64_t n) {
+input_stream<CharType>::skip(uint64_t n) noexcept {
     auto skip_buf = std::min(n, _buf.size());
     _buf.trim_front(skip_buf);
     n -= skip_buf;
@@ -305,7 +323,7 @@ input_stream<CharType>::detach() && {
 // Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
 template <typename CharType>
 future<>
-output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) {
+output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) noexcept {
     assert(_end == 0);
 
     return repeat([this, buf = std::move(buf)] () mutable {
@@ -327,7 +345,7 @@ output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) {
 
 template <typename CharType>
 future<>
-output_stream<CharType>::write(const char_type* buf, size_t n) {
+output_stream<CharType>::write(const char_type* buf, size_t n) noexcept {
     if (__builtin_expect(!_buf || n > _size - _end, false)) {
         return slow_write(buf, n);
     }
@@ -338,7 +356,8 @@ output_stream<CharType>::write(const char_type* buf, size_t n) {
 
 template <typename CharType>
 future<>
-output_stream<CharType>::slow_write(const char_type* buf, size_t n) {
+output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
+  try {
     assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
     auto bulk_threshold = _end ? (2 * _size - _end) : _size;
     if (n >= bulk_threshold) {
@@ -384,11 +403,14 @@ output_stream<CharType>::slow_write(const char_type* buf, size_t n) {
         std::swap(next, _buf);
         return put(std::move(next));
     }
+  } catch (...) {
+    return current_exception_as_future();
+  }
 }
 
 template <typename CharType>
 future<>
-output_stream<CharType>::flush() {
+output_stream<CharType>::flush() noexcept {
     if (!_batch_flushes) {
         if (_end) {
             _buf.trim(_end);
@@ -420,7 +442,7 @@ void add_to_flush_poller(output_stream<char>* x);
 
 template <typename CharType>
 future<>
-output_stream<CharType>::put(temporary_buffer<CharType> buf) {
+output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
     // if flush is scheduled, disable it, so it will not try to write in parallel
     _flush = false;
     if (_flushing) {
@@ -435,7 +457,7 @@ output_stream<CharType>::put(temporary_buffer<CharType> buf) {
 
 template <typename CharType>
 void
-output_stream<CharType>::poll_flush() {
+output_stream<CharType>::poll_flush() noexcept {
     if (!_flush) {
         // flush was canceled, do nothing
         _flushing = false;
@@ -473,7 +495,7 @@ output_stream<CharType>::poll_flush() {
 
 template <typename CharType>
 future<>
-output_stream<CharType>::close() {
+output_stream<CharType>::close() noexcept {
     return flush().finally([this] {
         if (_in_batch) {
             return _in_batch.value().get_future();