]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/ProtocolV2.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "ProtocolV2.h"
5
6 #include <seastar/core/lowres_clock.hh>
7 #include <fmt/format.h>
8 #if FMT_VERSION >= 60000
9 #include <fmt/chrono.h>
10 #else
11 #include <fmt/time.h>
12 #endif
13 #include "include/msgr.h"
14 #include "include/random.h"
15
16 #include "crimson/auth/AuthClient.h"
17 #include "crimson/auth/AuthServer.h"
18
19 #include "Config.h"
20 #include "Dispatcher.h"
21 #include "Errors.h"
22 #include "Socket.h"
23 #include "SocketConnection.h"
24 #include "SocketMessenger.h"
25
26 #ifdef UNIT_TESTS_BUILT
27 #include "Interceptor.h"
28 #endif
29
30 using namespace ceph::msgr::v2;
31
32 namespace {
33
34 // TODO: apply the same logging policy to Protocol V1
35 // Log levels in V2 Protocol:
36 // * error level, something error that cause connection to terminate:
37 // - fatal errors;
38 // - bugs;
39 // * warn level: something unusual that identifies connection fault or replacement:
40 // - unstable network;
41 // - incompatible peer;
42 // - auth failure;
43 // - connection race;
44 // - connection reset;
45 // * info level, something very important to show connection lifecycle,
46 // which doesn't happen very frequently;
47 // * debug level, important logs for debugging, including:
48 // - all the messages sent/received (-->/<==);
49 // - all the frames exchanged (WRITE/GOT);
50 // - important fields updated (UPDATE);
51 // - connection state transitions (TRIGGER);
52 // * trace level, trivial logs showing:
53 // - the exact bytes being sent/received (SEND/RECV(bytes));
54 // - detailed information of sub-frames;
55 // - integrity checks;
56 // - etc.
57 seastar::logger& logger() {
58 return crimson::get_logger(ceph_subsys_ms);
59 }
60
61 void abort_in_fault() {
62 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
63 }
64
65 void abort_protocol() {
66 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
67 }
68
69 void abort_in_close(crimson::net::ProtocolV2& proto) {
70 (void) proto.close();
71 abort_protocol();
72 }
73
74 inline void expect_tag(const Tag& expected,
75 const Tag& actual,
76 crimson::net::SocketConnection& conn,
77 const char *where) {
78 if (actual != expected) {
79 logger().warn("{} {} received wrong tag: {}, expected {}",
80 conn, where,
81 static_cast<uint32_t>(actual),
82 static_cast<uint32_t>(expected));
83 abort_in_fault();
84 }
85 }
86
87 inline void unexpected_tag(const Tag& unexpected,
88 crimson::net::SocketConnection& conn,
89 const char *where) {
90 logger().warn("{} {} received unexpected tag: {}",
91 conn, where, static_cast<uint32_t>(unexpected));
92 abort_in_fault();
93 }
94
95 inline uint64_t generate_client_cookie() {
96 return ceph::util::generate_random_number<uint64_t>(
97 1, std::numeric_limits<uint64_t>::max());
98 }
99
100 } // namespace anonymous
101
102 template <>
103 struct fmt::formatter<seastar::lowres_system_clock::time_point> {
104 // ignore the format string
105 template <typename ParseContext>
106 constexpr auto parse(ParseContext &ctx) { return ctx.begin(); }
107
108 template <typename FormatContext>
109 auto format(const seastar::lowres_system_clock::time_point& t,
110 FormatContext& ctx) {
111 std::time_t tt = std::chrono::duration_cast<std::chrono::seconds>(
112 t.time_since_epoch()).count();
113 auto milliseconds = (t.time_since_epoch() %
114 std::chrono::seconds(1)).count();
115 return fmt::format_to(ctx.out(), "{:%Y-%m-%d %H:%M:%S} {:03d}",
116 fmt::localtime(tt), milliseconds);
117 }
118 };
119
120 namespace std {
121 inline ostream& operator<<(
122 ostream& out, const seastar::lowres_system_clock::time_point& t)
123 {
124 return out << fmt::format("{}", t);
125 }
126 }
127
128 namespace crimson::net {
129
#ifdef UNIT_TESTS_BUILT
// Test-only hook: if the connection has an interceptor, ask it which action
// to take at breakpoint `bp` and arm a trap of kind `type` on the socket with
// that action and the interceptor's blocker.
void intercept(Breakpoint bp, bp_type_t type,
               SocketConnection& conn, SocketRef& socket) {
  if (conn.interceptor) {
    auto action = conn.interceptor->intercept(conn, std::move(bp));
    socket->set_trap(type, action, &conn.interceptor->blocker);
  }
}

// The macros below expect `conn` and `socket` to be in scope at the call site.
#define INTERCEPT_CUSTOM(bp, type) \
intercept({bp}, type, conn, socket)

#define INTERCEPT_FRAME(tag, type) \
intercept({static_cast<Tag>(tag), type}, \
          type, conn, socket)

// Breakpoint without socket I/O: BLOCK is not allowed here, FAULT aborts the
// negotiation. Wrapped in do { ... } while (0) so the expansion is a single
// statement that cannot capture a following `else`.
#define INTERCEPT_N_RW(bp) \
do { \
  if (conn.interceptor) { \
    auto action = conn.interceptor->intercept(conn, {bp}); \
    ceph_assert(action != bp_action_t::BLOCK); \
    if (action == bp_action_t::FAULT) { \
      abort_in_fault(); \
    } \
  } \
} while (0)

#else
#define INTERCEPT_CUSTOM(bp, type)
#define INTERCEPT_FRAME(tag, type)
#define INTERCEPT_N_RW(bp)
#endif
160
161 seastar::future<> ProtocolV2::Timer::backoff(double seconds)
162 {
163 logger().warn("{} waiting {} seconds ...", conn, seconds);
164 cancel();
165 last_dur_ = seconds;
166 as = seastar::abort_source();
167 auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
168 std::chrono::duration<double>(seconds));
169 return seastar::sleep_abortable(dur, *as
170 ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
171 logger().debug("{} wait aborted", conn);
172 abort_protocol();
173 });
174 }
175
// Construct the msgr v2 protocol for `conn`: the Protocol base is tagged
// proto_t::v2 and given the dispatcher and connection, `messenger` is stored,
// and the protocol timer is initialized with the same connection.
176 ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
177 SocketConnection& conn,
178 SocketMessenger& messenger)
179 : Protocol(proto_t::v2, dispatcher, conn),
180 messenger{messenger},
181 protocol_timer{conn}
182 {}
183
184 ProtocolV2::~ProtocolV2() {}
185
186 void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
187 const entity_type_t& _peer_type)
188 {
189 ceph_assert(state == state_t::NONE);
190 ceph_assert(!socket);
191 conn.peer_addr = _peer_addr;
192 conn.target_addr = _peer_addr;
193 conn.set_peer_type(_peer_type);
194 conn.policy = messenger.get_policy(_peer_type);
195 client_cookie = generate_client_cookie();
196 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_type={}, cc={}"
197 " policy(lossy={}, server={}, standby={}, resetcheck={})",
198 conn, _peer_addr, ceph_entity_type_name(_peer_type), client_cookie,
199 conn.policy.lossy, conn.policy.server,
200 conn.policy.standby, conn.policy.resetcheck);
201 messenger.register_conn(
202 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
203 execute_connecting();
204 }
205
206 void ProtocolV2::start_accept(SocketRef&& sock,
207 const entity_addr_t& _peer_addr)
208 {
209 ceph_assert(state == state_t::NONE);
210 ceph_assert(!socket);
211 // until we know better
212 conn.target_addr = _peer_addr;
213 conn.set_ephemeral_port(_peer_addr.get_port(),
214 SocketConnection::side_t::acceptor);
215 socket = std::move(sock);
216 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
217 messenger.accept_conn(
218 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
219 execute_accepting();
220 }
221
222 // TODO: Frame related implementations, probably to a separate class.
223
224 void ProtocolV2::enable_recording()
225 {
226 rxbuf.clear();
227 txbuf.clear();
228 record_io = true;
229 }
230
231 seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
232 {
233 if (unlikely(record_io)) {
234 return socket->read_exactly(bytes)
235 .then([this] (auto bl) {
236 rxbuf.append(buffer::create(bl.share()));
237 return bl;
238 });
239 } else {
240 return socket->read_exactly(bytes);
241 };
242 }
243
244 seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
245 {
246 if (unlikely(record_io)) {
247 return socket->read(bytes)
248 .then([this] (auto buf) {
249 rxbuf.append(buf);
250 return buf;
251 });
252 } else {
253 return socket->read(bytes);
254 }
255 }
256
257 seastar::future<> ProtocolV2::write(bufferlist&& buf)
258 {
259 if (unlikely(record_io)) {
260 txbuf.append(buf);
261 }
262 return socket->write(std::move(buf));
263 }
264
265 seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
266 {
267 if (unlikely(record_io)) {
268 txbuf.append(buf);
269 }
270 return socket->write_flush(std::move(buf));
271 }
272
273 size_t ProtocolV2::get_current_msg_size() const
274 {
275 ceph_assert(!rx_segments_desc.empty());
276 size_t sum = 0;
277 // we don't include SegmentIndex::Msg::HEADER.
278 for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
279 sum += rx_segments_desc[idx].length;
280 }
281 return sum;
282 }
283
284 seastar::future<Tag> ProtocolV2::read_main_preamble()
285 {
286 return read_exactly(FRAME_PREAMBLE_SIZE)
287 .then([this] (auto bl) {
288 if (session_stream_handlers.rx) {
289 session_stream_handlers.rx->reset_rx_handler();
290 /*
291 bl = session_stream_handlers.rx->authenticated_decrypt_update(
292 std::move(bl), segment_t::DEFAULT_ALIGNMENT);
293 */
294 }
295
296 // I expect ceph_le32 will make the endian conversion for me. Passing
297 // everything through ::Decode is unnecessary.
298 const auto& main_preamble = \
299 *reinterpret_cast<const preamble_block_t*>(bl.get());
300 logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}",
301 conn, bl.size(), (int)main_preamble.tag,
302 (int)main_preamble.num_segments, main_preamble.crc);
303
304 // verify preamble's CRC before any further processing
305 const auto rx_crc = ceph_crc32c(0,
306 reinterpret_cast<const unsigned char*>(&main_preamble),
307 sizeof(main_preamble) - sizeof(main_preamble.crc));
308 if (rx_crc != main_preamble.crc) {
309 logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
310 conn, rx_crc, main_preamble.crc);
311 abort_in_fault();
312 }
313
314 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
315 if (main_preamble.num_segments < 1 ||
316 main_preamble.num_segments > MAX_NUM_SEGMENTS) {
317 logger().warn("{} unsupported num_segments={}",
318 conn, main_preamble.num_segments);
319 abort_in_fault();
320 }
321 if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
322 logger().warn("{} num_segments too much: {}",
323 conn, main_preamble.num_segments);
324 abort_in_fault();
325 }
326
327 rx_segments_desc.clear();
328 rx_segments_data.clear();
329
330 for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
331 logger().trace("{} GOT frame segment: len={} align={}",
332 conn, main_preamble.segments[idx].length,
333 main_preamble.segments[idx].alignment);
334 rx_segments_desc.emplace_back(main_preamble.segments[idx]);
335 }
336
337 INTERCEPT_FRAME(main_preamble.tag, bp_type_t::READ);
338 return static_cast<Tag>(main_preamble.tag);
339 });
340 }
341
342 seastar::future<> ProtocolV2::read_frame_payload()
343 {
344 ceph_assert(!rx_segments_desc.empty());
345 ceph_assert(rx_segments_data.empty());
346
347 return seastar::do_until(
348 [this] { return rx_segments_desc.size() == rx_segments_data.size(); },
349 [this] {
350 // description of current segment to read
351 const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
352 // TODO: create aligned and contiguous buffer from socket
353 if (cur_rx_desc.alignment != segment_t::DEFAULT_ALIGNMENT) {
354 logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
355 conn, cur_rx_desc.alignment, rx_segments_data.size());
356 }
357 // TODO: create aligned and contiguous buffer from socket
358 return read_exactly(cur_rx_desc.length)
359 .then([this] (auto tmp_bl) {
360 logger().trace("{} RECV({}) frame segment[{}]",
361 conn, tmp_bl.size(), rx_segments_data.size());
362 bufferlist data;
363 data.append(buffer::create(std::move(tmp_bl)));
364 if (session_stream_handlers.rx) {
365 // TODO
366 ceph_assert(false);
367 }
368 rx_segments_data.emplace_back(std::move(data));
369 });
370 }
371 ).then([this] {
372 // TODO: get_epilogue_size()
373 ceph_assert(!session_stream_handlers.rx);
374 return read_exactly(FRAME_PLAIN_EPILOGUE_SIZE);
375 }).then([this] (auto bl) {
376 logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
377
378 __u8 late_flags;
379 if (session_stream_handlers.rx) {
380 // TODO
381 ceph_assert(false);
382 } else {
383 auto& epilogue = *reinterpret_cast<const epilogue_plain_block_t*>(bl.get());
384 for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
385 const __u32 expected_crc = epilogue.crc_values[idx];
386 const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
387 if (expected_crc != calculated_crc) {
388 logger().warn("{} message integrity check failed at index {}:"
389 " expected_crc={} calculated_crc={}",
390 conn, (unsigned int)idx, expected_crc, calculated_crc);
391 abort_in_fault();
392 } else {
393 logger().trace("{} message integrity check success at index {}: crc={}",
394 conn, (unsigned int)idx, expected_crc);
395 }
396 }
397 late_flags = epilogue.late_flags;
398 }
399 logger().trace("{} GOT frame epilogue: late_flags={}",
400 conn, (unsigned)late_flags);
401
402 // we do have a mechanism that allows transmitter to start sending message
403 // and abort after putting entire data field on wire. This will be used by
404 // the kernel client to avoid unnecessary buffering.
405 if (late_flags & FRAME_FLAGS_LATEABRT) {
406 // TODO
407 ceph_assert(false);
408 }
409 });
410 }
411
412 template <class F>
413 seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
414 {
415 auto bl = frame.get_buffer(session_stream_handlers);
416 const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
417 logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
418 conn, bl.length(), (int)main_preamble->tag,
419 (int)main_preamble->num_segments, main_preamble->crc);
420 INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
421 if (flush) {
422 return write_flush(std::move(bl));
423 } else {
424 return write(std::move(bl));
425 }
426 }
427
428 void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
429 {
430 if (!reentrant && _state == state) {
431 logger().error("{} is not allowed to re-trigger state {}",
432 conn, get_state_name(state));
433 ceph_assert(false);
434 }
435 logger().debug("{} TRIGGER {}, was {}",
436 conn, get_state_name(_state), get_state_name(state));
437 state = _state;
438 set_write_state(_write_state);
439 }
440
441 void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
442 {
443 if (conn.policy.lossy) {
444 logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
445 conn, func_name, get_state_name(state), eptr);
446 dispatch_reset();
447 (void) close();
448 } else if (conn.policy.server ||
449 (conn.policy.standby &&
450 (!is_queued() && conn.sent.empty()))) {
451 logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
452 conn, func_name, get_state_name(state), eptr);
453 execute_standby();
454 } else if (backoff) {
455 logger().info("{} {}: fault at {}, going to WAIT -- {}",
456 conn, func_name, get_state_name(state), eptr);
457 execute_wait(false);
458 } else {
459 logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
460 conn, func_name, get_state_name(state), eptr);
461 execute_connecting();
462 }
463 }
464
465 void ProtocolV2::dispatch_reset()
466 {
467 (void) seastar::with_gate(pending_dispatch, [this] {
468 return dispatcher.ms_handle_reset(
469 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
470 }).handle_exception([this] (std::exception_ptr eptr) {
471 logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
472 ceph_abort("unexpected exception from ms_handle_reset()");
473 });
474 }
475
476 void ProtocolV2::reset_session(bool full)
477 {
478 server_cookie = 0;
479 connect_seq = 0;
480 conn.in_seq = 0;
481 if (full) {
482 client_cookie = generate_client_cookie();
483 peer_global_seq = 0;
484 reset_write();
485 (void) seastar::with_gate(pending_dispatch, [this] {
486 return dispatcher.ms_handle_remote_reset(
487 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
488 }).handle_exception([this] (std::exception_ptr eptr) {
489 logger().error("{} ms_handle_remote_reset caught exception: {}", conn, eptr);
490 ceph_abort("unexpected exception from ms_handle_remote_reset()");
491 });
492 }
493 }
494
// Perform the msgr2 banner exchange, then the HelloFrame exchange:
//  1. send our banner: CEPH_BANNER_V2_PREFIX, a 16-bit payload length and a
//     payload holding our supported and required feature bits;
//  2-3. read the peer's banner prefix plus its 16-bit payload length and
//     validate the prefix (a V1 banner or anything else -> abort_in_fault());
//  4. decode the peer's feature bits; an incompatibility in either direction
//     closes the connection via abort_in_close(); then send our HelloFrame
//     (our entity type and conn.target_addr);
//  5-6. read and decode the peer's HelloFrame.
// Resolves to the entity type and peer_addr field carried in the peer's
// HelloFrame (execute_connecting() treats the latter as our address as seen
// by the peer).
495 seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
496 {
497 // 1. prepare and send banner
498 bufferlist banner_payload;
499 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
500 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
501
502 bufferlist bl;
503 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
504 auto len_payload = static_cast<uint16_t>(banner_payload.length());
505 encode(len_payload, bl, 0);
506 bl.claim_append(banner_payload);
507 logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
508 "required={}, banner=\"{}\"",
509 conn, bl.length(), len_payload,
510 CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
511 CEPH_BANNER_V2_PREFIX);
512 INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
513 return write_flush(std::move(bl)).then([this] {
514 // 2. read peer banner
// fixed size: the V2 prefix followed by the 16-bit payload length
515 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
516 INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
517 return read_exactly(banner_len);
518 }).then([this] (auto bl) {
519 // 3. process peer banner and read banner_payload
520 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
521 logger().debug("{} RECV({}) banner: \"{}\"",
522 conn, bl.size(),
523 std::string((const char*)bl.get(), banner_prefix_len));
524
// a mismatching prefix is fatal; only the log message distinguishes a V1
// peer from garbage
525 if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
526 if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
527 logger().warn("{} peer is using V1 protocol", conn);
528 } else {
529 logger().warn("{} peer sent bad banner", conn);
530 }
531 abort_in_fault();
532 }
// what remains after the prefix is the encoded 16-bit payload length
533 bl.trim_front(banner_prefix_len);
534
535 uint16_t payload_len;
536 bufferlist buf;
537 buf.append(buffer::create(std::move(bl)));
538 auto ti = buf.cbegin();
539 try {
540 decode(payload_len, ti);
541 } catch (const buffer::error &e) {
542 logger().warn("{} decode banner payload len failed", conn);
543 abort_in_fault();
544 }
545 logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
546 INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
547 return read(payload_len);
548 }).then([this] (bufferlist bl) {
549 // 4. process peer banner_payload and send HelloFrame
550 auto p = bl.cbegin();
551 uint64_t peer_supported_features;
552 uint64_t peer_required_features;
553 try {
554 decode(peer_supported_features, p);
555 decode(peer_required_features, p);
556 } catch (const buffer::error &e) {
557 logger().warn("{} decode banner payload failed", conn);
558 abort_in_fault();
559 }
560 logger().debug("{} RECV({}) banner features: supported={} required={}",
561 conn, bl.length(),
562 peer_supported_features, peer_required_features);
563
564 // Check feature bit compatibility
// An incompatibility in either direction is not retried: abort_in_close()
// starts closing the connection instead of faulting it.
565 uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
566 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
567 if ((required_features & peer_supported_features) != required_features) {
568 logger().error("{} peer does not support all required features"
569 " required={} peer_supported={}",
570 conn, required_features, peer_supported_features);
571 abort_in_close(*this);
572 }
573 if ((supported_features & peer_required_features) != peer_required_features) {
574 logger().error("{} we do not support all peer required features"
575 " peer_required={} supported={}",
576 conn, peer_required_features, supported_features);
577 abort_in_close(*this);
578 }
// remember what the peer requires; if it requires nothing,
// connection_features is set to msgr2_required
579 this->peer_required_features = peer_required_features;
580 if (this->peer_required_features == 0) {
581 this->connection_features = msgr2_required;
582 }
583
584 auto hello = HelloFrame::Encode(messenger.get_mytype(),
585 conn.target_addr);
586 logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
587 conn, ceph_entity_type_name(messenger.get_mytype()),
588 conn.target_addr);
589 return write_frame(hello);
590 }).then([this] {
591 //5. read peer HelloFrame
592 return read_main_preamble();
593 }).then([this] (Tag tag) {
594 expect_tag(Tag::HELLO, tag, conn, __func__);
595 return read_frame_payload();
596 }).then([this] {
597 // 6. process peer HelloFrame
// the field labels mirror the sender's perspective: my_type is the peer's
// type, peer_addr is the address the peer used for us
598 auto hello = HelloFrame::Decode(rx_segments_data.back());
599 logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
600 conn, ceph_entity_type_name(hello.entity_type()),
601 hello.peer_addr());
602 return seastar::make_ready_future<entity_type_t, entity_addr_t>(
603 hello.entity_type(), hello.peer_addr());
604 });
605 }
606
607 // CONNECTING state
608
// Client side of the authentication exchange: read the server's next auth
// frame and act on its tag.
//  - AUTH_BAD_METHOD: let the auth client handle the rejection (a negative
//    result aborts via abort_in_fault()), then restart with client_auth().
//  - AUTH_REPLY_MORE: feed the payload to the auth client, send its answer as
//    an AuthRequestMoreFrame, and recurse to wait for the next reply.
//  - AUTH_DONE: let the auth client finish (negative result -> fault), store
//    the connection mode, and continue with finish_auth().
//  - anything else: unexpected_tag() aborts.
609 seastar::future<> ProtocolV2::handle_auth_reply()
610 {
611 return read_main_preamble()
612 .then([this] (Tag tag) {
613 switch (tag) {
614 case Tag::AUTH_BAD_METHOD:
615 return read_frame_payload().then([this] {
616 // handle_auth_bad_method() logic
617 auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
618 logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
619 "allowed_methods={}, allowed_modes={}",
620 conn, bad_method.method(), cpp_strerror(bad_method.result()),
621 bad_method.allowed_methods(), bad_method.allowed_modes());
622 ceph_assert(messenger.get_auth_client());
623 int r = messenger.get_auth_client()->handle_auth_bad_method(
624 conn.shared_from_this(), auth_meta,
625 bad_method.method(), bad_method.result(),
626 bad_method.allowed_methods(), bad_method.allowed_modes());
627 if (r < 0) {
628 logger().warn("{} auth_client handle_auth_bad_method returned {}",
629 conn, r);
630 abort_in_fault();
631 }
// retry authentication from the request step
632 return client_auth(bad_method.allowed_methods());
633 });
634 case Tag::AUTH_REPLY_MORE:
635 return read_frame_payload().then([this] {
636 // handle_auth_reply_more() logic
637 auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
638 logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
639 conn, auth_more.auth_payload().length());
640 ceph_assert(messenger.get_auth_client());
641 // let execute_connecting() take care of the thrown exception
642 auto reply = messenger.get_auth_client()->handle_auth_reply_more(
643 conn.shared_from_this(), auth_meta, auth_more.auth_payload());
644 auto more_reply = AuthRequestMoreFrame::Encode(reply);
645 logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
646 conn, reply.length());
647 return write_frame(more_reply);
648 }).then([this] {
// another round of the exchange: wait for the server's next reply
649 return handle_auth_reply();
650 });
651 case Tag::AUTH_DONE:
652 return read_frame_payload().then([this] {
653 // handle_auth_done() logic
654 auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
655 logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
656 conn, auth_done.global_id(),
657 ceph_con_mode_name(auth_done.con_mode()),
658 auth_done.auth_payload().length());
659 ceph_assert(messenger.get_auth_client());
660 int r = messenger.get_auth_client()->handle_auth_done(
661 conn.shared_from_this(), auth_meta,
662 auth_done.global_id(),
663 auth_done.con_mode(),
664 auth_done.auth_payload());
665 if (r < 0) {
666 logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
667 abort_in_fault();
668 }
669 auth_meta->con_mode = auth_done.con_mode();
// secure mode is asserted against, and the stream handlers are reset to
// null (no on-wire transformation)
670 // TODO
671 ceph_assert(!auth_meta->is_mode_secure());
672 session_stream_handlers = { nullptr, nullptr };
673 return finish_auth();
674 });
675 default: {
676 unexpected_tag(tag, conn, __func__);
677 return seastar::now();
678 }
679 }
680 });
681 }
682
683 seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
684 {
685 // send_auth_request() logic
686 ceph_assert(messenger.get_auth_client());
687
688 try {
689 auto [auth_method, preferred_modes, bl] =
690 messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta);
691 auth_meta->auth_method = auth_method;
692 auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
693 logger().debug("{} WRITE AuthRequestFrame: method={},"
694 " preferred_modes={}, payload_len={}",
695 conn, auth_method, preferred_modes, bl.length());
696 return write_frame(frame).then([this] {
697 return handle_auth_reply();
698 });
699 } catch (const crimson::auth::error& e) {
700 logger().error("{} get_initial_auth_request returned {}", conn, e);
701 dispatch_reset();
702 abort_in_close(*this);
703 return seastar::now();
704 }
705 }
706
707 seastar::future<ProtocolV2::next_step_t>
708 ProtocolV2::process_wait()
709 {
710 return read_frame_payload().then([this] {
711 // handle_wait() logic
712 logger().debug("{} GOT WaitFrame", conn);
713 WaitFrame::Decode(rx_segments_data.back());
714 return next_step_t::wait;
715 });
716 }
717
// Client side of a new session: send a ClientIdentFrame (our addresses,
// target address, global id, global_seq, features, lossy flag and client
// cookie), then act on the server's answer:
//  - IDENT_MISSING_FEATURES: abort via abort_in_fault();
//  - WAIT: process_wait() -> next_step_t::wait;
//  - SERVER_IDENT: validate the server's addresses (mismatch throws
//    bad_peer_address), adopt its cookie, id, features, global_seq and lossy
//    flag, and resolve to next_step_t::ready;
//  - anything else: unexpected_tag() aborts.
718 seastar::future<ProtocolV2::next_step_t>
719 ProtocolV2::client_connect()
720 {
721 // send_client_ident() logic
722 uint64_t flags = 0;
723 if (conn.policy.lossy) {
724 flags |= CEPH_MSG_CONNECT_LOSSY;
725 }
726
727 auto client_ident = ClientIdentFrame::Encode(
728 messenger.get_myaddrs(),
729 conn.target_addr,
730 messenger.get_myname().num(),
731 global_seq,
732 conn.policy.features_supported,
733 conn.policy.features_required | msgr2_required, flags,
734 client_cookie);
735
736 logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
737 " gs={}, features_supported={}, features_required={},"
738 " flags={}, cookie={}",
739 conn, messenger.get_myaddrs(), conn.target_addr,
740 messenger.get_myname().num(), global_seq,
741 conn.policy.features_supported,
742 conn.policy.features_required | msgr2_required,
743 flags, client_cookie);
744 return write_frame(client_ident).then([this] {
745 return read_main_preamble();
746 }).then([this] (Tag tag) {
747 switch (tag) {
748 case Tag::IDENT_MISSING_FEATURES:
749 return read_frame_payload().then([this] {
750 // handle_ident_missing_features() logic
751 auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
752 logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
753 " (client does not support all server features)",
754 conn, ident_missing.features());
755 abort_in_fault();
756 return next_step_t::none;
757 });
758 case Tag::WAIT:
759 return process_wait();
760 case Tag::SERVER_IDENT:
761 return read_frame_payload().then([this] {
762 // handle_server_ident() logic
// requeue_sent() runs before the server identity is decoded or validated
763 requeue_sent();
764 auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
765 logger().debug("{} GOT ServerIdentFrame:"
766 " addrs={}, gid={}, gs={},"
767 " features_supported={}, features_required={},"
768 " flags={}, cookie={}",
769 conn,
770 server_ident.addrs(), server_ident.gid(),
771 server_ident.global_seq(),
772 server_ident.supported_features(),
773 server_ident.required_features(),
774 server_ident.flags(), server_ident.cookie());
775
776 // is this who we intended to talk to?
777 // be a bit forgiving here, since we may be connecting based on addresses parsed out
778 // of mon_host or something.
779 if (!server_ident.addrs().contains(conn.target_addr)) {
780 logger().warn("{} peer identifies as {}, does not include {}",
781 conn, server_ident.addrs(), conn.target_addr);
782 throw std::system_error(
783 make_error_code(crimson::net::error::bad_peer_address));
784 }
785
// NOTE(review): the cookie is stored before the peer_addr check below, so it
// stays updated even if that check throws
786 server_cookie = server_ident.cookie();
787
788 // TODO: change peer_addr to entity_addrvec_t
789 if (server_ident.addrs().front() != conn.peer_addr) {
790 logger().warn("{} peer advertises as {}, does not match {}",
791 conn, server_ident.addrs(), conn.peer_addr);
792 throw std::system_error(
793 make_error_code(crimson::net::error::bad_peer_address));
794 }
// adopt the server's identity; negotiated features are the intersection of
// what the server supports and what our policy supports
795 conn.set_peer_id(server_ident.gid());
796 conn.set_features(server_ident.supported_features() &
797 conn.policy.features_supported);
798 peer_global_seq = server_ident.global_seq();
799
// the server's lossy flag overrides our policy; a lossy session keeps no
// connect_seq or server cookie
800 bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
801 if (lossy != conn.policy.lossy) {
802 logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
803 conn.policy.lossy = lossy;
804 }
805 if (lossy && (connect_seq != 0 || server_cookie != 0)) {
806 logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
807 conn, connect_seq, server_cookie);
808 connect_seq = 0;
809 server_cookie = 0;
810 }
811
812 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
813 });
814 default: {
815 unexpected_tag(tag, conn, "post_client_connect");
816 return seastar::make_ready_future<next_step_t>(next_step_t::none);
817 }
818 }
819 });
820 }
821
// Client side of resuming an existing session: send a ReconnectFrame (our
// addresses, both cookies, global_seq, connect_seq and the last received
// message sequence), then act on the server's answer:
//  - SESSION_RETRY_GLOBAL: obtain a new global_seq from the messenger based
//    on the server's value and retry;
//  - SESSION_RETRY: set connect_seq to the server's value + 1 and retry;
//  - SESSION_RESET: reset_session() (full or partial per the frame), then
//    fall back to client_connect();
//  - WAIT: process_wait() -> next_step_t::wait;
//  - SESSION_RECONNECT_OK: requeue_up_to() the acknowledged sequence and
//    resolve to next_step_t::ready;
//  - anything else: unexpected_tag() aborts.
822 seastar::future<ProtocolV2::next_step_t>
823 ProtocolV2::client_reconnect()
824 {
825 // send_reconnect() logic
826 auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
827 client_cookie,
828 server_cookie,
829 global_seq,
830 connect_seq,
831 conn.in_seq);
832 logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
833 " server_cookie={}, gs={}, cs={}, msg_seq={}",
834 conn, messenger.get_myaddrs(),
835 client_cookie, server_cookie,
836 global_seq, connect_seq, conn.in_seq);
837 return write_frame(reconnect).then([this] {
838 return read_main_preamble();
839 }).then([this] (Tag tag) {
840 switch (tag) {
841 case Tag::SESSION_RETRY_GLOBAL:
842 return read_frame_payload().then([this] {
843 // handle_session_retry_global() logic
844 auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
845 logger().warn("{} GOT RetryGlobalFrame: gs={}",
846 conn, retry.global_seq());
847 return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
848 global_seq = gs;
849 logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
850 return client_reconnect();
851 });
852 });
853 case Tag::SESSION_RETRY:
854 return read_frame_payload().then([this] {
855 // handle_session_retry() logic
856 auto retry = RetryFrame::Decode(rx_segments_data.back());
857 logger().warn("{} GOT RetryFrame: cs={}",
858 conn, retry.connect_seq());
859 connect_seq = retry.connect_seq() + 1;
860 logger().warn("{} UPDATE: cs={}", conn, connect_seq);
861 return client_reconnect();
862 });
863 case Tag::SESSION_RESET:
864 return read_frame_payload().then([this] {
865 // handle_session_reset() logic
866 auto reset = ResetFrame::Decode(rx_segments_data.back());
867 logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
// the session cannot be resumed: reset local session state and start a
// fresh connect instead
868 reset_session(reset.full());
869 return client_connect();
870 });
871 case Tag::WAIT:
872 return process_wait();
873 case Tag::SESSION_RECONNECT_OK:
874 return read_frame_payload().then([this] {
875 // handle_reconnect_ok() logic
876 auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
877 logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
878 conn, reconnect_ok.msg_seq());
879 requeue_up_to(reconnect_ok.msg_seq());
880 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
881 });
882 default: {
883 unexpected_tag(tag, conn, "post_client_reconnect");
884 return seastar::make_ready_future<next_step_t>(next_step_t::none);
885 }
886 }
887 });
888 }
889
890 void ProtocolV2::execute_connecting()
891 {
892 trigger_state(state_t::CONNECTING, write_state_t::delay, true);
893 if (socket) {
894 socket->shutdown();
895 }
896 execution_done = seastar::with_gate(pending_dispatch, [this] {
897 // we don't know my socket_port yet
898 conn.set_ephemeral_port(0, SocketConnection::side_t::none);
899 return messenger.get_global_seq().then([this] (auto gs) {
900 global_seq = gs;
901 assert(client_cookie != 0);
902 if (!conn.policy.lossy && server_cookie != 0) {
903 ++connect_seq;
904 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
905 conn, global_seq, connect_seq);
906 } else { // conn.policy.lossy || server_cookie == 0
907 assert(connect_seq == 0);
908 assert(server_cookie == 0);
909 logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
910 }
911
912 return wait_write_exit();
913 }).then([this] {
914 if (unlikely(state != state_t::CONNECTING)) {
915 logger().debug("{} triggered {} before Socket::connect()",
916 conn, get_state_name(state));
917 abort_protocol();
918 }
919 if (socket) {
920 (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
921 return sock->close().then([sock = std::move(sock)] {});
922 });
923 }
924 INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
925 return Socket::connect(conn.peer_addr);
926 }).then([this](SocketRef sock) {
927 logger().debug("{} socket connected", conn);
928 if (unlikely(state != state_t::CONNECTING)) {
929 logger().debug("{} triggered {} during Socket::connect()",
930 conn, get_state_name(state));
931 return sock->close().then([sock = std::move(sock)] {
932 abort_protocol();
933 });
934 }
935 socket = std::move(sock);
936 return seastar::now();
937 }).then([this] {
938 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
939 session_stream_handlers = { nullptr, nullptr };
940 enable_recording();
941 return banner_exchange();
942 }).then([this] (entity_type_t _peer_type,
943 entity_addr_t _my_addr_from_peer) {
944 if (conn.get_peer_type() != _peer_type) {
945 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
946 conn, ceph_entity_type_name(conn.get_peer_type()),
947 ceph_entity_type_name(_peer_type));
948 dispatch_reset();
949 abort_in_close(*this);
950 }
951 conn.set_ephemeral_port(_my_addr_from_peer.get_port(),
952 SocketConnection::side_t::connector);
953 if (unlikely(_my_addr_from_peer.is_legacy())) {
954 logger().warn("{} peer sent a legacy address for me: {}",
955 conn, _my_addr_from_peer);
956 throw std::system_error(
957 make_error_code(crimson::net::error::bad_peer_address));
958 }
959 _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
960 return messenger.learned_addr(_my_addr_from_peer, conn);
961 }).then([this] {
962 return client_auth();
963 }).then([this] {
964 if (server_cookie == 0) {
965 ceph_assert(connect_seq == 0);
966 return client_connect();
967 } else {
968 ceph_assert(connect_seq > 0);
969 return client_reconnect();
970 }
971 }).then([this] (next_step_t next) {
972 if (unlikely(state != state_t::CONNECTING)) {
973 logger().debug("{} triggered {} at the end of execute_connecting()",
974 conn, get_state_name(state));
975 abort_protocol();
976 }
977 switch (next) {
978 case next_step_t::ready: {
979 (void) seastar::with_gate(pending_dispatch, [this] {
980 return dispatcher.ms_handle_connect(
981 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
982 }).handle_exception([this] (std::exception_ptr eptr) {
983 logger().error("{} ms_handle_connect caught exception: {}", conn, eptr);
984 ceph_abort("unexpected exception from ms_handle_connect()");
985 });
986 logger().info("{} connected:"
987 " gs={}, pgs={}, cs={}, client_cookie={},"
988 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
989 conn, global_seq, peer_global_seq, connect_seq,
990 client_cookie, server_cookie, conn.in_seq,
991 conn.out_seq, conn.out_q.size());
992 execute_ready();
993 break;
994 }
995 case next_step_t::wait: {
996 logger().info("{} execute_connecting(): going to WAIT", conn);
997 execute_wait(true);
998 break;
999 }
1000 default: {
1001 ceph_abort("impossible next step");
1002 }
1003 }
1004 }).handle_exception([this] (std::exception_ptr eptr) {
1005 if (state != state_t::CONNECTING) {
1006 logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
1007 conn, get_state_name(state), eptr);
1008 assert(state == state_t::CLOSING ||
1009 state == state_t::REPLACING);
1010 return;
1011 }
1012
1013 if (conn.policy.server ||
1014 (conn.policy.standby &&
1015 (!is_queued() && conn.sent.empty()))) {
1016 logger().info("{} execute_connecting(): fault at {} with nothing to send,"
1017 " going to STANDBY -- {}",
1018 conn, get_state_name(state), eptr);
1019 execute_standby();
1020 } else {
1021 logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
1022 conn, get_state_name(state), eptr);
1023 execute_wait(false);
1024 }
1025 });
1026 });
1027 }
1028
1029 // ACCEPTING state
1030
1031 seastar::future<> ProtocolV2::_auth_bad_method(int r)
1032 {
1033 // _auth_bad_method() logic
1034 ceph_assert(r < 0);
1035 auto [allowed_methods, allowed_modes] =
1036 messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
1037 auto bad_method = AuthBadMethodFrame::Encode(
1038 auth_meta->auth_method, r, allowed_methods, allowed_modes);
1039 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
1040 "allowed_methods={}, allowed_modes={})",
1041 conn, auth_meta->auth_method, cpp_strerror(r),
1042 allowed_methods, allowed_modes);
1043 return write_frame(bad_method).then([this] {
1044 return server_auth();
1045 });
1046 }
1047
1048 seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
1049 {
1050 // _handle_auth_request() logic
1051 ceph_assert(messenger.get_auth_server());
1052 bufferlist reply;
1053 int r = messenger.get_auth_server()->handle_auth_request(
1054 conn.shared_from_this(), auth_meta,
1055 more, auth_meta->auth_method, auth_payload,
1056 &reply);
1057 switch (r) {
1058 // successful
1059 case 1: {
1060 auto auth_done = AuthDoneFrame::Encode(
1061 conn.peer_global_id, auth_meta->con_mode, reply);
1062 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
1063 conn, conn.peer_global_id,
1064 ceph_con_mode_name(auth_meta->con_mode), reply.length());
1065 return write_frame(auth_done).then([this] {
1066 ceph_assert(auth_meta);
1067 // TODO
1068 ceph_assert(!auth_meta->is_mode_secure());
1069 session_stream_handlers = { nullptr, nullptr };
1070 return finish_auth();
1071 });
1072 }
1073 // auth more
1074 case 0: {
1075 auto more = AuthReplyMoreFrame::Encode(reply);
1076 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
1077 conn, reply.length());
1078 return write_frame(more).then([this] {
1079 return read_main_preamble();
1080 }).then([this] (Tag tag) {
1081 expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
1082 return read_frame_payload();
1083 }).then([this] {
1084 auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
1085 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
1086 conn, auth_more.auth_payload().length());
1087 return _handle_auth_request(auth_more.auth_payload(), true);
1088 });
1089 }
1090 case -EBUSY: {
1091 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn);
1092 abort_in_fault();
1093 return seastar::now();
1094 }
1095 default: {
1096 logger().warn("{} auth_server handle_auth_request returned {}", conn, r);
1097 return _auth_bad_method(r);
1098 }
1099 }
1100 }
1101
1102 seastar::future<> ProtocolV2::server_auth()
1103 {
1104 return read_main_preamble()
1105 .then([this] (Tag tag) {
1106 expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
1107 return read_frame_payload();
1108 }).then([this] {
1109 // handle_auth_request() logic
1110 auto request = AuthRequestFrame::Decode(rx_segments_data.back());
1111 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
1112 " payload_len={}",
1113 conn, request.method(), request.preferred_modes(),
1114 request.auth_payload().length());
1115 auth_meta->auth_method = request.method();
1116 auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode(
1117 conn.get_peer_type(), auth_meta->auth_method,
1118 request.preferred_modes());
1119 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
1120 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn);
1121 return _auth_bad_method(-EOPNOTSUPP);
1122 }
1123 return _handle_auth_request(request.auth_payload(), false);
1124 });
1125 }
1126
1127 seastar::future<ProtocolV2::next_step_t>
1128 ProtocolV2::send_wait()
1129 {
1130 auto wait = WaitFrame::Encode();
1131 logger().debug("{} WRITE WaitFrame", conn);
1132 return write_frame(wait).then([] {
1133 return next_step_t::wait;
1134 });
1135 }
1136
1137 seastar::future<ProtocolV2::next_step_t>
1138 ProtocolV2::reuse_connection(
1139 ProtocolV2* existing_proto, bool do_reset,
1140 bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
1141 {
1142 existing_proto->trigger_replacing(reconnect,
1143 do_reset,
1144 std::move(socket),
1145 std::move(auth_meta),
1146 std::move(session_stream_handlers),
1147 peer_global_seq,
1148 client_cookie,
1149 conn.get_peer_name(),
1150 connection_features,
1151 conn_seq,
1152 msg_seq);
1153 #ifdef UNIT_TESTS_BUILT
1154 if (conn.interceptor) {
1155 conn.interceptor->register_conn_replaced(conn);
1156 }
1157 #endif
1158 // close this connection because all the necessary information is delivered
1159 // to the exisiting connection, and jump to error handling code to abort the
1160 // current state.
1161 abort_in_close(*this);
1162 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1163 }
1164
1165 seastar::future<ProtocolV2::next_step_t>
1166 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
1167 {
1168 // handle_existing_connection() logic
1169 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1170 existing_conn->protocol.get());
1171 ceph_assert(existing_proto);
1172 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1173 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1174 conn, global_seq, peer_global_seq, connect_seq,
1175 client_cookie, server_cookie,
1176 existing_conn, get_state_name(existing_proto->state),
1177 existing_proto->global_seq,
1178 existing_proto->peer_global_seq,
1179 existing_proto->connect_seq,
1180 existing_proto->client_cookie,
1181 existing_proto->server_cookie);
1182
1183 if (existing_proto->state == state_t::REPLACING) {
1184 logger().warn("{} server_connect: racing replace happened while"
1185 " replacing existing connection {}, send wait.",
1186 conn, *existing_conn);
1187 return send_wait();
1188 }
1189
1190 if (existing_proto->peer_global_seq > peer_global_seq) {
1191 logger().warn("{} server_connect:"
1192 " this is a stale connection, because peer_global_seq({})"
1193 " < existing->peer_global_seq({}), close this connection"
1194 " in favor of existing connection {}",
1195 conn, peer_global_seq,
1196 existing_proto->peer_global_seq, *existing_conn);
1197 abort_in_fault();
1198 }
1199
1200 if (existing_conn->policy.lossy) {
1201 // existing connection can be thrown out in favor of this one
1202 logger().warn("{} server_connect:"
1203 " existing connection {} is a lossy channel. Close existing in favor of"
1204 " this connection", conn, *existing_conn);
1205 existing_proto->dispatch_reset();
1206 (void) existing_proto->close();
1207
1208 if (unlikely(state != state_t::ACCEPTING)) {
1209 logger().debug("{} triggered {} in execute_accepting()",
1210 conn, get_state_name(state));
1211 abort_protocol();
1212 }
1213 execute_establishing();
1214 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1215 }
1216
1217 if (existing_proto->server_cookie != 0) {
1218 if (existing_proto->client_cookie != client_cookie) {
1219 // Found previous session
1220 // peer has reset and we're going to reuse the existing connection
1221 // by replacing the socket
1222 logger().warn("{} server_connect:"
1223 " found new session (cs={})"
1224 " when existing {} is with stale session (cs={}, ss={}),"
1225 " peer must have reset",
1226 conn, client_cookie,
1227 *existing_conn, existing_proto->client_cookie,
1228 existing_proto->server_cookie);
1229 return reuse_connection(existing_proto, conn.policy.resetcheck);
1230 } else {
1231 // session establishment interrupted between client_ident and server_ident,
1232 // continuing...
1233 logger().warn("{} server_connect: found client session with existing {}"
1234 " matched (cs={}, ss={}), continuing session establishment",
1235 conn, *existing_conn, client_cookie, existing_proto->server_cookie);
1236 return reuse_connection(existing_proto);
1237 }
1238 } else {
1239 // Looks like a connection race: server and client are both connecting to
1240 // each other at the same time.
1241 if (existing_proto->client_cookie != client_cookie) {
1242 if (existing_conn->peer_wins()) {
1243 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1244 " and win, reusing existing {}",
1245 conn, client_cookie, existing_proto->client_cookie, *existing_conn);
1246 return reuse_connection(existing_proto);
1247 } else {
1248 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1249 " and lose to existing {}, ask client to wait",
1250 conn, client_cookie, existing_proto->client_cookie, *existing_conn);
1251 return existing_conn->keepalive().then([this] {
1252 return send_wait();
1253 });
1254 }
1255 } else {
1256 logger().warn("{} server_connect: found client session with existing {}"
1257 " matched (cs={}, ss={}), continuing session establishment",
1258 conn, *existing_conn, client_cookie, existing_proto->server_cookie);
1259 return reuse_connection(existing_proto);
1260 }
1261 }
1262 }
1263
1264 seastar::future<ProtocolV2::next_step_t>
1265 ProtocolV2::server_connect()
1266 {
1267 return read_frame_payload().then([this] {
1268 // handle_client_ident() logic
1269 auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
1270 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1271 " gid={}, gs={}, features_supported={},"
1272 " features_required={}, flags={}, cookie={}",
1273 conn, client_ident.addrs(), client_ident.target_addr(),
1274 client_ident.gid(), client_ident.global_seq(),
1275 client_ident.supported_features(),
1276 client_ident.required_features(),
1277 client_ident.flags(), client_ident.cookie());
1278
1279 if (client_ident.addrs().empty() ||
1280 client_ident.addrs().front() == entity_addr_t()) {
1281 logger().warn("{} oops, client_ident.addrs() is empty", conn);
1282 throw std::system_error(
1283 make_error_code(crimson::net::error::bad_peer_address));
1284 }
1285 if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
1286 logger().warn("{} peer is trying to reach {} which is not us ({})",
1287 conn, client_ident.target_addr(), messenger.get_myaddrs());
1288 throw std::system_error(
1289 make_error_code(crimson::net::error::bad_peer_address));
1290 }
1291 // TODO: change peer_addr to entity_addrvec_t
1292 entity_addr_t paddr = client_ident.addrs().front();
1293 if ((paddr.is_msgr2() || paddr.is_any()) &&
1294 paddr.is_same_host(conn.target_addr)) {
1295 // good
1296 } else {
1297 logger().warn("{} peer's address {} is not v2 or not the same host with {}",
1298 conn, paddr, conn.target_addr);
1299 throw std::system_error(
1300 make_error_code(crimson::net::error::bad_peer_address));
1301 }
1302 conn.peer_addr = paddr;
1303 logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
1304 conn.target_addr = conn.peer_addr;
1305 if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
1306 logger().warn("{} we don't know how to reconnect to peer {}",
1307 conn, conn.target_addr);
1308 throw std::system_error(
1309 make_error_code(crimson::net::error::bad_peer_address));
1310 }
1311
1312 conn.set_peer_id(client_ident.gid());
1313 client_cookie = client_ident.cookie();
1314
1315 uint64_t feat_missing =
1316 (conn.policy.features_required | msgr2_required) &
1317 ~(uint64_t)client_ident.supported_features();
1318 if (feat_missing) {
1319 auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
1320 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1321 conn, feat_missing);
1322 return write_frame(ident_missing_features).then([] {
1323 return next_step_t::wait;
1324 });
1325 }
1326 connection_features =
1327 client_ident.supported_features() & conn.policy.features_supported;
1328 logger().debug("{} UPDATE: connection_features={}", conn, connection_features);
1329
1330 peer_global_seq = client_ident.global_seq();
1331
1332 // Looks good so far, let's check if there is already an existing connection
1333 // to this peer.
1334
1335 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1336
1337 if (existing_conn) {
1338 if (existing_conn->protocol->proto_type != proto_t::v2) {
1339 logger().warn("{} existing connection {} proto version is {}, close existing",
1340 conn, *existing_conn,
1341 static_cast<int>(existing_conn->protocol->proto_type));
1342 // should unregister the existing from msgr atomically
1343 (void) existing_conn->close();
1344 } else {
1345 return handle_existing_connection(existing_conn);
1346 }
1347 }
1348
1349 if (unlikely(state != state_t::ACCEPTING)) {
1350 logger().debug("{} triggered {} in execute_accepting()",
1351 conn, get_state_name(state));
1352 abort_protocol();
1353 }
1354 execute_establishing();
1355 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1356 });
1357 }
1358
1359 seastar::future<ProtocolV2::next_step_t>
1360 ProtocolV2::read_reconnect()
1361 {
1362 return read_main_preamble()
1363 .then([this] (Tag tag) {
1364 expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
1365 return server_reconnect();
1366 });
1367 }
1368
1369 seastar::future<ProtocolV2::next_step_t>
1370 ProtocolV2::send_retry(uint64_t connect_seq)
1371 {
1372 auto retry = RetryFrame::Encode(connect_seq);
1373 logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
1374 return write_frame(retry).then([this] {
1375 return read_reconnect();
1376 });
1377 }
1378
1379 seastar::future<ProtocolV2::next_step_t>
1380 ProtocolV2::send_retry_global(uint64_t global_seq)
1381 {
1382 auto retry = RetryGlobalFrame::Encode(global_seq);
1383 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
1384 return write_frame(retry).then([this] {
1385 return read_reconnect();
1386 });
1387 }
1388
1389 seastar::future<ProtocolV2::next_step_t>
1390 ProtocolV2::send_reset(bool full)
1391 {
1392 auto reset = ResetFrame::Encode(full);
1393 logger().warn("{} WRITE ResetFrame: full={}", conn, full);
1394 return write_frame(reset).then([this] {
1395 return read_main_preamble();
1396 }).then([this] (Tag tag) {
1397 expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
1398 return server_connect();
1399 });
1400 }
1401
1402 seastar::future<ProtocolV2::next_step_t>
1403 ProtocolV2::server_reconnect()
1404 {
1405 return read_frame_payload().then([this] {
1406 // handle_reconnect() logic
1407 auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
1408
1409 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1410 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1411 conn, reconnect.addrs(),
1412 reconnect.client_cookie(), reconnect.server_cookie(),
1413 reconnect.global_seq(), reconnect.connect_seq(),
1414 reconnect.msg_seq());
1415
1416 // can peer_addrs be changed on-the-fly?
1417 // TODO: change peer_addr to entity_addrvec_t
1418 entity_addr_t paddr = reconnect.addrs().front();
1419 if (paddr.is_msgr2() || paddr.is_any()) {
1420 // good
1421 } else {
1422 logger().warn("{} peer's address {} is not v2", conn, paddr);
1423 throw std::system_error(
1424 make_error_code(crimson::net::error::bad_peer_address));
1425 }
1426 if (conn.peer_addr == entity_addr_t()) {
1427 conn.peer_addr = paddr;
1428 } else if (conn.peer_addr != paddr) {
1429 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1430 " reconnect failed",
1431 conn, paddr, conn.peer_addr);
1432 throw std::system_error(
1433 make_error_code(crimson::net::error::bad_peer_address));
1434 }
1435 peer_global_seq = reconnect.global_seq();
1436
1437 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1438
1439 if (!existing_conn) {
1440 // there is no existing connection therefore cannot reconnect to previous
1441 // session
1442 logger().warn("{} server_reconnect: no existing connection from address {},"
1443 " reseting client", conn, conn.peer_addr);
1444 return send_reset(true);
1445 }
1446
1447 if (existing_conn->protocol->proto_type != proto_t::v2) {
1448 logger().warn("{} server_reconnect: existing connection {} proto version is {},"
1449 "close existing and reset client.",
1450 conn, *existing_conn,
1451 static_cast<int>(existing_conn->protocol->proto_type));
1452 (void) existing_conn->close();
1453 return send_reset(true);
1454 }
1455
1456 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1457 existing_conn->protocol.get());
1458 ceph_assert(existing_proto);
1459 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1460 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1461 conn, global_seq, peer_global_seq, reconnect.connect_seq(),
1462 reconnect.client_cookie(), reconnect.server_cookie(),
1463 existing_conn,
1464 get_state_name(existing_proto->state),
1465 existing_proto->global_seq,
1466 existing_proto->peer_global_seq,
1467 existing_proto->connect_seq,
1468 existing_proto->client_cookie,
1469 existing_proto->server_cookie);
1470
1471 if (existing_proto->state == state_t::REPLACING) {
1472 logger().warn("{} server_reconnect: racing replace happened while "
1473 " replacing existing connection {}, retry global.",
1474 conn, *existing_conn);
1475 return send_retry_global(existing_proto->peer_global_seq);
1476 }
1477
1478 if (existing_proto->client_cookie != reconnect.client_cookie()) {
1479 logger().warn("{} server_reconnect:"
1480 " client_cookie mismatch with existing connection {},"
1481 " cc={} rcc={}. I must have reset, reseting client.",
1482 conn, *existing_conn,
1483 existing_proto->client_cookie, reconnect.client_cookie());
1484 return send_reset(conn.policy.resetcheck);
1485 } else if (existing_proto->server_cookie == 0) {
1486 // this happens when:
1487 // - a connects to b
1488 // - a sends client_ident
1489 // - b gets client_ident, sends server_ident and sets cookie X
1490 // - connection fault
1491 // - b reconnects to a with cookie X, connect_seq=1
1492 // - a has cookie==0
1493 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1494 " server_ident with existing connection {}."
1495 " Asking peer to resume session establishment",
1496 conn, existing_proto->client_cookie, *existing_conn);
1497 return send_reset(false);
1498 }
1499
1500 if (existing_proto->peer_global_seq > reconnect.global_seq()) {
1501 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1502 " with existing connection {},"
1503 " ask client to retry global",
1504 conn, existing_proto->peer_global_seq,
1505 reconnect.global_seq(), *existing_conn);
1506 return send_retry_global(existing_proto->peer_global_seq);
1507 }
1508
1509 if (existing_proto->connect_seq > reconnect.connect_seq()) {
1510 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1511 " with existing connection {}, ask client to retry",
1512 conn, reconnect.connect_seq(),
1513 existing_proto->connect_seq, *existing_conn);
1514 return send_retry(existing_proto->connect_seq);
1515 } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
1516 // reconnect race: both peers are sending reconnect messages
1517 if (existing_conn->peer_wins()) {
1518 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1519 " and win, reusing existing {}",
1520 conn, reconnect.connect_seq(), *existing_conn);
1521 return reuse_connection(
1522 existing_proto, false,
1523 true, reconnect.connect_seq(), reconnect.msg_seq());
1524 } else {
1525 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1526 " and lose to existing {}, ask client to wait",
1527 conn, reconnect.connect_seq(), *existing_conn);
1528 return send_wait();
1529 }
1530 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1531 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1532 " reusing existing {}",
1533 conn, existing_proto->connect_seq,
1534 reconnect.connect_seq(), *existing_conn);
1535 return reuse_connection(
1536 existing_proto, false,
1537 true, reconnect.connect_seq(), reconnect.msg_seq());
1538 }
1539 });
1540 }
1541
1542 void ProtocolV2::execute_accepting()
1543 {
1544 trigger_state(state_t::ACCEPTING, write_state_t::none, false);
1545 (void) seastar::with_gate(pending_dispatch, [this] {
1546 return seastar::futurize_apply([this] {
1547 INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
1548 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
1549 session_stream_handlers = { nullptr, nullptr };
1550 enable_recording();
1551 return banner_exchange();
1552 }).then([this] (entity_type_t _peer_type,
1553 entity_addr_t _my_addr_from_peer) {
1554 ceph_assert(conn.get_peer_type() == 0);
1555 conn.set_peer_type(_peer_type);
1556
1557 conn.policy = messenger.get_policy(_peer_type);
1558 logger().info("{} UPDATE: peer_type={},"
1559 " policy(lossy={} server={} standby={} resetcheck={})",
1560 conn, ceph_entity_type_name(_peer_type),
1561 conn.policy.lossy, conn.policy.server,
1562 conn.policy.standby, conn.policy.resetcheck);
1563 if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
1564 messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
1565 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1566 conn, _my_addr_from_peer, messenger.get_myaddr());
1567 throw std::system_error(
1568 make_error_code(crimson::net::error::bad_peer_address));
1569 }
1570 return messenger.learned_addr(_my_addr_from_peer, conn);
1571 }).then([this] {
1572 return server_auth();
1573 }).then([this] {
1574 return read_main_preamble();
1575 }).then([this] (Tag tag) {
1576 switch (tag) {
1577 case Tag::CLIENT_IDENT:
1578 return server_connect();
1579 case Tag::SESSION_RECONNECT:
1580 return server_reconnect();
1581 default: {
1582 unexpected_tag(tag, conn, "post_server_auth");
1583 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1584 }
1585 }
1586 }).then([this] (next_step_t next) {
1587 switch (next) {
1588 case next_step_t::ready:
1589 assert(state != state_t::ACCEPTING);
1590 break;
1591 case next_step_t::wait:
1592 if (unlikely(state != state_t::ACCEPTING)) {
1593 logger().debug("{} triggered {} at the end of execute_accepting()",
1594 conn, get_state_name(state));
1595 abort_protocol();
1596 }
1597 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
1598 execute_server_wait();
1599 break;
1600 default:
1601 ceph_abort("impossible next step");
1602 }
1603 }).handle_exception([this] (std::exception_ptr eptr) {
1604 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1605 conn, get_state_name(state), eptr);
1606 (void) close();
1607 });
1608 });
1609 }
1610
1611 // CONNECTING or ACCEPTING state
1612
1613 seastar::future<> ProtocolV2::finish_auth()
1614 {
1615 ceph_assert(auth_meta);
1616
1617 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1618 auth_meta->session_key.hmac_sha256(nullptr, rxbuf);
1619 auto sig_frame = AuthSignatureFrame::Encode(sig);
1620 ceph_assert(record_io);
1621 record_io = false;
1622 rxbuf.clear();
1623 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
1624 return write_frame(sig_frame).then([this] {
1625 return read_main_preamble();
1626 }).then([this] (Tag tag) {
1627 expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
1628 return read_frame_payload();
1629 }).then([this] {
1630 // handle_auth_signature() logic
1631 auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
1632 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
1633
1634 const auto actual_tx_sig = auth_meta->session_key.empty() ?
1635 sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
1636 if (sig_frame.signature() != actual_tx_sig) {
1637 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1638 " sig_frame.signature()={}",
1639 conn, actual_tx_sig, sig_frame.signature());
1640 abort_in_fault();
1641 }
1642 txbuf.clear();
1643 });
1644 }
1645
1646 // ESTABLISHING
1647
1648 void ProtocolV2::execute_establishing() {
1649 trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
1650 (void) seastar::with_gate(pending_dispatch, [this] {
1651 return dispatcher.ms_handle_accept(
1652 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
1653 }).handle_exception([this] (std::exception_ptr eptr) {
1654 logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
1655 ceph_abort("unexpected exception from ms_handle_accept()");
1656 });
1657 messenger.register_conn(
1658 seastar::static_pointer_cast<SocketConnection>(
1659 conn.shared_from_this()));
1660 messenger.unaccept_conn(
1661 seastar::static_pointer_cast<SocketConnection>(
1662 conn.shared_from_this()));
1663 execution_done = seastar::with_gate(pending_dispatch, [this] {
1664 return seastar::futurize_apply([this] {
1665 return send_server_ident();
1666 }).then([this] {
1667 if (unlikely(state != state_t::ESTABLISHING)) {
1668 logger().debug("{} triggered {} at the end of execute_establishing()",
1669 conn, get_state_name(state));
1670 abort_protocol();
1671 }
1672 logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
1673 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
1674 conn, global_seq, peer_global_seq, connect_seq,
1675 client_cookie, server_cookie, conn.in_seq,
1676 conn.out_seq, conn.out_q.size());
1677 execute_ready();
1678 }).handle_exception([this] (std::exception_ptr eptr) {
1679 if (state != state_t::ESTABLISHING) {
1680 logger().info("{} execute_establishing() protocol aborted at {} -- {}",
1681 conn, get_state_name(state), eptr);
1682 assert(state == state_t::CLOSING ||
1683 state == state_t::REPLACING);
1684 return;
1685 }
1686 fault(false, "execute_establishing()", eptr);
1687 });
1688 });
1689 }
1690
1691 // ESTABLISHING or REPLACING state
1692
1693 seastar::future<>
1694 ProtocolV2::send_server_ident()
1695 {
1696 // send_server_ident() logic
1697
1698 // refered to async-conn v2: not assign gs to global_seq
1699 return messenger.get_global_seq().then([this] (auto gs) {
1700 logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
1701
1702 // this is required for the case when this connection is being replaced
1703 requeue_up_to(0);
1704 conn.in_seq = 0;
1705
1706 if (!conn.policy.lossy) {
1707 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1708 }
1709
1710 uint64_t flags = 0;
1711 if (conn.policy.lossy) {
1712 flags = flags | CEPH_MSG_CONNECT_LOSSY;
1713 }
1714
1715 auto server_ident = ServerIdentFrame::Encode(
1716 messenger.get_myaddrs(),
1717 messenger.get_myname().num(),
1718 gs,
1719 conn.policy.features_supported,
1720 conn.policy.features_required | msgr2_required,
1721 flags,
1722 server_cookie);
1723
1724 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1725 " gs={}, features_supported={}, features_required={},"
1726 " flags={}, cookie={}",
1727 conn, messenger.get_myaddrs(), messenger.get_myname().num(),
1728 gs, conn.policy.features_supported,
1729 conn.policy.features_required | msgr2_required,
1730 flags, server_cookie);
1731
1732 conn.set_features(connection_features);
1733
1734 return write_frame(server_ident);
1735 });
1736 }
1737
1738 // REPLACING state
1739
// Move this connection into REPLACING and take over the session negotiated
// on a newly accepted socket.
//
// Sequence:
//   1. enter REPLACING (write_state_t::delay) and shut down the current
//      socket, if any;
//   2. in the background, notify the dispatcher via ms_handle_accept();
//      an exception from it aborts the process;
//   3. wait for the writer to exit, optionally reset the session
//      (do_reset), cancel the protocol timer and wait on execution_done;
//   4. if still REPLACING, close the old socket in the background and
//      install new_socket / new_auth_meta / new_rxtx / new_peer_global_seq;
//   5. reconnect == true: adopt new_connect_seq, requeue_up_to(new_msg_seq)
//      and write a ReconnectOkFrame carrying conn.in_seq;
//      reconnect == false: adopt the client cookie, peer name and
//      connection features, then send_server_ident();
//   6. if still REPLACING, log the result and call execute_ready().
// If the state leaves REPLACING during steps 3-6, the new socket is closed
// or the protocol is aborted; the error handler then expects CLOSING. Any
// other failure is reported through fault(true, ...).
void ProtocolV2::trigger_replacing(bool reconnect,
                                   bool do_reset,
                                   SocketRef&& new_socket,
                                   AuthConnectionMetaRef&& new_auth_meta,
                                   ceph::crypto::onwire::rxtx_t new_rxtx,
                                   uint64_t new_peer_global_seq,
                                   uint64_t new_client_cookie,
                                   entity_name_t new_peer_name,
                                   uint64_t new_conn_features,
                                   uint64_t new_connect_seq,
                                   uint64_t new_msg_seq)
{
  trigger_state(state_t::REPLACING, write_state_t::delay, false);
  if (socket) {
    socket->shutdown();
  }
  // Fire-and-forget notification to the dispatcher, tracked by the gate.
  (void) seastar::with_gate(pending_dispatch, [this] {
    return dispatcher.ms_handle_accept(
      seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
  }).handle_exception([this] (std::exception_ptr eptr) {
    logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
    ceph_abort("unexpected exception from ms_handle_accept()");
  });
  // The move-only replacement state is captured here and moved again into
  // the second continuation, which is the one that installs it.
  (void) seastar::with_gate(pending_dispatch,
      [this,
       reconnect,
       do_reset,
       new_socket = std::move(new_socket),
       new_auth_meta = std::move(new_auth_meta),
       new_rxtx = std::move(new_rxtx),
       new_client_cookie, new_peer_name,
       new_conn_features, new_peer_global_seq,
       new_connect_seq, new_msg_seq] () mutable {
    return wait_write_exit().then([this, do_reset] {
      if (do_reset) {
        reset_session(true);
      }
      protocol_timer.cancel();
      // wait for the previous state's execution to finish
      return execution_done.get_future();
    }).then([this,
             reconnect,
             new_socket = std::move(new_socket),
             new_auth_meta = std::move(new_auth_meta),
             new_rxtx = std::move(new_rxtx),
             new_client_cookie, new_peer_name,
             new_conn_features, new_peer_global_seq,
             new_connect_seq, new_msg_seq] () mutable {
      if (unlikely(state != state_t::REPLACING)) {
        // The state changed while we were waiting: discard the new socket
        // (kept alive by the capture until close() completes) and abort.
        return new_socket->close().then([sock = std::move(new_socket)] {
          abort_protocol();
        });
      }

      if (socket) {
        // Close the old socket in the background; the capture keeps it
        // alive until close() resolves.
        (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
          return sock->close().then([sock = std::move(sock)] {});
        });
      }
      socket = std::move(new_socket);
      auth_meta = std::move(new_auth_meta);
      session_stream_handlers = std::move(new_rxtx);
      record_io = false;
      peer_global_seq = new_peer_global_seq;

      if (reconnect) {
        connect_seq = new_connect_seq;
        // send_reconnect_ok() logic
        requeue_up_to(new_msg_seq);
        auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
        return write_frame(reconnect_ok);
      } else {
        client_cookie = new_client_cookie;
        conn.set_peer_name(new_peer_name);
        connection_features = new_conn_features;
        return send_server_ident();
      }
    }).then([this, reconnect] {
      // Re-check: the state may have changed while the frame was written.
      if (unlikely(state != state_t::REPLACING)) {
        logger().debug("{} triggered {} at the end of trigger_replacing()",
                       conn, get_state_name(state));
        abort_protocol();
      }
      logger().info("{} replaced ({}):"
                    " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
                    " in_seq={}, out_seq={}, out_q={}",
                    conn, reconnect ? "reconnected" : "connected",
                    global_seq, peer_global_seq, connect_seq, client_cookie,
                    server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
      execute_ready();
    }).handle_exception([this] (std::exception_ptr eptr) {
      if (state != state_t::REPLACING) {
        // aborted because the state moved on; only CLOSING is expected here
        logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
                      conn, get_state_name(state), eptr);
        assert(state == state_t::CLOSING);
        return;
      }
      fault(true, "trigger_replacing()", eptr);
    });
  });
}
1841
1842 // READY state
1843
1844 ceph::bufferlist ProtocolV2::do_sweep_messages(
1845 const std::deque<MessageRef>& msgs,
1846 size_t num_msgs,
1847 bool require_keepalive,
1848 std::optional<utime_t> _keepalive_ack,
1849 bool require_ack)
1850 {
1851 ceph::bufferlist bl;
1852
1853 if (unlikely(require_keepalive)) {
1854 auto keepalive_frame = KeepAliveFrame::Encode();
1855 bl.append(keepalive_frame.get_buffer(session_stream_handlers));
1856 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
1857 }
1858
1859 if (unlikely(_keepalive_ack.has_value())) {
1860 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
1861 bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
1862 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
1863 }
1864
1865 if (require_ack && !num_msgs) {
1866 auto ack_frame = AckFrame::Encode(conn.in_seq);
1867 bl.append(ack_frame.get_buffer(session_stream_handlers));
1868 INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
1869 }
1870
1871 std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
1872 // TODO: move to common code
1873 // set priority
1874 msg->get_header().src = messenger.get_myname();
1875
1876 msg->encode(conn.features, 0);
1877
1878 ceph_assert(!msg->get_seq() && "message already has seq");
1879 msg->set_seq(++conn.out_seq);
1880
1881 ceph_msg_header &header = msg->get_header();
1882 ceph_msg_footer &footer = msg->get_footer();
1883
1884 ceph_msg_header2 header2{header.seq, header.tid,
1885 header.type, header.priority,
1886 header.version,
1887 init_le32(0), header.data_off,
1888 init_le64(conn.in_seq),
1889 footer.flags, header.compat_version,
1890 header.reserved};
1891
1892 auto message = MessageFrame::Encode(header2,
1893 msg->get_payload(), msg->get_middle(), msg->get_data());
1894 logger().debug("{} --> #{} === {} ({})",
1895 conn, msg->get_seq(), *msg, msg->get_type());
1896 bl.append(message.get_buffer(session_stream_handlers));
1897 INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
1898 });
1899
1900 return bl;
1901 }
1902
1903 seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
1904 {
1905 return read_frame_payload()
1906 .then([this, throttle_stamp] {
1907 utime_t recv_stamp{seastar::lowres_system_clock::now()};
1908
1909 // we need to get the size before std::moving segments data
1910 const size_t cur_msg_size = get_current_msg_size();
1911 auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
1912 // XXX: paranoid copy just to avoid oops
1913 ceph_msg_header2 current_header = msg_frame.header();
1914
1915 logger().trace("{} got {} + {} + {} byte message,"
1916 " envelope type={} src={} off={} seq={}",
1917 conn, msg_frame.front_len(), msg_frame.middle_len(),
1918 msg_frame.data_len(), current_header.type, conn.get_peer_name(),
1919 current_header.data_off, current_header.seq);
1920
1921 ceph_msg_header header{current_header.seq,
1922 current_header.tid,
1923 current_header.type,
1924 current_header.priority,
1925 current_header.version,
1926 init_le32(msg_frame.front_len()),
1927 init_le32(msg_frame.middle_len()),
1928 init_le32(msg_frame.data_len()),
1929 current_header.data_off,
1930 conn.get_peer_name(),
1931 current_header.compat_version,
1932 current_header.reserved,
1933 init_le32(0)};
1934 ceph_msg_footer footer{init_le32(0), init_le32(0),
1935 init_le32(0), init_le64(0), current_header.flags};
1936
1937 auto pconn = seastar::static_pointer_cast<SocketConnection>(
1938 conn.shared_from_this());
1939 Message *message = decode_message(nullptr, 0, header, footer,
1940 msg_frame.front(), msg_frame.middle(), msg_frame.data(),
1941 std::move(pconn));
1942 if (!message) {
1943 logger().warn("{} decode message failed", conn);
1944 abort_in_fault();
1945 }
1946
1947 // store reservation size in message, so we don't get confused
1948 // by messages entering the dispatch queue through other paths.
1949 message->set_dispatch_throttle_size(cur_msg_size);
1950
1951 message->set_throttle_stamp(throttle_stamp);
1952 message->set_recv_stamp(recv_stamp);
1953 message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
1954
1955 // check received seq#. if it is old, drop the message.
1956 // note that incoming messages may skip ahead. this is convenient for the
1957 // client side queueing because messages can't be renumbered, but the (kernel)
1958 // client will occasionally pull a message out of the sent queue to send
1959 // elsewhere. in that case it doesn't matter if we "got" it or not.
1960 uint64_t cur_seq = conn.in_seq;
1961 if (message->get_seq() <= cur_seq) {
1962 logger().error("{} got old message {} <= {} {} {}, discarding",
1963 conn, message->get_seq(), cur_seq, message, *message);
1964 if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
1965 conf.ms_die_on_old_message) {
1966 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1967 }
1968 return;
1969 } else if (message->get_seq() > cur_seq + 1) {
1970 logger().error("{} missed message? skipped from seq {} to {}",
1971 conn, cur_seq, message->get_seq());
1972 if (conf.ms_die_on_skipped_message) {
1973 ceph_assert(0 == "skipped incoming seq");
1974 }
1975 }
1976
1977 // note last received message.
1978 conn.in_seq = message->get_seq();
1979 logger().debug("{} <== #{} === {} ({})",
1980 conn, message->get_seq(), *message, message->get_type());
1981 notify_ack();
1982 ack_writes(current_header.ack_seq);
1983
1984 // TODO: change MessageRef with seastar::shared_ptr
1985 auto msg_ref = MessageRef{message, false};
1986 (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
1987 return dispatcher.ms_dispatch(&conn, std::move(msg));
1988 }).handle_exception([this] (std::exception_ptr eptr) {
1989 logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
1990 ceph_abort("unexpected exception from ms_dispatch()");
1991 });
1992 });
1993 }
1994
// Enter READY: open the write path and start the read loop.
//
// Precondition (asserted): the policy is lossy, or both client_cookie and
// server_cookie are non-zero.
//
// The loop repeatedly reads a frame preamble and dispatches on its tag:
//   MESSAGE        -> message-count throttling is a TODO (asserts if a
//                     message throttler is configured); if a byte throttler
//                     is configured, take cur_msg_size bytes from it; then
//                     read_message();
//   ACK            -> ack_writes() up to the acknowledged seq;
//   KEEPALIVE2     -> notify_keepalive_ack() with the peer's timestamp and
//                     record the keepalive time;
//   KEEPALIVE2_ACK -> record the timestamp from the ack;
//   anything else  -> unexpected_tag().
// The loop only ends through an exception. If the state is no longer READY
// by then, the protocol was aborted (expected: REPLACING or CLOSING);
// otherwise the error is reported through fault(false, ...).
void ProtocolV2::execute_ready()
{
  assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
  trigger_state(state_t::READY, write_state_t::open, false);
#ifdef UNIT_TESTS_BUILT
  if (conn.interceptor) {
    conn.interceptor->register_conn_ready(conn);
  }
#endif
  execution_done = seastar::with_gate(pending_dispatch, [this] {
    protocol_timer.cancel();
    return seastar::keep_doing([this] {
      return read_main_preamble()
      .then([this] (Tag tag) {
        switch (tag) {
          case Tag::MESSAGE: {
            return seastar::futurize_apply([this] {
              // throttle_message() logic
              if (!conn.policy.throttler_messages) {
                return seastar::now();
              }
              // TODO: message throttler
              ceph_assert(false);
              return seastar::now();
            }).then([this] {
              // throttle_bytes() logic
              if (!conn.policy.throttler_bytes) {
                return seastar::now();
              }
              size_t cur_msg_size = get_current_msg_size();
              if (!cur_msg_size) {
                return seastar::now();
              }
              logger().trace("{} wants {} bytes from policy throttler {}/{}",
                             conn, cur_msg_size,
                             conn.policy.throttler_bytes->get_current(),
                             conn.policy.throttler_bytes->get_max());
              // the returned future gates reading the payload
              return conn.policy.throttler_bytes->get(cur_msg_size);
            }).then([this] {
              // TODO: throttle_dispatch_queue() logic
              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
              return read_message(throttle_stamp);
            });
          }
          case Tag::ACK:
            return read_frame_payload().then([this] {
              // handle_message_ack() logic
              auto ack = AckFrame::Decode(rx_segments_data.back());
              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
              ack_writes(ack.seq());
            });
          case Tag::KEEPALIVE2:
            return read_frame_payload().then([this] {
              // handle_keepalive2() logic
              auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
                             conn, keepalive_frame.timestamp());
              notify_keepalive_ack(keepalive_frame.timestamp());
              conn.set_last_keepalive(seastar::lowres_system_clock::now());
            });
          case Tag::KEEPALIVE2_ACK:
            return read_frame_payload().then([this] {
              // handle_keepalive2_ack() logic
              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
              conn.set_last_keepalive_ack(
                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
                             conn, conn.last_keepalive_ack);
            });
          default: {
            unexpected_tag(tag, conn, "execute_ready");
            return seastar::now();
          }
        }
      });
    }).handle_exception([this] (std::exception_ptr eptr) {
      if (state != state_t::READY) {
        // the loop was torn down by a state change, not by a read fault
        logger().info("{} execute_ready(): protocol aborted at {} -- {}",
                      conn, get_state_name(state), eptr);
        assert(state == state_t::REPLACING ||
               state == state_t::CLOSING);
        return;
      }
      fault(false, "execute_ready()", eptr);
    });
  });
}
2082
2083 // STANDBY state
2084
2085 void ProtocolV2::execute_standby()
2086 {
2087 trigger_state(state_t::STANDBY, write_state_t::delay, true);
2088 if (socket) {
2089 socket->shutdown();
2090 }
2091 }
2092
2093 void ProtocolV2::notify_write()
2094 {
2095 if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
2096 logger().info("{} notify_write(): at {}, going to CONNECTING",
2097 conn, get_state_name(state));
2098 execute_connecting();
2099 }
2100 }
2101
2102 // WAIT state
2103
2104 void ProtocolV2::execute_wait(bool max_backoff)
2105 {
2106 trigger_state(state_t::WAIT, write_state_t::delay, true);
2107 if (socket) {
2108 socket->shutdown();
2109 }
2110 execution_done = seastar::with_gate(pending_dispatch,
2111 [this, max_backoff] {
2112 double backoff = protocol_timer.last_dur();
2113 if (max_backoff) {
2114 backoff = conf.ms_max_backoff;
2115 } else if (backoff > 0) {
2116 backoff = std::min(conf.ms_max_backoff, 2 * backoff);
2117 } else {
2118 backoff = conf.ms_initial_backoff;
2119 }
2120 return protocol_timer.backoff(backoff).then([this] {
2121 if (unlikely(state != state_t::WAIT)) {
2122 logger().debug("{} triggered {} at the end of execute_wait()",
2123 conn, get_state_name(state));
2124 abort_protocol();
2125 }
2126 logger().info("{} execute_wait(): going to CONNECTING", conn);
2127 execute_connecting();
2128 }).handle_exception([this] (std::exception_ptr eptr) {
2129 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
2130 conn, get_state_name(state), eptr);
2131 assert(state == state_t::REPLACING ||
2132 state == state_t::CLOSING);
2133 });
2134 });
2135 }
2136
2137 // SERVER_WAIT state
2138
2139 void ProtocolV2::execute_server_wait()
2140 {
2141 trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
2142 execution_done = seastar::with_gate(pending_dispatch, [this] {
2143 return read_exactly(1).then([this] (auto bl) {
2144 logger().warn("{} SERVER_WAIT got read, abort", conn);
2145 abort_in_fault();
2146 }).handle_exception([this] (std::exception_ptr eptr) {
2147 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
2148 conn, get_state_name(state), eptr);
2149 (void) close();
2150 });
2151 });
2152 }
2153
2154 // CLOSING state
2155
2156 void ProtocolV2::trigger_close()
2157 {
2158 if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
2159 messenger.unaccept_conn(
2160 seastar::static_pointer_cast<SocketConnection>(
2161 conn.shared_from_this()));
2162 } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
2163 messenger.unregister_conn(
2164 seastar::static_pointer_cast<SocketConnection>(
2165 conn.shared_from_this()));
2166 } else {
2167 // cannot happen
2168 ceph_assert(false);
2169 }
2170
2171 protocol_timer.cancel();
2172
2173 trigger_state(state_t::CLOSING, write_state_t::drop, false);
2174 #ifdef UNIT_TESTS_BUILT
2175 if (conn.interceptor) {
2176 conn.interceptor->register_conn_closed(conn);
2177 }
2178 #endif
2179 }
2180
2181 } // namespace crimson::net