return do_with(uint64_t(n), [this] (uint64_t& n) {
return repeat_until_value([&] {
return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
+ if (buffer.empty()) {
+ return buffer;
+ }
if (buffer.size() >= n) {
buffer.trim_front(n);
return buffer;
template<typename CharType>
inline
-future<> output_stream<CharType>::write(const char_type* buf) {
+future<> output_stream<CharType>::write(const char_type* buf) noexcept {
return write(buf, strlen(buf));
}
template<typename CharType>
template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
inline
-future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) {
+future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) noexcept {
return write(reinterpret_cast<const CharType *>(s.c_str()), s.size());
}
template<typename CharType>
inline
-future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) {
+future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) noexcept {
return write(s.c_str(), s.size());
}
template<typename CharType>
-future<> output_stream<CharType>::write(scattered_message<CharType> msg) {
+future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept {
return write(std::move(msg).release());
}
template<typename CharType>
future<>
-output_stream<CharType>::zero_copy_put(net::packet p) {
+output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
// Writes @p in chunks of _size length. The last chunk is buffered if smaller.
template <typename CharType>
future<>
-output_stream<CharType>::zero_copy_split_and_put(net::packet p) {
+output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
return repeat([this, p = std::move(p)] () mutable {
if (p.len() < _size) {
if (p.len()) {
}
template<typename CharType>
-future<> output_stream<CharType>::write(net::packet p) {
+future<> output_stream<CharType>::write(net::packet p) noexcept {
static_assert(std::is_same<CharType, char>::value, "packet works on char");
-
+ try {
if (p.len() != 0) {
assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
}
}
return make_ready_future<>();
+ } catch (...) {
+ return current_exception_as_future();
+ }
}
template<typename CharType>
-future<> output_stream<CharType>::write(temporary_buffer<CharType> p) {
+future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept {
+ try {
if (p.empty()) {
return make_ready_future<>();
}
assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
-
return write(net::packet(std::move(p)));
+ } catch (...) {
+ return current_exception_as_future();
+ }
}
template <typename CharType>
future<temporary_buffer<CharType>>
-input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) {
+input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) noexcept {
if (available()) {
auto now = std::min(n - completed, available());
std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed);
return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable {
if (buf.size() == 0) {
_eof = true;
- return make_ready_future<tmp_buf>(std::move(buf));
+ out.trim(completed);
+ return make_ready_future<tmp_buf>(std::move(out));
}
_buf = std::move(buf);
return this->read_exactly_part(n, std::move(out), completed);
template <typename CharType>
future<temporary_buffer<CharType>>
-input_stream<CharType>::read_exactly(size_t n) {
+input_stream<CharType>::read_exactly(size_t n) noexcept {
if (_buf.size() == n) {
// easy case: steal buffer, return to caller
return make_ready_future<tmp_buf>(std::move(_buf));
return this->read_exactly(n);
});
} else {
+ try {
// buffer too small: start copy/read loop
tmp_buf b(n);
return read_exactly_part(n, std::move(b), 0);
+ } catch (...) {
+ return current_exception_as_future<tmp_buf>();
+ }
}
}
template <typename Consumer>
SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
future<>
-input_stream<CharType>::consume(Consumer&& consumer) {
+input_stream<CharType>::consume(Consumer&& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
return repeat([consumer = std::move(consumer), this] () mutable {
if (_buf.empty() && !_eof) {
return _fd.get().then([this] (tmp_buf buf) {
template <typename Consumer>
SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
future<>
-input_stream<CharType>::consume(Consumer& consumer) {
+input_stream<CharType>::consume(Consumer& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
return consume(std::ref(consumer));
}
template <typename CharType>
future<temporary_buffer<CharType>>
-input_stream<CharType>::read_up_to(size_t n) {
+input_stream<CharType>::read_up_to(size_t n) noexcept {
using tmp_buf = temporary_buffer<CharType>;
if (_buf.empty()) {
if (_eof) {
// easy case: steal buffer, return to caller
return make_ready_future<tmp_buf>(std::move(_buf));
} else {
+ try {
// buffer is larger than n, so share its head with a caller
auto front = _buf.share(0, n);
_buf.trim_front(n);
return make_ready_future<tmp_buf>(std::move(front));
+ } catch (...) {
+ return current_exception_as_future<tmp_buf>();
+ }
}
}
template <typename CharType>
future<temporary_buffer<CharType>>
-input_stream<CharType>::read() {
+input_stream<CharType>::read() noexcept {
using tmp_buf = temporary_buffer<CharType>;
if (_eof) {
return make_ready_future<tmp_buf>();
template <typename CharType>
future<>
-input_stream<CharType>::skip(uint64_t n) {
+input_stream<CharType>::skip(uint64_t n) noexcept {
auto skip_buf = std::min(n, _buf.size());
_buf.trim_front(skip_buf);
n -= skip_buf;
// Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
template <typename CharType>
future<>
-output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) {
+output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) noexcept {
assert(_end == 0);
return repeat([this, buf = std::move(buf)] () mutable {
template <typename CharType>
future<>
-output_stream<CharType>::write(const char_type* buf, size_t n) {
+output_stream<CharType>::write(const char_type* buf, size_t n) noexcept {
if (__builtin_expect(!_buf || n > _size - _end, false)) {
return slow_write(buf, n);
}
template <typename CharType>
future<>
-output_stream<CharType>::slow_write(const char_type* buf, size_t n) {
+output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
+ try {
assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
auto bulk_threshold = _end ? (2 * _size - _end) : _size;
if (n >= bulk_threshold) {
std::swap(next, _buf);
return put(std::move(next));
}
+ } catch (...) {
+ return current_exception_as_future();
+ }
}
template <typename CharType>
future<>
-output_stream<CharType>::flush() {
+output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
if (_end) {
_buf.trim(_end);
template <typename CharType>
future<>
-output_stream<CharType>::put(temporary_buffer<CharType> buf) {
+output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
template <typename CharType>
void
-output_stream<CharType>::poll_flush() {
+output_stream<CharType>::poll_flush() noexcept {
if (!_flush) {
// flush was canceled, do nothing
_flushing = false;
template <typename CharType>
future<>
-output_stream<CharType>::close() {
+output_stream<CharType>::close() noexcept {
return flush().finally([this] {
if (_in_batch) {
return _in_batch.value().get_future();