]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * Copyright (C) 2022 Kefu Chai ( tchaikov@gmail.com ) | |
21 | */ | |
22 | ||
23 | #include <seastar/core/fstream.hh> | |
24 | #include <seastar/core/internal/buffer_allocator.hh> | |
25 | #include <seastar/core/io_queue.hh> | |
26 | #include <seastar/core/polymorphic_temporary_buffer.hh> | |
27 | #include <seastar/core/reactor.hh> | |
28 | #include <seastar/util/process.hh> | |
29 | ||
30 | namespace seastar::experimental { | |
31 | ||
32 | namespace { | |
33 | class pipe_data_source_impl final : public data_source_impl { | |
34 | static constexpr std::size_t buffer_size = 8192; | |
35 | struct buffer_allocator : public internal::buffer_allocator { | |
36 | temporary_buffer<char> allocate_buffer() override { | |
37 | return make_temporary_buffer<char>(memory::malloc_allocator, buffer_size); | |
38 | } | |
39 | }; | |
40 | pollable_fd _fd; | |
41 | buffer_allocator _ba; | |
42 | public: | |
43 | explicit pipe_data_source_impl(pollable_fd fd) | |
44 | : _fd(std::move(fd)) {} | |
45 | static auto from_fd(file_desc&& fd) { | |
46 | return std::make_unique<pipe_data_source_impl>(pollable_fd(std::move(fd))); | |
47 | } | |
48 | future<temporary_buffer<char>> get() override { | |
49 | return _fd.read_some(&_ba); | |
50 | } | |
51 | future<> close() override { | |
52 | _fd.close(); | |
53 | return make_ready_future(); | |
54 | } | |
55 | }; | |
56 | ||
57 | class pipe_data_sink_impl final : public data_sink_impl { | |
58 | file_desc _fd; | |
59 | io_queue& _io_queue; | |
60 | const size_t _buffer_size; | |
61 | public: | |
62 | explicit pipe_data_sink_impl(file_desc&& fd) | |
63 | : _fd(std::move(fd)) | |
64 | , _io_queue(engine().get_io_queue(0)) | |
65 | , _buffer_size(file_input_stream_options{}.buffer_size) {} | |
66 | static auto from_fd(file_desc&& fd) { | |
67 | return std::make_unique<pipe_data_sink_impl>(std::move(fd)); | |
68 | } | |
69 | using data_sink_impl::put; | |
70 | future<> put(temporary_buffer<char> buf) override { | |
71 | size_t buf_size = buf.size(); | |
72 | auto req = internal::io_request::make_write(_fd.get(), 0, buf.get(), buf_size, false); | |
73 | return _io_queue.submit_io_write(default_priority_class(), buf_size, std::move(req), nullptr).then( | |
74 | [this, buf = std::move(buf), buf_size] (size_t written) mutable { | |
75 | if (written < buf_size) { | |
76 | buf.trim_front(written); | |
77 | return put(std::move(buf)); | |
78 | } | |
79 | return make_ready_future(); | |
80 | }); | |
81 | } | |
82 | future<> put(net::packet data) override { | |
83 | return do_with(data.release(), [this] (std::vector<temporary_buffer<char>>& bufs) { | |
84 | return do_for_each(bufs, [this] (temporary_buffer<char>& buf) { | |
85 | return put(buf.share()); | |
86 | }); | |
87 | }); | |
88 | } | |
89 | future<> close() override { | |
90 | _fd.close(); | |
91 | return make_ready_future(); | |
92 | } | |
93 | size_t buffer_size() const noexcept override { | |
94 | return _buffer_size; | |
95 | } | |
96 | }; | |
97 | } | |
98 | ||
99 | process::process(create_tag, pid_t pid, file_desc&& stdin, file_desc&& stdout, file_desc&& stderr) | |
100 | : _pid(pid) | |
101 | , _stdin(std::move(stdin)) | |
102 | , _stdout(std::move(stdout)) | |
103 | , _stderr(std::move(stderr)) {} | |
104 | ||
105 | future<process::wait_status> process::wait() { | |
106 | return engine().waitpid(_pid).then([] (int wstatus) -> wait_status { | |
107 | if (WIFEXITED(wstatus)) { | |
108 | return wait_exited{WEXITSTATUS(wstatus)}; | |
109 | } else { | |
110 | assert(WIFSIGNALED(wstatus)); | |
111 | return wait_signaled{WTERMSIG(wstatus)}; | |
112 | } | |
113 | }); | |
114 | } | |
115 | ||
116 | void process::terminate() { | |
117 | engine().kill(_pid, SIGTERM); | |
118 | } | |
119 | ||
120 | void process::kill() { | |
121 | engine().kill(_pid, SIGKILL); | |
122 | } | |
123 | ||
124 | future<process> process::spawn(const std::filesystem::path& pathname, | |
125 | spawn_parameters params) { | |
126 | assert(!params.argv.empty()); | |
127 | return engine().spawn(pathname.native(), std::move(params.argv), std::move(params.env)).then_unpack( | |
128 | [] (pid_t pid, file_desc stdin_pipe, file_desc stdout_pipe, file_desc stderr_pipe) { | |
129 | return make_ready_future<process>(create_tag{}, pid, std::move(stdin_pipe), std::move(stdout_pipe), std::move(stderr_pipe)); | |
130 | }); | |
131 | } | |
132 | ||
133 | future<process> process::spawn(const std::filesystem::path& pathname) { | |
134 | return spawn(pathname, {{pathname.native()}, {}}); | |
135 | } | |
136 | ||
137 | output_stream<char> process::stdin() { | |
138 | return output_stream<char>(data_sink(pipe_data_sink_impl::from_fd(std::move(_stdin)))); | |
139 | } | |
140 | ||
141 | input_stream<char> process::stdout() { | |
142 | return input_stream<char>(data_source(pipe_data_source_impl::from_fd(std::move(_stdout)))); | |
143 | } | |
144 | ||
145 | input_stream<char> process::stderr() { | |
146 | return input_stream<char>(data_source(pipe_data_source_impl::from_fd(std::move(_stderr)))); | |
147 | } | |
148 | ||
149 | } |