]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/iostream-impl.hh
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / include / seastar / core / iostream-impl.hh
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
23
24#pragma once
25
f67539c2
TL
26#include <seastar/core/do_with.hh>
27#include <seastar/core/loop.hh>
11fdf7f2 28#include <seastar/net/packet.hh>
11fdf7f2
TL
29#include <seastar/util/variant_utils.hh>
30
31namespace seastar {
32
33inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
34{
35 return do_with(uint64_t(n), [this] (uint64_t& n) {
36 return repeat_until_value([&] {
f67539c2 37 return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
11fdf7f2
TL
38 if (buffer.size() >= n) {
39 buffer.trim_front(n);
f67539c2 40 return buffer;
11fdf7f2
TL
41 }
42 n -= buffer.size();
43 return { };
44 });
45 });
46 });
47}
48
49template<typename CharType>
50inline
51future<> output_stream<CharType>::write(const char_type* buf) {
52 return write(buf, strlen(buf));
53}
54
55template<typename CharType>
56template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
57inline
58future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) {
59 return write(reinterpret_cast<const CharType *>(s.c_str()), s.size());
60}
61
62template<typename CharType>
63inline
64future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) {
65 return write(s.c_str(), s.size());
66}
67
68template<typename CharType>
69future<> output_stream<CharType>::write(scattered_message<CharType> msg) {
70 return write(std::move(msg).release());
71}
72
73template<typename CharType>
74future<>
75output_stream<CharType>::zero_copy_put(net::packet p) {
76 // if flush is scheduled, disable it, so it will not try to write in parallel
77 _flush = false;
78 if (_flushing) {
79 // flush in progress, wait for it to end before continuing
80 return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
81 return _fd.put(std::move(p));
82 });
83 } else {
84 return _fd.put(std::move(p));
85 }
86}
87
88// Writes @p in chunks of _size length. The last chunk is buffered if smaller.
89template <typename CharType>
90future<>
91output_stream<CharType>::zero_copy_split_and_put(net::packet p) {
92 return repeat([this, p = std::move(p)] () mutable {
93 if (p.len() < _size) {
94 if (p.len()) {
95 _zc_bufs = std::move(p);
96 } else {
97 _zc_bufs = net::packet::make_null_packet();
98 }
99 return make_ready_future<stop_iteration>(stop_iteration::yes);
100 }
101 auto chunk = p.share(0, _size);
102 p.trim_front(_size);
103 return zero_copy_put(std::move(chunk)).then([] {
104 return stop_iteration::no;
105 });
106 });
107}
108
109template<typename CharType>
110future<> output_stream<CharType>::write(net::packet p) {
111 static_assert(std::is_same<CharType, char>::value, "packet works on char");
112
113 if (p.len() != 0) {
114 assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
115
116 if (_zc_bufs) {
117 _zc_bufs.append(std::move(p));
118 } else {
119 _zc_bufs = std::move(p);
120 }
121
122 if (_zc_bufs.len() >= _size) {
123 if (_trim_to_size) {
124 return zero_copy_split_and_put(std::move(_zc_bufs));
125 } else {
126 return zero_copy_put(std::move(_zc_bufs));
127 }
128 }
129 }
130 return make_ready_future<>();
131}
132
133template<typename CharType>
134future<> output_stream<CharType>::write(temporary_buffer<CharType> p) {
135 if (p.empty()) {
136 return make_ready_future<>();
137 }
138 assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
139
140 return write(net::packet(std::move(p)));
141}
142
143template <typename CharType>
144future<temporary_buffer<CharType>>
145input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) {
146 if (available()) {
147 auto now = std::min(n - completed, available());
148 std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed);
149 _buf.trim_front(now);
150 completed += now;
151 }
152 if (completed == n) {
153 return make_ready_future<tmp_buf>(std::move(out));
154 }
155
156 // _buf is now empty
157 return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable {
158 if (buf.size() == 0) {
159 _eof = true;
160 return make_ready_future<tmp_buf>(std::move(buf));
161 }
162 _buf = std::move(buf);
163 return this->read_exactly_part(n, std::move(out), completed);
164 });
165}
166
167template <typename CharType>
168future<temporary_buffer<CharType>>
169input_stream<CharType>::read_exactly(size_t n) {
170 if (_buf.size() == n) {
171 // easy case: steal buffer, return to caller
172 return make_ready_future<tmp_buf>(std::move(_buf));
173 } else if (_buf.size() > n) {
174 // buffer large enough, share it with caller
175 auto front = _buf.share(0, n);
176 _buf.trim_front(n);
177 return make_ready_future<tmp_buf>(std::move(front));
178 } else if (_buf.size() == 0) {
179 // buffer is empty: grab one and retry
180 return _fd.get().then([this, n] (auto buf) mutable {
181 if (buf.size() == 0) {
182 _eof = true;
183 return make_ready_future<tmp_buf>(std::move(buf));
184 }
185 _buf = std::move(buf);
186 return this->read_exactly(n);
187 });
188 } else {
189 // buffer too small: start copy/read loop
190 tmp_buf b(n);
191 return read_exactly_part(n, std::move(b), 0);
192 }
193}
194
195template <typename CharType>
196template <typename Consumer>
f67539c2 197SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
11fdf7f2
TL
198future<>
199input_stream<CharType>::consume(Consumer&& consumer) {
200 return repeat([consumer = std::move(consumer), this] () mutable {
201 if (_buf.empty() && !_eof) {
202 return _fd.get().then([this] (tmp_buf buf) {
203 _buf = std::move(buf);
204 _eof = _buf.empty();
205 return make_ready_future<stop_iteration>(stop_iteration::no);
206 });
207 }
208 return consumer(std::move(_buf)).then([this] (consumption_result_type result) {
209 return seastar::visit(result.get(), [this] (const continue_consuming&) {
210 // If we're here, consumer consumed entire buffer and is ready for
211 // more now. So we do not return, and rather continue the loop.
212 //
213 // If we're at eof, we should stop.
214 return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
215 }, [this] (stop_consuming<CharType>& stop) {
216 // consumer is done
217 this->_buf = std::move(stop.get_buffer());
218 return make_ready_future<stop_iteration>(stop_iteration::yes);
219 }, [this] (const skip_bytes& skip) {
220 return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
221 if (!buf.empty()) {
222 this->_buf = std::move(buf);
223 }
224 return make_ready_future<stop_iteration>(stop_iteration::no);
225 });
226 });
227 });
228 });
229}
230
231template <typename CharType>
232template <typename Consumer>
f67539c2 233SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
11fdf7f2
TL
234future<>
235input_stream<CharType>::consume(Consumer& consumer) {
236 return consume(std::ref(consumer));
237}
238
239template <typename CharType>
240future<temporary_buffer<CharType>>
241input_stream<CharType>::read_up_to(size_t n) {
242 using tmp_buf = temporary_buffer<CharType>;
243 if (_buf.empty()) {
244 if (_eof) {
245 return make_ready_future<tmp_buf>();
246 } else {
247 return _fd.get().then([this, n] (tmp_buf buf) {
248 _eof = buf.empty();
249 _buf = std::move(buf);
250 return read_up_to(n);
251 });
252 }
253 } else if (_buf.size() <= n) {
254 // easy case: steal buffer, return to caller
255 return make_ready_future<tmp_buf>(std::move(_buf));
256 } else {
257 // buffer is larger than n, so share its head with a caller
258 auto front = _buf.share(0, n);
259 _buf.trim_front(n);
260 return make_ready_future<tmp_buf>(std::move(front));
261 }
262}
263
264template <typename CharType>
265future<temporary_buffer<CharType>>
266input_stream<CharType>::read() {
267 using tmp_buf = temporary_buffer<CharType>;
268 if (_eof) {
269 return make_ready_future<tmp_buf>();
270 }
271 if (_buf.empty()) {
272 return _fd.get().then([this] (tmp_buf buf) {
273 _eof = buf.empty();
274 return make_ready_future<tmp_buf>(std::move(buf));
275 });
276 } else {
277 return make_ready_future<tmp_buf>(std::move(_buf));
278 }
279}
280
281template <typename CharType>
282future<>
283input_stream<CharType>::skip(uint64_t n) {
284 auto skip_buf = std::min(n, _buf.size());
285 _buf.trim_front(skip_buf);
286 n -= skip_buf;
287 if (!n) {
288 return make_ready_future<>();
289 }
290 return _fd.skip(n).then([this] (temporary_buffer<CharType> buffer) {
291 _buf = std::move(buffer);
292 });
293}
294
295template <typename CharType>
296data_source
297input_stream<CharType>::detach() && {
298 if (_buf) {
299 throw std::logic_error("detach() called on a used input_stream");
300 }
301
302 return std::move(_fd);
303}
304
305// Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
306template <typename CharType>
307future<>
308output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) {
309 assert(_end == 0);
310
311 return repeat([this, buf = std::move(buf)] () mutable {
312 if (buf.size() < _size) {
313 if (!_buf) {
314 _buf = _fd.allocate_buffer(_size);
315 }
316 std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
317 _end = buf.size();
318 return make_ready_future<stop_iteration>(stop_iteration::yes);
319 }
320 auto chunk = buf.share(0, _size);
321 buf.trim_front(_size);
322 return put(std::move(chunk)).then([] {
323 return stop_iteration::no;
324 });
325 });
326}
327
328template <typename CharType>
329future<>
330output_stream<CharType>::write(const char_type* buf, size_t n) {
9f95a23c
TL
331 if (__builtin_expect(!_buf || n > _size - _end, false)) {
332 return slow_write(buf, n);
333 }
334 std::copy_n(buf, n, _buf.get_write() + _end);
335 _end += n;
336 return make_ready_future<>();
337}
338
339template <typename CharType>
340future<>
341output_stream<CharType>::slow_write(const char_type* buf, size_t n) {
11fdf7f2
TL
342 assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
343 auto bulk_threshold = _end ? (2 * _size - _end) : _size;
344 if (n >= bulk_threshold) {
345 if (_end) {
346 auto now = _size - _end;
347 std::copy(buf, buf + now, _buf.get_write() + _end);
348 _end = _size;
349 temporary_buffer<char> tmp = _fd.allocate_buffer(n - now);
350 std::copy(buf + now, buf + n, tmp.get_write());
351 _buf.trim(_end);
352 _end = 0;
353 return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable {
354 if (_trim_to_size) {
355 return split_and_put(std::move(tmp));
356 } else {
357 return put(std::move(tmp));
358 }
359 });
360 } else {
361 temporary_buffer<char> tmp = _fd.allocate_buffer(n);
362 std::copy(buf, buf + n, tmp.get_write());
363 if (_trim_to_size) {
364 return split_and_put(std::move(tmp));
365 } else {
366 return put(std::move(tmp));
367 }
368 }
369 }
370
371 if (!_buf) {
372 _buf = _fd.allocate_buffer(_size);
373 }
374
375 auto now = std::min(n, _size - _end);
376 std::copy(buf, buf + now, _buf.get_write() + _end);
377 _end += now;
378 if (now == n) {
379 return make_ready_future<>();
380 } else {
381 temporary_buffer<char> next = _fd.allocate_buffer(_size);
382 std::copy(buf + now, buf + n, next.get_write());
383 _end = n - now;
384 std::swap(next, _buf);
385 return put(std::move(next));
386 }
387}
388
389template <typename CharType>
390future<>
391output_stream<CharType>::flush() {
392 if (!_batch_flushes) {
393 if (_end) {
394 _buf.trim(_end);
395 _end = 0;
396 return put(std::move(_buf)).then([this] {
397 return _fd.flush();
398 });
399 } else if (_zc_bufs) {
400 return zero_copy_put(std::move(_zc_bufs)).then([this] {
401 return _fd.flush();
402 });
403 }
404 } else {
405 if (_ex) {
406 // flush is a good time to deliver outstanding errors
407 return make_exception_future<>(std::move(_ex));
408 } else {
409 _flush = true;
410 if (!_in_batch) {
411 add_to_flush_poller(this);
412 _in_batch = promise<>();
413 }
414 }
415 }
416 return make_ready_future<>();
417}
418
419void add_to_flush_poller(output_stream<char>* x);
420
421template <typename CharType>
422future<>
423output_stream<CharType>::put(temporary_buffer<CharType> buf) {
424 // if flush is scheduled, disable it, so it will not try to write in parallel
425 _flush = false;
426 if (_flushing) {
427 // flush in progress, wait for it to end before continuing
428 return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
429 return _fd.put(std::move(buf));
430 });
431 } else {
432 return _fd.put(std::move(buf));
433 }
434}
435
436template <typename CharType>
437void
438output_stream<CharType>::poll_flush() {
439 if (!_flush) {
440 // flush was canceled, do nothing
441 _flushing = false;
442 _in_batch.value().set_value();
f67539c2 443 _in_batch = std::nullopt;
11fdf7f2
TL
444 return;
445 }
446
447 auto f = make_ready_future();
448 _flush = false;
449 _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
450
451 if (_end) {
452 // send whatever is in the buffer right now
453 _buf.trim(_end);
454 _end = 0;
455 f = _fd.put(std::move(_buf));
456 } else if(_zc_bufs) {
457 f = _fd.put(std::move(_zc_bufs));
458 }
459
9f95a23c
TL
460 // FIXME: future is discarded
461 (void)f.then([this] {
11fdf7f2
TL
462 return _fd.flush();
463 }).then_wrapped([this] (future<> f) {
464 try {
465 f.get();
466 } catch (...) {
467 _ex = std::current_exception();
468 }
469 // if flush() was called while flushing flush once more
470 poll_flush();
471 });
472}
473
474template <typename CharType>
475future<>
476output_stream<CharType>::close() {
477 return flush().finally([this] {
478 if (_in_batch) {
479 return _in_batch.value().get_future();
480 } else {
481 return make_ready_future();
482 }
483 }).then([this] {
484 // report final exception as close error
485 if (_ex) {
486 std::rethrow_exception(_ex);
487 }
488 }).finally([this] {
489 return _fd.close();
490 });
491}
492
493template <typename CharType>
494data_sink
495output_stream<CharType>::detach() && {
496 if (_buf) {
497 throw std::logic_error("detach() called on a used output_stream");
498 }
499
500 return std::move(_fd);
501}
502
503namespace internal {
504
505/// \cond internal
506template <typename CharType>
507struct stream_copy_consumer {
508private:
509 output_stream<CharType>& _os;
f67539c2 510 using unconsumed_remainder = std::optional<temporary_buffer<CharType>>;
11fdf7f2
TL
511public:
512 stream_copy_consumer(output_stream<CharType>& os) : _os(os) {
513 }
514 future<unconsumed_remainder> operator()(temporary_buffer<CharType> data) {
515 if (data.empty()) {
516 return make_ready_future<unconsumed_remainder>(std::move(data));
517 }
518 return _os.write(data.get(), data.size()).then([] () {
519 return make_ready_future<unconsumed_remainder>();
520 });
521 }
522};
523/// \endcond
524
525}
526
527extern template struct internal::stream_copy_consumer<char>;
528
529template <typename CharType>
530future<> copy(input_stream<CharType>& in, output_stream<CharType>& out) {
531 return in.consume(internal::stream_copy_consumer<CharType>(out));
532}
533
534extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
535}