2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
26 #include <seastar/core/do_with.hh>
27 #include <seastar/core/loop.hh>
28 #include <seastar/net/packet.hh>
29 #include <seastar/util/variant_utils.hh>
// Default skip for a data source. The requested count is copied into storage
// owned by do_with(), and get() is then called repeatedly through
// repeat_until_value(); the inner lambda returns an optional buffer.
// NOTE(review): only the first test of the inner lambda is visible here --
// confirm how the remaining count is updated and which buffer is returned.
33 inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
35 return do_with(uint64_t(n), [this] (uint64_t& n) {
36 return repeat_until_value([&] {
37 return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
// The buffer just read is at least as large as the count still held in n.
41 if (buffer.size() >= n) {
// Writes a NUL-terminated string: its length is measured with strlen() and
// the call is forwarded to the (pointer, length) overload of write().
52 template<typename CharType>
54 future<> output_stream<CharType>::write(const char_type* buf) noexcept {
55 return write(buf, strlen(buf));
// Writes the contents of a basic_sstring. The pointer returned by s.c_str()
// is reinterpret_cast to const CharType* and forwarded, together with
// s.size(), to the (pointer, length) overload of write().
58 template<typename CharType>
59 template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
61 future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) noexcept {
62 return write(reinterpret_cast<const CharType *>(s.c_str()), s.size());
// Writes the contents of a std::basic_string by forwarding s.c_str() and
// s.size() to the (pointer, length) overload of write().
65 template<typename CharType>
67 future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) noexcept {
68 return write(s.c_str(), s.size());
// Writes a scattered_message: the message is released (on an rvalue) and the
// released object is forwarded to another write() overload.
// NOTE(review): presumably release() yields a net::packet and this lands in
// the packet overload -- confirm against scattered_message.
71 template<typename CharType>
72 future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept {
73 return write(std::move(msg).release());
// Passes packet p to the underlying sink via _fd.put(). Two paths are
// visible: one chains the put after the future obtained from _in_batch,
// the other calls _fd.put() immediately.
// NOTE(review): the conditions selecting each path are not visible here --
// confirm which state is tested before each return.
76 template<typename CharType>
78 output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
79 // if flush is scheduled, disable it, so it will not try to write in parallel
82 // flush in progress, wait for it to end before continuing
83 return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
84 return _fd.put(std::move(p));
87 return _fd.put(std::move(p));
91 // Writes @p in chunks of _size length. The last chunk is buffered if smaller.
// The packet is moved into the lambda run by repeat(); each iteration either
// finishes (stop_iteration::yes) or sends one chunk and asks for another
// iteration (stop_iteration::no).
92 template <typename CharType>
94 output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
95 return repeat([this, p = std::move(p)] () mutable {
// Less than one full chunk is left in p.
96 if (p.len() < _size) {
// _zc_bufs receives either the leftover packet or a null packet.
// NOTE(review): the test choosing between the two assignments is not
// visible here -- presumably whether p still holds data; confirm.
98 _zc_bufs = std::move(p);
100 _zc_bufs = net::packet::make_null_packet();
102 return make_ready_future<stop_iteration>(stop_iteration::yes);
// Take the range [0, _size) of p as its own packet and send it.
104 auto chunk = p.share(0, _size);
106 return zero_copy_put(std::move(chunk)).then([] {
107 return stop_iteration::no;
// Zero-copy write of a packet. Only valid when CharType is char (enforced
// by the static_assert). The packet is accumulated in _zc_bufs; once
// _zc_bufs holds at least _size bytes it is handed to
// zero_copy_split_and_put() or zero_copy_put(). A ready future is also
// returned on one path, and current_exception_as_future() converts a caught
// exception into a failed future.
// NOTE(review): the conditions selecting append vs. assign, and split vs.
// plain put, are not visible here -- confirm before relying on this summary.
112 template<typename CharType>
113 future<> output_stream<CharType>::write(net::packet p) noexcept {
114 static_assert(std::is_same<CharType, char>::value, "packet works on char");
// _end must be zero here: buffered and zero-copy writes cannot be mixed.
117 assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
120 _zc_bufs.append(std::move(p));
122 _zc_bufs = std::move(p);
125 if (_zc_bufs.len() >= _size) {
127 return zero_copy_split_and_put(std::move(_zc_bufs));
129 return zero_copy_put(std::move(_zc_bufs));
133 return make_ready_future<>();
135 return current_exception_as_future();
// Writes a temporary_buffer by wrapping it in a net::packet and delegating
// to the packet overload of write(). An earlier path returns a ready future
// without writing, and current_exception_as_future() reports a caught
// exception as a failed future.
// NOTE(review): the early-return condition is not visible here -- presumably
// an empty buffer; confirm.
139 template<typename CharType>
140 future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept {
143 return make_ready_future<>();
// _end must be zero here: buffered and zero-copy writes cannot be mixed.
145 assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
146 return write(net::packet(std::move(p)));
148 return current_exception_as_future();
// Copy/read loop used by read_exactly(). `out` is the destination buffer,
// `n` the total number of characters wanted and `completed` the number
// already copied into `out`.
152 template <typename CharType>
153 future<temporary_buffer<CharType>>
154 input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) noexcept {
// Copy the smaller of "still missing" and "currently available" from the
// front of _buf into out at offset `completed`, then drop it from _buf.
156 auto now = std::min(n - completed, available());
157 std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed);
158 _buf.trim_front(now);
// NOTE(review): the update of `completed` is not visible here -- confirm.
// Everything requested has been copied: return the filled buffer.
161 if (completed == n) {
162 return make_ready_future<tmp_buf>(std::move(out));
// More data is needed: fetch the next buffer from the source.
166 return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable {
// The source returned an empty buffer: return what has been gathered.
// NOTE(review): presumably this is end of stream and `out` is trimmed
// first -- confirm.
167 if (buf.size() == 0) {
170 return make_ready_future<tmp_buf>(std::move(out));
// Keep the new data and continue copying.
172 _buf = std::move(buf);
173 return this->read_exactly_part(n, std::move(out), completed);
// Reads exactly n characters. The behaviour depends on how the size of the
// internal buffer _buf compares with n: equal (hand over _buf), larger
// (share the first n characters), empty (fetch from _fd and retry), or
// smaller (fall back to read_exactly_part()). A caught exception is returned
// as a failed future via current_exception_as_future().
177 template <typename CharType>
178 future<temporary_buffer<CharType>>
179 input_stream<CharType>::read_exactly(size_t n) noexcept {
180 if (_buf.size() == n) {
181 // easy case: steal buffer, return to caller
182 return make_ready_future<tmp_buf>(std::move(_buf));
183 } else if (_buf.size() > n) {
184 // buffer large enough, share it with caller
185 auto front = _buf.share(0, n);
// NOTE(review): the step that removes the shared prefix from _buf is not
// visible here -- confirm.
187 return make_ready_future<tmp_buf>(std::move(front));
188 } else if (_buf.size() == 0) {
189 // buffer is empty: grab one and retry
190 return _fd.get().then([this, n] (auto buf) mutable {
// The source returned an empty buffer: pass that empty buffer to the caller.
191 if (buf.size() == 0) {
193 return make_ready_future<tmp_buf>(std::move(buf));
195 _buf = std::move(buf);
196 return this->read_exactly(n);
200 // buffer too small: start copy/read loop
// NOTE(review): the declaration of `b` is not visible here -- presumably a
// freshly allocated buffer of n characters; confirm.
202 return read_exactly_part(n, std::move(b), 0);
204 return current_exception_as_future<tmp_buf>();
// Feeds the stream to `consumer` inside a repeat() loop. The consumer is
// moved into the loop lambda. Each iteration either refills _buf from _fd
// (when _buf is empty and _eof is not set) or passes _buf to the consumer
// and dispatches on the result with seastar::visit:
//   - continue_consuming: loop again, unless _eof is set;
//   - stop_consuming:     its buffer is moved into _buf and the loop stops;
//   - skip_bytes:         the requested amount is skipped on _fd, the buffer
//                         returned by skip() is stored in _buf, loop again.
209 template <typename CharType>
210 template <typename Consumer>
211 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
213 input_stream<CharType>::consume(Consumer&& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
214 return repeat([consumer = std::move(consumer), this] () mutable {
// Nothing buffered and end of stream not yet seen: read more first.
215 if (_buf.empty() && !_eof) {
216 return _fd.get().then([this] (tmp_buf buf) {
217 _buf = std::move(buf);
// NOTE(review): the place where _eof gets set is not visible here -- confirm.
219 return make_ready_future<stop_iteration>(stop_iteration::no);
// Hand the buffered data to the consumer and act on its verdict.
222 return consumer(std::move(_buf)).then([this] (consumption_result_type result) {
223 return seastar::visit(result.get(), [this] (const continue_consuming&) {
224 // If we're here, consumer consumed entire buffer and is ready for
225 // more now. So we do not return, and rather continue the loop.
227 // If we're at eof, we should stop.
228 return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
229 }, [this] (stop_consuming<CharType>& stop) {
// Keep the buffer carried by the stop request in _buf.
231 this->_buf = std::move(stop.get_buffer());
232 return make_ready_future<stop_iteration>(stop_iteration::yes);
233 }, [this] (const skip_bytes& skip) {
// Skip on the source and keep the buffer that skip() hands back.
234 return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
236 this->_buf = std::move(buf);
238 return make_ready_future<stop_iteration>(stop_iteration::no);
// Lvalue overload of consume(): wraps the consumer in std::ref and forwards
// the resulting reference wrapper to consume(), so the loop works on the
// caller's consumer object.
245 template <typename CharType>
246 template <typename Consumer>
247 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
249 input_stream<CharType>::consume(Consumer& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
250 return consume(std::ref(consumer));
// Returns at most n characters. Visible paths: a ready empty buffer;
// refilling _buf from _fd and retrying; handing over all of _buf when it
// holds no more than n characters; otherwise sharing the first n characters
// of _buf. A caught exception is returned as a failed future.
// NOTE(review): the conditions guarding the first two paths are not visible
// here -- presumably an empty _buf with and without end of stream; confirm.
253 template <typename CharType>
254 future<temporary_buffer<CharType>>
255 input_stream<CharType>::read_up_to(size_t n) noexcept {
256 using tmp_buf = temporary_buffer<CharType>;
259 return make_ready_future<tmp_buf>();
261 return _fd.get().then([this, n] (tmp_buf buf) {
263 _buf = std::move(buf);
264 return read_up_to(n);
267 } else if (_buf.size() <= n) {
268 // easy case: steal buffer, return to caller
269 return make_ready_future<tmp_buf>(std::move(_buf));
272 // buffer is larger than n, so share its head with a caller
273 auto front = _buf.share(0, n);
// NOTE(review): the step that removes the shared head from _buf is not
// visible here -- confirm.
275 return make_ready_future<tmp_buf>(std::move(front));
277 return current_exception_as_future<tmp_buf>();
// Returns the next chunk of data. Visible paths: a ready empty buffer; a
// buffer fetched from _fd.get() and passed on; or the internal _buf moved
// out to the caller.
// NOTE(review): the conditions selecting each path are not visible here --
// presumably end of stream and an empty _buf respectively; confirm.
282 template <typename CharType>
283 future<temporary_buffer<CharType>>
284 input_stream<CharType>::read() noexcept {
285 using tmp_buf = temporary_buffer<CharType>;
287 return make_ready_future<tmp_buf>();
290 return _fd.get().then([this] (tmp_buf buf) {
292 return make_ready_future<tmp_buf>(std::move(buf));
295 return make_ready_future<tmp_buf>(std::move(_buf));
// Skips n characters of input. Buffered data is consumed first; one path
// then returns a ready future, the other skips on the source and stores the
// buffer that _fd.skip() returns in _buf.
// NOTE(review): the adjustment of n after trimming and the early-return
// condition are not visible here -- confirm.
299 template <typename CharType>
301 input_stream<CharType>::skip(uint64_t n) noexcept {
// Drop from the front of _buf the smaller of n and what is buffered.
302 auto skip_buf = std::min(n, _buf.size());
303 _buf.trim_front(skip_buf);
306 return make_ready_future<>();
308 return _fd.skip(n).then([this] (temporary_buffer<CharType> buffer) {
309 _buf = std::move(buffer);
// Rvalue-only member: moves the underlying source _fd out of the stream.
// Throws std::logic_error when the stream counts as already used.
// NOTE(review): the test that decides "used" is not visible here -- confirm.
313 template <typename CharType>
315 input_stream<CharType>::detach() && {
317 throw std::logic_error("detach() called on a used input_stream");
320 return std::move(_fd);
323 // Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
// The buffer is moved into the lambda run by repeat(); each iteration either
// stashes the short remainder and stops, or sends one _size-long chunk and
// asks for another iteration.
324 template <typename CharType>
326 output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) noexcept {
329 return repeat([this, buf = std::move(buf)] () mutable {
// Less than one full chunk is left.
330 if (buf.size() < _size) {
// Copy the remainder to the start of a buffer of _size characters obtained
// from the sink.
// NOTE(review): the guard around the allocation and the update of _end are
// not visible here -- confirm.
332 _buf = _fd.allocate_buffer(_size);
334 std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
336 return make_ready_future<stop_iteration>(stop_iteration::yes);
// Share the range [0, _size) as its own buffer, drop it from buf, send it.
338 auto chunk = buf.share(0, _size);
339 buf.trim_front(_size);
340 return put(std::move(chunk)).then([] {
341 return stop_iteration::no;
// Buffered write of n characters starting at buf. When there is no internal
// buffer yet, or the data does not fit into the space left after _end, the
// call is delegated to slow_write(); __builtin_expect marks that case as the
// unlikely one. Otherwise the data is copied into _buf at offset _end and a
// ready future is returned.
// NOTE(review): the update of _end after the copy is not visible here --
// confirm.
346 template <typename CharType>
348 output_stream<CharType>::write(const char_type* buf, size_t n) noexcept {
349 if (__builtin_expect(!_buf || n > _size - _end, false)) {
350 return slow_write(buf, n);
352 std::copy_n(buf, n, _buf.get_write() + _end);
354 return make_ready_future<>();
// Write path used when the data cannot simply be appended to _buf. Large
// writes (n >= bulk_threshold) bypass the regular buffering: the data is
// copied into temporary buffers obtained from the sink and sent with put()
// or split_and_put(). Smaller writes fill _buf, send it when needed and
// start a new buffer with the leftover. A caught exception is returned as a
// failed future via current_exception_as_future().
// NOTE(review): several conditions and _end updates are not visible here --
// confirm each branch against the full source.
357 template <typename CharType>
359 output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
// Zero-copy data must not be pending: the two write modes cannot be mixed.
361 assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
// With data already buffered the threshold is the free space of the current
// buffer plus one more full buffer; with nothing buffered it is _size.
362 auto bulk_threshold = _end ? (2 * _size - _end) : _size;
363 if (n >= bulk_threshold) {
// Top up the current buffer with the first `now` characters ...
365 auto now = _size - _end;
366 std::copy(buf, buf + now, _buf.get_write() + _end);
// ... and copy the rest into a separate buffer of exactly that size.
368 temporary_buffer<char> tmp = _fd.allocate_buffer(n - now);
369 std::copy(buf + now, buf + n, tmp.get_write());
// Send the filled buffer first, then the rest (split or in one piece).
372 return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable {
374 return split_and_put(std::move(tmp));
376 return put(std::move(tmp));
// Copy all n characters into one temporary buffer and send it.
380 temporary_buffer<char> tmp = _fd.allocate_buffer(n);
381 std::copy(buf, buf + n, tmp.get_write());
383 return split_and_put(std::move(tmp));
385 return put(std::move(tmp));
// Non-bulk path: obtain a buffer of _size characters from the sink.
391 _buf = _fd.allocate_buffer(_size);
// Copy as much as fits into the space left in _buf.
394 auto now = std::min(n, _size - _end);
395 std::copy(buf, buf + now, _buf.get_write() + _end);
398 return make_ready_future<>();
// Put the leftover into a fresh buffer, make that the current buffer and
// send the previous one.
400 temporary_buffer<char> next = _fd.allocate_buffer(_size);
401 std::copy(buf + now, buf + n, next.get_write());
403 std::swap(next, _buf);
404 return put(std::move(next));
407 return current_exception_as_future();
// Flushes pending output. When _batch_flushes is false, pending data is
// sent through put() or zero_copy_put(). Otherwise the visible lines report
// a stored exception (_ex) as a failed future, register the stream with
// add_to_flush_poller(), install a fresh promise in _in_batch and return a
// ready future.
// NOTE(review): most conditions and the bodies of the continuations are not
// visible here -- confirm the exact branch structure.
411 template <typename CharType>
413 output_stream<CharType>::flush() noexcept {
414 if (!_batch_flushes) {
418 return put(std::move(_buf)).then([this] {
421 } else if (_zc_bufs) {
422 return zero_copy_put(std::move(_zc_bufs)).then([this] {
428 // flush is a good time to deliver outstanding errors
429 return make_exception_future<>(std::move(_ex));
433 add_to_flush_poller(this);
434 _in_batch = promise<>();
438 return make_ready_future<>();
// Declaration only; the definition is not in this file section. flush()
// calls this with `this` to register the stream on its batching path.
441 void add_to_flush_poller(output_stream<char>* x);
// Passes buf to the underlying sink via _fd.put(). Two paths are visible:
// one chains the put after the future obtained from _in_batch, the other
// calls _fd.put() immediately.
// NOTE(review): the conditions selecting each path are not visible here --
// confirm which state is tested before each return.
443 template <typename CharType>
445 output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
446 // if flush is scheduled, disable it, so it will not try to write in parallel
449 // flush in progress, wait for it to end before continuing
450 return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
451 return _fd.put(std::move(buf));
454 return _fd.put(std::move(buf));
// Performs a flush on behalf of the flush poller. On one path the promise in
// _in_batch is fulfilled and cleared. Otherwise _flushing is set, whatever
// sits in _buf or _zc_bufs is sent with _fd.put(), and a continuation chain
// is attached whose result is deliberately discarded (see the FIXME); a
// failure is recorded in _ex.
// NOTE(review): the conditions, the continuation bodies and the re-flush
// logic are not visible here -- confirm against the full source.
458 template <typename CharType>
460 output_stream<CharType>::poll_flush() noexcept {
462 // flush was canceled, do nothing
464 _in_batch.value().set_value();
465 _in_batch = std::nullopt;
469 auto f = make_ready_future();
471 _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
474 // send whatever is in the buffer right now
477 f = _fd.put(std::move(_buf));
478 } else if(_zc_bufs) {
479 f = _fd.put(std::move(_zc_bufs));
482 // FIXME: future is discarded
483 (void)f.then([this] {
485 }).then_wrapped([this] (future<> f) {
489 _ex = std::current_exception();
491 // if flush() was called while flushing flush once more
// Closes the stream: flush() runs first, and the finally() continuation
// runs whether or not the flush succeeded. Inside it the visible lines wait
// on the _in_batch future or continue with a ready future, and rethrow a
// stored exception (_ex) as the error of close().
// NOTE(review): the conditions and the step that closes _fd are not visible
// here -- confirm.
496 template <typename CharType>
498 output_stream<CharType>::close() noexcept {
499 return flush().finally([this] {
501 return _in_batch.value().get_future();
503 return make_ready_future();
506 // report final exception as close error
508 std::rethrow_exception(_ex);
// Rvalue-only member: moves the underlying sink _fd out of the stream.
// Throws std::logic_error when the stream counts as already used.
// NOTE(review): the test that decides "used" is not visible here -- confirm.
515 template <typename CharType>
517 output_stream<CharType>::detach() && {
519 throw std::logic_error("detach() called on a used output_stream");
522 return std::move(_fd);
// Consumer used by copy(): every buffer it is called with is written to the
// output stream it was constructed with.
528 template <typename CharType>
529 struct stream_copy_consumer {
// Destination stream; held by reference, so it must outlive the consumer.
531 output_stream<CharType>& _os;
// Result type of operator(): an optional buffer.
532 using unconsumed_remainder = std::optional<temporary_buffer<CharType>>;
534 stream_copy_consumer(output_stream<CharType>& os) : _os(os) {
536 future<unconsumed_remainder> operator()(temporary_buffer<CharType> data) {
// One path hands the buffer straight back as the remainder.
// NOTE(review): its condition is not visible here -- presumably an empty
// buffer; confirm.
538 return make_ready_future<unconsumed_remainder>(std::move(data));
// Otherwise write the data and resolve with an empty optional.
540 return _os.write(data.get(), data.size()).then([] () {
541 return make_ready_future<unconsumed_remainder>();
// Explicit instantiation declaration: the char specialisation is not
// instantiated implicitly by translation units that see this line.
549 extern template struct internal::stream_copy_consumer<char>;
// Copies `in` to `out` by running in.consume() with a
// stream_copy_consumer bound to `out`; returns the future from consume().
551 template <typename CharType>
552 future<> copy(input_stream<CharType>& in, output_stream<CharType>& out) {
553 return in.consume(internal::stream_copy_consumer<CharType>(out));
// Explicit instantiation declaration for copy<char>: the char version is
// not instantiated implicitly by translation units that see this line.
556 extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);