]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/ProtocolV2.cc
update download target update for octopus release
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <type_traits>
5
6#include "ProtocolV2.h"
7#include "AsyncMessenger.h"
8
9#include "common/EventTrace.h"
10#include "common/ceph_crypto.h"
11#include "common/errno.h"
12#include "include/random.h"
13#include "auth/AuthClient.h"
14#include "auth/AuthServer.h"
15
16#define dout_subsys ceph_subsys_ms
17#undef dout_prefix
18#define dout_prefix _conn_prefix(_dout)
19ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
20 return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
21 << *connection->peer_addrs << " conn(" << connection << " "
22 << this
23 << " " << ceph_con_mode_name(auth_meta->con_mode)
24 << " :" << connection->port
25 << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq << " l=" << connection->policy.lossy
27 << " rx=" << session_stream_handlers.rx.get()
28 << " tx=" << session_stream_handlers.tx.get()
29 << ").";
30}
31
32using namespace ceph::msgr::v2;
33
34using CtPtr = Ct<ProtocolV2> *;
35using CtRef = Ct<ProtocolV2> &;
36
37void ProtocolV2::run_continuation(CtPtr pcontinuation) {
38 if (pcontinuation) {
39 run_continuation(*pcontinuation);
40 }
41}
42
43void ProtocolV2::run_continuation(CtRef continuation) {
44 try {
45 CONTINUATION_RUN(continuation)
46 } catch (const buffer::error &e) {
47 lderr(cct) << __func__ << " failed decoding of frame header: " << e
48 << dendl;
49 _fault();
50 } catch (const ceph::crypto::onwire::MsgAuthError &e) {
51 lderr(cct) << __func__ << " " << e.what() << dendl;
52 _fault();
53 } catch (const DecryptionError &) {
54 lderr(cct) << __func__ << " failed to decrypt frame payload" << dendl;
55 }
56}
57
// WRITE(BUF, DESC, NEXT): send BUF (a bufferlist or a frame), naming it DESC
// in failure logs, then continue with continuation NEXT.
#define WRITE(BUF, DESC, NEXT) write(DESC, CONTINUATION(NEXT), BUF)

// READ(LEN, NEXT): read LEN bytes into a newly allocated rx buffer, then
// continue with NEXT.
#define READ(LEN, NEXT)                                                        \
  read(CONTINUATION(NEXT), buffer::ptr_node::create(buffer::create(LEN)))

// READ_RXBUF(RXBUF, NEXT): read into the caller-supplied rx buffer RXBUF,
// then continue with NEXT.
#define READ_RXBUF(RXBUF, NEXT) read(CONTINUATION(NEXT), RXBUF)

#ifdef UNIT_TESTS_BUILT

// INTERCEPT(STEP): unit-test hook. Asks the connection's interceptor (if any)
// what to do at handshake step STEP. FAIL faults the connection; STOP stops
// it and queues a reset; any other action continues normally. The expansion
// contains `return` statements yielding CtPtr values, so it is only usable
// inside functions returning CtPtr.
#define INTERCEPT(STEP)                                                        \
  {                                                                            \
    if (connection->interceptor) {                                             \
      auto action = connection->interceptor->intercept(connection, (STEP));    \
      if (action == Interceptor::ACTION::FAIL) {                               \
        return _fault();                                                       \
      } else if (action == Interceptor::ACTION::STOP) {                        \
        stop();                                                                \
        connection->dispatch_queue->queue_reset(connection);                   \
        return nullptr;                                                        \
      }                                                                        \
    }                                                                          \
  }

#else
// Compiled out in non-test builds.
#define INTERCEPT(S)
#endif
80
81ProtocolV2::ProtocolV2(AsyncConnection *connection)
82 : Protocol(2, connection),
83 state(NONE),
84 peer_required_features(0),
85 client_cookie(0),
86 server_cookie(0),
87 global_seq(0),
88 connect_seq(0),
89 peer_global_seq(0),
90 message_seq(0),
91 reconnecting(false),
92 replacing(false),
93 can_write(false),
94 bannerExchangeCallback(nullptr),
95 next_tag(static_cast<Tag>(0)),
96 keepalive(false) {
97}
98
99ProtocolV2::~ProtocolV2() {
100}
101
102void ProtocolV2::connect() {
103 ldout(cct, 1) << __func__ << dendl;
104 state = START_CONNECT;
105 pre_auth.enabled = true;
106}
107
108void ProtocolV2::accept() {
109 ldout(cct, 1) << __func__ << dendl;
110 state = START_ACCEPT;
111}
112
113bool ProtocolV2::is_connected() { return can_write; }
114
115/*
116 * Tears down the message queues, and removes them from the
117 * DispatchQueue Must hold write_lock prior to calling.
118 */
119void ProtocolV2::discard_out_queue() {
120 ldout(cct, 10) << __func__ << " started" << dendl;
121
122 for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
123 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
124 (*p)->put();
125 }
126 sent.clear();
127 for (auto& [ prio, entries ] : out_queue) {
128 static_cast<void>(prio);
129 for (auto& entry : entries) {
130 ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
131 entry.m->put();
132 }
133 }
134 out_queue.clear();
494da23a 135 write_in_progress = false;
11fdf7f2
TL
136}
137
138void ProtocolV2::reset_session() {
139 ldout(cct, 1) << __func__ << dendl;
140
141 std::lock_guard<std::mutex> l(connection->write_lock);
142 if (connection->delay_state) {
143 connection->delay_state->discard();
144 }
145
146 connection->dispatch_queue->discard_queue(connection->conn_id);
147 discard_out_queue();
148 connection->outcoming_bl.clear();
149
150 connection->dispatch_queue->queue_remote_reset(connection);
151
152 out_seq = 0;
153 in_seq = 0;
154 client_cookie = 0;
155 server_cookie = 0;
156 connect_seq = 0;
157 peer_global_seq = 0;
158 message_seq = 0;
159 ack_left = 0;
160 can_write = false;
161}
162
163void ProtocolV2::stop() {
164 ldout(cct, 1) << __func__ << dendl;
165 if (state == CLOSED) {
166 return;
167 }
168
169 if (connection->delay_state) connection->delay_state->flush();
170
171 std::lock_guard<std::mutex> l(connection->write_lock);
172
173 reset_recv_state();
174 discard_out_queue();
175
176 connection->_stop();
177
178 can_write = false;
179 state = CLOSED;
180}
181
182void ProtocolV2::fault() { _fault(); }
183
184void ProtocolV2::requeue_sent() {
494da23a 185 write_in_progress = false;
11fdf7f2
TL
186 if (sent.empty()) {
187 return;
188 }
189
190 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
191 out_seq -= sent.size();
192 while (!sent.empty()) {
193 Message *m = sent.back();
194 sent.pop_back();
195 ldout(cct, 5) << __func__ << " requeueing message m=" << m
196 << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
197 << *m << dendl;
198 rq.emplace_front(out_queue_entry_t{false, m});
199 }
200}
201
202uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
203 ldout(cct, 10) << __func__ << " " << seq << dendl;
204 std::lock_guard<std::mutex> l(connection->write_lock);
205 if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
206 return seq;
207 }
208 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
209 uint64_t count = out_seq;
210 while (!rq.empty()) {
211 Message* const m = rq.front().m;
212 if (m->get_seq() == 0 || m->get_seq() > seq) break;
213 ldout(cct, 5) << __func__ << " discarding message m=" << m
214 << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
215 << *m << dendl;
216 m->put();
217 rq.pop_front();
218 count++;
219 }
220 if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST);
221 return count;
222}
223
224void ProtocolV2::reset_recv_state() {
494da23a
TL
225 auth_meta.reset(new AuthConnectionMeta);
226 session_stream_handlers.tx.reset(nullptr);
227 session_stream_handlers.rx.reset(nullptr);
228 pre_auth.txbuf.clear();
229 pre_auth.rxbuf.clear();
11fdf7f2
TL
230
231 // clean read and write callbacks
232 connection->pendingReadLen.reset();
233 connection->writeCallback.reset();
234
235 next_tag = static_cast<Tag>(0);
236
237 reset_throttle();
238}
239
240size_t ProtocolV2::get_current_msg_size() const {
241 ceph_assert(!rx_segments_desc.empty());
242 size_t sum = 0;
243 // we don't include SegmentIndex::Msg::HEADER.
244 for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
245 sum += rx_segments_desc[idx].length;
246 }
247 return sum;
248}
249
250void ProtocolV2::reset_throttle() {
251 if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
252 connection->policy.throttler_messages) {
253 ldout(cct, 10) << __func__ << " releasing " << 1
254 << " message to policy throttler "
255 << connection->policy.throttler_messages->get_current()
256 << "/" << connection->policy.throttler_messages->get_max()
257 << dendl;
258 connection->policy.throttler_messages->put();
259 }
260 if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
261 if (connection->policy.throttler_bytes) {
262 const size_t cur_msg_size = get_current_msg_size();
263 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
264 << " bytes to policy throttler "
265 << connection->policy.throttler_bytes->get_current() << "/"
266 << connection->policy.throttler_bytes->get_max() << dendl;
267 connection->policy.throttler_bytes->put(cur_msg_size);
268 }
269 }
270 if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
271 const size_t cur_msg_size = get_current_msg_size();
272 ldout(cct, 10)
273 << __func__ << " releasing " << cur_msg_size
274 << " bytes to dispatch_queue throttler "
275 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
276 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
277 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
278 }
279}
280
281CtPtr ProtocolV2::_fault() {
282 ldout(cct, 10) << __func__ << dendl;
283
284 if (state == CLOSED || state == NONE) {
285 ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
286 return nullptr;
287 }
288
289 if (connection->policy.lossy &&
290 !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
291 ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
292 stop();
293 connection->dispatch_queue->queue_reset(connection);
294 return nullptr;
295 }
296
297 connection->write_lock.lock();
298
299 can_write = false;
300 // requeue sent items
301 requeue_sent();
302
303 if (out_queue.empty() && state >= START_ACCEPT &&
304 state <= SESSION_ACCEPTING && !replacing) {
305 ldout(cct, 2) << __func__ << " with nothing to send and in the half "
306 << " accept state just closed" << dendl;
307 connection->write_lock.unlock();
308 stop();
309 connection->dispatch_queue->queue_reset(connection);
310 return nullptr;
311 }
312
313 replacing = false;
314 connection->fault();
315 reset_recv_state();
316
317 reconnecting = false;
318
319 if (connection->policy.standby && out_queue.empty() && !keepalive &&
320 state != WAIT) {
321 ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
322 << dendl;
323 state = STANDBY;
324 connection->write_lock.unlock();
325 return nullptr;
326 }
327 if (connection->policy.server) {
328 ldout(cct, 1) << __func__ << " server, going to standby, even though i have stuff queued" << dendl;
329 state = STANDBY;
330 connection->write_lock.unlock();
331 return nullptr;
332 }
333
334 connection->write_lock.unlock();
335
336 if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
337 state != WAIT &&
338 state != SESSION_ACCEPTING /* due to connection race */) {
339 // policy maybe empty when state is in accept
340 if (connection->policy.server) {
341 ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
342 state = STANDBY;
343 } else {
344 ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
345 connect_seq++;
346 global_seq = messenger->get_global_seq();
347 state = START_CONNECT;
348 pre_auth.enabled = true;
349 connection->state = AsyncConnection::STATE_CONNECTING;
350 }
351 backoff = utime_t();
352 connection->center->dispatch_event_external(connection->read_handler);
353 } else {
354 if (state == WAIT) {
355 backoff.set_from_double(cct->_conf->ms_max_backoff);
356 } else if (backoff == utime_t()) {
357 backoff.set_from_double(cct->_conf->ms_initial_backoff);
358 } else {
359 backoff += backoff;
360 if (backoff > cct->_conf->ms_max_backoff)
361 backoff.set_from_double(cct->_conf->ms_max_backoff);
362 }
363
364 if (server_cookie) {
365 connect_seq++;
366 }
367
368 global_seq = messenger->get_global_seq();
369 state = START_CONNECT;
370 pre_auth.enabled = true;
371 connection->state = AsyncConnection::STATE_CONNECTING;
372 ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
373 // woke up again;
374 connection->register_time_events.insert(
375 connection->center->create_time_event(backoff.to_nsec() / 1000,
376 connection->wakeup_handler));
377 }
378 return nullptr;
379}
380
381void ProtocolV2::prepare_send_message(uint64_t features,
382 Message *m) {
383 ldout(cct, 20) << __func__ << " m=" << *m << dendl;
384
385 // associate message with Connection (for benefit of encode_payload)
386 if (m->empty_payload()) {
387 ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
388 << " " << *m << dendl;
389 } else {
390 ldout(cct, 20) << __func__ << " half-reencoding features " << features
391 << " " << m << " " << *m << dendl;
392 }
393
394 // encode and copy out of *m
395 m->encode(features, 0);
396}
397
398void ProtocolV2::send_message(Message *m) {
399 uint64_t f = connection->get_features();
400
401 // TODO: Currently not all messages supports reencode like MOSDMap, so here
402 // only let fast dispatch support messages prepare message
403 const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
404 if (can_fast_prepare) {
405 prepare_send_message(f, m);
406 }
407
408 std::lock_guard<std::mutex> l(connection->write_lock);
409 bool is_prepared = can_fast_prepare;
410 // "features" changes will change the payload encoding
411 if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
412 // ensure the correctness of message encoding
413 m->clear_payload();
414 is_prepared = false;
415 ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
416 << " != " << connection->get_features() << dendl;
417 }
418 if (state == CLOSED) {
419 ldout(cct, 10) << __func__ << " connection closed."
420 << " Drop message " << m << dendl;
421 m->put();
422 } else {
423 ldout(cct, 5) << __func__ << " enqueueing message m=" << m
424 << " type=" << m->get_type() << " " << *m << dendl;
425 m->trace.event("async enqueueing message");
426 out_queue[m->get_priority()].emplace_back(
427 out_queue_entry_t{is_prepared, m});
428 ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
429 << dendl;
494da23a
TL
430 if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
431 write_in_progress = true;
11fdf7f2
TL
432 connection->center->dispatch_event_external(connection->write_handler);
433 }
434 }
435}
436
437void ProtocolV2::send_keepalive() {
438 ldout(cct, 10) << __func__ << dendl;
439 std::lock_guard<std::mutex> l(connection->write_lock);
440 if (state != CLOSED) {
441 keepalive = true;
442 connection->center->dispatch_event_external(connection->write_handler);
443 }
444}
445
446void ProtocolV2::read_event() {
447 ldout(cct, 20) << __func__ << dendl;
448
449 switch (state) {
450 case START_CONNECT:
451 run_continuation(CONTINUATION(start_client_banner_exchange));
452 break;
453 case START_ACCEPT:
454 run_continuation(CONTINUATION(start_server_banner_exchange));
455 break;
456 case READY:
457 run_continuation(CONTINUATION(read_frame));
458 break;
459 case THROTTLE_MESSAGE:
460 run_continuation(CONTINUATION(throttle_message));
461 break;
462 case THROTTLE_BYTES:
463 run_continuation(CONTINUATION(throttle_bytes));
464 break;
465 case THROTTLE_DISPATCH_QUEUE:
466 run_continuation(CONTINUATION(throttle_dispatch_queue));
467 break;
468 default:
469 break;
470 }
471}
472
473ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
474 out_queue_entry_t out_entry;
475
476 if (!out_queue.empty()) {
477 auto it = out_queue.rbegin();
478 auto& entries = it->second;
479 ceph_assert(!entries.empty());
480 out_entry = entries.front();
481 entries.pop_front();
482 if (entries.empty()) {
483 out_queue.erase(it->first);
484 }
485 }
486 return out_entry;
487}
488
489ssize_t ProtocolV2::write_message(Message *m, bool more) {
490 FUNCTRACE(cct);
491 ceph_assert(connection->center->in_thread());
492 m->set_seq(++out_seq);
493
494 connection->lock.lock();
495 uint64_t ack_seq = in_seq;
496 ack_left = 0;
497 connection->lock.unlock();
498
499 ceph_msg_header &header = m->get_header();
500 ceph_msg_footer &footer = m->get_footer();
501
502 ceph_msg_header2 header2{header.seq, header.tid,
503 header.type, header.priority,
504 header.version,
eafe8130
TL
505 init_le32(0), header.data_off,
506 init_le64(ack_seq),
11fdf7f2
TL
507 footer.flags, header.compat_version,
508 header.reserved};
509
510 auto message = MessageFrame::Encode(
511 header2,
512 m->get_payload(),
513 m->get_middle(),
514 m->get_data());
515 connection->outcoming_bl.append(message.get_buffer(session_stream_handlers));
516
517 ldout(cct, 5) << __func__ << " sending message m=" << m
518 << " seq=" << m->get_seq() << " " << *m << dendl;
519
520 m->trace.event("async writing message");
521 ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
522 << " src=" << entity_name_t(messenger->get_myname())
523 << " off=" << header2.data_off
524 << dendl;
525 ssize_t total_send_size = connection->outcoming_bl.length();
526 ssize_t rc = connection->_try_send(more);
527 if (rc < 0) {
528 ldout(cct, 1) << __func__ << " error sending " << m << ", "
529 << cpp_strerror(rc) << dendl;
530 } else {
531 connection->logger->inc(
532 l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
533 ldout(cct, 10) << __func__ << " sending " << m
534 << (rc ? " continuely." : " done.") << dendl;
535 }
536 if (m->get_type() == CEPH_MSG_OSD_OP)
537 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
538 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
539 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
540 m->put();
541
542 return rc;
543}
544
545void ProtocolV2::append_keepalive() {
546 ldout(cct, 10) << __func__ << dendl;
547 auto keepalive_frame = KeepAliveFrame::Encode();
548 connection->outcoming_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
549}
550
551void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
552 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
553 connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
554}
555
556void ProtocolV2::handle_message_ack(uint64_t seq) {
557 if (connection->policy.lossy) { // lossy connections don't keep sent messages
558 return;
559 }
560
561 ldout(cct, 15) << __func__ << " seq=" << seq << dendl;
562
563 // trim sent list
564 static const int max_pending = 128;
565 int i = 0;
566 Message *pending[max_pending];
567 connection->write_lock.lock();
568 while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
569 Message *m = sent.front();
570 sent.pop_front();
571 pending[i++] = m;
572 ldout(cct, 10) << __func__ << " got ack seq " << seq
573 << " >= " << m->get_seq() << " on " << m << " " << *m
574 << dendl;
575 }
576 connection->write_lock.unlock();
577 for (int k = 0; k < i; k++) {
578 pending[k]->put();
579 }
580}
581
582void ProtocolV2::write_event() {
583 ldout(cct, 10) << __func__ << dendl;
584 ssize_t r = 0;
585
586 connection->write_lock.lock();
587 if (can_write) {
588 if (keepalive) {
589 append_keepalive();
590 keepalive = false;
591 }
592
593 auto start = ceph::mono_clock::now();
594 bool more;
595 do {
596 const auto out_entry = _get_next_outgoing();
597 if (!out_entry.m) {
598 break;
599 }
600
601 if (!connection->policy.lossy) {
602 // put on sent list
603 sent.push_back(out_entry.m);
604 out_entry.m->get();
605 }
606 more = !out_queue.empty();
607 connection->write_lock.unlock();
608
609 // send_message or requeue messages may not encode message
610 if (!out_entry.is_prepared) {
611 prepare_send_message(connection->get_features(), out_entry.m);
612 }
613
614 r = write_message(out_entry.m, more);
615
616 connection->write_lock.lock();
617 if (r == 0) {
618 ;
619 } else if (r < 0) {
620 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
621 break;
622 } else if (r > 0)
623 break;
624 } while (can_write);
494da23a 625 write_in_progress = false;
11fdf7f2
TL
626
627 // if r > 0 mean data still lefted, so no need _try_send.
628 if (r == 0) {
629 uint64_t left = ack_left;
630 if (left) {
631 ceph_le64 s;
632 s = in_seq;
633 auto ack = AckFrame::Encode(in_seq);
634 connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers));
635 ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
636 << " messages" << dendl;
637 ack_left -= left;
638 left = ack_left;
639 r = connection->_try_send(left);
640 } else if (is_queued()) {
641 r = connection->_try_send();
642 }
643 }
644 connection->write_lock.unlock();
645
646 connection->logger->tinc(l_msgr_running_send_time,
647 ceph::mono_clock::now() - start);
648 if (r < 0) {
649 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
650 connection->lock.lock();
651 fault();
652 connection->lock.unlock();
653 return;
654 }
655 } else {
494da23a 656 write_in_progress = false;
11fdf7f2
TL
657 connection->write_lock.unlock();
658 connection->lock.lock();
659 connection->write_lock.lock();
660 if (state == STANDBY && !connection->policy.server && is_queued()) {
661 ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
662 if (server_cookie) { // only increment connect_seq if there is a session
663 connect_seq++;
664 }
665 connection->_connect();
666 } else if (connection->cs && state != NONE && state != CLOSED &&
667 state != START_CONNECT) {
668 r = connection->_try_send();
669 if (r < 0) {
670 ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
671 connection->write_lock.unlock();
672 fault();
673 connection->lock.unlock();
674 return;
675 }
676 }
677 connection->write_lock.unlock();
678 connection->lock.unlock();
679 }
680}
681
682bool ProtocolV2::is_queued() {
683 return !out_queue.empty() || connection->is_queued();
684}
685
686uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
687 if (session_stream_handlers.rx) {
688 return segment_onwire_size(logical_size);
689 } else {
690 return logical_size;
691 }
692}
693
694uint32_t ProtocolV2::get_epilogue_size() const {
695 // In secure mode size of epilogue is flexible and depends on particular
696 // cipher implementation. See the comment for epilogue_secure_block_t or
697 // epilogue_plain_block_t.
698 if (session_stream_handlers.rx) {
699 return FRAME_SECURE_EPILOGUE_SIZE + \
700 session_stream_handlers.rx->get_extra_size_at_final();
701 } else {
702 return FRAME_PLAIN_EPILOGUE_SIZE;
703 }
704}
705
706CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
707 rx_buffer_t &&buffer) {
708 const auto len = buffer->length();
709 const auto buf = buffer->c_str();
710 next.node = std::move(buffer);
711 ssize_t r = connection->read(len, buf,
712 [&next, this](char *buffer, int r) {
713 if (unlikely(pre_auth.enabled) && r >= 0) {
714 pre_auth.rxbuf.append(*next.node);
715 ceph_assert(!cct->_conf->ms_die_on_bug ||
716 pre_auth.rxbuf.length() < 1000000);
717 }
718 next.r = r;
719 run_continuation(next);
720 });
721 if (r <= 0) {
722 // error or done synchronously
723 if (unlikely(pre_auth.enabled) && r >= 0) {
724 pre_auth.rxbuf.append(*next.node);
725 ceph_assert(!cct->_conf->ms_die_on_bug ||
726 pre_auth.rxbuf.length() < 1000000);
727 }
728 next.r = r;
729 return &next;
730 }
731
732 return nullptr;
733}
734
735template <class F>
736CtPtr ProtocolV2::write(const std::string &desc,
737 CONTINUATION_TYPE<ProtocolV2> &next,
738 F &frame) {
739 ceph::bufferlist bl = frame.get_buffer(session_stream_handlers);
740 return write(desc, next, bl);
741}
742
743CtPtr ProtocolV2::write(const std::string &desc,
744 CONTINUATION_TYPE<ProtocolV2> &next,
745 bufferlist &buffer) {
746 if (unlikely(pre_auth.enabled)) {
747 pre_auth.txbuf.append(buffer);
748 ceph_assert(!cct->_conf->ms_die_on_bug ||
749 pre_auth.txbuf.length() < 1000000);
750 }
751
752 ssize_t r =
753 connection->write(buffer, [&next, desc, this](int r) {
754 if (r < 0) {
755 ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
756 << " (" << cpp_strerror(r) << ")" << dendl;
757 connection->inject_delay();
758 _fault();
759 }
760 run_continuation(next);
761 });
762
763 if (r < 0) {
764 ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
765 << " (" << cpp_strerror(r) << ")" << dendl;
766 return _fault();
767 } else if (r == 0) {
768 next.setParams();
769 return &next;
770 }
771
772 return nullptr;
773}
774
775CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
776 ldout(cct, 20) << __func__ << dendl;
777 bannerExchangeCallback = &callback;
778
779 bufferlist banner_payload;
780 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
781 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
782
783 bufferlist bl;
784 bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
785 encode((uint16_t)banner_payload.length(), bl, 0);
786 bl.claim_append(banner_payload);
787
788 INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);
789
790 return WRITE(bl, "banner", _wait_for_peer_banner);
791}
792
793CtPtr ProtocolV2::_wait_for_peer_banner() {
794 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
795 return READ(banner_len, _handle_peer_banner);
796}
797
798CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
799 ldout(cct, 20) << __func__ << " r=" << r << dendl;
800
801 if (r < 0) {
802 ldout(cct, 1) << __func__ << " read peer banner failed r=" << r << " ("
803 << cpp_strerror(r) << ")" << dendl;
804 return _fault();
805 }
806
807 unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
808
809 if (memcmp(buffer->c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
810 if (memcmp(buffer->c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
811 lderr(cct) << __func__ << " peer " << *connection->peer_addrs
812 << " is using msgr V1 protocol" << dendl;
813 return _fault();
814 }
815 ldout(cct, 1) << __func__ << " accept peer sent bad banner" << dendl;
816 return _fault();
817 }
818
819 uint16_t payload_len;
820 bufferlist bl;
821 buffer->set_offset(banner_prefix_len);
822 buffer->set_length(sizeof(__le16));
823 bl.push_back(std::move(buffer));
824 auto ti = bl.cbegin();
825 try {
826 decode(payload_len, ti);
827 } catch (const buffer::error &e) {
828 lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
829 return _fault();
830 }
831
832 INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);
833
834 return READ(payload_len, _handle_peer_banner_payload);
835}
836
837CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
838 ldout(cct, 20) << __func__ << " r=" << r << dendl;
839
840 if (r < 0) {
841 ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
842 << " (" << cpp_strerror(r) << ")" << dendl;
843 return _fault();
844 }
845
846 uint64_t peer_supported_features;
847 uint64_t peer_required_features;
848
849 bufferlist bl;
850 bl.push_back(std::move(buffer));
851 auto ti = bl.cbegin();
852 try {
853 decode(peer_supported_features, ti);
854 decode(peer_required_features, ti);
855 } catch (const buffer::error &e) {
856 lderr(cct) << __func__ << " decode banner payload failed " << dendl;
857 return _fault();
858 }
859
860 ldout(cct, 1) << __func__ << " supported=" << std::hex
861 << peer_supported_features << " required=" << std::hex
862 << peer_required_features << std::dec << dendl;
863
864 // Check feature bit compatibility
865
866 uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
867 uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
868
869 if ((required_features & peer_supported_features) != required_features) {
870 ldout(cct, 1) << __func__ << " peer does not support all required features"
871 << " required=" << std::hex << required_features
872 << " supported=" << std::hex << peer_supported_features
873 << std::dec << dendl;
874 stop();
875 connection->dispatch_queue->queue_reset(connection);
876 return nullptr;
877 }
878 if ((supported_features & peer_required_features) != peer_required_features) {
879 ldout(cct, 1) << __func__ << " we do not support all peer required features"
880 << " required=" << std::hex << peer_required_features
881 << " supported=" << supported_features << std::dec << dendl;
882 stop();
883 connection->dispatch_queue->queue_reset(connection);
884 return nullptr;
885 }
886
887 this->peer_required_features = peer_required_features;
888 if (this->peer_required_features == 0) {
889 this->connection_features = msgr2_required;
890 }
891
892 // at this point we can change how the client protocol behaves based on
893 // this->peer_required_features
894
895 if (state == BANNER_CONNECTING) {
896 state = HELLO_CONNECTING;
897 }
898 else {
899 ceph_assert(state == BANNER_ACCEPTING);
900 state = HELLO_ACCEPTING;
901 }
902
903 auto hello = HelloFrame::Encode(messenger->get_mytype(),
904 connection->target_addr);
905
906 INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);
907
908 return WRITE(hello, "hello frame", read_frame);
909}
910
911CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
912{
913 ldout(cct, 20) << __func__
914 << " payload.length()=" << payload.length() << dendl;
915
916 if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
917 lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
918 return _fault();
919 }
920
921 auto hello = HelloFrame::Decode(payload);
922
923 ldout(cct, 5) << __func__ << " received hello:"
924 << " peer_type=" << (int)hello.entity_type()
925 << " peer_addr_for_me=" << hello.peer_addr() << dendl;
926
81eedcae
TL
927 sockaddr_storage ss;
928 socklen_t len = sizeof(ss);
929 getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
930 ldout(cct, 5) << __func__ << " getsockname says I am " << (sockaddr *)&ss
931 << " when talking to " << connection->target_addr << dendl;
932
11fdf7f2
TL
933 if (connection->get_peer_type() == -1) {
934 connection->set_peer_type(hello.entity_type());
935
936 ceph_assert(state == HELLO_ACCEPTING);
937 connection->policy = messenger->get_policy(hello.entity_type());
938 ldout(cct, 10) << __func__ << " accept of host_type "
939 << (int)hello.entity_type()
940 << ", policy.lossy=" << connection->policy.lossy
941 << " policy.server=" << connection->policy.server
942 << " policy.standby=" << connection->policy.standby
943 << " policy.resetcheck=" << connection->policy.resetcheck
944 << dendl;
945 } else {
946 ceph_assert(state == HELLO_CONNECTING);
947 if (connection->get_peer_type() != hello.entity_type()) {
948 ldout(cct, 1) << __func__ << " connection peer type does not match what"
949 << " peer advertises " << connection->get_peer_type()
950 << " != " << (int)hello.entity_type() << dendl;
951 stop();
952 connection->dispatch_queue->queue_reset(connection);
953 return nullptr;
954 }
955 }
956
81eedcae
TL
957 if (messenger->get_myaddrs().empty() ||
958 messenger->get_myaddrs().front().is_blank_ip()) {
959 entity_addr_t a;
960 if (cct->_conf->ms_learn_addr_from_peer) {
961 ldout(cct, 1) << __func__ << " peer " << connection->target_addr
962 << " says I am " << hello.peer_addr() << " (socket says "
963 << (sockaddr*)&ss << ")" << dendl;
964 a = hello.peer_addr();
965 } else {
966 ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
967 << " says I am " << (sockaddr*)&ss
968 << " (peer says " << hello.peer_addr() << ")" << dendl;
969 a.set_sockaddr((sockaddr *)&ss);
970 }
971 a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
972 a.set_port(0);
973 connection->lock.unlock();
974 messenger->learned_addr(a);
975 if (cct->_conf->ms_inject_internal_delays &&
976 cct->_conf->ms_inject_socket_failures) {
977 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
978 ldout(cct, 10) << __func__ << " sleep for "
979 << cct->_conf->ms_inject_internal_delays << dendl;
980 utime_t t;
981 t.set_from_double(cct->_conf->ms_inject_internal_delays);
982 t.sleep();
983 }
984 }
985 connection->lock.lock();
986 if (state != HELLO_CONNECTING) {
987 ldout(cct, 1) << __func__
988 << " state changed while learned_addr, mark_down or "
989 << " replacing must be happened just now" << dendl;
990 return nullptr;
991 }
992 }
993
994
995
11fdf7f2
TL
996 CtPtr callback;
997 callback = bannerExchangeCallback;
998 bannerExchangeCallback = nullptr;
999 ceph_assert(callback);
1000 return callback;
1001}
1002
1003CtPtr ProtocolV2::read_frame() {
1004 if (state == CLOSED) {
1005 return nullptr;
1006 }
1007
1008 ldout(cct, 20) << __func__ << dendl;
1009 return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
1010}
1011
1012CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
1013 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1014
1015 if (r < 0) {
1016 ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
1017 << " (" << cpp_strerror(r) << ")" << dendl;
1018 return _fault();
1019 }
1020
1021 ceph::bufferlist preamble;
1022 preamble.push_back(std::move(buffer));
1023
1024 ldout(cct, 30) << __func__ << " preamble\n";
1025 preamble.hexdump(*_dout);
1026 *_dout << dendl;
1027
1028 if (session_stream_handlers.rx) {
1029 ceph_assert(session_stream_handlers.rx);
1030
1031 session_stream_handlers.rx->reset_rx_handler();
1032 preamble = session_stream_handlers.rx->authenticated_decrypt_update(
1033 std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
1034
1035 ldout(cct, 10) << __func__ << " got encrypted preamble."
1036 << " after decrypt premable.length()=" << preamble.length()
1037 << dendl;
1038
1039 ldout(cct, 30) << __func__ << " preamble after decrypt\n";
1040 preamble.hexdump(*_dout);
1041 *_dout << dendl;
1042 }
1043
1044 {
1045 // I expect ceph_le32 will make the endian conversion for me. Passing
1046 // everything through ::Decode is unnecessary.
1047 const auto& main_preamble = \
1048 reinterpret_cast<preamble_block_t&>(*preamble.c_str());
1049
1050 // verify preamble's CRC before any further processing
1051 const auto rx_crc = ceph_crc32c(0,
1052 reinterpret_cast<const unsigned char*>(&main_preamble),
1053 sizeof(main_preamble) - sizeof(main_preamble.crc));
1054 if (rx_crc != main_preamble.crc) {
1055 ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
1056 << " rx_crc=" << rx_crc
1057 << " tx_crc=" << main_preamble.crc << dendl;
1058 return _fault();
1059 }
1060
1061 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
1062 if (main_preamble.num_segments < 1 ||
1063 main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1064 ldout(cct, 10) << __func__ << " unsupported num_segments="
1065 << " tx_crc=" << main_preamble.num_segments << dendl;
1066 return _fault();
1067 }
1068
1069 next_tag = static_cast<Tag>(main_preamble.tag);
1070
1071 rx_segments_desc.clear();
1072 rx_segments_data.clear();
1073
1074 if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1075 ldout(cct, 30) << __func__
1076 << " num_segments=" << main_preamble.num_segments
1077 << " is too much" << dendl;
1078 return _fault();
1079 }
1080 for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
1081 ldout(cct, 10) << __func__ << " got new segment:"
1082 << " len=" << main_preamble.segments[idx].length
1083 << " align=" << main_preamble.segments[idx].alignment
1084 << dendl;
1085 rx_segments_desc.emplace_back(main_preamble.segments[idx]);
1086 }
1087 }
1088
1089 // does it need throttle?
1090 if (next_tag == Tag::MESSAGE) {
1091 if (state != READY) {
1092 lderr(cct) << __func__ << " not in ready state!" << dendl;
1093 return _fault();
1094 }
1095 state = THROTTLE_MESSAGE;
1096 return CONTINUE(throttle_message);
1097 } else {
1098 return read_frame_segment();
1099 }
1100}
1101
1102CtPtr ProtocolV2::handle_read_frame_dispatch() {
1103 ldout(cct, 10) << __func__
1104 << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
1105
1106 switch (next_tag) {
1107 case Tag::HELLO:
1108 case Tag::AUTH_REQUEST:
1109 case Tag::AUTH_BAD_METHOD:
1110 case Tag::AUTH_REPLY_MORE:
1111 case Tag::AUTH_REQUEST_MORE:
1112 case Tag::AUTH_DONE:
1113 case Tag::AUTH_SIGNATURE:
1114 case Tag::CLIENT_IDENT:
1115 case Tag::SERVER_IDENT:
1116 case Tag::IDENT_MISSING_FEATURES:
1117 case Tag::SESSION_RECONNECT:
1118 case Tag::SESSION_RESET:
1119 case Tag::SESSION_RETRY:
1120 case Tag::SESSION_RETRY_GLOBAL:
1121 case Tag::SESSION_RECONNECT_OK:
1122 case Tag::KEEPALIVE2:
1123 case Tag::KEEPALIVE2_ACK:
1124 case Tag::ACK:
1125 case Tag::WAIT:
1126 return handle_frame_payload();
1127 case Tag::MESSAGE:
1128 return handle_message();
1129 default: {
1130 lderr(cct) << __func__
1131 << " received unknown tag=" << static_cast<uint32_t>(next_tag)
1132 << dendl;
1133 return _fault();
1134 }
1135 }
1136
1137 return nullptr;
1138}
1139
1140CtPtr ProtocolV2::read_frame_segment() {
1141 ldout(cct, 20) << __func__ << dendl;
1142 ceph_assert(!rx_segments_desc.empty());
1143
1144 // description of current segment to read
1145 const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
1146 rx_buffer_t rx_buffer;
1147 try {
1148 rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
1149 get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
1150 } catch (std::bad_alloc&) {
1151 // Catching because of potential issues with satisfying alignment.
1152 ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
1153 << " len=" << get_onwire_size(cur_rx_desc.length)
1154 << " align=" << cur_rx_desc.alignment
1155 << dendl;
1156 return _fault();
1157 }
1158
1159 return READ_RXBUF(std::move(rx_buffer), handle_read_frame_segment);
1160}
1161
1162CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
1163 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1164
1165 if (r < 0) {
1166 ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
1167 << cpp_strerror(r) << ")" << dendl;
1168 return _fault();
1169 }
1170
1171 rx_segments_data.emplace_back();
1172 rx_segments_data.back().push_back(std::move(rx_buffer));
1173
1174 // decrypt incoming data
1175 // FIXME: if (auth_meta->is_mode_secure()) {
1176 if (session_stream_handlers.rx) {
1177 ceph_assert(session_stream_handlers.rx);
1178
1179 auto& new_seg = rx_segments_data.back();
1180 if (new_seg.length()) {
1181 auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
1182 std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
1183 const auto idx = rx_segments_data.size() - 1;
1184 new_seg.clear();
1185 padded.splice(0, rx_segments_desc[idx].length, &new_seg);
1186
1187 ldout(cct, 20) << __func__
1188 << " unpadded new_seg.length()=" << new_seg.length()
1189 << dendl;
1190 }
1191 }
1192
1193 if (rx_segments_desc.size() == rx_segments_data.size()) {
1194 // OK, all segments planned to read are read. Can go with epilogue.
1195 return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
1196 } else {
1197 // TODO: for makeshift only. This will be more generic and throttled
1198 return read_frame_segment();
1199 }
1200}
1201
1202CtPtr ProtocolV2::handle_frame_payload() {
1203 ceph_assert(!rx_segments_data.empty());
1204 auto& payload = rx_segments_data.back();
1205
1206 ldout(cct, 30) << __func__ << "\n";
1207 payload.hexdump(*_dout);
1208 *_dout << dendl;
1209
1210 switch (next_tag) {
1211 case Tag::HELLO:
1212 return handle_hello(payload);
1213 case Tag::AUTH_REQUEST:
1214 return handle_auth_request(payload);
1215 case Tag::AUTH_BAD_METHOD:
1216 return handle_auth_bad_method(payload);
1217 case Tag::AUTH_REPLY_MORE:
1218 return handle_auth_reply_more(payload);
1219 case Tag::AUTH_REQUEST_MORE:
1220 return handle_auth_request_more(payload);
1221 case Tag::AUTH_DONE:
1222 return handle_auth_done(payload);
1223 case Tag::AUTH_SIGNATURE:
1224 return handle_auth_signature(payload);
1225 case Tag::CLIENT_IDENT:
1226 return handle_client_ident(payload);
1227 case Tag::SERVER_IDENT:
1228 return handle_server_ident(payload);
1229 case Tag::IDENT_MISSING_FEATURES:
1230 return handle_ident_missing_features(payload);
1231 case Tag::SESSION_RECONNECT:
1232 return handle_reconnect(payload);
1233 case Tag::SESSION_RESET:
1234 return handle_session_reset(payload);
1235 case Tag::SESSION_RETRY:
1236 return handle_session_retry(payload);
1237 case Tag::SESSION_RETRY_GLOBAL:
1238 return handle_session_retry_global(payload);
1239 case Tag::SESSION_RECONNECT_OK:
1240 return handle_reconnect_ok(payload);
1241 case Tag::KEEPALIVE2:
1242 return handle_keepalive2(payload);
1243 case Tag::KEEPALIVE2_ACK:
1244 return handle_keepalive2_ack(payload);
1245 case Tag::ACK:
1246 return handle_message_ack(payload);
1247 case Tag::WAIT:
1248 return handle_wait(payload);
1249 default:
1250 ceph_abort();
1251 }
1252 return nullptr;
1253}
1254
1255CtPtr ProtocolV2::ready() {
1256 ldout(cct, 25) << __func__ << dendl;
1257
1258 reconnecting = false;
1259 replacing = false;
1260
1261 // make sure no pending tick timer
1262 if (connection->last_tick_id) {
1263 connection->center->delete_time_event(connection->last_tick_id);
1264 }
1265 connection->last_tick_id = connection->center->create_time_event(
1266 connection->inactive_timeout_us, connection->tick_handler);
1267
1268 {
1269 std::lock_guard<std::mutex> l(connection->write_lock);
1270 can_write = true;
1271 if (!out_queue.empty()) {
1272 connection->center->dispatch_event_external(connection->write_handler);
1273 }
1274 }
1275
1276 connection->maybe_start_delay_thread();
1277
1278 state = READY;
1279 ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie="
1280 << std::hex << client_cookie << " server_cookie="
1281 << server_cookie << std::dec << " in_seq=" << in_seq
1282 << " out_seq=" << out_seq << dendl;
1283
1284 INTERCEPT(15);
1285
1286 return CONTINUE(read_frame);
1287}
1288
1289CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
1290{
1291 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1292
1293 if (r < 0) {
1294 ldout(cct, 1) << __func__ << " read data error " << dendl;
1295 return _fault();
1296 }
1297
1298 __u8 late_flags;
1299
1300 // FIXME: if (auth_meta->is_mode_secure()) {
1301 if (session_stream_handlers.rx) {
1302 ldout(cct, 1) << __func__ << " read frame epilogue bytes="
1303 << get_epilogue_size() << dendl;
1304
1305 // decrypt epilogue and authenticate entire frame.
1306 ceph::bufferlist epilogue_bl;
1307 {
1308 epilogue_bl.push_back(std::move(buffer));
1309 try {
1310 epilogue_bl =
1311 session_stream_handlers.rx->authenticated_decrypt_update_final(
1312 std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
1313 } catch (ceph::crypto::onwire::MsgAuthError &e) {
1314 ldout(cct, 5) << __func__ << " message authentication failed: "
1315 << e.what() << dendl;
1316 return _fault();
1317 }
1318 }
1319 auto& epilogue =
1320 reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
1321 late_flags = epilogue.late_flags;
1322 } else {
1323 auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
1324
1325 for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
1326 const __u32 expected_crc = epilogue.crc_values[idx];
1327 const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
1328 if (expected_crc != calculated_crc) {
1329 ldout(cct, 5) << __func__ << " message integrity check failed: "
1330 << " expected_crc=" << expected_crc
1331 << " calculated_crc=" << calculated_crc
1332 << dendl;
1333 return _fault();
1334 } else {
1335 ldout(cct, 20) << __func__ << " message integrity check success: "
1336 << " expected_crc=" << expected_crc
1337 << " calculated_crc=" << calculated_crc
1338 << dendl;
1339 }
1340 }
1341 late_flags = epilogue.late_flags;
1342 }
1343
1344 // we do have a mechanism that allows transmitter to start sending message
1345 // and abort after putting entire data field on wire. This will be used by
1346 // the kernel client to avoid unnecessary buffering.
1347 if (late_flags & FRAME_FLAGS_LATEABRT) {
1348 reset_throttle();
1349 state = READY;
1350 return CONTINUE(read_frame);
1351 } else {
1352 return handle_read_frame_dispatch();
1353 }
1354}
1355
1356CtPtr ProtocolV2::handle_message() {
1357 ldout(cct, 20) << __func__ << dendl;
1358 ceph_assert(state == THROTTLE_DONE);
1359
1360#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1361 ltt_recv_stamp = ceph_clock_now();
1362#endif
1363 recv_stamp = ceph_clock_now();
1364
1365 // we need to get the size before std::moving segments data
1366 const size_t cur_msg_size = get_current_msg_size();
1367 auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
1368
1369 // XXX: paranoid copy just to avoid oops
1370 ceph_msg_header2 current_header = msg_frame.header();
1371
1372 ldout(cct, 5) << __func__
1373 << " got " << msg_frame.front_len()
1374 << " + " << msg_frame.middle_len()
1375 << " + " << msg_frame.data_len()
1376 << " byte message."
1377 << " envelope type=" << current_header.type
1378 << " src " << peer_name
1379 << " off " << current_header.data_off
1380 << dendl;
1381
1382 INTERCEPT(16);
1383 ceph_msg_header header{current_header.seq,
1384 current_header.tid,
1385 current_header.type,
1386 current_header.priority,
1387 current_header.version,
eafe8130
TL
1388 init_le32(msg_frame.front_len()),
1389 init_le32(msg_frame.middle_len()),
1390 init_le32(msg_frame.data_len()),
11fdf7f2
TL
1391 current_header.data_off,
1392 peer_name,
1393 current_header.compat_version,
1394 current_header.reserved,
eafe8130
TL
1395 init_le32(0)};
1396 ceph_msg_footer footer{init_le32(0), init_le32(0),
1397 init_le32(0), init_le64(0), current_header.flags};
11fdf7f2
TL
1398
1399 Message *message = decode_message(cct, 0, header, footer,
1400 msg_frame.front(),
1401 msg_frame.middle(),
1402 msg_frame.data(),
1403 connection);
1404 if (!message) {
1405 ldout(cct, 1) << __func__ << " decode message failed " << dendl;
1406 return _fault();
1407 } else {
1408 state = READ_MESSAGE_COMPLETE;
1409 }
1410
1411 INTERCEPT(17);
1412
1413 message->set_byte_throttler(connection->policy.throttler_bytes);
1414 message->set_message_throttler(connection->policy.throttler_messages);
1415
1416 // store reservation size in message, so we don't get confused
1417 // by messages entering the dispatch queue through other paths.
1418 message->set_dispatch_throttle_size(cur_msg_size);
1419
1420 message->set_recv_stamp(recv_stamp);
1421 message->set_throttle_stamp(throttle_stamp);
1422 message->set_recv_complete_stamp(ceph_clock_now());
1423
1424 // check received seq#. if it is old, drop the message.
1425 // note that incoming messages may skip ahead. this is convenient for the
1426 // client side queueing because messages can't be renumbered, but the (kernel)
1427 // client will occasionally pull a message out of the sent queue to send
1428 // elsewhere. in that case it doesn't matter if we "got" it or not.
1429 uint64_t cur_seq = in_seq;
1430 if (message->get_seq() <= cur_seq) {
1431 ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
1432 << " <= " << cur_seq << " " << message << " " << *message
1433 << ", discarding" << dendl;
1434 message->put();
1435 if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
1436 cct->_conf->ms_die_on_old_message) {
1437 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1438 }
1439 return nullptr;
1440 }
1441 if (message->get_seq() > cur_seq + 1) {
1442 ldout(cct, 0) << __func__ << " missed message? skipped from seq "
1443 << cur_seq << " to " << message->get_seq() << dendl;
1444 if (cct->_conf->ms_die_on_skipped_message) {
1445 ceph_assert(0 == "skipped incoming seq");
1446 }
1447 }
1448
11fdf7f2
TL
1449#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1450 if (message->get_type() == CEPH_MSG_OSD_OP ||
1451 message->get_type() == CEPH_MSG_OSD_OPREPLY) {
1452 utime_t ltt_processed_stamp = ceph_clock_now();
1453 double usecs_elapsed =
1454 (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
1455 ostringstream buf;
1456 if (message->get_type() == CEPH_MSG_OSD_OP)
1457 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
1458 false);
1459 else
1460 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
1461 false);
1462 }
1463#endif
1464
1465 // note last received message.
1466 in_seq = message->get_seq();
1467 ldout(cct, 5) << __func__ << " received message m=" << message
1468 << " seq=" << message->get_seq()
1469 << " from=" << message->get_source() << " type=" << header.type
1470 << " " << *message << dendl;
1471
1472 bool need_dispatch_writer = false;
1473 if (!connection->policy.lossy) {
1474 ack_left++;
1475 need_dispatch_writer = true;
1476 }
1477
1478 state = READY;
1479
1480 connection->logger->inc(l_msgr_recv_messages);
1481 connection->logger->inc(
1482 l_msgr_recv_bytes,
1483 cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
1484
1485 messenger->ms_fast_preprocess(message);
1486 auto fast_dispatch_time = ceph::mono_clock::now();
1487 connection->logger->tinc(l_msgr_running_recv_time,
1488 fast_dispatch_time - connection->recv_start_time);
1489 if (connection->delay_state) {
1490 double delay_period = 0;
1491 if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1492 delay_period =
1493 cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1494 ldout(cct, 1) << "queue_received will delay after "
1495 << (ceph_clock_now() + delay_period) << " on " << message
1496 << " " << *message << dendl;
1497 }
1498 connection->delay_state->queue(delay_period, message);
1499 } else if (messenger->ms_can_fast_dispatch(message)) {
1500 connection->lock.unlock();
1501 connection->dispatch_queue->fast_dispatch(message);
1502 connection->recv_start_time = ceph::mono_clock::now();
1503 connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1504 connection->recv_start_time - fast_dispatch_time);
1505 connection->lock.lock();
1506 } else {
1507 connection->dispatch_queue->enqueue(message, message->get_priority(),
1508 connection->conn_id);
1509 }
1510
1511 handle_message_ack(current_header.ack_seq);
1512
1513 // we might have been reused by another connection
1514 // let's check if that is the case
1515 if (state != READY) {
1516 // yes, that was the case, let's do nothing
1517 return nullptr;
1518 }
1519
1520 if (need_dispatch_writer && connection->is_connected()) {
1521 connection->center->dispatch_event_external(connection->write_handler);
1522 }
1523
1524 return CONTINUE(read_frame);
1525}
1526
1527
1528CtPtr ProtocolV2::throttle_message() {
1529 ldout(cct, 20) << __func__ << dendl;
1530
1531 if (connection->policy.throttler_messages) {
1532 ldout(cct, 10) << __func__ << " wants " << 1
1533 << " message from policy throttler "
1534 << connection->policy.throttler_messages->get_current()
1535 << "/" << connection->policy.throttler_messages->get_max()
1536 << dendl;
1537 if (!connection->policy.throttler_messages->get_or_fail()) {
1538 ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
1539 << connection->policy.throttler_messages->get_current()
1540 << "/" << connection->policy.throttler_messages->get_max()
1541 << " failed, just wait." << dendl;
1542 // following thread pool deal with th full message queue isn't a
1543 // short time, so we can wait a ms.
1544 if (connection->register_time_events.empty()) {
1545 connection->register_time_events.insert(
1546 connection->center->create_time_event(1000,
1547 connection->wakeup_handler));
1548 }
1549 return nullptr;
1550 }
1551 }
1552
1553 state = THROTTLE_BYTES;
1554 return CONTINUE(throttle_bytes);
1555}
1556
1557CtPtr ProtocolV2::throttle_bytes() {
1558 ldout(cct, 20) << __func__ << dendl;
1559
1560 const size_t cur_msg_size = get_current_msg_size();
1561 if (cur_msg_size) {
1562 if (connection->policy.throttler_bytes) {
1563 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1564 << " bytes from policy throttler "
1565 << connection->policy.throttler_bytes->get_current() << "/"
1566 << connection->policy.throttler_bytes->get_max() << dendl;
1567 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
1568 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1569 << " bytes from policy throttler "
1570 << connection->policy.throttler_bytes->get_current()
1571 << "/" << connection->policy.throttler_bytes->get_max()
1572 << " failed, just wait." << dendl;
1573 // following thread pool deal with th full message queue isn't a
1574 // short time, so we can wait a ms.
1575 if (connection->register_time_events.empty()) {
1576 connection->register_time_events.insert(
1577 connection->center->create_time_event(
1578 1000, connection->wakeup_handler));
1579 }
1580 return nullptr;
1581 }
1582 }
1583 }
1584
1585 state = THROTTLE_DISPATCH_QUEUE;
1586 return CONTINUE(throttle_dispatch_queue);
1587}
1588
1589CtPtr ProtocolV2::throttle_dispatch_queue() {
1590 ldout(cct, 20) << __func__ << dendl;
1591
1592 const size_t cur_msg_size = get_current_msg_size();
1593 if (cur_msg_size) {
1594 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
1595 cur_msg_size)) {
1596 ldout(cct, 10)
1597 << __func__ << " wants " << cur_msg_size
1598 << " bytes from dispatch throttle "
1599 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1600 << connection->dispatch_queue->dispatch_throttler.get_max()
1601 << " failed, just wait." << dendl;
1602 // following thread pool deal with th full message queue isn't a
1603 // short time, so we can wait a ms.
1604 if (connection->register_time_events.empty()) {
1605 connection->register_time_events.insert(
1606 connection->center->create_time_event(1000,
1607 connection->wakeup_handler));
1608 }
1609 return nullptr;
1610 }
1611 }
1612
1613 throttle_stamp = ceph_clock_now();
1614 state = THROTTLE_DONE;
1615
1616 return read_frame_segment();
1617}
1618
1619CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
1620{
1621 ldout(cct, 20) << __func__
1622 << " payload.length()=" << payload.length() << dendl;
1623
1624 if (state != READY) {
1625 lderr(cct) << __func__ << " not in ready state!" << dendl;
1626 return _fault();
1627 }
1628
1629 auto keepalive_frame = KeepAliveFrame::Decode(payload);
1630
1631 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
1632
1633 connection->write_lock.lock();
1634 append_keepalive_ack(keepalive_frame.timestamp());
1635 connection->write_lock.unlock();
1636
1637 ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
1638 << keepalive_frame.timestamp() << dendl;
1639 connection->set_last_keepalive(ceph_clock_now());
1640
1641 if (is_connected()) {
1642 connection->center->dispatch_event_external(connection->write_handler);
1643 }
1644
1645 return CONTINUE(read_frame);
1646}
1647
1648CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
1649{
1650 ldout(cct, 20) << __func__
1651 << " payload.length()=" << payload.length() << dendl;
1652
1653 if (state != READY) {
1654 lderr(cct) << __func__ << " not in ready state!" << dendl;
1655 return _fault();
1656 }
1657
1658 auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload);
1659 connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
1660 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
1661
1662 return CONTINUE(read_frame);
1663}
1664
1665CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
1666{
1667 ldout(cct, 20) << __func__
1668 << " payload.length()=" << payload.length() << dendl;
1669
1670 if (state != READY) {
1671 lderr(cct) << __func__ << " not in ready state!" << dendl;
1672 return _fault();
1673 }
1674
1675 auto ack = AckFrame::Decode(payload);
1676 handle_message_ack(ack.seq());
1677 return CONTINUE(read_frame);
1678}
1679
1680/* Client Protocol Methods */
1681
1682CtPtr ProtocolV2::start_client_banner_exchange() {
1683 ldout(cct, 20) << __func__ << dendl;
1684
1685 INTERCEPT(1);
1686
1687 state = BANNER_CONNECTING;
1688
1689 global_seq = messenger->get_global_seq();
1690
1691 return _banner_exchange(CONTINUATION(post_client_banner_exchange));
1692}
1693
1694CtPtr ProtocolV2::post_client_banner_exchange() {
1695 ldout(cct, 20) << __func__ << dendl;
1696
1697 state = AUTH_CONNECTING;
1698
1699 return send_auth_request();
1700}
1701
1702CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
1703 ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
1704 << " auth_client " << messenger->auth_client << dendl;
1705 ceph_assert(messenger->auth_client);
1706
1707 bufferlist bl;
1708 vector<uint32_t> preferred_modes;
1709 auto am = auth_meta;
1710 connection->lock.unlock();
1711 int r = messenger->auth_client->get_auth_request(
1712 connection, am.get(),
1713 &am->auth_method, &preferred_modes, &bl);
1714 connection->lock.lock();
1715 if (state != AUTH_CONNECTING) {
1716 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1717 return _fault();
1718 }
1719 if (r < 0) {
1720 ldout(cct, 0) << __func__ << " get_initial_auth_request returned " << r
1721 << dendl;
1722 stop();
1723 connection->dispatch_queue->queue_reset(connection);
1724 return nullptr;
1725 }
1726
1727 INTERCEPT(9);
1728
1729 auto frame = AuthRequestFrame::Encode(auth_meta->auth_method, preferred_modes,
1730 bl);
1731 return WRITE(frame, "auth request", read_frame);
1732}
1733
1734CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
1735 ldout(cct, 20) << __func__
1736 << " payload.length()=" << payload.length() << dendl;
1737
1738 if (state != AUTH_CONNECTING) {
1739 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1740 return _fault();
1741 }
1742
1743 auto bad_method = AuthBadMethodFrame::Decode(payload);
1744 ldout(cct, 1) << __func__ << " method=" << bad_method.method()
1745 << " result " << cpp_strerror(bad_method.result())
1746 << ", allowed methods=" << bad_method.allowed_methods()
1747 << ", allowed modes=" << bad_method.allowed_modes()
1748 << dendl;
1749 ceph_assert(messenger->auth_client);
1750 auto am = auth_meta;
1751 connection->lock.unlock();
1752 int r = messenger->auth_client->handle_auth_bad_method(
1753 connection,
1754 am.get(),
1755 bad_method.method(), bad_method.result(),
1756 bad_method.allowed_methods(),
1757 bad_method.allowed_modes());
1758 connection->lock.lock();
1759 if (state != AUTH_CONNECTING || r < 0) {
1760 return _fault();
1761 }
1762 return send_auth_request(bad_method.allowed_methods());
1763}
1764
1765CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
1766{
1767 ldout(cct, 20) << __func__
1768 << " payload.length()=" << payload.length() << dendl;
1769
1770 if (state != AUTH_CONNECTING) {
1771 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1772 return _fault();
1773 }
1774
1775 auto auth_more = AuthReplyMoreFrame::Decode(payload);
1776 ldout(cct, 5) << __func__
1777 << " auth reply more len=" << auth_more.auth_payload().length()
1778 << dendl;
1779 ceph_assert(messenger->auth_client);
1780 ceph::bufferlist reply;
1781 auto am = auth_meta;
1782 connection->lock.unlock();
1783 int r = messenger->auth_client->handle_auth_reply_more(
1784 connection, am.get(), auth_more.auth_payload(), &reply);
1785 connection->lock.lock();
1786 if (state != AUTH_CONNECTING) {
1787 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1788 return _fault();
1789 }
1790 if (r < 0) {
1791 lderr(cct) << __func__ << " auth_client handle_auth_reply_more returned "
1792 << r << dendl;
1793 return _fault();
1794 }
1795 auto more_reply = AuthRequestMoreFrame::Encode(reply);
1796 return WRITE(more_reply, "auth request more", read_frame);
1797}
1798
1799CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
1800{
1801 ldout(cct, 20) << __func__
1802 << " payload.length()=" << payload.length() << dendl;
1803
1804 if (state != AUTH_CONNECTING) {
1805 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1806 return _fault();
1807 }
1808
1809 auto auth_done = AuthDoneFrame::Decode(payload);
1810
1811 ceph_assert(messenger->auth_client);
1812 auto am = auth_meta;
1813 connection->lock.unlock();
1814 int r = messenger->auth_client->handle_auth_done(
1815 connection,
1816 am.get(),
1817 auth_done.global_id(),
1818 auth_done.con_mode(),
1819 auth_done.auth_payload(),
1820 &am->session_key,
1821 &am->connection_secret);
1822 connection->lock.lock();
1823 if (state != AUTH_CONNECTING) {
1824 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1825 return _fault();
1826 }
1827 if (r < 0) {
1828 return _fault();
1829 }
1830 auth_meta->con_mode = auth_done.con_mode();
1831 session_stream_handlers = \
1832 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);
1833
1834 state = AUTH_CONNECTING_SIGN;
1835
1836 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1837 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
1838 auto sig_frame = AuthSignatureFrame::Encode(sig);
1839 pre_auth.enabled = false;
1840 pre_auth.rxbuf.clear();
1841 return WRITE(sig_frame, "auth signature", read_frame);
1842}
1843
1844CtPtr ProtocolV2::finish_client_auth() {
1845 if (!server_cookie) {
1846 ceph_assert(connect_seq == 0);
1847 state = SESSION_CONNECTING;
1848 return send_client_ident();
1849 } else { // reconnecting to previous session
1850 state = SESSION_RECONNECTING;
1851 ceph_assert(connect_seq > 0);
1852 return send_reconnect();
1853 }
1854}
1855
1856CtPtr ProtocolV2::send_client_ident() {
1857 ldout(cct, 20) << __func__ << dendl;
1858
1859 if (!connection->policy.lossy && !client_cookie) {
1860 client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1861 }
1862
1863 uint64_t flags = 0;
1864 if (connection->policy.lossy) {
1865 flags |= CEPH_MSG_CONNECT_LOSSY;
1866 }
1867
11fdf7f2
TL
1868 auto client_ident = ClientIdentFrame::Encode(
1869 messenger->get_myaddrs(),
1870 connection->target_addr,
1871 messenger->get_myname().num(),
1872 global_seq,
1873 connection->policy.features_supported,
1874 connection->policy.features_required | msgr2_required,
1875 flags,
1876 client_cookie);
1877
1878 ldout(cct, 5) << __func__ << " sending identification: "
1879 << "addrs=" << messenger->get_myaddrs()
1880 << " target=" << connection->target_addr
1881 << " gid=" << messenger->get_myname().num()
1882 << " global_seq=" << global_seq
1883 << " features_supported=" << std::hex
1884 << connection->policy.features_supported
1885 << " features_required="
1886 << (connection->policy.features_required | msgr2_required)
1887 << " flags=" << flags
1888 << " cookie=" << client_cookie << std::dec << dendl;
1889
1890 INTERCEPT(11);
1891
1892 return WRITE(client_ident, "client ident", read_frame);
1893}
1894
1895CtPtr ProtocolV2::send_reconnect() {
1896 ldout(cct, 20) << __func__ << dendl;
1897
1898 auto reconnect = ReconnectFrame::Encode(messenger->get_myaddrs(),
1899 client_cookie,
1900 server_cookie,
1901 global_seq,
1902 connect_seq,
1903 in_seq);
1904
1905 ldout(cct, 5) << __func__ << " reconnect to session: client_cookie="
1906 << std::hex << client_cookie << " server_cookie="
1907 << server_cookie << std::dec
1908 << " gs=" << global_seq << " cs=" << connect_seq
1909 << " ms=" << in_seq << dendl;
1910
1911 INTERCEPT(13);
1912
1913 return WRITE(reconnect, "reconnect", read_frame);
1914}
1915
1916CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
1917{
1918 ldout(cct, 20) << __func__
1919 << " payload.length()=" << payload.length() << dendl;
1920
1921 if (state != SESSION_CONNECTING) {
1922 lderr(cct) << __func__ << " not in session connect state!" << dendl;
1923 return _fault();
1924 }
1925
1926 auto ident_missing =
1927 IdentMissingFeaturesFrame::Decode(payload);
1928 lderr(cct) << __func__
1929 << " client does not support all server features: " << std::hex
1930 << ident_missing.features() << std::dec << dendl;
1931
1932 return _fault();
1933}
1934
1935CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
1936{
1937 ldout(cct, 20) << __func__
1938 << " payload.length()=" << payload.length() << dendl;
1939
1940 if (state != SESSION_RECONNECTING) {
1941 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1942 return _fault();
1943 }
1944
1945 auto reset = ResetFrame::Decode(payload);
1946
1947 ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
1948 << dendl;
1949 if (reset.full()) {
1950 reset_session();
1951 } else {
1952 server_cookie = 0;
1953 connect_seq = 0;
1954 in_seq = 0;
1955 }
1956
1957 state = SESSION_CONNECTING;
1958 return send_client_ident();
1959}
1960
1961CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
1962{
1963 ldout(cct, 20) << __func__
1964 << " payload.length()=" << payload.length() << dendl;
1965
1966 if (state != SESSION_RECONNECTING) {
1967 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1968 return _fault();
1969 }
1970
1971 auto retry = RetryFrame::Decode(payload);
1972 connect_seq = retry.connect_seq() + 1;
1973
1974 ldout(cct, 1) << __func__
1975 << " received session retry connect_seq=" << retry.connect_seq()
1976 << ", inc to cs=" << connect_seq << dendl;
1977
1978 return send_reconnect();
1979}
1980
1981CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
1982{
1983 ldout(cct, 20) << __func__
1984 << " payload.length()=" << payload.length() << dendl;
1985
1986 if (state != SESSION_RECONNECTING) {
1987 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1988 return _fault();
1989 }
1990
1991 auto retry = RetryGlobalFrame::Decode(payload);
1992 global_seq = messenger->get_global_seq(retry.global_seq());
1993
1994 ldout(cct, 1) << __func__ << " received session retry global global_seq="
1995 << retry.global_seq() << ", choose new gs=" << global_seq
1996 << dendl;
1997
1998 return send_reconnect();
1999}
2000
2001CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) {
2002 ldout(cct, 20) << __func__
2003 << " received WAIT (connection race)"
2004 << " payload.length()=" << payload.length()
2005 << dendl;
2006
2007 if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
2008 lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
2009 return _fault();
2010 }
2011
2012 state = WAIT;
2013 WaitFrame::Decode(payload);
2014 return _fault();
2015}
2016
2017CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
2018{
2019 ldout(cct, 20) << __func__
2020 << " payload.length()=" << payload.length() << dendl;
2021
2022 if (state != SESSION_RECONNECTING) {
2023 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2024 return _fault();
2025 }
2026
2027 auto reconnect_ok = ReconnectOkFrame::Decode(payload);
2028 ldout(cct, 5) << __func__
2029 << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
2030 << dendl;
2031
2032 out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
2033
2034 backoff = utime_t();
2035 ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
2036 << ", lossy = " << connection->policy.lossy << ", features "
2037 << connection->get_features() << dendl;
2038
2039 if (connection->delay_state) {
2040 ceph_assert(connection->delay_state->ready());
2041 }
2042
2043 connection->dispatch_queue->queue_connect(connection);
2044 messenger->ms_deliver_handle_fast_connect(connection);
2045
2046 return ready();
2047}
2048
2049CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
2050{
2051 ldout(cct, 20) << __func__
2052 << " payload.length()=" << payload.length() << dendl;
2053
2054 if (state != SESSION_CONNECTING) {
2055 lderr(cct) << __func__ << " not in session connect state!" << dendl;
2056 return _fault();
2057 }
2058
2059 auto server_ident = ServerIdentFrame::Decode(payload);
2060 ldout(cct, 5) << __func__ << " received server identification:"
2061 << " addrs=" << server_ident.addrs()
2062 << " gid=" << server_ident.gid()
2063 << " global_seq=" << server_ident.global_seq()
2064 << " features_supported=" << std::hex
2065 << server_ident.supported_features()
2066 << " features_required=" << server_ident.required_features()
2067 << " flags=" << server_ident.flags() << " cookie=" << std::dec
2068 << server_ident.cookie() << dendl;
2069
2070 // is this who we intended to talk to?
2071 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2072 // of mon_host or something.
2073 if (!server_ident.addrs().contains(connection->target_addr)) {
2074 ldout(cct,1) << __func__ << " peer identifies as " << server_ident.addrs()
2075 << ", does not include " << connection->target_addr << dendl;
2076 return _fault();
2077 }
2078
2079 server_cookie = server_ident.cookie();
2080
2081 connection->set_peer_addrs(server_ident.addrs());
2082 peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid());
2083 connection->set_features(server_ident.supported_features() &
2084 connection->policy.features_supported);
2085 peer_global_seq = server_ident.global_seq();
2086
2087 connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
2088
2089 backoff = utime_t();
2090 ldout(cct, 10) << __func__ << " connect success " << connect_seq
2091 << ", lossy = " << connection->policy.lossy << ", features "
2092 << connection->get_features() << dendl;
2093
2094 if (connection->delay_state) {
2095 ceph_assert(connection->delay_state->ready());
2096 }
2097
2098 connection->dispatch_queue->queue_connect(connection);
2099 messenger->ms_deliver_handle_fast_connect(connection);
2100
2101 return ready();
2102}
2103
2104/* Server Protocol Methods */
2105
2106CtPtr ProtocolV2::start_server_banner_exchange() {
2107 ldout(cct, 20) << __func__ << dendl;
2108
2109 INTERCEPT(2);
2110
2111 state = BANNER_ACCEPTING;
2112
2113 return _banner_exchange(CONTINUATION(post_server_banner_exchange));
2114}
2115
2116CtPtr ProtocolV2::post_server_banner_exchange() {
2117 ldout(cct, 20) << __func__ << dendl;
2118
2119 state = AUTH_ACCEPTING;
2120
2121 return CONTINUE(read_frame);
2122}
2123
2124CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
2125 ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
2126 << dendl;
2127
2128 if (state != AUTH_ACCEPTING) {
2129 lderr(cct) << __func__ << " not in auth accept state!" << dendl;
2130 return _fault();
2131 }
2132
2133 auto request = AuthRequestFrame::Decode(payload);
2134 ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
2135 << ", preferred_modes=" << request.preferred_modes()
2136 << ", payload_len=" << request.auth_payload().length() << ")"
2137 << dendl;
2138 auth_meta->auth_method = request.method();
2139 auth_meta->con_mode = messenger->auth_server->pick_con_mode(
2140 connection->get_peer_type(), auth_meta->auth_method,
2141 request.preferred_modes());
2142 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
2143 return _auth_bad_method(-EOPNOTSUPP);
2144 }
2145 return _handle_auth_request(request.auth_payload(), false);
2146}
2147
2148CtPtr ProtocolV2::_auth_bad_method(int r)
2149{
2150 ceph_assert(r < 0);
2151 std::vector<uint32_t> allowed_methods;
2152 std::vector<uint32_t> allowed_modes;
2153 messenger->auth_server->get_supported_auth_methods(
2154 connection->get_peer_type(), &allowed_methods, &allowed_modes);
2155 ldout(cct, 1) << __func__ << " auth_method " << auth_meta->auth_method
2156 << " r " << cpp_strerror(r)
2157 << ", allowed_methods " << allowed_methods
2158 << ", allowed_modes " << allowed_modes
2159 << dendl;
2160 auto bad_method = AuthBadMethodFrame::Encode(auth_meta->auth_method, r,
2161 allowed_methods, allowed_modes);
2162 return WRITE(bad_method, "bad auth method", read_frame);
2163}
2164
2165CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
2166{
2167 if (!messenger->auth_server) {
2168 return _fault();
2169 }
2170 bufferlist reply;
2171 auto am = auth_meta;
2172 connection->lock.unlock();
2173 int r = messenger->auth_server->handle_auth_request(
2174 connection, am.get(),
2175 more, am->auth_method, auth_payload,
2176 &reply);
2177 connection->lock.lock();
2178 if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
2179 ldout(cct, 1) << __func__
2180 << " state changed while accept, it must be mark_down"
2181 << dendl;
2182 ceph_assert(state == CLOSED);
2183 return _fault();
2184 }
2185 if (r == 1) {
2186 INTERCEPT(10);
2187 state = AUTH_ACCEPTING_SIGN;
2188
2189 auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
2190 auth_meta->con_mode,
2191 reply);
2192 return WRITE(auth_done, "auth done", finish_auth);
2193 } else if (r == 0) {
2194 state = AUTH_ACCEPTING_MORE;
2195
2196 auto more = AuthReplyMoreFrame::Encode(reply);
2197 return WRITE(more, "auth reply more", read_frame);
2198 } else if (r == -EBUSY) {
2199 // kick the client and maybe they'll come back later
2200 return _fault();
2201 } else {
2202 return _auth_bad_method(r);
2203 }
2204}
2205
2206CtPtr ProtocolV2::finish_auth()
2207{
2208 ceph_assert(auth_meta);
2209 // TODO: having a possibility to check whether we're server or client could
2210 // allow reusing finish_auth().
2211 session_stream_handlers = \
2212 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
2213
2214 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
2215 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
2216 auto sig_frame = AuthSignatureFrame::Encode(sig);
2217 pre_auth.enabled = false;
2218 pre_auth.rxbuf.clear();
2219 return WRITE(sig_frame, "auth signature", read_frame);
2220}
2221
2222CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload)
2223{
2224 ldout(cct, 20) << __func__
2225 << " payload.length()=" << payload.length() << dendl;
2226
2227 if (state != AUTH_ACCEPTING_MORE) {
2228 lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
2229 return _fault();
2230 }
2231
2232 auto auth_more = AuthRequestMoreFrame::Decode(payload);
2233 return _handle_auth_request(auth_more.auth_payload(), true);
2234}
2235
2236CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
2237{
2238 ldout(cct, 20) << __func__
2239 << " payload.length()=" << payload.length() << dendl;
2240
2241 if (state != AUTH_ACCEPTING_SIGN && state != AUTH_CONNECTING_SIGN) {
2242 lderr(cct) << __func__
2243 << " pre-auth verification signature seen in wrong state!"
2244 << dendl;
2245 return _fault();
2246 }
2247
2248 auto sig_frame = AuthSignatureFrame::Decode(payload);
2249
2250 const auto actual_tx_sig = auth_meta->session_key.empty() ?
2251 sha256_digest_t() : auth_meta->session_key.hmac_sha256(cct, pre_auth.txbuf);
2252 if (sig_frame.signature() != actual_tx_sig) {
2253 ldout(cct, 2) << __func__ << " pre-auth signature mismatch"
2254 << " actual_tx_sig=" << actual_tx_sig
2255 << " sig_frame.signature()=" << sig_frame.signature()
2256 << dendl;
2257 return _fault();
2258 } else {
2259 ldout(cct, 20) << __func__ << " pre-auth signature success"
2260 << " sig_frame.signature()=" << sig_frame.signature()
2261 << dendl;
2262 pre_auth.txbuf.clear();
2263 }
2264
2265 if (state == AUTH_ACCEPTING_SIGN) {
2266 // server had sent AuthDone and client responded with correct pre-auth
2267 // signature. we can start accepting new sessions/reconnects.
2268 state = SESSION_ACCEPTING;
2269 return CONTINUE(read_frame);
2270 } else if (state == AUTH_CONNECTING_SIGN) {
2271 // this happened at client side
2272 return finish_client_auth();
2273 } else {
2274 ceph_assert_always("state corruption" == nullptr);
2275 }
2276}
2277
// Server side: handles the client's ClientIdent frame. It is valid only in
// SESSION_ACCEPTING.
// - Faults if the client advertises no address (or a blank first address),
//   or if the target_addr it dialed is not one of ours.
// - Records the peer's addrs, inferred target_addr, entity name/id and
//   client cookie.
// - Replies IdentMissingFeatures if the client lacks any bit of
//   (policy.features_required | msgr2_required).
// - Otherwise looks up an existing connection to the same peer addrs. If one
//   exists, handle_existing_connection() resolves it; if not, the server
//   replies via send_server_ident().
2278CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
2279{
2280 ldout(cct, 20) << __func__
2281 << " payload.length()=" << payload.length() << dendl;
2282
2283 if (state != SESSION_ACCEPTING) {
2284 lderr(cct) << __func__ << " not in session accept state!" << dendl;
2285 return _fault();
2286 }
2287
2288 auto client_ident = ClientIdentFrame::Decode(payload);
2289
2290 ldout(cct, 5) << __func__ << " received client identification:"
2291 << " addrs=" << client_ident.addrs()
2292 << " target=" << client_ident.target_addr()
2293 << " gid=" << client_ident.gid()
2294 << " global_seq=" << client_ident.global_seq()
2295 << " features_supported=" << std::hex
2296 << client_ident.supported_features()
2297 << " features_required=" << client_ident.required_features()
2298 << " flags=" << client_ident.flags()
2299 << " cookie=" << client_ident.cookie() << std::dec << dendl;
2300
// This check also rejects a blank (default-constructed) first address,
// even though the log message only says "empty".
2301 if (client_ident.addrs().empty() ||
2302 client_ident.addrs().front() == entity_addr_t()) {
2303 ldout(cct,5) << __func__ << " oops, client_ident.addrs() is empty" << dendl;
2304 return _fault(); // a v2 peer should never do this
2305 }
2306 if (!messenger->get_myaddrs().contains(client_ident.target_addr())) {
2307 ldout(cct,5) << __func__ << " peer is trying to reach "
2308 << client_ident.target_addr()
2309 << " which is not us (" << messenger->get_myaddrs() << ")"
2310 << dendl;
2311 return _fault();
2312 }
2313
2314 connection->set_peer_addrs(client_ident.addrs());
2315 connection->target_addr = connection->_infer_target_addr(client_ident.addrs());
2316
2317 peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
2318 connection->set_peer_id(client_ident.gid());
2319
2320 client_cookie = client_ident.cookie();
2321
// Feature bits we require (policy + msgr2 baseline) that the client did not
// advertise as supported.
2322 uint64_t feat_missing =
2323 (connection->policy.features_required | msgr2_required) &
2324 ~(uint64_t)client_ident.supported_features();
2325 if (feat_missing) {
2326 ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
2327 << feat_missing << std::dec << dendl;
2328 auto ident_missing_features =
2329 IdentMissingFeaturesFrame::Encode(feat_missing);
2330
2331 return WRITE(ident_missing_features, "ident missing features", read_frame);
2332 }
2333
// Negotiated features are those both sides support. They are applied to the
// connection later, in send_server_ident() or reuse_connection().
2334 connection_features =
2335 client_ident.supported_features() & connection->policy.features_supported;
2336
2337 peer_global_seq = client_ident.global_seq();
2338
2339 // Looks good so far, let's check if there is already an existing connection
2340 // to this peer.
2341
// connection->lock is released across lookup_conn(), mark_down() and
// inject_delay(). The state is re-validated after the lock is re-acquired.
2342 connection->lock.unlock();
2343 AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2344
// Only a ProtocolV2 (proto_type == 2) connection can be reused. An existing
// connection speaking another protocol version is marked down and ignored.
2345 if (existing &&
2346 existing->protocol->proto_type != 2) {
2347 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2348 << existing->protocol.get() << " version is "
2349 << existing->protocol->proto_type << ", marking down" << dendl;
2350 existing->mark_down();
2351 existing = nullptr;
2352 }
2353
2354 connection->inject_delay();
2355
2356 connection->lock.lock();
2357 if (state != SESSION_ACCEPTING) {
2358 ldout(cct, 1) << __func__
2359 << " state changed while accept, it must be mark_down"
2360 << dendl;
2361 ceph_assert(state == CLOSED);
2362 return _fault();
2363 }
2364
2365 if (existing) {
2366 return handle_existing_connection(existing);
2367 }
2368
2369 // if everything is OK reply with server identification
2370 return send_server_ident();
2371}
2372
// Server side: handles a client's Reconnect frame. It is valid only in
// SESSION_ACCEPTING. The server records the client's addrs and
// peer_global_seq, then looks up an existing connection to the same peer.
// The checks run in this order:
// - No usable existing connection, or existing is CLOSED: full Reset.
// - Existing is being replaced: RetryGlobal with existing's
//   peer_global_seq.
// - Client cookie mismatch: Reset (full iff policy.resetcheck).
// - Existing has server_cookie == 0: partial Reset (resume establishment).
// - Existing peer_global_seq is newer: RetryGlobal.
// - Existing connect_seq is newer: Retry.
// - Equal connect_seq (reconnect race): WAIT if existing wins; otherwise
//   fall through.
// Otherwise the existing connection adopts the client's connect_seq and
// msg_seq, and this socket is moved into it via reuse_connection().
2373CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
2374{
2375 ldout(cct, 20) << __func__
2376 << " payload.length()=" << payload.length() << dendl;
2377
2378 if (state != SESSION_ACCEPTING) {
2379 lderr(cct) << __func__ << " not in session accept state!" << dendl;
2380 return _fault();
2381 }
2382
2383 auto reconnect = ReconnectFrame::Decode(payload);
2384
2385 ldout(cct, 5) << __func__
2386 << " received reconnect:"
2387 << " client_cookie=" << std::hex << reconnect.client_cookie()
2388 << " server_cookie=" << reconnect.server_cookie() << std::dec
2389 << " gs=" << reconnect.global_seq()
2390 << " cs=" << reconnect.connect_seq()
2391 << " ms=" << reconnect.msg_seq()
2392 << dendl;
2393
2394 // Should we check if one of the ident.addrs match connection->target_addr
2395 // as we do in ProtocolV1?
2396 connection->set_peer_addrs(reconnect.addrs());
2397 connection->target_addr = connection->_infer_target_addr(reconnect.addrs());
2398 peer_global_seq = reconnect.global_seq();
2399
// connection->lock is released across lookup_conn(), mark_down() and
// inject_delay(). The state is re-validated after the lock is re-acquired.
2400 connection->lock.unlock();
2401 AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2402
// A non-msgr2 (proto_type != 2) existing connection cannot be resumed, so
// it is marked down and treated as absent.
2403 if (existing &&
2404 existing->protocol->proto_type != 2) {
2405 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2406 << existing->protocol.get() << " version is "
2407 << existing->protocol->proto_type << ", marking down" << dendl;
2408 existing->mark_down();
2409 existing = nullptr;
2410 }
2411
2412 connection->inject_delay();
2413
2414 connection->lock.lock();
2415 if (state != SESSION_ACCEPTING) {
2416 ldout(cct, 1) << __func__
2417 << " state changed while accept, it must be mark_down"
2418 << dendl;
2419 ceph_assert(state == CLOSED);
2420 return _fault();
2421 }
2422
2423 if (!existing) {
2424 // there is no existing connection therefore cannot reconnect to previous
2425 // session
2426 ldout(cct, 0) << __func__
2427 << " no existing connection exists, reseting client" << dendl;
2428 auto reset = ResetFrame::Encode(true);
2429 return WRITE(reset, "session reset", read_frame);
2430 }
2431
// existing->lock is held from here until this function returns, including
// across the call to reuse_connection().
2432 std::lock_guard<std::mutex> l(existing->lock);
2433
2434 ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2435 if (!exproto) {
2436 ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2437 ceph_assert(false);
2438 }
2439
2440 if (exproto->state == CLOSED) {
2441 ldout(cct, 5) << __func__ << " existing " << existing
2442 << " already closed. Reseting client" << dendl;
2443 auto reset = ResetFrame::Encode(true);
2444 return WRITE(reset, "session reset", read_frame);
2445 }
2446
2447 if (exproto->replacing) {
2448 ldout(cct, 1) << __func__
2449 << " existing racing replace happened while replacing."
2450 << " existing=" << existing << dendl;
2451 auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2452 return WRITE(retry, "session retry", read_frame);
2453 }
2454
// A cookie mismatch means existing belongs to a different client session.
// The reset is full only when policy.resetcheck is set.
2455 if (exproto->client_cookie != reconnect.client_cookie()) {
2456 ldout(cct, 1) << __func__ << " existing=" << existing
2457 << " client cookie mismatch, I must have reseted:"
2458 << " cc=" << std::hex << exproto->client_cookie
2459 << " rcc=" << reconnect.client_cookie()
2460 << ", reseting client." << std::dec
2461 << dendl;
2462 auto reset = ResetFrame::Encode(connection->policy.resetcheck);
2463 return WRITE(reset, "session reset", read_frame);
2464 } else if (exproto->server_cookie == 0) {
2465 // this happens when:
2466 // - a connects to b
2467 // - a sends client_ident
2468 // - b gets client_ident, sends server_ident and sets cookie X
2469 // - connection fault
2470 // - b reconnects to a with cookie X, connect_seq=1
2471 // - a has cookie==0
2472 ldout(cct, 1) << __func__ << " I was a client and didn't received the"
2473 << " server_ident. Asking peer to resume session"
2474 << " establishment" << dendl;
2475 auto reset = ResetFrame::Encode(false);
2476 return WRITE(reset, "session reset", read_frame);
2477 }
2478
// The client's global_seq is older than what existing already saw: ask the
// client to pick a newer one (handled by handle_session_retry_global()).
2479 if (exproto->peer_global_seq > reconnect.global_seq()) {
2480 ldout(cct, 5) << __func__
2481 << " stale global_seq: sgs=" << exproto->peer_global_seq
2482 << " cgs=" << reconnect.global_seq()
2483 << ", ask client to retry global" << dendl;
2484 auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2485
2486 INTERCEPT(18);
2487
2488 return WRITE(retry, "session retry", read_frame);
2489 }
2490
// The client's connect_seq is behind ours: ask it to retry (the client
// adopts our connect_seq + 1 in handle_session_retry()).
2491 if (exproto->connect_seq > reconnect.connect_seq()) {
2492 ldout(cct, 5) << __func__
2493 << " stale connect_seq scs=" << exproto->connect_seq
2494 << " ccs=" << reconnect.connect_seq()
2495 << " , ask client to retry" << dendl;
2496 auto retry = RetryFrame::Encode(exproto->connect_seq);
2497 return WRITE(retry, "session retry", read_frame);
2498 }
2499
// Tie-break: existing wins when the peer's msgr2 address is greater than
// ours and existing is not a server-policy connection.
2500 if (exproto->connect_seq == reconnect.connect_seq()) {
2501 // reconnect race: both peers are sending reconnect messages
2502 if (existing->peer_addrs->msgr2_addr() >
2503 messenger->get_myaddrs().msgr2_addr() &&
2504 !existing->policy.server) {
2505 // the existing connection wins
2506 ldout(cct, 1)
2507 << __func__
2508 << " reconnect race detected, this connection loses to existing="
2509 << existing << dendl;
2510
2511 auto wait = WaitFrame::Encode();
2512 return WRITE(wait, "wait", read_frame);
2513 } else {
2514 // this connection wins
2515 ldout(cct, 1) << __func__
2516 << " reconnect race detected, replacing existing="
2517 << existing << " socket by this connection's socket"
2518 << dendl;
2519 }
2520 }
2521
2522 ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
2523
// reuse_connection() copies this flag into exproto->reconnecting, so the
// existing connection replies with ReconnectOk rather than ServerIdent.
2524 reconnecting = true;
2525
2526 // everything looks good
2527 exproto->connect_seq = reconnect.connect_seq();
2528 exproto->message_seq = reconnect.msg_seq();
2529
2530 return reuse_connection(existing, exproto);
2531}
2532
// Server side: a ClientIdent arrived for a peer that already has a
// registered ProtocolV2 connection (`existing`). handle_client_ident() calls
// this with connection->lock held, and existing->lock is held for the whole
// function. The outcome is decided in this order:
// - Existing is CLOSED: accept this connection (send_server_ident).
// - Existing is being replaced: reply WAIT.
// - Existing saw a newer peer_global_seq: stop this (stale) connection.
// - Existing is lossy: stop existing and accept this connection.
// - Existing has nonzero server and client cookies, and its client_cookie
//   differs from ours: the peer has reset. Reuse existing, resetting its
//   session if policy.resetcheck is set.
// - Same client_cookie: resume the interrupted establishment (reuse).
// - Existing is READY/STANDBY: reuse it.
// - Otherwise this is a connection race. It is decided by msgr2 address
//   order or existing->policy.server: either reuse existing, or reply WAIT
//   and send a keepalive on existing.
2533CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
2534 ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
2535
2536 std::lock_guard<std::mutex> l(existing->lock);
2537
2538 ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2539 if (!exproto) {
2540 ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2541 ceph_assert(false);
2542 }
2543
2544 if (exproto->state == CLOSED) {
2545 ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
2546 << dendl;
2547 return send_server_ident();
2548 }
2549
2550 if (exproto->replacing) {
2551 ldout(cct, 1) << __func__
2552 << " existing racing replace happened while replacing."
2553 << " existing=" << existing << dendl;
2554 auto wait = WaitFrame::Encode();
2555 return WRITE(wait, "wait", read_frame);
2556 }
2557
// This connection carries an older global_seq than existing: stop it, queue
// a reset notification, and return no continuation.
2558 if (exproto->peer_global_seq > peer_global_seq) {
2559 ldout(cct, 1) << __func__ << " this is a stale connection, peer_global_seq="
2560 << peer_global_seq
2561 << " existing->peer_global_seq=" << exproto->peer_global_seq
2562 << ", stopping this connection." << dendl;
2563 stop();
2564 connection->dispatch_queue->queue_reset(connection);
2565 return nullptr;
2566 }
2567
2568 if (existing->policy.lossy) {
2569 // existing connection can be thrown out in favor of this one
2570 ldout(cct, 1)
2571 << __func__ << " existing=" << existing
2572 << " is a lossy channel. Stopping existing in favor of this connection"
2573 << dendl;
2574 existing->protocol->stop();
2575 existing->dispatch_queue->queue_reset(existing.get());
2576 return send_server_ident();
2577 }
2578
2579 if (exproto->server_cookie && exproto->client_cookie &&
2580 exproto->client_cookie != client_cookie) {
2581 // Found previous session
2582 // peer has reseted and we're going to reuse the existing connection
2583 // by replacing the communication socket
2584 ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2585 << ", peer must have reseted." << dendl;
2586 if (connection->policy.resetcheck) {
2587 exproto->reset_session();
2588 }
2589 return reuse_connection(existing, exproto);
2590 }
2591
2592 if (exproto->client_cookie == client_cookie) {
2593 // session establishment interrupted between client_ident and server_ident,
2594 // continuing...
2595 ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2596 << ", continuing session establishment." << dendl;
2597 return reuse_connection(existing, exproto);
2598 }
2599
2600 if (exproto->state == READY || exproto->state == STANDBY) {
2601 ldout(cct, 1) << __func__ << " existing=" << existing
2602 << " is READY/STANDBY, lets reuse it" << dendl;
2603 return reuse_connection(existing, exproto);
2604 }
2605
2606 // Looks like a connection race: server and client are both connecting to
2607 // each other at the same time.
// Tie-break: this connection wins if the peer's msgr2 address is lower than
// ours, or if existing uses a server policy.
2608 if (connection->peer_addrs->msgr2_addr() <
2609 messenger->get_myaddrs().msgr2_addr() ||
2610 existing->policy.server) {
2611 // this connection wins
2612 ldout(cct, 1) << __func__
2613 << " connection race detected, replacing existing="
2614 << existing << " socket by this connection's socket" << dendl;
2615 return reuse_connection(existing, exproto);
2616 } else {
2617 // the existing connection wins
2618 ldout(cct, 1)
2619 << __func__
2620 << " connection race detected, this connection loses to existing="
2621 << existing << dendl;
// NOTE(review): this asserts a strict '>', so equal msgr2 addresses
// (peer == us) would trip it. Confirm that case cannot reach here.
2622 ceph_assert(connection->peer_addrs->msgr2_addr() >
2623 messenger->get_myaddrs().msgr2_addr());
2624
2625 // make sure we follow through with opening the existing
2626 // connection (if it isn't yet open) since we know the peer
2627 // has something to send to us.
2628 existing->send_keepalive();
2629 auto wait = WaitFrame::Encode();
2630 return WRITE(wait, "wait", read_frame);
2631 }
2632}
2633
// Server side: moves this connection's socket, crypto stream handlers and
// auth_meta into the already-registered `existing` connection, then stops
// this connection and queues a reset for it.
// - existing->write_lock is held until this function returns.
// - The socket handoff completes asynchronously. deactivate_existing runs on
//   existing's current event center. It requeues already-sent messages and
//   drops pending output. If existing is still in state NONE, it installs
//   the moved socket/worker/center into existing. If existing was CLOSED
//   meanwhile, it closes the moved socket on new_center instead.
// - transfer_existing then runs on existing's (possibly updated) center. It
//   re-arms the connect timer and read handler and continues with
//   send_server_ident(), or send_reconnect_ok() when reconnecting.
// Always returns nullptr: this connection has been stopped.
2634CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
2635 ProtocolV2 *exproto) {
2636 ldout(cct, 20) << __func__ << " existing=" << existing
2637 << " reconnect=" << reconnecting << dendl;
2638
2639 connection->inject_delay();
2640
2641 std::lock_guard<std::mutex> l(existing->write_lock);
2642
// Stop watching this connection's fd; the socket is about to be handed over.
2643 connection->center->delete_file_event(connection->cs.fd(),
2644 EVENT_READABLE | EVENT_WRITABLE);
2645
2646 if (existing->delay_state) {
2647 existing->delay_state->flush();
2648 ceph_assert(!connection->delay_state);
2649 }
2650 exproto->reset_recv_state();
2651 exproto->pre_auth.enabled = false;
2652
// For a fresh ident (not a reconnect), existing adopts this connection's
// client cookie, peer name and negotiated features.
2653 if (!reconnecting) {
2654 exproto->client_cookie = client_cookie;
2655 exproto->peer_name = peer_name;
2656 exproto->connection_features = connection_features;
2657 existing->set_features(connection_features);
2658 }
2659 exproto->peer_global_seq = peer_global_seq;
2660
// Take this connection's socket, event center and worker; they are handed
// to `existing` by deactivate_existing below.
2661 auto temp_cs = std::move(connection->cs);
2662 EventCenter *new_center = connection->center;
2663 Worker *new_worker = connection->worker;
2664
2665 ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
2666 << dendl;
494da23a
TL
2667
// existing takes over the crypto stream handlers and auth metadata
// negotiated on this connection.
2668 std::swap(exproto->session_stream_handlers, session_stream_handlers);
2669 exproto->auth_meta = auth_meta;
2670
11fdf7f2
TL
2671 // avoid _stop shutdown replacing socket
2672 // queue a reset on the new connection, which we're dumping for the old
2673 stop();
2674
2675 connection->dispatch_queue->queue_reset(connection);
2676
// Quiesce existing: no writes, mark it as being replaced, and put both the
// protocol and the connection in NONE until transfer_existing runs.
2677 exproto->can_write = false;
494da23a 2678 exproto->write_in_progress = false;
11fdf7f2
TL
2679 exproto->reconnecting = reconnecting;
2680 exproto->replacing = true;
11fdf7f2
TL
2681 existing->state_offset = 0;
2682 // avoid previous thread modify event
2683 exproto->state = NONE;
2684 existing->state = AsyncConnection::STATE_NONE;
2685 // Discard existing prefetch buffer in `recv_buf`
2686 existing->recv_start = existing->recv_end = 0;
2687 // there shouldn't exist any buffer
2688 ceph_assert(connection->recv_start == connection->recv_end);
2689
// Submitted to existing's current event center. It receives the moved
// socket (bound via std::bind below).
2690 auto deactivate_existing = std::bind(
2691 [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
2692 // we need to delete time event in original thread
2693 {
2694 std::lock_guard<std::mutex> l(existing->lock);
2695 existing->write_lock.lock();
2696 exproto->requeue_sent();
2697 existing->outcoming_bl.clear();
2698 existing->open_write = false;
2699 existing->write_lock.unlock();
// NONE: still ours to finish, so install the new socket/worker/center.
// CLOSED: existing was torn down meanwhile, so just close the moved socket.
2700 if (exproto->state == NONE) {
2701 existing->shutdown_socket();
2702 existing->cs = std::move(cs);
2703 existing->worker->references--;
2704 new_worker->references++;
2705 existing->logger = new_worker->get_perf_counter();
2706 existing->worker = new_worker;
2707 existing->center = new_center;
2708 if (existing->delay_state)
2709 existing->delay_state->set_center(new_center);
2710 } else if (exproto->state == CLOSED) {
2711 auto back_to_close = std::bind(
2712 [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
2713 new_center->submit_to(new_center->get_id(),
2714 std::move(back_to_close), true);
2715 return;
2716 } else {
2717 ceph_abort();
2718 }
2719 }
2720
2721 // Before changing existing->center, it may already exists some
2722 // events in existing->center's queue. Then if we mark down
2723 // `existing`, it will execute in another thread and clean up
2724 // connection. Previous event will result in segment fault
2725 auto transfer_existing = [existing, exproto]() mutable {
2726 std::lock_guard<std::mutex> l(existing->lock);
2727 if (exproto->state == CLOSED) return;
2728 ceph_assert(exproto->state == NONE);
2729
2730 exproto->state = SESSION_ACCEPTING;
81eedcae
TL
2731 // we have called shutdown_socket above
2732 ceph_assert(existing->last_tick_id == 0);
2733 // restart timer since we are going to re-build connection
2734 existing->last_connect_started = ceph::coarse_mono_clock::now();
2735 existing->last_tick_id = existing->center->create_time_event(
2736 existing->connect_timeout_us, existing->tick_handler);
11fdf7f2
TL
2737 existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
2738 existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
2739 existing->read_handler);
// Finish the handshake on the transplanted socket: ServerIdent for a
// fresh ident, ReconnectOk for a reconnect.
2740 if (!exproto->reconnecting) {
2741 exproto->run_continuation(exproto->send_server_ident());
2742 } else {
2743 exproto->run_continuation(exproto->send_reconnect_ok());
2744 }
2745 };
2746 if (existing->center->in_thread())
2747 transfer_existing();
2748 else
2749 existing->center->submit_to(existing->center->get_id(),
2750 std::move(transfer_existing), true);
2751 },
2752 std::move(temp_cs));
2753
2754 existing->center->submit_to(existing->center->get_id(),
2755 std::move(deactivate_existing), true);
2756 return nullptr;
2757}
2758
2759CtPtr ProtocolV2::send_server_ident() {
2760 ldout(cct, 20) << __func__ << dendl;
2761
2762 // this is required for the case when this connection is being replaced
2763 out_seq = discard_requeued_up_to(out_seq, 0);
2764 in_seq = 0;
2765
2766 if (!connection->policy.lossy) {
2767 server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
2768 }
2769
2770 uint64_t flags = 0;
2771 if (connection->policy.lossy) {
2772 flags = flags | CEPH_MSG_CONNECT_LOSSY;
2773 }
2774
2775 uint64_t gs = messenger->get_global_seq();
2776 auto server_ident = ServerIdentFrame::Encode(
2777 messenger->get_myaddrs(),
2778 messenger->get_myname().num(),
2779 gs,
2780 connection->policy.features_supported,
2781 connection->policy.features_required | msgr2_required,
2782 flags,
2783 server_cookie);
2784
2785 ldout(cct, 5) << __func__ << " sending identification:"
2786 << " addrs=" << messenger->get_myaddrs()
2787 << " gid=" << messenger->get_myname().num()
2788 << " global_seq=" << gs << " features_supported=" << std::hex
2789 << connection->policy.features_supported
2790 << " features_required="
2791 << (connection->policy.features_required | msgr2_required)
2792 << " flags=" << flags << " cookie=" << std::dec << server_cookie
2793 << dendl;
2794
2795 connection->lock.unlock();
2796 // Because "replacing" will prevent other connections preempt this addr,
2797 // it's safe that here we don't acquire Connection's lock
2798 ssize_t r = messenger->accept_conn(connection);
2799
2800 connection->inject_delay();
2801
2802 connection->lock.lock();
2803
2804 if (r < 0) {
2805 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2806 << connection->peer_addrs->msgr2_addr()
2807 << " just fail later one(this)" << dendl;
2808 connection->inject_delay();
2809 return _fault();
2810 }
2811 if (state != SESSION_ACCEPTING) {
2812 ldout(cct, 1) << __func__
2813 << " state changed while accept_conn, it must be mark_down"
2814 << dendl;
2815 ceph_assert(state == CLOSED || state == NONE);
2816 messenger->unregister_conn(connection);
2817 connection->inject_delay();
2818 return _fault();
2819 }
2820
2821 connection->set_features(connection_features);
2822
2823 // notify
2824 connection->dispatch_queue->queue_accept(connection);
2825 messenger->ms_deliver_handle_fast_accept(connection);
2826
2827 INTERCEPT(12);
2828
2829 return WRITE(server_ident, "server ident", server_ready);
2830}
2831
2832CtPtr ProtocolV2::server_ready() {
2833 ldout(cct, 20) << __func__ << dendl;
2834
2835 if (connection->delay_state) {
2836 ceph_assert(connection->delay_state->ready());
2837 }
2838
2839 return ready();
2840}
2841
2842CtPtr ProtocolV2::send_reconnect_ok() {
2843 ldout(cct, 20) << __func__ << dendl;
2844
2845 out_seq = discard_requeued_up_to(out_seq, message_seq);
2846
2847 uint64_t ms = in_seq;
2848 auto reconnect_ok = ReconnectOkFrame::Encode(ms);
2849
2850 ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
2851
2852 connection->lock.unlock();
2853 // Because "replacing" will prevent other connections preempt this addr,
2854 // it's safe that here we don't acquire Connection's lock
2855 ssize_t r = messenger->accept_conn(connection);
2856
2857 connection->inject_delay();
2858
2859 connection->lock.lock();
2860
2861 if (r < 0) {
2862 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2863 << connection->peer_addrs->msgr2_addr()
2864 << " just fail later one(this)" << dendl;
2865 connection->inject_delay();
2866 return _fault();
2867 }
2868 if (state != SESSION_ACCEPTING) {
2869 ldout(cct, 1) << __func__
2870 << " state changed while accept_conn, it must be mark_down"
2871 << dendl;
2872 ceph_assert(state == CLOSED || state == NONE);
2873 messenger->unregister_conn(connection);
2874 connection->inject_delay();
2875 return _fault();
2876 }
2877
2878 // notify
2879 connection->dispatch_queue->queue_accept(connection);
2880 messenger->ms_deliver_handle_fast_accept(connection);
2881
2882 INTERCEPT(14);
2883
2884 return WRITE(reconnect_ok, "reconnect ok", server_ready);
2885}