1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <seastar/core/gate.hh>
7 #include <seastar/core/reactor.hh>
8 #include <seastar/core/sharded.hh>
9 #include <seastar/net/packet.hh>
11 #include "include/buffer.h"
13 #include "crimson/common/log.h"
17 #ifdef UNIT_TESTS_BUILT
18 #include "Interceptor.h"
21 namespace crimson::net
{
24 using SocketRef
= std::unique_ptr
<Socket
>;
28 struct construct_tag
{};
31 // if acceptor side, peer is using a different port (ephemeral_port)
32 // if connector side, I'm using a different port (ephemeral_port)
38 Socket(seastar::connected_socket
&& _socket
, side_t _side
, uint16_t e_port
, construct_tag
)
39 : sid
{seastar::this_shard_id()},
40 socket(std::move(_socket
)),
42 // the default buffer size 8192 is too small that may impact our write
43 // performance. see seastar::net::connected_socket::output()
44 out(socket
.output(65536)),
46 ephemeral_port(e_port
) {}
54 Socket(Socket
&& o
) = delete;
56 static seastar::future
<SocketRef
>
57 connect(const entity_addr_t
& peer_addr
) {
58 return seastar::connect(peer_addr
.in4_addr()
59 ).then([] (seastar::connected_socket socket
) {
60 return std::make_unique
<Socket
>(
61 std::move(socket
), side_t::connector
, 0, construct_tag
{});
65 /// read the requested number of bytes into a bufferlist
66 seastar::future
<bufferlist
> read(size_t bytes
);
67 using tmp_buf
= seastar::temporary_buffer
<char>;
68 using packet
= seastar::net::packet
;
69 seastar::future
<tmp_buf
> read_exactly(size_t bytes
);
71 seastar::future
<> write(packet
&& buf
) {
72 #ifdef UNIT_TESTS_BUILT
73 return try_trap_pre(next_trap_write
).then([buf
= std::move(buf
), this] () mutable {
75 return out
.write(std::move(buf
));
76 #ifdef UNIT_TESTS_BUILT
78 return try_trap_post(next_trap_write
);
82 seastar::future
<> flush() {
85 seastar::future
<> write_flush(packet
&& buf
) {
86 #ifdef UNIT_TESTS_BUILT
87 return try_trap_pre(next_trap_write
).then([buf
= std::move(buf
), this] () mutable {
89 return out
.write(std::move(buf
)).then([this] { return out
.flush(); });
90 #ifdef UNIT_TESTS_BUILT
92 return try_trap_post(next_trap_write
);
97 // preemptively disable further reads or writes, can only be shutdown once.
100 /// Socket can only be closed once.
101 seastar::future
<> close();
103 // shutdown input_stream only, for tests
104 void force_shutdown_in() {
105 socket
.shutdown_input();
108 // shutdown output_stream only, for tests
109 void force_shutdown_out() {
110 socket
.shutdown_output();
113 side_t
get_side() const {
117 uint16_t get_ephemeral_port() const {
118 return ephemeral_port
;
121 // learn my ephemeral_port as connector.
122 // unfortunately, there's no way to identify which port I'm using as
123 // connector with current seastar interface.
124 void learn_ephemeral_port_as_connector(uint16_t port
) {
125 assert(side
== side_t::connector
&&
126 (ephemeral_port
== 0 || ephemeral_port
== port
));
127 ephemeral_port
= port
;
130 seastar::socket_address
get_local_address() const {
131 return socket
.local_address();
135 const seastar::shard_id sid
;
136 seastar::connected_socket socket
;
137 seastar::input_stream
<char> in
;
138 seastar::output_stream
<char> out
;
140 uint16_t ephemeral_port
;
146 /// buffer state for read()
152 #ifdef UNIT_TESTS_BUILT
154 void set_trap(bp_type_t type
, bp_action_t action
, socket_blocker
* blocker_
);
157 bp_action_t next_trap_read
= bp_action_t::CONTINUE
;
158 bp_action_t next_trap_write
= bp_action_t::CONTINUE
;
159 socket_blocker
* blocker
= nullptr;
160 seastar::future
<> try_trap_pre(bp_action_t
& trap
);
161 seastar::future
<> try_trap_post(bp_action_t
& trap
);
164 friend class FixedCPUServerSocket
;
167 using listen_ertr
= crimson::errorator
<
168 crimson::ct_error::address_in_use
, // The address is already bound
169 crimson::ct_error::address_not_available
// https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
172 class FixedCPUServerSocket
173 : public seastar::peering_sharded_service
<FixedCPUServerSocket
> {
174 const seastar::shard_id cpu
;
176 std::optional
<seastar::server_socket
> listener
;
177 seastar::gate shutdown_gate
;
179 using sharded_service_t
= seastar::sharded
<FixedCPUServerSocket
>;
180 std::unique_ptr
<sharded_service_t
> service
;
182 struct construct_tag
{};
184 static seastar::logger
& logger() {
185 return crimson::get_logger(ceph_subsys_ms
);
188 seastar::future
<> reset() {
189 return container().invoke_on_all([] (auto& ss
) {
190 assert(ss
.shutdown_gate
.is_closed());
191 ss
.addr
= entity_addr_t();
197 FixedCPUServerSocket(seastar::shard_id cpu
, construct_tag
) : cpu
{cpu
} {}
198 ~FixedCPUServerSocket() {
200 // detect whether user have called destroy() properly
201 ceph_assert(!service
);
204 FixedCPUServerSocket(FixedCPUServerSocket
&&) = delete;
205 FixedCPUServerSocket(const FixedCPUServerSocket
&) = delete;
206 FixedCPUServerSocket
& operator=(const FixedCPUServerSocket
&) = delete;
208 listen_ertr::future
<> listen(entity_addr_t addr
);
210 // fn_accept should be a nothrow function of type
211 // seastar::future<>(SocketRef, entity_addr_t)
212 template <typename Func
>
213 seastar::future
<> accept(Func
&& fn_accept
) {
214 assert(seastar::this_shard_id() == cpu
);
215 logger().trace("FixedCPUServerSocket({})::accept()...", addr
);
216 return container().invoke_on_all(
217 [fn_accept
= std::move(fn_accept
)] (auto& ss
) mutable {
220 // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
221 // so ignore the returned future
222 std::ignore
= seastar::with_gate(ss
.shutdown_gate
,
223 [&ss
, fn_accept
= std::move(fn_accept
)] () mutable {
224 return seastar::keep_doing([&ss
, fn_accept
= std::move(fn_accept
)] () mutable {
225 return ss
.listener
->accept().then(
226 [&ss
, fn_accept
= std::move(fn_accept
)]
227 (seastar::accept_result accept_result
) mutable {
228 // assert seastar::listen_options::set_fixed_cpu() works
229 assert(seastar::this_shard_id() == ss
.cpu
);
230 auto [socket
, paddr
] = std::move(accept_result
);
231 entity_addr_t peer_addr
;
232 peer_addr
.set_sockaddr(&paddr
.as_posix_sockaddr());
233 peer_addr
.set_type(ss
.addr
.get_type());
234 SocketRef _socket
= std::make_unique
<Socket
>(
235 std::move(socket
), Socket::side_t::acceptor
,
236 peer_addr
.get_port(), Socket::construct_tag
{});
237 std::ignore
= seastar::with_gate(ss
.shutdown_gate
,
238 [socket
= std::move(_socket
), peer_addr
,
239 &ss
, fn_accept
= std::move(fn_accept
)] () mutable {
240 logger().trace("FixedCPUServerSocket({})::accept(): "
241 "accepted peer {}", ss
.addr
, peer_addr
);
242 return fn_accept(std::move(socket
), peer_addr
243 ).handle_exception([&ss
, peer_addr
] (auto eptr
) {
244 logger().error("FixedCPUServerSocket({})::accept(): "
245 "fn_accept(s, {}) got unexpected exception {}",
246 ss
.addr
, peer_addr
, eptr
);
251 }).handle_exception_type([&ss
] (const std::system_error
& e
) {
252 if (e
.code() == std::errc::connection_aborted
||
253 e
.code() == std::errc::invalid_argument
) {
254 logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
259 }).handle_exception([&ss
] (auto eptr
) {
260 logger().error("FixedCPUServerSocket({})::accept(): "
261 "got unexpected exception {}", ss
.addr
, eptr
);
268 seastar::future
<> shutdown();
269 seastar::future
<> destroy();
270 static seastar::future
<FixedCPUServerSocket
*> create();
273 } // namespace crimson::net