1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <seastar/core/gate.hh>
7 #include <seastar/core/reactor.hh>
8 #include <seastar/core/sharded.hh>
9 #include <seastar/net/packet.hh>
11 #include "include/buffer.h"
13 #include "crimson/common/log.h"
17 #ifdef UNIT_TESTS_BUILT
18 #include "Interceptor.h"
21 namespace crimson::net
{
24 using SocketRef
= std::unique_ptr
<Socket
>;
28 struct construct_tag
{};
31 // if acceptor side, peer is using a different port (ephemeral_port)
32 // if connector side, I'm using a different port (ephemeral_port)
38 Socket(seastar::connected_socket
&& _socket
, side_t _side
, uint16_t e_port
, construct_tag
)
39 : sid
{seastar::this_shard_id()},
40 socket(std::move(_socket
)),
42 // the default buffer size 8192 is too small that may impact our write
43 // performance. see seastar::net::connected_socket::output()
44 out(socket
.output(65536)),
46 ephemeral_port(e_port
) {}
54 Socket(Socket
&& o
) = delete;
56 static seastar::future
<SocketRef
>
57 connect(const entity_addr_t
& peer_addr
) {
58 return seastar::connect(peer_addr
.in4_addr()
59 ).then([] (seastar::connected_socket socket
) {
60 return std::make_unique
<Socket
>(
61 std::move(socket
), side_t::connector
, 0, construct_tag
{});
65 /// read the requested number of bytes into a bufferlist
66 seastar::future
<bufferlist
> read(size_t bytes
);
67 using tmp_buf
= seastar::temporary_buffer
<char>;
68 using packet
= seastar::net::packet
;
69 seastar::future
<tmp_buf
> read_exactly(size_t bytes
);
71 seastar::future
<> write(packet
&& buf
) {
72 #ifdef UNIT_TESTS_BUILT
73 return try_trap_pre(next_trap_write
).then([buf
= std::move(buf
), this] () mutable {
75 return out
.write(std::move(buf
));
76 #ifdef UNIT_TESTS_BUILT
78 return try_trap_post(next_trap_write
);
82 seastar::future
<> flush() {
85 seastar::future
<> write_flush(packet
&& buf
) {
86 #ifdef UNIT_TESTS_BUILT
87 return try_trap_pre(next_trap_write
).then([buf
= std::move(buf
), this] () mutable {
89 return out
.write(std::move(buf
)).then([this] { return out
.flush(); });
90 #ifdef UNIT_TESTS_BUILT
92 return try_trap_post(next_trap_write
);
97 // preemptively disable further reads or writes, can only be shutdown once.
100 /// Socket can only be closed once.
101 seastar::future
<> close();
103 // shutdown input_stream only, for tests
104 void force_shutdown_in() {
105 socket
.shutdown_input();
108 // shutdown output_stream only, for tests
109 void force_shutdown_out() {
110 socket
.shutdown_output();
113 side_t
get_side() const {
117 uint16_t get_ephemeral_port() const {
118 return ephemeral_port
;
121 // learn my ephemeral_port as connector.
122 // unfortunately, there's no way to identify which port I'm using as
123 // connector with current seastar interface.
124 void learn_ephemeral_port_as_connector(uint16_t port
) {
125 assert(side
== side_t::connector
&&
126 (ephemeral_port
== 0 || ephemeral_port
== port
));
127 ephemeral_port
= port
;
131 const seastar::shard_id sid
;
132 seastar::connected_socket socket
;
133 seastar::input_stream
<char> in
;
134 seastar::output_stream
<char> out
;
136 uint16_t ephemeral_port
;
142 /// buffer state for read()
148 #ifdef UNIT_TESTS_BUILT
150 void set_trap(bp_type_t type
, bp_action_t action
, socket_blocker
* blocker_
);
153 bp_action_t next_trap_read
= bp_action_t::CONTINUE
;
154 bp_action_t next_trap_write
= bp_action_t::CONTINUE
;
155 socket_blocker
* blocker
= nullptr;
156 seastar::future
<> try_trap_pre(bp_action_t
& trap
);
157 seastar::future
<> try_trap_post(bp_action_t
& trap
);
160 friend class FixedCPUServerSocket
;
163 class FixedCPUServerSocket
164 : public seastar::peering_sharded_service
<FixedCPUServerSocket
> {
165 const seastar::shard_id cpu
;
167 std::optional
<seastar::server_socket
> listener
;
168 seastar::gate shutdown_gate
;
170 using sharded_service_t
= seastar::sharded
<FixedCPUServerSocket
>;
171 std::unique_ptr
<sharded_service_t
> service
;
173 struct construct_tag
{};
175 static seastar::logger
& logger() {
176 return crimson::get_logger(ceph_subsys_ms
);
179 seastar::future
<> reset() {
180 return container().invoke_on_all([] (auto& ss
) {
181 assert(ss
.shutdown_gate
.is_closed());
182 ss
.shutdown_gate
= seastar::gate();
183 ss
.addr
= entity_addr_t();
189 FixedCPUServerSocket(seastar::shard_id cpu
, construct_tag
) : cpu
{cpu
} {}
190 ~FixedCPUServerSocket() {
192 // detect whether user have called destroy() properly
193 ceph_assert(!service
);
196 FixedCPUServerSocket(FixedCPUServerSocket
&&) = delete;
197 FixedCPUServerSocket(const FixedCPUServerSocket
&) = delete;
198 FixedCPUServerSocket
& operator=(const FixedCPUServerSocket
&) = delete;
200 using listen_ertr
= crimson::errorator
<
201 crimson::ct_error::address_in_use
// The address is already bound
203 listen_ertr::future
<> listen(entity_addr_t addr
);
205 // fn_accept should be a nothrow function of type
206 // seastar::future<>(SocketRef, entity_addr_t)
207 template <typename Func
>
208 seastar::future
<> accept(Func
&& fn_accept
) {
209 assert(seastar::this_shard_id() == cpu
);
210 logger().trace("FixedCPUServerSocket({})::accept()...", addr
);
211 return container().invoke_on_all(
212 [fn_accept
= std::move(fn_accept
)] (auto& ss
) mutable {
215 // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
216 // so ignore the returned future
217 std::ignore
= seastar::with_gate(ss
.shutdown_gate
,
218 [&ss
, fn_accept
= std::move(fn_accept
)] () mutable {
219 return seastar::keep_doing([&ss
, fn_accept
= std::move(fn_accept
)] () mutable {
220 return ss
.listener
->accept().then(
221 [&ss
, fn_accept
= std::move(fn_accept
)]
222 (seastar::accept_result accept_result
) mutable {
223 // assert seastar::listen_options::set_fixed_cpu() works
224 assert(seastar::this_shard_id() == ss
.cpu
);
225 auto [socket
, paddr
] = std::move(accept_result
);
226 entity_addr_t peer_addr
;
227 peer_addr
.set_sockaddr(&paddr
.as_posix_sockaddr());
228 peer_addr
.set_type(entity_addr_t::TYPE_ANY
);
229 SocketRef _socket
= std::make_unique
<Socket
>(
230 std::move(socket
), Socket::side_t::acceptor
,
231 peer_addr
.get_port(), Socket::construct_tag
{});
232 std::ignore
= seastar::with_gate(ss
.shutdown_gate
,
233 [socket
= std::move(_socket
), peer_addr
,
234 &ss
, fn_accept
= std::move(fn_accept
)] () mutable {
235 logger().trace("FixedCPUServerSocket({})::accept(): "
236 "accepted peer {}", ss
.addr
, peer_addr
);
237 return fn_accept(std::move(socket
), peer_addr
238 ).handle_exception([&ss
, peer_addr
] (auto eptr
) {
239 logger().error("FixedCPUServerSocket({})::accept(): "
240 "fn_accept(s, {}) got unexpected exception {}",
241 ss
.addr
, peer_addr
, eptr
);
246 }).handle_exception_type([&ss
] (const std::system_error
& e
) {
247 if (e
.code() == std::errc::connection_aborted
||
248 e
.code() == std::errc::invalid_argument
) {
249 logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
254 }).handle_exception([&ss
] (auto eptr
) {
255 logger().error("FixedCPUServerSocket({})::accept(): "
256 "got unexpected exception {}", ss
.addr
, eptr
);
263 seastar::future
<> shutdown();
264 seastar::future
<> destroy();
265 static seastar::future
<FixedCPUServerSocket
*> create();
268 } // namespace crimson::net