]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/ProtocolV2.cc
import 15.2.1 Octopus source
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <type_traits>
5
6 #include "ProtocolV2.h"
7 #include "AsyncMessenger.h"
8
9 #include "common/EventTrace.h"
10 #include "common/ceph_crypto.h"
11 #include "common/errno.h"
12 #include "include/random.h"
13 #include "auth/AuthClient.h"
14 #include "auth/AuthServer.h"
15
16 #define dout_subsys ceph_subsys_ms
17 #undef dout_prefix
18 #define dout_prefix _conn_prefix(_dout)
19 ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
20 return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
21 << *connection->peer_addrs << " conn(" << connection << " "
22 << this
23 << " " << ceph_con_mode_name(auth_meta->con_mode)
24 << " :" << connection->port
25 << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq << " l=" << connection->policy.lossy
27 << " rx=" << session_stream_handlers.rx.get()
28 << " tx=" << session_stream_handlers.tx.get()
29 << ").";
30 }
31
// Names from ceph::msgr::v2 are used unqualified throughout this file.
using namespace ceph::msgr::v2;

// Shorthands for a ProtocolV2 continuation, held by pointer (nullable,
// used as the "what to run next" return value) and by reference.
using CtPtr = Ct<ProtocolV2> *;
using CtRef = Ct<ProtocolV2> &;
36
37 void ProtocolV2::run_continuation(CtPtr pcontinuation) {
38 if (pcontinuation) {
39 run_continuation(*pcontinuation);
40 }
41 }
42
/**
 * Run a continuation and translate the exceptions it may raise.
 *
 * - buffer::error: logged as a frame header decoding failure, then _fault().
 * - ceph::crypto::onwire::MsgAuthError: logged with its message, then
 *   _fault().
 * - DecryptionError: logged only.
 *
 * NOTE(review): the DecryptionError handler is the only one that does not
 * call _fault(); confirm that leaving the connection un-faulted here is
 * intentional.
 */
void ProtocolV2::run_continuation(CtRef continuation) {
  try {
    CONTINUATION_RUN(continuation)
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << " failed decoding of frame header: " << e
               << dendl;
    _fault();
  } catch (const ceph::crypto::onwire::MsgAuthError &e) {
    lderr(cct) << __func__ << " " << e.what() << dendl;
    _fault();
  } catch (const DecryptionError &) {
    lderr(cct) << __func__ << " failed to decrypt frame payload" << dendl;
  }
}
57
// WRITE(B, D, C): send buffer/frame B, described by the string D in log
// messages, and continue with continuation C once the write is done.
#define WRITE(B, D, C) write(D, CONTINUATION(C), B)

// READ(L, C): read L bytes into a freshly created buffer node and continue
// with continuation C.
#define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L)))

// READ_RXBUF(B, C): like READ, but read into the caller-supplied buffer B.
#define READ_RXBUF(B, C) read(CONTINUATION(C), B)
63
// INTERCEPT(S): test hook for step number S.
//
// When built with UNIT_TESTS_BUILT and the connection has an interceptor,
// the interceptor is asked what to do at step S:
//   - ACTION::FAIL: the enclosing function returns _fault();
//   - ACTION::STOP: the connection is stopped, a reset is queued on the
//     dispatch queue and the enclosing function returns nullptr.
// Any other action falls through. Because the macro contains `return`
// statements it can only be used inside functions returning CtPtr.
// Without UNIT_TESTS_BUILT the macro expands to nothing.
#ifdef UNIT_TESTS_BUILT

#define INTERCEPT(S) { \
if(connection->interceptor) { \
  auto a = connection->interceptor->intercept(connection, (S)); \
  if (a == Interceptor::ACTION::FAIL) { \
    return _fault(); \
  } else if (a == Interceptor::ACTION::STOP) { \
    stop(); \
    connection->dispatch_queue->queue_reset(connection); \
    return nullptr; \
  }}}

#else
#define INTERCEPT(S)
#endif
80
/**
 * Construct a protocol object bound to `connection`.
 *
 * The base Protocol is initialised with version 2. The state starts at NONE,
 * every cookie and sequence counter starts at zero, all flags start cleared
 * and there is no pending banner-exchange callback.
 */
ProtocolV2::ProtocolV2(AsyncConnection *connection)
    : Protocol(2, connection),
      state(NONE),
      peer_required_features(0),
      client_cookie(0),
      server_cookie(0),
      global_seq(0),
      connect_seq(0),
      peer_global_seq(0),
      message_seq(0),
      reconnecting(false),
      replacing(false),
      can_write(false),
      bannerExchangeCallback(nullptr),
      next_tag(static_cast<Tag>(0)),
      keepalive(false) {
}
98
99 ProtocolV2::~ProtocolV2() {
100 }
101
/**
 * Put the protocol into the client-side start state.
 *
 * Sets the state to START_CONNECT and enables pre-auth buffering
 * (pre_auth.enabled), which makes read()/write() record the exchanged
 * bytes in pre_auth.rxbuf / pre_auth.txbuf.
 */
void ProtocolV2::connect() {
  ldout(cct, 1) << __func__ << dendl;
  state = START_CONNECT;
  pre_auth.enabled = true;
}
107
/**
 * Put the protocol into the server-side start state (START_ACCEPT).
 */
void ProtocolV2::accept() {
  ldout(cct, 1) << __func__ << dendl;
  state = START_ACCEPT;
}
112
113 bool ProtocolV2::is_connected() { return can_write; }
114
115 /*
116 * Tears down the message queues, and removes them from the
117 * DispatchQueue Must hold write_lock prior to calling.
118 */
119 void ProtocolV2::discard_out_queue() {
120 ldout(cct, 10) << __func__ << " started" << dendl;
121
122 for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
123 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
124 (*p)->put();
125 }
126 sent.clear();
127 for (auto& [ prio, entries ] : out_queue) {
128 static_cast<void>(prio);
129 for (auto& entry : entries) {
130 ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
131 entry.m->put();
132 }
133 }
134 out_queue.clear();
135 write_in_progress = false;
136 }
137
/**
 * Forget the current session.
 *
 * Under write_lock: discards delayed messages (if a delay state exists),
 * discards what the dispatch queue holds for this connection, drops all
 * outgoing messages and pending outgoing bytes, queues a remote-reset
 * notification, and finally zeroes the sequence numbers, cookies and ack
 * counter and disables writing.
 */
void ProtocolV2::reset_session() {
  ldout(cct, 1) << __func__ << dendl;

  std::lock_guard<std::mutex> l(connection->write_lock);
  if (connection->delay_state) {
    connection->delay_state->discard();
  }

  connection->dispatch_queue->discard_queue(connection->conn_id);
  discard_out_queue();
  connection->outgoing_bl.clear();

  connection->dispatch_queue->queue_remote_reset(connection);

  // back to a pristine session: no sequence numbers, no cookies
  out_seq = 0;
  in_seq = 0;
  client_cookie = 0;
  server_cookie = 0;
  connect_seq = 0;
  peer_global_seq = 0;
  message_seq = 0;
  ack_left = 0;
  can_write = false;
}
162
163 void ProtocolV2::stop() {
164 ldout(cct, 1) << __func__ << dendl;
165 if (state == CLOSED) {
166 return;
167 }
168
169 if (connection->delay_state) connection->delay_state->flush();
170
171 std::lock_guard<std::mutex> l(connection->write_lock);
172
173 reset_recv_state();
174 discard_out_queue();
175
176 connection->_stop();
177
178 can_write = false;
179 state = CLOSED;
180 }
181
182 void ProtocolV2::fault() { _fault(); }
183
184 void ProtocolV2::requeue_sent() {
185 write_in_progress = false;
186 if (sent.empty()) {
187 return;
188 }
189
190 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
191 out_seq -= sent.size();
192 while (!sent.empty()) {
193 Message *m = sent.back();
194 sent.pop_back();
195 ldout(cct, 5) << __func__ << " requeueing message m=" << m
196 << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
197 << *m << dendl;
198 m->clear_payload();
199 rq.emplace_front(out_queue_entry_t{false, m});
200 }
201 }
202
203 uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
204 ldout(cct, 10) << __func__ << " " << seq << dendl;
205 std::lock_guard<std::mutex> l(connection->write_lock);
206 if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
207 return seq;
208 }
209 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
210 uint64_t count = out_seq;
211 while (!rq.empty()) {
212 Message* const m = rq.front().m;
213 if (m->get_seq() == 0 || m->get_seq() > seq) break;
214 ldout(cct, 5) << __func__ << " discarding message m=" << m
215 << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
216 << *m << dendl;
217 m->put();
218 rq.pop_front();
219 count++;
220 }
221 if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST);
222 return count;
223 }
224
/**
 * Throw away all authentication and encryption state.
 *
 * Replaces auth_meta with a fresh AuthConnectionMeta, drops the rx/tx
 * session stream handlers and clears the pre-auth receive/transmit buffers.
 */
void ProtocolV2::reset_security() {
  ldout(cct, 5) << __func__ << dendl;

  auth_meta.reset(new AuthConnectionMeta);
  session_stream_handlers.rx.reset(nullptr);
  session_stream_handlers.tx.reset(nullptr);
  pre_auth.rxbuf.clear();
  pre_auth.txbuf.clear();
}
234
235 // it's expected the `write_lock` is held while calling this method.
236 void ProtocolV2::reset_recv_state() {
237 ldout(cct, 5) << __func__ << dendl;
238
239 if (!connection->center->in_thread()) {
240 // execute in the same thread that uses the rx/tx handlers. We need
241 // to do the warp because holding `write_lock` is not enough as
242 // `write_event()` unlocks it just before calling `write_message()`.
243 // `submit_to()` here is NOT blocking.
244 connection->center->submit_to(connection->center->get_id(), [this] {
245 ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers"
246 << dendl;
247 // Possibly unnecessary. See the comment in `deactivate_existing`.
248 std::lock_guard<std::mutex> l(connection->lock);
249 std::lock_guard<std::mutex> wl(connection->write_lock);
250 reset_security();
251 }, /* always_async = */true);
252 } else {
253 reset_security();
254 }
255
256 // clean read and write callbacks
257 connection->pendingReadLen.reset();
258 connection->writeCallback.reset();
259
260 next_tag = static_cast<Tag>(0);
261
262 reset_throttle();
263 }
264
265 size_t ProtocolV2::get_current_msg_size() const {
266 ceph_assert(!rx_segments_desc.empty());
267 size_t sum = 0;
268 // we don't include SegmentIndex::Msg::HEADER.
269 for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
270 sum += rx_segments_desc[idx].length;
271 }
272 return sum;
273 }
274
275 void ProtocolV2::reset_throttle() {
276 if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
277 connection->policy.throttler_messages) {
278 ldout(cct, 10) << __func__ << " releasing " << 1
279 << " message to policy throttler "
280 << connection->policy.throttler_messages->get_current()
281 << "/" << connection->policy.throttler_messages->get_max()
282 << dendl;
283 connection->policy.throttler_messages->put();
284 }
285 if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
286 if (connection->policy.throttler_bytes) {
287 const size_t cur_msg_size = get_current_msg_size();
288 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
289 << " bytes to policy throttler "
290 << connection->policy.throttler_bytes->get_current() << "/"
291 << connection->policy.throttler_bytes->get_max() << dendl;
292 connection->policy.throttler_bytes->put(cur_msg_size);
293 }
294 }
295 if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
296 const size_t cur_msg_size = get_current_msg_size();
297 ldout(cct, 10)
298 << __func__ << " releasing " << cur_msg_size
299 << " bytes to dispatch_queue throttler "
300 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
301 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
302 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
303 }
304 }
305
/**
 * Central failure handler for the connection.
 *
 * Depending on the policy and the current state this either closes the
 * connection, parks it in STANDBY, or schedules a reconnect (immediately or
 * after a backoff).
 *
 * @return always nullptr: no continuation is run after a fault
 */
CtPtr ProtocolV2::_fault() {
  ldout(cct, 10) << __func__ << dendl;

  // nothing to do for a connection that is closed or was never started
  if (state == CLOSED || state == NONE) {
    ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
    return nullptr;
  }

  // lossy connections are not re-established once they are past the
  // connecting states: close and report a reset
  if (connection->policy.lossy &&
      !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
    ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  // write_lock is taken manually here; every exit path below must unlock it
  connection->write_lock.lock();

  can_write = false;
  // requeue sent items
  requeue_sent();

  // a half-accepted connection with nothing queued is simply closed
  if (out_queue.empty() && state >= START_ACCEPT &&
      state <= SESSION_ACCEPTING && !replacing) {
    ldout(cct, 2) << __func__ << " with nothing to send and in the half "
                  << " accept state just closed" << dendl;
    // stop() takes write_lock itself
    connection->write_lock.unlock();
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  replacing = false;
  connection->fault();
  reset_recv_state();

  reconnecting = false;

  if (connection->policy.standby && out_queue.empty() && !keepalive &&
      state != WAIT) {
    ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
                  << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return nullptr;
  }
  // servers never reconnect on their own, even with messages queued
  if (connection->policy.server) {
    ldout(cct, 1) << __func__ << " server, going to standby, even though i have stuff queued" << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return nullptr;
  }

  connection->write_lock.unlock();

  if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
      state != WAIT &&
      state != SESSION_ACCEPTING /* due to connection race */) {
    // policy maybe empty when state is in accept
    // NOTE(review): policy.server was already checked (and returned) a few
    // lines above, so this branch looks unreachable unless the policy is
    // changed concurrently - confirm.
    if (connection->policy.server) {
      ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
      state = STANDBY;
    } else {
      ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
      connect_seq++;
      global_seq = messenger->get_global_seq();
      state = START_CONNECT;
      pre_auth.enabled = true;
      connection->state = AsyncConnection::STATE_CONNECTING;
    }
    // no backoff in this case: reset it and kick the read handler right away
    backoff = utime_t();
    connection->center->dispatch_event_external(connection->read_handler);
  } else {
    // the fault happened while connecting (or waiting): retry after a
    // backoff that starts at ms_initial_backoff, doubles on every further
    // fault and is capped at ms_max_backoff; WAIT goes straight to the cap
    if (state == WAIT) {
      backoff.set_from_double(cct->_conf->ms_max_backoff);
    } else if (backoff == utime_t()) {
      backoff.set_from_double(cct->_conf->ms_initial_backoff);
    } else {
      backoff += backoff;
      if (backoff > cct->_conf->ms_max_backoff)
        backoff.set_from_double(cct->_conf->ms_max_backoff);
    }

    // only bump connect_seq when a server cookie (i.e. a session) exists
    if (server_cookie) {
      connect_seq++;
    }

    global_seq = messenger->get_global_seq();
    state = START_CONNECT;
    pre_auth.enabled = true;
    connection->state = AsyncConnection::STATE_CONNECTING;
    ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
    // schedule the wakeup handler to run once the backoff has elapsed
    // (backoff is converted from nanoseconds by dividing by 1000)
    connection->register_time_events.insert(
        connection->center->create_time_event(backoff.to_nsec() / 1000,
                                              connection->wakeup_handler));
  }
  return nullptr;
}
405
406 void ProtocolV2::prepare_send_message(uint64_t features,
407 Message *m) {
408 ldout(cct, 20) << __func__ << " m=" << *m << dendl;
409
410 // associate message with Connection (for benefit of encode_payload)
411 ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
412 << features << " " << m << " " << *m << dendl;
413
414 // encode and copy out of *m
415 m->encode(features, 0);
416 }
417
418 void ProtocolV2::send_message(Message *m) {
419 uint64_t f = connection->get_features();
420
421 // TODO: Currently not all messages supports reencode like MOSDMap, so here
422 // only let fast dispatch support messages prepare message
423 const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
424 if (can_fast_prepare) {
425 prepare_send_message(f, m);
426 }
427
428 std::lock_guard<std::mutex> l(connection->write_lock);
429 bool is_prepared = can_fast_prepare;
430 // "features" changes will change the payload encoding
431 if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
432 // ensure the correctness of message encoding
433 m->clear_payload();
434 is_prepared = false;
435 ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
436 << " != " << connection->get_features() << dendl;
437 }
438 if (state == CLOSED) {
439 ldout(cct, 10) << __func__ << " connection closed."
440 << " Drop message " << m << dendl;
441 m->put();
442 } else {
443 ldout(cct, 5) << __func__ << " enqueueing message m=" << m
444 << " type=" << m->get_type() << " " << *m << dendl;
445 m->queue_start = ceph::mono_clock::now();
446 m->trace.event("async enqueueing message");
447 out_queue[m->get_priority()].emplace_back(
448 out_queue_entry_t{is_prepared, m});
449 ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
450 << dendl;
451 if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
452 write_in_progress = true;
453 connection->center->dispatch_event_external(connection->write_handler);
454 }
455 }
456 }
457
458 void ProtocolV2::send_keepalive() {
459 ldout(cct, 10) << __func__ << dendl;
460 std::lock_guard<std::mutex> l(connection->write_lock);
461 if (state != CLOSED) {
462 keepalive = true;
463 connection->center->dispatch_event_external(connection->write_handler);
464 }
465 }
466
/**
 * Resume protocol processing according to the current state.
 *
 * Only the states listed below have an entry continuation; in any other
 * state the call does nothing.
 */
void ProtocolV2::read_event() {
  ldout(cct, 20) << __func__ << dendl;

  switch (state) {
    case START_CONNECT:
      // client side: begin with the banner exchange
      run_continuation(CONTINUATION(start_client_banner_exchange));
      break;
    case START_ACCEPT:
      // server side: begin with the banner exchange
      run_continuation(CONTINUATION(start_server_banner_exchange));
      break;
    case READY:
      run_continuation(CONTINUATION(read_frame));
      break;
    // the throttle states re-enter the matching throttle step
    case THROTTLE_MESSAGE:
      run_continuation(CONTINUATION(throttle_message));
      break;
    case THROTTLE_BYTES:
      run_continuation(CONTINUATION(throttle_bytes));
      break;
    case THROTTLE_DISPATCH_QUEUE:
      run_continuation(CONTINUATION(throttle_dispatch_queue));
      break;
    default:
      break;
  }
}
493
494 ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
495 out_queue_entry_t out_entry;
496
497 if (!out_queue.empty()) {
498 auto it = out_queue.rbegin();
499 auto& entries = it->second;
500 ceph_assert(!entries.empty());
501 out_entry = entries.front();
502 entries.pop_front();
503 if (entries.empty()) {
504 out_queue.erase(it->first);
505 }
506 }
507 return out_entry;
508 }
509
/**
 * Encode one message into a frame and try to send it.
 *
 * Must be called from the connection's event center thread (asserted).
 * The message gets the next outgoing sequence number, and the current
 * in_seq is piggy-backed as the ack sequence while ack_left is reset.
 * One reference on `m` is released on every path.
 *
 * @param m    message to send
 * @param more passed through to connection->_try_send()
 * @return -EILSEQ when the frame could not be appended, otherwise the
 *         result of connection->_try_send(): negative on error, 0 when
 *         done, positive when the send continues later
 */
ssize_t ProtocolV2::write_message(Message *m, bool more) {
  FUNCTRACE(cct);
  ceph_assert(connection->center->in_thread());
  m->set_seq(++out_seq);

  // snapshot the sequence to acknowledge and clear the pending ack counter
  // under the connection lock
  connection->lock.lock();
  uint64_t ack_seq = in_seq;
  ack_left = 0;
  connection->lock.unlock();

  ceph_msg_header &header = m->get_header();
  ceph_msg_footer &footer = m->get_footer();

  // build the v2 header from the message's header and footer fields
  ceph_msg_header2 header2{header.seq, header.tid,
                           header.type, header.priority,
                           header.version,
                           init_le32(0), header.data_off,
                           init_le64(ack_seq),
                           footer.flags, header.compat_version,
                           header.reserved};

  auto message = MessageFrame::Encode(
      header2,
      m->get_payload(),
      m->get_middle(),
      m->get_data());
  if (!append_frame(message)) {
    m->put();
    return -EILSEQ;
  }

  ldout(cct, 5) << __func__ << " sending message m=" << m
                << " seq=" << m->get_seq() << " " << *m << dendl;

  m->trace.event("async writing message");
  ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
                 << " src=" << entity_name_t(messenger->get_myname())
                 << " off=" << header2.data_off
                 << dendl;
  // remember the queued size so the number of bytes actually sent can be
  // accounted below
  ssize_t total_send_size = connection->outgoing_bl.length();
  ssize_t rc = connection->_try_send(more);
  if (rc < 0) {
    ldout(cct, 1) << __func__ << " error sending " << m << ", "
                  << cpp_strerror(rc) << dendl;
  } else {
    connection->logger->inc(
        l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
    ldout(cct, 10) << __func__ << " sending " << m
                   << (rc ? " continuely." : " done.") << dendl;
  }

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
#endif
  m->put();

  return rc;
}
571
572 template <class F>
573 bool ProtocolV2::append_frame(F& frame) {
574 ceph::bufferlist bl;
575 try {
576 bl = frame.get_buffer(session_stream_handlers);
577 } catch (ceph::crypto::onwire::TxHandlerError &e) {
578 ldout(cct, 1) << __func__ << " " << e.what() << dendl;
579 return false;
580 }
581 connection->outgoing_bl.append(bl);
582 return true;
583 }
584
585 void ProtocolV2::handle_message_ack(uint64_t seq) {
586 if (connection->policy.lossy) { // lossy connections don't keep sent messages
587 return;
588 }
589
590 ldout(cct, 15) << __func__ << " seq=" << seq << dendl;
591
592 // trim sent list
593 static const int max_pending = 128;
594 int i = 0;
595 Message *pending[max_pending];
596 auto now = ceph::mono_clock::now();
597 connection->write_lock.lock();
598 while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
599 Message *m = sent.front();
600 sent.pop_front();
601 pending[i++] = m;
602 ldout(cct, 10) << __func__ << " got ack seq " << seq
603 << " >= " << m->get_seq() << " on " << m << " " << *m
604 << dendl;
605 }
606 connection->write_lock.unlock();
607 connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
608 for (int k = 0; k < i; k++) {
609 pending[k]->put();
610 }
611 }
612
/**
 * Write handler: push as much queued output to the socket as possible.
 *
 * When writing is allowed (can_write) this sends a pending keepalive, then
 * drains the out queue message by message, and finally sends a standalone
 * ack or flushes leftover bytes. When writing is not allowed it either
 * starts a reconnect from STANDBY (clients only) or tries to flush bytes
 * that are already queued on the connection.
 *
 * Locks are handled manually throughout: write_lock is dropped around
 * prepare_send_message()/write_message(), and connection->lock is held
 * around every call to fault().
 */
void ProtocolV2::write_event() {
  ldout(cct, 10) << __func__ << dendl;
  ssize_t r = 0;

  connection->write_lock.lock();
  if (can_write) {
    if (keepalive) {
      ldout(cct, 10) << __func__ << " appending keepalive" << dendl;
      auto keepalive_frame = KeepAliveFrame::Encode();
      if (!append_frame(keepalive_frame)) {
        // fault() is called with connection->lock held and write_lock
        // released
        connection->write_lock.unlock();
        connection->lock.lock();
        fault();
        connection->lock.unlock();
        return;
      }
      keepalive = false;
    }

    auto start = ceph::mono_clock::now();
    bool more;
    do {
      const auto out_entry = _get_next_outgoing();
      if (!out_entry.m) {
        // out queue is empty
        break;
      }

      if (!connection->policy.lossy) {
        // put on sent list
        sent.push_back(out_entry.m);
        out_entry.m->get();
      }
      more = !out_queue.empty();
      // encoding and sending happen without write_lock
      connection->write_lock.unlock();

      // send_message or requeue messages may not encode message
      if (!out_entry.is_prepared) {
        prepare_send_message(connection->get_features(), out_entry.m);
      }

      // a default-constructed queue_start means no queue time was recorded
      if (out_entry.m->queue_start != ceph::mono_time()) {
        connection->logger->tinc(l_msgr_send_messages_queue_lat,
                                 ceph::mono_clock::now() -
                                     out_entry.m->queue_start);
      }

      r = write_message(out_entry.m, more);

      connection->write_lock.lock();
      if (r == 0) {
        ;
      } else if (r < 0) {
        ldout(cct, 1) << __func__ << " send msg failed" << dendl;
        break;
      } else if (r > 0) {
        // Outbound message in-progress, thread will be re-awoken
        // when the outbound socket is writeable again
        break;
      }
    } while (can_write);
    write_in_progress = false;

    // r > 0 means data is still left to be sent, so there is no need to
    // call _try_send again.
    if (r == 0) {
      uint64_t left = ack_left;
      if (left) {
        // acks are pending that were not piggy-backed on a message: send a
        // standalone ack frame
        ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                       << " messages" << dendl;
        auto ack_frame = AckFrame::Encode(in_seq);
        if (append_frame(ack_frame)) {
          ack_left -= left;
          left = ack_left;
          r = connection->_try_send(left);
        } else {
          r = -EILSEQ;
        }
      } else if (is_queued()) {
        r = connection->_try_send();
      }
    }
    connection->write_lock.unlock();

    connection->logger->tinc(l_msgr_running_send_time,
                             ceph::mono_clock::now() - start);
    if (r < 0) {
      ldout(cct, 1) << __func__ << " send msg failed" << dendl;
      connection->lock.lock();
      fault();
      connection->lock.unlock();
      return;
    }
  } else {
    write_in_progress = false;
    // re-acquire the locks in the order connection->lock, then write_lock
    connection->write_lock.unlock();
    connection->lock.lock();
    connection->write_lock.lock();
    if (state == STANDBY && !connection->policy.server && is_queued()) {
      ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
      if (server_cookie) {  // only increment connect_seq if there is a session
        connect_seq++;
      }
      connection->_connect();
    } else if (connection->cs && state != NONE && state != CLOSED &&
               state != START_CONNECT) {
      r = connection->_try_send();
      if (r < 0) {
        ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
        connection->write_lock.unlock();
        fault();
        connection->lock.unlock();
        return;
      }
    }
    connection->write_lock.unlock();
    connection->lock.unlock();
  }
}
730
731 bool ProtocolV2::is_queued() {
732 return !out_queue.empty() || connection->is_queued();
733 }
734
735 uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
736 if (session_stream_handlers.rx) {
737 return segment_onwire_size(logical_size);
738 } else {
739 return logical_size;
740 }
741 }
742
743 uint32_t ProtocolV2::get_epilogue_size() const {
744 // In secure mode size of epilogue is flexible and depends on particular
745 // cipher implementation. See the comment for epilogue_secure_block_t or
746 // epilogue_plain_block_t.
747 if (session_stream_handlers.rx) {
748 return FRAME_SECURE_EPILOGUE_SIZE + \
749 session_stream_handlers.rx->get_extra_size_at_final();
750 } else {
751 return FRAME_PLAIN_EPILOGUE_SIZE;
752 }
753 }
754
755 CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
756 rx_buffer_t &&buffer) {
757 const auto len = buffer->length();
758 const auto buf = buffer->c_str();
759 next.node = std::move(buffer);
760 ssize_t r = connection->read(len, buf,
761 [&next, this](char *buffer, int r) {
762 if (unlikely(pre_auth.enabled) && r >= 0) {
763 pre_auth.rxbuf.append(*next.node);
764 ceph_assert(!cct->_conf->ms_die_on_bug ||
765 pre_auth.rxbuf.length() < 1000000);
766 }
767 next.r = r;
768 run_continuation(next);
769 });
770 if (r <= 0) {
771 // error or done synchronously
772 if (unlikely(pre_auth.enabled) && r >= 0) {
773 pre_auth.rxbuf.append(*next.node);
774 ceph_assert(!cct->_conf->ms_die_on_bug ||
775 pre_auth.rxbuf.length() < 1000000);
776 }
777 next.r = r;
778 return &next;
779 }
780
781 return nullptr;
782 }
783
784 template <class F>
785 CtPtr ProtocolV2::write(const std::string &desc,
786 CONTINUATION_TYPE<ProtocolV2> &next,
787 F &frame) {
788 ceph::bufferlist bl;
789 try {
790 bl = frame.get_buffer(session_stream_handlers);
791 } catch (ceph::crypto::onwire::TxHandlerError &e) {
792 ldout(cct, 1) << __func__ << " " << e.what() << dendl;
793 return _fault();
794 }
795 return write(desc, next, bl);
796 }
797
798 CtPtr ProtocolV2::write(const std::string &desc,
799 CONTINUATION_TYPE<ProtocolV2> &next,
800 bufferlist &buffer) {
801 if (unlikely(pre_auth.enabled)) {
802 pre_auth.txbuf.append(buffer);
803 ceph_assert(!cct->_conf->ms_die_on_bug ||
804 pre_auth.txbuf.length() < 1000000);
805 }
806
807 ssize_t r =
808 connection->write(buffer, [&next, desc, this](int r) {
809 if (r < 0) {
810 ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
811 << " (" << cpp_strerror(r) << ")" << dendl;
812 connection->inject_delay();
813 _fault();
814 }
815 run_continuation(next);
816 });
817
818 if (r < 0) {
819 ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
820 << " (" << cpp_strerror(r) << ")" << dendl;
821 return _fault();
822 } else if (r == 0) {
823 next.setParams();
824 return &next;
825 }
826
827 return nullptr;
828 }
829
830 CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
831 ldout(cct, 20) << __func__ << dendl;
832 bannerExchangeCallback = &callback;
833
834 bufferlist banner_payload;
835 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
836 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
837
838 bufferlist bl;
839 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
840 encode((uint16_t)banner_payload.length(), bl, 0);
841 bl.claim_append(banner_payload);
842
843 INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);
844
845 return WRITE(bl, "banner", _wait_for_peer_banner);
846 }
847
848 CtPtr ProtocolV2::_wait_for_peer_banner() {
849 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
850 return READ(banner_len, _handle_peer_banner);
851 }
852
853 CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
854 ldout(cct, 20) << __func__ << " r=" << r << dendl;
855
856 if (r < 0) {
857 ldout(cct, 1) << __func__ << " read peer banner failed r=" << r << " ("
858 << cpp_strerror(r) << ")" << dendl;
859 return _fault();
860 }
861
862 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
863
864 if (memcmp(buffer->c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
865 if (memcmp(buffer->c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
866 lderr(cct) << __func__ << " peer " << *connection->peer_addrs
867 << " is using msgr V1 protocol" << dendl;
868 return _fault();
869 }
870 ldout(cct, 1) << __func__ << " accept peer sent bad banner" << dendl;
871 return _fault();
872 }
873
874 uint16_t payload_len;
875 bufferlist bl;
876 buffer->set_offset(banner_prefix_len);
877 buffer->set_length(sizeof(ceph_le16));
878 bl.push_back(std::move(buffer));
879 auto ti = bl.cbegin();
880 try {
881 decode(payload_len, ti);
882 } catch (const buffer::error &e) {
883 lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
884 return _fault();
885 }
886
887 INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);
888
889 return READ(payload_len, _handle_peer_banner_payload);
890 }
891
892 CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
893 ldout(cct, 20) << __func__ << " r=" << r << dendl;
894
895 if (r < 0) {
896 ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
897 << " (" << cpp_strerror(r) << ")" << dendl;
898 return _fault();
899 }
900
901 uint64_t peer_supported_features;
902 uint64_t peer_required_features;
903
904 bufferlist bl;
905 bl.push_back(std::move(buffer));
906 auto ti = bl.cbegin();
907 try {
908 decode(peer_supported_features, ti);
909 decode(peer_required_features, ti);
910 } catch (const buffer::error &e) {
911 lderr(cct) << __func__ << " decode banner payload failed " << dendl;
912 return _fault();
913 }
914
915 ldout(cct, 1) << __func__ << " supported=" << std::hex
916 << peer_supported_features << " required=" << std::hex
917 << peer_required_features << std::dec << dendl;
918
919 // Check feature bit compatibility
920
921 uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
922 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
923
924 if ((required_features & peer_supported_features) != required_features) {
925 ldout(cct, 1) << __func__ << " peer does not support all required features"
926 << " required=" << std::hex << required_features
927 << " supported=" << std::hex << peer_supported_features
928 << std::dec << dendl;
929 stop();
930 connection->dispatch_queue->queue_reset(connection);
931 return nullptr;
932 }
933 if ((supported_features & peer_required_features) != peer_required_features) {
934 ldout(cct, 1) << __func__ << " we do not support all peer required features"
935 << " required=" << std::hex << peer_required_features
936 << " supported=" << supported_features << std::dec << dendl;
937 stop();
938 connection->dispatch_queue->queue_reset(connection);
939 return nullptr;
940 }
941
942 this->peer_required_features = peer_required_features;
943 if (this->peer_required_features == 0) {
944 this->connection_features = msgr2_required;
945 }
946
947 // at this point we can change how the client protocol behaves based on
948 // this->peer_required_features
949
950 if (state == BANNER_CONNECTING) {
951 state = HELLO_CONNECTING;
952 }
953 else {
954 ceph_assert(state == BANNER_ACCEPTING);
955 state = HELLO_ACCEPTING;
956 }
957
958 auto hello = HelloFrame::Encode(messenger->get_mytype(),
959 connection->target_addr);
960
961 INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);
962
963 return WRITE(hello, "hello frame", read_frame);
964 }
965
966 CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
967 {
968 ldout(cct, 20) << __func__
969 << " payload.length()=" << payload.length() << dendl;
970
971 if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
972 lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
973 return _fault();
974 }
975
976 auto hello = HelloFrame::Decode(payload);
977
978 ldout(cct, 5) << __func__ << " received hello:"
979 << " peer_type=" << (int)hello.entity_type()
980 << " peer_addr_for_me=" << hello.peer_addr() << dendl;
981
982 sockaddr_storage ss;
983 socklen_t len = sizeof(ss);
984 getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
985 ldout(cct, 5) << __func__ << " getsockname says I am " << (sockaddr *)&ss
986 << " when talking to " << connection->target_addr << dendl;
987
988 if (connection->get_peer_type() == -1) {
989 connection->set_peer_type(hello.entity_type());
990
991 ceph_assert(state == HELLO_ACCEPTING);
992 connection->policy = messenger->get_policy(hello.entity_type());
993 ldout(cct, 10) << __func__ << " accept of host_type "
994 << (int)hello.entity_type()
995 << ", policy.lossy=" << connection->policy.lossy
996 << " policy.server=" << connection->policy.server
997 << " policy.standby=" << connection->policy.standby
998 << " policy.resetcheck=" << connection->policy.resetcheck
999 << dendl;
1000 } else {
1001 ceph_assert(state == HELLO_CONNECTING);
1002 if (connection->get_peer_type() != hello.entity_type()) {
1003 ldout(cct, 1) << __func__ << " connection peer type does not match what"
1004 << " peer advertises " << connection->get_peer_type()
1005 << " != " << (int)hello.entity_type() << dendl;
1006 stop();
1007 connection->dispatch_queue->queue_reset(connection);
1008 return nullptr;
1009 }
1010 }
1011
1012 if (messenger->get_myaddrs().empty() ||
1013 messenger->get_myaddrs().front().is_blank_ip()) {
1014 entity_addr_t a;
1015 if (cct->_conf->ms_learn_addr_from_peer) {
1016 ldout(cct, 1) << __func__ << " peer " << connection->target_addr
1017 << " says I am " << hello.peer_addr() << " (socket says "
1018 << (sockaddr*)&ss << ")" << dendl;
1019 a = hello.peer_addr();
1020 } else {
1021 ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
1022 << " says I am " << (sockaddr*)&ss
1023 << " (peer says " << hello.peer_addr() << ")" << dendl;
1024 a.set_sockaddr((sockaddr *)&ss);
1025 }
1026 a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
1027 a.set_port(0);
1028 connection->lock.unlock();
1029 messenger->learned_addr(a);
1030 if (cct->_conf->ms_inject_internal_delays &&
1031 cct->_conf->ms_inject_socket_failures) {
1032 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
1033 ldout(cct, 10) << __func__ << " sleep for "
1034 << cct->_conf->ms_inject_internal_delays << dendl;
1035 utime_t t;
1036 t.set_from_double(cct->_conf->ms_inject_internal_delays);
1037 t.sleep();
1038 }
1039 }
1040 connection->lock.lock();
1041 if (state != HELLO_CONNECTING) {
1042 ldout(cct, 1) << __func__
1043 << " state changed while learned_addr, mark_down or "
1044 << " replacing must be happened just now" << dendl;
1045 return nullptr;
1046 }
1047 }
1048
1049
1050
1051 CtPtr callback;
1052 callback = bannerExchangeCallback;
1053 bannerExchangeCallback = nullptr;
1054 ceph_assert(callback);
1055 return callback;
1056 }
1057
1058 CtPtr ProtocolV2::read_frame() {
1059 if (state == CLOSED) {
1060 return nullptr;
1061 }
1062
1063 ldout(cct, 20) << __func__ << dendl;
1064 return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
1065 }
1066
1067 CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
1068 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1069
1070 if (r < 0) {
1071 ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
1072 << " (" << cpp_strerror(r) << ")" << dendl;
1073 return _fault();
1074 }
1075
1076 ceph::bufferlist preamble;
1077 preamble.push_back(std::move(buffer));
1078
1079 ldout(cct, 30) << __func__ << " preamble\n";
1080 preamble.hexdump(*_dout);
1081 *_dout << dendl;
1082
1083 if (session_stream_handlers.rx) {
1084 ceph_assert(session_stream_handlers.rx);
1085
1086 session_stream_handlers.rx->reset_rx_handler();
1087 preamble = session_stream_handlers.rx->authenticated_decrypt_update(
1088 std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
1089
1090 ldout(cct, 10) << __func__ << " got encrypted preamble."
1091 << " after decrypt premable.length()=" << preamble.length()
1092 << dendl;
1093
1094 ldout(cct, 30) << __func__ << " preamble after decrypt\n";
1095 preamble.hexdump(*_dout);
1096 *_dout << dendl;
1097 }
1098
1099 {
1100 // I expect ceph_le32 will make the endian conversion for me. Passing
1101 // everything through ::Decode is unnecessary.
1102 const auto& main_preamble = \
1103 reinterpret_cast<preamble_block_t&>(*preamble.c_str());
1104
1105 // verify preamble's CRC before any further processing
1106 const auto rx_crc = ceph_crc32c(0,
1107 reinterpret_cast<const unsigned char*>(&main_preamble),
1108 sizeof(main_preamble) - sizeof(main_preamble.crc));
1109 if (rx_crc != main_preamble.crc) {
1110 ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
1111 << " rx_crc=" << rx_crc
1112 << " tx_crc=" << main_preamble.crc << dendl;
1113 return _fault();
1114 }
1115
1116 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
1117 if (main_preamble.num_segments < 1 ||
1118 main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1119 ldout(cct, 10) << __func__ << " unsupported num_segments="
1120 << " tx_crc=" << main_preamble.num_segments << dendl;
1121 return _fault();
1122 }
1123
1124 next_tag = static_cast<Tag>(main_preamble.tag);
1125
1126 rx_segments_desc.clear();
1127 rx_segments_data.clear();
1128
1129 if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1130 ldout(cct, 30) << __func__
1131 << " num_segments=" << main_preamble.num_segments
1132 << " is too much" << dendl;
1133 return _fault();
1134 }
1135 for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
1136 ldout(cct, 10) << __func__ << " got new segment:"
1137 << " len=" << main_preamble.segments[idx].length
1138 << " align=" << main_preamble.segments[idx].alignment
1139 << dendl;
1140 rx_segments_desc.emplace_back(main_preamble.segments[idx]);
1141 }
1142 }
1143
1144 // does it need throttle?
1145 if (next_tag == Tag::MESSAGE) {
1146 if (state != READY) {
1147 lderr(cct) << __func__ << " not in ready state!" << dendl;
1148 return _fault();
1149 }
1150 state = THROTTLE_MESSAGE;
1151 return CONTINUE(throttle_message);
1152 } else {
1153 return read_frame_segment();
1154 }
1155 }
1156
1157 CtPtr ProtocolV2::handle_read_frame_dispatch() {
1158 ldout(cct, 10) << __func__
1159 << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
1160
1161 switch (next_tag) {
1162 case Tag::HELLO:
1163 case Tag::AUTH_REQUEST:
1164 case Tag::AUTH_BAD_METHOD:
1165 case Tag::AUTH_REPLY_MORE:
1166 case Tag::AUTH_REQUEST_MORE:
1167 case Tag::AUTH_DONE:
1168 case Tag::AUTH_SIGNATURE:
1169 case Tag::CLIENT_IDENT:
1170 case Tag::SERVER_IDENT:
1171 case Tag::IDENT_MISSING_FEATURES:
1172 case Tag::SESSION_RECONNECT:
1173 case Tag::SESSION_RESET:
1174 case Tag::SESSION_RETRY:
1175 case Tag::SESSION_RETRY_GLOBAL:
1176 case Tag::SESSION_RECONNECT_OK:
1177 case Tag::KEEPALIVE2:
1178 case Tag::KEEPALIVE2_ACK:
1179 case Tag::ACK:
1180 case Tag::WAIT:
1181 return handle_frame_payload();
1182 case Tag::MESSAGE:
1183 return handle_message();
1184 default: {
1185 lderr(cct) << __func__
1186 << " received unknown tag=" << static_cast<uint32_t>(next_tag)
1187 << dendl;
1188 return _fault();
1189 }
1190 }
1191
1192 return nullptr;
1193 }
1194
1195 CtPtr ProtocolV2::read_frame_segment() {
1196 ldout(cct, 20) << __func__ << dendl;
1197 ceph_assert(!rx_segments_desc.empty());
1198
1199 // description of current segment to read
1200 const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
1201 rx_buffer_t rx_buffer;
1202 try {
1203 rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
1204 get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
1205 } catch (std::bad_alloc&) {
1206 // Catching because of potential issues with satisfying alignment.
1207 ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
1208 << " len=" << get_onwire_size(cur_rx_desc.length)
1209 << " align=" << cur_rx_desc.alignment
1210 << dendl;
1211 return _fault();
1212 }
1213
1214 return READ_RXBUF(std::move(rx_buffer), handle_read_frame_segment);
1215 }
1216
1217 CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
1218 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1219
1220 if (r < 0) {
1221 ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
1222 << cpp_strerror(r) << ")" << dendl;
1223 return _fault();
1224 }
1225
1226 rx_segments_data.emplace_back();
1227 rx_segments_data.back().push_back(std::move(rx_buffer));
1228
1229 // decrypt incoming data
1230 // FIXME: if (auth_meta->is_mode_secure()) {
1231 if (session_stream_handlers.rx) {
1232 ceph_assert(session_stream_handlers.rx);
1233
1234 auto& new_seg = rx_segments_data.back();
1235 if (new_seg.length()) {
1236 auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
1237 std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
1238 const auto idx = rx_segments_data.size() - 1;
1239 new_seg.clear();
1240 padded.splice(0, rx_segments_desc[idx].length, &new_seg);
1241
1242 ldout(cct, 20) << __func__
1243 << " unpadded new_seg.length()=" << new_seg.length()
1244 << dendl;
1245 }
1246 }
1247
1248 if (rx_segments_desc.size() == rx_segments_data.size()) {
1249 // OK, all segments planned to read are read. Can go with epilogue.
1250 return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
1251 } else {
1252 // TODO: for makeshift only. This will be more generic and throttled
1253 return read_frame_segment();
1254 }
1255 }
1256
1257 CtPtr ProtocolV2::handle_frame_payload() {
1258 ceph_assert(!rx_segments_data.empty());
1259 auto& payload = rx_segments_data.back();
1260
1261 ldout(cct, 30) << __func__ << "\n";
1262 payload.hexdump(*_dout);
1263 *_dout << dendl;
1264
1265 switch (next_tag) {
1266 case Tag::HELLO:
1267 return handle_hello(payload);
1268 case Tag::AUTH_REQUEST:
1269 return handle_auth_request(payload);
1270 case Tag::AUTH_BAD_METHOD:
1271 return handle_auth_bad_method(payload);
1272 case Tag::AUTH_REPLY_MORE:
1273 return handle_auth_reply_more(payload);
1274 case Tag::AUTH_REQUEST_MORE:
1275 return handle_auth_request_more(payload);
1276 case Tag::AUTH_DONE:
1277 return handle_auth_done(payload);
1278 case Tag::AUTH_SIGNATURE:
1279 return handle_auth_signature(payload);
1280 case Tag::CLIENT_IDENT:
1281 return handle_client_ident(payload);
1282 case Tag::SERVER_IDENT:
1283 return handle_server_ident(payload);
1284 case Tag::IDENT_MISSING_FEATURES:
1285 return handle_ident_missing_features(payload);
1286 case Tag::SESSION_RECONNECT:
1287 return handle_reconnect(payload);
1288 case Tag::SESSION_RESET:
1289 return handle_session_reset(payload);
1290 case Tag::SESSION_RETRY:
1291 return handle_session_retry(payload);
1292 case Tag::SESSION_RETRY_GLOBAL:
1293 return handle_session_retry_global(payload);
1294 case Tag::SESSION_RECONNECT_OK:
1295 return handle_reconnect_ok(payload);
1296 case Tag::KEEPALIVE2:
1297 return handle_keepalive2(payload);
1298 case Tag::KEEPALIVE2_ACK:
1299 return handle_keepalive2_ack(payload);
1300 case Tag::ACK:
1301 return handle_message_ack(payload);
1302 case Tag::WAIT:
1303 return handle_wait(payload);
1304 default:
1305 ceph_abort();
1306 }
1307 return nullptr;
1308 }
1309
1310 CtPtr ProtocolV2::ready() {
1311 ldout(cct, 25) << __func__ << dendl;
1312
1313 reconnecting = false;
1314 replacing = false;
1315
1316 // make sure no pending tick timer
1317 if (connection->last_tick_id) {
1318 connection->center->delete_time_event(connection->last_tick_id);
1319 }
1320 connection->last_tick_id = connection->center->create_time_event(
1321 connection->inactive_timeout_us, connection->tick_handler);
1322
1323 {
1324 std::lock_guard<std::mutex> l(connection->write_lock);
1325 can_write = true;
1326 if (!out_queue.empty()) {
1327 connection->center->dispatch_event_external(connection->write_handler);
1328 }
1329 }
1330
1331 connection->maybe_start_delay_thread();
1332
1333 state = READY;
1334 ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie="
1335 << std::hex << client_cookie << " server_cookie="
1336 << server_cookie << std::dec << " in_seq=" << in_seq
1337 << " out_seq=" << out_seq << dendl;
1338
1339 INTERCEPT(15);
1340
1341 return CONTINUE(read_frame);
1342 }
1343
1344 CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
1345 {
1346 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1347
1348 if (r < 0) {
1349 ldout(cct, 1) << __func__ << " read data error " << dendl;
1350 return _fault();
1351 }
1352
1353 __u8 late_flags;
1354
1355 // FIXME: if (auth_meta->is_mode_secure()) {
1356 if (session_stream_handlers.rx) {
1357 ldout(cct, 1) << __func__ << " read frame epilogue bytes="
1358 << get_epilogue_size() << dendl;
1359
1360 // decrypt epilogue and authenticate entire frame.
1361 ceph::bufferlist epilogue_bl;
1362 {
1363 epilogue_bl.push_back(std::move(buffer));
1364 try {
1365 epilogue_bl =
1366 session_stream_handlers.rx->authenticated_decrypt_update_final(
1367 std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
1368 } catch (ceph::crypto::onwire::MsgAuthError &e) {
1369 ldout(cct, 5) << __func__ << " message authentication failed: "
1370 << e.what() << dendl;
1371 return _fault();
1372 }
1373 }
1374 auto& epilogue =
1375 reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
1376 late_flags = epilogue.late_flags;
1377 } else {
1378 auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
1379
1380 for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
1381 const __u32 expected_crc = epilogue.crc_values[idx];
1382 const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
1383 if (expected_crc != calculated_crc) {
1384 ldout(cct, 5) << __func__ << " message integrity check failed: "
1385 << " expected_crc=" << expected_crc
1386 << " calculated_crc=" << calculated_crc
1387 << dendl;
1388 return _fault();
1389 } else {
1390 ldout(cct, 20) << __func__ << " message integrity check success: "
1391 << " expected_crc=" << expected_crc
1392 << " calculated_crc=" << calculated_crc
1393 << dendl;
1394 }
1395 }
1396 late_flags = epilogue.late_flags;
1397 }
1398
1399 // we do have a mechanism that allows transmitter to start sending message
1400 // and abort after putting entire data field on wire. This will be used by
1401 // the kernel client to avoid unnecessary buffering.
1402 if (late_flags & FRAME_FLAGS_LATEABRT) {
1403 reset_throttle();
1404 state = READY;
1405 return CONTINUE(read_frame);
1406 } else {
1407 return handle_read_frame_dispatch();
1408 }
1409 }
1410
// Process a completely received MESSAGE frame.
//
// Must be entered in state THROTTLE_DONE (asserted). Decodes the frame
// held in rx_segments_data into a Message, attaches throttlers and
// timestamps, checks the sequence number against in_seq, and then hands
// the message to one of: the delay queue, fast dispatch, or the regular
// dispatch queue. Afterwards the ack carried in the message header is
// processed and, for non-lossy policies, the write handler is woken.
//
// Returns CONTINUE(read_frame) on the normal path, the result of _fault()
// when decoding fails, and nullptr when an old message is discarded or
// when the state changed while the connection lock was released for fast
// dispatch.
CtPtr ProtocolV2::handle_message() {
  ldout(cct, 20) << __func__ << dendl;
  ceph_assert(state == THROTTLE_DONE);

#if defined(WITH_EVENTTRACE)
  utime_t ltt_recv_stamp = ceph_clock_now();
#endif
  recv_stamp = ceph_clock_now();

  // we need to get the size before std::moving segments data
  const size_t cur_msg_size = get_current_msg_size();
  auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));

  // XXX: paranoid copy just to avoid oops
  ceph_msg_header2 current_header = msg_frame.header();

  ldout(cct, 5) << __func__
                << " got " << msg_frame.front_len()
                << " + " << msg_frame.middle_len()
                << " + " << msg_frame.data_len()
                << " byte message."
                << " envelope type=" << current_header.type
                << " src " << peer_name
                << " off " << current_header.data_off
                << dendl;

  INTERCEPT(16);
  // Build the ceph_msg_header / ceph_msg_footer pair expected by
  // decode_message() from the v2 header; lengths come from the frame
  // segments, the source is peer_name, and the remaining fields are zeroed.
  ceph_msg_header header{current_header.seq,
                         current_header.tid,
                         current_header.type,
                         current_header.priority,
                         current_header.version,
                         init_le32(msg_frame.front_len()),
                         init_le32(msg_frame.middle_len()),
                         init_le32(msg_frame.data_len()),
                         current_header.data_off,
                         peer_name,
                         current_header.compat_version,
                         current_header.reserved,
                         init_le32(0)};
  ceph_msg_footer footer{init_le32(0), init_le32(0),
                         init_le32(0), init_le64(0), current_header.flags};

  Message *message = decode_message(cct, 0, header, footer,
                                    msg_frame.front(),
                                    msg_frame.middle(),
                                    msg_frame.data(),
                                    connection);
  if (!message) {
    ldout(cct, 1) << __func__ << " decode message failed " << dendl;
    return _fault();
  } else {
    state = READ_MESSAGE_COMPLETE;
  }

  INTERCEPT(17);

  message->set_byte_throttler(connection->policy.throttler_bytes);
  message->set_message_throttler(connection->policy.throttler_messages);

  // store reservation size in message, so we don't get confused
  // by messages entering the dispatch queue through other paths.
  message->set_dispatch_throttle_size(cur_msg_size);

  message->set_recv_stamp(recv_stamp);
  message->set_throttle_stamp(throttle_stamp);
  message->set_recv_complete_stamp(ceph_clock_now());

  // check received seq#.  if it is old, drop the message.
  // note that incoming messages may skip ahead.  this is convenient for the
  // client side queueing because messages can't be renumbered, but the (kernel)
  // client will occasionally pull a message out of the sent queue to send
  // elsewhere.  in that case it doesn't matter if we "got" it or not.
  uint64_t cur_seq = in_seq;
  if (message->get_seq() <= cur_seq) {
    ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
                  << " <= " << cur_seq << " " << message << " " << *message
                  << ", discarding" << dendl;
    message->put();
    if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
        cct->_conf->ms_die_on_old_message) {
      ceph_assert(0 == "old msgs despite reconnect_seq feature");
    }
    // NOTE(review): this returns no continuation and leaves state at
    // READ_MESSAGE_COMPLETE, unlike the other exits which continue with
    // read_frame -- confirm this is intended.
    return nullptr;
  }
  if (message->get_seq() > cur_seq + 1) {
    ldout(cct, 0) << __func__ << " missed message?  skipped from seq "
                  << cur_seq << " to " << message->get_seq() << dendl;
    if (cct->_conf->ms_die_on_skipped_message) {
      ceph_assert(0 == "skipped incoming seq");
    }
  }

#if defined(WITH_EVENTTRACE)
  if (message->get_type() == CEPH_MSG_OSD_OP ||
      message->get_type() == CEPH_MSG_OSD_OPREPLY) {
    utime_t ltt_processed_stamp = ceph_clock_now();
    double usecs_elapsed =
      (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
    ostringstream buf;
    if (message->get_type() == CEPH_MSG_OSD_OP)
      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
                           false);
    else
      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
                           false);
  }
#endif

  // note last received message.
  in_seq = message->get_seq();
  ldout(cct, 5) << __func__ << " received message m=" << message
                << " seq=" << message->get_seq()
                << " from=" << message->get_source() << " type=" << header.type
                << " " << *message << dendl;

  // for non-lossy policies count one more ack as outstanding and remember
  // to wake the write handler at the end
  bool need_dispatch_writer = false;
  if (!connection->policy.lossy) {
    ack_left++;
    need_dispatch_writer = true;
  }

  state = READY;

  // declared ahead of the goto below so the jump does not cross its
  // initialization
  ceph::mono_time fast_dispatch_time;

  if (connection->is_blackhole()) {
    // blackholed connection: release the message without dispatching it
    ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
    message->put();
    goto out;
  }

  connection->logger->inc(l_msgr_recv_messages);
  connection->logger->inc(
      l_msgr_recv_bytes,
      cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));

  messenger->ms_fast_preprocess(message);
  fast_dispatch_time = ceph::mono_clock::now();
  connection->logger->tinc(l_msgr_running_recv_time,
                           fast_dispatch_time - connection->recv_start_time);
  if (connection->delay_state) {
    // delay injection is active: queue through delay_state, with a random
    // delay chosen according to the ms_inject_delay_* options
    double delay_period = 0;
    if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
      delay_period =
          cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
      ldout(cct, 1) << "queue_received will delay after "
                    << (ceph_clock_now() + delay_period) << " on " << message
                    << " " << *message << dendl;
    }
    connection->delay_state->queue(delay_period, message);
  } else if (messenger->ms_can_fast_dispatch(message)) {
    // fast dispatch runs with the connection lock released
    connection->lock.unlock();
    connection->dispatch_queue->fast_dispatch(message);
    connection->recv_start_time = ceph::mono_clock::now();
    connection->logger->tinc(l_msgr_running_fast_dispatch_time,
                             connection->recv_start_time - fast_dispatch_time);
    connection->lock.lock();
    // we might have been reused by another connection
    // let's check if that is the case
    if (state != READY) {
      // yes, that was the case, let's do nothing
      return nullptr;
    }
  } else {
    connection->dispatch_queue->enqueue(message, message->get_priority(),
                                        connection->conn_id);
  }

  // process the ack sequence number carried in the message header
  handle_message_ack(current_header.ack_seq);

 out:
  if (need_dispatch_writer && connection->is_connected()) {
    connection->center->dispatch_event_external(connection->write_handler);
  }

  return CONTINUE(read_frame);
}
1589
1590
1591 CtPtr ProtocolV2::throttle_message() {
1592 ldout(cct, 20) << __func__ << dendl;
1593
1594 if (connection->policy.throttler_messages) {
1595 ldout(cct, 10) << __func__ << " wants " << 1
1596 << " message from policy throttler "
1597 << connection->policy.throttler_messages->get_current()
1598 << "/" << connection->policy.throttler_messages->get_max()
1599 << dendl;
1600 if (!connection->policy.throttler_messages->get_or_fail()) {
1601 ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
1602 << connection->policy.throttler_messages->get_current()
1603 << "/" << connection->policy.throttler_messages->get_max()
1604 << " failed, just wait." << dendl;
1605 // following thread pool deal with th full message queue isn't a
1606 // short time, so we can wait a ms.
1607 if (connection->register_time_events.empty()) {
1608 connection->register_time_events.insert(
1609 connection->center->create_time_event(1000,
1610 connection->wakeup_handler));
1611 }
1612 return nullptr;
1613 }
1614 }
1615
1616 state = THROTTLE_BYTES;
1617 return CONTINUE(throttle_bytes);
1618 }
1619
1620 CtPtr ProtocolV2::throttle_bytes() {
1621 ldout(cct, 20) << __func__ << dendl;
1622
1623 const size_t cur_msg_size = get_current_msg_size();
1624 if (cur_msg_size) {
1625 if (connection->policy.throttler_bytes) {
1626 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1627 << " bytes from policy throttler "
1628 << connection->policy.throttler_bytes->get_current() << "/"
1629 << connection->policy.throttler_bytes->get_max() << dendl;
1630 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
1631 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1632 << " bytes from policy throttler "
1633 << connection->policy.throttler_bytes->get_current()
1634 << "/" << connection->policy.throttler_bytes->get_max()
1635 << " failed, just wait." << dendl;
1636 // following thread pool deal with th full message queue isn't a
1637 // short time, so we can wait a ms.
1638 if (connection->register_time_events.empty()) {
1639 connection->register_time_events.insert(
1640 connection->center->create_time_event(
1641 1000, connection->wakeup_handler));
1642 }
1643 return nullptr;
1644 }
1645 }
1646 }
1647
1648 state = THROTTLE_DISPATCH_QUEUE;
1649 return CONTINUE(throttle_dispatch_queue);
1650 }
1651
1652 CtPtr ProtocolV2::throttle_dispatch_queue() {
1653 ldout(cct, 20) << __func__ << dendl;
1654
1655 const size_t cur_msg_size = get_current_msg_size();
1656 if (cur_msg_size) {
1657 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
1658 cur_msg_size)) {
1659 ldout(cct, 10)
1660 << __func__ << " wants " << cur_msg_size
1661 << " bytes from dispatch throttle "
1662 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1663 << connection->dispatch_queue->dispatch_throttler.get_max()
1664 << " failed, just wait." << dendl;
1665 // following thread pool deal with th full message queue isn't a
1666 // short time, so we can wait a ms.
1667 if (connection->register_time_events.empty()) {
1668 connection->register_time_events.insert(
1669 connection->center->create_time_event(1000,
1670 connection->wakeup_handler));
1671 }
1672 return nullptr;
1673 }
1674 }
1675
1676 throttle_stamp = ceph_clock_now();
1677 state = THROTTLE_DONE;
1678
1679 return read_frame_segment();
1680 }
1681
1682 CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
1683 {
1684 ldout(cct, 20) << __func__
1685 << " payload.length()=" << payload.length() << dendl;
1686
1687 if (state != READY) {
1688 lderr(cct) << __func__ << " not in ready state!" << dendl;
1689 return _fault();
1690 }
1691
1692 auto keepalive_frame = KeepAliveFrame::Decode(payload);
1693
1694 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
1695
1696 connection->write_lock.lock();
1697 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(keepalive_frame.timestamp());
1698 if (!append_frame(keepalive_ack_frame)) {
1699 connection->write_lock.unlock();
1700 return _fault();
1701 }
1702 connection->write_lock.unlock();
1703
1704 ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
1705 << keepalive_frame.timestamp() << dendl;
1706 connection->set_last_keepalive(ceph_clock_now());
1707
1708 if (is_connected()) {
1709 connection->center->dispatch_event_external(connection->write_handler);
1710 }
1711
1712 return CONTINUE(read_frame);
1713 }
1714
1715 CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
1716 {
1717 ldout(cct, 20) << __func__
1718 << " payload.length()=" << payload.length() << dendl;
1719
1720 if (state != READY) {
1721 lderr(cct) << __func__ << " not in ready state!" << dendl;
1722 return _fault();
1723 }
1724
1725 auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload);
1726 connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
1727 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
1728
1729 return CONTINUE(read_frame);
1730 }
1731
1732 CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
1733 {
1734 ldout(cct, 20) << __func__
1735 << " payload.length()=" << payload.length() << dendl;
1736
1737 if (state != READY) {
1738 lderr(cct) << __func__ << " not in ready state!" << dendl;
1739 return _fault();
1740 }
1741
1742 auto ack = AckFrame::Decode(payload);
1743 handle_message_ack(ack.seq());
1744 return CONTINUE(read_frame);
1745 }
1746
1747 /* Client Protocol Methods */
1748
1749 CtPtr ProtocolV2::start_client_banner_exchange() {
1750 ldout(cct, 20) << __func__ << dendl;
1751
1752 INTERCEPT(1);
1753
1754 state = BANNER_CONNECTING;
1755
1756 global_seq = messenger->get_global_seq();
1757
1758 return _banner_exchange(CONTINUATION(post_client_banner_exchange));
1759 }
1760
1761 CtPtr ProtocolV2::post_client_banner_exchange() {
1762 ldout(cct, 20) << __func__ << dendl;
1763
1764 state = AUTH_CONNECTING;
1765
1766 return send_auth_request();
1767 }
1768
1769 CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
1770 ceph_assert(messenger->auth_client);
1771 ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
1772 << " auth_client " << messenger->auth_client << dendl;
1773
1774 bufferlist bl;
1775 vector<uint32_t> preferred_modes;
1776 auto am = auth_meta;
1777 connection->lock.unlock();
1778 int r = messenger->auth_client->get_auth_request(
1779 connection, am.get(),
1780 &am->auth_method, &preferred_modes, &bl);
1781 connection->lock.lock();
1782 if (state != AUTH_CONNECTING) {
1783 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1784 return _fault();
1785 }
1786 if (r < 0) {
1787 ldout(cct, 0) << __func__ << " get_initial_auth_request returned " << r
1788 << dendl;
1789 stop();
1790 connection->dispatch_queue->queue_reset(connection);
1791 return nullptr;
1792 }
1793
1794 INTERCEPT(9);
1795
1796 auto frame = AuthRequestFrame::Encode(auth_meta->auth_method, preferred_modes,
1797 bl);
1798 return WRITE(frame, "auth request", read_frame);
1799 }
1800
1801 CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
1802 ldout(cct, 20) << __func__
1803 << " payload.length()=" << payload.length() << dendl;
1804
1805 if (state != AUTH_CONNECTING) {
1806 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1807 return _fault();
1808 }
1809
1810 auto bad_method = AuthBadMethodFrame::Decode(payload);
1811 ldout(cct, 1) << __func__ << " method=" << bad_method.method()
1812 << " result " << cpp_strerror(bad_method.result())
1813 << ", allowed methods=" << bad_method.allowed_methods()
1814 << ", allowed modes=" << bad_method.allowed_modes()
1815 << dendl;
1816 ceph_assert(messenger->auth_client);
1817 auto am = auth_meta;
1818 connection->lock.unlock();
1819 int r = messenger->auth_client->handle_auth_bad_method(
1820 connection,
1821 am.get(),
1822 bad_method.method(), bad_method.result(),
1823 bad_method.allowed_methods(),
1824 bad_method.allowed_modes());
1825 connection->lock.lock();
1826 if (state != AUTH_CONNECTING || r < 0) {
1827 return _fault();
1828 }
1829 return send_auth_request(bad_method.allowed_methods());
1830 }
1831
1832 CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
1833 {
1834 ldout(cct, 20) << __func__
1835 << " payload.length()=" << payload.length() << dendl;
1836
1837 if (state != AUTH_CONNECTING) {
1838 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1839 return _fault();
1840 }
1841
1842 auto auth_more = AuthReplyMoreFrame::Decode(payload);
1843 ldout(cct, 5) << __func__
1844 << " auth reply more len=" << auth_more.auth_payload().length()
1845 << dendl;
1846 ceph_assert(messenger->auth_client);
1847 ceph::bufferlist reply;
1848 auto am = auth_meta;
1849 connection->lock.unlock();
1850 int r = messenger->auth_client->handle_auth_reply_more(
1851 connection, am.get(), auth_more.auth_payload(), &reply);
1852 connection->lock.lock();
1853 if (state != AUTH_CONNECTING) {
1854 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1855 return _fault();
1856 }
1857 if (r < 0) {
1858 lderr(cct) << __func__ << " auth_client handle_auth_reply_more returned "
1859 << r << dendl;
1860 return _fault();
1861 }
1862 auto more_reply = AuthRequestMoreFrame::Encode(reply);
1863 return WRITE(more_reply, "auth request more", read_frame);
1864 }
1865
1866 CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
1867 {
1868 ldout(cct, 20) << __func__
1869 << " payload.length()=" << payload.length() << dendl;
1870
1871 if (state != AUTH_CONNECTING) {
1872 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1873 return _fault();
1874 }
1875
1876 auto auth_done = AuthDoneFrame::Decode(payload);
1877
1878 ceph_assert(messenger->auth_client);
1879 auto am = auth_meta;
1880 connection->lock.unlock();
1881 int r = messenger->auth_client->handle_auth_done(
1882 connection,
1883 am.get(),
1884 auth_done.global_id(),
1885 auth_done.con_mode(),
1886 auth_done.auth_payload(),
1887 &am->session_key,
1888 &am->connection_secret);
1889 connection->lock.lock();
1890 if (state != AUTH_CONNECTING) {
1891 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1892 return _fault();
1893 }
1894 if (r < 0) {
1895 return _fault();
1896 }
1897 auth_meta->con_mode = auth_done.con_mode();
1898 session_stream_handlers = \
1899 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);
1900
1901 state = AUTH_CONNECTING_SIGN;
1902
1903 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1904 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
1905 auto sig_frame = AuthSignatureFrame::Encode(sig);
1906 pre_auth.enabled = false;
1907 pre_auth.rxbuf.clear();
1908 return WRITE(sig_frame, "auth signature", read_frame);
1909 }
1910
1911 CtPtr ProtocolV2::finish_client_auth() {
1912 if (!server_cookie) {
1913 ceph_assert(connect_seq == 0);
1914 state = SESSION_CONNECTING;
1915 return send_client_ident();
1916 } else { // reconnecting to previous session
1917 state = SESSION_RECONNECTING;
1918 ceph_assert(connect_seq > 0);
1919 return send_reconnect();
1920 }
1921 }
1922
1923 CtPtr ProtocolV2::send_client_ident() {
1924 ldout(cct, 20) << __func__ << dendl;
1925
1926 if (!connection->policy.lossy && !client_cookie) {
1927 client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1928 }
1929
1930 uint64_t flags = 0;
1931 if (connection->policy.lossy) {
1932 flags |= CEPH_MSG_CONNECT_LOSSY;
1933 }
1934
1935 auto client_ident = ClientIdentFrame::Encode(
1936 messenger->get_myaddrs(),
1937 connection->target_addr,
1938 messenger->get_myname().num(),
1939 global_seq,
1940 connection->policy.features_supported,
1941 connection->policy.features_required | msgr2_required,
1942 flags,
1943 client_cookie);
1944
1945 ldout(cct, 5) << __func__ << " sending identification: "
1946 << "addrs=" << messenger->get_myaddrs()
1947 << " target=" << connection->target_addr
1948 << " gid=" << messenger->get_myname().num()
1949 << " global_seq=" << global_seq
1950 << " features_supported=" << std::hex
1951 << connection->policy.features_supported
1952 << " features_required="
1953 << (connection->policy.features_required | msgr2_required)
1954 << " flags=" << flags
1955 << " cookie=" << client_cookie << std::dec << dendl;
1956
1957 INTERCEPT(11);
1958
1959 return WRITE(client_ident, "client ident", read_frame);
1960 }
1961
1962 CtPtr ProtocolV2::send_reconnect() {
1963 ldout(cct, 20) << __func__ << dendl;
1964
1965 auto reconnect = ReconnectFrame::Encode(messenger->get_myaddrs(),
1966 client_cookie,
1967 server_cookie,
1968 global_seq,
1969 connect_seq,
1970 in_seq);
1971
1972 ldout(cct, 5) << __func__ << " reconnect to session: client_cookie="
1973 << std::hex << client_cookie << " server_cookie="
1974 << server_cookie << std::dec
1975 << " gs=" << global_seq << " cs=" << connect_seq
1976 << " ms=" << in_seq << dendl;
1977
1978 INTERCEPT(13);
1979
1980 return WRITE(reconnect, "reconnect", read_frame);
1981 }
1982
1983 CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
1984 {
1985 ldout(cct, 20) << __func__
1986 << " payload.length()=" << payload.length() << dendl;
1987
1988 if (state != SESSION_CONNECTING) {
1989 lderr(cct) << __func__ << " not in session connect state!" << dendl;
1990 return _fault();
1991 }
1992
1993 auto ident_missing =
1994 IdentMissingFeaturesFrame::Decode(payload);
1995 lderr(cct) << __func__
1996 << " client does not support all server features: " << std::hex
1997 << ident_missing.features() << std::dec << dendl;
1998
1999 return _fault();
2000 }
2001
2002 CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
2003 {
2004 ldout(cct, 20) << __func__
2005 << " payload.length()=" << payload.length() << dendl;
2006
2007 if (state != SESSION_RECONNECTING) {
2008 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2009 return _fault();
2010 }
2011
2012 auto reset = ResetFrame::Decode(payload);
2013
2014 ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
2015 << dendl;
2016 if (reset.full()) {
2017 reset_session();
2018 } else {
2019 server_cookie = 0;
2020 connect_seq = 0;
2021 in_seq = 0;
2022 }
2023
2024 state = SESSION_CONNECTING;
2025 return send_client_ident();
2026 }
2027
2028 CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
2029 {
2030 ldout(cct, 20) << __func__
2031 << " payload.length()=" << payload.length() << dendl;
2032
2033 if (state != SESSION_RECONNECTING) {
2034 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2035 return _fault();
2036 }
2037
2038 auto retry = RetryFrame::Decode(payload);
2039 connect_seq = retry.connect_seq() + 1;
2040
2041 ldout(cct, 1) << __func__
2042 << " received session retry connect_seq=" << retry.connect_seq()
2043 << ", inc to cs=" << connect_seq << dendl;
2044
2045 return send_reconnect();
2046 }
2047
2048 CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
2049 {
2050 ldout(cct, 20) << __func__
2051 << " payload.length()=" << payload.length() << dendl;
2052
2053 if (state != SESSION_RECONNECTING) {
2054 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2055 return _fault();
2056 }
2057
2058 auto retry = RetryGlobalFrame::Decode(payload);
2059 global_seq = messenger->get_global_seq(retry.global_seq());
2060
2061 ldout(cct, 1) << __func__ << " received session retry global global_seq="
2062 << retry.global_seq() << ", choose new gs=" << global_seq
2063 << dendl;
2064
2065 return send_reconnect();
2066 }
2067
2068 CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) {
2069 ldout(cct, 20) << __func__
2070 << " received WAIT (connection race)"
2071 << " payload.length()=" << payload.length()
2072 << dendl;
2073
2074 if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
2075 lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
2076 return _fault();
2077 }
2078
2079 state = WAIT;
2080 WaitFrame::Decode(payload);
2081 return _fault();
2082 }
2083
2084 CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
2085 {
2086 ldout(cct, 20) << __func__
2087 << " payload.length()=" << payload.length() << dendl;
2088
2089 if (state != SESSION_RECONNECTING) {
2090 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2091 return _fault();
2092 }
2093
2094 auto reconnect_ok = ReconnectOkFrame::Decode(payload);
2095 ldout(cct, 5) << __func__
2096 << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
2097 << dendl;
2098
2099 out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
2100
2101 backoff = utime_t();
2102 ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
2103 << ", lossy = " << connection->policy.lossy << ", features "
2104 << connection->get_features() << dendl;
2105
2106 if (connection->delay_state) {
2107 ceph_assert(connection->delay_state->ready());
2108 }
2109
2110 connection->dispatch_queue->queue_connect(connection);
2111 messenger->ms_deliver_handle_fast_connect(connection);
2112
2113 return ready();
2114 }
2115
2116 CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
2117 {
2118 ldout(cct, 20) << __func__
2119 << " payload.length()=" << payload.length() << dendl;
2120
2121 if (state != SESSION_CONNECTING) {
2122 lderr(cct) << __func__ << " not in session connect state!" << dendl;
2123 return _fault();
2124 }
2125
2126 auto server_ident = ServerIdentFrame::Decode(payload);
2127 ldout(cct, 5) << __func__ << " received server identification:"
2128 << " addrs=" << server_ident.addrs()
2129 << " gid=" << server_ident.gid()
2130 << " global_seq=" << server_ident.global_seq()
2131 << " features_supported=" << std::hex
2132 << server_ident.supported_features()
2133 << " features_required=" << server_ident.required_features()
2134 << " flags=" << server_ident.flags() << " cookie=" << std::dec
2135 << server_ident.cookie() << dendl;
2136
2137 // is this who we intended to talk to?
2138 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2139 // of mon_host or something.
2140 if (!server_ident.addrs().contains(connection->target_addr)) {
2141 ldout(cct,1) << __func__ << " peer identifies as " << server_ident.addrs()
2142 << ", does not include " << connection->target_addr << dendl;
2143 return _fault();
2144 }
2145
2146 server_cookie = server_ident.cookie();
2147
2148 connection->set_peer_addrs(server_ident.addrs());
2149 peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid());
2150 connection->set_features(server_ident.supported_features() &
2151 connection->policy.features_supported);
2152 peer_global_seq = server_ident.global_seq();
2153
2154 connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
2155
2156 backoff = utime_t();
2157 ldout(cct, 10) << __func__ << " connect success " << connect_seq
2158 << ", lossy = " << connection->policy.lossy << ", features "
2159 << connection->get_features() << dendl;
2160
2161 if (connection->delay_state) {
2162 ceph_assert(connection->delay_state->ready());
2163 }
2164
2165 connection->dispatch_queue->queue_connect(connection);
2166 messenger->ms_deliver_handle_fast_connect(connection);
2167
2168 return ready();
2169 }
2170
2171 /* Server Protocol Methods */
2172
2173 CtPtr ProtocolV2::start_server_banner_exchange() {
2174 ldout(cct, 20) << __func__ << dendl;
2175
2176 INTERCEPT(2);
2177
2178 state = BANNER_ACCEPTING;
2179
2180 return _banner_exchange(CONTINUATION(post_server_banner_exchange));
2181 }
2182
2183 CtPtr ProtocolV2::post_server_banner_exchange() {
2184 ldout(cct, 20) << __func__ << dendl;
2185
2186 state = AUTH_ACCEPTING;
2187
2188 return CONTINUE(read_frame);
2189 }
2190
2191 CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
2192 ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
2193 << dendl;
2194
2195 if (state != AUTH_ACCEPTING) {
2196 lderr(cct) << __func__ << " not in auth accept state!" << dendl;
2197 return _fault();
2198 }
2199
2200 auto request = AuthRequestFrame::Decode(payload);
2201 ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
2202 << ", preferred_modes=" << request.preferred_modes()
2203 << ", payload_len=" << request.auth_payload().length() << ")"
2204 << dendl;
2205 auth_meta->auth_method = request.method();
2206 auth_meta->con_mode = messenger->auth_server->pick_con_mode(
2207 connection->get_peer_type(), auth_meta->auth_method,
2208 request.preferred_modes());
2209 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
2210 return _auth_bad_method(-EOPNOTSUPP);
2211 }
2212 return _handle_auth_request(request.auth_payload(), false);
2213 }
2214
2215 CtPtr ProtocolV2::_auth_bad_method(int r)
2216 {
2217 ceph_assert(r < 0);
2218 std::vector<uint32_t> allowed_methods;
2219 std::vector<uint32_t> allowed_modes;
2220 messenger->auth_server->get_supported_auth_methods(
2221 connection->get_peer_type(), &allowed_methods, &allowed_modes);
2222 ldout(cct, 1) << __func__ << " auth_method " << auth_meta->auth_method
2223 << " r " << cpp_strerror(r)
2224 << ", allowed_methods " << allowed_methods
2225 << ", allowed_modes " << allowed_modes
2226 << dendl;
2227 auto bad_method = AuthBadMethodFrame::Encode(auth_meta->auth_method, r,
2228 allowed_methods, allowed_modes);
2229 return WRITE(bad_method, "bad auth method", read_frame);
2230 }
2231
2232 CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
2233 {
2234 if (!messenger->auth_server) {
2235 return _fault();
2236 }
2237 bufferlist reply;
2238 auto am = auth_meta;
2239 connection->lock.unlock();
2240 int r = messenger->auth_server->handle_auth_request(
2241 connection, am.get(),
2242 more, am->auth_method, auth_payload,
2243 &reply);
2244 connection->lock.lock();
2245 if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
2246 ldout(cct, 1) << __func__
2247 << " state changed while accept, it must be mark_down"
2248 << dendl;
2249 ceph_assert(state == CLOSED);
2250 return _fault();
2251 }
2252 if (r == 1) {
2253 INTERCEPT(10);
2254 state = AUTH_ACCEPTING_SIGN;
2255
2256 auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
2257 auth_meta->con_mode,
2258 reply);
2259 return WRITE(auth_done, "auth done", finish_auth);
2260 } else if (r == 0) {
2261 state = AUTH_ACCEPTING_MORE;
2262
2263 auto more = AuthReplyMoreFrame::Encode(reply);
2264 return WRITE(more, "auth reply more", read_frame);
2265 } else if (r == -EBUSY) {
2266 // kick the client and maybe they'll come back later
2267 return _fault();
2268 } else {
2269 return _auth_bad_method(r);
2270 }
2271 }
2272
2273 CtPtr ProtocolV2::finish_auth()
2274 {
2275 ceph_assert(auth_meta);
2276 // TODO: having a possibility to check whether we're server or client could
2277 // allow reusing finish_auth().
2278 session_stream_handlers = \
2279 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
2280
2281 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
2282 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
2283 auto sig_frame = AuthSignatureFrame::Encode(sig);
2284 pre_auth.enabled = false;
2285 pre_auth.rxbuf.clear();
2286 return WRITE(sig_frame, "auth signature", read_frame);
2287 }
2288
2289 CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload)
2290 {
2291 ldout(cct, 20) << __func__
2292 << " payload.length()=" << payload.length() << dendl;
2293
2294 if (state != AUTH_ACCEPTING_MORE) {
2295 lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
2296 return _fault();
2297 }
2298
2299 auto auth_more = AuthRequestMoreFrame::Decode(payload);
2300 return _handle_auth_request(auth_more.auth_payload(), true);
2301 }
2302
2303 CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
2304 {
2305 ldout(cct, 20) << __func__
2306 << " payload.length()=" << payload.length() << dendl;
2307
2308 if (state != AUTH_ACCEPTING_SIGN && state != AUTH_CONNECTING_SIGN) {
2309 lderr(cct) << __func__
2310 << " pre-auth verification signature seen in wrong state!"
2311 << dendl;
2312 return _fault();
2313 }
2314
2315 auto sig_frame = AuthSignatureFrame::Decode(payload);
2316
2317 const auto actual_tx_sig = auth_meta->session_key.empty() ?
2318 sha256_digest_t() : auth_meta->session_key.hmac_sha256(cct, pre_auth.txbuf);
2319 if (sig_frame.signature() != actual_tx_sig) {
2320 ldout(cct, 2) << __func__ << " pre-auth signature mismatch"
2321 << " actual_tx_sig=" << actual_tx_sig
2322 << " sig_frame.signature()=" << sig_frame.signature()
2323 << dendl;
2324 return _fault();
2325 } else {
2326 ldout(cct, 20) << __func__ << " pre-auth signature success"
2327 << " sig_frame.signature()=" << sig_frame.signature()
2328 << dendl;
2329 pre_auth.txbuf.clear();
2330 }
2331
2332 if (state == AUTH_ACCEPTING_SIGN) {
2333 // server had sent AuthDone and client responded with correct pre-auth
2334 // signature. we can start accepting new sessions/reconnects.
2335 state = SESSION_ACCEPTING;
2336 return CONTINUE(read_frame);
2337 } else if (state == AUTH_CONNECTING_SIGN) {
2338 // this happened at client side
2339 return finish_client_auth();
2340 } else {
2341 ceph_abort("state corruption");
2342 }
2343 }
2344
// Server side: handle a CLIENT_IDENT frame, i.e. a request for a new session.
//
// Steps, in order:
//  - fault unless state is SESSION_ACCEPTING;
//  - fault if the client's address vector is empty/blank or if the address
//    it wants to reach is not one of ours;
//  - record the peer's addresses, name, id and client cookie;
//  - if the peer lacks required features, answer with IDENT_MISSING_FEATURES
//    and keep reading;
//  - unless this is an unregistered lossy client on a server policy, look up
//    an already registered connection to the same peer (connection->lock is
//    dropped around that lookup) and hand over to
//    handle_existing_connection() if one is found;
//  - otherwise reply via send_server_ident().
2345 CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
2346 {
2347 ldout(cct, 20) << __func__
2348 << " payload.length()=" << payload.length() << dendl;
2349
2350 if (state != SESSION_ACCEPTING) {
2351 lderr(cct) << __func__ << " not in session accept state!" << dendl;
2352 return _fault();
2353 }
2354
2355 auto client_ident = ClientIdentFrame::Decode(payload);
2356
2357 ldout(cct, 5) << __func__ << " received client identification:"
2358 << " addrs=" << client_ident.addrs()
2359 << " target=" << client_ident.target_addr()
2360 << " gid=" << client_ident.gid()
2361 << " global_seq=" << client_ident.global_seq()
2362 << " features_supported=" << std::hex
2363 << client_ident.supported_features()
2364 << " features_required=" << client_ident.required_features()
2365 << " flags=" << client_ident.flags()
2366 << " cookie=" << client_ident.cookie() << std::dec << dendl;
2367
// reject an empty address vector as well as one whose first entry is a
// default-constructed (blank) address
2368 if (client_ident.addrs().empty() ||
2369 client_ident.addrs().front() == entity_addr_t()) {
2370 ldout(cct,5) << __func__ << " oops, client_ident.addrs() is empty" << dendl;
2371 return _fault(); // a v2 peer should never do this
2372 }
// the address the client is trying to reach must be one of our own
2373 if (!messenger->get_myaddrs().contains(client_ident.target_addr())) {
2374 ldout(cct,5) << __func__ << " peer is trying to reach "
2375 << client_ident.target_addr()
2376 << " which is not us (" << messenger->get_myaddrs() << ")"
2377 << dendl;
2378 return _fault();
2379 }
2380
2381 connection->set_peer_addrs(client_ident.addrs());
2382 connection->target_addr = connection->_infer_target_addr(client_ident.addrs());
2383
2384 peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
2385 connection->set_peer_id(client_ident.gid());
2386
2387 client_cookie = client_ident.cookie();
2388
// features we require (policy requirements plus msgr2_required) that the
// client did not advertise as supported
2389 uint64_t feat_missing =
2390 (connection->policy.features_required | msgr2_required) &
2391 ~(uint64_t)client_ident.supported_features();
2392 if (feat_missing) {
2393 ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
2394 << feat_missing << std::dec << dendl;
2395 auto ident_missing_features =
2396 IdentMissingFeaturesFrame::Encode(feat_missing);
2397
2398 return WRITE(ident_missing_features, "ident missing features", read_frame);
2399 }
2400
// negotiated feature set: what the client supports and our policy supports
2401 connection_features =
2402 client_ident.supported_features() & connection->policy.features_supported;
2403
2404 peer_global_seq = client_ident.global_seq();
2405
2406 if (connection->policy.server &&
2407 connection->policy.lossy &&
2408 !connection->policy.register_lossy_clients) {
2409 // incoming lossy client, no need to register this connection
2410 } else {
2411 // Looks good so far, let's check if there is already an existing connection
2412 // to this peer.
// connection->lock is released for the lookup and re-acquired below
2413 connection->lock.unlock();
2414 AsyncConnectionRef existing = messenger->lookup_conn(
2415 *connection->peer_addrs);
2416
// an existing connection that does not speak protocol v2 is marked down and
// then ignored
2417 if (existing &&
2418 existing->protocol->proto_type != 2) {
2419 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2420 << existing->protocol.get() << " version is "
2421 << existing->protocol->proto_type << ", marking down"
2422 << dendl;
2423 existing->mark_down();
2424 existing = nullptr;
2425 }
2426
2427 connection->inject_delay();
2428
2429 connection->lock.lock();
// the state is re-checked because the lock was dropped above; the only
// change tolerated here is CLOSED
2430 if (state != SESSION_ACCEPTING) {
2431 ldout(cct, 1) << __func__
2432 << " state changed while accept, it must be mark_down"
2433 << dendl;
2434 ceph_assert(state == CLOSED);
2435 return _fault();
2436 }
2437
2438 if (existing) {
2439 return handle_existing_connection(existing);
2440 }
2441 }
2442
2443 // if everything is OK reply with server identification
2444 return send_server_ident();
2445 }
2446
// Server side: handle a RECONNECT frame, i.e. a request to resume a
// previously established session.
//
// After looking up the already registered connection to this peer
// (connection->lock is dropped around the lookup) the request is answered
// with one of:
//  - RESET(full=true)      no existing connection, or it is already CLOSED;
//  - RETRY_GLOBAL          existing is being replaced, or the client's
//                          global_seq is older than the one we recorded;
//  - RESET(resetcheck)     the client cookie does not match the existing one;
//  - RESET(full=false)     existing has no server cookie yet;
//  - RETRY                 the client's connect_seq is older than ours;
//  - WAIT                  reconnect race that the existing connection wins;
//  - otherwise the existing connection takes over this socket through
//    reuse_connection() with `reconnecting` set.
2447 CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
2448 {
2449 ldout(cct, 20) << __func__
2450 << " payload.length()=" << payload.length() << dendl;
2451
2452 if (state != SESSION_ACCEPTING) {
2453 lderr(cct) << __func__ << " not in session accept state!" << dendl;
2454 return _fault();
2455 }
2456
2457 auto reconnect = ReconnectFrame::Decode(payload);
2458
2459 ldout(cct, 5) << __func__
2460 << " received reconnect:"
2461 << " client_cookie=" << std::hex << reconnect.client_cookie()
2462 << " server_cookie=" << reconnect.server_cookie() << std::dec
2463 << " gs=" << reconnect.global_seq()
2464 << " cs=" << reconnect.connect_seq()
2465 << " ms=" << reconnect.msg_seq()
2466 << dendl;
2467
2468 // Should we check if one of the ident.addrs match connection->target_addr
2469 // as we do in ProtocolV1?
2470 connection->set_peer_addrs(reconnect.addrs());
2471 connection->target_addr = connection->_infer_target_addr(reconnect.addrs());
2472 peer_global_seq = reconnect.global_seq();
2473
// connection->lock is released for the lookup and re-acquired below
2474 connection->lock.unlock();
2475 AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2476
// an existing connection that does not speak protocol v2 is marked down and
// then ignored
2477 if (existing &&
2478 existing->protocol->proto_type != 2) {
2479 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2480 << existing->protocol.get() << " version is "
2481 << existing->protocol->proto_type << ", marking down" << dendl;
2482 existing->mark_down();
2483 existing = nullptr;
2484 }
2485
2486 connection->inject_delay();
2487
2488 connection->lock.lock();
// the state is re-checked because the lock was dropped above; the only
// change tolerated here is CLOSED
2489 if (state != SESSION_ACCEPTING) {
2490 ldout(cct, 1) << __func__
2491 << " state changed while accept, it must be mark_down"
2492 << dendl;
2493 ceph_assert(state == CLOSED);
2494 return _fault();
2495 }
2496
2497 if (!existing) {
2498 // there is no existing connection therefore cannot reconnect to previous
2499 // session
2500 ldout(cct, 0) << __func__
2501 << " no existing connection exists, reseting client" << dendl;
2502 auto reset = ResetFrame::Encode(true);
2503 return WRITE(reset, "session reset", read_frame);
2504 }
2505
// existing->lock is held for the rest of this function, in addition to
// connection->lock taken above
2506 std::lock_guard<std::mutex> l(existing->lock);
2507
2508 ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2509 if (!exproto) {
2510 ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2511 ceph_assert(false);
2512 }
2513
2514 if (exproto->state == CLOSED) {
2515 ldout(cct, 5) << __func__ << " existing " << existing
2516 << " already closed. Reseting client" << dendl;
2517 auto reset = ResetFrame::Encode(true);
2518 return WRITE(reset, "session reset", read_frame);
2519 }
2520
// a replacement of `existing` is already in flight: tell the client to retry
// with the global_seq we have on record
2521 if (exproto->replacing) {
2522 ldout(cct, 1) << __func__
2523 << " existing racing replace happened while replacing."
2524 << " existing=" << existing << dendl;
2525 auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2526 return WRITE(retry, "session retry", read_frame);
2527 }
2528
2529 if (exproto->client_cookie != reconnect.client_cookie()) {
2530 ldout(cct, 1) << __func__ << " existing=" << existing
2531 << " client cookie mismatch, I must have reseted:"
2532 << " cc=" << std::hex << exproto->client_cookie
2533 << " rcc=" << reconnect.client_cookie()
2534 << ", reseting client." << std::dec
2535 << dendl;
// whether the reset is a full one follows the policy's resetcheck setting
2536 auto reset = ResetFrame::Encode(connection->policy.resetcheck);
2537 return WRITE(reset, "session reset", read_frame);
2538 } else if (exproto->server_cookie == 0) {
2539 // this happens when:
2540 // - a connects to b
2541 // - a sends client_ident
2542 // - b gets client_ident, sends server_ident and sets cookie X
2543 // - connection fault
2544 // - b reconnects to a with cookie X, connect_seq=1
2545 // - a has cookie==0
2546 ldout(cct, 1) << __func__ << " I was a client and didn't received the"
2547 << " server_ident. Asking peer to resume session"
2548 << " establishment" << dendl;
2549 auto reset = ResetFrame::Encode(false);
2550 return WRITE(reset, "session reset", read_frame);
2551 }
2552
2553 if (exproto->peer_global_seq > reconnect.global_seq()) {
2554 ldout(cct, 5) << __func__
2555 << " stale global_seq: sgs=" << exproto->peer_global_seq
2556 << " cgs=" << reconnect.global_seq()
2557 << ", ask client to retry global" << dendl;
2558 auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2559
2560 INTERCEPT(18);
2561
2562 return WRITE(retry, "session retry", read_frame);
2563 }
2564
2565 if (exproto->connect_seq > reconnect.connect_seq()) {
2566 ldout(cct, 5) << __func__
2567 << " stale connect_seq scs=" << exproto->connect_seq
2568 << " ccs=" << reconnect.connect_seq()
2569 << " , ask client to retry" << dendl;
2570 auto retry = RetryFrame::Encode(exproto->connect_seq);
2571 return WRITE(retry, "session retry", read_frame);
2572 }
2573
2574 if (exproto->connect_seq == reconnect.connect_seq()) {
2575 // reconnect race: both peers are sending reconnect messages
// tie-break: existing wins only if the peer's msgr2 address compares greater
// than ours and existing does not use a server policy
2576 if (existing->peer_addrs->msgr2_addr() >
2577 messenger->get_myaddrs().msgr2_addr() &&
2578 !existing->policy.server) {
2579 // the existing connection wins
2580 ldout(cct, 1)
2581 << __func__
2582 << " reconnect race detected, this connection loses to existing="
2583 << existing << dendl;
2584
2585 auto wait = WaitFrame::Encode();
2586 return WRITE(wait, "wait", read_frame);
2587 } else {
2588 // this connection wins
2589 ldout(cct, 1) << __func__
2590 << " reconnect race detected, replacing existing="
2591 << existing << " socket by this connection's socket"
2592 << dendl;
2593 }
2594 }
2595
2596 ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
2597
// reuse_connection() reads this flag to tell a reconnect from a new session
2598 reconnecting = true;
2599
2600 // everything looks good
2601 exproto->connect_seq = reconnect.connect_seq();
2602 exproto->message_seq = reconnect.msg_seq();
2603
2604 return reuse_connection(existing, exproto);
2605 }
2606
// Server side: a client ident arrived while another connection to the same
// peer (`existing`) is already registered. Decide which of the two survives.
//
// Holds existing->lock for the whole function. Outcomes, checked in order:
//  - existing is CLOSED            -> send_server_ident() on this connection;
//  - existing is being replaced    -> send WAIT;
//  - our peer_global_seq is older  -> stop this connection, return nullptr;
//  - existing is lossy             -> stop existing, send_server_ident();
//  - cookies show a previous or interrupted session, or existing is
//    READY/STANDBY                 -> reuse_connection();
//  - otherwise it is a connection race, resolved by comparing msgr2
//    addresses and existing's server policy: reuse_connection() if this
//    connection wins, WAIT if existing wins.
2607 CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
2608 ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
2609
2610 std::lock_guard<std::mutex> l(existing->lock);
2611
2612 ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2613 if (!exproto) {
2614 ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2615 ceph_assert(false);
2616 }
2617
2618 if (exproto->state == CLOSED) {
2619 ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
2620 << dendl;
2621 return send_server_ident();
2622 }
2623
2624 if (exproto->replacing) {
2625 ldout(cct, 1) << __func__
2626 << " existing racing replace happened while replacing."
2627 << " existing=" << existing << dendl;
2628 auto wait = WaitFrame::Encode();
2629 return WRITE(wait, "wait", read_frame);
2630 }
2631
2632 if (exproto->peer_global_seq > peer_global_seq) {
2633 ldout(cct, 1) << __func__ << " this is a stale connection, peer_global_seq="
2634 << peer_global_seq
2635 << " existing->peer_global_seq=" << exproto->peer_global_seq
2636 << ", stopping this connection." << dendl;
2637 stop();
2638 connection->dispatch_queue->queue_reset(connection);
2639 return nullptr;
2640 }
2641
2642 if (existing->policy.lossy) {
2643 // existing connection can be thrown out in favor of this one
2644 ldout(cct, 1)
2645 << __func__ << " existing=" << existing
2646 << " is a lossy channel. Stopping existing in favor of this connection"
2647 << dendl;
2648 existing->protocol->stop();
2649 existing->dispatch_queue->queue_reset(existing.get());
2650 return send_server_ident();
2651 }
2652
2653 if (exproto->server_cookie && exproto->client_cookie &&
2654 exproto->client_cookie != client_cookie) {
2655 // Found previous session
2656 // peer has reset and we're going to reuse the existing connection
2657 // by replacing the communication socket
2658 ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2659 << ", peer must have reseted." << dendl;
// the existing session state is only reset when the policy asks for it
2660 if (connection->policy.resetcheck) {
2661 exproto->reset_session();
2662 }
2663 return reuse_connection(existing, exproto);
2664 }
2665
2666 if (exproto->client_cookie == client_cookie) {
2667 // session establishment interrupted between client_ident and server_ident,
2668 // continuing...
2669 ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2670 << ", continuing session establishment." << dendl;
2671 return reuse_connection(existing, exproto);
2672 }
2673
2674 if (exproto->state == READY || exproto->state == STANDBY) {
2675 ldout(cct, 1) << __func__ << " existing=" << existing
2676 << " is READY/STANDBY, lets reuse it" << dendl;
2677 return reuse_connection(existing, exproto);
2678 }
2679
2680 // Looks like a connection race: server and client are both connecting to
2681 // each other at the same time.
// tie-break: this connection wins if the peer's msgr2 address compares lower
// than ours, or if existing uses a server policy
2682 if (connection->peer_addrs->msgr2_addr() <
2683 messenger->get_myaddrs().msgr2_addr() ||
2684 existing->policy.server) {
2685 // this connection wins
2686 ldout(cct, 1) << __func__
2687 << " connection race detected, replacing existing="
2688 << existing << " socket by this connection's socket" << dendl;
2689 return reuse_connection(existing, exproto);
2690 } else {
2691 // the existing connection wins
2692 ldout(cct, 1)
2693 << __func__
2694 << " connection race detected, this connection loses to existing="
2695 << existing << dendl;
// the two addresses are asserted to differ strictly on this path
2696 ceph_assert(connection->peer_addrs->msgr2_addr() >
2697 messenger->get_myaddrs().msgr2_addr());
2698
2699 // make sure we follow through with opening the existing
2700 // connection (if it isn't yet open) since we know the peer
2701 // has something to send to us.
2702 existing->send_keepalive();
2703 auto wait = WaitFrame::Encode();
2704 return WRITE(wait, "wait", read_frame);
2705 }
2706 }
2707
// Server side: hand this connection's freshly accepted socket over to
// `existing` (whose protocol object is `exproto`) and retire this connection.
//
// Callers in this file invoke it with existing->lock held. The function:
//  1. unregisters this connection's socket from its event center and moves
//     the socket and the session stream handlers out of this connection;
//  2. copies the session identity into exproto (cookie, peer name and
//     features only when this is not a reconnect);
//  3. stops this connection and queues a reset for it;
//  4. marks exproto as `replacing` with state NONE and submits
//     `deactivate_existing` to existing's current event center, which swaps
//     the socket/worker/center and then runs `transfer_existing` on the new
//     center to resume the handshake (server ident or reconnect ok).
//
// Always returns nullptr: nothing further runs on this connection.
2708 CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
2709 ProtocolV2 *exproto) {
2710 ldout(cct, 20) << __func__ << " existing=" << existing
2711 << " reconnect=" << reconnecting << dendl;
2712
2713 connection->inject_delay();
2714
// existing->write_lock is held until this function returns
2715 std::lock_guard<std::mutex> l(existing->write_lock);
2716
// stop watching our socket: it is about to be handed over to `existing`
2717 connection->center->delete_file_event(connection->cs.fd(),
2718 EVENT_READABLE | EVENT_WRITABLE);
2719
2720 if (existing->delay_state) {
2721 existing->delay_state->flush();
2722 ceph_assert(!connection->delay_state);
2723 }
2724 exproto->reset_recv_state();
2725 exproto->pre_auth.enabled = false;
2726
// for a new session (not a reconnect) existing adopts the identity that was
// negotiated on this connection
2727 if (!reconnecting) {
2728 exproto->client_cookie = client_cookie;
2729 exproto->peer_name = peer_name;
2730 exproto->connection_features = connection_features;
2731 existing->set_features(connection_features);
2732 }
2733 exproto->peer_global_seq = peer_global_seq;
2734
2735 ceph_assert(connection->center->in_thread());
// take everything we need out of this connection before stop() below
2736 auto temp_cs = std::move(connection->cs);
2737 EventCenter *new_center = connection->center;
2738 Worker *new_worker = connection->worker;
2739 // we can steal the session_stream_handlers under the assumption
2740 // this happens in the event center's thread as there should be
2741 // no user outside its boundaries (similarly to e.g. outgoing_bl).
2742 auto temp_stream_handlers = std::move(session_stream_handlers);
2743 exproto->auth_meta = auth_meta;
2744
2745 ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
2746 << dendl;
2747
2748 // avoid _stop shutdown replacing socket
2749 // queue a reset on the new connection, which we're dumping for the old
2750 stop();
2751
2752 connection->dispatch_queue->queue_reset(connection);
2753
// park `existing`: no writes, flagged as being replaced, state NONE
2754 exproto->can_write = false;
2755 exproto->write_in_progress = false;
2756 exproto->reconnecting = reconnecting;
2757 exproto->replacing = true;
2758 existing->state_offset = 0;
2759 // avoid previous thread modify event
2760 exproto->state = NONE;
2761 existing->state = AsyncConnection::STATE_NONE;
2762 // Discard existing prefetch buffer in `recv_buf`
2763 existing->recv_start = existing->recv_end = 0;
2764 // there shouldn't exist any buffer
2765 ceph_assert(connection->recv_start == connection->recv_end);
2766
// Step 1, submitted to existing's current center: swap in the new socket,
// worker and center. The stolen socket is bound as the lambda's argument.
2767 auto deactivate_existing = std::bind(
2768 [ existing,
2769 new_worker,
2770 new_center,
2771 exproto,
2772 temp_stream_handlers=std::move(temp_stream_handlers)
2773 ](ConnectedSocket &cs) mutable {
2774 // we need to delete time event in original thread
2775 {
2776 std::lock_guard<std::mutex> l(existing->lock);
2777 existing->write_lock.lock();
2778 exproto->requeue_sent();
2779 // XXX: do we really need the locking for `outgoing_bl`? There is
2780 // a comment just above its definition saying "lockfree, only used
2781 // in own thread". I'm following lockfull schema just in the case.
2782 // From performance point of view it should be fine – this happens
2783 // far away from hot paths.
2784 existing->outgoing_bl.clear();
2785 existing->open_write = false;
2786 exproto->session_stream_handlers = std::move(temp_stream_handlers);
2787 existing->write_lock.unlock();
// state is still NONE: nobody interfered, so complete the swap
2788 if (exproto->state == NONE) {
2789 existing->shutdown_socket();
2790 existing->cs = std::move(cs);
2791 existing->worker->references--;
2792 new_worker->references++;
2793 existing->logger = new_worker->get_perf_counter();
2794 existing->worker = new_worker;
2795 existing->center = new_center;
2796 if (existing->delay_state)
2797 existing->delay_state->set_center(new_center);
2798 } else if (exproto->state == CLOSED) {
// existing was closed in the meantime: close the stolen socket via a task
// submitted to new_center and give up
2799 auto back_to_close = std::bind(
2800 [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
2801 new_center->submit_to(new_center->get_id(),
2802 std::move(back_to_close), true);
2803 return;
2804 } else {
2805 ceph_abort();
2806 }
2807 }
2808
2809 // Before changing existing->center, there may already be some
2810 // events in existing->center's queue. Then if we mark down
2811 // `existing`, it will execute in another thread and clean up the
2812 // connection. A previous event would then cause a segmentation fault.
// Step 2, run on existing's (new) center: re-arm timers and events and
// resume the handshake on the swapped-in socket.
2813 auto transfer_existing = [existing, exproto]() mutable {
2814 std::lock_guard<std::mutex> l(existing->lock);
2815 if (exproto->state == CLOSED) return;
2816 ceph_assert(exproto->state == NONE);
2817
2818 exproto->state = SESSION_ACCEPTING;
2819 // we have called shutdown_socket above
2820 ceph_assert(existing->last_tick_id == 0);
2821 // restart timer since we are going to re-build connection
2822 existing->last_connect_started = ceph::coarse_mono_clock::now();
2823 existing->last_tick_id = existing->center->create_time_event(
2824 existing->connect_timeout_us, existing->tick_handler);
2825 existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
2826 existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
2827 existing->read_handler);
// a new session is answered with server ident, a reconnect with reconnect ok
2828 if (!exproto->reconnecting) {
2829 exproto->run_continuation(exproto->send_server_ident());
2830 } else {
2831 exproto->run_continuation(exproto->send_reconnect_ok());
2832 }
2833 };
// run inline when already on existing's (now swapped) center thread,
// otherwise submit it to that center
2834 if (existing->center->in_thread())
2835 transfer_existing();
2836 else
2837 existing->center->submit_to(existing->center->get_id(),
2838 std::move(transfer_existing), true);
2839 },
2840 std::move(temp_cs));
2841
// NOTE(review): the trailing `true` is passed to submit_to() unchanged; its
// meaning is defined by EventCenter::submit_to, not visible here
2842 existing->center->submit_to(existing->center->get_id(),
2843 std::move(deactivate_existing), true);
2844 return nullptr;
2845 }
2846
// Server side: accept the new session. Resets the sequence state, picks a
// server cookie for non-lossy sessions, registers the connection with the
// messenger (connection->lock is dropped around accept_conn()), notifies the
// dispatchers and writes the SERVER_IDENT frame, continuing with
// server_ready. Faults if registration fails or if the state changed while
// the lock was released.
2847 CtPtr ProtocolV2::send_server_ident() {
2848 ldout(cct, 20) << __func__ << dendl;
2849
2850 // this is required for the case when this connection is being replaced
2851 out_seq = discard_requeued_up_to(out_seq, 0);
2852 in_seq = 0;
2853
// only non-lossy sessions get a fresh, non-zero server cookie
2854 if (!connection->policy.lossy) {
2855 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
2856 }
2857
2858 uint64_t flags = 0;
2859 if (connection->policy.lossy) {
2860 flags = flags | CEPH_MSG_CONNECT_LOSSY;
2861 }
2862
// the frame is encoded now, but written only after registration succeeded
2863 uint64_t gs = messenger->get_global_seq();
2864 auto server_ident = ServerIdentFrame::Encode(
2865 messenger->get_myaddrs(),
2866 messenger->get_myname().num(),
2867 gs,
2868 connection->policy.features_supported,
2869 connection->policy.features_required | msgr2_required,
2870 flags,
2871 server_cookie);
2872
2873 ldout(cct, 5) << __func__ << " sending identification:"
2874 << " addrs=" << messenger->get_myaddrs()
2875 << " gid=" << messenger->get_myname().num()
2876 << " global_seq=" << gs << " features_supported=" << std::hex
2877 << connection->policy.features_supported
2878 << " features_required="
2879 << (connection->policy.features_required | msgr2_required)
2880 << " flags=" << flags << " cookie=" << std::dec << server_cookie
2881 << dendl;
2882
2883 connection->lock.unlock();
2884 // Because "replacing" will prevent other connections preempt this addr,
2885 // it's safe that here we don't acquire Connection's lock
2886 ssize_t r = messenger->accept_conn(connection);
2887
2888 connection->inject_delay();
2889
2890 connection->lock.lock();
2891
// a negative result from accept_conn() is treated as losing a replacing race
2892 if (r < 0) {
2893 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2894 << connection->peer_addrs->msgr2_addr()
2895 << " just fail later one(this)" << dendl;
2896 connection->inject_delay();
2897 return _fault();
2898 }
// the state is re-checked because the lock was dropped above; undo the
// registration made by accept_conn() before faulting
2899 if (state != SESSION_ACCEPTING) {
2900 ldout(cct, 1) << __func__
2901 << " state changed while accept_conn, it must be mark_down"
2902 << dendl;
2903 ceph_assert(state == CLOSED || state == NONE);
2904 messenger->unregister_conn(connection);
2905 connection->inject_delay();
2906 return _fault();
2907 }
2908
2909 connection->set_features(connection_features);
2910
2911 // notify
2912 connection->dispatch_queue->queue_accept(connection);
2913 messenger->ms_deliver_handle_fast_accept(connection);
2914
2915 INTERCEPT(12);
2916
2917 return WRITE(server_ident, "server ident", server_ready);
2918 }
2919
2920 CtPtr ProtocolV2::server_ready() {
2921 ldout(cct, 20) << __func__ << dendl;
2922
2923 if (connection->delay_state) {
2924 ceph_assert(connection->delay_state->ready());
2925 }
2926
2927 return ready();
2928 }
2929
2930 CtPtr ProtocolV2::send_reconnect_ok() {
2931 ldout(cct, 20) << __func__ << dendl;
2932
2933 out_seq = discard_requeued_up_to(out_seq, message_seq);
2934
2935 uint64_t ms = in_seq;
2936 auto reconnect_ok = ReconnectOkFrame::Encode(ms);
2937
2938 ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
2939
2940 connection->lock.unlock();
2941 // Because "replacing" will prevent other connections preempt this addr,
2942 // it's safe that here we don't acquire Connection's lock
2943 ssize_t r = messenger->accept_conn(connection);
2944
2945 connection->inject_delay();
2946
2947 connection->lock.lock();
2948
2949 if (r < 0) {
2950 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2951 << connection->peer_addrs->msgr2_addr()
2952 << " just fail later one(this)" << dendl;
2953 connection->inject_delay();
2954 return _fault();
2955 }
2956 if (state != SESSION_ACCEPTING) {
2957 ldout(cct, 1) << __func__
2958 << " state changed while accept_conn, it must be mark_down"
2959 << dendl;
2960 ceph_assert(state == CLOSED || state == NONE);
2961 messenger->unregister_conn(connection);
2962 connection->inject_delay();
2963 return _fault();
2964 }
2965
2966 // notify
2967 connection->dispatch_queue->queue_accept(connection);
2968 messenger->ms_deliver_handle_fast_accept(connection);
2969
2970 INTERCEPT(14);
2971
2972 return WRITE(reconnect_ok, "reconnect ok", server_ready);
2973 }