1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "io_handler.h"
8 #include "crimson/common/formatter.h"
9 #include "crimson/common/log.h"
10 #include "crimson/net/Errors.h"
11 #include "crimson/net/chained_dispatchers.h"
12 #include "crimson/net/SocketMessenger.h"
13 #include "msg/Message.h"
14 #include "msg/msg_fmt.h"
16 using namespace ceph::msgr::v2
;
17 using crimson::common::local_conf
;
// Returns the seastar logger that crimson::get_logger() provides for the
// ceph_subsys_ms subsystem; every log statement in this file goes through it.
21 seastar::logger
& logger() {
22 return crimson::get_logger(ceph_subsys_ms
);
// Never returns: throws a std::system_error built from
// crimson::net::error::negotiation_failure.
25 [[noreturn
]] void abort_in_fault() {
26 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure
));
// Never returns: throws a std::system_error built from
// crimson::net::error::protocol_aborted.
29 [[noreturn
]] void abort_protocol() {
30 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted
));
// Adds up the logical lengths of the segments of a received frame, starting
// at segment index 1 so that the first segment (the message header, per the
// comment below) is left out. Asserts that the frame has at least one segment.
// NOTE(review): the declaration and the return of `sum` fall on lines that are
// not visible in this excerpt -- presumably the accumulated total is returned.
33 std::size_t get_msg_size(const FrameAssembler
&rx_frame_asm
)
35 ceph_assert(rx_frame_asm
.get_num_segments() > 0);
37 // we don't include SegmentIndex::Msg::HEADER.
38 for (size_t idx
= 1; idx
< rx_frame_asm
.get_num_segments(); idx
++) {
39 sum
+= rx_frame_asm
.get_segment_logical_len(idx
);
44 } // namespace anonymous
46 namespace crimson::net
{
// Constructor: stores the given dispatchers and initialises conn_ref from
// conn.get_local_shared_foreign_from_this().
// NOTE(review): further member initialisers (e.g. for `conn`) and the body are
// not visible in this excerpt.
48 IOHandler::IOHandler(ChainedDispatchers
&dispatchers
,
49 SocketConnection
&conn
)
50 : dispatchers(dispatchers
),
52 conn_ref(conn
.get_local_shared_foreign_from_this())
// Destructor: checks that the gate has already been closed and that no
// out_exit_dispatching promise is still outstanding.
55 IOHandler::~IOHandler()
57 ceph_assert(gate
.is_closed());
58 assert(!out_exit_dispatching
);
// Encodes everything that is currently waiting to go out into the buffer `bl`:
//   - a KeepAliveFrame when require_keepalive is set,
//   - a KeepAliveFrameAck when maybe_keepalive_ack holds a timestamp,
//   - a standalone AckFrame when require_ack is set and no message is pending
//     (messages carry the ack themselves via get_in_seq() in their header),
//   - one MessageFrame per entry of out_pending_msgs.
// Each message gets its source name, is encoded with conn.features and is
// assigned the next out_seq. On a non-lossy connection the pending messages
// are then moved into out_sent_msgs; out_pending_msgs is cleared either way.
// NOTE(review): the declaration of `require_ack`, the declaration of `bl`, the
// head of the per-message iteration and the final return are on lines not
// visible in this excerpt -- presumably `bl` is what gets returned.
61 ceph::bufferlist
IOHandler::sweep_out_pending_msgs_to_sent(
62 bool require_keepalive
,
63 std::optional
<utime_t
> maybe_keepalive_ack
,
66 std::size_t num_msgs
= out_pending_msgs
.size();
69 if (unlikely(require_keepalive
)) {
70 auto keepalive_frame
= KeepAliveFrame::Encode();
71 bl
.append(frame_assembler
->get_buffer(keepalive_frame
));
74 if (unlikely(maybe_keepalive_ack
.has_value())) {
75 auto keepalive_ack_frame
= KeepAliveFrameAck::Encode(*maybe_keepalive_ack
);
76 bl
.append(frame_assembler
->get_buffer(keepalive_ack_frame
));
// a dedicated ack frame is only emitted when there is no message to send
79 if (require_ack
&& num_msgs
== 0u) {
80 auto ack_frame
= AckFrame::Encode(get_in_seq());
81 bl
.append(frame_assembler
->get_buffer(ack_frame
));
85 out_pending_msgs
.begin(),
86 out_pending_msgs
.begin()+num_msgs
,
87 [this, &bl
](const MessageURef
& msg
) {
89 msg
->get_header().src
= conn
.messenger
.get_myname();
91 msg
->encode(conn
.features
, 0);
// a pending message must not have been numbered yet; number it now
93 ceph_assert(!msg
->get_seq() && "message already has seq");
94 msg
->set_seq(++out_seq
);
96 ceph_msg_header
&header
= msg
->get_header();
97 ceph_msg_footer
&footer
= msg
->get_footer();
99 ceph_msg_header2 header2
{header
.seq
, header
.tid
,
100 header
.type
, header
.priority
,
102 ceph_le32(0), header
.data_off
,
103 ceph_le64(get_in_seq()),
104 footer
.flags
, header
.compat_version
,
107 auto message
= MessageFrame::Encode(header2
,
108 msg
->get_payload(), msg
->get_middle(), msg
->get_data());
109 logger().debug("{} --> #{} === {} ({})",
110 conn
, msg
->get_seq(), *msg
, msg
->get_type());
111 bl
.append(frame_assembler
->get_buffer(message
));
// only non-lossy connections keep the sent messages around
114 if (!conn
.policy
.lossy
) {
115 out_sent_msgs
.insert(
117 std::make_move_iterator(out_pending_msgs
.begin()),
118 std::make_move_iterator(out_pending_msgs
.end()));
120 out_pending_msgs
.clear();
// Queues `msg` for sending: unless io_state is drop, the message is appended
// to out_pending_msgs and notify_out_dispatch() is called. In the drop state
// the message is not queued. Returns an already-resolved future.
124 seastar::future
<> IOHandler::send(MessageURef msg
)
126 if (io_state
!= io_state_t::drop
) {
127 out_pending_msgs
.push_back(std::move(msg
));
128 notify_out_dispatch();
130 return seastar::now();
// Requests a keepalive: if need_keepalive is not already set, sets it and
// calls notify_out_dispatch(). Returns an already-resolved future.
133 seastar::future
<> IOHandler::send_keepalive()
135 if (!need_keepalive
) {
136 need_keepalive
= true;
137 notify_out_dispatch();
139 return seastar::now();
// Marks the connection down: requires io_state != none, clears
// need_dispatch_reset, then logs the current I/O statistics, switches
// io_state to drop and calls handshake_listener->notify_mark_down().
// NOTE(review): the body of the `io_state == drop` check is not visible here
// -- presumably an early return when the handler is already dropped.
142 void IOHandler::mark_down()
144 ceph_assert_always(io_state
!= io_state_t::none
);
145 need_dispatch_reset
= false;
146 if (io_state
== io_state_t::drop
) {
150 logger().info("{} mark_down() with {}",
151 conn
, io_stat_printer
{*this});
152 set_io_state(io_state_t::drop
);
153 handshake_listener
->notify_mark_down();
// Streams a one-line summary of the I/O state: io_state, in_seq, out_seq,
// the sizes of out_pending_msgs and out_sent_msgs, and whether an ack, a
// keepalive or a keepalive ack is still owed.
// NOTE(review): the head and tail of the stream expression are not visible in
// this excerpt -- presumably the text is written to `out`.
156 void IOHandler::print_io_stat(std::ostream
&out
) const
159 << "io_state=" << fmt::format("{}", io_state
)
160 << ", in_seq=" << in_seq
161 << ", out_seq=" << out_seq
162 << ", out_pending_msgs_size=" << out_pending_msgs
.size()
163 << ", out_sent_msgs_size=" << out_sent_msgs
.size()
164 << ", need_ack=" << (ack_left
> 0)
165 << ", need_keepalive=" << need_keepalive
166 << ", need_keepalive_ack=" << bool(next_keepalive_ack
)
// Switches io_state to new_state.
//
// Rejected transitions (asserted): anything other than none -> none,
// open -> open, and leaving drop for any other state.
//
// Entering open: protocol_is_connected must be true, `fa` must be non-null
// and becomes frame_assembler (which must have been null), and its socket
// must be valid. With UNIT_TESTS_BUILT, the connection is registered as ready
// with conn.interceptor when one is present.
//
// Leaving open: protocol_is_connected is cleared, `fa` must be null, the
// socket is shut down, and if out-dispatching is in progress an
// out_exit_dispatching promise is installed.
//
// Any other case: `fa` must be null.
//
// When the state actually changes, io_state is updated and the
// io_state_changed promise is fulfilled and replaced by a fresh one.
// NOTE(review): the uses of the local `dispatch_in` and the matching #endif
// are on lines not visible in this excerpt.
170 void IOHandler::set_io_state(
171 const IOHandler::io_state_t
&new_state
,
172 FrameAssemblerV2Ref fa
)
174 ceph_assert_always(!(
175 (new_state
== io_state_t::none
&& io_state
!= io_state_t::none
) ||
176 (new_state
== io_state_t::open
&& io_state
== io_state_t::open
) ||
177 (new_state
!= io_state_t::drop
&& io_state
== io_state_t::drop
)
180 bool dispatch_in
= false;
181 if (new_state
== io_state_t::open
) {
183 ceph_assert_always(protocol_is_connected
== true);
184 assert(fa
!= nullptr);
185 ceph_assert_always(frame_assembler
== nullptr);
186 frame_assembler
= std::move(fa
);
187 ceph_assert_always(frame_assembler
->is_socket_valid());
189 #ifdef UNIT_TESTS_BUILT
190 if (conn
.interceptor
) {
191 conn
.interceptor
->register_conn_ready(conn
);
194 } else if (io_state
== io_state_t::open
) {
196 ceph_assert_always(protocol_is_connected
== true);
197 protocol_is_connected
= false;
198 assert(fa
== nullptr);
199 ceph_assert_always(frame_assembler
->is_socket_valid());
200 frame_assembler
->shutdown_socket();
// the promise created here is awaited by wait_io_exit_dispatching()
201 if (out_dispatching
) {
202 ceph_assert_always(!out_exit_dispatching
.has_value());
203 out_exit_dispatching
= seastar::promise
<>();
206 assert(fa
== nullptr);
209 if (io_state
!= new_state
) {
210 io_state
= new_state
;
// wake up whoever waits on the state change, then re-arm the promise
211 io_state_changed
.set_value();
212 io_state_changed
= seastar::promise
<>();
// Waits until the out- and in-dispatching have exited, then hands the frame
// assembler back to the caller.
// Preconditions (asserted): io_state is not open, frame_assembler is non-null
// and its socket is no longer valid.
// The two futures combined with seastar::when_all() are the futures of
// out_exit_dispatching and in_exit_dispatching when those are set, or
// already-resolved futures otherwise. The returned future resolves to the
// frame_assembler, which is moved out of this object.
224 seastar::future
<FrameAssemblerV2Ref
> IOHandler::wait_io_exit_dispatching()
226 ceph_assert_always(io_state
!= io_state_t::open
);
227 ceph_assert_always(frame_assembler
!= nullptr);
228 ceph_assert_always(!frame_assembler
->is_socket_valid());
229 return seastar::when_all(
231 if (out_exit_dispatching
) {
232 return out_exit_dispatching
->get_future();
234 return seastar::now();
238 if (in_exit_dispatching
) {
239 return in_exit_dispatching
->get_future();
241 return seastar::now();
244 ).discard_result().then([this] {
245 return std::move(frame_assembler
);
// Resets the session; the only statement visible here is the call to
// dispatch_remote_reset().
// NOTE(review): how `full` is used, and any other resetting done before the
// call, is on lines not visible in this excerpt -- confirm against the file.
249 void IOHandler::reset_session(bool full
)
255 dispatch_remote_reset();
// Puts every message from out_sent_msgs back at the front of
// out_pending_msgs so that it is sent again. Must not be called while
// io_state is open (asserted).
// out_seq is rewound by the number of sent messages, the payload of each of
// them is cleared, the messages are moved in front of the pending ones,
// out_sent_msgs is emptied and notify_out_dispatch() is called.
// NOTE(review): the body of the `out_sent_msgs.empty()` check is not visible
// here (presumably an early return), and the per-message loop may contain
// further statements on lines that are not shown.
259 void IOHandler::requeue_out_sent()
261 assert(io_state
!= io_state_t::open
);
262 if (out_sent_msgs
.empty()) {
266 out_seq
-= out_sent_msgs
.size();
267 logger().debug("{} requeue {} items, revert out_seq to {}",
268 conn
, out_sent_msgs
.size(), out_seq
);
269 for (MessageURef
& msg
: out_sent_msgs
) {
270 msg
->clear_payload();
273 out_pending_msgs
.insert(
274 out_pending_msgs
.begin(),
275 std::make_move_iterator(out_sent_msgs
.begin()),
276 std::make_move_iterator(out_sent_msgs
.end()));
277 out_sent_msgs
.clear();
278 notify_out_dispatch();
// Drops sent messages from the front of out_sent_msgs according to `seq`.
// Must not be called while io_state is open (asserted).
// When both out_sent_msgs and out_pending_msgs are empty, only the
// "nothing to requeue" branch is taken (the log text says out_seq is reset to
// `seq`). Otherwise messages are popped from the front of out_sent_msgs; the
// loop tests each front message for `cur_seq == 0 || cur_seq > seq`.
// NOTE(review): the bodies of both checks and everything after the loop are
// on lines not visible in this excerpt -- presumably the seq test breaks out
// of the loop and the remaining messages are requeued; confirm.
281 void IOHandler::requeue_out_sent_up_to(seq_num_t seq
)
283 assert(io_state
!= io_state_t::open
);
284 if (out_sent_msgs
.empty() && out_pending_msgs
.empty()) {
285 logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
290 logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
291 conn
, seq
, out_sent_msgs
.size(), out_seq
);
292 while (!out_sent_msgs
.empty()) {
293 auto cur_seq
= out_sent_msgs
.front()->get_seq();
294 if (cur_seq
== 0 || cur_seq
> seq
) {
297 out_sent_msgs
.pop_front();
// Discards the outgoing state: clears out_pending_msgs and out_sent_msgs and
// cancels any owed keepalive / keepalive ack. Must not be called while
// io_state is open (asserted).
// NOTE(review): further resets may sit on lines not visible in this excerpt.
303 void IOHandler::reset_out()
305 assert(io_state
!= io_state_t::open
);
307 out_pending_msgs
.clear();
308 out_sent_msgs
.clear();
309 need_keepalive
= false;
310 next_keepalive_ack
= std::nullopt
;
// Marks the protocol as connected and forwards the accept event to the
// dispatchers through ms_handle_accept(conn_ref).
// NOTE(review): the body of the `io_state == drop` check is not visible here
// -- presumably an early return so that nothing is dispatched once dropped.
314 void IOHandler::dispatch_accept()
316 if (io_state
== io_state_t::drop
) {
319 // protocol_is_connected can be from true to true here if the replacing is
320 // happening to a connected connection.
321 protocol_is_connected
= true;
322 dispatchers
.ms_handle_accept(conn_ref
);
// Marks the protocol as connected (it must not have been connected before,
// which is asserted) and forwards the connect event to the dispatchers
// through ms_handle_connect(conn_ref).
// NOTE(review): the body of the `io_state == drop` check is not visible here
// -- presumably an early return so that nothing is dispatched once dropped.
325 void IOHandler::dispatch_connect()
327 if (io_state
== io_state_t::drop
) {
330 ceph_assert_always(protocol_is_connected
== false);
331 protocol_is_connected
= true;
332 dispatchers
.ms_handle_connect(conn_ref
);
// Forwards a reset event to the dispatchers through
// ms_handle_reset(conn_ref, is_replace) and clears need_dispatch_reset.
// Requires io_state to be drop (asserted).
// NOTE(review): the body of the `!need_dispatch_reset` check is not visible
// here -- presumably an early return, making the dispatch happen at most once.
335 void IOHandler::dispatch_reset(bool is_replace
)
337 ceph_assert_always(io_state
== io_state_t::drop
);
338 if (!need_dispatch_reset
) {
341 need_dispatch_reset
= false;
342 dispatchers
.ms_handle_reset(conn_ref
, is_replace
);
// Forwards a remote-reset event to the dispatchers through
// ms_handle_remote_reset(conn_ref).
// NOTE(review): the body of the `io_state == drop` check is not visible here
// -- presumably an early return so that nothing is dispatched once dropped.
345 void IOHandler::dispatch_remote_reset()
347 if (io_state
== io_state_t::drop
) {
350 dispatchers
.ms_handle_remote_reset(conn_ref
);
// Handles an acknowledgement from the peer: pops messages from the front of
// out_sent_msgs as long as their sequence number is <= `seq`, tracing each
// one that is removed.
// NOTE(review): the body of the `conn.policy.lossy` check is not visible here
// -- presumably an early return, since lossy connections keep no sent
// messages (see the comment on that line).
353 void IOHandler::ack_out_sent(seq_num_t seq
)
355 if (conn
.policy
.lossy
) { // lossy connections don't keep sent messages
358 while (!out_sent_msgs
.empty() &&
359 out_sent_msgs
.front()->get_seq() <= seq
) {
360 logger().trace("{} got ack seq {} >= {}, pop {}",
361 conn
, seq
, out_sent_msgs
.front()->get_seq(),
362 *out_sent_msgs
.front());
363 out_sent_msgs
.pop_front();
// Tries to end the out-dispatching once nothing is queued (asserted on
// entry). The frame assembler is flushed first; afterwards:
//   - if still nothing is queued, out_dispatching is cleared, a pending
//     out_exit_dispatching promise (if any) is fulfilled and reset, and the
//     future resolves to stop_t::yes;
//   - otherwise something was queued during the flush and the future
//     resolves to stop_t::no.
// NOTE(review): the head of the flush continuation is on a line not visible
// in this excerpt.
367 seastar::future
<stop_t
> IOHandler::try_exit_out_dispatch() {
368 assert(!is_out_queued());
369 return frame_assembler
->flush(
371 if (!is_out_queued()) {
372 // still nothing pending to send after flush,
373 // the dispatching can ONLY stop now
374 ceph_assert(out_dispatching
);
375 out_dispatching
= false;
376 if (unlikely(out_exit_dispatching
.has_value())) {
377 out_exit_dispatching
->set_value();
378 out_exit_dispatching
= std::nullopt
;
379 logger().info("{} do_out_dispatch: nothing queued at {},"
380 " set out_exit_dispatching",
383 return seastar::make_ready_future
<stop_t
>(stop_t::yes
);
385 // something is pending to send during flushing
386 return seastar::make_ready_future
<stop_t
>(stop_t::no
);
// The out-dispatching loop, driven by seastar::repeat(); each iteration acts
// on the current io_state:
//   open  - if nothing is queued, try_exit_out_dispatch() decides whether to
//           stop. Otherwise everything pending is swept into a buffer and
//           written; after the write need_keepalive is cleared,
//           next_keepalive_ack is cleared only if it did not change in the
//           meantime, and the loop stops or continues depending on whether
//           new output was queued during the write.
//   delay - a pending out_exit_dispatching promise is fulfilled, then the
//           loop waits for the next io_state change and continues.
//   drop  - out_dispatching is cleared, a pending out_exit_dispatching
//           promise is fulfilled, and the loop stops.
// A std::system_error escaping the loop is handled as follows: codes other
// than broken_pipe, connection_reset and negotiation_failure are logged as
// unexpected; when io_state is open the handler moves to delay and reports
// the fault through handshake_listener->notify_out_fault(); finally
// do_out_dispatch() is entered again.
// NOTE(review): the `switch` header, the update of ack_left after the write
// and the try/catch that fills `eptr` are on lines not visible here.
391 seastar::future
<> IOHandler::do_out_dispatch()
393 return seastar::repeat([this] {
395 case io_state_t::open
: {
396 bool still_queued
= is_out_queued();
397 if (unlikely(!still_queued
)) {
398 return try_exit_out_dispatch();
// snapshot of the acks owed at the time the buffer is built
400 auto to_ack
= ack_left
;
401 assert(to_ack
== 0 || in_seq
> 0);
402 return frame_assembler
->write(
403 sweep_out_pending_msgs_to_sent(
404 need_keepalive
, next_keepalive_ack
, to_ack
> 0)
405 ).then([this, prv_keepalive_ack
=next_keepalive_ack
, to_ack
] {
406 need_keepalive
= false;
// keep a keepalive ack that was requested while the write was in flight
407 if (next_keepalive_ack
== prv_keepalive_ack
) {
408 next_keepalive_ack
= std::nullopt
;
410 assert(ack_left
>= to_ack
);
412 if (!is_out_queued()) {
413 return try_exit_out_dispatch();
415 // messages were enqueued during socket write
416 return seastar::make_ready_future
<stop_t
>(stop_t::no
);
420 case io_state_t::delay
:
421 // delay out dispatching until open
422 if (out_exit_dispatching
) {
423 out_exit_dispatching
->set_value();
424 out_exit_dispatching
= std::nullopt
;
425 logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn
);
427 logger().info("{} do_out_dispatch: delay ...", conn
);
429 return io_state_changed
.get_future(
430 ).then([] { return stop_t::no
; });
431 case io_state_t::drop
:
432 ceph_assert(out_dispatching
);
433 out_dispatching
= false;
434 if (out_exit_dispatching
) {
435 out_exit_dispatching
->set_value();
436 out_exit_dispatching
= std::nullopt
;
437 logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn
);
439 logger().info("{} do_out_dispatch: dropped", conn
);
441 return seastar::make_ready_future
<stop_t
>(stop_t::yes
);
445 }).handle_exception_type([this] (const std::system_error
& e
) {
446 if (e
.code() != std::errc::broken_pipe
&&
447 e
.code() != std::errc::connection_reset
&&
448 e
.code() != error::negotiation_failure
) {
449 logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
450 conn
, io_state
, e
.what());
454 if (io_state
== io_state_t::open
) {
455 logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
456 conn
, io_state
, e
.what());
457 std::exception_ptr eptr
;
461 eptr
= std::current_exception();
463 set_io_state(io_state_t::delay
);
464 handshake_listener
->notify_out_fault("do_out_dispatch", eptr
);
466 logger().info("{} do_out_dispatch(): fault at {} -- {}",
467 conn
, io_state
, e
.what());
// re-enter the loop; it will now observe the (possibly new) io_state
470 return do_out_dispatch();
// Signals that there is something to send. handshake_listener->notify_out()
// is always called. If no out-dispatching is running yet, out_dispatching is
// set and, depending on io_state:
//   open / delay - do_out_dispatch() is started in the background under the
//                  gate (which must not be closed);
//   drop         - out_dispatching is cleared again, nothing is started.
// NOTE(review): the body of the `out_dispatching` check (presumably an early
// return), the `switch` header and the statements of the `open` case label
// are on lines not visible in this excerpt.
474 void IOHandler::notify_out_dispatch()
476 handshake_listener
->notify_out();
477 if (out_dispatching
) {
478 // already dispatching
481 out_dispatching
= true;
483 case io_state_t::open
:
485 case io_state_t::delay
:
486 assert(!gate
.is_closed());
487 gate
.dispatch_in_background("do_out_dispatch", conn
, [this] {
488 return do_out_dispatch();
491 case io_state_t::drop
:
492 out_dispatching
= false;
// Reads the payload of a message frame, turns it into a Message and hands it
// to the dispatchers.
//
// throttle_stamp - stored on the message via set_throttle_stamp()
// msg_size       - stored on the message via set_dispatch_throttle_size()
//
// Steps after the payload has been read:
//   1. log if io_state is no longer open (the rest of that branch is not
//      visible here);
//   2. decode the MessageFrame and rebuild a ceph_msg_header / ceph_msg_footer
//      from the ceph_msg_header2 carried in it;
//   3. decode the Message; a decode failure is logged as a warning;
//   4. record throttle size and the throttle / recv / recv-complete stamps;
//   5. check the sequence number: a message with seq <= in_seq is logged and
//      discarded (asserting if RECONNECT_SEQ is supported and
//      ms_die_on_old_message is set); a gap in the sequence is logged
//      (asserting if ms_die_on_skipped_message is set);
//   6. record the new in_seq, call notify_out_dispatch() on non-lossy
//      connections, and process the piggy-backed ack via ack_out_sent();
//   7. dispatch the message; the returned future throttles further reading.
// NOTE(review): the return type, several log arguments and the handling that
// follows the decode failure are on lines not visible in this excerpt.
// NOTE(review): on the "old message" path the raw `message` pointer does not
// appear to be released before returning -- looks like a leak; confirm.
500 IOHandler::read_message(utime_t throttle_stamp
, std::size_t msg_size
)
502 return frame_assembler
->read_frame_payload(
503 ).then([this, throttle_stamp
, msg_size
](auto payload
) {
504 if (unlikely(io_state
!= io_state_t::open
)) {
505 logger().debug("{} triggered {} during read_message()",
510 utime_t recv_stamp
{seastar::lowres_system_clock::now()};
512 // we need to get the size before std::moving segments data
513 auto msg_frame
= MessageFrame::Decode(*payload
);
514 // XXX: paranoid copy just to avoid oops
515 ceph_msg_header2 current_header
= msg_frame
.header();
517 logger().trace("{} got {} + {} + {} byte message,"
518 " envelope type={} src={} off={} seq={}",
520 msg_frame
.front_len(),
521 msg_frame
.middle_len(),
522 msg_frame
.data_len(),
524 conn
.get_peer_name(),
525 current_header
.data_off
,
528 ceph_msg_header header
{current_header
.seq
,
531 current_header
.priority
,
532 current_header
.version
,
533 ceph_le32(msg_frame
.front_len()),
534 ceph_le32(msg_frame
.middle_len()),
535 ceph_le32(msg_frame
.data_len()),
536 current_header
.data_off
,
537 conn
.get_peer_name(),
538 current_header
.compat_version
,
539 current_header
.reserved
,
541 ceph_msg_footer footer
{ceph_le32(0), ceph_le32(0),
542 ceph_le32(0), ceph_le64(0), current_header
.flags
};
544 Message
*message
= decode_message(nullptr, 0, header
, footer
,
545 msg_frame
.front(), msg_frame
.middle(), msg_frame
.data(), nullptr);
547 logger().warn("{} decode message failed", conn
);
551 // store reservation size in message, so we don't get confused
552 // by messages entering the dispatch queue through other paths.
553 message
->set_dispatch_throttle_size(msg_size
);
555 message
->set_throttle_stamp(throttle_stamp
);
556 message
->set_recv_stamp(recv_stamp
);
557 message
->set_recv_complete_stamp(utime_t
{seastar::lowres_system_clock::now()});
559 // check received seq#. if it is old, drop the message.
560 // note that incoming messages may skip ahead. this is convenient for the
561 // client side queueing because messages can't be renumbered, but the (kernel)
562 // client will occasionally pull a message out of the sent queue to send
563 // elsewhere. in that case it doesn't matter if we "got" it or not.
564 uint64_t cur_seq
= get_in_seq();
565 if (message
->get_seq() <= cur_seq
) {
566 logger().error("{} got old message {} <= {} {}, discarding",
567 conn
, message
->get_seq(), cur_seq
, *message
);
568 if (HAVE_FEATURE(conn
.features
, RECONNECT_SEQ
) &&
569 local_conf()->ms_die_on_old_message
) {
570 ceph_assert(0 == "old msgs despite reconnect_seq feature");
572 return seastar::now();
573 } else if (message
->get_seq() > cur_seq
+ 1) {
574 logger().error("{} missed message? skipped from seq {} to {}",
575 conn
, cur_seq
, message
->get_seq());
576 if (local_conf()->ms_die_on_skipped_message
) {
577 ceph_assert(0 == "skipped incoming seq");
581 // note last received message.
582 in_seq
= message
->get_seq();
583 if (conn
.policy
.lossy
) {
584 logger().debug("{} <== #{} === {} ({})",
588 message
->get_type());
590 logger().debug("{} <== #{},{} === {} ({})",
593 current_header
.ack_seq
,
595 message
->get_type());
599 if (!conn
.policy
.lossy
) {
601 notify_out_dispatch();
// the message header also acknowledges what the peer has received from us
604 ack_out_sent(current_header
.ack_seq
);
606 // TODO: change MessageRef with seastar::shared_ptr
607 auto msg_ref
= MessageRef
{message
, false};
608 assert(io_state
== io_state_t::open
);
609 // throttle the reading process by the returned future
610 return dispatchers
.ms_dispatch(conn_ref
, std::move(msg_ref
));
// Starts the in-dispatching (read) loop in the background under the gate.
// An in_exit_dispatching promise is installed first (none may be pending,
// which is asserted); it is fulfilled and reset when the loop ends.
//
// Each iteration of seastar::keep_doing() reads a frame preamble and then
// handles the frame:
//   message frame   - computes the message size, runs the (mostly TODO)
//                     throttling steps and calls read_message();
//   ack frame       - decodes the AckFrame and calls ack_out_sent();
//   KEEPALIVE2      - decodes the KeepAliveFrame, stores its timestamp in
//                     next_keepalive_ack, calls notify_out_dispatch() and
//                     updates last_keepalive;
//   KEEPALIVE2_ACK  - decodes the KeepAliveFrameAck and records its timestamp
//                     through set_last_keepalive_ack();
//   anything else   - logged as an unexpected tag.
//
// When the loop fails with an exception: if io_state is open the handler
// moves to delay and reports the fault through
// handshake_listener->notify_out_fault(); otherwise the fault is only logged.
// NOTE(review): the `switch` header, the case labels for the message and ack
// frames, the handling after the unexpected-tag warning and the code that
// fills `e_what` are on lines not visible in this excerpt.
614 void IOHandler::do_in_dispatch()
616 ceph_assert_always(!in_exit_dispatching
.has_value());
617 in_exit_dispatching
= seastar::promise
<>();
618 gate
.dispatch_in_background("do_in_dispatch", conn
, [this] {
619 return seastar::keep_doing([this] {
620 return frame_assembler
->read_main_preamble(
621 ).then([this](auto ret
) {
624 size_t msg_size
= get_msg_size(*ret
.rx_frame_asm
);
625 return seastar::futurize_invoke([this] {
626 // throttle_message() logic
627 if (!conn
.policy
.throttler_messages
) {
628 return seastar::now();
630 // TODO: message throttler
632 return seastar::now();
633 }).then([this, msg_size
] {
634 // throttle_bytes() logic
635 if (!conn
.policy
.throttler_bytes
) {
636 return seastar::now();
639 return seastar::now();
641 logger().trace("{} wants {} bytes from policy throttler {}/{}",
643 conn
.policy
.throttler_bytes
->get_current(),
644 conn
.policy
.throttler_bytes
->get_max());
645 return conn
.policy
.throttler_bytes
->get(msg_size
);
646 }).then([this, msg_size
] {
647 // TODO: throttle_dispatch_queue() logic
648 utime_t throttle_stamp
{seastar::lowres_system_clock::now()};
649 return read_message(throttle_stamp
, msg_size
);
653 return frame_assembler
->read_frame_payload(
654 ).then([this](auto payload
) {
655 // handle_message_ack() logic
656 auto ack
= AckFrame::Decode(payload
->back());
657 logger().debug("{} GOT AckFrame: seq={}", conn
, ack
.seq());
658 ack_out_sent(ack
.seq());
660 case Tag::KEEPALIVE2
:
661 return frame_assembler
->read_frame_payload(
662 ).then([this](auto payload
) {
663 // handle_keepalive2() logic
664 auto keepalive_frame
= KeepAliveFrame::Decode(payload
->back());
665 logger().debug("{} GOT KeepAliveFrame: timestamp={}",
666 conn
, keepalive_frame
.timestamp());
667 // notify keepalive ack
668 next_keepalive_ack
= keepalive_frame
.timestamp();
669 notify_out_dispatch();
671 last_keepalive
= seastar::lowres_system_clock::now();
673 case Tag::KEEPALIVE2_ACK
:
674 return frame_assembler
->read_frame_payload(
675 ).then([this](auto payload
) {
676 // handle_keepalive2_ack() logic
677 auto keepalive_ack_frame
= KeepAliveFrameAck::Decode(payload
->back());
678 auto _last_keepalive_ack
=
679 seastar::lowres_system_clock::time_point
{keepalive_ack_frame
.timestamp()};
680 set_last_keepalive_ack(_last_keepalive_ack
);
681 logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
682 conn
, _last_keepalive_ack
);
685 logger().warn("{} do_in_dispatch() received unexpected tag: {}",
686 conn
, static_cast<uint32_t>(ret
.tag
));
691 }).handle_exception([this](std::exception_ptr eptr
) {
694 std::rethrow_exception(eptr
);
695 } catch (std::exception
&e
) {
699 if (io_state
== io_state_t::open
) {
700 logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
701 conn
, io_state
, e_what
);
702 set_io_state(io_state_t::delay
);
703 handshake_listener
->notify_out_fault("do_in_dispatch", eptr
);
705 logger().info("{} do_in_dispatch(): fault at {} -- {}",
706 conn
, io_state
, e_what
);
// the read loop is over: release whoever waits in wait_io_exit_dispatching()
709 ceph_assert_always(in_exit_dispatching
.has_value());
710 in_exit_dispatching
->set_value();
711 in_exit_dispatching
= std::nullopt
;
716 } // namespace crimson::net