/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership.  You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Copyright (C) 2022 Kefu Chai ( tchaikov@gmail.com )
 */
23 #include <seastar/core/fstream.hh>
24 #include <seastar/core/internal/buffer_allocator.hh>
25 #include <seastar/core/io_queue.hh>
26 #include <seastar/core/polymorphic_temporary_buffer.hh>
27 #include <seastar/core/reactor.hh>
28 #include <seastar/util/process.hh>
30 namespace seastar::experimental
{
33 class pipe_data_source_impl final
: public data_source_impl
{
34 static constexpr std::size_t buffer_size
= 8192;
35 struct buffer_allocator
: public internal::buffer_allocator
{
36 temporary_buffer
<char> allocate_buffer() override
{
37 return make_temporary_buffer
<char>(memory::malloc_allocator
, buffer_size
);
43 explicit pipe_data_source_impl(pollable_fd fd
)
44 : _fd(std::move(fd
)) {}
45 static auto from_fd(file_desc
&& fd
) {
46 return std::make_unique
<pipe_data_source_impl
>(pollable_fd(std::move(fd
)));
48 future
<temporary_buffer
<char>> get() override
{
49 return _fd
.read_some(&_ba
);
51 future
<> close() override
{
53 return make_ready_future();
57 class pipe_data_sink_impl final
: public data_sink_impl
{
60 const size_t _buffer_size
;
62 explicit pipe_data_sink_impl(file_desc
&& fd
)
64 , _io_queue(engine().get_io_queue(0))
65 , _buffer_size(file_input_stream_options
{}.buffer_size
) {}
66 static auto from_fd(file_desc
&& fd
) {
67 return std::make_unique
<pipe_data_sink_impl
>(std::move(fd
));
69 using data_sink_impl::put
;
70 future
<> put(temporary_buffer
<char> buf
) override
{
71 size_t buf_size
= buf
.size();
72 auto req
= internal::io_request::make_write(_fd
.get(), 0, buf
.get(), buf_size
, false);
73 return _io_queue
.submit_io_write(default_priority_class(), buf_size
, std::move(req
), nullptr).then(
74 [this, buf
= std::move(buf
), buf_size
] (size_t written
) mutable {
75 if (written
< buf_size
) {
76 buf
.trim_front(written
);
77 return put(std::move(buf
));
79 return make_ready_future();
82 future
<> put(net::packet data
) override
{
83 return do_with(data
.release(), [this] (std::vector
<temporary_buffer
<char>>& bufs
) {
84 return do_for_each(bufs
, [this] (temporary_buffer
<char>& buf
) {
85 return put(buf
.share());
89 future
<> close() override
{
91 return make_ready_future();
93 size_t buffer_size() const noexcept override
{
99 process::process(create_tag
, pid_t pid
, file_desc
&& stdin
, file_desc
&& stdout
, file_desc
&& stderr
)
101 , _stdin(std::move(stdin
))
102 , _stdout(std::move(stdout
))
103 , _stderr(std::move(stderr
)) {}
105 future
<process::wait_status
> process::wait() {
106 return engine().waitpid(_pid
).then([] (int wstatus
) -> wait_status
{
107 if (WIFEXITED(wstatus
)) {
108 return wait_exited
{WEXITSTATUS(wstatus
)};
110 assert(WIFSIGNALED(wstatus
));
111 return wait_signaled
{WTERMSIG(wstatus
)};
116 void process::terminate() {
117 engine().kill(_pid
, SIGTERM
);
120 void process::kill() {
121 engine().kill(_pid
, SIGKILL
);
124 future
<process
> process::spawn(const std::filesystem::path
& pathname
,
125 spawn_parameters params
) {
126 assert(!params
.argv
.empty());
127 return engine().spawn(pathname
.native(), std::move(params
.argv
), std::move(params
.env
)).then_unpack(
128 [] (pid_t pid
, file_desc stdin_pipe
, file_desc stdout_pipe
, file_desc stderr_pipe
) {
129 return make_ready_future
<process
>(create_tag
{}, pid
, std::move(stdin_pipe
), std::move(stdout_pipe
), std::move(stderr_pipe
));
133 future
<process
> process::spawn(const std::filesystem::path
& pathname
) {
134 return spawn(pathname
, {{pathname
.native()}, {}});
137 output_stream
<char> process::stdin() {
138 return output_stream
<char>(data_sink(pipe_data_sink_impl::from_fd(std::move(_stdin
))));
141 input_stream
<char> process::stdout() {
142 return input_stream
<char>(data_source(pipe_data_source_impl::from_fd(std::move(_stdout
))));
145 input_stream
<char> process::stderr() {
146 return input_stream
<char>(data_source(pipe_data_source_impl::from_fd(std::move(_stderr
))));