]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/ProtocolV2.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <type_traits>
5
6#include "ProtocolV2.h"
7#include "AsyncMessenger.h"
8
9#include "common/EventTrace.h"
10#include "common/ceph_crypto.h"
11#include "common/errno.h"
12#include "include/random.h"
13#include "auth/AuthClient.h"
14#include "auth/AuthServer.h"
15
16#define dout_subsys ceph_subsys_ms
17#undef dout_prefix
18#define dout_prefix _conn_prefix(_dout)
19ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
20 return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
21 << *connection->peer_addrs << " conn(" << connection << " "
22 << this
23 << " " << ceph_con_mode_name(auth_meta->con_mode)
24 << " :" << connection->port
25 << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq << " l=" << connection->policy.lossy
27 << " rx=" << session_stream_handlers.rx.get()
28 << " tx=" << session_stream_handlers.tx.get()
29 << ").";
30}
31
using namespace ceph::msgr::v2;

// Continuation handles for this protocol's state machine. Steps return a
// CtPtr; a nullptr means no follow-up step is ready to run right now.
using CtPtr = Ct<ProtocolV2> *;
using CtRef = Ct<ProtocolV2> &;
36
37void ProtocolV2::run_continuation(CtPtr pcontinuation) {
38 if (pcontinuation) {
39 run_continuation(*pcontinuation);
40 }
41}
42
43void ProtocolV2::run_continuation(CtRef continuation) {
44 try {
45 CONTINUATION_RUN(continuation)
46 } catch (const buffer::error &e) {
47 lderr(cct) << __func__ << " failed decoding of frame header: " << e
48 << dendl;
49 _fault();
50 } catch (const ceph::crypto::onwire::MsgAuthError &e) {
51 lderr(cct) << __func__ << " " << e.what() << dendl;
52 _fault();
53 } catch (const DecryptionError &) {
54 lderr(cct) << __func__ << " failed to decrypt frame payload" << dendl;
55 }
56}
57
// Issue an asynchronous write of bufferlist/frame B (described as D in logs)
// that resumes the state machine at continuation C.
#define WRITE(B, D, C) write(D, CONTINUATION(C), B)

// Read exactly L bytes into a freshly allocated buffer, resuming at C.
#define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L)))

// Read into the caller-supplied rx buffer B, resuming at C.
#define READ_RXBUF(B, C) read(CONTINUATION(C), B)

#ifdef UNIT_TESTS_BUILT

// Test hook: lets a registered interceptor fail or stop the connection at
// numbered point S. Wrapped in do { } while (0) so that `INTERCEPT(n);` is a
// single statement and stays correct inside an un-braced if/else.
#define INTERCEPT(S) do { \
  if (connection->interceptor) { \
    auto a = connection->interceptor->intercept(connection, (S)); \
    if (a == Interceptor::ACTION::FAIL) { \
      return _fault(); \
    } else if (a == Interceptor::ACTION::STOP) { \
      stop(); \
      connection->dispatch_queue->queue_reset(connection); \
      return nullptr; \
    } \
  } \
} while (0)

#else
// No-op in regular builds; still a statement so `INTERCEPT(n);` parses alike.
#define INTERCEPT(S) do { } while (0)
#endif
80
81ProtocolV2::ProtocolV2(AsyncConnection *connection)
82 : Protocol(2, connection),
83 state(NONE),
84 peer_required_features(0),
85 client_cookie(0),
86 server_cookie(0),
87 global_seq(0),
88 connect_seq(0),
89 peer_global_seq(0),
90 message_seq(0),
91 reconnecting(false),
92 replacing(false),
93 can_write(false),
94 bannerExchangeCallback(nullptr),
95 next_tag(static_cast<Tag>(0)),
96 keepalive(false) {
97}
98
99ProtocolV2::~ProtocolV2() {
100}
101
102void ProtocolV2::connect() {
103 ldout(cct, 1) << __func__ << dendl;
104 state = START_CONNECT;
105 pre_auth.enabled = true;
106}
107
108void ProtocolV2::accept() {
109 ldout(cct, 1) << __func__ << dendl;
110 state = START_ACCEPT;
111}
112
113bool ProtocolV2::is_connected() { return can_write; }
114
115/*
116 * Tears down the message queues, and removes them from the
117 * DispatchQueue Must hold write_lock prior to calling.
118 */
119void ProtocolV2::discard_out_queue() {
120 ldout(cct, 10) << __func__ << " started" << dendl;
121
122 for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
123 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
124 (*p)->put();
125 }
126 sent.clear();
127 for (auto& [ prio, entries ] : out_queue) {
128 static_cast<void>(prio);
129 for (auto& entry : entries) {
130 ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
131 entry.m->put();
132 }
133 }
134 out_queue.clear();
494da23a 135 write_in_progress = false;
11fdf7f2
TL
136}
137
138void ProtocolV2::reset_session() {
139 ldout(cct, 1) << __func__ << dendl;
140
141 std::lock_guard<std::mutex> l(connection->write_lock);
142 if (connection->delay_state) {
143 connection->delay_state->discard();
144 }
145
146 connection->dispatch_queue->discard_queue(connection->conn_id);
147 discard_out_queue();
9f95a23c 148 connection->outgoing_bl.clear();
11fdf7f2
TL
149
150 connection->dispatch_queue->queue_remote_reset(connection);
151
152 out_seq = 0;
153 in_seq = 0;
154 client_cookie = 0;
155 server_cookie = 0;
156 connect_seq = 0;
157 peer_global_seq = 0;
158 message_seq = 0;
159 ack_left = 0;
160 can_write = false;
161}
162
163void ProtocolV2::stop() {
164 ldout(cct, 1) << __func__ << dendl;
165 if (state == CLOSED) {
166 return;
167 }
168
169 if (connection->delay_state) connection->delay_state->flush();
170
171 std::lock_guard<std::mutex> l(connection->write_lock);
172
173 reset_recv_state();
174 discard_out_queue();
175
176 connection->_stop();
177
178 can_write = false;
179 state = CLOSED;
180}
181
182void ProtocolV2::fault() { _fault(); }
183
184void ProtocolV2::requeue_sent() {
494da23a 185 write_in_progress = false;
11fdf7f2
TL
186 if (sent.empty()) {
187 return;
188 }
189
190 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
191 out_seq -= sent.size();
192 while (!sent.empty()) {
193 Message *m = sent.back();
194 sent.pop_back();
195 ldout(cct, 5) << __func__ << " requeueing message m=" << m
196 << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
197 << *m << dendl;
9f95a23c 198 m->clear_payload();
11fdf7f2
TL
199 rq.emplace_front(out_queue_entry_t{false, m});
200 }
201}
202
203uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
204 ldout(cct, 10) << __func__ << " " << seq << dendl;
205 std::lock_guard<std::mutex> l(connection->write_lock);
206 if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
207 return seq;
208 }
209 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
210 uint64_t count = out_seq;
211 while (!rq.empty()) {
212 Message* const m = rq.front().m;
213 if (m->get_seq() == 0 || m->get_seq() > seq) break;
214 ldout(cct, 5) << __func__ << " discarding message m=" << m
215 << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
216 << *m << dendl;
217 m->put();
218 rq.pop_front();
219 count++;
220 }
221 if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST);
222 return count;
223}
224
9f95a23c
TL
225void ProtocolV2::reset_security() {
226 ldout(cct, 5) << __func__ << dendl;
227
494da23a 228 auth_meta.reset(new AuthConnectionMeta);
494da23a 229 session_stream_handlers.rx.reset(nullptr);
9f95a23c 230 session_stream_handlers.tx.reset(nullptr);
494da23a 231 pre_auth.rxbuf.clear();
9f95a23c
TL
232 pre_auth.txbuf.clear();
233}
234
235// it's expected the `write_lock` is held while calling this method.
236void ProtocolV2::reset_recv_state() {
237 ldout(cct, 5) << __func__ << dendl;
238
239 if (!connection->center->in_thread()) {
240 // execute in the same thread that uses the rx/tx handlers. We need
241 // to do the warp because holding `write_lock` is not enough as
242 // `write_event()` unlocks it just before calling `write_message()`.
243 // `submit_to()` here is NOT blocking.
244 connection->center->submit_to(connection->center->get_id(), [this] {
245 ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers"
246 << dendl;
247 // Possibly unnecessary. See the comment in `deactivate_existing`.
248 std::lock_guard<std::mutex> l(connection->lock);
249 std::lock_guard<std::mutex> wl(connection->write_lock);
250 reset_security();
251 }, /* always_async = */true);
252 } else {
253 reset_security();
254 }
11fdf7f2
TL
255
256 // clean read and write callbacks
257 connection->pendingReadLen.reset();
258 connection->writeCallback.reset();
259
260 next_tag = static_cast<Tag>(0);
261
262 reset_throttle();
263}
264
265size_t ProtocolV2::get_current_msg_size() const {
266 ceph_assert(!rx_segments_desc.empty());
267 size_t sum = 0;
268 // we don't include SegmentIndex::Msg::HEADER.
269 for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
270 sum += rx_segments_desc[idx].length;
271 }
272 return sum;
273}
274
275void ProtocolV2::reset_throttle() {
276 if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
277 connection->policy.throttler_messages) {
278 ldout(cct, 10) << __func__ << " releasing " << 1
279 << " message to policy throttler "
280 << connection->policy.throttler_messages->get_current()
281 << "/" << connection->policy.throttler_messages->get_max()
282 << dendl;
283 connection->policy.throttler_messages->put();
284 }
285 if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
286 if (connection->policy.throttler_bytes) {
287 const size_t cur_msg_size = get_current_msg_size();
288 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
289 << " bytes to policy throttler "
290 << connection->policy.throttler_bytes->get_current() << "/"
291 << connection->policy.throttler_bytes->get_max() << dendl;
292 connection->policy.throttler_bytes->put(cur_msg_size);
293 }
294 }
295 if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
296 const size_t cur_msg_size = get_current_msg_size();
297 ldout(cct, 10)
298 << __func__ << " releasing " << cur_msg_size
299 << " bytes to dispatch_queue throttler "
300 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
301 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
302 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
303 }
304}
305
// Handle a connection fault. Depending on policy and the current state this
// does one of four things:
//  - closes the connection (lossy policy, or a half-accepted connection
//    with nothing queued);
//  - parks it in STANDBY;
//  - reconnects immediately;
//  - reconnects after a backoff timer.
// Every path returns nullptr, i.e. no continuation is chained.
CtPtr ProtocolV2::_fault() {
  ldout(cct, 10) << __func__ << dendl;

  // Nothing to do for a connection that is closed or was never started.
  if (state == CLOSED || state == NONE) {
    ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
    return nullptr;
  }

  // Lossy connections are torn down on fault unless they are still inside
  // the connect handshake (START_CONNECT..SESSION_RECONNECTING).
  if (connection->policy.lossy &&
      !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
    ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  connection->write_lock.lock();

  can_write = false;
  // requeue sent items
  requeue_sent();

  // A half-accepted connection with nothing to send is simply closed.
  // write_lock is released first because stop() takes it itself.
  if (out_queue.empty() && state >= START_ACCEPT &&
      state <= SESSION_ACCEPTING && !replacing) {
    ldout(cct, 2) << __func__ << " with nothing to send and in the half "
                   << " accept state just closed" << dendl;
    connection->write_lock.unlock();
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  replacing = false;
  connection->fault();
  reset_recv_state();

  reconnecting = false;

  // Nothing pending (no messages, no keepalive): go idle if policy allows.
  if (connection->policy.standby && out_queue.empty() && !keepalive &&
      state != WAIT) {
    ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
                  << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return nullptr;
  }
  // The server side does not initiate reconnects; it waits in STANDBY.
  if (connection->policy.server) {
    ldout(cct, 1) << __func__ << " server, going to standby, even though i have stuff queued" << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return nullptr;
  }

  connection->write_lock.unlock();

  // Faults outside the connect handshake (excluding WAIT and a racing
  // SESSION_ACCEPTING) reconnect immediately. Otherwise schedule a retry
  // after a backoff.
  if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
      state != WAIT &&
      state != SESSION_ACCEPTING /* due to connection race */) {
    // policy maybe empty when state is in accept
    // NOTE(review): policy.server already returned above, so this branch
    // looks unreachable unless policy can change in between; confirm.
    if (connection->policy.server) {
      ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
      state = STANDBY;
    } else {
      ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
      connect_seq++;
      global_seq = messenger->get_global_seq();
      state = START_CONNECT;
      pre_auth.enabled = true;
      connection->state = AsyncConnection::STATE_CONNECTING;
    }
    backoff = utime_t();
    connection->center->dispatch_event_external(connection->read_handler);
  } else {
    // WAIT jumps straight to ms_max_backoff. Otherwise start at
    // ms_initial_backoff and double on each fault, capped at ms_max_backoff.
    if (state == WAIT) {
      backoff.set_from_double(cct->_conf->ms_max_backoff);
    } else if (backoff == utime_t()) {
      backoff.set_from_double(cct->_conf->ms_initial_backoff);
    } else {
      backoff += backoff;
      if (backoff > cct->_conf->ms_max_backoff)
        backoff.set_from_double(cct->_conf->ms_max_backoff);
    }

    // Only bump connect_seq when a session exists (server_cookie is set).
    if (server_cookie) {
      connect_seq++;
    }

    global_seq = messenger->get_global_seq();
    state = START_CONNECT;
    pre_auth.enabled = true;
    connection->state = AsyncConnection::STATE_CONNECTING;
    ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
    // woke up again;
    // The delay passed to create_time_event is backoff in microseconds.
    connection->register_time_events.insert(
        connection->center->create_time_event(backoff.to_nsec() / 1000,
                                              connection->wakeup_handler));
  }
  return nullptr;
}
405
406void ProtocolV2::prepare_send_message(uint64_t features,
407 Message *m) {
408 ldout(cct, 20) << __func__ << " m=" << *m << dendl;
409
410 // associate message with Connection (for benefit of encode_payload)
9f95a23c
TL
411 ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
412 << features << " " << m << " " << *m << dendl;
11fdf7f2
TL
413
414 // encode and copy out of *m
415 m->encode(features, 0);
416}
417
418void ProtocolV2::send_message(Message *m) {
419 uint64_t f = connection->get_features();
420
421 // TODO: Currently not all messages supports reencode like MOSDMap, so here
422 // only let fast dispatch support messages prepare message
423 const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
424 if (can_fast_prepare) {
425 prepare_send_message(f, m);
426 }
427
428 std::lock_guard<std::mutex> l(connection->write_lock);
429 bool is_prepared = can_fast_prepare;
430 // "features" changes will change the payload encoding
431 if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
432 // ensure the correctness of message encoding
433 m->clear_payload();
434 is_prepared = false;
435 ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
436 << " != " << connection->get_features() << dendl;
437 }
438 if (state == CLOSED) {
439 ldout(cct, 10) << __func__ << " connection closed."
440 << " Drop message " << m << dendl;
441 m->put();
442 } else {
443 ldout(cct, 5) << __func__ << " enqueueing message m=" << m
444 << " type=" << m->get_type() << " " << *m << dendl;
9f95a23c 445 m->queue_start = ceph::mono_clock::now();
11fdf7f2
TL
446 m->trace.event("async enqueueing message");
447 out_queue[m->get_priority()].emplace_back(
448 out_queue_entry_t{is_prepared, m});
449 ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
450 << dendl;
494da23a
TL
451 if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
452 write_in_progress = true;
11fdf7f2
TL
453 connection->center->dispatch_event_external(connection->write_handler);
454 }
455 }
456}
457
458void ProtocolV2::send_keepalive() {
459 ldout(cct, 10) << __func__ << dendl;
460 std::lock_guard<std::mutex> l(connection->write_lock);
461 if (state != CLOSED) {
462 keepalive = true;
463 connection->center->dispatch_event_external(connection->write_handler);
464 }
465}
466
467void ProtocolV2::read_event() {
468 ldout(cct, 20) << __func__ << dendl;
469
470 switch (state) {
471 case START_CONNECT:
472 run_continuation(CONTINUATION(start_client_banner_exchange));
473 break;
474 case START_ACCEPT:
475 run_continuation(CONTINUATION(start_server_banner_exchange));
476 break;
477 case READY:
478 run_continuation(CONTINUATION(read_frame));
479 break;
480 case THROTTLE_MESSAGE:
481 run_continuation(CONTINUATION(throttle_message));
482 break;
483 case THROTTLE_BYTES:
484 run_continuation(CONTINUATION(throttle_bytes));
485 break;
486 case THROTTLE_DISPATCH_QUEUE:
487 run_continuation(CONTINUATION(throttle_dispatch_queue));
488 break;
489 default:
490 break;
491 }
492}
493
494ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
495 out_queue_entry_t out_entry;
496
497 if (!out_queue.empty()) {
498 auto it = out_queue.rbegin();
499 auto& entries = it->second;
500 ceph_assert(!entries.empty());
501 out_entry = entries.front();
502 entries.pop_front();
503 if (entries.empty()) {
504 out_queue.erase(it->first);
505 }
506 }
507 return out_entry;
508}
509
// Encode `m` as a msgr2 MESSAGE frame, append it to connection->outgoing_bl
// and try to send. Must run on the connection's event-center thread
// (asserted). Consumes the caller's reference to `m` (m->put() at the end).
// `more` is forwarded to _try_send(). Returns _try_send()'s result: negative
// on error, 0 when flushed, positive when bytes are still pending.
ssize_t ProtocolV2::write_message(Message *m, bool more) {
  FUNCTRACE(cct);
  ceph_assert(connection->center->in_thread());
  // Each written message gets the next outgoing sequence number.
  m->set_seq(++out_seq);

  // Snapshot in_seq to acknowledge in this frame's header, and reset the
  // count of not-yet-acked incoming messages.
  connection->lock.lock();
  uint64_t ack_seq = in_seq;
  ack_left = 0;
  connection->lock.unlock();

  ceph_msg_header &header = m->get_header();
  ceph_msg_footer &footer = m->get_footer();

  ceph_msg_header2 header2{header.seq, header.tid,
                           header.type, header.priority,
                           header.version,
                           init_le32(0), header.data_off,
                           init_le64(ack_seq),
                           footer.flags, header.compat_version,
                           header.reserved};

  auto message = MessageFrame::Encode(
			     header2,
			     m->get_payload(),
			     m->get_middle(),
			     m->get_data());
  connection->outgoing_bl.append(message.get_buffer(session_stream_handlers));

  ldout(cct, 5) << __func__ << " sending message m=" << m
                << " seq=" << m->get_seq() << " " << *m << dendl;

  m->trace.event("async writing message");
  ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
                 << " src=" << entity_name_t(messenger->get_myname())
                 << " off=" << header2.data_off
                 << dendl;
  // Bytes sent are counted as the drop in outgoing_bl length across _try_send.
  ssize_t total_send_size = connection->outgoing_bl.length();
  ssize_t rc = connection->_try_send(more);
  if (rc < 0) {
    ldout(cct, 1) << __func__ << " error sending " << m << ", "
                  << cpp_strerror(rc) << dendl;
  } else {
    connection->logger->inc(
	l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
    ldout(cct, 10) << __func__ << " sending " << m
                   << (rc ? " continuely." : " done.") << dendl;
  }

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
#endif
  m->put();

  return rc;
}
568
569void ProtocolV2::append_keepalive() {
570 ldout(cct, 10) << __func__ << dendl;
571 auto keepalive_frame = KeepAliveFrame::Encode();
9f95a23c 572 connection->outgoing_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
11fdf7f2
TL
573}
574
575void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
576 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
9f95a23c 577 connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
11fdf7f2
TL
578}
579
580void ProtocolV2::handle_message_ack(uint64_t seq) {
581 if (connection->policy.lossy) { // lossy connections don't keep sent messages
582 return;
583 }
584
585 ldout(cct, 15) << __func__ << " seq=" << seq << dendl;
586
587 // trim sent list
588 static const int max_pending = 128;
589 int i = 0;
590 Message *pending[max_pending];
9f95a23c 591 auto now = ceph::mono_clock::now();
11fdf7f2
TL
592 connection->write_lock.lock();
593 while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
594 Message *m = sent.front();
595 sent.pop_front();
596 pending[i++] = m;
597 ldout(cct, 10) << __func__ << " got ack seq " << seq
598 << " >= " << m->get_seq() << " on " << m << " " << *m
599 << dendl;
600 }
601 connection->write_lock.unlock();
9f95a23c 602 connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
11fdf7f2
TL
603 for (int k = 0; k < i; k++) {
604 pending[k]->put();
605 }
606}
607
// Write-handler body. When the connection is writable, it does three things:
//  - flushes a pending keepalive;
//  - drains out_queue (highest priority first) through write_message();
//  - sends a standalone ACK if incoming messages are still unacknowledged.
// Otherwise it either reconnects a STANDBY client that has data queued, or
// just tries to flush whatever is already buffered. Send failures end in
// fault().
void ProtocolV2::write_event() {
  ldout(cct, 10) << __func__ << dendl;
  ssize_t r = 0;

  connection->write_lock.lock();
  if (can_write) {
    if (keepalive) {
      append_keepalive();
      keepalive = false;
    }

    auto start = ceph::mono_clock::now();
    bool more;
    do {
      const auto out_entry = _get_next_outgoing();
      if (!out_entry.m) {
        break;
      }

      // Non-lossy connections keep an extra reference in `sent` until acked.
      if (!connection->policy.lossy) {
        // put on sent list
        sent.push_back(out_entry.m);
        out_entry.m->get();
      }
      more = !out_queue.empty();
      // Encoding and sending happen without write_lock held.
      connection->write_lock.unlock();

      // send_message or requeue messages may not encode message
      if (!out_entry.is_prepared) {
        prepare_send_message(connection->get_features(), out_entry.m);
      }

      if (out_entry.m->queue_start != ceph::mono_time()) {
        connection->logger->tinc(l_msgr_send_messages_queue_lat,
                                 ceph::mono_clock::now() -
                                 out_entry.m->queue_start);
      }

      r = write_message(out_entry.m, more);

      connection->write_lock.lock();
      if (r == 0) {
        ;
      } else if (r < 0) {
        ldout(cct, 1) << __func__ << " send msg failed" << dendl;
        break;
      } else if (r > 0) {
	// Outbound message in-progress, thread will be re-awoken
	// when the outbound socket is writeable again
	break;
      }
    } while (can_write);
    write_in_progress = false;

    // if r > 0 mean data still lefted, so no need _try_send.
    if (r == 0) {
      uint64_t left = ack_left;
      if (left) {
        // Acknowledge everything received so far with a standalone ACK frame.
        auto ack = AckFrame::Encode(in_seq);
        connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers));
        ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                       << " messages" << dendl;
        ack_left -= left;
        left = ack_left;
        // NOTE(review): `left` (uint64_t) is passed where write_message passes
        // the bool `more`; presumably it means "more to come" — confirm.
        r = connection->_try_send(left);
      } else if (is_queued()) {
        r = connection->_try_send();
      }
    }
    connection->write_lock.unlock();

    connection->logger->tinc(l_msgr_running_send_time,
                             ceph::mono_clock::now() - start);
    if (r < 0) {
      ldout(cct, 1) << __func__ << " send msg failed" << dendl;
      connection->lock.lock();
      fault();
      connection->lock.unlock();
      return;
    }
  } else {
    write_in_progress = false;
    // Re-acquire in the order connection->lock, then write_lock (the same
    // order the warped lambda in reset_recv_state uses).
    connection->write_lock.unlock();
    connection->lock.lock();
    connection->write_lock.lock();
    if (state == STANDBY && !connection->policy.server && is_queued()) {
      ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
      if (server_cookie) {  // only increment connect_seq if there is a session
        connect_seq++;
      }
      connection->_connect();
    } else if (connection->cs && state != NONE && state != CLOSED &&
               state != START_CONNECT) {
      r = connection->_try_send();
      if (r < 0) {
        ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
        connection->write_lock.unlock();
        fault();
        connection->lock.unlock();
        return;
      }
    }
    connection->write_lock.unlock();
    connection->lock.unlock();
  }
}
714
715bool ProtocolV2::is_queued() {
716 return !out_queue.empty() || connection->is_queued();
717}
718
719uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
720 if (session_stream_handlers.rx) {
721 return segment_onwire_size(logical_size);
722 } else {
723 return logical_size;
724 }
725}
726
727uint32_t ProtocolV2::get_epilogue_size() const {
728 // In secure mode size of epilogue is flexible and depends on particular
729 // cipher implementation. See the comment for epilogue_secure_block_t or
730 // epilogue_plain_block_t.
731 if (session_stream_handlers.rx) {
732 return FRAME_SECURE_EPILOGUE_SIZE + \
733 session_stream_handlers.rx->get_extra_size_at_final();
734 } else {
735 return FRAME_PLAIN_EPILOGUE_SIZE;
736 }
737}
738
// Read buffer->length() bytes into `buffer` and resume at `next`, whose node
// takes ownership of the buffer and whose `r` receives the read result.
// If connection->read() finishes immediately or fails (r <= 0), `next` is
// returned for the caller to run. Otherwise nullptr is returned and the
// completion callback runs `next` later. While pre-auth recording is
// enabled, successfully read bytes are also appended to pre_auth.rxbuf.
CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
                       rx_buffer_t &&buffer) {
  const auto len = buffer->length();
  const auto buf = buffer->c_str();
  next.node = std::move(buffer);
  // Asynchronous completion path.
  ssize_t r = connection->read(len, buf,
    [&next, this](char *buffer, int r) {
      if (unlikely(pre_auth.enabled) && r >= 0) {
        pre_auth.rxbuf.append(*next.node);
	// Sanity bound on recorded pre-auth data, enforced only with
	// ms_die_on_bug.
	ceph_assert(!cct->_conf->ms_die_on_bug ||
		    pre_auth.rxbuf.length() < 1000000);
      }
      next.r = r;
      run_continuation(next);
    });
  if (r <= 0) {
    // error or done synchronously
    if (unlikely(pre_auth.enabled) && r >= 0) {
      pre_auth.rxbuf.append(*next.node);
      ceph_assert(!cct->_conf->ms_die_on_bug ||
		  pre_auth.rxbuf.length() < 1000000);
    }
    next.r = r;
    return &next;
  }

  return nullptr;
}
767
768template <class F>
769CtPtr ProtocolV2::write(const std::string &desc,
770 CONTINUATION_TYPE<ProtocolV2> &next,
771 F &frame) {
772 ceph::bufferlist bl = frame.get_buffer(session_stream_handlers);
773 return write(desc, next, bl);
774}
775
// Send `buffer` and resume at `next` once it has been written. `desc` only
// labels log messages. While pre-auth recording is enabled, the bytes are also
// appended to pre_auth.txbuf.
// Returns:
//  - `next` (after setParams()) when the write completed synchronously;
//  - _fault()'s result on an immediate error;
//  - nullptr when completion is deferred to the callback.
CtPtr ProtocolV2::write(const std::string &desc,
                        CONTINUATION_TYPE<ProtocolV2> &next,
                        bufferlist &buffer) {
  if (unlikely(pre_auth.enabled)) {
    pre_auth.txbuf.append(buffer);
    // Sanity bound on recorded pre-auth data, enforced only with
    // ms_die_on_bug.
    ceph_assert(!cct->_conf->ms_die_on_bug ||
		pre_auth.txbuf.length() < 1000000);
  }

  ssize_t r =
      connection->write(buffer, [&next, desc, this](int r) {
        // Asynchronous completion path.
        // NOTE(review): on failure this faults and then still runs `next`;
        // presumably `next` copes with the faulted state — confirm.
        if (r < 0) {
          ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
                        << " (" << cpp_strerror(r) << ")" << dendl;
          connection->inject_delay();
          _fault();
        }
        run_continuation(next);
      });

  if (r < 0) {
    ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
                  << " (" << cpp_strerror(r) << ")" << dendl;
    return _fault();
  } else if (r == 0) {
    next.setParams();
    return &next;
  }

  return nullptr;
}
807
808CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
809 ldout(cct, 20) << __func__ << dendl;
810 bannerExchangeCallback = &callback;
811
812 bufferlist banner_payload;
813 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
814 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
815
816 bufferlist bl;
817 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
818 encode((uint16_t)banner_payload.length(), bl, 0);
819 bl.claim_append(banner_payload);
820
821 INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);
822
823 return WRITE(bl, "banner", _wait_for_peer_banner);
824}
825
826CtPtr ProtocolV2::_wait_for_peer_banner() {
9f95a23c 827 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
11fdf7f2
TL
828 return READ(banner_len, _handle_peer_banner);
829}
830
831CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
832 ldout(cct, 20) << __func__ << " r=" << r << dendl;
833
834 if (r < 0) {
835 ldout(cct, 1) << __func__ << " read peer banner failed r=" << r << " ("
836 << cpp_strerror(r) << ")" << dendl;
837 return _fault();
838 }
839
840 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
841
842 if (memcmp(buffer->c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
843 if (memcmp(buffer->c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
844 lderr(cct) << __func__ << " peer " << *connection->peer_addrs
845 << " is using msgr V1 protocol" << dendl;
846 return _fault();
847 }
848 ldout(cct, 1) << __func__ << " accept peer sent bad banner" << dendl;
849 return _fault();
850 }
851
852 uint16_t payload_len;
853 bufferlist bl;
854 buffer->set_offset(banner_prefix_len);
9f95a23c 855 buffer->set_length(sizeof(ceph_le16));
11fdf7f2
TL
856 bl.push_back(std::move(buffer));
857 auto ti = bl.cbegin();
858 try {
859 decode(payload_len, ti);
860 } catch (const buffer::error &e) {
861 lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
862 return _fault();
863 }
864
865 INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);
866
867 return READ(payload_len, _handle_peer_banner_payload);
868}
869
870CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
871 ldout(cct, 20) << __func__ << " r=" << r << dendl;
872
873 if (r < 0) {
874 ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
875 << " (" << cpp_strerror(r) << ")" << dendl;
876 return _fault();
877 }
878
879 uint64_t peer_supported_features;
880 uint64_t peer_required_features;
881
882 bufferlist bl;
883 bl.push_back(std::move(buffer));
884 auto ti = bl.cbegin();
885 try {
886 decode(peer_supported_features, ti);
887 decode(peer_required_features, ti);
888 } catch (const buffer::error &e) {
889 lderr(cct) << __func__ << " decode banner payload failed " << dendl;
890 return _fault();
891 }
892
893 ldout(cct, 1) << __func__ << " supported=" << std::hex
894 << peer_supported_features << " required=" << std::hex
895 << peer_required_features << std::dec << dendl;
896
897 // Check feature bit compatibility
898
899 uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
900 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
901
902 if ((required_features & peer_supported_features) != required_features) {
903 ldout(cct, 1) << __func__ << " peer does not support all required features"
904 << " required=" << std::hex << required_features
905 << " supported=" << std::hex << peer_supported_features
906 << std::dec << dendl;
907 stop();
908 connection->dispatch_queue->queue_reset(connection);
909 return nullptr;
910 }
911 if ((supported_features & peer_required_features) != peer_required_features) {
912 ldout(cct, 1) << __func__ << " we do not support all peer required features"
913 << " required=" << std::hex << peer_required_features
914 << " supported=" << supported_features << std::dec << dendl;
915 stop();
916 connection->dispatch_queue->queue_reset(connection);
917 return nullptr;
918 }
919
920 this->peer_required_features = peer_required_features;
921 if (this->peer_required_features == 0) {
922 this->connection_features = msgr2_required;
923 }
924
925 // at this point we can change how the client protocol behaves based on
926 // this->peer_required_features
927
928 if (state == BANNER_CONNECTING) {
929 state = HELLO_CONNECTING;
930 }
931 else {
932 ceph_assert(state == BANNER_ACCEPTING);
933 state = HELLO_ACCEPTING;
934 }
935
936 auto hello = HelloFrame::Encode(messenger->get_mytype(),
937 connection->target_addr);
938
939 INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);
940
941 return WRITE(hello, "hello frame", read_frame);
942}
943
944CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
945{
946 ldout(cct, 20) << __func__
947 << " payload.length()=" << payload.length() << dendl;
948
949 if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
950 lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
951 return _fault();
952 }
953
954 auto hello = HelloFrame::Decode(payload);
955
956 ldout(cct, 5) << __func__ << " received hello:"
957 << " peer_type=" << (int)hello.entity_type()
958 << " peer_addr_for_me=" << hello.peer_addr() << dendl;
959
81eedcae
TL
960 sockaddr_storage ss;
961 socklen_t len = sizeof(ss);
962 getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
963 ldout(cct, 5) << __func__ << " getsockname says I am " << (sockaddr *)&ss
964 << " when talking to " << connection->target_addr << dendl;
965
11fdf7f2
TL
966 if (connection->get_peer_type() == -1) {
967 connection->set_peer_type(hello.entity_type());
968
969 ceph_assert(state == HELLO_ACCEPTING);
970 connection->policy = messenger->get_policy(hello.entity_type());
971 ldout(cct, 10) << __func__ << " accept of host_type "
972 << (int)hello.entity_type()
973 << ", policy.lossy=" << connection->policy.lossy
974 << " policy.server=" << connection->policy.server
975 << " policy.standby=" << connection->policy.standby
976 << " policy.resetcheck=" << connection->policy.resetcheck
977 << dendl;
978 } else {
979 ceph_assert(state == HELLO_CONNECTING);
980 if (connection->get_peer_type() != hello.entity_type()) {
981 ldout(cct, 1) << __func__ << " connection peer type does not match what"
982 << " peer advertises " << connection->get_peer_type()
983 << " != " << (int)hello.entity_type() << dendl;
984 stop();
985 connection->dispatch_queue->queue_reset(connection);
986 return nullptr;
987 }
988 }
989
81eedcae
TL
990 if (messenger->get_myaddrs().empty() ||
991 messenger->get_myaddrs().front().is_blank_ip()) {
992 entity_addr_t a;
993 if (cct->_conf->ms_learn_addr_from_peer) {
994 ldout(cct, 1) << __func__ << " peer " << connection->target_addr
995 << " says I am " << hello.peer_addr() << " (socket says "
996 << (sockaddr*)&ss << ")" << dendl;
997 a = hello.peer_addr();
998 } else {
999 ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
1000 << " says I am " << (sockaddr*)&ss
1001 << " (peer says " << hello.peer_addr() << ")" << dendl;
1002 a.set_sockaddr((sockaddr *)&ss);
1003 }
1004 a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
1005 a.set_port(0);
1006 connection->lock.unlock();
1007 messenger->learned_addr(a);
1008 if (cct->_conf->ms_inject_internal_delays &&
1009 cct->_conf->ms_inject_socket_failures) {
1010 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
1011 ldout(cct, 10) << __func__ << " sleep for "
1012 << cct->_conf->ms_inject_internal_delays << dendl;
1013 utime_t t;
1014 t.set_from_double(cct->_conf->ms_inject_internal_delays);
1015 t.sleep();
1016 }
1017 }
1018 connection->lock.lock();
1019 if (state != HELLO_CONNECTING) {
1020 ldout(cct, 1) << __func__
1021 << " state changed while learned_addr, mark_down or "
1022 << " replacing must be happened just now" << dendl;
1023 return nullptr;
1024 }
1025 }
1026
1027
1028
11fdf7f2
TL
1029 CtPtr callback;
1030 callback = bannerExchangeCallback;
1031 bannerExchangeCallback = nullptr;
1032 ceph_assert(callback);
1033 return callback;
1034}
1035
1036CtPtr ProtocolV2::read_frame() {
1037 if (state == CLOSED) {
1038 return nullptr;
1039 }
1040
1041 ldout(cct, 20) << __func__ << dendl;
1042 return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
1043}
1044
1045CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
1046 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1047
1048 if (r < 0) {
1049 ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
1050 << " (" << cpp_strerror(r) << ")" << dendl;
1051 return _fault();
1052 }
1053
1054 ceph::bufferlist preamble;
1055 preamble.push_back(std::move(buffer));
1056
1057 ldout(cct, 30) << __func__ << " preamble\n";
1058 preamble.hexdump(*_dout);
1059 *_dout << dendl;
1060
1061 if (session_stream_handlers.rx) {
1062 ceph_assert(session_stream_handlers.rx);
1063
1064 session_stream_handlers.rx->reset_rx_handler();
1065 preamble = session_stream_handlers.rx->authenticated_decrypt_update(
1066 std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
1067
1068 ldout(cct, 10) << __func__ << " got encrypted preamble."
1069 << " after decrypt premable.length()=" << preamble.length()
1070 << dendl;
1071
1072 ldout(cct, 30) << __func__ << " preamble after decrypt\n";
1073 preamble.hexdump(*_dout);
1074 *_dout << dendl;
1075 }
1076
1077 {
1078 // I expect ceph_le32 will make the endian conversion for me. Passing
1079 // everything through ::Decode is unnecessary.
1080 const auto& main_preamble = \
1081 reinterpret_cast<preamble_block_t&>(*preamble.c_str());
1082
1083 // verify preamble's CRC before any further processing
1084 const auto rx_crc = ceph_crc32c(0,
1085 reinterpret_cast<const unsigned char*>(&main_preamble),
1086 sizeof(main_preamble) - sizeof(main_preamble.crc));
1087 if (rx_crc != main_preamble.crc) {
1088 ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
1089 << " rx_crc=" << rx_crc
1090 << " tx_crc=" << main_preamble.crc << dendl;
1091 return _fault();
1092 }
1093
1094 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
1095 if (main_preamble.num_segments < 1 ||
1096 main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1097 ldout(cct, 10) << __func__ << " unsupported num_segments="
1098 << " tx_crc=" << main_preamble.num_segments << dendl;
1099 return _fault();
1100 }
1101
1102 next_tag = static_cast<Tag>(main_preamble.tag);
1103
1104 rx_segments_desc.clear();
1105 rx_segments_data.clear();
1106
1107 if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1108 ldout(cct, 30) << __func__
1109 << " num_segments=" << main_preamble.num_segments
1110 << " is too much" << dendl;
1111 return _fault();
1112 }
1113 for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
1114 ldout(cct, 10) << __func__ << " got new segment:"
1115 << " len=" << main_preamble.segments[idx].length
1116 << " align=" << main_preamble.segments[idx].alignment
1117 << dendl;
1118 rx_segments_desc.emplace_back(main_preamble.segments[idx]);
1119 }
1120 }
1121
1122 // does it need throttle?
1123 if (next_tag == Tag::MESSAGE) {
1124 if (state != READY) {
1125 lderr(cct) << __func__ << " not in ready state!" << dendl;
1126 return _fault();
1127 }
1128 state = THROTTLE_MESSAGE;
1129 return CONTINUE(throttle_message);
1130 } else {
1131 return read_frame_segment();
1132 }
1133}
1134
1135CtPtr ProtocolV2::handle_read_frame_dispatch() {
1136 ldout(cct, 10) << __func__
1137 << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
1138
1139 switch (next_tag) {
1140 case Tag::HELLO:
1141 case Tag::AUTH_REQUEST:
1142 case Tag::AUTH_BAD_METHOD:
1143 case Tag::AUTH_REPLY_MORE:
1144 case Tag::AUTH_REQUEST_MORE:
1145 case Tag::AUTH_DONE:
1146 case Tag::AUTH_SIGNATURE:
1147 case Tag::CLIENT_IDENT:
1148 case Tag::SERVER_IDENT:
1149 case Tag::IDENT_MISSING_FEATURES:
1150 case Tag::SESSION_RECONNECT:
1151 case Tag::SESSION_RESET:
1152 case Tag::SESSION_RETRY:
1153 case Tag::SESSION_RETRY_GLOBAL:
1154 case Tag::SESSION_RECONNECT_OK:
1155 case Tag::KEEPALIVE2:
1156 case Tag::KEEPALIVE2_ACK:
1157 case Tag::ACK:
1158 case Tag::WAIT:
1159 return handle_frame_payload();
1160 case Tag::MESSAGE:
1161 return handle_message();
1162 default: {
1163 lderr(cct) << __func__
1164 << " received unknown tag=" << static_cast<uint32_t>(next_tag)
1165 << dendl;
1166 return _fault();
1167 }
1168 }
1169
1170 return nullptr;
1171}
1172
1173CtPtr ProtocolV2::read_frame_segment() {
1174 ldout(cct, 20) << __func__ << dendl;
1175 ceph_assert(!rx_segments_desc.empty());
1176
1177 // description of current segment to read
1178 const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
1179 rx_buffer_t rx_buffer;
1180 try {
1181 rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
1182 get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
1183 } catch (std::bad_alloc&) {
1184 // Catching because of potential issues with satisfying alignment.
1185 ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
1186 << " len=" << get_onwire_size(cur_rx_desc.length)
1187 << " align=" << cur_rx_desc.alignment
1188 << dendl;
1189 return _fault();
1190 }
1191
1192 return READ_RXBUF(std::move(rx_buffer), handle_read_frame_segment);
1193}
1194
1195CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
1196 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1197
1198 if (r < 0) {
1199 ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
1200 << cpp_strerror(r) << ")" << dendl;
1201 return _fault();
1202 }
1203
1204 rx_segments_data.emplace_back();
1205 rx_segments_data.back().push_back(std::move(rx_buffer));
1206
1207 // decrypt incoming data
1208 // FIXME: if (auth_meta->is_mode_secure()) {
1209 if (session_stream_handlers.rx) {
1210 ceph_assert(session_stream_handlers.rx);
1211
1212 auto& new_seg = rx_segments_data.back();
1213 if (new_seg.length()) {
1214 auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
1215 std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
1216 const auto idx = rx_segments_data.size() - 1;
1217 new_seg.clear();
1218 padded.splice(0, rx_segments_desc[idx].length, &new_seg);
1219
1220 ldout(cct, 20) << __func__
1221 << " unpadded new_seg.length()=" << new_seg.length()
1222 << dendl;
1223 }
1224 }
1225
1226 if (rx_segments_desc.size() == rx_segments_data.size()) {
1227 // OK, all segments planned to read are read. Can go with epilogue.
1228 return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
1229 } else {
1230 // TODO: for makeshift only. This will be more generic and throttled
1231 return read_frame_segment();
1232 }
1233}
1234
1235CtPtr ProtocolV2::handle_frame_payload() {
1236 ceph_assert(!rx_segments_data.empty());
1237 auto& payload = rx_segments_data.back();
1238
1239 ldout(cct, 30) << __func__ << "\n";
1240 payload.hexdump(*_dout);
1241 *_dout << dendl;
1242
1243 switch (next_tag) {
1244 case Tag::HELLO:
1245 return handle_hello(payload);
1246 case Tag::AUTH_REQUEST:
1247 return handle_auth_request(payload);
1248 case Tag::AUTH_BAD_METHOD:
1249 return handle_auth_bad_method(payload);
1250 case Tag::AUTH_REPLY_MORE:
1251 return handle_auth_reply_more(payload);
1252 case Tag::AUTH_REQUEST_MORE:
1253 return handle_auth_request_more(payload);
1254 case Tag::AUTH_DONE:
1255 return handle_auth_done(payload);
1256 case Tag::AUTH_SIGNATURE:
1257 return handle_auth_signature(payload);
1258 case Tag::CLIENT_IDENT:
1259 return handle_client_ident(payload);
1260 case Tag::SERVER_IDENT:
1261 return handle_server_ident(payload);
1262 case Tag::IDENT_MISSING_FEATURES:
1263 return handle_ident_missing_features(payload);
1264 case Tag::SESSION_RECONNECT:
1265 return handle_reconnect(payload);
1266 case Tag::SESSION_RESET:
1267 return handle_session_reset(payload);
1268 case Tag::SESSION_RETRY:
1269 return handle_session_retry(payload);
1270 case Tag::SESSION_RETRY_GLOBAL:
1271 return handle_session_retry_global(payload);
1272 case Tag::SESSION_RECONNECT_OK:
1273 return handle_reconnect_ok(payload);
1274 case Tag::KEEPALIVE2:
1275 return handle_keepalive2(payload);
1276 case Tag::KEEPALIVE2_ACK:
1277 return handle_keepalive2_ack(payload);
1278 case Tag::ACK:
1279 return handle_message_ack(payload);
1280 case Tag::WAIT:
1281 return handle_wait(payload);
1282 default:
1283 ceph_abort();
1284 }
1285 return nullptr;
1286}
1287
1288CtPtr ProtocolV2::ready() {
1289 ldout(cct, 25) << __func__ << dendl;
1290
1291 reconnecting = false;
1292 replacing = false;
1293
1294 // make sure no pending tick timer
1295 if (connection->last_tick_id) {
1296 connection->center->delete_time_event(connection->last_tick_id);
1297 }
1298 connection->last_tick_id = connection->center->create_time_event(
1299 connection->inactive_timeout_us, connection->tick_handler);
1300
1301 {
1302 std::lock_guard<std::mutex> l(connection->write_lock);
1303 can_write = true;
1304 if (!out_queue.empty()) {
1305 connection->center->dispatch_event_external(connection->write_handler);
1306 }
1307 }
1308
1309 connection->maybe_start_delay_thread();
1310
1311 state = READY;
1312 ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie="
1313 << std::hex << client_cookie << " server_cookie="
1314 << server_cookie << std::dec << " in_seq=" << in_seq
1315 << " out_seq=" << out_seq << dendl;
1316
1317 INTERCEPT(15);
1318
1319 return CONTINUE(read_frame);
1320}
1321
1322CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
1323{
1324 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1325
1326 if (r < 0) {
1327 ldout(cct, 1) << __func__ << " read data error " << dendl;
1328 return _fault();
1329 }
1330
1331 __u8 late_flags;
1332
1333 // FIXME: if (auth_meta->is_mode_secure()) {
1334 if (session_stream_handlers.rx) {
1335 ldout(cct, 1) << __func__ << " read frame epilogue bytes="
1336 << get_epilogue_size() << dendl;
1337
1338 // decrypt epilogue and authenticate entire frame.
1339 ceph::bufferlist epilogue_bl;
1340 {
1341 epilogue_bl.push_back(std::move(buffer));
1342 try {
1343 epilogue_bl =
1344 session_stream_handlers.rx->authenticated_decrypt_update_final(
1345 std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
1346 } catch (ceph::crypto::onwire::MsgAuthError &e) {
1347 ldout(cct, 5) << __func__ << " message authentication failed: "
1348 << e.what() << dendl;
1349 return _fault();
1350 }
1351 }
1352 auto& epilogue =
1353 reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
1354 late_flags = epilogue.late_flags;
1355 } else {
1356 auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
1357
1358 for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
1359 const __u32 expected_crc = epilogue.crc_values[idx];
1360 const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
1361 if (expected_crc != calculated_crc) {
1362 ldout(cct, 5) << __func__ << " message integrity check failed: "
1363 << " expected_crc=" << expected_crc
1364 << " calculated_crc=" << calculated_crc
1365 << dendl;
1366 return _fault();
1367 } else {
1368 ldout(cct, 20) << __func__ << " message integrity check success: "
1369 << " expected_crc=" << expected_crc
1370 << " calculated_crc=" << calculated_crc
1371 << dendl;
1372 }
1373 }
1374 late_flags = epilogue.late_flags;
1375 }
1376
1377 // we do have a mechanism that allows transmitter to start sending message
1378 // and abort after putting entire data field on wire. This will be used by
1379 // the kernel client to avoid unnecessary buffering.
1380 if (late_flags & FRAME_FLAGS_LATEABRT) {
1381 reset_throttle();
1382 state = READY;
1383 return CONTINUE(read_frame);
1384 } else {
1385 return handle_read_frame_dispatch();
1386 }
1387}
1388
1389CtPtr ProtocolV2::handle_message() {
1390 ldout(cct, 20) << __func__ << dendl;
1391 ceph_assert(state == THROTTLE_DONE);
1392
9f95a23c
TL
1393#if defined(WITH_EVENTTRACE)
1394 utime_t ltt_recv_stamp = ceph_clock_now();
11fdf7f2
TL
1395#endif
1396 recv_stamp = ceph_clock_now();
1397
1398 // we need to get the size before std::moving segments data
1399 const size_t cur_msg_size = get_current_msg_size();
1400 auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
1401
1402 // XXX: paranoid copy just to avoid oops
1403 ceph_msg_header2 current_header = msg_frame.header();
1404
1405 ldout(cct, 5) << __func__
1406 << " got " << msg_frame.front_len()
1407 << " + " << msg_frame.middle_len()
1408 << " + " << msg_frame.data_len()
1409 << " byte message."
1410 << " envelope type=" << current_header.type
1411 << " src " << peer_name
1412 << " off " << current_header.data_off
1413 << dendl;
1414
1415 INTERCEPT(16);
1416 ceph_msg_header header{current_header.seq,
1417 current_header.tid,
1418 current_header.type,
1419 current_header.priority,
1420 current_header.version,
eafe8130
TL
1421 init_le32(msg_frame.front_len()),
1422 init_le32(msg_frame.middle_len()),
1423 init_le32(msg_frame.data_len()),
11fdf7f2
TL
1424 current_header.data_off,
1425 peer_name,
1426 current_header.compat_version,
1427 current_header.reserved,
eafe8130
TL
1428 init_le32(0)};
1429 ceph_msg_footer footer{init_le32(0), init_le32(0),
1430 init_le32(0), init_le64(0), current_header.flags};
11fdf7f2
TL
1431
1432 Message *message = decode_message(cct, 0, header, footer,
1433 msg_frame.front(),
1434 msg_frame.middle(),
1435 msg_frame.data(),
1436 connection);
1437 if (!message) {
1438 ldout(cct, 1) << __func__ << " decode message failed " << dendl;
1439 return _fault();
1440 } else {
1441 state = READ_MESSAGE_COMPLETE;
1442 }
1443
1444 INTERCEPT(17);
1445
1446 message->set_byte_throttler(connection->policy.throttler_bytes);
1447 message->set_message_throttler(connection->policy.throttler_messages);
1448
1449 // store reservation size in message, so we don't get confused
1450 // by messages entering the dispatch queue through other paths.
1451 message->set_dispatch_throttle_size(cur_msg_size);
1452
1453 message->set_recv_stamp(recv_stamp);
1454 message->set_throttle_stamp(throttle_stamp);
1455 message->set_recv_complete_stamp(ceph_clock_now());
1456
1457 // check received seq#. if it is old, drop the message.
1458 // note that incoming messages may skip ahead. this is convenient for the
1459 // client side queueing because messages can't be renumbered, but the (kernel)
1460 // client will occasionally pull a message out of the sent queue to send
1461 // elsewhere. in that case it doesn't matter if we "got" it or not.
1462 uint64_t cur_seq = in_seq;
1463 if (message->get_seq() <= cur_seq) {
1464 ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
1465 << " <= " << cur_seq << " " << message << " " << *message
1466 << ", discarding" << dendl;
1467 message->put();
1468 if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
1469 cct->_conf->ms_die_on_old_message) {
1470 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1471 }
1472 return nullptr;
1473 }
1474 if (message->get_seq() > cur_seq + 1) {
1475 ldout(cct, 0) << __func__ << " missed message? skipped from seq "
1476 << cur_seq << " to " << message->get_seq() << dendl;
1477 if (cct->_conf->ms_die_on_skipped_message) {
1478 ceph_assert(0 == "skipped incoming seq");
1479 }
1480 }
1481
9f95a23c 1482#if defined(WITH_EVENTTRACE)
11fdf7f2
TL
1483 if (message->get_type() == CEPH_MSG_OSD_OP ||
1484 message->get_type() == CEPH_MSG_OSD_OPREPLY) {
1485 utime_t ltt_processed_stamp = ceph_clock_now();
1486 double usecs_elapsed =
1487 (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
1488 ostringstream buf;
1489 if (message->get_type() == CEPH_MSG_OSD_OP)
1490 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
1491 false);
1492 else
1493 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
1494 false);
1495 }
1496#endif
1497
1498 // note last received message.
1499 in_seq = message->get_seq();
1500 ldout(cct, 5) << __func__ << " received message m=" << message
1501 << " seq=" << message->get_seq()
1502 << " from=" << message->get_source() << " type=" << header.type
1503 << " " << *message << dendl;
1504
1505 bool need_dispatch_writer = false;
1506 if (!connection->policy.lossy) {
1507 ack_left++;
1508 need_dispatch_writer = true;
1509 }
1510
1511 state = READY;
1512
9f95a23c
TL
1513 ceph::mono_time fast_dispatch_time;
1514
1515 if (connection->is_blackhole()) {
1516 ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
1517 message->put();
1518 goto out;
1519 }
1520
11fdf7f2
TL
1521 connection->logger->inc(l_msgr_recv_messages);
1522 connection->logger->inc(
1523 l_msgr_recv_bytes,
1524 cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
1525
1526 messenger->ms_fast_preprocess(message);
9f95a23c 1527 fast_dispatch_time = ceph::mono_clock::now();
11fdf7f2 1528 connection->logger->tinc(l_msgr_running_recv_time,
9f95a23c 1529 fast_dispatch_time - connection->recv_start_time);
11fdf7f2
TL
1530 if (connection->delay_state) {
1531 double delay_period = 0;
1532 if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1533 delay_period =
1534 cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1535 ldout(cct, 1) << "queue_received will delay after "
1536 << (ceph_clock_now() + delay_period) << " on " << message
1537 << " " << *message << dendl;
1538 }
1539 connection->delay_state->queue(delay_period, message);
1540 } else if (messenger->ms_can_fast_dispatch(message)) {
1541 connection->lock.unlock();
1542 connection->dispatch_queue->fast_dispatch(message);
1543 connection->recv_start_time = ceph::mono_clock::now();
1544 connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1545 connection->recv_start_time - fast_dispatch_time);
1546 connection->lock.lock();
9f95a23c
TL
1547 // we might have been reused by another connection
1548 // let's check if that is the case
1549 if (state != READY) {
1550 // yes, that was the case, let's do nothing
1551 return nullptr;
1552 }
11fdf7f2
TL
1553 } else {
1554 connection->dispatch_queue->enqueue(message, message->get_priority(),
1555 connection->conn_id);
1556 }
1557
1558 handle_message_ack(current_header.ack_seq);
1559
9f95a23c 1560 out:
11fdf7f2
TL
1561 if (need_dispatch_writer && connection->is_connected()) {
1562 connection->center->dispatch_event_external(connection->write_handler);
1563 }
1564
1565 return CONTINUE(read_frame);
1566}
1567
1568
1569CtPtr ProtocolV2::throttle_message() {
1570 ldout(cct, 20) << __func__ << dendl;
1571
1572 if (connection->policy.throttler_messages) {
1573 ldout(cct, 10) << __func__ << " wants " << 1
1574 << " message from policy throttler "
1575 << connection->policy.throttler_messages->get_current()
1576 << "/" << connection->policy.throttler_messages->get_max()
1577 << dendl;
1578 if (!connection->policy.throttler_messages->get_or_fail()) {
1579 ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
1580 << connection->policy.throttler_messages->get_current()
1581 << "/" << connection->policy.throttler_messages->get_max()
1582 << " failed, just wait." << dendl;
1583 // following thread pool deal with th full message queue isn't a
1584 // short time, so we can wait a ms.
1585 if (connection->register_time_events.empty()) {
1586 connection->register_time_events.insert(
1587 connection->center->create_time_event(1000,
1588 connection->wakeup_handler));
1589 }
1590 return nullptr;
1591 }
1592 }
1593
1594 state = THROTTLE_BYTES;
1595 return CONTINUE(throttle_bytes);
1596}
1597
1598CtPtr ProtocolV2::throttle_bytes() {
1599 ldout(cct, 20) << __func__ << dendl;
1600
1601 const size_t cur_msg_size = get_current_msg_size();
1602 if (cur_msg_size) {
1603 if (connection->policy.throttler_bytes) {
1604 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1605 << " bytes from policy throttler "
1606 << connection->policy.throttler_bytes->get_current() << "/"
1607 << connection->policy.throttler_bytes->get_max() << dendl;
1608 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
1609 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1610 << " bytes from policy throttler "
1611 << connection->policy.throttler_bytes->get_current()
1612 << "/" << connection->policy.throttler_bytes->get_max()
1613 << " failed, just wait." << dendl;
1614 // following thread pool deal with th full message queue isn't a
1615 // short time, so we can wait a ms.
1616 if (connection->register_time_events.empty()) {
1617 connection->register_time_events.insert(
1618 connection->center->create_time_event(
1619 1000, connection->wakeup_handler));
1620 }
1621 return nullptr;
1622 }
1623 }
1624 }
1625
1626 state = THROTTLE_DISPATCH_QUEUE;
1627 return CONTINUE(throttle_dispatch_queue);
1628}
1629
1630CtPtr ProtocolV2::throttle_dispatch_queue() {
1631 ldout(cct, 20) << __func__ << dendl;
1632
1633 const size_t cur_msg_size = get_current_msg_size();
1634 if (cur_msg_size) {
1635 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
1636 cur_msg_size)) {
1637 ldout(cct, 10)
1638 << __func__ << " wants " << cur_msg_size
1639 << " bytes from dispatch throttle "
1640 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1641 << connection->dispatch_queue->dispatch_throttler.get_max()
1642 << " failed, just wait." << dendl;
1643 // following thread pool deal with th full message queue isn't a
1644 // short time, so we can wait a ms.
1645 if (connection->register_time_events.empty()) {
1646 connection->register_time_events.insert(
1647 connection->center->create_time_event(1000,
1648 connection->wakeup_handler));
1649 }
1650 return nullptr;
1651 }
1652 }
1653
1654 throttle_stamp = ceph_clock_now();
1655 state = THROTTLE_DONE;
1656
1657 return read_frame_segment();
1658}
1659
1660CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
1661{
1662 ldout(cct, 20) << __func__
1663 << " payload.length()=" << payload.length() << dendl;
1664
1665 if (state != READY) {
1666 lderr(cct) << __func__ << " not in ready state!" << dendl;
1667 return _fault();
1668 }
1669
1670 auto keepalive_frame = KeepAliveFrame::Decode(payload);
1671
1672 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
1673
1674 connection->write_lock.lock();
1675 append_keepalive_ack(keepalive_frame.timestamp());
1676 connection->write_lock.unlock();
1677
1678 ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
1679 << keepalive_frame.timestamp() << dendl;
1680 connection->set_last_keepalive(ceph_clock_now());
1681
1682 if (is_connected()) {
1683 connection->center->dispatch_event_external(connection->write_handler);
1684 }
1685
1686 return CONTINUE(read_frame);
1687}
1688
1689CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
1690{
1691 ldout(cct, 20) << __func__
1692 << " payload.length()=" << payload.length() << dendl;
1693
1694 if (state != READY) {
1695 lderr(cct) << __func__ << " not in ready state!" << dendl;
1696 return _fault();
1697 }
1698
1699 auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload);
1700 connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
1701 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
1702
1703 return CONTINUE(read_frame);
1704}
1705
1706CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
1707{
1708 ldout(cct, 20) << __func__
1709 << " payload.length()=" << payload.length() << dendl;
1710
1711 if (state != READY) {
1712 lderr(cct) << __func__ << " not in ready state!" << dendl;
1713 return _fault();
1714 }
1715
1716 auto ack = AckFrame::Decode(payload);
1717 handle_message_ack(ack.seq());
1718 return CONTINUE(read_frame);
1719}
1720
1721/* Client Protocol Methods */
1722
1723CtPtr ProtocolV2::start_client_banner_exchange() {
1724 ldout(cct, 20) << __func__ << dendl;
1725
1726 INTERCEPT(1);
1727
1728 state = BANNER_CONNECTING;
1729
1730 global_seq = messenger->get_global_seq();
1731
1732 return _banner_exchange(CONTINUATION(post_client_banner_exchange));
1733}
1734
1735CtPtr ProtocolV2::post_client_banner_exchange() {
1736 ldout(cct, 20) << __func__ << dendl;
1737
1738 state = AUTH_CONNECTING;
1739
1740 return send_auth_request();
1741}
1742
1743CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
9f95a23c 1744 ceph_assert(messenger->auth_client);
11fdf7f2
TL
1745 ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
1746 << " auth_client " << messenger->auth_client << dendl;
11fdf7f2
TL
1747
1748 bufferlist bl;
1749 vector<uint32_t> preferred_modes;
1750 auto am = auth_meta;
1751 connection->lock.unlock();
1752 int r = messenger->auth_client->get_auth_request(
1753 connection, am.get(),
1754 &am->auth_method, &preferred_modes, &bl);
1755 connection->lock.lock();
1756 if (state != AUTH_CONNECTING) {
1757 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1758 return _fault();
1759 }
1760 if (r < 0) {
1761 ldout(cct, 0) << __func__ << " get_initial_auth_request returned " << r
1762 << dendl;
1763 stop();
1764 connection->dispatch_queue->queue_reset(connection);
1765 return nullptr;
1766 }
1767
1768 INTERCEPT(9);
1769
1770 auto frame = AuthRequestFrame::Encode(auth_meta->auth_method, preferred_modes,
1771 bl);
1772 return WRITE(frame, "auth request", read_frame);
1773}
1774
1775CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
1776 ldout(cct, 20) << __func__
1777 << " payload.length()=" << payload.length() << dendl;
1778
1779 if (state != AUTH_CONNECTING) {
1780 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1781 return _fault();
1782 }
1783
1784 auto bad_method = AuthBadMethodFrame::Decode(payload);
1785 ldout(cct, 1) << __func__ << " method=" << bad_method.method()
1786 << " result " << cpp_strerror(bad_method.result())
1787 << ", allowed methods=" << bad_method.allowed_methods()
1788 << ", allowed modes=" << bad_method.allowed_modes()
1789 << dendl;
1790 ceph_assert(messenger->auth_client);
1791 auto am = auth_meta;
1792 connection->lock.unlock();
1793 int r = messenger->auth_client->handle_auth_bad_method(
1794 connection,
1795 am.get(),
1796 bad_method.method(), bad_method.result(),
1797 bad_method.allowed_methods(),
1798 bad_method.allowed_modes());
1799 connection->lock.lock();
1800 if (state != AUTH_CONNECTING || r < 0) {
1801 return _fault();
1802 }
1803 return send_auth_request(bad_method.allowed_methods());
1804}
1805
1806CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
1807{
1808 ldout(cct, 20) << __func__
1809 << " payload.length()=" << payload.length() << dendl;
1810
1811 if (state != AUTH_CONNECTING) {
1812 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1813 return _fault();
1814 }
1815
1816 auto auth_more = AuthReplyMoreFrame::Decode(payload);
1817 ldout(cct, 5) << __func__
1818 << " auth reply more len=" << auth_more.auth_payload().length()
1819 << dendl;
1820 ceph_assert(messenger->auth_client);
1821 ceph::bufferlist reply;
1822 auto am = auth_meta;
1823 connection->lock.unlock();
1824 int r = messenger->auth_client->handle_auth_reply_more(
1825 connection, am.get(), auth_more.auth_payload(), &reply);
1826 connection->lock.lock();
1827 if (state != AUTH_CONNECTING) {
1828 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1829 return _fault();
1830 }
1831 if (r < 0) {
1832 lderr(cct) << __func__ << " auth_client handle_auth_reply_more returned "
1833 << r << dendl;
1834 return _fault();
1835 }
1836 auto more_reply = AuthRequestMoreFrame::Encode(reply);
1837 return WRITE(more_reply, "auth request more", read_frame);
1838}
1839
1840CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
1841{
1842 ldout(cct, 20) << __func__
1843 << " payload.length()=" << payload.length() << dendl;
1844
1845 if (state != AUTH_CONNECTING) {
1846 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1847 return _fault();
1848 }
1849
1850 auto auth_done = AuthDoneFrame::Decode(payload);
1851
1852 ceph_assert(messenger->auth_client);
1853 auto am = auth_meta;
1854 connection->lock.unlock();
1855 int r = messenger->auth_client->handle_auth_done(
1856 connection,
1857 am.get(),
1858 auth_done.global_id(),
1859 auth_done.con_mode(),
1860 auth_done.auth_payload(),
1861 &am->session_key,
1862 &am->connection_secret);
1863 connection->lock.lock();
1864 if (state != AUTH_CONNECTING) {
1865 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1866 return _fault();
1867 }
1868 if (r < 0) {
1869 return _fault();
1870 }
1871 auth_meta->con_mode = auth_done.con_mode();
1872 session_stream_handlers = \
1873 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);
1874
1875 state = AUTH_CONNECTING_SIGN;
1876
1877 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1878 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
1879 auto sig_frame = AuthSignatureFrame::Encode(sig);
1880 pre_auth.enabled = false;
1881 pre_auth.rxbuf.clear();
1882 return WRITE(sig_frame, "auth signature", read_frame);
1883}
1884
1885CtPtr ProtocolV2::finish_client_auth() {
1886 if (!server_cookie) {
1887 ceph_assert(connect_seq == 0);
1888 state = SESSION_CONNECTING;
1889 return send_client_ident();
1890 } else { // reconnecting to previous session
1891 state = SESSION_RECONNECTING;
1892 ceph_assert(connect_seq > 0);
1893 return send_reconnect();
1894 }
1895}
1896
1897CtPtr ProtocolV2::send_client_ident() {
1898 ldout(cct, 20) << __func__ << dendl;
1899
1900 if (!connection->policy.lossy && !client_cookie) {
1901 client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1902 }
1903
1904 uint64_t flags = 0;
1905 if (connection->policy.lossy) {
1906 flags |= CEPH_MSG_CONNECT_LOSSY;
1907 }
1908
11fdf7f2
TL
1909 auto client_ident = ClientIdentFrame::Encode(
1910 messenger->get_myaddrs(),
1911 connection->target_addr,
1912 messenger->get_myname().num(),
1913 global_seq,
1914 connection->policy.features_supported,
1915 connection->policy.features_required | msgr2_required,
1916 flags,
1917 client_cookie);
1918
1919 ldout(cct, 5) << __func__ << " sending identification: "
1920 << "addrs=" << messenger->get_myaddrs()
1921 << " target=" << connection->target_addr
1922 << " gid=" << messenger->get_myname().num()
1923 << " global_seq=" << global_seq
1924 << " features_supported=" << std::hex
1925 << connection->policy.features_supported
1926 << " features_required="
1927 << (connection->policy.features_required | msgr2_required)
1928 << " flags=" << flags
1929 << " cookie=" << client_cookie << std::dec << dendl;
1930
1931 INTERCEPT(11);
1932
1933 return WRITE(client_ident, "client ident", read_frame);
1934}
1935
1936CtPtr ProtocolV2::send_reconnect() {
1937 ldout(cct, 20) << __func__ << dendl;
1938
1939 auto reconnect = ReconnectFrame::Encode(messenger->get_myaddrs(),
1940 client_cookie,
1941 server_cookie,
1942 global_seq,
1943 connect_seq,
1944 in_seq);
1945
1946 ldout(cct, 5) << __func__ << " reconnect to session: client_cookie="
1947 << std::hex << client_cookie << " server_cookie="
1948 << server_cookie << std::dec
1949 << " gs=" << global_seq << " cs=" << connect_seq
1950 << " ms=" << in_seq << dendl;
1951
1952 INTERCEPT(13);
1953
1954 return WRITE(reconnect, "reconnect", read_frame);
1955}
1956
1957CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
1958{
1959 ldout(cct, 20) << __func__
1960 << " payload.length()=" << payload.length() << dendl;
1961
1962 if (state != SESSION_CONNECTING) {
1963 lderr(cct) << __func__ << " not in session connect state!" << dendl;
1964 return _fault();
1965 }
1966
1967 auto ident_missing =
1968 IdentMissingFeaturesFrame::Decode(payload);
1969 lderr(cct) << __func__
1970 << " client does not support all server features: " << std::hex
1971 << ident_missing.features() << std::dec << dendl;
1972
1973 return _fault();
1974}
1975
1976CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
1977{
1978 ldout(cct, 20) << __func__
1979 << " payload.length()=" << payload.length() << dendl;
1980
1981 if (state != SESSION_RECONNECTING) {
1982 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1983 return _fault();
1984 }
1985
1986 auto reset = ResetFrame::Decode(payload);
1987
1988 ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
1989 << dendl;
1990 if (reset.full()) {
1991 reset_session();
1992 } else {
1993 server_cookie = 0;
1994 connect_seq = 0;
1995 in_seq = 0;
1996 }
1997
1998 state = SESSION_CONNECTING;
1999 return send_client_ident();
2000}
2001
2002CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
2003{
2004 ldout(cct, 20) << __func__
2005 << " payload.length()=" << payload.length() << dendl;
2006
2007 if (state != SESSION_RECONNECTING) {
2008 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2009 return _fault();
2010 }
2011
2012 auto retry = RetryFrame::Decode(payload);
2013 connect_seq = retry.connect_seq() + 1;
2014
2015 ldout(cct, 1) << __func__
2016 << " received session retry connect_seq=" << retry.connect_seq()
2017 << ", inc to cs=" << connect_seq << dendl;
2018
2019 return send_reconnect();
2020}
2021
2022CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
2023{
2024 ldout(cct, 20) << __func__
2025 << " payload.length()=" << payload.length() << dendl;
2026
2027 if (state != SESSION_RECONNECTING) {
2028 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2029 return _fault();
2030 }
2031
2032 auto retry = RetryGlobalFrame::Decode(payload);
2033 global_seq = messenger->get_global_seq(retry.global_seq());
2034
2035 ldout(cct, 1) << __func__ << " received session retry global global_seq="
2036 << retry.global_seq() << ", choose new gs=" << global_seq
2037 << dendl;
2038
2039 return send_reconnect();
2040}
2041
2042CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) {
2043 ldout(cct, 20) << __func__
2044 << " received WAIT (connection race)"
2045 << " payload.length()=" << payload.length()
2046 << dendl;
2047
2048 if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
2049 lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
2050 return _fault();
2051 }
2052
2053 state = WAIT;
2054 WaitFrame::Decode(payload);
2055 return _fault();
2056}
2057
2058CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
2059{
2060 ldout(cct, 20) << __func__
2061 << " payload.length()=" << payload.length() << dendl;
2062
2063 if (state != SESSION_RECONNECTING) {
2064 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2065 return _fault();
2066 }
2067
2068 auto reconnect_ok = ReconnectOkFrame::Decode(payload);
2069 ldout(cct, 5) << __func__
2070 << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
2071 << dendl;
2072
2073 out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
2074
2075 backoff = utime_t();
2076 ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
2077 << ", lossy = " << connection->policy.lossy << ", features "
2078 << connection->get_features() << dendl;
2079
2080 if (connection->delay_state) {
2081 ceph_assert(connection->delay_state->ready());
2082 }
2083
2084 connection->dispatch_queue->queue_connect(connection);
2085 messenger->ms_deliver_handle_fast_connect(connection);
2086
2087 return ready();
2088}
2089
2090CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
2091{
2092 ldout(cct, 20) << __func__
2093 << " payload.length()=" << payload.length() << dendl;
2094
2095 if (state != SESSION_CONNECTING) {
2096 lderr(cct) << __func__ << " not in session connect state!" << dendl;
2097 return _fault();
2098 }
2099
2100 auto server_ident = ServerIdentFrame::Decode(payload);
2101 ldout(cct, 5) << __func__ << " received server identification:"
2102 << " addrs=" << server_ident.addrs()
2103 << " gid=" << server_ident.gid()
2104 << " global_seq=" << server_ident.global_seq()
2105 << " features_supported=" << std::hex
2106 << server_ident.supported_features()
2107 << " features_required=" << server_ident.required_features()
2108 << " flags=" << server_ident.flags() << " cookie=" << std::dec
2109 << server_ident.cookie() << dendl;
2110
2111 // is this who we intended to talk to?
2112 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2113 // of mon_host or something.
2114 if (!server_ident.addrs().contains(connection->target_addr)) {
2115 ldout(cct,1) << __func__ << " peer identifies as " << server_ident.addrs()
2116 << ", does not include " << connection->target_addr << dendl;
2117 return _fault();
2118 }
2119
2120 server_cookie = server_ident.cookie();
2121
2122 connection->set_peer_addrs(server_ident.addrs());
2123 peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid());
2124 connection->set_features(server_ident.supported_features() &
2125 connection->policy.features_supported);
2126 peer_global_seq = server_ident.global_seq();
2127
2128 connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
2129
2130 backoff = utime_t();
2131 ldout(cct, 10) << __func__ << " connect success " << connect_seq
2132 << ", lossy = " << connection->policy.lossy << ", features "
2133 << connection->get_features() << dendl;
2134
2135 if (connection->delay_state) {
2136 ceph_assert(connection->delay_state->ready());
2137 }
2138
2139 connection->dispatch_queue->queue_connect(connection);
2140 messenger->ms_deliver_handle_fast_connect(connection);
2141
2142 return ready();
2143}
2144
2145/* Server Protocol Methods */
2146
2147CtPtr ProtocolV2::start_server_banner_exchange() {
2148 ldout(cct, 20) << __func__ << dendl;
2149
2150 INTERCEPT(2);
2151
2152 state = BANNER_ACCEPTING;
2153
2154 return _banner_exchange(CONTINUATION(post_server_banner_exchange));
2155}
2156
2157CtPtr ProtocolV2::post_server_banner_exchange() {
2158 ldout(cct, 20) << __func__ << dendl;
2159
2160 state = AUTH_ACCEPTING;
2161
2162 return CONTINUE(read_frame);
2163}
2164
2165CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
2166 ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
2167 << dendl;
2168
2169 if (state != AUTH_ACCEPTING) {
2170 lderr(cct) << __func__ << " not in auth accept state!" << dendl;
2171 return _fault();
2172 }
2173
2174 auto request = AuthRequestFrame::Decode(payload);
2175 ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
2176 << ", preferred_modes=" << request.preferred_modes()
2177 << ", payload_len=" << request.auth_payload().length() << ")"
2178 << dendl;
2179 auth_meta->auth_method = request.method();
2180 auth_meta->con_mode = messenger->auth_server->pick_con_mode(
2181 connection->get_peer_type(), auth_meta->auth_method,
2182 request.preferred_modes());
2183 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
2184 return _auth_bad_method(-EOPNOTSUPP);
2185 }
2186 return _handle_auth_request(request.auth_payload(), false);
2187}
2188
2189CtPtr ProtocolV2::_auth_bad_method(int r)
2190{
2191 ceph_assert(r < 0);
2192 std::vector<uint32_t> allowed_methods;
2193 std::vector<uint32_t> allowed_modes;
2194 messenger->auth_server->get_supported_auth_methods(
2195 connection->get_peer_type(), &allowed_methods, &allowed_modes);
2196 ldout(cct, 1) << __func__ << " auth_method " << auth_meta->auth_method
2197 << " r " << cpp_strerror(r)
2198 << ", allowed_methods " << allowed_methods
2199 << ", allowed_modes " << allowed_modes
2200 << dendl;
2201 auto bad_method = AuthBadMethodFrame::Encode(auth_meta->auth_method, r,
2202 allowed_methods, allowed_modes);
2203 return WRITE(bad_method, "bad auth method", read_frame);
2204}
2205
2206CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
2207{
2208 if (!messenger->auth_server) {
2209 return _fault();
2210 }
2211 bufferlist reply;
2212 auto am = auth_meta;
2213 connection->lock.unlock();
2214 int r = messenger->auth_server->handle_auth_request(
2215 connection, am.get(),
2216 more, am->auth_method, auth_payload,
2217 &reply);
2218 connection->lock.lock();
2219 if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
2220 ldout(cct, 1) << __func__
2221 << " state changed while accept, it must be mark_down"
2222 << dendl;
2223 ceph_assert(state == CLOSED);
2224 return _fault();
2225 }
2226 if (r == 1) {
2227 INTERCEPT(10);
2228 state = AUTH_ACCEPTING_SIGN;
2229
2230 auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
2231 auth_meta->con_mode,
2232 reply);
2233 return WRITE(auth_done, "auth done", finish_auth);
2234 } else if (r == 0) {
2235 state = AUTH_ACCEPTING_MORE;
2236
2237 auto more = AuthReplyMoreFrame::Encode(reply);
2238 return WRITE(more, "auth reply more", read_frame);
2239 } else if (r == -EBUSY) {
2240 // kick the client and maybe they'll come back later
2241 return _fault();
2242 } else {
2243 return _auth_bad_method(r);
2244 }
2245}
2246
2247CtPtr ProtocolV2::finish_auth()
2248{
2249 ceph_assert(auth_meta);
2250 // TODO: having a possibility to check whether we're server or client could
2251 // allow reusing finish_auth().
2252 session_stream_handlers = \
2253 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
2254
2255 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
2256 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
2257 auto sig_frame = AuthSignatureFrame::Encode(sig);
2258 pre_auth.enabled = false;
2259 pre_auth.rxbuf.clear();
2260 return WRITE(sig_frame, "auth signature", read_frame);
2261}
2262
2263CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload)
2264{
2265 ldout(cct, 20) << __func__
2266 << " payload.length()=" << payload.length() << dendl;
2267
2268 if (state != AUTH_ACCEPTING_MORE) {
2269 lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
2270 return _fault();
2271 }
2272
2273 auto auth_more = AuthRequestMoreFrame::Decode(payload);
2274 return _handle_auth_request(auth_more.auth_payload(), true);
2275}
2276
2277CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
2278{
2279 ldout(cct, 20) << __func__
2280 << " payload.length()=" << payload.length() << dendl;
2281
2282 if (state != AUTH_ACCEPTING_SIGN && state != AUTH_CONNECTING_SIGN) {
2283 lderr(cct) << __func__
2284 << " pre-auth verification signature seen in wrong state!"
2285 << dendl;
2286 return _fault();
2287 }
2288
2289 auto sig_frame = AuthSignatureFrame::Decode(payload);
2290
2291 const auto actual_tx_sig = auth_meta->session_key.empty() ?
2292 sha256_digest_t() : auth_meta->session_key.hmac_sha256(cct, pre_auth.txbuf);
2293 if (sig_frame.signature() != actual_tx_sig) {
2294 ldout(cct, 2) << __func__ << " pre-auth signature mismatch"
2295 << " actual_tx_sig=" << actual_tx_sig
2296 << " sig_frame.signature()=" << sig_frame.signature()
2297 << dendl;
2298 return _fault();
2299 } else {
2300 ldout(cct, 20) << __func__ << " pre-auth signature success"
2301 << " sig_frame.signature()=" << sig_frame.signature()
2302 << dendl;
2303 pre_auth.txbuf.clear();
2304 }
2305
2306 if (state == AUTH_ACCEPTING_SIGN) {
2307 // server had sent AuthDone and client responded with correct pre-auth
2308 // signature. we can start accepting new sessions/reconnects.
2309 state = SESSION_ACCEPTING;
2310 return CONTINUE(read_frame);
2311 } else if (state == AUTH_CONNECTING_SIGN) {
2312 // this happened at client side
2313 return finish_client_auth();
2314 } else {
9f95a23c 2315 ceph_abort("state corruption");
11fdf7f2
TL
2316 }
2317}
2318
2319CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
2320{
2321 ldout(cct, 20) << __func__
2322 << " payload.length()=" << payload.length() << dendl;
2323
2324 if (state != SESSION_ACCEPTING) {
2325 lderr(cct) << __func__ << " not in session accept state!" << dendl;
2326 return _fault();
2327 }
2328
2329 auto client_ident = ClientIdentFrame::Decode(payload);
2330
2331 ldout(cct, 5) << __func__ << " received client identification:"
2332 << " addrs=" << client_ident.addrs()
2333 << " target=" << client_ident.target_addr()
2334 << " gid=" << client_ident.gid()
2335 << " global_seq=" << client_ident.global_seq()
2336 << " features_supported=" << std::hex
2337 << client_ident.supported_features()
2338 << " features_required=" << client_ident.required_features()
2339 << " flags=" << client_ident.flags()
2340 << " cookie=" << client_ident.cookie() << std::dec << dendl;
2341
2342 if (client_ident.addrs().empty() ||
2343 client_ident.addrs().front() == entity_addr_t()) {
2344 ldout(cct,5) << __func__ << " oops, client_ident.addrs() is empty" << dendl;
2345 return _fault(); // a v2 peer should never do this
2346 }
2347 if (!messenger->get_myaddrs().contains(client_ident.target_addr())) {
2348 ldout(cct,5) << __func__ << " peer is trying to reach "
2349 << client_ident.target_addr()
2350 << " which is not us (" << messenger->get_myaddrs() << ")"
2351 << dendl;
2352 return _fault();
2353 }
2354
2355 connection->set_peer_addrs(client_ident.addrs());
2356 connection->target_addr = connection->_infer_target_addr(client_ident.addrs());
2357
2358 peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
2359 connection->set_peer_id(client_ident.gid());
2360
2361 client_cookie = client_ident.cookie();
2362
2363 uint64_t feat_missing =
2364 (connection->policy.features_required | msgr2_required) &
2365 ~(uint64_t)client_ident.supported_features();
2366 if (feat_missing) {
2367 ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
2368 << feat_missing << std::dec << dendl;
2369 auto ident_missing_features =
2370 IdentMissingFeaturesFrame::Encode(feat_missing);
2371
2372 return WRITE(ident_missing_features, "ident missing features", read_frame);
2373 }
2374
2375 connection_features =
2376 client_ident.supported_features() & connection->policy.features_supported;
2377
2378 peer_global_seq = client_ident.global_seq();
2379
9f95a23c
TL
2380 if (connection->policy.server &&
2381 connection->policy.lossy &&
2382 !connection->policy.register_lossy_clients) {
2383 // incoming lossy client, no need to register this connection
2384 } else {
2385 // Looks good so far, let's check if there is already an existing connection
2386 // to this peer.
2387 connection->lock.unlock();
2388 AsyncConnectionRef existing = messenger->lookup_conn(
2389 *connection->peer_addrs);
2390
2391 if (existing &&
2392 existing->protocol->proto_type != 2) {
2393 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2394 << existing->protocol.get() << " version is "
2395 << existing->protocol->proto_type << ", marking down"
2396 << dendl;
2397 existing->mark_down();
2398 existing = nullptr;
2399 }
11fdf7f2 2400
9f95a23c 2401 connection->inject_delay();
11fdf7f2 2402
9f95a23c
TL
2403 connection->lock.lock();
2404 if (state != SESSION_ACCEPTING) {
2405 ldout(cct, 1) << __func__
2406 << " state changed while accept, it must be mark_down"
2407 << dendl;
2408 ceph_assert(state == CLOSED);
2409 return _fault();
2410 }
11fdf7f2 2411
9f95a23c
TL
2412 if (existing) {
2413 return handle_existing_connection(existing);
2414 }
11fdf7f2
TL
2415 }
2416
2417 // if everything is OK reply with server identification
2418 return send_server_ident();
2419}
2420
2421CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
2422{
2423 ldout(cct, 20) << __func__
2424 << " payload.length()=" << payload.length() << dendl;
2425
2426 if (state != SESSION_ACCEPTING) {
2427 lderr(cct) << __func__ << " not in session accept state!" << dendl;
2428 return _fault();
2429 }
2430
2431 auto reconnect = ReconnectFrame::Decode(payload);
2432
2433 ldout(cct, 5) << __func__
2434 << " received reconnect:"
2435 << " client_cookie=" << std::hex << reconnect.client_cookie()
2436 << " server_cookie=" << reconnect.server_cookie() << std::dec
2437 << " gs=" << reconnect.global_seq()
2438 << " cs=" << reconnect.connect_seq()
2439 << " ms=" << reconnect.msg_seq()
2440 << dendl;
2441
2442 // Should we check if one of the ident.addrs match connection->target_addr
2443 // as we do in ProtocolV1?
2444 connection->set_peer_addrs(reconnect.addrs());
2445 connection->target_addr = connection->_infer_target_addr(reconnect.addrs());
2446 peer_global_seq = reconnect.global_seq();
2447
2448 connection->lock.unlock();
2449 AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2450
2451 if (existing &&
2452 existing->protocol->proto_type != 2) {
2453 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2454 << existing->protocol.get() << " version is "
2455 << existing->protocol->proto_type << ", marking down" << dendl;
2456 existing->mark_down();
2457 existing = nullptr;
2458 }
2459
2460 connection->inject_delay();
2461
2462 connection->lock.lock();
2463 if (state != SESSION_ACCEPTING) {
2464 ldout(cct, 1) << __func__
2465 << " state changed while accept, it must be mark_down"
2466 << dendl;
2467 ceph_assert(state == CLOSED);
2468 return _fault();
2469 }
2470
2471 if (!existing) {
2472 // there is no existing connection therefore cannot reconnect to previous
2473 // session
2474 ldout(cct, 0) << __func__
2475 << " no existing connection exists, reseting client" << dendl;
2476 auto reset = ResetFrame::Encode(true);
2477 return WRITE(reset, "session reset", read_frame);
2478 }
2479
2480 std::lock_guard<std::mutex> l(existing->lock);
2481
2482 ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2483 if (!exproto) {
2484 ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2485 ceph_assert(false);
2486 }
2487
2488 if (exproto->state == CLOSED) {
2489 ldout(cct, 5) << __func__ << " existing " << existing
2490 << " already closed. Reseting client" << dendl;
2491 auto reset = ResetFrame::Encode(true);
2492 return WRITE(reset, "session reset", read_frame);
2493 }
2494
2495 if (exproto->replacing) {
2496 ldout(cct, 1) << __func__
2497 << " existing racing replace happened while replacing."
2498 << " existing=" << existing << dendl;
2499 auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2500 return WRITE(retry, "session retry", read_frame);
2501 }
2502
2503 if (exproto->client_cookie != reconnect.client_cookie()) {
2504 ldout(cct, 1) << __func__ << " existing=" << existing
2505 << " client cookie mismatch, I must have reseted:"
2506 << " cc=" << std::hex << exproto->client_cookie
2507 << " rcc=" << reconnect.client_cookie()
2508 << ", reseting client." << std::dec
2509 << dendl;
2510 auto reset = ResetFrame::Encode(connection->policy.resetcheck);
2511 return WRITE(reset, "session reset", read_frame);
2512 } else if (exproto->server_cookie == 0) {
2513 // this happens when:
2514 // - a connects to b
2515 // - a sends client_ident
2516 // - b gets client_ident, sends server_ident and sets cookie X
2517 // - connection fault
2518 // - b reconnects to a with cookie X, connect_seq=1
2519 // - a has cookie==0
2520 ldout(cct, 1) << __func__ << " I was a client and didn't received the"
2521 << " server_ident. Asking peer to resume session"
2522 << " establishment" << dendl;
2523 auto reset = ResetFrame::Encode(false);
2524 return WRITE(reset, "session reset", read_frame);
2525 }
2526
2527 if (exproto->peer_global_seq > reconnect.global_seq()) {
2528 ldout(cct, 5) << __func__
2529 << " stale global_seq: sgs=" << exproto->peer_global_seq
2530 << " cgs=" << reconnect.global_seq()
2531 << ", ask client to retry global" << dendl;
2532 auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2533
2534 INTERCEPT(18);
2535
2536 return WRITE(retry, "session retry", read_frame);
2537 }
2538
2539 if (exproto->connect_seq > reconnect.connect_seq()) {
2540 ldout(cct, 5) << __func__
2541 << " stale connect_seq scs=" << exproto->connect_seq
2542 << " ccs=" << reconnect.connect_seq()
2543 << " , ask client to retry" << dendl;
2544 auto retry = RetryFrame::Encode(exproto->connect_seq);
2545 return WRITE(retry, "session retry", read_frame);
2546 }
2547
2548 if (exproto->connect_seq == reconnect.connect_seq()) {
2549 // reconnect race: both peers are sending reconnect messages
2550 if (existing->peer_addrs->msgr2_addr() >
2551 messenger->get_myaddrs().msgr2_addr() &&
2552 !existing->policy.server) {
2553 // the existing connection wins
2554 ldout(cct, 1)
2555 << __func__
2556 << " reconnect race detected, this connection loses to existing="
2557 << existing << dendl;
2558
2559 auto wait = WaitFrame::Encode();
2560 return WRITE(wait, "wait", read_frame);
2561 } else {
2562 // this connection wins
2563 ldout(cct, 1) << __func__
2564 << " reconnect race detected, replacing existing="
2565 << existing << " socket by this connection's socket"
2566 << dendl;
2567 }
2568 }
2569
2570 ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
2571
2572 reconnecting = true;
2573
2574 // everything looks good
2575 exproto->connect_seq = reconnect.connect_seq();
2576 exproto->message_seq = reconnect.msg_seq();
2577
2578 return reuse_connection(existing, exproto);
2579}
2580
9f95a23c 2581CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
11fdf7f2
TL
2582 ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
2583
2584 std::lock_guard<std::mutex> l(existing->lock);
2585
2586 ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2587 if (!exproto) {
2588 ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2589 ceph_assert(false);
2590 }
2591
2592 if (exproto->state == CLOSED) {
2593 ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
2594 << dendl;
2595 return send_server_ident();
2596 }
2597
2598 if (exproto->replacing) {
2599 ldout(cct, 1) << __func__
2600 << " existing racing replace happened while replacing."
2601 << " existing=" << existing << dendl;
2602 auto wait = WaitFrame::Encode();
2603 return WRITE(wait, "wait", read_frame);
2604 }
2605
2606 if (exproto->peer_global_seq > peer_global_seq) {
2607 ldout(cct, 1) << __func__ << " this is a stale connection, peer_global_seq="
2608 << peer_global_seq
2609 << " existing->peer_global_seq=" << exproto->peer_global_seq
2610 << ", stopping this connection." << dendl;
2611 stop();
2612 connection->dispatch_queue->queue_reset(connection);
2613 return nullptr;
2614 }
2615
2616 if (existing->policy.lossy) {
2617 // existing connection can be thrown out in favor of this one
2618 ldout(cct, 1)
2619 << __func__ << " existing=" << existing
2620 << " is a lossy channel. Stopping existing in favor of this connection"
2621 << dendl;
2622 existing->protocol->stop();
2623 existing->dispatch_queue->queue_reset(existing.get());
2624 return send_server_ident();
2625 }
2626
2627 if (exproto->server_cookie && exproto->client_cookie &&
2628 exproto->client_cookie != client_cookie) {
2629 // Found previous session
2630 // peer has reseted and we're going to reuse the existing connection
2631 // by replacing the communication socket
2632 ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2633 << ", peer must have reseted." << dendl;
2634 if (connection->policy.resetcheck) {
2635 exproto->reset_session();
2636 }
2637 return reuse_connection(existing, exproto);
2638 }
2639
2640 if (exproto->client_cookie == client_cookie) {
2641 // session establishment interrupted between client_ident and server_ident,
2642 // continuing...
2643 ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2644 << ", continuing session establishment." << dendl;
2645 return reuse_connection(existing, exproto);
2646 }
2647
2648 if (exproto->state == READY || exproto->state == STANDBY) {
2649 ldout(cct, 1) << __func__ << " existing=" << existing
2650 << " is READY/STANDBY, lets reuse it" << dendl;
2651 return reuse_connection(existing, exproto);
2652 }
2653
2654 // Looks like a connection race: server and client are both connecting to
2655 // each other at the same time.
2656 if (connection->peer_addrs->msgr2_addr() <
2657 messenger->get_myaddrs().msgr2_addr() ||
2658 existing->policy.server) {
2659 // this connection wins
2660 ldout(cct, 1) << __func__
2661 << " connection race detected, replacing existing="
2662 << existing << " socket by this connection's socket" << dendl;
2663 return reuse_connection(existing, exproto);
2664 } else {
2665 // the existing connection wins
2666 ldout(cct, 1)
2667 << __func__
2668 << " connection race detected, this connection loses to existing="
2669 << existing << dendl;
2670 ceph_assert(connection->peer_addrs->msgr2_addr() >
2671 messenger->get_myaddrs().msgr2_addr());
2672
2673 // make sure we follow through with opening the existing
2674 // connection (if it isn't yet open) since we know the peer
2675 // has something to send to us.
2676 existing->send_keepalive();
2677 auto wait = WaitFrame::Encode();
2678 return WRITE(wait, "wait", read_frame);
2679 }
2680}
2681
9f95a23c 2682CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
11fdf7f2
TL
2683 ProtocolV2 *exproto) {
2684 ldout(cct, 20) << __func__ << " existing=" << existing
2685 << " reconnect=" << reconnecting << dendl;
2686
2687 connection->inject_delay();
2688
2689 std::lock_guard<std::mutex> l(existing->write_lock);
2690
2691 connection->center->delete_file_event(connection->cs.fd(),
2692 EVENT_READABLE | EVENT_WRITABLE);
2693
2694 if (existing->delay_state) {
2695 existing->delay_state->flush();
2696 ceph_assert(!connection->delay_state);
2697 }
2698 exproto->reset_recv_state();
2699 exproto->pre_auth.enabled = false;
2700
2701 if (!reconnecting) {
2702 exproto->client_cookie = client_cookie;
2703 exproto->peer_name = peer_name;
2704 exproto->connection_features = connection_features;
2705 existing->set_features(connection_features);
2706 }
2707 exproto->peer_global_seq = peer_global_seq;
2708
9f95a23c 2709 ceph_assert(connection->center->in_thread());
11fdf7f2
TL
2710 auto temp_cs = std::move(connection->cs);
2711 EventCenter *new_center = connection->center;
2712 Worker *new_worker = connection->worker;
9f95a23c
TL
2713 // we can steal the session_stream_handlers under the assumption
2714 // this happens in the event center's thread as there should be
2715 // no user outside its boundaries (simlarly to e.g. outgoing_bl).
2716 auto temp_stream_handlers = std::move(session_stream_handlers);
2717 exproto->auth_meta = auth_meta;
11fdf7f2
TL
2718
2719 ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
2720 << dendl;
494da23a 2721
11fdf7f2
TL
2722 // avoid _stop shutdown replacing socket
2723 // queue a reset on the new connection, which we're dumping for the old
2724 stop();
2725
2726 connection->dispatch_queue->queue_reset(connection);
2727
2728 exproto->can_write = false;
494da23a 2729 exproto->write_in_progress = false;
11fdf7f2
TL
2730 exproto->reconnecting = reconnecting;
2731 exproto->replacing = true;
11fdf7f2
TL
2732 existing->state_offset = 0;
2733 // avoid previous thread modify event
2734 exproto->state = NONE;
2735 existing->state = AsyncConnection::STATE_NONE;
2736 // Discard existing prefetch buffer in `recv_buf`
2737 existing->recv_start = existing->recv_end = 0;
2738 // there shouldn't exist any buffer
2739 ceph_assert(connection->recv_start == connection->recv_end);
2740
2741 auto deactivate_existing = std::bind(
9f95a23c
TL
2742 [ existing,
2743 new_worker,
2744 new_center,
2745 exproto,
2746 temp_stream_handlers=std::move(temp_stream_handlers)
2747 ](ConnectedSocket &cs) mutable {
11fdf7f2
TL
2748 // we need to delete time event in original thread
2749 {
2750 std::lock_guard<std::mutex> l(existing->lock);
2751 existing->write_lock.lock();
2752 exproto->requeue_sent();
9f95a23c
TL
2753 // XXX: do we really need the locking for `outgoing_bl`? There is
2754 // a comment just above its definition saying "lockfree, only used
2755 // in own thread". I'm following lockfull schema just in the case.
2756 // From performance point of view it should be fine – this happens
2757 // far away from hot paths.
2758 existing->outgoing_bl.clear();
11fdf7f2 2759 existing->open_write = false;
9f95a23c 2760 exproto->session_stream_handlers = std::move(temp_stream_handlers);
11fdf7f2
TL
2761 existing->write_lock.unlock();
2762 if (exproto->state == NONE) {
2763 existing->shutdown_socket();
2764 existing->cs = std::move(cs);
2765 existing->worker->references--;
2766 new_worker->references++;
2767 existing->logger = new_worker->get_perf_counter();
2768 existing->worker = new_worker;
2769 existing->center = new_center;
2770 if (existing->delay_state)
2771 existing->delay_state->set_center(new_center);
2772 } else if (exproto->state == CLOSED) {
2773 auto back_to_close = std::bind(
2774 [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
2775 new_center->submit_to(new_center->get_id(),
2776 std::move(back_to_close), true);
2777 return;
2778 } else {
2779 ceph_abort();
2780 }
2781 }
2782
2783 // Before changing existing->center, it may already exists some
2784 // events in existing->center's queue. Then if we mark down
2785 // `existing`, it will execute in another thread and clean up
2786 // connection. Previous event will result in segment fault
2787 auto transfer_existing = [existing, exproto]() mutable {
2788 std::lock_guard<std::mutex> l(existing->lock);
2789 if (exproto->state == CLOSED) return;
2790 ceph_assert(exproto->state == NONE);
2791
2792 exproto->state = SESSION_ACCEPTING;
81eedcae
TL
2793 // we have called shutdown_socket above
2794 ceph_assert(existing->last_tick_id == 0);
2795 // restart timer since we are going to re-build connection
2796 existing->last_connect_started = ceph::coarse_mono_clock::now();
2797 existing->last_tick_id = existing->center->create_time_event(
2798 existing->connect_timeout_us, existing->tick_handler);
11fdf7f2
TL
2799 existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
2800 existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
2801 existing->read_handler);
2802 if (!exproto->reconnecting) {
2803 exproto->run_continuation(exproto->send_server_ident());
2804 } else {
2805 exproto->run_continuation(exproto->send_reconnect_ok());
2806 }
2807 };
2808 if (existing->center->in_thread())
2809 transfer_existing();
2810 else
2811 existing->center->submit_to(existing->center->get_id(),
2812 std::move(transfer_existing), true);
2813 },
2814 std::move(temp_cs));
2815
2816 existing->center->submit_to(existing->center->get_id(),
2817 std::move(deactivate_existing), true);
2818 return nullptr;
2819}
2820
// Server side of the msgr2 handshake for a new (non-reconnect) session; the
// transfer_existing lambda calls it when `reconnecting` is false.
// Must be entered with connection->lock held (it is unlocked and re-locked
// below) and with state expected to be SESSION_ACCEPTING.
// Steps, in order:
//   1. reset sequence state (requeued out messages discarded, in_seq = 0);
//   2. pick a fresh server_cookie when the policy is not lossy;
//   3. encode a ServerIdentFrame advertising our addrs, gid, a new global
//      seq, supported features, required features (| msgr2_required),
//      flags and the cookie;
//   4. drop connection->lock, register via messenger->accept_conn(), and
//      re-take the lock;
//   5. fault if accept_conn() failed or the state changed meanwhile;
//   6. adopt connection_features, notify the dispatch queue and fast-accept
//      handlers, then write the frame with server_ready() as continuation.
// Returns the next continuation, or the result of _fault() on failure.
2821CtPtr ProtocolV2::send_server_ident() {
2822 ldout(cct, 20) << __func__ << dendl;
2823
2824 // this is required for the case when this connection is being replaced
// NOTE(review): discard_requeued_up_to(out_seq, 0) presumably drops messages
// requeued from the previous session and returns the new out_seq -- confirm.
2825 out_seq = discard_requeued_up_to(out_seq, 0);
2826 in_seq = 0;
2827
// Only non-lossy sessions get a new cookie. The bounds (1, -1ll), assuming
// both are taken as uint64_t, span [1, UINT64_MAX], so the cookie is never 0.
2828 if (!connection->policy.lossy) {
2829 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
2830 }
2831
// Advertise lossiness to the peer via CEPH_MSG_CONNECT_LOSSY.
2832 uint64_t flags = 0;
2833 if (connection->policy.lossy) {
2834 flags = flags | CEPH_MSG_CONNECT_LOSSY;
2835 }
2836
2837 uint64_t gs = messenger->get_global_seq();
2838 auto server_ident = ServerIdentFrame::Encode(
2839 messenger->get_myaddrs(),
2840 messenger->get_myname().num(),
2841 gs,
2842 connection->policy.features_supported,
2843 connection->policy.features_required | msgr2_required,
2844 flags,
2845 server_cookie);
2846
// std::hex stays in effect for features_supported, features_required and
// flags; std::dec is restored before the cookie is printed.
2847 ldout(cct, 5) << __func__ << " sending identification:"
2848 << " addrs=" << messenger->get_myaddrs()
2849 << " gid=" << messenger->get_myname().num()
2850 << " global_seq=" << gs << " features_supported=" << std::hex
2851 << connection->policy.features_supported
2852 << " features_required="
2853 << (connection->policy.features_required | msgr2_required)
2854 << " flags=" << flags << " cookie=" << std::dec << server_cookie
2855 << dendl;
2856
2857 connection->lock.unlock();
2858 // Because "replacing" will prevent other connections preempt this addr,
2859 // it's safe that here we don't acquire Connection's lock
2860 ssize_t r = messenger->accept_conn(connection);
2861
// NOTE(review): inject_delay() is presumably a test-only delay hook used to
// widen race windows; its placement here and below is deliberate.
2862 connection->inject_delay();
2863
2864 connection->lock.lock();
2865
// accept_conn() refused this connection: per the log text, another
// replacement for the same peer addr won the race, so fault this one.
2866 if (r < 0) {
2867 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2868 << connection->peer_addrs->msgr2_addr()
2869 << " just fail later one(this)" << dendl;
2870 connection->inject_delay();
2871 return _fault();
2872 }
// The state moved away from SESSION_ACCEPTING while the lock was dropped.
// Only CLOSED or NONE (mark_down) are tolerated; undo the registration
// and fault.
2873 if (state != SESSION_ACCEPTING) {
2874 ldout(cct, 1) << __func__
2875 << " state changed while accept_conn, it must be mark_down"
2876 << dendl;
2877 ceph_assert(state == CLOSED || state == NONE);
2878 messenger->unregister_conn(connection);
2879 connection->inject_delay();
2880 return _fault();
2881 }
2882
// Adopt connection_features on the AsyncConnection (the reconnect path does
// not do this).
2883 connection->set_features(connection_features);
2884
2885 // notify
2886 connection->dispatch_queue->queue_accept(connection);
2887 messenger->ms_deliver_handle_fast_accept(connection);
2888
// Test interceptor hook point 12 (macro defined outside this view).
2889 INTERCEPT(12);
2890
// WRITE expands to write("server ident", CONTINUATION(server_ready),
// server_ident); server_ready() is the continuation.
2891 return WRITE(server_ident, "server ident", server_ready);
2892}
2893
2894CtPtr ProtocolV2::server_ready() {
2895 ldout(cct, 20) << __func__ << dendl;
2896
2897 if (connection->delay_state) {
2898 ceph_assert(connection->delay_state->ready());
2899 }
2900
2901 return ready();
2902}
2903
// Server side of a successful session reconnect. It is the counterpart of
// send_server_ident(); the transfer_existing lambda calls it when
// `reconnecting` is true.
// Must be entered with connection->lock held (it is unlocked and re-locked
// below).
// Unlike send_server_ident(), it:
//   - keeps in_seq;
//   - trims requeued output against message_seq rather than 0;
//   - does not call set_features().
// On success it:
//   - notifies the dispatch queue and fast-accept handlers;
//   - writes a ReconnectOkFrame carrying in_seq, with server_ready() as the
//     continuation.
// Returns the next continuation, or the result of _fault() on failure.
2904CtPtr ProtocolV2::send_reconnect_ok() {
2905 ldout(cct, 20) << __func__ << dendl;
2906
// NOTE(review): message_seq is presumably the last seq the peer reported
// receiving in its reconnect request, so anything up to it is discarded.
// Confirm against where message_seq is set.
2907 out_seq = discard_requeued_up_to(out_seq, message_seq);
2908
// Tell the peer how far we have received (in_seq), presumably so it can
// trim or resend its own queue accordingly.
2909 uint64_t ms = in_seq;
2910 auto reconnect_ok = ReconnectOkFrame::Encode(ms);
2911
2912 ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
2913
2914 connection->lock.unlock();
2915 // Because "replacing" will prevent other connections preempt this addr,
2916 // it's safe that here we don't acquire Connection's lock
2917 ssize_t r = messenger->accept_conn(connection);
2918
// NOTE(review): inject_delay() is presumably a test-only delay hook used to
// widen race windows; its placement here and below is deliberate.
2919 connection->inject_delay();
2920
2921 connection->lock.lock();
2922
// accept_conn() refused this connection: per the log text, another
// replacement for the same peer addr won the race, so fault this one.
2923 if (r < 0) {
2924 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2925 << connection->peer_addrs->msgr2_addr()
2926 << " just fail later one(this)" << dendl;
2927 connection->inject_delay();
2928 return _fault();
2929 }
// The state moved away from SESSION_ACCEPTING while the lock was dropped.
// Only CLOSED or NONE (mark_down) are tolerated; undo the registration
// and fault.
2930 if (state != SESSION_ACCEPTING) {
2931 ldout(cct, 1) << __func__
2932 << " state changed while accept_conn, it must be mark_down"
2933 << dendl;
2934 ceph_assert(state == CLOSED || state == NONE);
2935 messenger->unregister_conn(connection);
2936 connection->inject_delay();
2937 return _fault();
2938 }
2939
2940 // notify
2941 connection->dispatch_queue->queue_accept(connection);
2942 messenger->ms_deliver_handle_fast_accept(connection);
2943
// Test interceptor hook point 14 (macro defined outside this view).
2944 INTERCEPT(14);
2945
// WRITE expands to write("reconnect ok", CONTINUATION(server_ready),
// reconnect_ok); server_ready() is the continuation.
2946 return WRITE(reconnect_ok, "reconnect ok", server_ready);
2947}