]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/src/util/short_streams.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / src / util / short_streams.cc
CommitLineData
20effc67
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2021 ScyllaDB
20 */
21
22#include <seastar/core/future.hh>
23#include <seastar/core/iostream.hh>
24#include <seastar/core/temporary_buffer.hh>
25
26namespace seastar {
27
28namespace util {
29
30future<std::vector<temporary_buffer<char>>> read_entire_stream(input_stream<char>& inp) {
31 using tmp_buf = temporary_buffer<char>;
32 using consumption_result_type = consumption_result<char>;
33 return do_with(std::vector<tmp_buf>(), [&inp] (std::vector<tmp_buf>& bufs) {
34 return inp.consume([&bufs] (tmp_buf buf) {
35 if (buf.empty()) {
36 return make_ready_future<consumption_result_type>(stop_consuming(std::move(buf)));
37 }
38 bufs.push_back(std::move(buf));
39 return make_ready_future<consumption_result_type>(continue_consuming());
40 }).then([&bufs] {
41 return std::move(bufs);
42 });
43 });
44}
45
46future<sstring> read_entire_stream_contiguous(input_stream<char>& inp) {
47 return read_entire_stream(inp).then([] (std::vector<temporary_buffer<char>> bufs) {
48 size_t total_size = 0;
49 for (auto&& buf : bufs) {
50 total_size += buf.size();
51 }
52 sstring ret(sstring::initialized_later(), total_size);
53 size_t pos = 0;
54 for (auto&& buf : bufs) {
55 std::copy(buf.begin(), buf.end(), ret.data() + pos);
56 pos += buf.size();
57 }
58 return ret;
59 });
60};
61
62future<> skip_entire_stream(input_stream<char>& inp) {
63 return inp.consume([] (temporary_buffer<char> tmp) {
64 return tmp.empty() ? make_ready_future<consumption_result<char>>(stop_consuming(temporary_buffer<char>()))
65 : make_ready_future<consumption_result<char>>(continue_consuming());
66 });
67}
68
69}
70
71}