1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <seastar/core/sleep.hh>
7 #include <seastar/core/when_all.hh>
8 #include <seastar/net/packet.hh>
10 #include "crimson/common/log.h"
13 using crimson::common::local_conf
;
15 namespace crimson::net
{
// NOTE(review): this file looks like a damaged extract: every line carries a
// stray leading number and some original lines (e.g. the closing brace of
// logger()) are absent. Comments here describe only what is visible.
// Returns the seastar logger for the messenger subsystem (ceph_subsys_ms).
19 seastar::logger
& logger() {
20 return crimson::get_logger(ceph_subsys_ms
);
// Short aliases for the seastar buffer and packet types used in this file.
23 using tmp_buf
= seastar::temporary_buffer
<char>;
24 using packet
= seastar::net::packet
;
26 // an input_stream consumer that reads buffer segments into a bufferlist up to
27 // the given number of remaining bytes
// The caller's bufferlist and byte counter are passed by reference; they are
// presumably stored by reference too (the member declarations are missing
// from this extract), since Socket::read() inspects r.remaining afterwards.
// NOTE(review): several lines are absent here (members, an inner if/else in
// operator(), closing braces) -- comments describe only the visible code.
28 struct bufferlist_consumer
{
// bl: destination bufferlist; remaining: number of bytes still wanted,
// decremented as segments are consumed.
32 bufferlist_consumer(bufferlist
& bl
, size_t& remaining
)
33 : bl(bl
), remaining(remaining
) {}
35 using consumption_result_type
= typename
seastar::input_stream
<char>::consumption_result_type
;
37 // consume some or all of a buffer segment
// Returns continue_consuming to ask for more segments, or stop_consuming
// carrying the unconsumed tail of the segment (possibly empty) once enough
// bytes have been gathered.
38 seastar::future
<consumption_result_type
> operator()(tmp_buf
&& data
) {
39 if (remaining
>= data
.size()) {
40 // consume the whole buffer
41 remaining
-= data
.size();
42 bl
.append(buffer::create(std::move(data
)));
// NOTE(review): the condition selecting between the two returns below is not
// visible; presumably "bytes still needed" leads to continue_consuming.
44 // return none to request more segments
45 return seastar::make_ready_future
<consumption_result_type
>(
46 seastar::continue_consuming
{});
48 // return an empty buffer to signal that we're done
49 return seastar::make_ready_future
<consumption_result_type
>(
50 consumption_result_type::stop_consuming_type({}));
// Presumably reached when the segment holds more than `remaining` bytes:
// append a shared view of its first `remaining` bytes to bl and trim them off.
// NOTE(review): Socket::read() throws read_eof whenever r.remaining is nonzero
// after consume(), so `remaining` must be reset to 0 on this path; that line
// is not visible in this extract -- confirm it exists.
55 bl
.append(buffer::create(data
.share(0, remaining
)));
56 data
.trim_front(remaining
);
59 // give the rest back to signal that we're done
60 return seastar::make_ready_future
<consumption_result_type
>(
61 consumption_result_type::stop_consuming_type
{std::move(data
)});
// Optionally delays the caller: when ms_inject_internal_delays is set
// (presumably nonzero -- the second half of the condition is missing from this
// extract), sleeps for that many seconds; otherwise returns a ready future.
// NOTE(review): the opening brace and part of the if-with-initializer are absent.
65 seastar::future
<> inject_delay()
67 if (float delay_period
= local_conf()->ms_inject_internal_delays
;
69 logger().debug("Socket::inject_delay: sleep for {}", delay_period
);
// Seconds are converted to milliseconds; the (int) cast truncates, so values
// below 0.001 s produce a zero-length sleep.
70 return seastar::sleep(
71 std::chrono::milliseconds((int)(delay_period
* 1000.0)));
73 return seastar::now();
// NOTE(review): the enclosing function header is missing from this extract;
// judging by the log text it is presumably inject_failure().
// When ms_inject_socket_failures (N) is nonzero, draws a random number between
// 1 and RAND_MAX and throws std::system_error(error::negotiation_failure) if it
// is divisible by N, i.e. roughly once every N calls.
// NOTE(review): the declaration that stores the random number in `rand` is also
// missing here; as shown, the generate_random_number() result is discarded.
78 if (local_conf()->ms_inject_socket_failures
) {
80 ceph::util::generate_random_number
<uint64_t>(1, RAND_MAX
);
81 if (rand
% local_conf()->ms_inject_socket_failures
== 0) {
82 logger().warn("Socket::inject_failure: injecting socket failure");
83 throw std::system_error(make_error_code(
84 error::negotiation_failure
));
89 } // anonymous namespace
// NOTE(review): fragment of the Socket constructor; the signature head, the
// parameters that supply e_port, and the body braces are missing here.
// Takes ownership of the connected socket, records the creating shard in sid,
// opens the output stream with a 64 KiB buffer, and enables TCP_NODELAY when
// ms_tcp_nodelay is set.
92 seastar::connected_socket
&&_socket
,
96 : sid
{seastar::this_shard_id()},
97 socket(std::move(_socket
)),
99 // the default buffer size 8192 is too small and may hurt our write
100 // performance; see seastar::net::connected_socket::output()
101 out(socket
.output(65536)),
102 socket_is_shutdown(false),
104 ephemeral_port(e_port
)
106 if (local_conf()->ms_tcp_nodelay
) {
107 socket
.set_nodelay(true);
// NOTE(review): isolated fragment; the enclosing function is not visible
// (presumably Socket::~Socket()). Asserts that it runs on the shard recorded
// in sid, i.e. the shard that constructed the socket.
113 assert(seastar::this_shard_id() == sid
);
// Reads `bytes` bytes from the input stream into a bufferlist (presumably
// r.remaining starts at `bytes`; its initialization is not visible).
// Must be called on the owning shard (sid). Fails with
// std::system_error(read_eof) if the stream ends before all bytes arrive.
// Calls inject_delay() before resolving. In UNIT_TESTS_BUILT builds the read
// is wrapped by try_trap_pre()/try_trap_post() on next_trap_read.
// NOTE(review): several lines are missing from this extract, including the
// condition guarding the empty-result return and the setup of `r`.
119 seastar::future
<bufferlist
>
120 Socket::read(size_t bytes
)
122 assert(seastar::this_shard_id() == sid
);
123 #ifdef UNIT_TESTS_BUILT
124 return try_trap_pre(next_trap_read
).then([bytes
, this] {
// Presumably the bytes == 0 shortcut (the guarding condition is not visible).
127 return seastar::make_ready_future
<bufferlist
>();
// Drain stream segments into r.buffer until r.remaining reaches zero or EOF.
131 return in
.consume(bufferlist_consumer
{r
.buffer
, r
.remaining
}).then([this] {
132 if (r
.remaining
) { // throw on short reads
133 throw std::system_error(make_error_code(error::read_eof
));
136 return inject_delay().then([this] {
137 return seastar::make_ready_future
<bufferlist
>(std::move(r
.buffer
));
140 #ifdef UNIT_TESTS_BUILT
141 }).then([this](auto buf
) {
142 return try_trap_post(next_trap_read
143 ).then([buf
= std::move(buf
)]() mutable {
144 return std::move(buf
);
// Reads exactly `bytes` bytes via input_stream::read_exactly() and returns
// them as a bufferptr. Fails with std::system_error(read_eof) if fewer bytes
// arrive. Must be called on the owning shard (sid). In UNIT_TESTS_BUILT
// builds the read is wrapped by try_trap_pre()/try_trap_post() on
// next_trap_read.
// NOTE(review): some lines are missing from this extract, e.g. the guard for
// the empty-result return and the call the `.then([ptr = ...])` continuation
// chains onto (presumably inject_delay(), as in read()).
150 seastar::future
<bufferptr
>
151 Socket::read_exactly(size_t bytes
) {
152 assert(seastar::this_shard_id() == sid
);
153 #ifdef UNIT_TESTS_BUILT
154 return try_trap_pre(next_trap_read
).then([bytes
, this] {
157 return seastar::make_ready_future
<bufferptr
>();
159 return in
.read_exactly(bytes
).then([bytes
](auto buf
) {
// Wrap the received segment without copying (share()).
160 bufferptr
ptr(buffer::create(buf
.share()));
// read_exactly() yields a shorter buffer at EOF; treat that as read_eof.
161 if (ptr
.length() < bytes
) {
162 throw std::system_error(make_error_code(error::read_eof
));
166 ).then([ptr
= std::move(ptr
)]() mutable {
167 return seastar::make_ready_future
<bufferptr
>(std::move(ptr
));
170 #ifdef UNIT_TESTS_BUILT
171 }).then([this](auto ptr
) {
172 return try_trap_post(next_trap_read
173 ).then([ptr
= std::move(ptr
)]() mutable {
174 return std::move(ptr
);
// Writes `buf` to the output stream as a single seastar packet. No flush call
// is visible here (contrast write_flush()). Must be called on the owning
// shard (sid). In UNIT_TESTS_BUILT builds the write is wrapped by
// try_trap_pre()/try_trap_post() on next_trap_write.
// NOTE(review): the return-type line and the lines between the two visible
// continuations (presumably an inject_delay() call) are missing here.
181 Socket::write(bufferlist buf
)
183 assert(seastar::this_shard_id() == sid
);
184 #ifdef UNIT_TESTS_BUILT
185 return try_trap_pre(next_trap_write
186 ).then([buf
= std::move(buf
), this]() mutable {
190 ).then([buf
= std::move(buf
), this]() mutable {
191 packet
p(std::move(buf
));
192 return out
.write(std::move(p
));
194 #ifdef UNIT_TESTS_BUILT
196 return try_trap_post(next_trap_write
);
// NOTE(review): isolated fragment; the enclosing function is not visible
// (presumably Socket::flush()). Asserts it runs on the owning shard, then
// chains further work after inject_delay().
204 assert(seastar::this_shard_id() == sid
);
206 return inject_delay().then([this] {
// Writes `buf` to the output stream as a single packet and, going by the name,
// flushes it (the flush call itself is not visible in this extract).
// Must be called on the owning shard (sid). In UNIT_TESTS_BUILT builds the
// write is wrapped by try_trap_pre()/try_trap_post() on next_trap_write.
// NOTE(review): the return-type line and the continuation of the
// out.write(...) expression are missing here.
212 Socket::write_flush(bufferlist buf
)
214 assert(seastar::this_shard_id() == sid
);
215 #ifdef UNIT_TESTS_BUILT
216 return try_trap_pre(next_trap_write
217 ).then([buf
= std::move(buf
), this]() mutable {
221 ).then([buf
= std::move(buf
), this]() mutable {
222 packet
p(std::move(buf
));
223 return out
.write(std::move(p
)
228 #ifdef UNIT_TESTS_BUILT
230 return try_trap_post(next_trap_write
);
// Shuts down both directions of the connected socket and records this in
// socket_is_shutdown. Must be called on the owning shard (sid).
// NOTE(review): the function's braces are missing from this extract.
235 void Socket::shutdown()
237 assert(seastar::this_shard_id() == sid
);
238 socket_is_shutdown
= true;
239 socket
.shutdown_input();
240 socket
.shutdown_output();
// Closes `out`. broken_pipe and connection_reset system errors are tolerated,
// since they can occur when the stream was already shut down. Any other
// std::system_error is logged at error level; what follows that log call is
// not visible in this extract.
243 static inline seastar::future
<>
244 close_and_handle_errors(seastar::output_stream
<char>& out
)
246 return out
.close().handle_exception_type([](const std::system_error
& e
) {
247 if (e
.code() != std::errc::broken_pipe
&&
248 e
.code() != std::errc::connection_reset
) {
249 logger().error("Socket::close(): unexpected error {}", e
.what());
252 // can happen when out is already shutdown, ignore
// NOTE(review): fragment of what is presumably Socket::close() (see the log
// text); the header, the other when_all_succeed() argument(s) and the
// declaration of e_what are missing from this extract.
// Must run on the owning shard and only once (asserts !closed). Closes the
// output stream via close_and_handle_errors(). Unexpected exceptions are
// rethrown inside a try block so they can be caught as std::exception and
// logged.
259 assert(seastar::this_shard_id() == sid
);
261 ceph_assert_always(!closed
);
264 return seastar::when_all_succeed(
267 close_and_handle_errors(out
)
269 return seastar::make_ready_future
<>();
270 }).handle_exception([](auto eptr
) {
273 std::rethrow_exception(eptr
);
274 } catch (std::exception
&e
) {
277 logger().error("Socket::close(): unexpected exception {}", e_what
);
// Connects to `peer_addr` using its IPv4 address (in4_addr()) and wraps the
// resulting connected_socket in a connector-side Socket. The third
// constructor argument is 0, presumably the ephemeral port, which the
// connector side does not record. Resolves to a SocketRef.
// NOTE(review): the lines before seastar::connect() (whatever the `}).then(`
// chains from) and the final return of `ret` are missing from this extract.
282 seastar::future
<SocketRef
>
283 Socket::connect(const entity_addr_t
&peer_addr
)
288 return seastar::connect(peer_addr
.in4_addr());
289 }).then([peer_addr
](seastar::connected_socket socket
) {
290 auto ret
= std::make_unique
<Socket
>(
291 std::move(socket
), side_t::connector
, 0, construct_tag
{});
292 logger().debug("Socket::connect(): connected to {}, socket {}",
293 peer_addr
, fmt::ptr(ret
));
// Test-only helpers (compiled under UNIT_TESTS_BUILT).
298 #ifdef UNIT_TESTS_BUILT
// Arms a breakpoint action for the next read or write on this socket. Must be
// called on the owning shard. A read trap may only be armed when none is
// pending (next_trap_read == CONTINUE). A pending write FAULT may only be
// re-armed with FAULT or CONTINUE.
// NOTE(review): the rest of the function (including any use of blocker_ and
// the closing braces) is not visible in this extract.
299 void Socket::set_trap(bp_type_t type
, bp_action_t action
, socket_blocker
* blocker_
) {
300 assert(seastar::this_shard_id() == sid
);
302 if (type
== bp_type_t::READ
) {
303 ceph_assert_always(next_trap_read
== bp_action_t::CONTINUE
);
304 next_trap_read
= action
;
305 } else { // type == bp_type_t::WRITE
306 if (next_trap_write
== bp_action_t::CONTINUE
) {
307 next_trap_write
= action
;
308 } else if (next_trap_write
== bp_action_t::FAULT
) {
309 // do_sweep_messages() may combine multiple write events into one socket write
310 ceph_assert_always(action
== bp_action_t::FAULT
|| action
== bp_action_t::CONTINUE
);
// Test-only: resets the armed trap to CONTINUE and presumably acts on its
// previous value (the copy and the switch header are missing from this
// extract). Visible cases: CONTINUE proceeds; FAULT throws
// std::system_error(negotiation_failure); BLOCK waits on blocker->block().
// The STALL branch body is not visible; the visible ceph_abort() handles
// unexpected actions.
// NOTE(review): the return-type line is also missing.
318 Socket::try_trap_pre(bp_action_t
& trap
) {
320 trap
= bp_action_t::CONTINUE
;
322 case bp_action_t::CONTINUE
:
324 case bp_action_t::FAULT
:
325 logger().info("[Test] got FAULT");
326 throw std::system_error(make_error_code(error::negotiation_failure
));
327 case bp_action_t::BLOCK
:
328 logger().info("[Test] got BLOCK");
329 return blocker
->block();
330 case bp_action_t::STALL
:
334 ceph_abort("unexpected action from trap");
336 return seastar::make_ready_future
<>();
// Test-only: resets the armed trap to CONTINUE and presumably acts on its
// previous value (the copy and the switch header are missing from this
// extract). Visible cases: CONTINUE proceeds; STALL waits on
// blocker->block(). The visible ceph_abort() handles unexpected actions.
// NOTE(review): the return-type line is also missing.
340 Socket::try_trap_post(bp_action_t
& trap
) {
342 trap
= bp_action_t::CONTINUE
;
344 case bp_action_t::CONTINUE
:
346 case bp_action_t::STALL
:
347 logger().info("[Test] got STALL and block");
349 return blocker
->block();
351 ceph_abort("unexpected action from trap");
353 return seastar::make_ready_future
<>();
// Per-shard instance constructor. Records the primary shard id and whether
// accepted connections must be dispatched only on that shard.
// NOTE(review): the trailing parameter (presumably a construct tag, see
// create()) and the body braces are missing from this extract.
357 ShardedServerSocket::ShardedServerSocket(
358 seastar::shard_id sid
,
359 bool dispatch_only_on_primary_sid
,
361 : primary_sid
{sid
}, dispatch_only_on_primary_sid
{dispatch_only_on_primary_sid
}
// Destructor. The instance on shard #0 owns the sharded service (see create()),
// and shutdown_destroy() releases it. Still holding it here means shutdown was
// skipped.
// NOTE(review): the body's braces are missing from this extract.
365 ShardedServerSocket::~ShardedServerSocket()
368 // detect whether the user has called shutdown_destroy() properly
369 ceph_assert_always(!service
);
// Starts listening on `addr` (IPv4, via in4_addr()) on every shard of the
// sharded service. Must be called on the primary shard. Each shard records
// addr, enables address reuse and, when dispatch_only_on_primary_sid is set,
// uses listen_options::set_fixed_cpu(primary_sid) so accepted connections land
// on the primary shard.
// Errors: address_in_use and address_not_available system errors map to the
// corresponding crimson::ct_error values; other errors are logged at error
// level.
// NOTE(review): the log argument after "address not available" and the code
// after the unexpected-error log are missing from this extract.
// NOTE(review): "unexpeted" in the error log text below is a typo in a runtime
// string; left unchanged here because this edit only touches comments.
372 listen_ertr::future
<>
373 ShardedServerSocket::listen(entity_addr_t addr
)
375 ceph_assert_always(seastar::this_shard_id() == primary_sid
);
376 logger().debug("ShardedServerSocket({})::listen()...", addr
);
377 return this->container().invoke_on_all([addr
](auto& ss
) {
378 ss
.listen_addr
= addr
;
379 seastar::socket_address
s_addr(addr
.in4_addr());
380 seastar::listen_options lo
;
381 lo
.reuse_address
= true;
382 if (ss
.dispatch_only_on_primary_sid
) {
383 lo
.set_fixed_cpu(ss
.primary_sid
);
385 ss
.listener
= seastar::listen(s_addr
, lo
);
387 return listen_ertr::now();
388 }).handle_exception_type(
389 [addr
](const std::system_error
& e
) -> listen_ertr::future
<> {
390 if (e
.code() == std::errc::address_in_use
) {
391 logger().debug("ShardedServerSocket({})::listen(): address in use", addr
);
392 return crimson::ct_error::address_in_use::make();
393 } else if (e
.code() == std::errc::address_not_available
) {
394 logger().debug("ShardedServerSocket({})::listen(): address not available",
396 return crimson::ct_error::address_not_available::make();
398 logger().error("ShardedServerSocket({})::listen(): "
399 "got unexpeted error {}", addr
, e
.what());
// Copies `_fn_accept` to every shard and starts a background accept loop on
// each shard's listener. Must be called on the primary shard.
// Each accepted connection is wrapped in an acceptor-side Socket. The peer
// port is passed as the third constructor argument, presumably the ephemeral
// port. The Socket is then handed to fn_accept in the background under a gate.
// An accept loop stops on connection_aborted / invalid_argument (logged at
// debug); other exceptions are logged at error level.
// NOTE(review): several lines are missing here: the return type, the gate
// argument of the inner with_gate() (presumably ss.shutdown_gate), the e_what
// declarations, and closing braces.
405 ShardedServerSocket::accept(accept_func_t
&&_fn_accept
)
407 ceph_assert_always(seastar::this_shard_id() == primary_sid
);
408 logger().debug("ShardedServerSocket({})::accept()...", listen_addr
);
409 return this->container().invoke_on_all([_fn_accept
](auto &ss
) {
411 ss
.fn_accept
= _fn_accept
;
413 // ShardedServerSocket::shutdown_destroy() will drain the continuations in the gate
414 // so ignore the returned future
415 std::ignore
= seastar::with_gate(ss
.shutdown_gate
, [&ss
] {
416 return seastar::keep_doing([&ss
] {
417 return ss
.listener
->accept(
418 ).then([&ss
](seastar::accept_result accept_result
) {
420 if (ss
.dispatch_only_on_primary_sid
) {
421 // see seastar::listen_options::set_fixed_cpu()
422 ceph_assert_always(seastar::this_shard_id() == ss
.primary_sid
);
// Convert the seastar peer address, keeping the listener's address type.
425 auto [socket
, paddr
] = std::move(accept_result
);
426 entity_addr_t peer_addr
;
427 peer_addr
.set_sockaddr(&paddr
.as_posix_sockaddr());
428 peer_addr
.set_type(ss
.listen_addr
.get_type());
429 SocketRef _socket
= std::make_unique
<Socket
>(
430 std::move(socket
), Socket::side_t::acceptor
,
431 peer_addr
.get_port(), Socket::construct_tag
{});
432 logger().debug("ShardedServerSocket({})::accept(): accepted peer {}, "
433 "socket {}, dispatch_only_on_primary_sid = {}",
434 ss
.listen_addr
, peer_addr
, fmt::ptr(_socket
),
435 ss
.dispatch_only_on_primary_sid
);
// Run fn_accept in the background so the loop can accept the next
// connection; the returned future is intentionally ignored.
436 std::ignore
= seastar::with_gate(
438 [socket
=std::move(_socket
), peer_addr
, &ss
]() mutable {
439 return ss
.fn_accept(std::move(socket
), peer_addr
440 ).handle_exception([&ss
, peer_addr
](auto eptr
) {
443 std::rethrow_exception(eptr
);
444 } catch (std::exception
&e
) {
447 logger().error("ShardedServerSocket({})::accept(): "
448 "fn_accept(s, {}) got unexpected exception {}",
449 ss
.listen_addr
, peer_addr
, e_what
);
// Errors that end the accept loop (e.g. after abort_accept()).
454 }).handle_exception_type([&ss
](const std::system_error
& e
) {
455 if (e
.code() == std::errc::connection_aborted
||
456 e
.code() == std::errc::invalid_argument
) {
457 logger().debug("ShardedServerSocket({})::accept(): stopped ({})",
458 ss
.listen_addr
, e
.what());
462 }).handle_exception([&ss
](auto eptr
) {
465 std::rethrow_exception(eptr
);
466 } catch (std::exception
&e
) {
469 logger().error("ShardedServerSocket({})::accept(): "
470 "got unexpected exception {}", ss
.listen_addr
, e_what
);
// Tears down the server socket in three steps:
//   1. On every shard, abort pending accepts and close shutdown_gate, which
//      waits for in-flight accept work.
//   2. On every shard, check that the gate is closed and clear listen_addr.
//   3. On shard #0, stop the sharded service and release ownership of it.
// Should be called on the primary shard.
// NOTE(review): unlike listen()/accept(), this uses plain assert(), so the
// primary-shard check disappears in NDEBUG builds.
// NOTE(review): the return type and the chaining between the three
// invoke_on* steps are missing from this extract.
478 ShardedServerSocket::shutdown_destroy()
480 assert(seastar::this_shard_id() == primary_sid
);
481 logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr
);
483 return this->container().invoke_on_all([](auto& ss
) {
485 ss
.listener
->abort_accept();
487 return ss
.shutdown_gate
.close();
490 return this->container().invoke_on_all([](auto& ss
) {
491 assert(ss
.shutdown_gate
.is_closed());
492 ss
.listen_addr
= entity_addr_t();
496 // stop the sharded service: we should only construct/stop shards on #0
497 return this->container().invoke_on(0, [](auto& ss
) {
// The service pointer is moved into the finally() capture so the sharded
// object stays alive until stop() has completed, then is destroyed.
499 return ss
.service
->stop().finally([cleanup
= std::move(ss
.service
)] {});
// Creates the sharded ShardedServerSocket service. The service is started on
// shard #0 (shards are only constructed/stopped there), with the calling
// shard as primary_sid. Ownership of the sharded service object is stored in
// shard #0's local instance.
// Resolves to the instance local to the shard where the final continuation
// runs. This is presumably the calling shard, because submit_to() resolves
// back on the caller.
// NOTE(review): the inner continuation's return of p_shard and the closing
// braces are missing from this extract.
504 seastar::future
<ShardedServerSocket
*>
505 ShardedServerSocket::create(bool dispatch_only_on_this_shard
)
507 auto primary_sid
= seastar::this_shard_id();
508 // start the sharded service: we should only construct/stop shards on #0
509 return seastar::smp::submit_to(0, [primary_sid
, dispatch_only_on_this_shard
] {
510 auto service
= std::make_unique
<sharded_service_t
>();
511 return service
->start(
512 primary_sid
, dispatch_only_on_this_shard
, construct_tag
{}
513 ).then([service
= std::move(service
)]() mutable {
// Keep a raw pointer before handing ownership to shard #0's local instance.
514 auto p_shard
= service
.get();
515 p_shard
->local().service
= std::move(service
);
518 }).then([](auto p_shard
) {
519 return &p_shard
->local();
523 } // namespace crimson::net