]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/ProtocolV2.cc
d68dba174ce593964405cba3edecc9d275fba838
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "ProtocolV2.h"
5
6 #include <seastar/core/lowres_clock.hh>
7 #include <fmt/format.h>
8 #if FMT_VERSION >= 60000
9 #include <fmt/chrono.h>
10 #else
11 #include <fmt/time.h>
12 #endif
13 #include "include/msgr.h"
14 #include "include/random.h"
15
16 #include "crimson/auth/AuthClient.h"
17 #include "crimson/auth/AuthServer.h"
18
19 #include "Config.h"
20 #include "Dispatcher.h"
21 #include "Errors.h"
22 #include "Socket.h"
23 #include "SocketConnection.h"
24 #include "SocketMessenger.h"
25
26 #ifdef UNIT_TESTS_BUILT
27 #include "Interceptor.h"
28 #endif
29
30 using namespace ceph::msgr::v2;
31
32 namespace {
33
34 // TODO: apply the same logging policy to Protocol V1
35 // Log levels in V2 Protocol:
36 // * error level, something error that cause connection to terminate:
37 // - fatal errors;
38 // - bugs;
39 // * warn level: something unusual that identifies connection fault or replacement:
40 // - unstable network;
41 // - incompatible peer;
42 // - auth failure;
43 // - connection race;
44 // - connection reset;
45 // * info level, something very important to show connection lifecycle,
46 // which doesn't happen very frequently;
47 // * debug level, important logs for debugging, including:
48 // - all the messages sent/received (-->/<==);
49 // - all the frames exchanged (WRITE/GOT);
50 // - important fields updated (UPDATE);
51 // - connection state transitions (TRIGGER);
52 // * trace level, trivial logs showing:
53 // - the exact bytes being sent/received (SEND/RECV(bytes));
54 // - detailed information of sub-frames;
55 // - integrity checks;
56 // - etc.
57 seastar::logger& logger() {
58 return crimson::get_logger(ceph_subsys_ms);
59 }
60
61 void abort_in_fault() {
62 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
63 }
64
65 void abort_protocol() {
66 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
67 }
68
69 void abort_in_close(crimson::net::ProtocolV2& proto) {
70 (void) proto.close();
71 abort_protocol();
72 }
73
74 inline void expect_tag(const Tag& expected,
75 const Tag& actual,
76 crimson::net::SocketConnection& conn,
77 const char *where) {
78 if (actual != expected) {
79 logger().warn("{} {} received wrong tag: {}, expected {}",
80 conn, where,
81 static_cast<uint32_t>(actual),
82 static_cast<uint32_t>(expected));
83 abort_in_fault();
84 }
85 }
86
87 inline void unexpected_tag(const Tag& unexpected,
88 crimson::net::SocketConnection& conn,
89 const char *where) {
90 logger().warn("{} {} received unexpected tag: {}",
91 conn, where, static_cast<uint32_t>(unexpected));
92 abort_in_fault();
93 }
94
95 inline uint64_t generate_client_cookie() {
96 return ceph::util::generate_random_number<uint64_t>(
97 1, std::numeric_limits<uint64_t>::max());
98 }
99
100 } // namespace anonymous
101
102 template <>
103 struct fmt::formatter<seastar::lowres_system_clock::time_point> {
104 // ignore the format string
105 template <typename ParseContext>
106 constexpr auto parse(ParseContext &ctx) { return ctx.begin(); }
107
108 template <typename FormatContext>
109 auto format(const seastar::lowres_system_clock::time_point& t,
110 FormatContext& ctx) {
111 std::time_t tt = std::chrono::duration_cast<std::chrono::seconds>(
112 t.time_since_epoch()).count();
113 auto milliseconds = (t.time_since_epoch() %
114 std::chrono::seconds(1)).count();
115 return fmt::format_to(ctx.out(), "{:%Y-%m-%d %H:%M:%S} {:03d}",
116 fmt::localtime(tt), milliseconds);
117 }
118 };
119
120 namespace std {
121 inline ostream& operator<<(
122 ostream& out, const seastar::lowres_system_clock::time_point& t)
123 {
124 return out << fmt::format("{}", t);
125 }
126 }
127
128 namespace crimson::net {
129
#ifdef UNIT_TESTS_BUILT
// Test-only hook: if an interceptor is installed on `conn`, ask it which
// action to take at breakpoint `bp` and arm that trap on `socket` for
// operations of `type`.
void intercept(Breakpoint bp, bp_type_t type,
               SocketConnection& conn, SocketRef& socket) {
  if (conn.interceptor) {
    auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
    socket->set_trap(type, action, &conn.interceptor->blocker);
  }
}

#define INTERCEPT_CUSTOM(bp, type) \
  intercept({bp}, type, conn, socket)

#define INTERCEPT_FRAME(tag, type) \
  intercept({static_cast<Tag>(tag), type}, \
            type, conn, socket)

// Breakpoint outside of socket read/write: BLOCK is not allowed here, and
// FAULT turns into abort_in_fault(). The do { } while (0) wrapper makes the
// expansion a single statement, so an `else` following the invocation at
// the call site cannot bind to the macro's internal `if`.
#define INTERCEPT_N_RW(bp) \
  do { \
    if (conn.interceptor) { \
      auto action = conn.interceptor->intercept(conn, {bp}); \
      ceph_assert(action != bp_action_t::BLOCK); \
      if (action == bp_action_t::FAULT) { \
        abort_in_fault(); \
      } \
    } \
  } while (0)

#else
#define INTERCEPT_CUSTOM(bp, type)
#define INTERCEPT_FRAME(tag, type)
#define INTERCEPT_N_RW(bp)
#endif
160
161 seastar::future<> ProtocolV2::Timer::backoff(double seconds)
162 {
163 logger().warn("{} waiting {} seconds ...", conn, seconds);
164 cancel();
165 last_dur_ = seconds;
166 as = seastar::abort_source();
167 auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
168 std::chrono::duration<double>(seconds));
169 return seastar::sleep_abortable(dur, *as
170 ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
171 logger().debug("{} wait aborted", conn);
172 abort_protocol();
173 });
174 }
175
// Builds the v2 protocol state for `conn`.
// `dispatcher` and `conn` are forwarded to the Protocol base (tagged
// proto_t::v2), and `messenger` is stored in the `messenger` member.
// The protocol timer is bound to `conn`, and the tx frame assembler receives
// a pointer to session_stream_handlers.
// NOTE(review): the meaning of the `false` flag is defined by the frame
// assembler's constructor (not visible here) — confirm before relying on it.
ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
                       SocketConnection& conn,
                       SocketMessenger& messenger)
  : Protocol(proto_t::v2, dispatcher, conn),
    messenger{messenger},
    protocol_timer{conn},
    tx_frame_asm(&session_stream_handlers, false)
{}
184
185 ProtocolV2::~ProtocolV2() {}
186
187 void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
188 const entity_type_t& _peer_type)
189 {
190 ceph_assert(state == state_t::NONE);
191 ceph_assert(!socket);
192 conn.peer_addr = _peer_addr;
193 conn.target_addr = _peer_addr;
194 conn.set_peer_type(_peer_type);
195 conn.policy = messenger.get_policy(_peer_type);
196 client_cookie = generate_client_cookie();
197 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_type={}, cc={}"
198 " policy(lossy={}, server={}, standby={}, resetcheck={})",
199 conn, _peer_addr, ceph_entity_type_name(_peer_type), client_cookie,
200 conn.policy.lossy, conn.policy.server,
201 conn.policy.standby, conn.policy.resetcheck);
202 messenger.register_conn(
203 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
204 execute_connecting();
205 }
206
207 void ProtocolV2::start_accept(SocketRef&& sock,
208 const entity_addr_t& _peer_addr)
209 {
210 ceph_assert(state == state_t::NONE);
211 ceph_assert(!socket);
212 // until we know better
213 conn.target_addr = _peer_addr;
214 conn.set_ephemeral_port(_peer_addr.get_port(),
215 SocketConnection::side_t::acceptor);
216 socket = std::move(sock);
217 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
218 messenger.accept_conn(
219 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
220 execute_accepting();
221 }
222
223 // TODO: Frame related implementations, probably to a separate class.
224
225 void ProtocolV2::enable_recording()
226 {
227 rxbuf.clear();
228 txbuf.clear();
229 record_io = true;
230 }
231
232 seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
233 {
234 if (unlikely(record_io)) {
235 return socket->read_exactly(bytes)
236 .then([this] (auto bl) {
237 rxbuf.append(buffer::create(bl.share()));
238 return bl;
239 });
240 } else {
241 return socket->read_exactly(bytes);
242 };
243 }
244
245 seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
246 {
247 if (unlikely(record_io)) {
248 return socket->read(bytes)
249 .then([this] (auto buf) {
250 rxbuf.append(buf);
251 return buf;
252 });
253 } else {
254 return socket->read(bytes);
255 }
256 }
257
258 seastar::future<> ProtocolV2::write(bufferlist&& buf)
259 {
260 if (unlikely(record_io)) {
261 txbuf.append(buf);
262 }
263 return socket->write(std::move(buf));
264 }
265
266 seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
267 {
268 if (unlikely(record_io)) {
269 txbuf.append(buf);
270 }
271 return socket->write_flush(std::move(buf));
272 }
273
274 size_t ProtocolV2::get_current_msg_size() const
275 {
276 ceph_assert(!rx_segments_desc.empty());
277 size_t sum = 0;
278 // we don't include SegmentIndex::Msg::HEADER.
279 for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
280 sum += rx_segments_desc[idx].length;
281 }
282 return sum;
283 }
284
285 seastar::future<Tag> ProtocolV2::read_main_preamble()
286 {
287 return read_exactly(sizeof(preamble_block_t))
288 .then([this] (auto bl) {
289 if (session_stream_handlers.rx) {
290 session_stream_handlers.rx->reset_rx_handler();
291 /*
292 bl = session_stream_handlers.rx->authenticated_decrypt_update(
293 std::move(bl), segment_t::DEFAULT_ALIGNMENT);
294 */
295 }
296
297 // I expect ceph_le32 will make the endian conversion for me. Passing
298 // everything through ::Decode is unnecessary.
299 const auto& main_preamble = \
300 *reinterpret_cast<const preamble_block_t*>(bl.get());
301 logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}",
302 conn, bl.size(), (int)main_preamble.tag,
303 (int)main_preamble.num_segments, main_preamble.crc);
304
305 // verify preamble's CRC before any further processing
306 const auto rx_crc = ceph_crc32c(0,
307 reinterpret_cast<const unsigned char*>(&main_preamble),
308 sizeof(main_preamble) - sizeof(main_preamble.crc));
309 if (rx_crc != main_preamble.crc) {
310 logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
311 conn, rx_crc, main_preamble.crc);
312 abort_in_fault();
313 }
314
315 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
316 if (main_preamble.num_segments < 1 ||
317 main_preamble.num_segments > MAX_NUM_SEGMENTS) {
318 logger().warn("{} unsupported num_segments={}",
319 conn, main_preamble.num_segments);
320 abort_in_fault();
321 }
322 if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
323 logger().warn("{} num_segments too much: {}",
324 conn, main_preamble.num_segments);
325 abort_in_fault();
326 }
327
328 rx_segments_desc.clear();
329 rx_segments_data.clear();
330
331 for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
332 logger().trace("{} GOT frame segment: len={} align={}",
333 conn, main_preamble.segments[idx].length,
334 main_preamble.segments[idx].alignment);
335 rx_segments_desc.emplace_back(main_preamble.segments[idx]);
336 }
337
338 INTERCEPT_FRAME(main_preamble.tag, bp_type_t::READ);
339 return static_cast<Tag>(main_preamble.tag);
340 });
341 }
342
// Reads the payload of the frame whose preamble was just parsed by
// read_main_preamble().
// For each descriptor in rx_segments_desc, one bufferlist is appended to
// rx_segments_data. The crc-only epilogue is then read, and the crc32c of
// every segment is checked against it; a mismatch faults the connection
// (abort_in_fault).
// Secure mode (session_stream_handlers.rx set) and aborted frames
// (FRAME_LATE_FLAG_ABORTED) are not implemented yet and hit
// ceph_assert(false).
seastar::future<> ProtocolV2::read_frame_payload()
{
  ceph_assert(!rx_segments_desc.empty());
  ceph_assert(rx_segments_data.empty());

  return seastar::do_until(
    // stop once every described segment has a data buffer
    [this] { return rx_segments_desc.size() == rx_segments_data.size(); },
    [this] {
      // description of current segment to read
      const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
      // TODO: create aligned and contiguous buffer from socket
      if (cur_rx_desc.alignment != segment_t::DEFAULT_ALIGNMENT) {
        // A non-default alignment is only logged; the segment is still read
        // into an ordinary buffer below.
        logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
                       conn, cur_rx_desc.alignment, rx_segments_data.size());
      }
      // TODO: create aligned and contiguous buffer from socket
      return read_exactly(cur_rx_desc.length)
      .then([this] (auto tmp_bl) {
        logger().trace("{} RECV({}) frame segment[{}]",
                       conn, tmp_bl.size(), rx_segments_data.size());
        bufferlist data;
        data.append(buffer::create(std::move(tmp_bl)));
        if (session_stream_handlers.rx) {
          // TODO
          ceph_assert(false);
        }
        rx_segments_data.emplace_back(std::move(data));
      });
    }
  ).then([this] {
    // TODO: get_epilogue_size()
    // only the plain (crc) epilogue is supported so far
    ceph_assert(!session_stream_handlers.rx);
    return read_exactly(sizeof(epilogue_crc_rev0_block_t));
  }).then([this] (auto bl) {
    logger().trace("{} RECV({}) frame epilogue", conn, bl.size());

    __u8 late_flags;
    if (session_stream_handlers.rx) {
      // TODO
      ceph_assert(false);
    } else {
      auto& epilogue = *reinterpret_cast<const epilogue_crc_rev0_block_t*>(bl.get());
      // compare each segment's crc32c (seed -1) with the value in the epilogue
      for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
        const __u32 expected_crc = epilogue.crc_values[idx];
        const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
        if (expected_crc != calculated_crc) {
          logger().warn("{} message integrity check failed at index {}:"
                        " expected_crc={} calculated_crc={}",
                        conn, (unsigned int)idx, expected_crc, calculated_crc);
          abort_in_fault();
        } else {
          logger().trace("{} message integrity check success at index {}: crc={}",
                         conn, (unsigned int)idx, expected_crc);
        }
      }
      late_flags = epilogue.late_flags;
    }
    logger().trace("{} GOT frame epilogue: late_flags={}",
                   conn, (unsigned)late_flags);

    // we do have a mechanism that allows transmitter to start sending message
    // and abort after putting entire data field on wire. This will be used by
    // the kernel client to avoid unnecessary buffering.
    if (late_flags & FRAME_LATE_FLAG_ABORTED) {
      // TODO
      ceph_assert(false);
    }
  });
}
412
413 template <class F>
414 seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
415 {
416 auto bl = frame.get_buffer(tx_frame_asm);
417 const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
418 logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
419 conn, bl.length(), (int)main_preamble->tag,
420 (int)main_preamble->num_segments, main_preamble->crc);
421 INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
422 if (flush) {
423 return write_flush(std::move(bl));
424 } else {
425 return write(std::move(bl));
426 }
427 }
428
429 void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
430 {
431 if (!reentrant && _state == state) {
432 logger().error("{} is not allowed to re-trigger state {}",
433 conn, get_state_name(state));
434 ceph_assert(false);
435 }
436 logger().debug("{} TRIGGER {}, was {}",
437 conn, get_state_name(_state), get_state_name(state));
438 state = _state;
439 set_write_state(_write_state);
440 }
441
442 void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
443 {
444 if (conn.policy.lossy) {
445 logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
446 conn, func_name, get_state_name(state), eptr);
447 dispatch_reset();
448 (void) close();
449 } else if (conn.policy.server ||
450 (conn.policy.standby &&
451 (!is_queued() && conn.sent.empty()))) {
452 logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
453 conn, func_name, get_state_name(state), eptr);
454 execute_standby();
455 } else if (backoff) {
456 logger().info("{} {}: fault at {}, going to WAIT -- {}",
457 conn, func_name, get_state_name(state), eptr);
458 execute_wait(false);
459 } else {
460 logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
461 conn, func_name, get_state_name(state), eptr);
462 execute_connecting();
463 }
464 }
465
466 void ProtocolV2::dispatch_reset()
467 {
468 (void) seastar::with_gate(pending_dispatch, [this] {
469 return dispatcher.ms_handle_reset(
470 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
471 }).handle_exception([this] (std::exception_ptr eptr) {
472 logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
473 ceph_abort("unexpected exception from ms_handle_reset()");
474 });
475 }
476
477 void ProtocolV2::reset_session(bool full)
478 {
479 server_cookie = 0;
480 connect_seq = 0;
481 conn.in_seq = 0;
482 if (full) {
483 client_cookie = generate_client_cookie();
484 peer_global_seq = 0;
485 reset_write();
486 (void) seastar::with_gate(pending_dispatch, [this] {
487 return dispatcher.ms_handle_remote_reset(
488 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
489 }).handle_exception([this] (std::exception_ptr eptr) {
490 logger().error("{} ms_handle_remote_reset caught exception: {}", conn, eptr);
491 ceph_abort("unexpected exception from ms_handle_remote_reset()");
492 });
493 }
494 }
495
// Performs the msgr2 banner/hello handshake on the connected socket:
//   1.   send our banner (prefix, 16-bit payload length, then the supported
//        and required feature masks);
//   2-3. read the peer banner, reject a v1 or unrecognized banner, decode
//        the payload length and read the payload;
//   4.   check feature compatibility in both directions, then send a
//        HelloFrame with our entity type and conn.target_addr;
//   5-6. read and decode the peer's HelloFrame.
// Resolves to the peer HelloFrame's (entity_type, peer_addr) pair.
// Malformed input faults the connection (abort_in_fault); a feature
// incompatibility closes it (abort_in_close).
seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
{
  // 1. prepare and send banner
  bufferlist banner_payload;
  encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
  encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);

  bufferlist bl;
  bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
  auto len_payload = static_cast<uint16_t>(banner_payload.length());
  encode(len_payload, bl, 0);
  bl.claim_append(banner_payload);
  logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
                 "required={}, banner=\"{}\"",
                 conn, bl.length(), len_payload,
                 CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
                 CEPH_BANNER_V2_PREFIX);
  INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
  return write_flush(std::move(bl)).then([this] {
    // 2. read peer banner
    // fixed-size part: the prefix followed by a little-endian 16-bit
    // payload length
    unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
    INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
    return read_exactly(banner_len);
  }).then([this] (auto bl) {
    // 3. process peer banner and read banner_payload
    unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
    logger().debug("{} RECV({}) banner: \"{}\"",
                   conn, bl.size(),
                   std::string((const char*)bl.get(), banner_prefix_len));

    if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
      // distinguish a v1 peer from garbage, only for the warning message
      if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
        logger().warn("{} peer is using V1 protocol", conn);
      } else {
        logger().warn("{} peer sent bad banner", conn);
      }
      abort_in_fault();
    }
    bl.trim_front(banner_prefix_len);

    uint16_t payload_len;
    bufferlist buf;
    buf.append(buffer::create(std::move(bl)));
    auto ti = buf.cbegin();
    try {
      decode(payload_len, ti);
    } catch (const buffer::error &e) {
      logger().warn("{} decode banner payload len failed", conn);
      abort_in_fault();
    }
    logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
    INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
    return read(payload_len);
  }).then([this] (bufferlist bl) {
    // 4. process peer banner_payload and send HelloFrame
    auto p = bl.cbegin();
    uint64_t peer_supported_features;
    uint64_t peer_required_features;
    try {
      decode(peer_supported_features, p);
      decode(peer_required_features, p);
    } catch (const buffer::error &e) {
      logger().warn("{} decode banner payload failed", conn);
      abort_in_fault();
    }
    logger().debug("{} RECV({}) banner features: supported={} required={}",
                   conn, bl.length(),
                   peer_supported_features, peer_required_features);

    // Check feature bit compatibility
    uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
    uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
    if ((required_features & peer_supported_features) != required_features) {
      logger().error("{} peer does not support all required features"
                     " required={} peer_supported={}",
                     conn, required_features, peer_supported_features);
      abort_in_close(*this);
    }
    if ((supported_features & peer_required_features) != peer_required_features) {
      logger().error("{} we do not support all peer required features"
                     " peer_required={} supported={}",
                     conn, peer_required_features, supported_features);
      abort_in_close(*this);
    }
    this->peer_required_features = peer_required_features;
    // when the peer requires no msgr2 features, fall back to msgr2_required
    if (this->peer_required_features == 0) {
      this->connection_features = msgr2_required;
    }

    auto hello = HelloFrame::Encode(messenger.get_mytype(),
                                    conn.target_addr);
    logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
                   conn, ceph_entity_type_name(messenger.get_mytype()),
                   conn.target_addr);
    return write_frame(hello);
  }).then([this] {
    //5. read peer HelloFrame
    return read_main_preamble();
  }).then([this] (Tag tag) {
    expect_tag(Tag::HELLO, tag, conn, __func__);
    return read_frame_payload();
  }).then([this] {
    // 6. process peer HelloFrame
    // NOTE(review): these fields come from the *peer's* HelloFrame, so the
    // "my_type" label in the log below actually shows the peer's type.
    auto hello = HelloFrame::Decode(rx_segments_data.back());
    logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
                   conn, ceph_entity_type_name(hello.entity_type()),
                   hello.peer_addr());
    return seastar::make_ready_future<entity_type_t, entity_addr_t>(
      hello.entity_type(), hello.peer_addr());
  });
}
607
608 // CONNECTING state
609
// Reads the server's reply to our auth request and dispatches on its tag:
//  - AUTH_BAD_METHOD: let the auth client react, then retry client_auth()
//    with the methods the server allows;
//  - AUTH_REPLY_MORE: hand the payload to the auth client, send its answer
//    as an AuthRequestMoreFrame, and wait for the next reply (recursion);
//  - AUTH_DONE: hand the result to the auth client, record con_mode, reset
//    the stream handlers, and continue with finish_auth().
// Any other tag faults the connection. So does a negative return from the
// auth client's handle_auth_bad_method() or handle_auth_done().
seastar::future<> ProtocolV2::handle_auth_reply()
{
  return read_main_preamble()
  .then([this] (Tag tag) {
    switch (tag) {
      case Tag::AUTH_BAD_METHOD:
        return read_frame_payload().then([this] {
          // handle_auth_bad_method() logic
          auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
          logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
                        "allowed_methods={}, allowed_modes={}",
                        conn, bad_method.method(), cpp_strerror(bad_method.result()),
                        bad_method.allowed_methods(), bad_method.allowed_modes());
          ceph_assert(messenger.get_auth_client());
          int r = messenger.get_auth_client()->handle_auth_bad_method(
              conn.shared_from_this(), auth_meta,
              bad_method.method(), bad_method.result(),
              bad_method.allowed_methods(), bad_method.allowed_modes());
          if (r < 0) {
            logger().warn("{} auth_client handle_auth_bad_method returned {}",
                          conn, r);
            abort_in_fault();
          }
          // start over with a new auth request
          return client_auth(bad_method.allowed_methods());
        });
      case Tag::AUTH_REPLY_MORE:
        return read_frame_payload().then([this] {
          // handle_auth_reply_more() logic
          auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
          logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
                         conn, auth_more.auth_payload().length());
          ceph_assert(messenger.get_auth_client());
          // let execute_connecting() take care of the thrown exception
          auto reply = messenger.get_auth_client()->handle_auth_reply_more(
            conn.shared_from_this(), auth_meta, auth_more.auth_payload());
          auto more_reply = AuthRequestMoreFrame::Encode(reply);
          logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
                         conn, reply.length());
          return write_frame(more_reply);
        }).then([this] {
          // the exchange continues with the server's next reply
          return handle_auth_reply();
        });
      case Tag::AUTH_DONE:
        return read_frame_payload().then([this] {
          // handle_auth_done() logic
          auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
          logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
                         conn, auth_done.global_id(),
                         ceph_con_mode_name(auth_done.con_mode()),
                         auth_done.auth_payload().length());
          ceph_assert(messenger.get_auth_client());
          int r = messenger.get_auth_client()->handle_auth_done(
              conn.shared_from_this(), auth_meta,
              auth_done.global_id(),
              auth_done.con_mode(),
              auth_done.auth_payload());
          if (r < 0) {
            logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
            abort_in_fault();
          }
          auth_meta->con_mode = auth_done.con_mode();
          // TODO
          // secure mode is not supported yet
          ceph_assert(!auth_meta->is_mode_secure());
          session_stream_handlers = { nullptr, nullptr };
          return finish_auth();
        });
      default: {
        // unexpected_tag() throws; the return only satisfies the type
        unexpected_tag(tag, conn, __func__);
        return seastar::now();
      }
    }
  });
}
683
684 seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
685 {
686 // send_auth_request() logic
687 ceph_assert(messenger.get_auth_client());
688
689 try {
690 auto [auth_method, preferred_modes, bl] =
691 messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta);
692 auth_meta->auth_method = auth_method;
693 auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
694 logger().debug("{} WRITE AuthRequestFrame: method={},"
695 " preferred_modes={}, payload_len={}",
696 conn, auth_method, preferred_modes, bl.length());
697 return write_frame(frame).then([this] {
698 return handle_auth_reply();
699 });
700 } catch (const crimson::auth::error& e) {
701 logger().error("{} get_initial_auth_request returned {}", conn, e);
702 dispatch_reset();
703 abort_in_close(*this);
704 return seastar::now();
705 }
706 }
707
708 seastar::future<ProtocolV2::next_step_t>
709 ProtocolV2::process_wait()
710 {
711 return read_frame_payload().then([this] {
712 // handle_wait() logic
713 logger().debug("{} GOT WaitFrame", conn);
714 WaitFrame::Decode(rx_segments_data.back());
715 return next_step_t::wait;
716 });
717 }
718
// New-session path of the client: sends a ClientIdentFrame, then dispatches
// on the server's reply:
//  - IDENT_MISSING_FEATURES: fault the connection;
//  - WAIT: handled by process_wait() (resolves to next_step_t::wait);
//  - SERVER_IDENT: validate the server's addresses, adopt its cookie, gid,
//    features and global seq, reconcile the lossy flag, and resolve to
//    next_step_t::ready.
// Any other tag faults the connection. An address mismatch throws
// bad_peer_address.
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::client_connect()
{
  // send_client_ident() logic
  uint64_t flags = 0;
  if (conn.policy.lossy) {
    flags |= CEPH_MSG_CONNECT_LOSSY;
  }

  auto client_ident = ClientIdentFrame::Encode(
      messenger.get_myaddrs(),
      conn.target_addr,
      messenger.get_myname().num(),
      global_seq,
      conn.policy.features_supported,
      conn.policy.features_required | msgr2_required, flags,
      client_cookie);

  logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
                 " gs={}, features_supported={}, features_required={},"
                 " flags={}, cookie={}",
                 conn, messenger.get_myaddrs(), conn.target_addr,
                 messenger.get_myname().num(), global_seq,
                 conn.policy.features_supported,
                 conn.policy.features_required | msgr2_required,
                 flags, client_cookie);
  return write_frame(client_ident).then([this] {
    return read_main_preamble();
  }).then([this] (Tag tag) {
    switch (tag) {
      case Tag::IDENT_MISSING_FEATURES:
        return read_frame_payload().then([this] {
          // handle_ident_missing_features() logic
          auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
          logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
                        " (client does not support all server features)",
                        conn, ident_missing.features());
          abort_in_fault();
          // not reached: abort_in_fault() throws
          return next_step_t::none;
        });
      case Tag::WAIT:
        return process_wait();
      case Tag::SERVER_IDENT:
        return read_frame_payload().then([this] {
          // handle_server_ident() logic
          // NOTE(review): requeue_sent() runs before the server ident below
          // is validated.
          requeue_sent();
          auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
          logger().debug("{} GOT ServerIdentFrame:"
                         " addrs={}, gid={}, gs={},"
                         " features_supported={}, features_required={},"
                         " flags={}, cookie={}",
                         conn,
                         server_ident.addrs(), server_ident.gid(),
                         server_ident.global_seq(),
                         server_ident.supported_features(),
                         server_ident.required_features(),
                         server_ident.flags(), server_ident.cookie());

          // is this who we intended to talk to?
          // be a bit forgiving here, since we may be connecting based on addresses parsed out
          // of mon_host or something.
          if (!server_ident.addrs().contains(conn.target_addr)) {
            logger().warn("{} peer identifies as {}, does not include {}",
                          conn, server_ident.addrs(), conn.target_addr);
            throw std::system_error(
                make_error_code(crimson::net::error::bad_peer_address));
          }

          server_cookie = server_ident.cookie();

          // TODO: change peer_addr to entity_addrvec_t
          if (server_ident.addrs().front() != conn.peer_addr) {
            logger().warn("{} peer advertises as {}, does not match {}",
                          conn, server_ident.addrs(), conn.peer_addr);
            throw std::system_error(
                make_error_code(crimson::net::error::bad_peer_address));
          }
          conn.set_peer_id(server_ident.gid());
          // keep only the features both sides support
          conn.set_features(server_ident.supported_features() &
                            conn.policy.features_supported);
          peer_global_seq = server_ident.global_seq();

          // the server's lossy flag overrides our local policy
          bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
          if (lossy != conn.policy.lossy) {
            logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
            conn.policy.lossy = lossy;
          }
          // a lossy session keeps neither connect_seq nor server_cookie
          if (lossy && (connect_seq != 0 || server_cookie != 0)) {
            logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
                          conn, connect_seq, server_cookie);
            connect_seq = 0;
            server_cookie = 0;
          }

          return seastar::make_ready_future<next_step_t>(next_step_t::ready);
        });
      default: {
        unexpected_tag(tag, conn, "post_client_connect");
        return seastar::make_ready_future<next_step_t>(next_step_t::none);
      }
    }
  });
}
822
// Session-resumption path of the client. It sends a ReconnectFrame with
// both cookies, global_seq, connect_seq and in_seq, then dispatches on the
// server's reply:
//  - SESSION_RETRY_GLOBAL: obtain a global seq from the messenger based on
//    the server's value, then retry (recursion);
//  - SESSION_RETRY: set connect_seq to the server's value + 1 and retry;
//  - SESSION_RESET: reset_session() (full or partial) and fall back to
//    client_connect();
//  - WAIT: handled by process_wait();
//  - SESSION_RECONNECT_OK: requeue_up_to() the acknowledged msg_seq, then
//    resolve to next_step_t::ready.
// Any other tag faults the connection.
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::client_reconnect()
{
  // send_reconnect() logic
  auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
                                          client_cookie,
                                          server_cookie,
                                          global_seq,
                                          connect_seq,
                                          conn.in_seq);
  logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
                 " server_cookie={}, gs={}, cs={}, msg_seq={}",
                 conn, messenger.get_myaddrs(),
                 client_cookie, server_cookie,
                 global_seq, connect_seq, conn.in_seq);
  return write_frame(reconnect).then([this] {
    return read_main_preamble();
  }).then([this] (Tag tag) {
    switch (tag) {
      case Tag::SESSION_RETRY_GLOBAL:
        return read_frame_payload().then([this] {
          // handle_session_retry_global() logic
          auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
          logger().warn("{} GOT RetryGlobalFrame: gs={}",
                        conn, retry.global_seq());
          return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
            global_seq = gs;
            logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
            return client_reconnect();
          });
        });
      case Tag::SESSION_RETRY:
        return read_frame_payload().then([this] {
          // handle_session_retry() logic
          auto retry = RetryFrame::Decode(rx_segments_data.back());
          logger().warn("{} GOT RetryFrame: cs={}",
                        conn, retry.connect_seq());
          connect_seq = retry.connect_seq() + 1;
          logger().warn("{} UPDATE: cs={}", conn, connect_seq);
          return client_reconnect();
        });
      case Tag::SESSION_RESET:
        return read_frame_payload().then([this] {
          // handle_session_reset() logic
          auto reset = ResetFrame::Decode(rx_segments_data.back());
          logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
          reset_session(reset.full());
          // the old session is gone; establish a new one
          return client_connect();
        });
      case Tag::WAIT:
        return process_wait();
      case Tag::SESSION_RECONNECT_OK:
        return read_frame_payload().then([this] {
          // handle_reconnect_ok() logic
          auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
          logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
                         conn, reconnect_ok.msg_seq());
          requeue_up_to(reconnect_ok.msg_seq());
          return seastar::make_ready_future<next_step_t>(next_step_t::ready);
        });
      default: {
        unexpected_tag(tag, conn, "post_client_reconnect");
        return seastar::make_ready_future<next_step_t>(next_step_t::none);
      }
    }
  });
}
890
891 void ProtocolV2::execute_connecting()
892 {
893 trigger_state(state_t::CONNECTING, write_state_t::delay, true);
894 if (socket) {
895 socket->shutdown();
896 }
897 execution_done = seastar::with_gate(pending_dispatch, [this] {
898 // we don't know my socket_port yet
899 conn.set_ephemeral_port(0, SocketConnection::side_t::none);
900 return messenger.get_global_seq().then([this] (auto gs) {
901 global_seq = gs;
902 assert(client_cookie != 0);
903 if (!conn.policy.lossy && server_cookie != 0) {
904 ++connect_seq;
905 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
906 conn, global_seq, connect_seq);
907 } else { // conn.policy.lossy || server_cookie == 0
908 assert(connect_seq == 0);
909 assert(server_cookie == 0);
910 logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
911 }
912
913 return wait_write_exit();
914 }).then([this] {
915 if (unlikely(state != state_t::CONNECTING)) {
916 logger().debug("{} triggered {} before Socket::connect()",
917 conn, get_state_name(state));
918 abort_protocol();
919 }
920 if (socket) {
921 (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
922 return sock->close().then([sock = std::move(sock)] {});
923 });
924 }
925 INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
926 return Socket::connect(conn.peer_addr);
927 }).then([this](SocketRef sock) {
928 logger().debug("{} socket connected", conn);
929 if (unlikely(state != state_t::CONNECTING)) {
930 logger().debug("{} triggered {} during Socket::connect()",
931 conn, get_state_name(state));
932 return sock->close().then([sock = std::move(sock)] {
933 abort_protocol();
934 });
935 }
936 socket = std::move(sock);
937 return seastar::now();
938 }).then([this] {
939 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
940 session_stream_handlers = { nullptr, nullptr };
941 enable_recording();
942 return banner_exchange();
943 }).then([this] (entity_type_t _peer_type,
944 entity_addr_t _my_addr_from_peer) {
945 if (conn.get_peer_type() != _peer_type) {
946 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
947 conn, ceph_entity_type_name(conn.get_peer_type()),
948 ceph_entity_type_name(_peer_type));
949 dispatch_reset();
950 abort_in_close(*this);
951 }
952 conn.set_ephemeral_port(_my_addr_from_peer.get_port(),
953 SocketConnection::side_t::connector);
954 if (unlikely(_my_addr_from_peer.is_legacy())) {
955 logger().warn("{} peer sent a legacy address for me: {}",
956 conn, _my_addr_from_peer);
957 throw std::system_error(
958 make_error_code(crimson::net::error::bad_peer_address));
959 }
960 _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
961 return messenger.learned_addr(_my_addr_from_peer, conn);
962 }).then([this] {
963 return client_auth();
964 }).then([this] {
965 if (server_cookie == 0) {
966 ceph_assert(connect_seq == 0);
967 return client_connect();
968 } else {
969 ceph_assert(connect_seq > 0);
970 return client_reconnect();
971 }
972 }).then([this] (next_step_t next) {
973 if (unlikely(state != state_t::CONNECTING)) {
974 logger().debug("{} triggered {} at the end of execute_connecting()",
975 conn, get_state_name(state));
976 abort_protocol();
977 }
978 switch (next) {
979 case next_step_t::ready: {
980 (void) seastar::with_gate(pending_dispatch, [this] {
981 return dispatcher.ms_handle_connect(
982 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
983 }).handle_exception([this] (std::exception_ptr eptr) {
984 logger().error("{} ms_handle_connect caught exception: {}", conn, eptr);
985 ceph_abort("unexpected exception from ms_handle_connect()");
986 });
987 logger().info("{} connected:"
988 " gs={}, pgs={}, cs={}, client_cookie={},"
989 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
990 conn, global_seq, peer_global_seq, connect_seq,
991 client_cookie, server_cookie, conn.in_seq,
992 conn.out_seq, conn.out_q.size());
993 execute_ready();
994 break;
995 }
996 case next_step_t::wait: {
997 logger().info("{} execute_connecting(): going to WAIT", conn);
998 execute_wait(true);
999 break;
1000 }
1001 default: {
1002 ceph_abort("impossible next step");
1003 }
1004 }
1005 }).handle_exception([this] (std::exception_ptr eptr) {
1006 if (state != state_t::CONNECTING) {
1007 logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
1008 conn, get_state_name(state), eptr);
1009 assert(state == state_t::CLOSING ||
1010 state == state_t::REPLACING);
1011 return;
1012 }
1013
1014 if (conn.policy.server ||
1015 (conn.policy.standby &&
1016 (!is_queued() && conn.sent.empty()))) {
1017 logger().info("{} execute_connecting(): fault at {} with nothing to send,"
1018 " going to STANDBY -- {}",
1019 conn, get_state_name(state), eptr);
1020 execute_standby();
1021 } else {
1022 logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
1023 conn, get_state_name(state), eptr);
1024 execute_wait(false);
1025 }
1026 });
1027 });
1028 }
1029
1030 // ACCEPTING state
1031
1032 seastar::future<> ProtocolV2::_auth_bad_method(int r)
1033 {
1034 // _auth_bad_method() logic
1035 ceph_assert(r < 0);
1036 auto [allowed_methods, allowed_modes] =
1037 messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
1038 auto bad_method = AuthBadMethodFrame::Encode(
1039 auth_meta->auth_method, r, allowed_methods, allowed_modes);
1040 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
1041 "allowed_methods={}, allowed_modes={})",
1042 conn, auth_meta->auth_method, cpp_strerror(r),
1043 allowed_methods, allowed_modes);
1044 return write_frame(bad_method).then([this] {
1045 return server_auth();
1046 });
1047 }
1048
1049 seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
1050 {
1051 // _handle_auth_request() logic
1052 ceph_assert(messenger.get_auth_server());
1053 bufferlist reply;
1054 int r = messenger.get_auth_server()->handle_auth_request(
1055 conn.shared_from_this(), auth_meta,
1056 more, auth_meta->auth_method, auth_payload,
1057 &reply);
1058 switch (r) {
1059 // successful
1060 case 1: {
1061 auto auth_done = AuthDoneFrame::Encode(
1062 conn.peer_global_id, auth_meta->con_mode, reply);
1063 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
1064 conn, conn.peer_global_id,
1065 ceph_con_mode_name(auth_meta->con_mode), reply.length());
1066 return write_frame(auth_done).then([this] {
1067 ceph_assert(auth_meta);
1068 // TODO
1069 ceph_assert(!auth_meta->is_mode_secure());
1070 session_stream_handlers = { nullptr, nullptr };
1071 return finish_auth();
1072 });
1073 }
1074 // auth more
1075 case 0: {
1076 auto more = AuthReplyMoreFrame::Encode(reply);
1077 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
1078 conn, reply.length());
1079 return write_frame(more).then([this] {
1080 return read_main_preamble();
1081 }).then([this] (Tag tag) {
1082 expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
1083 return read_frame_payload();
1084 }).then([this] {
1085 auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
1086 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
1087 conn, auth_more.auth_payload().length());
1088 return _handle_auth_request(auth_more.auth_payload(), true);
1089 });
1090 }
1091 case -EBUSY: {
1092 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn);
1093 abort_in_fault();
1094 return seastar::now();
1095 }
1096 default: {
1097 logger().warn("{} auth_server handle_auth_request returned {}", conn, r);
1098 return _auth_bad_method(r);
1099 }
1100 }
1101 }
1102
1103 seastar::future<> ProtocolV2::server_auth()
1104 {
1105 return read_main_preamble()
1106 .then([this] (Tag tag) {
1107 expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
1108 return read_frame_payload();
1109 }).then([this] {
1110 // handle_auth_request() logic
1111 auto request = AuthRequestFrame::Decode(rx_segments_data.back());
1112 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
1113 " payload_len={}",
1114 conn, request.method(), request.preferred_modes(),
1115 request.auth_payload().length());
1116 auth_meta->auth_method = request.method();
1117 auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode(
1118 conn.get_peer_type(), auth_meta->auth_method,
1119 request.preferred_modes());
1120 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
1121 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn);
1122 return _auth_bad_method(-EOPNOTSUPP);
1123 }
1124 return _handle_auth_request(request.auth_payload(), false);
1125 });
1126 }
1127
1128 seastar::future<ProtocolV2::next_step_t>
1129 ProtocolV2::send_wait()
1130 {
1131 auto wait = WaitFrame::Encode();
1132 logger().debug("{} WRITE WaitFrame", conn);
1133 return write_frame(wait).then([] {
1134 return next_step_t::wait;
1135 });
1136 }
1137
1138 seastar::future<ProtocolV2::next_step_t>
1139 ProtocolV2::reuse_connection(
1140 ProtocolV2* existing_proto, bool do_reset,
1141 bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
1142 {
1143 existing_proto->trigger_replacing(reconnect,
1144 do_reset,
1145 std::move(socket),
1146 std::move(auth_meta),
1147 std::move(session_stream_handlers),
1148 peer_global_seq,
1149 client_cookie,
1150 conn.get_peer_name(),
1151 connection_features,
1152 conn_seq,
1153 msg_seq);
1154 #ifdef UNIT_TESTS_BUILT
1155 if (conn.interceptor) {
1156 conn.interceptor->register_conn_replaced(conn);
1157 }
1158 #endif
1159 // close this connection because all the necessary information is delivered
1160 // to the exisiting connection, and jump to error handling code to abort the
1161 // current state.
1162 abort_in_close(*this);
1163 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1164 }
1165
1166 seastar::future<ProtocolV2::next_step_t>
1167 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
1168 {
1169 // handle_existing_connection() logic
1170 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1171 existing_conn->protocol.get());
1172 ceph_assert(existing_proto);
1173 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1174 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1175 conn, global_seq, peer_global_seq, connect_seq,
1176 client_cookie, server_cookie,
1177 existing_conn, get_state_name(existing_proto->state),
1178 existing_proto->global_seq,
1179 existing_proto->peer_global_seq,
1180 existing_proto->connect_seq,
1181 existing_proto->client_cookie,
1182 existing_proto->server_cookie);
1183
1184 if (existing_proto->state == state_t::REPLACING) {
1185 logger().warn("{} server_connect: racing replace happened while"
1186 " replacing existing connection {}, send wait.",
1187 conn, *existing_conn);
1188 return send_wait();
1189 }
1190
1191 if (existing_proto->peer_global_seq > peer_global_seq) {
1192 logger().warn("{} server_connect:"
1193 " this is a stale connection, because peer_global_seq({})"
1194 " < existing->peer_global_seq({}), close this connection"
1195 " in favor of existing connection {}",
1196 conn, peer_global_seq,
1197 existing_proto->peer_global_seq, *existing_conn);
1198 abort_in_fault();
1199 }
1200
1201 if (existing_conn->policy.lossy) {
1202 // existing connection can be thrown out in favor of this one
1203 logger().warn("{} server_connect:"
1204 " existing connection {} is a lossy channel. Close existing in favor of"
1205 " this connection", conn, *existing_conn);
1206 existing_proto->dispatch_reset();
1207 (void) existing_proto->close();
1208
1209 if (unlikely(state != state_t::ACCEPTING)) {
1210 logger().debug("{} triggered {} in execute_accepting()",
1211 conn, get_state_name(state));
1212 abort_protocol();
1213 }
1214 execute_establishing();
1215 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1216 }
1217
1218 if (existing_proto->server_cookie != 0) {
1219 if (existing_proto->client_cookie != client_cookie) {
1220 // Found previous session
1221 // peer has reset and we're going to reuse the existing connection
1222 // by replacing the socket
1223 logger().warn("{} server_connect:"
1224 " found new session (cs={})"
1225 " when existing {} is with stale session (cs={}, ss={}),"
1226 " peer must have reset",
1227 conn, client_cookie,
1228 *existing_conn, existing_proto->client_cookie,
1229 existing_proto->server_cookie);
1230 return reuse_connection(existing_proto, conn.policy.resetcheck);
1231 } else {
1232 // session establishment interrupted between client_ident and server_ident,
1233 // continuing...
1234 logger().warn("{} server_connect: found client session with existing {}"
1235 " matched (cs={}, ss={}), continuing session establishment",
1236 conn, *existing_conn, client_cookie, existing_proto->server_cookie);
1237 return reuse_connection(existing_proto);
1238 }
1239 } else {
1240 // Looks like a connection race: server and client are both connecting to
1241 // each other at the same time.
1242 if (existing_proto->client_cookie != client_cookie) {
1243 if (existing_conn->peer_wins()) {
1244 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1245 " and win, reusing existing {}",
1246 conn, client_cookie, existing_proto->client_cookie, *existing_conn);
1247 return reuse_connection(existing_proto);
1248 } else {
1249 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1250 " and lose to existing {}, ask client to wait",
1251 conn, client_cookie, existing_proto->client_cookie, *existing_conn);
1252 return existing_conn->keepalive().then([this] {
1253 return send_wait();
1254 });
1255 }
1256 } else {
1257 logger().warn("{} server_connect: found client session with existing {}"
1258 " matched (cs={}, ss={}), continuing session establishment",
1259 conn, *existing_conn, client_cookie, existing_proto->server_cookie);
1260 return reuse_connection(existing_proto);
1261 }
1262 }
1263 }
1264
1265 seastar::future<ProtocolV2::next_step_t>
1266 ProtocolV2::server_connect()
1267 {
1268 return read_frame_payload().then([this] {
1269 // handle_client_ident() logic
1270 auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
1271 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1272 " gid={}, gs={}, features_supported={},"
1273 " features_required={}, flags={}, cookie={}",
1274 conn, client_ident.addrs(), client_ident.target_addr(),
1275 client_ident.gid(), client_ident.global_seq(),
1276 client_ident.supported_features(),
1277 client_ident.required_features(),
1278 client_ident.flags(), client_ident.cookie());
1279
1280 if (client_ident.addrs().empty() ||
1281 client_ident.addrs().front() == entity_addr_t()) {
1282 logger().warn("{} oops, client_ident.addrs() is empty", conn);
1283 throw std::system_error(
1284 make_error_code(crimson::net::error::bad_peer_address));
1285 }
1286 if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
1287 logger().warn("{} peer is trying to reach {} which is not us ({})",
1288 conn, client_ident.target_addr(), messenger.get_myaddrs());
1289 throw std::system_error(
1290 make_error_code(crimson::net::error::bad_peer_address));
1291 }
1292 // TODO: change peer_addr to entity_addrvec_t
1293 entity_addr_t paddr = client_ident.addrs().front();
1294 if ((paddr.is_msgr2() || paddr.is_any()) &&
1295 paddr.is_same_host(conn.target_addr)) {
1296 // good
1297 } else {
1298 logger().warn("{} peer's address {} is not v2 or not the same host with {}",
1299 conn, paddr, conn.target_addr);
1300 throw std::system_error(
1301 make_error_code(crimson::net::error::bad_peer_address));
1302 }
1303 conn.peer_addr = paddr;
1304 logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
1305 conn.target_addr = conn.peer_addr;
1306 if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
1307 logger().warn("{} we don't know how to reconnect to peer {}",
1308 conn, conn.target_addr);
1309 throw std::system_error(
1310 make_error_code(crimson::net::error::bad_peer_address));
1311 }
1312
1313 conn.set_peer_id(client_ident.gid());
1314 client_cookie = client_ident.cookie();
1315
1316 uint64_t feat_missing =
1317 (conn.policy.features_required | msgr2_required) &
1318 ~(uint64_t)client_ident.supported_features();
1319 if (feat_missing) {
1320 auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
1321 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1322 conn, feat_missing);
1323 return write_frame(ident_missing_features).then([] {
1324 return next_step_t::wait;
1325 });
1326 }
1327 connection_features =
1328 client_ident.supported_features() & conn.policy.features_supported;
1329 logger().debug("{} UPDATE: connection_features={}", conn, connection_features);
1330
1331 peer_global_seq = client_ident.global_seq();
1332
1333 // Looks good so far, let's check if there is already an existing connection
1334 // to this peer.
1335
1336 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1337
1338 if (existing_conn) {
1339 if (existing_conn->protocol->proto_type != proto_t::v2) {
1340 logger().warn("{} existing connection {} proto version is {}, close existing",
1341 conn, *existing_conn,
1342 static_cast<int>(existing_conn->protocol->proto_type));
1343 // should unregister the existing from msgr atomically
1344 (void) existing_conn->close();
1345 } else {
1346 return handle_existing_connection(existing_conn);
1347 }
1348 }
1349
1350 if (unlikely(state != state_t::ACCEPTING)) {
1351 logger().debug("{} triggered {} in execute_accepting()",
1352 conn, get_state_name(state));
1353 abort_protocol();
1354 }
1355 execute_establishing();
1356 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1357 });
1358 }
1359
1360 seastar::future<ProtocolV2::next_step_t>
1361 ProtocolV2::read_reconnect()
1362 {
1363 return read_main_preamble()
1364 .then([this] (Tag tag) {
1365 expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
1366 return server_reconnect();
1367 });
1368 }
1369
1370 seastar::future<ProtocolV2::next_step_t>
1371 ProtocolV2::send_retry(uint64_t connect_seq)
1372 {
1373 auto retry = RetryFrame::Encode(connect_seq);
1374 logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
1375 return write_frame(retry).then([this] {
1376 return read_reconnect();
1377 });
1378 }
1379
1380 seastar::future<ProtocolV2::next_step_t>
1381 ProtocolV2::send_retry_global(uint64_t global_seq)
1382 {
1383 auto retry = RetryGlobalFrame::Encode(global_seq);
1384 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
1385 return write_frame(retry).then([this] {
1386 return read_reconnect();
1387 });
1388 }
1389
1390 seastar::future<ProtocolV2::next_step_t>
1391 ProtocolV2::send_reset(bool full)
1392 {
1393 auto reset = ResetFrame::Encode(full);
1394 logger().warn("{} WRITE ResetFrame: full={}", conn, full);
1395 return write_frame(reset).then([this] {
1396 return read_main_preamble();
1397 }).then([this] (Tag tag) {
1398 expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
1399 return server_connect();
1400 });
1401 }
1402
1403 seastar::future<ProtocolV2::next_step_t>
1404 ProtocolV2::server_reconnect()
1405 {
1406 return read_frame_payload().then([this] {
1407 // handle_reconnect() logic
1408 auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
1409
1410 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1411 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1412 conn, reconnect.addrs(),
1413 reconnect.client_cookie(), reconnect.server_cookie(),
1414 reconnect.global_seq(), reconnect.connect_seq(),
1415 reconnect.msg_seq());
1416
1417 // can peer_addrs be changed on-the-fly?
1418 // TODO: change peer_addr to entity_addrvec_t
1419 entity_addr_t paddr = reconnect.addrs().front();
1420 if (paddr.is_msgr2() || paddr.is_any()) {
1421 // good
1422 } else {
1423 logger().warn("{} peer's address {} is not v2", conn, paddr);
1424 throw std::system_error(
1425 make_error_code(crimson::net::error::bad_peer_address));
1426 }
1427 if (conn.peer_addr == entity_addr_t()) {
1428 conn.peer_addr = paddr;
1429 } else if (conn.peer_addr != paddr) {
1430 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1431 " reconnect failed",
1432 conn, paddr, conn.peer_addr);
1433 throw std::system_error(
1434 make_error_code(crimson::net::error::bad_peer_address));
1435 }
1436 peer_global_seq = reconnect.global_seq();
1437
1438 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1439
1440 if (!existing_conn) {
1441 // there is no existing connection therefore cannot reconnect to previous
1442 // session
1443 logger().warn("{} server_reconnect: no existing connection from address {},"
1444 " reseting client", conn, conn.peer_addr);
1445 return send_reset(true);
1446 }
1447
1448 if (existing_conn->protocol->proto_type != proto_t::v2) {
1449 logger().warn("{} server_reconnect: existing connection {} proto version is {},"
1450 "close existing and reset client.",
1451 conn, *existing_conn,
1452 static_cast<int>(existing_conn->protocol->proto_type));
1453 (void) existing_conn->close();
1454 return send_reset(true);
1455 }
1456
1457 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1458 existing_conn->protocol.get());
1459 ceph_assert(existing_proto);
1460 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1461 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1462 conn, global_seq, peer_global_seq, reconnect.connect_seq(),
1463 reconnect.client_cookie(), reconnect.server_cookie(),
1464 existing_conn,
1465 get_state_name(existing_proto->state),
1466 existing_proto->global_seq,
1467 existing_proto->peer_global_seq,
1468 existing_proto->connect_seq,
1469 existing_proto->client_cookie,
1470 existing_proto->server_cookie);
1471
1472 if (existing_proto->state == state_t::REPLACING) {
1473 logger().warn("{} server_reconnect: racing replace happened while "
1474 " replacing existing connection {}, retry global.",
1475 conn, *existing_conn);
1476 return send_retry_global(existing_proto->peer_global_seq);
1477 }
1478
1479 if (existing_proto->client_cookie != reconnect.client_cookie()) {
1480 logger().warn("{} server_reconnect:"
1481 " client_cookie mismatch with existing connection {},"
1482 " cc={} rcc={}. I must have reset, reseting client.",
1483 conn, *existing_conn,
1484 existing_proto->client_cookie, reconnect.client_cookie());
1485 return send_reset(conn.policy.resetcheck);
1486 } else if (existing_proto->server_cookie == 0) {
1487 // this happens when:
1488 // - a connects to b
1489 // - a sends client_ident
1490 // - b gets client_ident, sends server_ident and sets cookie X
1491 // - connection fault
1492 // - b reconnects to a with cookie X, connect_seq=1
1493 // - a has cookie==0
1494 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1495 " server_ident with existing connection {}."
1496 " Asking peer to resume session establishment",
1497 conn, existing_proto->client_cookie, *existing_conn);
1498 return send_reset(false);
1499 }
1500
1501 if (existing_proto->peer_global_seq > reconnect.global_seq()) {
1502 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1503 " with existing connection {},"
1504 " ask client to retry global",
1505 conn, existing_proto->peer_global_seq,
1506 reconnect.global_seq(), *existing_conn);
1507 return send_retry_global(existing_proto->peer_global_seq);
1508 }
1509
1510 if (existing_proto->connect_seq > reconnect.connect_seq()) {
1511 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1512 " with existing connection {}, ask client to retry",
1513 conn, reconnect.connect_seq(),
1514 existing_proto->connect_seq, *existing_conn);
1515 return send_retry(existing_proto->connect_seq);
1516 } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
1517 // reconnect race: both peers are sending reconnect messages
1518 if (existing_conn->peer_wins()) {
1519 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1520 " and win, reusing existing {}",
1521 conn, reconnect.connect_seq(), *existing_conn);
1522 return reuse_connection(
1523 existing_proto, false,
1524 true, reconnect.connect_seq(), reconnect.msg_seq());
1525 } else {
1526 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1527 " and lose to existing {}, ask client to wait",
1528 conn, reconnect.connect_seq(), *existing_conn);
1529 return send_wait();
1530 }
1531 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1532 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1533 " reusing existing {}",
1534 conn, existing_proto->connect_seq,
1535 reconnect.connect_seq(), *existing_conn);
1536 return reuse_connection(
1537 existing_proto, false,
1538 true, reconnect.connect_seq(), reconnect.msg_seq());
1539 }
1540 });
1541 }
1542
1543 void ProtocolV2::execute_accepting()
1544 {
1545 trigger_state(state_t::ACCEPTING, write_state_t::none, false);
1546 (void) seastar::with_gate(pending_dispatch, [this] {
1547 return seastar::futurize_apply([this] {
1548 INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
1549 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
1550 session_stream_handlers = { nullptr, nullptr };
1551 enable_recording();
1552 return banner_exchange();
1553 }).then([this] (entity_type_t _peer_type,
1554 entity_addr_t _my_addr_from_peer) {
1555 ceph_assert(conn.get_peer_type() == 0);
1556 conn.set_peer_type(_peer_type);
1557
1558 conn.policy = messenger.get_policy(_peer_type);
1559 logger().info("{} UPDATE: peer_type={},"
1560 " policy(lossy={} server={} standby={} resetcheck={})",
1561 conn, ceph_entity_type_name(_peer_type),
1562 conn.policy.lossy, conn.policy.server,
1563 conn.policy.standby, conn.policy.resetcheck);
1564 if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
1565 messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
1566 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1567 conn, _my_addr_from_peer, messenger.get_myaddr());
1568 throw std::system_error(
1569 make_error_code(crimson::net::error::bad_peer_address));
1570 }
1571 return messenger.learned_addr(_my_addr_from_peer, conn);
1572 }).then([this] {
1573 return server_auth();
1574 }).then([this] {
1575 return read_main_preamble();
1576 }).then([this] (Tag tag) {
1577 switch (tag) {
1578 case Tag::CLIENT_IDENT:
1579 return server_connect();
1580 case Tag::SESSION_RECONNECT:
1581 return server_reconnect();
1582 default: {
1583 unexpected_tag(tag, conn, "post_server_auth");
1584 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1585 }
1586 }
1587 }).then([this] (next_step_t next) {
1588 switch (next) {
1589 case next_step_t::ready:
1590 assert(state != state_t::ACCEPTING);
1591 break;
1592 case next_step_t::wait:
1593 if (unlikely(state != state_t::ACCEPTING)) {
1594 logger().debug("{} triggered {} at the end of execute_accepting()",
1595 conn, get_state_name(state));
1596 abort_protocol();
1597 }
1598 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
1599 execute_server_wait();
1600 break;
1601 default:
1602 ceph_abort("impossible next step");
1603 }
1604 }).handle_exception([this] (std::exception_ptr eptr) {
1605 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1606 conn, get_state_name(state), eptr);
1607 (void) close();
1608 });
1609 });
1610 }
1611
1612 // CONNECTING or ACCEPTING state
1613
1614 seastar::future<> ProtocolV2::finish_auth()
1615 {
1616 ceph_assert(auth_meta);
1617
1618 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1619 auth_meta->session_key.hmac_sha256(nullptr, rxbuf);
1620 auto sig_frame = AuthSignatureFrame::Encode(sig);
1621 ceph_assert(record_io);
1622 record_io = false;
1623 rxbuf.clear();
1624 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
1625 return write_frame(sig_frame).then([this] {
1626 return read_main_preamble();
1627 }).then([this] (Tag tag) {
1628 expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
1629 return read_frame_payload();
1630 }).then([this] {
1631 // handle_auth_signature() logic
1632 auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
1633 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
1634
1635 const auto actual_tx_sig = auth_meta->session_key.empty() ?
1636 sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
1637 if (sig_frame.signature() != actual_tx_sig) {
1638 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1639 " sig_frame.signature()={}",
1640 conn, actual_tx_sig, sig_frame.signature());
1641 abort_in_fault();
1642 }
1643 txbuf.clear();
1644 });
1645 }
1646
1647 // ESTABLISHING
1648
1649 void ProtocolV2::execute_establishing() {
1650 trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
1651 (void) seastar::with_gate(pending_dispatch, [this] {
1652 return dispatcher.ms_handle_accept(
1653 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
1654 }).handle_exception([this] (std::exception_ptr eptr) {
1655 logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
1656 ceph_abort("unexpected exception from ms_handle_accept()");
1657 });
1658 messenger.register_conn(
1659 seastar::static_pointer_cast<SocketConnection>(
1660 conn.shared_from_this()));
1661 messenger.unaccept_conn(
1662 seastar::static_pointer_cast<SocketConnection>(
1663 conn.shared_from_this()));
1664 execution_done = seastar::with_gate(pending_dispatch, [this] {
1665 return seastar::futurize_apply([this] {
1666 return send_server_ident();
1667 }).then([this] {
1668 if (unlikely(state != state_t::ESTABLISHING)) {
1669 logger().debug("{} triggered {} at the end of execute_establishing()",
1670 conn, get_state_name(state));
1671 abort_protocol();
1672 }
1673 logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
1674 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
1675 conn, global_seq, peer_global_seq, connect_seq,
1676 client_cookie, server_cookie, conn.in_seq,
1677 conn.out_seq, conn.out_q.size());
1678 execute_ready();
1679 }).handle_exception([this] (std::exception_ptr eptr) {
1680 if (state != state_t::ESTABLISHING) {
1681 logger().info("{} execute_establishing() protocol aborted at {} -- {}",
1682 conn, get_state_name(state), eptr);
1683 assert(state == state_t::CLOSING ||
1684 state == state_t::REPLACING);
1685 return;
1686 }
1687 fault(false, "execute_establishing()", eptr);
1688 });
1689 });
1690 }
1691
1692 // ESTABLISHING or REPLACING state
1693
1694 seastar::future<>
1695 ProtocolV2::send_server_ident()
1696 {
1697 // send_server_ident() logic
1698
1699 // refered to async-conn v2: not assign gs to global_seq
1700 return messenger.get_global_seq().then([this] (auto gs) {
1701 logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
1702
1703 // this is required for the case when this connection is being replaced
1704 requeue_up_to(0);
1705 conn.in_seq = 0;
1706
1707 if (!conn.policy.lossy) {
1708 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1709 }
1710
1711 uint64_t flags = 0;
1712 if (conn.policy.lossy) {
1713 flags = flags | CEPH_MSG_CONNECT_LOSSY;
1714 }
1715
1716 auto server_ident = ServerIdentFrame::Encode(
1717 messenger.get_myaddrs(),
1718 messenger.get_myname().num(),
1719 gs,
1720 conn.policy.features_supported,
1721 conn.policy.features_required | msgr2_required,
1722 flags,
1723 server_cookie);
1724
1725 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1726 " gs={}, features_supported={}, features_required={},"
1727 " flags={}, cookie={}",
1728 conn, messenger.get_myaddrs(), messenger.get_myname().num(),
1729 gs, conn.policy.features_supported,
1730 conn.policy.features_required | msgr2_required,
1731 flags, server_cookie);
1732
1733 conn.set_features(connection_features);
1734
1735 return write_frame(server_ident);
1736 });
1737 }
1738
1739 // REPLACING state
1740
// Replace the underlying transport of this connection with a newly
// accepted socket and the session parameters negotiated on it.
//
// Synchronously: switches to REPLACING (with write_state_t::delay), shuts
// down the current socket if there is one, and notifies the dispatcher via
// ms_handle_accept() (an exception from it aborts the process).
//
// Then, in the background under pending_dispatch, it waits for the writer to
// exit, resets the session when do_reset is set, cancels the protocol timer
// and waits on execution_done. If the state is still REPLACING it closes the
// old socket, installs new_socket / new_auth_meta / new_rxtx and
// new_peer_global_seq, and then either:
//   - reconnect: restores connect_seq, calls requeue_up_to(new_msg_seq) and
//     writes a ReconnectOkFrame carrying conn.in_seq, or
//   - otherwise: adopts the new client cookie, peer name and connection
//     features and writes the server ident via send_server_ident(),
// before entering READY through execute_ready().
//
// If the state moved away from REPLACING in the meantime, the protocol is
// aborted (and the new socket is closed if it was not installed yet); such
// aborts are only logged, and the handler asserts the state is CLOSING. Any
// other failure while still REPLACING is reported through fault().
void ProtocolV2::trigger_replacing(bool reconnect,
                                   bool do_reset,
                                   SocketRef&& new_socket,
                                   AuthConnectionMetaRef&& new_auth_meta,
                                   ceph::crypto::onwire::rxtx_t new_rxtx,
                                   uint64_t new_peer_global_seq,
                                   uint64_t new_client_cookie,
                                   entity_name_t new_peer_name,
                                   uint64_t new_conn_features,
                                   uint64_t new_connect_seq,
                                   uint64_t new_msg_seq)
{
  trigger_state(state_t::REPLACING, write_state_t::delay, false);
  if (socket) {
    socket->shutdown();
  }
  // Report the accepted connection to the dispatcher; any failure is fatal.
  (void) seastar::with_gate(pending_dispatch, [this] {
    return dispatcher.ms_handle_accept(
        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
  }).handle_exception([this] (std::exception_ptr eptr) {
    logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
    ceph_abort("unexpected exception from ms_handle_accept()");
  });
  (void) seastar::with_gate(pending_dispatch,
    [this,
     reconnect,
     do_reset,
     new_socket = std::move(new_socket),
     new_auth_meta = std::move(new_auth_meta),
     new_rxtx = std::move(new_rxtx),
     new_client_cookie, new_peer_name,
     new_conn_features, new_peer_global_seq,
     new_connect_seq, new_msg_seq] () mutable {
    // Wait for the writer to exit and for the previous execution to finish
    // before swapping anything in.
    return wait_write_exit().then([this, do_reset] {
      if (do_reset) {
        reset_session(true);
      }
      protocol_timer.cancel();
      return execution_done.get_future();
    }).then([this,
             reconnect,
             new_socket = std::move(new_socket),
             new_auth_meta = std::move(new_auth_meta),
             new_rxtx = std::move(new_rxtx),
             new_client_cookie, new_peer_name,
             new_conn_features, new_peer_global_seq,
             new_connect_seq, new_msg_seq] () mutable {
      // Another transition happened while waiting: drop the new socket and
      // abort this replacement.
      if (unlikely(state != state_t::REPLACING)) {
        return new_socket->close().then([sock = std::move(new_socket)] {
          abort_protocol();
        });
      }

      // Close the old socket in the background, still tracked by the gate.
      if (socket) {
        (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
          return sock->close().then([sock = std::move(sock)] {});
        });
      }
      // Install the new transport and its session parameters.
      socket = std::move(new_socket);
      auth_meta = std::move(new_auth_meta);
      session_stream_handlers = std::move(new_rxtx);
      record_io = false;
      peer_global_seq = new_peer_global_seq;

      if (reconnect) {
        connect_seq = new_connect_seq;
        // send_reconnect_ok() logic
        requeue_up_to(new_msg_seq);
        auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
        return write_frame(reconnect_ok);
      } else {
        client_cookie = new_client_cookie;
        conn.set_peer_name(new_peer_name);
        connection_features = new_conn_features;
        return send_server_ident();
      }
    }).then([this, reconnect] {
      // Re-check: the state may have changed while the frame was written.
      if (unlikely(state != state_t::REPLACING)) {
        logger().debug("{} triggered {} at the end of trigger_replacing()",
                       conn, get_state_name(state));
        abort_protocol();
      }
      logger().info("{} replaced ({}):"
                    " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
                    " in_seq={}, out_seq={}, out_q={}",
                    conn, reconnect ? "reconnected" : "connected",
                    global_seq, peer_global_seq, connect_seq, client_cookie,
                    server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
      execute_ready();
    }).handle_exception([this] (std::exception_ptr eptr) {
      // An abort caused by a state change is only logged; here the new
      // state is asserted to be CLOSING.
      if (state != state_t::REPLACING) {
        logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
                      conn, get_state_name(state), eptr);
        assert(state == state_t::CLOSING);
        return;
      }
      fault(true, "trigger_replacing()", eptr);
    });
  });
}
1842
1843 // READY state
1844
1845 ceph::bufferlist ProtocolV2::do_sweep_messages(
1846 const std::deque<MessageRef>& msgs,
1847 size_t num_msgs,
1848 bool require_keepalive,
1849 std::optional<utime_t> _keepalive_ack,
1850 bool require_ack)
1851 {
1852 ceph::bufferlist bl;
1853
1854 if (unlikely(require_keepalive)) {
1855 auto keepalive_frame = KeepAliveFrame::Encode();
1856 bl.append(keepalive_frame.get_buffer(tx_frame_asm));
1857 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
1858 }
1859
1860 if (unlikely(_keepalive_ack.has_value())) {
1861 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
1862 bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
1863 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
1864 }
1865
1866 if (require_ack && !num_msgs) {
1867 auto ack_frame = AckFrame::Encode(conn.in_seq);
1868 bl.append(ack_frame.get_buffer(tx_frame_asm));
1869 INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
1870 }
1871
1872 std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
1873 // TODO: move to common code
1874 // set priority
1875 msg->get_header().src = messenger.get_myname();
1876
1877 msg->encode(conn.features, 0);
1878
1879 ceph_assert(!msg->get_seq() && "message already has seq");
1880 msg->set_seq(++conn.out_seq);
1881
1882 ceph_msg_header &header = msg->get_header();
1883 ceph_msg_footer &footer = msg->get_footer();
1884
1885 ceph_msg_header2 header2{header.seq, header.tid,
1886 header.type, header.priority,
1887 header.version,
1888 init_le32(0), header.data_off,
1889 init_le64(conn.in_seq),
1890 footer.flags, header.compat_version,
1891 header.reserved};
1892
1893 auto message = MessageFrame::Encode(header2,
1894 msg->get_payload(), msg->get_middle(), msg->get_data());
1895 logger().debug("{} --> #{} === {} ({})",
1896 conn, msg->get_seq(), *msg, msg->get_type());
1897 bl.append(message.get_buffer(tx_frame_asm));
1898 INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
1899 });
1900
1901 return bl;
1902 }
1903
1904 seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
1905 {
1906 return read_frame_payload()
1907 .then([this, throttle_stamp] {
1908 utime_t recv_stamp{seastar::lowres_system_clock::now()};
1909
1910 // we need to get the size before std::moving segments data
1911 const size_t cur_msg_size = get_current_msg_size();
1912 auto msg_frame = MessageFrame::Decode(rx_segments_data);
1913 // XXX: paranoid copy just to avoid oops
1914 ceph_msg_header2 current_header = msg_frame.header();
1915
1916 logger().trace("{} got {} + {} + {} byte message,"
1917 " envelope type={} src={} off={} seq={}",
1918 conn, msg_frame.front_len(), msg_frame.middle_len(),
1919 msg_frame.data_len(), current_header.type, conn.get_peer_name(),
1920 current_header.data_off, current_header.seq);
1921
1922 ceph_msg_header header{current_header.seq,
1923 current_header.tid,
1924 current_header.type,
1925 current_header.priority,
1926 current_header.version,
1927 init_le32(msg_frame.front_len()),
1928 init_le32(msg_frame.middle_len()),
1929 init_le32(msg_frame.data_len()),
1930 current_header.data_off,
1931 conn.get_peer_name(),
1932 current_header.compat_version,
1933 current_header.reserved,
1934 init_le32(0)};
1935 ceph_msg_footer footer{init_le32(0), init_le32(0),
1936 init_le32(0), init_le64(0), current_header.flags};
1937
1938 auto pconn = seastar::static_pointer_cast<SocketConnection>(
1939 conn.shared_from_this());
1940 Message *message = decode_message(nullptr, 0, header, footer,
1941 msg_frame.front(), msg_frame.middle(), msg_frame.data(),
1942 std::move(pconn));
1943 if (!message) {
1944 logger().warn("{} decode message failed", conn);
1945 abort_in_fault();
1946 }
1947
1948 // store reservation size in message, so we don't get confused
1949 // by messages entering the dispatch queue through other paths.
1950 message->set_dispatch_throttle_size(cur_msg_size);
1951
1952 message->set_throttle_stamp(throttle_stamp);
1953 message->set_recv_stamp(recv_stamp);
1954 message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
1955
1956 // check received seq#. if it is old, drop the message.
1957 // note that incoming messages may skip ahead. this is convenient for the
1958 // client side queueing because messages can't be renumbered, but the (kernel)
1959 // client will occasionally pull a message out of the sent queue to send
1960 // elsewhere. in that case it doesn't matter if we "got" it or not.
1961 uint64_t cur_seq = conn.in_seq;
1962 if (message->get_seq() <= cur_seq) {
1963 logger().error("{} got old message {} <= {} {} {}, discarding",
1964 conn, message->get_seq(), cur_seq, message, *message);
1965 if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
1966 conf.ms_die_on_old_message) {
1967 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1968 }
1969 return;
1970 } else if (message->get_seq() > cur_seq + 1) {
1971 logger().error("{} missed message? skipped from seq {} to {}",
1972 conn, cur_seq, message->get_seq());
1973 if (conf.ms_die_on_skipped_message) {
1974 ceph_assert(0 == "skipped incoming seq");
1975 }
1976 }
1977
1978 // note last received message.
1979 conn.in_seq = message->get_seq();
1980 logger().debug("{} <== #{} === {} ({})",
1981 conn, message->get_seq(), *message, message->get_type());
1982 notify_ack();
1983 ack_writes(current_header.ack_seq);
1984
1985 // TODO: change MessageRef with seastar::shared_ptr
1986 auto msg_ref = MessageRef{message, false};
1987 (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
1988 return dispatcher.ms_dispatch(&conn, std::move(msg));
1989 }).handle_exception([this] (std::exception_ptr eptr) {
1990 logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
1991 ceph_abort("unexpected exception from ms_dispatch()");
1992 });
1993 });
1994 }
1995
// Enter READY (with write_state_t::open) and start the read loop.
//
// The loop runs under pending_dispatch and its future is stored in
// execution_done. Each iteration reads a main preamble and dispatches on its
// tag: MESSAGE (after the policy throttlers), ACK, KEEPALIVE2 and
// KEEPALIVE2_ACK; any other tag is passed to unexpected_tag(). An exception
// while the state is still READY is reported through fault(). Otherwise the
// loop was aborted by a transition, which is asserted to be REPLACING or
// CLOSING.
void ProtocolV2::execute_ready()
{
  // Lossless connections must have both cookies assigned by now.
  assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
  trigger_state(state_t::READY, write_state_t::open, false);
#ifdef UNIT_TESTS_BUILT
  if (conn.interceptor) {
    conn.interceptor->register_conn_ready(conn);
  }
#endif
  execution_done = seastar::with_gate(pending_dispatch, [this] {
    protocol_timer.cancel();
    return seastar::keep_doing([this] {
      return read_main_preamble()
      .then([this] (Tag tag) {
        switch (tag) {
          case Tag::MESSAGE: {
            return seastar::futurize_apply([this] {
              // throttle_message() logic
              // Message-count throttling is not implemented: a policy with
              // throttler_messages set trips the assert below.
              if (!conn.policy.throttler_messages) {
                return seastar::now();
              }
              // TODO: message throttler
              ceph_assert(false);
              return seastar::now();
            }).then([this] {
              // throttle_bytes() logic
              // Reserve the incoming message size from the byte throttler
              // before reading the payload.
              if (!conn.policy.throttler_bytes) {
                return seastar::now();
              }
              size_t cur_msg_size = get_current_msg_size();
              if (!cur_msg_size) {
                return seastar::now();
              }
              logger().trace("{} wants {} bytes from policy throttler {}/{}",
                             conn, cur_msg_size,
                             conn.policy.throttler_bytes->get_current(),
                             conn.policy.throttler_bytes->get_max());
              return conn.policy.throttler_bytes->get(cur_msg_size);
            }).then([this] {
              // TODO: throttle_dispatch_queue() logic
              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
              return read_message(throttle_stamp);
            });
          }
          case Tag::ACK:
            return read_frame_payload().then([this] {
              // handle_message_ack() logic
              auto ack = AckFrame::Decode(rx_segments_data.back());
              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
              ack_writes(ack.seq());
            });
          case Tag::KEEPALIVE2:
            return read_frame_payload().then([this] {
              // handle_keepalive2() logic
              // Ask for an ack echoing the peer's timestamp and record when
              // this keepalive was received.
              auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
                             conn, keepalive_frame.timestamp());
              notify_keepalive_ack(keepalive_frame.timestamp());
              conn.set_last_keepalive(seastar::lowres_system_clock::now());
            });
          case Tag::KEEPALIVE2_ACK:
            return read_frame_payload().then([this] {
              // handle_keepalive2_ack() logic
              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
              conn.set_last_keepalive_ack(
                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
                             conn, conn.last_keepalive_ack);
            });
          default: {
            unexpected_tag(tag, conn, "execute_ready");
            return seastar::now();
          }
        }
      });
    }).handle_exception([this] (std::exception_ptr eptr) {
      // Leaving READY (replacement or close) ends the loop via an exception;
      // that is expected and only logged.
      if (state != state_t::READY) {
        logger().info("{} execute_ready(): protocol aborted at {} -- {}",
                      conn, get_state_name(state), eptr);
        assert(state == state_t::REPLACING ||
               state == state_t::CLOSING);
        return;
      }
      fault(false, "execute_ready()", eptr);
    });
  });
}
2083
2084 // STANDBY state
2085
2086 void ProtocolV2::execute_standby()
2087 {
2088 trigger_state(state_t::STANDBY, write_state_t::delay, true);
2089 if (socket) {
2090 socket->shutdown();
2091 }
2092 }
2093
2094 void ProtocolV2::notify_write()
2095 {
2096 if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
2097 logger().info("{} notify_write(): at {}, going to CONNECTING",
2098 conn, get_state_name(state));
2099 execute_connecting();
2100 }
2101 }
2102
2103 // WAIT state
2104
2105 void ProtocolV2::execute_wait(bool max_backoff)
2106 {
2107 trigger_state(state_t::WAIT, write_state_t::delay, true);
2108 if (socket) {
2109 socket->shutdown();
2110 }
2111 execution_done = seastar::with_gate(pending_dispatch,
2112 [this, max_backoff] {
2113 double backoff = protocol_timer.last_dur();
2114 if (max_backoff) {
2115 backoff = conf.ms_max_backoff;
2116 } else if (backoff > 0) {
2117 backoff = std::min(conf.ms_max_backoff, 2 * backoff);
2118 } else {
2119 backoff = conf.ms_initial_backoff;
2120 }
2121 return protocol_timer.backoff(backoff).then([this] {
2122 if (unlikely(state != state_t::WAIT)) {
2123 logger().debug("{} triggered {} at the end of execute_wait()",
2124 conn, get_state_name(state));
2125 abort_protocol();
2126 }
2127 logger().info("{} execute_wait(): going to CONNECTING", conn);
2128 execute_connecting();
2129 }).handle_exception([this] (std::exception_ptr eptr) {
2130 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
2131 conn, get_state_name(state), eptr);
2132 assert(state == state_t::REPLACING ||
2133 state == state_t::CLOSING);
2134 });
2135 });
2136 }
2137
2138 // SERVER_WAIT state
2139
2140 void ProtocolV2::execute_server_wait()
2141 {
2142 trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
2143 execution_done = seastar::with_gate(pending_dispatch, [this] {
2144 return read_exactly(1).then([this] (auto bl) {
2145 logger().warn("{} SERVER_WAIT got read, abort", conn);
2146 abort_in_fault();
2147 }).handle_exception([this] (std::exception_ptr eptr) {
2148 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
2149 conn, get_state_name(state), eptr);
2150 (void) close();
2151 });
2152 });
2153 }
2154
2155 // CLOSING state
2156
2157 void ProtocolV2::trigger_close()
2158 {
2159 if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
2160 messenger.unaccept_conn(
2161 seastar::static_pointer_cast<SocketConnection>(
2162 conn.shared_from_this()));
2163 } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
2164 messenger.unregister_conn(
2165 seastar::static_pointer_cast<SocketConnection>(
2166 conn.shared_from_this()));
2167 } else {
2168 // cannot happen
2169 ceph_assert(false);
2170 }
2171
2172 protocol_timer.cancel();
2173
2174 trigger_state(state_t::CLOSING, write_state_t::drop, false);
2175 #ifdef UNIT_TESTS_BUILT
2176 if (conn.interceptor) {
2177 conn.interceptor->register_conn_closed(conn);
2178 }
2179 #endif
2180 }
2181
2182 } // namespace crimson::net