]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/ProtocolV2.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "ProtocolV2.h"
5
9f95a23c 6#include <fmt/format.h>
1e59de90 7#include <fmt/ranges.h>
9f95a23c
TL
8#include "include/msgr.h"
9#include "include/random.h"
1e59de90 10#include "msg/msg_fmt.h"
9f95a23c
TL
11
12#include "crimson/auth/AuthClient.h"
13#include "crimson/auth/AuthServer.h"
f67539c2 14#include "crimson/common/formatter.h"
1e59de90 15#include "crimson/common/log.h"
9f95a23c 16
9f95a23c 17#include "Errors.h"
9f95a23c
TL
18#include "SocketMessenger.h"
19
9f95a23c 20using namespace ceph::msgr::v2;
f67539c2 21using crimson::common::local_conf;
9f95a23c
TL
22
23namespace {
24
20effc67
TL
25// TODO: CEPH_MSGR2_FEATURE_COMPRESSION
26const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES =
27 (CEPH_MSGR2_FEATURE_REVISION_1 |
28 // CEPH_MSGR2_FEATURE_COMPRESSION |
29 UINT64_C(0));
30
9f95a23c
TL
31// Log levels in V2 Protocol:
32// * error level, something error that cause connection to terminate:
33// - fatal errors;
34// - bugs;
35// * warn level: something unusual that identifies connection fault or replacement:
36// - unstable network;
37// - incompatible peer;
38// - auth failure;
39// - connection race;
40// - connection reset;
41// * info level, something very important to show connection lifecycle,
42// which doesn't happen very frequently;
43// * debug level, important logs for debugging, including:
44// - all the messages sent/received (-->/<==);
45// - all the frames exchanged (WRITE/GOT);
46// - important fields updated (UPDATE);
47// - connection state transitions (TRIGGER);
48// * trace level, trivial logs showing:
49// - the exact bytes being sent/received (SEND/RECV(bytes));
50// - detailed information of sub-frames;
51// - integrity checks;
52// - etc.
53seastar::logger& logger() {
54 return crimson::get_logger(ceph_subsys_ms);
55}
56
f67539c2 57[[noreturn]] void abort_in_fault() {
9f95a23c
TL
58 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
59}
60
f67539c2 61[[noreturn]] void abort_protocol() {
9f95a23c
TL
62 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
63}
64
1e59de90
TL
// Close the connection through do_close(is_dispatch_reset) and then leave
// the running protocol step via abort_protocol().
//
// The body is wrapped in do { ... } while (0) so that the macro expands to
// exactly one statement: `if (x) ABORT_IN_CLOSE(y); else ...` stays valid,
// which a bare `{ ... }` block followed by ';' would break.
// Invocations must be terminated with ';'.
#define ABORT_IN_CLOSE(is_dispatch_reset) do { \
  do_close(is_dispatch_reset);                 \
  abort_protocol();                            \
} while (0)
69
70inline void expect_tag(const Tag& expected,
71 const Tag& actual,
72 crimson::net::SocketConnection& conn,
73 const char *where) {
74 if (actual != expected) {
75 logger().warn("{} {} received wrong tag: {}, expected {}",
76 conn, where,
77 static_cast<uint32_t>(actual),
78 static_cast<uint32_t>(expected));
79 abort_in_fault();
80 }
81}
82
83inline void unexpected_tag(const Tag& unexpected,
84 crimson::net::SocketConnection& conn,
85 const char *where) {
86 logger().warn("{} {} received unexpected tag: {}",
87 conn, where, static_cast<uint32_t>(unexpected));
88 abort_in_fault();
89}
90
91inline uint64_t generate_client_cookie() {
92 return ceph::util::generate_random_number<uint64_t>(
93 1, std::numeric_limits<uint64_t>::max());
94}
95
96} // namespace anonymous
97
9f95a23c
TL
98namespace crimson::net {
99
9f95a23c
TL
100seastar::future<> ProtocolV2::Timer::backoff(double seconds)
101{
102 logger().warn("{} waiting {} seconds ...", conn, seconds);
103 cancel();
104 last_dur_ = seconds;
105 as = seastar::abort_source();
106 auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
107 std::chrono::duration<double>(seconds));
108 return seastar::sleep_abortable(dur, *as
109 ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
110 logger().debug("{} wait aborted", conn);
111 abort_protocol();
112 });
113}
114
1e59de90
TL
115ProtocolV2::ProtocolV2(SocketConnection& conn,
116 IOHandler &io_handler)
117 : conn{conn},
118 messenger{conn.messenger},
119 io_handler{io_handler},
120 frame_assembler{FrameAssemblerV2::create(conn)},
121 auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
f67539c2 122 protocol_timer{conn}
aee94f69
TL
123{
124 io_states = io_handler.get_states();
125}
9f95a23c
TL
126
127ProtocolV2::~ProtocolV2() {}
128
129void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
f67539c2 130 const entity_name_t& _peer_name)
9f95a23c 131{
aee94f69 132 assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
9f95a23c 133 ceph_assert(state == state_t::NONE);
f67539c2 134 ceph_assert(!gate.is_closed());
9f95a23c
TL
135 conn.peer_addr = _peer_addr;
136 conn.target_addr = _peer_addr;
f67539c2
TL
137 conn.set_peer_name(_peer_name);
138 conn.policy = messenger.get_policy(_peer_name.type());
9f95a23c 139 client_cookie = generate_client_cookie();
f67539c2 140 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
9f95a23c 141 " policy(lossy={}, server={}, standby={}, resetcheck={})",
f67539c2 142 conn, _peer_addr, _peer_name, client_cookie,
9f95a23c
TL
143 conn.policy.lossy, conn.policy.server,
144 conn.policy.standby, conn.policy.resetcheck);
145 messenger.register_conn(
146 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
147 execute_connecting();
148}
149
aee94f69 150void ProtocolV2::start_accept(SocketFRef&& new_socket,
9f95a23c
TL
151 const entity_addr_t& _peer_addr)
152{
aee94f69 153 assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
9f95a23c 154 ceph_assert(state == state_t::NONE);
9f95a23c
TL
155 // until we know better
156 conn.target_addr = _peer_addr;
1e59de90
TL
157 frame_assembler->set_socket(std::move(new_socket));
158 has_socket = true;
159 is_socket_valid = true;
9f95a23c
TL
160 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
161 messenger.accept_conn(
162 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
aee94f69
TL
163
164 auto cc_seq = crosscore.prepare_submit();
165 gate.dispatch_in_background("set_accepted_sid", conn, [this, cc_seq] {
166 return io_handler.set_accepted_sid(
167 cc_seq,
168 frame_assembler->get_socket_shard_id(),
169 seastar::make_foreign(conn.shared_from_this()));
170 });
171
9f95a23c
TL
172 execute_accepting();
173}
174
aee94f69 175void ProtocolV2::trigger_state_phase1(state_t new_state)
9f95a23c 176{
aee94f69
TL
177 ceph_assert_always(!gate.is_closed());
178 if (new_state == state) {
1e59de90
TL
179 logger().error("{} is not allowed to re-trigger state {}",
180 conn, get_state_name(state));
181 ceph_abort();
182 }
183 if (state == state_t::CLOSING) {
184 logger().error("{} CLOSING is not allowed to trigger state {}",
185 conn, get_state_name(new_state));
186 ceph_abort();
187 }
188 logger().debug("{} TRIGGER {}, was {}",
189 conn, get_state_name(new_state), get_state_name(state));
aee94f69
TL
190
191 if (state == state_t::READY) {
192 // from READY
193 ceph_assert_always(!need_exit_io);
194 ceph_assert_always(!pr_exit_io.has_value());
195 need_exit_io = true;
196 pr_exit_io = seastar::shared_promise<>();
1e59de90 197 }
aee94f69
TL
198
199 if (new_state == state_t::STANDBY && !conn.policy.server) {
200 need_notify_out = true;
201 } else {
202 need_notify_out = false;
203 }
204
1e59de90 205 state = new_state;
aee94f69
TL
206}
207
208void ProtocolV2::trigger_state_phase2(
209 state_t new_state, io_state_t new_io_state)
210{
211 ceph_assert_always(new_state == state);
212 ceph_assert_always(!gate.is_closed());
213 ceph_assert_always(!pr_switch_io_shard.has_value());
214
215 FrameAssemblerV2Ref fa;
1e59de90 216 if (new_state == state_t::READY) {
aee94f69
TL
217 assert(new_io_state == io_state_t::open);
218 assert(io_handler.get_shard_id() ==
219 frame_assembler->get_socket_shard_id());
220 frame_assembler->set_shard_id(io_handler.get_shard_id());
221 fa = std::move(frame_assembler);
9f95a23c 222 } else {
aee94f69 223 assert(new_io_state != io_state_t::open);
1e59de90 224 }
9f95a23c 225
aee94f69
TL
226 auto cc_seq = crosscore.prepare_submit();
227 logger().debug("{} send {} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
228 "fa={}, set_notify_out={}",
229 conn, cc_seq, get_state_name(new_state), new_io_state,
230 fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A",
231 need_notify_out);
232 gate.dispatch_in_background(
233 "set_io_state", conn,
234 [this, cc_seq, new_io_state, fa=std::move(fa)]() mutable {
235 return seastar::smp::submit_to(
236 io_handler.get_shard_id(),
237 [this, cc_seq, new_io_state,
238 fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
239 return io_handler.set_io_state(
240 cc_seq, new_io_state, std::move(fa), set_notify_out);
241 });
242 });
1e59de90 243
aee94f69
TL
244 if (need_exit_io) {
245 // from READY
246 auto cc_seq = crosscore.prepare_submit();
247 logger().debug("{} send {} IOHandler::wait_io_exit_dispatching() ...",
248 conn, cc_seq);
249 assert(pr_exit_io.has_value());
250 assert(new_io_state != io_state_t::open);
251 need_exit_io = false;
252 gate.dispatch_in_background("exit_io", conn, [this, cc_seq] {
253 return seastar::smp::submit_to(
254 io_handler.get_shard_id(), [this, cc_seq] {
255 return io_handler.wait_io_exit_dispatching(cc_seq);
256 }).then([this, cc_seq](auto ret) {
257 logger().debug("{} finish {} IOHandler::wait_io_exit_dispatching(), {}",
258 conn, cc_seq, ret.io_states);
259 frame_assembler = std::move(ret.frame_assembler);
260 assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
261 ceph_assert_always(
262 seastar::this_shard_id() == frame_assembler->get_shard_id());
263 ceph_assert_always(!frame_assembler->is_socket_valid());
264 assert(!need_exit_io);
265 io_states = ret.io_states;
266 pr_exit_io->set_value();
267 pr_exit_io = std::nullopt;
1e59de90 268 });
9f95a23c 269 });
9f95a23c
TL
270 }
271}
272
1e59de90
TL
273void ProtocolV2::fault(
274 state_t expected_state,
275 const char *where,
276 std::exception_ptr eptr)
9f95a23c 277{
1e59de90
TL
278 assert(expected_state == state_t::CONNECTING ||
279 expected_state == state_t::ESTABLISHING ||
280 expected_state == state_t::REPLACING ||
281 expected_state == state_t::READY);
282 const char *e_what;
283 try {
284 std::rethrow_exception(eptr);
285 } catch (std::exception &e) {
286 e_what = e.what();
9f95a23c 287 }
9f95a23c 288
1e59de90
TL
289 if (state != expected_state) {
290 logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}",
291 conn,
292 get_state_name(expected_state),
293 where,
294 get_state_name(state),
295 e_what);
296#ifndef NDEBUG
297 if (expected_state == state_t::REPLACING) {
298 assert(state == state_t::CLOSING);
299 } else if (expected_state == state_t::READY) {
300 assert(state == state_t::CLOSING ||
301 state == state_t::REPLACING ||
302 state == state_t::CONNECTING ||
303 state == state_t::STANDBY);
304 } else {
305 assert(state == state_t::CLOSING ||
306 state == state_t::REPLACING);
307 }
308#endif
309 return;
9f95a23c 310 }
1e59de90
TL
311 assert(state == expected_state);
312
313 if (state != state_t::CONNECTING && conn.policy.lossy) {
314 // socket will be shutdown in do_close()
315 logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}",
316 conn, get_state_name(state), where, e_what);
317 do_close(true);
318 return;
9f95a23c 319 }
9f95a23c 320
1e59de90
TL
321 if (likely(has_socket)) {
322 if (likely(is_socket_valid)) {
323 ceph_assert_always(state != state_t::READY);
aee94f69 324 frame_assembler->shutdown_socket<true>(&gate);
1e59de90
TL
325 is_socket_valid = false;
326 } else {
327 ceph_assert_always(state != state_t::ESTABLISHING);
9f95a23c 328 }
1e59de90
TL
329 } else { // !has_socket
330 ceph_assert_always(state == state_t::CONNECTING);
331 assert(!is_socket_valid);
9f95a23c 332 }
9f95a23c 333
1e59de90 334 if (conn.policy.server ||
aee94f69 335 (conn.policy.standby && !io_states.is_out_queued_or_sent())) {
1e59de90
TL
336 if (conn.policy.server) {
337 logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
338 conn,
339 get_state_name(state),
340 where,
aee94f69 341 io_states,
1e59de90
TL
342 e_what);
343 } else {
344 logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
345 conn,
346 get_state_name(state),
347 where,
aee94f69 348 io_states,
1e59de90
TL
349 e_what);
350 }
9f95a23c 351 execute_standby();
1e59de90
TL
352 } else if (state == state_t::CONNECTING ||
353 state == state_t::REPLACING) {
354 logger().info("{} protocol {} {} fault, going to WAIT {} -- {}",
355 conn,
356 get_state_name(state),
357 where,
aee94f69 358 io_states,
1e59de90 359 e_what);
9f95a23c
TL
360 execute_wait(false);
361 } else {
1e59de90
TL
362 assert(state == state_t::READY ||
363 state == state_t::ESTABLISHING);
364 logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}",
365 conn,
366 get_state_name(state),
367 where,
aee94f69 368 io_states,
1e59de90 369 e_what);
9f95a23c
TL
370 execute_connecting();
371 }
372}
373
9f95a23c
TL
374void ProtocolV2::reset_session(bool full)
375{
376 server_cookie = 0;
377 connect_seq = 0;
9f95a23c
TL
378 if (full) {
379 client_cookie = generate_client_cookie();
380 peer_global_seq = 0;
9f95a23c 381 }
aee94f69
TL
382
383 auto cc_seq = crosscore.prepare_submit();
384 logger().debug("{} send {} IOHandler::reset_session({})",
385 conn, cc_seq, full);
386 io_states.reset_session(full);
387 gate.dispatch_in_background(
388 "reset_session", conn, [this, cc_seq, full] {
389 return seastar::smp::submit_to(
390 io_handler.get_shard_id(), [this, cc_seq, full] {
391 return io_handler.reset_session(cc_seq, full);
392 });
393 });
394 // user can make changes
9f95a23c
TL
395}
396
f67539c2
TL
397seastar::future<std::tuple<entity_type_t, entity_addr_t>>
398ProtocolV2::banner_exchange(bool is_connect)
9f95a23c
TL
399{
400 // 1. prepare and send banner
401 bufferlist banner_payload;
20effc67 402 encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
9f95a23c
TL
403 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
404
405 bufferlist bl;
406 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
407 auto len_payload = static_cast<uint16_t>(banner_payload.length());
408 encode(len_payload, bl, 0);
409 bl.claim_append(banner_payload);
410 logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
411 "required={}, banner=\"{}\"",
412 conn, bl.length(), len_payload,
20effc67
TL
413 CRIMSON_MSGR2_SUPPORTED_FEATURES,
414 CEPH_MSGR2_REQUIRED_FEATURES,
9f95a23c 415 CEPH_BANNER_V2_PREFIX);
aee94f69
TL
416#ifdef UNIT_TESTS_BUILT
417 return frame_assembler->intercept_frame(custom_bp_t::BANNER_WRITE, true
418 ).then([this, bl=std::move(bl)]() mutable {
419 return frame_assembler->write_flush(std::move(bl));
420 }
421#else
422 return frame_assembler->write_flush(std::move(bl)
423#endif
424 ).then([this] {
425 // 2. read peer banner
426 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
427#ifdef UNIT_TESTS_BUILT
428 return frame_assembler->intercept_frame(custom_bp_t::BANNER_READ, false
429 ).then([this, banner_len] {
430 return frame_assembler->read_exactly(banner_len);
431 });
432#else
433 return frame_assembler->read_exactly(banner_len);
434#endif
435 }).then([this](auto bptr) {
436 // 3. process peer banner and read banner_payload
437 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
438 logger().debug("{} RECV({}) banner: \"{}\"",
439 conn, bptr.length(),
440 std::string(bptr.c_str(), banner_prefix_len));
441
442 if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
443 if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
444 logger().warn("{} peer is using V1 protocol", conn);
445 } else {
446 logger().warn("{} peer sent bad banner", conn);
9f95a23c 447 }
aee94f69
TL
448 abort_in_fault();
449 }
9f95a23c 450
aee94f69
TL
451 bptr.set_offset(bptr.offset() + banner_prefix_len);
452 bptr.set_length(bptr.length() - banner_prefix_len);
453 assert(bptr.length() == sizeof(ceph_le16));
454
455 uint16_t payload_len;
456 bufferlist buf;
457 buf.append(std::move(bptr));
458 auto ti = buf.cbegin();
459 try {
460 decode(payload_len, ti);
461 } catch (const buffer::error &e) {
462 logger().warn("{} decode banner payload len failed", conn);
463 abort_in_fault();
464 }
465 logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
466#ifdef UNIT_TESTS_BUILT
467 return frame_assembler->intercept_frame(
468 custom_bp_t::BANNER_PAYLOAD_READ, false
469 ).then([this, payload_len] {
1e59de90 470 return frame_assembler->read(payload_len);
9f95a23c 471 });
aee94f69
TL
472#else
473 return frame_assembler->read(payload_len);
474#endif
475 }).then([this, is_connect] (bufferlist bl) {
476 // 4. process peer banner_payload and send HelloFrame
477 auto p = bl.cbegin();
478 uint64_t _peer_supported_features;
479 uint64_t _peer_required_features;
480 try {
481 decode(_peer_supported_features, p);
482 decode(_peer_required_features, p);
483 } catch (const buffer::error &e) {
484 logger().warn("{} decode banner payload failed", conn);
485 abort_in_fault();
486 }
487 logger().debug("{} RECV({}) banner features: supported={} required={}",
488 conn, bl.length(),
489 _peer_supported_features, _peer_required_features);
490
491 // Check feature bit compatibility
492 uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
493 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
494 if ((required_features & _peer_supported_features) != required_features) {
495 logger().error("{} peer does not support all required features"
496 " required={} peer_supported={}",
497 conn, required_features, _peer_supported_features);
498 ABORT_IN_CLOSE(is_connect);
499 }
500 if ((supported_features & _peer_required_features) != _peer_required_features) {
501 logger().error("{} we do not support all peer required features"
502 " peer_required={} supported={}",
503 conn, _peer_required_features, supported_features);
504 ABORT_IN_CLOSE(is_connect);
505 }
506 peer_supported_features = _peer_supported_features;
507 bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
508 frame_assembler->set_is_rev1(is_rev1);
509
510 auto hello = HelloFrame::Encode(messenger.get_mytype(),
511 conn.target_addr);
512 logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
513 conn, ceph_entity_type_name(messenger.get_mytype()),
514 conn.target_addr);
515 return frame_assembler->write_flush_frame(hello);
516 }).then([this] {
517 //5. read peer HelloFrame
518 return frame_assembler->read_main_preamble();
519 }).then([this](auto ret) {
520 expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
521 return frame_assembler->read_frame_payload();
522 }).then([this](auto payload) {
523 // 6. process peer HelloFrame
524 auto hello = HelloFrame::Decode(payload->back());
525 logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
526 conn, ceph_entity_type_name(hello.entity_type()),
527 hello.peer_addr());
528 return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
529 std::make_tuple(hello.entity_type(), hello.peer_addr()));
530 });
9f95a23c
TL
531}
532
533// CONNECTING state
534
535seastar::future<> ProtocolV2::handle_auth_reply()
536{
1e59de90
TL
537 return frame_assembler->read_main_preamble(
538 ).then([this](auto ret) {
539 switch (ret.tag) {
9f95a23c 540 case Tag::AUTH_BAD_METHOD:
1e59de90
TL
541 return frame_assembler->read_frame_payload(
542 ).then([this](auto payload) {
9f95a23c 543 // handle_auth_bad_method() logic
1e59de90 544 auto bad_method = AuthBadMethodFrame::Decode(payload->back());
9f95a23c
TL
545 logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
546 "allowed_methods={}, allowed_modes={}",
547 conn, bad_method.method(), cpp_strerror(bad_method.result()),
548 bad_method.allowed_methods(), bad_method.allowed_modes());
549 ceph_assert(messenger.get_auth_client());
550 int r = messenger.get_auth_client()->handle_auth_bad_method(
1e59de90 551 conn, *auth_meta,
9f95a23c
TL
552 bad_method.method(), bad_method.result(),
553 bad_method.allowed_methods(), bad_method.allowed_modes());
554 if (r < 0) {
555 logger().warn("{} auth_client handle_auth_bad_method returned {}",
556 conn, r);
557 abort_in_fault();
558 }
559 return client_auth(bad_method.allowed_methods());
560 });
561 case Tag::AUTH_REPLY_MORE:
1e59de90
TL
562 return frame_assembler->read_frame_payload(
563 ).then([this](auto payload) {
9f95a23c 564 // handle_auth_reply_more() logic
1e59de90 565 auto auth_more = AuthReplyMoreFrame::Decode(payload->back());
9f95a23c
TL
566 logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
567 conn, auth_more.auth_payload().length());
568 ceph_assert(messenger.get_auth_client());
569 // let execute_connecting() take care of the thrown exception
570 auto reply = messenger.get_auth_client()->handle_auth_reply_more(
1e59de90 571 conn, *auth_meta, auth_more.auth_payload());
9f95a23c
TL
572 auto more_reply = AuthRequestMoreFrame::Encode(reply);
573 logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
574 conn, reply.length());
1e59de90 575 return frame_assembler->write_flush_frame(more_reply);
9f95a23c
TL
576 }).then([this] {
577 return handle_auth_reply();
578 });
579 case Tag::AUTH_DONE:
1e59de90
TL
580 return frame_assembler->read_frame_payload(
581 ).then([this](auto payload) {
9f95a23c 582 // handle_auth_done() logic
1e59de90 583 auto auth_done = AuthDoneFrame::Decode(payload->back());
9f95a23c
TL
584 logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
585 conn, auth_done.global_id(),
586 ceph_con_mode_name(auth_done.con_mode()),
587 auth_done.auth_payload().length());
588 ceph_assert(messenger.get_auth_client());
589 int r = messenger.get_auth_client()->handle_auth_done(
1e59de90
TL
590 conn,
591 *auth_meta,
9f95a23c
TL
592 auth_done.global_id(),
593 auth_done.con_mode(),
594 auth_done.auth_payload());
595 if (r < 0) {
596 logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
597 abort_in_fault();
598 }
599 auth_meta->con_mode = auth_done.con_mode();
1e59de90 600 frame_assembler->create_session_stream_handlers(*auth_meta, false);
9f95a23c
TL
601 return finish_auth();
602 });
603 default: {
1e59de90 604 unexpected_tag(ret.tag, conn, "handle_auth_reply");
9f95a23c
TL
605 return seastar::now();
606 }
607 }
608 });
609}
610
611seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
612{
613 // send_auth_request() logic
614 ceph_assert(messenger.get_auth_client());
615
616 try {
617 auto [auth_method, preferred_modes, bl] =
1e59de90 618 messenger.get_auth_client()->get_auth_request(conn, *auth_meta);
9f95a23c
TL
619 auth_meta->auth_method = auth_method;
620 auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
621 logger().debug("{} WRITE AuthRequestFrame: method={},"
622 " preferred_modes={}, payload_len={}",
623 conn, auth_method, preferred_modes, bl.length());
1e59de90
TL
624 return frame_assembler->write_flush_frame(frame
625 ).then([this] {
9f95a23c
TL
626 return handle_auth_reply();
627 });
628 } catch (const crimson::auth::error& e) {
1e59de90
TL
629 logger().error("{} get_initial_auth_request returned {}", conn, e.what());
630 ABORT_IN_CLOSE(true);
9f95a23c
TL
631 return seastar::now();
632 }
633}
634
635seastar::future<ProtocolV2::next_step_t>
636ProtocolV2::process_wait()
637{
1e59de90
TL
638 return frame_assembler->read_frame_payload(
639 ).then([this](auto payload) {
9f95a23c
TL
640 // handle_wait() logic
641 logger().debug("{} GOT WaitFrame", conn);
1e59de90 642 WaitFrame::Decode(payload->back());
9f95a23c
TL
643 return next_step_t::wait;
644 });
645}
646
647seastar::future<ProtocolV2::next_step_t>
648ProtocolV2::client_connect()
649{
650 // send_client_ident() logic
651 uint64_t flags = 0;
652 if (conn.policy.lossy) {
653 flags |= CEPH_MSG_CONNECT_LOSSY;
654 }
655
656 auto client_ident = ClientIdentFrame::Encode(
657 messenger.get_myaddrs(),
658 conn.target_addr,
659 messenger.get_myname().num(),
660 global_seq,
661 conn.policy.features_supported,
662 conn.policy.features_required | msgr2_required, flags,
663 client_cookie);
664
665 logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
666 " gs={}, features_supported={}, features_required={},"
667 " flags={}, cookie={}",
668 conn, messenger.get_myaddrs(), conn.target_addr,
669 messenger.get_myname().num(), global_seq,
670 conn.policy.features_supported,
671 conn.policy.features_required | msgr2_required,
672 flags, client_cookie);
1e59de90
TL
673 return frame_assembler->write_flush_frame(client_ident
674 ).then([this] {
675 return frame_assembler->read_main_preamble();
676 }).then([this](auto ret) {
677 switch (ret.tag) {
9f95a23c 678 case Tag::IDENT_MISSING_FEATURES:
1e59de90
TL
679 return frame_assembler->read_frame_payload(
680 ).then([this](auto payload) {
9f95a23c 681 // handle_ident_missing_features() logic
1e59de90 682 auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back());
9f95a23c
TL
683 logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
684 " (client does not support all server features)",
685 conn, ident_missing.features());
686 abort_in_fault();
687 return next_step_t::none;
688 });
689 case Tag::WAIT:
690 return process_wait();
691 case Tag::SERVER_IDENT:
1e59de90
TL
692 return frame_assembler->read_frame_payload(
693 ).then([this](auto payload) {
aee94f69
TL
694 if (unlikely(state != state_t::CONNECTING)) {
695 logger().debug("{} triggered {} at receiving SERVER_IDENT",
696 conn, get_state_name(state));
697 abort_protocol();
698 }
699
9f95a23c 700 // handle_server_ident() logic
aee94f69
TL
701 auto cc_seq = crosscore.prepare_submit();
702 logger().debug("{} send {} IOHandler::requeue_out_sent()",
703 conn, cc_seq);
704 io_states.requeue_out_sent();
705 gate.dispatch_in_background(
706 "requeue_out_sent", conn, [this, cc_seq] {
707 return seastar::smp::submit_to(
708 io_handler.get_shard_id(), [this, cc_seq] {
709 return io_handler.requeue_out_sent(cc_seq);
710 });
711 });
712
1e59de90 713 auto server_ident = ServerIdentFrame::Decode(payload->back());
9f95a23c
TL
714 logger().debug("{} GOT ServerIdentFrame:"
715 " addrs={}, gid={}, gs={},"
716 " features_supported={}, features_required={},"
717 " flags={}, cookie={}",
718 conn,
719 server_ident.addrs(), server_ident.gid(),
720 server_ident.global_seq(),
721 server_ident.supported_features(),
722 server_ident.required_features(),
723 server_ident.flags(), server_ident.cookie());
724
725 // is this who we intended to talk to?
726 // be a bit forgiving here, since we may be connecting based on addresses parsed out
727 // of mon_host or something.
728 if (!server_ident.addrs().contains(conn.target_addr)) {
729 logger().warn("{} peer identifies as {}, does not include {}",
730 conn, server_ident.addrs(), conn.target_addr);
731 throw std::system_error(
732 make_error_code(crimson::net::error::bad_peer_address));
733 }
734
735 server_cookie = server_ident.cookie();
736
737 // TODO: change peer_addr to entity_addrvec_t
738 if (server_ident.addrs().front() != conn.peer_addr) {
739 logger().warn("{} peer advertises as {}, does not match {}",
740 conn, server_ident.addrs(), conn.peer_addr);
741 throw std::system_error(
742 make_error_code(crimson::net::error::bad_peer_address));
743 }
f67539c2
TL
744 if (conn.get_peer_id() != entity_name_t::NEW &&
745 conn.get_peer_id() != server_ident.gid()) {
746 logger().error("{} connection peer id ({}) does not match "
747 "what it should be ({}) during connecting, close",
748 conn, server_ident.gid(), conn.get_peer_id());
1e59de90 749 ABORT_IN_CLOSE(true);
f67539c2 750 }
9f95a23c
TL
751 conn.set_peer_id(server_ident.gid());
752 conn.set_features(server_ident.supported_features() &
753 conn.policy.features_supported);
1e59de90 754 logger().debug("{} UPDATE: features={}", conn, conn.get_features());
9f95a23c
TL
755 peer_global_seq = server_ident.global_seq();
756
757 bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
758 if (lossy != conn.policy.lossy) {
759 logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
760 conn.policy.lossy = lossy;
761 }
762 if (lossy && (connect_seq != 0 || server_cookie != 0)) {
763 logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
764 conn, connect_seq, server_cookie);
765 connect_seq = 0;
766 server_cookie = 0;
767 }
768
769 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
770 });
771 default: {
1e59de90 772 unexpected_tag(ret.tag, conn, "post_client_connect");
9f95a23c
TL
773 return seastar::make_ready_future<next_step_t>(next_step_t::none);
774 }
775 }
776 });
777}
778
779seastar::future<ProtocolV2::next_step_t>
780ProtocolV2::client_reconnect()
781{
782 // send_reconnect() logic
783 auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
784 client_cookie,
785 server_cookie,
786 global_seq,
787 connect_seq,
aee94f69 788 io_states.in_seq);
9f95a23c 789 logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
1e59de90 790 " server_cookie={}, gs={}, cs={}, in_seq={}",
9f95a23c
TL
791 conn, messenger.get_myaddrs(),
792 client_cookie, server_cookie,
aee94f69 793 global_seq, connect_seq, io_states.in_seq);
1e59de90
TL
794 return frame_assembler->write_flush_frame(reconnect).then([this] {
795 return frame_assembler->read_main_preamble();
796 }).then([this](auto ret) {
797 switch (ret.tag) {
9f95a23c 798 case Tag::SESSION_RETRY_GLOBAL:
1e59de90
TL
799 return frame_assembler->read_frame_payload(
800 ).then([this](auto payload) {
9f95a23c 801 // handle_session_retry_global() logic
1e59de90 802 auto retry = RetryGlobalFrame::Decode(payload->back());
9f95a23c
TL
803 logger().warn("{} GOT RetryGlobalFrame: gs={}",
804 conn, retry.global_seq());
1e59de90
TL
805 global_seq = messenger.get_global_seq(retry.global_seq());
806 logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
807 return client_reconnect();
9f95a23c
TL
808 });
809 case Tag::SESSION_RETRY:
1e59de90
TL
810 return frame_assembler->read_frame_payload(
811 ).then([this](auto payload) {
9f95a23c 812 // handle_session_retry() logic
1e59de90 813 auto retry = RetryFrame::Decode(payload->back());
9f95a23c
TL
814 logger().warn("{} GOT RetryFrame: cs={}",
815 conn, retry.connect_seq());
816 connect_seq = retry.connect_seq() + 1;
817 logger().warn("{} UPDATE: cs={}", conn, connect_seq);
818 return client_reconnect();
819 });
820 case Tag::SESSION_RESET:
1e59de90
TL
821 return frame_assembler->read_frame_payload(
822 ).then([this](auto payload) {
823 if (unlikely(state != state_t::CONNECTING)) {
824 logger().debug("{} triggered {} before reset_session()",
825 conn, get_state_name(state));
826 abort_protocol();
827 }
9f95a23c 828 // handle_session_reset() logic
1e59de90 829 auto reset = ResetFrame::Decode(payload->back());
9f95a23c 830 logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
aee94f69 831
9f95a23c 832 reset_session(reset.full());
aee94f69
TL
833 // user can make changes
834
9f95a23c
TL
835 return client_connect();
836 });
837 case Tag::WAIT:
838 return process_wait();
839 case Tag::SESSION_RECONNECT_OK:
1e59de90
TL
840 return frame_assembler->read_frame_payload(
841 ).then([this](auto payload) {
aee94f69
TL
842 if (unlikely(state != state_t::CONNECTING)) {
843 logger().debug("{} triggered {} at receiving RECONNECT_OK",
844 conn, get_state_name(state));
845 abort_protocol();
846 }
847
9f95a23c 848 // handle_reconnect_ok() logic
1e59de90 849 auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
aee94f69
TL
850 auto cc_seq = crosscore.prepare_submit();
851 logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, "
852 "send {} IOHandler::requeue_out_sent_up_to()",
853 conn, reconnect_ok.msg_seq(), cc_seq);
854
855 io_states.requeue_out_sent_up_to();
856 auto msg_seq = reconnect_ok.msg_seq();
857 gate.dispatch_in_background(
858 "requeue_out_reconnecting", conn, [this, cc_seq, msg_seq] {
859 return seastar::smp::submit_to(
860 io_handler.get_shard_id(), [this, cc_seq, msg_seq] {
861 return io_handler.requeue_out_sent_up_to(cc_seq, msg_seq);
862 });
863 });
864
9f95a23c
TL
865 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
866 });
867 default: {
1e59de90 868 unexpected_tag(ret.tag, conn, "post_client_reconnect");
9f95a23c
TL
869 return seastar::make_ready_future<next_step_t>(next_step_t::none);
870 }
871 }
872 });
873}
874
875void ProtocolV2::execute_connecting()
876{
1e59de90 877 ceph_assert_always(!is_socket_valid);
aee94f69 878 trigger_state(state_t::CONNECTING, io_state_t::delay);
1e59de90 879 gated_execute("execute_connecting", conn, [this] {
aee94f69
TL
880 global_seq = messenger.get_global_seq();
881 assert(client_cookie != 0);
882 if (!conn.policy.lossy && server_cookie != 0) {
883 ++connect_seq;
884 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
885 conn, global_seq, connect_seq);
886 } else { // conn.policy.lossy || server_cookie == 0
887 assert(connect_seq == 0);
888 assert(server_cookie == 0);
889 logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
890 }
891 return wait_exit_io().then([this] {
1e59de90 892#ifdef UNIT_TESTS_BUILT
aee94f69
TL
893 // process custom_bp_t::SOCKET_CONNECTING
894 // supports CONTINUE/FAULT/BLOCK
895 if (!conn.interceptor) {
896 return seastar::now();
897 }
898 return conn.interceptor->intercept(
899 conn, {Breakpoint{custom_bp_t::SOCKET_CONNECTING}}
900 ).then([this](bp_action_t action) {
901 switch (action) {
902 case bp_action_t::CONTINUE:
9f95a23c 903 return seastar::now();
aee94f69
TL
904 case bp_action_t::FAULT:
905 logger().info("[Test] got FAULT");
906 abort_in_fault();
907 case bp_action_t::BLOCK:
908 logger().info("[Test] got BLOCK");
909 return conn.interceptor->blocker.block();
910 default:
911 ceph_abort("unexpected action from trap");
912 return seastar::now();
913 }
914 });;
915 }).then([this] {
916#endif
917 ceph_assert_always(frame_assembler);
918 if (unlikely(state != state_t::CONNECTING)) {
919 logger().debug("{} triggered {} before Socket::connect()",
920 conn, get_state_name(state));
921 abort_protocol();
922 }
923 return Socket::connect(conn.peer_addr);
924 }).then([this](SocketRef _new_socket) {
925 logger().debug("{} socket connected", conn);
926 if (unlikely(state != state_t::CONNECTING)) {
927 logger().debug("{} triggered {} during Socket::connect()",
928 conn, get_state_name(state));
929 return _new_socket->close().then([sock=std::move(_new_socket)] {
930 abort_protocol();
931 });
932 }
933 SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
934 if (!has_socket) {
935 frame_assembler->set_socket(std::move(new_socket));
936 has_socket = true;
937 } else {
938 gate.dispatch_in_background(
939 "replace_socket_connecting",
940 conn,
941 [this, new_socket=std::move(new_socket)]() mutable {
942 return frame_assembler->replace_shutdown_socket(std::move(new_socket));
9f95a23c 943 }
aee94f69
TL
944 );
945 }
946 is_socket_valid = true;
947 return seastar::now();
948 }).then([this] {
949 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
950 frame_assembler->reset_handlers();
951 frame_assembler->start_recording();
952 return banner_exchange(true);
953 }).then([this] (auto&& ret) {
954 auto [_peer_type, _my_addr_from_peer] = std::move(ret);
955 if (conn.get_peer_type() != _peer_type) {
956 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
957 conn, ceph_entity_type_name(conn.get_peer_type()),
958 ceph_entity_type_name(_peer_type));
959 ABORT_IN_CLOSE(true);
960 }
961 if (unlikely(state != state_t::CONNECTING)) {
962 logger().debug("{} triggered {} during banner_exchange(), abort",
963 conn, get_state_name(state));
964 abort_protocol();
965 }
966 frame_assembler->learn_socket_ephemeral_port_as_connector(
967 _my_addr_from_peer.get_port());
968 if (unlikely(_my_addr_from_peer.is_legacy())) {
969 logger().warn("{} peer sent a legacy address for me: {}",
970 conn, _my_addr_from_peer);
971 throw std::system_error(
972 make_error_code(crimson::net::error::bad_peer_address));
973 }
974 _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
975 messenger.learned_addr(_my_addr_from_peer, conn);
976 return client_auth();
977 }).then([this] {
978 if (server_cookie == 0) {
979 ceph_assert(connect_seq == 0);
980 return client_connect();
981 } else {
982 ceph_assert(connect_seq > 0);
983 return client_reconnect();
984 }
985 }).then([this] (next_step_t next) {
986 if (unlikely(state != state_t::CONNECTING)) {
987 logger().debug("{} triggered {} at the end of execute_connecting()",
988 conn, get_state_name(state));
989 abort_protocol();
990 }
991 switch (next) {
992 case next_step_t::ready: {
993 if (unlikely(state != state_t::CONNECTING)) {
994 logger().debug("{} triggered {} before dispatch_connect(), abort",
995 conn, get_state_name(state));
996 abort_protocol();
997 }
998
999 auto cc_seq = crosscore.prepare_submit();
1000 // there are 2 hops with dispatch_connect()
1001 crosscore.prepare_submit();
1002 logger().info("{} connected: gs={}, pgs={}, cs={}, "
1003 "client_cookie={}, server_cookie={}, {}, new_sid={}, "
1004 "send {} IOHandler::dispatch_connect()",
1005 conn, global_seq, peer_global_seq, connect_seq,
1006 client_cookie, server_cookie, io_states,
1007 frame_assembler->get_socket_shard_id(), cc_seq);
1008
1009 // set io_handler to a new shard
1010 auto new_io_shard = frame_assembler->get_socket_shard_id();
1011 ConnectionFRef conn_fref = seastar::make_foreign(
1012 conn.shared_from_this());
1013 ceph_assert_always(!pr_switch_io_shard.has_value());
1014 pr_switch_io_shard = seastar::shared_promise<>();
1015 return seastar::smp::submit_to(
1016 io_handler.get_shard_id(),
1017 [this, cc_seq, new_io_shard,
1018 conn_fref=std::move(conn_fref)]() mutable {
1019 return io_handler.dispatch_connect(
1020 cc_seq, new_io_shard, std::move(conn_fref));
1021 }).then([this, new_io_shard] {
1022 ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
1023 pr_switch_io_shard->set_value();
1024 pr_switch_io_shard = std::nullopt;
1025 // user can make changes
1026
9f95a23c 1027 if (unlikely(state != state_t::CONNECTING)) {
aee94f69 1028 logger().debug("{} triggered {} after dispatch_connect(), abort",
9f95a23c
TL
1029 conn, get_state_name(state));
1030 abort_protocol();
1031 }
aee94f69 1032 execute_ready();
9f95a23c 1033 });
aee94f69
TL
1034 }
1035 case next_step_t::wait: {
1036 logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
1037 ceph_assert_always(is_socket_valid);
1038 frame_assembler->shutdown_socket<true>(&gate);
1039 is_socket_valid = false;
1040 execute_wait(true);
1041 return seastar::now();
1042 }
1043 default: {
1044 ceph_abort("impossible next step");
1045 }
1046 }
1047 }).handle_exception([this](std::exception_ptr eptr) {
1048 fault(state_t::CONNECTING, "execute_connecting", eptr);
9f95a23c 1049 });
aee94f69 1050 });
9f95a23c
TL
1051}
1052
1053// ACCEPTING state
1054
1055seastar::future<> ProtocolV2::_auth_bad_method(int r)
1056{
1057 // _auth_bad_method() logic
1058 ceph_assert(r < 0);
1059 auto [allowed_methods, allowed_modes] =
1060 messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
1061 auto bad_method = AuthBadMethodFrame::Encode(
1062 auth_meta->auth_method, r, allowed_methods, allowed_modes);
1063 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
1064 "allowed_methods={}, allowed_modes={})",
1065 conn, auth_meta->auth_method, cpp_strerror(r),
1066 allowed_methods, allowed_modes);
1e59de90
TL
1067 return frame_assembler->write_flush_frame(bad_method
1068 ).then([this] {
9f95a23c
TL
1069 return server_auth();
1070 });
1071}
1072
1073seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
1074{
1075 // _handle_auth_request() logic
1076 ceph_assert(messenger.get_auth_server());
1077 bufferlist reply;
1078 int r = messenger.get_auth_server()->handle_auth_request(
1e59de90
TL
1079 conn,
1080 *auth_meta,
1081 more,
1082 auth_meta->auth_method,
1083 auth_payload,
1084 &conn.peer_global_id,
9f95a23c
TL
1085 &reply);
1086 switch (r) {
1087 // successful
1088 case 1: {
1089 auto auth_done = AuthDoneFrame::Encode(
1090 conn.peer_global_id, auth_meta->con_mode, reply);
1091 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
1092 conn, conn.peer_global_id,
1093 ceph_con_mode_name(auth_meta->con_mode), reply.length());
1e59de90
TL
1094 return frame_assembler->write_flush_frame(auth_done
1095 ).then([this] {
9f95a23c 1096 ceph_assert(auth_meta);
1e59de90 1097 frame_assembler->create_session_stream_handlers(*auth_meta, true);
9f95a23c
TL
1098 return finish_auth();
1099 });
1100 }
1101 // auth more
1102 case 0: {
1103 auto more = AuthReplyMoreFrame::Encode(reply);
1104 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
1105 conn, reply.length());
1e59de90
TL
1106 return frame_assembler->write_flush_frame(more
1107 ).then([this] {
1108 return frame_assembler->read_main_preamble();
1109 }).then([this](auto ret) {
1110 expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, "read_auth_request_more");
1111 return frame_assembler->read_frame_payload();
1112 }).then([this](auto payload) {
1113 auto auth_more = AuthRequestMoreFrame::Decode(payload->back());
9f95a23c
TL
1114 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
1115 conn, auth_more.auth_payload().length());
1116 return _handle_auth_request(auth_more.auth_payload(), true);
1117 });
1118 }
1119 case -EBUSY: {
1120 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn);
1121 abort_in_fault();
1122 return seastar::now();
1123 }
1124 default: {
1125 logger().warn("{} auth_server handle_auth_request returned {}", conn, r);
1126 return _auth_bad_method(r);
1127 }
1128 }
1129}
1130
1131seastar::future<> ProtocolV2::server_auth()
1132{
1e59de90
TL
1133 return frame_assembler->read_main_preamble(
1134 ).then([this](auto ret) {
1135 expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, "read_auth_request");
1136 return frame_assembler->read_frame_payload();
1137 }).then([this](auto payload) {
9f95a23c 1138 // handle_auth_request() logic
1e59de90 1139 auto request = AuthRequestFrame::Decode(payload->back());
9f95a23c
TL
1140 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
1141 " payload_len={}",
1142 conn, request.method(), request.preferred_modes(),
1143 request.auth_payload().length());
1144 auth_meta->auth_method = request.method();
1145 auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode(
1146 conn.get_peer_type(), auth_meta->auth_method,
1147 request.preferred_modes());
1148 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
1149 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn);
1150 return _auth_bad_method(-EOPNOTSUPP);
1151 }
1152 return _handle_auth_request(request.auth_payload(), false);
1153 });
1154}
1155
f67539c2
TL
1156bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const
1157{
1158 auto my_peer_name = conn.get_peer_name();
1159 if (my_peer_name.type() != peer_name.type()) {
1160 return false;
1161 }
1162 if (my_peer_name.num() != entity_name_t::NEW &&
1163 peer_name.num() != entity_name_t::NEW &&
1164 my_peer_name.num() != peer_name.num()) {
1165 return false;
1166 }
1167 return true;
1168}
1169
9f95a23c
TL
1170seastar::future<ProtocolV2::next_step_t>
1171ProtocolV2::send_wait()
1172{
1173 auto wait = WaitFrame::Encode();
1174 logger().debug("{} WRITE WaitFrame", conn);
1e59de90
TL
1175 return frame_assembler->write_flush_frame(wait
1176 ).then([] {
9f95a23c
TL
1177 return next_step_t::wait;
1178 });
1179}
1180
1181seastar::future<ProtocolV2::next_step_t>
1182ProtocolV2::reuse_connection(
1183 ProtocolV2* existing_proto, bool do_reset,
1184 bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
1185{
1e59de90
TL
1186 if (unlikely(state != state_t::ACCEPTING)) {
1187 logger().debug("{} triggered {} before trigger_replacing()",
1188 conn, get_state_name(state));
1189 abort_protocol();
1190 }
1191
9f95a23c
TL
1192 existing_proto->trigger_replacing(reconnect,
1193 do_reset,
1e59de90 1194 frame_assembler->to_replace(),
9f95a23c 1195 std::move(auth_meta),
9f95a23c
TL
1196 peer_global_seq,
1197 client_cookie,
1198 conn.get_peer_name(),
1e59de90
TL
1199 conn.get_features(),
1200 peer_supported_features,
9f95a23c
TL
1201 conn_seq,
1202 msg_seq);
1e59de90
TL
1203 ceph_assert_always(has_socket && is_socket_valid);
1204 is_socket_valid = false;
1205 has_socket = false;
9f95a23c
TL
1206#ifdef UNIT_TESTS_BUILT
1207 if (conn.interceptor) {
aee94f69
TL
1208 conn.interceptor->register_conn_replaced(
1209 conn.get_local_shared_foreign_from_this());
9f95a23c
TL
1210 }
1211#endif
1212 // close this connection because all the necessary information is delivered
1213 // to the exisiting connection, and jump to error handling code to abort the
1214 // current state.
1e59de90 1215 ABORT_IN_CLOSE(false);
9f95a23c
TL
1216 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1217}
1218
1219seastar::future<ProtocolV2::next_step_t>
1220ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
1221{
1222 // handle_existing_connection() logic
1223 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1224 existing_conn->protocol.get());
1225 ceph_assert(existing_proto);
1226 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1227 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1228 conn, global_seq, peer_global_seq, connect_seq,
1229 client_cookie, server_cookie,
1e59de90 1230 fmt::ptr(existing_conn.get()), get_state_name(existing_proto->state),
9f95a23c
TL
1231 existing_proto->global_seq,
1232 existing_proto->peer_global_seq,
1233 existing_proto->connect_seq,
1234 existing_proto->client_cookie,
1235 existing_proto->server_cookie);
1236
f67539c2
TL
1237 if (!validate_peer_name(existing_conn->get_peer_name())) {
1238 logger().error("{} server_connect: my peer_name doesn't match"
1e59de90 1239 " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
f67539c2
TL
1240 abort_in_fault();
1241 }
1242
9f95a23c
TL
1243 if (existing_proto->state == state_t::REPLACING) {
1244 logger().warn("{} server_connect: racing replace happened while"
1245 " replacing existing connection {}, send wait.",
1246 conn, *existing_conn);
1247 return send_wait();
1248 }
1249
1250 if (existing_proto->peer_global_seq > peer_global_seq) {
1251 logger().warn("{} server_connect:"
1252 " this is a stale connection, because peer_global_seq({})"
1253 " < existing->peer_global_seq({}), close this connection"
1254 " in favor of existing connection {}",
1255 conn, peer_global_seq,
1256 existing_proto->peer_global_seq, *existing_conn);
1257 abort_in_fault();
1258 }
1259
1260 if (existing_conn->policy.lossy) {
1261 // existing connection can be thrown out in favor of this one
1262 logger().warn("{} server_connect:"
1263 " existing connection {} is a lossy channel. Close existing in favor of"
1264 " this connection", conn, *existing_conn);
1e59de90
TL
1265 if (unlikely(state != state_t::ACCEPTING)) {
1266 logger().debug("{} triggered {} before execute_establishing()",
1267 conn, get_state_name(state));
1268 abort_protocol();
1269 }
1270 execute_establishing(existing_conn);
9f95a23c
TL
1271 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1272 }
1273
1274 if (existing_proto->server_cookie != 0) {
1275 if (existing_proto->client_cookie != client_cookie) {
1276 // Found previous session
1277 // peer has reset and we're going to reuse the existing connection
1278 // by replacing the socket
1279 logger().warn("{} server_connect:"
1280 " found new session (cs={})"
1e59de90 1281 " when existing {} {} is with stale session (cs={}, ss={}),"
9f95a23c 1282 " peer must have reset",
1e59de90
TL
1283 conn,
1284 client_cookie,
1285 get_state_name(existing_proto->state),
1286 *existing_conn,
1287 existing_proto->client_cookie,
9f95a23c
TL
1288 existing_proto->server_cookie);
1289 return reuse_connection(existing_proto, conn.policy.resetcheck);
1290 } else {
1291 // session establishment interrupted between client_ident and server_ident,
1292 // continuing...
1e59de90 1293 logger().warn("{} server_connect: found client session with existing {} {}"
9f95a23c 1294 " matched (cs={}, ss={}), continuing session establishment",
1e59de90
TL
1295 conn,
1296 get_state_name(existing_proto->state),
1297 *existing_conn,
1298 client_cookie,
1299 existing_proto->server_cookie);
9f95a23c
TL
1300 return reuse_connection(existing_proto);
1301 }
1302 } else {
1303 // Looks like a connection race: server and client are both connecting to
1304 // each other at the same time.
1305 if (existing_proto->client_cookie != client_cookie) {
1306 if (existing_conn->peer_wins()) {
1e59de90 1307 // acceptor (this connection, the peer) wins
9f95a23c 1308 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1e59de90
TL
1309 " and win, reusing existing {} {}",
1310 conn,
1311 client_cookie,
1312 existing_proto->client_cookie,
1313 get_state_name(existing_proto->state),
1314 *existing_conn);
9f95a23c
TL
1315 return reuse_connection(existing_proto);
1316 } else {
1e59de90 1317 // acceptor (this connection, the peer) loses
9f95a23c
TL
1318 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1319 " and lose to existing {}, ask client to wait",
1320 conn, client_cookie, existing_proto->client_cookie, *existing_conn);
1e59de90 1321 return existing_conn->send_keepalive().then([this] {
9f95a23c
TL
1322 return send_wait();
1323 });
1324 }
1325 } else {
1e59de90 1326 logger().warn("{} server_connect: found client session with existing {} {}"
9f95a23c 1327 " matched (cs={}, ss={}), continuing session establishment",
1e59de90
TL
1328 conn,
1329 get_state_name(existing_proto->state),
1330 *existing_conn,
1331 client_cookie,
1332 existing_proto->server_cookie);
9f95a23c
TL
1333 return reuse_connection(existing_proto);
1334 }
1335 }
1336}
1337
1338seastar::future<ProtocolV2::next_step_t>
1339ProtocolV2::server_connect()
1340{
1e59de90
TL
1341 return frame_assembler->read_frame_payload(
1342 ).then([this](auto payload) {
9f95a23c 1343 // handle_client_ident() logic
1e59de90 1344 auto client_ident = ClientIdentFrame::Decode(payload->back());
9f95a23c
TL
1345 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1346 " gid={}, gs={}, features_supported={},"
1347 " features_required={}, flags={}, cookie={}",
1348 conn, client_ident.addrs(), client_ident.target_addr(),
1349 client_ident.gid(), client_ident.global_seq(),
1350 client_ident.supported_features(),
1351 client_ident.required_features(),
1352 client_ident.flags(), client_ident.cookie());
1353
1354 if (client_ident.addrs().empty() ||
1355 client_ident.addrs().front() == entity_addr_t()) {
1356 logger().warn("{} oops, client_ident.addrs() is empty", conn);
1357 throw std::system_error(
1358 make_error_code(crimson::net::error::bad_peer_address));
1359 }
1360 if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
1361 logger().warn("{} peer is trying to reach {} which is not us ({})",
1362 conn, client_ident.target_addr(), messenger.get_myaddrs());
1363 throw std::system_error(
1364 make_error_code(crimson::net::error::bad_peer_address));
1365 }
20effc67 1366 conn.peer_addr = client_ident.addrs().front();
9f95a23c
TL
1367 logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
1368 conn.target_addr = conn.peer_addr;
1369 if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
1370 logger().warn("{} we don't know how to reconnect to peer {}",
1371 conn, conn.target_addr);
1372 throw std::system_error(
1373 make_error_code(crimson::net::error::bad_peer_address));
1374 }
1375
f67539c2
TL
1376 if (conn.get_peer_id() != entity_name_t::NEW &&
1377 conn.get_peer_id() != client_ident.gid()) {
1378 logger().error("{} client_ident peer_id ({}) does not match"
1379 " what it should be ({}) during accepting, abort",
1380 conn, client_ident.gid(), conn.get_peer_id());
1381 abort_in_fault();
1382 }
9f95a23c
TL
1383 conn.set_peer_id(client_ident.gid());
1384 client_cookie = client_ident.cookie();
1385
1386 uint64_t feat_missing =
1387 (conn.policy.features_required | msgr2_required) &
1388 ~(uint64_t)client_ident.supported_features();
1389 if (feat_missing) {
1390 auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
1391 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1392 conn, feat_missing);
1e59de90
TL
1393 return frame_assembler->write_flush_frame(ident_missing_features
1394 ).then([] {
9f95a23c
TL
1395 return next_step_t::wait;
1396 });
1397 }
1e59de90
TL
1398 conn.set_features(client_ident.supported_features() &
1399 conn.policy.features_supported);
1400 logger().debug("{} UPDATE: features={}", conn, conn.get_features());
9f95a23c
TL
1401
1402 peer_global_seq = client_ident.global_seq();
1403
1e59de90
TL
1404 bool lossy = client_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
1405 if (lossy != conn.policy.lossy) {
1406 logger().warn("{} my lossy policy {} doesn't match client {}, ignore",
1407 conn, conn.policy.lossy, lossy);
1408 }
1409
9f95a23c
TL
1410 // Looks good so far, let's check if there is already an existing connection
1411 // to this peer.
1412
1413 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1414
1415 if (existing_conn) {
1e59de90 1416 return handle_existing_connection(existing_conn);
f67539c2 1417 } else {
1e59de90
TL
1418 if (unlikely(state != state_t::ACCEPTING)) {
1419 logger().debug("{} triggered {} before execute_establishing()",
1420 conn, get_state_name(state));
1421 abort_protocol();
1422 }
1423 execute_establishing(nullptr);
f67539c2 1424 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
9f95a23c 1425 }
9f95a23c
TL
1426 });
1427}
1428
1429seastar::future<ProtocolV2::next_step_t>
1430ProtocolV2::read_reconnect()
1431{
1e59de90
TL
1432 return frame_assembler->read_main_preamble(
1433 ).then([this](auto ret) {
1434 expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_session_reconnect");
9f95a23c
TL
1435 return server_reconnect();
1436 });
1437}
1438
1439seastar::future<ProtocolV2::next_step_t>
1440ProtocolV2::send_retry(uint64_t connect_seq)
1441{
1442 auto retry = RetryFrame::Encode(connect_seq);
1443 logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
1e59de90
TL
1444 return frame_assembler->write_flush_frame(retry
1445 ).then([this] {
9f95a23c
TL
1446 return read_reconnect();
1447 });
1448}
1449
1450seastar::future<ProtocolV2::next_step_t>
1451ProtocolV2::send_retry_global(uint64_t global_seq)
1452{
1453 auto retry = RetryGlobalFrame::Encode(global_seq);
1454 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
1e59de90
TL
1455 return frame_assembler->write_flush_frame(retry
1456 ).then([this] {
9f95a23c
TL
1457 return read_reconnect();
1458 });
1459}
1460
1461seastar::future<ProtocolV2::next_step_t>
1462ProtocolV2::send_reset(bool full)
1463{
1464 auto reset = ResetFrame::Encode(full);
1465 logger().warn("{} WRITE ResetFrame: full={}", conn, full);
1e59de90
TL
1466 return frame_assembler->write_flush_frame(reset
1467 ).then([this] {
1468 return frame_assembler->read_main_preamble();
1469 }).then([this](auto ret) {
1470 expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
9f95a23c
TL
1471 return server_connect();
1472 });
1473}
1474
1475seastar::future<ProtocolV2::next_step_t>
1476ProtocolV2::server_reconnect()
1477{
1e59de90
TL
1478 return frame_assembler->read_frame_payload(
1479 ).then([this](auto payload) {
9f95a23c 1480 // handle_reconnect() logic
1e59de90 1481 auto reconnect = ReconnectFrame::Decode(payload->back());
9f95a23c
TL
1482
1483 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1484 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1485 conn, reconnect.addrs(),
1486 reconnect.client_cookie(), reconnect.server_cookie(),
1487 reconnect.global_seq(), reconnect.connect_seq(),
1488 reconnect.msg_seq());
1489
1490 // can peer_addrs be changed on-the-fly?
1491 // TODO: change peer_addr to entity_addrvec_t
1492 entity_addr_t paddr = reconnect.addrs().front();
1493 if (paddr.is_msgr2() || paddr.is_any()) {
1494 // good
1495 } else {
1496 logger().warn("{} peer's address {} is not v2", conn, paddr);
1497 throw std::system_error(
1498 make_error_code(crimson::net::error::bad_peer_address));
1499 }
1500 if (conn.peer_addr == entity_addr_t()) {
1501 conn.peer_addr = paddr;
1502 } else if (conn.peer_addr != paddr) {
1503 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1504 " reconnect failed",
1505 conn, paddr, conn.peer_addr);
1506 throw std::system_error(
1507 make_error_code(crimson::net::error::bad_peer_address));
1508 }
1509 peer_global_seq = reconnect.global_seq();
1510
1511 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1512
1513 if (!existing_conn) {
1514 // there is no existing connection therefore cannot reconnect to previous
1515 // session
1516 logger().warn("{} server_reconnect: no existing connection from address {},"
1517 " reseting client", conn, conn.peer_addr);
1518 return send_reset(true);
1519 }
1520
9f95a23c
TL
1521 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1522 existing_conn->protocol.get());
1523 ceph_assert(existing_proto);
1524 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1525 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1526 conn, global_seq, peer_global_seq, reconnect.connect_seq(),
1527 reconnect.client_cookie(), reconnect.server_cookie(),
1e59de90 1528 fmt::ptr(existing_conn.get()),
9f95a23c
TL
1529 get_state_name(existing_proto->state),
1530 existing_proto->global_seq,
1531 existing_proto->peer_global_seq,
1532 existing_proto->connect_seq,
1533 existing_proto->client_cookie,
1534 existing_proto->server_cookie);
1535
f67539c2
TL
1536 if (!validate_peer_name(existing_conn->get_peer_name())) {
1537 logger().error("{} server_reconnect: my peer_name doesn't match"
1e59de90 1538 " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
f67539c2
TL
1539 abort_in_fault();
1540 }
1541
9f95a23c
TL
1542 if (existing_proto->state == state_t::REPLACING) {
1543 logger().warn("{} server_reconnect: racing replace happened while "
1544 " replacing existing connection {}, retry global.",
1545 conn, *existing_conn);
1546 return send_retry_global(existing_proto->peer_global_seq);
1547 }
1548
1549 if (existing_proto->client_cookie != reconnect.client_cookie()) {
1550 logger().warn("{} server_reconnect:"
1551 " client_cookie mismatch with existing connection {},"
1552 " cc={} rcc={}. I must have reset, reseting client.",
1553 conn, *existing_conn,
1554 existing_proto->client_cookie, reconnect.client_cookie());
1555 return send_reset(conn.policy.resetcheck);
1556 } else if (existing_proto->server_cookie == 0) {
1557 // this happens when:
1558 // - a connects to b
1559 // - a sends client_ident
1560 // - b gets client_ident, sends server_ident and sets cookie X
1561 // - connection fault
1562 // - b reconnects to a with cookie X, connect_seq=1
1563 // - a has cookie==0
1564 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1565 " server_ident with existing connection {}."
1566 " Asking peer to resume session establishment",
1567 conn, existing_proto->client_cookie, *existing_conn);
1568 return send_reset(false);
1569 }
1570
1571 if (existing_proto->peer_global_seq > reconnect.global_seq()) {
1572 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1573 " with existing connection {},"
1574 " ask client to retry global",
1575 conn, existing_proto->peer_global_seq,
1576 reconnect.global_seq(), *existing_conn);
1577 return send_retry_global(existing_proto->peer_global_seq);
1578 }
1579
1580 if (existing_proto->connect_seq > reconnect.connect_seq()) {
1581 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1582 " with existing connection {}, ask client to retry",
1583 conn, reconnect.connect_seq(),
1584 existing_proto->connect_seq, *existing_conn);
1585 return send_retry(existing_proto->connect_seq);
1586 } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
1587 // reconnect race: both peers are sending reconnect messages
1588 if (existing_conn->peer_wins()) {
1e59de90 1589 // acceptor (this connection, the peer) wins
9f95a23c 1590 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1e59de90
TL
1591 " and win, reusing existing {} {}",
1592 conn,
1593 reconnect.connect_seq(),
1594 get_state_name(existing_proto->state),
1595 *existing_conn);
9f95a23c
TL
1596 return reuse_connection(
1597 existing_proto, false,
1598 true, reconnect.connect_seq(), reconnect.msg_seq());
1599 } else {
1e59de90 1600 // acceptor (this connection, the peer) loses
9f95a23c
TL
1601 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1602 " and lose to existing {}, ask client to wait",
1603 conn, reconnect.connect_seq(), *existing_conn);
1604 return send_wait();
1605 }
1606 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1607 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1e59de90
TL
1608 " reusing existing {} {}",
1609 conn,
1610 existing_proto->connect_seq,
1611 reconnect.connect_seq(),
1612 get_state_name(existing_proto->state),
1613 *existing_conn);
9f95a23c
TL
1614 return reuse_connection(
1615 existing_proto, false,
1616 true, reconnect.connect_seq(), reconnect.msg_seq());
1617 }
1618 });
1619}
1620
1621void ProtocolV2::execute_accepting()
1622{
1e59de90 1623 assert(is_socket_valid);
aee94f69 1624 trigger_state(state_t::ACCEPTING, io_state_t::none);
1e59de90 1625 gate.dispatch_in_background("execute_accepting", conn, [this] {
aee94f69 1626 return seastar::futurize_invoke([this] {
1e59de90 1627#ifdef UNIT_TESTS_BUILT
aee94f69
TL
1628 if (conn.interceptor) {
1629 // only notify socket accepted
1630 gate.dispatch_in_background(
1631 "test_intercept_socket_accepted", conn, [this] {
1632 return conn.interceptor->intercept(
1633 conn, {Breakpoint{custom_bp_t::SOCKET_ACCEPTED}}
1634 ).then([](bp_action_t action) {
1635 ceph_assert(action == bp_action_t::CONTINUE);
1636 });
9f95a23c 1637 });
aee94f69
TL
1638 }
1639#endif
1640 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
1641 frame_assembler->reset_handlers();
1642 frame_assembler->start_recording();
1643 return banner_exchange(false);
1644 }).then([this] (auto&& ret) {
1645 auto [_peer_type, _my_addr_from_peer] = std::move(ret);
1646 ceph_assert(conn.get_peer_type() == 0);
1647 conn.set_peer_type(_peer_type);
1648
1649 conn.policy = messenger.get_policy(_peer_type);
1650 logger().info("{} UPDATE: peer_type={},"
1651 " policy(lossy={} server={} standby={} resetcheck={})",
1652 conn, ceph_entity_type_name(_peer_type),
1653 conn.policy.lossy, conn.policy.server,
1654 conn.policy.standby, conn.policy.resetcheck);
1655 if (!messenger.get_myaddr().is_blank_ip() &&
1656 (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
1657 messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
1658 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1659 conn, _my_addr_from_peer, messenger.get_myaddr());
1660 throw std::system_error(
1661 make_error_code(crimson::net::error::bad_peer_address));
1662 }
1663 messenger.learned_addr(_my_addr_from_peer, conn);
1664 return server_auth();
1665 }).then([this] {
1666 return frame_assembler->read_main_preamble();
1667 }).then([this](auto ret) {
1668 switch (ret.tag) {
1669 case Tag::CLIENT_IDENT:
1670 return server_connect();
1671 case Tag::SESSION_RECONNECT:
1672 return server_reconnect();
1673 default: {
1674 unexpected_tag(ret.tag, conn, "post_server_auth");
1675 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1676 }
1677 }
1678 }).then([this] (next_step_t next) {
1679 switch (next) {
1680 case next_step_t::ready:
1681 assert(state != state_t::ACCEPTING);
1682 break;
1683 case next_step_t::wait:
1684 if (unlikely(state != state_t::ACCEPTING)) {
1685 logger().debug("{} triggered {} at the end of execute_accepting()",
1686 conn, get_state_name(state));
1687 abort_protocol();
1688 }
1689 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
1690 execute_server_wait();
1691 break;
1692 default:
1693 ceph_abort("impossible next step");
1694 }
1695 }).handle_exception([this](std::exception_ptr eptr) {
1696 const char *e_what;
1697 try {
1698 std::rethrow_exception(eptr);
1699 } catch (std::exception &e) {
1700 e_what = e.what();
1701 }
1702 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1703 conn, get_state_name(state), e_what);
1704 do_close(false);
9f95a23c 1705 });
aee94f69 1706 });
9f95a23c
TL
1707}
1708
1709// CONNECTING or ACCEPTING state
1710
1711seastar::future<> ProtocolV2::finish_auth()
1712{
1713 ceph_assert(auth_meta);
1714
1e59de90 1715 auto records = frame_assembler->stop_recording();
9f95a23c 1716 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1e59de90 1717 auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
9f95a23c 1718 auto sig_frame = AuthSignatureFrame::Encode(sig);
9f95a23c 1719 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
1e59de90
TL
1720 return frame_assembler->write_flush_frame(sig_frame
1721 ).then([this] {
1722 return frame_assembler->read_main_preamble();
1723 }).then([this](auto ret) {
1724 expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
1725 return frame_assembler->read_frame_payload();
1726 }).then([this, txbuf=std::move(records.txbuf)](auto payload) {
9f95a23c 1727 // handle_auth_signature() logic
1e59de90 1728 auto sig_frame = AuthSignatureFrame::Decode(payload->back());
9f95a23c
TL
1729 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
1730
1731 const auto actual_tx_sig = auth_meta->session_key.empty() ?
1732 sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
1733 if (sig_frame.signature() != actual_tx_sig) {
1734 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1735 " sig_frame.signature()={}",
1736 conn, actual_tx_sig, sig_frame.signature());
1737 abort_in_fault();
1738 }
9f95a23c
TL
1739 });
1740}
1741
1742// ESTABLISHING
1743
1e59de90 1744void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
f67539c2
TL
1745 auto accept_me = [this] {
1746 messenger.register_conn(
1747 seastar::static_pointer_cast<SocketConnection>(
1748 conn.shared_from_this()));
1749 messenger.unaccept_conn(
1750 seastar::static_pointer_cast<SocketConnection>(
1751 conn.shared_from_this()));
1752 };
1753
1e59de90 1754 ceph_assert_always(is_socket_valid);
aee94f69
TL
1755 trigger_state(state_t::ESTABLISHING, io_state_t::delay);
1756 bool is_replace;
f67539c2 1757 if (existing_conn) {
aee94f69
TL
1758 logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
1759 "client_cookie={}, server_cookie={}, {}, new_sid={}, "
1760 "close existing {}",
1761 conn, global_seq, peer_global_seq, connect_seq,
1762 client_cookie, server_cookie,
1763 io_states, frame_assembler->get_socket_shard_id(),
1764 *existing_conn);
1765 is_replace = true;
1766 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1767 existing_conn->protocol.get());
1768 existing_proto->do_close(
1769 true, // is_dispatch_reset
1770 std::move(accept_me));
f67539c2
TL
1771 if (unlikely(state != state_t::ESTABLISHING)) {
1772 logger().warn("{} triggered {} during execute_establishing(), "
1773 "the accept event will not be delivered!",
1774 conn, get_state_name(state));
1775 abort_protocol();
1776 }
1777 } else {
aee94f69
TL
1778 logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
1779 "client_cookie={}, server_cookie={}, {}, new_sid={}, "
1780 "no existing",
1781 conn, global_seq, peer_global_seq, connect_seq,
1782 client_cookie, server_cookie, io_states,
1783 frame_assembler->get_socket_shard_id());
1784 is_replace = false;
f67539c2
TL
1785 accept_me();
1786 }
1787
aee94f69
TL
1788 gated_execute("execute_establishing", conn, [this, is_replace] {
1789 ceph_assert_always(state == state_t::ESTABLISHING);
1790
1791 // set io_handler to a new shard
1792 auto cc_seq = crosscore.prepare_submit();
1793 // there are 2 hops with dispatch_accept()
1794 crosscore.prepare_submit();
1795 auto new_io_shard = frame_assembler->get_socket_shard_id();
1796 logger().debug("{} send {} IOHandler::dispatch_accept({})",
1797 conn, cc_seq, new_io_shard);
1798 ConnectionFRef conn_fref = seastar::make_foreign(
1799 conn.shared_from_this());
1800 ceph_assert_always(!pr_switch_io_shard.has_value());
1801 pr_switch_io_shard = seastar::shared_promise<>();
1802 return seastar::smp::submit_to(
1803 io_handler.get_shard_id(),
1804 [this, cc_seq, new_io_shard, is_replace,
1805 conn_fref=std::move(conn_fref)]() mutable {
1806 return io_handler.dispatch_accept(
1807 cc_seq, new_io_shard, std::move(conn_fref), is_replace);
1808 }).then([this, new_io_shard] {
1809 ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
1810 pr_switch_io_shard->set_value();
1811 pr_switch_io_shard = std::nullopt;
1812 // user can make changes
1813
1814 if (unlikely(state != state_t::ESTABLISHING)) {
1815 logger().debug("{} triggered {} after dispatch_accept() during execute_establishing()",
1816 conn, get_state_name(state));
1817 abort_protocol();
1818 }
f67539c2 1819
9f95a23c
TL
1820 return send_server_ident();
1821 }).then([this] {
1822 if (unlikely(state != state_t::ESTABLISHING)) {
1823 logger().debug("{} triggered {} at the end of execute_establishing()",
1824 conn, get_state_name(state));
1825 abort_protocol();
1826 }
aee94f69 1827 logger().info("{} established, going to ready", conn);
1e59de90
TL
1828 execute_ready();
1829 }).handle_exception([this](std::exception_ptr eptr) {
1830 fault(state_t::ESTABLISHING, "execute_establishing", eptr);
9f95a23c
TL
1831 });
1832 });
1833}
1834
1835// ESTABLISHING or REPLACING state
1836
1837seastar::future<>
1838ProtocolV2::send_server_ident()
1839{
aee94f69
TL
1840 ceph_assert_always(state == state_t::ESTABLISHING ||
1841 state == state_t::REPLACING);
9f95a23c
TL
1842 // send_server_ident() logic
1843
1844 // refered to async-conn v2: not assign gs to global_seq
1e59de90 1845 global_seq = messenger.get_global_seq();
aee94f69
TL
1846 auto cc_seq = crosscore.prepare_submit();
1847 logger().debug("{} UPDATE: gs={} for server ident, "
1848 "send {} IOHandler::reset_peer_state()",
1849 conn, global_seq, cc_seq);
9f95a23c 1850
1e59de90 1851 // this is required for the case when this connection is being replaced
aee94f69
TL
1852 io_states.reset_peer_state();
1853 gate.dispatch_in_background(
1854 "reset_peer_state", conn, [this, cc_seq] {
1855 return seastar::smp::submit_to(
1856 io_handler.get_shard_id(), [this, cc_seq] {
1857 return io_handler.reset_peer_state(cc_seq);
1858 });
1859 });
9f95a23c 1860
1e59de90
TL
1861 if (!conn.policy.lossy) {
1862 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1863 }
9f95a23c 1864
1e59de90
TL
1865 uint64_t flags = 0;
1866 if (conn.policy.lossy) {
1867 flags = flags | CEPH_MSG_CONNECT_LOSSY;
1868 }
9f95a23c 1869
1e59de90
TL
1870 auto server_ident = ServerIdentFrame::Encode(
1871 messenger.get_myaddrs(),
1872 messenger.get_myname().num(),
1873 global_seq,
1874 conn.policy.features_supported,
1875 conn.policy.features_required | msgr2_required,
1876 flags,
1877 server_cookie);
1878
1879 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1880 " gs={}, features_supported={}, features_required={},"
1881 " flags={}, cookie={}",
1882 conn, messenger.get_myaddrs(), messenger.get_myname().num(),
1883 global_seq, conn.policy.features_supported,
1884 conn.policy.features_required | msgr2_required,
1885 flags, server_cookie);
1886
1887 return frame_assembler->write_flush_frame(server_ident);
9f95a23c
TL
1888}
1889
1890// REPLACING state
1891
1892void ProtocolV2::trigger_replacing(bool reconnect,
1893 bool do_reset,
1e59de90 1894 FrameAssemblerV2::mover_t &&mover,
9f95a23c 1895 AuthConnectionMetaRef&& new_auth_meta,
9f95a23c
TL
1896 uint64_t new_peer_global_seq,
1897 uint64_t new_client_cookie,
1898 entity_name_t new_peer_name,
1899 uint64_t new_conn_features,
1e59de90 1900 uint64_t new_peer_supported_features,
9f95a23c
TL
1901 uint64_t new_connect_seq,
1902 uint64_t new_msg_seq)
1903{
aee94f69
TL
1904 ceph_assert_always(state >= state_t::ESTABLISHING);
1905 ceph_assert_always(state <= state_t::WAIT);
1e59de90 1906 ceph_assert_always(has_socket || state == state_t::CONNECTING);
aee94f69
TL
1907 // mover.socket shouldn't be shutdown
1908
1909 logger().info("{} start replacing ({}): pgs was {}, cs was {}, "
1910 "client_cookie was {}, {}, new_sid={}",
1911 conn, reconnect ? "reconnected" : "connected",
1912 peer_global_seq, connect_seq, client_cookie,
1913 io_states, mover.socket->get_shard_id());
1e59de90 1914 if (is_socket_valid) {
aee94f69 1915 frame_assembler->shutdown_socket<true>(&gate);
1e59de90 1916 is_socket_valid = false;
9f95a23c 1917 }
aee94f69 1918 trigger_state_phase1(state_t::REPLACING);
1e59de90
TL
1919 gate.dispatch_in_background(
1920 "trigger_replacing",
1921 conn,
1922 [this,
1923 reconnect,
1924 do_reset,
1925 mover = std::move(mover),
1926 new_auth_meta = std::move(new_auth_meta),
1927 new_client_cookie, new_peer_name,
1928 new_conn_features, new_peer_supported_features,
1929 new_peer_global_seq,
1930 new_connect_seq, new_msg_seq] () mutable {
1931 ceph_assert_always(state == state_t::REPLACING);
aee94f69
TL
1932 auto new_io_shard = mover.socket->get_shard_id();
1933 // state may become CLOSING below, but we cannot abort the chain until
1934 // mover.socket is correctly handled (closed or replaced).
1935
1936 // this is preemptive
1937 return wait_switch_io_shard(
1e59de90 1938 ).then([this] {
aee94f69
TL
1939 if (unlikely(state != state_t::REPLACING)) {
1940 ceph_assert_always(state == state_t::CLOSING);
1941 return seastar::now();
1942 }
1943
1944 trigger_state_phase2(state_t::REPLACING, io_state_t::delay);
1945 return wait_exit_io();
1946 }).then([this] {
1947 if (unlikely(state != state_t::REPLACING)) {
1948 ceph_assert_always(state == state_t::CLOSING);
1949 return seastar::now();
1950 }
1951
1e59de90 1952 ceph_assert_always(frame_assembler);
9f95a23c 1953 protocol_timer.cancel();
1e59de90
TL
1954 auto done = std::move(execution_done);
1955 execution_done = seastar::now();
1956 return done;
aee94f69
TL
1957 }).then([this, new_io_shard] {
1958 if (unlikely(state != state_t::REPLACING)) {
1959 ceph_assert_always(state == state_t::CLOSING);
1960 return seastar::now();
1961 }
1962
1963 // set io_handler to a new shard
1964 // we should prevent parallel switching core attemps
1965 auto cc_seq = crosscore.prepare_submit();
1966 // there are 2 hops with dispatch_accept()
1967 crosscore.prepare_submit();
1968 logger().debug("{} send {} IOHandler::dispatch_accept({})",
1969 conn, cc_seq, new_io_shard);
1970 ConnectionFRef conn_fref = seastar::make_foreign(
1971 conn.shared_from_this());
1972 ceph_assert_always(!pr_switch_io_shard.has_value());
1973 pr_switch_io_shard = seastar::shared_promise<>();
1974 return seastar::smp::submit_to(
1975 io_handler.get_shard_id(),
1976 [this, cc_seq, new_io_shard,
1977 conn_fref=std::move(conn_fref)]() mutable {
1978 return io_handler.dispatch_accept(
1979 cc_seq, new_io_shard, std::move(conn_fref), false);
1980 }).then([this, new_io_shard] {
1981 ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
1982 pr_switch_io_shard->set_value();
1983 pr_switch_io_shard = std::nullopt;
1984 // user can make changes
1985 });
9f95a23c
TL
1986 }).then([this,
1987 reconnect,
1e59de90
TL
1988 do_reset,
1989 mover = std::move(mover),
9f95a23c 1990 new_auth_meta = std::move(new_auth_meta),
9f95a23c 1991 new_client_cookie, new_peer_name,
1e59de90
TL
1992 new_conn_features, new_peer_supported_features,
1993 new_peer_global_seq,
9f95a23c 1994 new_connect_seq, new_msg_seq] () mutable {
1e59de90
TL
1995 if (state == state_t::REPLACING && do_reset) {
1996 reset_session(true);
aee94f69 1997 // user can make changes
1e59de90
TL
1998 }
1999
9f95a23c 2000 if (unlikely(state != state_t::REPLACING)) {
aee94f69
TL
2001 logger().debug("{} triggered {} in the middle of trigger_replacing(), abort",
2002 conn, get_state_name(state));
2003 ceph_assert_always(state == state_t::CLOSING);
1e59de90
TL
2004 return mover.socket->close(
2005 ).then([sock = std::move(mover.socket)] {
9f95a23c
TL
2006 abort_protocol();
2007 });
2008 }
2009
9f95a23c 2010 auth_meta = std::move(new_auth_meta);
9f95a23c 2011 peer_global_seq = new_peer_global_seq;
1e59de90
TL
2012 gate.dispatch_in_background(
2013 "replace_frame_assembler",
2014 conn,
2015 [this, mover=std::move(mover)]() mutable {
2016 return frame_assembler->replace_by(std::move(mover));
2017 }
2018 );
2019 is_socket_valid = true;
2020 has_socket = true;
9f95a23c
TL
2021
2022 if (reconnect) {
2023 connect_seq = new_connect_seq;
2024 // send_reconnect_ok() logic
aee94f69
TL
2025
2026 auto cc_seq = crosscore.prepare_submit();
2027 logger().debug("{} send {} IOHandler::requeue_out_sent_up_to({})",
2028 conn, cc_seq, new_msg_seq);
2029 io_states.requeue_out_sent_up_to();
2030 gate.dispatch_in_background(
2031 "requeue_out_replacing", conn, [this, cc_seq, new_msg_seq] {
2032 return seastar::smp::submit_to(
2033 io_handler.get_shard_id(), [this, cc_seq, new_msg_seq] {
2034 return io_handler.requeue_out_sent_up_to(cc_seq, new_msg_seq);
2035 });
2036 });
2037
2038 auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq);
2039 logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq);
1e59de90 2040 return frame_assembler->write_flush_frame(reconnect_ok);
9f95a23c
TL
2041 } else {
2042 client_cookie = new_client_cookie;
f67539c2
TL
2043 assert(conn.get_peer_type() == new_peer_name.type());
2044 if (conn.get_peer_id() == entity_name_t::NEW) {
2045 conn.set_peer_id(new_peer_name.num());
2046 }
1e59de90
TL
2047 conn.set_features(new_conn_features);
2048 peer_supported_features = new_peer_supported_features;
2049 bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
2050 frame_assembler->set_is_rev1(is_rev1);
9f95a23c
TL
2051 return send_server_ident();
2052 }
2053 }).then([this, reconnect] {
2054 if (unlikely(state != state_t::REPLACING)) {
aee94f69 2055 logger().debug("{} triggered {} at the end of trigger_replacing(), abort",
9f95a23c 2056 conn, get_state_name(state));
aee94f69 2057 ceph_assert_always(state == state_t::CLOSING);
9f95a23c
TL
2058 abort_protocol();
2059 }
aee94f69
TL
2060 logger().info("{} replaced ({}), going to ready: "
2061 "gs={}, pgs={}, cs={}, "
1e59de90 2062 "client_cookie={}, server_cookie={}, {}",
9f95a23c 2063 conn, reconnect ? "reconnected" : "connected",
1e59de90 2064 global_seq, peer_global_seq, connect_seq,
aee94f69 2065 client_cookie, server_cookie, io_states);
1e59de90
TL
2066 execute_ready();
2067 }).handle_exception([this](std::exception_ptr eptr) {
2068 fault(state_t::REPLACING, "trigger_replacing", eptr);
9f95a23c
TL
2069 });
2070 });
2071}
2072
2073// READY state
2074
aee94f69
TL
2075seastar::future<> ProtocolV2::notify_out_fault(
2076 crosscore_t::seq_t cc_seq,
2077 const char *where,
2078 std::exception_ptr eptr,
2079 io_handler_state _io_states)
9f95a23c 2080{
aee94f69
TL
2081 assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
2082 if (!crosscore.proceed_or_wait(cc_seq)) {
2083 logger().debug("{} got {} notify_out_fault(), wait at {}",
2084 conn, cc_seq, crosscore.get_in_seq());
2085 return crosscore.wait(cc_seq
2086 ).then([this, cc_seq, where, eptr, _io_states] {
2087 return notify_out_fault(cc_seq, where, eptr, _io_states);
2088 });
2089 }
2090
2091 io_states = _io_states;
2092 logger().debug("{} got {} notify_out_fault(): io_states={}",
2093 conn, cc_seq, io_states);
1e59de90 2094 fault(state_t::READY, where, eptr);
aee94f69 2095 return seastar::now();
9f95a23c
TL
2096}
2097
1e59de90 2098void ProtocolV2::execute_ready()
9f95a23c
TL
2099{
2100 assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
1e59de90
TL
2101 protocol_timer.cancel();
2102 ceph_assert_always(is_socket_valid);
aee94f69
TL
2103 // I'm not responsible to shutdown the socket at READY
2104 is_socket_valid = false;
2105 trigger_state(state_t::READY, io_state_t::open);
2106#ifdef UNIT_TESTS_BUILT
2107 if (conn.interceptor) {
2108 // FIXME: doesn't support cross-core
2109 conn.interceptor->register_conn_ready(
2110 conn.get_local_shared_foreign_from_this());
2111 }
2112#endif
9f95a23c
TL
2113}
2114
2115// STANDBY state
2116
2117void ProtocolV2::execute_standby()
2118{
1e59de90 2119 ceph_assert_always(!is_socket_valid);
aee94f69 2120 trigger_state(state_t::STANDBY, io_state_t::delay);
9f95a23c
TL
2121}
2122
aee94f69
TL
2123seastar::future<> ProtocolV2::notify_out(
2124 crosscore_t::seq_t cc_seq)
9f95a23c 2125{
aee94f69
TL
2126 assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
2127 if (!crosscore.proceed_or_wait(cc_seq)) {
2128 logger().debug("{} got {} notify_out(), wait at {}",
2129 conn, cc_seq, crosscore.get_in_seq());
2130 return crosscore.wait(cc_seq
2131 ).then([this, cc_seq] {
2132 return notify_out(cc_seq);
2133 });
2134 }
2135
2136 logger().debug("{} got {} notify_out(): at {}",
2137 conn, cc_seq, get_state_name(state));
2138 io_states.is_out_queued = true;
9f95a23c 2139 if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
1e59de90 2140 logger().info("{} notify_out(): at {}, going to CONNECTING",
9f95a23c
TL
2141 conn, get_state_name(state));
2142 execute_connecting();
2143 }
aee94f69 2144 return seastar::now();
9f95a23c
TL
2145}
2146
2147// WAIT state
2148
2149void ProtocolV2::execute_wait(bool max_backoff)
2150{
1e59de90 2151 ceph_assert_always(!is_socket_valid);
aee94f69 2152 trigger_state(state_t::WAIT, io_state_t::delay);
1e59de90 2153 gated_execute("execute_wait", conn, [this, max_backoff] {
9f95a23c
TL
2154 double backoff = protocol_timer.last_dur();
2155 if (max_backoff) {
f67539c2 2156 backoff = local_conf().get_val<double>("ms_max_backoff");
9f95a23c 2157 } else if (backoff > 0) {
f67539c2 2158 backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
9f95a23c 2159 } else {
f67539c2 2160 backoff = local_conf().get_val<double>("ms_initial_backoff");
9f95a23c
TL
2161 }
2162 return protocol_timer.backoff(backoff).then([this] {
2163 if (unlikely(state != state_t::WAIT)) {
2164 logger().debug("{} triggered {} at the end of execute_wait()",
2165 conn, get_state_name(state));
2166 abort_protocol();
2167 }
2168 logger().info("{} execute_wait(): going to CONNECTING", conn);
2169 execute_connecting();
1e59de90
TL
2170 }).handle_exception([this](std::exception_ptr eptr) {
2171 const char *e_what;
2172 try {
2173 std::rethrow_exception(eptr);
2174 } catch (std::exception &e) {
2175 e_what = e.what();
2176 }
9f95a23c 2177 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
1e59de90 2178 conn, get_state_name(state), e_what);
9f95a23c
TL
2179 assert(state == state_t::REPLACING ||
2180 state == state_t::CLOSING);
2181 });
2182 });
2183}
2184
2185// SERVER_WAIT state
2186
2187void ProtocolV2::execute_server_wait()
2188{
1e59de90 2189 ceph_assert_always(is_socket_valid);
aee94f69 2190 trigger_state(state_t::SERVER_WAIT, io_state_t::none);
1e59de90
TL
2191 gated_execute("execute_server_wait", conn, [this] {
2192 return frame_assembler->read_exactly(1
aee94f69 2193 ).then([this](auto bptr) {
9f95a23c
TL
2194 logger().warn("{} SERVER_WAIT got read, abort", conn);
2195 abort_in_fault();
1e59de90
TL
2196 }).handle_exception([this](std::exception_ptr eptr) {
2197 const char *e_what;
2198 try {
2199 std::rethrow_exception(eptr);
2200 } catch (std::exception &e) {
2201 e_what = e.what();
2202 }
9f95a23c 2203 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
1e59de90
TL
2204 conn, get_state_name(state), e_what);
2205 do_close(false);
9f95a23c
TL
2206 });
2207 });
2208}
2209
2210// CLOSING state
2211
aee94f69
TL
2212seastar::future<> ProtocolV2::notify_mark_down(
2213 crosscore_t::seq_t cc_seq)
9f95a23c 2214{
aee94f69
TL
2215 assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
2216 if (!crosscore.proceed_or_wait(cc_seq)) {
2217 logger().debug("{} got {} notify_mark_down(), wait at {}",
2218 conn, cc_seq, crosscore.get_in_seq());
2219 return crosscore.wait(cc_seq
2220 ).then([this, cc_seq] {
2221 return notify_mark_down(cc_seq);
2222 });
2223 }
2224
2225 logger().debug("{} got {} notify_mark_down()",
2226 conn, cc_seq);
1e59de90 2227 do_close(false);
aee94f69 2228 return seastar::now();
1e59de90
TL
2229}
2230
2231seastar::future<> ProtocolV2::close_clean_yielded()
2232{
2233 // yield() so that do_close() can be called *after* close_clean_yielded() is
2234 // applied to all connections in a container using
2235 // seastar::parallel_for_each(). otherwise, we could erase a connection in
2236 // the container when seastar::parallel_for_each() is still iterating in it.
2237 // that'd lead to a segfault.
2238 return seastar::yield(
aee94f69 2239 ).then([this] {
1e59de90 2240 do_close(false);
aee94f69
TL
2241 return pr_closed_clean.get_shared_future();
2242
2243 // connection may be unreferenced from the messenger,
2244 // so need to hold the additional reference.
2245 }).finally([conn_ref = conn.shared_from_this()] {});;
1e59de90
TL
2246}
2247
2248void ProtocolV2::do_close(
2249 bool is_dispatch_reset,
2250 std::optional<std::function<void()>> f_accept_new)
2251{
aee94f69 2252 if (state == state_t::CLOSING) {
1e59de90 2253 // already closing
1e59de90
TL
2254 return;
2255 }
2256
2257 bool is_replace = f_accept_new ? true : false;
2258 logger().info("{} closing: reset {}, replace {}", conn,
2259 is_dispatch_reset ? "yes" : "no",
2260 is_replace ? "yes" : "no");
2261
2262 /*
2263 * atomic operations
2264 */
2265
aee94f69 2266 ceph_assert_always(!gate.is_closed());
1e59de90 2267
aee94f69 2268 // messenger registrations, must before user events
f67539c2
TL
2269 messenger.closing_conn(
2270 seastar::static_pointer_cast<SocketConnection>(
2271 conn.shared_from_this()));
9f95a23c
TL
2272 if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
2273 messenger.unaccept_conn(
2274 seastar::static_pointer_cast<SocketConnection>(
2275 conn.shared_from_this()));
2276 } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
2277 messenger.unregister_conn(
2278 seastar::static_pointer_cast<SocketConnection>(
2279 conn.shared_from_this()));
2280 } else {
2281 // cannot happen
2282 ceph_assert(false);
2283 }
1e59de90 2284 if (f_accept_new) {
aee94f69
TL
2285 // the replacing connection must be registerred after the replaced
2286 // connection is unreigsterred.
1e59de90
TL
2287 (*f_accept_new)();
2288 }
aee94f69
TL
2289
2290 protocol_timer.cancel();
1e59de90 2291 if (is_socket_valid) {
aee94f69 2292 frame_assembler->shutdown_socket<true>(&gate);
1e59de90
TL
2293 is_socket_valid = false;
2294 }
aee94f69
TL
2295
2296 trigger_state_phase1(state_t::CLOSING);
2297 gate.dispatch_in_background(
2298 "close_io", conn, [this, is_dispatch_reset, is_replace] {
2299 // this is preemptive
2300 return wait_switch_io_shard(
2301 ).then([this, is_dispatch_reset, is_replace] {
2302 trigger_state_phase2(state_t::CLOSING, io_state_t::drop);
2303 auto cc_seq = crosscore.prepare_submit();
2304 logger().debug("{} send {} IOHandler::close_io(reset={}, replace={})",
2305 conn, cc_seq, is_dispatch_reset, is_replace);
2306
2307 std::ignore = gate.close(
2308 ).then([this] {
2309 ceph_assert_always(!need_exit_io);
2310 ceph_assert_always(!pr_exit_io.has_value());
2311 if (has_socket) {
2312 ceph_assert_always(frame_assembler);
2313 return frame_assembler->close_shutdown_socket();
2314 } else {
2315 return seastar::now();
2316 }
2317 }).then([this] {
2318 logger().debug("{} closed!", conn);
2319 messenger.closed_conn(
2320 seastar::static_pointer_cast<SocketConnection>(
2321 conn.shared_from_this()));
2322 pr_closed_clean.set_value();
1e59de90 2323#ifdef UNIT_TESTS_BUILT
aee94f69
TL
2324 closed_clean = true;
2325 if (conn.interceptor) {
2326 conn.interceptor->register_conn_closed(
2327 conn.get_local_shared_foreign_from_this());
2328 }
1e59de90 2329#endif
aee94f69
TL
2330 // connection is unreferenced from the messenger,
2331 // so need to hold the additional reference.
2332 }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
2333 logger().error("{} closing got unexpected exception {}",
2334 conn, eptr);
2335 ceph_abort();
2336 });
2337
2338 return seastar::smp::submit_to(
2339 io_handler.get_shard_id(),
2340 [this, cc_seq, is_dispatch_reset, is_replace] {
2341 return io_handler.close_io(cc_seq, is_dispatch_reset, is_replace);
2342 });
2343 // user can make changes
2344 });
1e59de90 2345 });
9f95a23c
TL
2346}
2347
2348} // namespace crimson::net