]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/ProtocolV2.cc
build: use dgit for download target
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <type_traits>
5
6#include "ProtocolV2.h"
7#include "AsyncMessenger.h"
8
9#include "common/EventTrace.h"
10#include "common/ceph_crypto.h"
11#include "common/errno.h"
12#include "include/random.h"
13#include "auth/AuthClient.h"
14#include "auth/AuthServer.h"
15
16#define dout_subsys ceph_subsys_ms
17#undef dout_prefix
18#define dout_prefix _conn_prefix(_dout)
19ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
20 return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
21 << *connection->peer_addrs << " conn(" << connection << " "
22 << this
23 << " " << ceph_con_mode_name(auth_meta->con_mode)
24 << " :" << connection->port
25 << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq << " l=" << connection->policy.lossy
27 << " rx=" << session_stream_handlers.rx.get()
28 << " tx=" << session_stream_handlers.tx.get()
29 << ").";
30}
31
using namespace ceph::msgr::v2;

// Shorthands for this protocol's continuation objects: CtPtr is a pointer
// (nullptr means "nothing to run now"), CtRef a reference.
using CtPtr = Ct<ProtocolV2> *;
using CtRef = Ct<ProtocolV2> &;
36
37void ProtocolV2::run_continuation(CtPtr pcontinuation) {
38 if (pcontinuation) {
39 run_continuation(*pcontinuation);
40 }
41}
42
43void ProtocolV2::run_continuation(CtRef continuation) {
44 try {
45 CONTINUATION_RUN(continuation)
46 } catch (const buffer::error &e) {
47 lderr(cct) << __func__ << " failed decoding of frame header: " << e
48 << dendl;
49 _fault();
50 } catch (const ceph::crypto::onwire::MsgAuthError &e) {
51 lderr(cct) << __func__ << " " << e.what() << dendl;
52 _fault();
53 } catch (const DecryptionError &) {
54 lderr(cct) << __func__ << " failed to decrypt frame payload" << dendl;
55 }
56}
57
// Helpers that pair an I/O request with the continuation that handles its
// completion:
//   WRITE(B, D, C)     - write B (a bufferlist or frame); D is a description
//                        used in error logs; continue with C.
//   READ(L, C)         - read L bytes into a freshly allocated buffer and
//                        continue with C.
//   READ_RXBUF(B, C)   - read into the caller-supplied rx buffer B and
//                        continue with C.
#define WRITE(B, D, C) write(D, CONTINUATION(C), B)

#define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L)))

#define READ_RXBUF(B, C) read(CONTINUATION(C), B)
63
// Test hook. In UNIT_TESTS_BUILT builds, INTERCEPT(S) asks the connection's
// interceptor (if one is installed) what to do at the integer step S. The
// interceptor can inject a fault (`return _fault();`) or stop the connection
// and queue a reset (`return nullptr;`). In other builds it expands to
// nothing. Because it may `return`, it only belongs in functions that
// return CtPtr.
#ifdef UNIT_TESTS_BUILT

#define INTERCEPT(S) { \
if(connection->interceptor) { \
  auto a = connection->interceptor->intercept(connection, (S)); \
  if (a == Interceptor::ACTION::FAIL) { \
    return _fault(); \
  } else if (a == Interceptor::ACTION::STOP) { \
    stop(); \
    connection->dispatch_queue->queue_reset(connection); \
    return nullptr; \
  }}}

#else
#define INTERCEPT(S)
#endif
80
81ProtocolV2::ProtocolV2(AsyncConnection *connection)
82 : Protocol(2, connection),
83 state(NONE),
84 peer_required_features(0),
85 client_cookie(0),
86 server_cookie(0),
87 global_seq(0),
88 connect_seq(0),
89 peer_global_seq(0),
90 message_seq(0),
91 reconnecting(false),
92 replacing(false),
93 can_write(false),
94 bannerExchangeCallback(nullptr),
95 next_tag(static_cast<Tag>(0)),
96 keepalive(false) {
97}
98
99ProtocolV2::~ProtocolV2() {
100}
101
102void ProtocolV2::connect() {
103 ldout(cct, 1) << __func__ << dendl;
104 state = START_CONNECT;
105 pre_auth.enabled = true;
106}
107
108void ProtocolV2::accept() {
109 ldout(cct, 1) << __func__ << dendl;
110 state = START_ACCEPT;
111}
112
113bool ProtocolV2::is_connected() { return can_write; }
114
115/*
116 * Tears down the message queues, and removes them from the
117 * DispatchQueue Must hold write_lock prior to calling.
118 */
119void ProtocolV2::discard_out_queue() {
120 ldout(cct, 10) << __func__ << " started" << dendl;
121
122 for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
123 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
124 (*p)->put();
125 }
126 sent.clear();
127 for (auto& [ prio, entries ] : out_queue) {
128 static_cast<void>(prio);
129 for (auto& entry : entries) {
130 ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
131 entry.m->put();
132 }
133 }
134 out_queue.clear();
135}
136
137void ProtocolV2::reset_session() {
138 ldout(cct, 1) << __func__ << dendl;
139
140 std::lock_guard<std::mutex> l(connection->write_lock);
141 if (connection->delay_state) {
142 connection->delay_state->discard();
143 }
144
145 connection->dispatch_queue->discard_queue(connection->conn_id);
146 discard_out_queue();
147 connection->outcoming_bl.clear();
148
149 connection->dispatch_queue->queue_remote_reset(connection);
150
151 out_seq = 0;
152 in_seq = 0;
153 client_cookie = 0;
154 server_cookie = 0;
155 connect_seq = 0;
156 peer_global_seq = 0;
157 message_seq = 0;
158 ack_left = 0;
159 can_write = false;
160}
161
162void ProtocolV2::stop() {
163 ldout(cct, 1) << __func__ << dendl;
164 if (state == CLOSED) {
165 return;
166 }
167
168 if (connection->delay_state) connection->delay_state->flush();
169
170 std::lock_guard<std::mutex> l(connection->write_lock);
171
172 reset_recv_state();
173 discard_out_queue();
174
175 connection->_stop();
176
177 can_write = false;
178 state = CLOSED;
179}
180
181void ProtocolV2::fault() { _fault(); }
182
183void ProtocolV2::requeue_sent() {
184 if (sent.empty()) {
185 return;
186 }
187
188 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
189 out_seq -= sent.size();
190 while (!sent.empty()) {
191 Message *m = sent.back();
192 sent.pop_back();
193 ldout(cct, 5) << __func__ << " requeueing message m=" << m
194 << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
195 << *m << dendl;
196 rq.emplace_front(out_queue_entry_t{false, m});
197 }
198}
199
200uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
201 ldout(cct, 10) << __func__ << " " << seq << dendl;
202 std::lock_guard<std::mutex> l(connection->write_lock);
203 if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
204 return seq;
205 }
206 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
207 uint64_t count = out_seq;
208 while (!rq.empty()) {
209 Message* const m = rq.front().m;
210 if (m->get_seq() == 0 || m->get_seq() > seq) break;
211 ldout(cct, 5) << __func__ << " discarding message m=" << m
212 << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
213 << *m << dendl;
214 m->put();
215 rq.pop_front();
216 count++;
217 }
218 if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST);
219 return count;
220}
221
222void ProtocolV2::reset_recv_state() {
223 if ((state >= AUTH_CONNECTING && state <= SESSION_RECONNECTING) ||
224 state == READY) {
225 auth_meta.reset(new AuthConnectionMeta);
226 session_stream_handlers.tx.reset(nullptr);
227 session_stream_handlers.rx.reset(nullptr);
228 pre_auth.txbuf.clear();
229 pre_auth.rxbuf.clear();
230 }
231
232 // clean read and write callbacks
233 connection->pendingReadLen.reset();
234 connection->writeCallback.reset();
235
236 next_tag = static_cast<Tag>(0);
237
238 reset_throttle();
239}
240
241size_t ProtocolV2::get_current_msg_size() const {
242 ceph_assert(!rx_segments_desc.empty());
243 size_t sum = 0;
244 // we don't include SegmentIndex::Msg::HEADER.
245 for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
246 sum += rx_segments_desc[idx].length;
247 }
248 return sum;
249}
250
251void ProtocolV2::reset_throttle() {
252 if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
253 connection->policy.throttler_messages) {
254 ldout(cct, 10) << __func__ << " releasing " << 1
255 << " message to policy throttler "
256 << connection->policy.throttler_messages->get_current()
257 << "/" << connection->policy.throttler_messages->get_max()
258 << dendl;
259 connection->policy.throttler_messages->put();
260 }
261 if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
262 if (connection->policy.throttler_bytes) {
263 const size_t cur_msg_size = get_current_msg_size();
264 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
265 << " bytes to policy throttler "
266 << connection->policy.throttler_bytes->get_current() << "/"
267 << connection->policy.throttler_bytes->get_max() << dendl;
268 connection->policy.throttler_bytes->put(cur_msg_size);
269 }
270 }
271 if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
272 const size_t cur_msg_size = get_current_msg_size();
273 ldout(cct, 10)
274 << __func__ << " releasing " << cur_msg_size
275 << " bytes to dispatch_queue throttler "
276 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
277 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
278 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
279 }
280}
281
// Handles a connection failure. Depending on policy and state, this does one
// of the following:
//  * closes the connection for good (lossy channel outside the connecting
//    states, or a half-accepted connection with nothing to send);
//  * parks it in STANDBY (standby policy with nothing to send, or server);
//  * reconnects immediately (dispatching read_handler);
//  * reconnects after a backoff that doubles from ms_initial_backoff up to
//    ms_max_backoff (or is ms_max_backoff right away in WAIT).
// Always returns nullptr, so callers can `return _fault();` to end a chain.
CtPtr ProtocolV2::_fault() {
  ldout(cct, 10) << __func__ << dendl;

  if (state == CLOSED || state == NONE) {
    ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
    return nullptr;
  }

  // Lossy channels are not re-established once past the connecting states.
  if (connection->policy.lossy &&
      !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
    ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  // write_lock is managed by hand because it must be released before stop()
  // (which takes write_lock itself) and at different points on each path.
  connection->write_lock.lock();

  can_write = false;
  // requeue sent items
  requeue_sent();

  if (out_queue.empty() && state >= START_ACCEPT &&
      state <= SESSION_ACCEPTING && !replacing) {
    ldout(cct, 2) << __func__ << " with nothing to send and in the half "
                  << " accept state just closed" << dendl;
    connection->write_lock.unlock();
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  replacing = false;
  connection->fault();
  reset_recv_state();

  reconnecting = false;

  if (connection->policy.standby && out_queue.empty() && !keepalive &&
      state != WAIT) {
    ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
                  << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return nullptr;
  }
  // NOTE(review): because this returns for every server policy, the
  // policy.server branch further below looks unreachable. Confirm before
  // simplifying.
  if (connection->policy.server) {
    ldout(cct, 1) << __func__ << " server, going to standby, even though i have stuff queued" << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return nullptr;
  }

  connection->write_lock.unlock();

  if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
      state != WAIT &&
      state != SESSION_ACCEPTING /* due to connection race */) {
    // policy maybe empty when state is in accept
    if (connection->policy.server) {
      ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
      state = STANDBY;
    } else {
      // Reconnect right away with a fresh global_seq.
      ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
      connect_seq++;
      global_seq = messenger->get_global_seq();
      state = START_CONNECT;
      pre_auth.enabled = true;
      connection->state = AsyncConnection::STATE_CONNECTING;
    }
    backoff = utime_t();
    connection->center->dispatch_event_external(connection->read_handler);
  } else {
    // Reconnect after a delay. WAIT uses the maximum backoff; otherwise start
    // at ms_initial_backoff and double on each fault, capped at
    // ms_max_backoff.
    if (state == WAIT) {
      backoff.set_from_double(cct->_conf->ms_max_backoff);
    } else if (backoff == utime_t()) {
      backoff.set_from_double(cct->_conf->ms_initial_backoff);
    } else {
      backoff += backoff;
      if (backoff > cct->_conf->ms_max_backoff)
        backoff.set_from_double(cct->_conf->ms_max_backoff);
    }

    // connect_seq is only bumped when a session (server_cookie) exists.
    if (server_cookie) {
      connect_seq++;
    }

    global_seq = messenger->get_global_seq();
    state = START_CONNECT;
    pre_auth.enabled = true;
    connection->state = AsyncConnection::STATE_CONNECTING;
    ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
    // woke up again;
    // (the timer delay is backoff converted from ns to us)
    connection->register_time_events.insert(
        connection->center->create_time_event(backoff.to_nsec() / 1000,
                                              connection->wakeup_handler));
  }
  return nullptr;
}
381
382void ProtocolV2::prepare_send_message(uint64_t features,
383 Message *m) {
384 ldout(cct, 20) << __func__ << " m=" << *m << dendl;
385
386 // associate message with Connection (for benefit of encode_payload)
387 if (m->empty_payload()) {
388 ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
389 << " " << *m << dendl;
390 } else {
391 ldout(cct, 20) << __func__ << " half-reencoding features " << features
392 << " " << m << " " << *m << dendl;
393 }
394
395 // encode and copy out of *m
396 m->encode(features, 0);
397}
398
399void ProtocolV2::send_message(Message *m) {
400 uint64_t f = connection->get_features();
401
402 // TODO: Currently not all messages supports reencode like MOSDMap, so here
403 // only let fast dispatch support messages prepare message
404 const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
405 if (can_fast_prepare) {
406 prepare_send_message(f, m);
407 }
408
409 std::lock_guard<std::mutex> l(connection->write_lock);
410 bool is_prepared = can_fast_prepare;
411 // "features" changes will change the payload encoding
412 if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
413 // ensure the correctness of message encoding
414 m->clear_payload();
415 is_prepared = false;
416 ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
417 << " != " << connection->get_features() << dendl;
418 }
419 if (state == CLOSED) {
420 ldout(cct, 10) << __func__ << " connection closed."
421 << " Drop message " << m << dendl;
422 m->put();
423 } else {
424 ldout(cct, 5) << __func__ << " enqueueing message m=" << m
425 << " type=" << m->get_type() << " " << *m << dendl;
426 m->trace.event("async enqueueing message");
427 out_queue[m->get_priority()].emplace_back(
428 out_queue_entry_t{is_prepared, m});
429 ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
430 << dendl;
431 if ((!replacing && can_write) || state == STANDBY) {
432 connection->center->dispatch_event_external(connection->write_handler);
433 }
434 }
435}
436
437void ProtocolV2::send_keepalive() {
438 ldout(cct, 10) << __func__ << dendl;
439 std::lock_guard<std::mutex> l(connection->write_lock);
440 if (state != CLOSED) {
441 keepalive = true;
442 connection->center->dispatch_event_external(connection->write_handler);
443 }
444}
445
446void ProtocolV2::read_event() {
447 ldout(cct, 20) << __func__ << dendl;
448
449 switch (state) {
450 case START_CONNECT:
451 run_continuation(CONTINUATION(start_client_banner_exchange));
452 break;
453 case START_ACCEPT:
454 run_continuation(CONTINUATION(start_server_banner_exchange));
455 break;
456 case READY:
457 run_continuation(CONTINUATION(read_frame));
458 break;
459 case THROTTLE_MESSAGE:
460 run_continuation(CONTINUATION(throttle_message));
461 break;
462 case THROTTLE_BYTES:
463 run_continuation(CONTINUATION(throttle_bytes));
464 break;
465 case THROTTLE_DISPATCH_QUEUE:
466 run_continuation(CONTINUATION(throttle_dispatch_queue));
467 break;
468 default:
469 break;
470 }
471}
472
473ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
474 out_queue_entry_t out_entry;
475
476 if (!out_queue.empty()) {
477 auto it = out_queue.rbegin();
478 auto& entries = it->second;
479 ceph_assert(!entries.empty());
480 out_entry = entries.front();
481 entries.pop_front();
482 if (entries.empty()) {
483 out_queue.erase(it->first);
484 }
485 }
486 return out_entry;
487}
488
489ssize_t ProtocolV2::write_message(Message *m, bool more) {
490 FUNCTRACE(cct);
491 ceph_assert(connection->center->in_thread());
492 m->set_seq(++out_seq);
493
494 connection->lock.lock();
495 uint64_t ack_seq = in_seq;
496 ack_left = 0;
497 connection->lock.unlock();
498
499 ceph_msg_header &header = m->get_header();
500 ceph_msg_footer &footer = m->get_footer();
501
502 ceph_msg_header2 header2{header.seq, header.tid,
503 header.type, header.priority,
504 header.version,
505 0, header.data_off,
506 ack_seq,
507 footer.flags, header.compat_version,
508 header.reserved};
509
510 auto message = MessageFrame::Encode(
511 header2,
512 m->get_payload(),
513 m->get_middle(),
514 m->get_data());
515 connection->outcoming_bl.append(message.get_buffer(session_stream_handlers));
516
517 ldout(cct, 5) << __func__ << " sending message m=" << m
518 << " seq=" << m->get_seq() << " " << *m << dendl;
519
520 m->trace.event("async writing message");
521 ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
522 << " src=" << entity_name_t(messenger->get_myname())
523 << " off=" << header2.data_off
524 << dendl;
525 ssize_t total_send_size = connection->outcoming_bl.length();
526 ssize_t rc = connection->_try_send(more);
527 if (rc < 0) {
528 ldout(cct, 1) << __func__ << " error sending " << m << ", "
529 << cpp_strerror(rc) << dendl;
530 } else {
531 connection->logger->inc(
532 l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
533 ldout(cct, 10) << __func__ << " sending " << m
534 << (rc ? " continuely." : " done.") << dendl;
535 }
536 if (m->get_type() == CEPH_MSG_OSD_OP)
537 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
538 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
539 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
540 m->put();
541
542 return rc;
543}
544
545void ProtocolV2::append_keepalive() {
546 ldout(cct, 10) << __func__ << dendl;
547 auto keepalive_frame = KeepAliveFrame::Encode();
548 connection->outcoming_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
549}
550
551void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
552 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
553 connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
554}
555
556void ProtocolV2::handle_message_ack(uint64_t seq) {
557 if (connection->policy.lossy) { // lossy connections don't keep sent messages
558 return;
559 }
560
561 ldout(cct, 15) << __func__ << " seq=" << seq << dendl;
562
563 // trim sent list
564 static const int max_pending = 128;
565 int i = 0;
566 Message *pending[max_pending];
567 connection->write_lock.lock();
568 while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
569 Message *m = sent.front();
570 sent.pop_front();
571 pending[i++] = m;
572 ldout(cct, 10) << __func__ << " got ack seq " << seq
573 << " >= " << m->get_seq() << " on " << m << " " << *m
574 << dendl;
575 }
576 connection->write_lock.unlock();
577 for (int k = 0; k < i; k++) {
578 pending[k]->put();
579 }
580}
581
582void ProtocolV2::write_event() {
583 ldout(cct, 10) << __func__ << dendl;
584 ssize_t r = 0;
585
586 connection->write_lock.lock();
587 if (can_write) {
588 if (keepalive) {
589 append_keepalive();
590 keepalive = false;
591 }
592
593 auto start = ceph::mono_clock::now();
594 bool more;
595 do {
596 const auto out_entry = _get_next_outgoing();
597 if (!out_entry.m) {
598 break;
599 }
600
601 if (!connection->policy.lossy) {
602 // put on sent list
603 sent.push_back(out_entry.m);
604 out_entry.m->get();
605 }
606 more = !out_queue.empty();
607 connection->write_lock.unlock();
608
609 // send_message or requeue messages may not encode message
610 if (!out_entry.is_prepared) {
611 prepare_send_message(connection->get_features(), out_entry.m);
612 }
613
614 r = write_message(out_entry.m, more);
615
616 connection->write_lock.lock();
617 if (r == 0) {
618 ;
619 } else if (r < 0) {
620 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
621 break;
622 } else if (r > 0)
623 break;
624 } while (can_write);
625
626 // if r > 0 mean data still lefted, so no need _try_send.
627 if (r == 0) {
628 uint64_t left = ack_left;
629 if (left) {
630 ceph_le64 s;
631 s = in_seq;
632 auto ack = AckFrame::Encode(in_seq);
633 connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers));
634 ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
635 << " messages" << dendl;
636 ack_left -= left;
637 left = ack_left;
638 r = connection->_try_send(left);
639 } else if (is_queued()) {
640 r = connection->_try_send();
641 }
642 }
643 connection->write_lock.unlock();
644
645 connection->logger->tinc(l_msgr_running_send_time,
646 ceph::mono_clock::now() - start);
647 if (r < 0) {
648 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
649 connection->lock.lock();
650 fault();
651 connection->lock.unlock();
652 return;
653 }
654 } else {
655 connection->write_lock.unlock();
656 connection->lock.lock();
657 connection->write_lock.lock();
658 if (state == STANDBY && !connection->policy.server && is_queued()) {
659 ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
660 if (server_cookie) { // only increment connect_seq if there is a session
661 connect_seq++;
662 }
663 connection->_connect();
664 } else if (connection->cs && state != NONE && state != CLOSED &&
665 state != START_CONNECT) {
666 r = connection->_try_send();
667 if (r < 0) {
668 ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
669 connection->write_lock.unlock();
670 fault();
671 connection->lock.unlock();
672 return;
673 }
674 }
675 connection->write_lock.unlock();
676 connection->lock.unlock();
677 }
678}
679
680bool ProtocolV2::is_queued() {
681 return !out_queue.empty() || connection->is_queued();
682}
683
684uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
685 if (session_stream_handlers.rx) {
686 return segment_onwire_size(logical_size);
687 } else {
688 return logical_size;
689 }
690}
691
692uint32_t ProtocolV2::get_epilogue_size() const {
693 // In secure mode size of epilogue is flexible and depends on particular
694 // cipher implementation. See the comment for epilogue_secure_block_t or
695 // epilogue_plain_block_t.
696 if (session_stream_handlers.rx) {
697 return FRAME_SECURE_EPILOGUE_SIZE + \
698 session_stream_handlers.rx->get_extra_size_at_final();
699 } else {
700 return FRAME_PLAIN_EPILOGUE_SIZE;
701 }
702}
703
// Reads buffer->length() bytes into `buffer`, whose ownership moves into
// next.node, and delivers the result code to `next` via next.r.
// If connection->read() returns r <= 0 (error, or completed synchronously),
// `next` is returned for the caller to run. Otherwise nullptr is returned and
// the completion callback runs `next` itself. While pre_auth is enabled,
// bytes from a successful read are also appended to pre_auth.rxbuf.
CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
                       rx_buffer_t &&buffer) {
  // Capture length and data pointer before the buffer is moved into `next`.
  const auto len = buffer->length();
  const auto buf = buffer->c_str();
  next.node = std::move(buffer);
  ssize_t r = connection->read(len, buf,
    [&next, this](char *buffer, int r) {
      if (unlikely(pre_auth.enabled) && r >= 0) {
        pre_auth.rxbuf.append(*next.node);
        // sanity cap on the pre-auth transcript when ms_die_on_bug is set
        ceph_assert(!cct->_conf->ms_die_on_bug ||
                    pre_auth.rxbuf.length() < 1000000);
      }
      next.r = r;
      run_continuation(next);
    });
  if (r <= 0) {
    // error or done synchronously
    if (unlikely(pre_auth.enabled) && r >= 0) {
      pre_auth.rxbuf.append(*next.node);
      ceph_assert(!cct->_conf->ms_die_on_bug ||
                  pre_auth.rxbuf.length() < 1000000);
    }
    next.r = r;
    return &next;
  }

  // read still pending; the callback above will run `next`
  return nullptr;
}
732
733template <class F>
734CtPtr ProtocolV2::write(const std::string &desc,
735 CONTINUATION_TYPE<ProtocolV2> &next,
736 F &frame) {
737 ceph::bufferlist bl = frame.get_buffer(session_stream_handlers);
738 return write(desc, next, bl);
739}
740
// Queues `buffer` on the connection and arranges for `next` to run once it
// has been written. Returns:
//  * `next` (after next.setParams()) when the write completes synchronously;
//  * the _fault() result on an immediate error;
//  * nullptr when the completion callback will run `next` later.
// While pre_auth is enabled the bytes are also appended to pre_auth.txbuf.
// `desc` only appears in error logs.
CtPtr ProtocolV2::write(const std::string &desc,
                        CONTINUATION_TYPE<ProtocolV2> &next,
                        bufferlist &buffer) {
  if (unlikely(pre_auth.enabled)) {
    pre_auth.txbuf.append(buffer);
    // sanity cap on the pre-auth transcript when ms_die_on_bug is set
    ceph_assert(!cct->_conf->ms_die_on_bug ||
                pre_auth.txbuf.length() < 1000000);
  }

  // `desc` is captured by value because the callback may run after this
  // function has returned.
  ssize_t r =
      connection->write(buffer, [&next, desc, this](int r) {
        if (r < 0) {
          ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
                        << " (" << cpp_strerror(r) << ")" << dendl;
          connection->inject_delay();
          _fault();
        }
        // NOTE(review): `next` still runs after the fault above. Confirm
        // that every continuation tolerates this.
        run_continuation(next);
      });

  if (r < 0) {
    ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
                  << " (" << cpp_strerror(r) << ")" << dendl;
    return _fault();
  } else if (r == 0) {
    next.setParams();
    return &next;
  }

  // write still pending; the callback above will run `next`
  return nullptr;
}
772
773CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
774 ldout(cct, 20) << __func__ << dendl;
775 bannerExchangeCallback = &callback;
776
777 bufferlist banner_payload;
778 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
779 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
780
781 bufferlist bl;
782 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
783 encode((uint16_t)banner_payload.length(), bl, 0);
784 bl.claim_append(banner_payload);
785
786 INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);
787
788 return WRITE(bl, "banner", _wait_for_peer_banner);
789}
790
791CtPtr ProtocolV2::_wait_for_peer_banner() {
792 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
793 return READ(banner_len, _handle_peer_banner);
794}
795
796CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
797 ldout(cct, 20) << __func__ << " r=" << r << dendl;
798
799 if (r < 0) {
800 ldout(cct, 1) << __func__ << " read peer banner failed r=" << r << " ("
801 << cpp_strerror(r) << ")" << dendl;
802 return _fault();
803 }
804
805 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
806
807 if (memcmp(buffer->c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
808 if (memcmp(buffer->c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
809 lderr(cct) << __func__ << " peer " << *connection->peer_addrs
810 << " is using msgr V1 protocol" << dendl;
811 return _fault();
812 }
813 ldout(cct, 1) << __func__ << " accept peer sent bad banner" << dendl;
814 return _fault();
815 }
816
817 uint16_t payload_len;
818 bufferlist bl;
819 buffer->set_offset(banner_prefix_len);
820 buffer->set_length(sizeof(__le16));
821 bl.push_back(std::move(buffer));
822 auto ti = bl.cbegin();
823 try {
824 decode(payload_len, ti);
825 } catch (const buffer::error &e) {
826 lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
827 return _fault();
828 }
829
830 INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);
831
832 return READ(payload_len, _handle_peer_banner_payload);
833}
834
835CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
836 ldout(cct, 20) << __func__ << " r=" << r << dendl;
837
838 if (r < 0) {
839 ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
840 << " (" << cpp_strerror(r) << ")" << dendl;
841 return _fault();
842 }
843
844 uint64_t peer_supported_features;
845 uint64_t peer_required_features;
846
847 bufferlist bl;
848 bl.push_back(std::move(buffer));
849 auto ti = bl.cbegin();
850 try {
851 decode(peer_supported_features, ti);
852 decode(peer_required_features, ti);
853 } catch (const buffer::error &e) {
854 lderr(cct) << __func__ << " decode banner payload failed " << dendl;
855 return _fault();
856 }
857
858 ldout(cct, 1) << __func__ << " supported=" << std::hex
859 << peer_supported_features << " required=" << std::hex
860 << peer_required_features << std::dec << dendl;
861
862 // Check feature bit compatibility
863
864 uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
865 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
866
867 if ((required_features & peer_supported_features) != required_features) {
868 ldout(cct, 1) << __func__ << " peer does not support all required features"
869 << " required=" << std::hex << required_features
870 << " supported=" << std::hex << peer_supported_features
871 << std::dec << dendl;
872 stop();
873 connection->dispatch_queue->queue_reset(connection);
874 return nullptr;
875 }
876 if ((supported_features & peer_required_features) != peer_required_features) {
877 ldout(cct, 1) << __func__ << " we do not support all peer required features"
878 << " required=" << std::hex << peer_required_features
879 << " supported=" << supported_features << std::dec << dendl;
880 stop();
881 connection->dispatch_queue->queue_reset(connection);
882 return nullptr;
883 }
884
885 this->peer_required_features = peer_required_features;
886 if (this->peer_required_features == 0) {
887 this->connection_features = msgr2_required;
888 }
889
890 // at this point we can change how the client protocol behaves based on
891 // this->peer_required_features
892
893 if (state == BANNER_CONNECTING) {
894 state = HELLO_CONNECTING;
895 }
896 else {
897 ceph_assert(state == BANNER_ACCEPTING);
898 state = HELLO_ACCEPTING;
899 }
900
901 auto hello = HelloFrame::Encode(messenger->get_mytype(),
902 connection->target_addr);
903
904 INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);
905
906 return WRITE(hello, "hello frame", read_frame);
907}
908
// Handles the peer's HELLO frame (valid only in HELLO_CONNECTING or
// HELLO_ACCEPTING).
// On the accepting side it learns the peer type and adopts the matching
// policy. On the connecting side it checks that the advertised type matches
// the expected one.
// If our own address is still unknown or blank, it learns it, either from the
// peer (ms_learn_addr_from_peer) or from getsockname(). connection->lock is
// dropped around messenger->learned_addr() and the state is re-checked
// afterwards.
// Finally it returns the continuation stashed by _banner_exchange().
CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
{
  ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;

  if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
    lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
    return _fault();
  }

  auto hello = HelloFrame::Decode(payload);

  ldout(cct, 5) << __func__ << " received hello:"
                << " peer_type=" << (int)hello.entity_type()
                << " peer_addr_for_me=" << hello.peer_addr() << dendl;

  // Local socket address, used for logging and (optionally) address learning.
  // NOTE(review): getsockname()'s return value is not checked.
  sockaddr_storage ss;
  socklen_t len = sizeof(ss);
  getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
  ldout(cct, 5) << __func__ << " getsockname says I am " << (sockaddr *)&ss
                << " when talking to " << connection->target_addr << dendl;

  if (connection->get_peer_type() == -1) {
    // Peer type still unknown: only possible on the accepting side.
    connection->set_peer_type(hello.entity_type());

    ceph_assert(state == HELLO_ACCEPTING);
    connection->policy = messenger->get_policy(hello.entity_type());
    ldout(cct, 10) << __func__ << " accept of host_type "
                   << (int)hello.entity_type()
                   << ", policy.lossy=" << connection->policy.lossy
                   << " policy.server=" << connection->policy.server
                   << " policy.standby=" << connection->policy.standby
                   << " policy.resetcheck=" << connection->policy.resetcheck
                   << dendl;
  } else {
    ceph_assert(state == HELLO_CONNECTING);
    if (connection->get_peer_type() != hello.entity_type()) {
      ldout(cct, 1) << __func__ << " connection peer type does not match what"
                    << " peer advertises " << connection->get_peer_type()
                    << " != " << (int)hello.entity_type() << dendl;
      stop();
      connection->dispatch_queue->queue_reset(connection);
      return nullptr;
    }
  }

  if (messenger->get_myaddrs().empty() ||
      messenger->get_myaddrs().front().is_blank_ip()) {
    entity_addr_t a;
    if (cct->_conf->ms_learn_addr_from_peer) {
      ldout(cct, 1) << __func__ << " peer " << connection->target_addr
                    << " says I am " << hello.peer_addr() << " (socket says "
                    << (sockaddr*)&ss << ")" << dendl;
      a = hello.peer_addr();
    } else {
      ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
                    << " says I am " << (sockaddr*)&ss
                    << " (peer says " << hello.peer_addr() << ")" << dendl;
      a.set_sockaddr((sockaddr *)&ss);
    }
    a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
    a.set_port(0);
    // connection->lock is released while the messenger records the address
    // (plus an optional injected delay for testing).
    connection->lock.unlock();
    messenger->learned_addr(a);
    if (cct->_conf->ms_inject_internal_delays &&
        cct->_conf->ms_inject_socket_failures) {
      if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
        ldout(cct, 10) << __func__ << " sleep for "
                       << cct->_conf->ms_inject_internal_delays << dendl;
        utime_t t;
        t.set_from_double(cct->_conf->ms_inject_internal_delays);
        t.sleep();
      }
    }
    connection->lock.lock();
    // The state may have changed while the lock was dropped.
    // NOTE(review): this requires HELLO_CONNECTING, so an accepting
    // connection reaching this branch would also bail out. Confirm intent.
    if (state != HELLO_CONNECTING) {
      ldout(cct, 1) << __func__
                    << " state changed while learned_addr, mark_down or "
                    << " replacing must be happened just now" << dendl;
      return nullptr;
    }
  }

  // Resume the flow that started the banner exchange.
  CtPtr callback;
  callback = bannerExchangeCallback;
  bannerExchangeCallback = nullptr;
  ceph_assert(callback);
  return callback;
}
1000
1001CtPtr ProtocolV2::read_frame() {
1002 if (state == CLOSED) {
1003 return nullptr;
1004 }
1005
1006 ldout(cct, 20) << __func__ << dendl;
1007 return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
1008}
1009
1010CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
1011 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1012
1013 if (r < 0) {
1014 ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
1015 << " (" << cpp_strerror(r) << ")" << dendl;
1016 return _fault();
1017 }
1018
1019 ceph::bufferlist preamble;
1020 preamble.push_back(std::move(buffer));
1021
1022 ldout(cct, 30) << __func__ << " preamble\n";
1023 preamble.hexdump(*_dout);
1024 *_dout << dendl;
1025
1026 if (session_stream_handlers.rx) {
1027 ceph_assert(session_stream_handlers.rx);
1028
1029 session_stream_handlers.rx->reset_rx_handler();
1030 preamble = session_stream_handlers.rx->authenticated_decrypt_update(
1031 std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
1032
1033 ldout(cct, 10) << __func__ << " got encrypted preamble."
1034 << " after decrypt premable.length()=" << preamble.length()
1035 << dendl;
1036
1037 ldout(cct, 30) << __func__ << " preamble after decrypt\n";
1038 preamble.hexdump(*_dout);
1039 *_dout << dendl;
1040 }
1041
1042 {
1043 // I expect ceph_le32 will make the endian conversion for me. Passing
1044 // everything through ::Decode is unnecessary.
1045 const auto& main_preamble = \
1046 reinterpret_cast<preamble_block_t&>(*preamble.c_str());
1047
1048 // verify preamble's CRC before any further processing
1049 const auto rx_crc = ceph_crc32c(0,
1050 reinterpret_cast<const unsigned char*>(&main_preamble),
1051 sizeof(main_preamble) - sizeof(main_preamble.crc));
1052 if (rx_crc != main_preamble.crc) {
1053 ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
1054 << " rx_crc=" << rx_crc
1055 << " tx_crc=" << main_preamble.crc << dendl;
1056 return _fault();
1057 }
1058
1059 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
1060 if (main_preamble.num_segments < 1 ||
1061 main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1062 ldout(cct, 10) << __func__ << " unsupported num_segments="
1063 << " tx_crc=" << main_preamble.num_segments << dendl;
1064 return _fault();
1065 }
1066
1067 next_tag = static_cast<Tag>(main_preamble.tag);
1068
1069 rx_segments_desc.clear();
1070 rx_segments_data.clear();
1071
1072 if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1073 ldout(cct, 30) << __func__
1074 << " num_segments=" << main_preamble.num_segments
1075 << " is too much" << dendl;
1076 return _fault();
1077 }
1078 for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
1079 ldout(cct, 10) << __func__ << " got new segment:"
1080 << " len=" << main_preamble.segments[idx].length
1081 << " align=" << main_preamble.segments[idx].alignment
1082 << dendl;
1083 rx_segments_desc.emplace_back(main_preamble.segments[idx]);
1084 }
1085 }
1086
1087 // does it need throttle?
1088 if (next_tag == Tag::MESSAGE) {
1089 if (state != READY) {
1090 lderr(cct) << __func__ << " not in ready state!" << dendl;
1091 return _fault();
1092 }
1093 state = THROTTLE_MESSAGE;
1094 return CONTINUE(throttle_message);
1095 } else {
1096 return read_frame_segment();
1097 }
1098}
1099
1100CtPtr ProtocolV2::handle_read_frame_dispatch() {
1101 ldout(cct, 10) << __func__
1102 << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
1103
1104 switch (next_tag) {
1105 case Tag::HELLO:
1106 case Tag::AUTH_REQUEST:
1107 case Tag::AUTH_BAD_METHOD:
1108 case Tag::AUTH_REPLY_MORE:
1109 case Tag::AUTH_REQUEST_MORE:
1110 case Tag::AUTH_DONE:
1111 case Tag::AUTH_SIGNATURE:
1112 case Tag::CLIENT_IDENT:
1113 case Tag::SERVER_IDENT:
1114 case Tag::IDENT_MISSING_FEATURES:
1115 case Tag::SESSION_RECONNECT:
1116 case Tag::SESSION_RESET:
1117 case Tag::SESSION_RETRY:
1118 case Tag::SESSION_RETRY_GLOBAL:
1119 case Tag::SESSION_RECONNECT_OK:
1120 case Tag::KEEPALIVE2:
1121 case Tag::KEEPALIVE2_ACK:
1122 case Tag::ACK:
1123 case Tag::WAIT:
1124 return handle_frame_payload();
1125 case Tag::MESSAGE:
1126 return handle_message();
1127 default: {
1128 lderr(cct) << __func__
1129 << " received unknown tag=" << static_cast<uint32_t>(next_tag)
1130 << dendl;
1131 return _fault();
1132 }
1133 }
1134
1135 return nullptr;
1136}
1137
1138CtPtr ProtocolV2::read_frame_segment() {
1139 ldout(cct, 20) << __func__ << dendl;
1140 ceph_assert(!rx_segments_desc.empty());
1141
1142 // description of current segment to read
1143 const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
1144 rx_buffer_t rx_buffer;
1145 try {
1146 rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
1147 get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
1148 } catch (std::bad_alloc&) {
1149 // Catching because of potential issues with satisfying alignment.
1150 ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
1151 << " len=" << get_onwire_size(cur_rx_desc.length)
1152 << " align=" << cur_rx_desc.alignment
1153 << dendl;
1154 return _fault();
1155 }
1156
1157 return READ_RXBUF(std::move(rx_buffer), handle_read_frame_segment);
1158}
1159
1160CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
1161 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1162
1163 if (r < 0) {
1164 ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
1165 << cpp_strerror(r) << ")" << dendl;
1166 return _fault();
1167 }
1168
1169 rx_segments_data.emplace_back();
1170 rx_segments_data.back().push_back(std::move(rx_buffer));
1171
1172 // decrypt incoming data
1173 // FIXME: if (auth_meta->is_mode_secure()) {
1174 if (session_stream_handlers.rx) {
1175 ceph_assert(session_stream_handlers.rx);
1176
1177 auto& new_seg = rx_segments_data.back();
1178 if (new_seg.length()) {
1179 auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
1180 std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
1181 const auto idx = rx_segments_data.size() - 1;
1182 new_seg.clear();
1183 padded.splice(0, rx_segments_desc[idx].length, &new_seg);
1184
1185 ldout(cct, 20) << __func__
1186 << " unpadded new_seg.length()=" << new_seg.length()
1187 << dendl;
1188 }
1189 }
1190
1191 if (rx_segments_desc.size() == rx_segments_data.size()) {
1192 // OK, all segments planned to read are read. Can go with epilogue.
1193 return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
1194 } else {
1195 // TODO: for makeshift only. This will be more generic and throttled
1196 return read_frame_segment();
1197 }
1198}
1199
1200CtPtr ProtocolV2::handle_frame_payload() {
1201 ceph_assert(!rx_segments_data.empty());
1202 auto& payload = rx_segments_data.back();
1203
1204 ldout(cct, 30) << __func__ << "\n";
1205 payload.hexdump(*_dout);
1206 *_dout << dendl;
1207
1208 switch (next_tag) {
1209 case Tag::HELLO:
1210 return handle_hello(payload);
1211 case Tag::AUTH_REQUEST:
1212 return handle_auth_request(payload);
1213 case Tag::AUTH_BAD_METHOD:
1214 return handle_auth_bad_method(payload);
1215 case Tag::AUTH_REPLY_MORE:
1216 return handle_auth_reply_more(payload);
1217 case Tag::AUTH_REQUEST_MORE:
1218 return handle_auth_request_more(payload);
1219 case Tag::AUTH_DONE:
1220 return handle_auth_done(payload);
1221 case Tag::AUTH_SIGNATURE:
1222 return handle_auth_signature(payload);
1223 case Tag::CLIENT_IDENT:
1224 return handle_client_ident(payload);
1225 case Tag::SERVER_IDENT:
1226 return handle_server_ident(payload);
1227 case Tag::IDENT_MISSING_FEATURES:
1228 return handle_ident_missing_features(payload);
1229 case Tag::SESSION_RECONNECT:
1230 return handle_reconnect(payload);
1231 case Tag::SESSION_RESET:
1232 return handle_session_reset(payload);
1233 case Tag::SESSION_RETRY:
1234 return handle_session_retry(payload);
1235 case Tag::SESSION_RETRY_GLOBAL:
1236 return handle_session_retry_global(payload);
1237 case Tag::SESSION_RECONNECT_OK:
1238 return handle_reconnect_ok(payload);
1239 case Tag::KEEPALIVE2:
1240 return handle_keepalive2(payload);
1241 case Tag::KEEPALIVE2_ACK:
1242 return handle_keepalive2_ack(payload);
1243 case Tag::ACK:
1244 return handle_message_ack(payload);
1245 case Tag::WAIT:
1246 return handle_wait(payload);
1247 default:
1248 ceph_abort();
1249 }
1250 return nullptr;
1251}
1252
1253CtPtr ProtocolV2::ready() {
1254 ldout(cct, 25) << __func__ << dendl;
1255
1256 reconnecting = false;
1257 replacing = false;
1258
1259 // make sure no pending tick timer
1260 if (connection->last_tick_id) {
1261 connection->center->delete_time_event(connection->last_tick_id);
1262 }
1263 connection->last_tick_id = connection->center->create_time_event(
1264 connection->inactive_timeout_us, connection->tick_handler);
1265
1266 {
1267 std::lock_guard<std::mutex> l(connection->write_lock);
1268 can_write = true;
1269 if (!out_queue.empty()) {
1270 connection->center->dispatch_event_external(connection->write_handler);
1271 }
1272 }
1273
1274 connection->maybe_start_delay_thread();
1275
1276 state = READY;
1277 ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie="
1278 << std::hex << client_cookie << " server_cookie="
1279 << server_cookie << std::dec << " in_seq=" << in_seq
1280 << " out_seq=" << out_seq << dendl;
1281
1282 INTERCEPT(15);
1283
1284 return CONTINUE(read_frame);
1285}
1286
1287CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
1288{
1289 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1290
1291 if (r < 0) {
1292 ldout(cct, 1) << __func__ << " read data error " << dendl;
1293 return _fault();
1294 }
1295
1296 __u8 late_flags;
1297
1298 // FIXME: if (auth_meta->is_mode_secure()) {
1299 if (session_stream_handlers.rx) {
1300 ldout(cct, 1) << __func__ << " read frame epilogue bytes="
1301 << get_epilogue_size() << dendl;
1302
1303 // decrypt epilogue and authenticate entire frame.
1304 ceph::bufferlist epilogue_bl;
1305 {
1306 epilogue_bl.push_back(std::move(buffer));
1307 try {
1308 epilogue_bl =
1309 session_stream_handlers.rx->authenticated_decrypt_update_final(
1310 std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
1311 } catch (ceph::crypto::onwire::MsgAuthError &e) {
1312 ldout(cct, 5) << __func__ << " message authentication failed: "
1313 << e.what() << dendl;
1314 return _fault();
1315 }
1316 }
1317 auto& epilogue =
1318 reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
1319 late_flags = epilogue.late_flags;
1320 } else {
1321 auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
1322
1323 for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
1324 const __u32 expected_crc = epilogue.crc_values[idx];
1325 const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
1326 if (expected_crc != calculated_crc) {
1327 ldout(cct, 5) << __func__ << " message integrity check failed: "
1328 << " expected_crc=" << expected_crc
1329 << " calculated_crc=" << calculated_crc
1330 << dendl;
1331 return _fault();
1332 } else {
1333 ldout(cct, 20) << __func__ << " message integrity check success: "
1334 << " expected_crc=" << expected_crc
1335 << " calculated_crc=" << calculated_crc
1336 << dendl;
1337 }
1338 }
1339 late_flags = epilogue.late_flags;
1340 }
1341
1342 // we do have a mechanism that allows transmitter to start sending message
1343 // and abort after putting entire data field on wire. This will be used by
1344 // the kernel client to avoid unnecessary buffering.
1345 if (late_flags & FRAME_FLAGS_LATEABRT) {
1346 reset_throttle();
1347 state = READY;
1348 return CONTINUE(read_frame);
1349 } else {
1350 return handle_read_frame_dispatch();
1351 }
1352}
1353
1354CtPtr ProtocolV2::handle_message() {
1355 ldout(cct, 20) << __func__ << dendl;
1356 ceph_assert(state == THROTTLE_DONE);
1357
1358#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1359 ltt_recv_stamp = ceph_clock_now();
1360#endif
1361 recv_stamp = ceph_clock_now();
1362
1363 // we need to get the size before std::moving segments data
1364 const size_t cur_msg_size = get_current_msg_size();
1365 auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
1366
1367 // XXX: paranoid copy just to avoid oops
1368 ceph_msg_header2 current_header = msg_frame.header();
1369
1370 ldout(cct, 5) << __func__
1371 << " got " << msg_frame.front_len()
1372 << " + " << msg_frame.middle_len()
1373 << " + " << msg_frame.data_len()
1374 << " byte message."
1375 << " envelope type=" << current_header.type
1376 << " src " << peer_name
1377 << " off " << current_header.data_off
1378 << dendl;
1379
1380 INTERCEPT(16);
1381 ceph_msg_header header{current_header.seq,
1382 current_header.tid,
1383 current_header.type,
1384 current_header.priority,
1385 current_header.version,
1386 msg_frame.front_len(),
1387 msg_frame.middle_len(),
1388 msg_frame.data_len(),
1389 current_header.data_off,
1390 peer_name,
1391 current_header.compat_version,
1392 current_header.reserved,
1393 0};
1394 ceph_msg_footer footer{0, 0, 0, 0, current_header.flags};
1395
1396 Message *message = decode_message(cct, 0, header, footer,
1397 msg_frame.front(),
1398 msg_frame.middle(),
1399 msg_frame.data(),
1400 connection);
1401 if (!message) {
1402 ldout(cct, 1) << __func__ << " decode message failed " << dendl;
1403 return _fault();
1404 } else {
1405 state = READ_MESSAGE_COMPLETE;
1406 }
1407
1408 INTERCEPT(17);
1409
1410 message->set_byte_throttler(connection->policy.throttler_bytes);
1411 message->set_message_throttler(connection->policy.throttler_messages);
1412
1413 // store reservation size in message, so we don't get confused
1414 // by messages entering the dispatch queue through other paths.
1415 message->set_dispatch_throttle_size(cur_msg_size);
1416
1417 message->set_recv_stamp(recv_stamp);
1418 message->set_throttle_stamp(throttle_stamp);
1419 message->set_recv_complete_stamp(ceph_clock_now());
1420
1421 // check received seq#. if it is old, drop the message.
1422 // note that incoming messages may skip ahead. this is convenient for the
1423 // client side queueing because messages can't be renumbered, but the (kernel)
1424 // client will occasionally pull a message out of the sent queue to send
1425 // elsewhere. in that case it doesn't matter if we "got" it or not.
1426 uint64_t cur_seq = in_seq;
1427 if (message->get_seq() <= cur_seq) {
1428 ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
1429 << " <= " << cur_seq << " " << message << " " << *message
1430 << ", discarding" << dendl;
1431 message->put();
1432 if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
1433 cct->_conf->ms_die_on_old_message) {
1434 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1435 }
1436 return nullptr;
1437 }
1438 if (message->get_seq() > cur_seq + 1) {
1439 ldout(cct, 0) << __func__ << " missed message? skipped from seq "
1440 << cur_seq << " to " << message->get_seq() << dendl;
1441 if (cct->_conf->ms_die_on_skipped_message) {
1442 ceph_assert(0 == "skipped incoming seq");
1443 }
1444 }
1445
1446 message->set_connection(connection);
1447
1448#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1449 if (message->get_type() == CEPH_MSG_OSD_OP ||
1450 message->get_type() == CEPH_MSG_OSD_OPREPLY) {
1451 utime_t ltt_processed_stamp = ceph_clock_now();
1452 double usecs_elapsed =
1453 (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
1454 ostringstream buf;
1455 if (message->get_type() == CEPH_MSG_OSD_OP)
1456 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
1457 false);
1458 else
1459 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
1460 false);
1461 }
1462#endif
1463
1464 // note last received message.
1465 in_seq = message->get_seq();
1466 ldout(cct, 5) << __func__ << " received message m=" << message
1467 << " seq=" << message->get_seq()
1468 << " from=" << message->get_source() << " type=" << header.type
1469 << " " << *message << dendl;
1470
1471 bool need_dispatch_writer = false;
1472 if (!connection->policy.lossy) {
1473 ack_left++;
1474 need_dispatch_writer = true;
1475 }
1476
1477 state = READY;
1478
1479 connection->logger->inc(l_msgr_recv_messages);
1480 connection->logger->inc(
1481 l_msgr_recv_bytes,
1482 cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
1483
1484 messenger->ms_fast_preprocess(message);
1485 auto fast_dispatch_time = ceph::mono_clock::now();
1486 connection->logger->tinc(l_msgr_running_recv_time,
1487 fast_dispatch_time - connection->recv_start_time);
1488 if (connection->delay_state) {
1489 double delay_period = 0;
1490 if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1491 delay_period =
1492 cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1493 ldout(cct, 1) << "queue_received will delay after "
1494 << (ceph_clock_now() + delay_period) << " on " << message
1495 << " " << *message << dendl;
1496 }
1497 connection->delay_state->queue(delay_period, message);
1498 } else if (messenger->ms_can_fast_dispatch(message)) {
1499 connection->lock.unlock();
1500 connection->dispatch_queue->fast_dispatch(message);
1501 connection->recv_start_time = ceph::mono_clock::now();
1502 connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1503 connection->recv_start_time - fast_dispatch_time);
1504 connection->lock.lock();
1505 } else {
1506 connection->dispatch_queue->enqueue(message, message->get_priority(),
1507 connection->conn_id);
1508 }
1509
1510 handle_message_ack(current_header.ack_seq);
1511
1512 // we might have been reused by another connection
1513 // let's check if that is the case
1514 if (state != READY) {
1515 // yes, that was the case, let's do nothing
1516 return nullptr;
1517 }
1518
1519 if (need_dispatch_writer && connection->is_connected()) {
1520 connection->center->dispatch_event_external(connection->write_handler);
1521 }
1522
1523 return CONTINUE(read_frame);
1524}
1525
1526
1527CtPtr ProtocolV2::throttle_message() {
1528 ldout(cct, 20) << __func__ << dendl;
1529
1530 if (connection->policy.throttler_messages) {
1531 ldout(cct, 10) << __func__ << " wants " << 1
1532 << " message from policy throttler "
1533 << connection->policy.throttler_messages->get_current()
1534 << "/" << connection->policy.throttler_messages->get_max()
1535 << dendl;
1536 if (!connection->policy.throttler_messages->get_or_fail()) {
1537 ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
1538 << connection->policy.throttler_messages->get_current()
1539 << "/" << connection->policy.throttler_messages->get_max()
1540 << " failed, just wait." << dendl;
1541 // following thread pool deal with th full message queue isn't a
1542 // short time, so we can wait a ms.
1543 if (connection->register_time_events.empty()) {
1544 connection->register_time_events.insert(
1545 connection->center->create_time_event(1000,
1546 connection->wakeup_handler));
1547 }
1548 return nullptr;
1549 }
1550 }
1551
1552 state = THROTTLE_BYTES;
1553 return CONTINUE(throttle_bytes);
1554}
1555
1556CtPtr ProtocolV2::throttle_bytes() {
1557 ldout(cct, 20) << __func__ << dendl;
1558
1559 const size_t cur_msg_size = get_current_msg_size();
1560 if (cur_msg_size) {
1561 if (connection->policy.throttler_bytes) {
1562 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1563 << " bytes from policy throttler "
1564 << connection->policy.throttler_bytes->get_current() << "/"
1565 << connection->policy.throttler_bytes->get_max() << dendl;
1566 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
1567 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1568 << " bytes from policy throttler "
1569 << connection->policy.throttler_bytes->get_current()
1570 << "/" << connection->policy.throttler_bytes->get_max()
1571 << " failed, just wait." << dendl;
1572 // following thread pool deal with th full message queue isn't a
1573 // short time, so we can wait a ms.
1574 if (connection->register_time_events.empty()) {
1575 connection->register_time_events.insert(
1576 connection->center->create_time_event(
1577 1000, connection->wakeup_handler));
1578 }
1579 return nullptr;
1580 }
1581 }
1582 }
1583
1584 state = THROTTLE_DISPATCH_QUEUE;
1585 return CONTINUE(throttle_dispatch_queue);
1586}
1587
1588CtPtr ProtocolV2::throttle_dispatch_queue() {
1589 ldout(cct, 20) << __func__ << dendl;
1590
1591 const size_t cur_msg_size = get_current_msg_size();
1592 if (cur_msg_size) {
1593 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
1594 cur_msg_size)) {
1595 ldout(cct, 10)
1596 << __func__ << " wants " << cur_msg_size
1597 << " bytes from dispatch throttle "
1598 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1599 << connection->dispatch_queue->dispatch_throttler.get_max()
1600 << " failed, just wait." << dendl;
1601 // following thread pool deal with th full message queue isn't a
1602 // short time, so we can wait a ms.
1603 if (connection->register_time_events.empty()) {
1604 connection->register_time_events.insert(
1605 connection->center->create_time_event(1000,
1606 connection->wakeup_handler));
1607 }
1608 return nullptr;
1609 }
1610 }
1611
1612 throttle_stamp = ceph_clock_now();
1613 state = THROTTLE_DONE;
1614
1615 return read_frame_segment();
1616}
1617
1618CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
1619{
1620 ldout(cct, 20) << __func__
1621 << " payload.length()=" << payload.length() << dendl;
1622
1623 if (state != READY) {
1624 lderr(cct) << __func__ << " not in ready state!" << dendl;
1625 return _fault();
1626 }
1627
1628 auto keepalive_frame = KeepAliveFrame::Decode(payload);
1629
1630 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
1631
1632 connection->write_lock.lock();
1633 append_keepalive_ack(keepalive_frame.timestamp());
1634 connection->write_lock.unlock();
1635
1636 ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
1637 << keepalive_frame.timestamp() << dendl;
1638 connection->set_last_keepalive(ceph_clock_now());
1639
1640 if (is_connected()) {
1641 connection->center->dispatch_event_external(connection->write_handler);
1642 }
1643
1644 return CONTINUE(read_frame);
1645}
1646
1647CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
1648{
1649 ldout(cct, 20) << __func__
1650 << " payload.length()=" << payload.length() << dendl;
1651
1652 if (state != READY) {
1653 lderr(cct) << __func__ << " not in ready state!" << dendl;
1654 return _fault();
1655 }
1656
1657 auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload);
1658 connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
1659 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
1660
1661 return CONTINUE(read_frame);
1662}
1663
1664CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
1665{
1666 ldout(cct, 20) << __func__
1667 << " payload.length()=" << payload.length() << dendl;
1668
1669 if (state != READY) {
1670 lderr(cct) << __func__ << " not in ready state!" << dendl;
1671 return _fault();
1672 }
1673
1674 auto ack = AckFrame::Decode(payload);
1675 handle_message_ack(ack.seq());
1676 return CONTINUE(read_frame);
1677}
1678
1679/* Client Protocol Methods */
1680
1681CtPtr ProtocolV2::start_client_banner_exchange() {
1682 ldout(cct, 20) << __func__ << dendl;
1683
1684 INTERCEPT(1);
1685
1686 state = BANNER_CONNECTING;
1687
1688 global_seq = messenger->get_global_seq();
1689
1690 return _banner_exchange(CONTINUATION(post_client_banner_exchange));
1691}
1692
1693CtPtr ProtocolV2::post_client_banner_exchange() {
1694 ldout(cct, 20) << __func__ << dendl;
1695
1696 state = AUTH_CONNECTING;
1697
1698 return send_auth_request();
1699}
1700
1701CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
1702 ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
1703 << " auth_client " << messenger->auth_client << dendl;
1704 ceph_assert(messenger->auth_client);
1705
1706 bufferlist bl;
1707 vector<uint32_t> preferred_modes;
1708 auto am = auth_meta;
1709 connection->lock.unlock();
1710 int r = messenger->auth_client->get_auth_request(
1711 connection, am.get(),
1712 &am->auth_method, &preferred_modes, &bl);
1713 connection->lock.lock();
1714 if (state != AUTH_CONNECTING) {
1715 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1716 return _fault();
1717 }
1718 if (r < 0) {
1719 ldout(cct, 0) << __func__ << " get_initial_auth_request returned " << r
1720 << dendl;
1721 stop();
1722 connection->dispatch_queue->queue_reset(connection);
1723 return nullptr;
1724 }
1725
1726 INTERCEPT(9);
1727
1728 auto frame = AuthRequestFrame::Encode(auth_meta->auth_method, preferred_modes,
1729 bl);
1730 return WRITE(frame, "auth request", read_frame);
1731}
1732
1733CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
1734 ldout(cct, 20) << __func__
1735 << " payload.length()=" << payload.length() << dendl;
1736
1737 if (state != AUTH_CONNECTING) {
1738 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1739 return _fault();
1740 }
1741
1742 auto bad_method = AuthBadMethodFrame::Decode(payload);
1743 ldout(cct, 1) << __func__ << " method=" << bad_method.method()
1744 << " result " << cpp_strerror(bad_method.result())
1745 << ", allowed methods=" << bad_method.allowed_methods()
1746 << ", allowed modes=" << bad_method.allowed_modes()
1747 << dendl;
1748 ceph_assert(messenger->auth_client);
1749 auto am = auth_meta;
1750 connection->lock.unlock();
1751 int r = messenger->auth_client->handle_auth_bad_method(
1752 connection,
1753 am.get(),
1754 bad_method.method(), bad_method.result(),
1755 bad_method.allowed_methods(),
1756 bad_method.allowed_modes());
1757 connection->lock.lock();
1758 if (state != AUTH_CONNECTING || r < 0) {
1759 return _fault();
1760 }
1761 return send_auth_request(bad_method.allowed_methods());
1762}
1763
1764CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
1765{
1766 ldout(cct, 20) << __func__
1767 << " payload.length()=" << payload.length() << dendl;
1768
1769 if (state != AUTH_CONNECTING) {
1770 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1771 return _fault();
1772 }
1773
1774 auto auth_more = AuthReplyMoreFrame::Decode(payload);
1775 ldout(cct, 5) << __func__
1776 << " auth reply more len=" << auth_more.auth_payload().length()
1777 << dendl;
1778 ceph_assert(messenger->auth_client);
1779 ceph::bufferlist reply;
1780 auto am = auth_meta;
1781 connection->lock.unlock();
1782 int r = messenger->auth_client->handle_auth_reply_more(
1783 connection, am.get(), auth_more.auth_payload(), &reply);
1784 connection->lock.lock();
1785 if (state != AUTH_CONNECTING) {
1786 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1787 return _fault();
1788 }
1789 if (r < 0) {
1790 lderr(cct) << __func__ << " auth_client handle_auth_reply_more returned "
1791 << r << dendl;
1792 return _fault();
1793 }
1794 auto more_reply = AuthRequestMoreFrame::Encode(reply);
1795 return WRITE(more_reply, "auth request more", read_frame);
1796}
1797
1798CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
1799{
1800 ldout(cct, 20) << __func__
1801 << " payload.length()=" << payload.length() << dendl;
1802
1803 if (state != AUTH_CONNECTING) {
1804 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1805 return _fault();
1806 }
1807
1808 auto auth_done = AuthDoneFrame::Decode(payload);
1809
1810 ceph_assert(messenger->auth_client);
1811 auto am = auth_meta;
1812 connection->lock.unlock();
1813 int r = messenger->auth_client->handle_auth_done(
1814 connection,
1815 am.get(),
1816 auth_done.global_id(),
1817 auth_done.con_mode(),
1818 auth_done.auth_payload(),
1819 &am->session_key,
1820 &am->connection_secret);
1821 connection->lock.lock();
1822 if (state != AUTH_CONNECTING) {
1823 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1824 return _fault();
1825 }
1826 if (r < 0) {
1827 return _fault();
1828 }
1829 auth_meta->con_mode = auth_done.con_mode();
1830 session_stream_handlers = \
1831 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);
1832
1833 state = AUTH_CONNECTING_SIGN;
1834
1835 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1836 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
1837 auto sig_frame = AuthSignatureFrame::Encode(sig);
1838 pre_auth.enabled = false;
1839 pre_auth.rxbuf.clear();
1840 return WRITE(sig_frame, "auth signature", read_frame);
1841}
1842
1843CtPtr ProtocolV2::finish_client_auth() {
1844 if (!server_cookie) {
1845 ceph_assert(connect_seq == 0);
1846 state = SESSION_CONNECTING;
1847 return send_client_ident();
1848 } else { // reconnecting to previous session
1849 state = SESSION_RECONNECTING;
1850 ceph_assert(connect_seq > 0);
1851 return send_reconnect();
1852 }
1853}
1854
1855CtPtr ProtocolV2::send_client_ident() {
1856 ldout(cct, 20) << __func__ << dendl;
1857
1858 if (!connection->policy.lossy && !client_cookie) {
1859 client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1860 }
1861
1862 uint64_t flags = 0;
1863 if (connection->policy.lossy) {
1864 flags |= CEPH_MSG_CONNECT_LOSSY;
1865 }
1866
11fdf7f2
TL
1867 auto client_ident = ClientIdentFrame::Encode(
1868 messenger->get_myaddrs(),
1869 connection->target_addr,
1870 messenger->get_myname().num(),
1871 global_seq,
1872 connection->policy.features_supported,
1873 connection->policy.features_required | msgr2_required,
1874 flags,
1875 client_cookie);
1876
1877 ldout(cct, 5) << __func__ << " sending identification: "
1878 << "addrs=" << messenger->get_myaddrs()
1879 << " target=" << connection->target_addr
1880 << " gid=" << messenger->get_myname().num()
1881 << " global_seq=" << global_seq
1882 << " features_supported=" << std::hex
1883 << connection->policy.features_supported
1884 << " features_required="
1885 << (connection->policy.features_required | msgr2_required)
1886 << " flags=" << flags
1887 << " cookie=" << client_cookie << std::dec << dendl;
1888
1889 INTERCEPT(11);
1890
1891 return WRITE(client_ident, "client ident", read_frame);
1892}
1893
1894CtPtr ProtocolV2::send_reconnect() {
1895 ldout(cct, 20) << __func__ << dendl;
1896
1897 auto reconnect = ReconnectFrame::Encode(messenger->get_myaddrs(),
1898 client_cookie,
1899 server_cookie,
1900 global_seq,
1901 connect_seq,
1902 in_seq);
1903
1904 ldout(cct, 5) << __func__ << " reconnect to session: client_cookie="
1905 << std::hex << client_cookie << " server_cookie="
1906 << server_cookie << std::dec
1907 << " gs=" << global_seq << " cs=" << connect_seq
1908 << " ms=" << in_seq << dendl;
1909
1910 INTERCEPT(13);
1911
1912 return WRITE(reconnect, "reconnect", read_frame);
1913}
1914
1915CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
1916{
1917 ldout(cct, 20) << __func__
1918 << " payload.length()=" << payload.length() << dendl;
1919
1920 if (state != SESSION_CONNECTING) {
1921 lderr(cct) << __func__ << " not in session connect state!" << dendl;
1922 return _fault();
1923 }
1924
1925 auto ident_missing =
1926 IdentMissingFeaturesFrame::Decode(payload);
1927 lderr(cct) << __func__
1928 << " client does not support all server features: " << std::hex
1929 << ident_missing.features() << std::dec << dendl;
1930
1931 return _fault();
1932}
1933
1934CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
1935{
1936 ldout(cct, 20) << __func__
1937 << " payload.length()=" << payload.length() << dendl;
1938
1939 if (state != SESSION_RECONNECTING) {
1940 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1941 return _fault();
1942 }
1943
1944 auto reset = ResetFrame::Decode(payload);
1945
1946 ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
1947 << dendl;
1948 if (reset.full()) {
1949 reset_session();
1950 } else {
1951 server_cookie = 0;
1952 connect_seq = 0;
1953 in_seq = 0;
1954 }
1955
1956 state = SESSION_CONNECTING;
1957 return send_client_ident();
1958}
1959
1960CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
1961{
1962 ldout(cct, 20) << __func__
1963 << " payload.length()=" << payload.length() << dendl;
1964
1965 if (state != SESSION_RECONNECTING) {
1966 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1967 return _fault();
1968 }
1969
1970 auto retry = RetryFrame::Decode(payload);
1971 connect_seq = retry.connect_seq() + 1;
1972
1973 ldout(cct, 1) << __func__
1974 << " received session retry connect_seq=" << retry.connect_seq()
1975 << ", inc to cs=" << connect_seq << dendl;
1976
1977 return send_reconnect();
1978}
1979
1980CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
1981{
1982 ldout(cct, 20) << __func__
1983 << " payload.length()=" << payload.length() << dendl;
1984
1985 if (state != SESSION_RECONNECTING) {
1986 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1987 return _fault();
1988 }
1989
1990 auto retry = RetryGlobalFrame::Decode(payload);
1991 global_seq = messenger->get_global_seq(retry.global_seq());
1992
1993 ldout(cct, 1) << __func__ << " received session retry global global_seq="
1994 << retry.global_seq() << ", choose new gs=" << global_seq
1995 << dendl;
1996
1997 return send_reconnect();
1998}
1999
2000CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) {
2001 ldout(cct, 20) << __func__
2002 << " received WAIT (connection race)"
2003 << " payload.length()=" << payload.length()
2004 << dendl;
2005
2006 if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
2007 lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
2008 return _fault();
2009 }
2010
2011 state = WAIT;
2012 WaitFrame::Decode(payload);
2013 return _fault();
2014}
2015
2016CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
2017{
2018 ldout(cct, 20) << __func__
2019 << " payload.length()=" << payload.length() << dendl;
2020
2021 if (state != SESSION_RECONNECTING) {
2022 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2023 return _fault();
2024 }
2025
2026 auto reconnect_ok = ReconnectOkFrame::Decode(payload);
2027 ldout(cct, 5) << __func__
2028 << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
2029 << dendl;
2030
2031 out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
2032
2033 backoff = utime_t();
2034 ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
2035 << ", lossy = " << connection->policy.lossy << ", features "
2036 << connection->get_features() << dendl;
2037
2038 if (connection->delay_state) {
2039 ceph_assert(connection->delay_state->ready());
2040 }
2041
2042 connection->dispatch_queue->queue_connect(connection);
2043 messenger->ms_deliver_handle_fast_connect(connection);
2044
2045 return ready();
2046}
2047
2048CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
2049{
2050 ldout(cct, 20) << __func__
2051 << " payload.length()=" << payload.length() << dendl;
2052
2053 if (state != SESSION_CONNECTING) {
2054 lderr(cct) << __func__ << " not in session connect state!" << dendl;
2055 return _fault();
2056 }
2057
2058 auto server_ident = ServerIdentFrame::Decode(payload);
2059 ldout(cct, 5) << __func__ << " received server identification:"
2060 << " addrs=" << server_ident.addrs()
2061 << " gid=" << server_ident.gid()
2062 << " global_seq=" << server_ident.global_seq()
2063 << " features_supported=" << std::hex
2064 << server_ident.supported_features()
2065 << " features_required=" << server_ident.required_features()
2066 << " flags=" << server_ident.flags() << " cookie=" << std::dec
2067 << server_ident.cookie() << dendl;
2068
2069 // is this who we intended to talk to?
2070 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2071 // of mon_host or something.
2072 if (!server_ident.addrs().contains(connection->target_addr)) {
2073 ldout(cct,1) << __func__ << " peer identifies as " << server_ident.addrs()
2074 << ", does not include " << connection->target_addr << dendl;
2075 return _fault();
2076 }
2077
2078 server_cookie = server_ident.cookie();
2079
2080 connection->set_peer_addrs(server_ident.addrs());
2081 peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid());
2082 connection->set_features(server_ident.supported_features() &
2083 connection->policy.features_supported);
2084 peer_global_seq = server_ident.global_seq();
2085
2086 connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
2087
2088 backoff = utime_t();
2089 ldout(cct, 10) << __func__ << " connect success " << connect_seq
2090 << ", lossy = " << connection->policy.lossy << ", features "
2091 << connection->get_features() << dendl;
2092
2093 if (connection->delay_state) {
2094 ceph_assert(connection->delay_state->ready());
2095 }
2096
2097 connection->dispatch_queue->queue_connect(connection);
2098 messenger->ms_deliver_handle_fast_connect(connection);
2099
2100 return ready();
2101}
2102
2103/* Server Protocol Methods */
2104
2105CtPtr ProtocolV2::start_server_banner_exchange() {
2106 ldout(cct, 20) << __func__ << dendl;
2107
2108 INTERCEPT(2);
2109
2110 state = BANNER_ACCEPTING;
2111
2112 return _banner_exchange(CONTINUATION(post_server_banner_exchange));
2113}
2114
2115CtPtr ProtocolV2::post_server_banner_exchange() {
2116 ldout(cct, 20) << __func__ << dendl;
2117
2118 state = AUTH_ACCEPTING;
2119
2120 return CONTINUE(read_frame);
2121}
2122
2123CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
2124 ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
2125 << dendl;
2126
2127 if (state != AUTH_ACCEPTING) {
2128 lderr(cct) << __func__ << " not in auth accept state!" << dendl;
2129 return _fault();
2130 }
2131
2132 auto request = AuthRequestFrame::Decode(payload);
2133 ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
2134 << ", preferred_modes=" << request.preferred_modes()
2135 << ", payload_len=" << request.auth_payload().length() << ")"
2136 << dendl;
2137 auth_meta->auth_method = request.method();
2138 auth_meta->con_mode = messenger->auth_server->pick_con_mode(
2139 connection->get_peer_type(), auth_meta->auth_method,
2140 request.preferred_modes());
2141 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
2142 return _auth_bad_method(-EOPNOTSUPP);
2143 }
2144 return _handle_auth_request(request.auth_payload(), false);
2145}
2146
2147CtPtr ProtocolV2::_auth_bad_method(int r)
2148{
2149 ceph_assert(r < 0);
2150 std::vector<uint32_t> allowed_methods;
2151 std::vector<uint32_t> allowed_modes;
2152 messenger->auth_server->get_supported_auth_methods(
2153 connection->get_peer_type(), &allowed_methods, &allowed_modes);
2154 ldout(cct, 1) << __func__ << " auth_method " << auth_meta->auth_method
2155 << " r " << cpp_strerror(r)
2156 << ", allowed_methods " << allowed_methods
2157 << ", allowed_modes " << allowed_modes
2158 << dendl;
2159 auto bad_method = AuthBadMethodFrame::Encode(auth_meta->auth_method, r,
2160 allowed_methods, allowed_modes);
2161 return WRITE(bad_method, "bad auth method", read_frame);
2162}
2163
2164CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
2165{
2166 if (!messenger->auth_server) {
2167 return _fault();
2168 }
2169 bufferlist reply;
2170 auto am = auth_meta;
2171 connection->lock.unlock();
2172 int r = messenger->auth_server->handle_auth_request(
2173 connection, am.get(),
2174 more, am->auth_method, auth_payload,
2175 &reply);
2176 connection->lock.lock();
2177 if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
2178 ldout(cct, 1) << __func__
2179 << " state changed while accept, it must be mark_down"
2180 << dendl;
2181 ceph_assert(state == CLOSED);
2182 return _fault();
2183 }
2184 if (r == 1) {
2185 INTERCEPT(10);
2186 state = AUTH_ACCEPTING_SIGN;
2187
2188 auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
2189 auth_meta->con_mode,
2190 reply);
2191 return WRITE(auth_done, "auth done", finish_auth);
2192 } else if (r == 0) {
2193 state = AUTH_ACCEPTING_MORE;
2194
2195 auto more = AuthReplyMoreFrame::Encode(reply);
2196 return WRITE(more, "auth reply more", read_frame);
2197 } else if (r == -EBUSY) {
2198 // kick the client and maybe they'll come back later
2199 return _fault();
2200 } else {
2201 return _auth_bad_method(r);
2202 }
2203}
2204
2205CtPtr ProtocolV2::finish_auth()
2206{
2207 ceph_assert(auth_meta);
2208 // TODO: having a possibility to check whether we're server or client could
2209 // allow reusing finish_auth().
2210 session_stream_handlers = \
2211 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
2212
2213 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
2214 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
2215 auto sig_frame = AuthSignatureFrame::Encode(sig);
2216 pre_auth.enabled = false;
2217 pre_auth.rxbuf.clear();
2218 return WRITE(sig_frame, "auth signature", read_frame);
2219}
2220
2221CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload)
2222{
2223 ldout(cct, 20) << __func__
2224 << " payload.length()=" << payload.length() << dendl;
2225
2226 if (state != AUTH_ACCEPTING_MORE) {
2227 lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
2228 return _fault();
2229 }
2230
2231 auto auth_more = AuthRequestMoreFrame::Decode(payload);
2232 return _handle_auth_request(auth_more.auth_payload(), true);
2233}
2234
2235CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
2236{
2237 ldout(cct, 20) << __func__
2238 << " payload.length()=" << payload.length() << dendl;
2239
2240 if (state != AUTH_ACCEPTING_SIGN && state != AUTH_CONNECTING_SIGN) {
2241 lderr(cct) << __func__
2242 << " pre-auth verification signature seen in wrong state!"
2243 << dendl;
2244 return _fault();
2245 }
2246
2247 auto sig_frame = AuthSignatureFrame::Decode(payload);
2248
2249 const auto actual_tx_sig = auth_meta->session_key.empty() ?
2250 sha256_digest_t() : auth_meta->session_key.hmac_sha256(cct, pre_auth.txbuf);
2251 if (sig_frame.signature() != actual_tx_sig) {
2252 ldout(cct, 2) << __func__ << " pre-auth signature mismatch"
2253 << " actual_tx_sig=" << actual_tx_sig
2254 << " sig_frame.signature()=" << sig_frame.signature()
2255 << dendl;
2256 return _fault();
2257 } else {
2258 ldout(cct, 20) << __func__ << " pre-auth signature success"
2259 << " sig_frame.signature()=" << sig_frame.signature()
2260 << dendl;
2261 pre_auth.txbuf.clear();
2262 }
2263
2264 if (state == AUTH_ACCEPTING_SIGN) {
2265 // server had sent AuthDone and client responded with correct pre-auth
2266 // signature. we can start accepting new sessions/reconnects.
2267 state = SESSION_ACCEPTING;
2268 return CONTINUE(read_frame);
2269 } else if (state == AUTH_CONNECTING_SIGN) {
2270 // this happened at client side
2271 return finish_client_auth();
2272 } else {
2273 ceph_assert_always("state corruption" == nullptr);
2274 }
2275}
2276
2277CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
2278{
2279 ldout(cct, 20) << __func__
2280 << " payload.length()=" << payload.length() << dendl;
2281
2282 if (state != SESSION_ACCEPTING) {
2283 lderr(cct) << __func__ << " not in session accept state!" << dendl;
2284 return _fault();
2285 }
2286
2287 auto client_ident = ClientIdentFrame::Decode(payload);
2288
2289 ldout(cct, 5) << __func__ << " received client identification:"
2290 << " addrs=" << client_ident.addrs()
2291 << " target=" << client_ident.target_addr()
2292 << " gid=" << client_ident.gid()
2293 << " global_seq=" << client_ident.global_seq()
2294 << " features_supported=" << std::hex
2295 << client_ident.supported_features()
2296 << " features_required=" << client_ident.required_features()
2297 << " flags=" << client_ident.flags()
2298 << " cookie=" << client_ident.cookie() << std::dec << dendl;
2299
2300 if (client_ident.addrs().empty() ||
2301 client_ident.addrs().front() == entity_addr_t()) {
2302 ldout(cct,5) << __func__ << " oops, client_ident.addrs() is empty" << dendl;
2303 return _fault(); // a v2 peer should never do this
2304 }
2305 if (!messenger->get_myaddrs().contains(client_ident.target_addr())) {
2306 ldout(cct,5) << __func__ << " peer is trying to reach "
2307 << client_ident.target_addr()
2308 << " which is not us (" << messenger->get_myaddrs() << ")"
2309 << dendl;
2310 return _fault();
2311 }
2312
2313 connection->set_peer_addrs(client_ident.addrs());
2314 connection->target_addr = connection->_infer_target_addr(client_ident.addrs());
2315
2316 peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
2317 connection->set_peer_id(client_ident.gid());
2318
2319 client_cookie = client_ident.cookie();
2320
2321 uint64_t feat_missing =
2322 (connection->policy.features_required | msgr2_required) &
2323 ~(uint64_t)client_ident.supported_features();
2324 if (feat_missing) {
2325 ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
2326 << feat_missing << std::dec << dendl;
2327 auto ident_missing_features =
2328 IdentMissingFeaturesFrame::Encode(feat_missing);
2329
2330 return WRITE(ident_missing_features, "ident missing features", read_frame);
2331 }
2332
2333 connection_features =
2334 client_ident.supported_features() & connection->policy.features_supported;
2335
2336 peer_global_seq = client_ident.global_seq();
2337
2338 // Looks good so far, let's check if there is already an existing connection
2339 // to this peer.
2340
2341 connection->lock.unlock();
2342 AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2343
2344 if (existing &&
2345 existing->protocol->proto_type != 2) {
2346 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2347 << existing->protocol.get() << " version is "
2348 << existing->protocol->proto_type << ", marking down" << dendl;
2349 existing->mark_down();
2350 existing = nullptr;
2351 }
2352
2353 connection->inject_delay();
2354
2355 connection->lock.lock();
2356 if (state != SESSION_ACCEPTING) {
2357 ldout(cct, 1) << __func__
2358 << " state changed while accept, it must be mark_down"
2359 << dendl;
2360 ceph_assert(state == CLOSED);
2361 return _fault();
2362 }
2363
2364 if (existing) {
2365 return handle_existing_connection(existing);
2366 }
2367
2368 // if everything is OK reply with server identification
2369 return send_server_ident();
2370}
2371
2372CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
2373{
2374 ldout(cct, 20) << __func__
2375 << " payload.length()=" << payload.length() << dendl;
2376
2377 if (state != SESSION_ACCEPTING) {
2378 lderr(cct) << __func__ << " not in session accept state!" << dendl;
2379 return _fault();
2380 }
2381
2382 auto reconnect = ReconnectFrame::Decode(payload);
2383
2384 ldout(cct, 5) << __func__
2385 << " received reconnect:"
2386 << " client_cookie=" << std::hex << reconnect.client_cookie()
2387 << " server_cookie=" << reconnect.server_cookie() << std::dec
2388 << " gs=" << reconnect.global_seq()
2389 << " cs=" << reconnect.connect_seq()
2390 << " ms=" << reconnect.msg_seq()
2391 << dendl;
2392
2393 // Should we check if one of the ident.addrs match connection->target_addr
2394 // as we do in ProtocolV1?
2395 connection->set_peer_addrs(reconnect.addrs());
2396 connection->target_addr = connection->_infer_target_addr(reconnect.addrs());
2397 peer_global_seq = reconnect.global_seq();
2398
2399 connection->lock.unlock();
2400 AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2401
2402 if (existing &&
2403 existing->protocol->proto_type != 2) {
2404 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2405 << existing->protocol.get() << " version is "
2406 << existing->protocol->proto_type << ", marking down" << dendl;
2407 existing->mark_down();
2408 existing = nullptr;
2409 }
2410
2411 connection->inject_delay();
2412
2413 connection->lock.lock();
2414 if (state != SESSION_ACCEPTING) {
2415 ldout(cct, 1) << __func__
2416 << " state changed while accept, it must be mark_down"
2417 << dendl;
2418 ceph_assert(state == CLOSED);
2419 return _fault();
2420 }
2421
2422 if (!existing) {
2423 // there is no existing connection therefore cannot reconnect to previous
2424 // session
2425 ldout(cct, 0) << __func__
2426 << " no existing connection exists, reseting client" << dendl;
2427 auto reset = ResetFrame::Encode(true);
2428 return WRITE(reset, "session reset", read_frame);
2429 }
2430
2431 std::lock_guard<std::mutex> l(existing->lock);
2432
2433 ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2434 if (!exproto) {
2435 ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2436 ceph_assert(false);
2437 }
2438
2439 if (exproto->state == CLOSED) {
2440 ldout(cct, 5) << __func__ << " existing " << existing
2441 << " already closed. Reseting client" << dendl;
2442 auto reset = ResetFrame::Encode(true);
2443 return WRITE(reset, "session reset", read_frame);
2444 }
2445
2446 if (exproto->replacing) {
2447 ldout(cct, 1) << __func__
2448 << " existing racing replace happened while replacing."
2449 << " existing=" << existing << dendl;
2450 auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2451 return WRITE(retry, "session retry", read_frame);
2452 }
2453
2454 if (exproto->client_cookie != reconnect.client_cookie()) {
2455 ldout(cct, 1) << __func__ << " existing=" << existing
2456 << " client cookie mismatch, I must have reseted:"
2457 << " cc=" << std::hex << exproto->client_cookie
2458 << " rcc=" << reconnect.client_cookie()
2459 << ", reseting client." << std::dec
2460 << dendl;
2461 auto reset = ResetFrame::Encode(connection->policy.resetcheck);
2462 return WRITE(reset, "session reset", read_frame);
2463 } else if (exproto->server_cookie == 0) {
2464 // this happens when:
2465 // - a connects to b
2466 // - a sends client_ident
2467 // - b gets client_ident, sends server_ident and sets cookie X
2468 // - connection fault
2469 // - b reconnects to a with cookie X, connect_seq=1
2470 // - a has cookie==0
2471 ldout(cct, 1) << __func__ << " I was a client and didn't received the"
2472 << " server_ident. Asking peer to resume session"
2473 << " establishment" << dendl;
2474 auto reset = ResetFrame::Encode(false);
2475 return WRITE(reset, "session reset", read_frame);
2476 }
2477
2478 if (exproto->peer_global_seq > reconnect.global_seq()) {
2479 ldout(cct, 5) << __func__
2480 << " stale global_seq: sgs=" << exproto->peer_global_seq
2481 << " cgs=" << reconnect.global_seq()
2482 << ", ask client to retry global" << dendl;
2483 auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2484
2485 INTERCEPT(18);
2486
2487 return WRITE(retry, "session retry", read_frame);
2488 }
2489
2490 if (exproto->connect_seq > reconnect.connect_seq()) {
2491 ldout(cct, 5) << __func__
2492 << " stale connect_seq scs=" << exproto->connect_seq
2493 << " ccs=" << reconnect.connect_seq()
2494 << " , ask client to retry" << dendl;
2495 auto retry = RetryFrame::Encode(exproto->connect_seq);
2496 return WRITE(retry, "session retry", read_frame);
2497 }
2498
2499 if (exproto->connect_seq == reconnect.connect_seq()) {
2500 // reconnect race: both peers are sending reconnect messages
2501 if (existing->peer_addrs->msgr2_addr() >
2502 messenger->get_myaddrs().msgr2_addr() &&
2503 !existing->policy.server) {
2504 // the existing connection wins
2505 ldout(cct, 1)
2506 << __func__
2507 << " reconnect race detected, this connection loses to existing="
2508 << existing << dendl;
2509
2510 auto wait = WaitFrame::Encode();
2511 return WRITE(wait, "wait", read_frame);
2512 } else {
2513 // this connection wins
2514 ldout(cct, 1) << __func__
2515 << " reconnect race detected, replacing existing="
2516 << existing << " socket by this connection's socket"
2517 << dendl;
2518 }
2519 }
2520
2521 ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
2522
2523 reconnecting = true;
2524
2525 // everything looks good
2526 exproto->connect_seq = reconnect.connect_seq();
2527 exproto->message_seq = reconnect.msg_seq();
2528
2529 return reuse_connection(existing, exproto);
2530}
2531
2532CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
2533 ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
2534
2535 std::lock_guard<std::mutex> l(existing->lock);
2536
2537 ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2538 if (!exproto) {
2539 ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2540 ceph_assert(false);
2541 }
2542
2543 if (exproto->state == CLOSED) {
2544 ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
2545 << dendl;
2546 return send_server_ident();
2547 }
2548
2549 if (exproto->replacing) {
2550 ldout(cct, 1) << __func__
2551 << " existing racing replace happened while replacing."
2552 << " existing=" << existing << dendl;
2553 auto wait = WaitFrame::Encode();
2554 return WRITE(wait, "wait", read_frame);
2555 }
2556
2557 if (exproto->peer_global_seq > peer_global_seq) {
2558 ldout(cct, 1) << __func__ << " this is a stale connection, peer_global_seq="
2559 << peer_global_seq
2560 << " existing->peer_global_seq=" << exproto->peer_global_seq
2561 << ", stopping this connection." << dendl;
2562 stop();
2563 connection->dispatch_queue->queue_reset(connection);
2564 return nullptr;
2565 }
2566
2567 if (existing->policy.lossy) {
2568 // existing connection can be thrown out in favor of this one
2569 ldout(cct, 1)
2570 << __func__ << " existing=" << existing
2571 << " is a lossy channel. Stopping existing in favor of this connection"
2572 << dendl;
2573 existing->protocol->stop();
2574 existing->dispatch_queue->queue_reset(existing.get());
2575 return send_server_ident();
2576 }
2577
2578 if (exproto->server_cookie && exproto->client_cookie &&
2579 exproto->client_cookie != client_cookie) {
2580 // Found previous session
2581 // peer has reseted and we're going to reuse the existing connection
2582 // by replacing the communication socket
2583 ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2584 << ", peer must have reseted." << dendl;
2585 if (connection->policy.resetcheck) {
2586 exproto->reset_session();
2587 }
2588 return reuse_connection(existing, exproto);
2589 }
2590
2591 if (exproto->client_cookie == client_cookie) {
2592 // session establishment interrupted between client_ident and server_ident,
2593 // continuing...
2594 ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2595 << ", continuing session establishment." << dendl;
2596 return reuse_connection(existing, exproto);
2597 }
2598
2599 if (exproto->state == READY || exproto->state == STANDBY) {
2600 ldout(cct, 1) << __func__ << " existing=" << existing
2601 << " is READY/STANDBY, lets reuse it" << dendl;
2602 return reuse_connection(existing, exproto);
2603 }
2604
2605 // Looks like a connection race: server and client are both connecting to
2606 // each other at the same time.
2607 if (connection->peer_addrs->msgr2_addr() <
2608 messenger->get_myaddrs().msgr2_addr() ||
2609 existing->policy.server) {
2610 // this connection wins
2611 ldout(cct, 1) << __func__
2612 << " connection race detected, replacing existing="
2613 << existing << " socket by this connection's socket" << dendl;
2614 return reuse_connection(existing, exproto);
2615 } else {
2616 // the existing connection wins
2617 ldout(cct, 1)
2618 << __func__
2619 << " connection race detected, this connection loses to existing="
2620 << existing << dendl;
2621 ceph_assert(connection->peer_addrs->msgr2_addr() >
2622 messenger->get_myaddrs().msgr2_addr());
2623
2624 // make sure we follow through with opening the existing
2625 // connection (if it isn't yet open) since we know the peer
2626 // has something to send to us.
2627 existing->send_keepalive();
2628 auto wait = WaitFrame::Encode();
2629 return WRITE(wait, "wait", read_frame);
2630 }
2631}
2632
2633CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
2634 ProtocolV2 *exproto) {
2635 ldout(cct, 20) << __func__ << " existing=" << existing
2636 << " reconnect=" << reconnecting << dendl;
2637
2638 connection->inject_delay();
2639
2640 std::lock_guard<std::mutex> l(existing->write_lock);
2641
2642 connection->center->delete_file_event(connection->cs.fd(),
2643 EVENT_READABLE | EVENT_WRITABLE);
2644
2645 if (existing->delay_state) {
2646 existing->delay_state->flush();
2647 ceph_assert(!connection->delay_state);
2648 }
2649 exproto->reset_recv_state();
2650 exproto->pre_auth.enabled = false;
2651
2652 if (!reconnecting) {
2653 exproto->client_cookie = client_cookie;
2654 exproto->peer_name = peer_name;
2655 exproto->connection_features = connection_features;
2656 existing->set_features(connection_features);
2657 }
2658 exproto->peer_global_seq = peer_global_seq;
2659
2660 auto temp_cs = std::move(connection->cs);
2661 EventCenter *new_center = connection->center;
2662 Worker *new_worker = connection->worker;
2663
2664 ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
2665 << dendl;
2666 // avoid _stop shutdown replacing socket
2667 // queue a reset on the new connection, which we're dumping for the old
2668 stop();
2669
2670 connection->dispatch_queue->queue_reset(connection);
2671
2672 exproto->can_write = false;
2673 exproto->reconnecting = reconnecting;
2674 exproto->replacing = true;
2675 std::swap(exproto->session_stream_handlers, session_stream_handlers);
2676 exproto->auth_meta = auth_meta;
2677 existing->state_offset = 0;
2678 // avoid previous thread modify event
2679 exproto->state = NONE;
2680 existing->state = AsyncConnection::STATE_NONE;
2681 // Discard existing prefetch buffer in `recv_buf`
2682 existing->recv_start = existing->recv_end = 0;
2683 // there shouldn't exist any buffer
2684 ceph_assert(connection->recv_start == connection->recv_end);
2685
2686 auto deactivate_existing = std::bind(
2687 [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
2688 // we need to delete time event in original thread
2689 {
2690 std::lock_guard<std::mutex> l(existing->lock);
2691 existing->write_lock.lock();
2692 exproto->requeue_sent();
2693 existing->outcoming_bl.clear();
2694 existing->open_write = false;
2695 existing->write_lock.unlock();
2696 if (exproto->state == NONE) {
2697 existing->shutdown_socket();
2698 existing->cs = std::move(cs);
2699 existing->worker->references--;
2700 new_worker->references++;
2701 existing->logger = new_worker->get_perf_counter();
2702 existing->worker = new_worker;
2703 existing->center = new_center;
2704 if (existing->delay_state)
2705 existing->delay_state->set_center(new_center);
2706 } else if (exproto->state == CLOSED) {
2707 auto back_to_close = std::bind(
2708 [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
2709 new_center->submit_to(new_center->get_id(),
2710 std::move(back_to_close), true);
2711 return;
2712 } else {
2713 ceph_abort();
2714 }
2715 }
2716
2717 // Before changing existing->center, it may already exists some
2718 // events in existing->center's queue. Then if we mark down
2719 // `existing`, it will execute in another thread and clean up
2720 // connection. Previous event will result in segment fault
2721 auto transfer_existing = [existing, exproto]() mutable {
2722 std::lock_guard<std::mutex> l(existing->lock);
2723 if (exproto->state == CLOSED) return;
2724 ceph_assert(exproto->state == NONE);
2725
2726 exproto->state = SESSION_ACCEPTING;
81eedcae
TL
2727 // we have called shutdown_socket above
2728 ceph_assert(existing->last_tick_id == 0);
2729 // restart timer since we are going to re-build connection
2730 existing->last_connect_started = ceph::coarse_mono_clock::now();
2731 existing->last_tick_id = existing->center->create_time_event(
2732 existing->connect_timeout_us, existing->tick_handler);
11fdf7f2
TL
2733 existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
2734 existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
2735 existing->read_handler);
2736 if (!exproto->reconnecting) {
2737 exproto->run_continuation(exproto->send_server_ident());
2738 } else {
2739 exproto->run_continuation(exproto->send_reconnect_ok());
2740 }
2741 };
2742 if (existing->center->in_thread())
2743 transfer_existing();
2744 else
2745 existing->center->submit_to(existing->center->get_id(),
2746 std::move(transfer_existing), true);
2747 },
2748 std::move(temp_cs));
2749
2750 existing->center->submit_to(existing->center->get_id(),
2751 std::move(deactivate_existing), true);
2752 return nullptr;
2753}
2754
2755CtPtr ProtocolV2::send_server_ident() {
2756 ldout(cct, 20) << __func__ << dendl;
2757
2758 // this is required for the case when this connection is being replaced
2759 out_seq = discard_requeued_up_to(out_seq, 0);
2760 in_seq = 0;
2761
2762 if (!connection->policy.lossy) {
2763 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
2764 }
2765
2766 uint64_t flags = 0;
2767 if (connection->policy.lossy) {
2768 flags = flags | CEPH_MSG_CONNECT_LOSSY;
2769 }
2770
2771 uint64_t gs = messenger->get_global_seq();
2772 auto server_ident = ServerIdentFrame::Encode(
2773 messenger->get_myaddrs(),
2774 messenger->get_myname().num(),
2775 gs,
2776 connection->policy.features_supported,
2777 connection->policy.features_required | msgr2_required,
2778 flags,
2779 server_cookie);
2780
2781 ldout(cct, 5) << __func__ << " sending identification:"
2782 << " addrs=" << messenger->get_myaddrs()
2783 << " gid=" << messenger->get_myname().num()
2784 << " global_seq=" << gs << " features_supported=" << std::hex
2785 << connection->policy.features_supported
2786 << " features_required="
2787 << (connection->policy.features_required | msgr2_required)
2788 << " flags=" << flags << " cookie=" << std::dec << server_cookie
2789 << dendl;
2790
2791 connection->lock.unlock();
2792 // Because "replacing" will prevent other connections preempt this addr,
2793 // it's safe that here we don't acquire Connection's lock
2794 ssize_t r = messenger->accept_conn(connection);
2795
2796 connection->inject_delay();
2797
2798 connection->lock.lock();
2799
2800 if (r < 0) {
2801 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2802 << connection->peer_addrs->msgr2_addr()
2803 << " just fail later one(this)" << dendl;
2804 connection->inject_delay();
2805 return _fault();
2806 }
2807 if (state != SESSION_ACCEPTING) {
2808 ldout(cct, 1) << __func__
2809 << " state changed while accept_conn, it must be mark_down"
2810 << dendl;
2811 ceph_assert(state == CLOSED || state == NONE);
2812 messenger->unregister_conn(connection);
2813 connection->inject_delay();
2814 return _fault();
2815 }
2816
2817 connection->set_features(connection_features);
2818
2819 // notify
2820 connection->dispatch_queue->queue_accept(connection);
2821 messenger->ms_deliver_handle_fast_accept(connection);
2822
2823 INTERCEPT(12);
2824
2825 return WRITE(server_ident, "server ident", server_ready);
2826}
2827
2828CtPtr ProtocolV2::server_ready() {
2829 ldout(cct, 20) << __func__ << dendl;
2830
2831 if (connection->delay_state) {
2832 ceph_assert(connection->delay_state->ready());
2833 }
2834
2835 return ready();
2836}
2837
2838CtPtr ProtocolV2::send_reconnect_ok() {
2839 ldout(cct, 20) << __func__ << dendl;
2840
2841 out_seq = discard_requeued_up_to(out_seq, message_seq);
2842
2843 uint64_t ms = in_seq;
2844 auto reconnect_ok = ReconnectOkFrame::Encode(ms);
2845
2846 ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
2847
2848 connection->lock.unlock();
2849 // Because "replacing" will prevent other connections preempt this addr,
2850 // it's safe that here we don't acquire Connection's lock
2851 ssize_t r = messenger->accept_conn(connection);
2852
2853 connection->inject_delay();
2854
2855 connection->lock.lock();
2856
2857 if (r < 0) {
2858 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2859 << connection->peer_addrs->msgr2_addr()
2860 << " just fail later one(this)" << dendl;
2861 connection->inject_delay();
2862 return _fault();
2863 }
2864 if (state != SESSION_ACCEPTING) {
2865 ldout(cct, 1) << __func__
2866 << " state changed while accept_conn, it must be mark_down"
2867 << dendl;
2868 ceph_assert(state == CLOSED || state == NONE);
2869 messenger->unregister_conn(connection);
2870 connection->inject_delay();
2871 return _fault();
2872 }
2873
2874 // notify
2875 connection->dispatch_queue->queue_accept(connection);
2876 messenger->ms_deliver_handle_fast_accept(connection);
2877
2878 INTERCEPT(14);
2879
2880 return WRITE(reconnect_ok, "reconnect ok", server_ready);
2881}