]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "ProtocolV2.h" | |
5 | ||
9f95a23c | 6 | #include <fmt/format.h> |
1e59de90 | 7 | #include <fmt/ranges.h> |
9f95a23c TL |
8 | #include "include/msgr.h" |
9 | #include "include/random.h" | |
1e59de90 | 10 | #include "msg/msg_fmt.h" |
9f95a23c TL |
11 | |
12 | #include "crimson/auth/AuthClient.h" | |
13 | #include "crimson/auth/AuthServer.h" | |
f67539c2 | 14 | #include "crimson/common/formatter.h" |
1e59de90 | 15 | #include "crimson/common/log.h" |
9f95a23c | 16 | |
9f95a23c | 17 | #include "Errors.h" |
9f95a23c TL |
18 | #include "SocketMessenger.h" |
19 | ||
20 | #ifdef UNIT_TESTS_BUILT | |
21 | #include "Interceptor.h" | |
22 | #endif | |
23 | ||
24 | using namespace ceph::msgr::v2; | |
f67539c2 | 25 | using crimson::common::local_conf; |
1e59de90 TL |
26 | using io_state_t = crimson::net::IOHandler::io_state_t; |
27 | using io_stat_printer = crimson::net::IOHandler::io_stat_printer; | |
9f95a23c TL |
28 | |
29 | namespace { | |
30 | ||
20effc67 TL |
31 | // TODO: CEPH_MSGR2_FEATURE_COMPRESSION |
32 | const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES = | |
33 | (CEPH_MSGR2_FEATURE_REVISION_1 | | |
34 | // CEPH_MSGR2_FEATURE_COMPRESSION | | |
35 | UINT64_C(0)); | |
36 | ||
9f95a23c TL |
37 | // Log levels in V2 Protocol: |
38 | // * error level, an error that causes the connection to terminate: | |
39 | // - fatal errors; | |
40 | // - bugs; | |
41 | // * warn level: something unusual that identifies connection fault or replacement: | |
42 | // - unstable network; | |
43 | // - incompatible peer; | |
44 | // - auth failure; | |
45 | // - connection race; | |
46 | // - connection reset; | |
47 | // * info level, something very important to show connection lifecycle, | |
48 | // which doesn't happen very frequently; | |
49 | // * debug level, important logs for debugging, including: | |
50 | // - all the messages sent/received (-->/<==); | |
51 | // - all the frames exchanged (WRITE/GOT); | |
52 | // - important fields updated (UPDATE); | |
53 | // - connection state transitions (TRIGGER); | |
54 | // * trace level, trivial logs showing: | |
55 | // - the exact bytes being sent/received (SEND/RECV(bytes)); | |
56 | // - detailed information of sub-frames; | |
57 | // - integrity checks; | |
58 | // - etc. | |
59 | seastar::logger& logger() { | |
60 | return crimson::get_logger(ceph_subsys_ms); | |
61 | } | |
62 | ||
f67539c2 | 63 | [[noreturn]] void abort_in_fault() { |
9f95a23c TL |
64 | throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); |
65 | } | |
66 | ||
f67539c2 | 67 | [[noreturn]] void abort_protocol() { |
9f95a23c TL |
68 | throw std::system_error(make_error_code(crimson::net::error::protocol_aborted)); |
69 | } | |
70 | ||
1e59de90 TL |
// Calls do_close(is_dispatch_reset) and then abort_protocol().
//
// Wrapped in do { ... } while (0) so the expansion is exactly one statement
// that needs the trailing ';' written at the call site. A bare { ... } block
// followed by ';' breaks an unbraced `if (...) ABORT_IN_CLOSE(x); else ...`.
#define ABORT_IN_CLOSE(is_dispatch_reset) do {  \
  do_close(is_dispatch_reset);                  \
  abort_protocol();                             \
} while (0)
75 | ||
76 | inline void expect_tag(const Tag& expected, | |
77 | const Tag& actual, | |
78 | crimson::net::SocketConnection& conn, | |
79 | const char *where) { | |
80 | if (actual != expected) { | |
81 | logger().warn("{} {} received wrong tag: {}, expected {}", | |
82 | conn, where, | |
83 | static_cast<uint32_t>(actual), | |
84 | static_cast<uint32_t>(expected)); | |
85 | abort_in_fault(); | |
86 | } | |
87 | } | |
88 | ||
89 | inline void unexpected_tag(const Tag& unexpected, | |
90 | crimson::net::SocketConnection& conn, | |
91 | const char *where) { | |
92 | logger().warn("{} {} received unexpected tag: {}", | |
93 | conn, where, static_cast<uint32_t>(unexpected)); | |
94 | abort_in_fault(); | |
95 | } | |
96 | ||
97 | inline uint64_t generate_client_cookie() { | |
98 | return ceph::util::generate_random_number<uint64_t>( | |
99 | 1, std::numeric_limits<uint64_t>::max()); | |
100 | } | |
101 | ||
102 | } // namespace anonymous | |
103 | ||
9f95a23c TL |
104 | namespace crimson::net { |
105 | ||
#ifdef UNIT_TESTS_BUILT
// should be consistent to intercept_frame() in FrameAssemblerV2.cc
//
// When an interceptor is installed, asks it what to do at breakpoint `bp`
// and arms the socket trap of the given type with that action.
// Does nothing when `interceptor` is null.
void intercept(Breakpoint bp,
               bp_type_t type,
               SocketConnection& conn,
               Interceptor *interceptor,
               SocketRef& socket) {
  if (!interceptor) {
    return;
  }
  auto action = interceptor->intercept(conn, Breakpoint(bp));
  socket->set_trap(type, action, &interceptor->blocker);
}

// Expects a variable named `conn` to be in scope at the expansion site.
#define INTERCEPT_CUSTOM(bp, type)           \
  intercept({bp}, type, conn,                \
            conn.interceptor, conn.socket)
#else
// Interception is compiled out unless UNIT_TESTS_BUILT is defined.
#define INTERCEPT_CUSTOM(bp, type)
#endif
125 | ||
126 | seastar::future<> ProtocolV2::Timer::backoff(double seconds) | |
127 | { | |
128 | logger().warn("{} waiting {} seconds ...", conn, seconds); | |
129 | cancel(); | |
130 | last_dur_ = seconds; | |
131 | as = seastar::abort_source(); | |
132 | auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>( | |
133 | std::chrono::duration<double>(seconds)); | |
134 | return seastar::sleep_abortable(dur, *as | |
135 | ).handle_exception_type([this] (const seastar::sleep_aborted& e) { | |
136 | logger().debug("{} wait aborted", conn); | |
137 | abort_protocol(); | |
138 | }); | |
139 | } | |
140 | ||
1e59de90 TL |
141 | ProtocolV2::ProtocolV2(SocketConnection& conn, |
142 | IOHandler &io_handler) | |
143 | : conn{conn}, | |
144 | messenger{conn.messenger}, | |
145 | io_handler{io_handler}, | |
146 | frame_assembler{FrameAssemblerV2::create(conn)}, | |
147 | auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()}, | |
f67539c2 | 148 | protocol_timer{conn} |
9f95a23c TL |
149 | {} |
150 | ||
151 | ProtocolV2::~ProtocolV2() {} | |
152 | ||
153 | void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, | |
f67539c2 | 154 | const entity_name_t& _peer_name) |
9f95a23c TL |
155 | { |
156 | ceph_assert(state == state_t::NONE); | |
f67539c2 | 157 | ceph_assert(!gate.is_closed()); |
9f95a23c TL |
158 | conn.peer_addr = _peer_addr; |
159 | conn.target_addr = _peer_addr; | |
f67539c2 TL |
160 | conn.set_peer_name(_peer_name); |
161 | conn.policy = messenger.get_policy(_peer_name.type()); | |
9f95a23c | 162 | client_cookie = generate_client_cookie(); |
f67539c2 | 163 | logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}" |
9f95a23c | 164 | " policy(lossy={}, server={}, standby={}, resetcheck={})", |
f67539c2 | 165 | conn, _peer_addr, _peer_name, client_cookie, |
9f95a23c TL |
166 | conn.policy.lossy, conn.policy.server, |
167 | conn.policy.standby, conn.policy.resetcheck); | |
168 | messenger.register_conn( | |
169 | seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); | |
170 | execute_connecting(); | |
171 | } | |
172 | ||
1e59de90 | 173 | void ProtocolV2::start_accept(SocketRef&& new_socket, |
9f95a23c TL |
174 | const entity_addr_t& _peer_addr) |
175 | { | |
176 | ceph_assert(state == state_t::NONE); | |
9f95a23c TL |
177 | // until we know better |
178 | conn.target_addr = _peer_addr; | |
1e59de90 TL |
179 | frame_assembler->set_socket(std::move(new_socket)); |
180 | has_socket = true; | |
181 | is_socket_valid = true; | |
9f95a23c TL |
182 | logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr); |
183 | messenger.accept_conn( | |
184 | seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); | |
185 | execute_accepting(); | |
186 | } | |
187 | ||
1e59de90 | 188 | void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant) |
9f95a23c | 189 | { |
1e59de90 TL |
190 | if (!reentrant && new_state == state) { |
191 | logger().error("{} is not allowed to re-trigger state {}", | |
192 | conn, get_state_name(state)); | |
193 | ceph_abort(); | |
194 | } | |
195 | if (state == state_t::CLOSING) { | |
196 | logger().error("{} CLOSING is not allowed to trigger state {}", | |
197 | conn, get_state_name(new_state)); | |
198 | ceph_abort(); | |
199 | } | |
200 | logger().debug("{} TRIGGER {}, was {}", | |
201 | conn, get_state_name(new_state), get_state_name(state)); | |
202 | auto pre_state = state; | |
203 | if (pre_state == state_t::READY) { | |
204 | assert(!gate.is_closed()); | |
205 | ceph_assert_always(!exit_io.has_value()); | |
206 | exit_io = seastar::shared_promise<>(); | |
207 | } | |
208 | state = new_state; | |
209 | if (new_state == state_t::READY) { | |
210 | // I'm not responsible to shutdown the socket at READY | |
211 | is_socket_valid = false; | |
212 | io_handler.set_io_state(new_io_state, std::move(frame_assembler)); | |
9f95a23c | 213 | } else { |
1e59de90 TL |
214 | io_handler.set_io_state(new_io_state, nullptr); |
215 | } | |
9f95a23c | 216 | |
1e59de90 TL |
217 | /* |
218 | * not atomic below | |
219 | */ | |
220 | ||
221 | if (pre_state == state_t::READY) { | |
222 | gate.dispatch_in_background("exit_io", conn, [this] { | |
223 | return io_handler.wait_io_exit_dispatching( | |
224 | ).then([this](FrameAssemblerV2Ref fa) { | |
225 | frame_assembler = std::move(fa); | |
226 | exit_io->set_value(); | |
227 | exit_io = std::nullopt; | |
228 | }); | |
9f95a23c | 229 | }); |
9f95a23c TL |
230 | } |
231 | } | |
232 | ||
1e59de90 TL |
233 | void ProtocolV2::fault( |
234 | state_t expected_state, | |
235 | const char *where, | |
236 | std::exception_ptr eptr) | |
9f95a23c | 237 | { |
1e59de90 TL |
238 | assert(expected_state == state_t::CONNECTING || |
239 | expected_state == state_t::ESTABLISHING || | |
240 | expected_state == state_t::REPLACING || | |
241 | expected_state == state_t::READY); | |
242 | const char *e_what; | |
243 | try { | |
244 | std::rethrow_exception(eptr); | |
245 | } catch (std::exception &e) { | |
246 | e_what = e.what(); | |
9f95a23c | 247 | } |
9f95a23c | 248 | |
1e59de90 TL |
249 | if (state != expected_state) { |
250 | logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}", | |
251 | conn, | |
252 | get_state_name(expected_state), | |
253 | where, | |
254 | get_state_name(state), | |
255 | e_what); | |
256 | #ifndef NDEBUG | |
257 | if (expected_state == state_t::REPLACING) { | |
258 | assert(state == state_t::CLOSING); | |
259 | } else if (expected_state == state_t::READY) { | |
260 | assert(state == state_t::CLOSING || | |
261 | state == state_t::REPLACING || | |
262 | state == state_t::CONNECTING || | |
263 | state == state_t::STANDBY); | |
264 | } else { | |
265 | assert(state == state_t::CLOSING || | |
266 | state == state_t::REPLACING); | |
267 | } | |
268 | #endif | |
269 | return; | |
9f95a23c | 270 | } |
1e59de90 TL |
271 | assert(state == expected_state); |
272 | ||
273 | if (state != state_t::CONNECTING && conn.policy.lossy) { | |
274 | // socket will be shutdown in do_close() | |
275 | logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}", | |
276 | conn, get_state_name(state), where, e_what); | |
277 | do_close(true); | |
278 | return; | |
9f95a23c | 279 | } |
9f95a23c | 280 | |
1e59de90 TL |
281 | if (likely(has_socket)) { |
282 | if (likely(is_socket_valid)) { | |
283 | ceph_assert_always(state != state_t::READY); | |
284 | frame_assembler->shutdown_socket(); | |
285 | is_socket_valid = false; | |
286 | } else { | |
287 | ceph_assert_always(state != state_t::ESTABLISHING); | |
9f95a23c | 288 | } |
1e59de90 TL |
289 | } else { // !has_socket |
290 | ceph_assert_always(state == state_t::CONNECTING); | |
291 | assert(!is_socket_valid); | |
9f95a23c | 292 | } |
9f95a23c | 293 | |
1e59de90 TL |
294 | if (conn.policy.server || |
295 | (conn.policy.standby && !io_handler.is_out_queued_or_sent())) { | |
296 | if (conn.policy.server) { | |
297 | logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}", | |
298 | conn, | |
299 | get_state_name(state), | |
300 | where, | |
301 | io_stat_printer{io_handler}, | |
302 | e_what); | |
303 | } else { | |
304 | logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}", | |
305 | conn, | |
306 | get_state_name(state), | |
307 | where, | |
308 | io_stat_printer{io_handler}, | |
309 | e_what); | |
310 | } | |
9f95a23c | 311 | execute_standby(); |
1e59de90 TL |
312 | } else if (state == state_t::CONNECTING || |
313 | state == state_t::REPLACING) { | |
314 | logger().info("{} protocol {} {} fault, going to WAIT {} -- {}", | |
315 | conn, | |
316 | get_state_name(state), | |
317 | where, | |
318 | io_stat_printer{io_handler}, | |
319 | e_what); | |
9f95a23c TL |
320 | execute_wait(false); |
321 | } else { | |
1e59de90 TL |
322 | assert(state == state_t::READY || |
323 | state == state_t::ESTABLISHING); | |
324 | logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}", | |
325 | conn, | |
326 | get_state_name(state), | |
327 | where, | |
328 | io_stat_printer{io_handler}, | |
329 | e_what); | |
9f95a23c TL |
330 | execute_connecting(); |
331 | } | |
332 | } | |
333 | ||
9f95a23c TL |
334 | void ProtocolV2::reset_session(bool full) |
335 | { | |
336 | server_cookie = 0; | |
337 | connect_seq = 0; | |
9f95a23c TL |
338 | if (full) { |
339 | client_cookie = generate_client_cookie(); | |
340 | peer_global_seq = 0; | |
9f95a23c | 341 | } |
1e59de90 | 342 | io_handler.reset_session(full); |
9f95a23c TL |
343 | } |
344 | ||
f67539c2 TL |
345 | seastar::future<std::tuple<entity_type_t, entity_addr_t>> |
346 | ProtocolV2::banner_exchange(bool is_connect) | |
9f95a23c TL |
347 | { |
348 | // 1. prepare and send banner | |
349 | bufferlist banner_payload; | |
20effc67 | 350 | encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0); |
9f95a23c TL |
351 | encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0); |
352 | ||
353 | bufferlist bl; | |
354 | bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX)); | |
355 | auto len_payload = static_cast<uint16_t>(banner_payload.length()); | |
356 | encode(len_payload, bl, 0); | |
357 | bl.claim_append(banner_payload); | |
358 | logger().debug("{} SEND({}) banner: len_payload={}, supported={}, " | |
359 | "required={}, banner=\"{}\"", | |
360 | conn, bl.length(), len_payload, | |
20effc67 TL |
361 | CRIMSON_MSGR2_SUPPORTED_FEATURES, |
362 | CEPH_MSGR2_REQUIRED_FEATURES, | |
9f95a23c TL |
363 | CEPH_BANNER_V2_PREFIX); |
364 | INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE); | |
1e59de90 | 365 | return frame_assembler->write_flush(std::move(bl)).then([this] { |
9f95a23c TL |
366 | // 2. read peer banner |
367 | unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16); | |
368 | INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ); | |
1e59de90 | 369 | return frame_assembler->read_exactly(banner_len); // or read exactly? |
9f95a23c TL |
370 | }).then([this] (auto bl) { |
371 | // 3. process peer banner and read banner_payload | |
372 | unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); | |
373 | logger().debug("{} RECV({}) banner: \"{}\"", | |
374 | conn, bl.size(), | |
375 | std::string((const char*)bl.get(), banner_prefix_len)); | |
376 | ||
377 | if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) { | |
378 | if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { | |
379 | logger().warn("{} peer is using V1 protocol", conn); | |
380 | } else { | |
381 | logger().warn("{} peer sent bad banner", conn); | |
382 | } | |
383 | abort_in_fault(); | |
384 | } | |
385 | bl.trim_front(banner_prefix_len); | |
386 | ||
387 | uint16_t payload_len; | |
388 | bufferlist buf; | |
389 | buf.append(buffer::create(std::move(bl))); | |
390 | auto ti = buf.cbegin(); | |
391 | try { | |
392 | decode(payload_len, ti); | |
393 | } catch (const buffer::error &e) { | |
394 | logger().warn("{} decode banner payload len failed", conn); | |
395 | abort_in_fault(); | |
396 | } | |
397 | logger().debug("{} GOT banner: payload_len={}", conn, payload_len); | |
398 | INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ); | |
1e59de90 | 399 | return frame_assembler->read(payload_len); |
f67539c2 | 400 | }).then([this, is_connect] (bufferlist bl) { |
9f95a23c TL |
401 | // 4. process peer banner_payload and send HelloFrame |
402 | auto p = bl.cbegin(); | |
1e59de90 TL |
403 | uint64_t _peer_supported_features; |
404 | uint64_t _peer_required_features; | |
9f95a23c | 405 | try { |
1e59de90 TL |
406 | decode(_peer_supported_features, p); |
407 | decode(_peer_required_features, p); | |
9f95a23c TL |
408 | } catch (const buffer::error &e) { |
409 | logger().warn("{} decode banner payload failed", conn); | |
410 | abort_in_fault(); | |
411 | } | |
412 | logger().debug("{} RECV({}) banner features: supported={} required={}", | |
413 | conn, bl.length(), | |
1e59de90 | 414 | _peer_supported_features, _peer_required_features); |
9f95a23c TL |
415 | |
416 | // Check feature bit compatibility | |
20effc67 | 417 | uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES; |
9f95a23c | 418 | uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES; |
1e59de90 | 419 | if ((required_features & _peer_supported_features) != required_features) { |
9f95a23c TL |
420 | logger().error("{} peer does not support all required features" |
421 | " required={} peer_supported={}", | |
1e59de90 TL |
422 | conn, required_features, _peer_supported_features); |
423 | ABORT_IN_CLOSE(is_connect); | |
9f95a23c | 424 | } |
1e59de90 | 425 | if ((supported_features & _peer_required_features) != _peer_required_features) { |
9f95a23c TL |
426 | logger().error("{} we do not support all peer required features" |
427 | " peer_required={} supported={}", | |
1e59de90 TL |
428 | conn, _peer_required_features, supported_features); |
429 | ABORT_IN_CLOSE(is_connect); | |
9f95a23c | 430 | } |
1e59de90 TL |
431 | peer_supported_features = _peer_supported_features; |
432 | bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); | |
433 | frame_assembler->set_is_rev1(is_rev1); | |
9f95a23c TL |
434 | |
435 | auto hello = HelloFrame::Encode(messenger.get_mytype(), | |
436 | conn.target_addr); | |
437 | logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}", | |
438 | conn, ceph_entity_type_name(messenger.get_mytype()), | |
439 | conn.target_addr); | |
1e59de90 | 440 | return frame_assembler->write_flush_frame(hello); |
9f95a23c TL |
441 | }).then([this] { |
442 | //5. read peer HelloFrame | |
1e59de90 TL |
443 | return frame_assembler->read_main_preamble(); |
444 | }).then([this](auto ret) { | |
445 | expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame"); | |
446 | return frame_assembler->read_frame_payload(); | |
447 | }).then([this](auto payload) { | |
9f95a23c | 448 | // 6. process peer HelloFrame |
1e59de90 | 449 | auto hello = HelloFrame::Decode(payload->back()); |
9f95a23c TL |
450 | logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}", |
451 | conn, ceph_entity_type_name(hello.entity_type()), | |
452 | hello.peer_addr()); | |
f67539c2 TL |
453 | return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>( |
454 | std::make_tuple(hello.entity_type(), hello.peer_addr())); | |
9f95a23c TL |
455 | }); |
456 | } | |
457 | ||
458 | // CONNECTING state | |
459 | ||
460 | seastar::future<> ProtocolV2::handle_auth_reply() | |
461 | { | |
1e59de90 TL |
462 | return frame_assembler->read_main_preamble( |
463 | ).then([this](auto ret) { | |
464 | switch (ret.tag) { | |
9f95a23c | 465 | case Tag::AUTH_BAD_METHOD: |
1e59de90 TL |
466 | return frame_assembler->read_frame_payload( |
467 | ).then([this](auto payload) { | |
9f95a23c | 468 | // handle_auth_bad_method() logic |
1e59de90 | 469 | auto bad_method = AuthBadMethodFrame::Decode(payload->back()); |
9f95a23c TL |
470 | logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, " |
471 | "allowed_methods={}, allowed_modes={}", | |
472 | conn, bad_method.method(), cpp_strerror(bad_method.result()), | |
473 | bad_method.allowed_methods(), bad_method.allowed_modes()); | |
474 | ceph_assert(messenger.get_auth_client()); | |
475 | int r = messenger.get_auth_client()->handle_auth_bad_method( | |
1e59de90 | 476 | conn, *auth_meta, |
9f95a23c TL |
477 | bad_method.method(), bad_method.result(), |
478 | bad_method.allowed_methods(), bad_method.allowed_modes()); | |
479 | if (r < 0) { | |
480 | logger().warn("{} auth_client handle_auth_bad_method returned {}", | |
481 | conn, r); | |
482 | abort_in_fault(); | |
483 | } | |
484 | return client_auth(bad_method.allowed_methods()); | |
485 | }); | |
486 | case Tag::AUTH_REPLY_MORE: | |
1e59de90 TL |
487 | return frame_assembler->read_frame_payload( |
488 | ).then([this](auto payload) { | |
9f95a23c | 489 | // handle_auth_reply_more() logic |
1e59de90 | 490 | auto auth_more = AuthReplyMoreFrame::Decode(payload->back()); |
9f95a23c TL |
491 | logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}", |
492 | conn, auth_more.auth_payload().length()); | |
493 | ceph_assert(messenger.get_auth_client()); | |
494 | // let execute_connecting() take care of the thrown exception | |
495 | auto reply = messenger.get_auth_client()->handle_auth_reply_more( | |
1e59de90 | 496 | conn, *auth_meta, auth_more.auth_payload()); |
9f95a23c TL |
497 | auto more_reply = AuthRequestMoreFrame::Encode(reply); |
498 | logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}", | |
499 | conn, reply.length()); | |
1e59de90 | 500 | return frame_assembler->write_flush_frame(more_reply); |
9f95a23c TL |
501 | }).then([this] { |
502 | return handle_auth_reply(); | |
503 | }); | |
504 | case Tag::AUTH_DONE: | |
1e59de90 TL |
505 | return frame_assembler->read_frame_payload( |
506 | ).then([this](auto payload) { | |
9f95a23c | 507 | // handle_auth_done() logic |
1e59de90 | 508 | auto auth_done = AuthDoneFrame::Decode(payload->back()); |
9f95a23c TL |
509 | logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}", |
510 | conn, auth_done.global_id(), | |
511 | ceph_con_mode_name(auth_done.con_mode()), | |
512 | auth_done.auth_payload().length()); | |
513 | ceph_assert(messenger.get_auth_client()); | |
514 | int r = messenger.get_auth_client()->handle_auth_done( | |
1e59de90 TL |
515 | conn, |
516 | *auth_meta, | |
9f95a23c TL |
517 | auth_done.global_id(), |
518 | auth_done.con_mode(), | |
519 | auth_done.auth_payload()); | |
520 | if (r < 0) { | |
521 | logger().warn("{} auth_client handle_auth_done returned {}", conn, r); | |
522 | abort_in_fault(); | |
523 | } | |
524 | auth_meta->con_mode = auth_done.con_mode(); | |
1e59de90 | 525 | frame_assembler->create_session_stream_handlers(*auth_meta, false); |
9f95a23c TL |
526 | return finish_auth(); |
527 | }); | |
528 | default: { | |
1e59de90 | 529 | unexpected_tag(ret.tag, conn, "handle_auth_reply"); |
9f95a23c TL |
530 | return seastar::now(); |
531 | } | |
532 | } | |
533 | }); | |
534 | } | |
535 | ||
536 | seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods) | |
537 | { | |
538 | // send_auth_request() logic | |
539 | ceph_assert(messenger.get_auth_client()); | |
540 | ||
541 | try { | |
542 | auto [auth_method, preferred_modes, bl] = | |
1e59de90 | 543 | messenger.get_auth_client()->get_auth_request(conn, *auth_meta); |
9f95a23c TL |
544 | auth_meta->auth_method = auth_method; |
545 | auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl); | |
546 | logger().debug("{} WRITE AuthRequestFrame: method={}," | |
547 | " preferred_modes={}, payload_len={}", | |
548 | conn, auth_method, preferred_modes, bl.length()); | |
1e59de90 TL |
549 | return frame_assembler->write_flush_frame(frame |
550 | ).then([this] { | |
9f95a23c TL |
551 | return handle_auth_reply(); |
552 | }); | |
553 | } catch (const crimson::auth::error& e) { | |
1e59de90 TL |
554 | logger().error("{} get_initial_auth_request returned {}", conn, e.what()); |
555 | ABORT_IN_CLOSE(true); | |
9f95a23c TL |
556 | return seastar::now(); |
557 | } | |
558 | } | |
559 | ||
560 | seastar::future<ProtocolV2::next_step_t> | |
561 | ProtocolV2::process_wait() | |
562 | { | |
1e59de90 TL |
563 | return frame_assembler->read_frame_payload( |
564 | ).then([this](auto payload) { | |
9f95a23c TL |
565 | // handle_wait() logic |
566 | logger().debug("{} GOT WaitFrame", conn); | |
1e59de90 | 567 | WaitFrame::Decode(payload->back()); |
9f95a23c TL |
568 | return next_step_t::wait; |
569 | }); | |
570 | } | |
571 | ||
572 | seastar::future<ProtocolV2::next_step_t> | |
573 | ProtocolV2::client_connect() | |
574 | { | |
575 | // send_client_ident() logic | |
576 | uint64_t flags = 0; | |
577 | if (conn.policy.lossy) { | |
578 | flags |= CEPH_MSG_CONNECT_LOSSY; | |
579 | } | |
580 | ||
581 | auto client_ident = ClientIdentFrame::Encode( | |
582 | messenger.get_myaddrs(), | |
583 | conn.target_addr, | |
584 | messenger.get_myname().num(), | |
585 | global_seq, | |
586 | conn.policy.features_supported, | |
587 | conn.policy.features_required | msgr2_required, flags, | |
588 | client_cookie); | |
589 | ||
590 | logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={}," | |
591 | " gs={}, features_supported={}, features_required={}," | |
592 | " flags={}, cookie={}", | |
593 | conn, messenger.get_myaddrs(), conn.target_addr, | |
594 | messenger.get_myname().num(), global_seq, | |
595 | conn.policy.features_supported, | |
596 | conn.policy.features_required | msgr2_required, | |
597 | flags, client_cookie); | |
1e59de90 TL |
598 | return frame_assembler->write_flush_frame(client_ident |
599 | ).then([this] { | |
600 | return frame_assembler->read_main_preamble(); | |
601 | }).then([this](auto ret) { | |
602 | switch (ret.tag) { | |
9f95a23c | 603 | case Tag::IDENT_MISSING_FEATURES: |
1e59de90 TL |
604 | return frame_assembler->read_frame_payload( |
605 | ).then([this](auto payload) { | |
9f95a23c | 606 | // handle_ident_missing_features() logic |
1e59de90 | 607 | auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back()); |
9f95a23c TL |
608 | logger().warn("{} GOT IdentMissingFeaturesFrame: features={}" |
609 | " (client does not support all server features)", | |
610 | conn, ident_missing.features()); | |
611 | abort_in_fault(); | |
612 | return next_step_t::none; | |
613 | }); | |
614 | case Tag::WAIT: | |
615 | return process_wait(); | |
616 | case Tag::SERVER_IDENT: | |
1e59de90 TL |
617 | return frame_assembler->read_frame_payload( |
618 | ).then([this](auto payload) { | |
9f95a23c | 619 | // handle_server_ident() logic |
1e59de90 TL |
620 | io_handler.requeue_out_sent(); |
621 | auto server_ident = ServerIdentFrame::Decode(payload->back()); | |
9f95a23c TL |
622 | logger().debug("{} GOT ServerIdentFrame:" |
623 | " addrs={}, gid={}, gs={}," | |
624 | " features_supported={}, features_required={}," | |
625 | " flags={}, cookie={}", | |
626 | conn, | |
627 | server_ident.addrs(), server_ident.gid(), | |
628 | server_ident.global_seq(), | |
629 | server_ident.supported_features(), | |
630 | server_ident.required_features(), | |
631 | server_ident.flags(), server_ident.cookie()); | |
632 | ||
633 | // is this who we intended to talk to? | |
634 | // be a bit forgiving here, since we may be connecting based on addresses parsed out | |
635 | // of mon_host or something. | |
636 | if (!server_ident.addrs().contains(conn.target_addr)) { | |
637 | logger().warn("{} peer identifies as {}, does not include {}", | |
638 | conn, server_ident.addrs(), conn.target_addr); | |
639 | throw std::system_error( | |
640 | make_error_code(crimson::net::error::bad_peer_address)); | |
641 | } | |
642 | ||
643 | server_cookie = server_ident.cookie(); | |
644 | ||
645 | // TODO: change peer_addr to entity_addrvec_t | |
646 | if (server_ident.addrs().front() != conn.peer_addr) { | |
647 | logger().warn("{} peer advertises as {}, does not match {}", | |
648 | conn, server_ident.addrs(), conn.peer_addr); | |
649 | throw std::system_error( | |
650 | make_error_code(crimson::net::error::bad_peer_address)); | |
651 | } | |
f67539c2 TL |
652 | if (conn.get_peer_id() != entity_name_t::NEW && |
653 | conn.get_peer_id() != server_ident.gid()) { | |
654 | logger().error("{} connection peer id ({}) does not match " | |
655 | "what it should be ({}) during connecting, close", | |
656 | conn, server_ident.gid(), conn.get_peer_id()); | |
1e59de90 | 657 | ABORT_IN_CLOSE(true); |
f67539c2 | 658 | } |
9f95a23c TL |
659 | conn.set_peer_id(server_ident.gid()); |
660 | conn.set_features(server_ident.supported_features() & | |
661 | conn.policy.features_supported); | |
1e59de90 | 662 | logger().debug("{} UPDATE: features={}", conn, conn.get_features()); |
9f95a23c TL |
663 | peer_global_seq = server_ident.global_seq(); |
664 | ||
665 | bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY; | |
666 | if (lossy != conn.policy.lossy) { | |
667 | logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy); | |
668 | conn.policy.lossy = lossy; | |
669 | } | |
670 | if (lossy && (connect_seq != 0 || server_cookie != 0)) { | |
671 | logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy", | |
672 | conn, connect_seq, server_cookie); | |
673 | connect_seq = 0; | |
674 | server_cookie = 0; | |
675 | } | |
676 | ||
677 | return seastar::make_ready_future<next_step_t>(next_step_t::ready); | |
678 | }); | |
679 | default: { | |
1e59de90 | 680 | unexpected_tag(ret.tag, conn, "post_client_connect"); |
9f95a23c TL |
681 | return seastar::make_ready_future<next_step_t>(next_step_t::none); |
682 | } | |
683 | } | |
684 | }); | |
685 | } | |
686 | ||
687 | seastar::future<ProtocolV2::next_step_t> | |
688 | ProtocolV2::client_reconnect() | |
689 | { | |
690 | // send_reconnect() logic | |
691 | auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(), | |
692 | client_cookie, | |
693 | server_cookie, | |
694 | global_seq, | |
695 | connect_seq, | |
1e59de90 | 696 | io_handler.get_in_seq()); |
9f95a23c | 697 | logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={}," |
1e59de90 | 698 | " server_cookie={}, gs={}, cs={}, in_seq={}", |
9f95a23c TL |
699 | conn, messenger.get_myaddrs(), |
700 | client_cookie, server_cookie, | |
1e59de90 TL |
701 | global_seq, connect_seq, io_handler.get_in_seq()); |
702 | return frame_assembler->write_flush_frame(reconnect).then([this] { | |
703 | return frame_assembler->read_main_preamble(); | |
704 | }).then([this](auto ret) { | |
705 | switch (ret.tag) { | |
9f95a23c | 706 | case Tag::SESSION_RETRY_GLOBAL: |
1e59de90 TL |
707 | return frame_assembler->read_frame_payload( |
708 | ).then([this](auto payload) { | |
9f95a23c | 709 | // handle_session_retry_global() logic |
1e59de90 | 710 | auto retry = RetryGlobalFrame::Decode(payload->back()); |
9f95a23c TL |
711 | logger().warn("{} GOT RetryGlobalFrame: gs={}", |
712 | conn, retry.global_seq()); | |
1e59de90 TL |
713 | global_seq = messenger.get_global_seq(retry.global_seq()); |
714 | logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq); | |
715 | return client_reconnect(); | |
9f95a23c TL |
716 | }); |
717 | case Tag::SESSION_RETRY: | |
1e59de90 TL |
718 | return frame_assembler->read_frame_payload( |
719 | ).then([this](auto payload) { | |
9f95a23c | 720 | // handle_session_retry() logic |
1e59de90 | 721 | auto retry = RetryFrame::Decode(payload->back()); |
9f95a23c TL |
722 | logger().warn("{} GOT RetryFrame: cs={}", |
723 | conn, retry.connect_seq()); | |
724 | connect_seq = retry.connect_seq() + 1; | |
725 | logger().warn("{} UPDATE: cs={}", conn, connect_seq); | |
726 | return client_reconnect(); | |
727 | }); | |
728 | case Tag::SESSION_RESET: | |
1e59de90 TL |
729 | return frame_assembler->read_frame_payload( |
730 | ).then([this](auto payload) { | |
731 | if (unlikely(state != state_t::CONNECTING)) { | |
732 | logger().debug("{} triggered {} before reset_session()", | |
733 | conn, get_state_name(state)); | |
734 | abort_protocol(); | |
735 | } | |
9f95a23c | 736 | // handle_session_reset() logic |
1e59de90 | 737 | auto reset = ResetFrame::Decode(payload->back()); |
9f95a23c TL |
738 | logger().warn("{} GOT ResetFrame: full={}", conn, reset.full()); |
739 | reset_session(reset.full()); | |
740 | return client_connect(); | |
741 | }); | |
742 | case Tag::WAIT: | |
743 | return process_wait(); | |
744 | case Tag::SESSION_RECONNECT_OK: | |
1e59de90 TL |
745 | return frame_assembler->read_frame_payload( |
746 | ).then([this](auto payload) { | |
9f95a23c | 747 | // handle_reconnect_ok() logic |
1e59de90 | 748 | auto reconnect_ok = ReconnectOkFrame::Decode(payload->back()); |
9f95a23c TL |
749 | logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", |
750 | conn, reconnect_ok.msg_seq()); | |
1e59de90 | 751 | io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq()); |
9f95a23c TL |
752 | return seastar::make_ready_future<next_step_t>(next_step_t::ready); |
753 | }); | |
754 | default: { | |
1e59de90 | 755 | unexpected_tag(ret.tag, conn, "post_client_reconnect"); |
9f95a23c TL |
756 | return seastar::make_ready_future<next_step_t>(next_step_t::none); |
757 | } | |
758 | } | |
759 | }); | |
760 | } | |
761 | ||
// Enter the CONNECTING state and run the connector-side handshake through
// gated_execute(). The chain below does, in order:
//  1. refresh global_seq; when the policy is not lossy and a server_cookie is
//     already known, bump connect_seq (reconnect case);
//  2. wait_exit_io(), then Socket::connect(conn.peer_addr);
//  3. hand the new socket to frame_assembler;
//  4. banner_exchange(true), check the advertised peer type, record the
//     address the peer reports for us, then client_auth();
//  5. client_connect() when server_cookie == 0, otherwise client_reconnect();
//  6. next_step_t::ready -> io_handler.dispatch_connect() + execute_ready();
//     next_step_t::wait  -> shut the socket down + execute_wait(true).
// Between the asynchronous steps the code re-checks that state is still
// CONNECTING and calls abort_protocol() if it is not.
// Exceptions escaping the chain are passed to fault(state_t::CONNECTING, ...).
762 | void ProtocolV2::execute_connecting() | |
763 | { | |
1e59de90 TL |
764 | ceph_assert_always(!is_socket_valid); |
765 | trigger_state(state_t::CONNECTING, io_state_t::delay, false); | |
766 | gated_execute("execute_connecting", conn, [this] { | |
767 | global_seq = messenger.get_global_seq(); | |
768 | assert(client_cookie != 0); | |
769 | if (!conn.policy.lossy && server_cookie != 0) { | |
770 | ++connect_seq; | |
771 | logger().debug("{} UPDATE: gs={}, cs={} for reconnect", | |
772 | conn, global_seq, connect_seq); | |
773 | } else { // conn.policy.lossy || server_cookie == 0 | |
774 | assert(connect_seq == 0); | |
775 | assert(server_cookie == 0); | |
776 | logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); | |
777 | } | |
778 | return wait_exit_io().then([this] { | |
779 | #ifdef UNIT_TESTS_BUILT | |
780 | // process custom_bp_t::SOCKET_CONNECTING | |
781 | // supports CONTINUE/FAULT/BLOCK | |
782 | if (conn.interceptor) { | |
783 | auto action = conn.interceptor->intercept( | |
784 | conn, {custom_bp_t::SOCKET_CONNECTING}); | |
785 | switch (action) { | |
786 | case bp_action_t::CONTINUE: | |
787 | return seastar::now(); | |
788 | case bp_action_t::FAULT: | |
789 | logger().info("[Test] got FAULT"); | |
790 | abort_in_fault(); | |
// NOTE(review): there is no break/return after abort_in_fault(); this relies
// on abort_in_fault() never returning (presumably it throws) -- confirm.
791 | case bp_action_t::BLOCK: | |
792 | logger().info("[Test] got BLOCK"); | |
793 | return conn.interceptor->blocker.block(); | |
794 | default: | |
795 | ceph_abort("unexpected action from trap"); | |
796 | } | |
797 | } else { | |
798 | return seastar::now(); | |
9f95a23c | 799 | } |
9f95a23c | 800 | }).then([this] { |
1e59de90 TL |
801 | #endif |
// Without UNIT_TESTS_BUILT the lambda opened at wait_exit_io().then(...)
// continues directly here; with it, the interceptor hook above is its own
// continuation and this is the next one.
802 | ceph_assert_always(frame_assembler); | |
9f95a23c TL |
803 | if (unlikely(state != state_t::CONNECTING)) { |
804 | logger().debug("{} triggered {} before Socket::connect()", | |
805 | conn, get_state_name(state)); | |
806 | abort_protocol(); | |
807 | } | |
9f95a23c | 808 | return Socket::connect(conn.peer_addr); |
1e59de90 | 809 | }).then([this](SocketRef new_socket) { |
9f95a23c TL |
810 | logger().debug("{} socket connected", conn); |
811 | if (unlikely(state != state_t::CONNECTING)) { | |
812 | logger().debug("{} triggered {} during Socket::connect()", | |
813 | conn, get_state_name(state)); | |
1e59de90 | 814 | return new_socket->close().then([sock=std::move(new_socket)] { |
9f95a23c TL |
815 | abort_protocol(); |
816 | }); | |
817 | } | |
// No socket yet: give it to frame_assembler via set_socket(). Otherwise the
// replacement is dispatched through the gate with replace_shutdown_socket().
1e59de90 TL |
818 | if (!has_socket) { |
819 | frame_assembler->set_socket(std::move(new_socket)); | |
820 | has_socket = true; | |
821 | } else { | |
822 | gate.dispatch_in_background( | |
823 | "replace_socket_connecting", | |
824 | conn, | |
825 | [this, new_socket=std::move(new_socket)]() mutable { | |
826 | return frame_assembler->replace_shutdown_socket(std::move(new_socket)); | |
827 | } | |
828 | ); | |
829 | } | |
830 | is_socket_valid = true; | |
9f95a23c TL |
831 | return seastar::now(); |
832 | }).then([this] { | |
833 | auth_meta = seastar::make_lw_shared<AuthConnectionMeta>(); | |
1e59de90 TL |
834 | frame_assembler->reset_handlers(); |
835 | frame_assembler->start_recording(); | |
f67539c2 TL |
836 | return banner_exchange(true); |
837 | }).then([this] (auto&& ret) { | |
838 | auto [_peer_type, _my_addr_from_peer] = std::move(ret); | |
9f95a23c TL |
839 | if (conn.get_peer_type() != _peer_type) { |
840 | logger().warn("{} connection peer type does not match what peer advertises {} != {}", | |
841 | conn, ceph_entity_type_name(conn.get_peer_type()), | |
842 | ceph_entity_type_name(_peer_type)); | |
1e59de90 | 843 | ABORT_IN_CLOSE(true); |
9f95a23c | 844 | } |
f67539c2 TL |
845 | if (unlikely(state != state_t::CONNECTING)) { |
846 | logger().debug("{} triggered {} during banner_exchange(), abort", | |
847 | conn, get_state_name(state)); | |
848 | abort_protocol(); | |
849 | } | |
1e59de90 TL |
850 | frame_assembler->learn_socket_ephemeral_port_as_connector( |
851 | _my_addr_from_peer.get_port()); | |
9f95a23c TL |
852 | if (unlikely(_my_addr_from_peer.is_legacy())) { |
853 | logger().warn("{} peer sent a legacy address for me: {}", | |
854 | conn, _my_addr_from_peer); | |
855 | throw std::system_error( | |
856 | make_error_code(crimson::net::error::bad_peer_address)); | |
857 | } | |
858 | _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2); | |
1e59de90 | 859 | messenger.learned_addr(_my_addr_from_peer, conn); |
9f95a23c TL |
860 | return client_auth(); |
861 | }).then([this] { | |
862 | if (server_cookie == 0) { | |
863 | ceph_assert(connect_seq == 0); | |
864 | return client_connect(); | |
865 | } else { | |
866 | ceph_assert(connect_seq > 0); | |
867 | return client_reconnect(); | |
868 | } | |
869 | }).then([this] (next_step_t next) { | |
870 | if (unlikely(state != state_t::CONNECTING)) { | |
871 | logger().debug("{} triggered {} at the end of execute_connecting()", | |
872 | conn, get_state_name(state)); | |
873 | abort_protocol(); | |
874 | } | |
875 | switch (next) { | |
876 | case next_step_t::ready: { | |
1e59de90 TL |
877 | logger().info("{} connected: gs={}, pgs={}, cs={}, " |
878 | "client_cookie={}, server_cookie={}, {}", | |
9f95a23c | 879 | conn, global_seq, peer_global_seq, connect_seq, |
1e59de90 TL |
880 | client_cookie, server_cookie, |
881 | io_stat_printer{io_handler}); | |
882 | io_handler.dispatch_connect(); | |
883 | if (unlikely(state != state_t::CONNECTING)) { | |
884 | logger().debug("{} triggered {} after ms_handle_connect(), abort", | |
885 | conn, get_state_name(state)); | |
886 | abort_protocol(); | |
887 | } | |
888 | execute_ready(); | |
9f95a23c TL |
889 | break; |
890 | } | |
891 | case next_step_t::wait: { | |
1e59de90 TL |
892 | logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn); |
893 | ceph_assert_always(is_socket_valid); | |
894 | frame_assembler->shutdown_socket(); | |
895 | is_socket_valid = false; | |
9f95a23c TL |
896 | execute_wait(true); |
897 | break; | |
898 | } | |
899 | default: { | |
900 | ceph_abort("impossible next step"); | |
901 | } | |
902 | } | |
1e59de90 TL |
903 | }).handle_exception([this](std::exception_ptr eptr) { |
904 | fault(state_t::CONNECTING, "execute_connecting", eptr); | |
9f95a23c TL |
905 | }); |
906 | }); | |
907 | } | |
908 | ||
909 | // ACCEPTING state | |
910 | ||
// Reject the peer's requested auth method and restart server-side auth.
// r: negative error code (asserted r < 0); it is encoded into the
//    AuthBadMethodFrame and logged through cpp_strerror().
// The frame also carries the methods and modes that
// get_supported_auth_methods() returns for the peer's type.
// After the frame is written and flushed, server_auth() is called again.
911 | seastar::future<> ProtocolV2::_auth_bad_method(int r) | |
912 | { | |
913 | // _auth_bad_method() logic | |
914 | ceph_assert(r < 0); | |
915 | auto [allowed_methods, allowed_modes] = | |
916 | messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type()); | |
917 | auto bad_method = AuthBadMethodFrame::Encode( | |
918 | auth_meta->auth_method, r, allowed_methods, allowed_modes); | |
919 | logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, " | |
920 | "allowed_methods={}, allowed_modes={})", | |
921 | conn, auth_meta->auth_method, cpp_strerror(r), | |
922 | allowed_methods, allowed_modes); | |
1e59de90 TL |
923 | return frame_assembler->write_flush_frame(bad_method |
924 | ).then([this] { | |
9f95a23c TL |
925 | return server_auth(); |
926 | }); | |
927 | } | |
928 | ||
// Pass one auth payload to the auth server and act on its return code:
//   1      -> write AuthDoneFrame, create the session stream handlers,
//             then finish_auth();
//   0      -> write AuthReplyMoreFrame, read an AuthRequestMoreFrame and
//             call this function again with more=true;
//   -EBUSY -> abort_in_fault();
//   other  -> _auth_bad_method(r).
// auth_payload: payload decoded from the peer's auth request frame.
// more: forwarded to handle_auth_request(); the callers in this file pass
//       false for the initial AuthRequest and true for follow-up rounds.
929 | seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) | |
930 | { | |
931 | // _handle_auth_request() logic | |
932 | ceph_assert(messenger.get_auth_server()); | |
933 | bufferlist reply; | |
934 | int r = messenger.get_auth_server()->handle_auth_request( | |
1e59de90 TL |
935 | conn, |
936 | *auth_meta, | |
937 | more, | |
938 | auth_meta->auth_method, | |
939 | auth_payload, | |
940 | &conn.peer_global_id, | |
9f95a23c TL |
941 | &reply); |
942 | switch (r) { | |
943 | // successful | |
944 | case 1: { | |
945 | auto auth_done = AuthDoneFrame::Encode( | |
946 | conn.peer_global_id, auth_meta->con_mode, reply); | |
947 | logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}", | |
948 | conn, conn.peer_global_id, | |
949 | ceph_con_mode_name(auth_meta->con_mode), reply.length()); | |
1e59de90 TL |
950 | return frame_assembler->write_flush_frame(auth_done |
951 | ).then([this] { | |
9f95a23c | 952 | ceph_assert(auth_meta); |
1e59de90 | 953 | frame_assembler->create_session_stream_handlers(*auth_meta, true); |
9f95a23c TL |
954 | return finish_auth(); |
955 | }); | |
956 | } | |
957 | // auth more | |
958 | case 0: { | |
// NOTE: this local frame object shadows the bool parameter `more` for the
// rest of this case block.
959 | auto more = AuthReplyMoreFrame::Encode(reply); | |
960 | logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}", | |
961 | conn, reply.length()); | |
1e59de90 TL |
962 | return frame_assembler->write_flush_frame(more |
963 | ).then([this] { | |
964 | return frame_assembler->read_main_preamble(); | |
965 | }).then([this](auto ret) { | |
966 | expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, "read_auth_request_more"); | |
967 | return frame_assembler->read_frame_payload(); | |
968 | }).then([this](auto payload) { | |
969 | auto auth_more = AuthRequestMoreFrame::Decode(payload->back()); | |
9f95a23c TL |
970 | logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}", |
971 | conn, auth_more.auth_payload().length()); | |
972 | return _handle_auth_request(auth_more.auth_payload(), true); | |
973 | }); | |
974 | } | |
975 | case -EBUSY: { | |
976 | logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn); | |
977 | abort_in_fault(); | |
978 | return seastar::now(); | |
979 | } | |
980 | default: { | |
981 | logger().warn("{} auth_server handle_auth_request returned {}", conn, r); | |
982 | return _auth_bad_method(r); | |
983 | } | |
984 | } | |
985 | } | |
986 | ||
// Read the peer's AuthRequestFrame and start server-side authentication.
// The requested method is stored in auth_meta->auth_method and the connection
// mode is chosen by the auth server's pick_con_mode(). If that yields
// CEPH_CON_MODE_UNKNOWN the request is rejected via
// _auth_bad_method(-EOPNOTSUPP); otherwise the payload goes to
// _handle_auth_request() with more=false.
// expect_tag() checks that the incoming tag is AUTH_REQUEST (its behaviour on
// a mismatch is defined elsewhere -- presumably it aborts/throws; confirm).
987 | seastar::future<> ProtocolV2::server_auth() | |
988 | { | |
1e59de90 TL |
989 | return frame_assembler->read_main_preamble( |
990 | ).then([this](auto ret) { | |
991 | expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, "read_auth_request"); | |
992 | return frame_assembler->read_frame_payload(); | |
993 | }).then([this](auto payload) { | |
9f95a23c | 994 | // handle_auth_request() logic |
1e59de90 | 995 | auto request = AuthRequestFrame::Decode(payload->back()); |
9f95a23c TL |
996 | logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={}," |
997 | " payload_len={}", | |
998 | conn, request.method(), request.preferred_modes(), | |
999 | request.auth_payload().length()); | |
1000 | auth_meta->auth_method = request.method(); | |
1001 | auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode( | |
1002 | conn.get_peer_type(), auth_meta->auth_method, | |
1003 | request.preferred_modes()); | |
1004 | if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) { | |
1005 | logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn); | |
1006 | return _auth_bad_method(-EOPNOTSUPP); | |
1007 | } | |
1008 | return _handle_auth_request(request.auth_payload(), false); | |
1009 | }); | |
1010 | } | |
1011 | ||
// Check whether peer_name is compatible with the peer name recorded on this
// connection (conn.get_peer_name()).
// Returns false when the entity types differ, or when neither num is
// entity_name_t::NEW and the two nums differ; returns true otherwise.
f67539c2 TL |
1012 | bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const |
1013 | { | |
1014 | auto my_peer_name = conn.get_peer_name(); | |
1015 | if (my_peer_name.type() != peer_name.type()) { | |
1016 | return false; | |
1017 | } | |
1018 | if (my_peer_name.num() != entity_name_t::NEW && | |
1019 | peer_name.num() != entity_name_t::NEW && | |
1020 | my_peer_name.num() != peer_name.num()) { | |
1021 | return false; | |
1022 | } | |
1023 | return true; | |
1024 | } | |
1025 | ||
// Write and flush a WaitFrame to the peer, then resolve to next_step_t::wait.
9f95a23c TL |
1026 | seastar::future<ProtocolV2::next_step_t> |
1027 | ProtocolV2::send_wait() | |
1028 | { | |
1029 | auto wait = WaitFrame::Encode(); | |
1030 | logger().debug("{} WRITE WaitFrame", conn); | |
1e59de90 TL |
1031 | return frame_assembler->write_flush_frame(wait |
1032 | ).then([] { | |
9f95a23c TL |
1033 | return next_step_t::wait; |
1034 | }); | |
1035 | } | |
1036 | ||
// Hand this accepting connection's negotiated state over to an existing
// connection's protocol and then close this connection.
// existing_proto: protocol object of the connection that will be reused.
// do_reset, reconnect, conn_seq, msg_seq: forwarded unchanged to
//   existing_proto->trigger_replacing().
// Calls abort_protocol() first if state is no longer ACCEPTING.
// After the hand-over (frame_assembler->to_replace(), auth_meta moved out),
// has_socket and is_socket_valid are cleared on this object.
// NOTE(review): per the in-body comment, ABORT_IN_CLOSE(false) jumps to the
// error-handling path, so the final return presumably only satisfies the
// return type -- confirm against the macro's definition.
1037 | seastar::future<ProtocolV2::next_step_t> | |
1038 | ProtocolV2::reuse_connection( | |
1039 | ProtocolV2* existing_proto, bool do_reset, | |
1040 | bool reconnect, uint64_t conn_seq, uint64_t msg_seq) | |
1041 | { | |
1e59de90 TL |
1042 | if (unlikely(state != state_t::ACCEPTING)) { |
1043 | logger().debug("{} triggered {} before trigger_replacing()", | |
1044 | conn, get_state_name(state)); | |
1045 | abort_protocol(); | |
1046 | } | |
1047 | ||
9f95a23c TL |
1048 | existing_proto->trigger_replacing(reconnect, |
1049 | do_reset, | |
1e59de90 | 1050 | frame_assembler->to_replace(), |
9f95a23c | 1051 | std::move(auth_meta), |
9f95a23c TL |
1052 | peer_global_seq, |
1053 | client_cookie, | |
1054 | conn.get_peer_name(), | |
1e59de90 TL |
1055 | conn.get_features(), |
1056 | peer_supported_features, | |
9f95a23c TL |
1057 | conn_seq, |
1058 | msg_seq); | |
1e59de90 TL |
1059 | ceph_assert_always(has_socket && is_socket_valid); |
1060 | is_socket_valid = false; | |
1061 | has_socket = false; | |
9f95a23c TL |
1062 | #ifdef UNIT_TESTS_BUILT |
1063 | if (conn.interceptor) { | |
1064 | conn.interceptor->register_conn_replaced(conn); | |
1065 | } | |
1066 | #endif | |
1067 | // close this connection because all the necessary information is delivered | |
1068 | // to the existing connection, and jump to error handling code to abort the | |
1069 | // current state. | |
1e59de90 | 1070 | ABORT_IN_CLOSE(false); |
9f95a23c TL |
1071 | return seastar::make_ready_future<next_step_t>(next_step_t::none); |
1072 | } | |
1073 | ||
// Decide how to proceed when an incoming ClientIdent collides with a
// connection that is already registered for the same peer address.
// existing_conn: the registered connection; its protocol must be a ProtocolV2
//   (asserted after the dynamic_cast).
// Checks are made in this order:
//  - peer name mismatch                         -> abort_in_fault();
//  - existing is in REPLACING                   -> send_wait();
//  - existing->peer_global_seq > peer_global_seq -> abort_in_fault();
//  - existing policy is lossy                   -> execute_establishing(existing_conn),
//                                                  resolve to next_step_t::ready;
//  - existing->server_cookie != 0:
//      client_cookie differs -> reuse_connection(existing_proto, conn.policy.resetcheck)
//      client_cookie matches -> reuse_connection(existing_proto)
//  - existing->server_cookie == 0 (connection race):
//      client_cookie differs and existing_conn->peer_wins() -> reuse_connection(existing_proto)
//      client_cookie differs otherwise -> send_keepalive() on existing, then send_wait()
//      client_cookie matches           -> reuse_connection(existing_proto)
1074 | seastar::future<ProtocolV2::next_step_t> | |
1075 | ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) | |
1076 | { | |
1077 | // handle_existing_connection() logic | |
1078 | ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>( | |
1079 | existing_conn->protocol.get()); | |
1080 | ceph_assert(existing_proto); | |
1081 | logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting," | |
1082 | " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})", | |
1083 | conn, global_seq, peer_global_seq, connect_seq, | |
1084 | client_cookie, server_cookie, | |
1e59de90 | 1085 | fmt::ptr(existing_conn.get()), get_state_name(existing_proto->state), |
9f95a23c TL |
1086 | existing_proto->global_seq, |
1087 | existing_proto->peer_global_seq, | |
1088 | existing_proto->connect_seq, | |
1089 | existing_proto->client_cookie, | |
1090 | existing_proto->server_cookie); | |
1091 | ||
f67539c2 TL |
1092 | if (!validate_peer_name(existing_conn->get_peer_name())) { |
1093 | logger().error("{} server_connect: my peer_name doesn't match" | |
1e59de90 | 1094 | " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get())); |
f67539c2 TL |
1095 | abort_in_fault(); |
1096 | } | |
1097 | ||
9f95a23c TL |
1098 | if (existing_proto->state == state_t::REPLACING) { |
1099 | logger().warn("{} server_connect: racing replace happened while" | |
1100 | " replacing existing connection {}, send wait.", | |
1101 | conn, *existing_conn); | |
1102 | return send_wait(); | |
1103 | } | |
1104 | ||
1105 | if (existing_proto->peer_global_seq > peer_global_seq) { | |
1106 | logger().warn("{} server_connect:" | |
1107 | " this is a stale connection, because peer_global_seq({})" | |
1108 | " < existing->peer_global_seq({}), close this connection" | |
1109 | " in favor of existing connection {}", | |
1110 | conn, peer_global_seq, | |
1111 | existing_proto->peer_global_seq, *existing_conn); | |
1112 | abort_in_fault(); | |
1113 | } | |
1114 | ||
1115 | if (existing_conn->policy.lossy) { | |
1116 | // existing connection can be thrown out in favor of this one | |
1117 | logger().warn("{} server_connect:" | |
1118 | " existing connection {} is a lossy channel. Close existing in favor of" | |
1119 | " this connection", conn, *existing_conn); | |
1e59de90 TL |
1120 | if (unlikely(state != state_t::ACCEPTING)) { |
1121 | logger().debug("{} triggered {} before execute_establishing()", | |
1122 | conn, get_state_name(state)); | |
1123 | abort_protocol(); | |
1124 | } | |
1125 | execute_establishing(existing_conn); | |
9f95a23c TL |
1126 | return seastar::make_ready_future<next_step_t>(next_step_t::ready); |
1127 | } | |
1128 | ||
1129 | if (existing_proto->server_cookie != 0) { | |
1130 | if (existing_proto->client_cookie != client_cookie) { | |
1131 | // Found previous session | |
1132 | // peer has reset and we're going to reuse the existing connection | |
1133 | // by replacing the socket | |
1134 | logger().warn("{} server_connect:" | |
1135 | " found new session (cs={})" | |
1e59de90 | 1136 | " when existing {} {} is with stale session (cs={}, ss={})," |
9f95a23c | 1137 | " peer must have reset", |
1e59de90 TL |
1138 | conn, |
1139 | client_cookie, | |
1140 | get_state_name(existing_proto->state), | |
1141 | *existing_conn, | |
1142 | existing_proto->client_cookie, | |
9f95a23c TL |
1143 | existing_proto->server_cookie); |
1144 | return reuse_connection(existing_proto, conn.policy.resetcheck); | |
1145 | } else { | |
1146 | // session establishment interrupted between client_ident and server_ident, | |
1147 | // continuing... | |
1e59de90 | 1148 | logger().warn("{} server_connect: found client session with existing {} {}" |
9f95a23c | 1149 | " matched (cs={}, ss={}), continuing session establishment", |
1e59de90 TL |
1150 | conn, |
1151 | get_state_name(existing_proto->state), | |
1152 | *existing_conn, | |
1153 | client_cookie, | |
1154 | existing_proto->server_cookie); | |
9f95a23c TL |
1155 | return reuse_connection(existing_proto); |
1156 | } | |
1157 | } else { | |
1158 | // Looks like a connection race: server and client are both connecting to | |
1159 | // each other at the same time. | |
1160 | if (existing_proto->client_cookie != client_cookie) { | |
1161 | if (existing_conn->peer_wins()) { | |
1e59de90 | 1162 | // acceptor (this connection, the peer) wins |
9f95a23c | 1163 | logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)" |
1e59de90 TL |
1164 | " and win, reusing existing {} {}", |
1165 | conn, | |
1166 | client_cookie, | |
1167 | existing_proto->client_cookie, | |
1168 | get_state_name(existing_proto->state), | |
1169 | *existing_conn); | |
9f95a23c TL |
1170 | return reuse_connection(existing_proto); |
1171 | } else { | |
1e59de90 | 1172 | // acceptor (this connection, the peer) loses |
9f95a23c TL |
1173 | logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)" |
1174 | " and lose to existing {}, ask client to wait", | |
1175 | conn, client_cookie, existing_proto->client_cookie, *existing_conn); | |
1e59de90 | 1176 | return existing_conn->send_keepalive().then([this] { |
9f95a23c TL |
1177 | return send_wait(); |
1178 | }); | |
1179 | } | |
1180 | } else { | |
1e59de90 | 1181 | logger().warn("{} server_connect: found client session with existing {} {}" |
9f95a23c | 1182 | " matched (cs={}, ss={}), continuing session establishment", |
1e59de90 TL |
1183 | conn, |
1184 | get_state_name(existing_proto->state), | |
1185 | *existing_conn, | |
1186 | client_cookie, | |
1187 | existing_proto->server_cookie); | |
9f95a23c TL |
1188 | return reuse_connection(existing_proto); |
1189 | } | |
1190 | } | |
1191 | } | |
1192 | ||
// Acceptor-side handling of a ClientIdentFrame (the preamble has already been
// read by the caller; this reads and decodes the payload).
// Steps:
//  - reject with bad_peer_address when the client's address list is empty or
//    its first entry is a default entity_addr_t, when target_addr is not one
//    of our addresses, or when we would be unable to reconnect to the peer;
//  - record conn.peer_addr / conn.target_addr, the peer id (abort_in_fault()
//    if it contradicts an already-known id) and client_cookie;
//  - if required features are missing on the peer, write an
//    IdentMissingFeaturesFrame and resolve to next_step_t::wait;
//  - store the negotiated features and peer_global_seq;
//  - look up an existing connection for peer_addr: if found, delegate to
//    handle_existing_connection(); otherwise execute_establishing(nullptr)
//    and resolve to next_step_t::ready.
1193 | seastar::future<ProtocolV2::next_step_t> | |
1194 | ProtocolV2::server_connect() | |
1195 | { | |
1e59de90 TL |
1196 | return frame_assembler->read_frame_payload( |
1197 | ).then([this](auto payload) { | |
9f95a23c | 1198 | // handle_client_ident() logic |
1e59de90 | 1199 | auto client_ident = ClientIdentFrame::Decode(payload->back()); |
9f95a23c TL |
1200 | logger().debug("{} GOT ClientIdentFrame: addrs={}, target={}," |
1201 | " gid={}, gs={}, features_supported={}," | |
1202 | " features_required={}, flags={}, cookie={}", | |
1203 | conn, client_ident.addrs(), client_ident.target_addr(), | |
1204 | client_ident.gid(), client_ident.global_seq(), | |
1205 | client_ident.supported_features(), | |
1206 | client_ident.required_features(), | |
1207 | client_ident.flags(), client_ident.cookie()); | |
1208 | ||
1209 | if (client_ident.addrs().empty() || | |
1210 | client_ident.addrs().front() == entity_addr_t()) { | |
1211 | logger().warn("{} oops, client_ident.addrs() is empty", conn); | |
1212 | throw std::system_error( | |
1213 | make_error_code(crimson::net::error::bad_peer_address)); | |
1214 | } | |
1215 | if (!messenger.get_myaddrs().contains(client_ident.target_addr())) { | |
1216 | logger().warn("{} peer is trying to reach {} which is not us ({})", | |
1217 | conn, client_ident.target_addr(), messenger.get_myaddrs()); | |
1218 | throw std::system_error( | |
1219 | make_error_code(crimson::net::error::bad_peer_address)); | |
1220 | } | |
20effc67 | 1221 | conn.peer_addr = client_ident.addrs().front(); |
9f95a23c TL |
1222 | logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr); |
1223 | conn.target_addr = conn.peer_addr; | |
1224 | if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) { | |
1225 | logger().warn("{} we don't know how to reconnect to peer {}", | |
1226 | conn, conn.target_addr); | |
1227 | throw std::system_error( | |
1228 | make_error_code(crimson::net::error::bad_peer_address)); | |
1229 | } | |
1230 | ||
f67539c2 TL |
1231 | if (conn.get_peer_id() != entity_name_t::NEW && |
1232 | conn.get_peer_id() != client_ident.gid()) { | |
1233 | logger().error("{} client_ident peer_id ({}) does not match" | |
1234 | " what it should be ({}) during accepting, abort", | |
1235 | conn, client_ident.gid(), conn.get_peer_id()); | |
1236 | abort_in_fault(); | |
1237 | } | |
9f95a23c TL |
1238 | conn.set_peer_id(client_ident.gid()); |
1239 | client_cookie = client_ident.cookie(); | |
1240 | ||
1241 | uint64_t feat_missing = | |
1242 | (conn.policy.features_required | msgr2_required) & | |
1243 | ~(uint64_t)client_ident.supported_features(); | |
1244 | if (feat_missing) { | |
1245 | auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing); | |
1246 | logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)", | |
1247 | conn, feat_missing); | |
1e59de90 TL |
1248 | return frame_assembler->write_flush_frame(ident_missing_features |
1249 | ).then([] { | |
9f95a23c TL |
1250 | return next_step_t::wait; |
1251 | }); | |
1252 | } | |
1e59de90 TL |
1253 | conn.set_features(client_ident.supported_features() & |
1254 | conn.policy.features_supported); | |
1255 | logger().debug("{} UPDATE: features={}", conn, conn.get_features()); | |
9f95a23c TL |
1256 | |
1257 | peer_global_seq = client_ident.global_seq(); | |
1258 | ||
// A lossy-flag mismatch with the local policy is only logged; the handshake
// continues regardless.
1e59de90 TL |
1259 | bool lossy = client_ident.flags() & CEPH_MSG_CONNECT_LOSSY; |
1260 | if (lossy != conn.policy.lossy) { | |
1261 | logger().warn("{} my lossy policy {} doesn't match client {}, ignore", | |
1262 | conn, conn.policy.lossy, lossy); | |
1263 | } | |
1264 | ||
9f95a23c TL |
1265 | // Looks good so far, let's check if there is already an existing connection |
1266 | // to this peer. | |
1267 | ||
1268 | SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); | |
1269 | ||
1270 | if (existing_conn) { | |
1e59de90 | 1271 | return handle_existing_connection(existing_conn); |
f67539c2 | 1272 | } else { |
1e59de90 TL |
1273 | if (unlikely(state != state_t::ACCEPTING)) { |
1274 | logger().debug("{} triggered {} before execute_establishing()", | |
1275 | conn, get_state_name(state)); | |
1276 | abort_protocol(); | |
1277 | } | |
1278 | execute_establishing(nullptr); | |
f67539c2 | 1279 | return seastar::make_ready_future<next_step_t>(next_step_t::ready); |
9f95a23c | 1280 | } |
9f95a23c TL |
1281 | }); |
1282 | } | |
1283 | ||
// Read the next frame preamble, require its tag to be SESSION_RECONNECT
// (checked by expect_tag()), and continue with server_reconnect().
1284 | seastar::future<ProtocolV2::next_step_t> | |
1285 | ProtocolV2::read_reconnect() | |
1286 | { | |
1e59de90 TL |
1287 | return frame_assembler->read_main_preamble( |
1288 | ).then([this](auto ret) { | |
1289 | expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_session_reconnect"); | |
9f95a23c TL |
1290 | return server_reconnect(); |
1291 | }); | |
1292 | } | |
1293 | ||
// Write and flush a RetryFrame carrying the given connect_seq, then wait for
// the peer's next reconnect attempt via read_reconnect().
// connect_seq: value encoded into the frame. Note that this parameter shadows
//   the member of the same name used elsewhere in this file.
1294 | seastar::future<ProtocolV2::next_step_t> | |
1295 | ProtocolV2::send_retry(uint64_t connect_seq) | |
1296 | { | |
1297 | auto retry = RetryFrame::Encode(connect_seq); | |
1298 | logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq); | |
1e59de90 TL |
1299 | return frame_assembler->write_flush_frame(retry |
1300 | ).then([this] { | |
9f95a23c TL |
1301 | return read_reconnect(); |
1302 | }); | |
1303 | } | |
1304 | ||
// Write and flush a RetryGlobalFrame carrying the given global_seq, then wait
// for the peer's next reconnect attempt via read_reconnect().
// global_seq: value encoded into the frame. Note that this parameter shadows
//   the member of the same name used elsewhere in this file.
1305 | seastar::future<ProtocolV2::next_step_t> | |
1306 | ProtocolV2::send_retry_global(uint64_t global_seq) | |
1307 | { | |
1308 | auto retry = RetryGlobalFrame::Encode(global_seq); | |
1309 | logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq); | |
1e59de90 TL |
1310 | return frame_assembler->write_flush_frame(retry |
1311 | ).then([this] { | |
9f95a23c TL |
1312 | return read_reconnect(); |
1313 | }); | |
1314 | } | |
1315 | ||
// Write and flush a ResetFrame, then expect the peer to start over with a
// CLIENT_IDENT frame and continue in server_connect().
// full: encoded into the ResetFrame as-is.
1316 | seastar::future<ProtocolV2::next_step_t> | |
1317 | ProtocolV2::send_reset(bool full) | |
1318 | { | |
1319 | auto reset = ResetFrame::Encode(full); | |
1320 | logger().warn("{} WRITE ResetFrame: full={}", conn, full); | |
1e59de90 TL |
1321 | return frame_assembler->write_flush_frame(reset |
1322 | ).then([this] { | |
1323 | return frame_assembler->read_main_preamble(); | |
1324 | }).then([this](auto ret) { | |
1325 | expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset"); | |
9f95a23c TL |
1326 | return server_connect(); |
1327 | }); | |
1328 | } | |
1329 | ||
// Acceptor-side handling of a ReconnectFrame (the preamble has already been
// read by the caller; this reads and decodes the payload).
// Throws bad_peer_address when the peer's first address is neither msgr2 nor
// "any", or when it differs from an already-known conn.peer_addr.
// After recording peer_global_seq, the outcome is chosen in this order:
//  - no existing connection for peer_addr        -> send_reset(true);
//  - peer name mismatch with existing            -> abort_in_fault();
//  - existing is in REPLACING                    -> send_retry_global(existing pgs);
//  - client_cookie differs from existing         -> send_reset(conn.policy.resetcheck);
//  - existing->server_cookie == 0                -> send_reset(false);
//  - existing->peer_global_seq > reconnect gs    -> send_retry_global(existing pgs);
//  - existing->connect_seq > reconnect cs        -> send_retry(existing cs);
//  - connect_seq equal (reconnect race):
//      existing_conn->peer_wins() -> reuse_connection(..., reconnect=true, ...)
//      otherwise                  -> send_wait();
//  - existing->connect_seq < reconnect cs        -> reuse_connection(..., reconnect=true, ...).
1330 | seastar::future<ProtocolV2::next_step_t> | |
1331 | ProtocolV2::server_reconnect() | |
1332 | { | |
1e59de90 TL |
1333 | return frame_assembler->read_frame_payload( |
1334 | ).then([this](auto payload) { | |
9f95a23c | 1335 | // handle_reconnect() logic |
1e59de90 | 1336 | auto reconnect = ReconnectFrame::Decode(payload->back()); |
9f95a23c TL |
1337 | |
1338 | logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={}," | |
1339 | " server_cookie={}, gs={}, cs={}, msg_seq={}", | |
1340 | conn, reconnect.addrs(), | |
1341 | reconnect.client_cookie(), reconnect.server_cookie(), | |
1342 | reconnect.global_seq(), reconnect.connect_seq(), | |
1343 | reconnect.msg_seq()); | |
1344 | ||
1345 | // can peer_addrs be changed on-the-fly? | |
1346 | // TODO: change peer_addr to entity_addrvec_t | |
1347 | entity_addr_t paddr = reconnect.addrs().front(); | |
1348 | if (paddr.is_msgr2() || paddr.is_any()) { | |
1349 | // good | |
1350 | } else { | |
1351 | logger().warn("{} peer's address {} is not v2", conn, paddr); | |
1352 | throw std::system_error( | |
1353 | make_error_code(crimson::net::error::bad_peer_address)); | |
1354 | } | |
1355 | if (conn.peer_addr == entity_addr_t()) { | |
1356 | conn.peer_addr = paddr; | |
1357 | } else if (conn.peer_addr != paddr) { | |
1358 | logger().error("{} peer identifies as {}, while conn.peer_addr={}," | |
1359 | " reconnect failed", | |
1360 | conn, paddr, conn.peer_addr); | |
1361 | throw std::system_error( | |
1362 | make_error_code(crimson::net::error::bad_peer_address)); | |
1363 | } | |
1364 | peer_global_seq = reconnect.global_seq(); | |
1365 | ||
1366 | SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); | |
1367 | ||
1368 | if (!existing_conn) { | |
1369 | // there is no existing connection therefore cannot reconnect to previous | |
1370 | // session | |
1371 | logger().warn("{} server_reconnect: no existing connection from address {}," | |
1372 | " reseting client", conn, conn.peer_addr); | |
1373 | return send_reset(true); | |
1374 | } | |
1375 | ||
9f95a23c TL |
1376 | ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>( |
1377 | existing_conn->protocol.get()); | |
1378 | ceph_assert(existing_proto); | |
1379 | logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting," | |
1380 | " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})", | |
1381 | conn, global_seq, peer_global_seq, reconnect.connect_seq(), | |
1382 | reconnect.client_cookie(), reconnect.server_cookie(), | |
1e59de90 | 1383 | fmt::ptr(existing_conn.get()), |
9f95a23c TL |
1384 | get_state_name(existing_proto->state), |
1385 | existing_proto->global_seq, | |
1386 | existing_proto->peer_global_seq, | |
1387 | existing_proto->connect_seq, | |
1388 | existing_proto->client_cookie, | |
1389 | existing_proto->server_cookie); | |
1390 | ||
f67539c2 TL |
1391 | if (!validate_peer_name(existing_conn->get_peer_name())) { |
1392 | logger().error("{} server_reconnect: my peer_name doesn't match" | |
1e59de90 | 1393 | " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get())); |
f67539c2 TL |
1394 | abort_in_fault(); |
1395 | } | |
1396 | ||
9f95a23c TL |
1397 | if (existing_proto->state == state_t::REPLACING) { |
1398 | logger().warn("{} server_reconnect: racing replace happened while " | |
1399 | " replacing existing connection {}, retry global.", | |
1400 | conn, *existing_conn); | |
1401 | return send_retry_global(existing_proto->peer_global_seq); | |
1402 | } | |
1403 | ||
1404 | if (existing_proto->client_cookie != reconnect.client_cookie()) { | |
1405 | logger().warn("{} server_reconnect:" | |
1406 | " client_cookie mismatch with existing connection {}," | |
1407 | " cc={} rcc={}. I must have reset, reseting client.", | |
1408 | conn, *existing_conn, | |
1409 | existing_proto->client_cookie, reconnect.client_cookie()); | |
1410 | return send_reset(conn.policy.resetcheck); | |
1411 | } else if (existing_proto->server_cookie == 0) { | |
1412 | // this happens when: | |
1413 | // - a connects to b | |
1414 | // - a sends client_ident | |
1415 | // - b gets client_ident, sends server_ident and sets cookie X | |
1416 | // - connection fault | |
1417 | // - b reconnects to a with cookie X, connect_seq=1 | |
1418 | // - a has cookie==0 | |
1419 | logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the" | |
1420 | " server_ident with existing connection {}." | |
1421 | " Asking peer to resume session establishment", | |
1422 | conn, existing_proto->client_cookie, *existing_conn); | |
1423 | return send_reset(false); | |
1424 | } | |
1425 | ||
1426 | if (existing_proto->peer_global_seq > reconnect.global_seq()) { | |
1427 | logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({})," | |
1428 | " with existing connection {}," | |
1429 | " ask client to retry global", | |
1430 | conn, existing_proto->peer_global_seq, | |
1431 | reconnect.global_seq(), *existing_conn); | |
1432 | return send_retry_global(existing_proto->peer_global_seq); | |
1433 | } | |
1434 | ||
1435 | if (existing_proto->connect_seq > reconnect.connect_seq()) { | |
1436 | logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({})," | |
1437 | " with existing connection {}, ask client to retry", | |
1438 | conn, reconnect.connect_seq(), | |
1439 | existing_proto->connect_seq, *existing_conn); | |
1440 | return send_retry(existing_proto->connect_seq); | |
1441 | } else if (existing_proto->connect_seq == reconnect.connect_seq()) { | |
1442 | // reconnect race: both peers are sending reconnect messages | |
1443 | if (existing_conn->peer_wins()) { | |
1e59de90 | 1444 | // acceptor (this connection, the peer) wins |
9f95a23c | 1445 | logger().warn("{} server_reconnect: reconnect race detected (cs={})" |
1e59de90 TL |
1446 | " and win, reusing existing {} {}", |
1447 | conn, | |
1448 | reconnect.connect_seq(), | |
1449 | get_state_name(existing_proto->state), | |
1450 | *existing_conn); | |
9f95a23c TL |
1451 | return reuse_connection( |
1452 | existing_proto, false, | |
1453 | true, reconnect.connect_seq(), reconnect.msg_seq()); | |
1454 | } else { | |
1e59de90 | 1455 | // acceptor (this connection, the peer) loses |
9f95a23c TL |
1456 | logger().warn("{} server_reconnect: reconnect race detected (cs={})" |
1457 | " and lose to existing {}, ask client to wait", | |
1458 | conn, reconnect.connect_seq(), *existing_conn); | |
1459 | return send_wait(); | |
1460 | } | |
1461 | } else { // existing_proto->connect_seq < reconnect.connect_seq() | |
1462 | logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({})," | |
1e59de90 TL |
1463 | " reusing existing {} {}", |
1464 | conn, | |
1465 | existing_proto->connect_seq, | |
1466 | reconnect.connect_seq(), | |
1467 | get_state_name(existing_proto->state), | |
1468 | *existing_conn); | |
9f95a23c TL |
1469 | return reuse_connection( |
1470 | existing_proto, false, | |
1471 | true, reconnect.connect_seq(), reconnect.msg_seq()); | |
1472 | } | |
1473 | }); | |
1474 | } | |
1475 | ||
1476 | void ProtocolV2::execute_accepting() | |
1477 | { | |
1e59de90 TL |
1478 | assert(is_socket_valid); |
1479 | trigger_state(state_t::ACCEPTING, io_state_t::none, false); | |
1480 | gate.dispatch_in_background("execute_accepting", conn, [this] { | |
f67539c2 | 1481 | return seastar::futurize_invoke([this] { |
1e59de90 TL |
1482 | #ifdef UNIT_TESTS_BUILT |
1483 | if (conn.interceptor) { | |
1484 | auto action = conn.interceptor->intercept( | |
1485 | conn, {custom_bp_t::SOCKET_ACCEPTED}); | |
1486 | switch (action) { | |
1487 | case bp_action_t::CONTINUE: | |
1488 | break; | |
1489 | case bp_action_t::FAULT: | |
1490 | logger().info("[Test] got FAULT"); | |
1491 | abort_in_fault(); | |
1492 | default: | |
1493 | ceph_abort("unexpected action from trap"); | |
1494 | } | |
1495 | } | |
1496 | #endif | |
9f95a23c | 1497 | auth_meta = seastar::make_lw_shared<AuthConnectionMeta>(); |
1e59de90 TL |
1498 | frame_assembler->reset_handlers(); |
1499 | frame_assembler->start_recording(); | |
f67539c2 TL |
1500 | return banner_exchange(false); |
1501 | }).then([this] (auto&& ret) { | |
1502 | auto [_peer_type, _my_addr_from_peer] = std::move(ret); | |
9f95a23c TL |
1503 | ceph_assert(conn.get_peer_type() == 0); |
1504 | conn.set_peer_type(_peer_type); | |
1505 | ||
1506 | conn.policy = messenger.get_policy(_peer_type); | |
1507 | logger().info("{} UPDATE: peer_type={}," | |
1508 | " policy(lossy={} server={} standby={} resetcheck={})", | |
1509 | conn, ceph_entity_type_name(_peer_type), | |
1510 | conn.policy.lossy, conn.policy.server, | |
1511 | conn.policy.standby, conn.policy.resetcheck); | |
20effc67 TL |
1512 | if (!messenger.get_myaddr().is_blank_ip() && |
1513 | (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() || | |
1514 | messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) { | |
9f95a23c TL |
1515 | logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}", |
1516 | conn, _my_addr_from_peer, messenger.get_myaddr()); | |
1517 | throw std::system_error( | |
1518 | make_error_code(crimson::net::error::bad_peer_address)); | |
1519 | } | |
1e59de90 | 1520 | messenger.learned_addr(_my_addr_from_peer, conn); |
9f95a23c TL |
1521 | return server_auth(); |
1522 | }).then([this] { | |
1e59de90 TL |
1523 | return frame_assembler->read_main_preamble(); |
1524 | }).then([this](auto ret) { | |
1525 | switch (ret.tag) { | |
9f95a23c TL |
1526 | case Tag::CLIENT_IDENT: |
1527 | return server_connect(); | |
1528 | case Tag::SESSION_RECONNECT: | |
1529 | return server_reconnect(); | |
1530 | default: { | |
1e59de90 | 1531 | unexpected_tag(ret.tag, conn, "post_server_auth"); |
9f95a23c TL |
1532 | return seastar::make_ready_future<next_step_t>(next_step_t::none); |
1533 | } | |
1534 | } | |
1535 | }).then([this] (next_step_t next) { | |
1536 | switch (next) { | |
1537 | case next_step_t::ready: | |
1538 | assert(state != state_t::ACCEPTING); | |
1539 | break; | |
1540 | case next_step_t::wait: | |
1541 | if (unlikely(state != state_t::ACCEPTING)) { | |
1542 | logger().debug("{} triggered {} at the end of execute_accepting()", | |
1543 | conn, get_state_name(state)); | |
1544 | abort_protocol(); | |
1545 | } | |
1546 | logger().info("{} execute_accepting(): going to SERVER_WAIT", conn); | |
1547 | execute_server_wait(); | |
1548 | break; | |
1549 | default: | |
1550 | ceph_abort("impossible next step"); | |
1551 | } | |
1e59de90 TL |
1552 | }).handle_exception([this](std::exception_ptr eptr) { |
1553 | const char *e_what; | |
1554 | try { | |
1555 | std::rethrow_exception(eptr); | |
1556 | } catch (std::exception &e) { | |
1557 | e_what = e.what(); | |
1558 | } | |
9f95a23c | 1559 | logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}", |
1e59de90 TL |
1560 | conn, get_state_name(state), e_what); |
1561 | do_close(false); | |
9f95a23c TL |
1562 | }); |
1563 | }); | |
1564 | } | |
1565 | ||
1566 | // CONNECTING or ACCEPTING state | |
1567 | ||
1568 | seastar::future<> ProtocolV2::finish_auth() | |
1569 | { | |
1570 | ceph_assert(auth_meta); | |
1571 | ||
1e59de90 | 1572 | auto records = frame_assembler->stop_recording(); |
9f95a23c | 1573 | const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() : |
1e59de90 | 1574 | auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf); |
9f95a23c | 1575 | auto sig_frame = AuthSignatureFrame::Encode(sig); |
9f95a23c | 1576 | logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig); |
1e59de90 TL |
1577 | return frame_assembler->write_flush_frame(sig_frame |
1578 | ).then([this] { | |
1579 | return frame_assembler->read_main_preamble(); | |
1580 | }).then([this](auto ret) { | |
1581 | expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth"); | |
1582 | return frame_assembler->read_frame_payload(); | |
1583 | }).then([this, txbuf=std::move(records.txbuf)](auto payload) { | |
9f95a23c | 1584 | // handle_auth_signature() logic |
1e59de90 | 1585 | auto sig_frame = AuthSignatureFrame::Decode(payload->back()); |
9f95a23c TL |
1586 | logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature()); |
1587 | ||
1588 | const auto actual_tx_sig = auth_meta->session_key.empty() ? | |
1589 | sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf); | |
1590 | if (sig_frame.signature() != actual_tx_sig) { | |
1591 | logger().warn("{} pre-auth signature mismatch actual_tx_sig={}" | |
1592 | " sig_frame.signature()={}", | |
1593 | conn, actual_tx_sig, sig_frame.signature()); | |
1594 | abort_in_fault(); | |
1595 | } | |
9f95a23c TL |
1596 | }); |
1597 | } | |
1598 | ||
1599 | // ESTABLISHING | |
1600 | ||
1e59de90 | 1601 | void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { |
f67539c2 TL |
1602 | auto accept_me = [this] { |
1603 | messenger.register_conn( | |
1604 | seastar::static_pointer_cast<SocketConnection>( | |
1605 | conn.shared_from_this())); | |
1606 | messenger.unaccept_conn( | |
1607 | seastar::static_pointer_cast<SocketConnection>( | |
1608 | conn.shared_from_this())); | |
1609 | }; | |
1610 | ||
1e59de90 TL |
1611 | ceph_assert_always(is_socket_valid); |
1612 | trigger_state(state_t::ESTABLISHING, io_state_t::delay, false); | |
f67539c2 | 1613 | if (existing_conn) { |
1e59de90 TL |
1614 | static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close( |
1615 | true /* is_dispatch_reset */, std::move(accept_me)); | |
f67539c2 TL |
1616 | if (unlikely(state != state_t::ESTABLISHING)) { |
1617 | logger().warn("{} triggered {} during execute_establishing(), " | |
1618 | "the accept event will not be delivered!", | |
1619 | conn, get_state_name(state)); | |
1620 | abort_protocol(); | |
1621 | } | |
1622 | } else { | |
1623 | accept_me(); | |
1624 | } | |
1625 | ||
1e59de90 TL |
1626 | io_handler.dispatch_accept(); |
1627 | if (unlikely(state != state_t::ESTABLISHING)) { | |
1628 | logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()", | |
1629 | conn, get_state_name(state)); | |
1630 | abort_protocol(); | |
1631 | } | |
f67539c2 | 1632 | |
1e59de90 | 1633 | gated_execute("execute_establishing", conn, [this] { |
f67539c2 | 1634 | return seastar::futurize_invoke([this] { |
9f95a23c TL |
1635 | return send_server_ident(); |
1636 | }).then([this] { | |
1637 | if (unlikely(state != state_t::ESTABLISHING)) { | |
1638 | logger().debug("{} triggered {} at the end of execute_establishing()", | |
1639 | conn, get_state_name(state)); | |
1640 | abort_protocol(); | |
1641 | } | |
1e59de90 TL |
1642 | logger().info("{} established: gs={}, pgs={}, cs={}, " |
1643 | "client_cookie={}, server_cookie={}, {}", | |
9f95a23c | 1644 | conn, global_seq, peer_global_seq, connect_seq, |
1e59de90 TL |
1645 | client_cookie, server_cookie, |
1646 | io_stat_printer{io_handler}); | |
1647 | execute_ready(); | |
1648 | }).handle_exception([this](std::exception_ptr eptr) { | |
1649 | fault(state_t::ESTABLISHING, "execute_establishing", eptr); | |
9f95a23c TL |
1650 | }); |
1651 | }); | |
1652 | } | |
1653 | ||
1654 | // ESTABLISHING or REPLACING state | |
1655 | ||
1656 | seastar::future<> | |
1657 | ProtocolV2::send_server_ident() | |
1658 | { | |
1659 | // send_server_ident() logic | |
1660 | ||
1661 | // refered to async-conn v2: not assign gs to global_seq | |
1e59de90 TL |
1662 | global_seq = messenger.get_global_seq(); |
1663 | logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); | |
9f95a23c | 1664 | |
1e59de90 TL |
1665 | // this is required for the case when this connection is being replaced |
1666 | io_handler.requeue_out_sent_up_to(0); | |
1667 | io_handler.reset_session(false); | |
9f95a23c | 1668 | |
1e59de90 TL |
1669 | if (!conn.policy.lossy) { |
1670 | server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll); | |
1671 | } | |
9f95a23c | 1672 | |
1e59de90 TL |
1673 | uint64_t flags = 0; |
1674 | if (conn.policy.lossy) { | |
1675 | flags = flags | CEPH_MSG_CONNECT_LOSSY; | |
1676 | } | |
9f95a23c | 1677 | |
1e59de90 TL |
1678 | auto server_ident = ServerIdentFrame::Encode( |
1679 | messenger.get_myaddrs(), | |
1680 | messenger.get_myname().num(), | |
1681 | global_seq, | |
1682 | conn.policy.features_supported, | |
1683 | conn.policy.features_required | msgr2_required, | |
1684 | flags, | |
1685 | server_cookie); | |
1686 | ||
1687 | logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={}," | |
1688 | " gs={}, features_supported={}, features_required={}," | |
1689 | " flags={}, cookie={}", | |
1690 | conn, messenger.get_myaddrs(), messenger.get_myname().num(), | |
1691 | global_seq, conn.policy.features_supported, | |
1692 | conn.policy.features_required | msgr2_required, | |
1693 | flags, server_cookie); | |
1694 | ||
1695 | return frame_assembler->write_flush_frame(server_ident); | |
9f95a23c TL |
1696 | } |
1697 | ||
1698 | // REPLACING state | |
1699 | ||
1700 | void ProtocolV2::trigger_replacing(bool reconnect, | |
1701 | bool do_reset, | |
1e59de90 | 1702 | FrameAssemblerV2::mover_t &&mover, |
9f95a23c | 1703 | AuthConnectionMetaRef&& new_auth_meta, |
9f95a23c TL |
1704 | uint64_t new_peer_global_seq, |
1705 | uint64_t new_client_cookie, | |
1706 | entity_name_t new_peer_name, | |
1707 | uint64_t new_conn_features, | |
1e59de90 | 1708 | uint64_t new_peer_supported_features, |
9f95a23c TL |
1709 | uint64_t new_connect_seq, |
1710 | uint64_t new_msg_seq) | |
1711 | { | |
1e59de90 TL |
1712 | ceph_assert_always(has_socket || state == state_t::CONNECTING); |
1713 | ceph_assert_always(!mover.socket->is_shutdown()); | |
1714 | trigger_state(state_t::REPLACING, io_state_t::delay, false); | |
1715 | if (is_socket_valid) { | |
1716 | frame_assembler->shutdown_socket(); | |
1717 | is_socket_valid = false; | |
9f95a23c | 1718 | } |
1e59de90 TL |
1719 | gate.dispatch_in_background( |
1720 | "trigger_replacing", | |
1721 | conn, | |
1722 | [this, | |
1723 | reconnect, | |
1724 | do_reset, | |
1725 | mover = std::move(mover), | |
1726 | new_auth_meta = std::move(new_auth_meta), | |
1727 | new_client_cookie, new_peer_name, | |
1728 | new_conn_features, new_peer_supported_features, | |
1729 | new_peer_global_seq, | |
1730 | new_connect_seq, new_msg_seq] () mutable { | |
1731 | ceph_assert_always(state == state_t::REPLACING); | |
1732 | io_handler.dispatch_accept(); | |
1733 | // state may become CLOSING, close mover.socket and abort later | |
1734 | return wait_exit_io( | |
1735 | ).then([this] { | |
1736 | ceph_assert_always(frame_assembler); | |
9f95a23c | 1737 | protocol_timer.cancel(); |
1e59de90 TL |
1738 | auto done = std::move(execution_done); |
1739 | execution_done = seastar::now(); | |
1740 | return done; | |
9f95a23c TL |
1741 | }).then([this, |
1742 | reconnect, | |
1e59de90 TL |
1743 | do_reset, |
1744 | mover = std::move(mover), | |
9f95a23c | 1745 | new_auth_meta = std::move(new_auth_meta), |
9f95a23c | 1746 | new_client_cookie, new_peer_name, |
1e59de90 TL |
1747 | new_conn_features, new_peer_supported_features, |
1748 | new_peer_global_seq, | |
9f95a23c | 1749 | new_connect_seq, new_msg_seq] () mutable { |
1e59de90 TL |
1750 | if (state == state_t::REPLACING && do_reset) { |
1751 | reset_session(true); | |
1752 | } | |
1753 | ||
9f95a23c | 1754 | if (unlikely(state != state_t::REPLACING)) { |
1e59de90 TL |
1755 | return mover.socket->close( |
1756 | ).then([sock = std::move(mover.socket)] { | |
9f95a23c TL |
1757 | abort_protocol(); |
1758 | }); | |
1759 | } | |
1760 | ||
9f95a23c | 1761 | auth_meta = std::move(new_auth_meta); |
9f95a23c | 1762 | peer_global_seq = new_peer_global_seq; |
1e59de90 TL |
1763 | gate.dispatch_in_background( |
1764 | "replace_frame_assembler", | |
1765 | conn, | |
1766 | [this, mover=std::move(mover)]() mutable { | |
1767 | return frame_assembler->replace_by(std::move(mover)); | |
1768 | } | |
1769 | ); | |
1770 | is_socket_valid = true; | |
1771 | has_socket = true; | |
9f95a23c TL |
1772 | |
1773 | if (reconnect) { | |
1774 | connect_seq = new_connect_seq; | |
1775 | // send_reconnect_ok() logic | |
1e59de90 TL |
1776 | io_handler.requeue_out_sent_up_to(new_msg_seq); |
1777 | auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq()); | |
1778 | logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq()); | |
1779 | return frame_assembler->write_flush_frame(reconnect_ok); | |
9f95a23c TL |
1780 | } else { |
1781 | client_cookie = new_client_cookie; | |
f67539c2 TL |
1782 | assert(conn.get_peer_type() == new_peer_name.type()); |
1783 | if (conn.get_peer_id() == entity_name_t::NEW) { | |
1784 | conn.set_peer_id(new_peer_name.num()); | |
1785 | } | |
1e59de90 TL |
1786 | conn.set_features(new_conn_features); |
1787 | peer_supported_features = new_peer_supported_features; | |
1788 | bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); | |
1789 | frame_assembler->set_is_rev1(is_rev1); | |
9f95a23c TL |
1790 | return send_server_ident(); |
1791 | } | |
1792 | }).then([this, reconnect] { | |
1793 | if (unlikely(state != state_t::REPLACING)) { | |
1794 | logger().debug("{} triggered {} at the end of trigger_replacing()", | |
1795 | conn, get_state_name(state)); | |
1796 | abort_protocol(); | |
1797 | } | |
1e59de90 TL |
1798 | logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, " |
1799 | "client_cookie={}, server_cookie={}, {}", | |
9f95a23c | 1800 | conn, reconnect ? "reconnected" : "connected", |
1e59de90 TL |
1801 | global_seq, peer_global_seq, connect_seq, |
1802 | client_cookie, server_cookie, | |
1803 | io_stat_printer{io_handler}); | |
1804 | execute_ready(); | |
1805 | }).handle_exception([this](std::exception_ptr eptr) { | |
1806 | fault(state_t::REPLACING, "trigger_replacing", eptr); | |
9f95a23c TL |
1807 | }); |
1808 | }); | |
1809 | } | |
1810 | ||
1811 | // READY state | |
1812 | ||
1e59de90 | 1813 | void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr) |
9f95a23c | 1814 | { |
1e59de90 | 1815 | fault(state_t::READY, where, eptr); |
9f95a23c TL |
1816 | } |
1817 | ||
1e59de90 | 1818 | void ProtocolV2::execute_ready() |
9f95a23c TL |
1819 | { |
1820 | assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); | |
1e59de90 TL |
1821 | protocol_timer.cancel(); |
1822 | ceph_assert_always(is_socket_valid); | |
1823 | trigger_state(state_t::READY, io_state_t::open, false); | |
9f95a23c TL |
1824 | } |
1825 | ||
1826 | // STANDBY state | |
1827 | ||
1828 | void ProtocolV2::execute_standby() | |
1829 | { | |
1e59de90 TL |
1830 | ceph_assert_always(!is_socket_valid); |
1831 | trigger_state(state_t::STANDBY, io_state_t::delay, false); | |
9f95a23c TL |
1832 | } |
1833 | ||
1e59de90 | 1834 | void ProtocolV2::notify_out() |
9f95a23c TL |
1835 | { |
1836 | if (unlikely(state == state_t::STANDBY && !conn.policy.server)) { | |
1e59de90 | 1837 | logger().info("{} notify_out(): at {}, going to CONNECTING", |
9f95a23c TL |
1838 | conn, get_state_name(state)); |
1839 | execute_connecting(); | |
1840 | } | |
1841 | } | |
1842 | ||
1843 | // WAIT state | |
1844 | ||
1845 | void ProtocolV2::execute_wait(bool max_backoff) | |
1846 | { | |
1e59de90 TL |
1847 | ceph_assert_always(!is_socket_valid); |
1848 | trigger_state(state_t::WAIT, io_state_t::delay, false); | |
1849 | gated_execute("execute_wait", conn, [this, max_backoff] { | |
9f95a23c TL |
1850 | double backoff = protocol_timer.last_dur(); |
1851 | if (max_backoff) { | |
f67539c2 | 1852 | backoff = local_conf().get_val<double>("ms_max_backoff"); |
9f95a23c | 1853 | } else if (backoff > 0) { |
f67539c2 | 1854 | backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff); |
9f95a23c | 1855 | } else { |
f67539c2 | 1856 | backoff = local_conf().get_val<double>("ms_initial_backoff"); |
9f95a23c TL |
1857 | } |
1858 | return protocol_timer.backoff(backoff).then([this] { | |
1859 | if (unlikely(state != state_t::WAIT)) { | |
1860 | logger().debug("{} triggered {} at the end of execute_wait()", | |
1861 | conn, get_state_name(state)); | |
1862 | abort_protocol(); | |
1863 | } | |
1864 | logger().info("{} execute_wait(): going to CONNECTING", conn); | |
1865 | execute_connecting(); | |
1e59de90 TL |
1866 | }).handle_exception([this](std::exception_ptr eptr) { |
1867 | const char *e_what; | |
1868 | try { | |
1869 | std::rethrow_exception(eptr); | |
1870 | } catch (std::exception &e) { | |
1871 | e_what = e.what(); | |
1872 | } | |
9f95a23c | 1873 | logger().info("{} execute_wait(): protocol aborted at {} -- {}", |
1e59de90 | 1874 | conn, get_state_name(state), e_what); |
9f95a23c TL |
1875 | assert(state == state_t::REPLACING || |
1876 | state == state_t::CLOSING); | |
1877 | }); | |
1878 | }); | |
1879 | } | |
1880 | ||
1881 | // SERVER_WAIT state | |
1882 | ||
1883 | void ProtocolV2::execute_server_wait() | |
1884 | { | |
1e59de90 TL |
1885 | ceph_assert_always(is_socket_valid); |
1886 | trigger_state(state_t::SERVER_WAIT, io_state_t::none, false); | |
1887 | gated_execute("execute_server_wait", conn, [this] { | |
1888 | return frame_assembler->read_exactly(1 | |
1889 | ).then([this](auto bl) { | |
9f95a23c TL |
1890 | logger().warn("{} SERVER_WAIT got read, abort", conn); |
1891 | abort_in_fault(); | |
1e59de90 TL |
1892 | }).handle_exception([this](std::exception_ptr eptr) { |
1893 | const char *e_what; | |
1894 | try { | |
1895 | std::rethrow_exception(eptr); | |
1896 | } catch (std::exception &e) { | |
1897 | e_what = e.what(); | |
1898 | } | |
9f95a23c | 1899 | logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}", |
1e59de90 TL |
1900 | conn, get_state_name(state), e_what); |
1901 | do_close(false); | |
9f95a23c TL |
1902 | }); |
1903 | }); | |
1904 | } | |
1905 | ||
1906 | // CLOSING state | |
1907 | ||
1e59de90 | 1908 | void ProtocolV2::notify_mark_down() |
9f95a23c | 1909 | { |
1e59de90 TL |
1910 | do_close(false); |
1911 | } | |
1912 | ||
1913 | seastar::future<> ProtocolV2::close_clean_yielded() | |
1914 | { | |
1915 | // yield() so that do_close() can be called *after* close_clean_yielded() is | |
1916 | // applied to all connections in a container using | |
1917 | // seastar::parallel_for_each(). otherwise, we could erase a connection in | |
1918 | // the container when seastar::parallel_for_each() is still iterating in it. | |
1919 | // that'd lead to a segfault. | |
1920 | return seastar::yield( | |
1921 | ).then([this, conn_ref = conn.shared_from_this()] { | |
1922 | do_close(false); | |
1923 | // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset() | |
1924 | // which will otherwise result in deadlock | |
1925 | assert(closed_clean_fut.valid()); | |
1926 | return closed_clean_fut.get_future(); | |
1927 | }); | |
1928 | } | |
1929 | ||
1930 | void ProtocolV2::do_close( | |
1931 | bool is_dispatch_reset, | |
1932 | std::optional<std::function<void()>> f_accept_new) | |
1933 | { | |
1934 | if (closed) { | |
1935 | // already closing | |
1936 | assert(state == state_t::CLOSING); | |
1937 | return; | |
1938 | } | |
1939 | ||
1940 | bool is_replace = f_accept_new ? true : false; | |
1941 | logger().info("{} closing: reset {}, replace {}", conn, | |
1942 | is_dispatch_reset ? "yes" : "no", | |
1943 | is_replace ? "yes" : "no"); | |
1944 | ||
1945 | /* | |
1946 | * atomic operations | |
1947 | */ | |
1948 | ||
1949 | closed = true; | |
1950 | ||
1951 | // trigger close | |
f67539c2 TL |
1952 | messenger.closing_conn( |
1953 | seastar::static_pointer_cast<SocketConnection>( | |
1954 | conn.shared_from_this())); | |
9f95a23c TL |
1955 | if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) { |
1956 | messenger.unaccept_conn( | |
1957 | seastar::static_pointer_cast<SocketConnection>( | |
1958 | conn.shared_from_this())); | |
1959 | } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) { | |
1960 | messenger.unregister_conn( | |
1961 | seastar::static_pointer_cast<SocketConnection>( | |
1962 | conn.shared_from_this())); | |
1963 | } else { | |
1964 | // cannot happen | |
1965 | ceph_assert(false); | |
1966 | } | |
9f95a23c | 1967 | protocol_timer.cancel(); |
1e59de90 | 1968 | trigger_state(state_t::CLOSING, io_state_t::drop, false); |
f67539c2 | 1969 | |
1e59de90 TL |
1970 | if (f_accept_new) { |
1971 | (*f_accept_new)(); | |
1972 | } | |
1973 | if (is_socket_valid) { | |
1974 | frame_assembler->shutdown_socket(); | |
1975 | is_socket_valid = false; | |
1976 | } | |
1977 | assert(!gate.is_closed()); | |
1978 | auto handshake_closed = gate.close(); | |
1979 | auto io_closed = io_handler.close_io( | |
1980 | is_dispatch_reset, is_replace); | |
1981 | ||
1982 | // asynchronous operations | |
1983 | assert(!closed_clean_fut.valid()); | |
1984 | closed_clean_fut = seastar::when_all( | |
1985 | std::move(handshake_closed), std::move(io_closed) | |
1986 | ).discard_result().then([this] { | |
1987 | ceph_assert_always(!exit_io.has_value()); | |
1988 | if (has_socket) { | |
1989 | ceph_assert_always(frame_assembler); | |
1990 | return frame_assembler->close_shutdown_socket(); | |
1991 | } else { | |
1992 | return seastar::now(); | |
1993 | } | |
1994 | }).then([this] { | |
1995 | logger().debug("{} closed!", conn); | |
1996 | messenger.closed_conn( | |
1997 | seastar::static_pointer_cast<SocketConnection>( | |
1998 | conn.shared_from_this())); | |
1999 | #ifdef UNIT_TESTS_BUILT | |
2000 | closed_clean = true; | |
2001 | if (conn.interceptor) { | |
2002 | conn.interceptor->register_conn_closed(conn); | |
2003 | } | |
2004 | #endif | |
2005 | }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) { | |
2006 | logger().error("{} closing: closed_clean_fut got unexpected exception {}", | |
2007 | conn, eptr); | |
2008 | ceph_abort(); | |
2009 | }); | |
9f95a23c TL |
2010 | } |
2011 | ||
2012 | } // namespace crimson::net |