}
}
+void add_to_flush_poller(output_stream<char>& x) noexcept;
+
+template <typename CharType>
+future<> output_stream<CharType>::do_flush() noexcept {
+ if (_end) {
+ _buf.trim(_end);
+ _end = 0;
+ return _fd.put(std::move(_buf)).then([this] {
+ return _fd.flush();
+ });
+ } else if (_zc_bufs) {
+ return _fd.put(std::move(_zc_bufs)).then([this] {
+ return _fd.flush();
+ });
+ } else {
+ return make_ready_future<>();
+ }
+}
+
template <typename CharType>
future<>
output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
- if (_end) {
- _buf.trim(_end);
- _end = 0;
- return put(std::move(_buf)).then([this] {
- return _fd.flush();
- });
- } else if (_zc_bufs) {
- return zero_copy_put(std::move(_zc_bufs)).then([this] {
- return _fd.flush();
- });
- }
+ return do_flush();
} else {
if (_ex) {
// flush is a good time to deliver outstanding errors
} else {
_flush = true;
if (!_in_batch) {
- add_to_flush_poller(this);
+ add_to_flush_poller(*this);
_in_batch = promise<>();
}
}
return make_ready_future<>();
}
-void add_to_flush_poller(output_stream<char>* x);
-
template <typename CharType>
future<>
output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
return;
}
- auto f = make_ready_future();
_flush = false;
_flushing = true; // make whoever wants to write into the fd to wait for flush to complete
- if (_end) {
- // send whatever is in the buffer right now
- _buf.trim(_end);
- _end = 0;
- f = _fd.put(std::move(_buf));
- } else if(_zc_bufs) {
- f = _fd.put(std::move(_zc_bufs));
- }
-
// FIXME: future is discarded
- (void)f.then([this] {
- return _fd.flush();
- }).then_wrapped([this] (future<> f) {
+ (void)do_flush().then_wrapped([this] (future<> f) {
try {
f.get();
} catch (...) {