git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/io_handler.cc
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / crimson / net / io_handler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "io_handler.h"
5
6 #include "auth/Auth.h"
7
8 #include "crimson/common/formatter.h"
9 #include "crimson/common/log.h"
10 #include "crimson/net/Errors.h"
11 #include "crimson/net/chained_dispatchers.h"
12 #include "crimson/net/SocketMessenger.h"
13 #include "msg/Message.h"
14 #include "msg/msg_fmt.h"
15
16 using namespace ceph::msgr::v2;
17 using crimson::common::local_conf;
18
19 namespace {
20
21 seastar::logger& logger() {
22 return crimson::get_logger(ceph_subsys_ms);
23 }
24
25 [[noreturn]] void abort_in_fault() {
26 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
27 }
28
29 [[noreturn]] void abort_protocol() {
30 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
31 }
32
33 std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
34 {
35 ceph_assert(rx_frame_asm.get_num_segments() > 0);
36 size_t sum = 0;
37 // we don't include SegmentIndex::Msg::HEADER.
38 for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
39 sum += rx_frame_asm.get_segment_logical_len(idx);
40 }
41 return sum;
42 }
43
44 } // namespace anonymous
45
46 namespace crimson::net {
47
48 IOHandler::IOHandler(ChainedDispatchers &dispatchers,
49 SocketConnection &conn)
50 : dispatchers(dispatchers),
51 conn(conn),
52 conn_ref(conn.get_local_shared_foreign_from_this())
53 {}
54
// Destructor: performs no cleanup of its own, it only checks invariants
// that must already hold when the handler is destroyed.
IOHandler::~IOHandler()
{
  // the dispatch gate must already be closed
  ceph_assert(gate.is_closed());
  // no out_exit_dispatching promise may still be pending
  assert(!out_exit_dispatching);
}
60
61 ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
62 bool require_keepalive,
63 std::optional<utime_t> maybe_keepalive_ack,
64 bool require_ack)
65 {
66 std::size_t num_msgs = out_pending_msgs.size();
67 ceph::bufferlist bl;
68
69 if (unlikely(require_keepalive)) {
70 auto keepalive_frame = KeepAliveFrame::Encode();
71 bl.append(frame_assembler->get_buffer(keepalive_frame));
72 }
73
74 if (unlikely(maybe_keepalive_ack.has_value())) {
75 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
76 bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
77 }
78
79 if (require_ack && num_msgs == 0u) {
80 auto ack_frame = AckFrame::Encode(get_in_seq());
81 bl.append(frame_assembler->get_buffer(ack_frame));
82 }
83
84 std::for_each(
85 out_pending_msgs.begin(),
86 out_pending_msgs.begin()+num_msgs,
87 [this, &bl](const MessageURef& msg) {
88 // set priority
89 msg->get_header().src = conn.messenger.get_myname();
90
91 msg->encode(conn.features, 0);
92
93 ceph_assert(!msg->get_seq() && "message already has seq");
94 msg->set_seq(++out_seq);
95
96 ceph_msg_header &header = msg->get_header();
97 ceph_msg_footer &footer = msg->get_footer();
98
99 ceph_msg_header2 header2{header.seq, header.tid,
100 header.type, header.priority,
101 header.version,
102 ceph_le32(0), header.data_off,
103 ceph_le64(get_in_seq()),
104 footer.flags, header.compat_version,
105 header.reserved};
106
107 auto message = MessageFrame::Encode(header2,
108 msg->get_payload(), msg->get_middle(), msg->get_data());
109 logger().debug("{} --> #{} === {} ({})",
110 conn, msg->get_seq(), *msg, msg->get_type());
111 bl.append(frame_assembler->get_buffer(message));
112 });
113
114 if (!conn.policy.lossy) {
115 out_sent_msgs.insert(
116 out_sent_msgs.end(),
117 std::make_move_iterator(out_pending_msgs.begin()),
118 std::make_move_iterator(out_pending_msgs.end()));
119 }
120 out_pending_msgs.clear();
121 return bl;
122 }
123
124 seastar::future<> IOHandler::send(MessageURef msg)
125 {
126 if (io_state != io_state_t::drop) {
127 out_pending_msgs.push_back(std::move(msg));
128 notify_out_dispatch();
129 }
130 return seastar::now();
131 }
132
133 seastar::future<> IOHandler::send_keepalive()
134 {
135 if (!need_keepalive) {
136 need_keepalive = true;
137 notify_out_dispatch();
138 }
139 return seastar::now();
140 }
141
142 void IOHandler::mark_down()
143 {
144 ceph_assert_always(io_state != io_state_t::none);
145 need_dispatch_reset = false;
146 if (io_state == io_state_t::drop) {
147 return;
148 }
149
150 logger().info("{} mark_down() with {}",
151 conn, io_stat_printer{*this});
152 set_io_state(io_state_t::drop);
153 handshake_listener->notify_mark_down();
154 }
155
156 void IOHandler::print_io_stat(std::ostream &out) const
157 {
158 out << "io_stat("
159 << "io_state=" << fmt::format("{}", io_state)
160 << ", in_seq=" << in_seq
161 << ", out_seq=" << out_seq
162 << ", out_pending_msgs_size=" << out_pending_msgs.size()
163 << ", out_sent_msgs_size=" << out_sent_msgs.size()
164 << ", need_ack=" << (ack_left > 0)
165 << ", need_keepalive=" << need_keepalive
166 << ", need_keepalive_ack=" << bool(next_keepalive_ack)
167 << ")";
168 }
169
170 void IOHandler::set_io_state(
171 const IOHandler::io_state_t &new_state,
172 FrameAssemblerV2Ref fa)
173 {
174 ceph_assert_always(!(
175 (new_state == io_state_t::none && io_state != io_state_t::none) ||
176 (new_state == io_state_t::open && io_state == io_state_t::open) ||
177 (new_state != io_state_t::drop && io_state == io_state_t::drop)
178 ));
179
180 bool dispatch_in = false;
181 if (new_state == io_state_t::open) {
182 // to open
183 ceph_assert_always(protocol_is_connected == true);
184 assert(fa != nullptr);
185 ceph_assert_always(frame_assembler == nullptr);
186 frame_assembler = std::move(fa);
187 ceph_assert_always(frame_assembler->is_socket_valid());
188 dispatch_in = true;
189 #ifdef UNIT_TESTS_BUILT
190 if (conn.interceptor) {
191 conn.interceptor->register_conn_ready(conn);
192 }
193 #endif
194 } else if (io_state == io_state_t::open) {
195 // from open
196 ceph_assert_always(protocol_is_connected == true);
197 protocol_is_connected = false;
198 assert(fa == nullptr);
199 ceph_assert_always(frame_assembler->is_socket_valid());
200 frame_assembler->shutdown_socket();
201 if (out_dispatching) {
202 ceph_assert_always(!out_exit_dispatching.has_value());
203 out_exit_dispatching = seastar::promise<>();
204 }
205 } else {
206 assert(fa == nullptr);
207 }
208
209 if (io_state != new_state) {
210 io_state = new_state;
211 io_state_changed.set_value();
212 io_state_changed = seastar::promise<>();
213 }
214
215 /*
216 * not atomic below
217 */
218
219 if (dispatch_in) {
220 do_in_dispatch();
221 }
222 }
223
224 seastar::future<FrameAssemblerV2Ref> IOHandler::wait_io_exit_dispatching()
225 {
226 ceph_assert_always(io_state != io_state_t::open);
227 ceph_assert_always(frame_assembler != nullptr);
228 ceph_assert_always(!frame_assembler->is_socket_valid());
229 return seastar::when_all(
230 [this] {
231 if (out_exit_dispatching) {
232 return out_exit_dispatching->get_future();
233 } else {
234 return seastar::now();
235 }
236 }(),
237 [this] {
238 if (in_exit_dispatching) {
239 return in_exit_dispatching->get_future();
240 } else {
241 return seastar::now();
242 }
243 }()
244 ).discard_result().then([this] {
245 return std::move(frame_assembler);
246 });
247 }
248
249 void IOHandler::reset_session(bool full)
250 {
251 // reset in
252 in_seq = 0;
253 if (full) {
254 reset_out();
255 dispatch_remote_reset();
256 }
257 }
258
259 void IOHandler::requeue_out_sent()
260 {
261 assert(io_state != io_state_t::open);
262 if (out_sent_msgs.empty()) {
263 return;
264 }
265
266 out_seq -= out_sent_msgs.size();
267 logger().debug("{} requeue {} items, revert out_seq to {}",
268 conn, out_sent_msgs.size(), out_seq);
269 for (MessageURef& msg : out_sent_msgs) {
270 msg->clear_payload();
271 msg->set_seq(0);
272 }
273 out_pending_msgs.insert(
274 out_pending_msgs.begin(),
275 std::make_move_iterator(out_sent_msgs.begin()),
276 std::make_move_iterator(out_sent_msgs.end()));
277 out_sent_msgs.clear();
278 notify_out_dispatch();
279 }
280
281 void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
282 {
283 assert(io_state != io_state_t::open);
284 if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
285 logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
286 conn, out_seq, seq);
287 out_seq = seq;
288 return;
289 }
290 logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
291 conn, seq, out_sent_msgs.size(), out_seq);
292 while (!out_sent_msgs.empty()) {
293 auto cur_seq = out_sent_msgs.front()->get_seq();
294 if (cur_seq == 0 || cur_seq > seq) {
295 break;
296 } else {
297 out_sent_msgs.pop_front();
298 }
299 }
300 requeue_out_sent();
301 }
302
303 void IOHandler::reset_out()
304 {
305 assert(io_state != io_state_t::open);
306 out_seq = 0;
307 out_pending_msgs.clear();
308 out_sent_msgs.clear();
309 need_keepalive = false;
310 next_keepalive_ack = std::nullopt;
311 ack_left = 0;
312 }
313
314 void IOHandler::dispatch_accept()
315 {
316 if (io_state == io_state_t::drop) {
317 return;
318 }
319 // protocol_is_connected can be from true to true here if the replacing is
320 // happening to a connected connection.
321 protocol_is_connected = true;
322 dispatchers.ms_handle_accept(conn_ref);
323 }
324
325 void IOHandler::dispatch_connect()
326 {
327 if (io_state == io_state_t::drop) {
328 return;
329 }
330 ceph_assert_always(protocol_is_connected == false);
331 protocol_is_connected = true;
332 dispatchers.ms_handle_connect(conn_ref);
333 }
334
335 void IOHandler::dispatch_reset(bool is_replace)
336 {
337 ceph_assert_always(io_state == io_state_t::drop);
338 if (!need_dispatch_reset) {
339 return;
340 }
341 need_dispatch_reset = false;
342 dispatchers.ms_handle_reset(conn_ref, is_replace);
343 }
344
345 void IOHandler::dispatch_remote_reset()
346 {
347 if (io_state == io_state_t::drop) {
348 return;
349 }
350 dispatchers.ms_handle_remote_reset(conn_ref);
351 }
352
353 void IOHandler::ack_out_sent(seq_num_t seq)
354 {
355 if (conn.policy.lossy) { // lossy connections don't keep sent messages
356 return;
357 }
358 while (!out_sent_msgs.empty() &&
359 out_sent_msgs.front()->get_seq() <= seq) {
360 logger().trace("{} got ack seq {} >= {}, pop {}",
361 conn, seq, out_sent_msgs.front()->get_seq(),
362 *out_sent_msgs.front());
363 out_sent_msgs.pop_front();
364 }
365 }
366
367 seastar::future<stop_t> IOHandler::try_exit_out_dispatch() {
368 assert(!is_out_queued());
369 return frame_assembler->flush(
370 ).then([this] {
371 if (!is_out_queued()) {
372 // still nothing pending to send after flush,
373 // the dispatching can ONLY stop now
374 ceph_assert(out_dispatching);
375 out_dispatching = false;
376 if (unlikely(out_exit_dispatching.has_value())) {
377 out_exit_dispatching->set_value();
378 out_exit_dispatching = std::nullopt;
379 logger().info("{} do_out_dispatch: nothing queued at {},"
380 " set out_exit_dispatching",
381 conn, io_state);
382 }
383 return seastar::make_ready_future<stop_t>(stop_t::yes);
384 } else {
385 // something is pending to send during flushing
386 return seastar::make_ready_future<stop_t>(stop_t::no);
387 }
388 });
389 }
390
391 seastar::future<> IOHandler::do_out_dispatch()
392 {
393 return seastar::repeat([this] {
394 switch (io_state) {
395 case io_state_t::open: {
396 bool still_queued = is_out_queued();
397 if (unlikely(!still_queued)) {
398 return try_exit_out_dispatch();
399 }
400 auto to_ack = ack_left;
401 assert(to_ack == 0 || in_seq > 0);
402 return frame_assembler->write(
403 sweep_out_pending_msgs_to_sent(
404 need_keepalive, next_keepalive_ack, to_ack > 0)
405 ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
406 need_keepalive = false;
407 if (next_keepalive_ack == prv_keepalive_ack) {
408 next_keepalive_ack = std::nullopt;
409 }
410 assert(ack_left >= to_ack);
411 ack_left -= to_ack;
412 if (!is_out_queued()) {
413 return try_exit_out_dispatch();
414 } else {
415 // messages were enqueued during socket write
416 return seastar::make_ready_future<stop_t>(stop_t::no);
417 }
418 });
419 }
420 case io_state_t::delay:
421 // delay out dispatching until open
422 if (out_exit_dispatching) {
423 out_exit_dispatching->set_value();
424 out_exit_dispatching = std::nullopt;
425 logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
426 } else {
427 logger().info("{} do_out_dispatch: delay ...", conn);
428 }
429 return io_state_changed.get_future(
430 ).then([] { return stop_t::no; });
431 case io_state_t::drop:
432 ceph_assert(out_dispatching);
433 out_dispatching = false;
434 if (out_exit_dispatching) {
435 out_exit_dispatching->set_value();
436 out_exit_dispatching = std::nullopt;
437 logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
438 } else {
439 logger().info("{} do_out_dispatch: dropped", conn);
440 }
441 return seastar::make_ready_future<stop_t>(stop_t::yes);
442 default:
443 ceph_assert(false);
444 }
445 }).handle_exception_type([this] (const std::system_error& e) {
446 if (e.code() != std::errc::broken_pipe &&
447 e.code() != std::errc::connection_reset &&
448 e.code() != error::negotiation_failure) {
449 logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
450 conn, io_state, e.what());
451 ceph_abort();
452 }
453
454 if (io_state == io_state_t::open) {
455 logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
456 conn, io_state, e.what());
457 std::exception_ptr eptr;
458 try {
459 throw e;
460 } catch(...) {
461 eptr = std::current_exception();
462 }
463 set_io_state(io_state_t::delay);
464 handshake_listener->notify_out_fault("do_out_dispatch", eptr);
465 } else {
466 logger().info("{} do_out_dispatch(): fault at {} -- {}",
467 conn, io_state, e.what());
468 }
469
470 return do_out_dispatch();
471 });
472 }
473
474 void IOHandler::notify_out_dispatch()
475 {
476 handshake_listener->notify_out();
477 if (out_dispatching) {
478 // already dispatching
479 return;
480 }
481 out_dispatching = true;
482 switch (io_state) {
483 case io_state_t::open:
484 [[fallthrough]];
485 case io_state_t::delay:
486 assert(!gate.is_closed());
487 gate.dispatch_in_background("do_out_dispatch", conn, [this] {
488 return do_out_dispatch();
489 });
490 return;
491 case io_state_t::drop:
492 out_dispatching = false;
493 return;
494 default:
495 ceph_assert(false);
496 }
497 }
498
499 seastar::future<>
500 IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
501 {
502 return frame_assembler->read_frame_payload(
503 ).then([this, throttle_stamp, msg_size](auto payload) {
504 if (unlikely(io_state != io_state_t::open)) {
505 logger().debug("{} triggered {} during read_message()",
506 conn, io_state);
507 abort_protocol();
508 }
509
510 utime_t recv_stamp{seastar::lowres_system_clock::now()};
511
512 // we need to get the size before std::moving segments data
513 auto msg_frame = MessageFrame::Decode(*payload);
514 // XXX: paranoid copy just to avoid oops
515 ceph_msg_header2 current_header = msg_frame.header();
516
517 logger().trace("{} got {} + {} + {} byte message,"
518 " envelope type={} src={} off={} seq={}",
519 conn,
520 msg_frame.front_len(),
521 msg_frame.middle_len(),
522 msg_frame.data_len(),
523 current_header.type,
524 conn.get_peer_name(),
525 current_header.data_off,
526 current_header.seq);
527
528 ceph_msg_header header{current_header.seq,
529 current_header.tid,
530 current_header.type,
531 current_header.priority,
532 current_header.version,
533 ceph_le32(msg_frame.front_len()),
534 ceph_le32(msg_frame.middle_len()),
535 ceph_le32(msg_frame.data_len()),
536 current_header.data_off,
537 conn.get_peer_name(),
538 current_header.compat_version,
539 current_header.reserved,
540 ceph_le32(0)};
541 ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
542 ceph_le32(0), ceph_le64(0), current_header.flags};
543
544 Message *message = decode_message(nullptr, 0, header, footer,
545 msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
546 if (!message) {
547 logger().warn("{} decode message failed", conn);
548 abort_in_fault();
549 }
550
551 // store reservation size in message, so we don't get confused
552 // by messages entering the dispatch queue through other paths.
553 message->set_dispatch_throttle_size(msg_size);
554
555 message->set_throttle_stamp(throttle_stamp);
556 message->set_recv_stamp(recv_stamp);
557 message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
558
559 // check received seq#. if it is old, drop the message.
560 // note that incoming messages may skip ahead. this is convenient for the
561 // client side queueing because messages can't be renumbered, but the (kernel)
562 // client will occasionally pull a message out of the sent queue to send
563 // elsewhere. in that case it doesn't matter if we "got" it or not.
564 uint64_t cur_seq = get_in_seq();
565 if (message->get_seq() <= cur_seq) {
566 logger().error("{} got old message {} <= {} {}, discarding",
567 conn, message->get_seq(), cur_seq, *message);
568 if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
569 local_conf()->ms_die_on_old_message) {
570 ceph_assert(0 == "old msgs despite reconnect_seq feature");
571 }
572 return seastar::now();
573 } else if (message->get_seq() > cur_seq + 1) {
574 logger().error("{} missed message? skipped from seq {} to {}",
575 conn, cur_seq, message->get_seq());
576 if (local_conf()->ms_die_on_skipped_message) {
577 ceph_assert(0 == "skipped incoming seq");
578 }
579 }
580
581 // note last received message.
582 in_seq = message->get_seq();
583 if (conn.policy.lossy) {
584 logger().debug("{} <== #{} === {} ({})",
585 conn,
586 message->get_seq(),
587 *message,
588 message->get_type());
589 } else {
590 logger().debug("{} <== #{},{} === {} ({})",
591 conn,
592 message->get_seq(),
593 current_header.ack_seq,
594 *message,
595 message->get_type());
596 }
597
598 // notify ack
599 if (!conn.policy.lossy) {
600 ++ack_left;
601 notify_out_dispatch();
602 }
603
604 ack_out_sent(current_header.ack_seq);
605
606 // TODO: change MessageRef with seastar::shared_ptr
607 auto msg_ref = MessageRef{message, false};
608 assert(io_state == io_state_t::open);
609 // throttle the reading process by the returned future
610 return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
611 });
612 }
613
614 void IOHandler::do_in_dispatch()
615 {
616 ceph_assert_always(!in_exit_dispatching.has_value());
617 in_exit_dispatching = seastar::promise<>();
618 gate.dispatch_in_background("do_in_dispatch", conn, [this] {
619 return seastar::keep_doing([this] {
620 return frame_assembler->read_main_preamble(
621 ).then([this](auto ret) {
622 switch (ret.tag) {
623 case Tag::MESSAGE: {
624 size_t msg_size = get_msg_size(*ret.rx_frame_asm);
625 return seastar::futurize_invoke([this] {
626 // throttle_message() logic
627 if (!conn.policy.throttler_messages) {
628 return seastar::now();
629 }
630 // TODO: message throttler
631 ceph_assert(false);
632 return seastar::now();
633 }).then([this, msg_size] {
634 // throttle_bytes() logic
635 if (!conn.policy.throttler_bytes) {
636 return seastar::now();
637 }
638 if (!msg_size) {
639 return seastar::now();
640 }
641 logger().trace("{} wants {} bytes from policy throttler {}/{}",
642 conn, msg_size,
643 conn.policy.throttler_bytes->get_current(),
644 conn.policy.throttler_bytes->get_max());
645 return conn.policy.throttler_bytes->get(msg_size);
646 }).then([this, msg_size] {
647 // TODO: throttle_dispatch_queue() logic
648 utime_t throttle_stamp{seastar::lowres_system_clock::now()};
649 return read_message(throttle_stamp, msg_size);
650 });
651 }
652 case Tag::ACK:
653 return frame_assembler->read_frame_payload(
654 ).then([this](auto payload) {
655 // handle_message_ack() logic
656 auto ack = AckFrame::Decode(payload->back());
657 logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
658 ack_out_sent(ack.seq());
659 });
660 case Tag::KEEPALIVE2:
661 return frame_assembler->read_frame_payload(
662 ).then([this](auto payload) {
663 // handle_keepalive2() logic
664 auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
665 logger().debug("{} GOT KeepAliveFrame: timestamp={}",
666 conn, keepalive_frame.timestamp());
667 // notify keepalive ack
668 next_keepalive_ack = keepalive_frame.timestamp();
669 notify_out_dispatch();
670
671 last_keepalive = seastar::lowres_system_clock::now();
672 });
673 case Tag::KEEPALIVE2_ACK:
674 return frame_assembler->read_frame_payload(
675 ).then([this](auto payload) {
676 // handle_keepalive2_ack() logic
677 auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
678 auto _last_keepalive_ack =
679 seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
680 set_last_keepalive_ack(_last_keepalive_ack);
681 logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
682 conn, _last_keepalive_ack);
683 });
684 default: {
685 logger().warn("{} do_in_dispatch() received unexpected tag: {}",
686 conn, static_cast<uint32_t>(ret.tag));
687 abort_in_fault();
688 }
689 }
690 });
691 }).handle_exception([this](std::exception_ptr eptr) {
692 const char *e_what;
693 try {
694 std::rethrow_exception(eptr);
695 } catch (std::exception &e) {
696 e_what = e.what();
697 }
698
699 if (io_state == io_state_t::open) {
700 logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
701 conn, io_state, e_what);
702 set_io_state(io_state_t::delay);
703 handshake_listener->notify_out_fault("do_in_dispatch", eptr);
704 } else {
705 logger().info("{} do_in_dispatch(): fault at {} -- {}",
706 conn, io_state, e_what);
707 }
708 }).finally([this] {
709 ceph_assert_always(in_exit_dispatching.has_value());
710 in_exit_dispatching->set_value();
711 in_exit_dispatching = std::nullopt;
712 });
713 });
714 }
715
716 } // namespace crimson::net