1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <seastar/core/gate.hh>
7 #include <seastar/core/reactor.hh>
8 #include <seastar/core/sharded.hh>
9 #include <seastar/net/packet.hh>
11 #include "include/buffer.h"
13 #include "crimson/common/log.h"
17 #ifdef UNIT_TESTS_BUILT
18 #include "Interceptor.h"
21 namespace crimson::net
{
24 using SocketRef
= std::unique_ptr
<Socket
>;
28 struct construct_tag
{};
31 // if acceptor side, peer is using a different port (ephemeral_port)
32 // if connector side, I'm using a different port (ephemeral_port)
38 Socket(seastar::connected_socket
&& _socket
, side_t _side
, uint16_t e_port
, construct_tag
)
39 : sid
{seastar::this_shard_id()},
40 socket(std::move(_socket
)),
42 // the default buffer size 8192 is too small that may impact our write
43 // performance. see seastar::net::connected_socket::output()
44 out(socket
.output(65536)),
45 socket_is_shutdown(false),
47 ephemeral_port(e_port
) {}
55 Socket(Socket
&& o
) = delete;
57 static seastar::future
<SocketRef
>
58 connect(const entity_addr_t
& peer_addr
) {
62 return seastar::connect(peer_addr
.in4_addr());
63 }).then([] (seastar::connected_socket socket
) {
64 return std::make_unique
<Socket
>(
65 std::move(socket
), side_t::connector
, 0, construct_tag
{});
69 /// read the requested number of bytes into a bufferlist
70 seastar::future
<bufferlist
> read(size_t bytes
);
71 using tmp_buf
= seastar::temporary_buffer
<char>;
72 using packet
= seastar::net::packet
;
73 seastar::future
<tmp_buf
> read_exactly(size_t bytes
);
75 seastar::future
<> write(packet
&& buf
) {
76 #ifdef UNIT_TESTS_BUILT
77 return try_trap_pre(next_trap_write
78 ).then([buf
= std::move(buf
), this] () mutable {
82 ).then([buf
= std::move(buf
), this] () mutable {
83 return out
.write(std::move(buf
));
85 #ifdef UNIT_TESTS_BUILT
87 return try_trap_post(next_trap_write
);
91 seastar::future
<> flush() {
93 return inject_delay().then([this] {
97 seastar::future
<> write_flush(packet
&& buf
) {
98 #ifdef UNIT_TESTS_BUILT
99 return try_trap_pre(next_trap_write
).then([buf
= std::move(buf
), this] () mutable {
103 ).then([buf
= std::move(buf
), this] () mutable {
104 return out
.write(std::move(buf
)).then([this] { return out
.flush(); });
106 #ifdef UNIT_TESTS_BUILT
108 return try_trap_post(next_trap_write
);
113 bool is_shutdown() const {
114 return socket_is_shutdown
;
117 // preemptively disable further reads or writes, can only be shutdown once.
120 /// Socket can only be closed once.
121 seastar::future
<> close();
123 static seastar::future
<> inject_delay();
125 static void inject_failure();
127 // shutdown for tests
128 void force_shutdown() {
129 socket
.shutdown_input();
130 socket
.shutdown_output();
133 // shutdown input_stream only, for tests
134 void force_shutdown_in() {
135 socket
.shutdown_input();
138 // shutdown output_stream only, for tests
139 void force_shutdown_out() {
140 socket
.shutdown_output();
143 side_t
get_side() const {
147 uint16_t get_ephemeral_port() const {
148 return ephemeral_port
;
151 // learn my ephemeral_port as connector.
152 // unfortunately, there's no way to identify which port I'm using as
153 // connector with current seastar interface.
154 void learn_ephemeral_port_as_connector(uint16_t port
) {
155 assert(side
== side_t::connector
&&
156 (ephemeral_port
== 0 || ephemeral_port
== port
));
157 ephemeral_port
= port
;
160 seastar::socket_address
get_local_address() const {
161 return socket
.local_address();
165 const seastar::shard_id sid
;
166 seastar::connected_socket socket
;
167 seastar::input_stream
<char> in
;
168 seastar::output_stream
<char> out
;
169 bool socket_is_shutdown
;
171 uint16_t ephemeral_port
;
177 /// buffer state for read()
183 #ifdef UNIT_TESTS_BUILT
185 void set_trap(bp_type_t type
, bp_action_t action
, socket_blocker
* blocker_
);
188 bp_action_t next_trap_read
= bp_action_t::CONTINUE
;
189 bp_action_t next_trap_write
= bp_action_t::CONTINUE
;
190 socket_blocker
* blocker
= nullptr;
191 seastar::future
<> try_trap_pre(bp_action_t
& trap
);
192 seastar::future
<> try_trap_post(bp_action_t
& trap
);
195 friend class FixedCPUServerSocket
;
198 using listen_ertr
= crimson::errorator
<
199 crimson::ct_error::address_in_use
, // The address is already bound
200 crimson::ct_error::address_not_available
// https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
203 class FixedCPUServerSocket
204 : public seastar::peering_sharded_service
<FixedCPUServerSocket
> {
205 const seastar::shard_id cpu
;
207 std::optional
<seastar::server_socket
> listener
;
208 seastar::gate shutdown_gate
;
210 using sharded_service_t
= seastar::sharded
<FixedCPUServerSocket
>;
211 std::unique_ptr
<sharded_service_t
> service
;
213 struct construct_tag
{};
215 static seastar::logger
& logger() {
216 return crimson::get_logger(ceph_subsys_ms
);
219 seastar::future
<> reset() {
220 return container().invoke_on_all([] (auto& ss
) {
221 assert(ss
.shutdown_gate
.is_closed());
222 ss
.addr
= entity_addr_t();
228 FixedCPUServerSocket(seastar::shard_id cpu
, construct_tag
) : cpu
{cpu
} {}
229 ~FixedCPUServerSocket() {
231 // detect whether user have called destroy() properly
232 ceph_assert(!service
);
235 FixedCPUServerSocket(FixedCPUServerSocket
&&) = delete;
236 FixedCPUServerSocket(const FixedCPUServerSocket
&) = delete;
237 FixedCPUServerSocket
& operator=(const FixedCPUServerSocket
&) = delete;
239 listen_ertr::future
<> listen(entity_addr_t addr
);
241 // fn_accept should be a nothrow function of type
242 // seastar::future<>(SocketRef, entity_addr_t)
243 template <typename Func
>
244 seastar::future
<> accept(Func
&& fn_accept
) {
245 assert(seastar::this_shard_id() == cpu
);
246 logger().trace("FixedCPUServerSocket({})::accept()...", addr
);
247 return container().invoke_on_all(
248 [fn_accept
= std::move(fn_accept
)] (auto& ss
) mutable {
251 // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
252 // so ignore the returned future
253 std::ignore
= seastar::with_gate(ss
.shutdown_gate
,
254 [&ss
, fn_accept
= std::move(fn_accept
)] () mutable {
255 return seastar::keep_doing([&ss
, fn_accept
= std::move(fn_accept
)] () mutable {
256 return ss
.listener
->accept().then(
257 [&ss
, fn_accept
= std::move(fn_accept
)]
258 (seastar::accept_result accept_result
) mutable {
259 // assert seastar::listen_options::set_fixed_cpu() works
260 assert(seastar::this_shard_id() == ss
.cpu
);
261 auto [socket
, paddr
] = std::move(accept_result
);
262 entity_addr_t peer_addr
;
263 peer_addr
.set_sockaddr(&paddr
.as_posix_sockaddr());
264 peer_addr
.set_type(ss
.addr
.get_type());
265 SocketRef _socket
= std::make_unique
<Socket
>(
266 std::move(socket
), Socket::side_t::acceptor
,
267 peer_addr
.get_port(), Socket::construct_tag
{});
268 std::ignore
= seastar::with_gate(ss
.shutdown_gate
,
269 [socket
= std::move(_socket
), peer_addr
,
270 &ss
, fn_accept
= std::move(fn_accept
)] () mutable {
271 logger().trace("FixedCPUServerSocket({})::accept(): "
272 "accepted peer {}", ss
.addr
, peer_addr
);
273 return fn_accept(std::move(socket
), peer_addr
274 ).handle_exception([&ss
, peer_addr
] (auto eptr
) {
275 logger().error("FixedCPUServerSocket({})::accept(): "
276 "fn_accept(s, {}) got unexpected exception {}",
277 ss
.addr
, peer_addr
, eptr
);
282 }).handle_exception_type([&ss
] (const std::system_error
& e
) {
283 if (e
.code() == std::errc::connection_aborted
||
284 e
.code() == std::errc::invalid_argument
) {
285 logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
290 }).handle_exception([&ss
] (auto eptr
) {
291 logger().error("FixedCPUServerSocket({})::accept(): "
292 "got unexpected exception {}", ss
.addr
, eptr
);
299 seastar::future
<> shutdown();
300 seastar::future
<> destroy();
301 static seastar::future
<FixedCPUServerSocket
*> create();
304 } // namespace crimson::net