1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
#include <cassert>
#include <memory>
#include <optional>
#include <system_error>
#include <tuple>
#include <utility>

#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>
#include <seastar/net/packet.hh>

#include "include/buffer.h"
#include "msg/msg_types.h"

#include "crimson/common/log.h"

#ifdef UNIT_TESTS_BUILT
#include "Interceptor.h"
#endif
21 namespace crimson::net
{
// Forward declaration: the alias below names Socket before any declaration
// of the class is in scope.
class Socket;

/// Owning handle to a Socket. connect() and FixedCPUServerSocket::accept()
/// both create sockets with std::make_unique and pass them around as
/// SocketRef.
using SocketRef = std::unique_ptr<Socket>;
28 const seastar::shard_id sid
;
29 seastar::connected_socket socket
;
30 seastar::input_stream
<char> in
;
31 seastar::output_stream
<char> out
;
37 /// buffer state for read()
// Empty tag type required by the Socket constructor. connect() and
// FixedCPUServerSocket::accept() pass construct_tag{} when they build a
// Socket through std::make_unique.
struct construct_tag {};
46 Socket(seastar::connected_socket
&& _socket
, construct_tag
)
47 : sid
{seastar::engine().cpu_id()},
48 socket(std::move(_socket
)),
50 // the default buffer size 8192 is too small that may impact our write
51 // performance. see seastar::net::connected_socket::output()
52 out(socket
.output(65536)) {}
60 Socket(Socket
&& o
) = delete;
62 static seastar::future
<SocketRef
>
63 connect(const entity_addr_t
& peer_addr
) {
64 return seastar::connect(peer_addr
.in4_addr()
65 ).then([] (seastar::connected_socket socket
) {
66 return std::make_unique
<Socket
>(std::move(socket
), construct_tag
{});
70 /// read the requested number of bytes into a bufferlist
71 seastar::future
<bufferlist
> read(size_t bytes
);
72 using tmp_buf
= seastar::temporary_buffer
<char>;
73 using packet
= seastar::net::packet
;
74 seastar::future
<tmp_buf
> read_exactly(size_t bytes
);
76 seastar::future
<> write(packet
&& buf
) {
77 #ifdef UNIT_TESTS_BUILT
78 return try_trap_pre(next_trap_write
).then([buf
= std::move(buf
), this] () mutable {
80 return out
.write(std::move(buf
));
81 #ifdef UNIT_TESTS_BUILT
83 return try_trap_post(next_trap_write
);
87 seastar::future
<> flush() {
90 seastar::future
<> write_flush(packet
&& buf
) {
91 #ifdef UNIT_TESTS_BUILT
92 return try_trap_pre(next_trap_write
).then([buf
= std::move(buf
), this] () mutable {
94 return out
.write(std::move(buf
)).then([this] { return out
.flush(); });
95 #ifdef UNIT_TESTS_BUILT
97 return try_trap_post(next_trap_write
);
102 // preemptively disable further reads or writes, can only be shutdown once.
105 /// Socket can only be closed once.
106 seastar::future
<> close();
108 // shutdown input_stream only, for tests
109 void force_shutdown_in() {
110 socket
.shutdown_input();
113 // shutdown output_stream only, for tests
114 void force_shutdown_out() {
115 socket
.shutdown_output();
#ifdef UNIT_TESTS_BUILT
// Breakpoint ("trap") hooks, compiled only into unit-test builds.
// NOTE(review): no access specifiers are visible in this view of the class;
// make sure set_trap() stays reachable from the tests.

// Action to take at the next read / write trap point. write() and
// write_flush() consult next_trap_write through try_trap_pre() and
// try_trap_post().
bp_action_t next_trap_read = bp_action_t::CONTINUE;
bp_action_t next_trap_write = bp_action_t::CONTINUE;

// Non-owning pointer, null by default; presumably installed by set_trap().
socket_blocker* blocker = nullptr;

// Hooks run before / after the guarded operation; defined out of line.
seastar::future<> try_trap_pre(bp_action_t& trap);
seastar::future<> try_trap_post(bp_action_t& trap);

// Arm a trap of the given type with the given action; defined out of line.
void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
#endif
130 friend class FixedCPUServerSocket
;
133 class FixedCPUServerSocket
134 : public seastar::peering_sharded_service
<FixedCPUServerSocket
> {
135 const seastar::shard_id cpu
;
137 std::optional
<seastar::server_socket
> listener
;
138 seastar::gate shutdown_gate
;
140 using sharded_service_t
= seastar::sharded
<FixedCPUServerSocket
>;
141 std::unique_ptr
<sharded_service_t
> service
;
143 struct construct_tag
{};
145 static seastar::logger
& logger() {
146 return crimson::get_logger(ceph_subsys_ms
);
149 seastar::future
<> reset() {
150 return container().invoke_on_all([] (auto& ss
) {
151 assert(ss
.shutdown_gate
.is_closed());
152 ss
.shutdown_gate
= seastar::gate();
153 ss
.addr
= entity_addr_t();
159 FixedCPUServerSocket(seastar::shard_id cpu
, construct_tag
) : cpu
{cpu
} {}
160 ~FixedCPUServerSocket() {
162 // detect whether user have called destroy() properly
163 ceph_assert(!service
);
166 FixedCPUServerSocket(FixedCPUServerSocket
&&) = delete;
167 FixedCPUServerSocket(const FixedCPUServerSocket
&) = delete;
168 FixedCPUServerSocket
& operator=(const FixedCPUServerSocket
&) = delete;
170 seastar::future
<> listen(entity_addr_t addr
) {
171 assert(seastar::engine().cpu_id() == cpu
);
172 logger().trace("FixedCPUServerSocket::listen({})...", addr
);
173 return container().invoke_on_all([addr
] (auto& ss
) {
175 seastar::socket_address
s_addr(addr
.in4_addr());
176 seastar::listen_options lo
;
177 lo
.reuse_address
= true;
178 lo
.set_fixed_cpu(ss
.cpu
);
179 ss
.listener
= seastar::listen(s_addr
, lo
);
180 }).handle_exception_type([addr
] (const std::system_error
& e
) {
181 if (e
.code() == std::errc::address_in_use
) {
182 logger().trace("FixedCPUServerSocket::listen({}): address in use", addr
);
185 logger().error("FixedCPUServerSocket::listen({}): "
186 "got unexpeted error {}", addr
, e
);
192 // fn_accept should be a nothrow function of type
193 // seastar::future<>(SocketRef, entity_addr_t)
194 template <typename Func
>
195 seastar::future
<> accept(Func
&& fn_accept
) {
196 assert(seastar::engine().cpu_id() == cpu
);
197 logger().trace("FixedCPUServerSocket({})::accept()...", addr
);
198 return container().invoke_on_all(
199 [fn_accept
= std::move(fn_accept
)] (auto& ss
) mutable {
202 // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
203 // so ignore the returned future
204 std::ignore
= seastar::with_gate(ss
.shutdown_gate
,
205 [&ss
, fn_accept
= std::move(fn_accept
)] () mutable {
206 return seastar::keep_doing([&ss
, fn_accept
= std::move(fn_accept
)] () mutable {
207 return ss
.listener
->accept().then(
208 [&ss
, fn_accept
= std::move(fn_accept
)]
209 (seastar::accept_result accept_result
) mutable {
210 // assert seastar::listen_options::set_fixed_cpu() works
211 assert(seastar::engine().cpu_id() == ss
.cpu
);
212 auto [socket
, paddr
] = std::move(accept_result
);
213 entity_addr_t peer_addr
;
214 peer_addr
.set_sockaddr(&paddr
.as_posix_sockaddr());
215 peer_addr
.set_type(entity_addr_t::TYPE_ANY
);
216 SocketRef _socket
= std::make_unique
<Socket
>(
217 std::move(socket
), Socket::construct_tag
{});
218 std::ignore
= seastar::with_gate(ss
.shutdown_gate
,
219 [socket
= std::move(_socket
), peer_addr
,
220 &ss
, fn_accept
= std::move(fn_accept
)] () mutable {
221 logger().trace("FixedCPUServerSocket({})::accept(): "
222 "accepted peer {}", ss
.addr
, peer_addr
);
223 return fn_accept(std::move(socket
), peer_addr
224 ).handle_exception([&ss
, peer_addr
] (auto eptr
) {
225 logger().error("FixedCPUServerSocket({})::accept(): "
226 "fn_accept(s, {}) got unexpected exception {}",
227 ss
.addr
, peer_addr
, eptr
);
232 }).handle_exception_type([&ss
] (const std::system_error
& e
) {
233 if (e
.code() == std::errc::connection_aborted
||
234 e
.code() == std::errc::invalid_argument
) {
235 logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
240 }).handle_exception([&ss
] (auto eptr
) {
241 logger().error("FixedCPUServerSocket({})::accept(): "
242 "got unexpected exception {}", ss
.addr
, eptr
);
249 seastar::future
<> shutdown() {
250 assert(seastar::engine().cpu_id() == cpu
);
251 logger().trace("FixedCPUServerSocket({})::shutdown()...", addr
);
252 return container().invoke_on_all([] (auto& ss
) {
254 ss
.listener
->abort_accept();
256 return ss
.shutdown_gate
.close();
262 seastar::future
<> destroy() {
263 assert(seastar::engine().cpu_id() == cpu
);
264 return shutdown().then([this] {
265 // we should only construct/stop shards on #0
266 return container().invoke_on(0, [] (auto& ss
) {
268 return ss
.service
->stop().finally([cleanup
= std::move(ss
.service
)] {});
273 static seastar::future
<FixedCPUServerSocket
*> create() {
274 auto cpu
= seastar::engine().cpu_id();
275 // we should only construct/stop shards on #0
276 return seastar::smp::submit_to(0, [cpu
] {
277 auto service
= std::make_unique
<sharded_service_t
>();
278 return service
->start(cpu
, construct_tag
{}
279 ).then([service
= std::move(service
)] () mutable {
280 auto p_shard
= service
.get();
281 p_shard
->local().service
= std::move(service
);
284 }).then([] (auto p_shard
) {
285 return &p_shard
->local();
290 } // namespace crimson::net