]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/ProtocolV2.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <type_traits>
5
6#include "ProtocolV2.h"
7#include "AsyncMessenger.h"
8
9#include "common/EventTrace.h"
10#include "common/ceph_crypto.h"
11#include "common/errno.h"
12#include "include/random.h"
13#include "auth/AuthClient.h"
14#include "auth/AuthServer.h"
15
16#define dout_subsys ceph_subsys_ms
17#undef dout_prefix
18#define dout_prefix _conn_prefix(_dout)
19ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
20 return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
21 << *connection->peer_addrs << " conn(" << connection << " "
22 << this
23 << " " << ceph_con_mode_name(auth_meta->con_mode)
24 << " :" << connection->port
25 << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq << " l=" << connection->policy.lossy
27 << " rx=" << session_stream_handlers.rx.get()
28 << " tx=" << session_stream_handlers.tx.get()
29 << ").";
30}
31
32using namespace ceph::msgr::v2;
33
34using CtPtr = Ct<ProtocolV2> *;
35using CtRef = Ct<ProtocolV2> &;
36
37void ProtocolV2::run_continuation(CtPtr pcontinuation) {
38 if (pcontinuation) {
39 run_continuation(*pcontinuation);
40 }
41}
42
43void ProtocolV2::run_continuation(CtRef continuation) {
44 try {
45 CONTINUATION_RUN(continuation)
46 } catch (const buffer::error &e) {
47 lderr(cct) << __func__ << " failed decoding of frame header: " << e
48 << dendl;
49 _fault();
50 } catch (const ceph::crypto::onwire::MsgAuthError &e) {
51 lderr(cct) << __func__ << " " << e.what() << dendl;
52 _fault();
53 } catch (const DecryptionError &) {
54 lderr(cct) << __func__ << " failed to decrypt frame payload" << dendl;
55 }
56}
57
// Writes `buf` (named `desc` in log messages) and resumes at continuation
// `cont` once the write completes.
#define WRITE(buf, desc, cont) write(desc, CONTINUATION(cont), buf)

// Reads `len` bytes into a freshly allocated buffer and resumes at `cont`.
#define READ(len, cont) \
  read(CONTINUATION(cont), buffer::ptr_node::create(buffer::create(len)))

// Reads into the caller-supplied rx buffer `rxbuf` and resumes at `cont`.
#define READ_RXBUF(rxbuf, cont) read(CONTINUATION(cont), rxbuf)

#ifdef UNIT_TESTS_BUILT

// Test hook for unit-test builds: a registered interceptor may inject a fault
// or a stop at numbered protocol checkpoint `step`. Expands to a block that
// may `return`, so it is only usable inside functions returning CtPtr.
#define INTERCEPT(step)                                                     \
  {                                                                         \
    if (connection->interceptor) {                                          \
      auto action = connection->interceptor->intercept(connection, (step)); \
      if (action == Interceptor::ACTION::STOP) {                            \
        stop();                                                             \
        connection->dispatch_queue->queue_reset(connection);                \
        return nullptr;                                                     \
      }                                                                     \
      if (action == Interceptor::ACTION::FAIL) {                            \
        return _fault();                                                    \
      }                                                                     \
    }                                                                       \
  }

#else
#define INTERCEPT(step)
#endif
80
81ProtocolV2::ProtocolV2(AsyncConnection *connection)
82 : Protocol(2, connection),
83 state(NONE),
84 peer_required_features(0),
85 client_cookie(0),
86 server_cookie(0),
87 global_seq(0),
88 connect_seq(0),
89 peer_global_seq(0),
90 message_seq(0),
91 reconnecting(false),
92 replacing(false),
93 can_write(false),
94 bannerExchangeCallback(nullptr),
95 next_tag(static_cast<Tag>(0)),
96 keepalive(false) {
97}
98
99ProtocolV2::~ProtocolV2() {
100}
101
102void ProtocolV2::connect() {
103 ldout(cct, 1) << __func__ << dendl;
104 state = START_CONNECT;
105 pre_auth.enabled = true;
106}
107
108void ProtocolV2::accept() {
109 ldout(cct, 1) << __func__ << dendl;
110 state = START_ACCEPT;
111}
112
113bool ProtocolV2::is_connected() { return can_write; }
114
115/*
116 * Tears down the message queues, and removes them from the
117 * DispatchQueue Must hold write_lock prior to calling.
118 */
119void ProtocolV2::discard_out_queue() {
120 ldout(cct, 10) << __func__ << " started" << dendl;
121
122 for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
123 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
124 (*p)->put();
125 }
126 sent.clear();
127 for (auto& [ prio, entries ] : out_queue) {
128 static_cast<void>(prio);
129 for (auto& entry : entries) {
130 ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
131 entry.m->put();
132 }
133 }
134 out_queue.clear();
135}
136
137void ProtocolV2::reset_session() {
138 ldout(cct, 1) << __func__ << dendl;
139
140 std::lock_guard<std::mutex> l(connection->write_lock);
141 if (connection->delay_state) {
142 connection->delay_state->discard();
143 }
144
145 connection->dispatch_queue->discard_queue(connection->conn_id);
146 discard_out_queue();
147 connection->outcoming_bl.clear();
148
149 connection->dispatch_queue->queue_remote_reset(connection);
150
151 out_seq = 0;
152 in_seq = 0;
153 client_cookie = 0;
154 server_cookie = 0;
155 connect_seq = 0;
156 peer_global_seq = 0;
157 message_seq = 0;
158 ack_left = 0;
159 can_write = false;
160}
161
162void ProtocolV2::stop() {
163 ldout(cct, 1) << __func__ << dendl;
164 if (state == CLOSED) {
165 return;
166 }
167
168 if (connection->delay_state) connection->delay_state->flush();
169
170 std::lock_guard<std::mutex> l(connection->write_lock);
171
172 reset_recv_state();
173 discard_out_queue();
174
175 connection->_stop();
176
177 can_write = false;
178 state = CLOSED;
179}
180
181void ProtocolV2::fault() { _fault(); }
182
183void ProtocolV2::requeue_sent() {
184 if (sent.empty()) {
185 return;
186 }
187
188 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
189 out_seq -= sent.size();
190 while (!sent.empty()) {
191 Message *m = sent.back();
192 sent.pop_back();
193 ldout(cct, 5) << __func__ << " requeueing message m=" << m
194 << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
195 << *m << dendl;
196 rq.emplace_front(out_queue_entry_t{false, m});
197 }
198}
199
200uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
201 ldout(cct, 10) << __func__ << " " << seq << dendl;
202 std::lock_guard<std::mutex> l(connection->write_lock);
203 if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
204 return seq;
205 }
206 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
207 uint64_t count = out_seq;
208 while (!rq.empty()) {
209 Message* const m = rq.front().m;
210 if (m->get_seq() == 0 || m->get_seq() > seq) break;
211 ldout(cct, 5) << __func__ << " discarding message m=" << m
212 << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
213 << *m << dendl;
214 m->put();
215 rq.pop_front();
216 count++;
217 }
218 if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST);
219 return count;
220}
221
222void ProtocolV2::reset_recv_state() {
223 if ((state >= AUTH_CONNECTING && state <= SESSION_RECONNECTING) ||
224 state == READY) {
225 auth_meta.reset(new AuthConnectionMeta);
226 session_stream_handlers.tx.reset(nullptr);
227 session_stream_handlers.rx.reset(nullptr);
228 pre_auth.txbuf.clear();
229 pre_auth.rxbuf.clear();
230 }
231
232 // clean read and write callbacks
233 connection->pendingReadLen.reset();
234 connection->writeCallback.reset();
235
236 next_tag = static_cast<Tag>(0);
237
238 reset_throttle();
239}
240
241size_t ProtocolV2::get_current_msg_size() const {
242 ceph_assert(!rx_segments_desc.empty());
243 size_t sum = 0;
244 // we don't include SegmentIndex::Msg::HEADER.
245 for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
246 sum += rx_segments_desc[idx].length;
247 }
248 return sum;
249}
250
251void ProtocolV2::reset_throttle() {
252 if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
253 connection->policy.throttler_messages) {
254 ldout(cct, 10) << __func__ << " releasing " << 1
255 << " message to policy throttler "
256 << connection->policy.throttler_messages->get_current()
257 << "/" << connection->policy.throttler_messages->get_max()
258 << dendl;
259 connection->policy.throttler_messages->put();
260 }
261 if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
262 if (connection->policy.throttler_bytes) {
263 const size_t cur_msg_size = get_current_msg_size();
264 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
265 << " bytes to policy throttler "
266 << connection->policy.throttler_bytes->get_current() << "/"
267 << connection->policy.throttler_bytes->get_max() << dendl;
268 connection->policy.throttler_bytes->put(cur_msg_size);
269 }
270 }
271 if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
272 const size_t cur_msg_size = get_current_msg_size();
273 ldout(cct, 10)
274 << __func__ << " releasing " << cur_msg_size
275 << " bytes to dispatch_queue throttler "
276 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
277 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
278 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
279 }
280}
281
282CtPtr ProtocolV2::_fault() {
283 ldout(cct, 10) << __func__ << dendl;
284
285 if (state == CLOSED || state == NONE) {
286 ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
287 return nullptr;
288 }
289
290 if (connection->policy.lossy &&
291 !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
292 ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
293 stop();
294 connection->dispatch_queue->queue_reset(connection);
295 return nullptr;
296 }
297
298 connection->write_lock.lock();
299
300 can_write = false;
301 // requeue sent items
302 requeue_sent();
303
304 if (out_queue.empty() && state >= START_ACCEPT &&
305 state <= SESSION_ACCEPTING && !replacing) {
306 ldout(cct, 2) << __func__ << " with nothing to send and in the half "
307 << " accept state just closed" << dendl;
308 connection->write_lock.unlock();
309 stop();
310 connection->dispatch_queue->queue_reset(connection);
311 return nullptr;
312 }
313
314 replacing = false;
315 connection->fault();
316 reset_recv_state();
317
318 reconnecting = false;
319
320 if (connection->policy.standby && out_queue.empty() && !keepalive &&
321 state != WAIT) {
322 ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
323 << dendl;
324 state = STANDBY;
325 connection->write_lock.unlock();
326 return nullptr;
327 }
328 if (connection->policy.server) {
329 ldout(cct, 1) << __func__ << " server, going to standby, even though i have stuff queued" << dendl;
330 state = STANDBY;
331 connection->write_lock.unlock();
332 return nullptr;
333 }
334
335 connection->write_lock.unlock();
336
337 if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
338 state != WAIT &&
339 state != SESSION_ACCEPTING /* due to connection race */) {
340 // policy maybe empty when state is in accept
341 if (connection->policy.server) {
342 ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
343 state = STANDBY;
344 } else {
345 ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
346 connect_seq++;
347 global_seq = messenger->get_global_seq();
348 state = START_CONNECT;
349 pre_auth.enabled = true;
350 connection->state = AsyncConnection::STATE_CONNECTING;
351 }
352 backoff = utime_t();
353 connection->center->dispatch_event_external(connection->read_handler);
354 } else {
355 if (state == WAIT) {
356 backoff.set_from_double(cct->_conf->ms_max_backoff);
357 } else if (backoff == utime_t()) {
358 backoff.set_from_double(cct->_conf->ms_initial_backoff);
359 } else {
360 backoff += backoff;
361 if (backoff > cct->_conf->ms_max_backoff)
362 backoff.set_from_double(cct->_conf->ms_max_backoff);
363 }
364
365 if (server_cookie) {
366 connect_seq++;
367 }
368
369 global_seq = messenger->get_global_seq();
370 state = START_CONNECT;
371 pre_auth.enabled = true;
372 connection->state = AsyncConnection::STATE_CONNECTING;
373 ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
374 // woke up again;
375 connection->register_time_events.insert(
376 connection->center->create_time_event(backoff.to_nsec() / 1000,
377 connection->wakeup_handler));
378 }
379 return nullptr;
380}
381
382void ProtocolV2::prepare_send_message(uint64_t features,
383 Message *m) {
384 ldout(cct, 20) << __func__ << " m=" << *m << dendl;
385
386 // associate message with Connection (for benefit of encode_payload)
387 if (m->empty_payload()) {
388 ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
389 << " " << *m << dendl;
390 } else {
391 ldout(cct, 20) << __func__ << " half-reencoding features " << features
392 << " " << m << " " << *m << dendl;
393 }
394
395 // encode and copy out of *m
396 m->encode(features, 0);
397}
398
399void ProtocolV2::send_message(Message *m) {
400 uint64_t f = connection->get_features();
401
402 // TODO: Currently not all messages supports reencode like MOSDMap, so here
403 // only let fast dispatch support messages prepare message
404 const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
405 if (can_fast_prepare) {
406 prepare_send_message(f, m);
407 }
408
409 std::lock_guard<std::mutex> l(connection->write_lock);
410 bool is_prepared = can_fast_prepare;
411 // "features" changes will change the payload encoding
412 if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
413 // ensure the correctness of message encoding
414 m->clear_payload();
415 is_prepared = false;
416 ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
417 << " != " << connection->get_features() << dendl;
418 }
419 if (state == CLOSED) {
420 ldout(cct, 10) << __func__ << " connection closed."
421 << " Drop message " << m << dendl;
422 m->put();
423 } else {
424 ldout(cct, 5) << __func__ << " enqueueing message m=" << m
425 << " type=" << m->get_type() << " " << *m << dendl;
426 m->trace.event("async enqueueing message");
427 out_queue[m->get_priority()].emplace_back(
428 out_queue_entry_t{is_prepared, m});
429 ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
430 << dendl;
431 if ((!replacing && can_write) || state == STANDBY) {
432 connection->center->dispatch_event_external(connection->write_handler);
433 }
434 }
435}
436
437void ProtocolV2::send_keepalive() {
438 ldout(cct, 10) << __func__ << dendl;
439 std::lock_guard<std::mutex> l(connection->write_lock);
440 if (state != CLOSED) {
441 keepalive = true;
442 connection->center->dispatch_event_external(connection->write_handler);
443 }
444}
445
446void ProtocolV2::read_event() {
447 ldout(cct, 20) << __func__ << dendl;
448
449 switch (state) {
450 case START_CONNECT:
451 run_continuation(CONTINUATION(start_client_banner_exchange));
452 break;
453 case START_ACCEPT:
454 run_continuation(CONTINUATION(start_server_banner_exchange));
455 break;
456 case READY:
457 run_continuation(CONTINUATION(read_frame));
458 break;
459 case THROTTLE_MESSAGE:
460 run_continuation(CONTINUATION(throttle_message));
461 break;
462 case THROTTLE_BYTES:
463 run_continuation(CONTINUATION(throttle_bytes));
464 break;
465 case THROTTLE_DISPATCH_QUEUE:
466 run_continuation(CONTINUATION(throttle_dispatch_queue));
467 break;
468 default:
469 break;
470 }
471}
472
473ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
474 out_queue_entry_t out_entry;
475
476 if (!out_queue.empty()) {
477 auto it = out_queue.rbegin();
478 auto& entries = it->second;
479 ceph_assert(!entries.empty());
480 out_entry = entries.front();
481 entries.pop_front();
482 if (entries.empty()) {
483 out_queue.erase(it->first);
484 }
485 }
486 return out_entry;
487}
488
489ssize_t ProtocolV2::write_message(Message *m, bool more) {
490 FUNCTRACE(cct);
491 ceph_assert(connection->center->in_thread());
492 m->set_seq(++out_seq);
493
494 connection->lock.lock();
495 uint64_t ack_seq = in_seq;
496 ack_left = 0;
497 connection->lock.unlock();
498
499 ceph_msg_header &header = m->get_header();
500 ceph_msg_footer &footer = m->get_footer();
501
502 ceph_msg_header2 header2{header.seq, header.tid,
503 header.type, header.priority,
504 header.version,
505 0, header.data_off,
506 ack_seq,
507 footer.flags, header.compat_version,
508 header.reserved};
509
510 auto message = MessageFrame::Encode(
511 header2,
512 m->get_payload(),
513 m->get_middle(),
514 m->get_data());
515 connection->outcoming_bl.append(message.get_buffer(session_stream_handlers));
516
517 ldout(cct, 5) << __func__ << " sending message m=" << m
518 << " seq=" << m->get_seq() << " " << *m << dendl;
519
520 m->trace.event("async writing message");
521 ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
522 << " src=" << entity_name_t(messenger->get_myname())
523 << " off=" << header2.data_off
524 << dendl;
525 ssize_t total_send_size = connection->outcoming_bl.length();
526 ssize_t rc = connection->_try_send(more);
527 if (rc < 0) {
528 ldout(cct, 1) << __func__ << " error sending " << m << ", "
529 << cpp_strerror(rc) << dendl;
530 } else {
531 connection->logger->inc(
532 l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
533 ldout(cct, 10) << __func__ << " sending " << m
534 << (rc ? " continuely." : " done.") << dendl;
535 }
536 if (m->get_type() == CEPH_MSG_OSD_OP)
537 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
538 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
539 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
540 m->put();
541
542 return rc;
543}
544
545void ProtocolV2::append_keepalive() {
546 ldout(cct, 10) << __func__ << dendl;
547 auto keepalive_frame = KeepAliveFrame::Encode();
548 connection->outcoming_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
549}
550
551void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
552 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
553 connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
554}
555
556void ProtocolV2::handle_message_ack(uint64_t seq) {
557 if (connection->policy.lossy) { // lossy connections don't keep sent messages
558 return;
559 }
560
561 ldout(cct, 15) << __func__ << " seq=" << seq << dendl;
562
563 // trim sent list
564 static const int max_pending = 128;
565 int i = 0;
566 Message *pending[max_pending];
567 connection->write_lock.lock();
568 while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
569 Message *m = sent.front();
570 sent.pop_front();
571 pending[i++] = m;
572 ldout(cct, 10) << __func__ << " got ack seq " << seq
573 << " >= " << m->get_seq() << " on " << m << " " << *m
574 << dendl;
575 }
576 connection->write_lock.unlock();
577 for (int k = 0; k < i; k++) {
578 pending[k]->put();
579 }
580}
581
582void ProtocolV2::write_event() {
583 ldout(cct, 10) << __func__ << dendl;
584 ssize_t r = 0;
585
586 connection->write_lock.lock();
587 if (can_write) {
588 if (keepalive) {
589 append_keepalive();
590 keepalive = false;
591 }
592
593 auto start = ceph::mono_clock::now();
594 bool more;
595 do {
596 const auto out_entry = _get_next_outgoing();
597 if (!out_entry.m) {
598 break;
599 }
600
601 if (!connection->policy.lossy) {
602 // put on sent list
603 sent.push_back(out_entry.m);
604 out_entry.m->get();
605 }
606 more = !out_queue.empty();
607 connection->write_lock.unlock();
608
609 // send_message or requeue messages may not encode message
610 if (!out_entry.is_prepared) {
611 prepare_send_message(connection->get_features(), out_entry.m);
612 }
613
614 r = write_message(out_entry.m, more);
615
616 connection->write_lock.lock();
617 if (r == 0) {
618 ;
619 } else if (r < 0) {
620 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
621 break;
622 } else if (r > 0)
623 break;
624 } while (can_write);
625
626 // if r > 0 mean data still lefted, so no need _try_send.
627 if (r == 0) {
628 uint64_t left = ack_left;
629 if (left) {
630 ceph_le64 s;
631 s = in_seq;
632 auto ack = AckFrame::Encode(in_seq);
633 connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers));
634 ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
635 << " messages" << dendl;
636 ack_left -= left;
637 left = ack_left;
638 r = connection->_try_send(left);
639 } else if (is_queued()) {
640 r = connection->_try_send();
641 }
642 }
643 connection->write_lock.unlock();
644
645 connection->logger->tinc(l_msgr_running_send_time,
646 ceph::mono_clock::now() - start);
647 if (r < 0) {
648 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
649 connection->lock.lock();
650 fault();
651 connection->lock.unlock();
652 return;
653 }
654 } else {
655 connection->write_lock.unlock();
656 connection->lock.lock();
657 connection->write_lock.lock();
658 if (state == STANDBY && !connection->policy.server && is_queued()) {
659 ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
660 if (server_cookie) { // only increment connect_seq if there is a session
661 connect_seq++;
662 }
663 connection->_connect();
664 } else if (connection->cs && state != NONE && state != CLOSED &&
665 state != START_CONNECT) {
666 r = connection->_try_send();
667 if (r < 0) {
668 ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
669 connection->write_lock.unlock();
670 fault();
671 connection->lock.unlock();
672 return;
673 }
674 }
675 connection->write_lock.unlock();
676 connection->lock.unlock();
677 }
678}
679
680bool ProtocolV2::is_queued() {
681 return !out_queue.empty() || connection->is_queued();
682}
683
684uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
685 if (session_stream_handlers.rx) {
686 return segment_onwire_size(logical_size);
687 } else {
688 return logical_size;
689 }
690}
691
692uint32_t ProtocolV2::get_epilogue_size() const {
693 // In secure mode size of epilogue is flexible and depends on particular
694 // cipher implementation. See the comment for epilogue_secure_block_t or
695 // epilogue_plain_block_t.
696 if (session_stream_handlers.rx) {
697 return FRAME_SECURE_EPILOGUE_SIZE + \
698 session_stream_handlers.rx->get_extra_size_at_final();
699 } else {
700 return FRAME_PLAIN_EPILOGUE_SIZE;
701 }
702}
703
704CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
705 rx_buffer_t &&buffer) {
706 const auto len = buffer->length();
707 const auto buf = buffer->c_str();
708 next.node = std::move(buffer);
709 ssize_t r = connection->read(len, buf,
710 [&next, this](char *buffer, int r) {
711 if (unlikely(pre_auth.enabled) && r >= 0) {
712 pre_auth.rxbuf.append(*next.node);
713 ceph_assert(!cct->_conf->ms_die_on_bug ||
714 pre_auth.rxbuf.length() < 1000000);
715 }
716 next.r = r;
717 run_continuation(next);
718 });
719 if (r <= 0) {
720 // error or done synchronously
721 if (unlikely(pre_auth.enabled) && r >= 0) {
722 pre_auth.rxbuf.append(*next.node);
723 ceph_assert(!cct->_conf->ms_die_on_bug ||
724 pre_auth.rxbuf.length() < 1000000);
725 }
726 next.r = r;
727 return &next;
728 }
729
730 return nullptr;
731}
732
733template <class F>
734CtPtr ProtocolV2::write(const std::string &desc,
735 CONTINUATION_TYPE<ProtocolV2> &next,
736 F &frame) {
737 ceph::bufferlist bl = frame.get_buffer(session_stream_handlers);
738 return write(desc, next, bl);
739}
740
741CtPtr ProtocolV2::write(const std::string &desc,
742 CONTINUATION_TYPE<ProtocolV2> &next,
743 bufferlist &buffer) {
744 if (unlikely(pre_auth.enabled)) {
745 pre_auth.txbuf.append(buffer);
746 ceph_assert(!cct->_conf->ms_die_on_bug ||
747 pre_auth.txbuf.length() < 1000000);
748 }
749
750 ssize_t r =
751 connection->write(buffer, [&next, desc, this](int r) {
752 if (r < 0) {
753 ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
754 << " (" << cpp_strerror(r) << ")" << dendl;
755 connection->inject_delay();
756 _fault();
757 }
758 run_continuation(next);
759 });
760
761 if (r < 0) {
762 ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
763 << " (" << cpp_strerror(r) << ")" << dendl;
764 return _fault();
765 } else if (r == 0) {
766 next.setParams();
767 return &next;
768 }
769
770 return nullptr;
771}
772
773CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
774 ldout(cct, 20) << __func__ << dendl;
775 bannerExchangeCallback = &callback;
776
777 bufferlist banner_payload;
778 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
779 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
780
781 bufferlist bl;
782 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
783 encode((uint16_t)banner_payload.length(), bl, 0);
784 bl.claim_append(banner_payload);
785
786 INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);
787
788 return WRITE(bl, "banner", _wait_for_peer_banner);
789}
790
791CtPtr ProtocolV2::_wait_for_peer_banner() {
792 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
793 return READ(banner_len, _handle_peer_banner);
794}
795
796CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
797 ldout(cct, 20) << __func__ << " r=" << r << dendl;
798
799 if (r < 0) {
800 ldout(cct, 1) << __func__ << " read peer banner failed r=" << r << " ("
801 << cpp_strerror(r) << ")" << dendl;
802 return _fault();
803 }
804
805 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
806
807 if (memcmp(buffer->c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
808 if (memcmp(buffer->c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
809 lderr(cct) << __func__ << " peer " << *connection->peer_addrs
810 << " is using msgr V1 protocol" << dendl;
811 return _fault();
812 }
813 ldout(cct, 1) << __func__ << " accept peer sent bad banner" << dendl;
814 return _fault();
815 }
816
817 uint16_t payload_len;
818 bufferlist bl;
819 buffer->set_offset(banner_prefix_len);
820 buffer->set_length(sizeof(__le16));
821 bl.push_back(std::move(buffer));
822 auto ti = bl.cbegin();
823 try {
824 decode(payload_len, ti);
825 } catch (const buffer::error &e) {
826 lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
827 return _fault();
828 }
829
830 INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);
831
832 return READ(payload_len, _handle_peer_banner_payload);
833}
834
835CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
836 ldout(cct, 20) << __func__ << " r=" << r << dendl;
837
838 if (r < 0) {
839 ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
840 << " (" << cpp_strerror(r) << ")" << dendl;
841 return _fault();
842 }
843
844 uint64_t peer_supported_features;
845 uint64_t peer_required_features;
846
847 bufferlist bl;
848 bl.push_back(std::move(buffer));
849 auto ti = bl.cbegin();
850 try {
851 decode(peer_supported_features, ti);
852 decode(peer_required_features, ti);
853 } catch (const buffer::error &e) {
854 lderr(cct) << __func__ << " decode banner payload failed " << dendl;
855 return _fault();
856 }
857
858 ldout(cct, 1) << __func__ << " supported=" << std::hex
859 << peer_supported_features << " required=" << std::hex
860 << peer_required_features << std::dec << dendl;
861
862 // Check feature bit compatibility
863
864 uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
865 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
866
867 if ((required_features & peer_supported_features) != required_features) {
868 ldout(cct, 1) << __func__ << " peer does not support all required features"
869 << " required=" << std::hex << required_features
870 << " supported=" << std::hex << peer_supported_features
871 << std::dec << dendl;
872 stop();
873 connection->dispatch_queue->queue_reset(connection);
874 return nullptr;
875 }
876 if ((supported_features & peer_required_features) != peer_required_features) {
877 ldout(cct, 1) << __func__ << " we do not support all peer required features"
878 << " required=" << std::hex << peer_required_features
879 << " supported=" << supported_features << std::dec << dendl;
880 stop();
881 connection->dispatch_queue->queue_reset(connection);
882 return nullptr;
883 }
884
885 this->peer_required_features = peer_required_features;
886 if (this->peer_required_features == 0) {
887 this->connection_features = msgr2_required;
888 }
889
890 // at this point we can change how the client protocol behaves based on
891 // this->peer_required_features
892
893 if (state == BANNER_CONNECTING) {
894 state = HELLO_CONNECTING;
895 }
896 else {
897 ceph_assert(state == BANNER_ACCEPTING);
898 state = HELLO_ACCEPTING;
899 }
900
901 auto hello = HelloFrame::Encode(messenger->get_mytype(),
902 connection->target_addr);
903
904 INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);
905
906 return WRITE(hello, "hello frame", read_frame);
907}
908
909CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
910{
911 ldout(cct, 20) << __func__
912 << " payload.length()=" << payload.length() << dendl;
913
914 if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
915 lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
916 return _fault();
917 }
918
919 auto hello = HelloFrame::Decode(payload);
920
921 ldout(cct, 5) << __func__ << " received hello:"
922 << " peer_type=" << (int)hello.entity_type()
923 << " peer_addr_for_me=" << hello.peer_addr() << dendl;
924
925 if (connection->get_peer_type() == -1) {
926 connection->set_peer_type(hello.entity_type());
927
928 ceph_assert(state == HELLO_ACCEPTING);
929 connection->policy = messenger->get_policy(hello.entity_type());
930 ldout(cct, 10) << __func__ << " accept of host_type "
931 << (int)hello.entity_type()
932 << ", policy.lossy=" << connection->policy.lossy
933 << " policy.server=" << connection->policy.server
934 << " policy.standby=" << connection->policy.standby
935 << " policy.resetcheck=" << connection->policy.resetcheck
936 << dendl;
937 } else {
938 ceph_assert(state == HELLO_CONNECTING);
939 if (connection->get_peer_type() != hello.entity_type()) {
940 ldout(cct, 1) << __func__ << " connection peer type does not match what"
941 << " peer advertises " << connection->get_peer_type()
942 << " != " << (int)hello.entity_type() << dendl;
943 stop();
944 connection->dispatch_queue->queue_reset(connection);
945 return nullptr;
946 }
947 }
948
949 CtPtr callback;
950 callback = bannerExchangeCallback;
951 bannerExchangeCallback = nullptr;
952 ceph_assert(callback);
953 return callback;
954}
955
956CtPtr ProtocolV2::read_frame() {
957 if (state == CLOSED) {
958 return nullptr;
959 }
960
961 ldout(cct, 20) << __func__ << dendl;
962 return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
963}
964
965CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
966 ldout(cct, 20) << __func__ << " r=" << r << dendl;
967
968 if (r < 0) {
969 ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
970 << " (" << cpp_strerror(r) << ")" << dendl;
971 return _fault();
972 }
973
974 ceph::bufferlist preamble;
975 preamble.push_back(std::move(buffer));
976
977 ldout(cct, 30) << __func__ << " preamble\n";
978 preamble.hexdump(*_dout);
979 *_dout << dendl;
980
981 if (session_stream_handlers.rx) {
982 ceph_assert(session_stream_handlers.rx);
983
984 session_stream_handlers.rx->reset_rx_handler();
985 preamble = session_stream_handlers.rx->authenticated_decrypt_update(
986 std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
987
988 ldout(cct, 10) << __func__ << " got encrypted preamble."
989 << " after decrypt premable.length()=" << preamble.length()
990 << dendl;
991
992 ldout(cct, 30) << __func__ << " preamble after decrypt\n";
993 preamble.hexdump(*_dout);
994 *_dout << dendl;
995 }
996
997 {
998 // I expect ceph_le32 will make the endian conversion for me. Passing
999 // everything through ::Decode is unnecessary.
1000 const auto& main_preamble = \
1001 reinterpret_cast<preamble_block_t&>(*preamble.c_str());
1002
1003 // verify preamble's CRC before any further processing
1004 const auto rx_crc = ceph_crc32c(0,
1005 reinterpret_cast<const unsigned char*>(&main_preamble),
1006 sizeof(main_preamble) - sizeof(main_preamble.crc));
1007 if (rx_crc != main_preamble.crc) {
1008 ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
1009 << " rx_crc=" << rx_crc
1010 << " tx_crc=" << main_preamble.crc << dendl;
1011 return _fault();
1012 }
1013
1014 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
1015 if (main_preamble.num_segments < 1 ||
1016 main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1017 ldout(cct, 10) << __func__ << " unsupported num_segments="
1018 << " tx_crc=" << main_preamble.num_segments << dendl;
1019 return _fault();
1020 }
1021
1022 next_tag = static_cast<Tag>(main_preamble.tag);
1023
1024 rx_segments_desc.clear();
1025 rx_segments_data.clear();
1026
1027 if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1028 ldout(cct, 30) << __func__
1029 << " num_segments=" << main_preamble.num_segments
1030 << " is too much" << dendl;
1031 return _fault();
1032 }
1033 for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
1034 ldout(cct, 10) << __func__ << " got new segment:"
1035 << " len=" << main_preamble.segments[idx].length
1036 << " align=" << main_preamble.segments[idx].alignment
1037 << dendl;
1038 rx_segments_desc.emplace_back(main_preamble.segments[idx]);
1039 }
1040 }
1041
1042 // does it need throttle?
1043 if (next_tag == Tag::MESSAGE) {
1044 if (state != READY) {
1045 lderr(cct) << __func__ << " not in ready state!" << dendl;
1046 return _fault();
1047 }
1048 state = THROTTLE_MESSAGE;
1049 return CONTINUE(throttle_message);
1050 } else {
1051 return read_frame_segment();
1052 }
1053}
1054
1055CtPtr ProtocolV2::handle_read_frame_dispatch() {
1056 ldout(cct, 10) << __func__
1057 << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
1058
1059 switch (next_tag) {
1060 case Tag::HELLO:
1061 case Tag::AUTH_REQUEST:
1062 case Tag::AUTH_BAD_METHOD:
1063 case Tag::AUTH_REPLY_MORE:
1064 case Tag::AUTH_REQUEST_MORE:
1065 case Tag::AUTH_DONE:
1066 case Tag::AUTH_SIGNATURE:
1067 case Tag::CLIENT_IDENT:
1068 case Tag::SERVER_IDENT:
1069 case Tag::IDENT_MISSING_FEATURES:
1070 case Tag::SESSION_RECONNECT:
1071 case Tag::SESSION_RESET:
1072 case Tag::SESSION_RETRY:
1073 case Tag::SESSION_RETRY_GLOBAL:
1074 case Tag::SESSION_RECONNECT_OK:
1075 case Tag::KEEPALIVE2:
1076 case Tag::KEEPALIVE2_ACK:
1077 case Tag::ACK:
1078 case Tag::WAIT:
1079 return handle_frame_payload();
1080 case Tag::MESSAGE:
1081 return handle_message();
1082 default: {
1083 lderr(cct) << __func__
1084 << " received unknown tag=" << static_cast<uint32_t>(next_tag)
1085 << dendl;
1086 return _fault();
1087 }
1088 }
1089
1090 return nullptr;
1091}
1092
1093CtPtr ProtocolV2::read_frame_segment() {
1094 ldout(cct, 20) << __func__ << dendl;
1095 ceph_assert(!rx_segments_desc.empty());
1096
1097 // description of current segment to read
1098 const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
1099 rx_buffer_t rx_buffer;
1100 try {
1101 rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
1102 get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
1103 } catch (std::bad_alloc&) {
1104 // Catching because of potential issues with satisfying alignment.
1105 ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
1106 << " len=" << get_onwire_size(cur_rx_desc.length)
1107 << " align=" << cur_rx_desc.alignment
1108 << dendl;
1109 return _fault();
1110 }
1111
1112 return READ_RXBUF(std::move(rx_buffer), handle_read_frame_segment);
1113}
1114
1115CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
1116 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1117
1118 if (r < 0) {
1119 ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
1120 << cpp_strerror(r) << ")" << dendl;
1121 return _fault();
1122 }
1123
1124 rx_segments_data.emplace_back();
1125 rx_segments_data.back().push_back(std::move(rx_buffer));
1126
1127 // decrypt incoming data
1128 // FIXME: if (auth_meta->is_mode_secure()) {
1129 if (session_stream_handlers.rx) {
1130 ceph_assert(session_stream_handlers.rx);
1131
1132 auto& new_seg = rx_segments_data.back();
1133 if (new_seg.length()) {
1134 auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
1135 std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
1136 const auto idx = rx_segments_data.size() - 1;
1137 new_seg.clear();
1138 padded.splice(0, rx_segments_desc[idx].length, &new_seg);
1139
1140 ldout(cct, 20) << __func__
1141 << " unpadded new_seg.length()=" << new_seg.length()
1142 << dendl;
1143 }
1144 }
1145
1146 if (rx_segments_desc.size() == rx_segments_data.size()) {
1147 // OK, all segments planned to read are read. Can go with epilogue.
1148 return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
1149 } else {
1150 // TODO: for makeshift only. This will be more generic and throttled
1151 return read_frame_segment();
1152 }
1153}
1154
1155CtPtr ProtocolV2::handle_frame_payload() {
1156 ceph_assert(!rx_segments_data.empty());
1157 auto& payload = rx_segments_data.back();
1158
1159 ldout(cct, 30) << __func__ << "\n";
1160 payload.hexdump(*_dout);
1161 *_dout << dendl;
1162
1163 switch (next_tag) {
1164 case Tag::HELLO:
1165 return handle_hello(payload);
1166 case Tag::AUTH_REQUEST:
1167 return handle_auth_request(payload);
1168 case Tag::AUTH_BAD_METHOD:
1169 return handle_auth_bad_method(payload);
1170 case Tag::AUTH_REPLY_MORE:
1171 return handle_auth_reply_more(payload);
1172 case Tag::AUTH_REQUEST_MORE:
1173 return handle_auth_request_more(payload);
1174 case Tag::AUTH_DONE:
1175 return handle_auth_done(payload);
1176 case Tag::AUTH_SIGNATURE:
1177 return handle_auth_signature(payload);
1178 case Tag::CLIENT_IDENT:
1179 return handle_client_ident(payload);
1180 case Tag::SERVER_IDENT:
1181 return handle_server_ident(payload);
1182 case Tag::IDENT_MISSING_FEATURES:
1183 return handle_ident_missing_features(payload);
1184 case Tag::SESSION_RECONNECT:
1185 return handle_reconnect(payload);
1186 case Tag::SESSION_RESET:
1187 return handle_session_reset(payload);
1188 case Tag::SESSION_RETRY:
1189 return handle_session_retry(payload);
1190 case Tag::SESSION_RETRY_GLOBAL:
1191 return handle_session_retry_global(payload);
1192 case Tag::SESSION_RECONNECT_OK:
1193 return handle_reconnect_ok(payload);
1194 case Tag::KEEPALIVE2:
1195 return handle_keepalive2(payload);
1196 case Tag::KEEPALIVE2_ACK:
1197 return handle_keepalive2_ack(payload);
1198 case Tag::ACK:
1199 return handle_message_ack(payload);
1200 case Tag::WAIT:
1201 return handle_wait(payload);
1202 default:
1203 ceph_abort();
1204 }
1205 return nullptr;
1206}
1207
1208CtPtr ProtocolV2::ready() {
1209 ldout(cct, 25) << __func__ << dendl;
1210
1211 reconnecting = false;
1212 replacing = false;
1213
1214 // make sure no pending tick timer
1215 if (connection->last_tick_id) {
1216 connection->center->delete_time_event(connection->last_tick_id);
1217 }
1218 connection->last_tick_id = connection->center->create_time_event(
1219 connection->inactive_timeout_us, connection->tick_handler);
1220
1221 {
1222 std::lock_guard<std::mutex> l(connection->write_lock);
1223 can_write = true;
1224 if (!out_queue.empty()) {
1225 connection->center->dispatch_event_external(connection->write_handler);
1226 }
1227 }
1228
1229 connection->maybe_start_delay_thread();
1230
1231 state = READY;
1232 ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie="
1233 << std::hex << client_cookie << " server_cookie="
1234 << server_cookie << std::dec << " in_seq=" << in_seq
1235 << " out_seq=" << out_seq << dendl;
1236
1237 INTERCEPT(15);
1238
1239 return CONTINUE(read_frame);
1240}
1241
1242CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
1243{
1244 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1245
1246 if (r < 0) {
1247 ldout(cct, 1) << __func__ << " read data error " << dendl;
1248 return _fault();
1249 }
1250
1251 __u8 late_flags;
1252
1253 // FIXME: if (auth_meta->is_mode_secure()) {
1254 if (session_stream_handlers.rx) {
1255 ldout(cct, 1) << __func__ << " read frame epilogue bytes="
1256 << get_epilogue_size() << dendl;
1257
1258 // decrypt epilogue and authenticate entire frame.
1259 ceph::bufferlist epilogue_bl;
1260 {
1261 epilogue_bl.push_back(std::move(buffer));
1262 try {
1263 epilogue_bl =
1264 session_stream_handlers.rx->authenticated_decrypt_update_final(
1265 std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
1266 } catch (ceph::crypto::onwire::MsgAuthError &e) {
1267 ldout(cct, 5) << __func__ << " message authentication failed: "
1268 << e.what() << dendl;
1269 return _fault();
1270 }
1271 }
1272 auto& epilogue =
1273 reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
1274 late_flags = epilogue.late_flags;
1275 } else {
1276 auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
1277
1278 for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
1279 const __u32 expected_crc = epilogue.crc_values[idx];
1280 const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
1281 if (expected_crc != calculated_crc) {
1282 ldout(cct, 5) << __func__ << " message integrity check failed: "
1283 << " expected_crc=" << expected_crc
1284 << " calculated_crc=" << calculated_crc
1285 << dendl;
1286 return _fault();
1287 } else {
1288 ldout(cct, 20) << __func__ << " message integrity check success: "
1289 << " expected_crc=" << expected_crc
1290 << " calculated_crc=" << calculated_crc
1291 << dendl;
1292 }
1293 }
1294 late_flags = epilogue.late_flags;
1295 }
1296
1297 // we do have a mechanism that allows transmitter to start sending message
1298 // and abort after putting entire data field on wire. This will be used by
1299 // the kernel client to avoid unnecessary buffering.
1300 if (late_flags & FRAME_FLAGS_LATEABRT) {
1301 reset_throttle();
1302 state = READY;
1303 return CONTINUE(read_frame);
1304 } else {
1305 return handle_read_frame_dispatch();
1306 }
1307}
1308
/**
 * Decode a fully read and throttled MESSAGE frame and deliver it.
 *
 * Expects state == THROTTLE_DONE. The steps are:
 *  1. Decode the MessageFrame from rx_segments_data.
 *  2. Rebuild a v1-style ceph_msg_header/ceph_msg_footer and decode the
 *     Message.
 *  3. Attach the throttlers and stamps to the Message.
 *  4. Check the sequence number against in_seq.
 *  5. Deliver the Message through the delay queue, fast dispatch, or the
 *     dispatch queue.
 *  6. Process the piggy-backed ack_seq.
 *  7. Resume reading unless the state changed during dispatch.
 */
CtPtr ProtocolV2::handle_message() {
  ldout(cct, 20) << __func__ << dendl;
  ceph_assert(state == THROTTLE_DONE);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
  ltt_recv_stamp = ceph_clock_now();
#endif
  recv_stamp = ceph_clock_now();

  // we need to get the size before std::moving segments data
  const size_t cur_msg_size = get_current_msg_size();
  auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));

  // XXX: paranoid copy just to avoid oops
  ceph_msg_header2 current_header = msg_frame.header();

  ldout(cct, 5) << __func__
                << " got " << msg_frame.front_len()
                << " + " << msg_frame.middle_len()
                << " + " << msg_frame.data_len()
                << " byte message."
                << " envelope type=" << current_header.type
                << " src " << peer_name
                << " off " << current_header.data_off
                << dendl;

  INTERCEPT(16);
  // Build the legacy header expected by decode_message(): the segment
  // lengths come from the frame, the source comes from peer_name, and the
  // crc field (last member) is left at 0.
  ceph_msg_header header{current_header.seq,
                         current_header.tid,
                         current_header.type,
                         current_header.priority,
                         current_header.version,
                         msg_frame.front_len(),
                         msg_frame.middle_len(),
                         msg_frame.data_len(),
                         current_header.data_off,
                         peer_name,
                         current_header.compat_version,
                         current_header.reserved,
                         0};
  // Only the flags are carried over; the other footer fields are zeroed.
  ceph_msg_footer footer{0, 0, 0, 0, current_header.flags};

  Message *message = decode_message(cct, 0, header, footer,
      msg_frame.front(),
      msg_frame.middle(),
      msg_frame.data(),
      connection);
  if (!message) {
    ldout(cct, 1) << __func__ << " decode message failed " << dendl;
    return _fault();
  } else {
    state = READ_MESSAGE_COMPLETE;
  }

  INTERCEPT(17);

  message->set_byte_throttler(connection->policy.throttler_bytes);
  message->set_message_throttler(connection->policy.throttler_messages);

  // store reservation size in message, so we don't get confused
  // by messages entering the dispatch queue through other paths.
  message->set_dispatch_throttle_size(cur_msg_size);

  message->set_recv_stamp(recv_stamp);
  message->set_throttle_stamp(throttle_stamp);
  message->set_recv_complete_stamp(ceph_clock_now());

  // check received seq#. if it is old, drop the message.
  // note that incoming messages may skip ahead. this is convenient for the
  // client side queueing because messages can't be renumbered, but the (kernel)
  // client will occasionally pull a message out of the sent queue to send
  // elsewhere. in that case it doesn't matter if we "got" it or not.
  uint64_t cur_seq = in_seq;
  if (message->get_seq() <= cur_seq) {
    ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
                  << " <= " << cur_seq << " " << message << " " << *message
                  << ", discarding" << dendl;
    message->put();
    if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
        cct->_conf->ms_die_on_old_message) {
      ceph_assert(0 == "old msgs despite reconnect_seq feature");
    }
    // NOTE(review): unlike the normal path below, this return leaves
    // state == READ_MESSAGE_COMPLETE and does not schedule read_frame.
    // Confirm that the read path resumes from this state; the event
    // handling code is not visible here.
    return nullptr;
  }
  if (message->get_seq() > cur_seq + 1) {
    ldout(cct, 0) << __func__ << " missed message? skipped from seq "
                  << cur_seq << " to " << message->get_seq() << dendl;
    if (cct->_conf->ms_die_on_skipped_message) {
      ceph_assert(0 == "skipped incoming seq");
    }
  }

  message->set_connection(connection);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
  if (message->get_type() == CEPH_MSG_OSD_OP ||
      message->get_type() == CEPH_MSG_OSD_OPREPLY) {
    utime_t ltt_processed_stamp = ceph_clock_now();
    double usecs_elapsed =
        (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
    ostringstream buf;
    if (message->get_type() == CEPH_MSG_OSD_OP)
      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
                           false);
    else
      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
                           false);
  }
#endif

  // note last received message.
  in_seq = message->get_seq();
  ldout(cct, 5) << __func__ << " received message m=" << message
                << " seq=" << message->get_seq()
                << " from=" << message->get_source() << " type=" << header.type
                << " " << *message << dendl;

  // Non-lossy sessions owe the peer an ack; the writer is woken at the end
  // (if still READY and connected) so it can send it.
  bool need_dispatch_writer = false;
  if (!connection->policy.lossy) {
    ack_left++;
    need_dispatch_writer = true;
  }

  state = READY;

  connection->logger->inc(l_msgr_recv_messages);
  connection->logger->inc(
      l_msgr_recv_bytes,
      cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));

  messenger->ms_fast_preprocess(message);
  auto fast_dispatch_time = ceph::mono_clock::now();
  connection->logger->tinc(l_msgr_running_recv_time,
                           fast_dispatch_time - connection->recv_start_time);
  if (connection->delay_state) {
    // delay injection: with probability ms_inject_delay_probability, pick a
    // random delay in [0, ms_inject_delay_max)
    double delay_period = 0;
    if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
      delay_period =
          cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
      ldout(cct, 1) << "queue_received will delay after "
                    << (ceph_clock_now() + delay_period) << " on " << message
                    << " " << *message << dendl;
    }
    connection->delay_state->queue(delay_period, message);
  } else if (messenger->ms_can_fast_dispatch(message)) {
    // connection->lock is released for the fast-dispatch call; state may be
    // changed by others meanwhile (hence the READY check below).
    connection->lock.unlock();
    connection->dispatch_queue->fast_dispatch(message);
    connection->recv_start_time = ceph::mono_clock::now();
    connection->logger->tinc(l_msgr_running_fast_dispatch_time,
                             connection->recv_start_time - fast_dispatch_time);
    connection->lock.lock();
  } else {
    connection->dispatch_queue->enqueue(message, message->get_priority(),
                                        connection->conn_id);
  }

  // process the peer's piggy-backed acknowledgement of our sent messages
  handle_message_ack(current_header.ack_seq);

  // we might have been reused by another connection
  // let's check if that is the case
  if (state != READY) {
    // yes, that was the case, let's do nothing
    return nullptr;
  }

  if (need_dispatch_writer && connection->is_connected()) {
    connection->center->dispatch_event_external(connection->write_handler);
  }

  return CONTINUE(read_frame);
}
1480
1481
1482CtPtr ProtocolV2::throttle_message() {
1483 ldout(cct, 20) << __func__ << dendl;
1484
1485 if (connection->policy.throttler_messages) {
1486 ldout(cct, 10) << __func__ << " wants " << 1
1487 << " message from policy throttler "
1488 << connection->policy.throttler_messages->get_current()
1489 << "/" << connection->policy.throttler_messages->get_max()
1490 << dendl;
1491 if (!connection->policy.throttler_messages->get_or_fail()) {
1492 ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
1493 << connection->policy.throttler_messages->get_current()
1494 << "/" << connection->policy.throttler_messages->get_max()
1495 << " failed, just wait." << dendl;
1496 // following thread pool deal with th full message queue isn't a
1497 // short time, so we can wait a ms.
1498 if (connection->register_time_events.empty()) {
1499 connection->register_time_events.insert(
1500 connection->center->create_time_event(1000,
1501 connection->wakeup_handler));
1502 }
1503 return nullptr;
1504 }
1505 }
1506
1507 state = THROTTLE_BYTES;
1508 return CONTINUE(throttle_bytes);
1509}
1510
1511CtPtr ProtocolV2::throttle_bytes() {
1512 ldout(cct, 20) << __func__ << dendl;
1513
1514 const size_t cur_msg_size = get_current_msg_size();
1515 if (cur_msg_size) {
1516 if (connection->policy.throttler_bytes) {
1517 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1518 << " bytes from policy throttler "
1519 << connection->policy.throttler_bytes->get_current() << "/"
1520 << connection->policy.throttler_bytes->get_max() << dendl;
1521 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
1522 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1523 << " bytes from policy throttler "
1524 << connection->policy.throttler_bytes->get_current()
1525 << "/" << connection->policy.throttler_bytes->get_max()
1526 << " failed, just wait." << dendl;
1527 // following thread pool deal with th full message queue isn't a
1528 // short time, so we can wait a ms.
1529 if (connection->register_time_events.empty()) {
1530 connection->register_time_events.insert(
1531 connection->center->create_time_event(
1532 1000, connection->wakeup_handler));
1533 }
1534 return nullptr;
1535 }
1536 }
1537 }
1538
1539 state = THROTTLE_DISPATCH_QUEUE;
1540 return CONTINUE(throttle_dispatch_queue);
1541}
1542
1543CtPtr ProtocolV2::throttle_dispatch_queue() {
1544 ldout(cct, 20) << __func__ << dendl;
1545
1546 const size_t cur_msg_size = get_current_msg_size();
1547 if (cur_msg_size) {
1548 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
1549 cur_msg_size)) {
1550 ldout(cct, 10)
1551 << __func__ << " wants " << cur_msg_size
1552 << " bytes from dispatch throttle "
1553 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1554 << connection->dispatch_queue->dispatch_throttler.get_max()
1555 << " failed, just wait." << dendl;
1556 // following thread pool deal with th full message queue isn't a
1557 // short time, so we can wait a ms.
1558 if (connection->register_time_events.empty()) {
1559 connection->register_time_events.insert(
1560 connection->center->create_time_event(1000,
1561 connection->wakeup_handler));
1562 }
1563 return nullptr;
1564 }
1565 }
1566
1567 throttle_stamp = ceph_clock_now();
1568 state = THROTTLE_DONE;
1569
1570 return read_frame_segment();
1571}
1572
1573CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
1574{
1575 ldout(cct, 20) << __func__
1576 << " payload.length()=" << payload.length() << dendl;
1577
1578 if (state != READY) {
1579 lderr(cct) << __func__ << " not in ready state!" << dendl;
1580 return _fault();
1581 }
1582
1583 auto keepalive_frame = KeepAliveFrame::Decode(payload);
1584
1585 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
1586
1587 connection->write_lock.lock();
1588 append_keepalive_ack(keepalive_frame.timestamp());
1589 connection->write_lock.unlock();
1590
1591 ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
1592 << keepalive_frame.timestamp() << dendl;
1593 connection->set_last_keepalive(ceph_clock_now());
1594
1595 if (is_connected()) {
1596 connection->center->dispatch_event_external(connection->write_handler);
1597 }
1598
1599 return CONTINUE(read_frame);
1600}
1601
1602CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
1603{
1604 ldout(cct, 20) << __func__
1605 << " payload.length()=" << payload.length() << dendl;
1606
1607 if (state != READY) {
1608 lderr(cct) << __func__ << " not in ready state!" << dendl;
1609 return _fault();
1610 }
1611
1612 auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload);
1613 connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
1614 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
1615
1616 return CONTINUE(read_frame);
1617}
1618
1619CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
1620{
1621 ldout(cct, 20) << __func__
1622 << " payload.length()=" << payload.length() << dendl;
1623
1624 if (state != READY) {
1625 lderr(cct) << __func__ << " not in ready state!" << dendl;
1626 return _fault();
1627 }
1628
1629 auto ack = AckFrame::Decode(payload);
1630 handle_message_ack(ack.seq());
1631 return CONTINUE(read_frame);
1632}
1633
1634/* Client Protocol Methods */
1635
1636CtPtr ProtocolV2::start_client_banner_exchange() {
1637 ldout(cct, 20) << __func__ << dendl;
1638
1639 INTERCEPT(1);
1640
1641 state = BANNER_CONNECTING;
1642
1643 global_seq = messenger->get_global_seq();
1644
1645 return _banner_exchange(CONTINUATION(post_client_banner_exchange));
1646}
1647
1648CtPtr ProtocolV2::post_client_banner_exchange() {
1649 ldout(cct, 20) << __func__ << dendl;
1650
1651 state = AUTH_CONNECTING;
1652
1653 return send_auth_request();
1654}
1655
1656CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
1657 ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
1658 << " auth_client " << messenger->auth_client << dendl;
1659 ceph_assert(messenger->auth_client);
1660
1661 bufferlist bl;
1662 vector<uint32_t> preferred_modes;
1663 auto am = auth_meta;
1664 connection->lock.unlock();
1665 int r = messenger->auth_client->get_auth_request(
1666 connection, am.get(),
1667 &am->auth_method, &preferred_modes, &bl);
1668 connection->lock.lock();
1669 if (state != AUTH_CONNECTING) {
1670 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1671 return _fault();
1672 }
1673 if (r < 0) {
1674 ldout(cct, 0) << __func__ << " get_initial_auth_request returned " << r
1675 << dendl;
1676 stop();
1677 connection->dispatch_queue->queue_reset(connection);
1678 return nullptr;
1679 }
1680
1681 INTERCEPT(9);
1682
1683 auto frame = AuthRequestFrame::Encode(auth_meta->auth_method, preferred_modes,
1684 bl);
1685 return WRITE(frame, "auth request", read_frame);
1686}
1687
1688CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
1689 ldout(cct, 20) << __func__
1690 << " payload.length()=" << payload.length() << dendl;
1691
1692 if (state != AUTH_CONNECTING) {
1693 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1694 return _fault();
1695 }
1696
1697 auto bad_method = AuthBadMethodFrame::Decode(payload);
1698 ldout(cct, 1) << __func__ << " method=" << bad_method.method()
1699 << " result " << cpp_strerror(bad_method.result())
1700 << ", allowed methods=" << bad_method.allowed_methods()
1701 << ", allowed modes=" << bad_method.allowed_modes()
1702 << dendl;
1703 ceph_assert(messenger->auth_client);
1704 auto am = auth_meta;
1705 connection->lock.unlock();
1706 int r = messenger->auth_client->handle_auth_bad_method(
1707 connection,
1708 am.get(),
1709 bad_method.method(), bad_method.result(),
1710 bad_method.allowed_methods(),
1711 bad_method.allowed_modes());
1712 connection->lock.lock();
1713 if (state != AUTH_CONNECTING || r < 0) {
1714 return _fault();
1715 }
1716 return send_auth_request(bad_method.allowed_methods());
1717}
1718
1719CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
1720{
1721 ldout(cct, 20) << __func__
1722 << " payload.length()=" << payload.length() << dendl;
1723
1724 if (state != AUTH_CONNECTING) {
1725 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1726 return _fault();
1727 }
1728
1729 auto auth_more = AuthReplyMoreFrame::Decode(payload);
1730 ldout(cct, 5) << __func__
1731 << " auth reply more len=" << auth_more.auth_payload().length()
1732 << dendl;
1733 ceph_assert(messenger->auth_client);
1734 ceph::bufferlist reply;
1735 auto am = auth_meta;
1736 connection->lock.unlock();
1737 int r = messenger->auth_client->handle_auth_reply_more(
1738 connection, am.get(), auth_more.auth_payload(), &reply);
1739 connection->lock.lock();
1740 if (state != AUTH_CONNECTING) {
1741 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1742 return _fault();
1743 }
1744 if (r < 0) {
1745 lderr(cct) << __func__ << " auth_client handle_auth_reply_more returned "
1746 << r << dendl;
1747 return _fault();
1748 }
1749 auto more_reply = AuthRequestMoreFrame::Encode(reply);
1750 return WRITE(more_reply, "auth request more", read_frame);
1751}
1752
/**
 * Handle AUTH_DONE: the server accepted our authentication.
 *
 * Only valid in AUTH_CONNECTING. With connection->lock released, the
 * AuthClient completes authentication and fills in the session key and
 * connection secret. The block then:
 *  - records the negotiated con_mode,
 *  - builds the rx/tx stream handlers,
 *  - moves to AUTH_CONNECTING_SIGN,
 *  - sends an AUTH_SIGNATURE frame with an HMAC-SHA256 (session key) of
 *    the bytes collected in pre_auth.rxbuf.
 */
CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
{
  ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;

  if (state != AUTH_CONNECTING) {
    lderr(cct) << __func__ << " not in auth connect state!" << dendl;
    return _fault();
  }

  auto auth_done = AuthDoneFrame::Decode(payload);

  ceph_assert(messenger->auth_client);
  // hold a reference to auth_meta while connection->lock is dropped
  auto am = auth_meta;
  connection->lock.unlock();
  int r = messenger->auth_client->handle_auth_done(
    connection,
    am.get(),
    auth_done.global_id(),
    auth_done.con_mode(),
    auth_done.auth_payload(),
    &am->session_key,
    &am->connection_secret);
  connection->lock.lock();
  if (state != AUTH_CONNECTING) {
    ldout(cct, 1) << __func__ << " state changed!" << dendl;
    return _fault();
  }
  if (r < 0) {
    return _fault();
  }
  // con_mode must be set before the handler pair is created from *auth_meta.
  // NOTE(review): the trailing `false` presumably selects the client side;
  // confirm against rxtx_t::create_handler_pair.
  auth_meta->con_mode = auth_done.con_mode();
  session_stream_handlers = \
    ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);

  state = AUTH_CONNECTING_SIGN;

  // Sign everything received before auth completed. If there is no session
  // key, a default-constructed digest is sent instead. The signature is
  // computed before pre_auth.rxbuf is cleared below.
  const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
    auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
  auto sig_frame = AuthSignatureFrame::Encode(sig);
  pre_auth.enabled = false;
  pre_auth.rxbuf.clear();
  return WRITE(sig_frame, "auth signature", read_frame);
}
1797
1798CtPtr ProtocolV2::finish_client_auth() {
1799 if (!server_cookie) {
1800 ceph_assert(connect_seq == 0);
1801 state = SESSION_CONNECTING;
1802 return send_client_ident();
1803 } else { // reconnecting to previous session
1804 state = SESSION_RECONNECTING;
1805 ceph_assert(connect_seq > 0);
1806 return send_reconnect();
1807 }
1808}
1809
/**
 * Send CLIENT_IDENT to start a new session.
 *
 * Non-lossy connections get a random non-zero client_cookie if they do not
 * have one yet. If our own address is still unknown or a blank IP, it is
 * learned from the local socket via getsockname() and reported to the
 * messenger. This happens with connection->lock released, with optional
 * fault-injection delay, and returns nullptr if the state changed
 * meanwhile. The ident frame then carries:
 *  - our addrs, the target addr, our gid and global_seq,
 *  - the supported/required features (plus msgr2_required),
 *  - the lossy flag and the cookie.
 */
CtPtr ProtocolV2::send_client_ident() {
  ldout(cct, 20) << __func__ << dendl;

  // -1ll becomes the maximum uint64_t, so the cookie is drawn from
  // [1, UINT64_MAX] (presumably inclusive) and is never 0.
  if (!connection->policy.lossy && !client_cookie) {
    client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
  }

  uint64_t flags = 0;
  if (connection->policy.lossy) {
    flags |= CEPH_MSG_CONNECT_LOSSY;
  }

  // We don't yet know our own address: read it from the connected socket.
  if (messenger->get_myaddrs().empty() ||
      messenger->get_myaddrs().front().is_blank_ip()) {
    sockaddr_storage ss;
    socklen_t len = sizeof(ss);
    int r = getsockname(connection->cs.socket_fd(), (sockaddr *)&ss, &len);
    ceph_assert(r == 0);
    ldout(cct, 1) << __func__ << " getsockname reveals I am " << (sockaddr *)&ss
                  << " when talking to " << connection->target_addr << dendl;
    entity_addr_t a;
    a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
    a.set_sockaddr((sockaddr *)&ss);
    a.set_port(0);
    // learned_addr() (and any injected delay) runs without connection->lock
    connection->lock.unlock();
    messenger->learned_addr(a);
    if (cct->_conf->ms_inject_internal_delays &&
        cct->_conf->ms_inject_socket_failures) {
      if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
        ldout(cct, 10) << __func__ << " sleep for "
                       << cct->_conf->ms_inject_internal_delays << dendl;
        utime_t t;
        t.set_from_double(cct->_conf->ms_inject_internal_delays);
        t.sleep();
      }
    }
    connection->lock.lock();
    // another thread may have changed our state while the lock was dropped
    if (state != SESSION_CONNECTING) {
      ldout(cct, 1) << __func__
                    << " state changed while learned_addr, mark_down or "
                    << " replacing must be happened just now" << dendl;
      return nullptr;
    }
  }

  auto client_ident = ClientIdentFrame::Encode(
      messenger->get_myaddrs(),
      connection->target_addr,
      messenger->get_myname().num(),
      global_seq,
      connection->policy.features_supported,
      connection->policy.features_required | msgr2_required,
      flags,
      client_cookie);

  ldout(cct, 5) << __func__ << " sending identification: "
                << "addrs=" << messenger->get_myaddrs()
                << " target=" << connection->target_addr
                << " gid=" << messenger->get_myname().num()
                << " global_seq=" << global_seq
                << " features_supported=" << std::hex
                << connection->policy.features_supported
                << " features_required="
                << (connection->policy.features_required | msgr2_required)
                << " flags=" << flags
                << " cookie=" << client_cookie << std::dec << dendl;

  INTERCEPT(11);

  return WRITE(client_ident, "client ident", read_frame);
}
1881
1882CtPtr ProtocolV2::send_reconnect() {
1883 ldout(cct, 20) << __func__ << dendl;
1884
1885 auto reconnect = ReconnectFrame::Encode(messenger->get_myaddrs(),
1886 client_cookie,
1887 server_cookie,
1888 global_seq,
1889 connect_seq,
1890 in_seq);
1891
1892 ldout(cct, 5) << __func__ << " reconnect to session: client_cookie="
1893 << std::hex << client_cookie << " server_cookie="
1894 << server_cookie << std::dec
1895 << " gs=" << global_seq << " cs=" << connect_seq
1896 << " ms=" << in_seq << dendl;
1897
1898 INTERCEPT(13);
1899
1900 return WRITE(reconnect, "reconnect", read_frame);
1901}
1902
1903CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
1904{
1905 ldout(cct, 20) << __func__
1906 << " payload.length()=" << payload.length() << dendl;
1907
1908 if (state != SESSION_CONNECTING) {
1909 lderr(cct) << __func__ << " not in session connect state!" << dendl;
1910 return _fault();
1911 }
1912
1913 auto ident_missing =
1914 IdentMissingFeaturesFrame::Decode(payload);
1915 lderr(cct) << __func__
1916 << " client does not support all server features: " << std::hex
1917 << ident_missing.features() << std::dec << dendl;
1918
1919 return _fault();
1920}
1921
1922CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
1923{
1924 ldout(cct, 20) << __func__
1925 << " payload.length()=" << payload.length() << dendl;
1926
1927 if (state != SESSION_RECONNECTING) {
1928 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1929 return _fault();
1930 }
1931
1932 auto reset = ResetFrame::Decode(payload);
1933
1934 ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
1935 << dendl;
1936 if (reset.full()) {
1937 reset_session();
1938 } else {
1939 server_cookie = 0;
1940 connect_seq = 0;
1941 in_seq = 0;
1942 }
1943
1944 state = SESSION_CONNECTING;
1945 return send_client_ident();
1946}
1947
1948CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
1949{
1950 ldout(cct, 20) << __func__
1951 << " payload.length()=" << payload.length() << dendl;
1952
1953 if (state != SESSION_RECONNECTING) {
1954 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1955 return _fault();
1956 }
1957
1958 auto retry = RetryFrame::Decode(payload);
1959 connect_seq = retry.connect_seq() + 1;
1960
1961 ldout(cct, 1) << __func__
1962 << " received session retry connect_seq=" << retry.connect_seq()
1963 << ", inc to cs=" << connect_seq << dendl;
1964
1965 return send_reconnect();
1966}
1967
1968CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
1969{
1970 ldout(cct, 20) << __func__
1971 << " payload.length()=" << payload.length() << dendl;
1972
1973 if (state != SESSION_RECONNECTING) {
1974 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1975 return _fault();
1976 }
1977
1978 auto retry = RetryGlobalFrame::Decode(payload);
1979 global_seq = messenger->get_global_seq(retry.global_seq());
1980
1981 ldout(cct, 1) << __func__ << " received session retry global global_seq="
1982 << retry.global_seq() << ", choose new gs=" << global_seq
1983 << dendl;
1984
1985 return send_reconnect();
1986}
1987
1988CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) {
1989 ldout(cct, 20) << __func__
1990 << " received WAIT (connection race)"
1991 << " payload.length()=" << payload.length()
1992 << dendl;
1993
1994 if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
1995 lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
1996 return _fault();
1997 }
1998
1999 state = WAIT;
2000 WaitFrame::Decode(payload);
2001 return _fault();
2002}
2003
2004CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
2005{
2006 ldout(cct, 20) << __func__
2007 << " payload.length()=" << payload.length() << dendl;
2008
2009 if (state != SESSION_RECONNECTING) {
2010 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2011 return _fault();
2012 }
2013
2014 auto reconnect_ok = ReconnectOkFrame::Decode(payload);
2015 ldout(cct, 5) << __func__
2016 << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
2017 << dendl;
2018
2019 out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
2020
2021 backoff = utime_t();
2022 ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
2023 << ", lossy = " << connection->policy.lossy << ", features "
2024 << connection->get_features() << dendl;
2025
2026 if (connection->delay_state) {
2027 ceph_assert(connection->delay_state->ready());
2028 }
2029
2030 connection->dispatch_queue->queue_connect(connection);
2031 messenger->ms_deliver_handle_fast_connect(connection);
2032
2033 return ready();
2034}
2035
2036CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
2037{
2038 ldout(cct, 20) << __func__
2039 << " payload.length()=" << payload.length() << dendl;
2040
2041 if (state != SESSION_CONNECTING) {
2042 lderr(cct) << __func__ << " not in session connect state!" << dendl;
2043 return _fault();
2044 }
2045
2046 auto server_ident = ServerIdentFrame::Decode(payload);
2047 ldout(cct, 5) << __func__ << " received server identification:"
2048 << " addrs=" << server_ident.addrs()
2049 << " gid=" << server_ident.gid()
2050 << " global_seq=" << server_ident.global_seq()
2051 << " features_supported=" << std::hex
2052 << server_ident.supported_features()
2053 << " features_required=" << server_ident.required_features()
2054 << " flags=" << server_ident.flags() << " cookie=" << std::dec
2055 << server_ident.cookie() << dendl;
2056
2057 // is this who we intended to talk to?
2058 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2059 // of mon_host or something.
2060 if (!server_ident.addrs().contains(connection->target_addr)) {
2061 ldout(cct,1) << __func__ << " peer identifies as " << server_ident.addrs()
2062 << ", does not include " << connection->target_addr << dendl;
2063 return _fault();
2064 }
2065
2066 server_cookie = server_ident.cookie();
2067
2068 connection->set_peer_addrs(server_ident.addrs());
2069 peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid());
2070 connection->set_features(server_ident.supported_features() &
2071 connection->policy.features_supported);
2072 peer_global_seq = server_ident.global_seq();
2073
2074 connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
2075
2076 backoff = utime_t();
2077 ldout(cct, 10) << __func__ << " connect success " << connect_seq
2078 << ", lossy = " << connection->policy.lossy << ", features "
2079 << connection->get_features() << dendl;
2080
2081 if (connection->delay_state) {
2082 ceph_assert(connection->delay_state->ready());
2083 }
2084
2085 connection->dispatch_queue->queue_connect(connection);
2086 messenger->ms_deliver_handle_fast_connect(connection);
2087
2088 return ready();
2089}
2090
2091/* Server Protocol Methods */
2092
2093CtPtr ProtocolV2::start_server_banner_exchange() {
2094 ldout(cct, 20) << __func__ << dendl;
2095
2096 INTERCEPT(2);
2097
2098 state = BANNER_ACCEPTING;
2099
2100 return _banner_exchange(CONTINUATION(post_server_banner_exchange));
2101}
2102
2103CtPtr ProtocolV2::post_server_banner_exchange() {
2104 ldout(cct, 20) << __func__ << dendl;
2105
2106 state = AUTH_ACCEPTING;
2107
2108 return CONTINUE(read_frame);
2109}
2110
2111CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
2112 ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
2113 << dendl;
2114
2115 if (state != AUTH_ACCEPTING) {
2116 lderr(cct) << __func__ << " not in auth accept state!" << dendl;
2117 return _fault();
2118 }
2119
2120 auto request = AuthRequestFrame::Decode(payload);
2121 ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
2122 << ", preferred_modes=" << request.preferred_modes()
2123 << ", payload_len=" << request.auth_payload().length() << ")"
2124 << dendl;
2125 auth_meta->auth_method = request.method();
2126 auth_meta->con_mode = messenger->auth_server->pick_con_mode(
2127 connection->get_peer_type(), auth_meta->auth_method,
2128 request.preferred_modes());
2129 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
2130 return _auth_bad_method(-EOPNOTSUPP);
2131 }
2132 return _handle_auth_request(request.auth_payload(), false);
2133}
2134
2135CtPtr ProtocolV2::_auth_bad_method(int r)
2136{
2137 ceph_assert(r < 0);
2138 std::vector<uint32_t> allowed_methods;
2139 std::vector<uint32_t> allowed_modes;
2140 messenger->auth_server->get_supported_auth_methods(
2141 connection->get_peer_type(), &allowed_methods, &allowed_modes);
2142 ldout(cct, 1) << __func__ << " auth_method " << auth_meta->auth_method
2143 << " r " << cpp_strerror(r)
2144 << ", allowed_methods " << allowed_methods
2145 << ", allowed_modes " << allowed_modes
2146 << dendl;
2147 auto bad_method = AuthBadMethodFrame::Encode(auth_meta->auth_method, r,
2148 allowed_methods, allowed_modes);
2149 return WRITE(bad_method, "bad auth method", read_frame);
2150}
2151
2152CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
2153{
2154 if (!messenger->auth_server) {
2155 return _fault();
2156 }
2157 bufferlist reply;
2158 auto am = auth_meta;
2159 connection->lock.unlock();
2160 int r = messenger->auth_server->handle_auth_request(
2161 connection, am.get(),
2162 more, am->auth_method, auth_payload,
2163 &reply);
2164 connection->lock.lock();
2165 if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
2166 ldout(cct, 1) << __func__
2167 << " state changed while accept, it must be mark_down"
2168 << dendl;
2169 ceph_assert(state == CLOSED);
2170 return _fault();
2171 }
2172 if (r == 1) {
2173 INTERCEPT(10);
2174 state = AUTH_ACCEPTING_SIGN;
2175
2176 auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
2177 auth_meta->con_mode,
2178 reply);
2179 return WRITE(auth_done, "auth done", finish_auth);
2180 } else if (r == 0) {
2181 state = AUTH_ACCEPTING_MORE;
2182
2183 auto more = AuthReplyMoreFrame::Encode(reply);
2184 return WRITE(more, "auth reply more", read_frame);
2185 } else if (r == -EBUSY) {
2186 // kick the client and maybe they'll come back later
2187 return _fault();
2188 } else {
2189 return _auth_bad_method(r);
2190 }
2191}
2192
2193CtPtr ProtocolV2::finish_auth()
2194{
2195 ceph_assert(auth_meta);
2196 // TODO: having a possibility to check whether we're server or client could
2197 // allow reusing finish_auth().
2198 session_stream_handlers = \
2199 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
2200
2201 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
2202 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
2203 auto sig_frame = AuthSignatureFrame::Encode(sig);
2204 pre_auth.enabled = false;
2205 pre_auth.rxbuf.clear();
2206 return WRITE(sig_frame, "auth signature", read_frame);
2207}
2208
2209CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload)
2210{
2211 ldout(cct, 20) << __func__
2212 << " payload.length()=" << payload.length() << dendl;
2213
2214 if (state != AUTH_ACCEPTING_MORE) {
2215 lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
2216 return _fault();
2217 }
2218
2219 auto auth_more = AuthRequestMoreFrame::Decode(payload);
2220 return _handle_auth_request(auth_more.auth_payload(), true);
2221}
2222
2223CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
2224{
2225 ldout(cct, 20) << __func__
2226 << " payload.length()=" << payload.length() << dendl;
2227
2228 if (state != AUTH_ACCEPTING_SIGN && state != AUTH_CONNECTING_SIGN) {
2229 lderr(cct) << __func__
2230 << " pre-auth verification signature seen in wrong state!"
2231 << dendl;
2232 return _fault();
2233 }
2234
2235 auto sig_frame = AuthSignatureFrame::Decode(payload);
2236
2237 const auto actual_tx_sig = auth_meta->session_key.empty() ?
2238 sha256_digest_t() : auth_meta->session_key.hmac_sha256(cct, pre_auth.txbuf);
2239 if (sig_frame.signature() != actual_tx_sig) {
2240 ldout(cct, 2) << __func__ << " pre-auth signature mismatch"
2241 << " actual_tx_sig=" << actual_tx_sig
2242 << " sig_frame.signature()=" << sig_frame.signature()
2243 << dendl;
2244 return _fault();
2245 } else {
2246 ldout(cct, 20) << __func__ << " pre-auth signature success"
2247 << " sig_frame.signature()=" << sig_frame.signature()
2248 << dendl;
2249 pre_auth.txbuf.clear();
2250 }
2251
2252 if (state == AUTH_ACCEPTING_SIGN) {
2253 // server had sent AuthDone and client responded with correct pre-auth
2254 // signature. we can start accepting new sessions/reconnects.
2255 state = SESSION_ACCEPTING;
2256 return CONTINUE(read_frame);
2257 } else if (state == AUTH_CONNECTING_SIGN) {
2258 // this happened at client side
2259 return finish_client_auth();
2260 } else {
2261 ceph_assert_always("state corruption" == nullptr);
2262 }
2263}
2264
// Server side: handle ClientIdent, the first session frame of a brand-new
// (non-reconnect) session.
//
// The peer is rejected if it advertises no usable address or is not trying
// to reach one of our addresses. Otherwise its identity and client cookie
// are recorded. If it lacks required features, IdentMissingFeatures is sent.
// Next, any existing connection to the same peer addresses is looked up. If
// one exists, handle_existing_connection() decides the outcome; otherwise
// ServerIdent is sent via send_server_ident().
//
// connection->lock is released around lookup_conn()/mark_down()/
// inject_delay(), so the state is re-checked after re-acquiring it; the
// assert expresses that the only expected concurrent transition is to CLOSED.
CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
{
  ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;

  if (state != SESSION_ACCEPTING) {
    lderr(cct) << __func__ << " not in session accept state!" << dendl;
    return _fault();
  }

  auto client_ident = ClientIdentFrame::Decode(payload);

  ldout(cct, 5) << __func__ << " received client identification:"
                << " addrs=" << client_ident.addrs()
                << " target=" << client_ident.target_addr()
                << " gid=" << client_ident.gid()
                << " global_seq=" << client_ident.global_seq()
                << " features_supported=" << std::hex
                << client_ident.supported_features()
                << " features_required=" << client_ident.required_features()
                << " flags=" << client_ident.flags()
                << " cookie=" << client_ident.cookie() << std::dec << dendl;

  if (client_ident.addrs().empty() ||
      client_ident.addrs().front() == entity_addr_t()) {
    ldout(cct,5) << __func__ << " oops, client_ident.addrs() is empty" << dendl;
    return _fault(); // a v2 peer should never do this
  }
  // the peer must be addressing one of our own addresses
  if (!messenger->get_myaddrs().contains(client_ident.target_addr())) {
    ldout(cct,5) << __func__ << " peer is trying to reach "
                 << client_ident.target_addr()
                 << " which is not us (" << messenger->get_myaddrs() << ")"
                 << dendl;
    return _fault();
  }

  connection->set_peer_addrs(client_ident.addrs());
  connection->target_addr = connection->_infer_target_addr(client_ident.addrs());

  peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
  connection->set_peer_id(client_ident.gid());

  client_cookie = client_ident.cookie();

  // features that our policy (or msgr2 itself) requires but that the client
  // did not advertise
  uint64_t feat_missing =
    (connection->policy.features_required | msgr2_required) &
    ~(uint64_t)client_ident.supported_features();
  if (feat_missing) {
    ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
                  << feat_missing << std::dec << dendl;
    auto ident_missing_features =
      IdentMissingFeaturesFrame::Encode(feat_missing);

    return WRITE(ident_missing_features, "ident missing features", read_frame);
  }

  // Negotiated feature set. It is applied to this connection in
  // send_server_ident(), or copied to an existing connection in
  // reuse_connection().
  connection_features =
    client_ident.supported_features() & connection->policy.features_supported;

  peer_global_seq = client_ident.global_seq();

  // Looks good so far, let's check if there is already an existing connection
  // to this peer.

  connection->lock.unlock();
  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);

  // An existing connection that is not running protocol v2 cannot be
  // reconciled with this one: mark it down and treat it as absent.
  if (existing &&
      existing->protocol->proto_type != 2) {
    ldout(cct,1) << __func__ << " existing " << existing << " proto "
                 << existing->protocol.get() << " version is "
                 << existing->protocol->proto_type << ", marking down" << dendl;
    existing->mark_down();
    existing = nullptr;
  }

  connection->inject_delay();

  connection->lock.lock();
  // we ran unlocked above; bail out if we were marked down meanwhile
  if (state != SESSION_ACCEPTING) {
    ldout(cct, 1) << __func__
                  << " state changed while accept, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED);
    return _fault();
  }

  if (existing) {
    return handle_existing_connection(existing);
  }

  // if everything is OK reply with server identification
  return send_server_ident();
}
2359
// Server side: handle the client's Reconnect frame, which asks to resume a
// previous session.
//
// First the peer's addresses and global_seq are recorded and the existing
// connection to those addresses is looked up, with connection->lock released
// meanwhile. Then, holding existing->lock for the rest of the function, the
// replies are decided in this order:
//   - no existing / existing CLOSED       -> Reset(full = true)
//   - existing is being replaced          -> RetryGlobal(existing peer gs)
//   - client cookie differs from existing -> Reset(full = policy.resetcheck)
//   - existing has no server cookie yet   -> Reset(full = false)
//   - existing peer_global_seq is newer   -> RetryGlobal(existing peer gs)
//   - existing connect_seq is newer       -> Retry(existing connect_seq)
//   - equal connect_seq (reconnect race)  -> Wait if the existing connection
//     wins the address comparison (and is not a server-policy connection)
// Otherwise this socket replaces the existing connection's socket via
// reuse_connection(), with reconnecting == true.
CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
{
  ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;

  if (state != SESSION_ACCEPTING) {
    lderr(cct) << __func__ << " not in session accept state!" << dendl;
    return _fault();
  }

  auto reconnect = ReconnectFrame::Decode(payload);

  ldout(cct, 5) << __func__
                << " received reconnect:"
                << " client_cookie=" << std::hex << reconnect.client_cookie()
                << " server_cookie=" << reconnect.server_cookie() << std::dec
                << " gs=" << reconnect.global_seq()
                << " cs=" << reconnect.connect_seq()
                << " ms=" << reconnect.msg_seq()
                << dendl;

  // Should we check if one of the ident.addrs match connection->target_addr
  // as we do in ProtocolV1?
  connection->set_peer_addrs(reconnect.addrs());
  connection->target_addr = connection->_infer_target_addr(reconnect.addrs());
  peer_global_seq = reconnect.global_seq();

  connection->lock.unlock();
  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);

  // a non-v2 existing connection cannot carry a v2 session; drop it
  if (existing &&
      existing->protocol->proto_type != 2) {
    ldout(cct,1) << __func__ << " existing " << existing << " proto "
                 << existing->protocol.get() << " version is "
                 << existing->protocol->proto_type << ", marking down" << dendl;
    existing->mark_down();
    existing = nullptr;
  }

  connection->inject_delay();

  connection->lock.lock();
  // we ran unlocked above; bail out if we were marked down meanwhile
  if (state != SESSION_ACCEPTING) {
    ldout(cct, 1) << __func__
                  << " state changed while accept, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED);
    return _fault();
  }

  if (!existing) {
    // there is no existing connection therefore cannot reconnect to previous
    // session
    ldout(cct, 0) << __func__
                  << " no existing connection exists, reseting client" << dendl;
    auto reset = ResetFrame::Encode(true);
    return WRITE(reset, "session reset", read_frame);
  }

  // held until return, including across reuse_connection()
  std::lock_guard<std::mutex> l(existing->lock);

  ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
  if (!exproto) {
    ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
    ceph_assert(false);
  }

  if (exproto->state == CLOSED) {
    ldout(cct, 5) << __func__ << " existing " << existing
                  << " already closed. Reseting client" << dendl;
    auto reset = ResetFrame::Encode(true);
    return WRITE(reset, "session reset", read_frame);
  }

  if (exproto->replacing) {
    ldout(cct, 1) << __func__
                  << " existing racing replace happened while replacing."
                  << " existing=" << existing << dendl;
    auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
    return WRITE(retry, "session retry", read_frame);
  }

  if (exproto->client_cookie != reconnect.client_cookie()) {
    ldout(cct, 1) << __func__ << " existing=" << existing
                  << " client cookie mismatch, I must have reseted:"
                  << " cc=" << std::hex << exproto->client_cookie
                  << " rcc=" << reconnect.client_cookie()
                  << ", reseting client." << std::dec
                  << dendl;
    auto reset = ResetFrame::Encode(connection->policy.resetcheck);
    return WRITE(reset, "session reset", read_frame);
  } else if (exproto->server_cookie == 0) {
    // this happens when:
    // - a connects to b
    // - a sends client_ident
    // - b gets client_ident, sends server_ident and sets cookie X
    // - connection fault
    // - b reconnects to a with cookie X, connect_seq=1
    // - a has cookie==0
    ldout(cct, 1) << __func__ << " I was a client and didn't received the"
                  << " server_ident. Asking peer to resume session"
                  << " establishment" << dendl;
    auto reset = ResetFrame::Encode(false);
    return WRITE(reset, "session reset", read_frame);
  }

  if (exproto->peer_global_seq > reconnect.global_seq()) {
    ldout(cct, 5) << __func__
                  << " stale global_seq: sgs=" << exproto->peer_global_seq
                  << " cgs=" << reconnect.global_seq()
                  << ", ask client to retry global" << dendl;
    auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);

    INTERCEPT(18);

    return WRITE(retry, "session retry", read_frame);
  }

  if (exproto->connect_seq > reconnect.connect_seq()) {
    ldout(cct, 5) << __func__
                  << " stale connect_seq scs=" << exproto->connect_seq
                  << " ccs=" << reconnect.connect_seq()
                  << " , ask client to retry" << dendl;
    auto retry = RetryFrame::Encode(exproto->connect_seq);
    return WRITE(retry, "session retry", read_frame);
  }

  if (exproto->connect_seq == reconnect.connect_seq()) {
    // reconnect race: both peers are sending reconnect messages
    if (existing->peer_addrs->msgr2_addr() >
            messenger->get_myaddrs().msgr2_addr() &&
        !existing->policy.server) {
      // the existing connection wins
      ldout(cct, 1)
          << __func__
          << " reconnect race detected, this connection loses to existing="
          << existing << dendl;

      auto wait = WaitFrame::Encode();
      return WRITE(wait, "wait", read_frame);
    } else {
      // this connection wins
      ldout(cct, 1) << __func__
                    << " reconnect race detected, replacing existing="
                    << existing << " socket by this connection's socket"
                    << dendl;
    }
  }

  ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;

  reconnecting = true;

  // Everything looks good. Adopt the client's connect_seq; message_seq is
  // later used by send_reconnect_ok() to discard requeued messages the
  // client already has.
  exproto->connect_seq = reconnect.connect_seq();
  exproto->message_seq = reconnect.msg_seq();

  return reuse_connection(existing, exproto);
}
2519
// Server side: a new session (ClientIdent) arrived while the messenger
// already has a connection to the same peer addresses; decide which one
// survives.
//
// Called with connection->lock held (see handle_client_ident); also holds
// existing->lock until return. Outcomes, in the order they are checked:
//   - existing is CLOSED                 -> continue with this connection
//   - existing is being replaced         -> reply Wait
//   - existing has newer peer_global_seq -> this connection is stale; stop it
//   - existing is lossy                  -> stop existing, continue with this
//   - existing session has a different client cookie -> the peer restarted;
//     optionally reset the old session (policy.resetcheck), then reuse it
//   - same client cookie                 -> establishment was interrupted;
//     reuse existing
//   - existing is READY/STANDBY          -> reuse existing
//   - otherwise both sides connected at once; the race is settled by
//     comparing msgr2 addresses (a server-policy existing always loses)
// "Reuse" means this connection's socket is moved into the existing
// connection by reuse_connection().
CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
  ldout(cct, 20) << __func__ << " existing=" << existing << dendl;

  std::lock_guard<std::mutex> l(existing->lock);

  ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
  if (!exproto) {
    ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
    ceph_assert(false);
  }

  if (exproto->state == CLOSED) {
    ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
                  << dendl;
    return send_server_ident();
  }

  if (exproto->replacing) {
    ldout(cct, 1) << __func__
                  << " existing racing replace happened while replacing."
                  << " existing=" << existing << dendl;
    auto wait = WaitFrame::Encode();
    return WRITE(wait, "wait", read_frame);
  }

  if (exproto->peer_global_seq > peer_global_seq) {
    ldout(cct, 1) << __func__ << " this is a stale connection, peer_global_seq="
                  << peer_global_seq
                  << " existing->peer_global_seq=" << exproto->peer_global_seq
                  << ", stopping this connection." << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  if (existing->policy.lossy) {
    // existing connection can be thrown out in favor of this one
    ldout(cct, 1)
        << __func__ << " existing=" << existing
        << " is a lossy channel. Stopping existing in favor of this connection"
        << dendl;
    existing->protocol->stop();
    existing->dispatch_queue->queue_reset(existing.get());
    return send_server_ident();
  }

  if (exproto->server_cookie && exproto->client_cookie &&
      exproto->client_cookie != client_cookie) {
    // Found previous session
    // peer has reseted and we're going to reuse the existing connection
    // by replacing the communication socket
    ldout(cct, 1) << __func__ << " found previous session existing=" << existing
                  << ", peer must have reseted." << dendl;
    if (connection->policy.resetcheck) {
      exproto->reset_session();
    }
    return reuse_connection(existing, exproto);
  }

  if (exproto->client_cookie == client_cookie) {
    // session establishment interrupted between client_ident and server_ident,
    // continuing...
    ldout(cct, 1) << __func__ << " found previous session existing=" << existing
                  << ", continuing session establishment." << dendl;
    return reuse_connection(existing, exproto);
  }

  if (exproto->state == READY || exproto->state == STANDBY) {
    ldout(cct, 1) << __func__ << " existing=" << existing
                  << " is READY/STANDBY, lets reuse it" << dendl;
    return reuse_connection(existing, exproto);
  }

  // Looks like a connection race: server and client are both connecting to
  // each other at the same time.
  if (connection->peer_addrs->msgr2_addr() <
          messenger->get_myaddrs().msgr2_addr() ||
      existing->policy.server) {
    // this connection wins
    ldout(cct, 1) << __func__
                  << " connection race detected, replacing existing="
                  << existing << " socket by this connection's socket" << dendl;
    return reuse_connection(existing, exproto);
  } else {
    // the existing connection wins
    ldout(cct, 1)
        << __func__
        << " connection race detected, this connection loses to existing="
        << existing << dendl;
    // NOTE(review): this assert fails if the two msgr2 addresses compare
    // equal; presumably that cannot happen for distinct peers — confirm.
    ceph_assert(connection->peer_addrs->msgr2_addr() >
                messenger->get_myaddrs().msgr2_addr());

    // make sure we follow through with opening the existing
    // connection (if it isn't yet open) since we know the peer
    // has something to send to us.
    existing->send_keepalive();
    auto wait = WaitFrame::Encode();
    return WRITE(wait, "wait", read_frame);
  }
}
2620
// Server side: move this connection's freshly authenticated socket into
// `existing` and then retire this connection.
//
// Under existing->write_lock, this function:
//   - unregisters this socket's file events and resets the existing
//     protocol's receive state;
//   - copies over the session identity: the cookie, name and features only
//     for a new session (!reconnecting); peer_global_seq always;
//   - stops this connection and queues a reset for it;
//   - marks `existing` as replacing and gives it our stream handlers and
//     auth_meta;
//   - parks `existing` in state NONE.
// The socket itself is handed over asynchronously on existing's event
// center (deactivate_existing). If existing is still NONE there, it adopts
// the socket together with this connection's worker/center. If it was
// CLOSED meanwhile, the socket is closed instead. Finally transfer_existing
// resumes existing in SESSION_ACCEPTING and sends ServerIdent (new session)
// or ReconnectOk (reconnect).
//
// Returns nullptr: this connection has been stopped and has no continuation.
CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
                                   ProtocolV2 *exproto) {
  ldout(cct, 20) << __func__ << " existing=" << existing
                 << " reconnect=" << reconnecting << dendl;

  connection->inject_delay();

  std::lock_guard<std::mutex> l(existing->write_lock);

  // our socket is about to be given away; stop watching it here
  connection->center->delete_file_event(connection->cs.fd(),
                                        EVENT_READABLE | EVENT_WRITABLE);

  if (existing->delay_state) {
    existing->delay_state->flush();
    ceph_assert(!connection->delay_state);
  }
  exproto->reset_recv_state();
  exproto->pre_auth.enabled = false;

  if (!reconnecting) {
    exproto->client_cookie = client_cookie;
    exproto->peer_name = peer_name;
    exproto->connection_features = connection_features;
    existing->set_features(connection_features);
  }
  exproto->peer_global_seq = peer_global_seq;

  auto temp_cs = std::move(connection->cs);
  EventCenter *new_center = connection->center;
  Worker *new_worker = connection->worker;

  ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
                           << dendl;
  // avoid _stop shutdown replacing socket (the socket was moved out above)
  // queue a reset on the new connection, which we're dumping for the old
  stop();

  connection->dispatch_queue->queue_reset(connection);

  exproto->can_write = false;
  exproto->reconnecting = reconnecting;
  exproto->replacing = true;
  std::swap(exproto->session_stream_handlers, session_stream_handlers);
  exproto->auth_meta = auth_meta;
  existing->state_offset = 0;
  // avoid previous thread modify event
  exproto->state = NONE;
  existing->state = AsyncConnection::STATE_NONE;
  // Discard existing prefetch buffer in `recv_buf`
  existing->recv_start = existing->recv_end = 0;
  // there shouldn't exist any buffer
  ceph_assert(connection->recv_start == connection->recv_end);

  // Runs on existing's (original) event center and takes ownership of the
  // socket via the bound argument.
  auto deactivate_existing = std::bind(
      [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
        // we need to delete time event in original thread
        {
          std::lock_guard<std::mutex> l(existing->lock);
          existing->write_lock.lock();
          exproto->requeue_sent();
          existing->outcoming_bl.clear();
          existing->open_write = false;
          existing->write_lock.unlock();
          if (exproto->state == NONE) {
            // adopt the new socket and migrate to the new worker/center
            existing->shutdown_socket();
            existing->cs = std::move(cs);
            existing->worker->references--;
            new_worker->references++;
            existing->logger = new_worker->get_perf_counter();
            existing->worker = new_worker;
            existing->center = new_center;
            if (existing->delay_state)
              existing->delay_state->set_center(new_center);
          } else if (exproto->state == CLOSED) {
            // existing was closed meanwhile: close the socket on the center
            // that owns it
            auto back_to_close = std::bind(
                [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
            new_center->submit_to(new_center->get_id(),
                                  std::move(back_to_close), true);
            return;
          } else {
            ceph_abort();
          }
        }

        // Before changing existing->center, it may already exists some
        // events in existing->center's queue. Then if we mark down
        // `existing`, it will execute in another thread and clean up
        // connection. Previous event will result in segment fault
        // (existing->center now points at new_center, so this runs there)
        auto transfer_existing = [existing, exproto]() mutable {
          std::lock_guard<std::mutex> l(existing->lock);
          if (exproto->state == CLOSED) return;
          ceph_assert(exproto->state == NONE);

          exproto->state = SESSION_ACCEPTING;
          existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
          existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
                                              existing->read_handler);
          if (!exproto->reconnecting) {
            exproto->run_continuation(exproto->send_server_ident());
          } else {
            exproto->run_continuation(exproto->send_reconnect_ok());
          }
        };
        if (existing->center->in_thread())
          transfer_existing();
        else
          existing->center->submit_to(existing->center->get_id(),
                                      std::move(transfer_existing), true);
      },
      std::move(temp_cs));

  existing->center->submit_to(existing->center->get_id(),
                              std::move(deactivate_existing), true);
  return nullptr;
}
2736
// Server side: register this connection as the session for the peer and
// send ServerIdent.
//
// Called directly while accepting a new session, or on an existing
// connection from reuse_connection() after it adopted a new socket.
// It resets out/in sequence state and picks a fresh server cookie for
// non-lossy policies. It then registers the connection with the messenger
// via accept_conn(), with connection->lock released. After that it applies
// the negotiated features, queues the accept notifications and writes
// ServerIdent; server_ready() is the continuation after the write.
CtPtr ProtocolV2::send_server_ident() {
  ldout(cct, 20) << __func__ << dendl;

  // this is required for the case when this connection is being replaced
  out_seq = discard_requeued_up_to(out_seq, 0);
  in_seq = 0;

  if (!connection->policy.lossy) {
    // Random cookie drawn from [1, (uint64_t)-1], so presumably never 0;
    // a server_cookie of 0 is treated as "no session yet" in
    // handle_reconnect().
    server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
  }

  uint64_t flags = 0;
  if (connection->policy.lossy) {
    flags = flags | CEPH_MSG_CONNECT_LOSSY;
  }

  uint64_t gs = messenger->get_global_seq();
  auto server_ident = ServerIdentFrame::Encode(
          messenger->get_myaddrs(),
          messenger->get_myname().num(),
          gs,
          connection->policy.features_supported,
          connection->policy.features_required | msgr2_required,
          flags,
          server_cookie);

  ldout(cct, 5) << __func__ << " sending identification:"
                << " addrs=" << messenger->get_myaddrs()
                << " gid=" << messenger->get_myname().num()
                << " global_seq=" << gs << " features_supported=" << std::hex
                << connection->policy.features_supported
                << " features_required="
                << (connection->policy.features_required | msgr2_required)
                << " flags=" << flags << " cookie=" << std::dec << server_cookie
                << dendl;

  connection->lock.unlock();
  // Because "replacing" prevents other connections from preempting this
  // addr, it is safe not to hold the Connection's lock here.
  ssize_t r = messenger->accept_conn(connection);

  connection->inject_delay();

  connection->lock.lock();

  if (r < 0) {
    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
                  << connection->peer_addrs->msgr2_addr()
                  << " just fail later one(this)" << dendl;
    connection->inject_delay();
    return _fault();
  }
  // Ran unlocked above: a mark_down (CLOSED) or a replacement (reuse_connection
  // parks a connection in NONE) may have happened meanwhile.
  if (state != SESSION_ACCEPTING) {
    ldout(cct, 1) << __func__
                  << " state changed while accept_conn, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED || state == NONE);
    messenger->unregister_conn(connection);
    connection->inject_delay();
    return _fault();
  }

  connection->set_features(connection_features);

  // notify
  connection->dispatch_queue->queue_accept(connection);
  messenger->ms_deliver_handle_fast_accept(connection);

  INTERCEPT(12);

  return WRITE(server_ident, "server ident", server_ready);
}
2809
2810CtPtr ProtocolV2::server_ready() {
2811 ldout(cct, 20) << __func__ << dendl;
2812
2813 if (connection->delay_state) {
2814 ceph_assert(connection->delay_state->ready());
2815 }
2816
2817 return ready();
2818}
2819
// Server side: accept the client's Reconnect and send ReconnectOk.
//
// Runs on the existing connection after reuse_connection() handed it the new
// socket. It discards requeued outgoing messages up to message_seq (the
// client's msg_seq, stored by handle_reconnect()), registers the connection
// with the messenger via accept_conn() with connection->lock released, and
// queues the accept notifications. It then writes ReconnectOk carrying our
// in_seq; server_ready() is the continuation after the write.
CtPtr ProtocolV2::send_reconnect_ok() {
  ldout(cct, 20) << __func__ << dendl;

  out_seq = discard_requeued_up_to(out_seq, message_seq);

  // tell the client how far we have received, so it can drop its requeue
  uint64_t ms = in_seq;
  auto reconnect_ok = ReconnectOkFrame::Encode(ms);

  ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;

  connection->lock.unlock();
  // Because "replacing" prevents other connections from preempting this
  // addr, it is safe not to hold the Connection's lock here.
  ssize_t r = messenger->accept_conn(connection);

  connection->inject_delay();

  connection->lock.lock();

  if (r < 0) {
    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
                  << connection->peer_addrs->msgr2_addr()
                  << " just fail later one(this)" << dendl;
    connection->inject_delay();
    return _fault();
  }
  // ran unlocked above: a mark_down (CLOSED) or replacement (NONE) may have
  // happened meanwhile
  if (state != SESSION_ACCEPTING) {
    ldout(cct, 1) << __func__
                  << " state changed while accept_conn, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED || state == NONE);
    messenger->unregister_conn(connection);
    connection->inject_delay();
    return _fault();
  }

  // notify
  connection->dispatch_queue->queue_accept(connection);
  messenger->ms_deliver_handle_fast_accept(connection);

  INTERCEPT(14);

  return WRITE(reconnect_ok, "reconnect ok", server_ready);
}