1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "io_handler.h"
8 #include "crimson/common/formatter.h"
9 #include "crimson/common/log.h"
10 #include "crimson/net/Errors.h"
11 #include "crimson/net/chained_dispatchers.h"
12 #include "crimson/net/SocketMessenger.h"
13 #include "msg/Message.h"
14 #include "msg/msg_fmt.h"
16 using namespace ceph::msgr::v2
;
17 using crimson::common::local_conf
;
21 seastar::logger
& logger() {
22 return crimson::get_logger(ceph_subsys_ms
);
25 [[noreturn
]] void abort_in_fault() {
26 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure
));
29 [[noreturn
]] void abort_protocol() {
30 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted
));
33 std::size_t get_msg_size(const FrameAssembler
&rx_frame_asm
)
35 ceph_assert(rx_frame_asm
.get_num_segments() > 0);
37 // we don't include SegmentIndex::Msg::HEADER.
38 for (size_t idx
= 1; idx
< rx_frame_asm
.get_num_segments(); idx
++) {
39 sum
+= rx_frame_asm
.get_segment_logical_len(idx
);
44 } // namespace anonymous
46 namespace crimson::net
{
48 IOHandler::IOHandler(ChainedDispatchers
&dispatchers
,
49 SocketConnection
&conn
)
50 : shard_states(shard_states_t::create(
51 seastar::this_shard_id(), io_state_t::none
)),
52 dispatchers(dispatchers
),
54 conn_ref(conn
.get_local_shared_foreign_from_this())
57 IOHandler::~IOHandler()
59 // close_io() must be finished
60 ceph_assert_always(maybe_prv_shard_states
== nullptr);
61 // should be true in the according shard
62 // ceph_assert_always(shard_states->assert_closed_and_exit());
66 #ifdef UNIT_TESTS_BUILT
71 IOHandler::sweep_out_pending_msgs_to_sent(
72 bool require_keepalive
,
73 std::optional
<utime_t
> maybe_keepalive_ack
,
76 std::size_t num_msgs
= out_pending_msgs
.size();
79 #ifdef UNIT_TESTS_BUILT
80 std::vector
<Tag
> tags
;
83 if (unlikely(require_keepalive
)) {
84 auto keepalive_frame
= KeepAliveFrame::Encode();
85 bl
.append(frame_assembler
->get_buffer(keepalive_frame
));
86 #ifdef UNIT_TESTS_BUILT
87 auto tag
= KeepAliveFrame::tag
;
92 if (unlikely(maybe_keepalive_ack
.has_value())) {
93 auto keepalive_ack_frame
= KeepAliveFrameAck::Encode(*maybe_keepalive_ack
);
94 bl
.append(frame_assembler
->get_buffer(keepalive_ack_frame
));
95 #ifdef UNIT_TESTS_BUILT
96 auto tag
= KeepAliveFrameAck::tag
;
101 if (require_ack
&& num_msgs
== 0u) {
102 auto ack_frame
= AckFrame::Encode(in_seq
);
103 bl
.append(frame_assembler
->get_buffer(ack_frame
));
104 #ifdef UNIT_TESTS_BUILT
105 auto tag
= AckFrame::tag
;
111 out_pending_msgs
.begin(),
112 out_pending_msgs
.begin()+num_msgs
,
114 #ifdef UNIT_TESTS_BUILT
117 ](const MessageFRef
& msg
) {
119 msg
->get_header().src
= conn
.messenger
.get_myname();
121 msg
->encode(conn
.features
, 0);
123 ceph_assert(!msg
->get_seq() && "message already has seq");
124 msg
->set_seq(++out_seq
);
126 ceph_msg_header
&header
= msg
->get_header();
127 ceph_msg_footer
&footer
= msg
->get_footer();
129 ceph_msg_header2 header2
{header
.seq
, header
.tid
,
130 header
.type
, header
.priority
,
132 ceph_le32(0), header
.data_off
,
134 footer
.flags
, header
.compat_version
,
137 auto message
= MessageFrame::Encode(header2
,
138 msg
->get_payload(), msg
->get_middle(), msg
->get_data());
139 logger().debug("{} --> #{} === {} ({})",
140 conn
, msg
->get_seq(), *msg
, msg
->get_type());
141 bl
.append(frame_assembler
->get_buffer(message
));
142 #ifdef UNIT_TESTS_BUILT
143 auto tag
= MessageFrame::tag
;
148 if (!conn
.policy
.lossy
) {
149 out_sent_msgs
.insert(
151 std::make_move_iterator(out_pending_msgs
.begin()),
152 std::make_move_iterator(out_pending_msgs
.end()));
154 out_pending_msgs
.clear();
156 #ifdef UNIT_TESTS_BUILT
157 return sweep_ret
{std::move(bl
), tags
};
163 seastar::future
<> IOHandler::send(MessageFRef msg
)
165 // sid may be changed on-the-fly during the submission
166 if (seastar::this_shard_id() == get_shard_id()) {
167 return do_send(std::move(msg
));
169 logger().trace("{} send() is directed to {} -- {}",
170 conn
, get_shard_id(), *msg
);
171 return seastar::smp::submit_to(
172 get_shard_id(), [this, msg
=std::move(msg
)]() mutable {
173 return send_redirected(std::move(msg
));
178 seastar::future
<> IOHandler::send_redirected(MessageFRef msg
)
180 // sid may be changed on-the-fly during the submission
181 if (seastar::this_shard_id() == get_shard_id()) {
182 return do_send(std::move(msg
));
184 logger().debug("{} send() is redirected to {} -- {}",
185 conn
, get_shard_id(), *msg
);
186 return seastar::smp::submit_to(
187 get_shard_id(), [this, msg
=std::move(msg
)]() mutable {
188 return send_redirected(std::move(msg
));
193 seastar::future
<> IOHandler::do_send(MessageFRef msg
)
195 assert(seastar::this_shard_id() == get_shard_id());
196 logger().trace("{} do_send() got message -- {}", conn
, *msg
);
197 if (get_io_state() != io_state_t::drop
) {
198 out_pending_msgs
.push_back(std::move(msg
));
199 notify_out_dispatch();
201 return seastar::now();
204 seastar::future
<> IOHandler::send_keepalive()
206 // sid may be changed on-the-fly during the submission
207 if (seastar::this_shard_id() == get_shard_id()) {
208 return do_send_keepalive();
210 logger().trace("{} send_keepalive() is directed to {}", conn
, get_shard_id());
211 return seastar::smp::submit_to(
212 get_shard_id(), [this] {
213 return send_keepalive_redirected();
218 seastar::future
<> IOHandler::send_keepalive_redirected()
220 // sid may be changed on-the-fly during the submission
221 if (seastar::this_shard_id() == get_shard_id()) {
222 return do_send_keepalive();
224 logger().debug("{} send_keepalive() is redirected to {}", conn
, get_shard_id());
225 return seastar::smp::submit_to(
226 get_shard_id(), [this] {
227 return send_keepalive_redirected();
232 seastar::future
<> IOHandler::do_send_keepalive()
234 assert(seastar::this_shard_id() == get_shard_id());
235 logger().trace("{} do_send_keeplive(): need_keepalive={}", conn
, need_keepalive
);
236 if (!need_keepalive
) {
237 need_keepalive
= true;
238 notify_out_dispatch();
240 return seastar::now();
243 void IOHandler::mark_down()
245 ceph_assert_always(seastar::this_shard_id() == get_shard_id());
246 ceph_assert_always(get_io_state() != io_state_t::none
);
247 need_dispatch_reset
= false;
248 if (get_io_state() == io_state_t::drop
) {
252 auto cc_seq
= crosscore
.prepare_submit();
253 logger().info("{} mark_down() at {}, send {} notify_mark_down()",
254 conn
, io_stat_printer
{*this}, cc_seq
);
255 do_set_io_state(io_state_t::drop
);
256 shard_states
->dispatch_in_background(
257 "notify_mark_down", conn
, [this, cc_seq
] {
258 return seastar::smp::submit_to(
259 conn
.get_messenger_shard_id(), [this, cc_seq
] {
260 return handshake_listener
->notify_mark_down(cc_seq
);
265 void IOHandler::print_io_stat(std::ostream
&out
) const
267 assert(seastar::this_shard_id() == get_shard_id());
269 << "io_state=" << fmt::format("{}", get_io_state())
270 << ", in_seq=" << in_seq
271 << ", out_seq=" << out_seq
272 << ", out_pending_msgs_size=" << out_pending_msgs
.size()
273 << ", out_sent_msgs_size=" << out_sent_msgs
.size()
274 << ", need_ack=" << (ack_left
> 0)
275 << ", need_keepalive=" << need_keepalive
276 << ", need_keepalive_ack=" << bool(next_keepalive_ack
)
280 void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa
)
282 assert(fa
!= nullptr);
283 ceph_assert_always(frame_assembler
== nullptr);
284 frame_assembler
= std::move(fa
);
286 frame_assembler
->get_shard_id() == get_shard_id());
287 // should have been set through dispatch_accept/connect()
289 frame_assembler
->get_socket_shard_id() == get_shard_id());
290 ceph_assert_always(frame_assembler
->is_socket_valid());
293 void IOHandler::do_set_io_state(
294 io_state_t new_state
,
295 std::optional
<crosscore_t::seq_t
> cc_seq
,
296 FrameAssemblerV2Ref fa
,
299 ceph_assert_always(seastar::this_shard_id() == get_shard_id());
300 auto prv_state
= get_io_state();
301 logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, "
302 "fa={}, set_notify_out={}, at {}",
304 cc_seq
.has_value() ? fmt::format("{} ", *cc_seq
) : "",
305 prv_state
, new_state
,
306 fa
? "present" : "N/A", set_notify_out
,
307 io_stat_printer
{*this});
308 ceph_assert_always(!(
309 (new_state
== io_state_t::none
&& prv_state
!= io_state_t::none
) ||
310 (new_state
== io_state_t::open
&& prv_state
== io_state_t::open
)
313 if (prv_state
== io_state_t::drop
) {
314 // only possible due to a racing mark_down() from user
315 if (new_state
== io_state_t::open
) {
316 assign_frame_assembler(std::move(fa
));
317 frame_assembler
->shutdown_socket
<false>(nullptr);
319 assert(fa
== nullptr);
324 bool dispatch_in
= false;
325 if (new_state
== io_state_t::open
) {
327 ceph_assert_always(protocol_is_connected
== true);
328 assign_frame_assembler(std::move(fa
));
330 } else if (prv_state
== io_state_t::open
) {
332 ceph_assert_always(protocol_is_connected
== true);
333 protocol_is_connected
= false;
334 assert(fa
== nullptr);
335 ceph_assert_always(frame_assembler
->is_socket_valid());
336 frame_assembler
->shutdown_socket
<false>(nullptr);
338 assert(fa
== nullptr);
341 if (new_state
== io_state_t::delay
) {
342 need_notify_out
= set_notify_out
;
343 if (need_notify_out
) {
344 maybe_notify_out_dispatch();
347 assert(set_notify_out
== false);
348 need_notify_out
= false;
351 // FIXME: simplify and drop the prv_state == new_state case
352 if (prv_state
!= new_state
) {
353 shard_states
->set_io_state(new_state
);
365 seastar::future
<> IOHandler::set_io_state(
366 crosscore_t::seq_t cc_seq
,
367 io_state_t new_state
,
368 FrameAssemblerV2Ref fa
,
371 assert(seastar::this_shard_id() == get_shard_id());
372 if (!crosscore
.proceed_or_wait(cc_seq
)) {
373 logger().debug("{} got {} set_io_state(), wait at {}",
374 conn
, cc_seq
, crosscore
.get_in_seq());
375 return crosscore
.wait(cc_seq
376 ).then([this, cc_seq
, new_state
,
377 fa
=std::move(fa
), set_notify_out
]() mutable {
378 return set_io_state(cc_seq
, new_state
, std::move(fa
), set_notify_out
);
382 do_set_io_state(new_state
, cc_seq
, std::move(fa
), set_notify_out
);
383 return seastar::now();
386 seastar::future
<IOHandler::exit_dispatching_ret
>
387 IOHandler::wait_io_exit_dispatching(
388 crosscore_t::seq_t cc_seq
)
390 assert(seastar::this_shard_id() == get_shard_id());
391 if (!crosscore
.proceed_or_wait(cc_seq
)) {
392 logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
393 conn
, cc_seq
, crosscore
.get_in_seq());
394 return crosscore
.wait(cc_seq
395 ).then([this, cc_seq
] {
396 return wait_io_exit_dispatching(cc_seq
);
400 logger().debug("{} got {} wait_io_exit_dispatching()",
402 ceph_assert_always(get_io_state() != io_state_t::open
);
403 ceph_assert_always(frame_assembler
!= nullptr);
404 ceph_assert_always(!frame_assembler
->is_socket_valid());
405 return seastar::futurize_invoke([this] {
406 // cannot be running in parallel with to_new_sid()
407 if (maybe_dropped_sid
.has_value()) {
408 ceph_assert_always(get_io_state() == io_state_t::drop
);
409 assert(shard_states
->assert_closed_and_exit());
410 auto prv_sid
= *maybe_dropped_sid
;
411 return seastar::smp::submit_to(prv_sid
, [this] {
412 logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn
);
413 assert(maybe_prv_shard_states
!= nullptr);
414 return maybe_prv_shard_states
->wait_io_exit_dispatching();
417 return shard_states
->wait_io_exit_dispatching();
420 logger().debug("{} finish wait_io_exit_dispatching at {}",
421 conn
, io_stat_printer
{*this});
422 ceph_assert_always(frame_assembler
!= nullptr);
423 ceph_assert_always(!frame_assembler
->is_socket_valid());
424 frame_assembler
->set_shard_id(conn
.get_messenger_shard_id());
425 return exit_dispatching_ret
{
426 std::move(frame_assembler
),
431 seastar::future
<> IOHandler::reset_session(
432 crosscore_t::seq_t cc_seq
,
435 assert(seastar::this_shard_id() == get_shard_id());
436 if (!crosscore
.proceed_or_wait(cc_seq
)) {
437 logger().debug("{} got {} reset_session(), wait at {}",
438 conn
, cc_seq
, crosscore
.get_in_seq());
439 return crosscore
.wait(cc_seq
440 ).then([this, cc_seq
, full
] {
441 return reset_session(cc_seq
, full
);
445 logger().debug("{} got {} reset_session({})",
447 assert(get_io_state() != io_state_t::open
);
451 dispatch_remote_reset();
453 return seastar::now();
456 seastar::future
<> IOHandler::reset_peer_state(
457 crosscore_t::seq_t cc_seq
)
459 assert(seastar::this_shard_id() == get_shard_id());
460 if (!crosscore
.proceed_or_wait(cc_seq
)) {
461 logger().debug("{} got {} reset_peer_state(), wait at {}",
462 conn
, cc_seq
, crosscore
.get_in_seq());
463 return crosscore
.wait(cc_seq
464 ).then([this, cc_seq
] {
465 return reset_peer_state(cc_seq
);
469 logger().debug("{} got {} reset_peer_state()",
471 assert(get_io_state() != io_state_t::open
);
473 do_requeue_out_sent_up_to(0);
475 return seastar::now();
478 seastar::future
<> IOHandler::requeue_out_sent(
479 crosscore_t::seq_t cc_seq
)
481 assert(seastar::this_shard_id() == get_shard_id());
482 if (!crosscore
.proceed_or_wait(cc_seq
)) {
483 logger().debug("{} got {} requeue_out_sent(), wait at {}",
484 conn
, cc_seq
, crosscore
.get_in_seq());
485 return crosscore
.wait(cc_seq
486 ).then([this, cc_seq
] {
487 return requeue_out_sent(cc_seq
);
491 logger().debug("{} got {} requeue_out_sent()",
493 do_requeue_out_sent();
494 return seastar::now();
497 void IOHandler::do_requeue_out_sent()
499 assert(get_io_state() != io_state_t::open
);
500 if (out_sent_msgs
.empty()) {
504 out_seq
-= out_sent_msgs
.size();
505 logger().debug("{} requeue {} items, revert out_seq to {}",
506 conn
, out_sent_msgs
.size(), out_seq
);
507 for (MessageFRef
& msg
: out_sent_msgs
) {
508 msg
->clear_payload();
511 out_pending_msgs
.insert(
512 out_pending_msgs
.begin(),
513 std::make_move_iterator(out_sent_msgs
.begin()),
514 std::make_move_iterator(out_sent_msgs
.end()));
515 out_sent_msgs
.clear();
516 maybe_notify_out_dispatch();
519 seastar::future
<> IOHandler::requeue_out_sent_up_to(
520 crosscore_t::seq_t cc_seq
,
523 assert(seastar::this_shard_id() == get_shard_id());
524 if (!crosscore
.proceed_or_wait(cc_seq
)) {
525 logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
526 conn
, cc_seq
, crosscore
.get_in_seq());
527 return crosscore
.wait(cc_seq
528 ).then([this, cc_seq
, msg_seq
] {
529 return requeue_out_sent_up_to(cc_seq
, msg_seq
);
533 logger().debug("{} got {} requeue_out_sent_up_to({})",
534 conn
, cc_seq
, msg_seq
);
535 do_requeue_out_sent_up_to(msg_seq
);
536 return seastar::now();
539 void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq
)
541 assert(get_io_state() != io_state_t::open
);
542 if (out_sent_msgs
.empty() && out_pending_msgs
.empty()) {
543 logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
548 logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
549 conn
, seq
, out_sent_msgs
.size(), out_seq
);
550 while (!out_sent_msgs
.empty()) {
551 auto cur_seq
= out_sent_msgs
.front()->get_seq();
552 if (cur_seq
== 0 || cur_seq
> seq
) {
555 out_sent_msgs
.pop_front();
558 do_requeue_out_sent();
561 void IOHandler::reset_in()
563 assert(get_io_state() != io_state_t::open
);
567 void IOHandler::reset_out()
569 assert(get_io_state() != io_state_t::open
);
571 out_pending_msgs
.clear();
572 need_keepalive
= false;
573 next_keepalive_ack
= std::nullopt
;
577 void IOHandler::discard_out_sent()
579 assert(get_io_state() != io_state_t::open
);
581 out_sent_msgs
.clear();
585 IOHandler::dispatch_accept(
586 crosscore_t::seq_t cc_seq
,
587 seastar::shard_id new_sid
,
588 ConnectionFRef conn_fref
,
591 return to_new_sid(cc_seq
, new_sid
, std::move(conn_fref
), is_replace
);
595 IOHandler::dispatch_connect(
596 crosscore_t::seq_t cc_seq
,
597 seastar::shard_id new_sid
,
598 ConnectionFRef conn_fref
)
600 return to_new_sid(cc_seq
, new_sid
, std::move(conn_fref
), std::nullopt
);
604 IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid
)
606 assert(seastar::this_shard_id() == get_shard_id());
607 return seastar::smp::submit_to(prv_sid
, [this] {
608 logger().debug("{} got cleanup_prv_shard()", conn
);
609 assert(maybe_prv_shard_states
!= nullptr);
610 auto ref_prv_states
= std::move(maybe_prv_shard_states
);
611 auto &prv_states
= *ref_prv_states
;
612 return prv_states
.close(
613 ).then([ref_prv_states
=std::move(ref_prv_states
)] {
614 ceph_assert_always(ref_prv_states
->assert_closed_and_exit());
617 ceph_assert_always(maybe_prv_shard_states
== nullptr);
622 IOHandler::to_new_sid(
623 crosscore_t::seq_t cc_seq
,
624 seastar::shard_id new_sid
,
625 ConnectionFRef conn_fref
,
626 std::optional
<bool> is_replace
)
628 ceph_assert_always(seastar::this_shard_id() == get_shard_id());
629 if (!crosscore
.proceed_or_wait(cc_seq
)) {
630 logger().debug("{} got {} to_new_sid(), wait at {}",
631 conn
, cc_seq
, crosscore
.get_in_seq());
632 return crosscore
.wait(cc_seq
633 ).then([this, cc_seq
, new_sid
, is_replace
,
634 conn_fref
=std::move(conn_fref
)]() mutable {
635 return to_new_sid(cc_seq
, new_sid
, std::move(conn_fref
), is_replace
);
639 bool is_accept_or_connect
= is_replace
.has_value();
640 logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}",
641 conn
, cc_seq
, new_sid
,
643 is_accept_or_connect
?
644 (*is_replace
? "accept(replace)" : "accept(!replace)") :
646 io_stat_printer
{*this});
647 auto next_cc_seq
= ++cc_seq
;
649 if (get_io_state() != io_state_t::drop
) {
650 ceph_assert_always(conn_ref
);
651 if (new_sid
!= seastar::this_shard_id()) {
652 dispatchers
.ms_handle_shard_change(conn_ref
, new_sid
, is_accept_or_connect
);
653 // user can make changes
656 // it is possible that both io_handler and protocolv2 are
657 // trying to close each other from different cores simultaneously.
658 assert(!protocol_is_connected
);
661 if (get_io_state() != io_state_t::drop
) {
662 if (is_accept_or_connect
) {
663 // protocol_is_connected can be from true to true here if the replacing is
664 // happening to a connected connection.
666 ceph_assert_always(protocol_is_connected
== false);
668 protocol_is_connected
= true;
670 assert(!protocol_is_connected
);
673 bool is_dropped
= false;
674 if (get_io_state() == io_state_t::drop
) {
677 ceph_assert_always(get_io_state() != io_state_t::open
);
679 // apply the switching atomically
680 ceph_assert_always(conn_ref
);
682 auto prv_sid
= get_shard_id();
683 ceph_assert_always(maybe_prv_shard_states
== nullptr);
684 maybe_prv_shard_states
= std::move(shard_states
);
685 shard_states
= shard_states_t::create_from_previous(
686 *maybe_prv_shard_states
, new_sid
);
687 assert(new_sid
== get_shard_id());
689 return seastar::smp::submit_to(new_sid
,
690 [this, next_cc_seq
, is_dropped
, prv_sid
, is_replace
, conn_fref
=std::move(conn_fref
)]() mutable {
691 logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}",
692 conn
, next_cc_seq
, prv_sid
, is_dropped
,
694 is_replace
.has_value() ?
695 (*is_replace
? "accept(replace)" : "accept(!replace)") :
697 io_stat_printer
{*this});
699 ceph_assert_always(seastar::this_shard_id() == get_shard_id());
700 ceph_assert_always(get_io_state() != io_state_t::open
);
701 ceph_assert_always(!maybe_dropped_sid
.has_value());
702 ceph_assert_always(crosscore
.proceed_or_wait(next_cc_seq
));
705 ceph_assert_always(get_io_state() == io_state_t::drop
);
706 ceph_assert_always(shard_states
->assert_closed_and_exit());
707 maybe_dropped_sid
= prv_sid
;
708 // cleanup_prv_shard() will be done in a follow-up close_io()
710 // possible at io_state_t::drop
712 // previous shard is not cleaned,
713 // but close_io() is responsible to clean up the current shard,
714 // so cleanup the previous shard here.
715 shard_states
->dispatch_in_background(
716 "cleanup_prv_sid", conn
, [this, prv_sid
] {
717 return cleanup_prv_shard(prv_sid
);
719 maybe_notify_out_dispatch();
722 ceph_assert_always(!conn_ref
);
723 // assign even if already dropping
724 conn_ref
= make_local_shared_foreign(std::move(conn_fref
));
726 if (get_io_state() != io_state_t::drop
) {
727 if (is_replace
.has_value()) {
728 dispatchers
.ms_handle_accept(conn_ref
, prv_sid
, *is_replace
);
730 dispatchers
.ms_handle_connect(conn_ref
, prv_sid
);
732 // user can make changes
737 seastar::future
<> IOHandler::set_accepted_sid(
738 crosscore_t::seq_t cc_seq
,
739 seastar::shard_id sid
,
740 ConnectionFRef conn_fref
)
742 assert(seastar::this_shard_id() == get_shard_id());
743 assert(get_io_state() == io_state_t::none
);
744 ceph_assert_always(conn_ref
);
746 assert(maybe_prv_shard_states
== nullptr);
747 shard_states
.reset();
748 shard_states
= shard_states_t::create(sid
, io_state_t::none
);
749 return seastar::smp::submit_to(sid
,
750 [this, cc_seq
, conn_fref
=std::move(conn_fref
)]() mutable {
751 // must be the first to proceed
752 ceph_assert_always(crosscore
.proceed_or_wait(cc_seq
));
754 logger().debug("{} set accepted sid", conn
);
755 ceph_assert_always(seastar::this_shard_id() == get_shard_id());
756 ceph_assert_always(get_io_state() == io_state_t::none
);
757 assert(maybe_prv_shard_states
== nullptr);
758 ceph_assert_always(!conn_ref
);
759 conn_ref
= make_local_shared_foreign(std::move(conn_fref
));
763 void IOHandler::dispatch_reset(bool is_replace
)
765 ceph_assert_always(get_io_state() == io_state_t::drop
);
766 if (!need_dispatch_reset
) {
769 need_dispatch_reset
= false;
770 ceph_assert_always(conn_ref
);
772 dispatchers
.ms_handle_reset(conn_ref
, is_replace
);
773 // user can make changes
776 void IOHandler::dispatch_remote_reset()
778 if (get_io_state() == io_state_t::drop
) {
781 ceph_assert_always(conn_ref
);
783 dispatchers
.ms_handle_remote_reset(conn_ref
);
784 // user can make changes
787 void IOHandler::ack_out_sent(seq_num_t seq
)
789 if (conn
.policy
.lossy
) { // lossy connections don't keep sent messages
792 while (!out_sent_msgs
.empty() &&
793 out_sent_msgs
.front()->get_seq() <= seq
) {
794 logger().trace("{} got ack seq {} >= {}, pop {}",
795 conn
, seq
, out_sent_msgs
.front()->get_seq(),
796 *out_sent_msgs
.front());
797 out_sent_msgs
.pop_front();
802 IOHandler::do_out_dispatch(shard_states_t
&ctx
)
804 return seastar::repeat([this, &ctx
] {
805 switch (ctx
.get_io_state()) {
806 case io_state_t::open
: {
807 if (unlikely(!is_out_queued())) {
808 // try exit open dispatching
809 return frame_assembler
->flush
<false>(
810 ).then([this, &ctx
] {
811 if (ctx
.get_io_state() != io_state_t::open
|| is_out_queued()) {
812 return seastar::make_ready_future
<stop_t
>(stop_t::no
);
814 // still nothing pending to send after flush,
815 // open dispatching can ONLY stop now
816 ctx
.exit_out_dispatching("exit-open", conn
);
817 return seastar::make_ready_future
<stop_t
>(stop_t::yes
);
821 auto require_keepalive
= need_keepalive
;
822 need_keepalive
= false;
823 auto maybe_keepalive_ack
= next_keepalive_ack
;
824 next_keepalive_ack
= std::nullopt
;
825 auto to_ack
= ack_left
;
826 assert(to_ack
== 0 || in_seq
> 0);
828 #ifdef UNIT_TESTS_BUILT
829 auto ret
= sweep_out_pending_msgs_to_sent(
830 require_keepalive
, maybe_keepalive_ack
, to_ack
> 0);
831 return frame_assembler
->intercept_frames(ret
.tags
, true
832 ).then([this, bl
=std::move(ret
.bl
)]() mutable {
833 return frame_assembler
->write
<false>(std::move(bl
));
836 auto bl
= sweep_out_pending_msgs_to_sent(
837 require_keepalive
, maybe_keepalive_ack
, to_ack
> 0);
838 return frame_assembler
->write
<false>(std::move(bl
)
840 ).then([this, &ctx
] {
841 if (ctx
.get_io_state() != io_state_t::open
) {
842 return frame_assembler
->flush
<false>(
844 return seastar::make_ready_future
<stop_t
>(stop_t::no
);
848 // FIXME: may leak a flush if state is changed after return and before
849 // the next repeat body.
850 return seastar::make_ready_future
<stop_t
>(stop_t::no
);
853 case io_state_t::delay
:
854 // delay out dispatching until open
855 ctx
.notify_out_dispatching_stopped("delay...", conn
);
856 return ctx
.wait_state_change(
857 ).then([] { return stop_t::no
; });
858 case io_state_t::drop
:
859 ctx
.exit_out_dispatching("dropped", conn
);
860 return seastar::make_ready_future
<stop_t
>(stop_t::yes
);
861 case io_state_t::switched
:
862 ctx
.exit_out_dispatching("switched", conn
);
863 return seastar::make_ready_future
<stop_t
>(stop_t::yes
);
865 ceph_abort("impossible");
867 }).handle_exception_type([this, &ctx
](const std::system_error
& e
) {
868 auto io_state
= ctx
.get_io_state();
869 if (e
.code() != std::errc::broken_pipe
&&
870 e
.code() != std::errc::connection_reset
&&
871 e
.code() != error::negotiation_failure
) {
872 logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
873 conn
, io_state
, e
.what());
877 if (io_state
== io_state_t::open
) {
878 auto cc_seq
= crosscore
.prepare_submit();
879 logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
880 "send {} notify_out_fault()",
881 conn
, io_state
, io_stat_printer
{*this}, e
.what(), cc_seq
);
882 std::exception_ptr eptr
;
886 eptr
= std::current_exception();
888 do_set_io_state(io_state_t::delay
);
889 shard_states
->dispatch_in_background(
890 "notify_out_fault(out)", conn
, [this, cc_seq
, eptr
] {
891 auto states
= get_states();
892 return seastar::smp::submit_to(
893 conn
.get_messenger_shard_id(), [this, cc_seq
, eptr
, states
] {
894 return handshake_listener
->notify_out_fault(
895 cc_seq
, "do_out_dispatch", eptr
, states
);
899 if (io_state
!= io_state_t::switched
) {
900 logger().info("{} do_out_dispatch(): fault at {}, {} -- {}",
901 conn
, io_state
, io_stat_printer
{*this}, e
.what());
903 logger().info("{} do_out_dispatch(): fault at {} -- {}",
904 conn
, io_state
, e
.what());
908 return do_out_dispatch(ctx
);
912 void IOHandler::maybe_notify_out_dispatch()
914 ceph_assert_always(seastar::this_shard_id() == get_shard_id());
915 if (is_out_queued()) {
916 notify_out_dispatch();
920 void IOHandler::notify_out_dispatch()
922 ceph_assert_always(seastar::this_shard_id() == get_shard_id());
923 assert(is_out_queued());
924 if (need_notify_out
) {
925 auto cc_seq
= crosscore
.prepare_submit();
926 logger().debug("{} send {} notify_out()",
928 shard_states
->dispatch_in_background(
929 "notify_out", conn
, [this, cc_seq
] {
930 return seastar::smp::submit_to(
931 conn
.get_messenger_shard_id(), [this, cc_seq
] {
932 return handshake_listener
->notify_out(cc_seq
);
936 if (shard_states
->try_enter_out_dispatching()) {
937 shard_states
->dispatch_in_background(
938 "do_out_dispatch", conn
, [this] {
939 return do_out_dispatch(*shard_states
);
945 IOHandler::read_message(
947 utime_t throttle_stamp
,
948 std::size_t msg_size
)
950 return frame_assembler
->read_frame_payload
<false>(
951 ).then([this, throttle_stamp
, msg_size
, &ctx
](auto payload
) {
952 if (unlikely(ctx
.get_io_state() != io_state_t::open
)) {
953 logger().debug("{} triggered {} during read_message()",
954 conn
, ctx
.get_io_state());
958 utime_t recv_stamp
{seastar::lowres_system_clock::now()};
960 // we need to get the size before std::moving segments data
961 auto msg_frame
= MessageFrame::Decode(*payload
);
962 // XXX: paranoid copy just to avoid oops
963 ceph_msg_header2 current_header
= msg_frame
.header();
965 logger().trace("{} got {} + {} + {} byte message,"
966 " envelope type={} src={} off={} seq={}",
968 msg_frame
.front_len(),
969 msg_frame
.middle_len(),
970 msg_frame
.data_len(),
972 conn
.get_peer_name(),
973 current_header
.data_off
,
976 ceph_msg_header header
{current_header
.seq
,
979 current_header
.priority
,
980 current_header
.version
,
981 ceph_le32(msg_frame
.front_len()),
982 ceph_le32(msg_frame
.middle_len()),
983 ceph_le32(msg_frame
.data_len()),
984 current_header
.data_off
,
985 conn
.get_peer_name(),
986 current_header
.compat_version
,
987 current_header
.reserved
,
989 ceph_msg_footer footer
{ceph_le32(0), ceph_le32(0),
990 ceph_le32(0), ceph_le64(0), current_header
.flags
};
992 Message
*message
= decode_message(nullptr, 0, header
, footer
,
993 msg_frame
.front(), msg_frame
.middle(), msg_frame
.data(), nullptr);
995 logger().warn("{} decode message failed", conn
);
999 // store reservation size in message, so we don't get confused
1000 // by messages entering the dispatch queue through other paths.
1001 message
->set_dispatch_throttle_size(msg_size
);
1003 message
->set_throttle_stamp(throttle_stamp
);
1004 message
->set_recv_stamp(recv_stamp
);
1005 message
->set_recv_complete_stamp(utime_t
{seastar::lowres_system_clock::now()});
1007 // check received seq#. if it is old, drop the message.
1008 // note that incoming messages may skip ahead. this is convenient for the
1009 // client side queueing because messages can't be renumbered, but the (kernel)
1010 // client will occasionally pull a message out of the sent queue to send
1011 // elsewhere. in that case it doesn't matter if we "got" it or not.
1012 uint64_t cur_seq
= in_seq
;
1013 if (message
->get_seq() <= cur_seq
) {
1014 logger().error("{} got old message {} <= {} {}, discarding",
1015 conn
, message
->get_seq(), cur_seq
, *message
);
1016 if (HAVE_FEATURE(conn
.features
, RECONNECT_SEQ
) &&
1017 local_conf()->ms_die_on_old_message
) {
1018 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1020 return seastar::now();
1021 } else if (message
->get_seq() > cur_seq
+ 1) {
1022 logger().error("{} missed message? skipped from seq {} to {}",
1023 conn
, cur_seq
, message
->get_seq());
1024 if (local_conf()->ms_die_on_skipped_message
) {
1025 ceph_assert(0 == "skipped incoming seq");
1029 // note last received message.
1030 in_seq
= message
->get_seq();
1031 if (conn
.policy
.lossy
) {
1032 logger().debug("{} <== #{} === {} ({})",
1036 message
->get_type());
1038 logger().debug("{} <== #{},{} === {} ({})",
1041 current_header
.ack_seq
,
1043 message
->get_type());
1047 if (!conn
.policy
.lossy
) {
1049 notify_out_dispatch();
1052 ack_out_sent(current_header
.ack_seq
);
1054 // TODO: change MessageRef with seastar::shared_ptr
1055 auto msg_ref
= MessageRef
{message
, false};
1056 assert(ctx
.get_io_state() == io_state_t::open
);
1057 assert(get_io_state() == io_state_t::open
);
1058 ceph_assert_always(conn_ref
);
1060 // throttle the reading process by the returned future
1061 return dispatchers
.ms_dispatch(conn_ref
, std::move(msg_ref
));
1062 // user can make changes
// Run the per-connection input loop in the background.
//
// Repeatedly reads a frame preamble and dispatches on the frame tag:
// MESSAGE frames pass through the (partially TODO) message/byte
// throttlers and then read_message(); ACK frames feed ack_out_sent();
// KEEPALIVE2 records the peer timestamp into next_keepalive_ack and
// pokes the out-dispatch loop; KEEPALIVE2_ACK records the returned
// timestamp via set_last_keepalive_ack().  On exception, if the
// io_state is still open the state moves to delay and the handshake
// listener is notified cross-shard via notify_out_fault().
//
// NOTE(review): this chunk is garbled by extraction -- the
// `switch (ret.tag) {` header, the `case Tag::ACK:` label, the
// try/catch opening and the `e_what` definition, plus many closing
// braces/returns present upstream are missing from this text.  Code
// tokens below are preserved untouched; restore the structure from the
// upstream file rather than hand-patching from here.
void IOHandler::do_in_dispatch()
// NOTE(review): opening brace dropped by extraction
shard_states->enter_in_dispatching();
shard_states->dispatch_in_background(
    "do_in_dispatch", conn, [this, &ctx=*shard_states] {
  return seastar::keep_doing([this, &ctx] {
    return frame_assembler->read_main_preamble<false>(
    ).then([this, &ctx](auto ret) {
      // NOTE(review): `switch` header dropped here by extraction
      case Tag::MESSAGE: {
        // sum of the non-header segment lengths (see get_msg_size()),
        // used below for byte throttling
        size_t msg_size = get_msg_size(*ret.rx_frame_asm);
        return seastar::futurize_invoke([this] {
          // throttle_message() logic
          if (!conn.policy.throttler_messages) {
            return seastar::now();
          // TODO: message throttler
          return seastar::now();
        }).then([this, msg_size] {
          // throttle_bytes() logic
          if (!conn.policy.throttler_bytes) {
            return seastar::now();
          return seastar::now();
          // NOTE(review): lines dropped around here by extraction
          logger().trace("{} wants {} bytes from policy throttler {}/{}",
                         conn.policy.throttler_bytes->get_current(),
                         conn.policy.throttler_bytes->get_max());
          // may block until the policy throttler grants msg_size bytes
          return conn.policy.throttler_bytes->get(msg_size);
        }).then([this, msg_size, &ctx] {
          // TODO: throttle_dispatch_queue() logic
          utime_t throttle_stamp{seastar::lowres_system_clock::now()};
          return read_message(ctx, throttle_stamp, msg_size);
      // NOTE(review): `case Tag::ACK:` label dropped here by extraction
      return frame_assembler->read_frame_payload<false>(
      ).then([this](auto payload) {
        // handle_message_ack() logic
        auto ack = AckFrame::Decode(payload->back());
        logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
        ack_out_sent(ack.seq());
      case Tag::KEEPALIVE2:
        return frame_assembler->read_frame_payload<false>(
        ).then([this](auto payload) {
          // handle_keepalive2() logic
          auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
          logger().debug("{} GOT KeepAliveFrame: timestamp={}",
                         conn, keepalive_frame.timestamp());
          // notify keepalive ack
          next_keepalive_ack = keepalive_frame.timestamp();
          // only poke the out-dispatch loop from its owning shard
          if (seastar::this_shard_id() == get_shard_id()) {
            notify_out_dispatch();
          last_keepalive = seastar::lowres_system_clock::now();
      case Tag::KEEPALIVE2_ACK:
        return frame_assembler->read_frame_payload<false>(
        ).then([this](auto payload) {
          // handle_keepalive2_ack() logic
          auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
          auto _last_keepalive_ack =
            seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
          set_last_keepalive_ack(_last_keepalive_ack);
          logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
                         conn, _last_keepalive_ack);
      // NOTE(review): default-case handling dropped here by extraction
      logger().warn("{} do_in_dispatch() received unexpected tag: {}",
                    conn, static_cast<uint32_t>(ret.tag));
  }).handle_exception([this, &ctx](std::exception_ptr eptr) {
    // NOTE(review): `try {` and the `e_what` definition used by the
    // log statements below were dropped here by extraction
      std::rethrow_exception(eptr);
    } catch (std::exception &e) {
    auto io_state = ctx.get_io_state();
    if (io_state == io_state_t::open) {
      // fault while open: go to delay and tell the handshake listener
      // (on the messenger shard) about the fault, tagged with a
      // cross-core sequence number
      auto cc_seq = crosscore.prepare_submit();
      logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
                    "send {} notify_out_fault()",
                    conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
      do_set_io_state(io_state_t::delay);
      shard_states->dispatch_in_background(
          "notify_out_fault(in)", conn, [this, cc_seq, eptr] {
        auto states = get_states();
        return seastar::smp::submit_to(
            conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
          return handshake_listener->notify_out_fault(
              cc_seq, "do_in_dispatch", eptr, states);
    // NOTE(review): branch structure dropped here by extraction
    if (io_state != io_state_t::switched) {
      logger().info("{} do_in_dispatch(): fault at {}, {} -- {}",
                    conn, io_state, io_stat_printer{*this}, e_what);
      logger().info("{} do_in_dispatch(): fault at {} -- {}",
                    conn, io_state, e_what);
    // always leave the in-dispatching phase, whatever the exit path
    ctx.exit_in_dispatching();
// NOTE(review): trailing closing braces dropped by extraction
// Close the IO side of the connection on this handler's shard.
//
// Ordered by the cross-core sequence number: if cc_seq is ahead of the
// expected sequence, wait and re-invoke.  Requires io_state == drop.
// Optionally dispatches a reset event first, then either cleans up the
// previous shard's states (if the connection was migrated and dropped)
// or closes the local shard states.
//
// NOTE(review): garbled chunk -- the return type, the `is_replace`
// parameter line, the opening brace and several closing braces present
// upstream are missing from this text; tokens preserved as-is.
IOHandler::close_io(
  crosscore_t::seq_t cc_seq,
  bool is_dispatch_reset,
  // NOTE(review): remaining parameter line(s) and opening brace
  // dropped by extraction (is_replace is referenced below)
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    // not our turn yet: wait for cc_seq, then retry the whole call
    logger().debug("{} got {} close_io(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, is_dispatch_reset, is_replace] {
      return close_io(cc_seq, is_dispatch_reset, is_replace);

  logger().debug("{} got {} close_io(reset={}, replace={})",
                 conn, cc_seq, is_dispatch_reset, is_replace);
  ceph_assert_always(get_io_state() == io_state_t::drop);

  if (is_dispatch_reset) {
    dispatch_reset(is_replace);

  ceph_assert_always(conn_ref);
  // NOTE(review): lines dropped here by extraction

  // cannot be running in parallel with to_new_sid()
  if (maybe_dropped_sid.has_value()) {
    // states already migrated to a new shard; clean up the old one
    assert(shard_states->assert_closed_and_exit());
    auto prv_sid = *maybe_dropped_sid;
    return cleanup_prv_shard(prv_sid);

  return shard_states->close(
  assert(shard_states->assert_closed_and_exit());
/*
 * IOHandler::shard_states_t
 */
// Record that the out-dispatch loop stopped: release anyone blocked in
// wait_io_exit_dispatching() and log why.  Must run on the owning
// shard (sid).
//
// NOTE(review): garbled chunk -- the return type, opening/closing
// braces and the branch structure present upstream are missing from
// this text; tokens preserved as-is.
IOHandler::shard_states_t::notify_out_dispatching_stopped(
  const char *what, SocketConnection &conn)
  assert(seastar::this_shard_id() == sid);
  if (unlikely(out_exit_dispatching.has_value())) {
    // a waiter is parked on this promise; wake it and clear the slot
    out_exit_dispatching->set_value();
    out_exit_dispatching = std::nullopt;
    logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching",
                  conn, what, io_state);
  if (unlikely(io_state != io_state_t::open)) {
    logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching",
                  conn, what, io_state);
// Wait until both the out-dispatch and in-dispatch loops have exited.
// Preconditions: called on the owning shard, io_state has already left
// open, and the gate is still usable.
//
// NOTE(review): garbled chunk -- the return type and the lambda
// scaffolding wrapping the two when_all() operands present upstream
// are missing from this text; tokens preserved as-is.
IOHandler::shard_states_t::wait_io_exit_dispatching()
  assert(seastar::this_shard_id() == sid);
  assert(io_state != io_state_t::open);
  assert(!gate.is_closed());
  return seastar::when_all(
    // out-dispatch side: wait on its exit promise if one is pending
    if (out_exit_dispatching) {
      return out_exit_dispatching->get_future();
    return seastar::now();
    // in-dispatch side: likewise
    if (in_exit_dispatching) {
      return in_exit_dispatching->get_future();
    return seastar::now();
// Build shard states for a new shard from the previous shard's states
// (connection migration).  The previous states are marked switched so
// their dispatch loops wind down; if the connection is already dropped,
// the new gate is closed immediately so nothing can ever enter it.
//
// NOTE(review): garbled chunk -- closing braces and the trailing
// `return ret;` present upstream are missing from this text; code
// tokens preserved as-is.
IOHandler::shard_states_ref_t
IOHandler::shard_states_t::create_from_previous(
  shard_states_t &prv_states,
  seastar::shard_id new_sid)
  auto io_state = prv_states.io_state;
  assert(io_state != io_state_t::open);
  auto ret = shard_states_t::create(new_sid, io_state);
  if (io_state == io_state_t::drop) {
    // the new gate should never be used
    auto fut = ret->gate.close();
    // closing an unused gate completes immediately
    ceph_assert_always(fut.available());
  prv_states.set_io_state(io_state_t::switched);
1287 } // namespace crimson::net