]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/util/process.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / src / util / process.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19 /*
20 * Copyright (C) 2022 Kefu Chai ( tchaikov@gmail.com )
21 */
22
23 #include <seastar/core/fstream.hh>
24 #include <seastar/core/internal/buffer_allocator.hh>
25 #include <seastar/core/io_queue.hh>
26 #include <seastar/core/polymorphic_temporary_buffer.hh>
27 #include <seastar/core/reactor.hh>
28 #include <seastar/util/process.hh>
29
30 namespace seastar::experimental {
31
32 namespace {
33 class pipe_data_source_impl final : public data_source_impl {
34 static constexpr std::size_t buffer_size = 8192;
35 struct buffer_allocator : public internal::buffer_allocator {
36 temporary_buffer<char> allocate_buffer() override {
37 return make_temporary_buffer<char>(memory::malloc_allocator, buffer_size);
38 }
39 };
40 pollable_fd _fd;
41 buffer_allocator _ba;
42 public:
43 explicit pipe_data_source_impl(pollable_fd fd)
44 : _fd(std::move(fd)) {}
45 static auto from_fd(file_desc&& fd) {
46 return std::make_unique<pipe_data_source_impl>(pollable_fd(std::move(fd)));
47 }
48 future<temporary_buffer<char>> get() override {
49 return _fd.read_some(&_ba);
50 }
51 future<> close() override {
52 _fd.close();
53 return make_ready_future();
54 }
55 };
56
57 class pipe_data_sink_impl final : public data_sink_impl {
58 file_desc _fd;
59 io_queue& _io_queue;
60 const size_t _buffer_size;
61 public:
62 explicit pipe_data_sink_impl(file_desc&& fd)
63 : _fd(std::move(fd))
64 , _io_queue(engine().get_io_queue(0))
65 , _buffer_size(file_input_stream_options{}.buffer_size) {}
66 static auto from_fd(file_desc&& fd) {
67 return std::make_unique<pipe_data_sink_impl>(std::move(fd));
68 }
69 using data_sink_impl::put;
70 future<> put(temporary_buffer<char> buf) override {
71 size_t buf_size = buf.size();
72 auto req = internal::io_request::make_write(_fd.get(), 0, buf.get(), buf_size, false);
73 return _io_queue.submit_io_write(default_priority_class(), buf_size, std::move(req), nullptr).then(
74 [this, buf = std::move(buf), buf_size] (size_t written) mutable {
75 if (written < buf_size) {
76 buf.trim_front(written);
77 return put(std::move(buf));
78 }
79 return make_ready_future();
80 });
81 }
82 future<> put(net::packet data) override {
83 return do_with(data.release(), [this] (std::vector<temporary_buffer<char>>& bufs) {
84 return do_for_each(bufs, [this] (temporary_buffer<char>& buf) {
85 return put(buf.share());
86 });
87 });
88 }
89 future<> close() override {
90 _fd.close();
91 return make_ready_future();
92 }
93 size_t buffer_size() const noexcept override {
94 return _buffer_size;
95 }
96 };
97 }
98
99 process::process(create_tag, pid_t pid, file_desc&& stdin, file_desc&& stdout, file_desc&& stderr)
100 : _pid(pid)
101 , _stdin(std::move(stdin))
102 , _stdout(std::move(stdout))
103 , _stderr(std::move(stderr)) {}
104
105 future<process::wait_status> process::wait() {
106 return engine().waitpid(_pid).then([] (int wstatus) -> wait_status {
107 if (WIFEXITED(wstatus)) {
108 return wait_exited{WEXITSTATUS(wstatus)};
109 } else {
110 assert(WIFSIGNALED(wstatus));
111 return wait_signaled{WTERMSIG(wstatus)};
112 }
113 });
114 }
115
116 void process::terminate() {
117 engine().kill(_pid, SIGTERM);
118 }
119
120 void process::kill() {
121 engine().kill(_pid, SIGKILL);
122 }
123
124 future<process> process::spawn(const std::filesystem::path& pathname,
125 spawn_parameters params) {
126 assert(!params.argv.empty());
127 return engine().spawn(pathname.native(), std::move(params.argv), std::move(params.env)).then_unpack(
128 [] (pid_t pid, file_desc stdin_pipe, file_desc stdout_pipe, file_desc stderr_pipe) {
129 return make_ready_future<process>(create_tag{}, pid, std::move(stdin_pipe), std::move(stdout_pipe), std::move(stderr_pipe));
130 });
131 }
132
133 future<process> process::spawn(const std::filesystem::path& pathname) {
134 return spawn(pathname, {{pathname.native()}, {}});
135 }
136
137 output_stream<char> process::stdin() {
138 return output_stream<char>(data_sink(pipe_data_sink_impl::from_fd(std::move(_stdin))));
139 }
140
141 input_stream<char> process::stdout() {
142 return input_stream<char>(data_source(pipe_data_source_impl::from_fd(std::move(_stdout))));
143 }
144
145 input_stream<char> process::stderr() {
146 return input_stream<char>(data_source(pipe_data_source_impl::from_fd(std::move(_stderr))));
147 }
148
149 }