]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/ProtocolV2.cc
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "ProtocolV2.h"
5
6 #include <fmt/format.h>
7 #include <fmt/ranges.h>
8 #include "include/msgr.h"
9 #include "include/random.h"
10 #include "msg/msg_fmt.h"
11
12 #include "crimson/auth/AuthClient.h"
13 #include "crimson/auth/AuthServer.h"
14 #include "crimson/common/formatter.h"
15 #include "crimson/common/log.h"
16
17 #include "Errors.h"
18 #include "SocketMessenger.h"
19
20 #ifdef UNIT_TESTS_BUILT
21 #include "Interceptor.h"
22 #endif
23
24 using namespace ceph::msgr::v2;
25 using crimson::common::local_conf;
26 using io_state_t = crimson::net::IOHandler::io_state_t;
27 using io_stat_printer = crimson::net::IOHandler::io_stat_printer;
28
29 namespace {
30
31 // TODO: CEPH_MSGR2_FEATURE_COMPRESSION
32 const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES =
33 (CEPH_MSGR2_FEATURE_REVISION_1 |
34 // CEPH_MSGR2_FEATURE_COMPRESSION |
35 UINT64_C(0));
36
37 // Log levels in V2 Protocol:
38 // * error level, something error that cause connection to terminate:
39 // - fatal errors;
40 // - bugs;
41 // * warn level: something unusual that identifies connection fault or replacement:
42 // - unstable network;
43 // - incompatible peer;
44 // - auth failure;
45 // - connection race;
46 // - connection reset;
47 // * info level, something very important to show connection lifecycle,
48 // which doesn't happen very frequently;
49 // * debug level, important logs for debugging, including:
50 // - all the messages sent/received (-->/<==);
51 // - all the frames exchanged (WRITE/GOT);
52 // - important fields updated (UPDATE);
53 // - connection state transitions (TRIGGER);
54 // * trace level, trivial logs showing:
55 // - the exact bytes being sent/received (SEND/RECV(bytes));
56 // - detailed information of sub-frames;
57 // - integrity checks;
58 // - etc.
59 seastar::logger& logger() {
60 return crimson::get_logger(ceph_subsys_ms);
61 }
62
63 [[noreturn]] void abort_in_fault() {
64 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
65 }
66
67 [[noreturn]] void abort_protocol() {
68 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
69 }
70
// Calls do_close(is_dispatch_reset) and then abort_protocol(), in that order.
//
// The body is wrapped in do { ... } while (0) so that `ABORT_IN_CLOSE(x);`
// expands to exactly one statement. With a bare `{ ... }` body the trailing
// `;` becomes a separate empty statement, which breaks an unbraced
// `if (...) ABORT_IN_CLOSE(x); else ...`.
#define ABORT_IN_CLOSE(is_dispatch_reset) do { \
  do_close(is_dispatch_reset);                 \
  abort_protocol();                            \
} while (0)
75
76 inline void expect_tag(const Tag& expected,
77 const Tag& actual,
78 crimson::net::SocketConnection& conn,
79 const char *where) {
80 if (actual != expected) {
81 logger().warn("{} {} received wrong tag: {}, expected {}",
82 conn, where,
83 static_cast<uint32_t>(actual),
84 static_cast<uint32_t>(expected));
85 abort_in_fault();
86 }
87 }
88
89 inline void unexpected_tag(const Tag& unexpected,
90 crimson::net::SocketConnection& conn,
91 const char *where) {
92 logger().warn("{} {} received unexpected tag: {}",
93 conn, where, static_cast<uint32_t>(unexpected));
94 abort_in_fault();
95 }
96
97 inline uint64_t generate_client_cookie() {
98 return ceph::util::generate_random_number<uint64_t>(
99 1, std::numeric_limits<uint64_t>::max());
100 }
101
102 } // namespace anonymous
103
104 namespace crimson::net {
105
#ifdef UNIT_TESTS_BUILT
// should be consistent to intercept_frame() in FrameAssemblerV2.cc
//
// If an interceptor is installed, ask it what to do at breakpoint `bp` and
// arm the socket trap of the given `type` with that action. Without an
// interceptor this is a no-op.
void intercept(Breakpoint bp,
               bp_type_t type,
               SocketConnection& conn,
               Interceptor *interceptor,
               SocketRef& socket) {
  if (!interceptor) {
    return;
  }
  auto trap_action = interceptor->intercept(conn, Breakpoint(bp));
  socket->set_trap(type, trap_action, &interceptor->blocker);
}

// Convenience wrapper; expects `conn` to be in scope at the expansion site.
#define INTERCEPT_CUSTOM(bp, type)        \
  intercept({bp}, type, conn,             \
            conn.interceptor, conn.socket)
#else
// Expands to nothing when unit tests are not built.
#define INTERCEPT_CUSTOM(bp, type)
#endif
125
126 seastar::future<> ProtocolV2::Timer::backoff(double seconds)
127 {
128 logger().warn("{} waiting {} seconds ...", conn, seconds);
129 cancel();
130 last_dur_ = seconds;
131 as = seastar::abort_source();
132 auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
133 std::chrono::duration<double>(seconds));
134 return seastar::sleep_abortable(dur, *as
135 ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
136 logger().debug("{} wait aborted", conn);
137 abort_protocol();
138 });
139 }
140
141 ProtocolV2::ProtocolV2(SocketConnection& conn,
142 IOHandler &io_handler)
143 : conn{conn},
144 messenger{conn.messenger},
145 io_handler{io_handler},
146 frame_assembler{FrameAssemblerV2::create(conn)},
147 auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
148 protocol_timer{conn}
149 {}
150
151 ProtocolV2::~ProtocolV2() {}
152
153 void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
154 const entity_name_t& _peer_name)
155 {
156 ceph_assert(state == state_t::NONE);
157 ceph_assert(!gate.is_closed());
158 conn.peer_addr = _peer_addr;
159 conn.target_addr = _peer_addr;
160 conn.set_peer_name(_peer_name);
161 conn.policy = messenger.get_policy(_peer_name.type());
162 client_cookie = generate_client_cookie();
163 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
164 " policy(lossy={}, server={}, standby={}, resetcheck={})",
165 conn, _peer_addr, _peer_name, client_cookie,
166 conn.policy.lossy, conn.policy.server,
167 conn.policy.standby, conn.policy.resetcheck);
168 messenger.register_conn(
169 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
170 execute_connecting();
171 }
172
173 void ProtocolV2::start_accept(SocketRef&& new_socket,
174 const entity_addr_t& _peer_addr)
175 {
176 ceph_assert(state == state_t::NONE);
177 // until we know better
178 conn.target_addr = _peer_addr;
179 frame_assembler->set_socket(std::move(new_socket));
180 has_socket = true;
181 is_socket_valid = true;
182 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
183 messenger.accept_conn(
184 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
185 execute_accepting();
186 }
187
188 void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant)
189 {
190 if (!reentrant && new_state == state) {
191 logger().error("{} is not allowed to re-trigger state {}",
192 conn, get_state_name(state));
193 ceph_abort();
194 }
195 if (state == state_t::CLOSING) {
196 logger().error("{} CLOSING is not allowed to trigger state {}",
197 conn, get_state_name(new_state));
198 ceph_abort();
199 }
200 logger().debug("{} TRIGGER {}, was {}",
201 conn, get_state_name(new_state), get_state_name(state));
202 auto pre_state = state;
203 if (pre_state == state_t::READY) {
204 assert(!gate.is_closed());
205 ceph_assert_always(!exit_io.has_value());
206 exit_io = seastar::shared_promise<>();
207 }
208 state = new_state;
209 if (new_state == state_t::READY) {
210 // I'm not responsible to shutdown the socket at READY
211 is_socket_valid = false;
212 io_handler.set_io_state(new_io_state, std::move(frame_assembler));
213 } else {
214 io_handler.set_io_state(new_io_state, nullptr);
215 }
216
217 /*
218 * not atomic below
219 */
220
221 if (pre_state == state_t::READY) {
222 gate.dispatch_in_background("exit_io", conn, [this] {
223 return io_handler.wait_io_exit_dispatching(
224 ).then([this](FrameAssemblerV2Ref fa) {
225 frame_assembler = std::move(fa);
226 exit_io->set_value();
227 exit_io = std::nullopt;
228 });
229 });
230 }
231 }
232
233 void ProtocolV2::fault(
234 state_t expected_state,
235 const char *where,
236 std::exception_ptr eptr)
237 {
238 assert(expected_state == state_t::CONNECTING ||
239 expected_state == state_t::ESTABLISHING ||
240 expected_state == state_t::REPLACING ||
241 expected_state == state_t::READY);
242 const char *e_what;
243 try {
244 std::rethrow_exception(eptr);
245 } catch (std::exception &e) {
246 e_what = e.what();
247 }
248
249 if (state != expected_state) {
250 logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}",
251 conn,
252 get_state_name(expected_state),
253 where,
254 get_state_name(state),
255 e_what);
256 #ifndef NDEBUG
257 if (expected_state == state_t::REPLACING) {
258 assert(state == state_t::CLOSING);
259 } else if (expected_state == state_t::READY) {
260 assert(state == state_t::CLOSING ||
261 state == state_t::REPLACING ||
262 state == state_t::CONNECTING ||
263 state == state_t::STANDBY);
264 } else {
265 assert(state == state_t::CLOSING ||
266 state == state_t::REPLACING);
267 }
268 #endif
269 return;
270 }
271 assert(state == expected_state);
272
273 if (state != state_t::CONNECTING && conn.policy.lossy) {
274 // socket will be shutdown in do_close()
275 logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}",
276 conn, get_state_name(state), where, e_what);
277 do_close(true);
278 return;
279 }
280
281 if (likely(has_socket)) {
282 if (likely(is_socket_valid)) {
283 ceph_assert_always(state != state_t::READY);
284 frame_assembler->shutdown_socket();
285 is_socket_valid = false;
286 } else {
287 ceph_assert_always(state != state_t::ESTABLISHING);
288 }
289 } else { // !has_socket
290 ceph_assert_always(state == state_t::CONNECTING);
291 assert(!is_socket_valid);
292 }
293
294 if (conn.policy.server ||
295 (conn.policy.standby && !io_handler.is_out_queued_or_sent())) {
296 if (conn.policy.server) {
297 logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
298 conn,
299 get_state_name(state),
300 where,
301 io_stat_printer{io_handler},
302 e_what);
303 } else {
304 logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
305 conn,
306 get_state_name(state),
307 where,
308 io_stat_printer{io_handler},
309 e_what);
310 }
311 execute_standby();
312 } else if (state == state_t::CONNECTING ||
313 state == state_t::REPLACING) {
314 logger().info("{} protocol {} {} fault, going to WAIT {} -- {}",
315 conn,
316 get_state_name(state),
317 where,
318 io_stat_printer{io_handler},
319 e_what);
320 execute_wait(false);
321 } else {
322 assert(state == state_t::READY ||
323 state == state_t::ESTABLISHING);
324 logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}",
325 conn,
326 get_state_name(state),
327 where,
328 io_stat_printer{io_handler},
329 e_what);
330 execute_connecting();
331 }
332 }
333
334 void ProtocolV2::reset_session(bool full)
335 {
336 server_cookie = 0;
337 connect_seq = 0;
338 if (full) {
339 client_cookie = generate_client_cookie();
340 peer_global_seq = 0;
341 }
342 io_handler.reset_session(full);
343 }
344
345 seastar::future<std::tuple<entity_type_t, entity_addr_t>>
346 ProtocolV2::banner_exchange(bool is_connect)
347 {
348 // 1. prepare and send banner
349 bufferlist banner_payload;
350 encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
351 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
352
353 bufferlist bl;
354 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
355 auto len_payload = static_cast<uint16_t>(banner_payload.length());
356 encode(len_payload, bl, 0);
357 bl.claim_append(banner_payload);
358 logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
359 "required={}, banner=\"{}\"",
360 conn, bl.length(), len_payload,
361 CRIMSON_MSGR2_SUPPORTED_FEATURES,
362 CEPH_MSGR2_REQUIRED_FEATURES,
363 CEPH_BANNER_V2_PREFIX);
364 INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
365 return frame_assembler->write_flush(std::move(bl)).then([this] {
366 // 2. read peer banner
367 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
368 INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
369 return frame_assembler->read_exactly(banner_len); // or read exactly?
370 }).then([this] (auto bl) {
371 // 3. process peer banner and read banner_payload
372 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
373 logger().debug("{} RECV({}) banner: \"{}\"",
374 conn, bl.size(),
375 std::string((const char*)bl.get(), banner_prefix_len));
376
377 if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
378 if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
379 logger().warn("{} peer is using V1 protocol", conn);
380 } else {
381 logger().warn("{} peer sent bad banner", conn);
382 }
383 abort_in_fault();
384 }
385 bl.trim_front(banner_prefix_len);
386
387 uint16_t payload_len;
388 bufferlist buf;
389 buf.append(buffer::create(std::move(bl)));
390 auto ti = buf.cbegin();
391 try {
392 decode(payload_len, ti);
393 } catch (const buffer::error &e) {
394 logger().warn("{} decode banner payload len failed", conn);
395 abort_in_fault();
396 }
397 logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
398 INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
399 return frame_assembler->read(payload_len);
400 }).then([this, is_connect] (bufferlist bl) {
401 // 4. process peer banner_payload and send HelloFrame
402 auto p = bl.cbegin();
403 uint64_t _peer_supported_features;
404 uint64_t _peer_required_features;
405 try {
406 decode(_peer_supported_features, p);
407 decode(_peer_required_features, p);
408 } catch (const buffer::error &e) {
409 logger().warn("{} decode banner payload failed", conn);
410 abort_in_fault();
411 }
412 logger().debug("{} RECV({}) banner features: supported={} required={}",
413 conn, bl.length(),
414 _peer_supported_features, _peer_required_features);
415
416 // Check feature bit compatibility
417 uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
418 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
419 if ((required_features & _peer_supported_features) != required_features) {
420 logger().error("{} peer does not support all required features"
421 " required={} peer_supported={}",
422 conn, required_features, _peer_supported_features);
423 ABORT_IN_CLOSE(is_connect);
424 }
425 if ((supported_features & _peer_required_features) != _peer_required_features) {
426 logger().error("{} we do not support all peer required features"
427 " peer_required={} supported={}",
428 conn, _peer_required_features, supported_features);
429 ABORT_IN_CLOSE(is_connect);
430 }
431 peer_supported_features = _peer_supported_features;
432 bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
433 frame_assembler->set_is_rev1(is_rev1);
434
435 auto hello = HelloFrame::Encode(messenger.get_mytype(),
436 conn.target_addr);
437 logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
438 conn, ceph_entity_type_name(messenger.get_mytype()),
439 conn.target_addr);
440 return frame_assembler->write_flush_frame(hello);
441 }).then([this] {
442 //5. read peer HelloFrame
443 return frame_assembler->read_main_preamble();
444 }).then([this](auto ret) {
445 expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
446 return frame_assembler->read_frame_payload();
447 }).then([this](auto payload) {
448 // 6. process peer HelloFrame
449 auto hello = HelloFrame::Decode(payload->back());
450 logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
451 conn, ceph_entity_type_name(hello.entity_type()),
452 hello.peer_addr());
453 return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
454 std::make_tuple(hello.entity_type(), hello.peer_addr()));
455 });
456 }
457
458 // CONNECTING state
459
460 seastar::future<> ProtocolV2::handle_auth_reply()
461 {
462 return frame_assembler->read_main_preamble(
463 ).then([this](auto ret) {
464 switch (ret.tag) {
465 case Tag::AUTH_BAD_METHOD:
466 return frame_assembler->read_frame_payload(
467 ).then([this](auto payload) {
468 // handle_auth_bad_method() logic
469 auto bad_method = AuthBadMethodFrame::Decode(payload->back());
470 logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
471 "allowed_methods={}, allowed_modes={}",
472 conn, bad_method.method(), cpp_strerror(bad_method.result()),
473 bad_method.allowed_methods(), bad_method.allowed_modes());
474 ceph_assert(messenger.get_auth_client());
475 int r = messenger.get_auth_client()->handle_auth_bad_method(
476 conn, *auth_meta,
477 bad_method.method(), bad_method.result(),
478 bad_method.allowed_methods(), bad_method.allowed_modes());
479 if (r < 0) {
480 logger().warn("{} auth_client handle_auth_bad_method returned {}",
481 conn, r);
482 abort_in_fault();
483 }
484 return client_auth(bad_method.allowed_methods());
485 });
486 case Tag::AUTH_REPLY_MORE:
487 return frame_assembler->read_frame_payload(
488 ).then([this](auto payload) {
489 // handle_auth_reply_more() logic
490 auto auth_more = AuthReplyMoreFrame::Decode(payload->back());
491 logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
492 conn, auth_more.auth_payload().length());
493 ceph_assert(messenger.get_auth_client());
494 // let execute_connecting() take care of the thrown exception
495 auto reply = messenger.get_auth_client()->handle_auth_reply_more(
496 conn, *auth_meta, auth_more.auth_payload());
497 auto more_reply = AuthRequestMoreFrame::Encode(reply);
498 logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
499 conn, reply.length());
500 return frame_assembler->write_flush_frame(more_reply);
501 }).then([this] {
502 return handle_auth_reply();
503 });
504 case Tag::AUTH_DONE:
505 return frame_assembler->read_frame_payload(
506 ).then([this](auto payload) {
507 // handle_auth_done() logic
508 auto auth_done = AuthDoneFrame::Decode(payload->back());
509 logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
510 conn, auth_done.global_id(),
511 ceph_con_mode_name(auth_done.con_mode()),
512 auth_done.auth_payload().length());
513 ceph_assert(messenger.get_auth_client());
514 int r = messenger.get_auth_client()->handle_auth_done(
515 conn,
516 *auth_meta,
517 auth_done.global_id(),
518 auth_done.con_mode(),
519 auth_done.auth_payload());
520 if (r < 0) {
521 logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
522 abort_in_fault();
523 }
524 auth_meta->con_mode = auth_done.con_mode();
525 frame_assembler->create_session_stream_handlers(*auth_meta, false);
526 return finish_auth();
527 });
528 default: {
529 unexpected_tag(ret.tag, conn, "handle_auth_reply");
530 return seastar::now();
531 }
532 }
533 });
534 }
535
536 seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
537 {
538 // send_auth_request() logic
539 ceph_assert(messenger.get_auth_client());
540
541 try {
542 auto [auth_method, preferred_modes, bl] =
543 messenger.get_auth_client()->get_auth_request(conn, *auth_meta);
544 auth_meta->auth_method = auth_method;
545 auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
546 logger().debug("{} WRITE AuthRequestFrame: method={},"
547 " preferred_modes={}, payload_len={}",
548 conn, auth_method, preferred_modes, bl.length());
549 return frame_assembler->write_flush_frame(frame
550 ).then([this] {
551 return handle_auth_reply();
552 });
553 } catch (const crimson::auth::error& e) {
554 logger().error("{} get_initial_auth_request returned {}", conn, e.what());
555 ABORT_IN_CLOSE(true);
556 return seastar::now();
557 }
558 }
559
560 seastar::future<ProtocolV2::next_step_t>
561 ProtocolV2::process_wait()
562 {
563 return frame_assembler->read_frame_payload(
564 ).then([this](auto payload) {
565 // handle_wait() logic
566 logger().debug("{} GOT WaitFrame", conn);
567 WaitFrame::Decode(payload->back());
568 return next_step_t::wait;
569 });
570 }
571
572 seastar::future<ProtocolV2::next_step_t>
573 ProtocolV2::client_connect()
574 {
575 // send_client_ident() logic
576 uint64_t flags = 0;
577 if (conn.policy.lossy) {
578 flags |= CEPH_MSG_CONNECT_LOSSY;
579 }
580
581 auto client_ident = ClientIdentFrame::Encode(
582 messenger.get_myaddrs(),
583 conn.target_addr,
584 messenger.get_myname().num(),
585 global_seq,
586 conn.policy.features_supported,
587 conn.policy.features_required | msgr2_required, flags,
588 client_cookie);
589
590 logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
591 " gs={}, features_supported={}, features_required={},"
592 " flags={}, cookie={}",
593 conn, messenger.get_myaddrs(), conn.target_addr,
594 messenger.get_myname().num(), global_seq,
595 conn.policy.features_supported,
596 conn.policy.features_required | msgr2_required,
597 flags, client_cookie);
598 return frame_assembler->write_flush_frame(client_ident
599 ).then([this] {
600 return frame_assembler->read_main_preamble();
601 }).then([this](auto ret) {
602 switch (ret.tag) {
603 case Tag::IDENT_MISSING_FEATURES:
604 return frame_assembler->read_frame_payload(
605 ).then([this](auto payload) {
606 // handle_ident_missing_features() logic
607 auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back());
608 logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
609 " (client does not support all server features)",
610 conn, ident_missing.features());
611 abort_in_fault();
612 return next_step_t::none;
613 });
614 case Tag::WAIT:
615 return process_wait();
616 case Tag::SERVER_IDENT:
617 return frame_assembler->read_frame_payload(
618 ).then([this](auto payload) {
619 // handle_server_ident() logic
620 io_handler.requeue_out_sent();
621 auto server_ident = ServerIdentFrame::Decode(payload->back());
622 logger().debug("{} GOT ServerIdentFrame:"
623 " addrs={}, gid={}, gs={},"
624 " features_supported={}, features_required={},"
625 " flags={}, cookie={}",
626 conn,
627 server_ident.addrs(), server_ident.gid(),
628 server_ident.global_seq(),
629 server_ident.supported_features(),
630 server_ident.required_features(),
631 server_ident.flags(), server_ident.cookie());
632
633 // is this who we intended to talk to?
634 // be a bit forgiving here, since we may be connecting based on addresses parsed out
635 // of mon_host or something.
636 if (!server_ident.addrs().contains(conn.target_addr)) {
637 logger().warn("{} peer identifies as {}, does not include {}",
638 conn, server_ident.addrs(), conn.target_addr);
639 throw std::system_error(
640 make_error_code(crimson::net::error::bad_peer_address));
641 }
642
643 server_cookie = server_ident.cookie();
644
645 // TODO: change peer_addr to entity_addrvec_t
646 if (server_ident.addrs().front() != conn.peer_addr) {
647 logger().warn("{} peer advertises as {}, does not match {}",
648 conn, server_ident.addrs(), conn.peer_addr);
649 throw std::system_error(
650 make_error_code(crimson::net::error::bad_peer_address));
651 }
652 if (conn.get_peer_id() != entity_name_t::NEW &&
653 conn.get_peer_id() != server_ident.gid()) {
654 logger().error("{} connection peer id ({}) does not match "
655 "what it should be ({}) during connecting, close",
656 conn, server_ident.gid(), conn.get_peer_id());
657 ABORT_IN_CLOSE(true);
658 }
659 conn.set_peer_id(server_ident.gid());
660 conn.set_features(server_ident.supported_features() &
661 conn.policy.features_supported);
662 logger().debug("{} UPDATE: features={}", conn, conn.get_features());
663 peer_global_seq = server_ident.global_seq();
664
665 bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
666 if (lossy != conn.policy.lossy) {
667 logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
668 conn.policy.lossy = lossy;
669 }
670 if (lossy && (connect_seq != 0 || server_cookie != 0)) {
671 logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
672 conn, connect_seq, server_cookie);
673 connect_seq = 0;
674 server_cookie = 0;
675 }
676
677 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
678 });
679 default: {
680 unexpected_tag(ret.tag, conn, "post_client_connect");
681 return seastar::make_ready_future<next_step_t>(next_step_t::none);
682 }
683 }
684 });
685 }
686
687 seastar::future<ProtocolV2::next_step_t>
688 ProtocolV2::client_reconnect()
689 {
690 // send_reconnect() logic
691 auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
692 client_cookie,
693 server_cookie,
694 global_seq,
695 connect_seq,
696 io_handler.get_in_seq());
697 logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
698 " server_cookie={}, gs={}, cs={}, in_seq={}",
699 conn, messenger.get_myaddrs(),
700 client_cookie, server_cookie,
701 global_seq, connect_seq, io_handler.get_in_seq());
702 return frame_assembler->write_flush_frame(reconnect).then([this] {
703 return frame_assembler->read_main_preamble();
704 }).then([this](auto ret) {
705 switch (ret.tag) {
706 case Tag::SESSION_RETRY_GLOBAL:
707 return frame_assembler->read_frame_payload(
708 ).then([this](auto payload) {
709 // handle_session_retry_global() logic
710 auto retry = RetryGlobalFrame::Decode(payload->back());
711 logger().warn("{} GOT RetryGlobalFrame: gs={}",
712 conn, retry.global_seq());
713 global_seq = messenger.get_global_seq(retry.global_seq());
714 logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
715 return client_reconnect();
716 });
717 case Tag::SESSION_RETRY:
718 return frame_assembler->read_frame_payload(
719 ).then([this](auto payload) {
720 // handle_session_retry() logic
721 auto retry = RetryFrame::Decode(payload->back());
722 logger().warn("{} GOT RetryFrame: cs={}",
723 conn, retry.connect_seq());
724 connect_seq = retry.connect_seq() + 1;
725 logger().warn("{} UPDATE: cs={}", conn, connect_seq);
726 return client_reconnect();
727 });
728 case Tag::SESSION_RESET:
729 return frame_assembler->read_frame_payload(
730 ).then([this](auto payload) {
731 if (unlikely(state != state_t::CONNECTING)) {
732 logger().debug("{} triggered {} before reset_session()",
733 conn, get_state_name(state));
734 abort_protocol();
735 }
736 // handle_session_reset() logic
737 auto reset = ResetFrame::Decode(payload->back());
738 logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
739 reset_session(reset.full());
740 return client_connect();
741 });
742 case Tag::WAIT:
743 return process_wait();
744 case Tag::SESSION_RECONNECT_OK:
745 return frame_assembler->read_frame_payload(
746 ).then([this](auto payload) {
747 // handle_reconnect_ok() logic
748 auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
749 logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
750 conn, reconnect_ok.msg_seq());
751 io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
752 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
753 });
754 default: {
755 unexpected_tag(ret.tag, conn, "post_client_reconnect");
756 return seastar::make_ready_future<next_step_t>(next_step_t::none);
757 }
758 }
759 });
760 }
761
762 void ProtocolV2::execute_connecting()
763 {
764 ceph_assert_always(!is_socket_valid);
765 trigger_state(state_t::CONNECTING, io_state_t::delay, false);
766 gated_execute("execute_connecting", conn, [this] {
767 global_seq = messenger.get_global_seq();
768 assert(client_cookie != 0);
769 if (!conn.policy.lossy && server_cookie != 0) {
770 ++connect_seq;
771 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
772 conn, global_seq, connect_seq);
773 } else { // conn.policy.lossy || server_cookie == 0
774 assert(connect_seq == 0);
775 assert(server_cookie == 0);
776 logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
777 }
778 return wait_exit_io().then([this] {
779 #ifdef UNIT_TESTS_BUILT
780 // process custom_bp_t::SOCKET_CONNECTING
781 // supports CONTINUE/FAULT/BLOCK
782 if (conn.interceptor) {
783 auto action = conn.interceptor->intercept(
784 conn, {custom_bp_t::SOCKET_CONNECTING});
785 switch (action) {
786 case bp_action_t::CONTINUE:
787 return seastar::now();
788 case bp_action_t::FAULT:
789 logger().info("[Test] got FAULT");
790 abort_in_fault();
791 case bp_action_t::BLOCK:
792 logger().info("[Test] got BLOCK");
793 return conn.interceptor->blocker.block();
794 default:
795 ceph_abort("unexpected action from trap");
796 }
797 } else {
798 return seastar::now();
799 }
800 }).then([this] {
801 #endif
802 ceph_assert_always(frame_assembler);
803 if (unlikely(state != state_t::CONNECTING)) {
804 logger().debug("{} triggered {} before Socket::connect()",
805 conn, get_state_name(state));
806 abort_protocol();
807 }
808 return Socket::connect(conn.peer_addr);
809 }).then([this](SocketRef new_socket) {
810 logger().debug("{} socket connected", conn);
811 if (unlikely(state != state_t::CONNECTING)) {
812 logger().debug("{} triggered {} during Socket::connect()",
813 conn, get_state_name(state));
814 return new_socket->close().then([sock=std::move(new_socket)] {
815 abort_protocol();
816 });
817 }
818 if (!has_socket) {
819 frame_assembler->set_socket(std::move(new_socket));
820 has_socket = true;
821 } else {
822 gate.dispatch_in_background(
823 "replace_socket_connecting",
824 conn,
825 [this, new_socket=std::move(new_socket)]() mutable {
826 return frame_assembler->replace_shutdown_socket(std::move(new_socket));
827 }
828 );
829 }
830 is_socket_valid = true;
831 return seastar::now();
832 }).then([this] {
833 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
834 frame_assembler->reset_handlers();
835 frame_assembler->start_recording();
836 return banner_exchange(true);
837 }).then([this] (auto&& ret) {
838 auto [_peer_type, _my_addr_from_peer] = std::move(ret);
839 if (conn.get_peer_type() != _peer_type) {
840 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
841 conn, ceph_entity_type_name(conn.get_peer_type()),
842 ceph_entity_type_name(_peer_type));
843 ABORT_IN_CLOSE(true);
844 }
845 if (unlikely(state != state_t::CONNECTING)) {
846 logger().debug("{} triggered {} during banner_exchange(), abort",
847 conn, get_state_name(state));
848 abort_protocol();
849 }
850 frame_assembler->learn_socket_ephemeral_port_as_connector(
851 _my_addr_from_peer.get_port());
852 if (unlikely(_my_addr_from_peer.is_legacy())) {
853 logger().warn("{} peer sent a legacy address for me: {}",
854 conn, _my_addr_from_peer);
855 throw std::system_error(
856 make_error_code(crimson::net::error::bad_peer_address));
857 }
858 _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
859 messenger.learned_addr(_my_addr_from_peer, conn);
860 return client_auth();
861 }).then([this] {
862 if (server_cookie == 0) {
863 ceph_assert(connect_seq == 0);
864 return client_connect();
865 } else {
866 ceph_assert(connect_seq > 0);
867 return client_reconnect();
868 }
869 }).then([this] (next_step_t next) {
870 if (unlikely(state != state_t::CONNECTING)) {
871 logger().debug("{} triggered {} at the end of execute_connecting()",
872 conn, get_state_name(state));
873 abort_protocol();
874 }
875 switch (next) {
876 case next_step_t::ready: {
877 logger().info("{} connected: gs={}, pgs={}, cs={}, "
878 "client_cookie={}, server_cookie={}, {}",
879 conn, global_seq, peer_global_seq, connect_seq,
880 client_cookie, server_cookie,
881 io_stat_printer{io_handler});
882 io_handler.dispatch_connect();
883 if (unlikely(state != state_t::CONNECTING)) {
884 logger().debug("{} triggered {} after ms_handle_connect(), abort",
885 conn, get_state_name(state));
886 abort_protocol();
887 }
888 execute_ready();
889 break;
890 }
891 case next_step_t::wait: {
892 logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
893 ceph_assert_always(is_socket_valid);
894 frame_assembler->shutdown_socket();
895 is_socket_valid = false;
896 execute_wait(true);
897 break;
898 }
899 default: {
900 ceph_abort("impossible next step");
901 }
902 }
903 }).handle_exception([this](std::exception_ptr eptr) {
904 fault(state_t::CONNECTING, "execute_connecting", eptr);
905 });
906 });
907 }
908
909 // ACCEPTING state
910
911 seastar::future<> ProtocolV2::_auth_bad_method(int r)
912 {
913 // _auth_bad_method() logic
914 ceph_assert(r < 0);
915 auto [allowed_methods, allowed_modes] =
916 messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
917 auto bad_method = AuthBadMethodFrame::Encode(
918 auth_meta->auth_method, r, allowed_methods, allowed_modes);
919 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
920 "allowed_methods={}, allowed_modes={})",
921 conn, auth_meta->auth_method, cpp_strerror(r),
922 allowed_methods, allowed_modes);
923 return frame_assembler->write_flush_frame(bad_method
924 ).then([this] {
925 return server_auth();
926 });
927 }
928
929 seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
930 {
931 // _handle_auth_request() logic
932 ceph_assert(messenger.get_auth_server());
933 bufferlist reply;
934 int r = messenger.get_auth_server()->handle_auth_request(
935 conn,
936 *auth_meta,
937 more,
938 auth_meta->auth_method,
939 auth_payload,
940 &conn.peer_global_id,
941 &reply);
942 switch (r) {
943 // successful
944 case 1: {
945 auto auth_done = AuthDoneFrame::Encode(
946 conn.peer_global_id, auth_meta->con_mode, reply);
947 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
948 conn, conn.peer_global_id,
949 ceph_con_mode_name(auth_meta->con_mode), reply.length());
950 return frame_assembler->write_flush_frame(auth_done
951 ).then([this] {
952 ceph_assert(auth_meta);
953 frame_assembler->create_session_stream_handlers(*auth_meta, true);
954 return finish_auth();
955 });
956 }
957 // auth more
958 case 0: {
959 auto more = AuthReplyMoreFrame::Encode(reply);
960 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
961 conn, reply.length());
962 return frame_assembler->write_flush_frame(more
963 ).then([this] {
964 return frame_assembler->read_main_preamble();
965 }).then([this](auto ret) {
966 expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, "read_auth_request_more");
967 return frame_assembler->read_frame_payload();
968 }).then([this](auto payload) {
969 auto auth_more = AuthRequestMoreFrame::Decode(payload->back());
970 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
971 conn, auth_more.auth_payload().length());
972 return _handle_auth_request(auth_more.auth_payload(), true);
973 });
974 }
975 case -EBUSY: {
976 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn);
977 abort_in_fault();
978 return seastar::now();
979 }
980 default: {
981 logger().warn("{} auth_server handle_auth_request returned {}", conn, r);
982 return _auth_bad_method(r);
983 }
984 }
985 }
986
987 seastar::future<> ProtocolV2::server_auth()
988 {
989 return frame_assembler->read_main_preamble(
990 ).then([this](auto ret) {
991 expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, "read_auth_request");
992 return frame_assembler->read_frame_payload();
993 }).then([this](auto payload) {
994 // handle_auth_request() logic
995 auto request = AuthRequestFrame::Decode(payload->back());
996 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
997 " payload_len={}",
998 conn, request.method(), request.preferred_modes(),
999 request.auth_payload().length());
1000 auth_meta->auth_method = request.method();
1001 auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode(
1002 conn.get_peer_type(), auth_meta->auth_method,
1003 request.preferred_modes());
1004 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
1005 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn);
1006 return _auth_bad_method(-EOPNOTSUPP);
1007 }
1008 return _handle_auth_request(request.auth_payload(), false);
1009 });
1010 }
1011
1012 bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const
1013 {
1014 auto my_peer_name = conn.get_peer_name();
1015 if (my_peer_name.type() != peer_name.type()) {
1016 return false;
1017 }
1018 if (my_peer_name.num() != entity_name_t::NEW &&
1019 peer_name.num() != entity_name_t::NEW &&
1020 my_peer_name.num() != peer_name.num()) {
1021 return false;
1022 }
1023 return true;
1024 }
1025
1026 seastar::future<ProtocolV2::next_step_t>
1027 ProtocolV2::send_wait()
1028 {
1029 auto wait = WaitFrame::Encode();
1030 logger().debug("{} WRITE WaitFrame", conn);
1031 return frame_assembler->write_flush_frame(wait
1032 ).then([] {
1033 return next_step_t::wait;
1034 });
1035 }
1036
1037 seastar::future<ProtocolV2::next_step_t>
1038 ProtocolV2::reuse_connection(
1039 ProtocolV2* existing_proto, bool do_reset,
1040 bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
1041 {
1042 if (unlikely(state != state_t::ACCEPTING)) {
1043 logger().debug("{} triggered {} before trigger_replacing()",
1044 conn, get_state_name(state));
1045 abort_protocol();
1046 }
1047
1048 existing_proto->trigger_replacing(reconnect,
1049 do_reset,
1050 frame_assembler->to_replace(),
1051 std::move(auth_meta),
1052 peer_global_seq,
1053 client_cookie,
1054 conn.get_peer_name(),
1055 conn.get_features(),
1056 peer_supported_features,
1057 conn_seq,
1058 msg_seq);
1059 ceph_assert_always(has_socket && is_socket_valid);
1060 is_socket_valid = false;
1061 has_socket = false;
1062 #ifdef UNIT_TESTS_BUILT
1063 if (conn.interceptor) {
1064 conn.interceptor->register_conn_replaced(conn);
1065 }
1066 #endif
1067 // close this connection because all the necessary information is delivered
1068 // to the exisiting connection, and jump to error handling code to abort the
1069 // current state.
1070 ABORT_IN_CLOSE(false);
1071 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1072 }
1073
1074 seastar::future<ProtocolV2::next_step_t>
1075 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
1076 {
1077 // handle_existing_connection() logic
1078 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1079 existing_conn->protocol.get());
1080 ceph_assert(existing_proto);
1081 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1082 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1083 conn, global_seq, peer_global_seq, connect_seq,
1084 client_cookie, server_cookie,
1085 fmt::ptr(existing_conn.get()), get_state_name(existing_proto->state),
1086 existing_proto->global_seq,
1087 existing_proto->peer_global_seq,
1088 existing_proto->connect_seq,
1089 existing_proto->client_cookie,
1090 existing_proto->server_cookie);
1091
1092 if (!validate_peer_name(existing_conn->get_peer_name())) {
1093 logger().error("{} server_connect: my peer_name doesn't match"
1094 " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
1095 abort_in_fault();
1096 }
1097
1098 if (existing_proto->state == state_t::REPLACING) {
1099 logger().warn("{} server_connect: racing replace happened while"
1100 " replacing existing connection {}, send wait.",
1101 conn, *existing_conn);
1102 return send_wait();
1103 }
1104
1105 if (existing_proto->peer_global_seq > peer_global_seq) {
1106 logger().warn("{} server_connect:"
1107 " this is a stale connection, because peer_global_seq({})"
1108 " < existing->peer_global_seq({}), close this connection"
1109 " in favor of existing connection {}",
1110 conn, peer_global_seq,
1111 existing_proto->peer_global_seq, *existing_conn);
1112 abort_in_fault();
1113 }
1114
1115 if (existing_conn->policy.lossy) {
1116 // existing connection can be thrown out in favor of this one
1117 logger().warn("{} server_connect:"
1118 " existing connection {} is a lossy channel. Close existing in favor of"
1119 " this connection", conn, *existing_conn);
1120 if (unlikely(state != state_t::ACCEPTING)) {
1121 logger().debug("{} triggered {} before execute_establishing()",
1122 conn, get_state_name(state));
1123 abort_protocol();
1124 }
1125 execute_establishing(existing_conn);
1126 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1127 }
1128
1129 if (existing_proto->server_cookie != 0) {
1130 if (existing_proto->client_cookie != client_cookie) {
1131 // Found previous session
1132 // peer has reset and we're going to reuse the existing connection
1133 // by replacing the socket
1134 logger().warn("{} server_connect:"
1135 " found new session (cs={})"
1136 " when existing {} {} is with stale session (cs={}, ss={}),"
1137 " peer must have reset",
1138 conn,
1139 client_cookie,
1140 get_state_name(existing_proto->state),
1141 *existing_conn,
1142 existing_proto->client_cookie,
1143 existing_proto->server_cookie);
1144 return reuse_connection(existing_proto, conn.policy.resetcheck);
1145 } else {
1146 // session establishment interrupted between client_ident and server_ident,
1147 // continuing...
1148 logger().warn("{} server_connect: found client session with existing {} {}"
1149 " matched (cs={}, ss={}), continuing session establishment",
1150 conn,
1151 get_state_name(existing_proto->state),
1152 *existing_conn,
1153 client_cookie,
1154 existing_proto->server_cookie);
1155 return reuse_connection(existing_proto);
1156 }
1157 } else {
1158 // Looks like a connection race: server and client are both connecting to
1159 // each other at the same time.
1160 if (existing_proto->client_cookie != client_cookie) {
1161 if (existing_conn->peer_wins()) {
1162 // acceptor (this connection, the peer) wins
1163 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1164 " and win, reusing existing {} {}",
1165 conn,
1166 client_cookie,
1167 existing_proto->client_cookie,
1168 get_state_name(existing_proto->state),
1169 *existing_conn);
1170 return reuse_connection(existing_proto);
1171 } else {
1172 // acceptor (this connection, the peer) loses
1173 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1174 " and lose to existing {}, ask client to wait",
1175 conn, client_cookie, existing_proto->client_cookie, *existing_conn);
1176 return existing_conn->send_keepalive().then([this] {
1177 return send_wait();
1178 });
1179 }
1180 } else {
1181 logger().warn("{} server_connect: found client session with existing {} {}"
1182 " matched (cs={}, ss={}), continuing session establishment",
1183 conn,
1184 get_state_name(existing_proto->state),
1185 *existing_conn,
1186 client_cookie,
1187 existing_proto->server_cookie);
1188 return reuse_connection(existing_proto);
1189 }
1190 }
1191 }
1192
1193 seastar::future<ProtocolV2::next_step_t>
1194 ProtocolV2::server_connect()
1195 {
1196 return frame_assembler->read_frame_payload(
1197 ).then([this](auto payload) {
1198 // handle_client_ident() logic
1199 auto client_ident = ClientIdentFrame::Decode(payload->back());
1200 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1201 " gid={}, gs={}, features_supported={},"
1202 " features_required={}, flags={}, cookie={}",
1203 conn, client_ident.addrs(), client_ident.target_addr(),
1204 client_ident.gid(), client_ident.global_seq(),
1205 client_ident.supported_features(),
1206 client_ident.required_features(),
1207 client_ident.flags(), client_ident.cookie());
1208
1209 if (client_ident.addrs().empty() ||
1210 client_ident.addrs().front() == entity_addr_t()) {
1211 logger().warn("{} oops, client_ident.addrs() is empty", conn);
1212 throw std::system_error(
1213 make_error_code(crimson::net::error::bad_peer_address));
1214 }
1215 if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
1216 logger().warn("{} peer is trying to reach {} which is not us ({})",
1217 conn, client_ident.target_addr(), messenger.get_myaddrs());
1218 throw std::system_error(
1219 make_error_code(crimson::net::error::bad_peer_address));
1220 }
1221 conn.peer_addr = client_ident.addrs().front();
1222 logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
1223 conn.target_addr = conn.peer_addr;
1224 if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
1225 logger().warn("{} we don't know how to reconnect to peer {}",
1226 conn, conn.target_addr);
1227 throw std::system_error(
1228 make_error_code(crimson::net::error::bad_peer_address));
1229 }
1230
1231 if (conn.get_peer_id() != entity_name_t::NEW &&
1232 conn.get_peer_id() != client_ident.gid()) {
1233 logger().error("{} client_ident peer_id ({}) does not match"
1234 " what it should be ({}) during accepting, abort",
1235 conn, client_ident.gid(), conn.get_peer_id());
1236 abort_in_fault();
1237 }
1238 conn.set_peer_id(client_ident.gid());
1239 client_cookie = client_ident.cookie();
1240
1241 uint64_t feat_missing =
1242 (conn.policy.features_required | msgr2_required) &
1243 ~(uint64_t)client_ident.supported_features();
1244 if (feat_missing) {
1245 auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
1246 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1247 conn, feat_missing);
1248 return frame_assembler->write_flush_frame(ident_missing_features
1249 ).then([] {
1250 return next_step_t::wait;
1251 });
1252 }
1253 conn.set_features(client_ident.supported_features() &
1254 conn.policy.features_supported);
1255 logger().debug("{} UPDATE: features={}", conn, conn.get_features());
1256
1257 peer_global_seq = client_ident.global_seq();
1258
1259 bool lossy = client_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
1260 if (lossy != conn.policy.lossy) {
1261 logger().warn("{} my lossy policy {} doesn't match client {}, ignore",
1262 conn, conn.policy.lossy, lossy);
1263 }
1264
1265 // Looks good so far, let's check if there is already an existing connection
1266 // to this peer.
1267
1268 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1269
1270 if (existing_conn) {
1271 return handle_existing_connection(existing_conn);
1272 } else {
1273 if (unlikely(state != state_t::ACCEPTING)) {
1274 logger().debug("{} triggered {} before execute_establishing()",
1275 conn, get_state_name(state));
1276 abort_protocol();
1277 }
1278 execute_establishing(nullptr);
1279 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1280 }
1281 });
1282 }
1283
1284 seastar::future<ProtocolV2::next_step_t>
1285 ProtocolV2::read_reconnect()
1286 {
1287 return frame_assembler->read_main_preamble(
1288 ).then([this](auto ret) {
1289 expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_session_reconnect");
1290 return server_reconnect();
1291 });
1292 }
1293
1294 seastar::future<ProtocolV2::next_step_t>
1295 ProtocolV2::send_retry(uint64_t connect_seq)
1296 {
1297 auto retry = RetryFrame::Encode(connect_seq);
1298 logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
1299 return frame_assembler->write_flush_frame(retry
1300 ).then([this] {
1301 return read_reconnect();
1302 });
1303 }
1304
1305 seastar::future<ProtocolV2::next_step_t>
1306 ProtocolV2::send_retry_global(uint64_t global_seq)
1307 {
1308 auto retry = RetryGlobalFrame::Encode(global_seq);
1309 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
1310 return frame_assembler->write_flush_frame(retry
1311 ).then([this] {
1312 return read_reconnect();
1313 });
1314 }
1315
1316 seastar::future<ProtocolV2::next_step_t>
1317 ProtocolV2::send_reset(bool full)
1318 {
1319 auto reset = ResetFrame::Encode(full);
1320 logger().warn("{} WRITE ResetFrame: full={}", conn, full);
1321 return frame_assembler->write_flush_frame(reset
1322 ).then([this] {
1323 return frame_assembler->read_main_preamble();
1324 }).then([this](auto ret) {
1325 expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
1326 return server_connect();
1327 });
1328 }
1329
1330 seastar::future<ProtocolV2::next_step_t>
1331 ProtocolV2::server_reconnect()
1332 {
1333 return frame_assembler->read_frame_payload(
1334 ).then([this](auto payload) {
1335 // handle_reconnect() logic
1336 auto reconnect = ReconnectFrame::Decode(payload->back());
1337
1338 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1339 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1340 conn, reconnect.addrs(),
1341 reconnect.client_cookie(), reconnect.server_cookie(),
1342 reconnect.global_seq(), reconnect.connect_seq(),
1343 reconnect.msg_seq());
1344
1345 // can peer_addrs be changed on-the-fly?
1346 // TODO: change peer_addr to entity_addrvec_t
1347 entity_addr_t paddr = reconnect.addrs().front();
1348 if (paddr.is_msgr2() || paddr.is_any()) {
1349 // good
1350 } else {
1351 logger().warn("{} peer's address {} is not v2", conn, paddr);
1352 throw std::system_error(
1353 make_error_code(crimson::net::error::bad_peer_address));
1354 }
1355 if (conn.peer_addr == entity_addr_t()) {
1356 conn.peer_addr = paddr;
1357 } else if (conn.peer_addr != paddr) {
1358 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1359 " reconnect failed",
1360 conn, paddr, conn.peer_addr);
1361 throw std::system_error(
1362 make_error_code(crimson::net::error::bad_peer_address));
1363 }
1364 peer_global_seq = reconnect.global_seq();
1365
1366 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1367
1368 if (!existing_conn) {
1369 // there is no existing connection therefore cannot reconnect to previous
1370 // session
1371 logger().warn("{} server_reconnect: no existing connection from address {},"
1372 " reseting client", conn, conn.peer_addr);
1373 return send_reset(true);
1374 }
1375
1376 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1377 existing_conn->protocol.get());
1378 ceph_assert(existing_proto);
1379 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1380 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1381 conn, global_seq, peer_global_seq, reconnect.connect_seq(),
1382 reconnect.client_cookie(), reconnect.server_cookie(),
1383 fmt::ptr(existing_conn.get()),
1384 get_state_name(existing_proto->state),
1385 existing_proto->global_seq,
1386 existing_proto->peer_global_seq,
1387 existing_proto->connect_seq,
1388 existing_proto->client_cookie,
1389 existing_proto->server_cookie);
1390
1391 if (!validate_peer_name(existing_conn->get_peer_name())) {
1392 logger().error("{} server_reconnect: my peer_name doesn't match"
1393 " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
1394 abort_in_fault();
1395 }
1396
1397 if (existing_proto->state == state_t::REPLACING) {
1398 logger().warn("{} server_reconnect: racing replace happened while "
1399 " replacing existing connection {}, retry global.",
1400 conn, *existing_conn);
1401 return send_retry_global(existing_proto->peer_global_seq);
1402 }
1403
1404 if (existing_proto->client_cookie != reconnect.client_cookie()) {
1405 logger().warn("{} server_reconnect:"
1406 " client_cookie mismatch with existing connection {},"
1407 " cc={} rcc={}. I must have reset, reseting client.",
1408 conn, *existing_conn,
1409 existing_proto->client_cookie, reconnect.client_cookie());
1410 return send_reset(conn.policy.resetcheck);
1411 } else if (existing_proto->server_cookie == 0) {
1412 // this happens when:
1413 // - a connects to b
1414 // - a sends client_ident
1415 // - b gets client_ident, sends server_ident and sets cookie X
1416 // - connection fault
1417 // - b reconnects to a with cookie X, connect_seq=1
1418 // - a has cookie==0
1419 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1420 " server_ident with existing connection {}."
1421 " Asking peer to resume session establishment",
1422 conn, existing_proto->client_cookie, *existing_conn);
1423 return send_reset(false);
1424 }
1425
1426 if (existing_proto->peer_global_seq > reconnect.global_seq()) {
1427 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1428 " with existing connection {},"
1429 " ask client to retry global",
1430 conn, existing_proto->peer_global_seq,
1431 reconnect.global_seq(), *existing_conn);
1432 return send_retry_global(existing_proto->peer_global_seq);
1433 }
1434
1435 if (existing_proto->connect_seq > reconnect.connect_seq()) {
1436 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1437 " with existing connection {}, ask client to retry",
1438 conn, reconnect.connect_seq(),
1439 existing_proto->connect_seq, *existing_conn);
1440 return send_retry(existing_proto->connect_seq);
1441 } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
1442 // reconnect race: both peers are sending reconnect messages
1443 if (existing_conn->peer_wins()) {
1444 // acceptor (this connection, the peer) wins
1445 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1446 " and win, reusing existing {} {}",
1447 conn,
1448 reconnect.connect_seq(),
1449 get_state_name(existing_proto->state),
1450 *existing_conn);
1451 return reuse_connection(
1452 existing_proto, false,
1453 true, reconnect.connect_seq(), reconnect.msg_seq());
1454 } else {
1455 // acceptor (this connection, the peer) loses
1456 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1457 " and lose to existing {}, ask client to wait",
1458 conn, reconnect.connect_seq(), *existing_conn);
1459 return send_wait();
1460 }
1461 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1462 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1463 " reusing existing {} {}",
1464 conn,
1465 existing_proto->connect_seq,
1466 reconnect.connect_seq(),
1467 get_state_name(existing_proto->state),
1468 *existing_conn);
1469 return reuse_connection(
1470 existing_proto, false,
1471 true, reconnect.connect_seq(), reconnect.msg_seq());
1472 }
1473 });
1474 }
1475
1476 void ProtocolV2::execute_accepting()
1477 {
1478 assert(is_socket_valid);
1479 trigger_state(state_t::ACCEPTING, io_state_t::none, false);
1480 gate.dispatch_in_background("execute_accepting", conn, [this] {
1481 return seastar::futurize_invoke([this] {
1482 #ifdef UNIT_TESTS_BUILT
1483 if (conn.interceptor) {
1484 auto action = conn.interceptor->intercept(
1485 conn, {custom_bp_t::SOCKET_ACCEPTED});
1486 switch (action) {
1487 case bp_action_t::CONTINUE:
1488 break;
1489 case bp_action_t::FAULT:
1490 logger().info("[Test] got FAULT");
1491 abort_in_fault();
1492 default:
1493 ceph_abort("unexpected action from trap");
1494 }
1495 }
1496 #endif
1497 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
1498 frame_assembler->reset_handlers();
1499 frame_assembler->start_recording();
1500 return banner_exchange(false);
1501 }).then([this] (auto&& ret) {
1502 auto [_peer_type, _my_addr_from_peer] = std::move(ret);
1503 ceph_assert(conn.get_peer_type() == 0);
1504 conn.set_peer_type(_peer_type);
1505
1506 conn.policy = messenger.get_policy(_peer_type);
1507 logger().info("{} UPDATE: peer_type={},"
1508 " policy(lossy={} server={} standby={} resetcheck={})",
1509 conn, ceph_entity_type_name(_peer_type),
1510 conn.policy.lossy, conn.policy.server,
1511 conn.policy.standby, conn.policy.resetcheck);
1512 if (!messenger.get_myaddr().is_blank_ip() &&
1513 (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
1514 messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
1515 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1516 conn, _my_addr_from_peer, messenger.get_myaddr());
1517 throw std::system_error(
1518 make_error_code(crimson::net::error::bad_peer_address));
1519 }
1520 messenger.learned_addr(_my_addr_from_peer, conn);
1521 return server_auth();
1522 }).then([this] {
1523 return frame_assembler->read_main_preamble();
1524 }).then([this](auto ret) {
1525 switch (ret.tag) {
1526 case Tag::CLIENT_IDENT:
1527 return server_connect();
1528 case Tag::SESSION_RECONNECT:
1529 return server_reconnect();
1530 default: {
1531 unexpected_tag(ret.tag, conn, "post_server_auth");
1532 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1533 }
1534 }
1535 }).then([this] (next_step_t next) {
1536 switch (next) {
1537 case next_step_t::ready:
1538 assert(state != state_t::ACCEPTING);
1539 break;
1540 case next_step_t::wait:
1541 if (unlikely(state != state_t::ACCEPTING)) {
1542 logger().debug("{} triggered {} at the end of execute_accepting()",
1543 conn, get_state_name(state));
1544 abort_protocol();
1545 }
1546 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
1547 execute_server_wait();
1548 break;
1549 default:
1550 ceph_abort("impossible next step");
1551 }
1552 }).handle_exception([this](std::exception_ptr eptr) {
1553 const char *e_what;
1554 try {
1555 std::rethrow_exception(eptr);
1556 } catch (std::exception &e) {
1557 e_what = e.what();
1558 }
1559 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1560 conn, get_state_name(state), e_what);
1561 do_close(false);
1562 });
1563 });
1564 }
1565
1566 // CONNECTING or ACCEPTING state
1567
1568 seastar::future<> ProtocolV2::finish_auth()
1569 {
1570 ceph_assert(auth_meta);
1571
1572 auto records = frame_assembler->stop_recording();
1573 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1574 auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
1575 auto sig_frame = AuthSignatureFrame::Encode(sig);
1576 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
1577 return frame_assembler->write_flush_frame(sig_frame
1578 ).then([this] {
1579 return frame_assembler->read_main_preamble();
1580 }).then([this](auto ret) {
1581 expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
1582 return frame_assembler->read_frame_payload();
1583 }).then([this, txbuf=std::move(records.txbuf)](auto payload) {
1584 // handle_auth_signature() logic
1585 auto sig_frame = AuthSignatureFrame::Decode(payload->back());
1586 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
1587
1588 const auto actual_tx_sig = auth_meta->session_key.empty() ?
1589 sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
1590 if (sig_frame.signature() != actual_tx_sig) {
1591 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1592 " sig_frame.signature()={}",
1593 conn, actual_tx_sig, sig_frame.signature());
1594 abort_in_fault();
1595 }
1596 });
1597 }
1598
1599 // ESTABLISHING
1600
1601 void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
1602 auto accept_me = [this] {
1603 messenger.register_conn(
1604 seastar::static_pointer_cast<SocketConnection>(
1605 conn.shared_from_this()));
1606 messenger.unaccept_conn(
1607 seastar::static_pointer_cast<SocketConnection>(
1608 conn.shared_from_this()));
1609 };
1610
1611 ceph_assert_always(is_socket_valid);
1612 trigger_state(state_t::ESTABLISHING, io_state_t::delay, false);
1613 if (existing_conn) {
1614 static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
1615 true /* is_dispatch_reset */, std::move(accept_me));
1616 if (unlikely(state != state_t::ESTABLISHING)) {
1617 logger().warn("{} triggered {} during execute_establishing(), "
1618 "the accept event will not be delivered!",
1619 conn, get_state_name(state));
1620 abort_protocol();
1621 }
1622 } else {
1623 accept_me();
1624 }
1625
1626 io_handler.dispatch_accept();
1627 if (unlikely(state != state_t::ESTABLISHING)) {
1628 logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
1629 conn, get_state_name(state));
1630 abort_protocol();
1631 }
1632
1633 gated_execute("execute_establishing", conn, [this] {
1634 return seastar::futurize_invoke([this] {
1635 return send_server_ident();
1636 }).then([this] {
1637 if (unlikely(state != state_t::ESTABLISHING)) {
1638 logger().debug("{} triggered {} at the end of execute_establishing()",
1639 conn, get_state_name(state));
1640 abort_protocol();
1641 }
1642 logger().info("{} established: gs={}, pgs={}, cs={}, "
1643 "client_cookie={}, server_cookie={}, {}",
1644 conn, global_seq, peer_global_seq, connect_seq,
1645 client_cookie, server_cookie,
1646 io_stat_printer{io_handler});
1647 execute_ready();
1648 }).handle_exception([this](std::exception_ptr eptr) {
1649 fault(state_t::ESTABLISHING, "execute_establishing", eptr);
1650 });
1651 });
1652 }
1653
1654 // ESTABLISHING or REPLACING state
1655
1656 seastar::future<>
1657 ProtocolV2::send_server_ident()
1658 {
1659 // send_server_ident() logic
1660
1661 // refered to async-conn v2: not assign gs to global_seq
1662 global_seq = messenger.get_global_seq();
1663 logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
1664
1665 // this is required for the case when this connection is being replaced
1666 io_handler.requeue_out_sent_up_to(0);
1667 io_handler.reset_session(false);
1668
1669 if (!conn.policy.lossy) {
1670 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1671 }
1672
1673 uint64_t flags = 0;
1674 if (conn.policy.lossy) {
1675 flags = flags | CEPH_MSG_CONNECT_LOSSY;
1676 }
1677
1678 auto server_ident = ServerIdentFrame::Encode(
1679 messenger.get_myaddrs(),
1680 messenger.get_myname().num(),
1681 global_seq,
1682 conn.policy.features_supported,
1683 conn.policy.features_required | msgr2_required,
1684 flags,
1685 server_cookie);
1686
1687 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1688 " gs={}, features_supported={}, features_required={},"
1689 " flags={}, cookie={}",
1690 conn, messenger.get_myaddrs(), messenger.get_myname().num(),
1691 global_seq, conn.policy.features_supported,
1692 conn.policy.features_required | msgr2_required,
1693 flags, server_cookie);
1694
1695 return frame_assembler->write_flush_frame(server_ident);
1696 }
1697
1698 // REPLACING state
1699
1700 void ProtocolV2::trigger_replacing(bool reconnect,
1701 bool do_reset,
1702 FrameAssemblerV2::mover_t &&mover,
1703 AuthConnectionMetaRef&& new_auth_meta,
1704 uint64_t new_peer_global_seq,
1705 uint64_t new_client_cookie,
1706 entity_name_t new_peer_name,
1707 uint64_t new_conn_features,
1708 uint64_t new_peer_supported_features,
1709 uint64_t new_connect_seq,
1710 uint64_t new_msg_seq)
1711 {
1712 ceph_assert_always(has_socket || state == state_t::CONNECTING);
1713 ceph_assert_always(!mover.socket->is_shutdown());
1714 trigger_state(state_t::REPLACING, io_state_t::delay, false);
1715 if (is_socket_valid) {
1716 frame_assembler->shutdown_socket();
1717 is_socket_valid = false;
1718 }
1719 gate.dispatch_in_background(
1720 "trigger_replacing",
1721 conn,
1722 [this,
1723 reconnect,
1724 do_reset,
1725 mover = std::move(mover),
1726 new_auth_meta = std::move(new_auth_meta),
1727 new_client_cookie, new_peer_name,
1728 new_conn_features, new_peer_supported_features,
1729 new_peer_global_seq,
1730 new_connect_seq, new_msg_seq] () mutable {
1731 ceph_assert_always(state == state_t::REPLACING);
1732 io_handler.dispatch_accept();
1733 // state may become CLOSING, close mover.socket and abort later
1734 return wait_exit_io(
1735 ).then([this] {
1736 ceph_assert_always(frame_assembler);
1737 protocol_timer.cancel();
1738 auto done = std::move(execution_done);
1739 execution_done = seastar::now();
1740 return done;
1741 }).then([this,
1742 reconnect,
1743 do_reset,
1744 mover = std::move(mover),
1745 new_auth_meta = std::move(new_auth_meta),
1746 new_client_cookie, new_peer_name,
1747 new_conn_features, new_peer_supported_features,
1748 new_peer_global_seq,
1749 new_connect_seq, new_msg_seq] () mutable {
1750 if (state == state_t::REPLACING && do_reset) {
1751 reset_session(true);
1752 }
1753
1754 if (unlikely(state != state_t::REPLACING)) {
1755 return mover.socket->close(
1756 ).then([sock = std::move(mover.socket)] {
1757 abort_protocol();
1758 });
1759 }
1760
1761 auth_meta = std::move(new_auth_meta);
1762 peer_global_seq = new_peer_global_seq;
1763 gate.dispatch_in_background(
1764 "replace_frame_assembler",
1765 conn,
1766 [this, mover=std::move(mover)]() mutable {
1767 return frame_assembler->replace_by(std::move(mover));
1768 }
1769 );
1770 is_socket_valid = true;
1771 has_socket = true;
1772
1773 if (reconnect) {
1774 connect_seq = new_connect_seq;
1775 // send_reconnect_ok() logic
1776 io_handler.requeue_out_sent_up_to(new_msg_seq);
1777 auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq());
1778 logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq());
1779 return frame_assembler->write_flush_frame(reconnect_ok);
1780 } else {
1781 client_cookie = new_client_cookie;
1782 assert(conn.get_peer_type() == new_peer_name.type());
1783 if (conn.get_peer_id() == entity_name_t::NEW) {
1784 conn.set_peer_id(new_peer_name.num());
1785 }
1786 conn.set_features(new_conn_features);
1787 peer_supported_features = new_peer_supported_features;
1788 bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
1789 frame_assembler->set_is_rev1(is_rev1);
1790 return send_server_ident();
1791 }
1792 }).then([this, reconnect] {
1793 if (unlikely(state != state_t::REPLACING)) {
1794 logger().debug("{} triggered {} at the end of trigger_replacing()",
1795 conn, get_state_name(state));
1796 abort_protocol();
1797 }
1798 logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, "
1799 "client_cookie={}, server_cookie={}, {}",
1800 conn, reconnect ? "reconnected" : "connected",
1801 global_seq, peer_global_seq, connect_seq,
1802 client_cookie, server_cookie,
1803 io_stat_printer{io_handler});
1804 execute_ready();
1805 }).handle_exception([this](std::exception_ptr eptr) {
1806 fault(state_t::REPLACING, "trigger_replacing", eptr);
1807 });
1808 });
1809 }
1810
1811 // READY state
1812
1813 void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr)
1814 {
1815 fault(state_t::READY, where, eptr);
1816 }
1817
1818 void ProtocolV2::execute_ready()
1819 {
1820 assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
1821 protocol_timer.cancel();
1822 ceph_assert_always(is_socket_valid);
1823 trigger_state(state_t::READY, io_state_t::open, false);
1824 }
1825
1826 // STANDBY state
1827
1828 void ProtocolV2::execute_standby()
1829 {
1830 ceph_assert_always(!is_socket_valid);
1831 trigger_state(state_t::STANDBY, io_state_t::delay, false);
1832 }
1833
1834 void ProtocolV2::notify_out()
1835 {
1836 if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
1837 logger().info("{} notify_out(): at {}, going to CONNECTING",
1838 conn, get_state_name(state));
1839 execute_connecting();
1840 }
1841 }
1842
1843 // WAIT state
1844
1845 void ProtocolV2::execute_wait(bool max_backoff)
1846 {
1847 ceph_assert_always(!is_socket_valid);
1848 trigger_state(state_t::WAIT, io_state_t::delay, false);
1849 gated_execute("execute_wait", conn, [this, max_backoff] {
1850 double backoff = protocol_timer.last_dur();
1851 if (max_backoff) {
1852 backoff = local_conf().get_val<double>("ms_max_backoff");
1853 } else if (backoff > 0) {
1854 backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
1855 } else {
1856 backoff = local_conf().get_val<double>("ms_initial_backoff");
1857 }
1858 return protocol_timer.backoff(backoff).then([this] {
1859 if (unlikely(state != state_t::WAIT)) {
1860 logger().debug("{} triggered {} at the end of execute_wait()",
1861 conn, get_state_name(state));
1862 abort_protocol();
1863 }
1864 logger().info("{} execute_wait(): going to CONNECTING", conn);
1865 execute_connecting();
1866 }).handle_exception([this](std::exception_ptr eptr) {
1867 const char *e_what;
1868 try {
1869 std::rethrow_exception(eptr);
1870 } catch (std::exception &e) {
1871 e_what = e.what();
1872 }
1873 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
1874 conn, get_state_name(state), e_what);
1875 assert(state == state_t::REPLACING ||
1876 state == state_t::CLOSING);
1877 });
1878 });
1879 }
1880
1881 // SERVER_WAIT state
1882
1883 void ProtocolV2::execute_server_wait()
1884 {
1885 ceph_assert_always(is_socket_valid);
1886 trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);
1887 gated_execute("execute_server_wait", conn, [this] {
1888 return frame_assembler->read_exactly(1
1889 ).then([this](auto bl) {
1890 logger().warn("{} SERVER_WAIT got read, abort", conn);
1891 abort_in_fault();
1892 }).handle_exception([this](std::exception_ptr eptr) {
1893 const char *e_what;
1894 try {
1895 std::rethrow_exception(eptr);
1896 } catch (std::exception &e) {
1897 e_what = e.what();
1898 }
1899 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
1900 conn, get_state_name(state), e_what);
1901 do_close(false);
1902 });
1903 });
1904 }
1905
1906 // CLOSING state
1907
1908 void ProtocolV2::notify_mark_down()
1909 {
1910 do_close(false);
1911 }
1912
1913 seastar::future<> ProtocolV2::close_clean_yielded()
1914 {
1915 // yield() so that do_close() can be called *after* close_clean_yielded() is
1916 // applied to all connections in a container using
1917 // seastar::parallel_for_each(). otherwise, we could erase a connection in
1918 // the container when seastar::parallel_for_each() is still iterating in it.
1919 // that'd lead to a segfault.
1920 return seastar::yield(
1921 ).then([this, conn_ref = conn.shared_from_this()] {
1922 do_close(false);
1923 // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
1924 // which will otherwise result in deadlock
1925 assert(closed_clean_fut.valid());
1926 return closed_clean_fut.get_future();
1927 });
1928 }
1929
1930 void ProtocolV2::do_close(
1931 bool is_dispatch_reset,
1932 std::optional<std::function<void()>> f_accept_new)
1933 {
1934 if (closed) {
1935 // already closing
1936 assert(state == state_t::CLOSING);
1937 return;
1938 }
1939
1940 bool is_replace = f_accept_new ? true : false;
1941 logger().info("{} closing: reset {}, replace {}", conn,
1942 is_dispatch_reset ? "yes" : "no",
1943 is_replace ? "yes" : "no");
1944
1945 /*
1946 * atomic operations
1947 */
1948
1949 closed = true;
1950
1951 // trigger close
1952 messenger.closing_conn(
1953 seastar::static_pointer_cast<SocketConnection>(
1954 conn.shared_from_this()));
1955 if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
1956 messenger.unaccept_conn(
1957 seastar::static_pointer_cast<SocketConnection>(
1958 conn.shared_from_this()));
1959 } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
1960 messenger.unregister_conn(
1961 seastar::static_pointer_cast<SocketConnection>(
1962 conn.shared_from_this()));
1963 } else {
1964 // cannot happen
1965 ceph_assert(false);
1966 }
1967 protocol_timer.cancel();
1968 trigger_state(state_t::CLOSING, io_state_t::drop, false);
1969
1970 if (f_accept_new) {
1971 (*f_accept_new)();
1972 }
1973 if (is_socket_valid) {
1974 frame_assembler->shutdown_socket();
1975 is_socket_valid = false;
1976 }
1977 assert(!gate.is_closed());
1978 auto handshake_closed = gate.close();
1979 auto io_closed = io_handler.close_io(
1980 is_dispatch_reset, is_replace);
1981
1982 // asynchronous operations
1983 assert(!closed_clean_fut.valid());
1984 closed_clean_fut = seastar::when_all(
1985 std::move(handshake_closed), std::move(io_closed)
1986 ).discard_result().then([this] {
1987 ceph_assert_always(!exit_io.has_value());
1988 if (has_socket) {
1989 ceph_assert_always(frame_assembler);
1990 return frame_assembler->close_shutdown_socket();
1991 } else {
1992 return seastar::now();
1993 }
1994 }).then([this] {
1995 logger().debug("{} closed!", conn);
1996 messenger.closed_conn(
1997 seastar::static_pointer_cast<SocketConnection>(
1998 conn.shared_from_this()));
1999 #ifdef UNIT_TESTS_BUILT
2000 closed_clean = true;
2001 if (conn.interceptor) {
2002 conn.interceptor->register_conn_closed(conn);
2003 }
2004 #endif
2005 }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
2006 logger().error("{} closing: closed_clean_fut got unexpected exception {}",
2007 conn, eptr);
2008 ceph_abort();
2009 });
2010 }
2011
2012 } // namespace crimson::net