]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/ProtocolV2.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <type_traits>
5
6#include "ProtocolV2.h"
7#include "AsyncMessenger.h"
8
9#include "common/EventTrace.h"
10#include "common/ceph_crypto.h"
11#include "common/errno.h"
12#include "include/random.h"
13#include "auth/AuthClient.h"
14#include "auth/AuthServer.h"
15
16#define dout_subsys ceph_subsys_ms
17#undef dout_prefix
18#define dout_prefix _conn_prefix(_dout)
f67539c2 19std::ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
11fdf7f2
TL
20 return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
21 << *connection->peer_addrs << " conn(" << connection << " "
22 << this
23 << " " << ceph_con_mode_name(auth_meta->con_mode)
24 << " :" << connection->port
25 << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq << " l=" << connection->policy.lossy
f6b5b4d7
TL
27 << " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features,
28 REVISION_1)
11fdf7f2
TL
29 << " rx=" << session_stream_handlers.rx.get()
30 << " tx=" << session_stream_handlers.tx.get()
31 << ").";
32}
33
34using namespace ceph::msgr::v2;
35
36using CtPtr = Ct<ProtocolV2> *;
37using CtRef = Ct<ProtocolV2> &;
38
39void ProtocolV2::run_continuation(CtPtr pcontinuation) {
40 if (pcontinuation) {
41 run_continuation(*pcontinuation);
42 }
43}
44
45void ProtocolV2::run_continuation(CtRef continuation) {
46 try {
47 CONTINUATION_RUN(continuation)
f67539c2
TL
48 } catch (const ceph::buffer::error &e) {
49 lderr(cct) << __func__ << " failed decoding of frame header: " << e.what()
11fdf7f2
TL
50 << dendl;
51 _fault();
52 } catch (const ceph::crypto::onwire::MsgAuthError &e) {
53 lderr(cct) << __func__ << " " << e.what() << dendl;
54 _fault();
55 } catch (const DecryptionError &) {
56 lderr(cct) << __func__ << " failed to decrypt frame payload" << dendl;
57 }
58}
59
// I/O helpers that bind a buffer (or a length) to the continuation that runs
// once the corresponding read/write completes.

// Send `buf` (labelled `desc` in log messages), then continue at `cont`.
#define WRITE(buf, desc, cont) write(desc, CONTINUATION(cont), buf)

// Read `len` bytes into a freshly allocated buffer, then continue at `cont`.
#define READ(len, cont)                                                    \
  read(CONTINUATION(cont),                                                 \
       ceph::buffer::ptr_node::create(ceph::buffer::create(len)))

// Read into the caller-supplied rx buffer `rxbuf`, then continue at `cont`.
#define READ_RXBUF(rxbuf, cont) read(CONTINUATION(cont), rxbuf)

#ifdef UNIT_TESTS_BUILT

// Test hook for protocol step number S. If an interceptor is installed it
// may fail the connection (fault) or stop it (stop + queued reset). Both
// outcomes return from the enclosing function, so this must be expanded
// inside a function returning CtPtr.
#define INTERCEPT(S)                                                       \
  {                                                                        \
    if (connection->interceptor) {                                         \
      auto intercept_action =                                              \
          connection->interceptor->intercept(connection, (S));             \
      if (intercept_action == Interceptor::ACTION::FAIL) {                 \
        return _fault();                                                   \
      } else if (intercept_action == Interceptor::ACTION::STOP) {          \
        stop();                                                            \
        connection->dispatch_queue->queue_reset(connection);              \
        return nullptr;                                                    \
      }                                                                    \
    }                                                                      \
  }

#else
#define INTERCEPT(S)
#endif
82
83ProtocolV2::ProtocolV2(AsyncConnection *connection)
84 : Protocol(2, connection),
85 state(NONE),
f6b5b4d7 86 peer_supported_features(0),
11fdf7f2
TL
87 client_cookie(0),
88 server_cookie(0),
89 global_seq(0),
90 connect_seq(0),
91 peer_global_seq(0),
92 message_seq(0),
93 reconnecting(false),
94 replacing(false),
95 can_write(false),
96 bannerExchangeCallback(nullptr),
f6b5b4d7
TL
97 tx_frame_asm(&session_stream_handlers, false),
98 rx_frame_asm(&session_stream_handlers, false),
11fdf7f2
TL
99 next_tag(static_cast<Tag>(0)),
100 keepalive(false) {
101}
102
103ProtocolV2::~ProtocolV2() {
104}
105
106void ProtocolV2::connect() {
107 ldout(cct, 1) << __func__ << dendl;
108 state = START_CONNECT;
109 pre_auth.enabled = true;
110}
111
112void ProtocolV2::accept() {
113 ldout(cct, 1) << __func__ << dendl;
114 state = START_ACCEPT;
115}
116
117bool ProtocolV2::is_connected() { return can_write; }
118
119/*
120 * Tears down the message queues, and removes them from the
121 * DispatchQueue Must hold write_lock prior to calling.
122 */
123void ProtocolV2::discard_out_queue() {
124 ldout(cct, 10) << __func__ << " started" << dendl;
125
f67539c2 126 for (auto p = sent.begin(); p != sent.end(); ++p) {
11fdf7f2
TL
127 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
128 (*p)->put();
129 }
130 sent.clear();
131 for (auto& [ prio, entries ] : out_queue) {
132 static_cast<void>(prio);
133 for (auto& entry : entries) {
134 ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
135 entry.m->put();
136 }
137 }
138 out_queue.clear();
494da23a 139 write_in_progress = false;
11fdf7f2
TL
140}
141
142void ProtocolV2::reset_session() {
143 ldout(cct, 1) << __func__ << dendl;
144
145 std::lock_guard<std::mutex> l(connection->write_lock);
146 if (connection->delay_state) {
147 connection->delay_state->discard();
148 }
149
150 connection->dispatch_queue->discard_queue(connection->conn_id);
151 discard_out_queue();
9f95a23c 152 connection->outgoing_bl.clear();
11fdf7f2
TL
153
154 connection->dispatch_queue->queue_remote_reset(connection);
155
156 out_seq = 0;
157 in_seq = 0;
158 client_cookie = 0;
159 server_cookie = 0;
160 connect_seq = 0;
161 peer_global_seq = 0;
162 message_seq = 0;
163 ack_left = 0;
164 can_write = false;
165}
166
167void ProtocolV2::stop() {
168 ldout(cct, 1) << __func__ << dendl;
169 if (state == CLOSED) {
170 return;
171 }
172
173 if (connection->delay_state) connection->delay_state->flush();
174
175 std::lock_guard<std::mutex> l(connection->write_lock);
176
177 reset_recv_state();
178 discard_out_queue();
179
180 connection->_stop();
181
182 can_write = false;
183 state = CLOSED;
184}
185
186void ProtocolV2::fault() { _fault(); }
187
188void ProtocolV2::requeue_sent() {
494da23a 189 write_in_progress = false;
11fdf7f2
TL
190 if (sent.empty()) {
191 return;
192 }
193
194 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
195 out_seq -= sent.size();
196 while (!sent.empty()) {
197 Message *m = sent.back();
198 sent.pop_back();
199 ldout(cct, 5) << __func__ << " requeueing message m=" << m
200 << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
201 << *m << dendl;
9f95a23c 202 m->clear_payload();
11fdf7f2
TL
203 rq.emplace_front(out_queue_entry_t{false, m});
204 }
205}
206
207uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
208 ldout(cct, 10) << __func__ << " " << seq << dendl;
209 std::lock_guard<std::mutex> l(connection->write_lock);
210 if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
211 return seq;
212 }
213 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
214 uint64_t count = out_seq;
215 while (!rq.empty()) {
216 Message* const m = rq.front().m;
217 if (m->get_seq() == 0 || m->get_seq() > seq) break;
218 ldout(cct, 5) << __func__ << " discarding message m=" << m
219 << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
220 << *m << dendl;
221 m->put();
222 rq.pop_front();
223 count++;
224 }
225 if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST);
226 return count;
227}
228
9f95a23c
TL
229void ProtocolV2::reset_security() {
230 ldout(cct, 5) << __func__ << dendl;
231
494da23a 232 auth_meta.reset(new AuthConnectionMeta);
494da23a 233 session_stream_handlers.rx.reset(nullptr);
9f95a23c 234 session_stream_handlers.tx.reset(nullptr);
494da23a 235 pre_auth.rxbuf.clear();
9f95a23c
TL
236 pre_auth.txbuf.clear();
237}
238
239// it's expected the `write_lock` is held while calling this method.
240void ProtocolV2::reset_recv_state() {
241 ldout(cct, 5) << __func__ << dendl;
242
243 if (!connection->center->in_thread()) {
244 // execute in the same thread that uses the rx/tx handlers. We need
245 // to do the warp because holding `write_lock` is not enough as
246 // `write_event()` unlocks it just before calling `write_message()`.
247 // `submit_to()` here is NOT blocking.
248 connection->center->submit_to(connection->center->get_id(), [this] {
249 ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers"
250 << dendl;
251 // Possibly unnecessary. See the comment in `deactivate_existing`.
252 std::lock_guard<std::mutex> l(connection->lock);
253 std::lock_guard<std::mutex> wl(connection->write_lock);
254 reset_security();
255 }, /* always_async = */true);
256 } else {
257 reset_security();
258 }
11fdf7f2
TL
259
260 // clean read and write callbacks
261 connection->pendingReadLen.reset();
262 connection->writeCallback.reset();
263
264 next_tag = static_cast<Tag>(0);
265
266 reset_throttle();
267}
268
269size_t ProtocolV2::get_current_msg_size() const {
f6b5b4d7 270 ceph_assert(rx_frame_asm.get_num_segments() > 0);
11fdf7f2
TL
271 size_t sum = 0;
272 // we don't include SegmentIndex::Msg::HEADER.
f6b5b4d7
TL
273 for (size_t i = 1; i < rx_frame_asm.get_num_segments(); i++) {
274 sum += rx_frame_asm.get_segment_logical_len(i);
11fdf7f2
TL
275 }
276 return sum;
277}
278
279void ProtocolV2::reset_throttle() {
280 if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
281 connection->policy.throttler_messages) {
282 ldout(cct, 10) << __func__ << " releasing " << 1
283 << " message to policy throttler "
284 << connection->policy.throttler_messages->get_current()
285 << "/" << connection->policy.throttler_messages->get_max()
286 << dendl;
287 connection->policy.throttler_messages->put();
288 }
289 if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
290 if (connection->policy.throttler_bytes) {
291 const size_t cur_msg_size = get_current_msg_size();
292 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
293 << " bytes to policy throttler "
294 << connection->policy.throttler_bytes->get_current() << "/"
295 << connection->policy.throttler_bytes->get_max() << dendl;
296 connection->policy.throttler_bytes->put(cur_msg_size);
297 }
298 }
299 if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
300 const size_t cur_msg_size = get_current_msg_size();
301 ldout(cct, 10)
302 << __func__ << " releasing " << cur_msg_size
303 << " bytes to dispatch_queue throttler "
304 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
305 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
306 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
307 }
308}
309
// Handles a connection failure. Depending on policy and state it either
// closes the connection outright, parks it in STANDBY, or schedules a
// reconnect (immediately, or after a backoff). Unacknowledged sent messages
// are requeued before the decision is made. Every path returns nullptr, so a
// continuation that returns `_fault()` ends the current chain.
310CtPtr ProtocolV2::_fault() {
311 ldout(cct, 10) << __func__ << dendl;
312
  // Already torn down (or never started): nothing to do.
313 if (state == CLOSED || state == NONE) {
314 ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
315 return nullptr;
316 }
317
  // Lossy connections are not recovered. Outside the connect/reconnect
  // handshake states the connection is stopped and a reset is queued.
318 if (connection->policy.lossy &&
319 !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
320 ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
321 stop();
322 connection->dispatch_queue->queue_reset(connection);
323 return nullptr;
324 }
325
  // write_lock is taken here and released explicitly on every path below.
326 connection->write_lock.lock();
327
328 can_write = false;
329 // requeue sent items
330 requeue_sent();
331
  // Accepting side that has not finished the handshake, has nothing to send
  // and is not replacing another connection: just close it.
332 if (out_queue.empty() && state >= START_ACCEPT &&
333 state <= SESSION_ACCEPTING && !replacing) {
334 ldout(cct, 2) << __func__ << " with nothing to send and in the half "
335 << " accept state just closed" << dendl;
336 connection->write_lock.unlock();
337 stop();
338 connection->dispatch_queue->queue_reset(connection);
339 return nullptr;
340 }
341
  // Recoverable fault: notify the connection and reset receive/crypto state
  // before choosing how to recover.
342 replacing = false;
343 connection->fault();
344 reset_recv_state();
345
346 reconnecting = false;
347
  // Standby policy with nothing pending (and not in WAIT): idle in STANDBY.
348 if (connection->policy.standby && out_queue.empty() && !keepalive &&
349 state != WAIT) {
350 ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
351 << dendl;
352 state = STANDBY;
353 connection->write_lock.unlock();
354 return nullptr;
355 }
  // Servers never initiate reconnects; park in STANDBY even with queued data.
356 if (connection->policy.server) {
357 ldout(cct, 1) << __func__ << " server, going to standby, even though i have stuff queued" << dendl;
358 state = STANDBY;
359 connection->write_lock.unlock();
360 return nullptr;
361 }
362
363 connection->write_lock.unlock();
364
  // Faulted outside the connect handshake (and not in WAIT or a racing
  // SESSION_ACCEPTING): reconnect right away with the backoff cleared.
365 if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
366 state != WAIT &&
367 state != SESSION_ACCEPTING /* due to connection race */) {
368 // policy maybe empty when state is in accept
  // NOTE(review): policy.server already returned above, so this branch
  // appears unreachable as written.
369 if (connection->policy.server) {
370 ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
371 state = STANDBY;
372 } else {
373 ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
374 connect_seq++;
375 global_seq = messenger->get_global_seq();
376 state = START_CONNECT;
377 pre_auth.enabled = true;
378 connection->state = AsyncConnection::STATE_CONNECTING;
379 }
380 backoff = utime_t();
381 connection->center->dispatch_event_external(connection->read_handler);
382 } else {
    // Reconnect after a backoff. WAIT jumps straight to ms_max_backoff.
    // Otherwise the backoff starts at ms_initial_backoff and doubles on each
    // further fault, capped at ms_max_backoff.
383 if (state == WAIT) {
384 backoff.set_from_double(cct->_conf->ms_max_backoff);
385 } else if (backoff == utime_t()) {
386 backoff.set_from_double(cct->_conf->ms_initial_backoff);
387 } else {
388 backoff += backoff;
389 if (backoff > cct->_conf->ms_max_backoff)
390 backoff.set_from_double(cct->_conf->ms_max_backoff);
391 }
392
    // Only bump connect_seq when a session exists (server_cookie is set).
393 if (server_cookie) {
394 connect_seq++;
395 }
396
397 global_seq = messenger->get_global_seq();
398 state = START_CONNECT;
399 pre_auth.enabled = true;
400 connection->state = AsyncConnection::STATE_CONNECTING;
401 ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
402 // woke up again;
    // Arm wakeup_handler after the backoff (to_nsec() / 1000 = microseconds).
403 connection->register_time_events.insert(
404 connection->center->create_time_event(backoff.to_nsec() / 1000,
405 connection->wakeup_handler));
406 }
407 return nullptr;
408}
409
410void ProtocolV2::prepare_send_message(uint64_t features,
411 Message *m) {
412 ldout(cct, 20) << __func__ << " m=" << *m << dendl;
413
414 // associate message with Connection (for benefit of encode_payload)
9f95a23c
TL
415 ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
416 << features << " " << m << " " << *m << dendl;
11fdf7f2
TL
417
418 // encode and copy out of *m
419 m->encode(features, 0);
420}
421
422void ProtocolV2::send_message(Message *m) {
423 uint64_t f = connection->get_features();
424
425 // TODO: Currently not all messages supports reencode like MOSDMap, so here
426 // only let fast dispatch support messages prepare message
427 const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
428 if (can_fast_prepare) {
429 prepare_send_message(f, m);
430 }
431
432 std::lock_guard<std::mutex> l(connection->write_lock);
433 bool is_prepared = can_fast_prepare;
434 // "features" changes will change the payload encoding
435 if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
436 // ensure the correctness of message encoding
437 m->clear_payload();
438 is_prepared = false;
439 ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
440 << " != " << connection->get_features() << dendl;
441 }
442 if (state == CLOSED) {
443 ldout(cct, 10) << __func__ << " connection closed."
444 << " Drop message " << m << dendl;
445 m->put();
446 } else {
447 ldout(cct, 5) << __func__ << " enqueueing message m=" << m
448 << " type=" << m->get_type() << " " << *m << dendl;
9f95a23c 449 m->queue_start = ceph::mono_clock::now();
11fdf7f2
TL
450 m->trace.event("async enqueueing message");
451 out_queue[m->get_priority()].emplace_back(
452 out_queue_entry_t{is_prepared, m});
453 ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
454 << dendl;
494da23a
TL
455 if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
456 write_in_progress = true;
11fdf7f2
TL
457 connection->center->dispatch_event_external(connection->write_handler);
458 }
459 }
460}
461
462void ProtocolV2::send_keepalive() {
463 ldout(cct, 10) << __func__ << dendl;
464 std::lock_guard<std::mutex> l(connection->write_lock);
465 if (state != CLOSED) {
466 keepalive = true;
467 connection->center->dispatch_event_external(connection->write_handler);
468 }
469}
470
471void ProtocolV2::read_event() {
472 ldout(cct, 20) << __func__ << dendl;
473
474 switch (state) {
475 case START_CONNECT:
476 run_continuation(CONTINUATION(start_client_banner_exchange));
477 break;
478 case START_ACCEPT:
479 run_continuation(CONTINUATION(start_server_banner_exchange));
480 break;
481 case READY:
482 run_continuation(CONTINUATION(read_frame));
483 break;
484 case THROTTLE_MESSAGE:
485 run_continuation(CONTINUATION(throttle_message));
486 break;
487 case THROTTLE_BYTES:
488 run_continuation(CONTINUATION(throttle_bytes));
489 break;
490 case THROTTLE_DISPATCH_QUEUE:
491 run_continuation(CONTINUATION(throttle_dispatch_queue));
492 break;
493 default:
494 break;
495 }
496}
497
498ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
499 out_queue_entry_t out_entry;
500
501 if (!out_queue.empty()) {
502 auto it = out_queue.rbegin();
503 auto& entries = it->second;
504 ceph_assert(!entries.empty());
505 out_entry = entries.front();
506 entries.pop_front();
507 if (entries.empty()) {
508 out_queue.erase(it->first);
509 }
510 }
511 return out_entry;
512}
513
514ssize_t ProtocolV2::write_message(Message *m, bool more) {
515 FUNCTRACE(cct);
516 ceph_assert(connection->center->in_thread());
517 m->set_seq(++out_seq);
518
519 connection->lock.lock();
520 uint64_t ack_seq = in_seq;
521 ack_left = 0;
522 connection->lock.unlock();
523
524 ceph_msg_header &header = m->get_header();
525 ceph_msg_footer &footer = m->get_footer();
526
527 ceph_msg_header2 header2{header.seq, header.tid,
528 header.type, header.priority,
529 header.version,
eafe8130
TL
530 init_le32(0), header.data_off,
531 init_le64(ack_seq),
11fdf7f2
TL
532 footer.flags, header.compat_version,
533 header.reserved};
534
535 auto message = MessageFrame::Encode(
536 header2,
537 m->get_payload(),
538 m->get_middle(),
539 m->get_data());
801d1391
TL
540 if (!append_frame(message)) {
541 m->put();
542 return -EILSEQ;
543 }
11fdf7f2
TL
544
545 ldout(cct, 5) << __func__ << " sending message m=" << m
546 << " seq=" << m->get_seq() << " " << *m << dendl;
547
548 m->trace.event("async writing message");
549 ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
550 << " src=" << entity_name_t(messenger->get_myname())
551 << " off=" << header2.data_off
552 << dendl;
9f95a23c 553 ssize_t total_send_size = connection->outgoing_bl.length();
11fdf7f2
TL
554 ssize_t rc = connection->_try_send(more);
555 if (rc < 0) {
556 ldout(cct, 1) << __func__ << " error sending " << m << ", "
557 << cpp_strerror(rc) << dendl;
558 } else {
559 connection->logger->inc(
9f95a23c 560 l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
11fdf7f2
TL
561 ldout(cct, 10) << __func__ << " sending " << m
562 << (rc ? " continuely." : " done.") << dendl;
563 }
9f95a23c
TL
564
565#if defined(WITH_EVENTTRACE)
11fdf7f2
TL
566 if (m->get_type() == CEPH_MSG_OSD_OP)
567 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
568 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
569 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
9f95a23c 570#endif
11fdf7f2
TL
571 m->put();
572
573 return rc;
574}
575
801d1391
TL
576template <class F>
577bool ProtocolV2::append_frame(F& frame) {
578 ceph::bufferlist bl;
579 try {
f6b5b4d7 580 bl = frame.get_buffer(tx_frame_asm);
801d1391
TL
581 } catch (ceph::crypto::onwire::TxHandlerError &e) {
582 ldout(cct, 1) << __func__ << " " << e.what() << dendl;
583 return false;
584 }
f6b5b4d7
TL
585
586 ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
587 << " bytes " << tx_frame_asm << dendl;
801d1391
TL
588 connection->outgoing_bl.append(bl);
589 return true;
11fdf7f2
TL
590}
591
592void ProtocolV2::handle_message_ack(uint64_t seq) {
593 if (connection->policy.lossy) { // lossy connections don't keep sent messages
594 return;
595 }
596
597 ldout(cct, 15) << __func__ << " seq=" << seq << dendl;
598
599 // trim sent list
600 static const int max_pending = 128;
601 int i = 0;
602 Message *pending[max_pending];
9f95a23c 603 auto now = ceph::mono_clock::now();
11fdf7f2
TL
604 connection->write_lock.lock();
605 while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
606 Message *m = sent.front();
607 sent.pop_front();
608 pending[i++] = m;
609 ldout(cct, 10) << __func__ << " got ack seq " << seq
610 << " >= " << m->get_seq() << " on " << m << " " << *m
611 << dendl;
612 }
613 connection->write_lock.unlock();
9f95a23c 614 connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
11fdf7f2
TL
615 for (int k = 0; k < i; k++) {
616 pending[k]->put();
617 }
618}
619
// Write-side event handler.
// When the connection is writable: flush a pending keepalive, drain out_queue
// (highest priority key first) through write_message(), then send a
// standalone ack or any leftover buffered bytes.
// When it is not writable: either start a reconnect from STANDBY (client side
// with queued data), or try to flush already-buffered bytes.
// Any send failure faults the connection while holding connection->lock.
620void ProtocolV2::write_event() {
621 ldout(cct, 10) << __func__ << dendl;
622 ssize_t r = 0;
623
624 connection->write_lock.lock();
625 if (can_write) {
626 if (keepalive) {
801d1391
TL
627 ldout(cct, 10) << __func__ << " appending keepalive" << dendl;
628 auto keepalive_frame = KeepAliveFrame::Encode();
629 if (!append_frame(keepalive_frame)) {
  // Frame assembly failed: drop write_lock, then fault under connection->lock.
630 connection->write_lock.unlock();
631 connection->lock.lock();
632 fault();
633 connection->lock.unlock();
634 return;
635 }
11fdf7f2
TL
636 keepalive = false;
637 }
638
639 auto start = ceph::mono_clock::now();
640 bool more;
641 do {
642 const auto out_entry = _get_next_outgoing();
643 if (!out_entry.m) {
644 break;
645 }
646
  // Non-lossy: keep an extra reference on `sent` until the peer acks it,
  // so it can be requeued after a fault.
647 if (!connection->policy.lossy) {
648 // put on sent list
649 sent.push_back(out_entry.m);
650 out_entry.m->get();
651 }
652 more = !out_queue.empty();
  // Encoding and sending happen without write_lock held.
653 connection->write_lock.unlock();
654
655 // send_message or requeue messages may not encode message
656 if (!out_entry.is_prepared) {
657 prepare_send_message(connection->get_features(), out_entry.m);
658 }
659
9f95a23c
TL
660 if (out_entry.m->queue_start != ceph::mono_time()) {
661 connection->logger->tinc(l_msgr_send_messages_queue_lat,
662 ceph::mono_clock::now() -
663 out_entry.m->queue_start);
664 }
665
11fdf7f2
TL
666 r = write_message(out_entry.m, more);
667
668 connection->write_lock.lock();
669 if (r == 0) {
670 ;
671 } else if (r < 0) {
672 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
673 break;
9f95a23c
TL
674 } else if (r > 0) {
675 // Outbound message in-progress, thread will be re-awoken
676 // when the outbound socket is writeable again
11fdf7f2 677 break;
9f95a23c 678 }
11fdf7f2 679 } while (can_write);
494da23a 680 write_in_progress = false;
11fdf7f2
TL
681
682 // if r > 0 mean data still lefted, so no need _try_send.
683 if (r == 0) {
684 uint64_t left = ack_left;
685 if (left) {
  // Acks not piggy-backed on a message go out as a standalone AckFrame.
  // NOTE: the remaining count `left` is passed as _try_send's `more` flag.
11fdf7f2
TL
686 ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
687 << " messages" << dendl;
801d1391
TL
688 auto ack_frame = AckFrame::Encode(in_seq);
689 if (append_frame(ack_frame)) {
690 ack_left -= left;
691 left = ack_left;
692 r = connection->_try_send(left);
693 } else {
694 r = -EILSEQ;
695 }
11fdf7f2
TL
696 } else if (is_queued()) {
697 r = connection->_try_send();
698 }
699 }
700 connection->write_lock.unlock();
701
702 connection->logger->tinc(l_msgr_running_send_time,
703 ceph::mono_clock::now() - start);
704 if (r < 0) {
705 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
706 connection->lock.lock();
707 fault();
708 connection->lock.unlock();
709 return;
710 }
711 } else {
  // Not writable. Re-acquire locks in the order connection->lock, then
  // write_lock.
494da23a 712 write_in_progress = false;
11fdf7f2
TL
713 connection->write_lock.unlock();
714 connection->lock.lock();
715 connection->write_lock.lock();
716 if (state == STANDBY && !connection->policy.server && is_queued()) {
717 ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
718 if (server_cookie) { // only increment connect_seq if there is a session
719 connect_seq++;
720 }
721 connection->_connect();
722 } else if (connection->cs && state != NONE && state != CLOSED &&
723 state != START_CONNECT) {
724 r = connection->_try_send();
725 if (r < 0) {
726 ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
727 connection->write_lock.unlock();
728 fault();
729 connection->lock.unlock();
730 return;
731 }
732 }
733 connection->write_lock.unlock();
734 connection->lock.unlock();
735 }
736}
737
738bool ProtocolV2::is_queued() {
739 return !out_queue.empty() || connection->is_queued();
740}
741
11fdf7f2
TL
742CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
743 rx_buffer_t &&buffer) {
744 const auto len = buffer->length();
745 const auto buf = buffer->c_str();
746 next.node = std::move(buffer);
747 ssize_t r = connection->read(len, buf,
748 [&next, this](char *buffer, int r) {
749 if (unlikely(pre_auth.enabled) && r >= 0) {
750 pre_auth.rxbuf.append(*next.node);
751 ceph_assert(!cct->_conf->ms_die_on_bug ||
adb31ebb 752 pre_auth.rxbuf.length() < 20000000);
11fdf7f2
TL
753 }
754 next.r = r;
755 run_continuation(next);
756 });
757 if (r <= 0) {
758 // error or done synchronously
f67539c2 759 if (unlikely(pre_auth.enabled) && r == 0) {
11fdf7f2
TL
760 pre_auth.rxbuf.append(*next.node);
761 ceph_assert(!cct->_conf->ms_die_on_bug ||
adb31ebb 762 pre_auth.rxbuf.length() < 20000000);
11fdf7f2
TL
763 }
764 next.r = r;
765 return &next;
766 }
767
768 return nullptr;
769}
770
771template <class F>
772CtPtr ProtocolV2::write(const std::string &desc,
773 CONTINUATION_TYPE<ProtocolV2> &next,
774 F &frame) {
801d1391
TL
775 ceph::bufferlist bl;
776 try {
f6b5b4d7 777 bl = frame.get_buffer(tx_frame_asm);
801d1391
TL
778 } catch (ceph::crypto::onwire::TxHandlerError &e) {
779 ldout(cct, 1) << __func__ << " " << e.what() << dendl;
780 return _fault();
781 }
f6b5b4d7
TL
782
783 ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
784 << " bytes " << tx_frame_asm << dendl;
11fdf7f2
TL
785 return write(desc, next, bl);
786}
787
788CtPtr ProtocolV2::write(const std::string &desc,
789 CONTINUATION_TYPE<ProtocolV2> &next,
f67539c2 790 ceph::bufferlist &buffer) {
11fdf7f2
TL
791 if (unlikely(pre_auth.enabled)) {
792 pre_auth.txbuf.append(buffer);
793 ceph_assert(!cct->_conf->ms_die_on_bug ||
adb31ebb 794 pre_auth.txbuf.length() < 20000000);
11fdf7f2
TL
795 }
796
797 ssize_t r =
798 connection->write(buffer, [&next, desc, this](int r) {
799 if (r < 0) {
800 ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
801 << " (" << cpp_strerror(r) << ")" << dendl;
802 connection->inject_delay();
803 _fault();
804 }
805 run_continuation(next);
806 });
807
808 if (r < 0) {
809 ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
810 << " (" << cpp_strerror(r) << ")" << dendl;
811 return _fault();
812 } else if (r == 0) {
813 next.setParams();
814 return &next;
815 }
816
817 return nullptr;
818}
819
820CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
821 ldout(cct, 20) << __func__ << dendl;
822 bannerExchangeCallback = &callback;
823
f67539c2
TL
824 ceph::bufferlist banner_payload;
825 using ceph::encode;
11fdf7f2
TL
826 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
827 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
828
f67539c2 829 ceph::bufferlist bl;
11fdf7f2
TL
830 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
831 encode((uint16_t)banner_payload.length(), bl, 0);
832 bl.claim_append(banner_payload);
833
834 INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);
835
836 return WRITE(bl, "banner", _wait_for_peer_banner);
837}
838
839CtPtr ProtocolV2::_wait_for_peer_banner() {
9f95a23c 840 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
11fdf7f2
TL
841 return READ(banner_len, _handle_peer_banner);
842}
843
844CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
845 ldout(cct, 20) << __func__ << " r=" << r << dendl;
846
847 if (r < 0) {
848 ldout(cct, 1) << __func__ << " read peer banner failed r=" << r << " ("
849 << cpp_strerror(r) << ")" << dendl;
850 return _fault();
851 }
852
853 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
854
855 if (memcmp(buffer->c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
856 if (memcmp(buffer->c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
857 lderr(cct) << __func__ << " peer " << *connection->peer_addrs
858 << " is using msgr V1 protocol" << dendl;
859 return _fault();
860 }
861 ldout(cct, 1) << __func__ << " accept peer sent bad banner" << dendl;
862 return _fault();
863 }
864
865 uint16_t payload_len;
f67539c2 866 ceph::bufferlist bl;
11fdf7f2 867 buffer->set_offset(banner_prefix_len);
9f95a23c 868 buffer->set_length(sizeof(ceph_le16));
11fdf7f2
TL
869 bl.push_back(std::move(buffer));
870 auto ti = bl.cbegin();
f67539c2 871 using ceph::decode;
11fdf7f2
TL
872 try {
873 decode(payload_len, ti);
f67539c2 874 } catch (const ceph::buffer::error &e) {
11fdf7f2
TL
875 lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
876 return _fault();
877 }
878
879 INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);
880
881 return READ(payload_len, _handle_peer_banner_payload);
882}
883
884CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
885 ldout(cct, 20) << __func__ << " r=" << r << dendl;
886
887 if (r < 0) {
888 ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
889 << " (" << cpp_strerror(r) << ")" << dendl;
890 return _fault();
891 }
892
893 uint64_t peer_supported_features;
894 uint64_t peer_required_features;
895
f67539c2
TL
896 ceph::bufferlist bl;
897 using ceph::decode;
11fdf7f2
TL
898 bl.push_back(std::move(buffer));
899 auto ti = bl.cbegin();
900 try {
901 decode(peer_supported_features, ti);
902 decode(peer_required_features, ti);
f67539c2 903 } catch (const ceph::buffer::error &e) {
11fdf7f2
TL
904 lderr(cct) << __func__ << " decode banner payload failed " << dendl;
905 return _fault();
906 }
907
908 ldout(cct, 1) << __func__ << " supported=" << std::hex
909 << peer_supported_features << " required=" << std::hex
910 << peer_required_features << std::dec << dendl;
911
912 // Check feature bit compatibility
913
914 uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
915 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
916
917 if ((required_features & peer_supported_features) != required_features) {
918 ldout(cct, 1) << __func__ << " peer does not support all required features"
919 << " required=" << std::hex << required_features
920 << " supported=" << std::hex << peer_supported_features
921 << std::dec << dendl;
922 stop();
923 connection->dispatch_queue->queue_reset(connection);
924 return nullptr;
925 }
926 if ((supported_features & peer_required_features) != peer_required_features) {
927 ldout(cct, 1) << __func__ << " we do not support all peer required features"
928 << " required=" << std::hex << peer_required_features
929 << " supported=" << supported_features << std::dec << dendl;
930 stop();
931 connection->dispatch_queue->queue_reset(connection);
932 return nullptr;
933 }
934
f6b5b4d7
TL
935 this->peer_supported_features = peer_supported_features;
936 if (peer_required_features == 0) {
11fdf7f2
TL
937 this->connection_features = msgr2_required;
938 }
939
f6b5b4d7
TL
940 // if the peer supports msgr2.1, switch to it
941 bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
942 tx_frame_asm.set_is_rev1(is_rev1);
943 rx_frame_asm.set_is_rev1(is_rev1);
11fdf7f2
TL
944
945 if (state == BANNER_CONNECTING) {
946 state = HELLO_CONNECTING;
947 }
948 else {
949 ceph_assert(state == BANNER_ACCEPTING);
950 state = HELLO_ACCEPTING;
951 }
952
953 auto hello = HelloFrame::Encode(messenger->get_mytype(),
954 connection->target_addr);
955
956 INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);
957
958 return WRITE(hello, "hello frame", read_frame);
959}
960
961CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
962{
963 ldout(cct, 20) << __func__
964 << " payload.length()=" << payload.length() << dendl;
965
966 if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
967 lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
968 return _fault();
969 }
970
971 auto hello = HelloFrame::Decode(payload);
972
973 ldout(cct, 5) << __func__ << " received hello:"
974 << " peer_type=" << (int)hello.entity_type()
975 << " peer_addr_for_me=" << hello.peer_addr() << dendl;
976
81eedcae
TL
977 sockaddr_storage ss;
978 socklen_t len = sizeof(ss);
979 getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
980 ldout(cct, 5) << __func__ << " getsockname says I am " << (sockaddr *)&ss
981 << " when talking to " << connection->target_addr << dendl;
982
11fdf7f2
TL
983 if (connection->get_peer_type() == -1) {
984 connection->set_peer_type(hello.entity_type());
985
986 ceph_assert(state == HELLO_ACCEPTING);
987 connection->policy = messenger->get_policy(hello.entity_type());
988 ldout(cct, 10) << __func__ << " accept of host_type "
989 << (int)hello.entity_type()
990 << ", policy.lossy=" << connection->policy.lossy
991 << " policy.server=" << connection->policy.server
992 << " policy.standby=" << connection->policy.standby
993 << " policy.resetcheck=" << connection->policy.resetcheck
994 << dendl;
995 } else {
996 ceph_assert(state == HELLO_CONNECTING);
997 if (connection->get_peer_type() != hello.entity_type()) {
998 ldout(cct, 1) << __func__ << " connection peer type does not match what"
999 << " peer advertises " << connection->get_peer_type()
1000 << " != " << (int)hello.entity_type() << dendl;
1001 stop();
1002 connection->dispatch_queue->queue_reset(connection);
1003 return nullptr;
1004 }
1005 }
1006
81eedcae
TL
1007 if (messenger->get_myaddrs().empty() ||
1008 messenger->get_myaddrs().front().is_blank_ip()) {
1009 entity_addr_t a;
1010 if (cct->_conf->ms_learn_addr_from_peer) {
1011 ldout(cct, 1) << __func__ << " peer " << connection->target_addr
1012 << " says I am " << hello.peer_addr() << " (socket says "
1013 << (sockaddr*)&ss << ")" << dendl;
1014 a = hello.peer_addr();
1015 } else {
1016 ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
1017 << " says I am " << (sockaddr*)&ss
1018 << " (peer says " << hello.peer_addr() << ")" << dendl;
1019 a.set_sockaddr((sockaddr *)&ss);
1020 }
1021 a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
1022 a.set_port(0);
1023 connection->lock.unlock();
1024 messenger->learned_addr(a);
1025 if (cct->_conf->ms_inject_internal_delays &&
1026 cct->_conf->ms_inject_socket_failures) {
1027 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
1028 ldout(cct, 10) << __func__ << " sleep for "
1029 << cct->_conf->ms_inject_internal_delays << dendl;
1030 utime_t t;
1031 t.set_from_double(cct->_conf->ms_inject_internal_delays);
1032 t.sleep();
1033 }
1034 }
1035 connection->lock.lock();
1036 if (state != HELLO_CONNECTING) {
1037 ldout(cct, 1) << __func__
1038 << " state changed while learned_addr, mark_down or "
1039 << " replacing must be happened just now" << dendl;
1040 return nullptr;
1041 }
1042 }
1043
1044
1045
11fdf7f2
TL
1046 CtPtr callback;
1047 callback = bannerExchangeCallback;
1048 bannerExchangeCallback = nullptr;
1049 ceph_assert(callback);
1050 return callback;
1051}
1052
1053CtPtr ProtocolV2::read_frame() {
1054 if (state == CLOSED) {
1055 return nullptr;
1056 }
1057
1058 ldout(cct, 20) << __func__ << dendl;
f6b5b4d7
TL
1059 rx_preamble.clear();
1060 rx_epilogue.clear();
1061 rx_segments_data.clear();
1062
1063 return READ(rx_frame_asm.get_preamble_onwire_len(),
1064 handle_read_frame_preamble_main);
11fdf7f2
TL
1065}
1066
1067CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
1068 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1069
1070 if (r < 0) {
f6b5b4d7 1071 ldout(cct, 1) << __func__ << " read frame preamble failed r=" << r
11fdf7f2
TL
1072 << " (" << cpp_strerror(r) << ")" << dendl;
1073 return _fault();
1074 }
1075
f6b5b4d7 1076 rx_preamble.push_back(std::move(buffer));
11fdf7f2
TL
1077
1078 ldout(cct, 30) << __func__ << " preamble\n";
f6b5b4d7 1079 rx_preamble.hexdump(*_dout);
11fdf7f2
TL
1080 *_dout << dendl;
1081
f6b5b4d7
TL
1082 try {
1083 next_tag = rx_frame_asm.disassemble_preamble(rx_preamble);
1084 } catch (FrameError& e) {
1085 ldout(cct, 1) << __func__ << " " << e.what() << dendl;
1086 return _fault();
1087 } catch (ceph::crypto::onwire::MsgAuthError&) {
1088 ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
1089 return _fault();
1090 }
11fdf7f2 1091
f6b5b4d7
TL
1092 ldout(cct, 25) << __func__ << " disassembled preamble " << rx_frame_asm
1093 << dendl;
11fdf7f2 1094
f6b5b4d7 1095 if (session_stream_handlers.rx) {
11fdf7f2 1096 ldout(cct, 30) << __func__ << " preamble after decrypt\n";
f6b5b4d7 1097 rx_preamble.hexdump(*_dout);
11fdf7f2
TL
1098 *_dout << dendl;
1099 }
1100
11fdf7f2
TL
1101 // does it need throttle?
1102 if (next_tag == Tag::MESSAGE) {
1103 if (state != READY) {
1104 lderr(cct) << __func__ << " not in ready state!" << dendl;
1105 return _fault();
1106 }
1107 state = THROTTLE_MESSAGE;
1108 return CONTINUE(throttle_message);
1109 } else {
1110 return read_frame_segment();
1111 }
1112}
1113
1114CtPtr ProtocolV2::handle_read_frame_dispatch() {
1115 ldout(cct, 10) << __func__
1116 << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
1117
1118 switch (next_tag) {
1119 case Tag::HELLO:
1120 case Tag::AUTH_REQUEST:
1121 case Tag::AUTH_BAD_METHOD:
1122 case Tag::AUTH_REPLY_MORE:
1123 case Tag::AUTH_REQUEST_MORE:
1124 case Tag::AUTH_DONE:
1125 case Tag::AUTH_SIGNATURE:
1126 case Tag::CLIENT_IDENT:
1127 case Tag::SERVER_IDENT:
1128 case Tag::IDENT_MISSING_FEATURES:
1129 case Tag::SESSION_RECONNECT:
1130 case Tag::SESSION_RESET:
1131 case Tag::SESSION_RETRY:
1132 case Tag::SESSION_RETRY_GLOBAL:
1133 case Tag::SESSION_RECONNECT_OK:
1134 case Tag::KEEPALIVE2:
1135 case Tag::KEEPALIVE2_ACK:
1136 case Tag::ACK:
1137 case Tag::WAIT:
1138 return handle_frame_payload();
1139 case Tag::MESSAGE:
1140 return handle_message();
1141 default: {
1142 lderr(cct) << __func__
1143 << " received unknown tag=" << static_cast<uint32_t>(next_tag)
1144 << dendl;
1145 return _fault();
1146 }
1147 }
1148
1149 return nullptr;
1150}
1151
1152CtPtr ProtocolV2::read_frame_segment() {
f6b5b4d7
TL
1153 size_t seg_idx = rx_segments_data.size();
1154 ldout(cct, 20) << __func__ << " seg_idx=" << seg_idx << dendl;
1155 rx_segments_data.emplace_back();
1156
1157 uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
1158 if (onwire_len == 0) {
1159 return _handle_read_frame_segment();
1160 }
11fdf7f2 1161
11fdf7f2 1162 rx_buffer_t rx_buffer;
f6b5b4d7 1163 uint16_t align = rx_frame_asm.get_segment_align(seg_idx);
11fdf7f2 1164 try {
f67539c2 1165 rx_buffer = ceph::buffer::ptr_node::create(ceph::buffer::create_aligned(
f6b5b4d7 1166 onwire_len, align));
b3b6e05e 1167 } catch (const ceph::buffer::bad_alloc&) {
11fdf7f2 1168 // Catching because of potential issues with satisfying alignment.
f6b5b4d7
TL
1169 ldout(cct, 1) << __func__ << " can't allocate aligned rx_buffer"
1170 << " len=" << onwire_len
1171 << " align=" << align
1172 << dendl;
11fdf7f2
TL
1173 return _fault();
1174 }
1175
1176 return READ_RXBUF(std::move(rx_buffer), handle_read_frame_segment);
1177}
1178
1179CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
1180 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1181
1182 if (r < 0) {
1183 ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
1184 << cpp_strerror(r) << ")" << dendl;
1185 return _fault();
1186 }
1187
11fdf7f2 1188 rx_segments_data.back().push_back(std::move(rx_buffer));
f6b5b4d7
TL
1189 return _handle_read_frame_segment();
1190}
11fdf7f2 1191
f6b5b4d7
TL
1192CtPtr ProtocolV2::_handle_read_frame_segment() {
1193 if (rx_segments_data.size() == rx_frame_asm.get_num_segments()) {
11fdf7f2 1194 // OK, all segments planned to read are read. Can go with epilogue.
f6b5b4d7
TL
1195 uint32_t epilogue_onwire_len = rx_frame_asm.get_epilogue_onwire_len();
1196 if (epilogue_onwire_len == 0) {
1197 return _handle_read_frame_epilogue_main();
1198 }
1199 return READ(epilogue_onwire_len, handle_read_frame_epilogue_main);
11fdf7f2 1200 }
f6b5b4d7
TL
1201 // TODO: for makeshift only. This will be more generic and throttled
1202 return read_frame_segment();
11fdf7f2
TL
1203}
1204
1205CtPtr ProtocolV2::handle_frame_payload() {
1206 ceph_assert(!rx_segments_data.empty());
1207 auto& payload = rx_segments_data.back();
1208
1209 ldout(cct, 30) << __func__ << "\n";
1210 payload.hexdump(*_dout);
1211 *_dout << dendl;
1212
1213 switch (next_tag) {
1214 case Tag::HELLO:
1215 return handle_hello(payload);
1216 case Tag::AUTH_REQUEST:
1217 return handle_auth_request(payload);
1218 case Tag::AUTH_BAD_METHOD:
1219 return handle_auth_bad_method(payload);
1220 case Tag::AUTH_REPLY_MORE:
1221 return handle_auth_reply_more(payload);
1222 case Tag::AUTH_REQUEST_MORE:
1223 return handle_auth_request_more(payload);
1224 case Tag::AUTH_DONE:
1225 return handle_auth_done(payload);
1226 case Tag::AUTH_SIGNATURE:
1227 return handle_auth_signature(payload);
1228 case Tag::CLIENT_IDENT:
1229 return handle_client_ident(payload);
1230 case Tag::SERVER_IDENT:
1231 return handle_server_ident(payload);
1232 case Tag::IDENT_MISSING_FEATURES:
1233 return handle_ident_missing_features(payload);
1234 case Tag::SESSION_RECONNECT:
1235 return handle_reconnect(payload);
1236 case Tag::SESSION_RESET:
1237 return handle_session_reset(payload);
1238 case Tag::SESSION_RETRY:
1239 return handle_session_retry(payload);
1240 case Tag::SESSION_RETRY_GLOBAL:
1241 return handle_session_retry_global(payload);
1242 case Tag::SESSION_RECONNECT_OK:
1243 return handle_reconnect_ok(payload);
1244 case Tag::KEEPALIVE2:
1245 return handle_keepalive2(payload);
1246 case Tag::KEEPALIVE2_ACK:
1247 return handle_keepalive2_ack(payload);
1248 case Tag::ACK:
1249 return handle_message_ack(payload);
1250 case Tag::WAIT:
1251 return handle_wait(payload);
1252 default:
1253 ceph_abort();
1254 }
1255 return nullptr;
1256}
1257
1258CtPtr ProtocolV2::ready() {
1259 ldout(cct, 25) << __func__ << dendl;
1260
1261 reconnecting = false;
1262 replacing = false;
1263
1264 // make sure no pending tick timer
1265 if (connection->last_tick_id) {
1266 connection->center->delete_time_event(connection->last_tick_id);
1267 }
1268 connection->last_tick_id = connection->center->create_time_event(
1269 connection->inactive_timeout_us, connection->tick_handler);
1270
1271 {
1272 std::lock_guard<std::mutex> l(connection->write_lock);
1273 can_write = true;
1274 if (!out_queue.empty()) {
1275 connection->center->dispatch_event_external(connection->write_handler);
1276 }
1277 }
1278
1279 connection->maybe_start_delay_thread();
1280
1281 state = READY;
1282 ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie="
1283 << std::hex << client_cookie << " server_cookie="
1284 << server_cookie << std::dec << " in_seq=" << in_seq
1285 << " out_seq=" << out_seq << dendl;
1286
1287 INTERCEPT(15);
1288
1289 return CONTINUE(read_frame);
1290}
1291
1292CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
1293{
1294 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1295
1296 if (r < 0) {
f6b5b4d7
TL
1297 ldout(cct, 1) << __func__ << " read frame epilogue failed r=" << r
1298 << " (" << cpp_strerror(r) << ")" << dendl;
11fdf7f2
TL
1299 return _fault();
1300 }
1301
f6b5b4d7
TL
1302 rx_epilogue.push_back(std::move(buffer));
1303 return _handle_read_frame_epilogue_main();
1304}
11fdf7f2 1305
f6b5b4d7
TL
1306CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
1307 bool aborted;
1308 try {
1309 rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
1310 aborted = !rx_frame_asm.disassemble_remaining_segments(
1311 rx_segments_data.data(), rx_epilogue);
1312 } catch (FrameError& e) {
1313 ldout(cct, 1) << __func__ << " " << e.what() << dendl;
1314 return _fault();
1315 } catch (ceph::crypto::onwire::MsgAuthError&) {
1316 ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
1317 return _fault();
11fdf7f2
TL
1318 }
1319
1320 // we do have a mechanism that allows transmitter to start sending message
1321 // and abort after putting entire data field on wire. This will be used by
1322 // the kernel client to avoid unnecessary buffering.
f6b5b4d7 1323 if (aborted) {
11fdf7f2
TL
1324 reset_throttle();
1325 state = READY;
1326 return CONTINUE(read_frame);
11fdf7f2 1327 }
f6b5b4d7 1328 return handle_read_frame_dispatch();
11fdf7f2
TL
1329}
1330
1331CtPtr ProtocolV2::handle_message() {
1332 ldout(cct, 20) << __func__ << dendl;
1333 ceph_assert(state == THROTTLE_DONE);
1334
9f95a23c
TL
1335#if defined(WITH_EVENTTRACE)
1336 utime_t ltt_recv_stamp = ceph_clock_now();
11fdf7f2
TL
1337#endif
1338 recv_stamp = ceph_clock_now();
1339
11fdf7f2 1340 const size_t cur_msg_size = get_current_msg_size();
f6b5b4d7 1341 auto msg_frame = MessageFrame::Decode(rx_segments_data);
11fdf7f2
TL
1342
1343 // XXX: paranoid copy just to avoid oops
1344 ceph_msg_header2 current_header = msg_frame.header();
1345
1346 ldout(cct, 5) << __func__
1347 << " got " << msg_frame.front_len()
1348 << " + " << msg_frame.middle_len()
1349 << " + " << msg_frame.data_len()
1350 << " byte message."
1351 << " envelope type=" << current_header.type
1352 << " src " << peer_name
1353 << " off " << current_header.data_off
1354 << dendl;
1355
1356 INTERCEPT(16);
1357 ceph_msg_header header{current_header.seq,
1358 current_header.tid,
1359 current_header.type,
1360 current_header.priority,
1361 current_header.version,
eafe8130
TL
1362 init_le32(msg_frame.front_len()),
1363 init_le32(msg_frame.middle_len()),
1364 init_le32(msg_frame.data_len()),
11fdf7f2
TL
1365 current_header.data_off,
1366 peer_name,
1367 current_header.compat_version,
1368 current_header.reserved,
eafe8130
TL
1369 init_le32(0)};
1370 ceph_msg_footer footer{init_le32(0), init_le32(0),
1371 init_le32(0), init_le64(0), current_header.flags};
11fdf7f2
TL
1372
1373 Message *message = decode_message(cct, 0, header, footer,
1374 msg_frame.front(),
1375 msg_frame.middle(),
1376 msg_frame.data(),
1377 connection);
1378 if (!message) {
1379 ldout(cct, 1) << __func__ << " decode message failed " << dendl;
1380 return _fault();
1381 } else {
1382 state = READ_MESSAGE_COMPLETE;
1383 }
1384
1385 INTERCEPT(17);
1386
1387 message->set_byte_throttler(connection->policy.throttler_bytes);
1388 message->set_message_throttler(connection->policy.throttler_messages);
1389
1390 // store reservation size in message, so we don't get confused
1391 // by messages entering the dispatch queue through other paths.
1392 message->set_dispatch_throttle_size(cur_msg_size);
1393
1394 message->set_recv_stamp(recv_stamp);
1395 message->set_throttle_stamp(throttle_stamp);
1396 message->set_recv_complete_stamp(ceph_clock_now());
1397
1398 // check received seq#. if it is old, drop the message.
1399 // note that incoming messages may skip ahead. this is convenient for the
1400 // client side queueing because messages can't be renumbered, but the (kernel)
1401 // client will occasionally pull a message out of the sent queue to send
1402 // elsewhere. in that case it doesn't matter if we "got" it or not.
1403 uint64_t cur_seq = in_seq;
1404 if (message->get_seq() <= cur_seq) {
1405 ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
1406 << " <= " << cur_seq << " " << message << " " << *message
1407 << ", discarding" << dendl;
1408 message->put();
1409 if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
1410 cct->_conf->ms_die_on_old_message) {
1411 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1412 }
1413 return nullptr;
1414 }
1415 if (message->get_seq() > cur_seq + 1) {
1416 ldout(cct, 0) << __func__ << " missed message? skipped from seq "
1417 << cur_seq << " to " << message->get_seq() << dendl;
1418 if (cct->_conf->ms_die_on_skipped_message) {
1419 ceph_assert(0 == "skipped incoming seq");
1420 }
1421 }
1422
9f95a23c 1423#if defined(WITH_EVENTTRACE)
11fdf7f2
TL
1424 if (message->get_type() == CEPH_MSG_OSD_OP ||
1425 message->get_type() == CEPH_MSG_OSD_OPREPLY) {
1426 utime_t ltt_processed_stamp = ceph_clock_now();
1427 double usecs_elapsed =
1428 (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
1429 ostringstream buf;
1430 if (message->get_type() == CEPH_MSG_OSD_OP)
1431 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
1432 false);
1433 else
1434 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
1435 false);
1436 }
1437#endif
1438
1439 // note last received message.
1440 in_seq = message->get_seq();
1441 ldout(cct, 5) << __func__ << " received message m=" << message
1442 << " seq=" << message->get_seq()
1443 << " from=" << message->get_source() << " type=" << header.type
1444 << " " << *message << dendl;
1445
1446 bool need_dispatch_writer = false;
1447 if (!connection->policy.lossy) {
1448 ack_left++;
1449 need_dispatch_writer = true;
1450 }
1451
1452 state = READY;
1453
9f95a23c
TL
1454 ceph::mono_time fast_dispatch_time;
1455
1456 if (connection->is_blackhole()) {
1457 ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
1458 message->put();
1459 goto out;
1460 }
1461
11fdf7f2 1462 connection->logger->inc(l_msgr_recv_messages);
f6b5b4d7
TL
1463 connection->logger->inc(l_msgr_recv_bytes,
1464 rx_frame_asm.get_frame_onwire_len());
11fdf7f2
TL
1465
1466 messenger->ms_fast_preprocess(message);
9f95a23c 1467 fast_dispatch_time = ceph::mono_clock::now();
11fdf7f2 1468 connection->logger->tinc(l_msgr_running_recv_time,
9f95a23c 1469 fast_dispatch_time - connection->recv_start_time);
11fdf7f2
TL
1470 if (connection->delay_state) {
1471 double delay_period = 0;
1472 if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1473 delay_period =
1474 cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1475 ldout(cct, 1) << "queue_received will delay after "
1476 << (ceph_clock_now() + delay_period) << " on " << message
1477 << " " << *message << dendl;
1478 }
1479 connection->delay_state->queue(delay_period, message);
1480 } else if (messenger->ms_can_fast_dispatch(message)) {
1481 connection->lock.unlock();
1482 connection->dispatch_queue->fast_dispatch(message);
1483 connection->recv_start_time = ceph::mono_clock::now();
1484 connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1485 connection->recv_start_time - fast_dispatch_time);
1486 connection->lock.lock();
9f95a23c
TL
1487 // we might have been reused by another connection
1488 // let's check if that is the case
1489 if (state != READY) {
1490 // yes, that was the case, let's do nothing
1491 return nullptr;
1492 }
11fdf7f2
TL
1493 } else {
1494 connection->dispatch_queue->enqueue(message, message->get_priority(),
1495 connection->conn_id);
1496 }
1497
1498 handle_message_ack(current_header.ack_seq);
1499
9f95a23c 1500 out:
11fdf7f2
TL
1501 if (need_dispatch_writer && connection->is_connected()) {
1502 connection->center->dispatch_event_external(connection->write_handler);
1503 }
1504
1505 return CONTINUE(read_frame);
1506}
1507
1508
1509CtPtr ProtocolV2::throttle_message() {
1510 ldout(cct, 20) << __func__ << dendl;
1511
1512 if (connection->policy.throttler_messages) {
1513 ldout(cct, 10) << __func__ << " wants " << 1
1514 << " message from policy throttler "
1515 << connection->policy.throttler_messages->get_current()
1516 << "/" << connection->policy.throttler_messages->get_max()
1517 << dendl;
1518 if (!connection->policy.throttler_messages->get_or_fail()) {
1519 ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
1520 << connection->policy.throttler_messages->get_current()
1521 << "/" << connection->policy.throttler_messages->get_max()
1522 << " failed, just wait." << dendl;
1523 // following thread pool deal with th full message queue isn't a
1524 // short time, so we can wait a ms.
1525 if (connection->register_time_events.empty()) {
1526 connection->register_time_events.insert(
1527 connection->center->create_time_event(1000,
1528 connection->wakeup_handler));
1529 }
1530 return nullptr;
1531 }
1532 }
1533
1534 state = THROTTLE_BYTES;
1535 return CONTINUE(throttle_bytes);
1536}
1537
1538CtPtr ProtocolV2::throttle_bytes() {
1539 ldout(cct, 20) << __func__ << dendl;
1540
1541 const size_t cur_msg_size = get_current_msg_size();
1542 if (cur_msg_size) {
1543 if (connection->policy.throttler_bytes) {
1544 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1545 << " bytes from policy throttler "
1546 << connection->policy.throttler_bytes->get_current() << "/"
1547 << connection->policy.throttler_bytes->get_max() << dendl;
1548 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
1549 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1550 << " bytes from policy throttler "
1551 << connection->policy.throttler_bytes->get_current()
1552 << "/" << connection->policy.throttler_bytes->get_max()
1553 << " failed, just wait." << dendl;
1554 // following thread pool deal with th full message queue isn't a
1555 // short time, so we can wait a ms.
1556 if (connection->register_time_events.empty()) {
1557 connection->register_time_events.insert(
1558 connection->center->create_time_event(
1559 1000, connection->wakeup_handler));
1560 }
1561 return nullptr;
1562 }
1563 }
1564 }
1565
1566 state = THROTTLE_DISPATCH_QUEUE;
1567 return CONTINUE(throttle_dispatch_queue);
1568}
1569
1570CtPtr ProtocolV2::throttle_dispatch_queue() {
1571 ldout(cct, 20) << __func__ << dendl;
1572
1573 const size_t cur_msg_size = get_current_msg_size();
1574 if (cur_msg_size) {
1575 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
1576 cur_msg_size)) {
1577 ldout(cct, 10)
1578 << __func__ << " wants " << cur_msg_size
1579 << " bytes from dispatch throttle "
1580 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1581 << connection->dispatch_queue->dispatch_throttler.get_max()
1582 << " failed, just wait." << dendl;
1583 // following thread pool deal with th full message queue isn't a
1584 // short time, so we can wait a ms.
1585 if (connection->register_time_events.empty()) {
1586 connection->register_time_events.insert(
1587 connection->center->create_time_event(1000,
1588 connection->wakeup_handler));
1589 }
1590 return nullptr;
1591 }
1592 }
1593
1594 throttle_stamp = ceph_clock_now();
1595 state = THROTTLE_DONE;
1596
1597 return read_frame_segment();
1598}
1599
1600CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
1601{
1602 ldout(cct, 20) << __func__
1603 << " payload.length()=" << payload.length() << dendl;
1604
1605 if (state != READY) {
1606 lderr(cct) << __func__ << " not in ready state!" << dendl;
1607 return _fault();
1608 }
1609
1610 auto keepalive_frame = KeepAliveFrame::Decode(payload);
1611
1612 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
1613
1614 connection->write_lock.lock();
801d1391
TL
1615 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(keepalive_frame.timestamp());
1616 if (!append_frame(keepalive_ack_frame)) {
1617 connection->write_lock.unlock();
1618 return _fault();
1619 }
11fdf7f2
TL
1620 connection->write_lock.unlock();
1621
1622 ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
1623 << keepalive_frame.timestamp() << dendl;
1624 connection->set_last_keepalive(ceph_clock_now());
1625
1626 if (is_connected()) {
1627 connection->center->dispatch_event_external(connection->write_handler);
1628 }
1629
1630 return CONTINUE(read_frame);
1631}
1632
1633CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
1634{
1635 ldout(cct, 20) << __func__
1636 << " payload.length()=" << payload.length() << dendl;
1637
1638 if (state != READY) {
1639 lderr(cct) << __func__ << " not in ready state!" << dendl;
1640 return _fault();
1641 }
1642
1643 auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload);
1644 connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
1645 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
1646
1647 return CONTINUE(read_frame);
1648}
1649
1650CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
1651{
1652 ldout(cct, 20) << __func__
1653 << " payload.length()=" << payload.length() << dendl;
1654
1655 if (state != READY) {
1656 lderr(cct) << __func__ << " not in ready state!" << dendl;
1657 return _fault();
1658 }
1659
1660 auto ack = AckFrame::Decode(payload);
1661 handle_message_ack(ack.seq());
1662 return CONTINUE(read_frame);
1663}
1664
1665/* Client Protocol Methods */
1666
1667CtPtr ProtocolV2::start_client_banner_exchange() {
1668 ldout(cct, 20) << __func__ << dendl;
1669
1670 INTERCEPT(1);
1671
1672 state = BANNER_CONNECTING;
1673
1674 global_seq = messenger->get_global_seq();
1675
1676 return _banner_exchange(CONTINUATION(post_client_banner_exchange));
1677}
1678
1679CtPtr ProtocolV2::post_client_banner_exchange() {
1680 ldout(cct, 20) << __func__ << dendl;
1681
1682 state = AUTH_CONNECTING;
1683
1684 return send_auth_request();
1685}
1686
1687CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
9f95a23c 1688 ceph_assert(messenger->auth_client);
11fdf7f2
TL
1689 ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
1690 << " auth_client " << messenger->auth_client << dendl;
11fdf7f2 1691
f67539c2
TL
1692 ceph::bufferlist bl;
1693 std::vector<uint32_t> preferred_modes;
11fdf7f2
TL
1694 auto am = auth_meta;
1695 connection->lock.unlock();
1696 int r = messenger->auth_client->get_auth_request(
1697 connection, am.get(),
1698 &am->auth_method, &preferred_modes, &bl);
1699 connection->lock.lock();
1700 if (state != AUTH_CONNECTING) {
1701 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1702 return _fault();
1703 }
1704 if (r < 0) {
1705 ldout(cct, 0) << __func__ << " get_initial_auth_request returned " << r
1706 << dendl;
1707 stop();
1708 connection->dispatch_queue->queue_reset(connection);
1709 return nullptr;
1710 }
1711
1712 INTERCEPT(9);
1713
1714 auto frame = AuthRequestFrame::Encode(auth_meta->auth_method, preferred_modes,
1715 bl);
1716 return WRITE(frame, "auth request", read_frame);
1717}
1718
1719CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
1720 ldout(cct, 20) << __func__
1721 << " payload.length()=" << payload.length() << dendl;
1722
1723 if (state != AUTH_CONNECTING) {
1724 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1725 return _fault();
1726 }
1727
1728 auto bad_method = AuthBadMethodFrame::Decode(payload);
1729 ldout(cct, 1) << __func__ << " method=" << bad_method.method()
1730 << " result " << cpp_strerror(bad_method.result())
1731 << ", allowed methods=" << bad_method.allowed_methods()
1732 << ", allowed modes=" << bad_method.allowed_modes()
1733 << dendl;
1734 ceph_assert(messenger->auth_client);
1735 auto am = auth_meta;
1736 connection->lock.unlock();
1737 int r = messenger->auth_client->handle_auth_bad_method(
1738 connection,
1739 am.get(),
1740 bad_method.method(), bad_method.result(),
1741 bad_method.allowed_methods(),
1742 bad_method.allowed_modes());
1743 connection->lock.lock();
1744 if (state != AUTH_CONNECTING || r < 0) {
1745 return _fault();
1746 }
1747 return send_auth_request(bad_method.allowed_methods());
1748}
1749
1750CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
1751{
1752 ldout(cct, 20) << __func__
1753 << " payload.length()=" << payload.length() << dendl;
1754
1755 if (state != AUTH_CONNECTING) {
1756 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1757 return _fault();
1758 }
1759
1760 auto auth_more = AuthReplyMoreFrame::Decode(payload);
1761 ldout(cct, 5) << __func__
1762 << " auth reply more len=" << auth_more.auth_payload().length()
1763 << dendl;
1764 ceph_assert(messenger->auth_client);
1765 ceph::bufferlist reply;
1766 auto am = auth_meta;
1767 connection->lock.unlock();
1768 int r = messenger->auth_client->handle_auth_reply_more(
1769 connection, am.get(), auth_more.auth_payload(), &reply);
1770 connection->lock.lock();
1771 if (state != AUTH_CONNECTING) {
1772 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1773 return _fault();
1774 }
1775 if (r < 0) {
1776 lderr(cct) << __func__ << " auth_client handle_auth_reply_more returned "
1777 << r << dendl;
1778 return _fault();
1779 }
1780 auto more_reply = AuthRequestMoreFrame::Encode(reply);
1781 return WRITE(more_reply, "auth request more", read_frame);
1782}
1783
1784CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
1785{
1786 ldout(cct, 20) << __func__
1787 << " payload.length()=" << payload.length() << dendl;
1788
1789 if (state != AUTH_CONNECTING) {
1790 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1791 return _fault();
1792 }
1793
1794 auto auth_done = AuthDoneFrame::Decode(payload);
1795
1796 ceph_assert(messenger->auth_client);
1797 auto am = auth_meta;
1798 connection->lock.unlock();
1799 int r = messenger->auth_client->handle_auth_done(
1800 connection,
1801 am.get(),
1802 auth_done.global_id(),
1803 auth_done.con_mode(),
1804 auth_done.auth_payload(),
1805 &am->session_key,
1806 &am->connection_secret);
1807 connection->lock.lock();
1808 if (state != AUTH_CONNECTING) {
1809 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1810 return _fault();
1811 }
1812 if (r < 0) {
1813 return _fault();
1814 }
1815 auth_meta->con_mode = auth_done.con_mode();
f6b5b4d7
TL
1816 bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
1817 session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
1818 cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/false);
11fdf7f2
TL
1819
1820 state = AUTH_CONNECTING_SIGN;
1821
1822 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1823 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
1824 auto sig_frame = AuthSignatureFrame::Encode(sig);
1825 pre_auth.enabled = false;
1826 pre_auth.rxbuf.clear();
1827 return WRITE(sig_frame, "auth signature", read_frame);
1828}
1829
1830CtPtr ProtocolV2::finish_client_auth() {
1831 if (!server_cookie) {
1832 ceph_assert(connect_seq == 0);
1833 state = SESSION_CONNECTING;
1834 return send_client_ident();
1835 } else { // reconnecting to previous session
1836 state = SESSION_RECONNECTING;
1837 ceph_assert(connect_seq > 0);
1838 return send_reconnect();
1839 }
1840}
1841
1842CtPtr ProtocolV2::send_client_ident() {
1843 ldout(cct, 20) << __func__ << dendl;
1844
1845 if (!connection->policy.lossy && !client_cookie) {
1846 client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1847 }
1848
1849 uint64_t flags = 0;
1850 if (connection->policy.lossy) {
1851 flags |= CEPH_MSG_CONNECT_LOSSY;
1852 }
1853
11fdf7f2
TL
1854 auto client_ident = ClientIdentFrame::Encode(
1855 messenger->get_myaddrs(),
1856 connection->target_addr,
1857 messenger->get_myname().num(),
1858 global_seq,
1859 connection->policy.features_supported,
1860 connection->policy.features_required | msgr2_required,
1861 flags,
1862 client_cookie);
1863
1864 ldout(cct, 5) << __func__ << " sending identification: "
1865 << "addrs=" << messenger->get_myaddrs()
1866 << " target=" << connection->target_addr
1867 << " gid=" << messenger->get_myname().num()
1868 << " global_seq=" << global_seq
1869 << " features_supported=" << std::hex
1870 << connection->policy.features_supported
1871 << " features_required="
1872 << (connection->policy.features_required | msgr2_required)
1873 << " flags=" << flags
1874 << " cookie=" << client_cookie << std::dec << dendl;
1875
1876 INTERCEPT(11);
1877
1878 return WRITE(client_ident, "client ident", read_frame);
1879}
1880
1881CtPtr ProtocolV2::send_reconnect() {
1882 ldout(cct, 20) << __func__ << dendl;
1883
1884 auto reconnect = ReconnectFrame::Encode(messenger->get_myaddrs(),
1885 client_cookie,
1886 server_cookie,
1887 global_seq,
1888 connect_seq,
1889 in_seq);
1890
1891 ldout(cct, 5) << __func__ << " reconnect to session: client_cookie="
1892 << std::hex << client_cookie << " server_cookie="
1893 << server_cookie << std::dec
1894 << " gs=" << global_seq << " cs=" << connect_seq
1895 << " ms=" << in_seq << dendl;
1896
1897 INTERCEPT(13);
1898
1899 return WRITE(reconnect, "reconnect", read_frame);
1900}
1901
1902CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
1903{
1904 ldout(cct, 20) << __func__
1905 << " payload.length()=" << payload.length() << dendl;
1906
1907 if (state != SESSION_CONNECTING) {
1908 lderr(cct) << __func__ << " not in session connect state!" << dendl;
1909 return _fault();
1910 }
1911
1912 auto ident_missing =
1913 IdentMissingFeaturesFrame::Decode(payload);
1914 lderr(cct) << __func__
1915 << " client does not support all server features: " << std::hex
1916 << ident_missing.features() << std::dec << dendl;
1917
1918 return _fault();
1919}
1920
1921CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
1922{
1923 ldout(cct, 20) << __func__
1924 << " payload.length()=" << payload.length() << dendl;
1925
1926 if (state != SESSION_RECONNECTING) {
1927 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1928 return _fault();
1929 }
1930
1931 auto reset = ResetFrame::Decode(payload);
1932
1933 ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
1934 << dendl;
1935 if (reset.full()) {
1936 reset_session();
1937 } else {
1938 server_cookie = 0;
1939 connect_seq = 0;
1940 in_seq = 0;
1941 }
1942
1943 state = SESSION_CONNECTING;
1944 return send_client_ident();
1945}
1946
1947CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
1948{
1949 ldout(cct, 20) << __func__
1950 << " payload.length()=" << payload.length() << dendl;
1951
1952 if (state != SESSION_RECONNECTING) {
1953 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1954 return _fault();
1955 }
1956
1957 auto retry = RetryFrame::Decode(payload);
1958 connect_seq = retry.connect_seq() + 1;
1959
1960 ldout(cct, 1) << __func__
1961 << " received session retry connect_seq=" << retry.connect_seq()
1962 << ", inc to cs=" << connect_seq << dendl;
1963
1964 return send_reconnect();
1965}
1966
1967CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
1968{
1969 ldout(cct, 20) << __func__
1970 << " payload.length()=" << payload.length() << dendl;
1971
1972 if (state != SESSION_RECONNECTING) {
1973 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1974 return _fault();
1975 }
1976
1977 auto retry = RetryGlobalFrame::Decode(payload);
1978 global_seq = messenger->get_global_seq(retry.global_seq());
1979
1980 ldout(cct, 1) << __func__ << " received session retry global global_seq="
1981 << retry.global_seq() << ", choose new gs=" << global_seq
1982 << dendl;
1983
1984 return send_reconnect();
1985}
1986
1987CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) {
1988 ldout(cct, 20) << __func__
1989 << " received WAIT (connection race)"
1990 << " payload.length()=" << payload.length()
1991 << dendl;
1992
1993 if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
1994 lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
1995 return _fault();
1996 }
1997
1998 state = WAIT;
1999 WaitFrame::Decode(payload);
2000 return _fault();
2001}
2002
2003CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
2004{
2005 ldout(cct, 20) << __func__
2006 << " payload.length()=" << payload.length() << dendl;
2007
2008 if (state != SESSION_RECONNECTING) {
2009 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2010 return _fault();
2011 }
2012
2013 auto reconnect_ok = ReconnectOkFrame::Decode(payload);
2014 ldout(cct, 5) << __func__
2015 << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
2016 << dendl;
2017
2018 out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
2019
2020 backoff = utime_t();
2021 ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
2022 << ", lossy = " << connection->policy.lossy << ", features "
2023 << connection->get_features() << dendl;
2024
2025 if (connection->delay_state) {
2026 ceph_assert(connection->delay_state->ready());
2027 }
2028
2029 connection->dispatch_queue->queue_connect(connection);
2030 messenger->ms_deliver_handle_fast_connect(connection);
2031
2032 return ready();
2033}
2034
2035CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
2036{
2037 ldout(cct, 20) << __func__
2038 << " payload.length()=" << payload.length() << dendl;
2039
2040 if (state != SESSION_CONNECTING) {
2041 lderr(cct) << __func__ << " not in session connect state!" << dendl;
2042 return _fault();
2043 }
2044
2045 auto server_ident = ServerIdentFrame::Decode(payload);
2046 ldout(cct, 5) << __func__ << " received server identification:"
2047 << " addrs=" << server_ident.addrs()
2048 << " gid=" << server_ident.gid()
2049 << " global_seq=" << server_ident.global_seq()
2050 << " features_supported=" << std::hex
2051 << server_ident.supported_features()
2052 << " features_required=" << server_ident.required_features()
f6b5b4d7
TL
2053 << " flags=" << server_ident.flags()
2054 << " cookie=" << server_ident.cookie() << std::dec << dendl;
11fdf7f2
TL
2055
2056 // is this who we intended to talk to?
2057 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2058 // of mon_host or something.
2059 if (!server_ident.addrs().contains(connection->target_addr)) {
2060 ldout(cct,1) << __func__ << " peer identifies as " << server_ident.addrs()
2061 << ", does not include " << connection->target_addr << dendl;
2062 return _fault();
2063 }
2064
2065 server_cookie = server_ident.cookie();
2066
2067 connection->set_peer_addrs(server_ident.addrs());
2068 peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid());
2069 connection->set_features(server_ident.supported_features() &
2070 connection->policy.features_supported);
2071 peer_global_seq = server_ident.global_seq();
2072
2073 connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
2074
2075 backoff = utime_t();
2076 ldout(cct, 10) << __func__ << " connect success " << connect_seq
2077 << ", lossy = " << connection->policy.lossy << ", features "
2078 << connection->get_features() << dendl;
2079
2080 if (connection->delay_state) {
2081 ceph_assert(connection->delay_state->ready());
2082 }
2083
2084 connection->dispatch_queue->queue_connect(connection);
2085 messenger->ms_deliver_handle_fast_connect(connection);
2086
2087 return ready();
2088}
2089
2090/* Server Protocol Methods */
2091
2092CtPtr ProtocolV2::start_server_banner_exchange() {
2093 ldout(cct, 20) << __func__ << dendl;
2094
2095 INTERCEPT(2);
2096
2097 state = BANNER_ACCEPTING;
2098
2099 return _banner_exchange(CONTINUATION(post_server_banner_exchange));
2100}
2101
2102CtPtr ProtocolV2::post_server_banner_exchange() {
2103 ldout(cct, 20) << __func__ << dendl;
2104
2105 state = AUTH_ACCEPTING;
2106
2107 return CONTINUE(read_frame);
2108}
2109
2110CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
2111 ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
2112 << dendl;
2113
2114 if (state != AUTH_ACCEPTING) {
2115 lderr(cct) << __func__ << " not in auth accept state!" << dendl;
2116 return _fault();
2117 }
2118
2119 auto request = AuthRequestFrame::Decode(payload);
2120 ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
2121 << ", preferred_modes=" << request.preferred_modes()
2122 << ", payload_len=" << request.auth_payload().length() << ")"
2123 << dendl;
2124 auth_meta->auth_method = request.method();
2125 auth_meta->con_mode = messenger->auth_server->pick_con_mode(
2126 connection->get_peer_type(), auth_meta->auth_method,
2127 request.preferred_modes());
2128 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
2129 return _auth_bad_method(-EOPNOTSUPP);
2130 }
2131 return _handle_auth_request(request.auth_payload(), false);
2132}
2133
2134CtPtr ProtocolV2::_auth_bad_method(int r)
2135{
2136 ceph_assert(r < 0);
2137 std::vector<uint32_t> allowed_methods;
2138 std::vector<uint32_t> allowed_modes;
2139 messenger->auth_server->get_supported_auth_methods(
2140 connection->get_peer_type(), &allowed_methods, &allowed_modes);
2141 ldout(cct, 1) << __func__ << " auth_method " << auth_meta->auth_method
2142 << " r " << cpp_strerror(r)
2143 << ", allowed_methods " << allowed_methods
2144 << ", allowed_modes " << allowed_modes
2145 << dendl;
2146 auto bad_method = AuthBadMethodFrame::Encode(auth_meta->auth_method, r,
2147 allowed_methods, allowed_modes);
2148 return WRITE(bad_method, "bad auth method", read_frame);
2149}
2150
f67539c2 2151CtPtr ProtocolV2::_handle_auth_request(ceph::bufferlist& auth_payload, bool more)
11fdf7f2
TL
2152{
2153 if (!messenger->auth_server) {
2154 return _fault();
2155 }
f67539c2 2156 ceph::bufferlist reply;
11fdf7f2
TL
2157 auto am = auth_meta;
2158 connection->lock.unlock();
2159 int r = messenger->auth_server->handle_auth_request(
2160 connection, am.get(),
2161 more, am->auth_method, auth_payload,
2162 &reply);
2163 connection->lock.lock();
2164 if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
2165 ldout(cct, 1) << __func__
2166 << " state changed while accept, it must be mark_down"
2167 << dendl;
2168 ceph_assert(state == CLOSED);
2169 return _fault();
2170 }
2171 if (r == 1) {
2172 INTERCEPT(10);
2173 state = AUTH_ACCEPTING_SIGN;
2174
2175 auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
2176 auth_meta->con_mode,
2177 reply);
2178 return WRITE(auth_done, "auth done", finish_auth);
2179 } else if (r == 0) {
2180 state = AUTH_ACCEPTING_MORE;
2181
2182 auto more = AuthReplyMoreFrame::Encode(reply);
2183 return WRITE(more, "auth reply more", read_frame);
2184 } else if (r == -EBUSY) {
2185 // kick the client and maybe they'll come back later
2186 return _fault();
2187 } else {
2188 return _auth_bad_method(r);
2189 }
2190}
2191
2192CtPtr ProtocolV2::finish_auth()
2193{
2194 ceph_assert(auth_meta);
2195 // TODO: having a possibility to check whether we're server or client could
2196 // allow reusing finish_auth().
f6b5b4d7
TL
2197 bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
2198 session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
2199 cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/true);
11fdf7f2
TL
2200
2201 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
2202 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
2203 auto sig_frame = AuthSignatureFrame::Encode(sig);
2204 pre_auth.enabled = false;
2205 pre_auth.rxbuf.clear();
2206 return WRITE(sig_frame, "auth signature", read_frame);
2207}
2208
2209CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload)
2210{
2211 ldout(cct, 20) << __func__
2212 << " payload.length()=" << payload.length() << dendl;
2213
2214 if (state != AUTH_ACCEPTING_MORE) {
2215 lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
2216 return _fault();
2217 }
2218
2219 auto auth_more = AuthRequestMoreFrame::Decode(payload);
2220 return _handle_auth_request(auth_more.auth_payload(), true);
2221}
2222
2223CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
2224{
2225 ldout(cct, 20) << __func__
2226 << " payload.length()=" << payload.length() << dendl;
2227
2228 if (state != AUTH_ACCEPTING_SIGN && state != AUTH_CONNECTING_SIGN) {
2229 lderr(cct) << __func__
2230 << " pre-auth verification signature seen in wrong state!"
2231 << dendl;
2232 return _fault();
2233 }
2234
2235 auto sig_frame = AuthSignatureFrame::Decode(payload);
2236
2237 const auto actual_tx_sig = auth_meta->session_key.empty() ?
2238 sha256_digest_t() : auth_meta->session_key.hmac_sha256(cct, pre_auth.txbuf);
2239 if (sig_frame.signature() != actual_tx_sig) {
2240 ldout(cct, 2) << __func__ << " pre-auth signature mismatch"
2241 << " actual_tx_sig=" << actual_tx_sig
2242 << " sig_frame.signature()=" << sig_frame.signature()
2243 << dendl;
2244 return _fault();
2245 } else {
2246 ldout(cct, 20) << __func__ << " pre-auth signature success"
2247 << " sig_frame.signature()=" << sig_frame.signature()
2248 << dendl;
2249 pre_auth.txbuf.clear();
2250 }
2251
2252 if (state == AUTH_ACCEPTING_SIGN) {
2253 // server had sent AuthDone and client responded with correct pre-auth
2254 // signature. we can start accepting new sessions/reconnects.
2255 state = SESSION_ACCEPTING;
2256 return CONTINUE(read_frame);
2257 } else if (state == AUTH_CONNECTING_SIGN) {
2258 // this happened at client side
2259 return finish_client_auth();
2260 } else {
9f95a23c 2261 ceph_abort("state corruption");
11fdf7f2
TL
2262 }
2263}
2264
/**
 * Server side: handle the client's ClientIdent frame (new session request).
 *
 * Only valid while SESSION_ACCEPTING. The frame is rejected (fault) when the
 * client's address vector is empty or starts with a blank address, or when
 * the address the client targeted is not one of ours. Otherwise the peer's
 * addresses, name/id and client_cookie are recorded. If the client lacks
 * any feature we require, IdentMissingFeatures is sent back.
 *
 * Unless this is a lossy client on a server policy that does not register
 * lossy clients, the messenger is searched for an existing connection to
 * the same peer with the connection lock released. Any existing non-v2
 * (proto_type != 2) connection is marked down. If one is found, control
 * passes to handle_existing_connection(). Otherwise ServerIdent is sent.
 */
CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
{
  ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;

  if (state != SESSION_ACCEPTING) {
    lderr(cct) << __func__ << " not in session accept state!" << dendl;
    return _fault();
  }

  auto client_ident = ClientIdentFrame::Decode(payload);

  ldout(cct, 5) << __func__ << " received client identification:"
                << " addrs=" << client_ident.addrs()
                << " target=" << client_ident.target_addr()
                << " gid=" << client_ident.gid()
                << " global_seq=" << client_ident.global_seq()
                << " features_supported=" << std::hex
                << client_ident.supported_features()
                << " features_required=" << client_ident.required_features()
                << " flags=" << client_ident.flags()
                << " cookie=" << client_ident.cookie() << std::dec << dendl;

  // Rejects both an empty address vector and one whose first entry is a
  // default-constructed (blank) address.
  if (client_ident.addrs().empty() ||
      client_ident.addrs().front() == entity_addr_t()) {
    ldout(cct,5) << __func__ << " oops, client_ident.addrs() is empty" << dendl;
    return _fault();  // a v2 peer should never do this
  }
  if (!messenger->get_myaddrs().contains(client_ident.target_addr())) {
    ldout(cct,5) << __func__ << " peer is trying to reach "
                 << client_ident.target_addr()
                 << " which is not us (" << messenger->get_myaddrs() << ")"
                 << dendl;
    return _fault();
  }

  connection->set_peer_addrs(client_ident.addrs());
  connection->target_addr = connection->_infer_target_addr(client_ident.addrs());

  peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
  connection->set_peer_id(client_ident.gid());

  client_cookie = client_ident.cookie();

  // Feature bits we require (policy + msgr2 mandatory) that the client does
  // not advertise as supported.
  uint64_t feat_missing =
    (connection->policy.features_required | msgr2_required) &
    ~(uint64_t)client_ident.supported_features();
  if (feat_missing) {
    ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
                  << feat_missing << std::dec << dendl;
    auto ident_missing_features =
        IdentMissingFeaturesFrame::Encode(feat_missing);

    return WRITE(ident_missing_features, "ident missing features", read_frame);
  }

  connection_features =
      client_ident.supported_features() & connection->policy.features_supported;

  peer_global_seq = client_ident.global_seq();

  if (connection->policy.server &&
      connection->policy.lossy &&
      !connection->policy.register_lossy_clients) {
    // incoming lossy client, no need to register this connection
  } else {
    // Looks good so far, let's check if there is already an existing connection
    // to this peer.
    // The connection lock is dropped around the messenger lookup; state is
    // re-checked after it is re-acquired.
    connection->lock.unlock();
    AsyncConnectionRef existing = messenger->lookup_conn(
      *connection->peer_addrs);

    // Only v2 connections can be taken over; others are marked down.
    if (existing &&
        existing->protocol->proto_type != 2) {
      ldout(cct,1) << __func__ << " existing " << existing << " proto "
                   << existing->protocol.get() << " version is "
                   << existing->protocol->proto_type << ", marking down"
                   << dendl;
      existing->mark_down();
      existing = nullptr;
    }

    connection->inject_delay();

    connection->lock.lock();
    if (state != SESSION_ACCEPTING) {
      ldout(cct, 1) << __func__
                    << " state changed while accept, it must be mark_down"
                    << dendl;
      ceph_assert(state == CLOSED);
      return _fault();
    }

    if (existing) {
      return handle_existing_connection(existing);
    }
  }

  // if everything is OK reply with server identification
  return send_server_ident();
}
2366
/**
 * Server side: handle the client's Reconnect frame (request to resume an
 * existing session).
 *
 * Only valid while SESSION_ACCEPTING. Records the peer's addresses and
 * global_seq, then looks up an existing connection to the same peer with
 * the connection lock released. Non-v2 matches are marked down and
 * ignored. The reply is chosen by the checks below, in order:
 *  - no existing connection, or it is CLOSED: full session reset;
 *  - existing is already being replaced: RetryGlobal with its global_seq;
 *  - client cookie mismatch: reset (full iff policy.resetcheck);
 *  - existing has no server_cookie yet: partial reset (resume establishment);
 *  - client's global_seq is older than existing's: RetryGlobal;
 *  - client's connect_seq is older than existing's: Retry;
 *  - equal connect_seq (reconnect race): WAIT if this side loses the
 *    tie-break, otherwise proceed;
 * and finally the existing session is taken over via reuse_connection()
 * with reconnecting = true.
 */
CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
{
  ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;

  if (state != SESSION_ACCEPTING) {
    lderr(cct) << __func__ << " not in session accept state!" << dendl;
    return _fault();
  }

  auto reconnect = ReconnectFrame::Decode(payload);

  ldout(cct, 5) << __func__
                << " received reconnect:"
                << " client_cookie=" << std::hex << reconnect.client_cookie()
                << " server_cookie=" << reconnect.server_cookie() << std::dec
                << " gs=" << reconnect.global_seq()
                << " cs=" << reconnect.connect_seq()
                << " ms=" << reconnect.msg_seq()
                << dendl;

  // Should we check if one of the ident.addrs match connection->target_addr
  // as we do in ProtocolV1?
  connection->set_peer_addrs(reconnect.addrs());
  connection->target_addr = connection->_infer_target_addr(reconnect.addrs());
  peer_global_seq = reconnect.global_seq();

  // The connection lock is dropped around the messenger lookup; state is
  // re-checked after it is re-acquired.
  connection->lock.unlock();
  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);

  if (existing &&
      existing->protocol->proto_type != 2) {
    ldout(cct,1) << __func__ << " existing " << existing << " proto "
                 << existing->protocol.get() << " version is "
                 << existing->protocol->proto_type << ", marking down" << dendl;
    existing->mark_down();
    existing = nullptr;
  }

  connection->inject_delay();

  connection->lock.lock();
  if (state != SESSION_ACCEPTING) {
    ldout(cct, 1) << __func__
                  << " state changed while accept, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED);
    return _fault();
  }

  if (!existing) {
    // there is no existing connection therefore cannot reconnect to previous
    // session
    ldout(cct, 0) << __func__
                  << " no existing connection exists, reseting client" << dendl;
    auto reset = ResetFrame::Encode(true);
    return WRITE(reset, "session reset", read_frame);
  }

  // Held for the rest of this function, including reuse_connection().
  std::lock_guard<std::mutex> l(existing->lock);

  ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
  if (!exproto) {
    ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
    ceph_assert(false);
  }

  if (exproto->state == CLOSED) {
    ldout(cct, 5) << __func__ << " existing " << existing
                  << " already closed. Reseting client" << dendl;
    auto reset = ResetFrame::Encode(true);
    return WRITE(reset, "session reset", read_frame);
  }

  if (exproto->replacing) {
    ldout(cct, 1) << __func__
                  << " existing racing replace happened while replacing."
                  << " existing=" << existing << dendl;
    auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
    return WRITE(retry, "session retry", read_frame);
  }

  if (exproto->client_cookie != reconnect.client_cookie()) {
    // Full vs partial reset follows this connection's resetcheck policy.
    ldout(cct, 1) << __func__ << " existing=" << existing
                  << " client cookie mismatch, I must have reseted:"
                  << " cc=" << std::hex << exproto->client_cookie
                  << " rcc=" << reconnect.client_cookie()
                  << ", reseting client." << std::dec
                  << dendl;
    auto reset = ResetFrame::Encode(connection->policy.resetcheck);
    return WRITE(reset, "session reset", read_frame);
  } else if (exproto->server_cookie == 0) {
    // this happens when:
    //   - a connects to b
    //   - a sends client_ident
    //   - b gets client_ident, sends server_ident and sets cookie X
    //   - connection fault
    //   - b reconnects to a with cookie X, connect_seq=1
    //   - a has cookie==0
    ldout(cct, 1) << __func__ << " I was a client and didn't received the"
                  << " server_ident. Asking peer to resume session"
                  << " establishment" << dendl;
    auto reset = ResetFrame::Encode(false);
    return WRITE(reset, "session reset", read_frame);
  }

  if (exproto->peer_global_seq > reconnect.global_seq()) {
    ldout(cct, 5) << __func__
                  << " stale global_seq: sgs=" << exproto->peer_global_seq
                  << " cgs=" << reconnect.global_seq()
                  << ", ask client to retry global" << dendl;
    auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);

    INTERCEPT(18);

    return WRITE(retry, "session retry", read_frame);
  }

  if (exproto->connect_seq > reconnect.connect_seq()) {
    ldout(cct, 5) << __func__
                  << " stale connect_seq scs=" << exproto->connect_seq
                  << " ccs=" << reconnect.connect_seq()
                  << " , ask client to retry" << dendl;
    auto retry = RetryFrame::Encode(exproto->connect_seq);
    return WRITE(retry, "session retry", read_frame);
  }

  if (exproto->connect_seq == reconnect.connect_seq()) {
    // reconnect race: both peers are sending reconnect messages
    // Tie-break: existing wins when the peer's msgr2 address sorts above ours
    // and existing's policy is not a server policy.
    if (existing->peer_addrs->msgr2_addr() >
            messenger->get_myaddrs().msgr2_addr() &&
        !existing->policy.server) {
      // the existing connection wins
      ldout(cct, 1)
          << __func__
          << " reconnect race detected, this connection loses to existing="
          << existing << dendl;

      auto wait = WaitFrame::Encode();
      return WRITE(wait, "wait", read_frame);
    } else {
      // this connection wins
      ldout(cct, 1) << __func__
                    << " reconnect race detected, replacing existing="
                    << existing << " socket by this connection's socket"
                    << dendl;
    }
  }

  ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;

  reconnecting = true;

  // everything looks good
  exproto->connect_seq = reconnect.connect_seq();
  exproto->message_seq = reconnect.msg_seq();

  return reuse_connection(existing, exproto);
}
2526
/**
 * Server side: decide how to proceed with a ClientIdent when the messenger
 * already has a v2 connection (`existing`) registered for the same peer.
 *
 * In this chunk it is called from handle_client_ident() after
 * connection->lock has been re-acquired. It holds existing->lock for its
 * whole duration. The checks below run in order:
 *  - existing is CLOSED: treat this as a fresh session (send_server_ident);
 *  - existing is already being replaced: reply WAIT;
 *  - existing saw a newer peer_global_seq: this connection is stale, stop it;
 *  - existing is lossy: stop existing and accept this connection;
 *  - cookie relationships or existing READY/STANDBY: take over existing via
 *    reuse_connection();
 *  - otherwise it is a connect race, resolved by comparing msgr2 addresses.
 *
 * @return next continuation, or nullptr when this connection is stopped.
 */
CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
  ldout(cct, 20) << __func__ << " existing=" << existing << dendl;

  std::lock_guard<std::mutex> l(existing->lock);

  ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
  if (!exproto) {
    ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
    ceph_assert(false);
  }

  if (exproto->state == CLOSED) {
    ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
                  << dendl;
    return send_server_ident();
  }

  if (exproto->replacing) {
    ldout(cct, 1) << __func__
                  << " existing racing replace happened while replacing."
                  << " existing=" << existing << dendl;
    auto wait = WaitFrame::Encode();
    return WRITE(wait, "wait", read_frame);
  }

  if (exproto->peer_global_seq > peer_global_seq) {
    ldout(cct, 1) << __func__ << " this is a stale connection, peer_global_seq="
                  << peer_global_seq
                  << " existing->peer_global_seq=" << exproto->peer_global_seq
                  << ", stopping this connection." << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  if (existing->policy.lossy) {
    // existing connection can be thrown out in favor of this one
    ldout(cct, 1)
        << __func__ << " existing=" << existing
        << " is a lossy channel. Stopping existing in favor of this connection"
        << dendl;
    existing->protocol->stop();
    existing->dispatch_queue->queue_reset(existing.get());
    return send_server_ident();
  }

  if (exproto->server_cookie && exproto->client_cookie &&
      exproto->client_cookie != client_cookie) {
    // Found previous session
    // peer has reset and we're going to reuse the existing connection
    // by replacing the communication socket
    ldout(cct, 1) << __func__ << " found previous session existing=" << existing
                  << ", peer must have reseted." << dendl;
    if (connection->policy.resetcheck) {
      exproto->reset_session();
    }
    return reuse_connection(existing, exproto);
  }

  if (exproto->client_cookie == client_cookie) {
    // session establishment interrupted between client_ident and server_ident,
    // continuing...
    ldout(cct, 1) << __func__ << " found previous session existing=" << existing
                  << ", continuing session establishment." << dendl;
    return reuse_connection(existing, exproto);
  }

  if (exproto->state == READY || exproto->state == STANDBY) {
    ldout(cct, 1) << __func__ << " existing=" << existing
                  << " is READY/STANDBY, lets reuse it" << dendl;
    return reuse_connection(existing, exproto);
  }

  // Looks like a connection race: server and client are both connecting to
  // each other at the same time.
  // This (incoming) connection wins if the peer's msgr2 address sorts below
  // ours, or if existing's policy is a server policy.
  if (connection->peer_addrs->msgr2_addr() <
          messenger->get_myaddrs().msgr2_addr() ||
      existing->policy.server) {
    // this connection wins
    ldout(cct, 1) << __func__
                  << " connection race detected, replacing existing="
                  << existing << " socket by this connection's socket" << dendl;
    return reuse_connection(existing, exproto);
  } else {
    // the existing connection wins
    ldout(cct, 1)
        << __func__
        << " connection race detected, this connection loses to existing="
        << existing << dendl;
    // NOTE(review): if both msgr2 addresses compare equal (and existing is
    // not a server policy), this assert fires.
    ceph_assert(connection->peer_addrs->msgr2_addr() >
                messenger->get_myaddrs().msgr2_addr());

    // make sure we follow through with opening the existing
    // connection (if it isn't yet open) since we know the peer
    // has something to send to us.
    existing->send_keepalive();
    auto wait = WaitFrame::Encode();
    return WRITE(wait, "wait", read_frame);
  }
}
2627
/**
 * Server side: hand this connection's freshly accepted socket (and
 * negotiated crypto/session state) over to `existing`, then stop this
 * connection.
 *
 * In this chunk, both callers (handle_reconnect, handle_existing_connection)
 * hold existing->lock when calling this. This function additionally holds
 * existing->write_lock while it runs.
 *
 * Synchronous part: removes this connection's file events, resets
 * existing's receive state, copies identity/feature state into `exproto`
 * (all of it for a new session, only peer_global_seq when reconnecting),
 * moves out our socket and stream handlers, stop()s this protocol and
 * marks exproto as replacing with state NONE.
 *
 * Asynchronous part: submitted to existing's event center.
 * `deactivate_existing` installs the stream handlers and, if exproto is
 * still NONE, swaps in the new socket/worker/center; if exproto has been
 * CLOSED meanwhile, the stolen socket is closed on the new center instead.
 * `transfer_existing` then re-arms existing on its (new) center and sends
 * ServerIdent (new session) or ReconnectOk (reconnect).
 *
 * @return always nullptr: this connection's continuation chain ends here.
 */
CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
                                   ProtocolV2 *exproto) {
  ldout(cct, 20) << __func__ << " existing=" << existing
                 << " reconnect=" << reconnecting << dendl;

  connection->inject_delay();

  std::lock_guard<std::mutex> l(existing->write_lock);

  // This connection's socket is about to be moved to `existing`.
  connection->center->delete_file_event(connection->cs.fd(),
                                        EVENT_READABLE | EVENT_WRITABLE);

  if (existing->delay_state) {
    existing->delay_state->flush();
    ceph_assert(!connection->delay_state);
  }
  exproto->reset_recv_state();
  exproto->pre_auth.enabled = false;

  if (!reconnecting) {
    exproto->peer_supported_features = peer_supported_features;
    exproto->tx_frame_asm.set_is_rev1(tx_frame_asm.get_is_rev1());
    exproto->rx_frame_asm.set_is_rev1(rx_frame_asm.get_is_rev1());

    exproto->client_cookie = client_cookie;
    exproto->peer_name = peer_name;
    exproto->connection_features = connection_features;
    existing->set_features(connection_features);
  }
  exproto->peer_global_seq = peer_global_seq;

  ceph_assert(connection->center->in_thread());
  auto temp_cs = std::move(connection->cs);
  EventCenter *new_center = connection->center;
  Worker *new_worker = connection->worker;
  // we can steal the session_stream_handlers under the assumption
  // this happens in the event center's thread as there should be
  // no user outside its boundaries (similarly to e.g. outgoing_bl).
  auto temp_stream_handlers = std::move(session_stream_handlers);
  exproto->auth_meta = auth_meta;

  ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
                           << dendl;

  // avoid _stop shutdown replacing socket
  // queue a reset on the new connection, which we're dumping for the old
  stop();

  connection->dispatch_queue->queue_reset(connection);

  exproto->can_write = false;
  exproto->write_in_progress = false;
  exproto->reconnecting = reconnecting;
  exproto->replacing = true;
  existing->state_offset = 0;
  // avoid previous thread modify event
  exproto->state = NONE;
  existing->state = AsyncConnection::STATE_NONE;
  // Discard existing prefetch buffer in `recv_buf`
  existing->recv_start = existing->recv_end = 0;
  // there shouldn't exist any buffer
  ceph_assert(connection->recv_start == connection->recv_end);

  // Runs on existing's current event center with the stolen socket bound as
  // its argument.
  auto deactivate_existing = std::bind(
    [ existing,
      new_worker,
      new_center,
      exproto,
      temp_stream_handlers=std::move(temp_stream_handlers)
    ](ConnectedSocket &cs) mutable {
    // we need to delete time event in original thread
    {
      std::lock_guard<std::mutex> l(existing->lock);
      existing->write_lock.lock();
      exproto->requeue_sent();
      // XXX: do we really need the locking for `outgoing_bl`? There is
      // a comment just above its definition saying "lockfree, only used
      // in own thread". I'm following lockfull schema just in the case.
      // From performance point of view it should be fine - this happens
      // far away from hot paths.
      existing->outgoing_bl.clear();
      existing->open_write = false;
      exproto->session_stream_handlers = std::move(temp_stream_handlers);
      existing->write_lock.unlock();
      if (exproto->state == NONE) {
        // Still ours to take over: adopt the new socket, worker and center.
        existing->shutdown_socket();
        existing->cs = std::move(cs);
        existing->worker->references--;
        new_worker->references++;
        existing->logger = new_worker->get_perf_counter();
        existing->worker = new_worker;
        existing->center = new_center;
        if (existing->delay_state)
          existing->delay_state->set_center(new_center);
      } else if (exproto->state == CLOSED) {
        // existing was closed meanwhile: close the stolen socket on the
        // center it came from and stop here.
        auto back_to_close = std::bind(
          [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
        new_center->submit_to(new_center->get_id(),
                              std::move(back_to_close), true);
        return;
      } else {
        ceph_abort();
      }
    }

    // Before changing existing->center, it may already exists some
    // events in existing->center's queue. Then if we mark down
    // `existing`, it will execute in another thread and clean up
    // connection. Previous event will result in segment fault
    auto transfer_existing = [existing, exproto]() mutable {
      std::lock_guard<std::mutex> l(existing->lock);
      if (exproto->state == CLOSED) return;
      ceph_assert(exproto->state == NONE);

      exproto->state = SESSION_ACCEPTING;
      // we have called shutdown_socket above
      ceph_assert(existing->last_tick_id == 0);
      // restart timer since we are going to re-build connection
      existing->last_connect_started = ceph::coarse_mono_clock::now();
      existing->last_tick_id = existing->center->create_time_event(
        existing->connect_timeout_us, existing->tick_handler);
      existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
      existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
                                          existing->read_handler);
      // New session -> ServerIdent; resumed session -> ReconnectOk.
      if (!exproto->reconnecting) {
        exproto->run_continuation(exproto->send_server_ident());
      } else {
        exproto->run_continuation(exproto->send_reconnect_ok());
      }
    };
    if (existing->center->in_thread())
      transfer_existing();
    else
      existing->center->submit_to(existing->center->get_id(),
                                  std::move(transfer_existing), true);
  },
  std::move(temp_cs));

  existing->center->submit_to(existing->center->get_id(),
                              std::move(deactivate_existing), true);
  return nullptr;
}
2770
/**
 * Server side: complete acceptance of a session by sending ServerIdent.
 *
 * Discards all requeued outgoing messages and resets in_seq. For non-lossy
 * policies it draws a fresh random server_cookie (lossy policies leave
 * server_cookie unchanged) and sets CEPH_MSG_CONNECT_LOSSY in the flags for
 * lossy ones. It registers the connection with the messenger via
 * accept_conn() with the connection lock released, faulting if that fails
 * or if the state changed meanwhile. Then it applies the negotiated
 * features, notifies the dispatch queue and fast-dispatch handlers, and
 * writes the frame, continuing in server_ready().
 */
CtPtr ProtocolV2::send_server_ident() {
  ldout(cct, 20) << __func__ << dendl;

  // this is required for the case when this connection is being replaced
  out_seq = discard_requeued_up_to(out_seq, 0);
  in_seq = 0;

  if (!connection->policy.lossy) {
    // Range [1, UINT64_MAX] (-1ll converts to the max uint64_t); the lower
    // bound of 1 presumably keeps the cookie distinct from 0, which
    // handle_reconnect() treats as "no server_ident sent yet".
    server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
  }

  uint64_t flags = 0;
  if (connection->policy.lossy) {
    flags = flags | CEPH_MSG_CONNECT_LOSSY;
  }

  uint64_t gs = messenger->get_global_seq();
  auto server_ident = ServerIdentFrame::Encode(
          messenger->get_myaddrs(),
          messenger->get_myname().num(),
          gs,
          connection->policy.features_supported,
          connection->policy.features_required | msgr2_required,
          flags,
          server_cookie);

  ldout(cct, 5) << __func__ << " sending identification:"
                << " addrs=" << messenger->get_myaddrs()
                << " gid=" << messenger->get_myname().num()
                << " global_seq=" << gs << " features_supported=" << std::hex
                << connection->policy.features_supported
                << " features_required="
                << (connection->policy.features_required | msgr2_required)
                << " flags=" << flags
                << " cookie=" << server_cookie << std::dec << dendl;

  connection->lock.unlock();
  // Because "replacing" will prevent other connections preempt this addr,
  // it's safe that here we don't acquire Connection's lock
  ssize_t r = messenger->accept_conn(connection);

  connection->inject_delay();

  connection->lock.lock();

  if (r < 0) {
    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
                  << connection->peer_addrs->msgr2_addr()
                  << " just fail later one(this)" << dendl;
    connection->inject_delay();
    return _fault();
  }
  // The connection may have been marked down while the lock was released;
  // undo the registration made by accept_conn() above before faulting.
  if (state != SESSION_ACCEPTING) {
    ldout(cct, 1) << __func__
                  << " state changed while accept_conn, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED || state == NONE);
    messenger->unregister_conn(connection);
    connection->inject_delay();
    return _fault();
  }

  connection->set_features(connection_features);

  // notify
  connection->dispatch_queue->queue_accept(connection);
  messenger->ms_deliver_handle_fast_accept(connection);

  INTERCEPT(12);

  return WRITE(server_ident, "server ident", server_ready);
}
2843
2844CtPtr ProtocolV2::server_ready() {
2845 ldout(cct, 20) << __func__ << dendl;
2846
2847 if (connection->delay_state) {
2848 ceph_assert(connection->delay_state->ready());
2849 }
2850
2851 return ready();
2852}
2853
2854CtPtr ProtocolV2::send_reconnect_ok() {
2855 ldout(cct, 20) << __func__ << dendl;
2856
2857 out_seq = discard_requeued_up_to(out_seq, message_seq);
2858
2859 uint64_t ms = in_seq;
2860 auto reconnect_ok = ReconnectOkFrame::Encode(ms);
2861
2862 ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
2863
2864 connection->lock.unlock();
2865 // Because "replacing" will prevent other connections preempt this addr,
2866 // it's safe that here we don't acquire Connection's lock
2867 ssize_t r = messenger->accept_conn(connection);
2868
2869 connection->inject_delay();
2870
2871 connection->lock.lock();
2872
2873 if (r < 0) {
2874 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2875 << connection->peer_addrs->msgr2_addr()
2876 << " just fail later one(this)" << dendl;
2877 connection->inject_delay();
2878 return _fault();
2879 }
2880 if (state != SESSION_ACCEPTING) {
2881 ldout(cct, 1) << __func__
2882 << " state changed while accept_conn, it must be mark_down"
2883 << dendl;
2884 ceph_assert(state == CLOSED || state == NONE);
2885 messenger->unregister_conn(connection);
2886 connection->inject_delay();
2887 return _fault();
2888 }
2889
2890 // notify
2891 connection->dispatch_queue->queue_accept(connection);
2892 messenger->ms_deliver_handle_fast_accept(connection);
2893
2894 INTERCEPT(14);
2895
2896 return WRITE(reconnect_ok, "reconnect ok", server_ready);
2897}