]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "ProtocolV2.h" | |
5 | ||
6 | #include <seastar/core/lowres_clock.hh> | |
7 | #include <fmt/format.h> | |
9f95a23c TL |
8 | #include "include/msgr.h" |
9 | #include "include/random.h" | |
10 | ||
11 | #include "crimson/auth/AuthClient.h" | |
12 | #include "crimson/auth/AuthServer.h" | |
f67539c2 | 13 | #include "crimson/common/formatter.h" |
9f95a23c | 14 | |
f67539c2 | 15 | #include "chained_dispatchers.h" |
9f95a23c TL |
16 | #include "Errors.h" |
17 | #include "Socket.h" | |
18 | #include "SocketConnection.h" | |
19 | #include "SocketMessenger.h" | |
20 | ||
21 | #ifdef UNIT_TESTS_BUILT | |
22 | #include "Interceptor.h" | |
23 | #endif | |
24 | ||
25 | using namespace ceph::msgr::v2; | |
f67539c2 | 26 | using crimson::common::local_conf; |
9f95a23c TL |
27 | |
28 | namespace { | |
29 | ||
20effc67 TL |
30 | // TODO: CEPH_MSGR2_FEATURE_COMPRESSION |
31 | const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES = | |
32 | (CEPH_MSGR2_FEATURE_REVISION_1 | | |
33 | // CEPH_MSGR2_FEATURE_COMPRESSION | | |
34 | UINT64_C(0)); | |
35 | ||
9f95a23c TL |
36 | // Log levels in V2 Protocol: |
37 | // * error level, something error that cause connection to terminate: | |
38 | // - fatal errors; | |
39 | // - bugs; | |
40 | // * warn level: something unusual that identifies connection fault or replacement: | |
41 | // - unstable network; | |
42 | // - incompatible peer; | |
43 | // - auth failure; | |
44 | // - connection race; | |
45 | // - connection reset; | |
46 | // * info level, something very important to show connection lifecycle, | |
47 | // which doesn't happen very frequently; | |
48 | // * debug level, important logs for debugging, including: | |
49 | // - all the messages sent/received (-->/<==); | |
50 | // - all the frames exchanged (WRITE/GOT); | |
51 | // - important fields updated (UPDATE); | |
52 | // - connection state transitions (TRIGGER); | |
53 | // * trace level, trivial logs showing: | |
54 | // - the exact bytes being sent/received (SEND/RECV(bytes)); | |
55 | // - detailed information of sub-frames; | |
56 | // - integrity checks; | |
57 | // - etc. | |
58 | seastar::logger& logger() { | |
59 | return crimson::get_logger(ceph_subsys_ms); | |
60 | } | |
61 | ||
f67539c2 | 62 | [[noreturn]] void abort_in_fault() { |
9f95a23c TL |
63 | throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); |
64 | } | |
65 | ||
f67539c2 | 66 | [[noreturn]] void abort_protocol() { |
9f95a23c TL |
67 | throw std::system_error(make_error_code(crimson::net::error::protocol_aborted)); |
68 | } | |
69 | ||
f67539c2 TL |
70 | [[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) { |
71 | proto.close(dispatch_reset); | |
9f95a23c TL |
72 | abort_protocol(); |
73 | } | |
74 | ||
75 | inline void expect_tag(const Tag& expected, | |
76 | const Tag& actual, | |
77 | crimson::net::SocketConnection& conn, | |
78 | const char *where) { | |
79 | if (actual != expected) { | |
80 | logger().warn("{} {} received wrong tag: {}, expected {}", | |
81 | conn, where, | |
82 | static_cast<uint32_t>(actual), | |
83 | static_cast<uint32_t>(expected)); | |
84 | abort_in_fault(); | |
85 | } | |
86 | } | |
87 | ||
88 | inline void unexpected_tag(const Tag& unexpected, | |
89 | crimson::net::SocketConnection& conn, | |
90 | const char *where) { | |
91 | logger().warn("{} {} received unexpected tag: {}", | |
92 | conn, where, static_cast<uint32_t>(unexpected)); | |
93 | abort_in_fault(); | |
94 | } | |
95 | ||
96 | inline uint64_t generate_client_cookie() { | |
97 | return ceph::util::generate_random_number<uint64_t>( | |
98 | 1, std::numeric_limits<uint64_t>::max()); | |
99 | } | |
100 | ||
101 | } // namespace anonymous | |
102 | ||
9f95a23c TL |
103 | namespace crimson::net { |
104 | ||
#ifdef UNIT_TESTS_BUILT
// Test-only hook: asks the connection's interceptor (if one is installed)
// what to do at breakpoint `bp`, and arms the socket trap accordingly.
void intercept(Breakpoint bp, bp_type_t type,
               SocketConnection& conn, SocketRef& socket) {
  if (!conn.interceptor) {
    return;
  }
  auto trap_action = conn.interceptor->intercept(conn, Breakpoint(bp));
  socket->set_trap(type, trap_action, &conn.interceptor->blocker);
}

// The macros below expect `conn` (and `socket` for the first two) to be
// visible at the expansion site.

// Intercept at a custom (non-frame) breakpoint.
#define INTERCEPT_CUSTOM(bp, type)       \
intercept({bp}, type, conn, socket)

// Intercept at a frame read/write breakpoint identified by its tag.
#define INTERCEPT_FRAME(tag, type)       \
intercept({static_cast<Tag>(tag), type}, \
          type, conn, socket)

// Intercept at a breakpoint that involves no socket read/write: blocking is
// not allowed there, and a FAULT action is raised immediately.
#define INTERCEPT_N_RW(bp)                               \
if (conn.interceptor) {                                  \
  auto action = conn.interceptor->intercept(conn, {bp}); \
  ceph_assert(action != bp_action_t::BLOCK);             \
  if (action == bp_action_t::FAULT) {                    \
    abort_in_fault();                                    \
  }                                                      \
}

#else
// Interception is compiled out when unit tests are not built.
#define INTERCEPT_CUSTOM(bp, type)
#define INTERCEPT_FRAME(tag, type)
#define INTERCEPT_N_RW(bp)
#endif
135 | ||
136 | seastar::future<> ProtocolV2::Timer::backoff(double seconds) | |
137 | { | |
138 | logger().warn("{} waiting {} seconds ...", conn, seconds); | |
139 | cancel(); | |
140 | last_dur_ = seconds; | |
141 | as = seastar::abort_source(); | |
142 | auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>( | |
143 | std::chrono::duration<double>(seconds)); | |
144 | return seastar::sleep_abortable(dur, *as | |
145 | ).handle_exception_type([this] (const seastar::sleep_aborted& e) { | |
146 | logger().debug("{} wait aborted", conn); | |
147 | abort_protocol(); | |
148 | }); | |
149 | } | |
150 | ||
f67539c2 | 151 | ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers, |
9f95a23c TL |
152 | SocketConnection& conn, |
153 | SocketMessenger& messenger) | |
f67539c2 | 154 | : Protocol(proto_t::v2, dispatchers, conn), |
9f95a23c | 155 | messenger{messenger}, |
f67539c2 | 156 | protocol_timer{conn} |
9f95a23c TL |
157 | {} |
158 | ||
159 | ProtocolV2::~ProtocolV2() {} | |
160 | ||
f67539c2 TL |
161 | bool ProtocolV2::is_connected() const { |
162 | return state == state_t::READY || | |
163 | state == state_t::ESTABLISHING || | |
164 | state == state_t::REPLACING; | |
165 | } | |
166 | ||
9f95a23c | 167 | void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, |
f67539c2 | 168 | const entity_name_t& _peer_name) |
9f95a23c TL |
169 | { |
170 | ceph_assert(state == state_t::NONE); | |
171 | ceph_assert(!socket); | |
f67539c2 | 172 | ceph_assert(!gate.is_closed()); |
9f95a23c TL |
173 | conn.peer_addr = _peer_addr; |
174 | conn.target_addr = _peer_addr; | |
f67539c2 TL |
175 | conn.set_peer_name(_peer_name); |
176 | conn.policy = messenger.get_policy(_peer_name.type()); | |
9f95a23c | 177 | client_cookie = generate_client_cookie(); |
f67539c2 | 178 | logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}" |
9f95a23c | 179 | " policy(lossy={}, server={}, standby={}, resetcheck={})", |
f67539c2 | 180 | conn, _peer_addr, _peer_name, client_cookie, |
9f95a23c TL |
181 | conn.policy.lossy, conn.policy.server, |
182 | conn.policy.standby, conn.policy.resetcheck); | |
183 | messenger.register_conn( | |
184 | seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); | |
185 | execute_connecting(); | |
186 | } | |
187 | ||
188 | void ProtocolV2::start_accept(SocketRef&& sock, | |
189 | const entity_addr_t& _peer_addr) | |
190 | { | |
191 | ceph_assert(state == state_t::NONE); | |
192 | ceph_assert(!socket); | |
193 | // until we know better | |
194 | conn.target_addr = _peer_addr; | |
9f95a23c TL |
195 | socket = std::move(sock); |
196 | logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr); | |
197 | messenger.accept_conn( | |
198 | seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); | |
199 | execute_accepting(); | |
200 | } | |
201 | ||
202 | // TODO: Frame related implementations, probably to a separate class. | |
203 | ||
204 | void ProtocolV2::enable_recording() | |
205 | { | |
206 | rxbuf.clear(); | |
207 | txbuf.clear(); | |
208 | record_io = true; | |
209 | } | |
210 | ||
211 | seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes) | |
212 | { | |
213 | if (unlikely(record_io)) { | |
214 | return socket->read_exactly(bytes) | |
215 | .then([this] (auto bl) { | |
216 | rxbuf.append(buffer::create(bl.share())); | |
217 | return bl; | |
218 | }); | |
219 | } else { | |
220 | return socket->read_exactly(bytes); | |
221 | }; | |
222 | } | |
223 | ||
224 | seastar::future<bufferlist> ProtocolV2::read(size_t bytes) | |
225 | { | |
226 | if (unlikely(record_io)) { | |
227 | return socket->read(bytes) | |
228 | .then([this] (auto buf) { | |
229 | rxbuf.append(buf); | |
230 | return buf; | |
231 | }); | |
232 | } else { | |
233 | return socket->read(bytes); | |
234 | } | |
235 | } | |
236 | ||
237 | seastar::future<> ProtocolV2::write(bufferlist&& buf) | |
238 | { | |
239 | if (unlikely(record_io)) { | |
240 | txbuf.append(buf); | |
241 | } | |
242 | return socket->write(std::move(buf)); | |
243 | } | |
244 | ||
245 | seastar::future<> ProtocolV2::write_flush(bufferlist&& buf) | |
246 | { | |
247 | if (unlikely(record_io)) { | |
248 | txbuf.append(buf); | |
249 | } | |
250 | return socket->write_flush(std::move(buf)); | |
251 | } | |
252 | ||
253 | size_t ProtocolV2::get_current_msg_size() const | |
254 | { | |
f67539c2 | 255 | ceph_assert(rx_frame_asm.get_num_segments() > 0); |
9f95a23c TL |
256 | size_t sum = 0; |
257 | // we don't include SegmentIndex::Msg::HEADER. | |
f67539c2 TL |
258 | for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) { |
259 | sum += rx_frame_asm.get_segment_logical_len(idx); | |
9f95a23c TL |
260 | } |
261 | return sum; | |
262 | } | |
263 | ||
264 | seastar::future<Tag> ProtocolV2::read_main_preamble() | |
265 | { | |
f67539c2 TL |
266 | rx_preamble.clear(); |
267 | return read_exactly(rx_frame_asm.get_preamble_onwire_len()) | |
9f95a23c | 268 | .then([this] (auto bl) { |
9f95a23c | 269 | rx_segments_data.clear(); |
f67539c2 TL |
270 | try { |
271 | rx_preamble.append(buffer::create(std::move(bl))); | |
272 | const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble); | |
273 | INTERCEPT_FRAME(tag, bp_type_t::READ); | |
274 | return tag; | |
275 | } catch (FrameError& e) { | |
276 | logger().warn("{} read_main_preamble: {}", conn, e.what()); | |
277 | abort_in_fault(); | |
9f95a23c | 278 | } |
9f95a23c TL |
279 | }); |
280 | } | |
281 | ||
282 | seastar::future<> ProtocolV2::read_frame_payload() | |
283 | { | |
9f95a23c TL |
284 | ceph_assert(rx_segments_data.empty()); |
285 | ||
286 | return seastar::do_until( | |
f67539c2 | 287 | [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); }, |
9f95a23c | 288 | [this] { |
9f95a23c | 289 | // TODO: create aligned and contiguous buffer from socket |
f67539c2 TL |
290 | const size_t seg_idx = rx_segments_data.size(); |
291 | if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx); | |
292 | alignment != segment_t::DEFAULT_ALIGNMENT) { | |
9f95a23c | 293 | logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}", |
f67539c2 | 294 | conn, alignment, rx_segments_data.size()); |
9f95a23c | 295 | } |
f67539c2 | 296 | uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx); |
9f95a23c | 297 | // TODO: create aligned and contiguous buffer from socket |
f67539c2 | 298 | return read_exactly(onwire_len).then([this] (auto tmp_bl) { |
9f95a23c TL |
299 | logger().trace("{} RECV({}) frame segment[{}]", |
300 | conn, tmp_bl.size(), rx_segments_data.size()); | |
f67539c2 TL |
301 | bufferlist segment; |
302 | segment.append(buffer::create(std::move(tmp_bl))); | |
303 | rx_segments_data.emplace_back(std::move(segment)); | |
9f95a23c TL |
304 | }); |
305 | } | |
306 | ).then([this] { | |
f67539c2 | 307 | return read_exactly(rx_frame_asm.get_epilogue_onwire_len()); |
9f95a23c TL |
308 | }).then([this] (auto bl) { |
309 | logger().trace("{} RECV({}) frame epilogue", conn, bl.size()); | |
f67539c2 TL |
310 | bool ok = false; |
311 | try { | |
f67539c2 TL |
312 | bufferlist rx_epilogue; |
313 | rx_epilogue.append(buffer::create(std::move(bl))); | |
20effc67 | 314 | ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue); |
f67539c2 TL |
315 | } catch (FrameError& e) { |
316 | logger().error("read_frame_payload: {} {}", conn, e.what()); | |
317 | abort_in_fault(); | |
318 | } catch (ceph::crypto::onwire::MsgAuthError&) { | |
319 | logger().error("read_frame_payload: {} bad auth tag", conn); | |
320 | abort_in_fault(); | |
9f95a23c | 321 | } |
9f95a23c TL |
322 | // we do have a mechanism that allows transmitter to start sending message |
323 | // and abort after putting entire data field on wire. This will be used by | |
324 | // the kernel client to avoid unnecessary buffering. | |
f67539c2 | 325 | if (!ok) { |
9f95a23c TL |
326 | // TODO |
327 | ceph_assert(false); | |
328 | } | |
329 | }); | |
330 | } | |
331 | ||
332 | template <class F> | |
333 | seastar::future<> ProtocolV2::write_frame(F &frame, bool flush) | |
334 | { | |
f6b5b4d7 | 335 | auto bl = frame.get_buffer(tx_frame_asm); |
9f95a23c TL |
336 | const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str()); |
337 | logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}", | |
338 | conn, bl.length(), (int)main_preamble->tag, | |
339 | (int)main_preamble->num_segments, main_preamble->crc); | |
340 | INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE); | |
341 | if (flush) { | |
342 | return write_flush(std::move(bl)); | |
343 | } else { | |
344 | return write(std::move(bl)); | |
345 | } | |
346 | } | |
347 | ||
348 | void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant) | |
349 | { | |
350 | if (!reentrant && _state == state) { | |
351 | logger().error("{} is not allowed to re-trigger state {}", | |
352 | conn, get_state_name(state)); | |
353 | ceph_assert(false); | |
354 | } | |
355 | logger().debug("{} TRIGGER {}, was {}", | |
356 | conn, get_state_name(_state), get_state_name(state)); | |
357 | state = _state; | |
358 | set_write_state(_write_state); | |
359 | } | |
360 | ||
361 | void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr) | |
362 | { | |
363 | if (conn.policy.lossy) { | |
364 | logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}", | |
365 | conn, func_name, get_state_name(state), eptr); | |
f67539c2 | 366 | close(true); |
9f95a23c TL |
367 | } else if (conn.policy.server || |
368 | (conn.policy.standby && | |
369 | (!is_queued() && conn.sent.empty()))) { | |
370 | logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}", | |
371 | conn, func_name, get_state_name(state), eptr); | |
372 | execute_standby(); | |
373 | } else if (backoff) { | |
374 | logger().info("{} {}: fault at {}, going to WAIT -- {}", | |
375 | conn, func_name, get_state_name(state), eptr); | |
376 | execute_wait(false); | |
377 | } else { | |
378 | logger().info("{} {}: fault at {}, going to CONNECTING -- {}", | |
379 | conn, func_name, get_state_name(state), eptr); | |
380 | execute_connecting(); | |
381 | } | |
382 | } | |
383 | ||
9f95a23c TL |
384 | void ProtocolV2::reset_session(bool full) |
385 | { | |
386 | server_cookie = 0; | |
387 | connect_seq = 0; | |
388 | conn.in_seq = 0; | |
389 | if (full) { | |
390 | client_cookie = generate_client_cookie(); | |
391 | peer_global_seq = 0; | |
392 | reset_write(); | |
f67539c2 TL |
393 | dispatchers.ms_handle_remote_reset( |
394 | seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); | |
9f95a23c TL |
395 | } |
396 | } | |
397 | ||
f67539c2 TL |
398 | seastar::future<std::tuple<entity_type_t, entity_addr_t>> |
399 | ProtocolV2::banner_exchange(bool is_connect) | |
9f95a23c TL |
400 | { |
401 | // 1. prepare and send banner | |
402 | bufferlist banner_payload; | |
20effc67 | 403 | encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0); |
9f95a23c TL |
404 | encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0); |
405 | ||
406 | bufferlist bl; | |
407 | bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX)); | |
408 | auto len_payload = static_cast<uint16_t>(banner_payload.length()); | |
409 | encode(len_payload, bl, 0); | |
410 | bl.claim_append(banner_payload); | |
411 | logger().debug("{} SEND({}) banner: len_payload={}, supported={}, " | |
412 | "required={}, banner=\"{}\"", | |
413 | conn, bl.length(), len_payload, | |
20effc67 TL |
414 | CRIMSON_MSGR2_SUPPORTED_FEATURES, |
415 | CEPH_MSGR2_REQUIRED_FEATURES, | |
9f95a23c TL |
416 | CEPH_BANNER_V2_PREFIX); |
417 | INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE); | |
418 | return write_flush(std::move(bl)).then([this] { | |
419 | // 2. read peer banner | |
420 | unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16); | |
421 | INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ); | |
422 | return read_exactly(banner_len); // or read exactly? | |
423 | }).then([this] (auto bl) { | |
424 | // 3. process peer banner and read banner_payload | |
425 | unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); | |
426 | logger().debug("{} RECV({}) banner: \"{}\"", | |
427 | conn, bl.size(), | |
428 | std::string((const char*)bl.get(), banner_prefix_len)); | |
429 | ||
430 | if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) { | |
431 | if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { | |
432 | logger().warn("{} peer is using V1 protocol", conn); | |
433 | } else { | |
434 | logger().warn("{} peer sent bad banner", conn); | |
435 | } | |
436 | abort_in_fault(); | |
437 | } | |
438 | bl.trim_front(banner_prefix_len); | |
439 | ||
440 | uint16_t payload_len; | |
441 | bufferlist buf; | |
442 | buf.append(buffer::create(std::move(bl))); | |
443 | auto ti = buf.cbegin(); | |
444 | try { | |
445 | decode(payload_len, ti); | |
446 | } catch (const buffer::error &e) { | |
447 | logger().warn("{} decode banner payload len failed", conn); | |
448 | abort_in_fault(); | |
449 | } | |
450 | logger().debug("{} GOT banner: payload_len={}", conn, payload_len); | |
451 | INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ); | |
452 | return read(payload_len); | |
f67539c2 | 453 | }).then([this, is_connect] (bufferlist bl) { |
9f95a23c TL |
454 | // 4. process peer banner_payload and send HelloFrame |
455 | auto p = bl.cbegin(); | |
456 | uint64_t peer_supported_features; | |
457 | uint64_t peer_required_features; | |
458 | try { | |
459 | decode(peer_supported_features, p); | |
460 | decode(peer_required_features, p); | |
461 | } catch (const buffer::error &e) { | |
462 | logger().warn("{} decode banner payload failed", conn); | |
463 | abort_in_fault(); | |
464 | } | |
465 | logger().debug("{} RECV({}) banner features: supported={} required={}", | |
466 | conn, bl.length(), | |
467 | peer_supported_features, peer_required_features); | |
468 | ||
469 | // Check feature bit compatibility | |
20effc67 | 470 | uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES; |
9f95a23c TL |
471 | uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES; |
472 | if ((required_features & peer_supported_features) != required_features) { | |
473 | logger().error("{} peer does not support all required features" | |
474 | " required={} peer_supported={}", | |
475 | conn, required_features, peer_supported_features); | |
f67539c2 | 476 | abort_in_close(*this, is_connect); |
9f95a23c TL |
477 | } |
478 | if ((supported_features & peer_required_features) != peer_required_features) { | |
479 | logger().error("{} we do not support all peer required features" | |
480 | " peer_required={} supported={}", | |
481 | conn, peer_required_features, supported_features); | |
f67539c2 | 482 | abort_in_close(*this, is_connect); |
9f95a23c TL |
483 | } |
484 | this->peer_required_features = peer_required_features; | |
485 | if (this->peer_required_features == 0) { | |
486 | this->connection_features = msgr2_required; | |
487 | } | |
f67539c2 TL |
488 | const bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); |
489 | tx_frame_asm.set_is_rev1(is_rev1); | |
490 | rx_frame_asm.set_is_rev1(is_rev1); | |
9f95a23c TL |
491 | |
492 | auto hello = HelloFrame::Encode(messenger.get_mytype(), | |
493 | conn.target_addr); | |
494 | logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}", | |
495 | conn, ceph_entity_type_name(messenger.get_mytype()), | |
496 | conn.target_addr); | |
497 | return write_frame(hello); | |
498 | }).then([this] { | |
499 | //5. read peer HelloFrame | |
500 | return read_main_preamble(); | |
501 | }).then([this] (Tag tag) { | |
502 | expect_tag(Tag::HELLO, tag, conn, __func__); | |
503 | return read_frame_payload(); | |
504 | }).then([this] { | |
505 | // 6. process peer HelloFrame | |
506 | auto hello = HelloFrame::Decode(rx_segments_data.back()); | |
507 | logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}", | |
508 | conn, ceph_entity_type_name(hello.entity_type()), | |
509 | hello.peer_addr()); | |
f67539c2 TL |
510 | return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>( |
511 | std::make_tuple(hello.entity_type(), hello.peer_addr())); | |
9f95a23c TL |
512 | }); |
513 | } | |
514 | ||
515 | // CONNECTING state | |
516 | ||
517 | seastar::future<> ProtocolV2::handle_auth_reply() | |
518 | { | |
519 | return read_main_preamble() | |
520 | .then([this] (Tag tag) { | |
521 | switch (tag) { | |
522 | case Tag::AUTH_BAD_METHOD: | |
523 | return read_frame_payload().then([this] { | |
524 | // handle_auth_bad_method() logic | |
525 | auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back()); | |
526 | logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, " | |
527 | "allowed_methods={}, allowed_modes={}", | |
528 | conn, bad_method.method(), cpp_strerror(bad_method.result()), | |
529 | bad_method.allowed_methods(), bad_method.allowed_modes()); | |
530 | ceph_assert(messenger.get_auth_client()); | |
531 | int r = messenger.get_auth_client()->handle_auth_bad_method( | |
532 | conn.shared_from_this(), auth_meta, | |
533 | bad_method.method(), bad_method.result(), | |
534 | bad_method.allowed_methods(), bad_method.allowed_modes()); | |
535 | if (r < 0) { | |
536 | logger().warn("{} auth_client handle_auth_bad_method returned {}", | |
537 | conn, r); | |
538 | abort_in_fault(); | |
539 | } | |
540 | return client_auth(bad_method.allowed_methods()); | |
541 | }); | |
542 | case Tag::AUTH_REPLY_MORE: | |
543 | return read_frame_payload().then([this] { | |
544 | // handle_auth_reply_more() logic | |
545 | auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back()); | |
546 | logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}", | |
547 | conn, auth_more.auth_payload().length()); | |
548 | ceph_assert(messenger.get_auth_client()); | |
549 | // let execute_connecting() take care of the thrown exception | |
550 | auto reply = messenger.get_auth_client()->handle_auth_reply_more( | |
551 | conn.shared_from_this(), auth_meta, auth_more.auth_payload()); | |
552 | auto more_reply = AuthRequestMoreFrame::Encode(reply); | |
553 | logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}", | |
554 | conn, reply.length()); | |
555 | return write_frame(more_reply); | |
556 | }).then([this] { | |
557 | return handle_auth_reply(); | |
558 | }); | |
559 | case Tag::AUTH_DONE: | |
560 | return read_frame_payload().then([this] { | |
561 | // handle_auth_done() logic | |
562 | auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back()); | |
563 | logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}", | |
564 | conn, auth_done.global_id(), | |
565 | ceph_con_mode_name(auth_done.con_mode()), | |
566 | auth_done.auth_payload().length()); | |
567 | ceph_assert(messenger.get_auth_client()); | |
568 | int r = messenger.get_auth_client()->handle_auth_done( | |
569 | conn.shared_from_this(), auth_meta, | |
570 | auth_done.global_id(), | |
571 | auth_done.con_mode(), | |
572 | auth_done.auth_payload()); | |
573 | if (r < 0) { | |
574 | logger().warn("{} auth_client handle_auth_done returned {}", conn, r); | |
575 | abort_in_fault(); | |
576 | } | |
577 | auth_meta->con_mode = auth_done.con_mode(); | |
f67539c2 TL |
578 | session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair( |
579 | nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), false); | |
9f95a23c TL |
580 | return finish_auth(); |
581 | }); | |
582 | default: { | |
583 | unexpected_tag(tag, conn, __func__); | |
584 | return seastar::now(); | |
585 | } | |
586 | } | |
587 | }); | |
588 | } | |
589 | ||
590 | seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods) | |
591 | { | |
592 | // send_auth_request() logic | |
593 | ceph_assert(messenger.get_auth_client()); | |
594 | ||
595 | try { | |
596 | auto [auth_method, preferred_modes, bl] = | |
597 | messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta); | |
598 | auth_meta->auth_method = auth_method; | |
599 | auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl); | |
600 | logger().debug("{} WRITE AuthRequestFrame: method={}," | |
601 | " preferred_modes={}, payload_len={}", | |
602 | conn, auth_method, preferred_modes, bl.length()); | |
603 | return write_frame(frame).then([this] { | |
604 | return handle_auth_reply(); | |
605 | }); | |
606 | } catch (const crimson::auth::error& e) { | |
607 | logger().error("{} get_initial_auth_request returned {}", conn, e); | |
f67539c2 | 608 | abort_in_close(*this, true); |
9f95a23c TL |
609 | return seastar::now(); |
610 | } | |
611 | } | |
612 | ||
613 | seastar::future<ProtocolV2::next_step_t> | |
614 | ProtocolV2::process_wait() | |
615 | { | |
616 | return read_frame_payload().then([this] { | |
617 | // handle_wait() logic | |
618 | logger().debug("{} GOT WaitFrame", conn); | |
619 | WaitFrame::Decode(rx_segments_data.back()); | |
620 | return next_step_t::wait; | |
621 | }); | |
622 | } | |
623 | ||
624 | seastar::future<ProtocolV2::next_step_t> | |
625 | ProtocolV2::client_connect() | |
626 | { | |
627 | // send_client_ident() logic | |
628 | uint64_t flags = 0; | |
629 | if (conn.policy.lossy) { | |
630 | flags |= CEPH_MSG_CONNECT_LOSSY; | |
631 | } | |
632 | ||
633 | auto client_ident = ClientIdentFrame::Encode( | |
634 | messenger.get_myaddrs(), | |
635 | conn.target_addr, | |
636 | messenger.get_myname().num(), | |
637 | global_seq, | |
638 | conn.policy.features_supported, | |
639 | conn.policy.features_required | msgr2_required, flags, | |
640 | client_cookie); | |
641 | ||
642 | logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={}," | |
643 | " gs={}, features_supported={}, features_required={}," | |
644 | " flags={}, cookie={}", | |
645 | conn, messenger.get_myaddrs(), conn.target_addr, | |
646 | messenger.get_myname().num(), global_seq, | |
647 | conn.policy.features_supported, | |
648 | conn.policy.features_required | msgr2_required, | |
649 | flags, client_cookie); | |
650 | return write_frame(client_ident).then([this] { | |
651 | return read_main_preamble(); | |
652 | }).then([this] (Tag tag) { | |
653 | switch (tag) { | |
654 | case Tag::IDENT_MISSING_FEATURES: | |
655 | return read_frame_payload().then([this] { | |
656 | // handle_ident_missing_features() logic | |
657 | auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back()); | |
658 | logger().warn("{} GOT IdentMissingFeaturesFrame: features={}" | |
659 | " (client does not support all server features)", | |
660 | conn, ident_missing.features()); | |
661 | abort_in_fault(); | |
662 | return next_step_t::none; | |
663 | }); | |
664 | case Tag::WAIT: | |
665 | return process_wait(); | |
666 | case Tag::SERVER_IDENT: | |
667 | return read_frame_payload().then([this] { | |
668 | // handle_server_ident() logic | |
669 | requeue_sent(); | |
670 | auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back()); | |
671 | logger().debug("{} GOT ServerIdentFrame:" | |
672 | " addrs={}, gid={}, gs={}," | |
673 | " features_supported={}, features_required={}," | |
674 | " flags={}, cookie={}", | |
675 | conn, | |
676 | server_ident.addrs(), server_ident.gid(), | |
677 | server_ident.global_seq(), | |
678 | server_ident.supported_features(), | |
679 | server_ident.required_features(), | |
680 | server_ident.flags(), server_ident.cookie()); | |
681 | ||
682 | // is this who we intended to talk to? | |
683 | // be a bit forgiving here, since we may be connecting based on addresses parsed out | |
684 | // of mon_host or something. | |
685 | if (!server_ident.addrs().contains(conn.target_addr)) { | |
686 | logger().warn("{} peer identifies as {}, does not include {}", | |
687 | conn, server_ident.addrs(), conn.target_addr); | |
688 | throw std::system_error( | |
689 | make_error_code(crimson::net::error::bad_peer_address)); | |
690 | } | |
691 | ||
692 | server_cookie = server_ident.cookie(); | |
693 | ||
694 | // TODO: change peer_addr to entity_addrvec_t | |
695 | if (server_ident.addrs().front() != conn.peer_addr) { | |
696 | logger().warn("{} peer advertises as {}, does not match {}", | |
697 | conn, server_ident.addrs(), conn.peer_addr); | |
698 | throw std::system_error( | |
699 | make_error_code(crimson::net::error::bad_peer_address)); | |
700 | } | |
f67539c2 TL |
701 | if (conn.get_peer_id() != entity_name_t::NEW && |
702 | conn.get_peer_id() != server_ident.gid()) { | |
703 | logger().error("{} connection peer id ({}) does not match " | |
704 | "what it should be ({}) during connecting, close", | |
705 | conn, server_ident.gid(), conn.get_peer_id()); | |
706 | abort_in_close(*this, true); | |
707 | } | |
9f95a23c TL |
708 | conn.set_peer_id(server_ident.gid()); |
709 | conn.set_features(server_ident.supported_features() & | |
710 | conn.policy.features_supported); | |
711 | peer_global_seq = server_ident.global_seq(); | |
712 | ||
713 | bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY; | |
714 | if (lossy != conn.policy.lossy) { | |
715 | logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn, lossy); | |
716 | conn.policy.lossy = lossy; | |
717 | } | |
718 | if (lossy && (connect_seq != 0 || server_cookie != 0)) { | |
719 | logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy", | |
720 | conn, connect_seq, server_cookie); | |
721 | connect_seq = 0; | |
722 | server_cookie = 0; | |
723 | } | |
724 | ||
725 | return seastar::make_ready_future<next_step_t>(next_step_t::ready); | |
726 | }); | |
727 | default: { | |
728 | unexpected_tag(tag, conn, "post_client_connect"); | |
729 | return seastar::make_ready_future<next_step_t>(next_step_t::none); | |
730 | } | |
731 | } | |
732 | }); | |
733 | } | |
734 | ||
735 | seastar::future<ProtocolV2::next_step_t> | |
736 | ProtocolV2::client_reconnect() | |
737 | { | |
738 | // send_reconnect() logic | |
739 | auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(), | |
740 | client_cookie, | |
741 | server_cookie, | |
742 | global_seq, | |
743 | connect_seq, | |
744 | conn.in_seq); | |
745 | logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={}," | |
746 | " server_cookie={}, gs={}, cs={}, msg_seq={}", | |
747 | conn, messenger.get_myaddrs(), | |
748 | client_cookie, server_cookie, | |
749 | global_seq, connect_seq, conn.in_seq); | |
750 | return write_frame(reconnect).then([this] { | |
751 | return read_main_preamble(); | |
752 | }).then([this] (Tag tag) { | |
753 | switch (tag) { | |
754 | case Tag::SESSION_RETRY_GLOBAL: | |
755 | return read_frame_payload().then([this] { | |
756 | // handle_session_retry_global() logic | |
757 | auto retry = RetryGlobalFrame::Decode(rx_segments_data.back()); | |
758 | logger().warn("{} GOT RetryGlobalFrame: gs={}", | |
759 | conn, retry.global_seq()); | |
760 | return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) { | |
761 | global_seq = gs; | |
762 | logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq); | |
763 | return client_reconnect(); | |
764 | }); | |
765 | }); | |
766 | case Tag::SESSION_RETRY: | |
767 | return read_frame_payload().then([this] { | |
768 | // handle_session_retry() logic | |
769 | auto retry = RetryFrame::Decode(rx_segments_data.back()); | |
770 | logger().warn("{} GOT RetryFrame: cs={}", | |
771 | conn, retry.connect_seq()); | |
772 | connect_seq = retry.connect_seq() + 1; | |
773 | logger().warn("{} UPDATE: cs={}", conn, connect_seq); | |
774 | return client_reconnect(); | |
775 | }); | |
776 | case Tag::SESSION_RESET: | |
777 | return read_frame_payload().then([this] { | |
778 | // handle_session_reset() logic | |
779 | auto reset = ResetFrame::Decode(rx_segments_data.back()); | |
780 | logger().warn("{} GOT ResetFrame: full={}", conn, reset.full()); | |
781 | reset_session(reset.full()); | |
782 | return client_connect(); | |
783 | }); | |
784 | case Tag::WAIT: | |
785 | return process_wait(); | |
786 | case Tag::SESSION_RECONNECT_OK: | |
787 | return read_frame_payload().then([this] { | |
788 | // handle_reconnect_ok() logic | |
789 | auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back()); | |
790 | logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", | |
791 | conn, reconnect_ok.msg_seq()); | |
792 | requeue_up_to(reconnect_ok.msg_seq()); | |
793 | return seastar::make_ready_future<next_step_t>(next_step_t::ready); | |
794 | }); | |
795 | default: { | |
796 | unexpected_tag(tag, conn, "post_client_reconnect"); | |
797 | return seastar::make_ready_future<next_step_t>(next_step_t::none); | |
798 | } | |
799 | } | |
800 | }); | |
801 | } | |
802 | ||
// Drive the CONNECTING state: switch state, open a new socket to
// conn.peer_addr, then run banner exchange, client auth and either
// client_connect() or client_reconnect() as one gated future chain.
// The chain ends by calling execute_ready() or execute_wait(); failures are
// routed through the handle_exception() continuation at the bottom.
803 | void ProtocolV2::execute_connecting() | |
804 | { | |
805 | trigger_state(state_t::CONNECTING, write_state_t::delay, true); | |
// Shut down any socket left over from a previous attempt.
806 | if (socket) { | |
807 | socket->shutdown(); | |
808 | } | |
f67539c2 | 809 | gated_execute("execute_connecting", [this] { |
9f95a23c TL |
810 | return messenger.get_global_seq().then([this] (auto gs) { | |
811 | global_seq = gs; | |
812 | assert(client_cookie != 0); | |
// A non-lossy connection with a server_cookie is a reconnect, so connect_seq
// is bumped; otherwise both connect_seq and server_cookie must be zero.
813 | if (!conn.policy.lossy && server_cookie != 0) { | |
814 | ++connect_seq; | |
815 | logger().debug("{} UPDATE: gs={}, cs={} for reconnect", | |
816 | conn, global_seq, connect_seq); | |
817 | } else { // conn.policy.lossy || server_cookie == 0 | |
818 | assert(connect_seq == 0); | |
819 | assert(server_cookie == 0); | |
820 | logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); | |
821 | } | |
822 | ||
823 | return wait_write_exit(); | |
824 | }).then([this] { | |
// The state is re-checked after every asynchronous step.
// NOTE(review): abort_protocol() presumably throws so that the chain lands in
// the handle_exception() continuation below — confirm.
825 | if (unlikely(state != state_t::CONNECTING)) { | |
826 | logger().debug("{} triggered {} before Socket::connect()", | |
827 | conn, get_state_name(state)); | |
828 | abort_protocol(); | |
829 | } | |
// Close the old socket in the background; ownership is moved into the
// dispatched lambda so the member is free for the new socket.
830 | if (socket) { | |
f67539c2 TL |
831 | gate.dispatch_in_background("close_sockect_connecting", *this, | |
832 | [sock = std::move(socket)] () mutable { | |
9f95a23c TL |
833 | return sock->close().then([sock = std::move(sock)] {}); | |
834 | }); | |
835 | } | |
836 | INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING); | |
837 | return Socket::connect(conn.peer_addr); | |
838 | }).then([this](SocketRef sock) { | |
839 | logger().debug("{} socket connected", conn); | |
// If the state changed during the connect, close the fresh socket before
// aborting instead of adopting it.
840 | if (unlikely(state != state_t::CONNECTING)) { | |
841 | logger().debug("{} triggered {} during Socket::connect()", | |
842 | conn, get_state_name(state)); | |
843 | return sock->close().then([sock = std::move(sock)] { | |
844 | abort_protocol(); | |
845 | }); | |
846 | } | |
847 | socket = std::move(sock); | |
848 | return seastar::now(); | |
849 | }).then([this] { | |
// Start from fresh auth metadata and no session stream handlers.
850 | auth_meta = seastar::make_lw_shared<AuthConnectionMeta>(); | |
851 | session_stream_handlers = { nullptr, nullptr }; | |
852 | enable_recording(); | |
f67539c2 TL |
853 | return banner_exchange(true); | |
854 | }).then([this] (auto&& ret) { | |
855 | auto [_peer_type, _my_addr_from_peer] = std::move(ret); | |
9f95a23c TL |
// The peer type learned from the banner must match what this connection
// was created for.
856 | if (conn.get_peer_type() != _peer_type) { | |
857 | logger().warn("{} connection peer type does not match what peer advertises {} != {}", | |
858 | conn, ceph_entity_type_name(conn.get_peer_type()), | |
859 | ceph_entity_type_name(_peer_type)); | |
f67539c2 | 860 | abort_in_close(*this, true); |
9f95a23c | 861 | } |
f67539c2 TL |
862 | if (unlikely(state != state_t::CONNECTING)) { | |
863 | logger().debug("{} triggered {} during banner_exchange(), abort", | |
864 | conn, get_state_name(state)); | |
865 | abort_protocol(); | |
866 | } | |
867 | socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port()); | |
9f95a23c TL |
// A legacy address reported for us is rejected; otherwise the address is
// marked as msgr2 and handed to the messenger.
868 | if (unlikely(_my_addr_from_peer.is_legacy())) { | |
869 | logger().warn("{} peer sent a legacy address for me: {}", | |
870 | conn, _my_addr_from_peer); | |
871 | throw std::system_error( | |
872 | make_error_code(crimson::net::error::bad_peer_address)); | |
873 | } | |
874 | _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2); | |
875 | return messenger.learned_addr(_my_addr_from_peer, conn); | |
876 | }).then([this] { | |
877 | return client_auth(); | |
878 | }).then([this] { | |
// No server_cookie means a new session (connect); otherwise resume the
// existing one (reconnect).
879 | if (server_cookie == 0) { | |
880 | ceph_assert(connect_seq == 0); | |
881 | return client_connect(); | |
882 | } else { | |
883 | ceph_assert(connect_seq > 0); | |
884 | return client_reconnect(); | |
885 | } | |
886 | }).then([this] (next_step_t next) { | |
887 | if (unlikely(state != state_t::CONNECTING)) { | |
888 | logger().debug("{} triggered {} at the end of execute_connecting()", | |
889 | conn, get_state_name(state)); | |
890 | abort_protocol(); | |
891 | } | |
// Only `ready` and `wait` are valid outcomes here; anything else aborts
// the process.
892 | switch (next) { | |
893 | case next_step_t::ready: { | |
9f95a23c TL |
894 | logger().info("{} connected:" | |
895 | " gs={}, pgs={}, cs={}, client_cookie={}," | |
896 | " server_cookie={}, in_seq={}, out_seq={}, out_q={}", | |
897 | conn, global_seq, peer_global_seq, connect_seq, | |
898 | client_cookie, server_cookie, conn.in_seq, | |
899 | conn.out_seq, conn.out_q.size()); | |
f67539c2 | 900 | execute_ready(true); |
9f95a23c TL |
901 | break; | |
902 | } | |
903 | case next_step_t::wait: { | |
904 | logger().info("{} execute_connecting(): going to WAIT", conn); | |
905 | execute_wait(true); | |
906 | break; | |
907 | } | |
908 | default: { | |
909 | ceph_abort("impossible next step"); | |
910 | } | |
911 | } | |
912 | }).handle_exception([this] (std::exception_ptr eptr) { | |
// If the state already moved on, something else took over this connection;
// the only states expected here are CLOSING and REPLACING.
913 | if (state != state_t::CONNECTING) { | |
914 | logger().info("{} execute_connecting(): protocol aborted at {} -- {}", | |
915 | conn, get_state_name(state), eptr); | |
916 | assert(state == state_t::CLOSING || | |
917 | state == state_t::REPLACING); | |
918 | return; | |
919 | } | |
920 | ||
// Still CONNECTING, so this is a genuine fault. Go to STANDBY for a server
// policy, or for a standby policy with nothing queued and nothing in
// conn.sent; otherwise go to WAIT.
921 | if (conn.policy.server || | |
922 | (conn.policy.standby && | |
923 | (!is_queued() && conn.sent.empty()))) { | |
924 | logger().info("{} execute_connecting(): fault at {} with nothing to send," | |
925 | " going to STANDBY -- {}", | |
926 | conn, get_state_name(state), eptr); | |
927 | execute_standby(); | |
928 | } else { | |
929 | logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}", | |
930 | conn, get_state_name(state), eptr); | |
931 | execute_wait(false); | |
932 | } | |
933 | }); | |
934 | }); | |
935 | } | |
936 | ||
937 | // ACCEPTING state | |
938 | ||
939 | seastar::future<> ProtocolV2::_auth_bad_method(int r) | |
940 | { | |
941 | // _auth_bad_method() logic | |
942 | ceph_assert(r < 0); | |
943 | auto [allowed_methods, allowed_modes] = | |
944 | messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type()); | |
945 | auto bad_method = AuthBadMethodFrame::Encode( | |
946 | auth_meta->auth_method, r, allowed_methods, allowed_modes); | |
947 | logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, " | |
948 | "allowed_methods={}, allowed_modes={})", | |
949 | conn, auth_meta->auth_method, cpp_strerror(r), | |
950 | allowed_methods, allowed_modes); | |
951 | return write_frame(bad_method).then([this] { | |
952 | return server_auth(); | |
953 | }); | |
954 | } | |
955 | ||
956 | seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) | |
957 | { | |
958 | // _handle_auth_request() logic | |
959 | ceph_assert(messenger.get_auth_server()); | |
960 | bufferlist reply; | |
961 | int r = messenger.get_auth_server()->handle_auth_request( | |
962 | conn.shared_from_this(), auth_meta, | |
963 | more, auth_meta->auth_method, auth_payload, | |
964 | &reply); | |
965 | switch (r) { | |
966 | // successful | |
967 | case 1: { | |
968 | auto auth_done = AuthDoneFrame::Encode( | |
969 | conn.peer_global_id, auth_meta->con_mode, reply); | |
970 | logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}", | |
971 | conn, conn.peer_global_id, | |
972 | ceph_con_mode_name(auth_meta->con_mode), reply.length()); | |
973 | return write_frame(auth_done).then([this] { | |
974 | ceph_assert(auth_meta); | |
f67539c2 TL |
975 | session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair( |
976 | nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), true); | |
9f95a23c TL |
977 | return finish_auth(); |
978 | }); | |
979 | } | |
980 | // auth more | |
981 | case 0: { | |
982 | auto more = AuthReplyMoreFrame::Encode(reply); | |
983 | logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}", | |
984 | conn, reply.length()); | |
985 | return write_frame(more).then([this] { | |
986 | return read_main_preamble(); | |
987 | }).then([this] (Tag tag) { | |
988 | expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__); | |
989 | return read_frame_payload(); | |
990 | }).then([this] { | |
991 | auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back()); | |
992 | logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}", | |
993 | conn, auth_more.auth_payload().length()); | |
994 | return _handle_auth_request(auth_more.auth_payload(), true); | |
995 | }); | |
996 | } | |
997 | case -EBUSY: { | |
998 | logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn); | |
999 | abort_in_fault(); | |
1000 | return seastar::now(); | |
1001 | } | |
1002 | default: { | |
1003 | logger().warn("{} auth_server handle_auth_request returned {}", conn, r); | |
1004 | return _auth_bad_method(r); | |
1005 | } | |
1006 | } | |
1007 | } | |
1008 | ||
1009 | seastar::future<> ProtocolV2::server_auth() | |
1010 | { | |
1011 | return read_main_preamble() | |
1012 | .then([this] (Tag tag) { | |
1013 | expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__); | |
1014 | return read_frame_payload(); | |
1015 | }).then([this] { | |
1016 | // handle_auth_request() logic | |
1017 | auto request = AuthRequestFrame::Decode(rx_segments_data.back()); | |
1018 | logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={}," | |
1019 | " payload_len={}", | |
1020 | conn, request.method(), request.preferred_modes(), | |
1021 | request.auth_payload().length()); | |
1022 | auth_meta->auth_method = request.method(); | |
1023 | auth_meta->con_mode = messenger.get_auth_server()->pick_con_mode( | |
1024 | conn.get_peer_type(), auth_meta->auth_method, | |
1025 | request.preferred_modes()); | |
1026 | if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) { | |
1027 | logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn); | |
1028 | return _auth_bad_method(-EOPNOTSUPP); | |
1029 | } | |
1030 | return _handle_auth_request(request.auth_payload(), false); | |
1031 | }); | |
1032 | } | |
1033 | ||
f67539c2 TL |
1034 | bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const |
1035 | { | |
1036 | auto my_peer_name = conn.get_peer_name(); | |
1037 | if (my_peer_name.type() != peer_name.type()) { | |
1038 | return false; | |
1039 | } | |
1040 | if (my_peer_name.num() != entity_name_t::NEW && | |
1041 | peer_name.num() != entity_name_t::NEW && | |
1042 | my_peer_name.num() != peer_name.num()) { | |
1043 | return false; | |
1044 | } | |
1045 | return true; | |
1046 | } | |
1047 | ||
9f95a23c TL |
1048 | seastar::future<ProtocolV2::next_step_t> |
1049 | ProtocolV2::send_wait() | |
1050 | { | |
1051 | auto wait = WaitFrame::Encode(); | |
1052 | logger().debug("{} WRITE WaitFrame", conn); | |
1053 | return write_frame(wait).then([] { | |
1054 | return next_step_t::wait; | |
1055 | }); | |
1056 | } | |
1057 | ||
1058 | seastar::future<ProtocolV2::next_step_t> | |
1059 | ProtocolV2::reuse_connection( | |
1060 | ProtocolV2* existing_proto, bool do_reset, | |
1061 | bool reconnect, uint64_t conn_seq, uint64_t msg_seq) | |
1062 | { | |
1063 | existing_proto->trigger_replacing(reconnect, | |
1064 | do_reset, | |
1065 | std::move(socket), | |
1066 | std::move(auth_meta), | |
1067 | std::move(session_stream_handlers), | |
1068 | peer_global_seq, | |
1069 | client_cookie, | |
1070 | conn.get_peer_name(), | |
1071 | connection_features, | |
f67539c2 TL |
1072 | tx_frame_asm.get_is_rev1(), |
1073 | rx_frame_asm.get_is_rev1(), | |
9f95a23c TL |
1074 | conn_seq, |
1075 | msg_seq); | |
1076 | #ifdef UNIT_TESTS_BUILT | |
1077 | if (conn.interceptor) { | |
1078 | conn.interceptor->register_conn_replaced(conn); | |
1079 | } | |
1080 | #endif | |
1081 | // close this connection because all the necessary information is delivered | |
1082 | // to the exisiting connection, and jump to error handling code to abort the | |
1083 | // current state. | |
f67539c2 | 1084 | abort_in_close(*this, false); |
9f95a23c TL |
1085 | return seastar::make_ready_future<next_step_t>(next_step_t::none); |
1086 | } | |
1087 | ||
// Decide how an accepting connection (this) should proceed when the messenger
// already holds a v2 connection for the same peer.
// Returns the next step for the accepting state machine. The outcomes visible
// here are: abort_in_fault(), send_wait(), execute_establishing() with a
// `ready` result, or handing over to the existing connection through
// reuse_connection().
// NOTE(review): abort_in_fault() presumably throws and does not return —
// confirm; the code after each call relies on that.
1088 | seastar::future<ProtocolV2::next_step_t> | |
1089 | ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) | |
1090 | { | |
1091 | // handle_existing_connection() logic | |
1092 | ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>( | |
1093 | existing_conn->protocol.get()); | |
1094 | ceph_assert(existing_proto); | |
1095 | logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting," | |
1096 | " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})", | |
1097 | conn, global_seq, peer_global_seq, connect_seq, | |
1098 | client_cookie, server_cookie, | |
1099 | existing_conn, get_state_name(existing_proto->state), | |
1100 | existing_proto->global_seq, | |
1101 | existing_proto->peer_global_seq, | |
1102 | existing_proto->connect_seq, | |
1103 | existing_proto->client_cookie, | |
1104 | existing_proto->server_cookie); | |
1105 | ||
// The existing connection's peer name must be compatible with ours.
f67539c2 TL |
1106 | if (!validate_peer_name(existing_conn->get_peer_name())) { | |
1107 | logger().error("{} server_connect: my peer_name doesn't match" | |
1108 | " the existing connection {}, abort", conn, existing_conn); | |
1109 | abort_in_fault(); | |
1110 | } | |
1111 | ||
// The existing connection is already being replaced: tell the client to wait.
9f95a23c TL |
1112 | if (existing_proto->state == state_t::REPLACING) { | |
1113 | logger().warn("{} server_connect: racing replace happened while" | |
1114 | " replacing existing connection {}, send wait.", | |
1115 | conn, *existing_conn); | |
1116 | return send_wait(); | |
1117 | } | |
1118 | ||
// The existing connection has seen a newer peer_global_seq, so this attempt
// is stale.
1119 | if (existing_proto->peer_global_seq > peer_global_seq) { | |
1120 | logger().warn("{} server_connect:" | |
1121 | " this is a stale connection, because peer_global_seq({})" | |
1122 | " < existing->peer_global_seq({}), close this connection" | |
1123 | " in favor of existing connection {}", | |
1124 | conn, peer_global_seq, | |
1125 | existing_proto->peer_global_seq, *existing_conn); | |
1126 | abort_in_fault(); | |
1127 | } | |
1128 | ||
1129 | if (existing_conn->policy.lossy) { | |
1130 | // existing connection can be thrown out in favor of this one | |
1131 | logger().warn("{} server_connect:" | |
1132 | " existing connection {} is a lossy channel. Close existing in favor of" | |
1133 | " this connection", conn, *existing_conn); | |
f67539c2 | 1134 | execute_establishing(existing_conn, true); |
9f95a23c TL |
1135 | return seastar::make_ready_future<next_step_t>(next_step_t::ready); | |
1136 | } | |
1137 | ||
// From here on the existing connection is non-lossy. A non-zero
// server_cookie on it means it already holds a session.
1138 | if (existing_proto->server_cookie != 0) { | |
1139 | if (existing_proto->client_cookie != client_cookie) { | |
1140 | // Found previous session | |
1141 | // peer has reset and we're going to reuse the existing connection | |
1142 | // by replacing the socket | |
1143 | logger().warn("{} server_connect:" | |
1144 | " found new session (cs={})" | |
1145 | " when existing {} is with stale session (cs={}, ss={})," | |
1146 | " peer must have reset", | |
1147 | conn, client_cookie, | |
1148 | *existing_conn, existing_proto->client_cookie, | |
1149 | existing_proto->server_cookie); | |
1150 | return reuse_connection(existing_proto, conn.policy.resetcheck); | |
1151 | } else { | |
1152 | // session establishment interrupted between client_ident and server_ident, | |
1153 | // continuing... | |
1154 | logger().warn("{} server_connect: found client session with existing {}" | |
1155 | " matched (cs={}, ss={}), continuing session establishment", | |
1156 | conn, *existing_conn, client_cookie, existing_proto->server_cookie); | |
1157 | return reuse_connection(existing_proto); | |
1158 | } | |
1159 | } else { | |
1160 | // Looks like a connection race: server and client are both connecting to | |
1161 | // each other at the same time. | |
1162 | if (existing_proto->client_cookie != client_cookie) { | |
// existing_conn->peer_wins() decides the race: if it returns true the
// existing connection is reused, otherwise a keepalive is sent on the
// existing connection and the client is told to wait.
1163 | if (existing_conn->peer_wins()) { | |
1164 | logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)" | |
1165 | " and win, reusing existing {}", | |
1166 | conn, client_cookie, existing_proto->client_cookie, *existing_conn); | |
1167 | return reuse_connection(existing_proto); | |
1168 | } else { | |
1169 | logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)" | |
1170 | " and lose to existing {}, ask client to wait", | |
1171 | conn, client_cookie, existing_proto->client_cookie, *existing_conn); | |
1172 | return existing_conn->keepalive().then([this] { | |
1173 | return send_wait(); | |
1174 | }); | |
1175 | } | |
1176 | } else { | |
1177 | logger().warn("{} server_connect: found client session with existing {}" | |
1178 | " matched (cs={}, ss={}), continuing session establishment", | |
1179 | conn, *existing_conn, client_cookie, existing_proto->server_cookie); | |
1180 | return reuse_connection(existing_proto); | |
1181 | } | |
1182 | } | |
1183 | } | |
1184 | ||
// Server side of session establishment: read and validate the peer's
// ClientIdentFrame, record the peer's address, id, cookie, features and
// global_seq, then either establish a new session or resolve the conflict
// with an already-registered connection for the same peer address.
// Resolves to the next step for the accepting state machine.
1185 | seastar::future<ProtocolV2::next_step_t> | |
1186 | ProtocolV2::server_connect() | |
1187 | { | |
1188 | return read_frame_payload().then([this] { | |
1189 | // handle_client_ident() logic | |
1190 | auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back()); | |
1191 | logger().debug("{} GOT ClientIdentFrame: addrs={}, target={}," | |
1192 | " gid={}, gs={}, features_supported={}," | |
1193 | " features_required={}, flags={}, cookie={}", | |
1194 | conn, client_ident.addrs(), client_ident.target_addr(), | |
1195 | client_ident.gid(), client_ident.global_seq(), | |
1196 | client_ident.supported_features(), | |
1197 | client_ident.required_features(), | |
1198 | client_ident.flags(), client_ident.cookie()); | |
1199 | ||
// The peer must advertise at least one non-default address.
1200 | if (client_ident.addrs().empty() || | |
1201 | client_ident.addrs().front() == entity_addr_t()) { | |
1202 | logger().warn("{} oops, client_ident.addrs() is empty", conn); | |
1203 | throw std::system_error( | |
1204 | make_error_code(crimson::net::error::bad_peer_address)); | |
1205 | } | |
// The address the peer is trying to reach must be one of ours.
1206 | if (!messenger.get_myaddrs().contains(client_ident.target_addr())) { | |
1207 | logger().warn("{} peer is trying to reach {} which is not us ({})", | |
1208 | conn, client_ident.target_addr(), messenger.get_myaddrs()); | |
1209 | throw std::system_error( | |
1210 | make_error_code(crimson::net::error::bad_peer_address)); | |
1211 | } | |
// Only the first advertised address is used as the peer address.
20effc67 | 1212 | conn.peer_addr = client_ident.addrs().front(); |
9f95a23c TL |
1213 | logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr); | |
1214 | conn.target_addr = conn.peer_addr; | |
// With a non-lossy, non-server policy a target address without a usable
// port is rejected.
1215 | if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) { | |
1216 | logger().warn("{} we don't know how to reconnect to peer {}", | |
1217 | conn, conn.target_addr); | |
1218 | throw std::system_error( | |
1219 | make_error_code(crimson::net::error::bad_peer_address)); | |
1220 | } | |
1221 | ||
// If a peer id is already set on this connection, the gid from the frame
// must match it.
f67539c2 TL |
1222 | if (conn.get_peer_id() != entity_name_t::NEW && | |
1223 | conn.get_peer_id() != client_ident.gid()) { | |
1224 | logger().error("{} client_ident peer_id ({}) does not match" | |
1225 | " what it should be ({}) during accepting, abort", | |
1226 | conn, client_ident.gid(), conn.get_peer_id()); | |
1227 | abort_in_fault(); | |
1228 | } | |
9f95a23c TL |
1229 | conn.set_peer_id(client_ident.gid()); | |
1230 | client_cookie = client_ident.cookie(); | |
1231 | ||
// Features we require that the peer does not list as supported. If any are
// missing they are reported back and the step resolves to `wait`.
1232 | uint64_t feat_missing = | |
1233 | (conn.policy.features_required | msgr2_required) & | |
1234 | ~(uint64_t)client_ident.supported_features(); | |
1235 | if (feat_missing) { | |
1236 | auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing); | |
1237 | logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)", | |
1238 | conn, feat_missing); | |
1239 | return write_frame(ident_missing_features).then([] { | |
1240 | return next_step_t::wait; | |
1241 | }); | |
1242 | } | |
// The connection features are the intersection of what the peer supports
// and what our policy supports.
1243 | connection_features = | |
1244 | client_ident.supported_features() & conn.policy.features_supported; | |
1245 | logger().debug("{} UPDATE: connection_features={}", conn, connection_features); | |
1246 | ||
1247 | peer_global_seq = client_ident.global_seq(); | |
1248 | ||
1249 | // Looks good so far, let's check if there is already an existing connection | |
1250 | // to this peer. | |
1251 | ||
1252 | SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); | |
1253 | ||
1254 | if (existing_conn) { | |
// An existing connection that is not protocol v2 is passed to
// execute_establishing(); a v2 one goes through
// handle_existing_connection().
1255 | if (existing_conn->protocol->proto_type != proto_t::v2) { | |
1256 | logger().warn("{} existing connection {} proto version is {}, close existing", | |
1257 | conn, *existing_conn, | |
1258 | static_cast<int>(existing_conn->protocol->proto_type)); | |
1259 | // should unregister the existing from msgr atomically | |
f67539c2 TL |
1260 | // NOTE: this is following async messenger logic, but we may miss the reset event. | |
1261 | execute_establishing(existing_conn, false); | |
1262 | return seastar::make_ready_future<next_step_t>(next_step_t::ready); | |
9f95a23c TL |
1263 | } else { | |
1264 | return handle_existing_connection(existing_conn); | |
1265 | } | |
f67539c2 TL |
1266 | } else { | |
// No existing connection was found for this peer address.
1267 | execute_establishing(nullptr, true); | |
1268 | return seastar::make_ready_future<next_step_t>(next_step_t::ready); | |
9f95a23c | 1269 | } |
9f95a23c TL |
1270 | }); | |
1271 | } | |
1272 | ||
1273 | seastar::future<ProtocolV2::next_step_t> | |
1274 | ProtocolV2::read_reconnect() | |
1275 | { | |
1276 | return read_main_preamble() | |
1277 | .then([this] (Tag tag) { | |
1278 | expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect"); | |
1279 | return server_reconnect(); | |
1280 | }); | |
1281 | } | |
1282 | ||
1283 | seastar::future<ProtocolV2::next_step_t> | |
1284 | ProtocolV2::send_retry(uint64_t connect_seq) | |
1285 | { | |
1286 | auto retry = RetryFrame::Encode(connect_seq); | |
1287 | logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq); | |
1288 | return write_frame(retry).then([this] { | |
1289 | return read_reconnect(); | |
1290 | }); | |
1291 | } | |
1292 | ||
1293 | seastar::future<ProtocolV2::next_step_t> | |
1294 | ProtocolV2::send_retry_global(uint64_t global_seq) | |
1295 | { | |
1296 | auto retry = RetryGlobalFrame::Encode(global_seq); | |
1297 | logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq); | |
1298 | return write_frame(retry).then([this] { | |
1299 | return read_reconnect(); | |
1300 | }); | |
1301 | } | |
1302 | ||
1303 | seastar::future<ProtocolV2::next_step_t> | |
1304 | ProtocolV2::send_reset(bool full) | |
1305 | { | |
1306 | auto reset = ResetFrame::Encode(full); | |
1307 | logger().warn("{} WRITE ResetFrame: full={}", conn, full); | |
1308 | return write_frame(reset).then([this] { | |
1309 | return read_main_preamble(); | |
1310 | }).then([this] (Tag tag) { | |
1311 | expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset"); | |
1312 | return server_connect(); | |
1313 | }); | |
1314 | } | |
1315 | ||
// Server side of session resumption: read the peer's ReconnectFrame,
// validate its address, look up the existing connection for that address and
// compare cookies, global_seq and connect_seq to pick a reply.
// The outcomes visible here are: send_reset(), send_retry_global(),
// send_retry(), send_wait(), or handing over to the existing connection
// through reuse_connection(). Invalid addresses throw bad_peer_address.
1316 | seastar::future<ProtocolV2::next_step_t> | |
1317 | ProtocolV2::server_reconnect() | |
1318 | { | |
1319 | return read_frame_payload().then([this] { | |
1320 | // handle_reconnect() logic | |
1321 | auto reconnect = ReconnectFrame::Decode(rx_segments_data.back()); | |
1322 | ||
1323 | logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={}," | |
1324 | " server_cookie={}, gs={}, cs={}, msg_seq={}", | |
1325 | conn, reconnect.addrs(), | |
1326 | reconnect.client_cookie(), reconnect.server_cookie(), | |
1327 | reconnect.global_seq(), reconnect.connect_seq(), | |
1328 | reconnect.msg_seq()); | |
1329 | ||
1330 | // can peer_addrs be changed on-the-fly? | |
1331 | // TODO: change peer_addr to entity_addrvec_t | |
// Only the first advertised address is considered; it must be a msgr2 or
// "any" address.
1332 | entity_addr_t paddr = reconnect.addrs().front(); | |
1333 | if (paddr.is_msgr2() || paddr.is_any()) { | |
1334 | // good | |
1335 | } else { | |
1336 | logger().warn("{} peer's address {} is not v2", conn, paddr); | |
1337 | throw std::system_error( | |
1338 | make_error_code(crimson::net::error::bad_peer_address)); | |
1339 | } | |
// Adopt the address if none is recorded yet; otherwise it must match the
// one already recorded on this connection.
1340 | if (conn.peer_addr == entity_addr_t()) { | |
1341 | conn.peer_addr = paddr; | |
1342 | } else if (conn.peer_addr != paddr) { | |
1343 | logger().error("{} peer identifies as {}, while conn.peer_addr={}," | |
1344 | " reconnect failed", | |
1345 | conn, paddr, conn.peer_addr); | |
1346 | throw std::system_error( | |
1347 | make_error_code(crimson::net::error::bad_peer_address)); | |
1348 | } | |
1349 | peer_global_seq = reconnect.global_seq(); | |
1350 | ||
1351 | SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); | |
1352 | ||
1353 | if (!existing_conn) { | |
1354 | // there is no existing connection therefore cannot reconnect to previous | |
1355 | // session | |
1356 | logger().warn("{} server_reconnect: no existing connection from address {}," | |
1357 | " reseting client", conn, conn.peer_addr); | |
1358 | return send_reset(true); | |
1359 | } | |
1360 | ||
// An existing connection that is not protocol v2 is marked down and the
// client is asked to do a full reset.
1361 | if (existing_conn->protocol->proto_type != proto_t::v2) { | |
1362 | logger().warn("{} server_reconnect: existing connection {} proto version is {}," | |
1363 | "close existing and reset client.", | |
1364 | conn, *existing_conn, | |
1365 | static_cast<int>(existing_conn->protocol->proto_type)); | |
f67539c2 TL |
1366 | // NOTE: this is following async messenger logic, but we may miss the reset event. | |
1367 | existing_conn->mark_down(); | |
9f95a23c TL |
1368 | return send_reset(true); | |
1369 | } | |
1370 | ||
1371 | ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>( | |
1372 | existing_conn->protocol.get()); | |
1373 | ceph_assert(existing_proto); | |
1374 | logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting," | |
1375 | " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})", | |
1376 | conn, global_seq, peer_global_seq, reconnect.connect_seq(), | |
1377 | reconnect.client_cookie(), reconnect.server_cookie(), | |
1378 | existing_conn, | |
1379 | get_state_name(existing_proto->state), | |
1380 | existing_proto->global_seq, | |
1381 | existing_proto->peer_global_seq, | |
1382 | existing_proto->connect_seq, | |
1383 | existing_proto->client_cookie, | |
1384 | existing_proto->server_cookie); | |
1385 | ||
// NOTE(review): abort_in_fault() presumably throws and does not return —
// confirm; the code below relies on that.
f67539c2 TL |
1386 | if (!validate_peer_name(existing_conn->get_peer_name())) { | |
1387 | logger().error("{} server_reconnect: my peer_name doesn't match" | |
1388 | " the existing connection {}, abort", conn, existing_conn); | |
1389 | abort_in_fault(); | |
1390 | } | |
1391 | ||
// The existing connection is already being replaced: ask the client to retry
// with the existing connection's peer_global_seq.
9f95a23c TL |
1392 | if (existing_proto->state == state_t::REPLACING) { | |
1393 | logger().warn("{} server_reconnect: racing replace happened while " | |
1394 | " replacing existing connection {}, retry global.", | |
1395 | conn, *existing_conn); | |
1396 | return send_retry_global(existing_proto->peer_global_seq); | |
1397 | } | |
1398 | ||
// A client_cookie mismatch means the sessions differ; the reset sent is
// full only if conn.policy.resetcheck is set.
1399 | if (existing_proto->client_cookie != reconnect.client_cookie()) { | |
1400 | logger().warn("{} server_reconnect:" | |
1401 | " client_cookie mismatch with existing connection {}," | |
1402 | " cc={} rcc={}. I must have reset, reseting client.", | |
1403 | conn, *existing_conn, | |
1404 | existing_proto->client_cookie, reconnect.client_cookie()); | |
1405 | return send_reset(conn.policy.resetcheck); | |
1406 | } else if (existing_proto->server_cookie == 0) { | |
1407 | // this happens when: | |
1408 | // - a connects to b | |
1409 | // - a sends client_ident | |
1410 | // - b gets client_ident, sends server_ident and sets cookie X | |
1411 | // - connection fault | |
1412 | // - b reconnects to a with cookie X, connect_seq=1 | |
1413 | // - a has cookie==0 | |
1414 | logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the" | |
1415 | " server_ident with existing connection {}." | |
1416 | " Asking peer to resume session establishment", | |
1417 | conn, existing_proto->client_cookie, *existing_conn); | |
1418 | return send_reset(false); | |
1419 | } | |
1420 | ||
// The peer's global_seq is older than the one recorded on the existing
// connection.
1421 | if (existing_proto->peer_global_seq > reconnect.global_seq()) { | |
1422 | logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({})," | |
1423 | " with existing connection {}," | |
1424 | " ask client to retry global", | |
1425 | conn, existing_proto->peer_global_seq, | |
1426 | reconnect.global_seq(), *existing_conn); | |
1427 | return send_retry_global(existing_proto->peer_global_seq); | |
1428 | } | |
1429 | ||
// Three-way comparison of connect_seq: the peer is behind (retry), equal
// (reconnect race, decided by peer_wins()), or ahead (reuse the existing
// connection).
1430 | if (existing_proto->connect_seq > reconnect.connect_seq()) { | |
1431 | logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({})," | |
1432 | " with existing connection {}, ask client to retry", | |
1433 | conn, reconnect.connect_seq(), | |
1434 | existing_proto->connect_seq, *existing_conn); | |
1435 | return send_retry(existing_proto->connect_seq); | |
1436 | } else if (existing_proto->connect_seq == reconnect.connect_seq()) { | |
1437 | // reconnect race: both peers are sending reconnect messages | |
1438 | if (existing_conn->peer_wins()) { | |
1439 | logger().warn("{} server_reconnect: reconnect race detected (cs={})" | |
1440 | " and win, reusing existing {}", | |
1441 | conn, reconnect.connect_seq(), *existing_conn); | |
1442 | return reuse_connection( | |
1443 | existing_proto, false, | |
1444 | true, reconnect.connect_seq(), reconnect.msg_seq()); | |
1445 | } else { | |
1446 | logger().warn("{} server_reconnect: reconnect race detected (cs={})" | |
1447 | " and lose to existing {}, ask client to wait", | |
1448 | conn, reconnect.connect_seq(), *existing_conn); | |
1449 | return send_wait(); | |
1450 | } | |
1451 | } else { // existing_proto->connect_seq < reconnect.connect_seq() | |
1452 | logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({})," | |
1453 | " reusing existing {}", | |
1454 | conn, existing_proto->connect_seq, | |
1455 | reconnect.connect_seq(), *existing_conn); | |
1456 | return reuse_connection( | |
1457 | existing_proto, false, | |
1458 | true, reconnect.connect_seq(), reconnect.msg_seq()); | |
1459 | } | |
1460 | }); | |
1461 | } | |
1462 | ||
1463 | void ProtocolV2::execute_accepting() | |
1464 | { | |
1465 | trigger_state(state_t::ACCEPTING, write_state_t::none, false); | |
f67539c2 TL |
1466 | gate.dispatch_in_background("execute_accepting", *this, [this] { |
1467 | return seastar::futurize_invoke([this] { | |
9f95a23c TL |
1468 | INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); |
1469 | auth_meta = seastar::make_lw_shared<AuthConnectionMeta>(); | |
1470 | session_stream_handlers = { nullptr, nullptr }; | |
20effc67 | 1471 | session_comp_handlers = { nullptr, nullptr }; |
9f95a23c | 1472 | enable_recording(); |
f67539c2 TL |
1473 | return banner_exchange(false); |
1474 | }).then([this] (auto&& ret) { | |
1475 | auto [_peer_type, _my_addr_from_peer] = std::move(ret); | |
9f95a23c TL |
1476 | ceph_assert(conn.get_peer_type() == 0); |
1477 | conn.set_peer_type(_peer_type); | |
1478 | ||
1479 | conn.policy = messenger.get_policy(_peer_type); | |
1480 | logger().info("{} UPDATE: peer_type={}," | |
1481 | " policy(lossy={} server={} standby={} resetcheck={})", | |
1482 | conn, ceph_entity_type_name(_peer_type), | |
1483 | conn.policy.lossy, conn.policy.server, | |
1484 | conn.policy.standby, conn.policy.resetcheck); | |
20effc67 TL |
1485 | if (!messenger.get_myaddr().is_blank_ip() && |
1486 | (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() || | |
1487 | messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) { | |
9f95a23c TL |
1488 | logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}", |
1489 | conn, _my_addr_from_peer, messenger.get_myaddr()); | |
1490 | throw std::system_error( | |
1491 | make_error_code(crimson::net::error::bad_peer_address)); | |
1492 | } | |
1493 | return messenger.learned_addr(_my_addr_from_peer, conn); | |
1494 | }).then([this] { | |
1495 | return server_auth(); | |
1496 | }).then([this] { | |
1497 | return read_main_preamble(); | |
1498 | }).then([this] (Tag tag) { | |
1499 | switch (tag) { | |
1500 | case Tag::CLIENT_IDENT: | |
1501 | return server_connect(); | |
1502 | case Tag::SESSION_RECONNECT: | |
1503 | return server_reconnect(); | |
1504 | default: { | |
1505 | unexpected_tag(tag, conn, "post_server_auth"); | |
1506 | return seastar::make_ready_future<next_step_t>(next_step_t::none); | |
1507 | } | |
1508 | } | |
1509 | }).then([this] (next_step_t next) { | |
1510 | switch (next) { | |
1511 | case next_step_t::ready: | |
1512 | assert(state != state_t::ACCEPTING); | |
1513 | break; | |
1514 | case next_step_t::wait: | |
1515 | if (unlikely(state != state_t::ACCEPTING)) { | |
1516 | logger().debug("{} triggered {} at the end of execute_accepting()", | |
1517 | conn, get_state_name(state)); | |
1518 | abort_protocol(); | |
1519 | } | |
1520 | logger().info("{} execute_accepting(): going to SERVER_WAIT", conn); | |
1521 | execute_server_wait(); | |
1522 | break; | |
1523 | default: | |
1524 | ceph_abort("impossible next step"); | |
1525 | } | |
1526 | }).handle_exception([this] (std::exception_ptr eptr) { | |
1527 | logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}", | |
1528 | conn, get_state_name(state), eptr); | |
f67539c2 | 1529 | close(false); |
9f95a23c TL |
1530 | }); |
1531 | }); | |
1532 | } | |
1533 | ||
1534 | // CONNECTING or ACCEPTING state | |
1535 | ||
1536 | seastar::future<> ProtocolV2::finish_auth() | |
1537 | { | |
1538 | ceph_assert(auth_meta); | |
1539 | ||
1540 | const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() : | |
1541 | auth_meta->session_key.hmac_sha256(nullptr, rxbuf); | |
1542 | auto sig_frame = AuthSignatureFrame::Encode(sig); | |
1543 | ceph_assert(record_io); | |
1544 | record_io = false; | |
1545 | rxbuf.clear(); | |
1546 | logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig); | |
1547 | return write_frame(sig_frame).then([this] { | |
1548 | return read_main_preamble(); | |
1549 | }).then([this] (Tag tag) { | |
1550 | expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth"); | |
1551 | return read_frame_payload(); | |
1552 | }).then([this] { | |
1553 | // handle_auth_signature() logic | |
1554 | auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back()); | |
1555 | logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature()); | |
1556 | ||
1557 | const auto actual_tx_sig = auth_meta->session_key.empty() ? | |
1558 | sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf); | |
1559 | if (sig_frame.signature() != actual_tx_sig) { | |
1560 | logger().warn("{} pre-auth signature mismatch actual_tx_sig={}" | |
1561 | " sig_frame.signature()={}", | |
1562 | conn, actual_tx_sig, sig_frame.signature()); | |
1563 | abort_in_fault(); | |
1564 | } | |
1565 | txbuf.clear(); | |
1566 | }); | |
1567 | } | |
1568 | ||
1569 | // ESTABLISHING | |
1570 | ||
f67539c2 TL |
1571 | void ProtocolV2::execute_establishing( |
1572 | SocketConnectionRef existing_conn, bool dispatch_reset) { | |
1573 | if (unlikely(state != state_t::ACCEPTING)) { | |
1574 | logger().debug("{} triggered {} before execute_establishing()", | |
1575 | conn, get_state_name(state)); | |
1576 | abort_protocol(); | |
1577 | } | |
1578 | ||
1579 | auto accept_me = [this] { | |
1580 | messenger.register_conn( | |
1581 | seastar::static_pointer_cast<SocketConnection>( | |
1582 | conn.shared_from_this())); | |
1583 | messenger.unaccept_conn( | |
1584 | seastar::static_pointer_cast<SocketConnection>( | |
1585 | conn.shared_from_this())); | |
1586 | }; | |
1587 | ||
9f95a23c | 1588 | trigger_state(state_t::ESTABLISHING, write_state_t::delay, false); |
f67539c2 TL |
1589 | if (existing_conn) { |
1590 | existing_conn->protocol->close(dispatch_reset, std::move(accept_me)); | |
1591 | if (unlikely(state != state_t::ESTABLISHING)) { | |
1592 | logger().warn("{} triggered {} during execute_establishing(), " | |
1593 | "the accept event will not be delivered!", | |
1594 | conn, get_state_name(state)); | |
1595 | abort_protocol(); | |
1596 | } | |
1597 | } else { | |
1598 | accept_me(); | |
1599 | } | |
1600 | ||
1601 | dispatchers.ms_handle_accept( | |
1602 | seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); | |
1603 | ||
1604 | gated_execute("execute_establishing", [this] { | |
1605 | return seastar::futurize_invoke([this] { | |
9f95a23c TL |
1606 | return send_server_ident(); |
1607 | }).then([this] { | |
1608 | if (unlikely(state != state_t::ESTABLISHING)) { | |
1609 | logger().debug("{} triggered {} at the end of execute_establishing()", | |
1610 | conn, get_state_name(state)); | |
1611 | abort_protocol(); | |
1612 | } | |
1613 | logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={}," | |
1614 | " server_cookie={}, in_seq={}, out_seq={}, out_q={}", | |
1615 | conn, global_seq, peer_global_seq, connect_seq, | |
1616 | client_cookie, server_cookie, conn.in_seq, | |
1617 | conn.out_seq, conn.out_q.size()); | |
f67539c2 | 1618 | execute_ready(false); |
9f95a23c TL |
1619 | }).handle_exception([this] (std::exception_ptr eptr) { |
1620 | if (state != state_t::ESTABLISHING) { | |
1621 | logger().info("{} execute_establishing() protocol aborted at {} -- {}", | |
1622 | conn, get_state_name(state), eptr); | |
1623 | assert(state == state_t::CLOSING || | |
1624 | state == state_t::REPLACING); | |
1625 | return; | |
1626 | } | |
1627 | fault(false, "execute_establishing()", eptr); | |
1628 | }); | |
1629 | }); | |
1630 | } | |
1631 | ||
1632 | // ESTABLISHING or REPLACING state | |
1633 | ||
1634 | seastar::future<> | |
1635 | ProtocolV2::send_server_ident() | |
1636 | { | |
1637 | // send_server_ident() logic | |
1638 | ||
1639 | // refered to async-conn v2: not assign gs to global_seq | |
1640 | return messenger.get_global_seq().then([this] (auto gs) { | |
1641 | logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); | |
1642 | ||
1643 | // this is required for the case when this connection is being replaced | |
1644 | requeue_up_to(0); | |
1645 | conn.in_seq = 0; | |
1646 | ||
1647 | if (!conn.policy.lossy) { | |
1648 | server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll); | |
1649 | } | |
1650 | ||
1651 | uint64_t flags = 0; | |
1652 | if (conn.policy.lossy) { | |
1653 | flags = flags | CEPH_MSG_CONNECT_LOSSY; | |
1654 | } | |
1655 | ||
1656 | auto server_ident = ServerIdentFrame::Encode( | |
1657 | messenger.get_myaddrs(), | |
1658 | messenger.get_myname().num(), | |
1659 | gs, | |
1660 | conn.policy.features_supported, | |
1661 | conn.policy.features_required | msgr2_required, | |
1662 | flags, | |
1663 | server_cookie); | |
1664 | ||
1665 | logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={}," | |
1666 | " gs={}, features_supported={}, features_required={}," | |
1667 | " flags={}, cookie={}", | |
1668 | conn, messenger.get_myaddrs(), messenger.get_myname().num(), | |
1669 | gs, conn.policy.features_supported, | |
1670 | conn.policy.features_required | msgr2_required, | |
1671 | flags, server_cookie); | |
1672 | ||
1673 | conn.set_features(connection_features); | |
1674 | ||
1675 | return write_frame(server_ident); | |
1676 | }); | |
1677 | } | |
1678 | ||
1679 | // REPLACING state | |
1680 | ||
1681 | void ProtocolV2::trigger_replacing(bool reconnect, | |
1682 | bool do_reset, | |
1683 | SocketRef&& new_socket, | |
1684 | AuthConnectionMetaRef&& new_auth_meta, | |
1685 | ceph::crypto::onwire::rxtx_t new_rxtx, | |
1686 | uint64_t new_peer_global_seq, | |
1687 | uint64_t new_client_cookie, | |
1688 | entity_name_t new_peer_name, | |
1689 | uint64_t new_conn_features, | |
f67539c2 TL |
1690 | bool tx_is_rev1, |
1691 | bool rx_is_rev1, | |
9f95a23c TL |
1692 | uint64_t new_connect_seq, |
1693 | uint64_t new_msg_seq) | |
1694 | { | |
1695 | trigger_state(state_t::REPLACING, write_state_t::delay, false); | |
1696 | if (socket) { | |
1697 | socket->shutdown(); | |
1698 | } | |
f67539c2 TL |
1699 | dispatchers.ms_handle_accept( |
1700 | seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); | |
1701 | gate.dispatch_in_background("trigger_replacing", *this, | |
1702 | [this, | |
1703 | reconnect, | |
1704 | do_reset, | |
1705 | new_socket = std::move(new_socket), | |
1706 | new_auth_meta = std::move(new_auth_meta), | |
1707 | new_rxtx = std::move(new_rxtx), | |
1708 | tx_is_rev1, rx_is_rev1, | |
1709 | new_client_cookie, new_peer_name, | |
1710 | new_conn_features, new_peer_global_seq, | |
1711 | new_connect_seq, new_msg_seq] () mutable { | |
9f95a23c TL |
1712 | return wait_write_exit().then([this, do_reset] { |
1713 | if (do_reset) { | |
1714 | reset_session(true); | |
1715 | } | |
1716 | protocol_timer.cancel(); | |
1717 | return execution_done.get_future(); | |
1718 | }).then([this, | |
1719 | reconnect, | |
1720 | new_socket = std::move(new_socket), | |
1721 | new_auth_meta = std::move(new_auth_meta), | |
1722 | new_rxtx = std::move(new_rxtx), | |
f67539c2 | 1723 | tx_is_rev1, rx_is_rev1, |
9f95a23c TL |
1724 | new_client_cookie, new_peer_name, |
1725 | new_conn_features, new_peer_global_seq, | |
1726 | new_connect_seq, new_msg_seq] () mutable { | |
1727 | if (unlikely(state != state_t::REPLACING)) { | |
1728 | return new_socket->close().then([sock = std::move(new_socket)] { | |
1729 | abort_protocol(); | |
1730 | }); | |
1731 | } | |
1732 | ||
1733 | if (socket) { | |
f67539c2 TL |
1734 | gate.dispatch_in_background("close_socket_replacing", *this, |
1735 | [sock = std::move(socket)] () mutable { | |
9f95a23c TL |
1736 | return sock->close().then([sock = std::move(sock)] {}); |
1737 | }); | |
1738 | } | |
1739 | socket = std::move(new_socket); | |
1740 | auth_meta = std::move(new_auth_meta); | |
1741 | session_stream_handlers = std::move(new_rxtx); | |
1742 | record_io = false; | |
1743 | peer_global_seq = new_peer_global_seq; | |
1744 | ||
1745 | if (reconnect) { | |
1746 | connect_seq = new_connect_seq; | |
1747 | // send_reconnect_ok() logic | |
1748 | requeue_up_to(new_msg_seq); | |
1749 | auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq); | |
1750 | logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq); | |
1751 | return write_frame(reconnect_ok); | |
1752 | } else { | |
1753 | client_cookie = new_client_cookie; | |
f67539c2 TL |
1754 | assert(conn.get_peer_type() == new_peer_name.type()); |
1755 | if (conn.get_peer_id() == entity_name_t::NEW) { | |
1756 | conn.set_peer_id(new_peer_name.num()); | |
1757 | } | |
9f95a23c | 1758 | connection_features = new_conn_features; |
f67539c2 TL |
1759 | tx_frame_asm.set_is_rev1(tx_is_rev1); |
1760 | rx_frame_asm.set_is_rev1(rx_is_rev1); | |
9f95a23c TL |
1761 | return send_server_ident(); |
1762 | } | |
1763 | }).then([this, reconnect] { | |
1764 | if (unlikely(state != state_t::REPLACING)) { | |
1765 | logger().debug("{} triggered {} at the end of trigger_replacing()", | |
1766 | conn, get_state_name(state)); | |
1767 | abort_protocol(); | |
1768 | } | |
1769 | logger().info("{} replaced ({}):" | |
1770 | " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={}," | |
1771 | " in_seq={}, out_seq={}, out_q={}", | |
1772 | conn, reconnect ? "reconnected" : "connected", | |
1773 | global_seq, peer_global_seq, connect_seq, client_cookie, | |
1774 | server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size()); | |
f67539c2 | 1775 | execute_ready(false); |
9f95a23c TL |
1776 | }).handle_exception([this] (std::exception_ptr eptr) { |
1777 | if (state != state_t::REPLACING) { | |
1778 | logger().info("{} trigger_replacing(): protocol aborted at {} -- {}", | |
1779 | conn, get_state_name(state), eptr); | |
1780 | assert(state == state_t::CLOSING); | |
1781 | return; | |
1782 | } | |
1783 | fault(true, "trigger_replacing()", eptr); | |
1784 | }); | |
1785 | }); | |
1786 | } | |
1787 | ||
1788 | // READY state | |
1789 | ||
1790 | ceph::bufferlist ProtocolV2::do_sweep_messages( | |
20effc67 | 1791 | const std::deque<MessageURef>& msgs, |
9f95a23c TL |
1792 | size_t num_msgs, |
1793 | bool require_keepalive, | |
1794 | std::optional<utime_t> _keepalive_ack, | |
1795 | bool require_ack) | |
1796 | { | |
1797 | ceph::bufferlist bl; | |
1798 | ||
1799 | if (unlikely(require_keepalive)) { | |
1800 | auto keepalive_frame = KeepAliveFrame::Encode(); | |
f6b5b4d7 | 1801 | bl.append(keepalive_frame.get_buffer(tx_frame_asm)); |
9f95a23c TL |
1802 | INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE); |
1803 | } | |
1804 | ||
1805 | if (unlikely(_keepalive_ack.has_value())) { | |
1806 | auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack); | |
f6b5b4d7 | 1807 | bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm)); |
9f95a23c TL |
1808 | INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE); |
1809 | } | |
1810 | ||
20effc67 | 1811 | if (require_ack && num_msgs == 0u) { |
9f95a23c | 1812 | auto ack_frame = AckFrame::Encode(conn.in_seq); |
f6b5b4d7 | 1813 | bl.append(ack_frame.get_buffer(tx_frame_asm)); |
9f95a23c TL |
1814 | INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE); |
1815 | } | |
1816 | ||
20effc67 | 1817 | std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) { |
9f95a23c TL |
1818 | // TODO: move to common code |
1819 | // set priority | |
1820 | msg->get_header().src = messenger.get_myname(); | |
1821 | ||
1822 | msg->encode(conn.features, 0); | |
1823 | ||
1824 | ceph_assert(!msg->get_seq() && "message already has seq"); | |
1825 | msg->set_seq(++conn.out_seq); | |
1826 | ||
1827 | ceph_msg_header &header = msg->get_header(); | |
1828 | ceph_msg_footer &footer = msg->get_footer(); | |
1829 | ||
1830 | ceph_msg_header2 header2{header.seq, header.tid, | |
1831 | header.type, header.priority, | |
1832 | header.version, | |
20effc67 TL |
1833 | ceph_le32(0), header.data_off, |
1834 | ceph_le64(conn.in_seq), | |
9f95a23c TL |
1835 | footer.flags, header.compat_version, |
1836 | header.reserved}; | |
1837 | ||
1838 | auto message = MessageFrame::Encode(header2, | |
1839 | msg->get_payload(), msg->get_middle(), msg->get_data()); | |
1840 | logger().debug("{} --> #{} === {} ({})", | |
1841 | conn, msg->get_seq(), *msg, msg->get_type()); | |
f6b5b4d7 | 1842 | bl.append(message.get_buffer(tx_frame_asm)); |
9f95a23c TL |
1843 | INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE); |
1844 | }); | |
1845 | ||
1846 | return bl; | |
1847 | } | |
1848 | ||
1849 | seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) | |
1850 | { | |
1851 | return read_frame_payload() | |
1852 | .then([this, throttle_stamp] { | |
1853 | utime_t recv_stamp{seastar::lowres_system_clock::now()}; | |
1854 | ||
1855 | // we need to get the size before std::moving segments data | |
1856 | const size_t cur_msg_size = get_current_msg_size(); | |
f6b5b4d7 | 1857 | auto msg_frame = MessageFrame::Decode(rx_segments_data); |
9f95a23c TL |
1858 | // XXX: paranoid copy just to avoid oops |
1859 | ceph_msg_header2 current_header = msg_frame.header(); | |
1860 | ||
1861 | logger().trace("{} got {} + {} + {} byte message," | |
1862 | " envelope type={} src={} off={} seq={}", | |
1863 | conn, msg_frame.front_len(), msg_frame.middle_len(), | |
1864 | msg_frame.data_len(), current_header.type, conn.get_peer_name(), | |
1865 | current_header.data_off, current_header.seq); | |
1866 | ||
1867 | ceph_msg_header header{current_header.seq, | |
1868 | current_header.tid, | |
1869 | current_header.type, | |
1870 | current_header.priority, | |
1871 | current_header.version, | |
20effc67 TL |
1872 | ceph_le32(msg_frame.front_len()), |
1873 | ceph_le32(msg_frame.middle_len()), | |
1874 | ceph_le32(msg_frame.data_len()), | |
9f95a23c TL |
1875 | current_header.data_off, |
1876 | conn.get_peer_name(), | |
1877 | current_header.compat_version, | |
1878 | current_header.reserved, | |
20effc67 TL |
1879 | ceph_le32(0)}; |
1880 | ceph_msg_footer footer{ceph_le32(0), ceph_le32(0), | |
1881 | ceph_le32(0), ceph_le64(0), current_header.flags}; | |
9f95a23c | 1882 | |
f67539c2 | 1883 | auto conn_ref = seastar::static_pointer_cast<SocketConnection>( |
9f95a23c TL |
1884 | conn.shared_from_this()); |
1885 | Message *message = decode_message(nullptr, 0, header, footer, | |
f67539c2 | 1886 | msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref); |
9f95a23c TL |
1887 | if (!message) { |
1888 | logger().warn("{} decode message failed", conn); | |
1889 | abort_in_fault(); | |
1890 | } | |
1891 | ||
1892 | // store reservation size in message, so we don't get confused | |
1893 | // by messages entering the dispatch queue through other paths. | |
1894 | message->set_dispatch_throttle_size(cur_msg_size); | |
1895 | ||
1896 | message->set_throttle_stamp(throttle_stamp); | |
1897 | message->set_recv_stamp(recv_stamp); | |
1898 | message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()}); | |
1899 | ||
1900 | // check received seq#. if it is old, drop the message. | |
1901 | // note that incoming messages may skip ahead. this is convenient for the | |
1902 | // client side queueing because messages can't be renumbered, but the (kernel) | |
1903 | // client will occasionally pull a message out of the sent queue to send | |
1904 | // elsewhere. in that case it doesn't matter if we "got" it or not. | |
1905 | uint64_t cur_seq = conn.in_seq; | |
1906 | if (message->get_seq() <= cur_seq) { | |
f67539c2 TL |
1907 | logger().error("{} got old message {} <= {} {}, discarding", |
1908 | conn, message->get_seq(), cur_seq, *message); | |
9f95a23c | 1909 | if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) && |
f67539c2 | 1910 | local_conf()->ms_die_on_old_message) { |
9f95a23c TL |
1911 | ceph_assert(0 == "old msgs despite reconnect_seq feature"); |
1912 | } | |
f67539c2 | 1913 | return seastar::now(); |
9f95a23c TL |
1914 | } else if (message->get_seq() > cur_seq + 1) { |
1915 | logger().error("{} missed message? skipped from seq {} to {}", | |
1916 | conn, cur_seq, message->get_seq()); | |
f67539c2 | 1917 | if (local_conf()->ms_die_on_skipped_message) { |
9f95a23c TL |
1918 | ceph_assert(0 == "skipped incoming seq"); |
1919 | } | |
1920 | } | |
1921 | ||
1922 | // note last received message. | |
1923 | conn.in_seq = message->get_seq(); | |
1924 | logger().debug("{} <== #{} === {} ({})", | |
1925 | conn, message->get_seq(), *message, message->get_type()); | |
1926 | notify_ack(); | |
1927 | ack_writes(current_header.ack_seq); | |
1928 | ||
1929 | // TODO: change MessageRef with seastar::shared_ptr | |
1930 | auto msg_ref = MessageRef{message, false}; | |
f67539c2 TL |
1931 | // throttle the reading process by the returned future |
1932 | return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); | |
9f95a23c TL |
1933 | }); |
1934 | } | |
1935 | ||
f67539c2 | 1936 | void ProtocolV2::execute_ready(bool dispatch_connect) |
9f95a23c TL |
1937 | { |
1938 | assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); | |
1939 | trigger_state(state_t::READY, write_state_t::open, false); | |
f67539c2 TL |
1940 | if (dispatch_connect) { |
1941 | dispatchers.ms_handle_connect( | |
1942 | seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); | |
1943 | } | |
9f95a23c TL |
1944 | #ifdef UNIT_TESTS_BUILT |
1945 | if (conn.interceptor) { | |
1946 | conn.interceptor->register_conn_ready(conn); | |
1947 | } | |
1948 | #endif | |
f67539c2 | 1949 | gated_execute("execute_ready", [this] { |
9f95a23c TL |
1950 | protocol_timer.cancel(); |
1951 | return seastar::keep_doing([this] { | |
1952 | return read_main_preamble() | |
1953 | .then([this] (Tag tag) { | |
1954 | switch (tag) { | |
1955 | case Tag::MESSAGE: { | |
f67539c2 | 1956 | return seastar::futurize_invoke([this] { |
9f95a23c TL |
1957 | // throttle_message() logic |
1958 | if (!conn.policy.throttler_messages) { | |
1959 | return seastar::now(); | |
1960 | } | |
1961 | // TODO: message throttler | |
1962 | ceph_assert(false); | |
1963 | return seastar::now(); | |
1964 | }).then([this] { | |
1965 | // throttle_bytes() logic | |
1966 | if (!conn.policy.throttler_bytes) { | |
1967 | return seastar::now(); | |
1968 | } | |
1969 | size_t cur_msg_size = get_current_msg_size(); | |
1970 | if (!cur_msg_size) { | |
1971 | return seastar::now(); | |
1972 | } | |
1973 | logger().trace("{} wants {} bytes from policy throttler {}/{}", | |
1974 | conn, cur_msg_size, | |
1975 | conn.policy.throttler_bytes->get_current(), | |
1976 | conn.policy.throttler_bytes->get_max()); | |
1977 | return conn.policy.throttler_bytes->get(cur_msg_size); | |
1978 | }).then([this] { | |
1979 | // TODO: throttle_dispatch_queue() logic | |
1980 | utime_t throttle_stamp{seastar::lowres_system_clock::now()}; | |
1981 | return read_message(throttle_stamp); | |
1982 | }); | |
1983 | } | |
1984 | case Tag::ACK: | |
1985 | return read_frame_payload().then([this] { | |
1986 | // handle_message_ack() logic | |
1987 | auto ack = AckFrame::Decode(rx_segments_data.back()); | |
1988 | logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq()); | |
1989 | ack_writes(ack.seq()); | |
1990 | }); | |
1991 | case Tag::KEEPALIVE2: | |
1992 | return read_frame_payload().then([this] { | |
1993 | // handle_keepalive2() logic | |
1994 | auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back()); | |
1995 | logger().debug("{} GOT KeepAliveFrame: timestamp={}", | |
1996 | conn, keepalive_frame.timestamp()); | |
1997 | notify_keepalive_ack(keepalive_frame.timestamp()); | |
1998 | conn.set_last_keepalive(seastar::lowres_system_clock::now()); | |
1999 | }); | |
2000 | case Tag::KEEPALIVE2_ACK: | |
2001 | return read_frame_payload().then([this] { | |
2002 | // handle_keepalive2_ack() logic | |
2003 | auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back()); | |
2004 | conn.set_last_keepalive_ack( | |
2005 | seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()}); | |
2006 | logger().debug("{} GOT KeepAliveFrameAck: timestamp={}", | |
2007 | conn, conn.last_keepalive_ack); | |
2008 | }); | |
2009 | default: { | |
2010 | unexpected_tag(tag, conn, "execute_ready"); | |
2011 | return seastar::now(); | |
2012 | } | |
2013 | } | |
2014 | }); | |
2015 | }).handle_exception([this] (std::exception_ptr eptr) { | |
2016 | if (state != state_t::READY) { | |
2017 | logger().info("{} execute_ready(): protocol aborted at {} -- {}", | |
2018 | conn, get_state_name(state), eptr); | |
2019 | assert(state == state_t::REPLACING || | |
2020 | state == state_t::CLOSING); | |
2021 | return; | |
2022 | } | |
2023 | fault(false, "execute_ready()", eptr); | |
2024 | }); | |
2025 | }); | |
2026 | } | |
2027 | ||
2028 | // STANDBY state | |
2029 | ||
2030 | void ProtocolV2::execute_standby() | |
2031 | { | |
2032 | trigger_state(state_t::STANDBY, write_state_t::delay, true); | |
2033 | if (socket) { | |
2034 | socket->shutdown(); | |
2035 | } | |
2036 | } | |
2037 | ||
2038 | void ProtocolV2::notify_write() | |
2039 | { | |
2040 | if (unlikely(state == state_t::STANDBY && !conn.policy.server)) { | |
2041 | logger().info("{} notify_write(): at {}, going to CONNECTING", | |
2042 | conn, get_state_name(state)); | |
2043 | execute_connecting(); | |
2044 | } | |
2045 | } | |
2046 | ||
2047 | // WAIT state | |
2048 | ||
2049 | void ProtocolV2::execute_wait(bool max_backoff) | |
2050 | { | |
2051 | trigger_state(state_t::WAIT, write_state_t::delay, true); | |
2052 | if (socket) { | |
2053 | socket->shutdown(); | |
2054 | } | |
f67539c2 | 2055 | gated_execute("execute_wait", [this, max_backoff] { |
9f95a23c TL |
2056 | double backoff = protocol_timer.last_dur(); |
2057 | if (max_backoff) { | |
f67539c2 | 2058 | backoff = local_conf().get_val<double>("ms_max_backoff"); |
9f95a23c | 2059 | } else if (backoff > 0) { |
f67539c2 | 2060 | backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff); |
9f95a23c | 2061 | } else { |
f67539c2 | 2062 | backoff = local_conf().get_val<double>("ms_initial_backoff"); |
9f95a23c TL |
2063 | } |
2064 | return protocol_timer.backoff(backoff).then([this] { | |
2065 | if (unlikely(state != state_t::WAIT)) { | |
2066 | logger().debug("{} triggered {} at the end of execute_wait()", | |
2067 | conn, get_state_name(state)); | |
2068 | abort_protocol(); | |
2069 | } | |
2070 | logger().info("{} execute_wait(): going to CONNECTING", conn); | |
2071 | execute_connecting(); | |
2072 | }).handle_exception([this] (std::exception_ptr eptr) { | |
2073 | logger().info("{} execute_wait(): protocol aborted at {} -- {}", | |
2074 | conn, get_state_name(state), eptr); | |
2075 | assert(state == state_t::REPLACING || | |
2076 | state == state_t::CLOSING); | |
2077 | }); | |
2078 | }); | |
2079 | } | |
2080 | ||
2081 | // SERVER_WAIT state | |
2082 | ||
2083 | void ProtocolV2::execute_server_wait() | |
2084 | { | |
2085 | trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false); | |
f67539c2 | 2086 | gated_execute("execute_server_wait", [this] { |
9f95a23c TL |
2087 | return read_exactly(1).then([this] (auto bl) { |
2088 | logger().warn("{} SERVER_WAIT got read, abort", conn); | |
2089 | abort_in_fault(); | |
2090 | }).handle_exception([this] (std::exception_ptr eptr) { | |
2091 | logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}", | |
2092 | conn, get_state_name(state), eptr); | |
f67539c2 | 2093 | close(false); |
9f95a23c TL |
2094 | }); |
2095 | }); | |
2096 | } | |
2097 | ||
2098 | // CLOSING state | |
2099 | ||
2100 | void ProtocolV2::trigger_close() | |
2101 | { | |
f67539c2 TL |
2102 | messenger.closing_conn( |
2103 | seastar::static_pointer_cast<SocketConnection>( | |
2104 | conn.shared_from_this())); | |
2105 | ||
9f95a23c TL |
2106 | if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) { |
2107 | messenger.unaccept_conn( | |
2108 | seastar::static_pointer_cast<SocketConnection>( | |
2109 | conn.shared_from_this())); | |
2110 | } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) { | |
2111 | messenger.unregister_conn( | |
2112 | seastar::static_pointer_cast<SocketConnection>( | |
2113 | conn.shared_from_this())); | |
2114 | } else { | |
2115 | // cannot happen | |
2116 | ceph_assert(false); | |
2117 | } | |
2118 | ||
2119 | protocol_timer.cancel(); | |
9f95a23c | 2120 | trigger_state(state_t::CLOSING, write_state_t::drop, false); |
f67539c2 TL |
2121 | } |
2122 | ||
2123 | void ProtocolV2::on_closed() | |
2124 | { | |
2125 | messenger.closed_conn( | |
2126 | seastar::static_pointer_cast<SocketConnection>( | |
2127 | conn.shared_from_this())); | |
2128 | } | |
2129 | ||
2130 | void ProtocolV2::print(std::ostream& out) const | |
2131 | { | |
2132 | out << conn; | |
9f95a23c TL |
2133 | } |
2134 | ||
2135 | } // namespace crimson::net |