]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/io_handler.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / io_handler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "io_handler.h"
5
6 #include "auth/Auth.h"
7
8 #include "crimson/common/formatter.h"
9 #include "crimson/common/log.h"
10 #include "crimson/net/Errors.h"
11 #include "crimson/net/chained_dispatchers.h"
12 #include "crimson/net/SocketMessenger.h"
13 #include "msg/Message.h"
14 #include "msg/msg_fmt.h"
15
16 using namespace ceph::msgr::v2;
17 using crimson::common::local_conf;
18
19 namespace {
20
21 seastar::logger& logger() {
22 return crimson::get_logger(ceph_subsys_ms);
23 }
24
25 [[noreturn]] void abort_in_fault() {
26 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
27 }
28
29 [[noreturn]] void abort_protocol() {
30 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
31 }
32
33 std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
34 {
35 ceph_assert(rx_frame_asm.get_num_segments() > 0);
36 size_t sum = 0;
37 // we don't include SegmentIndex::Msg::HEADER.
38 for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
39 sum += rx_frame_asm.get_segment_logical_len(idx);
40 }
41 return sum;
42 }
43
44 } // namespace anonymous
45
46 namespace crimson::net {
47
// Construct the IO handler bound to the calling shard, starting in
// io_state_t::none; conn_ref pins the owning SocketConnection as a
// local shared foreign reference from this core.
IOHandler::IOHandler(ChainedDispatchers &dispatchers,
                     SocketConnection &conn)
  : shard_states(shard_states_t::create(
      seastar::this_shard_id(), io_state_t::none)),
    dispatchers(dispatchers),
    conn(conn),
    conn_ref(conn.get_local_shared_foreign_from_this())
{}
56
// Destructor: by this point close_io() must have completed and released
// everything tied to the (previous) shard.
IOHandler::~IOHandler()
{
  // close_io() must be finished
  ceph_assert_always(maybe_prv_shard_states == nullptr);
  // should be true in the according shard
  // ceph_assert_always(shard_states->assert_closed_and_exit());
  assert(!conn_ref);
}
65
// Encode everything currently queued for output into one bufferlist:
// an optional keepalive frame, an optional keepalive-ack frame, a
// standalone ack frame (only when no messages piggyback the ack), and
// then every pending message.  For lossless connections the swept
// messages are moved from out_pending_msgs to out_sent_msgs so they can
// be requeued after a fault.  Under UNIT_TESTS_BUILT the list of frame
// tags is returned alongside the buffer for interception by tests.
#ifdef UNIT_TESTS_BUILT
IOHandler::sweep_ret
#else
ceph::bufferlist
#endif
IOHandler::sweep_out_pending_msgs_to_sent(
  bool require_keepalive,
  std::optional<utime_t> maybe_keepalive_ack,
  bool require_ack)
{
  std::size_t num_msgs = out_pending_msgs.size();
  ceph::bufferlist bl;

#ifdef UNIT_TESTS_BUILT
  std::vector<Tag> tags;
#endif

  if (unlikely(require_keepalive)) {
    auto keepalive_frame = KeepAliveFrame::Encode();
    bl.append(frame_assembler->get_buffer(keepalive_frame));
#ifdef UNIT_TESTS_BUILT
    auto tag = KeepAliveFrame::tag;
    tags.push_back(tag);
#endif
  }

  if (unlikely(maybe_keepalive_ack.has_value())) {
    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
    bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
#ifdef UNIT_TESTS_BUILT
    auto tag = KeepAliveFrameAck::tag;
    tags.push_back(tag);
#endif
  }

  // a standalone AckFrame is only needed when no message frame below
  // can carry the ack_seq (header2.ack_seq is set to in_seq per message)
  if (require_ack && num_msgs == 0u) {
    auto ack_frame = AckFrame::Encode(in_seq);
    bl.append(frame_assembler->get_buffer(ack_frame));
#ifdef UNIT_TESTS_BUILT
    auto tag = AckFrame::tag;
    tags.push_back(tag);
#endif
  }

  std::for_each(
      out_pending_msgs.begin(),
      out_pending_msgs.begin()+num_msgs,
      [this, &bl
#ifdef UNIT_TESTS_BUILT
       , &tags
#endif
      ](const MessageFRef& msg) {
    // stamp the source entity name before encoding
    msg->get_header().src = conn.messenger.get_myname();

    msg->encode(conn.features, 0);

    // a message must be swept at most once; the seq is assigned here
    ceph_assert(!msg->get_seq() && "message already has seq");
    msg->set_seq(++out_seq);

    ceph_msg_header &header = msg->get_header();
    ceph_msg_footer &footer = msg->get_footer();

    // build the v2 wire header; ack_seq is the latest received in_seq
    ceph_msg_header2 header2{header.seq, header.tid,
                             header.type, header.priority,
                             header.version,
                             ceph_le32(0), header.data_off,
                             ceph_le64(in_seq),
                             footer.flags, header.compat_version,
                             header.reserved};

    auto message = MessageFrame::Encode(header2,
        msg->get_payload(), msg->get_middle(), msg->get_data());
    logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
    bl.append(frame_assembler->get_buffer(message));
#ifdef UNIT_TESTS_BUILT
    auto tag = MessageFrame::tag;
    tags.push_back(tag);
#endif
  });

  // lossless policy keeps sent messages around for possible requeue
  if (!conn.policy.lossy) {
    out_sent_msgs.insert(
        out_sent_msgs.end(),
        std::make_move_iterator(out_pending_msgs.begin()),
        std::make_move_iterator(out_pending_msgs.end()));
  }
  out_pending_msgs.clear();

#ifdef UNIT_TESTS_BUILT
  return sweep_ret{std::move(bl), tags};
#else
  return bl;
#endif
}
162
163 seastar::future<> IOHandler::send(MessageFRef msg)
164 {
165 // sid may be changed on-the-fly during the submission
166 if (seastar::this_shard_id() == get_shard_id()) {
167 return do_send(std::move(msg));
168 } else {
169 logger().trace("{} send() is directed to {} -- {}",
170 conn, get_shard_id(), *msg);
171 return seastar::smp::submit_to(
172 get_shard_id(), [this, msg=std::move(msg)]() mutable {
173 return send_redirected(std::move(msg));
174 });
175 }
176 }
177
178 seastar::future<> IOHandler::send_redirected(MessageFRef msg)
179 {
180 // sid may be changed on-the-fly during the submission
181 if (seastar::this_shard_id() == get_shard_id()) {
182 return do_send(std::move(msg));
183 } else {
184 logger().debug("{} send() is redirected to {} -- {}",
185 conn, get_shard_id(), *msg);
186 return seastar::smp::submit_to(
187 get_shard_id(), [this, msg=std::move(msg)]() mutable {
188 return send_redirected(std::move(msg));
189 });
190 }
191 }
192
193 seastar::future<> IOHandler::do_send(MessageFRef msg)
194 {
195 assert(seastar::this_shard_id() == get_shard_id());
196 logger().trace("{} do_send() got message -- {}", conn, *msg);
197 if (get_io_state() != io_state_t::drop) {
198 out_pending_msgs.push_back(std::move(msg));
199 notify_out_dispatch();
200 }
201 return seastar::now();
202 }
203
204 seastar::future<> IOHandler::send_keepalive()
205 {
206 // sid may be changed on-the-fly during the submission
207 if (seastar::this_shard_id() == get_shard_id()) {
208 return do_send_keepalive();
209 } else {
210 logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id());
211 return seastar::smp::submit_to(
212 get_shard_id(), [this] {
213 return send_keepalive_redirected();
214 });
215 }
216 }
217
218 seastar::future<> IOHandler::send_keepalive_redirected()
219 {
220 // sid may be changed on-the-fly during the submission
221 if (seastar::this_shard_id() == get_shard_id()) {
222 return do_send_keepalive();
223 } else {
224 logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id());
225 return seastar::smp::submit_to(
226 get_shard_id(), [this] {
227 return send_keepalive_redirected();
228 });
229 }
230 }
231
232 seastar::future<> IOHandler::do_send_keepalive()
233 {
234 assert(seastar::this_shard_id() == get_shard_id());
235 logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive);
236 if (!need_keepalive) {
237 need_keepalive = true;
238 notify_out_dispatch();
239 }
240 return seastar::now();
241 }
242
// User-initiated teardown on the IO shard: suppress the reset dispatch,
// transition to drop (no-op if already dropping), and notify the
// handshake side on the messenger shard in the background.
void IOHandler::mark_down()
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  ceph_assert_always(get_io_state() != io_state_t::none);
  // a user-requested mark_down must not trigger ms_handle_reset later
  need_dispatch_reset = false;
  if (get_io_state() == io_state_t::drop) {
    return;
  }

  auto cc_seq = crosscore.prepare_submit();
  logger().info("{} mark_down() at {}, send {} notify_mark_down()",
                conn, io_stat_printer{*this}, cc_seq);
  do_set_io_state(io_state_t::drop);
  // cross-core notification is ordered by cc_seq and runs detached
  shard_states->dispatch_in_background(
      "notify_mark_down", conn, [this, cc_seq] {
    return seastar::smp::submit_to(
        conn.get_messenger_shard_id(), [this, cc_seq] {
      return handshake_listener->notify_mark_down(cc_seq);
    });
  });
}
264
265 void IOHandler::print_io_stat(std::ostream &out) const
266 {
267 assert(seastar::this_shard_id() == get_shard_id());
268 out << "io_stat("
269 << "io_state=" << fmt::format("{}", get_io_state())
270 << ", in_seq=" << in_seq
271 << ", out_seq=" << out_seq
272 << ", out_pending_msgs_size=" << out_pending_msgs.size()
273 << ", out_sent_msgs_size=" << out_sent_msgs.size()
274 << ", need_ack=" << (ack_left > 0)
275 << ", need_keepalive=" << need_keepalive
276 << ", need_keepalive_ack=" << bool(next_keepalive_ack)
277 << ")";
278 }
279
// Take ownership of the frame assembler handed over from the protocol
// side; it must carry a valid socket already bound to this IO shard.
void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa)
{
  assert(fa != nullptr);
  ceph_assert_always(frame_assembler == nullptr);
  frame_assembler = std::move(fa);
  ceph_assert_always(
      frame_assembler->get_shard_id() == get_shard_id());
  // should have been set through dispatch_accept/connect()
  ceph_assert_always(
      frame_assembler->get_socket_shard_id() == get_shard_id());
  ceph_assert_always(frame_assembler->is_socket_valid());
}
292
// Core IO state-machine transition, always on the IO shard.
// - entering open: adopts the frame assembler and starts in-dispatch;
// - leaving open: invalidates the socket without waiting for close;
// - racing with a user mark_down() (prv==drop): still adopts and shuts
//   down the socket so it is not leaked, then bails out.
// cc_seq is only used for logging; fa must be non-null exactly when
// transitioning to open.
void IOHandler::do_set_io_state(
    io_state_t new_state,
    std::optional<crosscore_t::seq_t> cc_seq,
    FrameAssemblerV2Ref fa,
    bool set_notify_out)
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  auto prv_state = get_io_state();
  logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, "
                 "fa={}, set_notify_out={}, at {}",
                 conn,
                 cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "",
                 prv_state, new_state,
                 fa ? "present" : "N/A", set_notify_out,
                 io_stat_printer{*this});
  // forbidden transitions: anything -> none, and open -> open
  ceph_assert_always(!(
    (new_state == io_state_t::none && prv_state != io_state_t::none) ||
    (new_state == io_state_t::open && prv_state == io_state_t::open)
  ));

  if (prv_state == io_state_t::drop) {
    // only possible due to a racing mark_down() from user
    if (new_state == io_state_t::open) {
      // adopt the socket solely to shut it down; drop state is final
      assign_frame_assembler(std::move(fa));
      frame_assembler->shutdown_socket<false>(nullptr);
    } else {
      assert(fa == nullptr);
    }
    return;
  }

  bool dispatch_in = false;
  if (new_state == io_state_t::open) {
    // to open
    ceph_assert_always(protocol_is_connected == true);
    assign_frame_assembler(std::move(fa));
    dispatch_in = true;
  } else if (prv_state == io_state_t::open) {
    // from open
    ceph_assert_always(protocol_is_connected == true);
    protocol_is_connected = false;
    assert(fa == nullptr);
    ceph_assert_always(frame_assembler->is_socket_valid());
    frame_assembler->shutdown_socket<false>(nullptr);
  } else {
    assert(fa == nullptr);
  }

  // notify_out is only meaningful while delayed: it asks the protocol
  // side to be woken when output gets queued
  if (new_state == io_state_t::delay) {
    need_notify_out = set_notify_out;
    if (need_notify_out) {
      maybe_notify_out_dispatch();
    }
  } else {
    assert(set_notify_out == false);
    need_notify_out = false;
  }

  // FIXME: simplify and drop the prv_state == new_state case
  if (prv_state != new_state) {
    shard_states->set_io_state(new_state);
  }

  /*
   * not atomic below
   */

  if (dispatch_in) {
    do_in_dispatch();
  }
}
364
// Cross-core ordered entry to do_set_io_state(): defer (recursively)
// until this cc_seq is next in the crosscore ordering, then apply.
seastar::future<> IOHandler::set_io_state(
    crosscore_t::seq_t cc_seq,
    io_state_t new_state,
    FrameAssemblerV2Ref fa,
    bool set_notify_out)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} set_io_state(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    // fa is moved into the continuation so ownership survives the wait
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, new_state,
            fa=std::move(fa), set_notify_out]() mutable {
      return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
    });
  }

  do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out);
  return seastar::now();
}
385
// Crosscore-ordered wait until both in- and out-dispatch loops have
// exited, then hand the frame assembler (retargeted to the messenger
// shard) and the current IO states back to the protocol side.  If the
// connection was dropped mid-switch, the wait happens on the previous
// shard's states instead.
seastar::future<IOHandler::exit_dispatching_ret>
IOHandler::wait_io_exit_dispatching(
    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq] {
      return wait_io_exit_dispatching(cc_seq);
    });
  }

  logger().debug("{} got {} wait_io_exit_dispatching()",
                 conn, cc_seq);
  // dispatching can only be exited outside the open state, and the
  // socket must already be invalidated
  ceph_assert_always(get_io_state() != io_state_t::open);
  ceph_assert_always(frame_assembler != nullptr);
  ceph_assert_always(!frame_assembler->is_socket_valid());
  return seastar::futurize_invoke([this] {
    // cannot be running in parallel with to_new_sid()
    if (maybe_dropped_sid.has_value()) {
      // the switch was dropped; the loops still run on the previous sid
      ceph_assert_always(get_io_state() == io_state_t::drop);
      assert(shard_states->assert_closed_and_exit());
      auto prv_sid = *maybe_dropped_sid;
      return seastar::smp::submit_to(prv_sid, [this] {
        logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn);
        assert(maybe_prv_shard_states != nullptr);
        return maybe_prv_shard_states->wait_io_exit_dispatching();
      });
    } else {
      return shard_states->wait_io_exit_dispatching();
    }
  }).then([this] {
    logger().debug("{} finish wait_io_exit_dispatching at {}",
                   conn, io_stat_printer{*this});
    ceph_assert_always(frame_assembler != nullptr);
    ceph_assert_always(!frame_assembler->is_socket_valid());
    // ownership moves back to the messenger shard with the return value
    frame_assembler->set_shard_id(conn.get_messenger_shard_id());
    return exit_dispatching_ret{
      std::move(frame_assembler),
      get_states()};
  });
}
430
431 seastar::future<> IOHandler::reset_session(
432 crosscore_t::seq_t cc_seq,
433 bool full)
434 {
435 assert(seastar::this_shard_id() == get_shard_id());
436 if (!crosscore.proceed_or_wait(cc_seq)) {
437 logger().debug("{} got {} reset_session(), wait at {}",
438 conn, cc_seq, crosscore.get_in_seq());
439 return crosscore.wait(cc_seq
440 ).then([this, cc_seq, full] {
441 return reset_session(cc_seq, full);
442 });
443 }
444
445 logger().debug("{} got {} reset_session({})",
446 conn, cc_seq, full);
447 assert(get_io_state() != io_state_t::open);
448 reset_in();
449 if (full) {
450 reset_out();
451 dispatch_remote_reset();
452 }
453 return seastar::now();
454 }
455
456 seastar::future<> IOHandler::reset_peer_state(
457 crosscore_t::seq_t cc_seq)
458 {
459 assert(seastar::this_shard_id() == get_shard_id());
460 if (!crosscore.proceed_or_wait(cc_seq)) {
461 logger().debug("{} got {} reset_peer_state(), wait at {}",
462 conn, cc_seq, crosscore.get_in_seq());
463 return crosscore.wait(cc_seq
464 ).then([this, cc_seq] {
465 return reset_peer_state(cc_seq);
466 });
467 }
468
469 logger().debug("{} got {} reset_peer_state()",
470 conn, cc_seq);
471 assert(get_io_state() != io_state_t::open);
472 reset_in();
473 do_requeue_out_sent_up_to(0);
474 discard_out_sent();
475 return seastar::now();
476 }
477
478 seastar::future<> IOHandler::requeue_out_sent(
479 crosscore_t::seq_t cc_seq)
480 {
481 assert(seastar::this_shard_id() == get_shard_id());
482 if (!crosscore.proceed_or_wait(cc_seq)) {
483 logger().debug("{} got {} requeue_out_sent(), wait at {}",
484 conn, cc_seq, crosscore.get_in_seq());
485 return crosscore.wait(cc_seq
486 ).then([this, cc_seq] {
487 return requeue_out_sent(cc_seq);
488 });
489 }
490
491 logger().debug("{} got {} requeue_out_sent()",
492 conn, cc_seq);
493 do_requeue_out_sent();
494 return seastar::now();
495 }
496
// Move every message in out_sent_msgs back to the front of
// out_pending_msgs, reverting out_seq accordingly; the messages are
// stripped of payload and seq so they can be re-encoded by the next
// sweep.  No-op when nothing was sent.
void IOHandler::do_requeue_out_sent()
{
  assert(get_io_state() != io_state_t::open);
  if (out_sent_msgs.empty()) {
    return;
  }

  out_seq -= out_sent_msgs.size();
  logger().debug("{} requeue {} items, revert out_seq to {}",
                 conn, out_sent_msgs.size(), out_seq);
  for (MessageFRef& msg : out_sent_msgs) {
    // clear encode artifacts; seq will be reassigned on the next sweep
    msg->clear_payload();
    msg->set_seq(0);
  }
  // requeued messages must precede any newly queued ones
  out_pending_msgs.insert(
      out_pending_msgs.begin(),
      std::make_move_iterator(out_sent_msgs.begin()),
      std::make_move_iterator(out_sent_msgs.end()));
  out_sent_msgs.clear();
  maybe_notify_out_dispatch();
}
518
519 seastar::future<> IOHandler::requeue_out_sent_up_to(
520 crosscore_t::seq_t cc_seq,
521 seq_num_t msg_seq)
522 {
523 assert(seastar::this_shard_id() == get_shard_id());
524 if (!crosscore.proceed_or_wait(cc_seq)) {
525 logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
526 conn, cc_seq, crosscore.get_in_seq());
527 return crosscore.wait(cc_seq
528 ).then([this, cc_seq, msg_seq] {
529 return requeue_out_sent_up_to(cc_seq, msg_seq);
530 });
531 }
532
533 logger().debug("{} got {} requeue_out_sent_up_to({})",
534 conn, cc_seq, msg_seq);
535 do_requeue_out_sent_up_to(msg_seq);
536 return seastar::now();
537 }
538
// Drop sent messages the peer confirmed up to `seq`, then requeue the
// rest.  When both queues are empty, simply fast-forward out_seq to the
// peer-confirmed value.
void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq)
{
  assert(get_io_state() != io_state_t::open);
  if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
    logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
                   conn, out_seq, seq);
    out_seq = seq;
    return;
  }
  logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
                 conn, seq, out_sent_msgs.size(), out_seq);
  while (!out_sent_msgs.empty()) {
    auto cur_seq = out_sent_msgs.front()->get_seq();
    // seq 0 means the message was never swept; keep it and stop
    if (cur_seq == 0 || cur_seq > seq) {
      break;
    } else {
      out_sent_msgs.pop_front();
    }
  }
  do_requeue_out_sent();
}
560
// Reset the incoming message sequence to the initial value.
void IOHandler::reset_in()
{
  assert(get_io_state() != io_state_t::open);
  in_seq = 0;
}
566
567 void IOHandler::reset_out()
568 {
569 assert(get_io_state() != io_state_t::open);
570 discard_out_sent();
571 out_pending_msgs.clear();
572 need_keepalive = false;
573 next_keepalive_ack = std::nullopt;
574 ack_left = 0;
575 }
576
// Drop the sent-message history and restart the outgoing sequence.
void IOHandler::discard_out_sent()
{
  assert(get_io_state() != io_state_t::open);
  out_seq = 0;
  out_sent_msgs.clear();
}
583
// Server-side shard handover: delegates to to_new_sid() with is_replace
// set, which also selects the ms_handle_accept user notification.
seastar::future<>
IOHandler::dispatch_accept(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id new_sid,
    ConnectionFRef conn_fref,
    bool is_replace)
{
  return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
}
593
// Client-side shard handover: delegates to to_new_sid() with nullopt,
// which selects the ms_handle_connect user notification.
seastar::future<>
IOHandler::dispatch_connect(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id new_sid,
    ConnectionFRef conn_fref)
{
  return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt);
}
602
// After a shard switch, hop to the previous shard to close and destroy
// the shard states left behind there.
seastar::future<>
IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid)
{
  assert(seastar::this_shard_id() == get_shard_id());
  return seastar::smp::submit_to(prv_sid, [this] {
    logger().debug("{} got cleanup_prv_shard()", conn);
    assert(maybe_prv_shard_states != nullptr);
    // take ownership so the states stay alive until close() resolves
    auto ref_prv_states = std::move(maybe_prv_shard_states);
    auto &prv_states = *ref_prv_states;
    return prv_states.close(
    ).then([ref_prv_states=std::move(ref_prv_states)] {
      ceph_assert_always(ref_prv_states->assert_closed_and_exit());
    });
  }).then([this] {
    ceph_assert_always(maybe_prv_shard_states == nullptr);
  });
}
620
// Switch the IO handler to a new shard (for accept/connect/replace).
// Phase 1 runs on the current shard: notify users of the shard change,
// mark the protocol connected, and atomically swap shard_states so the
// current states become maybe_prv_shard_states.  Phase 2 hops to the
// new shard: record a dropped switch (cleanup deferred to close_io()),
// or schedule cleanup of the previous shard, re-establish conn_ref on
// the new core, and dispatch ms_handle_accept/connect.
// is_replace: nullopt = connect; true/false = accept (replace or not).
seastar::future<>
IOHandler::to_new_sid(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id new_sid,
    ConnectionFRef conn_fref,
    std::optional<bool> is_replace)
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} to_new_sid(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, new_sid, is_replace,
            conn_fref=std::move(conn_fref)]() mutable {
      return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
    });
  }

  bool is_accept_or_connect = is_replace.has_value();
  logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}",
                 conn, cc_seq, new_sid,
                 fmt::format("{}",
                             is_accept_or_connect ?
                             (*is_replace ? "accept(replace)" : "accept(!replace)") :
                             "connect"),
                 io_stat_printer{*this});
  // the phase-2 continuation must be the next crosscore event
  auto next_cc_seq = ++cc_seq;

  if (get_io_state() != io_state_t::drop) {
    ceph_assert_always(conn_ref);
    if (new_sid != seastar::this_shard_id()) {
      dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect);
      // user can make changes
    }
  } else {
    // it is possible that both io_handler and protocolv2 are
    // trying to close each other from different cores simultaneously.
    assert(!protocol_is_connected);
  }

  if (get_io_state() != io_state_t::drop) {
    if (is_accept_or_connect) {
      // protocol_is_connected can be from true to true here if the replacing is
      // happening to a connected connection.
    } else {
      ceph_assert_always(protocol_is_connected == false);
    }
    protocol_is_connected = true;
  } else {
    assert(!protocol_is_connected);
  }

  bool is_dropped = false;
  if (get_io_state() == io_state_t::drop) {
    is_dropped = true;
  }
  ceph_assert_always(get_io_state() != io_state_t::open);

  // apply the switching atomically
  ceph_assert_always(conn_ref);
  conn_ref.reset();
  auto prv_sid = get_shard_id();
  ceph_assert_always(maybe_prv_shard_states == nullptr);
  maybe_prv_shard_states = std::move(shard_states);
  shard_states = shard_states_t::create_from_previous(
      *maybe_prv_shard_states, new_sid);
  assert(new_sid == get_shard_id());

  return seastar::smp::submit_to(new_sid,
      [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
    logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}",
                   conn, next_cc_seq, prv_sid, is_dropped,
                   fmt::format("{}",
                               is_replace.has_value() ?
                               (*is_replace ? "accept(replace)" : "accept(!replace)") :
                               "connect"),
                   io_stat_printer{*this});

    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
    ceph_assert_always(get_io_state() != io_state_t::open);
    ceph_assert_always(!maybe_dropped_sid.has_value());
    ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));

    if (is_dropped) {
      // the switch itself was dropped; remember the origin shard
      ceph_assert_always(get_io_state() == io_state_t::drop);
      ceph_assert_always(shard_states->assert_closed_and_exit());
      maybe_dropped_sid = prv_sid;
      // cleanup_prv_shard() will be done in a follow-up close_io()
    } else {
      // possible at io_state_t::drop

      // previous shard is not cleaned,
      // but close_io() is responsible to clean up the current shard,
      // so cleanup the previous shard here.
      shard_states->dispatch_in_background(
          "cleanup_prv_sid", conn, [this, prv_sid] {
        return cleanup_prv_shard(prv_sid);
      });
      maybe_notify_out_dispatch();
    }

    ceph_assert_always(!conn_ref);
    // assign even if already dropping
    conn_ref = make_local_shared_foreign(std::move(conn_fref));

    if (get_io_state() != io_state_t::drop) {
      if (is_replace.has_value()) {
        dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace);
      } else {
        dispatchers.ms_handle_connect(conn_ref, prv_sid);
      }
      // user can make changes
    }
  });
}
736
// Bind a freshly accepted connection to its designated shard before any
// IO starts: rebuild shard_states for `sid` and re-establish conn_ref
// on that core.  Must be the very first crosscore event (cc_seq).
seastar::future<> IOHandler::set_accepted_sid(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id sid,
    ConnectionFRef conn_fref)
{
  assert(seastar::this_shard_id() == get_shard_id());
  assert(get_io_state() == io_state_t::none);
  ceph_assert_always(conn_ref);
  // the local reference is re-created on the target shard below
  conn_ref.reset();
  assert(maybe_prv_shard_states == nullptr);
  shard_states.reset();
  shard_states = shard_states_t::create(sid, io_state_t::none);
  return seastar::smp::submit_to(sid,
      [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable {
    // must be the first to proceed
    ceph_assert_always(crosscore.proceed_or_wait(cc_seq));

    logger().debug("{} set accepted sid", conn);
    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
    ceph_assert_always(get_io_state() == io_state_t::none);
    assert(maybe_prv_shard_states == nullptr);
    ceph_assert_always(!conn_ref);
    conn_ref = make_local_shared_foreign(std::move(conn_fref));
  });
}
762
763 void IOHandler::dispatch_reset(bool is_replace)
764 {
765 ceph_assert_always(get_io_state() == io_state_t::drop);
766 if (!need_dispatch_reset) {
767 return;
768 }
769 need_dispatch_reset = false;
770 ceph_assert_always(conn_ref);
771
772 dispatchers.ms_handle_reset(conn_ref, is_replace);
773 // user can make changes
774 }
775
// Dispatch ms_handle_remote_reset to users, skipped when the connection
// is already dropping.
void IOHandler::dispatch_remote_reset()
{
  if (get_io_state() == io_state_t::drop) {
    return;
  }
  ceph_assert_always(conn_ref);

  dispatchers.ms_handle_remote_reset(conn_ref);
  // user can make changes
}
786
787 void IOHandler::ack_out_sent(seq_num_t seq)
788 {
789 if (conn.policy.lossy) { // lossy connections don't keep sent messages
790 return;
791 }
792 while (!out_sent_msgs.empty() &&
793 out_sent_msgs.front()->get_seq() <= seq) {
794 logger().trace("{} got ack seq {} >= {}, pop {}",
795 conn, seq, out_sent_msgs.front()->get_seq(),
796 *out_sent_msgs.front());
797 out_sent_msgs.pop_front();
798 }
799 }
800
// The out-dispatch loop: repeatedly sweep queued output into frames and
// write them while the state stays open; park on state change in delay,
// and exit in drop/switched.  On a write fault in open, transition to
// delay and notify the protocol side of the fault in the background;
// faults in other states are absorbed and the loop re-entered so it can
// exit through the state switch above.
seastar::future<>
IOHandler::do_out_dispatch(shard_states_t &ctx)
{
  return seastar::repeat([this, &ctx] {
    switch (ctx.get_io_state()) {
     case io_state_t::open: {
      if (unlikely(!is_out_queued())) {
        // try exit open dispatching
        return frame_assembler->flush<false>(
        ).then([this, &ctx] {
          if (ctx.get_io_state() != io_state_t::open || is_out_queued()) {
            return seastar::make_ready_future<stop_t>(stop_t::no);
          }
          // still nothing pending to send after flush,
          // open dispatching can ONLY stop now
          ctx.exit_out_dispatching("exit-open", conn);
          return seastar::make_ready_future<stop_t>(stop_t::yes);
        });
      }

      // consume the one-shot output flags before sweeping
      auto require_keepalive = need_keepalive;
      need_keepalive = false;
      auto maybe_keepalive_ack = next_keepalive_ack;
      next_keepalive_ack = std::nullopt;
      auto to_ack = ack_left;
      assert(to_ack == 0 || in_seq > 0);
      ack_left = 0;
#ifdef UNIT_TESTS_BUILT
      // under tests, expose the swept frame tags to the interceptor
      auto ret = sweep_out_pending_msgs_to_sent(
          require_keepalive, maybe_keepalive_ack, to_ack > 0);
      return frame_assembler->intercept_frames(ret.tags, true
      ).then([this, bl=std::move(ret.bl)]() mutable {
        return frame_assembler->write<false>(std::move(bl));
      }
#else
      auto bl = sweep_out_pending_msgs_to_sent(
          require_keepalive, maybe_keepalive_ack, to_ack > 0);
      return frame_assembler->write<false>(std::move(bl)
#endif
      ).then([this, &ctx] {
        if (ctx.get_io_state() != io_state_t::open) {
          // state changed mid-write: flush what was written and loop
          return frame_assembler->flush<false>(
          ).then([] {
            return seastar::make_ready_future<stop_t>(stop_t::no);
          });
        }

        // FIXME: may leak a flush if state is changed after return and before
        // the next repeat body.
        return seastar::make_ready_future<stop_t>(stop_t::no);
      });
     }
     case io_state_t::delay:
      // delay out dispatching until open
      ctx.notify_out_dispatching_stopped("delay...", conn);
      return ctx.wait_state_change(
      ).then([] { return stop_t::no; });
     case io_state_t::drop:
      ctx.exit_out_dispatching("dropped", conn);
      return seastar::make_ready_future<stop_t>(stop_t::yes);
     case io_state_t::switched:
      ctx.exit_out_dispatching("switched", conn);
      return seastar::make_ready_future<stop_t>(stop_t::yes);
     default:
      ceph_abort("impossible");
    }
  }).handle_exception_type([this, &ctx](const std::system_error& e) {
    auto io_state = ctx.get_io_state();
    // only connection-level errors are tolerated here
    if (e.code() != std::errc::broken_pipe &&
        e.code() != std::errc::connection_reset &&
        e.code() != error::negotiation_failure) {
      logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
                     conn, io_state, e.what());
      ceph_abort();
    }

    if (io_state == io_state_t::open) {
      auto cc_seq = crosscore.prepare_submit();
      logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
                    "send {} notify_out_fault()",
                    conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
      // capture the error as an exception_ptr to forward cross-core
      std::exception_ptr eptr;
      try {
        throw e;
      } catch(...) {
        eptr = std::current_exception();
      }
      do_set_io_state(io_state_t::delay);
      shard_states->dispatch_in_background(
          "notify_out_fault(out)", conn, [this, cc_seq, eptr] {
        auto states = get_states();
        return seastar::smp::submit_to(
            conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
          return handshake_listener->notify_out_fault(
              cc_seq, "do_out_dispatch", eptr, states);
        });
      });
    } else {
      if (io_state != io_state_t::switched) {
        logger().info("{} do_out_dispatch(): fault at {}, {} -- {}",
                      conn, io_state, io_stat_printer{*this}, e.what());
      } else {
        logger().info("{} do_out_dispatch(): fault at {} -- {}",
                      conn, io_state, e.what());
      }
    }

    // re-enter the loop; the non-open states will make it exit cleanly
    return do_out_dispatch(ctx);
  });
}
911
912 void IOHandler::maybe_notify_out_dispatch()
913 {
914 ceph_assert_always(seastar::this_shard_id() == get_shard_id());
915 if (is_out_queued()) {
916 notify_out_dispatch();
917 }
918 }
919
// Something is queued for output: optionally signal the protocol side
// (when delayed with notify_out armed), and start the out-dispatch loop
// in the background if it is not already running.
void IOHandler::notify_out_dispatch()
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  assert(is_out_queued());
  if (need_notify_out) {
    auto cc_seq = crosscore.prepare_submit();
    logger().debug("{} send {} notify_out()",
                   conn, cc_seq);
    shard_states->dispatch_in_background(
        "notify_out", conn, [this, cc_seq] {
      return seastar::smp::submit_to(
          conn.get_messenger_shard_id(), [this, cc_seq] {
        return handshake_listener->notify_out(cc_seq);
      });
    });
  }
  // try_enter_out_dispatching() prevents concurrent dispatch loops
  if (shard_states->try_enter_out_dispatching()) {
    shard_states->dispatch_in_background(
        "do_out_dispatch", conn, [this] {
      return do_out_dispatch(*shard_states);
    });
  }
}
943
944 seastar::future<>
945 IOHandler::read_message(
946 shard_states_t &ctx,
947 utime_t throttle_stamp,
948 std::size_t msg_size)
949 {
950 return frame_assembler->read_frame_payload<false>(
951 ).then([this, throttle_stamp, msg_size, &ctx](auto payload) {
952 if (unlikely(ctx.get_io_state() != io_state_t::open)) {
953 logger().debug("{} triggered {} during read_message()",
954 conn, ctx.get_io_state());
955 abort_protocol();
956 }
957
958 utime_t recv_stamp{seastar::lowres_system_clock::now()};
959
960 // we need to get the size before std::moving segments data
961 auto msg_frame = MessageFrame::Decode(*payload);
962 // XXX: paranoid copy just to avoid oops
963 ceph_msg_header2 current_header = msg_frame.header();
964
965 logger().trace("{} got {} + {} + {} byte message,"
966 " envelope type={} src={} off={} seq={}",
967 conn,
968 msg_frame.front_len(),
969 msg_frame.middle_len(),
970 msg_frame.data_len(),
971 current_header.type,
972 conn.get_peer_name(),
973 current_header.data_off,
974 current_header.seq);
975
976 ceph_msg_header header{current_header.seq,
977 current_header.tid,
978 current_header.type,
979 current_header.priority,
980 current_header.version,
981 ceph_le32(msg_frame.front_len()),
982 ceph_le32(msg_frame.middle_len()),
983 ceph_le32(msg_frame.data_len()),
984 current_header.data_off,
985 conn.get_peer_name(),
986 current_header.compat_version,
987 current_header.reserved,
988 ceph_le32(0)};
989 ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
990 ceph_le32(0), ceph_le64(0), current_header.flags};
991
992 Message *message = decode_message(nullptr, 0, header, footer,
993 msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
994 if (!message) {
995 logger().warn("{} decode message failed", conn);
996 abort_in_fault();
997 }
998
999 // store reservation size in message, so we don't get confused
1000 // by messages entering the dispatch queue through other paths.
1001 message->set_dispatch_throttle_size(msg_size);
1002
1003 message->set_throttle_stamp(throttle_stamp);
1004 message->set_recv_stamp(recv_stamp);
1005 message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
1006
1007 // check received seq#. if it is old, drop the message.
1008 // note that incoming messages may skip ahead. this is convenient for the
1009 // client side queueing because messages can't be renumbered, but the (kernel)
1010 // client will occasionally pull a message out of the sent queue to send
1011 // elsewhere. in that case it doesn't matter if we "got" it or not.
1012 uint64_t cur_seq = in_seq;
1013 if (message->get_seq() <= cur_seq) {
1014 logger().error("{} got old message {} <= {} {}, discarding",
1015 conn, message->get_seq(), cur_seq, *message);
1016 if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
1017 local_conf()->ms_die_on_old_message) {
1018 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1019 }
1020 return seastar::now();
1021 } else if (message->get_seq() > cur_seq + 1) {
1022 logger().error("{} missed message? skipped from seq {} to {}",
1023 conn, cur_seq, message->get_seq());
1024 if (local_conf()->ms_die_on_skipped_message) {
1025 ceph_assert(0 == "skipped incoming seq");
1026 }
1027 }
1028
1029 // note last received message.
1030 in_seq = message->get_seq();
1031 if (conn.policy.lossy) {
1032 logger().debug("{} <== #{} === {} ({})",
1033 conn,
1034 message->get_seq(),
1035 *message,
1036 message->get_type());
1037 } else {
1038 logger().debug("{} <== #{},{} === {} ({})",
1039 conn,
1040 message->get_seq(),
1041 current_header.ack_seq,
1042 *message,
1043 message->get_type());
1044 }
1045
1046 // notify ack
1047 if (!conn.policy.lossy) {
1048 ++ack_left;
1049 notify_out_dispatch();
1050 }
1051
1052 ack_out_sent(current_header.ack_seq);
1053
1054 // TODO: change MessageRef with seastar::shared_ptr
1055 auto msg_ref = MessageRef{message, false};
1056 assert(ctx.get_io_state() == io_state_t::open);
1057 assert(get_io_state() == io_state_t::open);
1058 ceph_assert_always(conn_ref);
1059
1060 // throttle the reading process by the returned future
1061 return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
1062 // user can make changes
1063 });
1064 }
1065
// Start the background ingress loop for this connection's I/O shard.
//
// Repeatedly reads one msgr-v2 frame preamble and dispatches on its tag:
// MESSAGE frames go through the (partial) throttling chain and then
// read_message(); ACK / KEEPALIVE2 / KEEPALIVE2_ACK are handled inline.
// Any exception escaping the loop is treated as a connection fault: if the
// io_state is still open, the state is switched to delay and the fault is
// forwarded cross-core to the handshake listener; otherwise it is only
// logged. The loop always exits via exception (see abort_in_fault()/
// abort_protocol()), never normally.
void IOHandler::do_in_dispatch()
{
  shard_states->enter_in_dispatching();
  shard_states->dispatch_in_background(
      "do_in_dispatch", conn, [this, &ctx=*shard_states] {
    return seastar::keep_doing([this, &ctx] {
      return frame_assembler->read_main_preamble<false>(
      ).then([this, &ctx](auto ret) {
        switch (ret.tag) {
          case Tag::MESSAGE: {
            // logical payload size (all segments except the header segment),
            // used for byte-based policy throttling and dispatch accounting
            size_t msg_size = get_msg_size(*ret.rx_frame_asm);
            return seastar::futurize_invoke([this] {
              // throttle_message() logic
              if (!conn.policy.throttler_messages) {
                return seastar::now();
              }
              // TODO: message throttler
              ceph_abort("TODO");
              return seastar::now();
            }).then([this, msg_size] {
              // throttle_bytes() logic
              if (!conn.policy.throttler_bytes) {
                return seastar::now();
              }
              if (!msg_size) {
                return seastar::now();
              }
              logger().trace("{} wants {} bytes from policy throttler {}/{}",
                             conn, msg_size,
                             conn.policy.throttler_bytes->get_current(),
                             conn.policy.throttler_bytes->get_max());
              // may block the ingress loop until bytes are available;
              // released when the message is processed/destructed elsewhere
              return conn.policy.throttler_bytes->get(msg_size);
            }).then([this, msg_size, &ctx] {
              // TODO: throttle_dispatch_queue() logic
              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
              return read_message(ctx, throttle_stamp, msg_size);
            });
          }
          case Tag::ACK:
            return frame_assembler->read_frame_payload<false>(
            ).then([this](auto payload) {
              // handle_message_ack() logic
              auto ack = AckFrame::Decode(payload->back());
              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
              ack_out_sent(ack.seq());
            });
          case Tag::KEEPALIVE2:
            return frame_assembler->read_frame_payload<false>(
            ).then([this](auto payload) {
              // handle_keepalive2() logic
              auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
                             conn, keepalive_frame.timestamp());
              // notify keepalive ack
              next_keepalive_ack = keepalive_frame.timestamp();
              // only poke the egress loop when it lives on this shard;
              // otherwise the ack is picked up by the out-dispatch path later
              if (seastar::this_shard_id() == get_shard_id()) {
                notify_out_dispatch();
              }

              last_keepalive = seastar::lowres_system_clock::now();
            });
          case Tag::KEEPALIVE2_ACK:
            return frame_assembler->read_frame_payload<false>(
            ).then([this](auto payload) {
              // handle_keepalive2_ack() logic
              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
              auto _last_keepalive_ack =
                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
              set_last_keepalive_ack(_last_keepalive_ack);
              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
                             conn, _last_keepalive_ack);
            });
          default: {
            // unexpected tag in this state: treat as negotiation failure
            logger().warn("{} do_in_dispatch() received unexpected tag: {}",
                          conn, static_cast<uint32_t>(ret.tag));
            abort_in_fault();
          }
        }
      });
    }).handle_exception([this, &ctx](std::exception_ptr eptr) {
      // extract a printable reason; non-std::exception types propagate out
      const char *e_what;
      try {
        std::rethrow_exception(eptr);
      } catch (std::exception &e) {
        e_what = e.what();
      }

      auto io_state = ctx.get_io_state();
      if (io_state == io_state_t::open) {
        // first fault observed while open: move to delay and notify the
        // messenger shard so reconnect/reset policy can be applied there
        auto cc_seq = crosscore.prepare_submit();
        logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
                      "send {} notify_out_fault()",
                      conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
        do_set_io_state(io_state_t::delay);
        shard_states->dispatch_in_background(
            "notify_out_fault(in)", conn, [this, cc_seq, eptr] {
          auto states = get_states();
          return seastar::smp::submit_to(
              conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
            return handshake_listener->notify_out_fault(
                cc_seq, "do_in_dispatch", eptr, states);
          });
        });
      } else {
        // already delayed/dropped/switched: nothing to notify, just log
        if (io_state != io_state_t::switched) {
          logger().info("{} do_in_dispatch(): fault at {}, {} -- {}",
                        conn, io_state, io_stat_printer{*this}, e_what);
        } else {
          logger().info("{} do_in_dispatch(): fault at {} -- {}",
                        conn, io_state, e_what);
        }
      }
    }).finally([&ctx] {
      // pairs with enter_in_dispatching() above; unblocks
      // wait_io_exit_dispatching()
      ctx.exit_in_dispatching();
    });
  });
}
1183
// Final cross-core teardown of the I/O side of the connection.
//
// Must run on the connection's current I/O shard, and only after the io_state
// has reached drop. cc_seq orders this call against other cross-core
// submissions: if earlier submissions are still pending, the call parks on
// the crosscore sequencer and re-invokes itself once its turn comes.
//
// @param cc_seq            cross-core sequence number for ordering
// @param is_dispatch_reset whether to deliver ms_handle_reset/remote_reset
//                          to dispatchers before tearing down
// @param is_replace        passed through to dispatch_reset(); true when the
//                          connection was replaced by a newer one
// @return future resolved when the shard states (and any previous shard's
//         leftovers) are fully closed
seastar::future<>
IOHandler::close_io(
    crosscore_t::seq_t cc_seq,
    bool is_dispatch_reset,
    bool is_replace)
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    // out-of-order arrival: wait until all prior cc_seqs are consumed,
    // then retry from the top
    logger().debug("{} got {} close_io(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, is_dispatch_reset, is_replace] {
      return close_io(cc_seq, is_dispatch_reset, is_replace);
    });
  }

  logger().debug("{} got {} close_io(reset={}, replace={})",
                 conn, cc_seq, is_dispatch_reset, is_replace);
  ceph_assert_always(get_io_state() == io_state_t::drop);

  if (is_dispatch_reset) {
    dispatch_reset(is_replace);
  }

  // drop the foreign reference keeping the connection alive for dispatch
  ceph_assert_always(conn_ref);
  conn_ref.reset();

  // cannot be running in parallel with to_new_sid()
  if (maybe_dropped_sid.has_value()) {
    // the connection was dropped mid-migration: current shard states are
    // already closed, but the previous shard still needs cleanup
    assert(shard_states->assert_closed_and_exit());
    auto prv_sid = *maybe_dropped_sid;
    return cleanup_prv_shard(prv_sid);
  } else {
    return shard_states->close(
    ).then([this] {
      assert(shard_states->assert_closed_and_exit());
    });
  }
}
1223
1224 /*
1225 * IOHandler::shard_states_t
1226 */
1227
1228 void
1229 IOHandler::shard_states_t::notify_out_dispatching_stopped(
1230 const char *what, SocketConnection &conn)
1231 {
1232 assert(seastar::this_shard_id() == sid);
1233 if (unlikely(out_exit_dispatching.has_value())) {
1234 out_exit_dispatching->set_value();
1235 out_exit_dispatching = std::nullopt;
1236 logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching",
1237 conn, what, io_state);
1238 } else {
1239 if (unlikely(io_state != io_state_t::open)) {
1240 logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching",
1241 conn, what, io_state);
1242 }
1243 }
1244 }
1245
1246 seastar::future<>
1247 IOHandler::shard_states_t::wait_io_exit_dispatching()
1248 {
1249 assert(seastar::this_shard_id() == sid);
1250 assert(io_state != io_state_t::open);
1251 assert(!gate.is_closed());
1252 return seastar::when_all(
1253 [this] {
1254 if (out_exit_dispatching) {
1255 return out_exit_dispatching->get_future();
1256 } else {
1257 return seastar::now();
1258 }
1259 }(),
1260 [this] {
1261 if (in_exit_dispatching) {
1262 return in_exit_dispatching->get_future();
1263 } else {
1264 return seastar::now();
1265 }
1266 }()
1267 ).discard_result();
1268 }
1269
1270 IOHandler::shard_states_ref_t
1271 IOHandler::shard_states_t::create_from_previous(
1272 shard_states_t &prv_states,
1273 seastar::shard_id new_sid)
1274 {
1275 auto io_state = prv_states.io_state;
1276 assert(io_state != io_state_t::open);
1277 auto ret = shard_states_t::create(new_sid, io_state);
1278 if (io_state == io_state_t::drop) {
1279 // the new gate should not never be used
1280 auto fut = ret->gate.close();
1281 ceph_assert_always(fut.available());
1282 }
1283 prv_states.set_io_state(io_state_t::switched);
1284 return ret;
1285 }
1286
1287 } // namespace crimson::net