]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/ProtocolV2.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "ProtocolV2.h"
5
6#include <seastar/core/lowres_clock.hh>
7#include <fmt/format.h>
9f95a23c
TL
8#include "include/msgr.h"
9#include "include/random.h"
10
11#include "crimson/auth/AuthClient.h"
12#include "crimson/auth/AuthServer.h"
f67539c2 13#include "crimson/common/formatter.h"
9f95a23c 14
f67539c2 15#include "chained_dispatchers.h"
9f95a23c
TL
16#include "Errors.h"
17#include "Socket.h"
18#include "SocketConnection.h"
19#include "SocketMessenger.h"
20
21#ifdef UNIT_TESTS_BUILT
22#include "Interceptor.h"
23#endif
24
25using namespace ceph::msgr::v2;
f67539c2 26using crimson::common::local_conf;
9f95a23c
TL
27
28namespace {
29
20effc67
TL
30// TODO: CEPH_MSGR2_FEATURE_COMPRESSION
31const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES =
32 (CEPH_MSGR2_FEATURE_REVISION_1 |
33 // CEPH_MSGR2_FEATURE_COMPRESSION |
34 UINT64_C(0));
35
9f95a23c
TL
36// Log levels in V2 Protocol:
37// * error level, something error that cause connection to terminate:
38// - fatal errors;
39// - bugs;
40// * warn level: something unusual that identifies connection fault or replacement:
41// - unstable network;
42// - incompatible peer;
43// - auth failure;
44// - connection race;
45// - connection reset;
46// * info level, something very important to show connection lifecycle,
47// which doesn't happen very frequently;
48// * debug level, important logs for debugging, including:
49// - all the messages sent/received (-->/<==);
50// - all the frames exchanged (WRITE/GOT);
51// - important fields updated (UPDATE);
52// - connection state transitions (TRIGGER);
53// * trace level, trivial logs showing:
54// - the exact bytes being sent/received (SEND/RECV(bytes));
55// - detailed information of sub-frames;
56// - integrity checks;
57// - etc.
58seastar::logger& logger() {
59 return crimson::get_logger(ceph_subsys_ms);
60}
61
f67539c2 62[[noreturn]] void abort_in_fault() {
9f95a23c
TL
63 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
64}
65
f67539c2 66[[noreturn]] void abort_protocol() {
9f95a23c
TL
67 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
68}
69
f67539c2
TL
70[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
71 proto.close(dispatch_reset);
9f95a23c
TL
72 abort_protocol();
73}
74
75inline void expect_tag(const Tag& expected,
76 const Tag& actual,
77 crimson::net::SocketConnection& conn,
78 const char *where) {
79 if (actual != expected) {
80 logger().warn("{} {} received wrong tag: {}, expected {}",
81 conn, where,
82 static_cast<uint32_t>(actual),
83 static_cast<uint32_t>(expected));
84 abort_in_fault();
85 }
86}
87
88inline void unexpected_tag(const Tag& unexpected,
89 crimson::net::SocketConnection& conn,
90 const char *where) {
91 logger().warn("{} {} received unexpected tag: {}",
92 conn, where, static_cast<uint32_t>(unexpected));
93 abort_in_fault();
94}
95
96inline uint64_t generate_client_cookie() {
97 return ceph::util::generate_random_number<uint64_t>(
98 1, std::numeric_limits<uint64_t>::max());
99}
100
101} // namespace anonymous
102
9f95a23c
TL
103namespace crimson::net {
104
#ifdef UNIT_TESTS_BUILT
// Test-only hook: if the connection has an interceptor, ask it which action
// to take at breakpoint `bp`, then arm that action as a trap on `socket` for
// I/O of kind `type` (the interceptor's blocker is handed to the socket).
void intercept(Breakpoint bp, bp_type_t type,
               SocketConnection& conn, SocketRef& socket) {
  if (conn.interceptor) {
    auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
    socket->set_trap(type, action, &conn.interceptor->blocker);
  }
}

// Expression macros: expect `conn` and `socket` to be in scope at the use site.
#define INTERCEPT_CUSTOM(bp, type) \
intercept({bp}, type, conn, socket)

#define INTERCEPT_FRAME(tag, type) \
intercept({static_cast<Tag>(tag), type}, \
          type, conn, socket)

// Statement macro for breakpoints that are neither a read nor a write: a
// FAULT action faults the protocol, and BLOCK is not supported here.
// Wrapped in do/while(0) so that it behaves as a single statement (it is
// safe inside an unbraced if/else) and requires a trailing semicolon.
#define INTERCEPT_N_RW(bp)                                   \
do {                                                         \
  if (conn.interceptor) {                                    \
    auto action = conn.interceptor->intercept(conn, {bp});   \
    ceph_assert(action != bp_action_t::BLOCK);               \
    if (action == bp_action_t::FAULT) {                      \
      abort_in_fault();                                      \
    }                                                        \
  }                                                          \
} while (0)

#else
#define INTERCEPT_CUSTOM(bp, type)
#define INTERCEPT_FRAME(tag, type)
#define INTERCEPT_N_RW(bp)
#endif
135
136seastar::future<> ProtocolV2::Timer::backoff(double seconds)
137{
138 logger().warn("{} waiting {} seconds ...", conn, seconds);
139 cancel();
140 last_dur_ = seconds;
141 as = seastar::abort_source();
142 auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
143 std::chrono::duration<double>(seconds));
144 return seastar::sleep_abortable(dur, *as
145 ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
146 logger().debug("{} wait aborted", conn);
147 abort_protocol();
148 });
149}
150
f67539c2 151ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
9f95a23c
TL
152 SocketConnection& conn,
153 SocketMessenger& messenger)
f67539c2 154 : Protocol(proto_t::v2, dispatchers, conn),
9f95a23c 155 messenger{messenger},
f67539c2 156 protocol_timer{conn}
9f95a23c
TL
157{}
158
159ProtocolV2::~ProtocolV2() {}
160
f67539c2
TL
161bool ProtocolV2::is_connected() const {
162 return state == state_t::READY ||
163 state == state_t::ESTABLISHING ||
164 state == state_t::REPLACING;
165}
166
9f95a23c 167void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
f67539c2 168 const entity_name_t& _peer_name)
9f95a23c
TL
169{
170 ceph_assert(state == state_t::NONE);
171 ceph_assert(!socket);
f67539c2 172 ceph_assert(!gate.is_closed());
9f95a23c
TL
173 conn.peer_addr = _peer_addr;
174 conn.target_addr = _peer_addr;
f67539c2
TL
175 conn.set_peer_name(_peer_name);
176 conn.policy = messenger.get_policy(_peer_name.type());
9f95a23c 177 client_cookie = generate_client_cookie();
f67539c2 178 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
9f95a23c 179 " policy(lossy={}, server={}, standby={}, resetcheck={})",
f67539c2 180 conn, _peer_addr, _peer_name, client_cookie,
9f95a23c
TL
181 conn.policy.lossy, conn.policy.server,
182 conn.policy.standby, conn.policy.resetcheck);
183 messenger.register_conn(
184 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
185 execute_connecting();
186}
187
188void ProtocolV2::start_accept(SocketRef&& sock,
189 const entity_addr_t& _peer_addr)
190{
191 ceph_assert(state == state_t::NONE);
192 ceph_assert(!socket);
193 // until we know better
194 conn.target_addr = _peer_addr;
9f95a23c
TL
195 socket = std::move(sock);
196 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
197 messenger.accept_conn(
198 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
199 execute_accepting();
200}
201
202// TODO: Frame related implementations, probably to a separate class.
203
204void ProtocolV2::enable_recording()
205{
206 rxbuf.clear();
207 txbuf.clear();
208 record_io = true;
209}
210
211seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
212{
213 if (unlikely(record_io)) {
214 return socket->read_exactly(bytes)
215 .then([this] (auto bl) {
216 rxbuf.append(buffer::create(bl.share()));
217 return bl;
218 });
219 } else {
220 return socket->read_exactly(bytes);
221 };
222}
223
224seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
225{
226 if (unlikely(record_io)) {
227 return socket->read(bytes)
228 .then([this] (auto buf) {
229 rxbuf.append(buf);
230 return buf;
231 });
232 } else {
233 return socket->read(bytes);
234 }
235}
236
237seastar::future<> ProtocolV2::write(bufferlist&& buf)
238{
239 if (unlikely(record_io)) {
240 txbuf.append(buf);
241 }
242 return socket->write(std::move(buf));
243}
244
245seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
246{
247 if (unlikely(record_io)) {
248 txbuf.append(buf);
249 }
250 return socket->write_flush(std::move(buf));
251}
252
253size_t ProtocolV2::get_current_msg_size() const
254{
f67539c2 255 ceph_assert(rx_frame_asm.get_num_segments() > 0);
9f95a23c
TL
256 size_t sum = 0;
257 // we don't include SegmentIndex::Msg::HEADER.
f67539c2
TL
258 for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
259 sum += rx_frame_asm.get_segment_logical_len(idx);
9f95a23c
TL
260 }
261 return sum;
262}
263
264seastar::future<Tag> ProtocolV2::read_main_preamble()
265{
f67539c2
TL
266 rx_preamble.clear();
267 return read_exactly(rx_frame_asm.get_preamble_onwire_len())
9f95a23c 268 .then([this] (auto bl) {
9f95a23c 269 rx_segments_data.clear();
f67539c2
TL
270 try {
271 rx_preamble.append(buffer::create(std::move(bl)));
272 const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
273 INTERCEPT_FRAME(tag, bp_type_t::READ);
274 return tag;
275 } catch (FrameError& e) {
276 logger().warn("{} read_main_preamble: {}", conn, e.what());
277 abort_in_fault();
9f95a23c 278 }
9f95a23c
TL
279 });
280}
281
282seastar::future<> ProtocolV2::read_frame_payload()
283{
9f95a23c
TL
284 ceph_assert(rx_segments_data.empty());
285
286 return seastar::do_until(
f67539c2 287 [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
9f95a23c 288 [this] {
9f95a23c 289 // TODO: create aligned and contiguous buffer from socket
f67539c2
TL
290 const size_t seg_idx = rx_segments_data.size();
291 if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
292 alignment != segment_t::DEFAULT_ALIGNMENT) {
9f95a23c 293 logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
f67539c2 294 conn, alignment, rx_segments_data.size());
9f95a23c 295 }
f67539c2 296 uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
9f95a23c 297 // TODO: create aligned and contiguous buffer from socket
f67539c2 298 return read_exactly(onwire_len).then([this] (auto tmp_bl) {
9f95a23c
TL
299 logger().trace("{} RECV({}) frame segment[{}]",
300 conn, tmp_bl.size(), rx_segments_data.size());
f67539c2
TL
301 bufferlist segment;
302 segment.append(buffer::create(std::move(tmp_bl)));
303 rx_segments_data.emplace_back(std::move(segment));
9f95a23c
TL
304 });
305 }
306 ).then([this] {
f67539c2 307 return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
9f95a23c
TL
308 }).then([this] (auto bl) {
309 logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
f67539c2
TL
310 bool ok = false;
311 try {
f67539c2
TL
312 bufferlist rx_epilogue;
313 rx_epilogue.append(buffer::create(std::move(bl)));
20effc67 314 ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
f67539c2
TL
315 } catch (FrameError& e) {
316 logger().error("read_frame_payload: {} {}", conn, e.what());
317 abort_in_fault();
318 } catch (ceph::crypto::onwire::MsgAuthError&) {
319 logger().error("read_frame_payload: {} bad auth tag", conn);
320 abort_in_fault();
9f95a23c 321 }
9f95a23c
TL
322 // we do have a mechanism that allows transmitter to start sending message
323 // and abort after putting entire data field on wire. This will be used by
324 // the kernel client to avoid unnecessary buffering.
f67539c2 325 if (!ok) {
9f95a23c
TL
326 // TODO
327 ceph_assert(false);
328 }
329 });
330}
331
332template <class F>
333seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
334{
f6b5b4d7 335 auto bl = frame.get_buffer(tx_frame_asm);
9f95a23c
TL
336 const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
337 logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
338 conn, bl.length(), (int)main_preamble->tag,
339 (int)main_preamble->num_segments, main_preamble->crc);
340 INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
341 if (flush) {
342 return write_flush(std::move(bl));
343 } else {
344 return write(std::move(bl));
345 }
346}
347
348void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
349{
350 if (!reentrant && _state == state) {
351 logger().error("{} is not allowed to re-trigger state {}",
352 conn, get_state_name(state));
353 ceph_assert(false);
354 }
355 logger().debug("{} TRIGGER {}, was {}",
356 conn, get_state_name(_state), get_state_name(state));
357 state = _state;
358 set_write_state(_write_state);
359}
360
361void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
362{
363 if (conn.policy.lossy) {
364 logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
365 conn, func_name, get_state_name(state), eptr);
f67539c2 366 close(true);
9f95a23c
TL
367 } else if (conn.policy.server ||
368 (conn.policy.standby &&
369 (!is_queued() && conn.sent.empty()))) {
370 logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
371 conn, func_name, get_state_name(state), eptr);
372 execute_standby();
373 } else if (backoff) {
374 logger().info("{} {}: fault at {}, going to WAIT -- {}",
375 conn, func_name, get_state_name(state), eptr);
376 execute_wait(false);
377 } else {
378 logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
379 conn, func_name, get_state_name(state), eptr);
380 execute_connecting();
381 }
382}
383
9f95a23c
TL
384void ProtocolV2::reset_session(bool full)
385{
386 server_cookie = 0;
387 connect_seq = 0;
388 conn.in_seq = 0;
389 if (full) {
390 client_cookie = generate_client_cookie();
391 peer_global_seq = 0;
392 reset_write();
f67539c2
TL
393 dispatchers.ms_handle_remote_reset(
394 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
9f95a23c
TL
395 }
396}
397
f67539c2
TL
398seastar::future<std::tuple<entity_type_t, entity_addr_t>>
399ProtocolV2::banner_exchange(bool is_connect)
9f95a23c
TL
400{
401 // 1. prepare and send banner
402 bufferlist banner_payload;
20effc67 403 encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
9f95a23c
TL
404 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
405
406 bufferlist bl;
407 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
408 auto len_payload = static_cast<uint16_t>(banner_payload.length());
409 encode(len_payload, bl, 0);
410 bl.claim_append(banner_payload);
411 logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
412 "required={}, banner=\"{}\"",
413 conn, bl.length(), len_payload,
20effc67
TL
414 CRIMSON_MSGR2_SUPPORTED_FEATURES,
415 CEPH_MSGR2_REQUIRED_FEATURES,
9f95a23c
TL
416 CEPH_BANNER_V2_PREFIX);
417 INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
418 return write_flush(std::move(bl)).then([this] {
419 // 2. read peer banner
420 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
421 INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
422 return read_exactly(banner_len); // or read exactly?
423 }).then([this] (auto bl) {
424 // 3. process peer banner and read banner_payload
425 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
426 logger().debug("{} RECV({}) banner: \"{}\"",
427 conn, bl.size(),
428 std::string((const char*)bl.get(), banner_prefix_len));
429
430 if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
431 if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
432 logger().warn("{} peer is using V1 protocol", conn);
433 } else {
434 logger().warn("{} peer sent bad banner", conn);
435 }
436 abort_in_fault();
437 }
438 bl.trim_front(banner_prefix_len);
439
440 uint16_t payload_len;
441 bufferlist buf;
442 buf.append(buffer::create(std::move(bl)));
443 auto ti = buf.cbegin();
444 try {
445 decode(payload_len, ti);
446 } catch (const buffer::error &e) {
447 logger().warn("{} decode banner payload len failed", conn);
448 abort_in_fault();
449 }
450 logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
451 INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
452 return read(payload_len);
f67539c2 453 }).then([this, is_connect] (bufferlist bl) {
9f95a23c
TL
454 // 4. process peer banner_payload and send HelloFrame
455 auto p = bl.cbegin();
456 uint64_t peer_supported_features;
457 uint64_t peer_required_features;
458 try {
459 decode(peer_supported_features, p);
460 decode(peer_required_features, p);
461 } catch (const buffer::error &e) {
462 logger().warn("{} decode banner payload failed", conn);
463 abort_in_fault();
464 }
465 logger().debug("{} RECV({}) banner features: supported={} required={}",
466 conn, bl.length(),
467 peer_supported_features, peer_required_features);
468
469 // Check feature bit compatibility
20effc67 470 uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
9f95a23c
TL
471 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
472 if ((required_features & peer_supported_features) != required_features) {
473 logger().error("{} peer does not support all required features"
474 " required={} peer_supported={}",
475 conn, required_features, peer_supported_features);
f67539c2 476 abort_in_close(*this, is_connect);
9f95a23c
TL
477 }
478 if ((supported_features & peer_required_features) != peer_required_features) {
479 logger().error("{} we do not support all peer required features"
480 " peer_required={} supported={}",
481 conn, peer_required_features, supported_features);
f67539c2 482 abort_in_close(*this, is_connect);
9f95a23c
TL
483 }
484 this->peer_required_features = peer_required_features;
485 if (this->peer_required_features == 0) {
486 this->connection_features = msgr2_required;
487 }
f67539c2
TL
488 const bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
489 tx_frame_asm.set_is_rev1(is_rev1);
490 rx_frame_asm.set_is_rev1(is_rev1);
9f95a23c
TL
491
492 auto hello = HelloFrame::Encode(messenger.get_mytype(),
493 conn.target_addr);
494 logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
495 conn, ceph_entity_type_name(messenger.get_mytype()),
496 conn.target_addr);
497 return write_frame(hello);
498 }).then([this] {
499 //5. read peer HelloFrame
500 return read_main_preamble();
501 }).then([this] (Tag tag) {
502 expect_tag(Tag::HELLO, tag, conn, __func__);
503 return read_frame_payload();
504 }).then([this] {
505 // 6. process peer HelloFrame
506 auto hello = HelloFrame::Decode(rx_segments_data.back());
507 logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
508 conn, ceph_entity_type_name(hello.entity_type()),
509 hello.peer_addr());
f67539c2
TL
510 return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
511 std::make_tuple(hello.entity_type(), hello.peer_addr()));
9f95a23c
TL
512 });
513}
514
515// CONNECTING state
516
517seastar::future<> ProtocolV2::handle_auth_reply()
518{
519 return read_main_preamble()
520 .then([this] (Tag tag) {
521 switch (tag) {
522 case Tag::AUTH_BAD_METHOD:
523 return read_frame_payload().then([this] {
524 // handle_auth_bad_method() logic
525 auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
526 logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
527 "allowed_methods={}, allowed_modes={}",
528 conn, bad_method.method(), cpp_strerror(bad_method.result()),
529 bad_method.allowed_methods(), bad_method.allowed_modes());
530 ceph_assert(messenger.get_auth_client());
531 int r = messenger.get_auth_client()->handle_auth_bad_method(
532 conn.shared_from_this(), auth_meta,
533 bad_method.method(), bad_method.result(),
534 bad_method.allowed_methods(), bad_method.allowed_modes());
535 if (r < 0) {
536 logger().warn("{} auth_client handle_auth_bad_method returned {}",
537 conn, r);
538 abort_in_fault();
539 }
540 return client_auth(bad_method.allowed_methods());
541 });
542 case Tag::AUTH_REPLY_MORE:
543 return read_frame_payload().then([this] {
544 // handle_auth_reply_more() logic
545 auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
546 logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
547 conn, auth_more.auth_payload().length());
548 ceph_assert(messenger.get_auth_client());
549 // let execute_connecting() take care of the thrown exception
550 auto reply = messenger.get_auth_client()->handle_auth_reply_more(
551 conn.shared_from_this(), auth_meta, auth_more.auth_payload());
552 auto more_reply = AuthRequestMoreFrame::Encode(reply);
553 logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
554 conn, reply.length());
555 return write_frame(more_reply);
556 }).then([this] {
557 return handle_auth_reply();
558 });
559 case Tag::AUTH_DONE:
560 return read_frame_payload().then([this] {
561 // handle_auth_done() logic
562 auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
563 logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
564 conn, auth_done.global_id(),
565 ceph_con_mode_name(auth_done.con_mode()),
566 auth_done.auth_payload().length());
567 ceph_assert(messenger.get_auth_client());
568 int r = messenger.get_auth_client()->handle_auth_done(
569 conn.shared_from_this(), auth_meta,
570 auth_done.global_id(),
571 auth_done.con_mode(),
572 auth_done.auth_payload());
573 if (r < 0) {
574 logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
575 abort_in_fault();
576 }
577 auth_meta->con_mode = auth_done.con_mode();
f67539c2
TL
578 session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
579 nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), false);
9f95a23c
TL
580 return finish_auth();
581 });
582 default: {
583 unexpected_tag(tag, conn, __func__);
584 return seastar::now();
585 }
586 }
587 });
588}
589
590seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
591{
592 // send_auth_request() logic
593 ceph_assert(messenger.get_auth_client());
594
595 try {
596 auto [auth_method, preferred_modes, bl] =
597 messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta);
598 auth_meta->auth_method = auth_method;
599 auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
600 logger().debug("{} WRITE AuthRequestFrame: method={},"
601 " preferred_modes={}, payload_len={}",
602 conn, auth_method, preferred_modes, bl.length());
603 return write_frame(frame).then([this] {
604 return handle_auth_reply();
605 });
606 } catch (const crimson::auth::error& e) {
607 logger().error("{} get_initial_auth_request returned {}", conn, e);
f67539c2 608 abort_in_close(*this, true);
9f95a23c
TL
609 return seastar::now();
610 }
611}
612
613seastar::future<ProtocolV2::next_step_t>
614ProtocolV2::process_wait()
615{
616 return read_frame_payload().then([this] {
617 // handle_wait() logic
618 logger().debug("{} GOT WaitFrame", conn);
619 WaitFrame::Decode(rx_segments_data.back());
620 return next_step_t::wait;
621 });
622}
623
624seastar::future<ProtocolV2::next_step_t>
625ProtocolV2::client_connect()
626{
627 // send_client_ident() logic
628 uint64_t flags = 0;
629 if (conn.policy.lossy) {
630 flags |= CEPH_MSG_CONNECT_LOSSY;
631 }
632
633 auto client_ident = ClientIdentFrame::Encode(
634 messenger.get_myaddrs(),
635 conn.target_addr,
636 messenger.get_myname().num(),
637 global_seq,
638 conn.policy.features_supported,
639 conn.policy.features_required | msgr2_required, flags,
640 client_cookie);
641
642 logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
643 " gs={}, features_supported={}, features_required={},"
644 " flags={}, cookie={}",
645 conn, messenger.get_myaddrs(), conn.target_addr,
646 messenger.get_myname().num(), global_seq,
647 conn.policy.features_supported,
648 conn.policy.features_required | msgr2_required,
649 flags, client_cookie);
650 return write_frame(client_ident).then([this] {
651 return read_main_preamble();
652 }).then([this] (Tag tag) {
653 switch (tag) {
654 case Tag::IDENT_MISSING_FEATURES:
655 return read_frame_payload().then([this] {
656 // handle_ident_missing_features() logic
657 auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
658 logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
659 " (client does not support all server features)",
660 conn, ident_missing.features());
661 abort_in_fault();
662 return next_step_t::none;
663 });
664 case Tag::WAIT:
665 return process_wait();
666 case Tag::SERVER_IDENT:
667 return read_frame_payload().then([this] {
668 // handle_server_ident() logic
669 requeue_sent();
670 auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
671 logger().debug("{} GOT ServerIdentFrame:"
672 " addrs={}, gid={}, gs={},"
673 " features_supported={}, features_required={},"
674 " flags={}, cookie={}",
675 conn,
676 server_ident.addrs(), server_ident.gid(),
677 server_ident.global_seq(),
678 server_ident.supported_features(),
679 server_ident.required_features(),
680 server_ident.flags(), server_ident.cookie());
681
682 // is this who we intended to talk to?
683 // be a bit forgiving here, since we may be connecting based on addresses parsed out
684 // of mon_host or something.
685 if (!server_ident.addrs().contains(conn.target_addr)) {
686 logger().warn("{} peer identifies as {}, does not include {}",
687 conn, server_ident.addrs(), conn.target_addr);
688 throw std::system_error(
689 make_error_code(crimson::net::error::bad_peer_address));
690 }
691
692 server_cookie = server_ident.cookie();
693
694 // TODO: change peer_addr to entity_addrvec_t
695 if (server_ident.addrs().front() != conn.peer_addr) {
696 logger().warn("{} peer advertises as {}, does not match {}",
697 conn, server_ident.addrs(), conn.peer_addr);
698 throw std::system_error(
699 make_error_code(crimson::net::error::bad_peer_address));
700 }
f67539c2
TL
701 if (conn.get_peer_id() != entity_name_t::NEW &&
702 conn.get_peer_id() != server_ident.gid()) {
703 logger().error("{} connection peer id ({}) does not match "
704 "what it should be ({}) during connecting, close",
705 conn, server_ident.gid(), conn.get_peer_id());
706 abort_in_close(*this, true);
707 }
9f95a23c
TL
708 conn.set_peer_id(server_ident.gid());
709 conn.set_features(server_ident.supported_features() &
710 conn.policy.features_supported);
711 peer_global_seq = server_ident.global_seq();
712
713 bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
714 if (lossy != conn.policy.lossy) {
715 logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
716 conn.policy.lossy = lossy;
717 }
718 if (lossy && (connect_seq != 0 || server_cookie != 0)) {
719 logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
720 conn, connect_seq, server_cookie);
721 connect_seq = 0;
722 server_cookie = 0;
723 }
724
725 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
726 });
727 default: {
728 unexpected_tag(tag, conn, "post_client_connect");
729 return seastar::make_ready_future<next_step_t>(next_step_t::none);
730 }
731 }
732 });
733}
734
735seastar::future<ProtocolV2::next_step_t>
736ProtocolV2::client_reconnect()
737{
738 // send_reconnect() logic
739 auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
740 client_cookie,
741 server_cookie,
742 global_seq,
743 connect_seq,
744 conn.in_seq);
745 logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
746 " server_cookie={}, gs={}, cs={}, msg_seq={}",
747 conn, messenger.get_myaddrs(),
748 client_cookie, server_cookie,
749 global_seq, connect_seq, conn.in_seq);
750 return write_frame(reconnect).then([this] {
751 return read_main_preamble();
752 }).then([this] (Tag tag) {
753 switch (tag) {
754 case Tag::SESSION_RETRY_GLOBAL:
755 return read_frame_payload().then([this] {
756 // handle_session_retry_global() logic
757 auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
758 logger().warn("{} GOT RetryGlobalFrame: gs={}",
759 conn, retry.global_seq());
760 return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
761 global_seq = gs;
762 logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
763 return client_reconnect();
764 });
765 });
766 case Tag::SESSION_RETRY:
767 return read_frame_payload().then([this] {
768 // handle_session_retry() logic
769 auto retry = RetryFrame::Decode(rx_segments_data.back());
770 logger().warn("{} GOT RetryFrame: cs={}",
771 conn, retry.connect_seq());
772 connect_seq = retry.connect_seq() + 1;
773 logger().warn("{} UPDATE: cs={}", conn, connect_seq);
774 return client_reconnect();
775 });
776 case Tag::SESSION_RESET:
777 return read_frame_payload().then([this] {
778 // handle_session_reset() logic
779 auto reset = ResetFrame::Decode(rx_segments_data.back());
780 logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
781 reset_session(reset.full());
782 return client_connect();
783 });
784 case Tag::WAIT:
785 return process_wait();
786 case Tag::SESSION_RECONNECT_OK:
787 return read_frame_payload().then([this] {
788 // handle_reconnect_ok() logic
789 auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
790 logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
791 conn, reconnect_ok.msg_seq());
792 requeue_up_to(reconnect_ok.msg_seq());
793 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
794 });
795 default: {
796 unexpected_tag(tag, conn, "post_client_reconnect");
797 return seastar::make_ready_future<next_step_t>(next_step_t::none);
798 }
799 }
800 });
801}
802
803void ProtocolV2::execute_connecting()
804{
805 trigger_state(state_t::CONNECTING, write_state_t::delay, true);
806 if (socket) {
807 socket->shutdown();
808 }
f67539c2 809 gated_execute("execute_connecting", [this] {
9f95a23c
TL
810 return messenger.get_global_seq().then([this] (auto gs) {
811 global_seq = gs;
812 assert(client_cookie != 0);
813 if (!conn.policy.lossy && server_cookie != 0) {
814 ++connect_seq;
815 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
816 conn, global_seq, connect_seq);
817 } else { // conn.policy.lossy || server_cookie == 0
818 assert(connect_seq == 0);
819 assert(server_cookie == 0);
820 logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
821 }
822
823 return wait_write_exit();
824 }).then([this] {
825 if (unlikely(state != state_t::CONNECTING)) {
826 logger().debug("{} triggered {} before Socket::connect()",
827 conn, get_state_name(state));
828 abort_protocol();
829 }
830 if (socket) {
f67539c2
TL
831 gate.dispatch_in_background("close_sockect_connecting", *this,
832 [sock = std::move(socket)] () mutable {
9f95a23c
TL
833 return sock->close().then([sock = std::move(sock)] {});
834 });
835 }
836 INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
837 return Socket::connect(conn.peer_addr);
838 }).then([this](SocketRef sock) {
839 logger().debug("{} socket connected", conn);
840 if (unlikely(state != state_t::CONNECTING)) {
841 logger().debug("{} triggered {} during Socket::connect()",
842 conn, get_state_name(state));
843 return sock->close().then([sock = std::move(sock)] {
844 abort_protocol();
845 });
846 }
847 socket = std::move(sock);
848 return seastar::now();
849 }).then([this] {
850 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
851 session_stream_handlers = { nullptr, nullptr };
852 enable_recording();
f67539c2
TL
853 return banner_exchange(true);
854 }).then([this] (auto&& ret) {
855 auto [_peer_type, _my_addr_from_peer] = std::move(ret);
9f95a23c
TL
856 if (conn.get_peer_type() != _peer_type) {
857 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
858 conn, ceph_entity_type_name(conn.get_peer_type()),
859 ceph_entity_type_name(_peer_type));
f67539c2 860 abort_in_close(*this, true);
9f95a23c 861 }
f67539c2
TL
862 if (unlikely(state != state_t::CONNECTING)) {
863 logger().debug("{} triggered {} during banner_exchange(), abort",
864 conn, get_state_name(state));
865 abort_protocol();
866 }
867 socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
9f95a23c
TL
868 if (unlikely(_my_addr_from_peer.is_legacy())) {
869 logger().warn("{} peer sent a legacy address for me: {}",
870 conn, _my_addr_from_peer);
871 throw std::system_error(
872 make_error_code(crimson::net::error::bad_peer_address));
873 }
874 _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
875 return messenger.learned_addr(_my_addr_from_peer, conn);
876 }).then([this] {
877 return client_auth();
878 }).then([this] {
879 if (server_cookie == 0) {
880 ceph_assert(connect_seq == 0);
881 return client_connect();
882 } else {
883 ceph_assert(connect_seq > 0);
884 return client_reconnect();
885 }
886 }).then([this] (next_step_t next) {
887 if (unlikely(state != state_t::CONNECTING)) {
888 logger().debug("{} triggered {} at the end of execute_connecting()",
889 conn, get_state_name(state));
890 abort_protocol();
891 }
892 switch (next) {
893 case next_step_t::ready: {
9f95a23c
TL
894 logger().info("{} connected:"
895 " gs={}, pgs={}, cs={}, client_cookie={},"
896 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
897 conn, global_seq, peer_global_seq, connect_seq,
898 client_cookie, server_cookie, conn.in_seq,
899 conn.out_seq, conn.out_q.size());
f67539c2 900 execute_ready(true);
9f95a23c
TL
901 break;
902 }
903 case next_step_t::wait: {
904 logger().info("{} execute_connecting(): going to WAIT", conn);
905 execute_wait(true);
906 break;
907 }
908 default: {
909 ceph_abort("impossible next step");
910 }
911 }
912 }).handle_exception([this] (std::exception_ptr eptr) {
913 if (state != state_t::CONNECTING) {
914 logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
915 conn, get_state_name(state), eptr);
916 assert(state == state_t::CLOSING ||
917 state == state_t::REPLACING);
918 return;
919 }
920
921 if (conn.policy.server ||
922 (conn.policy.standby &&
923 (!is_queued() && conn.sent.empty()))) {
924 logger().info("{} execute_connecting(): fault at {} with nothing to send,"
925 " going to STANDBY -- {}",
926 conn, get_state_name(state), eptr);
927 execute_standby();
928 } else {
929 logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
930 conn, get_state_name(state), eptr);
931 execute_wait(false);
932 }
933 });
934 });
935}
936
937// ACCEPTING state
938
939seastar::future<> ProtocolV2::_auth_bad_method(int r)
940{
941 // _auth_bad_method() logic
942 ceph_assert(r < 0);
943 auto [allowed_methods, allowed_modes] =
944 messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
945 auto bad_method = AuthBadMethodFrame::Encode(
946 auth_meta->auth_method, r, allowed_methods, allowed_modes);
947 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
948 "allowed_methods={}, allowed_modes={})",
949 conn, auth_meta->auth_method, cpp_strerror(r),
950 allowed_methods, allowed_modes);
951 return write_frame(bad_method).then([this] {
952 return server_auth();
953 });
954}
955
956seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
957{
958 // _handle_auth_request() logic
959 ceph_assert(messenger.get_auth_server());
960 bufferlist reply;
961 int r = messenger.get_auth_server()->handle_auth_request(
962 conn.shared_from_this(), auth_meta,
963 more, auth_meta->auth_method, auth_payload,
964 &reply);
965 switch (r) {
966 // successful
967 case 1: {
968 auto auth_done = AuthDoneFrame::Encode(
969 conn.peer_global_id, auth_meta->con_mode, reply);
970 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
971 conn, conn.peer_global_id,
972 ceph_con_mode_name(auth_meta->con_mode), reply.length());
973 return write_frame(auth_done).then([this] {
974 ceph_assert(auth_meta);
f67539c2
TL
975 session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
976 nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), true);
9f95a23c
TL
977 return finish_auth();
978 });
979 }
980 // auth more
981 case 0: {
982 auto more = AuthReplyMoreFrame::Encode(reply);
983 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
984 conn, reply.length());
985 return write_frame(more).then([this] {
986 return read_main_preamble();
987 }).then([this] (Tag tag) {
988 expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
989 return read_frame_payload();
990 }).then([this] {
991 auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
992 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
993 conn, auth_more.auth_payload().length());
994 return _handle_auth_request(auth_more.auth_payload(), true);
995 });
996 }
997 case -EBUSY: {
998 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn);
999 abort_in_fault();
1000 return seastar::now();
1001 }
1002 default: {
1003 logger().warn("{} auth_server handle_auth_request returned {}", conn, r);
1004 return _auth_bad_method(r);
1005 }
1006 }
1007}
1008
1009seastar::future<> ProtocolV2::server_auth()
1010{
1011 return read_main_preamble()
1012 .then([this] (Tag tag) {
1013 expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
1014 return read_frame_payload();
1015 }).then([this] {
1016 // handle_auth_request() logic
1017 auto request = AuthRequestFrame::Decode(rx_segments_data.back());
1018 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
1019 " payload_len={}",
1020 conn, request.method(), request.preferred_modes(),
1021 request.auth_payload().length());
1022 auth_meta->auth_method = request.method();
1023 auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode(
1024 conn.get_peer_type(), auth_meta->auth_method,
1025 request.preferred_modes());
1026 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
1027 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn);
1028 return _auth_bad_method(-EOPNOTSUPP);
1029 }
1030 return _handle_auth_request(request.auth_payload(), false);
1031 });
1032}
1033
f67539c2
TL
1034bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const
1035{
1036 auto my_peer_name = conn.get_peer_name();
1037 if (my_peer_name.type() != peer_name.type()) {
1038 return false;
1039 }
1040 if (my_peer_name.num() != entity_name_t::NEW &&
1041 peer_name.num() != entity_name_t::NEW &&
1042 my_peer_name.num() != peer_name.num()) {
1043 return false;
1044 }
1045 return true;
1046}
1047
9f95a23c
TL
1048seastar::future<ProtocolV2::next_step_t>
1049ProtocolV2::send_wait()
1050{
1051 auto wait = WaitFrame::Encode();
1052 logger().debug("{} WRITE WaitFrame", conn);
1053 return write_frame(wait).then([] {
1054 return next_step_t::wait;
1055 });
1056}
1057
1058seastar::future<ProtocolV2::next_step_t>
1059ProtocolV2::reuse_connection(
1060 ProtocolV2* existing_proto, bool do_reset,
1061 bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
1062{
1063 existing_proto->trigger_replacing(reconnect,
1064 do_reset,
1065 std::move(socket),
1066 std::move(auth_meta),
1067 std::move(session_stream_handlers),
1068 peer_global_seq,
1069 client_cookie,
1070 conn.get_peer_name(),
1071 connection_features,
f67539c2
TL
1072 tx_frame_asm.get_is_rev1(),
1073 rx_frame_asm.get_is_rev1(),
9f95a23c
TL
1074 conn_seq,
1075 msg_seq);
1076#ifdef UNIT_TESTS_BUILT
1077 if (conn.interceptor) {
1078 conn.interceptor->register_conn_replaced(conn);
1079 }
1080#endif
1081 // close this connection because all the necessary information is delivered
1082 // to the exisiting connection, and jump to error handling code to abort the
1083 // current state.
f67539c2 1084 abort_in_close(*this, false);
9f95a23c
TL
1085 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1086}
1087
1088seastar::future<ProtocolV2::next_step_t>
1089ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
1090{
1091 // handle_existing_connection() logic
1092 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1093 existing_conn->protocol.get());
1094 ceph_assert(existing_proto);
1095 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1096 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1097 conn, global_seq, peer_global_seq, connect_seq,
1098 client_cookie, server_cookie,
1099 existing_conn, get_state_name(existing_proto->state),
1100 existing_proto->global_seq,
1101 existing_proto->peer_global_seq,
1102 existing_proto->connect_seq,
1103 existing_proto->client_cookie,
1104 existing_proto->server_cookie);
1105
f67539c2
TL
1106 if (!validate_peer_name(existing_conn->get_peer_name())) {
1107 logger().error("{} server_connect: my peer_name doesn't match"
1108 " the existing connection {}, abort", conn, existing_conn);
1109 abort_in_fault();
1110 }
1111
9f95a23c
TL
1112 if (existing_proto->state == state_t::REPLACING) {
1113 logger().warn("{} server_connect: racing replace happened while"
1114 " replacing existing connection {}, send wait.",
1115 conn, *existing_conn);
1116 return send_wait();
1117 }
1118
1119 if (existing_proto->peer_global_seq > peer_global_seq) {
1120 logger().warn("{} server_connect:"
1121 " this is a stale connection, because peer_global_seq({})"
1122 " < existing->peer_global_seq({}), close this connection"
1123 " in favor of existing connection {}",
1124 conn, peer_global_seq,
1125 existing_proto->peer_global_seq, *existing_conn);
1126 abort_in_fault();
1127 }
1128
1129 if (existing_conn->policy.lossy) {
1130 // existing connection can be thrown out in favor of this one
1131 logger().warn("{} server_connect:"
1132 " existing connection {} is a lossy channel. Close existing in favor of"
1133 " this connection", conn, *existing_conn);
f67539c2 1134 execute_establishing(existing_conn, true);
9f95a23c
TL
1135 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1136 }
1137
1138 if (existing_proto->server_cookie != 0) {
1139 if (existing_proto->client_cookie != client_cookie) {
1140 // Found previous session
1141 // peer has reset and we're going to reuse the existing connection
1142 // by replacing the socket
1143 logger().warn("{} server_connect:"
1144 " found new session (cs={})"
1145 " when existing {} is with stale session (cs={}, ss={}),"
1146 " peer must have reset",
1147 conn, client_cookie,
1148 *existing_conn, existing_proto->client_cookie,
1149 existing_proto->server_cookie);
1150 return reuse_connection(existing_proto, conn.policy.resetcheck);
1151 } else {
1152 // session establishment interrupted between client_ident and server_ident,
1153 // continuing...
1154 logger().warn("{} server_connect: found client session with existing {}"
1155 " matched (cs={}, ss={}), continuing session establishment",
1156 conn, *existing_conn, client_cookie, existing_proto->server_cookie);
1157 return reuse_connection(existing_proto);
1158 }
1159 } else {
1160 // Looks like a connection race: server and client are both connecting to
1161 // each other at the same time.
1162 if (existing_proto->client_cookie != client_cookie) {
1163 if (existing_conn->peer_wins()) {
1164 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1165 " and win, reusing existing {}",
1166 conn, client_cookie, existing_proto->client_cookie, *existing_conn);
1167 return reuse_connection(existing_proto);
1168 } else {
1169 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1170 " and lose to existing {}, ask client to wait",
1171 conn, client_cookie, existing_proto->client_cookie, *existing_conn);
1172 return existing_conn->keepalive().then([this] {
1173 return send_wait();
1174 });
1175 }
1176 } else {
1177 logger().warn("{} server_connect: found client session with existing {}"
1178 " matched (cs={}, ss={}), continuing session establishment",
1179 conn, *existing_conn, client_cookie, existing_proto->server_cookie);
1180 return reuse_connection(existing_proto);
1181 }
1182 }
1183}
1184
1185seastar::future<ProtocolV2::next_step_t>
1186ProtocolV2::server_connect()
1187{
1188 return read_frame_payload().then([this] {
1189 // handle_client_ident() logic
1190 auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
1191 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1192 " gid={}, gs={}, features_supported={},"
1193 " features_required={}, flags={}, cookie={}",
1194 conn, client_ident.addrs(), client_ident.target_addr(),
1195 client_ident.gid(), client_ident.global_seq(),
1196 client_ident.supported_features(),
1197 client_ident.required_features(),
1198 client_ident.flags(), client_ident.cookie());
1199
1200 if (client_ident.addrs().empty() ||
1201 client_ident.addrs().front() == entity_addr_t()) {
1202 logger().warn("{} oops, client_ident.addrs() is empty", conn);
1203 throw std::system_error(
1204 make_error_code(crimson::net::error::bad_peer_address));
1205 }
1206 if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
1207 logger().warn("{} peer is trying to reach {} which is not us ({})",
1208 conn, client_ident.target_addr(), messenger.get_myaddrs());
1209 throw std::system_error(
1210 make_error_code(crimson::net::error::bad_peer_address));
1211 }
20effc67 1212 conn.peer_addr = client_ident.addrs().front();
9f95a23c
TL
1213 logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
1214 conn.target_addr = conn.peer_addr;
1215 if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
1216 logger().warn("{} we don't know how to reconnect to peer {}",
1217 conn, conn.target_addr);
1218 throw std::system_error(
1219 make_error_code(crimson::net::error::bad_peer_address));
1220 }
1221
f67539c2
TL
1222 if (conn.get_peer_id() != entity_name_t::NEW &&
1223 conn.get_peer_id() != client_ident.gid()) {
1224 logger().error("{} client_ident peer_id ({}) does not match"
1225 " what it should be ({}) during accepting, abort",
1226 conn, client_ident.gid(), conn.get_peer_id());
1227 abort_in_fault();
1228 }
9f95a23c
TL
1229 conn.set_peer_id(client_ident.gid());
1230 client_cookie = client_ident.cookie();
1231
1232 uint64_t feat_missing =
1233 (conn.policy.features_required | msgr2_required) &
1234 ~(uint64_t)client_ident.supported_features();
1235 if (feat_missing) {
1236 auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
1237 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1238 conn, feat_missing);
1239 return write_frame(ident_missing_features).then([] {
1240 return next_step_t::wait;
1241 });
1242 }
1243 connection_features =
1244 client_ident.supported_features() & conn.policy.features_supported;
1245 logger().debug("{} UPDATE: connection_features={}", conn, connection_features);
1246
1247 peer_global_seq = client_ident.global_seq();
1248
1249 // Looks good so far, let's check if there is already an existing connection
1250 // to this peer.
1251
1252 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1253
1254 if (existing_conn) {
1255 if (existing_conn->protocol->proto_type != proto_t::v2) {
1256 logger().warn("{} existing connection {} proto version is {}, close existing",
1257 conn, *existing_conn,
1258 static_cast<int>(existing_conn->protocol->proto_type));
1259 // should unregister the existing from msgr atomically
f67539c2
TL
1260 // NOTE: this is following async messenger logic, but we may miss the reset event.
1261 execute_establishing(existing_conn, false);
1262 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
9f95a23c
TL
1263 } else {
1264 return handle_existing_connection(existing_conn);
1265 }
f67539c2
TL
1266 } else {
1267 execute_establishing(nullptr, true);
1268 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
9f95a23c 1269 }
9f95a23c
TL
1270 });
1271}
1272
1273seastar::future<ProtocolV2::next_step_t>
1274ProtocolV2::read_reconnect()
1275{
1276 return read_main_preamble()
1277 .then([this] (Tag tag) {
1278 expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
1279 return server_reconnect();
1280 });
1281}
1282
1283seastar::future<ProtocolV2::next_step_t>
1284ProtocolV2::send_retry(uint64_t connect_seq)
1285{
1286 auto retry = RetryFrame::Encode(connect_seq);
1287 logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
1288 return write_frame(retry).then([this] {
1289 return read_reconnect();
1290 });
1291}
1292
1293seastar::future<ProtocolV2::next_step_t>
1294ProtocolV2::send_retry_global(uint64_t global_seq)
1295{
1296 auto retry = RetryGlobalFrame::Encode(global_seq);
1297 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
1298 return write_frame(retry).then([this] {
1299 return read_reconnect();
1300 });
1301}
1302
1303seastar::future<ProtocolV2::next_step_t>
1304ProtocolV2::send_reset(bool full)
1305{
1306 auto reset = ResetFrame::Encode(full);
1307 logger().warn("{} WRITE ResetFrame: full={}", conn, full);
1308 return write_frame(reset).then([this] {
1309 return read_main_preamble();
1310 }).then([this] (Tag tag) {
1311 expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
1312 return server_connect();
1313 });
1314}
1315
1316seastar::future<ProtocolV2::next_step_t>
1317ProtocolV2::server_reconnect()
1318{
1319 return read_frame_payload().then([this] {
1320 // handle_reconnect() logic
1321 auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
1322
1323 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1324 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1325 conn, reconnect.addrs(),
1326 reconnect.client_cookie(), reconnect.server_cookie(),
1327 reconnect.global_seq(), reconnect.connect_seq(),
1328 reconnect.msg_seq());
1329
1330 // can peer_addrs be changed on-the-fly?
1331 // TODO: change peer_addr to entity_addrvec_t
1332 entity_addr_t paddr = reconnect.addrs().front();
1333 if (paddr.is_msgr2() || paddr.is_any()) {
1334 // good
1335 } else {
1336 logger().warn("{} peer's address {} is not v2", conn, paddr);
1337 throw std::system_error(
1338 make_error_code(crimson::net::error::bad_peer_address));
1339 }
1340 if (conn.peer_addr == entity_addr_t()) {
1341 conn.peer_addr = paddr;
1342 } else if (conn.peer_addr != paddr) {
1343 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1344 " reconnect failed",
1345 conn, paddr, conn.peer_addr);
1346 throw std::system_error(
1347 make_error_code(crimson::net::error::bad_peer_address));
1348 }
1349 peer_global_seq = reconnect.global_seq();
1350
1351 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1352
1353 if (!existing_conn) {
1354 // there is no existing connection therefore cannot reconnect to previous
1355 // session
1356 logger().warn("{} server_reconnect: no existing connection from address {},"
1357 " reseting client", conn, conn.peer_addr);
1358 return send_reset(true);
1359 }
1360
1361 if (existing_conn->protocol->proto_type != proto_t::v2) {
1362 logger().warn("{} server_reconnect: existing connection {} proto version is {},"
1363 "close existing and reset client.",
1364 conn, *existing_conn,
1365 static_cast<int>(existing_conn->protocol->proto_type));
f67539c2
TL
1366 // NOTE: this is following async messenger logic, but we may miss the reset event.
1367 existing_conn->mark_down();
9f95a23c
TL
1368 return send_reset(true);
1369 }
1370
1371 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1372 existing_conn->protocol.get());
1373 ceph_assert(existing_proto);
1374 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1375 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1376 conn, global_seq, peer_global_seq, reconnect.connect_seq(),
1377 reconnect.client_cookie(), reconnect.server_cookie(),
1378 existing_conn,
1379 get_state_name(existing_proto->state),
1380 existing_proto->global_seq,
1381 existing_proto->peer_global_seq,
1382 existing_proto->connect_seq,
1383 existing_proto->client_cookie,
1384 existing_proto->server_cookie);
1385
f67539c2
TL
1386 if (!validate_peer_name(existing_conn->get_peer_name())) {
1387 logger().error("{} server_reconnect: my peer_name doesn't match"
1388 " the existing connection {}, abort", conn, existing_conn);
1389 abort_in_fault();
1390 }
1391
9f95a23c
TL
1392 if (existing_proto->state == state_t::REPLACING) {
1393 logger().warn("{} server_reconnect: racing replace happened while "
1394 " replacing existing connection {}, retry global.",
1395 conn, *existing_conn);
1396 return send_retry_global(existing_proto->peer_global_seq);
1397 }
1398
1399 if (existing_proto->client_cookie != reconnect.client_cookie()) {
1400 logger().warn("{} server_reconnect:"
1401 " client_cookie mismatch with existing connection {},"
1402 " cc={} rcc={}. I must have reset, reseting client.",
1403 conn, *existing_conn,
1404 existing_proto->client_cookie, reconnect.client_cookie());
1405 return send_reset(conn.policy.resetcheck);
1406 } else if (existing_proto->server_cookie == 0) {
1407 // this happens when:
1408 // - a connects to b
1409 // - a sends client_ident
1410 // - b gets client_ident, sends server_ident and sets cookie X
1411 // - connection fault
1412 // - b reconnects to a with cookie X, connect_seq=1
1413 // - a has cookie==0
1414 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1415 " server_ident with existing connection {}."
1416 " Asking peer to resume session establishment",
1417 conn, existing_proto->client_cookie, *existing_conn);
1418 return send_reset(false);
1419 }
1420
1421 if (existing_proto->peer_global_seq > reconnect.global_seq()) {
1422 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1423 " with existing connection {},"
1424 " ask client to retry global",
1425 conn, existing_proto->peer_global_seq,
1426 reconnect.global_seq(), *existing_conn);
1427 return send_retry_global(existing_proto->peer_global_seq);
1428 }
1429
1430 if (existing_proto->connect_seq > reconnect.connect_seq()) {
1431 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1432 " with existing connection {}, ask client to retry",
1433 conn, reconnect.connect_seq(),
1434 existing_proto->connect_seq, *existing_conn);
1435 return send_retry(existing_proto->connect_seq);
1436 } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
1437 // reconnect race: both peers are sending reconnect messages
1438 if (existing_conn->peer_wins()) {
1439 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1440 " and win, reusing existing {}",
1441 conn, reconnect.connect_seq(), *existing_conn);
1442 return reuse_connection(
1443 existing_proto, false,
1444 true, reconnect.connect_seq(), reconnect.msg_seq());
1445 } else {
1446 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1447 " and lose to existing {}, ask client to wait",
1448 conn, reconnect.connect_seq(), *existing_conn);
1449 return send_wait();
1450 }
1451 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1452 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1453 " reusing existing {}",
1454 conn, existing_proto->connect_seq,
1455 reconnect.connect_seq(), *existing_conn);
1456 return reuse_connection(
1457 existing_proto, false,
1458 true, reconnect.connect_seq(), reconnect.msg_seq());
1459 }
1460 });
1461}
1462
1463void ProtocolV2::execute_accepting()
1464{
1465 trigger_state(state_t::ACCEPTING, write_state_t::none, false);
f67539c2
TL
1466 gate.dispatch_in_background("execute_accepting", *this, [this] {
1467 return seastar::futurize_invoke([this] {
9f95a23c
TL
1468 INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
1469 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
1470 session_stream_handlers = { nullptr, nullptr };
20effc67 1471 session_comp_handlers = { nullptr, nullptr };
9f95a23c 1472 enable_recording();
f67539c2
TL
1473 return banner_exchange(false);
1474 }).then([this] (auto&& ret) {
1475 auto [_peer_type, _my_addr_from_peer] = std::move(ret);
9f95a23c
TL
1476 ceph_assert(conn.get_peer_type() == 0);
1477 conn.set_peer_type(_peer_type);
1478
1479 conn.policy = messenger.get_policy(_peer_type);
1480 logger().info("{} UPDATE: peer_type={},"
1481 " policy(lossy={} server={} standby={} resetcheck={})",
1482 conn, ceph_entity_type_name(_peer_type),
1483 conn.policy.lossy, conn.policy.server,
1484 conn.policy.standby, conn.policy.resetcheck);
20effc67
TL
1485 if (!messenger.get_myaddr().is_blank_ip() &&
1486 (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
1487 messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
9f95a23c
TL
1488 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1489 conn, _my_addr_from_peer, messenger.get_myaddr());
1490 throw std::system_error(
1491 make_error_code(crimson::net::error::bad_peer_address));
1492 }
1493 return messenger.learned_addr(_my_addr_from_peer, conn);
1494 }).then([this] {
1495 return server_auth();
1496 }).then([this] {
1497 return read_main_preamble();
1498 }).then([this] (Tag tag) {
1499 switch (tag) {
1500 case Tag::CLIENT_IDENT:
1501 return server_connect();
1502 case Tag::SESSION_RECONNECT:
1503 return server_reconnect();
1504 default: {
1505 unexpected_tag(tag, conn, "post_server_auth");
1506 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1507 }
1508 }
1509 }).then([this] (next_step_t next) {
1510 switch (next) {
1511 case next_step_t::ready:
1512 assert(state != state_t::ACCEPTING);
1513 break;
1514 case next_step_t::wait:
1515 if (unlikely(state != state_t::ACCEPTING)) {
1516 logger().debug("{} triggered {} at the end of execute_accepting()",
1517 conn, get_state_name(state));
1518 abort_protocol();
1519 }
1520 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
1521 execute_server_wait();
1522 break;
1523 default:
1524 ceph_abort("impossible next step");
1525 }
1526 }).handle_exception([this] (std::exception_ptr eptr) {
1527 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1528 conn, get_state_name(state), eptr);
f67539c2 1529 close(false);
9f95a23c
TL
1530 });
1531 });
1532}
1533
1534// CONNECTING or ACCEPTING state
1535
1536seastar::future<> ProtocolV2::finish_auth()
1537{
1538 ceph_assert(auth_meta);
1539
1540 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1541 auth_meta->session_key.hmac_sha256(nullptr, rxbuf);
1542 auto sig_frame = AuthSignatureFrame::Encode(sig);
1543 ceph_assert(record_io);
1544 record_io = false;
1545 rxbuf.clear();
1546 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
1547 return write_frame(sig_frame).then([this] {
1548 return read_main_preamble();
1549 }).then([this] (Tag tag) {
1550 expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
1551 return read_frame_payload();
1552 }).then([this] {
1553 // handle_auth_signature() logic
1554 auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
1555 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
1556
1557 const auto actual_tx_sig = auth_meta->session_key.empty() ?
1558 sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
1559 if (sig_frame.signature() != actual_tx_sig) {
1560 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1561 " sig_frame.signature()={}",
1562 conn, actual_tx_sig, sig_frame.signature());
1563 abort_in_fault();
1564 }
1565 txbuf.clear();
1566 });
1567}
1568
1569// ESTABLISHING
1570
f67539c2
TL
1571void ProtocolV2::execute_establishing(
1572 SocketConnectionRef existing_conn, bool dispatch_reset) {
1573 if (unlikely(state != state_t::ACCEPTING)) {
1574 logger().debug("{} triggered {} before execute_establishing()",
1575 conn, get_state_name(state));
1576 abort_protocol();
1577 }
1578
1579 auto accept_me = [this] {
1580 messenger.register_conn(
1581 seastar::static_pointer_cast<SocketConnection>(
1582 conn.shared_from_this()));
1583 messenger.unaccept_conn(
1584 seastar::static_pointer_cast<SocketConnection>(
1585 conn.shared_from_this()));
1586 };
1587
9f95a23c 1588 trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
f67539c2
TL
1589 if (existing_conn) {
1590 existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
1591 if (unlikely(state != state_t::ESTABLISHING)) {
1592 logger().warn("{} triggered {} during execute_establishing(), "
1593 "the accept event will not be delivered!",
1594 conn, get_state_name(state));
1595 abort_protocol();
1596 }
1597 } else {
1598 accept_me();
1599 }
1600
1601 dispatchers.ms_handle_accept(
1602 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
1603
1604 gated_execute("execute_establishing", [this] {
1605 return seastar::futurize_invoke([this] {
9f95a23c
TL
1606 return send_server_ident();
1607 }).then([this] {
1608 if (unlikely(state != state_t::ESTABLISHING)) {
1609 logger().debug("{} triggered {} at the end of execute_establishing()",
1610 conn, get_state_name(state));
1611 abort_protocol();
1612 }
1613 logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
1614 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
1615 conn, global_seq, peer_global_seq, connect_seq,
1616 client_cookie, server_cookie, conn.in_seq,
1617 conn.out_seq, conn.out_q.size());
f67539c2 1618 execute_ready(false);
9f95a23c
TL
1619 }).handle_exception([this] (std::exception_ptr eptr) {
1620 if (state != state_t::ESTABLISHING) {
1621 logger().info("{} execute_establishing() protocol aborted at {} -- {}",
1622 conn, get_state_name(state), eptr);
1623 assert(state == state_t::CLOSING ||
1624 state == state_t::REPLACING);
1625 return;
1626 }
1627 fault(false, "execute_establishing()", eptr);
1628 });
1629 });
1630}
1631
1632// ESTABLISHING or REPLACING state
1633
1634seastar::future<>
1635ProtocolV2::send_server_ident()
1636{
1637 // send_server_ident() logic
1638
1639 // refered to async-conn v2: not assign gs to global_seq
1640 return messenger.get_global_seq().then([this] (auto gs) {
1641 logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
1642
1643 // this is required for the case when this connection is being replaced
1644 requeue_up_to(0);
1645 conn.in_seq = 0;
1646
1647 if (!conn.policy.lossy) {
1648 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1649 }
1650
1651 uint64_t flags = 0;
1652 if (conn.policy.lossy) {
1653 flags = flags | CEPH_MSG_CONNECT_LOSSY;
1654 }
1655
1656 auto server_ident = ServerIdentFrame::Encode(
1657 messenger.get_myaddrs(),
1658 messenger.get_myname().num(),
1659 gs,
1660 conn.policy.features_supported,
1661 conn.policy.features_required | msgr2_required,
1662 flags,
1663 server_cookie);
1664
1665 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1666 " gs={}, features_supported={}, features_required={},"
1667 " flags={}, cookie={}",
1668 conn, messenger.get_myaddrs(), messenger.get_myname().num(),
1669 gs, conn.policy.features_supported,
1670 conn.policy.features_required | msgr2_required,
1671 flags, server_cookie);
1672
1673 conn.set_features(connection_features);
1674
1675 return write_frame(server_ident);
1676 });
1677}
1678
1679// REPLACING state
1680
// Move this connection into REPLACING and adopt the session negotiated on a
// newly accepted socket (`new_socket`) in place of the current one.
//
// Done synchronously:
//  - enter REPLACING with writes delayed;
//  - shut down the current socket, if any;
//  - notify dispatchers via ms_handle_accept().
//
// Done in a gated background task:
//  1. wait for the write path to exit, optionally reset the session
//     (`do_reset`), cancel the protocol timer and wait on execution_done;
//  2. if the state left REPLACING meanwhile, close `new_socket` and abort;
//     otherwise close the old socket in the background and install the new
//     socket, auth metadata, stream handlers (`new_rxtx`) and peer global seq;
//  3. reconnect == true: adopt `new_connect_seq`, requeue_up_to(new_msg_seq)
//     and send a ReconnectOkFrame carrying conn.in_seq;
//     reconnect == false: adopt the client cookie, the peer id (only while it
//     is still entity_name_t::NEW), the connection features and the rev1
//     flags of both frame assemblers, then send the server ident;
//  4. if still REPLACING, log the result and enter READY without dispatching
//     ms_handle_connect (execute_ready(false)).
//
// Failures: if the state is no longer REPLACING the exception is treated as
// a protocol abort (the state is asserted to be CLOSING); otherwise
// fault(true, ...) is raised.
void ProtocolV2::trigger_replacing(bool reconnect,
                                   bool do_reset,
                                   SocketRef&& new_socket,
                                   AuthConnectionMetaRef&& new_auth_meta,
                                   ceph::crypto::onwire::rxtx_t new_rxtx,
                                   uint64_t new_peer_global_seq,
                                   uint64_t new_client_cookie,
                                   entity_name_t new_peer_name,
                                   uint64_t new_conn_features,
                                   bool tx_is_rev1,
                                   bool rx_is_rev1,
                                   uint64_t new_connect_seq,
                                   uint64_t new_msg_seq)
{
  trigger_state(state_t::REPLACING, write_state_t::delay, false);
  if (socket) {
    socket->shutdown();
  }
  dispatchers.ms_handle_accept(
    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
  // Owning arguments are moved into the background task; they are moved
  // again into the continuation that actually installs them.
  gate.dispatch_in_background("trigger_replacing", *this,
                             [this,
                              reconnect,
                              do_reset,
                              new_socket = std::move(new_socket),
                              new_auth_meta = std::move(new_auth_meta),
                              new_rxtx = std::move(new_rxtx),
                              tx_is_rev1, rx_is_rev1,
                              new_client_cookie, new_peer_name,
                              new_conn_features, new_peer_global_seq,
                              new_connect_seq, new_msg_seq] () mutable {
    return wait_write_exit().then([this, do_reset] {
      if (do_reset) {
        reset_session(true);
      }
      protocol_timer.cancel();
      // wait until the previously running protocol execution has finished
      return execution_done.get_future();
    }).then([this,
             reconnect,
             new_socket = std::move(new_socket),
             new_auth_meta = std::move(new_auth_meta),
             new_rxtx = std::move(new_rxtx),
             tx_is_rev1, rx_is_rev1,
             new_client_cookie, new_peer_name,
             new_conn_features, new_peer_global_seq,
             new_connect_seq, new_msg_seq] () mutable {
      if (unlikely(state != state_t::REPLACING)) {
        // replacement was pre-empted: discard the new socket and abort
        return new_socket->close().then([sock = std::move(new_socket)] {
          abort_protocol();
        });
      }

      if (socket) {
        // the old socket is closed in the background; `sock` keeps it alive
        // until close() completes
        gate.dispatch_in_background("close_socket_replacing", *this,
                                    [sock = std::move(socket)] () mutable {
          return sock->close().then([sock = std::move(sock)] {});
        });
      }
      socket = std::move(new_socket);
      auth_meta = std::move(new_auth_meta);
      session_stream_handlers = std::move(new_rxtx);
      record_io = false;
      peer_global_seq = new_peer_global_seq;

      if (reconnect) {
        connect_seq = new_connect_seq;
        // send_reconnect_ok() logic
        requeue_up_to(new_msg_seq);
        auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
        return write_frame(reconnect_ok);
      } else {
        client_cookie = new_client_cookie;
        assert(conn.get_peer_type() == new_peer_name.type());
        if (conn.get_peer_id() == entity_name_t::NEW) {
          conn.set_peer_id(new_peer_name.num());
        }
        connection_features = new_conn_features;
        tx_frame_asm.set_is_rev1(tx_is_rev1);
        rx_frame_asm.set_is_rev1(rx_is_rev1);
        return send_server_ident();
      }
    }).then([this, reconnect] {
      if (unlikely(state != state_t::REPLACING)) {
        logger().debug("{} triggered {} at the end of trigger_replacing()",
                       conn, get_state_name(state));
        // NOTE(review): abort_protocol() is expected to throw; otherwise the
        // code below would still run -- confirm in the header.
        abort_protocol();
      }
      logger().info("{} replaced ({}):"
                    " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
                    " in_seq={}, out_seq={}, out_q={}",
                    conn, reconnect ? "reconnected" : "connected",
                    global_seq, peer_global_seq, connect_seq, client_cookie,
                    server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
      execute_ready(false);
    }).handle_exception([this] (std::exception_ptr eptr) {
      if (state != state_t::REPLACING) {
        logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
                      conn, get_state_name(state), eptr);
        assert(state == state_t::CLOSING);
        return;
      }
      fault(true, "trigger_replacing()", eptr);
    });
  });
}
1787
1788// READY state
1789
1790ceph::bufferlist ProtocolV2::do_sweep_messages(
20effc67 1791 const std::deque<MessageURef>& msgs,
9f95a23c
TL
1792 size_t num_msgs,
1793 bool require_keepalive,
1794 std::optional<utime_t> _keepalive_ack,
1795 bool require_ack)
1796{
1797 ceph::bufferlist bl;
1798
1799 if (unlikely(require_keepalive)) {
1800 auto keepalive_frame = KeepAliveFrame::Encode();
f6b5b4d7 1801 bl.append(keepalive_frame.get_buffer(tx_frame_asm));
9f95a23c
TL
1802 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
1803 }
1804
1805 if (unlikely(_keepalive_ack.has_value())) {
1806 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
f6b5b4d7 1807 bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
9f95a23c
TL
1808 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
1809 }
1810
20effc67 1811 if (require_ack && num_msgs == 0u) {
9f95a23c 1812 auto ack_frame = AckFrame::Encode(conn.in_seq);
f6b5b4d7 1813 bl.append(ack_frame.get_buffer(tx_frame_asm));
9f95a23c
TL
1814 INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
1815 }
1816
20effc67 1817 std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
9f95a23c
TL
1818 // TODO: move to common code
1819 // set priority
1820 msg->get_header().src = messenger.get_myname();
1821
1822 msg->encode(conn.features, 0);
1823
1824 ceph_assert(!msg->get_seq() && "message already has seq");
1825 msg->set_seq(++conn.out_seq);
1826
1827 ceph_msg_header &header = msg->get_header();
1828 ceph_msg_footer &footer = msg->get_footer();
1829
1830 ceph_msg_header2 header2{header.seq, header.tid,
1831 header.type, header.priority,
1832 header.version,
20effc67
TL
1833 ceph_le32(0), header.data_off,
1834 ceph_le64(conn.in_seq),
9f95a23c
TL
1835 footer.flags, header.compat_version,
1836 header.reserved};
1837
1838 auto message = MessageFrame::Encode(header2,
1839 msg->get_payload(), msg->get_middle(), msg->get_data());
1840 logger().debug("{} --> #{} === {} ({})",
1841 conn, msg->get_seq(), *msg, msg->get_type());
f6b5b4d7 1842 bl.append(message.get_buffer(tx_frame_asm));
9f95a23c
TL
1843 INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
1844 });
1845
1846 return bl;
1847}
1848
1849seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
1850{
1851 return read_frame_payload()
1852 .then([this, throttle_stamp] {
1853 utime_t recv_stamp{seastar::lowres_system_clock::now()};
1854
1855 // we need to get the size before std::moving segments data
1856 const size_t cur_msg_size = get_current_msg_size();
f6b5b4d7 1857 auto msg_frame = MessageFrame::Decode(rx_segments_data);
9f95a23c
TL
1858 // XXX: paranoid copy just to avoid oops
1859 ceph_msg_header2 current_header = msg_frame.header();
1860
1861 logger().trace("{} got {} + {} + {} byte message,"
1862 " envelope type={} src={} off={} seq={}",
1863 conn, msg_frame.front_len(), msg_frame.middle_len(),
1864 msg_frame.data_len(), current_header.type, conn.get_peer_name(),
1865 current_header.data_off, current_header.seq);
1866
1867 ceph_msg_header header{current_header.seq,
1868 current_header.tid,
1869 current_header.type,
1870 current_header.priority,
1871 current_header.version,
20effc67
TL
1872 ceph_le32(msg_frame.front_len()),
1873 ceph_le32(msg_frame.middle_len()),
1874 ceph_le32(msg_frame.data_len()),
9f95a23c
TL
1875 current_header.data_off,
1876 conn.get_peer_name(),
1877 current_header.compat_version,
1878 current_header.reserved,
20effc67
TL
1879 ceph_le32(0)};
1880 ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
1881 ceph_le32(0), ceph_le64(0), current_header.flags};
9f95a23c 1882
f67539c2 1883 auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
9f95a23c
TL
1884 conn.shared_from_this());
1885 Message *message = decode_message(nullptr, 0, header, footer,
f67539c2 1886 msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
9f95a23c
TL
1887 if (!message) {
1888 logger().warn("{} decode message failed", conn);
1889 abort_in_fault();
1890 }
1891
1892 // store reservation size in message, so we don't get confused
1893 // by messages entering the dispatch queue through other paths.
1894 message->set_dispatch_throttle_size(cur_msg_size);
1895
1896 message->set_throttle_stamp(throttle_stamp);
1897 message->set_recv_stamp(recv_stamp);
1898 message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
1899
1900 // check received seq#. if it is old, drop the message.
1901 // note that incoming messages may skip ahead. this is convenient for the
1902 // client side queueing because messages can't be renumbered, but the (kernel)
1903 // client will occasionally pull a message out of the sent queue to send
1904 // elsewhere. in that case it doesn't matter if we "got" it or not.
1905 uint64_t cur_seq = conn.in_seq;
1906 if (message->get_seq() <= cur_seq) {
f67539c2
TL
1907 logger().error("{} got old message {} <= {} {}, discarding",
1908 conn, message->get_seq(), cur_seq, *message);
9f95a23c 1909 if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
f67539c2 1910 local_conf()->ms_die_on_old_message) {
9f95a23c
TL
1911 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1912 }
f67539c2 1913 return seastar::now();
9f95a23c
TL
1914 } else if (message->get_seq() > cur_seq + 1) {
1915 logger().error("{} missed message? skipped from seq {} to {}",
1916 conn, cur_seq, message->get_seq());
f67539c2 1917 if (local_conf()->ms_die_on_skipped_message) {
9f95a23c
TL
1918 ceph_assert(0 == "skipped incoming seq");
1919 }
1920 }
1921
1922 // note last received message.
1923 conn.in_seq = message->get_seq();
1924 logger().debug("{} <== #{} === {} ({})",
1925 conn, message->get_seq(), *message, message->get_type());
1926 notify_ack();
1927 ack_writes(current_header.ack_seq);
1928
1929 // TODO: change MessageRef with seastar::shared_ptr
1930 auto msg_ref = MessageRef{message, false};
f67539c2
TL
1931 // throttle the reading process by the returned future
1932 return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
9f95a23c
TL
1933 });
1934}
1935
// Enter READY (writes open) and run the main read loop for an established
// session.
//
// dispatch_connect: when true, dispatchers are notified through
// ms_handle_connect() before the loop starts (trigger_replacing() passes
// false).
//
// The loop (seastar::keep_doing) reads a main preamble and handles it by tag:
//  - MESSAGE: apply policy throttling, then read_message();
//  - ACK: ack our writes up to the received seq;
//  - KEEPALIVE2: queue a keepalive ack and record the last-keepalive time;
//  - KEEPALIVE2_ACK: record the peer's keepalive-ack timestamp;
//  - anything else: unexpected_tag().
// When the loop fails, the outcome depends on the state. If the state is no
// longer READY, the failure is treated as an abort, and the state is asserted
// to be REPLACING or CLOSING. Otherwise fault(false, ...) is raised.
void ProtocolV2::execute_ready(bool dispatch_connect)
{
  // a lossless session must have completed cookie negotiation on both sides
  assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
  trigger_state(state_t::READY, write_state_t::open, false);
  if (dispatch_connect) {
    dispatchers.ms_handle_connect(
      seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
  }
#ifdef UNIT_TESTS_BUILT
  if (conn.interceptor) {
    conn.interceptor->register_conn_ready(conn);
  }
#endif
  gated_execute("execute_ready", [this] {
    protocol_timer.cancel();
    return seastar::keep_doing([this] {
      return read_main_preamble()
      .then([this] (Tag tag) {
        switch (tag) {
          case Tag::MESSAGE: {
            return seastar::futurize_invoke([this] {
              // throttle_message() logic
              if (!conn.policy.throttler_messages) {
                return seastar::now();
              }
              // TODO: message throttler
              // message-count throttling is not implemented yet
              ceph_assert(false);
              return seastar::now();
            }).then([this] {
              // throttle_bytes() logic
              if (!conn.policy.throttler_bytes) {
                return seastar::now();
              }
              size_t cur_msg_size = get_current_msg_size();
              if (!cur_msg_size) {
                return seastar::now();
              }
              logger().trace("{} wants {} bytes from policy throttler {}/{}",
                             conn, cur_msg_size,
                             conn.policy.throttler_bytes->get_current(),
                             conn.policy.throttler_bytes->get_max());
              // the read loop waits here until the byte budget is granted
              return conn.policy.throttler_bytes->get(cur_msg_size);
            }).then([this] {
              // TODO: throttle_dispatch_queue() logic
              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
              return read_message(throttle_stamp);
            });
          }
          case Tag::ACK:
            return read_frame_payload().then([this] {
              // handle_message_ack() logic
              auto ack = AckFrame::Decode(rx_segments_data.back());
              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
              ack_writes(ack.seq());
            });
          case Tag::KEEPALIVE2:
            return read_frame_payload().then([this] {
              // handle_keepalive2() logic
              auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
                             conn, keepalive_frame.timestamp());
              notify_keepalive_ack(keepalive_frame.timestamp());
              conn.set_last_keepalive(seastar::lowres_system_clock::now());
            });
          case Tag::KEEPALIVE2_ACK:
            return read_frame_payload().then([this] {
              // handle_keepalive2_ack() logic
              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
              conn.set_last_keepalive_ack(
                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
                             conn, conn.last_keepalive_ack);
            });
          default: {
            unexpected_tag(tag, conn, "execute_ready");
            return seastar::now();
          }
        }
      });
    }).handle_exception([this] (std::exception_ptr eptr) {
      if (state != state_t::READY) {
        logger().info("{} execute_ready(): protocol aborted at {} -- {}",
                      conn, get_state_name(state), eptr);
        assert(state == state_t::REPLACING ||
               state == state_t::CLOSING);
        return;
      }
      fault(false, "execute_ready()", eptr);
    });
  });
}
2027
2028// STANDBY state
2029
2030void ProtocolV2::execute_standby()
2031{
2032 trigger_state(state_t::STANDBY, write_state_t::delay, true);
2033 if (socket) {
2034 socket->shutdown();
2035 }
2036}
2037
2038void ProtocolV2::notify_write()
2039{
2040 if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
2041 logger().info("{} notify_write(): at {}, going to CONNECTING",
2042 conn, get_state_name(state));
2043 execute_connecting();
2044 }
2045}
2046
2047// WAIT state
2048
2049void ProtocolV2::execute_wait(bool max_backoff)
2050{
2051 trigger_state(state_t::WAIT, write_state_t::delay, true);
2052 if (socket) {
2053 socket->shutdown();
2054 }
f67539c2 2055 gated_execute("execute_wait", [this, max_backoff] {
9f95a23c
TL
2056 double backoff = protocol_timer.last_dur();
2057 if (max_backoff) {
f67539c2 2058 backoff = local_conf().get_val<double>("ms_max_backoff");
9f95a23c 2059 } else if (backoff > 0) {
f67539c2 2060 backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
9f95a23c 2061 } else {
f67539c2 2062 backoff = local_conf().get_val<double>("ms_initial_backoff");
9f95a23c
TL
2063 }
2064 return protocol_timer.backoff(backoff).then([this] {
2065 if (unlikely(state != state_t::WAIT)) {
2066 logger().debug("{} triggered {} at the end of execute_wait()",
2067 conn, get_state_name(state));
2068 abort_protocol();
2069 }
2070 logger().info("{} execute_wait(): going to CONNECTING", conn);
2071 execute_connecting();
2072 }).handle_exception([this] (std::exception_ptr eptr) {
2073 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
2074 conn, get_state_name(state), eptr);
2075 assert(state == state_t::REPLACING ||
2076 state == state_t::CLOSING);
2077 });
2078 });
2079}
2080
2081// SERVER_WAIT state
2082
2083void ProtocolV2::execute_server_wait()
2084{
2085 trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
f67539c2 2086 gated_execute("execute_server_wait", [this] {
9f95a23c
TL
2087 return read_exactly(1).then([this] (auto bl) {
2088 logger().warn("{} SERVER_WAIT got read, abort", conn);
2089 abort_in_fault();
2090 }).handle_exception([this] (std::exception_ptr eptr) {
2091 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
2092 conn, get_state_name(state), eptr);
f67539c2 2093 close(false);
9f95a23c
TL
2094 });
2095 });
2096}
2097
2098// CLOSING state
2099
2100void ProtocolV2::trigger_close()
2101{
f67539c2
TL
2102 messenger.closing_conn(
2103 seastar::static_pointer_cast<SocketConnection>(
2104 conn.shared_from_this()));
2105
9f95a23c
TL
2106 if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
2107 messenger.unaccept_conn(
2108 seastar::static_pointer_cast<SocketConnection>(
2109 conn.shared_from_this()));
2110 } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
2111 messenger.unregister_conn(
2112 seastar::static_pointer_cast<SocketConnection>(
2113 conn.shared_from_this()));
2114 } else {
2115 // cannot happen
2116 ceph_assert(false);
2117 }
2118
2119 protocol_timer.cancel();
9f95a23c 2120 trigger_state(state_t::CLOSING, write_state_t::drop, false);
f67539c2
TL
2121}
2122
2123void ProtocolV2::on_closed()
2124{
2125 messenger.closed_conn(
2126 seastar::static_pointer_cast<SocketConnection>(
2127 conn.shared_from_this()));
2128}
2129
// Write a description of this protocol instance to `out`; it is printed as
// its owning connection.
void ProtocolV2::print(std::ostream& out) const
{
  out << conn;
}
2134
2135} // namespace crimson::net