]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * Copyright (C) 2015 Cloudius Systems, Ltd. | |
21 | */ | |
22 | ||
23 | ||
24 | #pragma once | |
25 | ||
26 | #include <seastar/core/future-util.hh> | |
27 | #include <seastar/net/packet.hh> | |
28 | #include <seastar/core/future-util.hh> | |
29 | #include <seastar/util/variant_utils.hh> | |
30 | ||
31 | namespace seastar { | |
32 | ||
33 | inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n) | |
34 | { | |
35 | return do_with(uint64_t(n), [this] (uint64_t& n) { | |
36 | return repeat_until_value([&] { | |
37 | return get().then([&] (temporary_buffer<char> buffer) -> compat::optional<temporary_buffer<char>> { | |
38 | if (buffer.size() >= n) { | |
39 | buffer.trim_front(n); | |
9f95a23c | 40 | return SEASTAR_COPY_ELISION(buffer); |
11fdf7f2 TL |
41 | } |
42 | n -= buffer.size(); | |
43 | return { }; | |
44 | }); | |
45 | }); | |
46 | }); | |
47 | } | |
48 | ||
49 | template<typename CharType> | |
50 | inline | |
51 | future<> output_stream<CharType>::write(const char_type* buf) { | |
52 | return write(buf, strlen(buf)); | |
53 | } | |
54 | ||
55 | template<typename CharType> | |
56 | template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate> | |
57 | inline | |
58 | future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) { | |
59 | return write(reinterpret_cast<const CharType *>(s.c_str()), s.size()); | |
60 | } | |
61 | ||
62 | template<typename CharType> | |
63 | inline | |
64 | future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) { | |
65 | return write(s.c_str(), s.size()); | |
66 | } | |
67 | ||
68 | template<typename CharType> | |
69 | future<> output_stream<CharType>::write(scattered_message<CharType> msg) { | |
70 | return write(std::move(msg).release()); | |
71 | } | |
72 | ||
73 | template<typename CharType> | |
74 | future<> | |
75 | output_stream<CharType>::zero_copy_put(net::packet p) { | |
76 | // if flush is scheduled, disable it, so it will not try to write in parallel | |
77 | _flush = false; | |
78 | if (_flushing) { | |
79 | // flush in progress, wait for it to end before continuing | |
80 | return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable { | |
81 | return _fd.put(std::move(p)); | |
82 | }); | |
83 | } else { | |
84 | return _fd.put(std::move(p)); | |
85 | } | |
86 | } | |
87 | ||
88 | // Writes @p in chunks of _size length. The last chunk is buffered if smaller. | |
89 | template <typename CharType> | |
90 | future<> | |
91 | output_stream<CharType>::zero_copy_split_and_put(net::packet p) { | |
92 | return repeat([this, p = std::move(p)] () mutable { | |
93 | if (p.len() < _size) { | |
94 | if (p.len()) { | |
95 | _zc_bufs = std::move(p); | |
96 | } else { | |
97 | _zc_bufs = net::packet::make_null_packet(); | |
98 | } | |
99 | return make_ready_future<stop_iteration>(stop_iteration::yes); | |
100 | } | |
101 | auto chunk = p.share(0, _size); | |
102 | p.trim_front(_size); | |
103 | return zero_copy_put(std::move(chunk)).then([] { | |
104 | return stop_iteration::no; | |
105 | }); | |
106 | }); | |
107 | } | |
108 | ||
109 | template<typename CharType> | |
110 | future<> output_stream<CharType>::write(net::packet p) { | |
111 | static_assert(std::is_same<CharType, char>::value, "packet works on char"); | |
112 | ||
113 | if (p.len() != 0) { | |
114 | assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet"); | |
115 | ||
116 | if (_zc_bufs) { | |
117 | _zc_bufs.append(std::move(p)); | |
118 | } else { | |
119 | _zc_bufs = std::move(p); | |
120 | } | |
121 | ||
122 | if (_zc_bufs.len() >= _size) { | |
123 | if (_trim_to_size) { | |
124 | return zero_copy_split_and_put(std::move(_zc_bufs)); | |
125 | } else { | |
126 | return zero_copy_put(std::move(_zc_bufs)); | |
127 | } | |
128 | } | |
129 | } | |
130 | return make_ready_future<>(); | |
131 | } | |
132 | ||
133 | template<typename CharType> | |
134 | future<> output_stream<CharType>::write(temporary_buffer<CharType> p) { | |
135 | if (p.empty()) { | |
136 | return make_ready_future<>(); | |
137 | } | |
138 | assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet"); | |
139 | ||
140 | return write(net::packet(std::move(p))); | |
141 | } | |
142 | ||
143 | template <typename CharType> | |
144 | future<temporary_buffer<CharType>> | |
145 | input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) { | |
146 | if (available()) { | |
147 | auto now = std::min(n - completed, available()); | |
148 | std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed); | |
149 | _buf.trim_front(now); | |
150 | completed += now; | |
151 | } | |
152 | if (completed == n) { | |
153 | return make_ready_future<tmp_buf>(std::move(out)); | |
154 | } | |
155 | ||
156 | // _buf is now empty | |
157 | return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable { | |
158 | if (buf.size() == 0) { | |
159 | _eof = true; | |
160 | return make_ready_future<tmp_buf>(std::move(buf)); | |
161 | } | |
162 | _buf = std::move(buf); | |
163 | return this->read_exactly_part(n, std::move(out), completed); | |
164 | }); | |
165 | } | |
166 | ||
167 | template <typename CharType> | |
168 | future<temporary_buffer<CharType>> | |
169 | input_stream<CharType>::read_exactly(size_t n) { | |
170 | if (_buf.size() == n) { | |
171 | // easy case: steal buffer, return to caller | |
172 | return make_ready_future<tmp_buf>(std::move(_buf)); | |
173 | } else if (_buf.size() > n) { | |
174 | // buffer large enough, share it with caller | |
175 | auto front = _buf.share(0, n); | |
176 | _buf.trim_front(n); | |
177 | return make_ready_future<tmp_buf>(std::move(front)); | |
178 | } else if (_buf.size() == 0) { | |
179 | // buffer is empty: grab one and retry | |
180 | return _fd.get().then([this, n] (auto buf) mutable { | |
181 | if (buf.size() == 0) { | |
182 | _eof = true; | |
183 | return make_ready_future<tmp_buf>(std::move(buf)); | |
184 | } | |
185 | _buf = std::move(buf); | |
186 | return this->read_exactly(n); | |
187 | }); | |
188 | } else { | |
189 | // buffer too small: start copy/read loop | |
190 | tmp_buf b(n); | |
191 | return read_exactly_part(n, std::move(b), 0); | |
192 | } | |
193 | } | |
194 | ||
195 | template <typename CharType> | |
196 | template <typename Consumer> | |
197 | GCC6_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>) | |
198 | future<> | |
199 | input_stream<CharType>::consume(Consumer&& consumer) { | |
200 | return repeat([consumer = std::move(consumer), this] () mutable { | |
201 | if (_buf.empty() && !_eof) { | |
202 | return _fd.get().then([this] (tmp_buf buf) { | |
203 | _buf = std::move(buf); | |
204 | _eof = _buf.empty(); | |
205 | return make_ready_future<stop_iteration>(stop_iteration::no); | |
206 | }); | |
207 | } | |
208 | return consumer(std::move(_buf)).then([this] (consumption_result_type result) { | |
209 | return seastar::visit(result.get(), [this] (const continue_consuming&) { | |
210 | // If we're here, consumer consumed entire buffer and is ready for | |
211 | // more now. So we do not return, and rather continue the loop. | |
212 | // | |
213 | // If we're at eof, we should stop. | |
214 | return make_ready_future<stop_iteration>(stop_iteration(this->_eof)); | |
215 | }, [this] (stop_consuming<CharType>& stop) { | |
216 | // consumer is done | |
217 | this->_buf = std::move(stop.get_buffer()); | |
218 | return make_ready_future<stop_iteration>(stop_iteration::yes); | |
219 | }, [this] (const skip_bytes& skip) { | |
220 | return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) { | |
221 | if (!buf.empty()) { | |
222 | this->_buf = std::move(buf); | |
223 | } | |
224 | return make_ready_future<stop_iteration>(stop_iteration::no); | |
225 | }); | |
226 | }); | |
227 | }); | |
228 | }); | |
229 | } | |
230 | ||
231 | template <typename CharType> | |
232 | template <typename Consumer> | |
233 | GCC6_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>) | |
234 | future<> | |
235 | input_stream<CharType>::consume(Consumer& consumer) { | |
236 | return consume(std::ref(consumer)); | |
237 | } | |
238 | ||
239 | template <typename CharType> | |
240 | future<temporary_buffer<CharType>> | |
241 | input_stream<CharType>::read_up_to(size_t n) { | |
242 | using tmp_buf = temporary_buffer<CharType>; | |
243 | if (_buf.empty()) { | |
244 | if (_eof) { | |
245 | return make_ready_future<tmp_buf>(); | |
246 | } else { | |
247 | return _fd.get().then([this, n] (tmp_buf buf) { | |
248 | _eof = buf.empty(); | |
249 | _buf = std::move(buf); | |
250 | return read_up_to(n); | |
251 | }); | |
252 | } | |
253 | } else if (_buf.size() <= n) { | |
254 | // easy case: steal buffer, return to caller | |
255 | return make_ready_future<tmp_buf>(std::move(_buf)); | |
256 | } else { | |
257 | // buffer is larger than n, so share its head with a caller | |
258 | auto front = _buf.share(0, n); | |
259 | _buf.trim_front(n); | |
260 | return make_ready_future<tmp_buf>(std::move(front)); | |
261 | } | |
262 | } | |
263 | ||
264 | template <typename CharType> | |
265 | future<temporary_buffer<CharType>> | |
266 | input_stream<CharType>::read() { | |
267 | using tmp_buf = temporary_buffer<CharType>; | |
268 | if (_eof) { | |
269 | return make_ready_future<tmp_buf>(); | |
270 | } | |
271 | if (_buf.empty()) { | |
272 | return _fd.get().then([this] (tmp_buf buf) { | |
273 | _eof = buf.empty(); | |
274 | return make_ready_future<tmp_buf>(std::move(buf)); | |
275 | }); | |
276 | } else { | |
277 | return make_ready_future<tmp_buf>(std::move(_buf)); | |
278 | } | |
279 | } | |
280 | ||
281 | template <typename CharType> | |
282 | future<> | |
283 | input_stream<CharType>::skip(uint64_t n) { | |
284 | auto skip_buf = std::min(n, _buf.size()); | |
285 | _buf.trim_front(skip_buf); | |
286 | n -= skip_buf; | |
287 | if (!n) { | |
288 | return make_ready_future<>(); | |
289 | } | |
290 | return _fd.skip(n).then([this] (temporary_buffer<CharType> buffer) { | |
291 | _buf = std::move(buffer); | |
292 | }); | |
293 | } | |
294 | ||
295 | template <typename CharType> | |
296 | data_source | |
297 | input_stream<CharType>::detach() && { | |
298 | if (_buf) { | |
299 | throw std::logic_error("detach() called on a used input_stream"); | |
300 | } | |
301 | ||
302 | return std::move(_fd); | |
303 | } | |
304 | ||
305 | // Writes @buf in chunks of _size length. The last chunk is buffered if smaller. | |
306 | template <typename CharType> | |
307 | future<> | |
308 | output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) { | |
309 | assert(_end == 0); | |
310 | ||
311 | return repeat([this, buf = std::move(buf)] () mutable { | |
312 | if (buf.size() < _size) { | |
313 | if (!_buf) { | |
314 | _buf = _fd.allocate_buffer(_size); | |
315 | } | |
316 | std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write()); | |
317 | _end = buf.size(); | |
318 | return make_ready_future<stop_iteration>(stop_iteration::yes); | |
319 | } | |
320 | auto chunk = buf.share(0, _size); | |
321 | buf.trim_front(_size); | |
322 | return put(std::move(chunk)).then([] { | |
323 | return stop_iteration::no; | |
324 | }); | |
325 | }); | |
326 | } | |
327 | ||
328 | template <typename CharType> | |
329 | future<> | |
330 | output_stream<CharType>::write(const char_type* buf, size_t n) { | |
9f95a23c TL |
331 | if (__builtin_expect(!_buf || n > _size - _end, false)) { |
332 | return slow_write(buf, n); | |
333 | } | |
334 | std::copy_n(buf, n, _buf.get_write() + _end); | |
335 | _end += n; | |
336 | return make_ready_future<>(); | |
337 | } | |
338 | ||
339 | template <typename CharType> | |
340 | future<> | |
341 | output_stream<CharType>::slow_write(const char_type* buf, size_t n) { | |
11fdf7f2 TL |
342 | assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet"); |
343 | auto bulk_threshold = _end ? (2 * _size - _end) : _size; | |
344 | if (n >= bulk_threshold) { | |
345 | if (_end) { | |
346 | auto now = _size - _end; | |
347 | std::copy(buf, buf + now, _buf.get_write() + _end); | |
348 | _end = _size; | |
349 | temporary_buffer<char> tmp = _fd.allocate_buffer(n - now); | |
350 | std::copy(buf + now, buf + n, tmp.get_write()); | |
351 | _buf.trim(_end); | |
352 | _end = 0; | |
353 | return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable { | |
354 | if (_trim_to_size) { | |
355 | return split_and_put(std::move(tmp)); | |
356 | } else { | |
357 | return put(std::move(tmp)); | |
358 | } | |
359 | }); | |
360 | } else { | |
361 | temporary_buffer<char> tmp = _fd.allocate_buffer(n); | |
362 | std::copy(buf, buf + n, tmp.get_write()); | |
363 | if (_trim_to_size) { | |
364 | return split_and_put(std::move(tmp)); | |
365 | } else { | |
366 | return put(std::move(tmp)); | |
367 | } | |
368 | } | |
369 | } | |
370 | ||
371 | if (!_buf) { | |
372 | _buf = _fd.allocate_buffer(_size); | |
373 | } | |
374 | ||
375 | auto now = std::min(n, _size - _end); | |
376 | std::copy(buf, buf + now, _buf.get_write() + _end); | |
377 | _end += now; | |
378 | if (now == n) { | |
379 | return make_ready_future<>(); | |
380 | } else { | |
381 | temporary_buffer<char> next = _fd.allocate_buffer(_size); | |
382 | std::copy(buf + now, buf + n, next.get_write()); | |
383 | _end = n - now; | |
384 | std::swap(next, _buf); | |
385 | return put(std::move(next)); | |
386 | } | |
387 | } | |
388 | ||
389 | template <typename CharType> | |
390 | future<> | |
391 | output_stream<CharType>::flush() { | |
392 | if (!_batch_flushes) { | |
393 | if (_end) { | |
394 | _buf.trim(_end); | |
395 | _end = 0; | |
396 | return put(std::move(_buf)).then([this] { | |
397 | return _fd.flush(); | |
398 | }); | |
399 | } else if (_zc_bufs) { | |
400 | return zero_copy_put(std::move(_zc_bufs)).then([this] { | |
401 | return _fd.flush(); | |
402 | }); | |
403 | } | |
404 | } else { | |
405 | if (_ex) { | |
406 | // flush is a good time to deliver outstanding errors | |
407 | return make_exception_future<>(std::move(_ex)); | |
408 | } else { | |
409 | _flush = true; | |
410 | if (!_in_batch) { | |
411 | add_to_flush_poller(this); | |
412 | _in_batch = promise<>(); | |
413 | } | |
414 | } | |
415 | } | |
416 | return make_ready_future<>(); | |
417 | } | |
418 | ||
419 | void add_to_flush_poller(output_stream<char>* x); | |
420 | ||
421 | template <typename CharType> | |
422 | future<> | |
423 | output_stream<CharType>::put(temporary_buffer<CharType> buf) { | |
424 | // if flush is scheduled, disable it, so it will not try to write in parallel | |
425 | _flush = false; | |
426 | if (_flushing) { | |
427 | // flush in progress, wait for it to end before continuing | |
428 | return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable { | |
429 | return _fd.put(std::move(buf)); | |
430 | }); | |
431 | } else { | |
432 | return _fd.put(std::move(buf)); | |
433 | } | |
434 | } | |
435 | ||
436 | template <typename CharType> | |
437 | void | |
438 | output_stream<CharType>::poll_flush() { | |
439 | if (!_flush) { | |
440 | // flush was canceled, do nothing | |
441 | _flushing = false; | |
442 | _in_batch.value().set_value(); | |
443 | _in_batch = compat::nullopt; | |
444 | return; | |
445 | } | |
446 | ||
447 | auto f = make_ready_future(); | |
448 | _flush = false; | |
449 | _flushing = true; // make whoever wants to write into the fd to wait for flush to complete | |
450 | ||
451 | if (_end) { | |
452 | // send whatever is in the buffer right now | |
453 | _buf.trim(_end); | |
454 | _end = 0; | |
455 | f = _fd.put(std::move(_buf)); | |
456 | } else if(_zc_bufs) { | |
457 | f = _fd.put(std::move(_zc_bufs)); | |
458 | } | |
459 | ||
9f95a23c TL |
460 | // FIXME: future is discarded |
461 | (void)f.then([this] { | |
11fdf7f2 TL |
462 | return _fd.flush(); |
463 | }).then_wrapped([this] (future<> f) { | |
464 | try { | |
465 | f.get(); | |
466 | } catch (...) { | |
467 | _ex = std::current_exception(); | |
468 | } | |
469 | // if flush() was called while flushing flush once more | |
470 | poll_flush(); | |
471 | }); | |
472 | } | |
473 | ||
474 | template <typename CharType> | |
475 | future<> | |
476 | output_stream<CharType>::close() { | |
477 | return flush().finally([this] { | |
478 | if (_in_batch) { | |
479 | return _in_batch.value().get_future(); | |
480 | } else { | |
481 | return make_ready_future(); | |
482 | } | |
483 | }).then([this] { | |
484 | // report final exception as close error | |
485 | if (_ex) { | |
486 | std::rethrow_exception(_ex); | |
487 | } | |
488 | }).finally([this] { | |
489 | return _fd.close(); | |
490 | }); | |
491 | } | |
492 | ||
493 | template <typename CharType> | |
494 | data_sink | |
495 | output_stream<CharType>::detach() && { | |
496 | if (_buf) { | |
497 | throw std::logic_error("detach() called on a used output_stream"); | |
498 | } | |
499 | ||
500 | return std::move(_fd); | |
501 | } | |
502 | ||
503 | namespace internal { | |
504 | ||
505 | /// \cond internal | |
506 | template <typename CharType> | |
507 | struct stream_copy_consumer { | |
508 | private: | |
509 | output_stream<CharType>& _os; | |
510 | using unconsumed_remainder = compat::optional<temporary_buffer<CharType>>; | |
511 | public: | |
512 | stream_copy_consumer(output_stream<CharType>& os) : _os(os) { | |
513 | } | |
514 | future<unconsumed_remainder> operator()(temporary_buffer<CharType> data) { | |
515 | if (data.empty()) { | |
516 | return make_ready_future<unconsumed_remainder>(std::move(data)); | |
517 | } | |
518 | return _os.write(data.get(), data.size()).then([] () { | |
519 | return make_ready_future<unconsumed_remainder>(); | |
520 | }); | |
521 | } | |
522 | }; | |
523 | /// \endcond | |
524 | ||
525 | } | |
526 | ||
527 | extern template struct internal::stream_copy_consumer<char>; | |
528 | ||
529 | template <typename CharType> | |
530 | future<> copy(input_stream<CharType>& in, output_stream<CharType>& out) { | |
531 | return in.consume(internal::stream_copy_consumer<CharType>(out)); | |
532 | } | |
533 | ||
534 | extern template future<> copy<char>(input_stream<char>&, output_stream<char>&); | |
535 | } |