]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/io_handler.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / io_handler.cc
CommitLineData
1e59de90
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "io_handler.h"
5
6#include "auth/Auth.h"
7
8#include "crimson/common/formatter.h"
9#include "crimson/common/log.h"
10#include "crimson/net/Errors.h"
11#include "crimson/net/chained_dispatchers.h"
12#include "crimson/net/SocketMessenger.h"
13#include "msg/Message.h"
14#include "msg/msg_fmt.h"
15
16using namespace ceph::msgr::v2;
17using crimson::common::local_conf;
18
19namespace {
20
21seastar::logger& logger() {
22 return crimson::get_logger(ceph_subsys_ms);
23}
24
25[[noreturn]] void abort_in_fault() {
26 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
27}
28
29[[noreturn]] void abort_protocol() {
30 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
31}
32
33std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
34{
35 ceph_assert(rx_frame_asm.get_num_segments() > 0);
36 size_t sum = 0;
37 // we don't include SegmentIndex::Msg::HEADER.
38 for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
39 sum += rx_frame_asm.get_segment_logical_len(idx);
40 }
41 return sum;
42}
43
44} // namespace anonymous
45
46namespace crimson::net {
47
// Construct an io handler pinned to the current shard.  The io state
// starts at none; it is driven later through set_io_state() by the
// protocol.  conn_ref keeps the connection alive from this core.
IOHandler::IOHandler(ChainedDispatchers &dispatchers,
                     SocketConnection &conn)
  : shard_states(shard_states_t::create(
      seastar::this_shard_id(), io_state_t::none)),
    dispatchers(dispatchers),
    conn(conn),
    conn_ref(conn.get_local_shared_foreign_from_this())
{}
56
// Destructor: only sanity checks.  All teardown must have happened in
// close_io() before destruction.
IOHandler::~IOHandler()
{
  // close_io() must be finished
  ceph_assert_always(maybe_prv_shard_states == nullptr);
  // should be true in the according shard
  // ceph_assert_always(shard_states->assert_closed_and_exit());
  assert(!conn_ref);
}
65
aee94f69
TL
// Serialize everything currently queued for output into one bufferlist:
// an optional keepalive, an optional keepalive-ack, a standalone ack (only
// when there is no message to piggyback it on), and every pending message.
// Pending messages are assigned their seq here and moved to out_sent_msgs
// (unless the policy is lossy).  Under UNIT_TESTS_BUILT the frame tags are
// also returned so tests can intercept them.
#ifdef UNIT_TESTS_BUILT
IOHandler::sweep_ret
#else
ceph::bufferlist
#endif
IOHandler::sweep_out_pending_msgs_to_sent(
  bool require_keepalive,
  std::optional<utime_t> maybe_keepalive_ack,
  bool require_ack)
{
  std::size_t num_msgs = out_pending_msgs.size();
  ceph::bufferlist bl;

#ifdef UNIT_TESTS_BUILT
  std::vector<Tag> tags;
#endif

  if (unlikely(require_keepalive)) {
    auto keepalive_frame = KeepAliveFrame::Encode();
    bl.append(frame_assembler->get_buffer(keepalive_frame));
#ifdef UNIT_TESTS_BUILT
    auto tag = KeepAliveFrame::tag;
    tags.push_back(tag);
#endif
  }

  if (unlikely(maybe_keepalive_ack.has_value())) {
    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
    bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
#ifdef UNIT_TESTS_BUILT
    auto tag = KeepAliveFrameAck::tag;
    tags.push_back(tag);
#endif
  }

  // a standalone AckFrame is only needed when no message will carry the ack
  if (require_ack && num_msgs == 0u) {
    auto ack_frame = AckFrame::Encode(in_seq);
    bl.append(frame_assembler->get_buffer(ack_frame));
#ifdef UNIT_TESTS_BUILT
    auto tag = AckFrame::tag;
    tags.push_back(tag);
#endif
  }

  std::for_each(
      out_pending_msgs.begin(),
      out_pending_msgs.begin()+num_msgs,
      [this, &bl
#ifdef UNIT_TESTS_BUILT
       , &tags
#endif
      ](const MessageFRef& msg) {
    // stamp the originator entity name (NOTE: the old "set priority"
    // comment here was stale -- priority travels inside header2 below)
    msg->get_header().src = conn.messenger.get_myname();

    msg->encode(conn.features, 0);

    ceph_assert(!msg->get_seq() && "message already has seq");
    msg->set_seq(++out_seq);

    ceph_msg_header &header = msg->get_header();
    ceph_msg_footer &footer = msg->get_footer();

    // build the msgr2 on-wire header; in_seq piggybacks the ack
    ceph_msg_header2 header2{header.seq,        header.tid,
                             header.type,       header.priority,
                             header.version,
                             ceph_le32(0),      header.data_off,
                             ceph_le64(in_seq),
                             footer.flags,      header.compat_version,
                             header.reserved};

    auto message = MessageFrame::Encode(header2,
        msg->get_payload(), msg->get_middle(), msg->get_data());
    logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
    bl.append(frame_assembler->get_buffer(message));
#ifdef UNIT_TESTS_BUILT
    auto tag = MessageFrame::tag;
    tags.push_back(tag);
#endif
  });

  // lossy connections never resend, so sent messages need not be kept
  if (!conn.policy.lossy) {
    out_sent_msgs.insert(
        out_sent_msgs.end(),
        std::make_move_iterator(out_pending_msgs.begin()),
        std::make_move_iterator(out_pending_msgs.end()));
  }
  out_pending_msgs.clear();

#ifdef UNIT_TESTS_BUILT
  return sweep_ret{std::move(bl), tags};
#else
  return bl;
#endif
}
162
aee94f69 163seastar::future<> IOHandler::send(MessageFRef msg)
1e59de90 164{
aee94f69
TL
165 // sid may be changed on-the-fly during the submission
166 if (seastar::this_shard_id() == get_shard_id()) {
167 return do_send(std::move(msg));
168 } else {
169 logger().trace("{} send() is directed to {} -- {}",
170 conn, get_shard_id(), *msg);
171 return seastar::smp::submit_to(
172 get_shard_id(), [this, msg=std::move(msg)]() mutable {
173 return send_redirected(std::move(msg));
174 });
175 }
176}
177
178seastar::future<> IOHandler::send_redirected(MessageFRef msg)
179{
180 // sid may be changed on-the-fly during the submission
181 if (seastar::this_shard_id() == get_shard_id()) {
182 return do_send(std::move(msg));
183 } else {
184 logger().debug("{} send() is redirected to {} -- {}",
185 conn, get_shard_id(), *msg);
186 return seastar::smp::submit_to(
187 get_shard_id(), [this, msg=std::move(msg)]() mutable {
188 return send_redirected(std::move(msg));
189 });
190 }
191}
192
193seastar::future<> IOHandler::do_send(MessageFRef msg)
194{
195 assert(seastar::this_shard_id() == get_shard_id());
196 logger().trace("{} do_send() got message -- {}", conn, *msg);
197 if (get_io_state() != io_state_t::drop) {
1e59de90
TL
198 out_pending_msgs.push_back(std::move(msg));
199 notify_out_dispatch();
200 }
201 return seastar::now();
202}
203
204seastar::future<> IOHandler::send_keepalive()
205{
aee94f69
TL
206 // sid may be changed on-the-fly during the submission
207 if (seastar::this_shard_id() == get_shard_id()) {
208 return do_send_keepalive();
209 } else {
210 logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id());
211 return seastar::smp::submit_to(
212 get_shard_id(), [this] {
213 return send_keepalive_redirected();
214 });
215 }
216}
217
218seastar::future<> IOHandler::send_keepalive_redirected()
219{
220 // sid may be changed on-the-fly during the submission
221 if (seastar::this_shard_id() == get_shard_id()) {
222 return do_send_keepalive();
223 } else {
224 logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id());
225 return seastar::smp::submit_to(
226 get_shard_id(), [this] {
227 return send_keepalive_redirected();
228 });
229 }
230}
231
232seastar::future<> IOHandler::do_send_keepalive()
233{
234 assert(seastar::this_shard_id() == get_shard_id());
235 logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive);
1e59de90
TL
236 if (!need_keepalive) {
237 need_keepalive = true;
238 notify_out_dispatch();
239 }
240 return seastar::now();
241}
242
// User-initiated teardown.  Moves the io state to drop on this shard and
// notifies the protocol on the messenger shard in the background via
// notify_mark_down().
void IOHandler::mark_down()
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  ceph_assert_always(get_io_state() != io_state_t::none);
  // a user-requested teardown does not need an ms_handle_reset() dispatch
  need_dispatch_reset = false;
  if (get_io_state() == io_state_t::drop) {
    // already dropping, nothing more to do
    return;
  }

  auto cc_seq = crosscore.prepare_submit();
  logger().info("{} mark_down() at {}, send {} notify_mark_down()",
                conn, io_stat_printer{*this}, cc_seq);
  do_set_io_state(io_state_t::drop);
  shard_states->dispatch_in_background(
      "notify_mark_down", conn, [this, cc_seq] {
    return seastar::smp::submit_to(
        conn.get_messenger_shard_id(), [this, cc_seq] {
      return handshake_listener->notify_mark_down(cc_seq);
    });
  });
}
264
// Render the io-side state for logging (used through io_stat_printer).
void IOHandler::print_io_stat(std::ostream &out) const
{
  assert(seastar::this_shard_id() == get_shard_id());
  out << "io_stat("
      << "io_state=" << fmt::format("{}", get_io_state())
      << ", in_seq=" << in_seq
      << ", out_seq=" << out_seq
      << ", out_pending_msgs_size=" << out_pending_msgs.size()
      << ", out_sent_msgs_size=" << out_sent_msgs.size()
      << ", need_ack=" << (ack_left > 0)
      << ", need_keepalive=" << need_keepalive
      << ", need_keepalive_ack=" << bool(next_keepalive_ack)
      << ")";
}
279
aee94f69
TL
// Take ownership of the frame assembler handed over by the protocol.
// It must be shard-local and carry a valid socket.
void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa)
{
  assert(fa != nullptr);
  ceph_assert_always(frame_assembler == nullptr);
  frame_assembler = std::move(fa);
  ceph_assert_always(
      frame_assembler->get_shard_id() == get_shard_id());
  // should have been set through dispatch_accept/connect()
  ceph_assert_always(
      frame_assembler->get_socket_shard_id() == get_shard_id());
  ceph_assert_always(frame_assembler->is_socket_valid());
}
292
// Core io state transition, always executed on the io shard.
// Valid transitions are enforced below: never back to none, never
// open -> open.  A frame assembler is only handed in when opening;
// closing from open shuts the socket down.  When delaying, set_notify_out
// requests a notify_out() round-trip so the protocol learns about queued
// output.
void IOHandler::do_set_io_state(
    io_state_t new_state,
    std::optional<crosscore_t::seq_t> cc_seq,
    FrameAssemblerV2Ref fa,
    bool set_notify_out)
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  auto prv_state = get_io_state();
  logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, "
                 "fa={}, set_notify_out={}, at {}",
                 conn,
                 cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "",
                 prv_state, new_state,
                 fa ? "present" : "N/A", set_notify_out,
                 io_stat_printer{*this});
  ceph_assert_always(!(
    (new_state == io_state_t::none && prv_state != io_state_t::none) ||
    (new_state == io_state_t::open && prv_state == io_state_t::open)
  ));

  if (prv_state == io_state_t::drop) {
    // only possible due to a racing mark_down() from user
    if (new_state == io_state_t::open) {
      // adopt the assembler just to shut its socket down cleanly
      assign_frame_assembler(std::move(fa));
      frame_assembler->shutdown_socket<false>(nullptr);
    } else {
      assert(fa == nullptr);
    }
    return;
  }

  bool dispatch_in = false;
  if (new_state == io_state_t::open) {
    // to open
    ceph_assert_always(protocol_is_connected == true);
    assign_frame_assembler(std::move(fa));
    dispatch_in = true;
  } else if (prv_state == io_state_t::open) {
    // from open
    ceph_assert_always(protocol_is_connected == true);
    protocol_is_connected = false;
    assert(fa == nullptr);
    ceph_assert_always(frame_assembler->is_socket_valid());
    frame_assembler->shutdown_socket<false>(nullptr);
  } else {
    assert(fa == nullptr);
  }

  if (new_state == io_state_t::delay) {
    need_notify_out = set_notify_out;
    if (need_notify_out) {
      maybe_notify_out_dispatch();
    }
  } else {
    assert(set_notify_out == false);
    need_notify_out = false;
  }

  // FIXME: simplify and drop the prv_state == new_state case
  if (prv_state != new_state) {
    shard_states->set_io_state(new_state);
  }

  /*
   * not atomic below
   */

  if (dispatch_in) {
    do_in_dispatch();
  }
}
364
aee94f69
TL
// Cross-core entry for io state changes from the protocol.  crosscore
// sequencing guarantees the submissions apply in order; out-of-order
// arrivals wait and retry.
seastar::future<> IOHandler::set_io_state(
    crosscore_t::seq_t cc_seq,
    io_state_t new_state,
    FrameAssemblerV2Ref fa,
    bool set_notify_out)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} set_io_state(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, new_state,
            fa=std::move(fa), set_notify_out]() mutable {
      return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
    });
  }

  do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out);
  return seastar::now();
}
385
// Wait until both in and out dispatching loops have fully stopped, then
// hand the frame assembler (retargeted to the messenger shard) and the
// io states back to the protocol.  If the handler was dropped during a
// shard switch, the wait happens on the previous shard's states.
seastar::future<IOHandler::exit_dispatching_ret>
IOHandler::wait_io_exit_dispatching(
    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq] {
      return wait_io_exit_dispatching(cc_seq);
    });
  }

  logger().debug("{} got {} wait_io_exit_dispatching()",
                 conn, cc_seq);
  ceph_assert_always(get_io_state() != io_state_t::open);
  ceph_assert_always(frame_assembler != nullptr);
  ceph_assert_always(!frame_assembler->is_socket_valid());
  return seastar::futurize_invoke([this] {
    // cannot be running in parallel with to_new_sid()
    if (maybe_dropped_sid.has_value()) {
      ceph_assert_always(get_io_state() == io_state_t::drop);
      assert(shard_states->assert_closed_and_exit());
      auto prv_sid = *maybe_dropped_sid;
      return seastar::smp::submit_to(prv_sid, [this] {
        logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn);
        assert(maybe_prv_shard_states != nullptr);
        return maybe_prv_shard_states->wait_io_exit_dispatching();
      });
    } else {
      return shard_states->wait_io_exit_dispatching();
    }
  }).then([this] {
    logger().debug("{} finish wait_io_exit_dispatching at {}",
                   conn, io_stat_printer{*this});
    ceph_assert_always(frame_assembler != nullptr);
    ceph_assert_always(!frame_assembler->is_socket_valid());
    // ownership moves back to the messenger shard with the return value
    frame_assembler->set_shard_id(conn.get_messenger_shard_id());
    return exit_dispatching_ret{
      std::move(frame_assembler),
      get_states()};
  });
}
430
aee94f69
TL
// Reset the message sequencing for a new session.  A full reset also
// clears the out queues and dispatches ms_handle_remote_reset() to the
// users.
seastar::future<> IOHandler::reset_session(
    crosscore_t::seq_t cc_seq,
    bool full)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} reset_session(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, full] {
      return reset_session(cc_seq, full);
    });
  }

  logger().debug("{} got {} reset_session({})",
                 conn, cc_seq, full);
  assert(get_io_state() != io_state_t::open);
  reset_in();
  if (full) {
    reset_out();
    dispatch_remote_reset();
  }
  return seastar::now();
}
455
aee94f69
TL
// Reset in_seq and requeue everything ever sent (reverting out_seq to 0)
// for a reconnect where the peer lost its session state.
seastar::future<> IOHandler::reset_peer_state(
    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} reset_peer_state(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq] {
      return reset_peer_state(cc_seq);
    });
  }

  logger().debug("{} got {} reset_peer_state()",
                 conn, cc_seq);
  assert(get_io_state() != io_state_t::open);
  reset_in();
  do_requeue_out_sent_up_to(0);
  discard_out_sent();
  return seastar::now();
}
477
// Cross-core request from the protocol to move all sent-but-unacked
// messages back to the pending queue (e.g. on reconnect).
seastar::future<> IOHandler::requeue_out_sent(
    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} requeue_out_sent(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq] {
      return requeue_out_sent(cc_seq);
    });
  }

  logger().debug("{} got {} requeue_out_sent()",
                 conn, cc_seq);
  do_requeue_out_sent();
  return seastar::now();
}
496
// Move every message in out_sent_msgs back to the front of the pending
// queue, reverting out_seq so they get renumbered on the next sweep.
void IOHandler::do_requeue_out_sent()
{
  assert(get_io_state() != io_state_t::open);
  if (out_sent_msgs.empty()) {
    return;
  }

  out_seq -= out_sent_msgs.size();
  logger().debug("{} requeue {} items, revert out_seq to {}",
                 conn, out_sent_msgs.size(), out_seq);
  for (MessageFRef& msg : out_sent_msgs) {
    // drop the stale encoding and seq; they are regenerated when resent
    msg->clear_payload();
    msg->set_seq(0);
  }
  out_pending_msgs.insert(
      out_pending_msgs.begin(),
      std::make_move_iterator(out_sent_msgs.begin()),
      std::make_move_iterator(out_sent_msgs.end()));
  out_sent_msgs.clear();
  maybe_notify_out_dispatch();
}
518
aee94f69
TL
// Cross-core request: drop sent messages the peer already acknowledged
// (seq <= msg_seq) and requeue the rest.
seastar::future<> IOHandler::requeue_out_sent_up_to(
    crosscore_t::seq_t cc_seq,
    seq_num_t msg_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, msg_seq] {
      return requeue_out_sent_up_to(cc_seq, msg_seq);
    });
  }

  logger().debug("{} got {} requeue_out_sent_up_to({})",
                 conn, cc_seq, msg_seq);
  do_requeue_out_sent_up_to(msg_seq);
  return seastar::now();
}
538
// Discard sent messages acknowledged by the peer (seq <= seq, stopping
// at unsent entries whose seq is still 0), then requeue the remainder.
// With nothing queued at all, simply adopt the peer-supplied out_seq.
void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq)
{
  assert(get_io_state() != io_state_t::open);
  if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
    logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
                   conn, out_seq, seq);
    out_seq = seq;
    return;
  }
  logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
                 conn, seq, out_sent_msgs.size(), out_seq);
  while (!out_sent_msgs.empty()) {
    auto cur_seq = out_sent_msgs.front()->get_seq();
    if (cur_seq == 0 || cur_seq > seq) {
      break;
    } else {
      out_sent_msgs.pop_front();
    }
  }
  do_requeue_out_sent();
}
560
// Reset the incoming message sequence for a fresh session.
void IOHandler::reset_in()
{
  assert(get_io_state() != io_state_t::open);
  in_seq = 0;
}
566
// Clear all outgoing state: sent and pending queues, keepalive flags,
// and any pending ack.
void IOHandler::reset_out()
{
  assert(get_io_state() != io_state_t::open);
  discard_out_sent();
  out_pending_msgs.clear();
  need_keepalive = false;
  next_keepalive_ack = std::nullopt;
  ack_left = 0;
}
576
// Drop the sent-message history and restart outgoing numbering.
void IOHandler::discard_out_sent()
{
  assert(get_io_state() != io_state_t::open);
  out_seq = 0;
  out_sent_msgs.clear();
}
583
aee94f69
TL
// Protocol accepted the connection: switch the io handler to the new
// shard; is_replace distinguishes replacing an existing connection.
seastar::future<>
IOHandler::dispatch_accept(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id new_sid,
    ConnectionFRef conn_fref,
    bool is_replace)
{
  return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
}
593
// Protocol connected as the initiator: switch the io handler to the new
// shard (nullopt marks the connect flavor for to_new_sid()).
seastar::future<>
IOHandler::dispatch_connect(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id new_sid,
    ConnectionFRef conn_fref)
{
  return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt);
}
602
// After a shard switch, close and release the shard states left on the
// previous shard.
seastar::future<>
IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid)
{
  assert(seastar::this_shard_id() == get_shard_id());
  return seastar::smp::submit_to(prv_sid, [this] {
    logger().debug("{} got cleanup_prv_shard()", conn);
    assert(maybe_prv_shard_states != nullptr);
    // take ownership so the states stay alive until close() resolves
    auto ref_prv_states = std::move(maybe_prv_shard_states);
    auto &prv_states = *ref_prv_states;
    return prv_states.close(
    ).then([ref_prv_states=std::move(ref_prv_states)] {
      ceph_assert_always(ref_prv_states->assert_closed_and_exit());
    });
  }).then([this] {
    ceph_assert_always(maybe_prv_shard_states == nullptr);
  });
}
620
// Switch the io handler to a new shard for accept/connect.  Phase 1 runs
// on the current shard: dispatch ms_handle_shard_change(), mark the
// protocol connected, and atomically swap shard_states to the new sid.
// Phase 2 runs on the new shard: record or clean up the previous shard
// and dispatch ms_handle_accept()/ms_handle_connect().
// is_replace: engaged for accept (true when replacing an existing
// connection), nullopt for connect.
seastar::future<>
IOHandler::to_new_sid(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id new_sid,
    ConnectionFRef conn_fref,
    std::optional<bool> is_replace)
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} to_new_sid(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, new_sid, is_replace,
            conn_fref=std::move(conn_fref)]() mutable {
      return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
    });
  }

  bool is_accept_or_connect = is_replace.has_value();
  logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}",
                 conn, cc_seq, new_sid,
                 fmt::format("{}",
                   is_accept_or_connect ?
                   (*is_replace ? "accept(replace)" : "accept(!replace)") :
                   "connect"),
                 io_stat_printer{*this});
  auto next_cc_seq = ++cc_seq;

  if (get_io_state() != io_state_t::drop) {
    ceph_assert_always(conn_ref);
    if (new_sid != seastar::this_shard_id()) {
      dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect);
      // user can make changes
    }
  } else {
    // it is possible that both io_handler and protocolv2 are
    // trying to close each other from different cores simultaneously.
    assert(!protocol_is_connected);
  }

  // re-check the io state: the dispatch above may have dropped us
  if (get_io_state() != io_state_t::drop) {
    if (is_accept_or_connect) {
      // protocol_is_connected can be from true to true here if the replacing is
      // happening to a connected connection.
    } else {
      ceph_assert_always(protocol_is_connected == false);
    }
    protocol_is_connected = true;
  } else {
    assert(!protocol_is_connected);
  }

  bool is_dropped = false;
  if (get_io_state() == io_state_t::drop) {
    is_dropped = true;
  }
  ceph_assert_always(get_io_state() != io_state_t::open);

  // apply the switching atomically
  ceph_assert_always(conn_ref);
  conn_ref.reset();
  auto prv_sid = get_shard_id();
  ceph_assert_always(maybe_prv_shard_states == nullptr);
  maybe_prv_shard_states = std::move(shard_states);
  shard_states = shard_states_t::create_from_previous(
      *maybe_prv_shard_states, new_sid);
  assert(new_sid == get_shard_id());

  return seastar::smp::submit_to(new_sid,
      [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
    logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}",
                   conn, next_cc_seq, prv_sid, is_dropped,
                   fmt::format("{}",
                     is_replace.has_value() ?
                     (*is_replace ? "accept(replace)" : "accept(!replace)") :
                     "connect"),
                   io_stat_printer{*this});

    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
    ceph_assert_always(get_io_state() != io_state_t::open);
    ceph_assert_always(!maybe_dropped_sid.has_value());
    ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));

    if (is_dropped) {
      ceph_assert_always(get_io_state() == io_state_t::drop);
      ceph_assert_always(shard_states->assert_closed_and_exit());
      maybe_dropped_sid = prv_sid;
      // cleanup_prv_shard() will be done in a follow-up close_io()
    } else {
      // possible at io_state_t::drop

      // previous shard is not cleaned,
      // but close_io() is responsible to clean up the current shard,
      // so cleanup the previous shard here.
      shard_states->dispatch_in_background(
          "cleanup_prv_sid", conn, [this, prv_sid] {
        return cleanup_prv_shard(prv_sid);
      });
      maybe_notify_out_dispatch();
    }

    ceph_assert_always(!conn_ref);
    // assign even if already dropping
    conn_ref = make_local_shared_foreign(std::move(conn_fref));

    if (get_io_state() != io_state_t::drop) {
      if (is_replace.has_value()) {
        dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace);
      } else {
        dispatchers.ms_handle_connect(conn_ref, prv_sid);
      }
      // user can make changes
    }
  });
}
736
// Pin a freshly accepted connection to its io shard before any io state
// exists: recreate shard_states for the target sid and rebind conn_ref
// there.
seastar::future<> IOHandler::set_accepted_sid(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id sid,
    ConnectionFRef conn_fref)
{
  assert(seastar::this_shard_id() == get_shard_id());
  assert(get_io_state() == io_state_t::none);
  ceph_assert_always(conn_ref);
  conn_ref.reset();
  assert(maybe_prv_shard_states == nullptr);
  shard_states.reset();
  shard_states = shard_states_t::create(sid, io_state_t::none);
  return seastar::smp::submit_to(sid,
      [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable {
    // must be the first to proceed
    ceph_assert_always(crosscore.proceed_or_wait(cc_seq));

    logger().debug("{} set accepted sid", conn);
    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
    ceph_assert_always(get_io_state() == io_state_t::none);
    assert(maybe_prv_shard_states == nullptr);
    ceph_assert_always(!conn_ref);
    conn_ref = make_local_shared_foreign(std::move(conn_fref));
  });
}
762
// Dispatch ms_handle_reset() to users exactly once per drop; skipped if
// need_dispatch_reset was cleared (e.g. by a user-initiated mark_down()).
void IOHandler::dispatch_reset(bool is_replace)
{
  ceph_assert_always(get_io_state() == io_state_t::drop);
  if (!need_dispatch_reset) {
    return;
  }
  need_dispatch_reset = false;
  ceph_assert_always(conn_ref);

  dispatchers.ms_handle_reset(conn_ref, is_replace);
  // user can make changes
}
775
// Dispatch ms_handle_remote_reset() to users, unless the connection is
// already being dropped.
void IOHandler::dispatch_remote_reset()
{
  if (get_io_state() == io_state_t::drop) {
    return;
  }
  ceph_assert_always(conn_ref);

  dispatchers.ms_handle_remote_reset(conn_ref);
  // user can make changes
}
786
// Drop sent messages acknowledged by the peer (all with seq <= the
// acked seq from the front of the queue).
void IOHandler::ack_out_sent(seq_num_t seq)
{
  if (conn.policy.lossy) { // lossy connections don't keep sent messages
    return;
  }
  while (!out_sent_msgs.empty() &&
         out_sent_msgs.front()->get_seq() <= seq) {
    logger().trace("{} got ack seq {} >= {}, pop {}",
                   conn, seq, out_sent_msgs.front()->get_seq(),
                   *out_sent_msgs.front());
    out_sent_msgs.pop_front();
  }
}
800
aee94f69
TL
// The out-dispatch loop: repeatedly sweep queued output into frames and
// write them while the io state stays open.  Exits on drop/switched,
// parks on delay, and converts expected socket errors into a delay state
// plus a background notify_out_fault() to the protocol; unexpected
// errors abort the process.
seastar::future<>
IOHandler::do_out_dispatch(shard_states_t &ctx)
{
  return seastar::repeat([this, &ctx] {
    switch (ctx.get_io_state()) {
     case io_state_t::open: {
      if (unlikely(!is_out_queued())) {
        // try exit open dispatching
        return frame_assembler->flush<false>(
        ).then([this, &ctx] {
          if (ctx.get_io_state() != io_state_t::open || is_out_queued()) {
            return seastar::make_ready_future<stop_t>(stop_t::no);
          }
          // still nothing pending to send after flush,
          // open dispatching can ONLY stop now
          ctx.exit_out_dispatching("exit-open", conn);
          return seastar::make_ready_future<stop_t>(stop_t::yes);
        });
      }

      // consume the pending keepalive/ack flags before sweeping
      auto require_keepalive = need_keepalive;
      need_keepalive = false;
      auto maybe_keepalive_ack = next_keepalive_ack;
      next_keepalive_ack = std::nullopt;
      auto to_ack = ack_left;
      assert(to_ack == 0 || in_seq > 0);
      ack_left = 0;
#ifdef UNIT_TESTS_BUILT
      auto ret = sweep_out_pending_msgs_to_sent(
          require_keepalive, maybe_keepalive_ack, to_ack > 0);
      return frame_assembler->intercept_frames(ret.tags, true
      ).then([this, bl=std::move(ret.bl)]() mutable {
        return frame_assembler->write<false>(std::move(bl));
      }
#else
      auto bl = sweep_out_pending_msgs_to_sent(
          require_keepalive, maybe_keepalive_ack, to_ack > 0);
      return frame_assembler->write<false>(std::move(bl)
#endif
      ).then([this, &ctx] {
        if (ctx.get_io_state() != io_state_t::open) {
          return frame_assembler->flush<false>(
          ).then([] {
            return seastar::make_ready_future<stop_t>(stop_t::no);
          });
        }

        // FIXME: may leak a flush if state is changed after return and before
        // the next repeat body.
        return seastar::make_ready_future<stop_t>(stop_t::no);
      });
     }
     case io_state_t::delay:
      // delay out dispatching until open
      ctx.notify_out_dispatching_stopped("delay...", conn);
      return ctx.wait_state_change(
      ).then([] { return stop_t::no; });
     case io_state_t::drop:
      ctx.exit_out_dispatching("dropped", conn);
      return seastar::make_ready_future<stop_t>(stop_t::yes);
     case io_state_t::switched:
      ctx.exit_out_dispatching("switched", conn);
      return seastar::make_ready_future<stop_t>(stop_t::yes);
     default:
      ceph_abort("impossible");
    }
  }).handle_exception_type([this, &ctx](const std::system_error& e) {
    auto io_state = ctx.get_io_state();
    // only expected socket-level failures are tolerated here
    if (e.code() != std::errc::broken_pipe &&
        e.code() != std::errc::connection_reset &&
        e.code() != error::negotiation_failure) {
      logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
                     conn, io_state, e.what());
      ceph_abort();
    }

    if (io_state == io_state_t::open) {
      auto cc_seq = crosscore.prepare_submit();
      logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
                    "send {} notify_out_fault()",
                    conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
      std::exception_ptr eptr;
      try {
        throw e;
      } catch(...) {
        eptr = std::current_exception();
      }
      do_set_io_state(io_state_t::delay);
      shard_states->dispatch_in_background(
          "notify_out_fault(out)", conn, [this, cc_seq, eptr] {
        auto states = get_states();
        return seastar::smp::submit_to(
            conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
          return handshake_listener->notify_out_fault(
              cc_seq, "do_out_dispatch", eptr, states);
        });
      });
    } else {
      if (io_state != io_state_t::switched) {
        logger().info("{} do_out_dispatch(): fault at {}, {} -- {}",
                      conn, io_state, io_stat_printer{*this}, e.what());
      } else {
        logger().info("{} do_out_dispatch(): fault at {} -- {}",
                      conn, io_state, e.what());
      }
    }

    // re-enter the loop to handle the (possibly changed) io state
    return do_out_dispatch(ctx);
  });
}
911
aee94f69
TL
// Kick the out-dispatch loop, but only if there is something to send.
void IOHandler::maybe_notify_out_dispatch()
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  if (is_out_queued()) {
    notify_out_dispatch();
  }
}
919
1e59de90
TL
// Start the out-dispatch loop if it is not already running, and -- when
// the protocol asked for it via set_notify_out -- send a notify_out()
// to the messenger shard in the background.
void IOHandler::notify_out_dispatch()
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  assert(is_out_queued());
  if (need_notify_out) {
    auto cc_seq = crosscore.prepare_submit();
    logger().debug("{} send {} notify_out()",
                   conn, cc_seq);
    shard_states->dispatch_in_background(
        "notify_out", conn, [this, cc_seq] {
      return seastar::smp::submit_to(
          conn.get_messenger_shard_id(), [this, cc_seq] {
        return handshake_listener->notify_out(cc_seq);
      });
    });
  }
  // try_enter_out_dispatching() guarantees a single loop instance
  if (shard_states->try_enter_out_dispatching()) {
    shard_states->dispatch_in_background(
        "do_out_dispatch", conn, [this] {
      return do_out_dispatch(*shard_states);
    });
  }
}
943
// Read one MESSAGE frame payload, decode it into a Message, apply
// sequence-number bookkeeping, and dispatch it.
//  - ctx: shard states; used to detect a concurrent io_state change,
//    which aborts the protocol mid-read.
//  - throttle_stamp: when the policy throttle was acquired (recorded on
//    the message for accounting).
//  - msg_size: reserved payload size (non-header segments), stored as
//    the dispatch throttle size.
// The returned future throttles further reads: the in-dispatch loop
// will not read the next frame until dispatch completes.
944seastar::future<>
aee94f69
TL
945IOHandler::read_message(
946 shard_states_t &ctx,
947 utime_t throttle_stamp,
948 std::size_t msg_size)
1e59de90 949{
aee94f69
TL
950 return frame_assembler->read_frame_payload<false>(
951 ).then([this, throttle_stamp, msg_size, &ctx](auto payload) {
// a state transition (e.g. fault/switch) happened while reading
952 if (unlikely(ctx.get_io_state() != io_state_t::open)) {
1e59de90 953 logger().debug("{} triggered {} during read_message()",
aee94f69 954 conn, ctx.get_io_state());
1e59de90
TL
955 abort_protocol();
956 }
957
958 utime_t recv_stamp{seastar::lowres_system_clock::now()};
959
960 // we need to get the size before std::moving segments data
961 auto msg_frame = MessageFrame::Decode(*payload);
962 // XXX: paranoid copy just to avoid oops
963 ceph_msg_header2 current_header = msg_frame.header();
964
965 logger().trace("{} got {} + {} + {} byte message,"
966 " envelope type={} src={} off={} seq={}",
967 conn,
968 msg_frame.front_len(),
969 msg_frame.middle_len(),
970 msg_frame.data_len(),
971 current_header.type,
972 conn.get_peer_name(),
973 current_header.data_off,
974 current_header.seq);
975
// rebuild the legacy (v1-style) header/footer expected by
// decode_message() from the v2 envelope
976 ceph_msg_header header{current_header.seq,
977 current_header.tid,
978 current_header.type,
979 current_header.priority,
980 current_header.version,
981 ceph_le32(msg_frame.front_len()),
982 ceph_le32(msg_frame.middle_len()),
983 ceph_le32(msg_frame.data_len()),
984 current_header.data_off,
985 conn.get_peer_name(),
986 current_header.compat_version,
987 current_header.reserved,
988 ceph_le32(0)};
989 ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
990 ceph_le32(0), ceph_le64(0), current_header.flags};
991
992 Message *message = decode_message(nullptr, 0, header, footer,
993 msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
994 if (!message) {
995 logger().warn("{} decode message failed", conn);
996 abort_in_fault();
997 }
998
999 // store reservation size in message, so we don't get confused
1000 // by messages entering the dispatch queue through other paths.
1001 message->set_dispatch_throttle_size(msg_size);
1002
1003 message->set_throttle_stamp(throttle_stamp);
1004 message->set_recv_stamp(recv_stamp);
1005 message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
1006
1007 // check received seq#. if it is old, drop the message.
1008 // note that incoming messages may skip ahead. this is convenient for the
1009 // client side queueing because messages can't be renumbered, but the (kernel)
1010 // client will occasionally pull a message out of the sent queue to send
1011 // elsewhere. in that case it doesn't matter if we "got" it or not.
aee94f69 1012 uint64_t cur_seq = in_seq;
1e59de90
TL
1013 if (message->get_seq() <= cur_seq) {
1014 logger().error("{} got old message {} <= {} {}, discarding",
1015 conn, message->get_seq(), cur_seq, *message);
1016 if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
1017 local_conf()->ms_die_on_old_message) {
1018 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1019 }
// duplicate: drop silently (after the error log) without dispatching
1020 return seastar::now();
1021 } else if (message->get_seq() > cur_seq + 1) {
1022 logger().error("{} missed message? skipped from seq {} to {}",
1023 conn, cur_seq, message->get_seq());
1024 if (local_conf()->ms_die_on_skipped_message) {
1025 ceph_assert(0 == "skipped incoming seq");
1026 }
1027 }
1028
1029 // note last received message.
1030 in_seq = message->get_seq();
1031 if (conn.policy.lossy) {
1032 logger().debug("{} <== #{} === {} ({})",
1033 conn,
1034 message->get_seq(),
1035 *message,
1036 message->get_type());
1037 } else {
1038 logger().debug("{} <== #{},{} === {} ({})",
1039 conn,
1040 message->get_seq(),
1041 current_header.ack_seq,
1042 *message,
1043 message->get_type());
1044 }
1045
1046 // notify ack
// lossless policies must acknowledge reception; schedule an ACK
// on the write path
1047 if (!conn.policy.lossy) {
1048 ++ack_left;
1049 notify_out_dispatch();
1050 }
1051
// the peer's ack_seq lets us release already-sent messages
1052 ack_out_sent(current_header.ack_seq);
1053
1054 // TODO: change MessageRef with seastar::shared_ptr
1055 auto msg_ref = MessageRef{message, false};
aee94f69
TL
1056 assert(ctx.get_io_state() == io_state_t::open);
1057 assert(get_io_state() == io_state_t::open);
1058 ceph_assert_always(conn_ref);
1059
1e59de90
TL
1060 // throttle the reading process by the returned future
1061 return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
aee94f69 1062 // the dispatch user may make changes to the message from here on
1e59de90
TL
1063 });
1064}
1065
// Launch the background read (in-dispatch) loop on the current shard.
// The loop repeatedly reads a frame preamble and handles it by tag:
// MESSAGE (throttle, then read_message), ACK, KEEPALIVE2 and
// KEEPALIVE2_ACK; any other tag faults the protocol. On exception:
// if io_state is still open, transition to delay and send a
// crosscore-sequenced notify_out_fault() to the messenger shard;
// otherwise just log. The loop always exits via exit_in_dispatching().
1066void IOHandler::do_in_dispatch()
1067{
aee94f69
TL
1068 shard_states->enter_in_dispatching();
1069 shard_states->dispatch_in_background(
1070 "do_in_dispatch", conn, [this, &ctx=*shard_states] {
1071 return seastar::keep_doing([this, &ctx] {
1072 return frame_assembler->read_main_preamble<false>(
1073 ).then([this, &ctx](auto ret) {
1e59de90
TL
1074 switch (ret.tag) {
1075 case Tag::MESSAGE: {
1076 size_t msg_size = get_msg_size(*ret.rx_frame_asm);
1077 return seastar::futurize_invoke([this] {
1078 // throttle_message() logic
1079 if (!conn.policy.throttler_messages) {
1080 return seastar::now();
1081 }
1082 // TODO: message throttler
aee94f69 1083 ceph_abort("TODO");
1e59de90
TL
1084 return seastar::now();
1085 }).then([this, msg_size] {
1086 // throttle_bytes() logic
1087 if (!conn.policy.throttler_bytes) {
1088 return seastar::now();
1089 }
1090 if (!msg_size) {
1091 return seastar::now();
1092 }
1093 logger().trace("{} wants {} bytes from policy throttler {}/{}",
1094 conn, msg_size,
1095 conn.policy.throttler_bytes->get_current(),
1096 conn.policy.throttler_bytes->get_max());
// may block here until throttle budget is available
1097 return conn.policy.throttler_bytes->get(msg_size);
aee94f69 1098 }).then([this, msg_size, &ctx] {
1e59de90
TL
1099 // TODO: throttle_dispatch_queue() logic
1100 utime_t throttle_stamp{seastar::lowres_system_clock::now()};
aee94f69 1101 return read_message(ctx, throttle_stamp, msg_size);
1e59de90
TL
1102 });
1103 }
1104 case Tag::ACK:
aee94f69 1105 return frame_assembler->read_frame_payload<false>(
1e59de90
TL
1106 ).then([this](auto payload) {
1107 // handle_message_ack() logic
1108 auto ack = AckFrame::Decode(payload->back());
1109 logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
1110 ack_out_sent(ack.seq());
1111 });
1112 case Tag::KEEPALIVE2:
aee94f69 1113 return frame_assembler->read_frame_payload<false>(
1e59de90
TL
1114 ).then([this](auto payload) {
1115 // handle_keepalive2() logic
1116 auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
1117 logger().debug("{} GOT KeepAliveFrame: timestamp={}",
1118 conn, keepalive_frame.timestamp());
1119 // notify keepalive ack
1120 next_keepalive_ack = keepalive_frame.timestamp();
aee94f69
TL
// only kick the write path directly when already on the owning
// shard; otherwise the ack is picked up by the next dispatch
1121 if (seastar::this_shard_id() == get_shard_id()) {
1122 notify_out_dispatch();
1123 }
1e59de90
TL
1124
1125 last_keepalive = seastar::lowres_system_clock::now();
1126 });
1127 case Tag::KEEPALIVE2_ACK:
aee94f69 1128 return frame_assembler->read_frame_payload<false>(
1e59de90
TL
1129 ).then([this](auto payload) {
1130 // handle_keepalive2_ack() logic
1131 auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
1132 auto _last_keepalive_ack =
1133 seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
1134 set_last_keepalive_ack(_last_keepalive_ack);
1135 logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
1136 conn, _last_keepalive_ack);
1137 });
1138 default: {
1139 logger().warn("{} do_in_dispatch() received unexpected tag: {}",
1140 conn, static_cast<uint32_t>(ret.tag));
1141 abort_in_fault();
1142 }
1143 }
1144 });
aee94f69 1145 }).handle_exception([this, &ctx](std::exception_ptr eptr) {
1e59de90
TL
// NOTE(review): only std::exception is caught below; a non-std
// exception would escape this catch (and leave e_what unset) --
// presumably all faults here derive from std::exception; confirm.
1146 const char *e_what;
1147 try {
1148 std::rethrow_exception(eptr);
1149 } catch (std::exception &e) {
1150 e_what = e.what();
1151 }
1152
aee94f69 1153 auto io_state = ctx.get_io_state();
1e59de90 1154 if (io_state == io_state_t::open) {
aee94f69
TL
1155 auto cc_seq = crosscore.prepare_submit();
1156 logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
1157 "send {} notify_out_fault()",
1158 conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
1159 do_set_io_state(io_state_t::delay);
1160 shard_states->dispatch_in_background(
1161 "notify_out_fault(in)", conn, [this, cc_seq, eptr] {
1162 auto states = get_states();
1163 return seastar::smp::submit_to(
1164 conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
1165 return handshake_listener->notify_out_fault(
1166 cc_seq, "do_in_dispatch", eptr, states);
1167 });
1168 });
1e59de90 1169 } else {
aee94f69
TL
// already faulted/closed/switched: nothing to notify, just log
1170 if (io_state != io_state_t::switched) {
1171 logger().info("{} do_in_dispatch(): fault at {}, {} -- {}",
1172 conn, io_state, io_stat_printer{*this}, e_what);
1173 } else {
1174 logger().info("{} do_in_dispatch(): fault at {} -- {}",
1175 conn, io_state, e_what);
1176 }
1e59de90 1177 }
aee94f69
TL
1178 }).finally([&ctx] {
1179 ctx.exit_in_dispatching();
1e59de90
TL
1180 });
1181 });
1182}
1183
aee94f69
TL
// Crosscore-sequenced close of the io handler (invoked from the
// messenger side). Waits for its turn in the crosscore sequence
// (retrying itself once woken), optionally dispatches a reset event,
// releases the connection reference, and closes the shard states --
// or, if the handler was switched to a new shard, cleans up the
// previous shard instead.
//  - cc_seq: crosscore sequence number ordering this call.
//  - is_dispatch_reset: whether to emit a reset event to dispatchers.
//  - is_replace: forwarded to dispatch_reset() when resetting.
1184seastar::future<>
1185IOHandler::close_io(
1186 crosscore_t::seq_t cc_seq,
1187 bool is_dispatch_reset,
1188 bool is_replace)
1189{
1190 ceph_assert_always(seastar::this_shard_id() == get_shard_id());
// out-of-order arrival: park until earlier crosscore ops finish,
// then re-enter with the same arguments
1191 if (!crosscore.proceed_or_wait(cc_seq)) {
1192 logger().debug("{} got {} close_io(), wait at {}",
1193 conn, cc_seq, crosscore.get_in_seq());
1194 return crosscore.wait(cc_seq
1195 ).then([this, cc_seq, is_dispatch_reset, is_replace] {
1196 return close_io(cc_seq, is_dispatch_reset, is_replace);
1197 });
1198 }
1199
1200 logger().debug("{} got {} close_io(reset={}, replace={})",
1201 conn, cc_seq, is_dispatch_reset, is_replace);
1202 ceph_assert_always(get_io_state() == io_state_t::drop);
1203
1204 if (is_dispatch_reset) {
1205 dispatch_reset(is_replace);
1206 }
1207
1208 ceph_assert_always(conn_ref);
1209 conn_ref.reset();
1210
1211 // cannot be running in parallel with to_new_sid()
1212 if (maybe_dropped_sid.has_value()) {
// switched shard: this shard's states are already closed; tear
// down the previous shard's leftovers instead
1213 assert(shard_states->assert_closed_and_exit());
1214 auto prv_sid = *maybe_dropped_sid;
1215 return cleanup_prv_shard(prv_sid);
1216 } else {
1217 return shard_states->close(
1218 ).then([this] {
1219 assert(shard_states->assert_closed_and_exit());
1220 });
1221 }
1222}
1223
1224/*
1225 * IOHandler::shard_states_t
1226 */
1227
1228void
1229IOHandler::shard_states_t::notify_out_dispatching_stopped(
1230 const char *what, SocketConnection &conn)
1231{
1232 assert(seastar::this_shard_id() == sid);
1233 if (unlikely(out_exit_dispatching.has_value())) {
1234 out_exit_dispatching->set_value();
1235 out_exit_dispatching = std::nullopt;
1236 logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching",
1237 conn, what, io_state);
1238 } else {
1239 if (unlikely(io_state != io_state_t::open)) {
1240 logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching",
1241 conn, what, io_state);
1242 }
1243 }
1244}
1245
1246seastar::future<>
1247IOHandler::shard_states_t::wait_io_exit_dispatching()
1248{
1249 assert(seastar::this_shard_id() == sid);
1250 assert(io_state != io_state_t::open);
1251 assert(!gate.is_closed());
1252 return seastar::when_all(
1253 [this] {
1254 if (out_exit_dispatching) {
1255 return out_exit_dispatching->get_future();
1256 } else {
1257 return seastar::now();
1258 }
1259 }(),
1260 [this] {
1261 if (in_exit_dispatching) {
1262 return in_exit_dispatching->get_future();
1263 } else {
1264 return seastar::now();
1265 }
1266 }()
1267 ).discard_result();
1268}
1269
// Build shard states for a new shard from the previous shard's states,
// carrying over the (necessarily non-open) io_state. In the drop state
// the new gate is pre-closed so nothing can be dispatched through it.
// The previous states are marked as switched before returning.
1270IOHandler::shard_states_ref_t
1271IOHandler::shard_states_t::create_from_previous(
1272 shard_states_t &prv_states,
1273 seastar::shard_id new_sid)
1274{
1275 auto io_state = prv_states.io_state;
1276 assert(io_state != io_state_t::open);
1277 auto ret = shard_states_t::create(new_sid, io_state);
1278 if (io_state == io_state_t::drop) {
1279 // the new gate should never be used
1280 auto fut = ret->gate.close();
// closing an unused gate must complete synchronously
1281 ceph_assert_always(fut.available());
1282 }
1283 prv_states.set_io_state(io_state_t::switched);
1284 return ret;
1285}
1286
1e59de90 1287} // namespace crimson::net