]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/iostream-impl.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / core / iostream-impl.hh
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
23
24#pragma once
25
f67539c2
TL
26#include <seastar/core/do_with.hh>
27#include <seastar/core/loop.hh>
11fdf7f2 28#include <seastar/net/packet.hh>
11fdf7f2
TL
29#include <seastar/util/variant_utils.hh>
30
31namespace seastar {
32
33inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
34{
35 return do_with(uint64_t(n), [this] (uint64_t& n) {
36 return repeat_until_value([&] {
f67539c2 37 return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
20effc67
TL
38 if (buffer.empty()) {
39 return buffer;
40 }
11fdf7f2
TL
41 if (buffer.size() >= n) {
42 buffer.trim_front(n);
f67539c2 43 return buffer;
11fdf7f2
TL
44 }
45 n -= buffer.size();
46 return { };
47 });
48 });
49 });
50}
51
52template<typename CharType>
53inline
20effc67 54future<> output_stream<CharType>::write(const char_type* buf) noexcept {
11fdf7f2
TL
55 return write(buf, strlen(buf));
56}
57
58template<typename CharType>
59template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
60inline
20effc67 61future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) noexcept {
11fdf7f2
TL
62 return write(reinterpret_cast<const CharType *>(s.c_str()), s.size());
63}
64
65template<typename CharType>
66inline
20effc67 67future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) noexcept {
11fdf7f2
TL
68 return write(s.c_str(), s.size());
69}
70
71template<typename CharType>
20effc67 72future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept {
11fdf7f2
TL
73 return write(std::move(msg).release());
74}
75
76template<typename CharType>
77future<>
20effc67 78output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
11fdf7f2
TL
79 // if flush is scheduled, disable it, so it will not try to write in parallel
80 _flush = false;
81 if (_flushing) {
82 // flush in progress, wait for it to end before continuing
83 return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
84 return _fd.put(std::move(p));
85 });
86 } else {
87 return _fd.put(std::move(p));
88 }
89}
90
91// Writes @p in chunks of _size length. The last chunk is buffered if smaller.
92template <typename CharType>
93future<>
20effc67 94output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
11fdf7f2
TL
95 return repeat([this, p = std::move(p)] () mutable {
96 if (p.len() < _size) {
97 if (p.len()) {
98 _zc_bufs = std::move(p);
99 } else {
100 _zc_bufs = net::packet::make_null_packet();
101 }
102 return make_ready_future<stop_iteration>(stop_iteration::yes);
103 }
104 auto chunk = p.share(0, _size);
105 p.trim_front(_size);
106 return zero_copy_put(std::move(chunk)).then([] {
107 return stop_iteration::no;
108 });
109 });
110}
111
112template<typename CharType>
20effc67 113future<> output_stream<CharType>::write(net::packet p) noexcept {
11fdf7f2 114 static_assert(std::is_same<CharType, char>::value, "packet works on char");
20effc67 115 try {
11fdf7f2
TL
116 if (p.len() != 0) {
117 assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
118
119 if (_zc_bufs) {
120 _zc_bufs.append(std::move(p));
121 } else {
122 _zc_bufs = std::move(p);
123 }
124
125 if (_zc_bufs.len() >= _size) {
126 if (_trim_to_size) {
127 return zero_copy_split_and_put(std::move(_zc_bufs));
128 } else {
129 return zero_copy_put(std::move(_zc_bufs));
130 }
131 }
132 }
133 return make_ready_future<>();
20effc67
TL
134 } catch (...) {
135 return current_exception_as_future();
136 }
11fdf7f2
TL
137}
138
139template<typename CharType>
20effc67
TL
140future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept {
141 try {
11fdf7f2
TL
142 if (p.empty()) {
143 return make_ready_future<>();
144 }
145 assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
11fdf7f2 146 return write(net::packet(std::move(p)));
20effc67
TL
147 } catch (...) {
148 return current_exception_as_future();
149 }
11fdf7f2
TL
150}
151
152template <typename CharType>
153future<temporary_buffer<CharType>>
20effc67 154input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) noexcept {
11fdf7f2
TL
155 if (available()) {
156 auto now = std::min(n - completed, available());
157 std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed);
158 _buf.trim_front(now);
159 completed += now;
160 }
161 if (completed == n) {
162 return make_ready_future<tmp_buf>(std::move(out));
163 }
164
165 // _buf is now empty
166 return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable {
167 if (buf.size() == 0) {
168 _eof = true;
20effc67
TL
169 out.trim(completed);
170 return make_ready_future<tmp_buf>(std::move(out));
11fdf7f2
TL
171 }
172 _buf = std::move(buf);
173 return this->read_exactly_part(n, std::move(out), completed);
174 });
175}
176
177template <typename CharType>
178future<temporary_buffer<CharType>>
20effc67 179input_stream<CharType>::read_exactly(size_t n) noexcept {
11fdf7f2
TL
180 if (_buf.size() == n) {
181 // easy case: steal buffer, return to caller
182 return make_ready_future<tmp_buf>(std::move(_buf));
183 } else if (_buf.size() > n) {
184 // buffer large enough, share it with caller
185 auto front = _buf.share(0, n);
186 _buf.trim_front(n);
187 return make_ready_future<tmp_buf>(std::move(front));
188 } else if (_buf.size() == 0) {
189 // buffer is empty: grab one and retry
190 return _fd.get().then([this, n] (auto buf) mutable {
191 if (buf.size() == 0) {
192 _eof = true;
193 return make_ready_future<tmp_buf>(std::move(buf));
194 }
195 _buf = std::move(buf);
196 return this->read_exactly(n);
197 });
198 } else {
20effc67 199 try {
11fdf7f2
TL
200 // buffer too small: start copy/read loop
201 tmp_buf b(n);
202 return read_exactly_part(n, std::move(b), 0);
20effc67
TL
203 } catch (...) {
204 return current_exception_as_future<tmp_buf>();
205 }
11fdf7f2
TL
206 }
207}
208
// Drives `consumer` over the stream's data inside a repeat() loop.
//
// Each iteration either refills _buf from _fd (when _buf is empty and EOF has
// not been seen) or passes _buf to the consumer and acts on its answer:
//  - continue_consuming: loop again, unless _eof is set, in which case stop;
//  - stop_consuming:     store the buffer it carries back into _buf and stop;
//  - skip_bytes:         ask _fd to skip that many bytes; a non-empty buffer
//                        returned by skip() becomes the new _buf; loop again.
// Once _eof is set and _buf is empty, the consumer is invoked with an empty
// buffer.
209template <typename CharType>
210template <typename Consumer>
f67539c2 211SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
11fdf7f2 212future<>
20effc67 213input_stream<CharType>::consume(Consumer&& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
11fdf7f2
TL
// The consumer is moved into the loop lambda.
214 return repeat([consumer = std::move(consumer), this] () mutable {
215 if (_buf.empty() && !_eof) {
216 return _fd.get().then([this] (tmp_buf buf) {
217 _buf = std::move(buf);
// An empty buffer from the source marks end-of-stream.
218 _eof = _buf.empty();
219 return make_ready_future<stop_iteration>(stop_iteration::no);
220 });
221 }
222 return consumer(std::move(_buf)).then([this] (consumption_result_type result) {
223 return seastar::visit(result.get(), [this] (const continue_consuming&) {
224 // If we're here, consumer consumed entire buffer and is ready for
225 // more now. So we do not return, and rather continue the loop.
226 //
227 // If we're at eof, we should stop.
228 return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
229 }, [this] (stop_consuming<CharType>& stop) {
230 // consumer is done
231 this->_buf = std::move(stop.get_buffer());
232 return make_ready_future<stop_iteration>(stop_iteration::yes);
233 }, [this] (const skip_bytes& skip) {
234 return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
// Keep whatever data skip() handed back for the next iteration.
235 if (!buf.empty()) {
236 this->_buf = std::move(buf);
237 }
238 return make_ready_future<stop_iteration>(stop_iteration::no);
239 });
240 });
241 });
242 });
243}
244
245template <typename CharType>
246template <typename Consumer>
f67539c2 247SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
11fdf7f2 248future<>
20effc67 249input_stream<CharType>::consume(Consumer& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
11fdf7f2
TL
250 return consume(std::ref(consumer));
251}
252
253template <typename CharType>
254future<temporary_buffer<CharType>>
20effc67 255input_stream<CharType>::read_up_to(size_t n) noexcept {
11fdf7f2
TL
256 using tmp_buf = temporary_buffer<CharType>;
257 if (_buf.empty()) {
258 if (_eof) {
259 return make_ready_future<tmp_buf>();
260 } else {
261 return _fd.get().then([this, n] (tmp_buf buf) {
262 _eof = buf.empty();
263 _buf = std::move(buf);
264 return read_up_to(n);
265 });
266 }
267 } else if (_buf.size() <= n) {
268 // easy case: steal buffer, return to caller
269 return make_ready_future<tmp_buf>(std::move(_buf));
270 } else {
20effc67 271 try {
11fdf7f2
TL
272 // buffer is larger than n, so share its head with a caller
273 auto front = _buf.share(0, n);
274 _buf.trim_front(n);
275 return make_ready_future<tmp_buf>(std::move(front));
20effc67
TL
276 } catch (...) {
277 return current_exception_as_future<tmp_buf>();
278 }
11fdf7f2
TL
279 }
280}
281
282template <typename CharType>
283future<temporary_buffer<CharType>>
20effc67 284input_stream<CharType>::read() noexcept {
11fdf7f2
TL
285 using tmp_buf = temporary_buffer<CharType>;
286 if (_eof) {
287 return make_ready_future<tmp_buf>();
288 }
289 if (_buf.empty()) {
290 return _fd.get().then([this] (tmp_buf buf) {
291 _eof = buf.empty();
292 return make_ready_future<tmp_buf>(std::move(buf));
293 });
294 } else {
295 return make_ready_future<tmp_buf>(std::move(_buf));
296 }
297}
298
299template <typename CharType>
300future<>
20effc67 301input_stream<CharType>::skip(uint64_t n) noexcept {
11fdf7f2
TL
302 auto skip_buf = std::min(n, _buf.size());
303 _buf.trim_front(skip_buf);
304 n -= skip_buf;
305 if (!n) {
306 return make_ready_future<>();
307 }
308 return _fd.skip(n).then([this] (temporary_buffer<CharType> buffer) {
309 _buf = std::move(buffer);
310 });
311}
312
313template <typename CharType>
314data_source
315input_stream<CharType>::detach() && {
316 if (_buf) {
317 throw std::logic_error("detach() called on a used input_stream");
318 }
319
320 return std::move(_fd);
321}
322
323// Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
324template <typename CharType>
325future<>
20effc67 326output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) noexcept {
11fdf7f2
TL
327 assert(_end == 0);
328
329 return repeat([this, buf = std::move(buf)] () mutable {
330 if (buf.size() < _size) {
331 if (!_buf) {
332 _buf = _fd.allocate_buffer(_size);
333 }
334 std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
335 _end = buf.size();
336 return make_ready_future<stop_iteration>(stop_iteration::yes);
337 }
338 auto chunk = buf.share(0, _size);
339 buf.trim_front(_size);
340 return put(std::move(chunk)).then([] {
341 return stop_iteration::no;
342 });
343 });
344}
345
346template <typename CharType>
347future<>
20effc67 348output_stream<CharType>::write(const char_type* buf, size_t n) noexcept {
9f95a23c
TL
349 if (__builtin_expect(!_buf || n > _size - _end, false)) {
350 return slow_write(buf, n);
351 }
352 std::copy_n(buf, n, _buf.get_write() + _end);
353 _end += n;
354 return make_ready_future<>();
355}
356
// Buffered write of `n` characters from `buf`, for the cases write() does not
// handle inline (see the two branches below). Any exception thrown in the body
// is converted into a failed future by the catch block.
357template <typename CharType>
358future<>
20effc67
TL
359output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
360 try {
11fdf7f2
TL
361 assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
// Writes at or above this size bypass the stream buffer and are sent directly.
// With data already buffered, the threshold is the room left in the buffer
// plus one more full buffer.
362 auto bulk_threshold = _end ? (2 * _size - _end) : _size;
363 if (n >= bulk_threshold) {
364 if (_end) {
// Top up the current buffer to exactly _size, copy the rest of the input into
// a freshly allocated buffer, then send the full buffer followed by the rest.
365 auto now = _size - _end;
366 std::copy(buf, buf + now, _buf.get_write() + _end);
367 _end = _size;
368 temporary_buffer<char> tmp = _fd.allocate_buffer(n - now);
369 std::copy(buf + now, buf + n, tmp.get_write());
370 _buf.trim(_end);
371 _end = 0;
372 return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable {
373 if (_trim_to_size) {
374 return split_and_put(std::move(tmp));
375 } else {
376 return put(std::move(tmp));
377 }
378 });
379 } else {
// Nothing buffered: copy the whole input into a new buffer and send it.
380 temporary_buffer<char> tmp = _fd.allocate_buffer(n);
381 std::copy(buf, buf + n, tmp.get_write());
382 if (_trim_to_size) {
383 return split_and_put(std::move(tmp));
384 } else {
385 return put(std::move(tmp));
386 }
387 }
388 }
389
// Below the bulk threshold: go through the stream buffer, allocating it on
// first use.
390 if (!_buf) {
391 _buf = _fd.allocate_buffer(_size);
392 }
393
394 auto now = std::min(n, _size - _end);
395 std::copy(buf, buf + now, _buf.get_write() + _end);
396 _end += now;
397 if (now == n) {
398 return make_ready_future<>();
399 } else {
// The input did not fit: put the overflow into a new buffer, make that the
// stream buffer via swap, and send the previous (now full) buffer.
400 temporary_buffer<char> next = _fd.allocate_buffer(_size);
401 std::copy(buf + now, buf + n, next.get_write());
402 _end = n - now;
403 std::swap(next, _buf);
404 return put(std::move(next));
405 }
20effc67
TL
406 } catch (...) {
407 return current_exception_as_future();
408 }
11fdf7f2
TL
409}
410
411template <typename CharType>
412future<>
20effc67 413output_stream<CharType>::flush() noexcept {
11fdf7f2
TL
414 if (!_batch_flushes) {
415 if (_end) {
416 _buf.trim(_end);
417 _end = 0;
418 return put(std::move(_buf)).then([this] {
419 return _fd.flush();
420 });
421 } else if (_zc_bufs) {
422 return zero_copy_put(std::move(_zc_bufs)).then([this] {
423 return _fd.flush();
424 });
425 }
426 } else {
427 if (_ex) {
428 // flush is a good time to deliver outstanding errors
429 return make_exception_future<>(std::move(_ex));
430 } else {
431 _flush = true;
432 if (!_in_batch) {
433 add_to_flush_poller(this);
434 _in_batch = promise<>();
435 }
436 }
437 }
438 return make_ready_future<>();
439}
440
// Declared here, defined elsewhere. output_stream::flush() calls it with `this`
// when a batched flush is requested and no batch is already pending.
441void add_to_flush_poller(output_stream<char>* x);
442
443template <typename CharType>
444future<>
20effc67 445output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
11fdf7f2
TL
446 // if flush is scheduled, disable it, so it will not try to write in parallel
447 _flush = false;
448 if (_flushing) {
449 // flush in progress, wait for it to end before continuing
450 return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
451 return _fd.put(std::move(buf));
452 });
453 } else {
454 return _fd.put(std::move(buf));
455 }
456}
457
// Performs one round of a batched flush.
//
// If no flush is pending (_flush is false), the batch is finished: _flushing
// is cleared, the _in_batch promise is fulfilled and reset. Otherwise pending
// buffered or zero-copy data is sent, the sink is flushed, any failure is
// stored in _ex, and poll_flush() runs again to pick up a flush requested in
// the meantime or to finish the batch.
458template <typename CharType>
459void
20effc67 460output_stream<CharType>::poll_flush() noexcept {
11fdf7f2
TL
461 if (!_flush) {
462 // flush was canceled, do nothing
463 _flushing = false;
// Fulfilling the promise resumes any put()/zero_copy_put()/close() waiting on it.
464 _in_batch.value().set_value();
f67539c2 465 _in_batch = std::nullopt;
11fdf7f2
TL
466 return;
467 }
468
469 auto f = make_ready_future();
470 _flush = false;
471 _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
472
473 if (_end) {
474 // send whatever is in the buffer right now
475 _buf.trim(_end);
476 _end = 0;
477 f = _fd.put(std::move(_buf));
478 } else if(_zc_bufs) {
479 f = _fd.put(std::move(_zc_bufs));
480 }
481
9f95a23c
TL
482 // FIXME: future is discarded
483 (void)f.then([this] {
11fdf7f2
TL
484 return _fd.flush();
485 }).then_wrapped([this] (future<> f) {
// Errors are not propagated from here; they are stored in _ex, which flush()
// and close() later report.
486 try {
487 f.get();
488 } catch (...) {
489 _ex = std::current_exception();
490 }
491 // if flush() was called while flushing flush once more
492 poll_flush();
493 });
494}
495
496template <typename CharType>
497future<>
20effc67 498output_stream<CharType>::close() noexcept {
11fdf7f2
TL
499 return flush().finally([this] {
500 if (_in_batch) {
501 return _in_batch.value().get_future();
502 } else {
503 return make_ready_future();
504 }
505 }).then([this] {
506 // report final exception as close error
507 if (_ex) {
508 std::rethrow_exception(_ex);
509 }
510 }).finally([this] {
511 return _fd.close();
512 });
513}
514
515template <typename CharType>
516data_sink
517output_stream<CharType>::detach() && {
518 if (_buf) {
519 throw std::logic_error("detach() called on a used output_stream");
520 }
521
522 return std::move(_fd);
523}
524
525namespace internal {
526
527/// \cond internal
528template <typename CharType>
529struct stream_copy_consumer {
530private:
531 output_stream<CharType>& _os;
f67539c2 532 using unconsumed_remainder = std::optional<temporary_buffer<CharType>>;
11fdf7f2
TL
533public:
534 stream_copy_consumer(output_stream<CharType>& os) : _os(os) {
535 }
536 future<unconsumed_remainder> operator()(temporary_buffer<CharType> data) {
537 if (data.empty()) {
538 return make_ready_future<unconsumed_remainder>(std::move(data));
539 }
540 return _os.write(data.get(), data.size()).then([] () {
541 return make_ready_future<unconsumed_remainder>();
542 });
543 }
544};
545/// \endcond
546
547}
548
549extern template struct internal::stream_copy_consumer<char>;
550
551template <typename CharType>
552future<> copy(input_stream<CharType>& in, output_stream<CharType>& out) {
553 return in.consume(internal::stream_copy_consumer<CharType>(out));
554}
555
556extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
557}