]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/ProtocolV2.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "ProtocolV2.h"
5
6 #include <seastar/core/lowres_clock.hh>
7 #include <fmt/format.h>
8 #include "include/msgr.h"
9 #include "include/random.h"
10
11 #include "crimson/auth/AuthClient.h"
12 #include "crimson/auth/AuthServer.h"
13 #include "crimson/common/formatter.h"
14
15 #include "chained_dispatchers.h"
16 #include "Errors.h"
17 #include "Socket.h"
18 #include "SocketConnection.h"
19 #include "SocketMessenger.h"
20
21 #ifdef UNIT_TESTS_BUILT
22 #include "Interceptor.h"
23 #endif
24
25 using namespace ceph::msgr::v2;
26 using crimson::common::local_conf;
27
28 namespace {
29
30 // TODO: apply the same logging policy to Protocol V1
31 // Log levels in V2 Protocol:
32 // * error level, something error that cause connection to terminate:
33 // - fatal errors;
34 // - bugs;
35 // * warn level: something unusual that identifies connection fault or replacement:
36 // - unstable network;
37 // - incompatible peer;
38 // - auth failure;
39 // - connection race;
40 // - connection reset;
41 // * info level, something very important to show connection lifecycle,
42 // which doesn't happen very frequently;
43 // * debug level, important logs for debugging, including:
44 // - all the messages sent/received (-->/<==);
45 // - all the frames exchanged (WRITE/GOT);
46 // - important fields updated (UPDATE);
47 // - connection state transitions (TRIGGER);
48 // * trace level, trivial logs showing:
49 // - the exact bytes being sent/received (SEND/RECV(bytes));
50 // - detailed information of sub-frames;
51 // - integrity checks;
52 // - etc.
53 seastar::logger& logger() {
54 return crimson::get_logger(ceph_subsys_ms);
55 }
56
57 [[noreturn]] void abort_in_fault() {
58 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
59 }
60
61 [[noreturn]] void abort_protocol() {
62 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
63 }
64
65 [[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
66 proto.close(dispatch_reset);
67 abort_protocol();
68 }
69
70 inline void expect_tag(const Tag& expected,
71 const Tag& actual,
72 crimson::net::SocketConnection& conn,
73 const char *where) {
74 if (actual != expected) {
75 logger().warn("{} {} received wrong tag: {}, expected {}",
76 conn, where,
77 static_cast<uint32_t>(actual),
78 static_cast<uint32_t>(expected));
79 abort_in_fault();
80 }
81 }
82
83 inline void unexpected_tag(const Tag& unexpected,
84 crimson::net::SocketConnection& conn,
85 const char *where) {
86 logger().warn("{} {} received unexpected tag: {}",
87 conn, where, static_cast<uint32_t>(unexpected));
88 abort_in_fault();
89 }
90
91 inline uint64_t generate_client_cookie() {
92 return ceph::util::generate_random_number<uint64_t>(
93 1, std::numeric_limits<uint64_t>::max());
94 }
95
96 } // namespace anonymous
97
98 namespace crimson::net {
99
#ifdef UNIT_TESTS_BUILT
// Asks the connection's interceptor (if one is installed) which action to take
// at breakpoint `bp`, and arms that action as a trap of `type` on `socket`.
void intercept(Breakpoint bp, bp_type_t type,
               SocketConnection& conn, SocketRef& socket) {
  if (conn.interceptor) {
    auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
    socket->set_trap(type, action, &conn.interceptor->blocker);
  }
}

// The INTERCEPT_* macros expand to code that names `conn` and `socket`
// directly, so they must be used where both are in scope.
#define INTERCEPT_CUSTOM(bp, type) \
intercept({bp}, type, conn, socket)

#define INTERCEPT_FRAME(tag, type) \
intercept({static_cast<Tag>(tag), type}, \
          type, conn, socket)

// Non-read/write breakpoint: BLOCK is not supported here, FAULT faults the
// connection. Wrapped in do { } while (0) so that `INTERCEPT_N_RW(x);` is a
// single statement and cannot capture a following `else`.
#define INTERCEPT_N_RW(bp)                                   \
do {                                                         \
  if (conn.interceptor) {                                    \
    auto action = conn.interceptor->intercept(conn, {bp});   \
    ceph_assert(action != bp_action_t::BLOCK);               \
    if (action == bp_action_t::FAULT) {                      \
      abort_in_fault();                                      \
    }                                                        \
  }                                                          \
} while (0)

#else
#define INTERCEPT_CUSTOM(bp, type)
#define INTERCEPT_FRAME(tag, type)
#define INTERCEPT_N_RW(bp)
#endif
130
131 seastar::future<> ProtocolV2::Timer::backoff(double seconds)
132 {
133 logger().warn("{} waiting {} seconds ...", conn, seconds);
134 cancel();
135 last_dur_ = seconds;
136 as = seastar::abort_source();
137 auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
138 std::chrono::duration<double>(seconds));
139 return seastar::sleep_abortable(dur, *as
140 ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
141 logger().debug("{} wait aborted", conn);
142 abort_protocol();
143 });
144 }
145
146 ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
147 SocketConnection& conn,
148 SocketMessenger& messenger)
149 : Protocol(proto_t::v2, dispatchers, conn),
150 messenger{messenger},
151 protocol_timer{conn}
152 {}
153
154 ProtocolV2::~ProtocolV2() {}
155
156 bool ProtocolV2::is_connected() const {
157 return state == state_t::READY ||
158 state == state_t::ESTABLISHING ||
159 state == state_t::REPLACING;
160 }
161
162 void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
163 const entity_name_t& _peer_name)
164 {
165 ceph_assert(state == state_t::NONE);
166 ceph_assert(!socket);
167 ceph_assert(!gate.is_closed());
168 conn.peer_addr = _peer_addr;
169 conn.target_addr = _peer_addr;
170 conn.set_peer_name(_peer_name);
171 conn.policy = messenger.get_policy(_peer_name.type());
172 client_cookie = generate_client_cookie();
173 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
174 " policy(lossy={}, server={}, standby={}, resetcheck={})",
175 conn, _peer_addr, _peer_name, client_cookie,
176 conn.policy.lossy, conn.policy.server,
177 conn.policy.standby, conn.policy.resetcheck);
178 messenger.register_conn(
179 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
180 execute_connecting();
181 }
182
183 void ProtocolV2::start_accept(SocketRef&& sock,
184 const entity_addr_t& _peer_addr)
185 {
186 ceph_assert(state == state_t::NONE);
187 ceph_assert(!socket);
188 // until we know better
189 conn.target_addr = _peer_addr;
190 socket = std::move(sock);
191 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
192 messenger.accept_conn(
193 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
194 execute_accepting();
195 }
196
197 // TODO: Frame related implementations, probably to a separate class.
198
199 void ProtocolV2::enable_recording()
200 {
201 rxbuf.clear();
202 txbuf.clear();
203 record_io = true;
204 }
205
206 seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
207 {
208 if (unlikely(record_io)) {
209 return socket->read_exactly(bytes)
210 .then([this] (auto bl) {
211 rxbuf.append(buffer::create(bl.share()));
212 return bl;
213 });
214 } else {
215 return socket->read_exactly(bytes);
216 };
217 }
218
219 seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
220 {
221 if (unlikely(record_io)) {
222 return socket->read(bytes)
223 .then([this] (auto buf) {
224 rxbuf.append(buf);
225 return buf;
226 });
227 } else {
228 return socket->read(bytes);
229 }
230 }
231
232 seastar::future<> ProtocolV2::write(bufferlist&& buf)
233 {
234 if (unlikely(record_io)) {
235 txbuf.append(buf);
236 }
237 return socket->write(std::move(buf));
238 }
239
240 seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
241 {
242 if (unlikely(record_io)) {
243 txbuf.append(buf);
244 }
245 return socket->write_flush(std::move(buf));
246 }
247
248 size_t ProtocolV2::get_current_msg_size() const
249 {
250 ceph_assert(rx_frame_asm.get_num_segments() > 0);
251 size_t sum = 0;
252 // we don't include SegmentIndex::Msg::HEADER.
253 for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
254 sum += rx_frame_asm.get_segment_logical_len(idx);
255 }
256 return sum;
257 }
258
259 seastar::future<Tag> ProtocolV2::read_main_preamble()
260 {
261 rx_preamble.clear();
262 return read_exactly(rx_frame_asm.get_preamble_onwire_len())
263 .then([this] (auto bl) {
264 rx_segments_data.clear();
265 try {
266 rx_preamble.append(buffer::create(std::move(bl)));
267 const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
268 INTERCEPT_FRAME(tag, bp_type_t::READ);
269 return tag;
270 } catch (FrameError& e) {
271 logger().warn("{} read_main_preamble: {}", conn, e.what());
272 abort_in_fault();
273 }
274 });
275 }
276
277 seastar::future<> ProtocolV2::read_frame_payload()
278 {
279 ceph_assert(rx_segments_data.empty());
280
281 return seastar::do_until(
282 [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
283 [this] {
284 // TODO: create aligned and contiguous buffer from socket
285 const size_t seg_idx = rx_segments_data.size();
286 if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
287 alignment != segment_t::DEFAULT_ALIGNMENT) {
288 logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
289 conn, alignment, rx_segments_data.size());
290 }
291 uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
292 // TODO: create aligned and contiguous buffer from socket
293 return read_exactly(onwire_len).then([this] (auto tmp_bl) {
294 logger().trace("{} RECV({}) frame segment[{}]",
295 conn, tmp_bl.size(), rx_segments_data.size());
296 bufferlist segment;
297 segment.append(buffer::create(std::move(tmp_bl)));
298 rx_segments_data.emplace_back(std::move(segment));
299 });
300 }
301 ).then([this] {
302 return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
303 }).then([this] (auto bl) {
304 logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
305 bool ok = false;
306 try {
307 rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
308 bufferlist rx_epilogue;
309 rx_epilogue.append(buffer::create(std::move(bl)));
310 ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue);
311 } catch (FrameError& e) {
312 logger().error("read_frame_payload: {} {}", conn, e.what());
313 abort_in_fault();
314 } catch (ceph::crypto::onwire::MsgAuthError&) {
315 logger().error("read_frame_payload: {} bad auth tag", conn);
316 abort_in_fault();
317 }
318 // we do have a mechanism that allows transmitter to start sending message
319 // and abort after putting entire data field on wire. This will be used by
320 // the kernel client to avoid unnecessary buffering.
321 if (!ok) {
322 // TODO
323 ceph_assert(false);
324 }
325 });
326 }
327
328 template <class F>
329 seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
330 {
331 auto bl = frame.get_buffer(tx_frame_asm);
332 const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
333 logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
334 conn, bl.length(), (int)main_preamble->tag,
335 (int)main_preamble->num_segments, main_preamble->crc);
336 INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
337 if (flush) {
338 return write_flush(std::move(bl));
339 } else {
340 return write(std::move(bl));
341 }
342 }
343
344 void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
345 {
346 if (!reentrant && _state == state) {
347 logger().error("{} is not allowed to re-trigger state {}",
348 conn, get_state_name(state));
349 ceph_assert(false);
350 }
351 logger().debug("{} TRIGGER {}, was {}",
352 conn, get_state_name(_state), get_state_name(state));
353 state = _state;
354 set_write_state(_write_state);
355 }
356
357 void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
358 {
359 if (conn.policy.lossy) {
360 logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
361 conn, func_name, get_state_name(state), eptr);
362 close(true);
363 } else if (conn.policy.server ||
364 (conn.policy.standby &&
365 (!is_queued() && conn.sent.empty()))) {
366 logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
367 conn, func_name, get_state_name(state), eptr);
368 execute_standby();
369 } else if (backoff) {
370 logger().info("{} {}: fault at {}, going to WAIT -- {}",
371 conn, func_name, get_state_name(state), eptr);
372 execute_wait(false);
373 } else {
374 logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
375 conn, func_name, get_state_name(state), eptr);
376 execute_connecting();
377 }
378 }
379
380 void ProtocolV2::reset_session(bool full)
381 {
382 server_cookie = 0;
383 connect_seq = 0;
384 conn.in_seq = 0;
385 if (full) {
386 client_cookie = generate_client_cookie();
387 peer_global_seq = 0;
388 reset_write();
389 dispatchers.ms_handle_remote_reset(
390 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
391 }
392 }
393
394 seastar::future<std::tuple<entity_type_t, entity_addr_t>>
395 ProtocolV2::banner_exchange(bool is_connect)
396 {
397 // 1. prepare and send banner
398 bufferlist banner_payload;
399 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
400 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
401
402 bufferlist bl;
403 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
404 auto len_payload = static_cast<uint16_t>(banner_payload.length());
405 encode(len_payload, bl, 0);
406 bl.claim_append(banner_payload);
407 logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
408 "required={}, banner=\"{}\"",
409 conn, bl.length(), len_payload,
410 CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
411 CEPH_BANNER_V2_PREFIX);
412 INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
413 return write_flush(std::move(bl)).then([this] {
414 // 2. read peer banner
415 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
416 INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
417 return read_exactly(banner_len); // or read exactly?
418 }).then([this] (auto bl) {
419 // 3. process peer banner and read banner_payload
420 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
421 logger().debug("{} RECV({}) banner: \"{}\"",
422 conn, bl.size(),
423 std::string((const char*)bl.get(), banner_prefix_len));
424
425 if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
426 if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
427 logger().warn("{} peer is using V1 protocol", conn);
428 } else {
429 logger().warn("{} peer sent bad banner", conn);
430 }
431 abort_in_fault();
432 }
433 bl.trim_front(banner_prefix_len);
434
435 uint16_t payload_len;
436 bufferlist buf;
437 buf.append(buffer::create(std::move(bl)));
438 auto ti = buf.cbegin();
439 try {
440 decode(payload_len, ti);
441 } catch (const buffer::error &e) {
442 logger().warn("{} decode banner payload len failed", conn);
443 abort_in_fault();
444 }
445 logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
446 INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
447 return read(payload_len);
448 }).then([this, is_connect] (bufferlist bl) {
449 // 4. process peer banner_payload and send HelloFrame
450 auto p = bl.cbegin();
451 uint64_t peer_supported_features;
452 uint64_t peer_required_features;
453 try {
454 decode(peer_supported_features, p);
455 decode(peer_required_features, p);
456 } catch (const buffer::error &e) {
457 logger().warn("{} decode banner payload failed", conn);
458 abort_in_fault();
459 }
460 logger().debug("{} RECV({}) banner features: supported={} required={}",
461 conn, bl.length(),
462 peer_supported_features, peer_required_features);
463
464 // Check feature bit compatibility
465 uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
466 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
467 if ((required_features & peer_supported_features) != required_features) {
468 logger().error("{} peer does not support all required features"
469 " required={} peer_supported={}",
470 conn, required_features, peer_supported_features);
471 abort_in_close(*this, is_connect);
472 }
473 if ((supported_features & peer_required_features) != peer_required_features) {
474 logger().error("{} we do not support all peer required features"
475 " peer_required={} supported={}",
476 conn, peer_required_features, supported_features);
477 abort_in_close(*this, is_connect);
478 }
479 this->peer_required_features = peer_required_features;
480 if (this->peer_required_features == 0) {
481 this->connection_features = msgr2_required;
482 }
483 const bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
484 tx_frame_asm.set_is_rev1(is_rev1);
485 rx_frame_asm.set_is_rev1(is_rev1);
486
487 auto hello = HelloFrame::Encode(messenger.get_mytype(),
488 conn.target_addr);
489 logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
490 conn, ceph_entity_type_name(messenger.get_mytype()),
491 conn.target_addr);
492 return write_frame(hello);
493 }).then([this] {
494 //5. read peer HelloFrame
495 return read_main_preamble();
496 }).then([this] (Tag tag) {
497 expect_tag(Tag::HELLO, tag, conn, __func__);
498 return read_frame_payload();
499 }).then([this] {
500 // 6. process peer HelloFrame
501 auto hello = HelloFrame::Decode(rx_segments_data.back());
502 logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
503 conn, ceph_entity_type_name(hello.entity_type()),
504 hello.peer_addr());
505 return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
506 std::make_tuple(hello.entity_type(), hello.peer_addr()));
507 });
508 }
509
510 // CONNECTING state
511
512 seastar::future<> ProtocolV2::handle_auth_reply()
513 {
514 return read_main_preamble()
515 .then([this] (Tag tag) {
516 switch (tag) {
517 case Tag::AUTH_BAD_METHOD:
518 return read_frame_payload().then([this] {
519 // handle_auth_bad_method() logic
520 auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
521 logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
522 "allowed_methods={}, allowed_modes={}",
523 conn, bad_method.method(), cpp_strerror(bad_method.result()),
524 bad_method.allowed_methods(), bad_method.allowed_modes());
525 ceph_assert(messenger.get_auth_client());
526 int r = messenger.get_auth_client()->handle_auth_bad_method(
527 conn.shared_from_this(), auth_meta,
528 bad_method.method(), bad_method.result(),
529 bad_method.allowed_methods(), bad_method.allowed_modes());
530 if (r < 0) {
531 logger().warn("{} auth_client handle_auth_bad_method returned {}",
532 conn, r);
533 abort_in_fault();
534 }
535 return client_auth(bad_method.allowed_methods());
536 });
537 case Tag::AUTH_REPLY_MORE:
538 return read_frame_payload().then([this] {
539 // handle_auth_reply_more() logic
540 auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
541 logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
542 conn, auth_more.auth_payload().length());
543 ceph_assert(messenger.get_auth_client());
544 // let execute_connecting() take care of the thrown exception
545 auto reply = messenger.get_auth_client()->handle_auth_reply_more(
546 conn.shared_from_this(), auth_meta, auth_more.auth_payload());
547 auto more_reply = AuthRequestMoreFrame::Encode(reply);
548 logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
549 conn, reply.length());
550 return write_frame(more_reply);
551 }).then([this] {
552 return handle_auth_reply();
553 });
554 case Tag::AUTH_DONE:
555 return read_frame_payload().then([this] {
556 // handle_auth_done() logic
557 auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
558 logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
559 conn, auth_done.global_id(),
560 ceph_con_mode_name(auth_done.con_mode()),
561 auth_done.auth_payload().length());
562 ceph_assert(messenger.get_auth_client());
563 int r = messenger.get_auth_client()->handle_auth_done(
564 conn.shared_from_this(), auth_meta,
565 auth_done.global_id(),
566 auth_done.con_mode(),
567 auth_done.auth_payload());
568 if (r < 0) {
569 logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
570 abort_in_fault();
571 }
572 auth_meta->con_mode = auth_done.con_mode();
573 session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
574 nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), false);
575 return finish_auth();
576 });
577 default: {
578 unexpected_tag(tag, conn, __func__);
579 return seastar::now();
580 }
581 }
582 });
583 }
584
585 seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
586 {
587 // send_auth_request() logic
588 ceph_assert(messenger.get_auth_client());
589
590 try {
591 auto [auth_method, preferred_modes, bl] =
592 messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta);
593 auth_meta->auth_method = auth_method;
594 auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
595 logger().debug("{} WRITE AuthRequestFrame: method={},"
596 " preferred_modes={}, payload_len={}",
597 conn, auth_method, preferred_modes, bl.length());
598 return write_frame(frame).then([this] {
599 return handle_auth_reply();
600 });
601 } catch (const crimson::auth::error& e) {
602 logger().error("{} get_initial_auth_request returned {}", conn, e);
603 abort_in_close(*this, true);
604 return seastar::now();
605 }
606 }
607
608 seastar::future<ProtocolV2::next_step_t>
609 ProtocolV2::process_wait()
610 {
611 return read_frame_payload().then([this] {
612 // handle_wait() logic
613 logger().debug("{} GOT WaitFrame", conn);
614 WaitFrame::Decode(rx_segments_data.back());
615 return next_step_t::wait;
616 });
617 }
618
619 seastar::future<ProtocolV2::next_step_t>
620 ProtocolV2::client_connect()
621 {
622 // send_client_ident() logic
623 uint64_t flags = 0;
624 if (conn.policy.lossy) {
625 flags |= CEPH_MSG_CONNECT_LOSSY;
626 }
627
628 auto client_ident = ClientIdentFrame::Encode(
629 messenger.get_myaddrs(),
630 conn.target_addr,
631 messenger.get_myname().num(),
632 global_seq,
633 conn.policy.features_supported,
634 conn.policy.features_required | msgr2_required, flags,
635 client_cookie);
636
637 logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
638 " gs={}, features_supported={}, features_required={},"
639 " flags={}, cookie={}",
640 conn, messenger.get_myaddrs(), conn.target_addr,
641 messenger.get_myname().num(), global_seq,
642 conn.policy.features_supported,
643 conn.policy.features_required | msgr2_required,
644 flags, client_cookie);
645 return write_frame(client_ident).then([this] {
646 return read_main_preamble();
647 }).then([this] (Tag tag) {
648 switch (tag) {
649 case Tag::IDENT_MISSING_FEATURES:
650 return read_frame_payload().then([this] {
651 // handle_ident_missing_features() logic
652 auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
653 logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
654 " (client does not support all server features)",
655 conn, ident_missing.features());
656 abort_in_fault();
657 return next_step_t::none;
658 });
659 case Tag::WAIT:
660 return process_wait();
661 case Tag::SERVER_IDENT:
662 return read_frame_payload().then([this] {
663 // handle_server_ident() logic
664 requeue_sent();
665 auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
666 logger().debug("{} GOT ServerIdentFrame:"
667 " addrs={}, gid={}, gs={},"
668 " features_supported={}, features_required={},"
669 " flags={}, cookie={}",
670 conn,
671 server_ident.addrs(), server_ident.gid(),
672 server_ident.global_seq(),
673 server_ident.supported_features(),
674 server_ident.required_features(),
675 server_ident.flags(), server_ident.cookie());
676
677 // is this who we intended to talk to?
678 // be a bit forgiving here, since we may be connecting based on addresses parsed out
679 // of mon_host or something.
680 if (!server_ident.addrs().contains(conn.target_addr)) {
681 logger().warn("{} peer identifies as {}, does not include {}",
682 conn, server_ident.addrs(), conn.target_addr);
683 throw std::system_error(
684 make_error_code(crimson::net::error::bad_peer_address));
685 }
686
687 server_cookie = server_ident.cookie();
688
689 // TODO: change peer_addr to entity_addrvec_t
690 if (server_ident.addrs().front() != conn.peer_addr) {
691 logger().warn("{} peer advertises as {}, does not match {}",
692 conn, server_ident.addrs(), conn.peer_addr);
693 throw std::system_error(
694 make_error_code(crimson::net::error::bad_peer_address));
695 }
696 if (conn.get_peer_id() != entity_name_t::NEW &&
697 conn.get_peer_id() != server_ident.gid()) {
698 logger().error("{} connection peer id ({}) does not match "
699 "what it should be ({}) during connecting, close",
700 conn, server_ident.gid(), conn.get_peer_id());
701 abort_in_close(*this, true);
702 }
703 conn.set_peer_id(server_ident.gid());
704 conn.set_features(server_ident.supported_features() &
705 conn.policy.features_supported);
706 peer_global_seq = server_ident.global_seq();
707
708 bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
709 if (lossy != conn.policy.lossy) {
710 logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy);
711 conn.policy.lossy = lossy;
712 }
713 if (lossy && (connect_seq != 0 || server_cookie != 0)) {
714 logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
715 conn, connect_seq, server_cookie);
716 connect_seq = 0;
717 server_cookie = 0;
718 }
719
720 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
721 });
722 default: {
723 unexpected_tag(tag, conn, "post_client_connect");
724 return seastar::make_ready_future<next_step_t>(next_step_t::none);
725 }
726 }
727 });
728 }
729
730 seastar::future<ProtocolV2::next_step_t>
731 ProtocolV2::client_reconnect()
732 {
733 // send_reconnect() logic
734 auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
735 client_cookie,
736 server_cookie,
737 global_seq,
738 connect_seq,
739 conn.in_seq);
740 logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
741 " server_cookie={}, gs={}, cs={}, msg_seq={}",
742 conn, messenger.get_myaddrs(),
743 client_cookie, server_cookie,
744 global_seq, connect_seq, conn.in_seq);
745 return write_frame(reconnect).then([this] {
746 return read_main_preamble();
747 }).then([this] (Tag tag) {
748 switch (tag) {
749 case Tag::SESSION_RETRY_GLOBAL:
750 return read_frame_payload().then([this] {
751 // handle_session_retry_global() logic
752 auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
753 logger().warn("{} GOT RetryGlobalFrame: gs={}",
754 conn, retry.global_seq());
755 return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
756 global_seq = gs;
757 logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
758 return client_reconnect();
759 });
760 });
761 case Tag::SESSION_RETRY:
762 return read_frame_payload().then([this] {
763 // handle_session_retry() logic
764 auto retry = RetryFrame::Decode(rx_segments_data.back());
765 logger().warn("{} GOT RetryFrame: cs={}",
766 conn, retry.connect_seq());
767 connect_seq = retry.connect_seq() + 1;
768 logger().warn("{} UPDATE: cs={}", conn, connect_seq);
769 return client_reconnect();
770 });
771 case Tag::SESSION_RESET:
772 return read_frame_payload().then([this] {
773 // handle_session_reset() logic
774 auto reset = ResetFrame::Decode(rx_segments_data.back());
775 logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
776 reset_session(reset.full());
777 return client_connect();
778 });
779 case Tag::WAIT:
780 return process_wait();
781 case Tag::SESSION_RECONNECT_OK:
782 return read_frame_payload().then([this] {
783 // handle_reconnect_ok() logic
784 auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
785 logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
786 conn, reconnect_ok.msg_seq());
787 requeue_up_to(reconnect_ok.msg_seq());
788 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
789 });
790 default: {
791 unexpected_tag(tag, conn, "post_client_reconnect");
792 return seastar::make_ready_future<next_step_t>(next_step_t::none);
793 }
794 }
795 });
796 }
797
798 void ProtocolV2::execute_connecting()
799 {
800 trigger_state(state_t::CONNECTING, write_state_t::delay, true);
801 if (socket) {
802 socket->shutdown();
803 }
804 gated_execute("execute_connecting", [this] {
805 return messenger.get_global_seq().then([this] (auto gs) {
806 global_seq = gs;
807 assert(client_cookie != 0);
808 if (!conn.policy.lossy && server_cookie != 0) {
809 ++connect_seq;
810 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
811 conn, global_seq, connect_seq);
812 } else { // conn.policy.lossy || server_cookie == 0
813 assert(connect_seq == 0);
814 assert(server_cookie == 0);
815 logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
816 }
817
818 return wait_write_exit();
819 }).then([this] {
820 if (unlikely(state != state_t::CONNECTING)) {
821 logger().debug("{} triggered {} before Socket::connect()",
822 conn, get_state_name(state));
823 abort_protocol();
824 }
825 if (socket) {
826 gate.dispatch_in_background("close_sockect_connecting", *this,
827 [sock = std::move(socket)] () mutable {
828 return sock->close().then([sock = std::move(sock)] {});
829 });
830 }
831 INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
832 return Socket::connect(conn.peer_addr);
833 }).then([this](SocketRef sock) {
834 logger().debug("{} socket connected", conn);
835 if (unlikely(state != state_t::CONNECTING)) {
836 logger().debug("{} triggered {} during Socket::connect()",
837 conn, get_state_name(state));
838 return sock->close().then([sock = std::move(sock)] {
839 abort_protocol();
840 });
841 }
842 socket = std::move(sock);
843 return seastar::now();
844 }).then([this] {
845 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
846 session_stream_handlers = { nullptr, nullptr };
847 enable_recording();
848 return banner_exchange(true);
849 }).then([this] (auto&& ret) {
850 auto [_peer_type, _my_addr_from_peer] = std::move(ret);
851 if (conn.get_peer_type() != _peer_type) {
852 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
853 conn, ceph_entity_type_name(conn.get_peer_type()),
854 ceph_entity_type_name(_peer_type));
855 abort_in_close(*this, true);
856 }
857 if (unlikely(state != state_t::CONNECTING)) {
858 logger().debug("{} triggered {} during banner_exchange(), abort",
859 conn, get_state_name(state));
860 abort_protocol();
861 }
862 socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
863 if (unlikely(_my_addr_from_peer.is_legacy())) {
864 logger().warn("{} peer sent a legacy address for me: {}",
865 conn, _my_addr_from_peer);
866 throw std::system_error(
867 make_error_code(crimson::net::error::bad_peer_address));
868 }
869 _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
870 return messenger.learned_addr(_my_addr_from_peer, conn);
871 }).then([this] {
872 return client_auth();
873 }).then([this] {
874 if (server_cookie == 0) {
875 ceph_assert(connect_seq == 0);
876 return client_connect();
877 } else {
878 ceph_assert(connect_seq > 0);
879 return client_reconnect();
880 }
881 }).then([this] (next_step_t next) {
882 if (unlikely(state != state_t::CONNECTING)) {
883 logger().debug("{} triggered {} at the end of execute_connecting()",
884 conn, get_state_name(state));
885 abort_protocol();
886 }
887 switch (next) {
888 case next_step_t::ready: {
889 logger().info("{} connected:"
890 " gs={}, pgs={}, cs={}, client_cookie={},"
891 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
892 conn, global_seq, peer_global_seq, connect_seq,
893 client_cookie, server_cookie, conn.in_seq,
894 conn.out_seq, conn.out_q.size());
895 execute_ready(true);
896 break;
897 }
898 case next_step_t::wait: {
899 logger().info("{} execute_connecting(): going to WAIT", conn);
900 execute_wait(true);
901 break;
902 }
903 default: {
904 ceph_abort("impossible next step");
905 }
906 }
907 }).handle_exception([this] (std::exception_ptr eptr) {
908 if (state != state_t::CONNECTING) {
909 logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
910 conn, get_state_name(state), eptr);
911 assert(state == state_t::CLOSING ||
912 state == state_t::REPLACING);
913 return;
914 }
915
916 if (conn.policy.server ||
917 (conn.policy.standby &&
918 (!is_queued() && conn.sent.empty()))) {
919 logger().info("{} execute_connecting(): fault at {} with nothing to send,"
920 " going to STANDBY -- {}",
921 conn, get_state_name(state), eptr);
922 execute_standby();
923 } else {
924 logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
925 conn, get_state_name(state), eptr);
926 execute_wait(false);
927 }
928 });
929 });
930 }
931
932 // ACCEPTING state
933
934 seastar::future<> ProtocolV2::_auth_bad_method(int r)
935 {
936 // _auth_bad_method() logic
937 ceph_assert(r < 0);
938 auto [allowed_methods, allowed_modes] =
939 messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
940 auto bad_method = AuthBadMethodFrame::Encode(
941 auth_meta->auth_method, r, allowed_methods, allowed_modes);
942 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
943 "allowed_methods={}, allowed_modes={})",
944 conn, auth_meta->auth_method, cpp_strerror(r),
945 allowed_methods, allowed_modes);
946 return write_frame(bad_method).then([this] {
947 return server_auth();
948 });
949 }
950
951 seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
952 {
953 // _handle_auth_request() logic
954 ceph_assert(messenger.get_auth_server());
955 bufferlist reply;
956 int r = messenger.get_auth_server()->handle_auth_request(
957 conn.shared_from_this(), auth_meta,
958 more, auth_meta->auth_method, auth_payload,
959 &reply);
960 switch (r) {
961 // successful
962 case 1: {
963 auto auth_done = AuthDoneFrame::Encode(
964 conn.peer_global_id, auth_meta->con_mode, reply);
965 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
966 conn, conn.peer_global_id,
967 ceph_con_mode_name(auth_meta->con_mode), reply.length());
968 return write_frame(auth_done).then([this] {
969 ceph_assert(auth_meta);
970 session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
971 nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), true);
972 return finish_auth();
973 });
974 }
975 // auth more
976 case 0: {
977 auto more = AuthReplyMoreFrame::Encode(reply);
978 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
979 conn, reply.length());
980 return write_frame(more).then([this] {
981 return read_main_preamble();
982 }).then([this] (Tag tag) {
983 expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
984 return read_frame_payload();
985 }).then([this] {
986 auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
987 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
988 conn, auth_more.auth_payload().length());
989 return _handle_auth_request(auth_more.auth_payload(), true);
990 });
991 }
992 case -EBUSY: {
993 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn);
994 abort_in_fault();
995 return seastar::now();
996 }
997 default: {
998 logger().warn("{} auth_server handle_auth_request returned {}", conn, r);
999 return _auth_bad_method(r);
1000 }
1001 }
1002 }
1003
1004 seastar::future<> ProtocolV2::server_auth()
1005 {
1006 return read_main_preamble()
1007 .then([this] (Tag tag) {
1008 expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
1009 return read_frame_payload();
1010 }).then([this] {
1011 // handle_auth_request() logic
1012 auto request = AuthRequestFrame::Decode(rx_segments_data.back());
1013 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
1014 " payload_len={}",
1015 conn, request.method(), request.preferred_modes(),
1016 request.auth_payload().length());
1017 auth_meta->auth_method = request.method();
1018 auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode(
1019 conn.get_peer_type(), auth_meta->auth_method,
1020 request.preferred_modes());
1021 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
1022 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn);
1023 return _auth_bad_method(-EOPNOTSUPP);
1024 }
1025 return _handle_auth_request(request.auth_payload(), false);
1026 });
1027 }
1028
1029 bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const
1030 {
1031 auto my_peer_name = conn.get_peer_name();
1032 if (my_peer_name.type() != peer_name.type()) {
1033 return false;
1034 }
1035 if (my_peer_name.num() != entity_name_t::NEW &&
1036 peer_name.num() != entity_name_t::NEW &&
1037 my_peer_name.num() != peer_name.num()) {
1038 return false;
1039 }
1040 return true;
1041 }
1042
1043 seastar::future<ProtocolV2::next_step_t>
1044 ProtocolV2::send_wait()
1045 {
1046 auto wait = WaitFrame::Encode();
1047 logger().debug("{} WRITE WaitFrame", conn);
1048 return write_frame(wait).then([] {
1049 return next_step_t::wait;
1050 });
1051 }
1052
1053 seastar::future<ProtocolV2::next_step_t>
1054 ProtocolV2::reuse_connection(
1055 ProtocolV2* existing_proto, bool do_reset,
1056 bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
1057 {
1058 existing_proto->trigger_replacing(reconnect,
1059 do_reset,
1060 std::move(socket),
1061 std::move(auth_meta),
1062 std::move(session_stream_handlers),
1063 peer_global_seq,
1064 client_cookie,
1065 conn.get_peer_name(),
1066 connection_features,
1067 tx_frame_asm.get_is_rev1(),
1068 rx_frame_asm.get_is_rev1(),
1069 conn_seq,
1070 msg_seq);
1071 #ifdef UNIT_TESTS_BUILT
1072 if (conn.interceptor) {
1073 conn.interceptor->register_conn_replaced(conn);
1074 }
1075 #endif
1076 // close this connection because all the necessary information is delivered
1077 // to the exisiting connection, and jump to error handling code to abort the
1078 // current state.
1079 abort_in_close(*this, false);
1080 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1081 }
1082
1083 seastar::future<ProtocolV2::next_step_t>
1084 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
1085 {
1086 // handle_existing_connection() logic
1087 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1088 existing_conn->protocol.get());
1089 ceph_assert(existing_proto);
1090 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1091 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1092 conn, global_seq, peer_global_seq, connect_seq,
1093 client_cookie, server_cookie,
1094 existing_conn, get_state_name(existing_proto->state),
1095 existing_proto->global_seq,
1096 existing_proto->peer_global_seq,
1097 existing_proto->connect_seq,
1098 existing_proto->client_cookie,
1099 existing_proto->server_cookie);
1100
1101 if (!validate_peer_name(existing_conn->get_peer_name())) {
1102 logger().error("{} server_connect: my peer_name doesn't match"
1103 " the existing connection {}, abort", conn, existing_conn);
1104 abort_in_fault();
1105 }
1106
1107 if (existing_proto->state == state_t::REPLACING) {
1108 logger().warn("{} server_connect: racing replace happened while"
1109 " replacing existing connection {}, send wait.",
1110 conn, *existing_conn);
1111 return send_wait();
1112 }
1113
1114 if (existing_proto->peer_global_seq > peer_global_seq) {
1115 logger().warn("{} server_connect:"
1116 " this is a stale connection, because peer_global_seq({})"
1117 " < existing->peer_global_seq({}), close this connection"
1118 " in favor of existing connection {}",
1119 conn, peer_global_seq,
1120 existing_proto->peer_global_seq, *existing_conn);
1121 abort_in_fault();
1122 }
1123
1124 if (existing_conn->policy.lossy) {
1125 // existing connection can be thrown out in favor of this one
1126 logger().warn("{} server_connect:"
1127 " existing connection {} is a lossy channel. Close existing in favor of"
1128 " this connection", conn, *existing_conn);
1129 execute_establishing(existing_conn, true);
1130 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1131 }
1132
1133 if (existing_proto->server_cookie != 0) {
1134 if (existing_proto->client_cookie != client_cookie) {
1135 // Found previous session
1136 // peer has reset and we're going to reuse the existing connection
1137 // by replacing the socket
1138 logger().warn("{} server_connect:"
1139 " found new session (cs={})"
1140 " when existing {} is with stale session (cs={}, ss={}),"
1141 " peer must have reset",
1142 conn, client_cookie,
1143 *existing_conn, existing_proto->client_cookie,
1144 existing_proto->server_cookie);
1145 return reuse_connection(existing_proto, conn.policy.resetcheck);
1146 } else {
1147 // session establishment interrupted between client_ident and server_ident,
1148 // continuing...
1149 logger().warn("{} server_connect: found client session with existing {}"
1150 " matched (cs={}, ss={}), continuing session establishment",
1151 conn, *existing_conn, client_cookie, existing_proto->server_cookie);
1152 return reuse_connection(existing_proto);
1153 }
1154 } else {
1155 // Looks like a connection race: server and client are both connecting to
1156 // each other at the same time.
1157 if (existing_proto->client_cookie != client_cookie) {
1158 if (existing_conn->peer_wins()) {
1159 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1160 " and win, reusing existing {}",
1161 conn, client_cookie, existing_proto->client_cookie, *existing_conn);
1162 return reuse_connection(existing_proto);
1163 } else {
1164 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1165 " and lose to existing {}, ask client to wait",
1166 conn, client_cookie, existing_proto->client_cookie, *existing_conn);
1167 return existing_conn->keepalive().then([this] {
1168 return send_wait();
1169 });
1170 }
1171 } else {
1172 logger().warn("{} server_connect: found client session with existing {}"
1173 " matched (cs={}, ss={}), continuing session establishment",
1174 conn, *existing_conn, client_cookie, existing_proto->server_cookie);
1175 return reuse_connection(existing_proto);
1176 }
1177 }
1178 }
1179
1180 seastar::future<ProtocolV2::next_step_t>
1181 ProtocolV2::server_connect()
1182 {
1183 return read_frame_payload().then([this] {
1184 // handle_client_ident() logic
1185 auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
1186 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1187 " gid={}, gs={}, features_supported={},"
1188 " features_required={}, flags={}, cookie={}",
1189 conn, client_ident.addrs(), client_ident.target_addr(),
1190 client_ident.gid(), client_ident.global_seq(),
1191 client_ident.supported_features(),
1192 client_ident.required_features(),
1193 client_ident.flags(), client_ident.cookie());
1194
1195 if (client_ident.addrs().empty() ||
1196 client_ident.addrs().front() == entity_addr_t()) {
1197 logger().warn("{} oops, client_ident.addrs() is empty", conn);
1198 throw std::system_error(
1199 make_error_code(crimson::net::error::bad_peer_address));
1200 }
1201 if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
1202 logger().warn("{} peer is trying to reach {} which is not us ({})",
1203 conn, client_ident.target_addr(), messenger.get_myaddrs());
1204 throw std::system_error(
1205 make_error_code(crimson::net::error::bad_peer_address));
1206 }
1207 // TODO: change peer_addr to entity_addrvec_t
1208 entity_addr_t paddr = client_ident.addrs().front();
1209 if ((paddr.is_msgr2() || paddr.is_any()) &&
1210 paddr.is_same_host(conn.target_addr)) {
1211 // good
1212 } else {
1213 logger().warn("{} peer's address {} is not v2 or not the same host with {}",
1214 conn, paddr, conn.target_addr);
1215 throw std::system_error(
1216 make_error_code(crimson::net::error::bad_peer_address));
1217 }
1218 conn.peer_addr = paddr;
1219 logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
1220 conn.target_addr = conn.peer_addr;
1221 if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
1222 logger().warn("{} we don't know how to reconnect to peer {}",
1223 conn, conn.target_addr);
1224 throw std::system_error(
1225 make_error_code(crimson::net::error::bad_peer_address));
1226 }
1227
1228 if (conn.get_peer_id() != entity_name_t::NEW &&
1229 conn.get_peer_id() != client_ident.gid()) {
1230 logger().error("{} client_ident peer_id ({}) does not match"
1231 " what it should be ({}) during accepting, abort",
1232 conn, client_ident.gid(), conn.get_peer_id());
1233 abort_in_fault();
1234 }
1235 conn.set_peer_id(client_ident.gid());
1236 client_cookie = client_ident.cookie();
1237
1238 uint64_t feat_missing =
1239 (conn.policy.features_required | msgr2_required) &
1240 ~(uint64_t)client_ident.supported_features();
1241 if (feat_missing) {
1242 auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
1243 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1244 conn, feat_missing);
1245 return write_frame(ident_missing_features).then([] {
1246 return next_step_t::wait;
1247 });
1248 }
1249 connection_features =
1250 client_ident.supported_features() & conn.policy.features_supported;
1251 logger().debug("{} UPDATE: connection_features={}", conn, connection_features);
1252
1253 peer_global_seq = client_ident.global_seq();
1254
1255 // Looks good so far, let's check if there is already an existing connection
1256 // to this peer.
1257
1258 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1259
1260 if (existing_conn) {
1261 if (existing_conn->protocol->proto_type != proto_t::v2) {
1262 logger().warn("{} existing connection {} proto version is {}, close existing",
1263 conn, *existing_conn,
1264 static_cast<int>(existing_conn->protocol->proto_type));
1265 // should unregister the existing from msgr atomically
1266 // NOTE: this is following async messenger logic, but we may miss the reset event.
1267 execute_establishing(existing_conn, false);
1268 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1269 } else {
1270 return handle_existing_connection(existing_conn);
1271 }
1272 } else {
1273 execute_establishing(nullptr, true);
1274 return seastar::make_ready_future<next_step_t>(next_step_t::ready);
1275 }
1276 });
1277 }
1278
1279 seastar::future<ProtocolV2::next_step_t>
1280 ProtocolV2::read_reconnect()
1281 {
1282 return read_main_preamble()
1283 .then([this] (Tag tag) {
1284 expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
1285 return server_reconnect();
1286 });
1287 }
1288
1289 seastar::future<ProtocolV2::next_step_t>
1290 ProtocolV2::send_retry(uint64_t connect_seq)
1291 {
1292 auto retry = RetryFrame::Encode(connect_seq);
1293 logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
1294 return write_frame(retry).then([this] {
1295 return read_reconnect();
1296 });
1297 }
1298
1299 seastar::future<ProtocolV2::next_step_t>
1300 ProtocolV2::send_retry_global(uint64_t global_seq)
1301 {
1302 auto retry = RetryGlobalFrame::Encode(global_seq);
1303 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
1304 return write_frame(retry).then([this] {
1305 return read_reconnect();
1306 });
1307 }
1308
1309 seastar::future<ProtocolV2::next_step_t>
1310 ProtocolV2::send_reset(bool full)
1311 {
1312 auto reset = ResetFrame::Encode(full);
1313 logger().warn("{} WRITE ResetFrame: full={}", conn, full);
1314 return write_frame(reset).then([this] {
1315 return read_main_preamble();
1316 }).then([this] (Tag tag) {
1317 expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
1318 return server_connect();
1319 });
1320 }
1321
1322 seastar::future<ProtocolV2::next_step_t>
1323 ProtocolV2::server_reconnect()
1324 {
1325 return read_frame_payload().then([this] {
1326 // handle_reconnect() logic
1327 auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
1328
1329 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1330 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1331 conn, reconnect.addrs(),
1332 reconnect.client_cookie(), reconnect.server_cookie(),
1333 reconnect.global_seq(), reconnect.connect_seq(),
1334 reconnect.msg_seq());
1335
1336 // can peer_addrs be changed on-the-fly?
1337 // TODO: change peer_addr to entity_addrvec_t
1338 entity_addr_t paddr = reconnect.addrs().front();
1339 if (paddr.is_msgr2() || paddr.is_any()) {
1340 // good
1341 } else {
1342 logger().warn("{} peer's address {} is not v2", conn, paddr);
1343 throw std::system_error(
1344 make_error_code(crimson::net::error::bad_peer_address));
1345 }
1346 if (conn.peer_addr == entity_addr_t()) {
1347 conn.peer_addr = paddr;
1348 } else if (conn.peer_addr != paddr) {
1349 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1350 " reconnect failed",
1351 conn, paddr, conn.peer_addr);
1352 throw std::system_error(
1353 make_error_code(crimson::net::error::bad_peer_address));
1354 }
1355 peer_global_seq = reconnect.global_seq();
1356
1357 SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
1358
1359 if (!existing_conn) {
1360 // there is no existing connection therefore cannot reconnect to previous
1361 // session
1362 logger().warn("{} server_reconnect: no existing connection from address {},"
1363 " reseting client", conn, conn.peer_addr);
1364 return send_reset(true);
1365 }
1366
1367 if (existing_conn->protocol->proto_type != proto_t::v2) {
1368 logger().warn("{} server_reconnect: existing connection {} proto version is {},"
1369 "close existing and reset client.",
1370 conn, *existing_conn,
1371 static_cast<int>(existing_conn->protocol->proto_type));
1372 // NOTE: this is following async messenger logic, but we may miss the reset event.
1373 existing_conn->mark_down();
1374 return send_reset(true);
1375 }
1376
1377 ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
1378 existing_conn->protocol.get());
1379 ceph_assert(existing_proto);
1380 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1381 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1382 conn, global_seq, peer_global_seq, reconnect.connect_seq(),
1383 reconnect.client_cookie(), reconnect.server_cookie(),
1384 existing_conn,
1385 get_state_name(existing_proto->state),
1386 existing_proto->global_seq,
1387 existing_proto->peer_global_seq,
1388 existing_proto->connect_seq,
1389 existing_proto->client_cookie,
1390 existing_proto->server_cookie);
1391
1392 if (!validate_peer_name(existing_conn->get_peer_name())) {
1393 logger().error("{} server_reconnect: my peer_name doesn't match"
1394 " the existing connection {}, abort", conn, existing_conn);
1395 abort_in_fault();
1396 }
1397
1398 if (existing_proto->state == state_t::REPLACING) {
1399 logger().warn("{} server_reconnect: racing replace happened while "
1400 " replacing existing connection {}, retry global.",
1401 conn, *existing_conn);
1402 return send_retry_global(existing_proto->peer_global_seq);
1403 }
1404
1405 if (existing_proto->client_cookie != reconnect.client_cookie()) {
1406 logger().warn("{} server_reconnect:"
1407 " client_cookie mismatch with existing connection {},"
1408 " cc={} rcc={}. I must have reset, reseting client.",
1409 conn, *existing_conn,
1410 existing_proto->client_cookie, reconnect.client_cookie());
1411 return send_reset(conn.policy.resetcheck);
1412 } else if (existing_proto->server_cookie == 0) {
1413 // this happens when:
1414 // - a connects to b
1415 // - a sends client_ident
1416 // - b gets client_ident, sends server_ident and sets cookie X
1417 // - connection fault
1418 // - b reconnects to a with cookie X, connect_seq=1
1419 // - a has cookie==0
1420 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1421 " server_ident with existing connection {}."
1422 " Asking peer to resume session establishment",
1423 conn, existing_proto->client_cookie, *existing_conn);
1424 return send_reset(false);
1425 }
1426
1427 if (existing_proto->peer_global_seq > reconnect.global_seq()) {
1428 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1429 " with existing connection {},"
1430 " ask client to retry global",
1431 conn, existing_proto->peer_global_seq,
1432 reconnect.global_seq(), *existing_conn);
1433 return send_retry_global(existing_proto->peer_global_seq);
1434 }
1435
1436 if (existing_proto->connect_seq > reconnect.connect_seq()) {
1437 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1438 " with existing connection {}, ask client to retry",
1439 conn, reconnect.connect_seq(),
1440 existing_proto->connect_seq, *existing_conn);
1441 return send_retry(existing_proto->connect_seq);
1442 } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
1443 // reconnect race: both peers are sending reconnect messages
1444 if (existing_conn->peer_wins()) {
1445 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1446 " and win, reusing existing {}",
1447 conn, reconnect.connect_seq(), *existing_conn);
1448 return reuse_connection(
1449 existing_proto, false,
1450 true, reconnect.connect_seq(), reconnect.msg_seq());
1451 } else {
1452 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1453 " and lose to existing {}, ask client to wait",
1454 conn, reconnect.connect_seq(), *existing_conn);
1455 return send_wait();
1456 }
1457 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1458 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1459 " reusing existing {}",
1460 conn, existing_proto->connect_seq,
1461 reconnect.connect_seq(), *existing_conn);
1462 return reuse_connection(
1463 existing_proto, false,
1464 true, reconnect.connect_seq(), reconnect.msg_seq());
1465 }
1466 });
1467 }
1468
1469 void ProtocolV2::execute_accepting()
1470 {
1471 trigger_state(state_t::ACCEPTING, write_state_t::none, false);
1472 gate.dispatch_in_background("execute_accepting", *this, [this] {
1473 return seastar::futurize_invoke([this] {
1474 INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
1475 auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
1476 session_stream_handlers = { nullptr, nullptr };
1477 enable_recording();
1478 return banner_exchange(false);
1479 }).then([this] (auto&& ret) {
1480 auto [_peer_type, _my_addr_from_peer] = std::move(ret);
1481 ceph_assert(conn.get_peer_type() == 0);
1482 conn.set_peer_type(_peer_type);
1483
1484 conn.policy = messenger.get_policy(_peer_type);
1485 logger().info("{} UPDATE: peer_type={},"
1486 " policy(lossy={} server={} standby={} resetcheck={})",
1487 conn, ceph_entity_type_name(_peer_type),
1488 conn.policy.lossy, conn.policy.server,
1489 conn.policy.standby, conn.policy.resetcheck);
1490 if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
1491 messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
1492 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1493 conn, _my_addr_from_peer, messenger.get_myaddr());
1494 throw std::system_error(
1495 make_error_code(crimson::net::error::bad_peer_address));
1496 }
1497 return messenger.learned_addr(_my_addr_from_peer, conn);
1498 }).then([this] {
1499 return server_auth();
1500 }).then([this] {
1501 return read_main_preamble();
1502 }).then([this] (Tag tag) {
1503 switch (tag) {
1504 case Tag::CLIENT_IDENT:
1505 return server_connect();
1506 case Tag::SESSION_RECONNECT:
1507 return server_reconnect();
1508 default: {
1509 unexpected_tag(tag, conn, "post_server_auth");
1510 return seastar::make_ready_future<next_step_t>(next_step_t::none);
1511 }
1512 }
1513 }).then([this] (next_step_t next) {
1514 switch (next) {
1515 case next_step_t::ready:
1516 assert(state != state_t::ACCEPTING);
1517 break;
1518 case next_step_t::wait:
1519 if (unlikely(state != state_t::ACCEPTING)) {
1520 logger().debug("{} triggered {} at the end of execute_accepting()",
1521 conn, get_state_name(state));
1522 abort_protocol();
1523 }
1524 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
1525 execute_server_wait();
1526 break;
1527 default:
1528 ceph_abort("impossible next step");
1529 }
1530 }).handle_exception([this] (std::exception_ptr eptr) {
1531 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1532 conn, get_state_name(state), eptr);
1533 close(false);
1534 });
1535 });
1536 }
1537
1538 // CONNECTING or ACCEPTING state
1539
1540 seastar::future<> ProtocolV2::finish_auth()
1541 {
1542 ceph_assert(auth_meta);
1543
1544 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1545 auth_meta->session_key.hmac_sha256(nullptr, rxbuf);
1546 auto sig_frame = AuthSignatureFrame::Encode(sig);
1547 ceph_assert(record_io);
1548 record_io = false;
1549 rxbuf.clear();
1550 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
1551 return write_frame(sig_frame).then([this] {
1552 return read_main_preamble();
1553 }).then([this] (Tag tag) {
1554 expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
1555 return read_frame_payload();
1556 }).then([this] {
1557 // handle_auth_signature() logic
1558 auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
1559 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
1560
1561 const auto actual_tx_sig = auth_meta->session_key.empty() ?
1562 sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
1563 if (sig_frame.signature() != actual_tx_sig) {
1564 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1565 " sig_frame.signature()={}",
1566 conn, actual_tx_sig, sig_frame.signature());
1567 abort_in_fault();
1568 }
1569 txbuf.clear();
1570 });
1571 }
1572
1573 // ESTABLISHING
1574
// Move an accepted connection from ACCEPTING to ESTABLISHING. Steps:
//   - register it with the messenger; if `existing_conn` is given, this is
//     done through the callback passed to that connection's close();
//   - notify dispatchers through ms_handle_accept();
//   - in the background, send the ServerIdentFrame and move to READY.
// `dispatch_reset` is forwarded unchanged to existing_conn's close().
// Send failures while still ESTABLISHING go to fault(false, ...).
void ProtocolV2::execute_establishing(
    SocketConnectionRef existing_conn, bool dispatch_reset) {
  // Only valid from ACCEPTING; abort if another event already changed state.
  if (unlikely(state != state_t::ACCEPTING)) {
    logger().debug("{} triggered {} before execute_establishing()",
                   conn, get_state_name(state));
    abort_protocol();
  }

  // Register this connection with the messenger and drop it from the
  // messenger's accepting bookkeeping (presumably what unaccept_conn() does
  // -- confirm in SocketMessenger).
  auto accept_me = [this] {
    messenger.register_conn(
      seastar::static_pointer_cast<SocketConnection>(
        conn.shared_from_this()));
    messenger.unaccept_conn(
      seastar::static_pointer_cast<SocketConnection>(
        conn.shared_from_this()));
  };

  trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
  if (existing_conn) {
    // The existing connection receives accept_me; presumably it is invoked
    // once the existing connection has been closed/unregistered -- confirm.
    existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
    // Our state may have changed while closing the existing connection; if
    // so, the accept event below must not be delivered.
    if (unlikely(state != state_t::ESTABLISHING)) {
      logger().warn("{} triggered {} during execute_establishing(), "
                    "the accept event will not be delivered!",
                    conn, get_state_name(state));
      abort_protocol();
    }
  } else {
    accept_me();
  }

  dispatchers.ms_handle_accept(
    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));

  gated_execute("execute_establishing", [this] {
    return seastar::futurize_invoke([this] {
      return send_server_ident();
    }).then([this] {
      // Re-check: the state can change while the ServerIdentFrame is written.
      if (unlikely(state != state_t::ESTABLISHING)) {
        logger().debug("{} triggered {} at the end of execute_establishing()",
                       conn, get_state_name(state));
        abort_protocol();
      }
      logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
                    " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
                    conn, global_seq, peer_global_seq, connect_seq,
                    client_cookie, server_cookie, conn.in_seq,
                    conn.out_seq, conn.out_q.size());
      execute_ready(false);
    }).handle_exception([this] (std::exception_ptr eptr) {
      // Aborted because another transition (close or replace) took over:
      // nothing more to do here.
      if (state != state_t::ESTABLISHING) {
        logger().info("{} execute_establishing() protocol aborted at {} -- {}",
                      conn, get_state_name(state), eptr);
        assert(state == state_t::CLOSING ||
               state == state_t::REPLACING);
        return;
      }
      fault(false, "execute_establishing()", eptr);
    });
  });
}
1635
1636 // ESTABLISHING or REPLACING state
1637
1638 seastar::future<>
1639 ProtocolV2::send_server_ident()
1640 {
1641 // send_server_ident() logic
1642
1643 // refered to async-conn v2: not assign gs to global_seq
1644 return messenger.get_global_seq().then([this] (auto gs) {
1645 logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
1646
1647 // this is required for the case when this connection is being replaced
1648 requeue_up_to(0);
1649 conn.in_seq = 0;
1650
1651 if (!conn.policy.lossy) {
1652 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1653 }
1654
1655 uint64_t flags = 0;
1656 if (conn.policy.lossy) {
1657 flags = flags | CEPH_MSG_CONNECT_LOSSY;
1658 }
1659
1660 auto server_ident = ServerIdentFrame::Encode(
1661 messenger.get_myaddrs(),
1662 messenger.get_myname().num(),
1663 gs,
1664 conn.policy.features_supported,
1665 conn.policy.features_required | msgr2_required,
1666 flags,
1667 server_cookie);
1668
1669 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1670 " gs={}, features_supported={}, features_required={},"
1671 " flags={}, cookie={}",
1672 conn, messenger.get_myaddrs(), messenger.get_myname().num(),
1673 gs, conn.policy.features_supported,
1674 conn.policy.features_required | msgr2_required,
1675 flags, server_cookie);
1676
1677 conn.set_features(connection_features);
1678
1679 return write_frame(server_ident);
1680 });
1681 }
1682
1683 // REPLACING state
1684
// Enter REPLACING and, in the background, take over the transport and session
// state that another (newly accepted) connection negotiated; reuse_connection()
// is the caller that hands it over.
// Steps:
//   - shut down the current socket;
//   - wait for the writer to exit and for the current execution to finish;
//     when `do_reset` is set, reset_session(true) runs first;
//   - swap in the new socket, auth meta and stream handlers;
//   - reply ReconnectOkFrame (`reconnect` == true) or ServerIdentFrame
//     (new session), then move to READY.
// new_connect_seq / new_msg_seq are only used when reconnecting. The peer
// name, cookie, features and frame revisions are only adopted for a new
// session.
void ProtocolV2::trigger_replacing(bool reconnect,
                                   bool do_reset,
                                   SocketRef&& new_socket,
                                   AuthConnectionMetaRef&& new_auth_meta,
                                   ceph::crypto::onwire::rxtx_t new_rxtx,
                                   uint64_t new_peer_global_seq,
                                   uint64_t new_client_cookie,
                                   entity_name_t new_peer_name,
                                   uint64_t new_conn_features,
                                   bool tx_is_rev1,
                                   bool rx_is_rev1,
                                   uint64_t new_connect_seq,
                                   uint64_t new_msg_seq)
{
  trigger_state(state_t::REPLACING, write_state_t::delay, false);
  // Stop I/O on the old socket now; it is closed later once replaced.
  if (socket) {
    socket->shutdown();
  }
  dispatchers.ms_handle_accept(
    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
  gate.dispatch_in_background("trigger_replacing", *this,
  [this,
   reconnect,
   do_reset,
   new_socket = std::move(new_socket),
   new_auth_meta = std::move(new_auth_meta),
   new_rxtx = std::move(new_rxtx),
   tx_is_rev1, rx_is_rev1,
   new_client_cookie, new_peer_name,
   new_conn_features, new_peer_global_seq,
   new_connect_seq, new_msg_seq] () mutable {
    // Wait for the writer and the current execution before touching state.
    return wait_write_exit().then([this, do_reset] {
      if (do_reset) {
        reset_session(true);
      }
      protocol_timer.cancel();
      return execution_done.get_future();
    }).then([this,
             reconnect,
             new_socket = std::move(new_socket),
             new_auth_meta = std::move(new_auth_meta),
             new_rxtx = std::move(new_rxtx),
             tx_is_rev1, rx_is_rev1,
             new_client_cookie, new_peer_name,
             new_conn_features, new_peer_global_seq,
             new_connect_seq, new_msg_seq] () mutable {
      // Replaced/closed while waiting: discard the new socket and abort.
      if (unlikely(state != state_t::REPLACING)) {
        return new_socket->close().then([sock = std::move(new_socket)] {
          abort_protocol();
        });
      }

      // Close the old socket in the background; the lambda owns it until
      // close() completes.
      if (socket) {
        gate.dispatch_in_background("close_socket_replacing", *this,
                                    [sock = std::move(socket)] () mutable {
          return sock->close().then([sock = std::move(sock)] {});
        });
      }
      socket = std::move(new_socket);
      auth_meta = std::move(new_auth_meta);
      session_stream_handlers = std::move(new_rxtx);
      record_io = false;
      peer_global_seq = new_peer_global_seq;

      if (reconnect) {
        connect_seq = new_connect_seq;
        // send_reconnect_ok() logic
        // Requeue from the client's reported msg_seq, then reply with our
        // in_seq.
        requeue_up_to(new_msg_seq);
        auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
        return write_frame(reconnect_ok);
      } else {
        // New session: adopt the identity negotiated on the new connection.
        client_cookie = new_client_cookie;
        assert(conn.get_peer_type() == new_peer_name.type());
        if (conn.get_peer_id() == entity_name_t::NEW) {
          conn.set_peer_id(new_peer_name.num());
        }
        connection_features = new_conn_features;
        tx_frame_asm.set_is_rev1(tx_is_rev1);
        rx_frame_asm.set_is_rev1(rx_is_rev1);
        return send_server_ident();
      }
    }).then([this, reconnect] {
      if (unlikely(state != state_t::REPLACING)) {
        logger().debug("{} triggered {} at the end of trigger_replacing()",
                       conn, get_state_name(state));
        abort_protocol();
      }
      logger().info("{} replaced ({}):"
                    " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
                    " in_seq={}, out_seq={}, out_q={}",
                    conn, reconnect ? "reconnected" : "connected",
                    global_seq, peer_global_seq, connect_seq, client_cookie,
                    server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
      execute_ready(false);
    }).handle_exception([this] (std::exception_ptr eptr) {
      // Aborted because the connection was closed meanwhile.
      if (state != state_t::REPLACING) {
        logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
                      conn, get_state_name(state), eptr);
        assert(state == state_t::CLOSING);
        return;
      }
      fault(true, "trigger_replacing()", eptr);
    });
  });
}
1791
1792 // READY state
1793
1794 ceph::bufferlist ProtocolV2::do_sweep_messages(
1795 const std::deque<MessageRef>& msgs,
1796 size_t num_msgs,
1797 bool require_keepalive,
1798 std::optional<utime_t> _keepalive_ack,
1799 bool require_ack)
1800 {
1801 ceph::bufferlist bl;
1802
1803 if (unlikely(require_keepalive)) {
1804 auto keepalive_frame = KeepAliveFrame::Encode();
1805 bl.append(keepalive_frame.get_buffer(tx_frame_asm));
1806 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
1807 }
1808
1809 if (unlikely(_keepalive_ack.has_value())) {
1810 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
1811 bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
1812 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
1813 }
1814
1815 if (require_ack && !num_msgs) {
1816 auto ack_frame = AckFrame::Encode(conn.in_seq);
1817 bl.append(ack_frame.get_buffer(tx_frame_asm));
1818 INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
1819 }
1820
1821 std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
1822 // TODO: move to common code
1823 // set priority
1824 msg->get_header().src = messenger.get_myname();
1825
1826 msg->encode(conn.features, 0);
1827
1828 ceph_assert(!msg->get_seq() && "message already has seq");
1829 msg->set_seq(++conn.out_seq);
1830
1831 ceph_msg_header &header = msg->get_header();
1832 ceph_msg_footer &footer = msg->get_footer();
1833
1834 ceph_msg_header2 header2{header.seq, header.tid,
1835 header.type, header.priority,
1836 header.version,
1837 init_le32(0), header.data_off,
1838 init_le64(conn.in_seq),
1839 footer.flags, header.compat_version,
1840 header.reserved};
1841
1842 auto message = MessageFrame::Encode(header2,
1843 msg->get_payload(), msg->get_middle(), msg->get_data());
1844 logger().debug("{} --> #{} === {} ({})",
1845 conn, msg->get_seq(), *msg, msg->get_type());
1846 bl.append(message.get_buffer(tx_frame_asm));
1847 INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
1848 });
1849
1850 return bl;
1851 }
1852
1853 seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
1854 {
1855 return read_frame_payload()
1856 .then([this, throttle_stamp] {
1857 utime_t recv_stamp{seastar::lowres_system_clock::now()};
1858
1859 // we need to get the size before std::moving segments data
1860 const size_t cur_msg_size = get_current_msg_size();
1861 auto msg_frame = MessageFrame::Decode(rx_segments_data);
1862 // XXX: paranoid copy just to avoid oops
1863 ceph_msg_header2 current_header = msg_frame.header();
1864
1865 logger().trace("{} got {} + {} + {} byte message,"
1866 " envelope type={} src={} off={} seq={}",
1867 conn, msg_frame.front_len(), msg_frame.middle_len(),
1868 msg_frame.data_len(), current_header.type, conn.get_peer_name(),
1869 current_header.data_off, current_header.seq);
1870
1871 ceph_msg_header header{current_header.seq,
1872 current_header.tid,
1873 current_header.type,
1874 current_header.priority,
1875 current_header.version,
1876 init_le32(msg_frame.front_len()),
1877 init_le32(msg_frame.middle_len()),
1878 init_le32(msg_frame.data_len()),
1879 current_header.data_off,
1880 conn.get_peer_name(),
1881 current_header.compat_version,
1882 current_header.reserved,
1883 init_le32(0)};
1884 ceph_msg_footer footer{init_le32(0), init_le32(0),
1885 init_le32(0), init_le64(0), current_header.flags};
1886
1887 auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
1888 conn.shared_from_this());
1889 Message *message = decode_message(nullptr, 0, header, footer,
1890 msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
1891 if (!message) {
1892 logger().warn("{} decode message failed", conn);
1893 abort_in_fault();
1894 }
1895
1896 // store reservation size in message, so we don't get confused
1897 // by messages entering the dispatch queue through other paths.
1898 message->set_dispatch_throttle_size(cur_msg_size);
1899
1900 message->set_throttle_stamp(throttle_stamp);
1901 message->set_recv_stamp(recv_stamp);
1902 message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
1903
1904 // check received seq#. if it is old, drop the message.
1905 // note that incoming messages may skip ahead. this is convenient for the
1906 // client side queueing because messages can't be renumbered, but the (kernel)
1907 // client will occasionally pull a message out of the sent queue to send
1908 // elsewhere. in that case it doesn't matter if we "got" it or not.
1909 uint64_t cur_seq = conn.in_seq;
1910 if (message->get_seq() <= cur_seq) {
1911 logger().error("{} got old message {} <= {} {}, discarding",
1912 conn, message->get_seq(), cur_seq, *message);
1913 if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
1914 local_conf()->ms_die_on_old_message) {
1915 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1916 }
1917 return seastar::now();
1918 } else if (message->get_seq() > cur_seq + 1) {
1919 logger().error("{} missed message? skipped from seq {} to {}",
1920 conn, cur_seq, message->get_seq());
1921 if (local_conf()->ms_die_on_skipped_message) {
1922 ceph_assert(0 == "skipped incoming seq");
1923 }
1924 }
1925
1926 // note last received message.
1927 conn.in_seq = message->get_seq();
1928 logger().debug("{} <== #{} === {} ({})",
1929 conn, message->get_seq(), *message, message->get_type());
1930 notify_ack();
1931 ack_writes(current_header.ack_seq);
1932
1933 // TODO: change MessageRef with seastar::shared_ptr
1934 auto msg_ref = MessageRef{message, false};
1935 // throttle the reading process by the returned future
1936 return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
1937 });
1938 }
1939
// Enter the READY state and start the connection's main read loop.
//
// dispatch_connect: when true, the dispatchers' ms_handle_connect() is
// invoked for this connection before the read loop starts.
//
// The read loop runs inside the gate via gated_execute(). It repeatedly
// reads a frame preamble and handles:
//  - MESSAGE: apply the (byte) policy throttler, then read_message();
//  - ACK: apply the acked sequence via ack_writes();
//  - KEEPALIVE2: schedule a keepalive ack and record the keepalive time;
//  - KEEPALIVE2_ACK: record the peer's keepalive-ack timestamp;
// any other tag is passed to unexpected_tag().
// seastar::keep_doing() only stops on an exception, which is then handled
// at the end of this function.
void ProtocolV2::execute_ready(bool dispatch_connect)
{
  // Unless the policy is lossy, both cookies must be non-zero by now.
  assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
  trigger_state(state_t::READY, write_state_t::open, false);
  if (dispatch_connect) {
    dispatchers.ms_handle_connect(
      seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
  }
#ifdef UNIT_TESTS_BUILT
  if (conn.interceptor) {
    conn.interceptor->register_conn_ready(conn);
  }
#endif
  gated_execute("execute_ready", [this] {
    protocol_timer.cancel();
    return seastar::keep_doing([this] {
      return read_main_preamble()
      .then([this] (Tag tag) {
        switch (tag) {
          case Tag::MESSAGE: {
            return seastar::futurize_invoke([this] {
              // throttle_message() logic
              if (!conn.policy.throttler_messages) {
                return seastar::now();
              }
              // TODO: message throttler
              // NOTE(review): a message-count throttler is not implemented;
              // configuring one trips this assert.
              ceph_assert(false);
              return seastar::now();
            }).then([this] {
              // throttle_bytes() logic
              if (!conn.policy.throttler_bytes) {
                return seastar::now();
              }
              size_t cur_msg_size = get_current_msg_size();
              if (!cur_msg_size) {
                return seastar::now();
              }
              logger().trace("{} wants {} bytes from policy throttler {}/{}",
                             conn, cur_msg_size,
                             conn.policy.throttler_bytes->get_current(),
                             conn.policy.throttler_bytes->get_max());
              // The read loop waits here until the byte throttler grants
              // cur_msg_size bytes.
              return conn.policy.throttler_bytes->get(cur_msg_size);
            }).then([this] {
              // TODO: throttle_dispatch_queue() logic
              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
              return read_message(throttle_stamp);
            });
          }
          case Tag::ACK:
            return read_frame_payload().then([this] {
              // handle_message_ack() logic
              auto ack = AckFrame::Decode(rx_segments_data.back());
              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
              ack_writes(ack.seq());
            });
          case Tag::KEEPALIVE2:
            return read_frame_payload().then([this] {
              // handle_keepalive2() logic
              auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
                             conn, keepalive_frame.timestamp());
              // Echo the peer's timestamp back in a keepalive ack.
              notify_keepalive_ack(keepalive_frame.timestamp());
              conn.set_last_keepalive(seastar::lowres_system_clock::now());
            });
          case Tag::KEEPALIVE2_ACK:
            return read_frame_payload().then([this] {
              // handle_keepalive2_ack() logic
              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
              conn.set_last_keepalive_ack(
                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
                             conn, conn.last_keepalive_ack);
            });
          default: {
            // NOTE(review): unexpected_tag() presumably faults the
            // connection; the now() below is only reached if it returns.
            unexpected_tag(tag, conn, "execute_ready");
            return seastar::now();
          }
        }
      });
    }).handle_exception([this] (std::exception_ptr eptr) {
      // If the state already moved away from READY, a state transition tore
      // the loop down (only REPLACING or CLOSING are expected); otherwise the
      // exception is treated as a connection fault.
      if (state != state_t::READY) {
        logger().info("{} execute_ready(): protocol aborted at {} -- {}",
                      conn, get_state_name(state), eptr);
        assert(state == state_t::REPLACING ||
               state == state_t::CLOSING);
        return;
      }
      fault(false, "execute_ready()", eptr);
    });
  });
}
2031
2032 // STANDBY state
2033
2034 void ProtocolV2::execute_standby()
2035 {
2036 trigger_state(state_t::STANDBY, write_state_t::delay, true);
2037 if (socket) {
2038 socket->shutdown();
2039 }
2040 }
2041
2042 void ProtocolV2::notify_write()
2043 {
2044 if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
2045 logger().info("{} notify_write(): at {}, going to CONNECTING",
2046 conn, get_state_name(state));
2047 execute_connecting();
2048 }
2049 }
2050
2051 // WAIT state
2052
2053 void ProtocolV2::execute_wait(bool max_backoff)
2054 {
2055 trigger_state(state_t::WAIT, write_state_t::delay, true);
2056 if (socket) {
2057 socket->shutdown();
2058 }
2059 gated_execute("execute_wait", [this, max_backoff] {
2060 double backoff = protocol_timer.last_dur();
2061 if (max_backoff) {
2062 backoff = local_conf().get_val<double>("ms_max_backoff");
2063 } else if (backoff > 0) {
2064 backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
2065 } else {
2066 backoff = local_conf().get_val<double>("ms_initial_backoff");
2067 }
2068 return protocol_timer.backoff(backoff).then([this] {
2069 if (unlikely(state != state_t::WAIT)) {
2070 logger().debug("{} triggered {} at the end of execute_wait()",
2071 conn, get_state_name(state));
2072 abort_protocol();
2073 }
2074 logger().info("{} execute_wait(): going to CONNECTING", conn);
2075 execute_connecting();
2076 }).handle_exception([this] (std::exception_ptr eptr) {
2077 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
2078 conn, get_state_name(state), eptr);
2079 assert(state == state_t::REPLACING ||
2080 state == state_t::CLOSING);
2081 });
2082 });
2083 }
2084
2085 // SERVER_WAIT state
2086
2087 void ProtocolV2::execute_server_wait()
2088 {
2089 trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
2090 gated_execute("execute_server_wait", [this] {
2091 return read_exactly(1).then([this] (auto bl) {
2092 logger().warn("{} SERVER_WAIT got read, abort", conn);
2093 abort_in_fault();
2094 }).handle_exception([this] (std::exception_ptr eptr) {
2095 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
2096 conn, get_state_name(state), eptr);
2097 close(false);
2098 });
2099 });
2100 }
2101
2102 // CLOSING state
2103
2104 void ProtocolV2::trigger_close()
2105 {
2106 messenger.closing_conn(
2107 seastar::static_pointer_cast<SocketConnection>(
2108 conn.shared_from_this()));
2109
2110 if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
2111 messenger.unaccept_conn(
2112 seastar::static_pointer_cast<SocketConnection>(
2113 conn.shared_from_this()));
2114 } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
2115 messenger.unregister_conn(
2116 seastar::static_pointer_cast<SocketConnection>(
2117 conn.shared_from_this()));
2118 } else {
2119 // cannot happen
2120 ceph_assert(false);
2121 }
2122
2123 protocol_timer.cancel();
2124 trigger_state(state_t::CLOSING, write_state_t::drop, false);
2125 }
2126
2127 void ProtocolV2::on_closed()
2128 {
2129 messenger.closed_conn(
2130 seastar::static_pointer_cast<SocketConnection>(
2131 conn.shared_from_this()));
2132 }
2133
// Write a description of this protocol instance to `out`. The protocol is
// identified by its connection, so this just streams `conn`.
void ProtocolV2::print(std::ostream& out) const
{
  out << conn;
}
2138
2139 } // namespace crimson::net