]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * Copyright (C) 2015 Cloudius Systems, Ltd. | |
21 | */ | |
22 | ||
23 | ||
24 | #pragma once | |
25 | ||
f67539c2 TL |
26 | #include <seastar/core/do_with.hh> |
27 | #include <seastar/core/loop.hh> | |
11fdf7f2 | 28 | #include <seastar/net/packet.hh> |
11fdf7f2 TL |
29 | #include <seastar/util/variant_utils.hh> |
30 | ||
31 | namespace seastar { | |
32 | ||
33 | inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n) | |
34 | { | |
35 | return do_with(uint64_t(n), [this] (uint64_t& n) { | |
36 | return repeat_until_value([&] { | |
f67539c2 | 37 | return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> { |
20effc67 TL |
38 | if (buffer.empty()) { |
39 | return buffer; | |
40 | } | |
11fdf7f2 TL |
41 | if (buffer.size() >= n) { |
42 | buffer.trim_front(n); | |
f67539c2 | 43 | return buffer; |
11fdf7f2 TL |
44 | } |
45 | n -= buffer.size(); | |
46 | return { }; | |
47 | }); | |
48 | }); | |
49 | }); | |
50 | } | |
51 | ||
52 | template<typename CharType> | |
53 | inline | |
20effc67 | 54 | future<> output_stream<CharType>::write(const char_type* buf) noexcept { |
11fdf7f2 TL |
55 | return write(buf, strlen(buf)); |
56 | } | |
57 | ||
58 | template<typename CharType> | |
59 | template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate> | |
60 | inline | |
20effc67 | 61 | future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) noexcept { |
11fdf7f2 TL |
62 | return write(reinterpret_cast<const CharType *>(s.c_str()), s.size()); |
63 | } | |
64 | ||
65 | template<typename CharType> | |
66 | inline | |
20effc67 | 67 | future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) noexcept { |
11fdf7f2 TL |
68 | return write(s.c_str(), s.size()); |
69 | } | |
70 | ||
71 | template<typename CharType> | |
20effc67 | 72 | future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept { |
11fdf7f2 TL |
73 | return write(std::move(msg).release()); |
74 | } | |
75 | ||
76 | template<typename CharType> | |
77 | future<> | |
20effc67 | 78 | output_stream<CharType>::zero_copy_put(net::packet p) noexcept { |
11fdf7f2 TL |
79 | // if flush is scheduled, disable it, so it will not try to write in parallel |
80 | _flush = false; | |
81 | if (_flushing) { | |
82 | // flush in progress, wait for it to end before continuing | |
83 | return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable { | |
84 | return _fd.put(std::move(p)); | |
85 | }); | |
86 | } else { | |
87 | return _fd.put(std::move(p)); | |
88 | } | |
89 | } | |
90 | ||
91 | // Writes @p in chunks of _size length. The last chunk is buffered if smaller. | |
92 | template <typename CharType> | |
93 | future<> | |
20effc67 | 94 | output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept { |
11fdf7f2 TL |
95 | return repeat([this, p = std::move(p)] () mutable { |
96 | if (p.len() < _size) { | |
97 | if (p.len()) { | |
98 | _zc_bufs = std::move(p); | |
99 | } else { | |
100 | _zc_bufs = net::packet::make_null_packet(); | |
101 | } | |
102 | return make_ready_future<stop_iteration>(stop_iteration::yes); | |
103 | } | |
104 | auto chunk = p.share(0, _size); | |
105 | p.trim_front(_size); | |
106 | return zero_copy_put(std::move(chunk)).then([] { | |
107 | return stop_iteration::no; | |
108 | }); | |
109 | }); | |
110 | } | |
111 | ||
112 | template<typename CharType> | |
20effc67 | 113 | future<> output_stream<CharType>::write(net::packet p) noexcept { |
11fdf7f2 | 114 | static_assert(std::is_same<CharType, char>::value, "packet works on char"); |
20effc67 | 115 | try { |
11fdf7f2 TL |
116 | if (p.len() != 0) { |
117 | assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet"); | |
118 | ||
119 | if (_zc_bufs) { | |
120 | _zc_bufs.append(std::move(p)); | |
121 | } else { | |
122 | _zc_bufs = std::move(p); | |
123 | } | |
124 | ||
125 | if (_zc_bufs.len() >= _size) { | |
126 | if (_trim_to_size) { | |
127 | return zero_copy_split_and_put(std::move(_zc_bufs)); | |
128 | } else { | |
129 | return zero_copy_put(std::move(_zc_bufs)); | |
130 | } | |
131 | } | |
132 | } | |
133 | return make_ready_future<>(); | |
20effc67 TL |
134 | } catch (...) { |
135 | return current_exception_as_future(); | |
136 | } | |
11fdf7f2 TL |
137 | } |
138 | ||
139 | template<typename CharType> | |
20effc67 TL |
140 | future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept { |
141 | try { | |
11fdf7f2 TL |
142 | if (p.empty()) { |
143 | return make_ready_future<>(); | |
144 | } | |
145 | assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet"); | |
11fdf7f2 | 146 | return write(net::packet(std::move(p))); |
20effc67 TL |
147 | } catch (...) { |
148 | return current_exception_as_future(); | |
149 | } | |
11fdf7f2 TL |
150 | } |
151 | ||
152 | template <typename CharType> | |
153 | future<temporary_buffer<CharType>> | |
20effc67 | 154 | input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) noexcept { |
11fdf7f2 TL |
155 | if (available()) { |
156 | auto now = std::min(n - completed, available()); | |
157 | std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed); | |
158 | _buf.trim_front(now); | |
159 | completed += now; | |
160 | } | |
161 | if (completed == n) { | |
162 | return make_ready_future<tmp_buf>(std::move(out)); | |
163 | } | |
164 | ||
165 | // _buf is now empty | |
166 | return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable { | |
167 | if (buf.size() == 0) { | |
168 | _eof = true; | |
20effc67 TL |
169 | out.trim(completed); |
170 | return make_ready_future<tmp_buf>(std::move(out)); | |
11fdf7f2 TL |
171 | } |
172 | _buf = std::move(buf); | |
173 | return this->read_exactly_part(n, std::move(out), completed); | |
174 | }); | |
175 | } | |
176 | ||
177 | template <typename CharType> | |
178 | future<temporary_buffer<CharType>> | |
20effc67 | 179 | input_stream<CharType>::read_exactly(size_t n) noexcept { |
11fdf7f2 TL |
180 | if (_buf.size() == n) { |
181 | // easy case: steal buffer, return to caller | |
182 | return make_ready_future<tmp_buf>(std::move(_buf)); | |
183 | } else if (_buf.size() > n) { | |
184 | // buffer large enough, share it with caller | |
185 | auto front = _buf.share(0, n); | |
186 | _buf.trim_front(n); | |
187 | return make_ready_future<tmp_buf>(std::move(front)); | |
188 | } else if (_buf.size() == 0) { | |
189 | // buffer is empty: grab one and retry | |
190 | return _fd.get().then([this, n] (auto buf) mutable { | |
191 | if (buf.size() == 0) { | |
192 | _eof = true; | |
193 | return make_ready_future<tmp_buf>(std::move(buf)); | |
194 | } | |
195 | _buf = std::move(buf); | |
196 | return this->read_exactly(n); | |
197 | }); | |
198 | } else { | |
20effc67 | 199 | try { |
11fdf7f2 TL |
200 | // buffer too small: start copy/read loop |
201 | tmp_buf b(n); | |
202 | return read_exactly_part(n, std::move(b), 0); | |
20effc67 TL |
203 | } catch (...) { |
204 | return current_exception_as_future<tmp_buf>(); | |
205 | } | |
11fdf7f2 TL |
206 | } |
207 | } | |
208 | ||
209 | template <typename CharType> | |
210 | template <typename Consumer> | |
f67539c2 | 211 | SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>) |
11fdf7f2 | 212 | future<> |
20effc67 | 213 | input_stream<CharType>::consume(Consumer&& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) { |
11fdf7f2 TL |
214 | return repeat([consumer = std::move(consumer), this] () mutable { |
215 | if (_buf.empty() && !_eof) { | |
216 | return _fd.get().then([this] (tmp_buf buf) { | |
217 | _buf = std::move(buf); | |
218 | _eof = _buf.empty(); | |
219 | return make_ready_future<stop_iteration>(stop_iteration::no); | |
220 | }); | |
221 | } | |
222 | return consumer(std::move(_buf)).then([this] (consumption_result_type result) { | |
223 | return seastar::visit(result.get(), [this] (const continue_consuming&) { | |
224 | // If we're here, consumer consumed entire buffer and is ready for | |
225 | // more now. So we do not return, and rather continue the loop. | |
226 | // | |
227 | // If we're at eof, we should stop. | |
228 | return make_ready_future<stop_iteration>(stop_iteration(this->_eof)); | |
229 | }, [this] (stop_consuming<CharType>& stop) { | |
230 | // consumer is done | |
231 | this->_buf = std::move(stop.get_buffer()); | |
232 | return make_ready_future<stop_iteration>(stop_iteration::yes); | |
233 | }, [this] (const skip_bytes& skip) { | |
234 | return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) { | |
235 | if (!buf.empty()) { | |
236 | this->_buf = std::move(buf); | |
237 | } | |
238 | return make_ready_future<stop_iteration>(stop_iteration::no); | |
239 | }); | |
240 | }); | |
241 | }); | |
242 | }); | |
243 | } | |
244 | ||
245 | template <typename CharType> | |
246 | template <typename Consumer> | |
f67539c2 | 247 | SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>) |
11fdf7f2 | 248 | future<> |
20effc67 | 249 | input_stream<CharType>::consume(Consumer& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) { |
11fdf7f2 TL |
250 | return consume(std::ref(consumer)); |
251 | } | |
252 | ||
253 | template <typename CharType> | |
254 | future<temporary_buffer<CharType>> | |
20effc67 | 255 | input_stream<CharType>::read_up_to(size_t n) noexcept { |
11fdf7f2 TL |
256 | using tmp_buf = temporary_buffer<CharType>; |
257 | if (_buf.empty()) { | |
258 | if (_eof) { | |
259 | return make_ready_future<tmp_buf>(); | |
260 | } else { | |
261 | return _fd.get().then([this, n] (tmp_buf buf) { | |
262 | _eof = buf.empty(); | |
263 | _buf = std::move(buf); | |
264 | return read_up_to(n); | |
265 | }); | |
266 | } | |
267 | } else if (_buf.size() <= n) { | |
268 | // easy case: steal buffer, return to caller | |
269 | return make_ready_future<tmp_buf>(std::move(_buf)); | |
270 | } else { | |
20effc67 | 271 | try { |
11fdf7f2 TL |
272 | // buffer is larger than n, so share its head with a caller |
273 | auto front = _buf.share(0, n); | |
274 | _buf.trim_front(n); | |
275 | return make_ready_future<tmp_buf>(std::move(front)); | |
20effc67 TL |
276 | } catch (...) { |
277 | return current_exception_as_future<tmp_buf>(); | |
278 | } | |
11fdf7f2 TL |
279 | } |
280 | } | |
281 | ||
282 | template <typename CharType> | |
283 | future<temporary_buffer<CharType>> | |
20effc67 | 284 | input_stream<CharType>::read() noexcept { |
11fdf7f2 TL |
285 | using tmp_buf = temporary_buffer<CharType>; |
286 | if (_eof) { | |
287 | return make_ready_future<tmp_buf>(); | |
288 | } | |
289 | if (_buf.empty()) { | |
290 | return _fd.get().then([this] (tmp_buf buf) { | |
291 | _eof = buf.empty(); | |
292 | return make_ready_future<tmp_buf>(std::move(buf)); | |
293 | }); | |
294 | } else { | |
295 | return make_ready_future<tmp_buf>(std::move(_buf)); | |
296 | } | |
297 | } | |
298 | ||
299 | template <typename CharType> | |
300 | future<> | |
20effc67 | 301 | input_stream<CharType>::skip(uint64_t n) noexcept { |
11fdf7f2 TL |
302 | auto skip_buf = std::min(n, _buf.size()); |
303 | _buf.trim_front(skip_buf); | |
304 | n -= skip_buf; | |
305 | if (!n) { | |
306 | return make_ready_future<>(); | |
307 | } | |
308 | return _fd.skip(n).then([this] (temporary_buffer<CharType> buffer) { | |
309 | _buf = std::move(buffer); | |
310 | }); | |
311 | } | |
312 | ||
313 | template <typename CharType> | |
314 | data_source | |
315 | input_stream<CharType>::detach() && { | |
316 | if (_buf) { | |
317 | throw std::logic_error("detach() called on a used input_stream"); | |
318 | } | |
319 | ||
320 | return std::move(_fd); | |
321 | } | |
322 | ||
323 | // Writes @buf in chunks of _size length. The last chunk is buffered if smaller. | |
324 | template <typename CharType> | |
325 | future<> | |
20effc67 | 326 | output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) noexcept { |
11fdf7f2 TL |
327 | assert(_end == 0); |
328 | ||
329 | return repeat([this, buf = std::move(buf)] () mutable { | |
330 | if (buf.size() < _size) { | |
331 | if (!_buf) { | |
332 | _buf = _fd.allocate_buffer(_size); | |
333 | } | |
334 | std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write()); | |
335 | _end = buf.size(); | |
336 | return make_ready_future<stop_iteration>(stop_iteration::yes); | |
337 | } | |
338 | auto chunk = buf.share(0, _size); | |
339 | buf.trim_front(_size); | |
340 | return put(std::move(chunk)).then([] { | |
341 | return stop_iteration::no; | |
342 | }); | |
343 | }); | |
344 | } | |
345 | ||
346 | template <typename CharType> | |
347 | future<> | |
20effc67 | 348 | output_stream<CharType>::write(const char_type* buf, size_t n) noexcept { |
9f95a23c TL |
349 | if (__builtin_expect(!_buf || n > _size - _end, false)) { |
350 | return slow_write(buf, n); | |
351 | } | |
352 | std::copy_n(buf, n, _buf.get_write() + _end); | |
353 | _end += n; | |
354 | return make_ready_future<>(); | |
355 | } | |
356 | ||
357 | template <typename CharType> | |
358 | future<> | |
20effc67 TL |
359 | output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept { |
360 | try { | |
11fdf7f2 TL |
361 | assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet"); |
362 | auto bulk_threshold = _end ? (2 * _size - _end) : _size; | |
363 | if (n >= bulk_threshold) { | |
364 | if (_end) { | |
365 | auto now = _size - _end; | |
366 | std::copy(buf, buf + now, _buf.get_write() + _end); | |
367 | _end = _size; | |
368 | temporary_buffer<char> tmp = _fd.allocate_buffer(n - now); | |
369 | std::copy(buf + now, buf + n, tmp.get_write()); | |
370 | _buf.trim(_end); | |
371 | _end = 0; | |
372 | return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable { | |
373 | if (_trim_to_size) { | |
374 | return split_and_put(std::move(tmp)); | |
375 | } else { | |
376 | return put(std::move(tmp)); | |
377 | } | |
378 | }); | |
379 | } else { | |
380 | temporary_buffer<char> tmp = _fd.allocate_buffer(n); | |
381 | std::copy(buf, buf + n, tmp.get_write()); | |
382 | if (_trim_to_size) { | |
383 | return split_and_put(std::move(tmp)); | |
384 | } else { | |
385 | return put(std::move(tmp)); | |
386 | } | |
387 | } | |
388 | } | |
389 | ||
390 | if (!_buf) { | |
391 | _buf = _fd.allocate_buffer(_size); | |
392 | } | |
393 | ||
394 | auto now = std::min(n, _size - _end); | |
395 | std::copy(buf, buf + now, _buf.get_write() + _end); | |
396 | _end += now; | |
397 | if (now == n) { | |
398 | return make_ready_future<>(); | |
399 | } else { | |
400 | temporary_buffer<char> next = _fd.allocate_buffer(_size); | |
401 | std::copy(buf + now, buf + n, next.get_write()); | |
402 | _end = n - now; | |
403 | std::swap(next, _buf); | |
404 | return put(std::move(next)); | |
405 | } | |
20effc67 TL |
406 | } catch (...) { |
407 | return current_exception_as_future(); | |
408 | } | |
11fdf7f2 TL |
409 | } |
410 | ||
411 | template <typename CharType> | |
412 | future<> | |
20effc67 | 413 | output_stream<CharType>::flush() noexcept { |
11fdf7f2 TL |
414 | if (!_batch_flushes) { |
415 | if (_end) { | |
416 | _buf.trim(_end); | |
417 | _end = 0; | |
418 | return put(std::move(_buf)).then([this] { | |
419 | return _fd.flush(); | |
420 | }); | |
421 | } else if (_zc_bufs) { | |
422 | return zero_copy_put(std::move(_zc_bufs)).then([this] { | |
423 | return _fd.flush(); | |
424 | }); | |
425 | } | |
426 | } else { | |
427 | if (_ex) { | |
428 | // flush is a good time to deliver outstanding errors | |
429 | return make_exception_future<>(std::move(_ex)); | |
430 | } else { | |
431 | _flush = true; | |
432 | if (!_in_batch) { | |
433 | add_to_flush_poller(this); | |
434 | _in_batch = promise<>(); | |
435 | } | |
436 | } | |
437 | } | |
438 | return make_ready_future<>(); | |
439 | } | |
440 | ||
441 | void add_to_flush_poller(output_stream<char>* x); | |
442 | ||
443 | template <typename CharType> | |
444 | future<> | |
20effc67 | 445 | output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept { |
11fdf7f2 TL |
446 | // if flush is scheduled, disable it, so it will not try to write in parallel |
447 | _flush = false; | |
448 | if (_flushing) { | |
449 | // flush in progress, wait for it to end before continuing | |
450 | return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable { | |
451 | return _fd.put(std::move(buf)); | |
452 | }); | |
453 | } else { | |
454 | return _fd.put(std::move(buf)); | |
455 | } | |
456 | } | |
457 | ||
458 | template <typename CharType> | |
459 | void | |
20effc67 | 460 | output_stream<CharType>::poll_flush() noexcept { |
11fdf7f2 TL |
461 | if (!_flush) { |
462 | // flush was canceled, do nothing | |
463 | _flushing = false; | |
464 | _in_batch.value().set_value(); | |
f67539c2 | 465 | _in_batch = std::nullopt; |
11fdf7f2 TL |
466 | return; |
467 | } | |
468 | ||
469 | auto f = make_ready_future(); | |
470 | _flush = false; | |
471 | _flushing = true; // make whoever wants to write into the fd to wait for flush to complete | |
472 | ||
473 | if (_end) { | |
474 | // send whatever is in the buffer right now | |
475 | _buf.trim(_end); | |
476 | _end = 0; | |
477 | f = _fd.put(std::move(_buf)); | |
478 | } else if(_zc_bufs) { | |
479 | f = _fd.put(std::move(_zc_bufs)); | |
480 | } | |
481 | ||
9f95a23c TL |
482 | // FIXME: future is discarded |
483 | (void)f.then([this] { | |
11fdf7f2 TL |
484 | return _fd.flush(); |
485 | }).then_wrapped([this] (future<> f) { | |
486 | try { | |
487 | f.get(); | |
488 | } catch (...) { | |
489 | _ex = std::current_exception(); | |
490 | } | |
491 | // if flush() was called while flushing flush once more | |
492 | poll_flush(); | |
493 | }); | |
494 | } | |
495 | ||
496 | template <typename CharType> | |
497 | future<> | |
20effc67 | 498 | output_stream<CharType>::close() noexcept { |
11fdf7f2 TL |
499 | return flush().finally([this] { |
500 | if (_in_batch) { | |
501 | return _in_batch.value().get_future(); | |
502 | } else { | |
503 | return make_ready_future(); | |
504 | } | |
505 | }).then([this] { | |
506 | // report final exception as close error | |
507 | if (_ex) { | |
508 | std::rethrow_exception(_ex); | |
509 | } | |
510 | }).finally([this] { | |
511 | return _fd.close(); | |
512 | }); | |
513 | } | |
514 | ||
515 | template <typename CharType> | |
516 | data_sink | |
517 | output_stream<CharType>::detach() && { | |
518 | if (_buf) { | |
519 | throw std::logic_error("detach() called on a used output_stream"); | |
520 | } | |
521 | ||
522 | return std::move(_fd); | |
523 | } | |
524 | ||
525 | namespace internal { | |
526 | ||
527 | /// \cond internal | |
528 | template <typename CharType> | |
529 | struct stream_copy_consumer { | |
530 | private: | |
531 | output_stream<CharType>& _os; | |
f67539c2 | 532 | using unconsumed_remainder = std::optional<temporary_buffer<CharType>>; |
11fdf7f2 TL |
533 | public: |
534 | stream_copy_consumer(output_stream<CharType>& os) : _os(os) { | |
535 | } | |
536 | future<unconsumed_remainder> operator()(temporary_buffer<CharType> data) { | |
537 | if (data.empty()) { | |
538 | return make_ready_future<unconsumed_remainder>(std::move(data)); | |
539 | } | |
540 | return _os.write(data.get(), data.size()).then([] () { | |
541 | return make_ready_future<unconsumed_remainder>(); | |
542 | }); | |
543 | } | |
544 | }; | |
545 | /// \endcond | |
546 | ||
547 | } | |
548 | ||
549 | extern template struct internal::stream_copy_consumer<char>; | |
550 | ||
551 | template <typename CharType> | |
552 | future<> copy(input_stream<CharType>& in, output_stream<CharType>& out) { | |
553 | return in.consume(internal::stream_copy_consumer<CharType>(out)); | |
554 | } | |
555 | ||
556 | extern template future<> copy<char>(input_stream<char>&, output_stream<char>&); | |
557 | } |