]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/ProtocolV2.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <type_traits>
5
6#include "ProtocolV2.h"
7#include "AsyncMessenger.h"
8
9#include "common/EventTrace.h"
10#include "common/ceph_crypto.h"
11#include "common/errno.h"
12#include "include/random.h"
13#include "auth/AuthClient.h"
14#include "auth/AuthServer.h"
15
16#define dout_subsys ceph_subsys_ms
17#undef dout_prefix
18#define dout_prefix _conn_prefix(_dout)
f67539c2 19std::ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
11fdf7f2
TL
20 return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
21 << *connection->peer_addrs << " conn(" << connection << " "
22 << this
23 << " " << ceph_con_mode_name(auth_meta->con_mode)
24 << " :" << connection->port
25 << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq << " l=" << connection->policy.lossy
f6b5b4d7
TL
27 << " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features,
28 REVISION_1)
20effc67 29 << " crypto rx=" << session_stream_handlers.rx.get()
11fdf7f2 30 << " tx=" << session_stream_handlers.tx.get()
20effc67
TL
31 << " comp rx=" << session_compression_handlers.rx.get()
32 << " tx=" << session_compression_handlers.tx.get()
11fdf7f2
TL
33 << ").";
34}
35
36using namespace ceph::msgr::v2;
37
38using CtPtr = Ct<ProtocolV2> *;
39using CtRef = Ct<ProtocolV2> &;
40
41void ProtocolV2::run_continuation(CtPtr pcontinuation) {
42 if (pcontinuation) {
43 run_continuation(*pcontinuation);
44 }
45}
46
47void ProtocolV2::run_continuation(CtRef continuation) {
48 try {
49 CONTINUATION_RUN(continuation)
f67539c2
TL
50 } catch (const ceph::buffer::error &e) {
51 lderr(cct) << __func__ << " failed decoding of frame header: " << e.what()
11fdf7f2
TL
52 << dendl;
53 _fault();
54 } catch (const ceph::crypto::onwire::MsgAuthError &e) {
55 lderr(cct) << __func__ << " " << e.what() << dendl;
56 _fault();
57 } catch (const DecryptionError &) {
58 lderr(cct) << __func__ << " failed to decrypt frame payload" << dendl;
59 }
60}
61
// Send B (a frame or a bufferlist) via write(); D is the description used
// in log messages and C names the continuation that runs afterwards.
#define WRITE(B, D, C) write(D, CONTINUATION(C), B)

// Read L bytes into a newly created buffer node, then run continuation C.
#define READ(L, C) \
  read(CONTINUATION(C), ceph::buffer::ptr_node::create(ceph::buffer::create(L)))

// Read into the caller-provided rx buffer B, then run continuation C.
#define READ_RXBUF(B, C) read(CONTINUATION(C), B)
67
#ifdef UNIT_TESTS_BUILT

// Test hook, only active when UNIT_TESTS_BUILT is defined. If the
// connection has an interceptor it is consulted with step id S:
//   ACTION::FAIL -> the enclosing function returns _fault()
//   ACTION::STOP -> the protocol is stopped, a reset is queued on the
//                   dispatch queue and the enclosing function returns nullptr
// Any other action lets execution continue.
#define INTERCEPT(S) {                                              \
  if (connection->interceptor) {                                    \
    auto a = connection->interceptor->intercept(connection, (S));   \
    if (a == Interceptor::ACTION::FAIL) {                           \
      return _fault();                                              \
    } else if (a == Interceptor::ACTION::STOP) {                    \
      stop();                                                       \
      connection->dispatch_queue->queue_reset(connection);          \
      return nullptr;                                               \
    }                                                               \
  }                                                                 \
}

#else
// Expands to nothing in non-test builds.
#define INTERCEPT(S)
#endif
84
85ProtocolV2::ProtocolV2(AsyncConnection *connection)
86 : Protocol(2, connection),
87 state(NONE),
f6b5b4d7 88 peer_supported_features(0),
11fdf7f2
TL
89 client_cookie(0),
90 server_cookie(0),
91 global_seq(0),
92 connect_seq(0),
93 peer_global_seq(0),
94 message_seq(0),
95 reconnecting(false),
96 replacing(false),
97 can_write(false),
98 bannerExchangeCallback(nullptr),
20effc67
TL
99 tx_frame_asm(&session_stream_handlers, false, cct->_conf->ms_crc_data,
100 &session_compression_handlers),
101 rx_frame_asm(&session_stream_handlers, false, cct->_conf->ms_crc_data,
102 &session_compression_handlers),
11fdf7f2
TL
103 next_tag(static_cast<Tag>(0)),
104 keepalive(false) {
105}
106
107ProtocolV2::~ProtocolV2() {
108}
109
110void ProtocolV2::connect() {
111 ldout(cct, 1) << __func__ << dendl;
112 state = START_CONNECT;
113 pre_auth.enabled = true;
114}
115
116void ProtocolV2::accept() {
117 ldout(cct, 1) << __func__ << dendl;
118 state = START_ACCEPT;
119}
120
121bool ProtocolV2::is_connected() { return can_write; }
122
123/*
124 * Tears down the message queues, and removes them from the
125 * DispatchQueue Must hold write_lock prior to calling.
126 */
127void ProtocolV2::discard_out_queue() {
128 ldout(cct, 10) << __func__ << " started" << dendl;
129
f67539c2 130 for (auto p = sent.begin(); p != sent.end(); ++p) {
11fdf7f2
TL
131 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
132 (*p)->put();
133 }
134 sent.clear();
135 for (auto& [ prio, entries ] : out_queue) {
136 static_cast<void>(prio);
137 for (auto& entry : entries) {
138 ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
139 entry.m->put();
140 }
141 }
142 out_queue.clear();
494da23a 143 write_in_progress = false;
11fdf7f2
TL
144}
145
146void ProtocolV2::reset_session() {
147 ldout(cct, 1) << __func__ << dendl;
148
149 std::lock_guard<std::mutex> l(connection->write_lock);
150 if (connection->delay_state) {
151 connection->delay_state->discard();
152 }
153
154 connection->dispatch_queue->discard_queue(connection->conn_id);
155 discard_out_queue();
9f95a23c 156 connection->outgoing_bl.clear();
11fdf7f2
TL
157
158 connection->dispatch_queue->queue_remote_reset(connection);
159
160 out_seq = 0;
161 in_seq = 0;
162 client_cookie = 0;
163 server_cookie = 0;
164 connect_seq = 0;
165 peer_global_seq = 0;
166 message_seq = 0;
167 ack_left = 0;
168 can_write = false;
169}
170
171void ProtocolV2::stop() {
172 ldout(cct, 1) << __func__ << dendl;
173 if (state == CLOSED) {
174 return;
175 }
176
177 if (connection->delay_state) connection->delay_state->flush();
178
179 std::lock_guard<std::mutex> l(connection->write_lock);
180
181 reset_recv_state();
182 discard_out_queue();
183
184 connection->_stop();
185
186 can_write = false;
187 state = CLOSED;
188}
189
190void ProtocolV2::fault() { _fault(); }
191
192void ProtocolV2::requeue_sent() {
494da23a 193 write_in_progress = false;
11fdf7f2
TL
194 if (sent.empty()) {
195 return;
196 }
197
198 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
199 out_seq -= sent.size();
200 while (!sent.empty()) {
201 Message *m = sent.back();
202 sent.pop_back();
203 ldout(cct, 5) << __func__ << " requeueing message m=" << m
204 << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
205 << *m << dendl;
9f95a23c 206 m->clear_payload();
11fdf7f2
TL
207 rq.emplace_front(out_queue_entry_t{false, m});
208 }
209}
210
211uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
212 ldout(cct, 10) << __func__ << " " << seq << dendl;
213 std::lock_guard<std::mutex> l(connection->write_lock);
214 if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
215 return seq;
216 }
217 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
218 uint64_t count = out_seq;
219 while (!rq.empty()) {
220 Message* const m = rq.front().m;
221 if (m->get_seq() == 0 || m->get_seq() > seq) break;
222 ldout(cct, 5) << __func__ << " discarding message m=" << m
223 << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
224 << *m << dendl;
225 m->put();
226 rq.pop_front();
227 count++;
228 }
229 if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST);
230 return count;
231}
232
9f95a23c
TL
233void ProtocolV2::reset_security() {
234 ldout(cct, 5) << __func__ << dendl;
235
494da23a 236 auth_meta.reset(new AuthConnectionMeta);
494da23a 237 session_stream_handlers.rx.reset(nullptr);
9f95a23c 238 session_stream_handlers.tx.reset(nullptr);
494da23a 239 pre_auth.rxbuf.clear();
9f95a23c
TL
240 pre_auth.txbuf.clear();
241}
242
243// it's expected the `write_lock` is held while calling this method.
244void ProtocolV2::reset_recv_state() {
245 ldout(cct, 5) << __func__ << dendl;
246
247 if (!connection->center->in_thread()) {
248 // execute in the same thread that uses the rx/tx handlers. We need
249 // to do the warp because holding `write_lock` is not enough as
250 // `write_event()` unlocks it just before calling `write_message()`.
251 // `submit_to()` here is NOT blocking.
252 connection->center->submit_to(connection->center->get_id(), [this] {
20effc67 253 ldout(cct, 5) << "reset_recv_state (warped) reseting crypto and compression handlers"
9f95a23c
TL
254 << dendl;
255 // Possibly unnecessary. See the comment in `deactivate_existing`.
256 std::lock_guard<std::mutex> l(connection->lock);
257 std::lock_guard<std::mutex> wl(connection->write_lock);
258 reset_security();
20effc67 259 reset_compression();
9f95a23c
TL
260 }, /* always_async = */true);
261 } else {
262 reset_security();
20effc67 263 reset_compression();
9f95a23c 264 }
11fdf7f2
TL
265
266 // clean read and write callbacks
267 connection->pendingReadLen.reset();
268 connection->writeCallback.reset();
269
270 next_tag = static_cast<Tag>(0);
271
272 reset_throttle();
273}
274
275size_t ProtocolV2::get_current_msg_size() const {
f6b5b4d7 276 ceph_assert(rx_frame_asm.get_num_segments() > 0);
11fdf7f2
TL
277 size_t sum = 0;
278 // we don't include SegmentIndex::Msg::HEADER.
f6b5b4d7
TL
279 for (size_t i = 1; i < rx_frame_asm.get_num_segments(); i++) {
280 sum += rx_frame_asm.get_segment_logical_len(i);
11fdf7f2
TL
281 }
282 return sum;
283}
284
285void ProtocolV2::reset_throttle() {
286 if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
287 connection->policy.throttler_messages) {
288 ldout(cct, 10) << __func__ << " releasing " << 1
289 << " message to policy throttler "
290 << connection->policy.throttler_messages->get_current()
291 << "/" << connection->policy.throttler_messages->get_max()
292 << dendl;
293 connection->policy.throttler_messages->put();
294 }
295 if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
296 if (connection->policy.throttler_bytes) {
297 const size_t cur_msg_size = get_current_msg_size();
298 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
299 << " bytes to policy throttler "
300 << connection->policy.throttler_bytes->get_current() << "/"
301 << connection->policy.throttler_bytes->get_max() << dendl;
302 connection->policy.throttler_bytes->put(cur_msg_size);
303 }
304 }
305 if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
306 const size_t cur_msg_size = get_current_msg_size();
307 ldout(cct, 10)
308 << __func__ << " releasing " << cur_msg_size
309 << " bytes to dispatch_queue throttler "
310 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
311 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
312 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
313 }
314}
315
316CtPtr ProtocolV2::_fault() {
317 ldout(cct, 10) << __func__ << dendl;
318
319 if (state == CLOSED || state == NONE) {
320 ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
321 return nullptr;
322 }
323
324 if (connection->policy.lossy &&
325 !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
326 ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
327 stop();
328 connection->dispatch_queue->queue_reset(connection);
329 return nullptr;
330 }
331
332 connection->write_lock.lock();
333
334 can_write = false;
335 // requeue sent items
336 requeue_sent();
337
338 if (out_queue.empty() && state >= START_ACCEPT &&
339 state <= SESSION_ACCEPTING && !replacing) {
340 ldout(cct, 2) << __func__ << " with nothing to send and in the half "
341 << " accept state just closed" << dendl;
342 connection->write_lock.unlock();
343 stop();
344 connection->dispatch_queue->queue_reset(connection);
345 return nullptr;
346 }
347
348 replacing = false;
349 connection->fault();
350 reset_recv_state();
351
352 reconnecting = false;
353
354 if (connection->policy.standby && out_queue.empty() && !keepalive &&
355 state != WAIT) {
356 ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
357 << dendl;
358 state = STANDBY;
359 connection->write_lock.unlock();
360 return nullptr;
361 }
362 if (connection->policy.server) {
363 ldout(cct, 1) << __func__ << " server, going to standby, even though i have stuff queued" << dendl;
364 state = STANDBY;
365 connection->write_lock.unlock();
366 return nullptr;
367 }
368
369 connection->write_lock.unlock();
370
371 if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
372 state != WAIT &&
373 state != SESSION_ACCEPTING /* due to connection race */) {
374 // policy maybe empty when state is in accept
375 if (connection->policy.server) {
376 ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
377 state = STANDBY;
378 } else {
379 ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
380 connect_seq++;
381 global_seq = messenger->get_global_seq();
382 state = START_CONNECT;
383 pre_auth.enabled = true;
384 connection->state = AsyncConnection::STATE_CONNECTING;
385 }
386 backoff = utime_t();
387 connection->center->dispatch_event_external(connection->read_handler);
388 } else {
389 if (state == WAIT) {
390 backoff.set_from_double(cct->_conf->ms_max_backoff);
391 } else if (backoff == utime_t()) {
392 backoff.set_from_double(cct->_conf->ms_initial_backoff);
393 } else {
394 backoff += backoff;
395 if (backoff > cct->_conf->ms_max_backoff)
396 backoff.set_from_double(cct->_conf->ms_max_backoff);
397 }
398
399 if (server_cookie) {
400 connect_seq++;
401 }
402
403 global_seq = messenger->get_global_seq();
404 state = START_CONNECT;
405 pre_auth.enabled = true;
406 connection->state = AsyncConnection::STATE_CONNECTING;
407 ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
408 // woke up again;
409 connection->register_time_events.insert(
410 connection->center->create_time_event(backoff.to_nsec() / 1000,
411 connection->wakeup_handler));
412 }
413 return nullptr;
414}
415
416void ProtocolV2::prepare_send_message(uint64_t features,
417 Message *m) {
418 ldout(cct, 20) << __func__ << " m=" << *m << dendl;
419
420 // associate message with Connection (for benefit of encode_payload)
9f95a23c
TL
421 ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
422 << features << " " << m << " " << *m << dendl;
11fdf7f2
TL
423
424 // encode and copy out of *m
425 m->encode(features, 0);
426}
427
428void ProtocolV2::send_message(Message *m) {
429 uint64_t f = connection->get_features();
430
431 // TODO: Currently not all messages supports reencode like MOSDMap, so here
432 // only let fast dispatch support messages prepare message
433 const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
434 if (can_fast_prepare) {
435 prepare_send_message(f, m);
436 }
437
438 std::lock_guard<std::mutex> l(connection->write_lock);
439 bool is_prepared = can_fast_prepare;
440 // "features" changes will change the payload encoding
441 if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
442 // ensure the correctness of message encoding
443 m->clear_payload();
444 is_prepared = false;
445 ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
446 << " != " << connection->get_features() << dendl;
447 }
448 if (state == CLOSED) {
449 ldout(cct, 10) << __func__ << " connection closed."
450 << " Drop message " << m << dendl;
451 m->put();
452 } else {
453 ldout(cct, 5) << __func__ << " enqueueing message m=" << m
454 << " type=" << m->get_type() << " " << *m << dendl;
9f95a23c 455 m->queue_start = ceph::mono_clock::now();
11fdf7f2
TL
456 m->trace.event("async enqueueing message");
457 out_queue[m->get_priority()].emplace_back(
458 out_queue_entry_t{is_prepared, m});
459 ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
460 << dendl;
494da23a
TL
461 if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
462 write_in_progress = true;
11fdf7f2
TL
463 connection->center->dispatch_event_external(connection->write_handler);
464 }
465 }
466}
467
468void ProtocolV2::send_keepalive() {
469 ldout(cct, 10) << __func__ << dendl;
470 std::lock_guard<std::mutex> l(connection->write_lock);
471 if (state != CLOSED) {
472 keepalive = true;
473 connection->center->dispatch_event_external(connection->write_handler);
474 }
475}
476
477void ProtocolV2::read_event() {
478 ldout(cct, 20) << __func__ << dendl;
479
480 switch (state) {
481 case START_CONNECT:
482 run_continuation(CONTINUATION(start_client_banner_exchange));
483 break;
484 case START_ACCEPT:
485 run_continuation(CONTINUATION(start_server_banner_exchange));
486 break;
487 case READY:
488 run_continuation(CONTINUATION(read_frame));
489 break;
490 case THROTTLE_MESSAGE:
491 run_continuation(CONTINUATION(throttle_message));
492 break;
493 case THROTTLE_BYTES:
494 run_continuation(CONTINUATION(throttle_bytes));
495 break;
496 case THROTTLE_DISPATCH_QUEUE:
497 run_continuation(CONTINUATION(throttle_dispatch_queue));
498 break;
499 default:
500 break;
501 }
502}
503
504ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
505 out_queue_entry_t out_entry;
506
507 if (!out_queue.empty()) {
508 auto it = out_queue.rbegin();
509 auto& entries = it->second;
510 ceph_assert(!entries.empty());
511 out_entry = entries.front();
512 entries.pop_front();
513 if (entries.empty()) {
514 out_queue.erase(it->first);
515 }
516 }
517 return out_entry;
518}
519
520ssize_t ProtocolV2::write_message(Message *m, bool more) {
521 FUNCTRACE(cct);
522 ceph_assert(connection->center->in_thread());
523 m->set_seq(++out_seq);
524
525 connection->lock.lock();
526 uint64_t ack_seq = in_seq;
527 ack_left = 0;
528 connection->lock.unlock();
529
530 ceph_msg_header &header = m->get_header();
531 ceph_msg_footer &footer = m->get_footer();
532
533 ceph_msg_header2 header2{header.seq, header.tid,
534 header.type, header.priority,
535 header.version,
20effc67
TL
536 ceph_le32(0), header.data_off,
537 ceph_le64(ack_seq),
11fdf7f2
TL
538 footer.flags, header.compat_version,
539 header.reserved};
540
541 auto message = MessageFrame::Encode(
542 header2,
543 m->get_payload(),
544 m->get_middle(),
545 m->get_data());
801d1391
TL
546 if (!append_frame(message)) {
547 m->put();
548 return -EILSEQ;
549 }
11fdf7f2
TL
550
551 ldout(cct, 5) << __func__ << " sending message m=" << m
552 << " seq=" << m->get_seq() << " " << *m << dendl;
553
554 m->trace.event("async writing message");
555 ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
556 << " src=" << entity_name_t(messenger->get_myname())
557 << " off=" << header2.data_off
558 << dendl;
9f95a23c 559 ssize_t total_send_size = connection->outgoing_bl.length();
11fdf7f2
TL
560 ssize_t rc = connection->_try_send(more);
561 if (rc < 0) {
562 ldout(cct, 1) << __func__ << " error sending " << m << ", "
563 << cpp_strerror(rc) << dendl;
564 } else {
565 connection->logger->inc(
9f95a23c 566 l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
11fdf7f2
TL
567 ldout(cct, 10) << __func__ << " sending " << m
568 << (rc ? " continuely." : " done.") << dendl;
569 }
9f95a23c
TL
570
571#if defined(WITH_EVENTTRACE)
11fdf7f2
TL
572 if (m->get_type() == CEPH_MSG_OSD_OP)
573 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
574 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
575 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
9f95a23c 576#endif
11fdf7f2
TL
577 m->put();
578
579 return rc;
580}
581
801d1391
TL
582template <class F>
583bool ProtocolV2::append_frame(F& frame) {
584 ceph::bufferlist bl;
585 try {
f6b5b4d7 586 bl = frame.get_buffer(tx_frame_asm);
801d1391
TL
587 } catch (ceph::crypto::onwire::TxHandlerError &e) {
588 ldout(cct, 1) << __func__ << " " << e.what() << dendl;
589 return false;
590 }
f6b5b4d7
TL
591
592 ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
593 << " bytes " << tx_frame_asm << dendl;
20effc67 594 connection->outgoing_bl.claim_append(bl);
801d1391 595 return true;
11fdf7f2
TL
596}
597
598void ProtocolV2::handle_message_ack(uint64_t seq) {
599 if (connection->policy.lossy) { // lossy connections don't keep sent messages
600 return;
601 }
602
603 ldout(cct, 15) << __func__ << " seq=" << seq << dendl;
604
605 // trim sent list
606 static const int max_pending = 128;
607 int i = 0;
608 Message *pending[max_pending];
9f95a23c 609 auto now = ceph::mono_clock::now();
11fdf7f2
TL
610 connection->write_lock.lock();
611 while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
612 Message *m = sent.front();
613 sent.pop_front();
614 pending[i++] = m;
615 ldout(cct, 10) << __func__ << " got ack seq " << seq
616 << " >= " << m->get_seq() << " on " << m << " " << *m
617 << dendl;
618 }
619 connection->write_lock.unlock();
9f95a23c 620 connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
11fdf7f2
TL
621 for (int k = 0; k < i; k++) {
622 pending[k]->put();
623 }
624}
625
20effc67
TL
626void ProtocolV2::reset_compression() {
627 ldout(cct, 5) << __func__ << dendl;
628
629 comp_meta = CompConnectionMeta{};
630 session_compression_handlers.rx.reset(nullptr);
631 session_compression_handlers.tx.reset(nullptr);
632}
633
11fdf7f2
TL
634void ProtocolV2::write_event() {
635 ldout(cct, 10) << __func__ << dendl;
636 ssize_t r = 0;
637
638 connection->write_lock.lock();
639 if (can_write) {
640 if (keepalive) {
801d1391
TL
641 ldout(cct, 10) << __func__ << " appending keepalive" << dendl;
642 auto keepalive_frame = KeepAliveFrame::Encode();
643 if (!append_frame(keepalive_frame)) {
644 connection->write_lock.unlock();
645 connection->lock.lock();
646 fault();
647 connection->lock.unlock();
648 return;
649 }
11fdf7f2
TL
650 keepalive = false;
651 }
652
653 auto start = ceph::mono_clock::now();
654 bool more;
655 do {
656 const auto out_entry = _get_next_outgoing();
657 if (!out_entry.m) {
658 break;
659 }
660
661 if (!connection->policy.lossy) {
662 // put on sent list
663 sent.push_back(out_entry.m);
664 out_entry.m->get();
665 }
666 more = !out_queue.empty();
667 connection->write_lock.unlock();
668
669 // send_message or requeue messages may not encode message
670 if (!out_entry.is_prepared) {
671 prepare_send_message(connection->get_features(), out_entry.m);
672 }
673
9f95a23c
TL
674 if (out_entry.m->queue_start != ceph::mono_time()) {
675 connection->logger->tinc(l_msgr_send_messages_queue_lat,
676 ceph::mono_clock::now() -
677 out_entry.m->queue_start);
678 }
679
11fdf7f2
TL
680 r = write_message(out_entry.m, more);
681
682 connection->write_lock.lock();
683 if (r == 0) {
684 ;
685 } else if (r < 0) {
686 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
687 break;
9f95a23c
TL
688 } else if (r > 0) {
689 // Outbound message in-progress, thread will be re-awoken
690 // when the outbound socket is writeable again
11fdf7f2 691 break;
9f95a23c 692 }
11fdf7f2 693 } while (can_write);
494da23a 694 write_in_progress = false;
11fdf7f2
TL
695
696 // if r > 0 mean data still lefted, so no need _try_send.
697 if (r == 0) {
698 uint64_t left = ack_left;
699 if (left) {
11fdf7f2
TL
700 ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
701 << " messages" << dendl;
801d1391
TL
702 auto ack_frame = AckFrame::Encode(in_seq);
703 if (append_frame(ack_frame)) {
704 ack_left -= left;
705 left = ack_left;
706 r = connection->_try_send(left);
707 } else {
708 r = -EILSEQ;
709 }
11fdf7f2
TL
710 } else if (is_queued()) {
711 r = connection->_try_send();
712 }
713 }
714 connection->write_lock.unlock();
715
716 connection->logger->tinc(l_msgr_running_send_time,
717 ceph::mono_clock::now() - start);
718 if (r < 0) {
719 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
720 connection->lock.lock();
721 fault();
722 connection->lock.unlock();
723 return;
724 }
725 } else {
494da23a 726 write_in_progress = false;
11fdf7f2
TL
727 connection->write_lock.unlock();
728 connection->lock.lock();
729 connection->write_lock.lock();
730 if (state == STANDBY && !connection->policy.server && is_queued()) {
731 ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
732 if (server_cookie) { // only increment connect_seq if there is a session
733 connect_seq++;
734 }
735 connection->_connect();
736 } else if (connection->cs && state != NONE && state != CLOSED &&
737 state != START_CONNECT) {
738 r = connection->_try_send();
739 if (r < 0) {
740 ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
741 connection->write_lock.unlock();
742 fault();
743 connection->lock.unlock();
744 return;
745 }
746 }
747 connection->write_lock.unlock();
748 connection->lock.unlock();
749 }
750}
751
752bool ProtocolV2::is_queued() {
753 return !out_queue.empty() || connection->is_queued();
754}
755
11fdf7f2
TL
756CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
757 rx_buffer_t &&buffer) {
758 const auto len = buffer->length();
759 const auto buf = buffer->c_str();
760 next.node = std::move(buffer);
761 ssize_t r = connection->read(len, buf,
762 [&next, this](char *buffer, int r) {
763 if (unlikely(pre_auth.enabled) && r >= 0) {
764 pre_auth.rxbuf.append(*next.node);
765 ceph_assert(!cct->_conf->ms_die_on_bug ||
adb31ebb 766 pre_auth.rxbuf.length() < 20000000);
11fdf7f2
TL
767 }
768 next.r = r;
769 run_continuation(next);
770 });
771 if (r <= 0) {
772 // error or done synchronously
f67539c2 773 if (unlikely(pre_auth.enabled) && r == 0) {
11fdf7f2
TL
774 pre_auth.rxbuf.append(*next.node);
775 ceph_assert(!cct->_conf->ms_die_on_bug ||
adb31ebb 776 pre_auth.rxbuf.length() < 20000000);
11fdf7f2
TL
777 }
778 next.r = r;
779 return &next;
780 }
781
782 return nullptr;
783}
784
785template <class F>
786CtPtr ProtocolV2::write(const std::string &desc,
787 CONTINUATION_TYPE<ProtocolV2> &next,
788 F &frame) {
801d1391
TL
789 ceph::bufferlist bl;
790 try {
f6b5b4d7 791 bl = frame.get_buffer(tx_frame_asm);
801d1391
TL
792 } catch (ceph::crypto::onwire::TxHandlerError &e) {
793 ldout(cct, 1) << __func__ << " " << e.what() << dendl;
794 return _fault();
795 }
f6b5b4d7
TL
796
797 ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
798 << " bytes " << tx_frame_asm << dendl;
11fdf7f2
TL
799 return write(desc, next, bl);
800}
801
802CtPtr ProtocolV2::write(const std::string &desc,
803 CONTINUATION_TYPE<ProtocolV2> &next,
f67539c2 804 ceph::bufferlist &buffer) {
11fdf7f2
TL
805 if (unlikely(pre_auth.enabled)) {
806 pre_auth.txbuf.append(buffer);
807 ceph_assert(!cct->_conf->ms_die_on_bug ||
adb31ebb 808 pre_auth.txbuf.length() < 20000000);
11fdf7f2
TL
809 }
810
811 ssize_t r =
812 connection->write(buffer, [&next, desc, this](int r) {
813 if (r < 0) {
814 ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
815 << " (" << cpp_strerror(r) << ")" << dendl;
816 connection->inject_delay();
817 _fault();
818 }
819 run_continuation(next);
820 });
821
822 if (r < 0) {
823 ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
824 << " (" << cpp_strerror(r) << ")" << dendl;
825 return _fault();
826 } else if (r == 0) {
827 next.setParams();
828 return &next;
829 }
830
831 return nullptr;
832}
833
834CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
835 ldout(cct, 20) << __func__ << dendl;
836 bannerExchangeCallback = &callback;
837
f67539c2
TL
838 ceph::bufferlist banner_payload;
839 using ceph::encode;
11fdf7f2
TL
840 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
841 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
842
f67539c2 843 ceph::bufferlist bl;
11fdf7f2
TL
844 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
845 encode((uint16_t)banner_payload.length(), bl, 0);
846 bl.claim_append(banner_payload);
847
848 INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);
849
850 return WRITE(bl, "banner", _wait_for_peer_banner);
851}
852
853CtPtr ProtocolV2::_wait_for_peer_banner() {
9f95a23c 854 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
11fdf7f2
TL
855 return READ(banner_len, _handle_peer_banner);
856}
857
858CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
859 ldout(cct, 20) << __func__ << " r=" << r << dendl;
860
861 if (r < 0) {
862 ldout(cct, 1) << __func__ << " read peer banner failed r=" << r << " ("
863 << cpp_strerror(r) << ")" << dendl;
864 return _fault();
865 }
866
867 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
868
869 if (memcmp(buffer->c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
870 if (memcmp(buffer->c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
871 lderr(cct) << __func__ << " peer " << *connection->peer_addrs
872 << " is using msgr V1 protocol" << dendl;
873 return _fault();
874 }
875 ldout(cct, 1) << __func__ << " accept peer sent bad banner" << dendl;
876 return _fault();
877 }
878
879 uint16_t payload_len;
f67539c2 880 ceph::bufferlist bl;
11fdf7f2 881 buffer->set_offset(banner_prefix_len);
9f95a23c 882 buffer->set_length(sizeof(ceph_le16));
11fdf7f2
TL
883 bl.push_back(std::move(buffer));
884 auto ti = bl.cbegin();
f67539c2 885 using ceph::decode;
11fdf7f2
TL
886 try {
887 decode(payload_len, ti);
f67539c2 888 } catch (const ceph::buffer::error &e) {
11fdf7f2
TL
889 lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
890 return _fault();
891 }
892
893 INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);
894
895 return READ(payload_len, _handle_peer_banner_payload);
896}
897
898CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
899 ldout(cct, 20) << __func__ << " r=" << r << dendl;
900
901 if (r < 0) {
902 ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
903 << " (" << cpp_strerror(r) << ")" << dendl;
904 return _fault();
905 }
906
907 uint64_t peer_supported_features;
908 uint64_t peer_required_features;
909
f67539c2
TL
910 ceph::bufferlist bl;
911 using ceph::decode;
11fdf7f2
TL
912 bl.push_back(std::move(buffer));
913 auto ti = bl.cbegin();
914 try {
915 decode(peer_supported_features, ti);
916 decode(peer_required_features, ti);
f67539c2 917 } catch (const ceph::buffer::error &e) {
11fdf7f2
TL
918 lderr(cct) << __func__ << " decode banner payload failed " << dendl;
919 return _fault();
920 }
921
922 ldout(cct, 1) << __func__ << " supported=" << std::hex
923 << peer_supported_features << " required=" << std::hex
924 << peer_required_features << std::dec << dendl;
925
926 // Check feature bit compatibility
927
928 uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
929 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
930
931 if ((required_features & peer_supported_features) != required_features) {
932 ldout(cct, 1) << __func__ << " peer does not support all required features"
933 << " required=" << std::hex << required_features
934 << " supported=" << std::hex << peer_supported_features
935 << std::dec << dendl;
936 stop();
937 connection->dispatch_queue->queue_reset(connection);
938 return nullptr;
939 }
940 if ((supported_features & peer_required_features) != peer_required_features) {
941 ldout(cct, 1) << __func__ << " we do not support all peer required features"
942 << " required=" << std::hex << peer_required_features
943 << " supported=" << supported_features << std::dec << dendl;
944 stop();
945 connection->dispatch_queue->queue_reset(connection);
946 return nullptr;
947 }
948
f6b5b4d7
TL
949 this->peer_supported_features = peer_supported_features;
950 if (peer_required_features == 0) {
11fdf7f2
TL
951 this->connection_features = msgr2_required;
952 }
953
f6b5b4d7
TL
954 // if the peer supports msgr2.1, switch to it
955 bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
956 tx_frame_asm.set_is_rev1(is_rev1);
957 rx_frame_asm.set_is_rev1(is_rev1);
11fdf7f2
TL
958
959 if (state == BANNER_CONNECTING) {
960 state = HELLO_CONNECTING;
961 }
962 else {
963 ceph_assert(state == BANNER_ACCEPTING);
964 state = HELLO_ACCEPTING;
965 }
966
967 auto hello = HelloFrame::Encode(messenger->get_mytype(),
968 connection->target_addr);
969
970 INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);
971
972 return WRITE(hello, "hello frame", read_frame);
973}
974
975CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
976{
977 ldout(cct, 20) << __func__
978 << " payload.length()=" << payload.length() << dendl;
979
980 if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
981 lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
982 return _fault();
983 }
984
985 auto hello = HelloFrame::Decode(payload);
986
987 ldout(cct, 5) << __func__ << " received hello:"
988 << " peer_type=" << (int)hello.entity_type()
989 << " peer_addr_for_me=" << hello.peer_addr() << dendl;
990
81eedcae
TL
991 sockaddr_storage ss;
992 socklen_t len = sizeof(ss);
993 getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
994 ldout(cct, 5) << __func__ << " getsockname says I am " << (sockaddr *)&ss
995 << " when talking to " << connection->target_addr << dendl;
996
11fdf7f2
TL
997 if (connection->get_peer_type() == -1) {
998 connection->set_peer_type(hello.entity_type());
999
1000 ceph_assert(state == HELLO_ACCEPTING);
1001 connection->policy = messenger->get_policy(hello.entity_type());
1002 ldout(cct, 10) << __func__ << " accept of host_type "
1003 << (int)hello.entity_type()
1004 << ", policy.lossy=" << connection->policy.lossy
1005 << " policy.server=" << connection->policy.server
1006 << " policy.standby=" << connection->policy.standby
1007 << " policy.resetcheck=" << connection->policy.resetcheck
1008 << dendl;
1009 } else {
1010 ceph_assert(state == HELLO_CONNECTING);
1011 if (connection->get_peer_type() != hello.entity_type()) {
1012 ldout(cct, 1) << __func__ << " connection peer type does not match what"
1013 << " peer advertises " << connection->get_peer_type()
1014 << " != " << (int)hello.entity_type() << dendl;
1015 stop();
1016 connection->dispatch_queue->queue_reset(connection);
1017 return nullptr;
1018 }
1019 }
1020
81eedcae
TL
1021 if (messenger->get_myaddrs().empty() ||
1022 messenger->get_myaddrs().front().is_blank_ip()) {
1023 entity_addr_t a;
1024 if (cct->_conf->ms_learn_addr_from_peer) {
1025 ldout(cct, 1) << __func__ << " peer " << connection->target_addr
1026 << " says I am " << hello.peer_addr() << " (socket says "
1027 << (sockaddr*)&ss << ")" << dendl;
1028 a = hello.peer_addr();
1029 } else {
1030 ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
1031 << " says I am " << (sockaddr*)&ss
1032 << " (peer says " << hello.peer_addr() << ")" << dendl;
1033 a.set_sockaddr((sockaddr *)&ss);
1034 }
1035 a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
1036 a.set_port(0);
1037 connection->lock.unlock();
1038 messenger->learned_addr(a);
1039 if (cct->_conf->ms_inject_internal_delays &&
1040 cct->_conf->ms_inject_socket_failures) {
1041 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
1042 ldout(cct, 10) << __func__ << " sleep for "
1043 << cct->_conf->ms_inject_internal_delays << dendl;
1044 utime_t t;
1045 t.set_from_double(cct->_conf->ms_inject_internal_delays);
1046 t.sleep();
1047 }
1048 }
1049 connection->lock.lock();
1050 if (state != HELLO_CONNECTING) {
1051 ldout(cct, 1) << __func__
1052 << " state changed while learned_addr, mark_down or "
1053 << " replacing must be happened just now" << dendl;
1054 return nullptr;
1055 }
1056 }
1057
1058
1059
11fdf7f2
TL
1060 CtPtr callback;
1061 callback = bannerExchangeCallback;
1062 bannerExchangeCallback = nullptr;
1063 ceph_assert(callback);
1064 return callback;
1065}
1066
1067CtPtr ProtocolV2::read_frame() {
1068 if (state == CLOSED) {
1069 return nullptr;
1070 }
1071
1072 ldout(cct, 20) << __func__ << dendl;
f6b5b4d7
TL
1073 rx_preamble.clear();
1074 rx_epilogue.clear();
1075 rx_segments_data.clear();
1076
1077 return READ(rx_frame_asm.get_preamble_onwire_len(),
1078 handle_read_frame_preamble_main);
11fdf7f2
TL
1079}
1080
1081CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
1082 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1083
1084 if (r < 0) {
f6b5b4d7 1085 ldout(cct, 1) << __func__ << " read frame preamble failed r=" << r
11fdf7f2
TL
1086 << " (" << cpp_strerror(r) << ")" << dendl;
1087 return _fault();
1088 }
1089
f6b5b4d7 1090 rx_preamble.push_back(std::move(buffer));
11fdf7f2
TL
1091
1092 ldout(cct, 30) << __func__ << " preamble\n";
f6b5b4d7 1093 rx_preamble.hexdump(*_dout);
11fdf7f2
TL
1094 *_dout << dendl;
1095
f6b5b4d7
TL
1096 try {
1097 next_tag = rx_frame_asm.disassemble_preamble(rx_preamble);
1098 } catch (FrameError& e) {
1099 ldout(cct, 1) << __func__ << " " << e.what() << dendl;
1100 return _fault();
1101 } catch (ceph::crypto::onwire::MsgAuthError&) {
1102 ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
1103 return _fault();
1104 }
11fdf7f2 1105
f6b5b4d7
TL
1106 ldout(cct, 25) << __func__ << " disassembled preamble " << rx_frame_asm
1107 << dendl;
11fdf7f2 1108
f6b5b4d7 1109 if (session_stream_handlers.rx) {
11fdf7f2 1110 ldout(cct, 30) << __func__ << " preamble after decrypt\n";
f6b5b4d7 1111 rx_preamble.hexdump(*_dout);
11fdf7f2
TL
1112 *_dout << dendl;
1113 }
1114
11fdf7f2
TL
1115 // does it need throttle?
1116 if (next_tag == Tag::MESSAGE) {
1117 if (state != READY) {
1118 lderr(cct) << __func__ << " not in ready state!" << dendl;
1119 return _fault();
1120 }
a4b75251 1121 recv_stamp = ceph_clock_now();
11fdf7f2
TL
1122 state = THROTTLE_MESSAGE;
1123 return CONTINUE(throttle_message);
1124 } else {
1125 return read_frame_segment();
1126 }
1127}
1128
1129CtPtr ProtocolV2::handle_read_frame_dispatch() {
1130 ldout(cct, 10) << __func__
1131 << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
1132
1133 switch (next_tag) {
1134 case Tag::HELLO:
1135 case Tag::AUTH_REQUEST:
1136 case Tag::AUTH_BAD_METHOD:
1137 case Tag::AUTH_REPLY_MORE:
1138 case Tag::AUTH_REQUEST_MORE:
1139 case Tag::AUTH_DONE:
1140 case Tag::AUTH_SIGNATURE:
1141 case Tag::CLIENT_IDENT:
1142 case Tag::SERVER_IDENT:
1143 case Tag::IDENT_MISSING_FEATURES:
1144 case Tag::SESSION_RECONNECT:
1145 case Tag::SESSION_RESET:
1146 case Tag::SESSION_RETRY:
1147 case Tag::SESSION_RETRY_GLOBAL:
1148 case Tag::SESSION_RECONNECT_OK:
1149 case Tag::KEEPALIVE2:
1150 case Tag::KEEPALIVE2_ACK:
1151 case Tag::ACK:
1152 case Tag::WAIT:
20effc67
TL
1153 case Tag::COMPRESSION_REQUEST:
1154 case Tag::COMPRESSION_DONE:
11fdf7f2
TL
1155 return handle_frame_payload();
1156 case Tag::MESSAGE:
1157 return handle_message();
1158 default: {
1159 lderr(cct) << __func__
1160 << " received unknown tag=" << static_cast<uint32_t>(next_tag)
1161 << dendl;
1162 return _fault();
1163 }
1164 }
1165
1166 return nullptr;
1167}
1168
1169CtPtr ProtocolV2::read_frame_segment() {
f6b5b4d7
TL
1170 size_t seg_idx = rx_segments_data.size();
1171 ldout(cct, 20) << __func__ << " seg_idx=" << seg_idx << dendl;
1172 rx_segments_data.emplace_back();
1173
1174 uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
1175 if (onwire_len == 0) {
1176 return _handle_read_frame_segment();
1177 }
11fdf7f2 1178
11fdf7f2 1179 rx_buffer_t rx_buffer;
f6b5b4d7 1180 uint16_t align = rx_frame_asm.get_segment_align(seg_idx);
11fdf7f2 1181 try {
f67539c2 1182 rx_buffer = ceph::buffer::ptr_node::create(ceph::buffer::create_aligned(
f6b5b4d7 1183 onwire_len, align));
b3b6e05e 1184 } catch (const ceph::buffer::bad_alloc&) {
11fdf7f2 1185 // Catching because of potential issues with satisfying alignment.
f6b5b4d7
TL
1186 ldout(cct, 1) << __func__ << " can't allocate aligned rx_buffer"
1187 << " len=" << onwire_len
1188 << " align=" << align
1189 << dendl;
11fdf7f2
TL
1190 return _fault();
1191 }
1192
1193 return READ_RXBUF(std::move(rx_buffer), handle_read_frame_segment);
1194}
1195
1196CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
1197 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1198
1199 if (r < 0) {
1200 ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
1201 << cpp_strerror(r) << ")" << dendl;
1202 return _fault();
1203 }
1204
11fdf7f2 1205 rx_segments_data.back().push_back(std::move(rx_buffer));
f6b5b4d7
TL
1206 return _handle_read_frame_segment();
1207}
11fdf7f2 1208
f6b5b4d7
TL
1209CtPtr ProtocolV2::_handle_read_frame_segment() {
1210 if (rx_segments_data.size() == rx_frame_asm.get_num_segments()) {
11fdf7f2 1211 // OK, all segments planned to read are read. Can go with epilogue.
f6b5b4d7
TL
1212 uint32_t epilogue_onwire_len = rx_frame_asm.get_epilogue_onwire_len();
1213 if (epilogue_onwire_len == 0) {
1214 return _handle_read_frame_epilogue_main();
1215 }
1216 return READ(epilogue_onwire_len, handle_read_frame_epilogue_main);
11fdf7f2 1217 }
f6b5b4d7
TL
1218 // TODO: for makeshift only. This will be more generic and throttled
1219 return read_frame_segment();
11fdf7f2
TL
1220}
1221
1222CtPtr ProtocolV2::handle_frame_payload() {
1223 ceph_assert(!rx_segments_data.empty());
1224 auto& payload = rx_segments_data.back();
1225
1226 ldout(cct, 30) << __func__ << "\n";
1227 payload.hexdump(*_dout);
1228 *_dout << dendl;
1229
1230 switch (next_tag) {
1231 case Tag::HELLO:
1232 return handle_hello(payload);
1233 case Tag::AUTH_REQUEST:
1234 return handle_auth_request(payload);
1235 case Tag::AUTH_BAD_METHOD:
1236 return handle_auth_bad_method(payload);
1237 case Tag::AUTH_REPLY_MORE:
1238 return handle_auth_reply_more(payload);
1239 case Tag::AUTH_REQUEST_MORE:
1240 return handle_auth_request_more(payload);
1241 case Tag::AUTH_DONE:
1242 return handle_auth_done(payload);
1243 case Tag::AUTH_SIGNATURE:
1244 return handle_auth_signature(payload);
1245 case Tag::CLIENT_IDENT:
1246 return handle_client_ident(payload);
1247 case Tag::SERVER_IDENT:
1248 return handle_server_ident(payload);
1249 case Tag::IDENT_MISSING_FEATURES:
1250 return handle_ident_missing_features(payload);
1251 case Tag::SESSION_RECONNECT:
1252 return handle_reconnect(payload);
1253 case Tag::SESSION_RESET:
1254 return handle_session_reset(payload);
1255 case Tag::SESSION_RETRY:
1256 return handle_session_retry(payload);
1257 case Tag::SESSION_RETRY_GLOBAL:
1258 return handle_session_retry_global(payload);
1259 case Tag::SESSION_RECONNECT_OK:
1260 return handle_reconnect_ok(payload);
1261 case Tag::KEEPALIVE2:
1262 return handle_keepalive2(payload);
1263 case Tag::KEEPALIVE2_ACK:
1264 return handle_keepalive2_ack(payload);
1265 case Tag::ACK:
1266 return handle_message_ack(payload);
1267 case Tag::WAIT:
1268 return handle_wait(payload);
20effc67
TL
1269 case Tag::COMPRESSION_REQUEST:
1270 return handle_compression_request(payload);
1271 case Tag::COMPRESSION_DONE:
1272 return handle_compression_done(payload);
11fdf7f2
TL
1273 default:
1274 ceph_abort();
1275 }
1276 return nullptr;
1277}
1278
1279CtPtr ProtocolV2::ready() {
1280 ldout(cct, 25) << __func__ << dendl;
1281
1282 reconnecting = false;
1283 replacing = false;
1284
1285 // make sure no pending tick timer
1286 if (connection->last_tick_id) {
1287 connection->center->delete_time_event(connection->last_tick_id);
1288 }
1289 connection->last_tick_id = connection->center->create_time_event(
1290 connection->inactive_timeout_us, connection->tick_handler);
1291
1292 {
1293 std::lock_guard<std::mutex> l(connection->write_lock);
1294 can_write = true;
1295 if (!out_queue.empty()) {
1296 connection->center->dispatch_event_external(connection->write_handler);
1297 }
1298 }
1299
1300 connection->maybe_start_delay_thread();
1301
1302 state = READY;
1303 ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie="
1304 << std::hex << client_cookie << " server_cookie="
1305 << server_cookie << std::dec << " in_seq=" << in_seq
1306 << " out_seq=" << out_seq << dendl;
1307
1308 INTERCEPT(15);
1309
1310 return CONTINUE(read_frame);
1311}
1312
1313CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
1314{
1315 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1316
1317 if (r < 0) {
f6b5b4d7
TL
1318 ldout(cct, 1) << __func__ << " read frame epilogue failed r=" << r
1319 << " (" << cpp_strerror(r) << ")" << dendl;
11fdf7f2
TL
1320 return _fault();
1321 }
1322
f6b5b4d7
TL
1323 rx_epilogue.push_back(std::move(buffer));
1324 return _handle_read_frame_epilogue_main();
1325}
11fdf7f2 1326
f6b5b4d7 1327CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
20effc67 1328 bool ok = false;
f6b5b4d7 1329 try {
20effc67 1330 ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
f6b5b4d7
TL
1331 } catch (FrameError& e) {
1332 ldout(cct, 1) << __func__ << " " << e.what() << dendl;
1333 return _fault();
1334 } catch (ceph::crypto::onwire::MsgAuthError&) {
1335 ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
1336 return _fault();
11fdf7f2
TL
1337 }
1338
1339 // we do have a mechanism that allows transmitter to start sending message
1340 // and abort after putting entire data field on wire. This will be used by
1341 // the kernel client to avoid unnecessary buffering.
20effc67 1342 if (!ok) {
11fdf7f2
TL
1343 reset_throttle();
1344 state = READY;
1345 return CONTINUE(read_frame);
11fdf7f2 1346 }
f6b5b4d7 1347 return handle_read_frame_dispatch();
11fdf7f2
TL
1348}
1349
1350CtPtr ProtocolV2::handle_message() {
1351 ldout(cct, 20) << __func__ << dendl;
1352 ceph_assert(state == THROTTLE_DONE);
1353
11fdf7f2 1354 const size_t cur_msg_size = get_current_msg_size();
f6b5b4d7 1355 auto msg_frame = MessageFrame::Decode(rx_segments_data);
11fdf7f2
TL
1356
1357 // XXX: paranoid copy just to avoid oops
1358 ceph_msg_header2 current_header = msg_frame.header();
1359
1360 ldout(cct, 5) << __func__
1361 << " got " << msg_frame.front_len()
1362 << " + " << msg_frame.middle_len()
1363 << " + " << msg_frame.data_len()
1364 << " byte message."
1365 << " envelope type=" << current_header.type
1366 << " src " << peer_name
1367 << " off " << current_header.data_off
1368 << dendl;
1369
1370 INTERCEPT(16);
1371 ceph_msg_header header{current_header.seq,
1372 current_header.tid,
1373 current_header.type,
1374 current_header.priority,
1375 current_header.version,
20effc67
TL
1376 ceph_le32(msg_frame.front_len()),
1377 ceph_le32(msg_frame.middle_len()),
1378 ceph_le32(msg_frame.data_len()),
11fdf7f2
TL
1379 current_header.data_off,
1380 peer_name,
1381 current_header.compat_version,
1382 current_header.reserved,
20effc67
TL
1383 ceph_le32(0)};
1384 ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
1385 ceph_le32(0), ceph_le64(0), current_header.flags};
11fdf7f2
TL
1386
1387 Message *message = decode_message(cct, 0, header, footer,
1388 msg_frame.front(),
1389 msg_frame.middle(),
1390 msg_frame.data(),
1391 connection);
1392 if (!message) {
1393 ldout(cct, 1) << __func__ << " decode message failed " << dendl;
1394 return _fault();
1395 } else {
1396 state = READ_MESSAGE_COMPLETE;
1397 }
1398
1399 INTERCEPT(17);
1400
1401 message->set_byte_throttler(connection->policy.throttler_bytes);
1402 message->set_message_throttler(connection->policy.throttler_messages);
1403
1404 // store reservation size in message, so we don't get confused
1405 // by messages entering the dispatch queue through other paths.
1406 message->set_dispatch_throttle_size(cur_msg_size);
1407
1408 message->set_recv_stamp(recv_stamp);
1409 message->set_throttle_stamp(throttle_stamp);
1410 message->set_recv_complete_stamp(ceph_clock_now());
1411
1412 // check received seq#. if it is old, drop the message.
1413 // note that incoming messages may skip ahead. this is convenient for the
1414 // client side queueing because messages can't be renumbered, but the (kernel)
1415 // client will occasionally pull a message out of the sent queue to send
1416 // elsewhere. in that case it doesn't matter if we "got" it or not.
1417 uint64_t cur_seq = in_seq;
1418 if (message->get_seq() <= cur_seq) {
1419 ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
1420 << " <= " << cur_seq << " " << message << " " << *message
1421 << ", discarding" << dendl;
1422 message->put();
1423 if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
1424 cct->_conf->ms_die_on_old_message) {
1425 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1426 }
1427 return nullptr;
1428 }
1429 if (message->get_seq() > cur_seq + 1) {
1430 ldout(cct, 0) << __func__ << " missed message? skipped from seq "
1431 << cur_seq << " to " << message->get_seq() << dendl;
1432 if (cct->_conf->ms_die_on_skipped_message) {
1433 ceph_assert(0 == "skipped incoming seq");
1434 }
1435 }
1436
9f95a23c 1437#if defined(WITH_EVENTTRACE)
11fdf7f2
TL
1438 if (message->get_type() == CEPH_MSG_OSD_OP ||
1439 message->get_type() == CEPH_MSG_OSD_OPREPLY) {
1440 utime_t ltt_processed_stamp = ceph_clock_now();
1441 double usecs_elapsed =
20effc67 1442 ((double)(ltt_processed_stamp.to_nsec() - recv_stamp.to_nsec())) / 1000;
11fdf7f2
TL
1443 ostringstream buf;
1444 if (message->get_type() == CEPH_MSG_OSD_OP)
1445 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
1446 false);
1447 else
1448 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
1449 false);
1450 }
1451#endif
1452
1453 // note last received message.
1454 in_seq = message->get_seq();
1455 ldout(cct, 5) << __func__ << " received message m=" << message
1456 << " seq=" << message->get_seq()
1457 << " from=" << message->get_source() << " type=" << header.type
1458 << " " << *message << dendl;
1459
1460 bool need_dispatch_writer = false;
1461 if (!connection->policy.lossy) {
1462 ack_left++;
1463 need_dispatch_writer = true;
1464 }
1465
1466 state = READY;
1467
9f95a23c
TL
1468 ceph::mono_time fast_dispatch_time;
1469
1470 if (connection->is_blackhole()) {
1471 ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
1472 message->put();
1473 goto out;
1474 }
1475
11fdf7f2 1476 connection->logger->inc(l_msgr_recv_messages);
f6b5b4d7
TL
1477 connection->logger->inc(l_msgr_recv_bytes,
1478 rx_frame_asm.get_frame_onwire_len());
11fdf7f2
TL
1479
1480 messenger->ms_fast_preprocess(message);
9f95a23c 1481 fast_dispatch_time = ceph::mono_clock::now();
11fdf7f2 1482 connection->logger->tinc(l_msgr_running_recv_time,
9f95a23c 1483 fast_dispatch_time - connection->recv_start_time);
11fdf7f2
TL
1484 if (connection->delay_state) {
1485 double delay_period = 0;
1486 if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1487 delay_period =
1488 cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1489 ldout(cct, 1) << "queue_received will delay after "
1490 << (ceph_clock_now() + delay_period) << " on " << message
1491 << " " << *message << dendl;
1492 }
1493 connection->delay_state->queue(delay_period, message);
1494 } else if (messenger->ms_can_fast_dispatch(message)) {
1495 connection->lock.unlock();
1496 connection->dispatch_queue->fast_dispatch(message);
1497 connection->recv_start_time = ceph::mono_clock::now();
1498 connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1499 connection->recv_start_time - fast_dispatch_time);
1500 connection->lock.lock();
9f95a23c
TL
1501 // we might have been reused by another connection
1502 // let's check if that is the case
1503 if (state != READY) {
1504 // yes, that was the case, let's do nothing
1505 return nullptr;
1506 }
11fdf7f2
TL
1507 } else {
1508 connection->dispatch_queue->enqueue(message, message->get_priority(),
1509 connection->conn_id);
1510 }
1511
1512 handle_message_ack(current_header.ack_seq);
1513
9f95a23c 1514 out:
11fdf7f2
TL
1515 if (need_dispatch_writer && connection->is_connected()) {
1516 connection->center->dispatch_event_external(connection->write_handler);
1517 }
1518
1519 return CONTINUE(read_frame);
1520}
1521
1522
1523CtPtr ProtocolV2::throttle_message() {
1524 ldout(cct, 20) << __func__ << dendl;
1525
1526 if (connection->policy.throttler_messages) {
1527 ldout(cct, 10) << __func__ << " wants " << 1
1528 << " message from policy throttler "
1529 << connection->policy.throttler_messages->get_current()
1530 << "/" << connection->policy.throttler_messages->get_max()
1531 << dendl;
1532 if (!connection->policy.throttler_messages->get_or_fail()) {
2a845540 1533 ldout(cct, 1) << __func__ << " wants 1 message from policy throttle "
11fdf7f2
TL
1534 << connection->policy.throttler_messages->get_current()
1535 << "/" << connection->policy.throttler_messages->get_max()
1536 << " failed, just wait." << dendl;
1537 // following thread pool deal with th full message queue isn't a
1538 // short time, so we can wait a ms.
1539 if (connection->register_time_events.empty()) {
1540 connection->register_time_events.insert(
1541 connection->center->create_time_event(1000,
1542 connection->wakeup_handler));
1543 }
1544 return nullptr;
1545 }
1546 }
1547
1548 state = THROTTLE_BYTES;
1549 return CONTINUE(throttle_bytes);
1550}
1551
1552CtPtr ProtocolV2::throttle_bytes() {
1553 ldout(cct, 20) << __func__ << dendl;
1554
1555 const size_t cur_msg_size = get_current_msg_size();
1556 if (cur_msg_size) {
1557 if (connection->policy.throttler_bytes) {
1558 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1559 << " bytes from policy throttler "
1560 << connection->policy.throttler_bytes->get_current() << "/"
1561 << connection->policy.throttler_bytes->get_max() << dendl;
1562 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
2a845540 1563 ldout(cct, 1) << __func__ << " wants " << cur_msg_size
11fdf7f2
TL
1564 << " bytes from policy throttler "
1565 << connection->policy.throttler_bytes->get_current()
1566 << "/" << connection->policy.throttler_bytes->get_max()
1567 << " failed, just wait." << dendl;
1568 // following thread pool deal with th full message queue isn't a
1569 // short time, so we can wait a ms.
1570 if (connection->register_time_events.empty()) {
1571 connection->register_time_events.insert(
1572 connection->center->create_time_event(
1573 1000, connection->wakeup_handler));
1574 }
1575 return nullptr;
1576 }
1577 }
1578 }
1579
1580 state = THROTTLE_DISPATCH_QUEUE;
1581 return CONTINUE(throttle_dispatch_queue);
1582}
1583
1584CtPtr ProtocolV2::throttle_dispatch_queue() {
1585 ldout(cct, 20) << __func__ << dendl;
1586
1587 const size_t cur_msg_size = get_current_msg_size();
1588 if (cur_msg_size) {
1589 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
1590 cur_msg_size)) {
2a845540 1591 ldout(cct, 1)
11fdf7f2
TL
1592 << __func__ << " wants " << cur_msg_size
1593 << " bytes from dispatch throttle "
1594 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1595 << connection->dispatch_queue->dispatch_throttler.get_max()
1596 << " failed, just wait." << dendl;
1597 // following thread pool deal with th full message queue isn't a
1598 // short time, so we can wait a ms.
1599 if (connection->register_time_events.empty()) {
1600 connection->register_time_events.insert(
1601 connection->center->create_time_event(1000,
1602 connection->wakeup_handler));
1603 }
1604 return nullptr;
1605 }
1606 }
1607
1608 throttle_stamp = ceph_clock_now();
1609 state = THROTTLE_DONE;
1610
1611 return read_frame_segment();
1612}
1613
1614CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
1615{
1616 ldout(cct, 20) << __func__
1617 << " payload.length()=" << payload.length() << dendl;
1618
1619 if (state != READY) {
1620 lderr(cct) << __func__ << " not in ready state!" << dendl;
1621 return _fault();
1622 }
1623
1624 auto keepalive_frame = KeepAliveFrame::Decode(payload);
1625
1626 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
1627
1628 connection->write_lock.lock();
801d1391
TL
1629 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(keepalive_frame.timestamp());
1630 if (!append_frame(keepalive_ack_frame)) {
1631 connection->write_lock.unlock();
1632 return _fault();
1633 }
11fdf7f2
TL
1634 connection->write_lock.unlock();
1635
1636 ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
1637 << keepalive_frame.timestamp() << dendl;
1638 connection->set_last_keepalive(ceph_clock_now());
1639
1640 if (is_connected()) {
1641 connection->center->dispatch_event_external(connection->write_handler);
1642 }
1643
1644 return CONTINUE(read_frame);
1645}
1646
1647CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
1648{
1649 ldout(cct, 20) << __func__
1650 << " payload.length()=" << payload.length() << dendl;
1651
1652 if (state != READY) {
1653 lderr(cct) << __func__ << " not in ready state!" << dendl;
1654 return _fault();
1655 }
1656
1657 auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload);
1658 connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
1659 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
1660
1661 return CONTINUE(read_frame);
1662}
1663
1664CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
1665{
1666 ldout(cct, 20) << __func__
1667 << " payload.length()=" << payload.length() << dendl;
1668
1669 if (state != READY) {
1670 lderr(cct) << __func__ << " not in ready state!" << dendl;
1671 return _fault();
1672 }
1673
1674 auto ack = AckFrame::Decode(payload);
1675 handle_message_ack(ack.seq());
1676 return CONTINUE(read_frame);
1677}
1678
1679/* Client Protocol Methods */
1680
1681CtPtr ProtocolV2::start_client_banner_exchange() {
1682 ldout(cct, 20) << __func__ << dendl;
1683
1684 INTERCEPT(1);
1685
1686 state = BANNER_CONNECTING;
1687
1688 global_seq = messenger->get_global_seq();
1689
1690 return _banner_exchange(CONTINUATION(post_client_banner_exchange));
1691}
1692
1693CtPtr ProtocolV2::post_client_banner_exchange() {
1694 ldout(cct, 20) << __func__ << dendl;
1695
1696 state = AUTH_CONNECTING;
1697
1698 return send_auth_request();
1699}
1700
1701CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
9f95a23c 1702 ceph_assert(messenger->auth_client);
11fdf7f2
TL
1703 ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
1704 << " auth_client " << messenger->auth_client << dendl;
11fdf7f2 1705
f67539c2
TL
1706 ceph::bufferlist bl;
1707 std::vector<uint32_t> preferred_modes;
11fdf7f2
TL
1708 auto am = auth_meta;
1709 connection->lock.unlock();
1710 int r = messenger->auth_client->get_auth_request(
1711 connection, am.get(),
1712 &am->auth_method, &preferred_modes, &bl);
1713 connection->lock.lock();
1714 if (state != AUTH_CONNECTING) {
1715 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1716 return _fault();
1717 }
1718 if (r < 0) {
1719 ldout(cct, 0) << __func__ << " get_initial_auth_request returned " << r
1720 << dendl;
1721 stop();
1722 connection->dispatch_queue->queue_reset(connection);
1723 return nullptr;
1724 }
1725
1726 INTERCEPT(9);
1727
1728 auto frame = AuthRequestFrame::Encode(auth_meta->auth_method, preferred_modes,
1729 bl);
1730 return WRITE(frame, "auth request", read_frame);
1731}
1732
1733CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
1734 ldout(cct, 20) << __func__
1735 << " payload.length()=" << payload.length() << dendl;
1736
1737 if (state != AUTH_CONNECTING) {
1738 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1739 return _fault();
1740 }
1741
1742 auto bad_method = AuthBadMethodFrame::Decode(payload);
1743 ldout(cct, 1) << __func__ << " method=" << bad_method.method()
1744 << " result " << cpp_strerror(bad_method.result())
1745 << ", allowed methods=" << bad_method.allowed_methods()
1746 << ", allowed modes=" << bad_method.allowed_modes()
1747 << dendl;
1748 ceph_assert(messenger->auth_client);
1749 auto am = auth_meta;
1750 connection->lock.unlock();
1751 int r = messenger->auth_client->handle_auth_bad_method(
1752 connection,
1753 am.get(),
1754 bad_method.method(), bad_method.result(),
1755 bad_method.allowed_methods(),
1756 bad_method.allowed_modes());
1757 connection->lock.lock();
1758 if (state != AUTH_CONNECTING || r < 0) {
1759 return _fault();
1760 }
1761 return send_auth_request(bad_method.allowed_methods());
1762}
1763
1764CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
1765{
1766 ldout(cct, 20) << __func__
1767 << " payload.length()=" << payload.length() << dendl;
1768
1769 if (state != AUTH_CONNECTING) {
1770 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1771 return _fault();
1772 }
1773
1774 auto auth_more = AuthReplyMoreFrame::Decode(payload);
1775 ldout(cct, 5) << __func__
1776 << " auth reply more len=" << auth_more.auth_payload().length()
1777 << dendl;
1778 ceph_assert(messenger->auth_client);
1779 ceph::bufferlist reply;
1780 auto am = auth_meta;
1781 connection->lock.unlock();
1782 int r = messenger->auth_client->handle_auth_reply_more(
1783 connection, am.get(), auth_more.auth_payload(), &reply);
1784 connection->lock.lock();
1785 if (state != AUTH_CONNECTING) {
1786 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1787 return _fault();
1788 }
1789 if (r < 0) {
1790 lderr(cct) << __func__ << " auth_client handle_auth_reply_more returned "
1791 << r << dendl;
1792 return _fault();
1793 }
1794 auto more_reply = AuthRequestMoreFrame::Encode(reply);
1795 return WRITE(more_reply, "auth request more", read_frame);
1796}
1797
1798CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
1799{
1800 ldout(cct, 20) << __func__
1801 << " payload.length()=" << payload.length() << dendl;
1802
1803 if (state != AUTH_CONNECTING) {
1804 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1805 return _fault();
1806 }
1807
1808 auto auth_done = AuthDoneFrame::Decode(payload);
1809
1810 ceph_assert(messenger->auth_client);
1811 auto am = auth_meta;
1812 connection->lock.unlock();
1813 int r = messenger->auth_client->handle_auth_done(
1814 connection,
1815 am.get(),
1816 auth_done.global_id(),
1817 auth_done.con_mode(),
1818 auth_done.auth_payload(),
1819 &am->session_key,
1820 &am->connection_secret);
1821 connection->lock.lock();
1822 if (state != AUTH_CONNECTING) {
1823 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1824 return _fault();
1825 }
1826 if (r < 0) {
1827 return _fault();
1828 }
1829 auth_meta->con_mode = auth_done.con_mode();
f6b5b4d7
TL
1830 bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
1831 session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
1832 cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/false);
11fdf7f2
TL
1833
1834 state = AUTH_CONNECTING_SIGN;
1835
1836 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1837 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
1838 auto sig_frame = AuthSignatureFrame::Encode(sig);
1839 pre_auth.enabled = false;
1840 pre_auth.rxbuf.clear();
1841 return WRITE(sig_frame, "auth signature", read_frame);
1842}
1843
1844CtPtr ProtocolV2::finish_client_auth() {
20effc67
TL
1845 if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
1846 return send_compression_request();
1847 }
1848
1849 return start_session_connect();
1850}
1851
1852CtPtr ProtocolV2::finish_server_auth() {
1853 // server had sent AuthDone and client responded with correct pre-auth
1854 // signature.
1855 // We can start conditioanl msgr protocol
1856 if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
1857 state = COMPRESSION_ACCEPTING;
1858 } else {
1859 // No msgr protocol features to process
1860 // we can start accepting new sessions/reconnects.
1861 state = SESSION_ACCEPTING;
1862 }
1863
1864 return CONTINUE(read_frame);
1865}
1866
1867CtPtr ProtocolV2::start_session_connect() {
11fdf7f2
TL
1868 if (!server_cookie) {
1869 ceph_assert(connect_seq == 0);
1870 state = SESSION_CONNECTING;
1871 return send_client_ident();
1872 } else { // reconnecting to previous session
1873 state = SESSION_RECONNECTING;
1874 ceph_assert(connect_seq > 0);
1875 return send_reconnect();
1876 }
1877}
1878
1879CtPtr ProtocolV2::send_client_ident() {
1880 ldout(cct, 20) << __func__ << dendl;
1881
1882 if (!connection->policy.lossy && !client_cookie) {
1883 client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1884 }
1885
1886 uint64_t flags = 0;
1887 if (connection->policy.lossy) {
1888 flags |= CEPH_MSG_CONNECT_LOSSY;
1889 }
1890
11fdf7f2
TL
1891 auto client_ident = ClientIdentFrame::Encode(
1892 messenger->get_myaddrs(),
1893 connection->target_addr,
1894 messenger->get_myname().num(),
1895 global_seq,
1896 connection->policy.features_supported,
1897 connection->policy.features_required | msgr2_required,
1898 flags,
1899 client_cookie);
1900
1901 ldout(cct, 5) << __func__ << " sending identification: "
1902 << "addrs=" << messenger->get_myaddrs()
1903 << " target=" << connection->target_addr
1904 << " gid=" << messenger->get_myname().num()
1905 << " global_seq=" << global_seq
1906 << " features_supported=" << std::hex
1907 << connection->policy.features_supported
1908 << " features_required="
1909 << (connection->policy.features_required | msgr2_required)
1910 << " flags=" << flags
1911 << " cookie=" << client_cookie << std::dec << dendl;
1912
1913 INTERCEPT(11);
1914
1915 return WRITE(client_ident, "client ident", read_frame);
1916}
1917
1918CtPtr ProtocolV2::send_reconnect() {
1919 ldout(cct, 20) << __func__ << dendl;
1920
1921 auto reconnect = ReconnectFrame::Encode(messenger->get_myaddrs(),
1922 client_cookie,
1923 server_cookie,
1924 global_seq,
1925 connect_seq,
1926 in_seq);
1927
1928 ldout(cct, 5) << __func__ << " reconnect to session: client_cookie="
1929 << std::hex << client_cookie << " server_cookie="
1930 << server_cookie << std::dec
1931 << " gs=" << global_seq << " cs=" << connect_seq
1932 << " ms=" << in_seq << dendl;
1933
1934 INTERCEPT(13);
1935
1936 return WRITE(reconnect, "reconnect", read_frame);
1937}
1938
1939CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
1940{
1941 ldout(cct, 20) << __func__
1942 << " payload.length()=" << payload.length() << dendl;
1943
1944 if (state != SESSION_CONNECTING) {
1945 lderr(cct) << __func__ << " not in session connect state!" << dendl;
1946 return _fault();
1947 }
1948
1949 auto ident_missing =
1950 IdentMissingFeaturesFrame::Decode(payload);
1951 lderr(cct) << __func__
1952 << " client does not support all server features: " << std::hex
1953 << ident_missing.features() << std::dec << dendl;
1954
1955 return _fault();
1956}
1957
1958CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
1959{
1960 ldout(cct, 20) << __func__
1961 << " payload.length()=" << payload.length() << dendl;
1962
1963 if (state != SESSION_RECONNECTING) {
1964 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1965 return _fault();
1966 }
1967
1968 auto reset = ResetFrame::Decode(payload);
1969
1970 ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
1971 << dendl;
1972 if (reset.full()) {
1973 reset_session();
1974 } else {
1975 server_cookie = 0;
1976 connect_seq = 0;
1977 in_seq = 0;
1978 }
1979
1980 state = SESSION_CONNECTING;
1981 return send_client_ident();
1982}
1983
1984CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
1985{
1986 ldout(cct, 20) << __func__
1987 << " payload.length()=" << payload.length() << dendl;
1988
1989 if (state != SESSION_RECONNECTING) {
1990 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1991 return _fault();
1992 }
1993
1994 auto retry = RetryFrame::Decode(payload);
1995 connect_seq = retry.connect_seq() + 1;
1996
1997 ldout(cct, 1) << __func__
1998 << " received session retry connect_seq=" << retry.connect_seq()
1999 << ", inc to cs=" << connect_seq << dendl;
2000
2001 return send_reconnect();
2002}
2003
2004CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
2005{
2006 ldout(cct, 20) << __func__
2007 << " payload.length()=" << payload.length() << dendl;
2008
2009 if (state != SESSION_RECONNECTING) {
2010 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2011 return _fault();
2012 }
2013
2014 auto retry = RetryGlobalFrame::Decode(payload);
2015 global_seq = messenger->get_global_seq(retry.global_seq());
2016
2017 ldout(cct, 1) << __func__ << " received session retry global global_seq="
2018 << retry.global_seq() << ", choose new gs=" << global_seq
2019 << dendl;
2020
2021 return send_reconnect();
2022}
2023
2024CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) {
2025 ldout(cct, 20) << __func__
2026 << " received WAIT (connection race)"
2027 << " payload.length()=" << payload.length()
2028 << dendl;
2029
2030 if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
2031 lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
2032 return _fault();
2033 }
2034
2035 state = WAIT;
2036 WaitFrame::Decode(payload);
2037 return _fault();
2038}
2039
2040CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
2041{
2042 ldout(cct, 20) << __func__
2043 << " payload.length()=" << payload.length() << dendl;
2044
2045 if (state != SESSION_RECONNECTING) {
2046 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2047 return _fault();
2048 }
2049
2050 auto reconnect_ok = ReconnectOkFrame::Decode(payload);
2051 ldout(cct, 5) << __func__
2052 << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
2053 << dendl;
2054
2055 out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
2056
2057 backoff = utime_t();
2058 ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
2059 << ", lossy = " << connection->policy.lossy << ", features "
2060 << connection->get_features() << dendl;
2061
2062 if (connection->delay_state) {
2063 ceph_assert(connection->delay_state->ready());
2064 }
2065
2066 connection->dispatch_queue->queue_connect(connection);
2067 messenger->ms_deliver_handle_fast_connect(connection);
2068
2069 return ready();
2070}
2071
2072CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
2073{
2074 ldout(cct, 20) << __func__
2075 << " payload.length()=" << payload.length() << dendl;
2076
2077 if (state != SESSION_CONNECTING) {
2078 lderr(cct) << __func__ << " not in session connect state!" << dendl;
2079 return _fault();
2080 }
2081
2082 auto server_ident = ServerIdentFrame::Decode(payload);
2083 ldout(cct, 5) << __func__ << " received server identification:"
2084 << " addrs=" << server_ident.addrs()
2085 << " gid=" << server_ident.gid()
2086 << " global_seq=" << server_ident.global_seq()
2087 << " features_supported=" << std::hex
2088 << server_ident.supported_features()
2089 << " features_required=" << server_ident.required_features()
f6b5b4d7
TL
2090 << " flags=" << server_ident.flags()
2091 << " cookie=" << server_ident.cookie() << std::dec << dendl;
11fdf7f2
TL
2092
2093 // is this who we intended to talk to?
2094 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2095 // of mon_host or something.
2096 if (!server_ident.addrs().contains(connection->target_addr)) {
2097 ldout(cct,1) << __func__ << " peer identifies as " << server_ident.addrs()
2098 << ", does not include " << connection->target_addr << dendl;
2099 return _fault();
2100 }
2101
2102 server_cookie = server_ident.cookie();
2103
2104 connection->set_peer_addrs(server_ident.addrs());
2105 peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid());
2106 connection->set_features(server_ident.supported_features() &
2107 connection->policy.features_supported);
2108 peer_global_seq = server_ident.global_seq();
2109
2110 connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
2111
2112 backoff = utime_t();
2113 ldout(cct, 10) << __func__ << " connect success " << connect_seq
2114 << ", lossy = " << connection->policy.lossy << ", features "
2115 << connection->get_features() << dendl;
2116
2117 if (connection->delay_state) {
2118 ceph_assert(connection->delay_state->ready());
2119 }
2120
2121 connection->dispatch_queue->queue_connect(connection);
2122 messenger->ms_deliver_handle_fast_connect(connection);
2123
2124 return ready();
2125}
2126
20effc67
TL
2127CtPtr ProtocolV2::send_compression_request() {
2128 state = COMPRESSION_CONNECTING;
2129
2130 const entity_type_t peer_type = connection->get_peer_type();
2131 comp_meta.con_mode =
2132 static_cast<Compressor::CompressionMode>(
2133 messenger->comp_registry.get_mode(peer_type, auth_meta->is_mode_secure()));
2134 const auto preferred_methods = messenger->comp_registry.get_methods(peer_type);
2135 auto comp_req_frame = CompressionRequestFrame::Encode(comp_meta.is_compress(), preferred_methods);
2136
2137 INTERCEPT(19);
2138 return WRITE(comp_req_frame, "compression request", read_frame);
2139}
2140
2141CtPtr ProtocolV2::handle_compression_done(ceph::bufferlist &payload) {
2142 if (state != COMPRESSION_CONNECTING) {
2143 lderr(cct) << __func__ << " state changed!" << dendl;
2144 return _fault();
2145 }
2146
2147 auto response = CompressionDoneFrame::Decode(payload);
2148 ldout(cct, 10) << __func__ << " CompressionDoneFrame(is_compress=" << response.is_compress()
2149 << ", method=" << response.method() << ")" << dendl;
2150
2151 comp_meta.con_method = static_cast<Compressor::CompressionAlgorithm>(response.method());
2152 if (comp_meta.is_compress() != response.is_compress()) {
2153 comp_meta.con_mode = Compressor::COMP_NONE;
2154 }
2155 session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
2156 cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
2157
2158 return start_session_connect();
2159}
2160
11fdf7f2
TL
2161/* Server Protocol Methods */
2162
2163CtPtr ProtocolV2::start_server_banner_exchange() {
2164 ldout(cct, 20) << __func__ << dendl;
2165
2166 INTERCEPT(2);
2167
2168 state = BANNER_ACCEPTING;
2169
2170 return _banner_exchange(CONTINUATION(post_server_banner_exchange));
2171}
2172
2173CtPtr ProtocolV2::post_server_banner_exchange() {
2174 ldout(cct, 20) << __func__ << dendl;
2175
2176 state = AUTH_ACCEPTING;
2177
2178 return CONTINUE(read_frame);
2179}
2180
2181CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
2182 ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
2183 << dendl;
2184
2185 if (state != AUTH_ACCEPTING) {
2186 lderr(cct) << __func__ << " not in auth accept state!" << dendl;
2187 return _fault();
2188 }
2189
2190 auto request = AuthRequestFrame::Decode(payload);
2191 ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
2192 << ", preferred_modes=" << request.preferred_modes()
2193 << ", payload_len=" << request.auth_payload().length() << ")"
2194 << dendl;
2195 auth_meta->auth_method = request.method();
2196 auth_meta->con_mode = messenger->auth_server->pick_con_mode(
2197 connection->get_peer_type(), auth_meta->auth_method,
2198 request.preferred_modes());
2199 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
2200 return _auth_bad_method(-EOPNOTSUPP);
2201 }
2202 return _handle_auth_request(request.auth_payload(), false);
2203}
2204
2205CtPtr ProtocolV2::_auth_bad_method(int r)
2206{
2207 ceph_assert(r < 0);
2208 std::vector<uint32_t> allowed_methods;
2209 std::vector<uint32_t> allowed_modes;
2210 messenger->auth_server->get_supported_auth_methods(
2211 connection->get_peer_type(), &allowed_methods, &allowed_modes);
2212 ldout(cct, 1) << __func__ << " auth_method " << auth_meta->auth_method
2213 << " r " << cpp_strerror(r)
2214 << ", allowed_methods " << allowed_methods
2215 << ", allowed_modes " << allowed_modes
2216 << dendl;
2217 auto bad_method = AuthBadMethodFrame::Encode(auth_meta->auth_method, r,
2218 allowed_methods, allowed_modes);
2219 return WRITE(bad_method, "bad auth method", read_frame);
2220}
2221
f67539c2 2222CtPtr ProtocolV2::_handle_auth_request(ceph::bufferlist& auth_payload, bool more)
11fdf7f2
TL
2223{
2224 if (!messenger->auth_server) {
2225 return _fault();
2226 }
f67539c2 2227 ceph::bufferlist reply;
11fdf7f2
TL
2228 auto am = auth_meta;
2229 connection->lock.unlock();
2230 int r = messenger->auth_server->handle_auth_request(
2231 connection, am.get(),
2232 more, am->auth_method, auth_payload,
2233 &reply);
2234 connection->lock.lock();
2235 if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
2236 ldout(cct, 1) << __func__
2237 << " state changed while accept, it must be mark_down"
2238 << dendl;
2239 ceph_assert(state == CLOSED);
2240 return _fault();
2241 }
2242 if (r == 1) {
2243 INTERCEPT(10);
2244 state = AUTH_ACCEPTING_SIGN;
2245
2246 auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
2247 auth_meta->con_mode,
2248 reply);
2249 return WRITE(auth_done, "auth done", finish_auth);
2250 } else if (r == 0) {
2251 state = AUTH_ACCEPTING_MORE;
2252
2253 auto more = AuthReplyMoreFrame::Encode(reply);
2254 return WRITE(more, "auth reply more", read_frame);
2255 } else if (r == -EBUSY) {
2256 // kick the client and maybe they'll come back later
2257 return _fault();
2258 } else {
2259 return _auth_bad_method(r);
2260 }
2261}
2262
2263CtPtr ProtocolV2::finish_auth()
2264{
2265 ceph_assert(auth_meta);
2266 // TODO: having a possibility to check whether we're server or client could
2267 // allow reusing finish_auth().
f6b5b4d7
TL
2268 bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
2269 session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
2270 cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/true);
11fdf7f2
TL
2271
2272 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
2273 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
2274 auto sig_frame = AuthSignatureFrame::Encode(sig);
2275 pre_auth.enabled = false;
2276 pre_auth.rxbuf.clear();
2277 return WRITE(sig_frame, "auth signature", read_frame);
2278}
2279
2280CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload)
2281{
2282 ldout(cct, 20) << __func__
2283 << " payload.length()=" << payload.length() << dendl;
2284
2285 if (state != AUTH_ACCEPTING_MORE) {
2286 lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
2287 return _fault();
2288 }
2289
2290 auto auth_more = AuthRequestMoreFrame::Decode(payload);
2291 return _handle_auth_request(auth_more.auth_payload(), true);
2292}
2293
2294CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
2295{
2296 ldout(cct, 20) << __func__
2297 << " payload.length()=" << payload.length() << dendl;
2298
2299 if (state != AUTH_ACCEPTING_SIGN && state != AUTH_CONNECTING_SIGN) {
2300 lderr(cct) << __func__
2301 << " pre-auth verification signature seen in wrong state!"
2302 << dendl;
2303 return _fault();
2304 }
2305
2306 auto sig_frame = AuthSignatureFrame::Decode(payload);
2307
2308 const auto actual_tx_sig = auth_meta->session_key.empty() ?
2309 sha256_digest_t() : auth_meta->session_key.hmac_sha256(cct, pre_auth.txbuf);
2310 if (sig_frame.signature() != actual_tx_sig) {
2311 ldout(cct, 2) << __func__ << " pre-auth signature mismatch"
2312 << " actual_tx_sig=" << actual_tx_sig
2313 << " sig_frame.signature()=" << sig_frame.signature()
2314 << dendl;
2315 return _fault();
2316 } else {
2317 ldout(cct, 20) << __func__ << " pre-auth signature success"
2318 << " sig_frame.signature()=" << sig_frame.signature()
2319 << dendl;
2320 pre_auth.txbuf.clear();
2321 }
2322
2323 if (state == AUTH_ACCEPTING_SIGN) {
20effc67
TL
2324 // this happened on server side
2325 return finish_server_auth();
11fdf7f2
TL
2326 } else if (state == AUTH_CONNECTING_SIGN) {
2327 // this happened at client side
2328 return finish_client_auth();
2329 } else {
9f95a23c 2330 ceph_abort("state corruption");
11fdf7f2
TL
2331 }
2332}
2333
2334CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
2335{
2336 ldout(cct, 20) << __func__
2337 << " payload.length()=" << payload.length() << dendl;
2338
2339 if (state != SESSION_ACCEPTING) {
2340 lderr(cct) << __func__ << " not in session accept state!" << dendl;
2341 return _fault();
2342 }
2343
2344 auto client_ident = ClientIdentFrame::Decode(payload);
2345
2346 ldout(cct, 5) << __func__ << " received client identification:"
2347 << " addrs=" << client_ident.addrs()
2348 << " target=" << client_ident.target_addr()
2349 << " gid=" << client_ident.gid()
2350 << " global_seq=" << client_ident.global_seq()
2351 << " features_supported=" << std::hex
2352 << client_ident.supported_features()
2353 << " features_required=" << client_ident.required_features()
2354 << " flags=" << client_ident.flags()
2355 << " cookie=" << client_ident.cookie() << std::dec << dendl;
2356
2357 if (client_ident.addrs().empty() ||
2358 client_ident.addrs().front() == entity_addr_t()) {
2359 ldout(cct,5) << __func__ << " oops, client_ident.addrs() is empty" << dendl;
2360 return _fault(); // a v2 peer should never do this
2361 }
2362 if (!messenger->get_myaddrs().contains(client_ident.target_addr())) {
2363 ldout(cct,5) << __func__ << " peer is trying to reach "
2364 << client_ident.target_addr()
2365 << " which is not us (" << messenger->get_myaddrs() << ")"
2366 << dendl;
2367 return _fault();
2368 }
2369
2370 connection->set_peer_addrs(client_ident.addrs());
2371 connection->target_addr = connection->_infer_target_addr(client_ident.addrs());
2372
2373 peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
2374 connection->set_peer_id(client_ident.gid());
2375
2376 client_cookie = client_ident.cookie();
2377
2378 uint64_t feat_missing =
2379 (connection->policy.features_required | msgr2_required) &
2380 ~(uint64_t)client_ident.supported_features();
2381 if (feat_missing) {
2382 ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
2383 << feat_missing << std::dec << dendl;
2384 auto ident_missing_features =
2385 IdentMissingFeaturesFrame::Encode(feat_missing);
2386
2387 return WRITE(ident_missing_features, "ident missing features", read_frame);
2388 }
2389
2390 connection_features =
2391 client_ident.supported_features() & connection->policy.features_supported;
2392
2393 peer_global_seq = client_ident.global_seq();
2394
9f95a23c
TL
2395 if (connection->policy.server &&
2396 connection->policy.lossy &&
2397 !connection->policy.register_lossy_clients) {
2398 // incoming lossy client, no need to register this connection
2399 } else {
2400 // Looks good so far, let's check if there is already an existing connection
2401 // to this peer.
2402 connection->lock.unlock();
2403 AsyncConnectionRef existing = messenger->lookup_conn(
2404 *connection->peer_addrs);
2405
2406 if (existing &&
2407 existing->protocol->proto_type != 2) {
2408 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2409 << existing->protocol.get() << " version is "
2410 << existing->protocol->proto_type << ", marking down"
2411 << dendl;
2412 existing->mark_down();
2413 existing = nullptr;
2414 }
11fdf7f2 2415
9f95a23c 2416 connection->inject_delay();
11fdf7f2 2417
9f95a23c
TL
2418 connection->lock.lock();
2419 if (state != SESSION_ACCEPTING) {
2420 ldout(cct, 1) << __func__
2421 << " state changed while accept, it must be mark_down"
2422 << dendl;
2423 ceph_assert(state == CLOSED);
2424 return _fault();
2425 }
11fdf7f2 2426
9f95a23c
TL
2427 if (existing) {
2428 return handle_existing_connection(existing);
2429 }
11fdf7f2
TL
2430 }
2431
2432 // if everything is OK reply with server identification
2433 return send_server_ident();
2434}
2435
2436CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
2437{
2438 ldout(cct, 20) << __func__
2439 << " payload.length()=" << payload.length() << dendl;
2440
2441 if (state != SESSION_ACCEPTING) {
2442 lderr(cct) << __func__ << " not in session accept state!" << dendl;
2443 return _fault();
2444 }
2445
2446 auto reconnect = ReconnectFrame::Decode(payload);
2447
2448 ldout(cct, 5) << __func__
2449 << " received reconnect:"
2450 << " client_cookie=" << std::hex << reconnect.client_cookie()
2451 << " server_cookie=" << reconnect.server_cookie() << std::dec
2452 << " gs=" << reconnect.global_seq()
2453 << " cs=" << reconnect.connect_seq()
2454 << " ms=" << reconnect.msg_seq()
2455 << dendl;
2456
2457 // Should we check if one of the ident.addrs match connection->target_addr
2458 // as we do in ProtocolV1?
2459 connection->set_peer_addrs(reconnect.addrs());
2460 connection->target_addr = connection->_infer_target_addr(reconnect.addrs());
2461 peer_global_seq = reconnect.global_seq();
2462
2463 connection->lock.unlock();
2464 AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2465
2466 if (existing &&
2467 existing->protocol->proto_type != 2) {
2468 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2469 << existing->protocol.get() << " version is "
2470 << existing->protocol->proto_type << ", marking down" << dendl;
2471 existing->mark_down();
2472 existing = nullptr;
2473 }
2474
2475 connection->inject_delay();
2476
2477 connection->lock.lock();
2478 if (state != SESSION_ACCEPTING) {
2479 ldout(cct, 1) << __func__
2480 << " state changed while accept, it must be mark_down"
2481 << dendl;
2482 ceph_assert(state == CLOSED);
2483 return _fault();
2484 }
2485
2486 if (!existing) {
2487 // there is no existing connection therefore cannot reconnect to previous
2488 // session
2489 ldout(cct, 0) << __func__
2490 << " no existing connection exists, reseting client" << dendl;
2491 auto reset = ResetFrame::Encode(true);
2492 return WRITE(reset, "session reset", read_frame);
2493 }
2494
2495 std::lock_guard<std::mutex> l(existing->lock);
2496
2497 ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2498 if (!exproto) {
2499 ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2500 ceph_assert(false);
2501 }
2502
2503 if (exproto->state == CLOSED) {
2504 ldout(cct, 5) << __func__ << " existing " << existing
2505 << " already closed. Reseting client" << dendl;
2506 auto reset = ResetFrame::Encode(true);
2507 return WRITE(reset, "session reset", read_frame);
2508 }
2509
2510 if (exproto->replacing) {
2511 ldout(cct, 1) << __func__
2512 << " existing racing replace happened while replacing."
2513 << " existing=" << existing << dendl;
2514 auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2515 return WRITE(retry, "session retry", read_frame);
2516 }
2517
2518 if (exproto->client_cookie != reconnect.client_cookie()) {
2519 ldout(cct, 1) << __func__ << " existing=" << existing
2520 << " client cookie mismatch, I must have reseted:"
2521 << " cc=" << std::hex << exproto->client_cookie
2522 << " rcc=" << reconnect.client_cookie()
2523 << ", reseting client." << std::dec
2524 << dendl;
2525 auto reset = ResetFrame::Encode(connection->policy.resetcheck);
2526 return WRITE(reset, "session reset", read_frame);
2527 } else if (exproto->server_cookie == 0) {
2528 // this happens when:
2529 // - a connects to b
2530 // - a sends client_ident
2531 // - b gets client_ident, sends server_ident and sets cookie X
2532 // - connection fault
2533 // - b reconnects to a with cookie X, connect_seq=1
2534 // - a has cookie==0
2535 ldout(cct, 1) << __func__ << " I was a client and didn't received the"
2536 << " server_ident. Asking peer to resume session"
2537 << " establishment" << dendl;
2538 auto reset = ResetFrame::Encode(false);
2539 return WRITE(reset, "session reset", read_frame);
2540 }
2541
2542 if (exproto->peer_global_seq > reconnect.global_seq()) {
2543 ldout(cct, 5) << __func__
2544 << " stale global_seq: sgs=" << exproto->peer_global_seq
2545 << " cgs=" << reconnect.global_seq()
2546 << ", ask client to retry global" << dendl;
2547 auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2548
2549 INTERCEPT(18);
2550
2551 return WRITE(retry, "session retry", read_frame);
2552 }
2553
2554 if (exproto->connect_seq > reconnect.connect_seq()) {
2555 ldout(cct, 5) << __func__
2556 << " stale connect_seq scs=" << exproto->connect_seq
2557 << " ccs=" << reconnect.connect_seq()
2558 << " , ask client to retry" << dendl;
2559 auto retry = RetryFrame::Encode(exproto->connect_seq);
2560 return WRITE(retry, "session retry", read_frame);
2561 }
2562
2563 if (exproto->connect_seq == reconnect.connect_seq()) {
2564 // reconnect race: both peers are sending reconnect messages
2565 if (existing->peer_addrs->msgr2_addr() >
2566 messenger->get_myaddrs().msgr2_addr() &&
2567 !existing->policy.server) {
2568 // the existing connection wins
2569 ldout(cct, 1)
2570 << __func__
2571 << " reconnect race detected, this connection loses to existing="
2572 << existing << dendl;
2573
2574 auto wait = WaitFrame::Encode();
2575 return WRITE(wait, "wait", read_frame);
2576 } else {
2577 // this connection wins
2578 ldout(cct, 1) << __func__
2579 << " reconnect race detected, replacing existing="
2580 << existing << " socket by this connection's socket"
2581 << dendl;
2582 }
2583 }
2584
2585 ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
2586
2587 reconnecting = true;
2588
2589 // everything looks good
2590 exproto->connect_seq = reconnect.connect_seq();
2591 exproto->message_seq = reconnect.msg_seq();
2592
2593 return reuse_connection(existing, exproto);
2594}
2595
9f95a23c 2596CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
11fdf7f2
TL
2597 ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
2598
2a845540 2599 std::unique_lock<std::mutex> l(existing->lock);
11fdf7f2
TL
2600
2601 ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2602 if (!exproto) {
2603 ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2604 ceph_assert(false);
2605 }
2606
2607 if (exproto->state == CLOSED) {
2608 ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
2609 << dendl;
2a845540 2610 l.unlock();
11fdf7f2
TL
2611 return send_server_ident();
2612 }
2613
2614 if (exproto->replacing) {
2615 ldout(cct, 1) << __func__
2616 << " existing racing replace happened while replacing."
2617 << " existing=" << existing << dendl;
2618 auto wait = WaitFrame::Encode();
2619 return WRITE(wait, "wait", read_frame);
2620 }
2621
2622 if (exproto->peer_global_seq > peer_global_seq) {
2623 ldout(cct, 1) << __func__ << " this is a stale connection, peer_global_seq="
2624 << peer_global_seq
2625 << " existing->peer_global_seq=" << exproto->peer_global_seq
2626 << ", stopping this connection." << dendl;
2627 stop();
2628 connection->dispatch_queue->queue_reset(connection);
2629 return nullptr;
2630 }
2631
2632 if (existing->policy.lossy) {
2633 // existing connection can be thrown out in favor of this one
2634 ldout(cct, 1)
2635 << __func__ << " existing=" << existing
2636 << " is a lossy channel. Stopping existing in favor of this connection"
2637 << dendl;
2638 existing->protocol->stop();
2639 existing->dispatch_queue->queue_reset(existing.get());
2a845540 2640 l.unlock();
11fdf7f2
TL
2641 return send_server_ident();
2642 }
2643
2644 if (exproto->server_cookie && exproto->client_cookie &&
2645 exproto->client_cookie != client_cookie) {
2646 // Found previous session
2647 // peer has reseted and we're going to reuse the existing connection
2648 // by replacing the communication socket
2649 ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2650 << ", peer must have reseted." << dendl;
2651 if (connection->policy.resetcheck) {
2652 exproto->reset_session();
2653 }
2654 return reuse_connection(existing, exproto);
2655 }
2656
2657 if (exproto->client_cookie == client_cookie) {
2658 // session establishment interrupted between client_ident and server_ident,
2659 // continuing...
2660 ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2661 << ", continuing session establishment." << dendl;
2662 return reuse_connection(existing, exproto);
2663 }
2664
2665 if (exproto->state == READY || exproto->state == STANDBY) {
2666 ldout(cct, 1) << __func__ << " existing=" << existing
2667 << " is READY/STANDBY, lets reuse it" << dendl;
2668 return reuse_connection(existing, exproto);
2669 }
2670
2671 // Looks like a connection race: server and client are both connecting to
2672 // each other at the same time.
2673 if (connection->peer_addrs->msgr2_addr() <
2674 messenger->get_myaddrs().msgr2_addr() ||
2675 existing->policy.server) {
2676 // this connection wins
2677 ldout(cct, 1) << __func__
2678 << " connection race detected, replacing existing="
2679 << existing << " socket by this connection's socket" << dendl;
2680 return reuse_connection(existing, exproto);
2681 } else {
2682 // the existing connection wins
2683 ldout(cct, 1)
2684 << __func__
2685 << " connection race detected, this connection loses to existing="
2686 << existing << dendl;
2687 ceph_assert(connection->peer_addrs->msgr2_addr() >
2688 messenger->get_myaddrs().msgr2_addr());
2689
2690 // make sure we follow through with opening the existing
2691 // connection (if it isn't yet open) since we know the peer
2692 // has something to send to us.
2693 existing->send_keepalive();
2694 auto wait = WaitFrame::Encode();
2695 return WRITE(wait, "wait", read_frame);
2696 }
2697}
2698
9f95a23c 2699CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
11fdf7f2
TL
2700 ProtocolV2 *exproto) {
2701 ldout(cct, 20) << __func__ << " existing=" << existing
2702 << " reconnect=" << reconnecting << dendl;
2703
2704 connection->inject_delay();
2705
2706 std::lock_guard<std::mutex> l(existing->write_lock);
2707
2708 connection->center->delete_file_event(connection->cs.fd(),
2709 EVENT_READABLE | EVENT_WRITABLE);
2710
2711 if (existing->delay_state) {
2712 existing->delay_state->flush();
2713 ceph_assert(!connection->delay_state);
2714 }
2715 exproto->reset_recv_state();
2716 exproto->pre_auth.enabled = false;
2717
2718 if (!reconnecting) {
2719 exproto->client_cookie = client_cookie;
2720 exproto->peer_name = peer_name;
2721 exproto->connection_features = connection_features;
2722 existing->set_features(connection_features);
2a845540 2723 exproto->peer_supported_features = peer_supported_features;
11fdf7f2
TL
2724 }
2725 exproto->peer_global_seq = peer_global_seq;
2726
9f95a23c 2727 ceph_assert(connection->center->in_thread());
11fdf7f2
TL
2728 auto temp_cs = std::move(connection->cs);
2729 EventCenter *new_center = connection->center;
2730 Worker *new_worker = connection->worker;
9f95a23c
TL
2731 // we can steal the session_stream_handlers under the assumption
2732 // this happens in the event center's thread as there should be
2733 // no user outside its boundaries (simlarly to e.g. outgoing_bl).
2734 auto temp_stream_handlers = std::move(session_stream_handlers);
20effc67 2735 auto temp_compression_handlers = std::move(session_compression_handlers);
9f95a23c 2736 exproto->auth_meta = auth_meta;
20effc67 2737 exproto->comp_meta = comp_meta;
11fdf7f2
TL
2738
2739 ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
2740 << dendl;
494da23a 2741
11fdf7f2
TL
2742 // avoid _stop shutdown replacing socket
2743 // queue a reset on the new connection, which we're dumping for the old
2744 stop();
2745
2746 connection->dispatch_queue->queue_reset(connection);
2747
2748 exproto->can_write = false;
494da23a 2749 exproto->write_in_progress = false;
11fdf7f2
TL
2750 exproto->reconnecting = reconnecting;
2751 exproto->replacing = true;
11fdf7f2
TL
2752 existing->state_offset = 0;
2753 // avoid previous thread modify event
2754 exproto->state = NONE;
2755 existing->state = AsyncConnection::STATE_NONE;
2756 // Discard existing prefetch buffer in `recv_buf`
2757 existing->recv_start = existing->recv_end = 0;
2758 // there shouldn't exist any buffer
2759 ceph_assert(connection->recv_start == connection->recv_end);
2760
2761 auto deactivate_existing = std::bind(
9f95a23c
TL
2762 [ existing,
2763 new_worker,
2764 new_center,
2765 exproto,
2a845540
TL
2766 reconnecting=reconnecting,
2767 tx_is_rev1=tx_frame_asm.get_is_rev1(),
2768 rx_is_rev1=rx_frame_asm.get_is_rev1(),
20effc67
TL
2769 temp_stream_handlers=std::move(temp_stream_handlers),
2770 temp_compression_handlers=std::move(temp_compression_handlers)
9f95a23c 2771 ](ConnectedSocket &cs) mutable {
11fdf7f2
TL
2772 // we need to delete time event in original thread
2773 {
2774 std::lock_guard<std::mutex> l(existing->lock);
2775 existing->write_lock.lock();
2776 exproto->requeue_sent();
9f95a23c
TL
2777 // XXX: do we really need the locking for `outgoing_bl`? There is
2778 // a comment just above its definition saying "lockfree, only used
2779 // in own thread". I'm following lockfull schema just in the case.
2780 // From performance point of view it should be fine – this happens
2781 // far away from hot paths.
2782 existing->outgoing_bl.clear();
11fdf7f2 2783 existing->open_write = false;
9f95a23c 2784 exproto->session_stream_handlers = std::move(temp_stream_handlers);
20effc67 2785 exproto->session_compression_handlers = std::move(temp_compression_handlers);
2a845540
TL
2786 if (!reconnecting) {
2787 exproto->tx_frame_asm.set_is_rev1(tx_is_rev1);
2788 exproto->rx_frame_asm.set_is_rev1(rx_is_rev1);
2789 }
11fdf7f2
TL
2790 existing->write_lock.unlock();
2791 if (exproto->state == NONE) {
2792 existing->shutdown_socket();
2793 existing->cs = std::move(cs);
2794 existing->worker->references--;
2795 new_worker->references++;
2796 existing->logger = new_worker->get_perf_counter();
2797 existing->worker = new_worker;
2798 existing->center = new_center;
2799 if (existing->delay_state)
2800 existing->delay_state->set_center(new_center);
2801 } else if (exproto->state == CLOSED) {
2802 auto back_to_close = std::bind(
2803 [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
2804 new_center->submit_to(new_center->get_id(),
2805 std::move(back_to_close), true);
2806 return;
2807 } else {
2808 ceph_abort();
2809 }
2810 }
2811
2812 // Before changing existing->center, it may already exists some
2813 // events in existing->center's queue. Then if we mark down
2814 // `existing`, it will execute in another thread and clean up
2815 // connection. Previous event will result in segment fault
2816 auto transfer_existing = [existing, exproto]() mutable {
2817 std::lock_guard<std::mutex> l(existing->lock);
2818 if (exproto->state == CLOSED) return;
2819 ceph_assert(exproto->state == NONE);
2820
2821 exproto->state = SESSION_ACCEPTING;
81eedcae
TL
2822 // we have called shutdown_socket above
2823 ceph_assert(existing->last_tick_id == 0);
2824 // restart timer since we are going to re-build connection
2825 existing->last_connect_started = ceph::coarse_mono_clock::now();
2826 existing->last_tick_id = existing->center->create_time_event(
2827 existing->connect_timeout_us, existing->tick_handler);
11fdf7f2
TL
2828 existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
2829 existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
2830 existing->read_handler);
2831 if (!exproto->reconnecting) {
2832 exproto->run_continuation(exproto->send_server_ident());
2833 } else {
2834 exproto->run_continuation(exproto->send_reconnect_ok());
2835 }
2836 };
2837 if (existing->center->in_thread())
2838 transfer_existing();
2839 else
2840 existing->center->submit_to(existing->center->get_id(),
2841 std::move(transfer_existing), true);
2842 },
2843 std::move(temp_cs));
2844
2845 existing->center->submit_to(existing->center->get_id(),
2846 std::move(deactivate_existing), true);
2847 return nullptr;
2848}
2849
2850CtPtr ProtocolV2::send_server_ident() {
2851 ldout(cct, 20) << __func__ << dendl;
2852
2853 // this is required for the case when this connection is being replaced
2854 out_seq = discard_requeued_up_to(out_seq, 0);
2855 in_seq = 0;
2856
2857 if (!connection->policy.lossy) {
2858 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
2859 }
2860
2861 uint64_t flags = 0;
2862 if (connection->policy.lossy) {
2863 flags = flags | CEPH_MSG_CONNECT_LOSSY;
2864 }
2865
2866 uint64_t gs = messenger->get_global_seq();
2867 auto server_ident = ServerIdentFrame::Encode(
2868 messenger->get_myaddrs(),
2869 messenger->get_myname().num(),
2870 gs,
2871 connection->policy.features_supported,
2872 connection->policy.features_required | msgr2_required,
2873 flags,
2874 server_cookie);
2875
2876 ldout(cct, 5) << __func__ << " sending identification:"
2877 << " addrs=" << messenger->get_myaddrs()
2878 << " gid=" << messenger->get_myname().num()
2879 << " global_seq=" << gs << " features_supported=" << std::hex
2880 << connection->policy.features_supported
2881 << " features_required="
2882 << (connection->policy.features_required | msgr2_required)
f6b5b4d7
TL
2883 << " flags=" << flags
2884 << " cookie=" << server_cookie << std::dec << dendl;
11fdf7f2
TL
2885
2886 connection->lock.unlock();
2887 // Because "replacing" will prevent other connections preempt this addr,
2888 // it's safe that here we don't acquire Connection's lock
2889 ssize_t r = messenger->accept_conn(connection);
2890
2891 connection->inject_delay();
2892
2893 connection->lock.lock();
2894
2895 if (r < 0) {
2896 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2897 << connection->peer_addrs->msgr2_addr()
2898 << " just fail later one(this)" << dendl;
2899 connection->inject_delay();
2900 return _fault();
2901 }
2902 if (state != SESSION_ACCEPTING) {
2903 ldout(cct, 1) << __func__
2904 << " state changed while accept_conn, it must be mark_down"
2905 << dendl;
2906 ceph_assert(state == CLOSED || state == NONE);
2907 messenger->unregister_conn(connection);
2908 connection->inject_delay();
2909 return _fault();
2910 }
2911
2912 connection->set_features(connection_features);
2913
2914 // notify
2915 connection->dispatch_queue->queue_accept(connection);
2916 messenger->ms_deliver_handle_fast_accept(connection);
2917
2918 INTERCEPT(12);
2919
2920 return WRITE(server_ident, "server ident", server_ready);
2921}
2922
2923CtPtr ProtocolV2::server_ready() {
2924 ldout(cct, 20) << __func__ << dendl;
2925
2926 if (connection->delay_state) {
2927 ceph_assert(connection->delay_state->ready());
2928 }
2929
2930 return ready();
2931}
2932
2933CtPtr ProtocolV2::send_reconnect_ok() {
2934 ldout(cct, 20) << __func__ << dendl;
2935
2936 out_seq = discard_requeued_up_to(out_seq, message_seq);
2937
2938 uint64_t ms = in_seq;
2939 auto reconnect_ok = ReconnectOkFrame::Encode(ms);
2940
2941 ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
2942
2943 connection->lock.unlock();
2944 // Because "replacing" will prevent other connections preempt this addr,
2945 // it's safe that here we don't acquire Connection's lock
2946 ssize_t r = messenger->accept_conn(connection);
2947
2948 connection->inject_delay();
2949
2950 connection->lock.lock();
2951
2952 if (r < 0) {
2953 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2954 << connection->peer_addrs->msgr2_addr()
2955 << " just fail later one(this)" << dendl;
2956 connection->inject_delay();
2957 return _fault();
2958 }
2959 if (state != SESSION_ACCEPTING) {
2960 ldout(cct, 1) << __func__
2961 << " state changed while accept_conn, it must be mark_down"
2962 << dendl;
2963 ceph_assert(state == CLOSED || state == NONE);
2964 messenger->unregister_conn(connection);
2965 connection->inject_delay();
2966 return _fault();
2967 }
2968
2969 // notify
2970 connection->dispatch_queue->queue_accept(connection);
2971 messenger->ms_deliver_handle_fast_accept(connection);
2972
2973 INTERCEPT(14);
2974
2975 return WRITE(reconnect_ok, "reconnect ok", server_ready);
2976}
20effc67
TL
2977
2978
2979CtPtr ProtocolV2::handle_compression_request(ceph::bufferlist &payload) {
2980 if (state != COMPRESSION_ACCEPTING) {
2981 lderr(cct) << __func__ << " state changed!" << dendl;
2982 return _fault();
2983 }
2984
2985 auto request = CompressionRequestFrame::Decode(payload);
2986 ldout(cct, 10) << __func__ << " CompressionRequestFrame(is_compress=" << request.is_compress()
2987 << ", preferred_methods=" << request.preferred_methods() << ")" << dendl;
2988
2989 const int peer_type = connection->get_peer_type();
2990 if (Compressor::CompressionMode mode = messenger->comp_registry.get_mode(
2991 peer_type, auth_meta->is_mode_secure());
2992 mode != Compressor::COMP_NONE && request.is_compress()) {
2993 comp_meta.con_method = messenger->comp_registry.pick_method(peer_type, request.preferred_methods());
2994 if (comp_meta.con_method != Compressor::COMP_ALG_NONE) {
2995 comp_meta.con_mode = mode;
2996 }
2997 } else {
2998 comp_meta.con_method = Compressor::COMP_ALG_NONE;
2999 }
3000
3001 auto response = CompressionDoneFrame::Encode(comp_meta.is_compress(), comp_meta.get_method());
3002
3003 INTERCEPT(20);
3004 return WRITE(response, "compression done", finish_compression);
3005}
3006
3007CtPtr ProtocolV2::finish_compression() {
3008 // TODO: having a possibility to check whether we're server or client could
3009 // allow reusing finish_compression().
3010
3011 session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
3012 cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
3013
3014 state = SESSION_ACCEPTING;
3015 return CONTINUE(read_frame);
3016}