// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "ProtocolV1.h"

#include "common/errno.h"

#include "AsyncConnection.h"
#include "AsyncMessenger.h"
#include "common/EventTrace.h"
#include "include/random.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
  return *_dout << "--1- " << messenger->get_myaddrs() << " >> "
                << *connection->peer_addrs
                << " conn("
                << connection << " " << this
                << " :" << connection->port << " s=" << get_state_name(state)
                << " pgs=" << peer_global_seq << " cs=" << connect_seq
                << " l=" << connection->policy.lossy << ").";
}

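// Helpers wrapping read()/write() below in the continuation style used
// throughout this file: the given continuation runs when the I/O completes.
// If the socket call finishes (or fails) synchronously, the continuation is
// returned so the caller can run it inline; otherwise nullptr is returned
// and the event loop resumes it later.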
#define WRITE(B, C) write(CONTINUATION(C), B)

#define READ(L, C) read(CONTINUATION(C), L)

#define READB(L, B, C) read(CONTINUATION(C), L, B)

// Constant to limit starting sequence number to 2^31. Nothing special about
// it, just a big number. PLR
#define SEQ_MASK 0x7fffffff

const int ASYNC_COALESCE_THRESHOLD = 256;

using namespace std;

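// Allocate a receive buffer whose offset within a page matches the wire data
// offset: the bufferptr is page aligned and then shifted so the first `head`
// bytes fill out the tail of the first page, keeping the bulk of the payload
// page aligned.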
static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
  // create a buffer to read into that matches the data alignment
  unsigned alloc_len = 0;
  unsigned left = len;
  unsigned head = 0;
  if (off & ~CEPH_PAGE_MASK) {
    // head
    alloc_len += CEPH_PAGE_SIZE;
    head = std::min<uint64_t>(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
    left -= head;
  }
  alloc_len += left;
  bufferptr ptr(buffer::create_small_page_aligned(alloc_len));
  if (head) ptr.set_offset(CEPH_PAGE_SIZE - head);
  data.push_back(std::move(ptr));
}

/**
 * Protocol V1
 **/

ProtocolV1::ProtocolV1(AsyncConnection *connection)
    : Protocol(1, connection),
      temp_buffer(nullptr),
      can_write(WriteStatus::NOWRITE),
      keepalive(false),
      connect_seq(0),
      peer_global_seq(0),
      msg_left(0),
      cur_msg_size(0),
      replacing(false),
      is_reset_from_peer(false),
      once_ready(false),
      state(NONE),
      global_seq(0),
      authorizer(nullptr),
      wait_for_seq(false) {
  temp_buffer = new char[4096];
}

ProtocolV1::~ProtocolV1() {
  ceph_assert(out_q.empty());
  ceph_assert(sent.empty());

  delete[] temp_buffer;

  if (authorizer) {
    delete authorizer;
  }
}

void ProtocolV1::connect() {
  this->state = START_CONNECT;

  // reset connect state variables
  if (authorizer) {
    delete authorizer;
    authorizer = nullptr;
  }
  authorizer_buf.clear();
  memset(&connect_msg, 0, sizeof(connect_msg));
  memset(&connect_reply, 0, sizeof(connect_reply));

  global_seq = messenger->get_global_seq();
}

void ProtocolV1::accept() { this->state = START_ACCEPT; }

bool ProtocolV1::is_connected() {
  return can_write.load() == WriteStatus::CANWRITE;
}

void ProtocolV1::stop() {
  ldout(cct, 20) << __func__ << dendl;
  if (state == CLOSED) {
    return;
  }

  if (connection->delay_state) connection->delay_state->flush();

  ldout(cct, 2) << __func__ << dendl;
  std::lock_guard<std::mutex> l(connection->write_lock);

  reset_recv_state();
  discard_out_queue();

  connection->_stop();

  can_write = WriteStatus::CLOSED;
  state = CLOSED;
}

void ProtocolV1::fault() {
  ldout(cct, 20) << __func__ << dendl;

  if (state == CLOSED || state == NONE) {
    ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
    return;
  }

  if (connection->policy.lossy && state != START_CONNECT &&
      state != CONNECTING) {
    ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return;
  }

  connection->write_lock.lock();
  can_write = WriteStatus::NOWRITE;
  is_reset_from_peer = false;

  // requeue sent items
  requeue_sent();

  if (!once_ready && out_q.empty() && state >= START_ACCEPT &&
      state <= ACCEPTING_WAIT_CONNECT_MSG_AUTH && !replacing) {
    ldout(cct, 10) << __func__ << " with nothing to send and in the "
                   << "half-accept state, just closing" << dendl;
    connection->write_lock.unlock();
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return;
  }
  replacing = false;

  connection->fault();

  reset_recv_state();

  if (connection->policy.standby && out_q.empty() && !keepalive &&
      state != WAIT) {
    ldout(cct, 10) << __func__ << " with nothing to send, going to standby"
                   << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return;
  }

  connection->write_lock.unlock();

  if ((state >= START_CONNECT && state <= CONNECTING_SEND_CONNECT_MSG) ||
      state == WAIT) {
    // backoff: exponential, starting at ms_initial_backoff and doubling up
    // to ms_max_backoff; losing a connection race (WAIT) jumps straight to
    // the maximum.
    if (state == WAIT) {
      backoff.set_from_double(cct->_conf->ms_max_backoff);
    } else if (backoff == utime_t()) {
      backoff.set_from_double(cct->_conf->ms_initial_backoff);
    } else {
      backoff += backoff;
      if (backoff > cct->_conf->ms_max_backoff)
        backoff.set_from_double(cct->_conf->ms_max_backoff);
    }

    global_seq = messenger->get_global_seq();
    state = START_CONNECT;
    connection->state = AsyncConnection::STATE_CONNECTING;
    ldout(cct, 10) << __func__ << " waiting " << backoff << dendl;
    // wake up again after the backoff interval
    connection->register_time_events.insert(
        connection->center->create_time_event(backoff.to_nsec() / 1000,
                                              connection->wakeup_handler));
  } else {
    // policy may be empty when the state is in accept
    if (connection->policy.server) {
      ldout(cct, 0) << __func__ << " server, going to standby" << dendl;
      state = STANDBY;
    } else {
      ldout(cct, 0) << __func__ << " initiating reconnect" << dendl;
      connect_seq++;
      global_seq = messenger->get_global_seq();
      state = START_CONNECT;
      connection->state = AsyncConnection::STATE_CONNECTING;
    }
    backoff = utime_t();
    connection->center->dispatch_event_external(connection->read_handler);
  }
}

void ProtocolV1::send_message(Message *m) {
  bufferlist bl;
  uint64_t f = connection->get_features();

  // TODO: not all messages support reencoding (e.g. MOSDMap), so only
  // prepare (encode) the message here if it supports fast dispatch
  bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
  if (can_fast_prepare) {
    prepare_send_message(f, m, bl);
  }

  std::lock_guard<std::mutex> l(connection->write_lock);
  // "features" changes will change the payload encoding
  if (can_fast_prepare &&
      (can_write == WriteStatus::NOWRITE || connection->get_features() != f)) {
    // ensure the correctness of message encoding
    bl.clear();
    m->clear_payload();
    ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f
                  << " != " << connection->get_features() << dendl;
  }
  if (can_write == WriteStatus::CLOSED) {
    ldout(cct, 10) << __func__ << " connection closed."
                   << " Drop message " << m << dendl;
    m->put();
  } else {
    m->trace.event("async enqueueing message");
    out_q[m->get_priority()].emplace_back(std::move(bl), m);
    ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                   << dendl;
    if (can_write != WriteStatus::REPLACING) {
      connection->center->dispatch_event_external(connection->write_handler);
    }
  }
}

void ProtocolV1::prepare_send_message(uint64_t features, Message *m,
                                      bufferlist &bl) {
  ldout(cct, 20) << __func__ << " m " << *m << dendl;

  // associate message with Connection (for benefit of encode_payload)
  if (m->empty_payload()) {
    ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
                   << " " << *m << dendl;
  } else {
    ldout(cct, 20) << __func__ << " half-reencoding features " << features
                   << " " << m << " " << *m << dendl;
  }

  // encode and copy out of *m
  m->encode(features, messenger->crcflags);

  bl.append(m->get_payload());
  bl.append(m->get_middle());
  bl.append(m->get_data());
}

void ProtocolV1::send_keepalive() {
  ldout(cct, 10) << __func__ << dendl;
  std::lock_guard<std::mutex> l(connection->write_lock);
  if (can_write != WriteStatus::CLOSED) {
    keepalive = true;
    connection->center->dispatch_event_external(connection->write_handler);
  }
}

void ProtocolV1::read_event() {
  ldout(cct, 20) << __func__ << dendl;
  switch (state) {
    case START_CONNECT:
      CONTINUATION_RUN(CONTINUATION(send_client_banner));
      break;
    case START_ACCEPT:
      CONTINUATION_RUN(CONTINUATION(send_server_banner));
      break;
    case OPENED:
      CONTINUATION_RUN(CONTINUATION(wait_message));
      break;
    case THROTTLE_MESSAGE:
      CONTINUATION_RUN(CONTINUATION(throttle_message));
      break;
    case THROTTLE_BYTES:
      CONTINUATION_RUN(CONTINUATION(throttle_bytes));
      break;
    case THROTTLE_DISPATCH_QUEUE:
      CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
      break;
    default:
      break;
  }
}

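// Event-loop write path: while CANWRITE, drain out_q in priority order,
// piggyback a single ACK covering everything received since the last ack
// (ack_left), and fall back to _try_send() for bytes already queued in
// outcoming_bl.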
void ProtocolV1::write_event() {
  ldout(cct, 10) << __func__ << dendl;
  ssize_t r = 0;

  connection->write_lock.lock();
  if (can_write == WriteStatus::CANWRITE) {
    if (keepalive) {
      append_keepalive_or_ack();
      keepalive = false;
    }

    auto start = ceph::mono_clock::now();
    bool more;
    do {
      bufferlist data;
      Message *m = _get_next_outgoing(&data);
      if (!m) {
        break;
      }

      if (!connection->policy.lossy) {
        // put on sent list
        sent.push_back(m);
        m->get();
      }
      more = !out_q.empty();
      connection->write_lock.unlock();

      // send_message or requeue_sent may not have encoded the message yet
      if (!data.length()) {
        prepare_send_message(connection->get_features(), m, data);
      }

      r = write_message(m, data, more);

      connection->write_lock.lock();
      if (r == 0) {
        ;
      } else if (r < 0) {
        ldout(cct, 1) << __func__ << " send msg failed" << dendl;
        break;
      } else if (r > 0)
        break;
    } while (can_write == WriteStatus::CANWRITE);
    connection->write_lock.unlock();

    // r > 0 means data is still pending, so there is no need to _try_send
    if (r == 0) {
      uint64_t left = ack_left;
      if (left) {
        ceph_le64 s;
        s = in_seq;
        connection->outcoming_bl.append(CEPH_MSGR_TAG_ACK);
        connection->outcoming_bl.append((char *)&s, sizeof(s));
        ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                       << " messages" << dendl;
        ack_left -= left;
        left = ack_left;
        r = connection->_try_send(left);
      } else if (is_queued()) {
        r = connection->_try_send();
      }
    }

    connection->logger->tinc(l_msgr_running_send_time,
                             ceph::mono_clock::now() - start);
    if (r < 0) {
      ldout(cct, 1) << __func__ << " send msg failed" << dendl;
      connection->lock.lock();
      fault();
      connection->lock.unlock();
      return;
    }
  } else {
    connection->write_lock.unlock();
    connection->lock.lock();
    connection->write_lock.lock();
    if (state == STANDBY && !connection->policy.server && is_queued()) {
      ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
      connection->_connect();
    } else if (connection->cs && state != NONE && state != CLOSED &&
               state != START_CONNECT) {
      r = connection->_try_send();
      if (r < 0) {
        ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
        connection->write_lock.unlock();
        fault();
        connection->lock.unlock();
        return;
      }
    }
    connection->write_lock.unlock();
    connection->lock.unlock();
  }
}

bool ProtocolV1::is_queued() {
  return !out_q.empty() || connection->is_queued();
}

void ProtocolV1::run_continuation(CtPtr pcontinuation) {
  if (pcontinuation) {
    CONTINUATION_RUN(*pcontinuation);
  }
}

CtPtr ProtocolV1::read(CONTINUATION_RX_TYPE<ProtocolV1> &next,
                       int len, char *buffer) {
  if (!buffer) {
    buffer = temp_buffer;
  }
  ssize_t r = connection->read(len, buffer,
                               [&next, this](char *buffer, int r) {
                                 next.setParams(buffer, r);
                                 CONTINUATION_RUN(next);
                               });
  if (r <= 0) {
    next.setParams(buffer, r);
    return &next;
  }

  return nullptr;
}

CtPtr ProtocolV1::write(CONTINUATION_TX_TYPE<ProtocolV1> &next,
                        bufferlist &buffer) {
  ssize_t r = connection->write(buffer, [&next, this](int r) {
    next.setParams(r);
    CONTINUATION_RUN(next);
  });
  if (r <= 0) {
    next.setParams(r);
    return &next;
  }

  return nullptr;
}

CtPtr ProtocolV1::ready() {
  ldout(cct, 25) << __func__ << dendl;

  // make sure no pending tick timer
  if (connection->last_tick_id) {
    connection->center->delete_time_event(connection->last_tick_id);
  }
  connection->last_tick_id = connection->center->create_time_event(
      connection->inactive_timeout_us, connection->tick_handler);

  connection->write_lock.lock();
  can_write = WriteStatus::CANWRITE;
  if (is_queued()) {
    connection->center->dispatch_event_external(connection->write_handler);
  }
  connection->write_lock.unlock();
  connection->maybe_start_delay_thread();

  state = OPENED;
  return wait_message();
}

CtPtr ProtocolV1::wait_message() {
  if (state != OPENED) {  // must have changed due to a replace
    return nullptr;
  }

  ldout(cct, 20) << __func__ << dendl;

  return READ(sizeof(char), handle_message);
}

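// Dispatch on the single tag byte that precedes every wire event:
// keepalives, acks, a full message (TAG_MSG), or an orderly close.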
CtPtr ProtocolV1::handle_message(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read tag failed" << dendl;
    return _fault();
  }

  char tag = buffer[0];
  ldout(cct, 20) << __func__ << " process tag " << (int)tag << dendl;

  if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
    ldout(cct, 20) << __func__ << " got KEEPALIVE" << dendl;
    connection->set_last_keepalive(ceph_clock_now());
  } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
    return READ(sizeof(ceph_timespec), handle_keepalive2);
  } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
    return READ(sizeof(ceph_timespec), handle_keepalive2_ack);
  } else if (tag == CEPH_MSGR_TAG_ACK) {
    return READ(sizeof(ceph_le64), handle_tag_ack);
  } else if (tag == CEPH_MSGR_TAG_MSG) {
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
    ltt_recv_stamp = ceph_clock_now();
#endif
    recv_stamp = ceph_clock_now();
    ldout(cct, 20) << __func__ << " begin MSG" << dendl;
    return READ(sizeof(ceph_msg_header), handle_message_header);
  } else if (tag == CEPH_MSGR_TAG_CLOSE) {
    ldout(cct, 20) << __func__ << " got CLOSE" << dendl;
    stop();
  } else {
    ldout(cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
    return _fault();
  }
  return nullptr;
}

CtPtr ProtocolV1::handle_keepalive2(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read keepalive timespec failed" << dendl;
    return _fault();
  }

  ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;

  ceph_timespec *t;
  t = (ceph_timespec *)buffer;
  utime_t kp_t = utime_t(*t);
  connection->write_lock.lock();
  append_keepalive_or_ack(true, &kp_t);
  connection->write_lock.unlock();

  ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
  connection->set_last_keepalive(ceph_clock_now());

  if (is_connected()) {
    connection->center->dispatch_event_external(connection->write_handler);
  }

  return CONTINUE(wait_message);
}

void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
  ldout(cct, 10) << __func__ << dendl;
  if (ack) {
    ceph_assert(tp);
    struct ceph_timespec ts;
    tp->encode_timeval(&ts);
    connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
    connection->outcoming_bl.append((char *)&ts, sizeof(ts));
  } else if (connection->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
    struct ceph_timespec ts;
    utime_t t = ceph_clock_now();
    t.encode_timeval(&ts);
    connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
    connection->outcoming_bl.append((char *)&ts, sizeof(ts));
  } else {
    connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
  }
}

CtPtr ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read keepalive timespec failed" << dendl;
    return _fault();
  }

  ceph_timespec *t;
  t = (ceph_timespec *)buffer;
  connection->set_last_keepalive_ack(utime_t(*t));
  ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;

  return CONTINUE(wait_message);
}

CtPtr ProtocolV1::handle_tag_ack(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
    return _fault();
  }

  ceph_le64 seq;
  seq = *(ceph_le64 *)buffer;
  ldout(cct, 20) << __func__ << " got ACK" << dendl;

  ldout(cct, 15) << __func__ << " got ack seq " << seq << dendl;
  // trim sent list: collect the acked messages under write_lock, then
  // release the references after dropping the lock
  static const int max_pending = 128;
  int i = 0;
  Message *pending[max_pending];
  connection->write_lock.lock();
  while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
    Message *m = sent.front();
    sent.pop_front();
    pending[i++] = m;
    ldout(cct, 10) << __func__ << " got ack seq " << seq
                   << " >= " << m->get_seq() << " on " << m << " " << *m
                   << dendl;
  }
  connection->write_lock.unlock();
  for (int k = 0; k < i; k++) {
    pending[k]->put();
  }

  return CONTINUE(wait_message);
}

CtPtr ProtocolV1::handle_message_header(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read message header failed" << dendl;
    return _fault();
  }

  ldout(cct, 20) << __func__ << " got MSG header" << dendl;

  current_header = *((ceph_msg_header *)buffer);

  ldout(cct, 20) << __func__ << " got envelope type=" << current_header.type
                 << " src " << entity_name_t(current_header.src)
                 << " front=" << current_header.front_len
                 << " data=" << current_header.data_len
                 << " off " << current_header.data_off << dendl;

  if (messenger->crcflags & MSG_CRC_HEADER) {
    __u32 header_crc = 0;
    header_crc = ceph_crc32c(0, (unsigned char *)&current_header,
                             sizeof(current_header) - sizeof(current_header.crc));
    // verify header crc
    if (header_crc != current_header.crc) {
      ldout(cct, 0) << __func__ << " got bad header crc " << header_crc
                    << " != " << current_header.crc << dendl;
      return _fault();
    }
  }

  // Reset state
  data_buf.clear();
  front.clear();
  middle.clear();
  data.clear();

  state = THROTTLE_MESSAGE;
  return CONTINUE(throttle_message);
}

CtPtr ProtocolV1::throttle_message() {
  ldout(cct, 20) << __func__ << dendl;

  if (connection->policy.throttler_messages) {
    ldout(cct, 10) << __func__ << " wants " << 1
                   << " message from policy throttler "
                   << connection->policy.throttler_messages->get_current()
                   << "/" << connection->policy.throttler_messages->get_max()
                   << dendl;
    if (!connection->policy.throttler_messages->get_or_fail()) {
      ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
                     << connection->policy.throttler_messages->get_current()
                     << "/" << connection->policy.throttler_messages->get_max()
                     << " failed, just wait." << dendl;
      // draining the full message queue in the dispatch thread pool can
      // take a while, so wait a millisecond before retrying
      if (connection->register_time_events.empty()) {
        connection->register_time_events.insert(
            connection->center->create_time_event(1000,
                                                  connection->wakeup_handler));
      }
      return nullptr;
    }
  }

  state = THROTTLE_BYTES;
  return CONTINUE(throttle_bytes);
}

CtPtr ProtocolV1::throttle_bytes() {
  ldout(cct, 20) << __func__ << dendl;

  cur_msg_size = current_header.front_len + current_header.middle_len +
                 current_header.data_len;
  if (cur_msg_size) {
    if (connection->policy.throttler_bytes) {
      ldout(cct, 10) << __func__ << " wants " << cur_msg_size
                     << " bytes from policy throttler "
                     << connection->policy.throttler_bytes->get_current() << "/"
                     << connection->policy.throttler_bytes->get_max() << dendl;
      if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
        ldout(cct, 10) << __func__ << " wants " << cur_msg_size
                       << " bytes from policy throttler "
                       << connection->policy.throttler_bytes->get_current()
                       << "/" << connection->policy.throttler_bytes->get_max()
                       << " failed, just wait." << dendl;
        // draining the full message queue in the dispatch thread pool can
        // take a while, so wait a millisecond before retrying
        if (connection->register_time_events.empty()) {
          connection->register_time_events.insert(
              connection->center->create_time_event(
                  1000, connection->wakeup_handler));
        }
        return nullptr;
      }
    }
  }

  state = THROTTLE_DISPATCH_QUEUE;
  return CONTINUE(throttle_dispatch_queue);
}

CtPtr ProtocolV1::throttle_dispatch_queue() {
  ldout(cct, 20) << __func__ << dendl;

  if (cur_msg_size) {
    if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
            cur_msg_size)) {
      ldout(cct, 10)
          << __func__ << " wants " << cur_msg_size
          << " bytes from dispatch throttle "
          << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
          << connection->dispatch_queue->dispatch_throttler.get_max()
          << " failed, just wait." << dendl;
      // draining the full message queue in the dispatch thread pool can
      // take a while, so wait a millisecond before retrying
      if (connection->register_time_events.empty()) {
        connection->register_time_events.insert(
            connection->center->create_time_event(1000,
                                                  connection->wakeup_handler));
      }
      return nullptr;
    }
  }

  throttle_stamp = ceph_clock_now();

  state = READ_MESSAGE_FRONT;
  return read_message_front();
}

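// The message payload is read in up to four stages: front, middle, data
// (in buffer-sized chunks via read_message_data), and finally the footer.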
CtPtr ProtocolV1::read_message_front() {
  ldout(cct, 20) << __func__ << dendl;

  unsigned front_len = current_header.front_len;
  if (front_len) {
    if (!front.length()) {
      front.push_back(buffer::create(front_len));
    }
    return READB(front_len, front.c_str(), handle_message_front);
  }
  return read_message_middle();
}

CtPtr ProtocolV1::handle_message_front(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read message front failed" << dendl;
    return _fault();
  }

  ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;

  return read_message_middle();
}

CtPtr ProtocolV1::read_message_middle() {
  ldout(cct, 20) << __func__ << dendl;

  if (current_header.middle_len) {
    if (!middle.length()) {
      middle.push_back(buffer::create(current_header.middle_len));
    }
    return READB(current_header.middle_len, middle.c_str(),
                 handle_message_middle);
  }

  return read_message_data_prepare();
}

CtPtr ProtocolV1::handle_message_middle(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
    return _fault();
  }

  ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;

  return read_message_data_prepare();
}

CtPtr ProtocolV1::read_message_data_prepare() {
  ldout(cct, 20) << __func__ << dendl;

  unsigned data_len = le32_to_cpu(current_header.data_len);
  unsigned data_off = le32_to_cpu(current_header.data_off);

  if (data_len) {
    // get a buffer
#if 0
    // rx_buffers is broken by design... see
    // http://tracker.ceph.com/issues/22480
    map<ceph_tid_t, pair<bufferlist, int> >::iterator p =
        connection->rx_buffers.find(current_header.tid);
    if (p != connection->rx_buffers.end()) {
      ldout(cct, 10) << __func__ << " selecting rx buffer v " << p->second.second
                     << " at offset " << data_off << " len "
                     << p->second.first.length() << dendl;
      data_buf = p->second.first;
      // make sure it's big enough
      if (data_buf.length() < data_len)
        data_buf.push_back(buffer::create(data_len - data_buf.length()));
      data_blp = data_buf.begin();
    } else {
      ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
                     << data_off << dendl;
      alloc_aligned_buffer(data_buf, data_len, data_off);
      data_blp = data_buf.begin();
    }
#else
    ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
                   << data_off << dendl;
    alloc_aligned_buffer(data_buf, data_len, data_off);
    data_blp = data_buf.begin();
#endif
  }

  msg_left = data_len;

  return CONTINUE(read_message_data);
}

CtPtr ProtocolV1::read_message_data() {
  ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;

  if (msg_left > 0) {
    bufferptr bp = data_blp.get_current_ptr();
    unsigned read_len = std::min(bp.length(), msg_left);

    return READB(read_len, bp.c_str(), handle_message_data);
  }

  return read_message_footer();
}

CtPtr ProtocolV1::handle_message_data(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read data error " << dendl;
    return _fault();
  }

  bufferptr bp = data_blp.get_current_ptr();
  unsigned read_len = std::min(bp.length(), msg_left);
  ceph_assert(read_len < std::numeric_limits<int>::max());
  data_blp.advance(read_len);
  data.append(bp, 0, read_len);
  msg_left -= read_len;

  return CONTINUE(read_message_data);
}

CtPtr ProtocolV1::read_message_footer() {
  ldout(cct, 20) << __func__ << dendl;

  state = READ_FOOTER_AND_DISPATCH;

  unsigned len;
  if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    len = sizeof(ceph_msg_footer);
  } else {
    len = sizeof(ceph_msg_footer_old);
  }

  return READ(len, handle_message_footer);
}

CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read footer data error " << dendl;
    return _fault();
  }

  ceph_msg_footer footer;
  ceph_msg_footer_old old_footer;

  if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    footer = *((ceph_msg_footer *)buffer);
  } else {
    old_footer = *((ceph_msg_footer_old *)buffer);
    footer.front_crc = old_footer.front_crc;
    footer.middle_crc = old_footer.middle_crc;
    footer.data_crc = old_footer.data_crc;
    footer.sig = 0;
    footer.flags = old_footer.flags;
  }

  int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
  ldout(cct, 10) << __func__ << " aborted = " << aborted << dendl;
  if (aborted) {
    ldout(cct, 0) << __func__ << " got " << front.length() << " + "
                  << middle.length() << " + " << data.length()
                  << " byte message.. ABORTED" << dendl;
    return _fault();
  }

  ldout(cct, 20) << __func__ << " got " << front.length() << " + "
                 << middle.length() << " + " << data.length() << " byte message"
                 << dendl;
  Message *message = decode_message(cct, messenger->crcflags, current_header,
                                    footer, front, middle, data, connection);
  if (!message) {
    ldout(cct, 1) << __func__ << " decode message failed " << dendl;
    return _fault();
  }

  //
  // Check the signature if one should be present. A zero return indicates
  // success. PLR
  //

  if (session_security.get() == NULL) {
    ldout(cct, 10) << __func__ << " no session security set" << dendl;
  } else {
    if (session_security->check_message_signature(message)) {
      ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
      message->put();
      return _fault();
    }
  }
  message->set_byte_throttler(connection->policy.throttler_bytes);
  message->set_message_throttler(connection->policy.throttler_messages);

  // store reservation size in message, so we don't get confused
  // by messages entering the dispatch queue through other paths.
  message->set_dispatch_throttle_size(cur_msg_size);

  message->set_recv_stamp(recv_stamp);
  message->set_throttle_stamp(throttle_stamp);
  message->set_recv_complete_stamp(ceph_clock_now());

  // check received seq#. if it is old, drop the message.
  // note that incoming messages may skip ahead. this is convenient for the
  // client side queueing because messages can't be renumbered, but the (kernel)
  // client will occasionally pull a message out of the sent queue to send
  // elsewhere. in that case it doesn't matter if we "got" it or not.
  uint64_t cur_seq = in_seq;
  if (message->get_seq() <= cur_seq) {
    ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
                  << " <= " << cur_seq << " " << message << " " << *message
                  << ", discarding" << dendl;
    message->put();
    if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
        cct->_conf->ms_die_on_old_message) {
      ceph_assert(0 == "old msgs despite reconnect_seq feature");
    }
    return nullptr;
  }
  if (message->get_seq() > cur_seq + 1) {
    ldout(cct, 0) << __func__ << " missed message? skipped from seq "
                  << cur_seq << " to " << message->get_seq() << dendl;
    if (cct->_conf->ms_die_on_skipped_message) {
      ceph_assert(0 == "skipped incoming seq");
    }
  }

  message->set_connection(connection);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
  if (message->get_type() == CEPH_MSG_OSD_OP ||
      message->get_type() == CEPH_MSG_OSD_OPREPLY) {
    utime_t ltt_processed_stamp = ceph_clock_now();
    double usecs_elapsed =
        (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
    ostringstream buf;
    if (message->get_type() == CEPH_MSG_OSD_OP)
      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
                           false);
    else
      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
                           false);
  }
#endif

  // note last received message.
  in_seq = message->get_seq();
  ldout(cct, 5) << " rx " << message->get_source() << " seq "
                << message->get_seq() << " " << message << " " << *message
                << dendl;

  bool need_dispatch_writer = false;
  if (!connection->policy.lossy) {
    ack_left++;
    need_dispatch_writer = true;
  }

  state = OPENED;

  connection->logger->inc(l_msgr_recv_messages);
  connection->logger->inc(
      l_msgr_recv_bytes,
      cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));

  messenger->ms_fast_preprocess(message);
  auto fast_dispatch_time = ceph::mono_clock::now();
  connection->logger->tinc(l_msgr_running_recv_time,
                           fast_dispatch_time - connection->recv_start_time);
  if (connection->delay_state) {
    double delay_period = 0;
    if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
      delay_period =
          cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
      ldout(cct, 1) << "queue_received will delay after "
                    << (ceph_clock_now() + delay_period) << " on " << message
                    << " " << *message << dendl;
    }
    connection->delay_state->queue(delay_period, message);
  } else if (messenger->ms_can_fast_dispatch(message)) {
    connection->lock.unlock();
    connection->dispatch_queue->fast_dispatch(message);
    connection->recv_start_time = ceph::mono_clock::now();
    connection->logger->tinc(l_msgr_running_fast_dispatch_time,
                             connection->recv_start_time - fast_dispatch_time);
    connection->lock.lock();
  } else {
    connection->dispatch_queue->enqueue(message, message->get_priority(),
                                        connection->conn_id);
  }

  // clean up local buffer references
  data_buf.clear();
  front.clear();
  middle.clear();
  data.clear();

  if (need_dispatch_writer && connection->is_connected()) {
    connection->center->dispatch_event_external(connection->write_handler);
  }

  return CONTINUE(wait_message);
}

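// Called when the session is reset (e.g. on a RESETSESSION reply or a peer
// reset detected during replacement): discard everything queued for the old
// session and start message numbering over.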
void ProtocolV1::session_reset() {
  ldout(cct, 10) << __func__ << " started" << dendl;

  std::lock_guard<std::mutex> l(connection->write_lock);
  if (connection->delay_state) {
    connection->delay_state->discard();
  }

  connection->dispatch_queue->discard_queue(connection->conn_id);
  discard_out_queue();
  // note: we need to clear outcoming_bl here, but session_reset may be
  // called by another thread, so the caller must clear it itself!
  // outcoming_bl.clear();

  connection->dispatch_queue->queue_remote_reset(connection);

  randomize_out_seq();

  in_seq = 0;
  connect_seq = 0;
  // it's safe to set this to 0 directly: both locks are held here
  ack_left = 0;
  once_ready = false;
  can_write = WriteStatus::NOWRITE;
}

void ProtocolV1::randomize_out_seq() {
  if (connection->get_features() & CEPH_FEATURE_MSG_AUTH) {
    // Set out_seq to a random value, so CRC won't be predictable.
    auto rand_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
    ldout(cct, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
    out_seq = rand_seq;
  } else {
    // previously, seq #'s always started at 0.
    out_seq = 0;
  }
}

ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
  FUNCTRACE(cct);
  ceph_assert(connection->center->in_thread());
  m->set_seq(++out_seq);

  if (messenger->crcflags & MSG_CRC_HEADER) {
    m->calc_header_crc();
  }

  ceph_msg_header &header = m->get_header();
  ceph_msg_footer &footer = m->get_footer();

  // TODO: make sign_message re-entrant?
  // Now that we have all the crcs calculated, handle the
  // digital signature for the message, if the AsyncConnection has session
  // security set up. Some session security options do not
  // actually calculate and check the signature, but they should
  // handle the calls to sign_message and check_signature. PLR
  if (session_security.get() == NULL) {
    ldout(cct, 20) << __func__ << " no session security" << dendl;
  } else {
    if (session_security->sign_message(m)) {
      ldout(cct, 20) << __func__ << " failed to sign m=" << m
                     << "): sig = " << footer.sig << dendl;
    } else {
      ldout(cct, 20) << __func__ << " signed m=" << m
                     << "): sig = " << footer.sig << dendl;
    }
  }

  connection->outcoming_bl.append(CEPH_MSGR_TAG_MSG);
  connection->outcoming_bl.append((char *)&header, sizeof(header));

  ldout(cct, 20) << __func__ << " sending message type=" << header.type
                 << " src " << entity_name_t(header.src)
                 << " front=" << header.front_len << " data=" << header.data_len
                 << " off " << header.data_off << dendl;

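  // For small messages split across many buffer fragments, copy the pieces
  // into outcoming_bl so the kernel sees one contiguous range instead of a
  // long iovec; larger payloads are claimed (moved) without copying.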
  if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
    for (const auto &pb : bl.buffers()) {
      connection->outcoming_bl.append((char *)pb.c_str(), pb.length());
    }
  } else {
    connection->outcoming_bl.claim_append(bl);
  }

  // send footer; if receiver doesn't support signatures, use the old footer
  // format
  ceph_msg_footer_old old_footer;
  if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    connection->outcoming_bl.append((char *)&footer, sizeof(footer));
  } else {
    if (messenger->crcflags & MSG_CRC_HEADER) {
      old_footer.front_crc = footer.front_crc;
      old_footer.middle_crc = footer.middle_crc;
      old_footer.data_crc = footer.data_crc;
    } else {
      old_footer.front_crc = old_footer.middle_crc = 0;
    }
    old_footer.data_crc =
        messenger->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
    old_footer.flags = footer.flags;
    connection->outcoming_bl.append((char *)&old_footer, sizeof(old_footer));
  }

  m->trace.event("async writing message");
  ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
                 << dendl;
  ssize_t total_send_size = connection->outcoming_bl.length();
  ssize_t rc = connection->_try_send(more);
  if (rc < 0) {
    ldout(cct, 1) << __func__ << " error sending " << m << ", "
                  << cpp_strerror(rc) << dendl;
  } else {
    connection->logger->inc(
        l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
    ldout(cct, 10) << __func__ << " sending " << m
                   << (rc ? " continuing." : " done.") << dendl;
  }
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
  m->put();

  return rc;
}

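// Move everything on the sent list back to the front of the highest-priority
// queue for retransmission and rewind out_seq accordingly; the messages are
// re-encoded and renumbered when they are sent again.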
void ProtocolV1::requeue_sent() {
  if (sent.empty()) {
    return;
  }

  list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
  out_seq -= sent.size();
  while (!sent.empty()) {
    Message *m = sent.back();
    sent.pop_back();
    ldout(cct, 10) << __func__ << " " << *m << " for resend "
                   << " (" << m->get_seq() << ")" << dendl;
    rq.push_front(make_pair(bufferlist(), m));
  }
}

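// Discard requeued messages that the peer has already acknowledged
// (seq <= the acked seq; a seq of 0 marks a message that was never sent)
// and return out_seq advanced past them.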
uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
  ldout(cct, 10) << __func__ << " " << seq << dendl;
  std::lock_guard<std::mutex> l(connection->write_lock);
  if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
    return seq;
  }
  list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
  uint64_t count = out_seq;
  while (!rq.empty()) {
    pair<bufferlist, Message *> p = rq.front();
    if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
    ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
                   << p.second->get_seq() << " <= " << seq << ", discarding"
                   << dendl;
    p.second->put();
    rq.pop_front();
    count++;
  }
  if (rq.empty()) out_q.erase(CEPH_MSG_PRIO_HIGHEST);
  return count;
}

/*
 * Tears down the message queues and removes them from the DispatchQueue.
 * Must hold write_lock prior to calling.
 */
void ProtocolV1::discard_out_queue() {
  ldout(cct, 10) << __func__ << " started" << dendl;

  for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
    ldout(cct, 20) << __func__ << " discard " << *p << dendl;
    (*p)->put();
  }
  sent.clear();
  for (map<int, list<pair<bufferlist, Message *> > >::iterator p =
           out_q.begin();
       p != out_q.end(); ++p) {
    for (list<pair<bufferlist, Message *> >::iterator r = p->second.begin();
         r != p->second.end(); ++r) {
      ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
      r->second->put();
    }
  }
  out_q.clear();
}

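// Drop any half-read message state and return whatever throttle budget the
// in-flight message had already claimed, based on how far the read state
// machine had progressed.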
void ProtocolV1::reset_recv_state() {
  // clean up state internal variables and states
  if (state == CONNECTING_SEND_CONNECT_MSG) {
    if (authorizer) {
      delete authorizer;
    }
    authorizer = nullptr;
  }

  // clean read and write callbacks
  connection->pendingReadLen.reset();
  connection->writeCallback.reset();

  if (state > THROTTLE_MESSAGE && state <= READ_FOOTER_AND_DISPATCH &&
      connection->policy.throttler_messages) {
    ldout(cct, 10) << __func__ << " releasing " << 1
                   << " message to policy throttler "
                   << connection->policy.throttler_messages->get_current()
                   << "/" << connection->policy.throttler_messages->get_max()
                   << dendl;
    connection->policy.throttler_messages->put();
  }
  if (state > THROTTLE_BYTES && state <= READ_FOOTER_AND_DISPATCH) {
    if (connection->policy.throttler_bytes) {
      ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
                     << " bytes to policy throttler "
                     << connection->policy.throttler_bytes->get_current() << "/"
                     << connection->policy.throttler_bytes->get_max() << dendl;
      connection->policy.throttler_bytes->put(cur_msg_size);
    }
  }
  if (state > THROTTLE_DISPATCH_QUEUE && state <= READ_FOOTER_AND_DISPATCH) {
    ldout(cct, 10)
        << __func__ << " releasing " << cur_msg_size
        << " bytes to dispatch_queue throttler "
        << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
        << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
    connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
  }
}

Message *ProtocolV1::_get_next_outgoing(bufferlist *bl) {
  Message *m = 0;
  if (!out_q.empty()) {
    map<int, list<pair<bufferlist, Message *> > >::reverse_iterator it =
        out_q.rbegin();
    ceph_assert(!it->second.empty());
    list<pair<bufferlist, Message *> >::iterator p = it->second.begin();
    m = p->second;
    if (bl) bl->swap(p->first);
    it->second.erase(p);
    if (it->second.empty()) out_q.erase(it->first);
  }
  return m;
}

/**
 * Client Protocol V1
 **/

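// Connect-side handshake: exchange banners and addresses, then loop sending
// ceph_msg_connect and reading ceph_msg_connect_reply until the server
// answers READY (or SEQ, which additionally exchanges acked sequence
// numbers) and the session becomes ready.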
CtPtr ProtocolV1::send_client_banner() {
  ldout(cct, 20) << __func__ << dendl;
  state = CONNECTING;

  bufferlist bl;
  bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
  return WRITE(bl, handle_client_banner_write);
}

CtPtr ProtocolV1::handle_client_banner_write(int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " write client banner failed" << dendl;
    return _fault();
  }
  ldout(cct, 10) << __func__ << " connect write banner done: "
                 << connection->get_peer_addr() << dendl;

  return wait_server_banner();
}

CtPtr ProtocolV1::wait_server_banner() {
  state = CONNECTING_WAIT_BANNER_AND_IDENTIFY;

  ldout(cct, 20) << __func__ << dendl;

  unsigned banner_len = strlen(CEPH_BANNER);
  unsigned need_len = banner_len + sizeof(ceph_entity_addr) * 2;
  return READ(need_len, handle_server_banner_and_identify);
}

CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read banner and identify addresses failed"
                  << dendl;
    return _fault();
  }

  unsigned banner_len = strlen(CEPH_BANNER);
  if (memcmp(buffer, CEPH_BANNER, banner_len)) {
    ldout(cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
                  << connection->get_peer_addr() << dendl;
    return _fault();
  }

  bufferlist bl;
  entity_addr_t paddr, peer_addr_for_me;

  bl.append(buffer + banner_len, sizeof(ceph_entity_addr) * 2);
  auto p = bl.cbegin();
  try {
    decode(paddr, p);
    decode(peer_addr_for_me, p);
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << " decode peer addr failed " << dendl;
    return _fault();
  }
  ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
                 << " on socket " << connection->cs.fd() << dendl;

  entity_addr_t peer_addr = connection->peer_addrs->legacy_addr();
  if (peer_addr != paddr) {
    if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
        peer_addr.get_nonce() == paddr.get_nonce()) {
      ldout(cct, 0) << __func__ << " connect claims to be " << paddr << " not "
                    << peer_addr << " - presumably this is the same node!"
                    << dendl;
    } else {
      ldout(cct, 10) << __func__ << " connect claims to be " << paddr << " not "
                     << peer_addr << dendl;
      return _fault();
    }
  }

  ldout(cct, 20) << __func__ << " connect peer addr for me is "
                 << peer_addr_for_me << dendl;
  connection->lock.unlock();
  messenger->learned_addr(peer_addr_for_me);
  if (cct->_conf->ms_inject_internal_delays &&
      cct->_conf->ms_inject_socket_failures) {
    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
      ldout(cct, 10) << __func__ << " sleep for "
                     << cct->_conf->ms_inject_internal_delays << dendl;
      utime_t t;
      t.set_from_double(cct->_conf->ms_inject_internal_delays);
      t.sleep();
    }
  }

  connection->lock.lock();
  if (state != CONNECTING_WAIT_BANNER_AND_IDENTIFY) {
    ldout(cct, 1) << __func__
                  << " state changed while learned_addr, mark_down or "
                  << "replacing must have happened just now" << dendl;
    return nullptr;
  }

  bufferlist myaddrbl;
  encode(messenger->get_myaddr_legacy(), myaddrbl, 0);  // legacy
  return WRITE(myaddrbl, handle_my_addr_write);
}

CtPtr ProtocolV1::handle_my_addr_write(int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 2) << __func__ << " connect couldn't write my addr, "
                  << cpp_strerror(r) << dendl;
    return _fault();
  }
  ldout(cct, 10) << __func__ << " connect sent my addr "
                 << messenger->get_myaddr_legacy() << dendl;

  return CONTINUE(send_connect_message);
}

CtPtr ProtocolV1::send_connect_message() {
  state = CONNECTING_SEND_CONNECT_MSG;

  ldout(cct, 20) << __func__ << dendl;

  if (!authorizer) {
    authorizer = messenger->ms_deliver_get_authorizer(connection->peer_type);
  }

  ceph_msg_connect connect;
  connect.features = connection->policy.features_supported;
  connect.host_type = messenger->get_myname().type();
  connect.global_seq = global_seq;
  connect.connect_seq = connect_seq;
  connect.protocol_version =
      messenger->get_proto_version(connection->peer_type, true);
  connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
  connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;

  if (authorizer) {
    ldout(cct, 10) << __func__
                   << " connect_msg.authorizer_len=" << connect.authorizer_len
                   << " protocol=" << connect.authorizer_protocol << dendl;
  }

  connect.flags = 0;
  if (connection->policy.lossy) {
    connect.flags |=
        CEPH_MSG_CONNECT_LOSSY;  // this is fyi, actually, server decides!
  }

  bufferlist bl;
  bl.append((char *)&connect, sizeof(connect));
  if (authorizer) {
    bl.append(authorizer->bl.c_str(), authorizer->bl.length());
  }

  ldout(cct, 10) << __func__ << " connect sending gseq=" << global_seq
                 << " cseq=" << connect_seq
                 << " proto=" << connect.protocol_version << dendl;

  return WRITE(bl, handle_connect_message_write);
}

CtPtr ProtocolV1::handle_connect_message_write(int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 2) << __func__ << " connect couldn't send connect msg, "
                  << cpp_strerror(r) << dendl;
    return _fault();
  }

  ldout(cct, 20) << __func__
                 << " connect wrote (self +) cseq, waiting for reply" << dendl;

  return wait_connect_reply();
}

CtPtr ProtocolV1::wait_connect_reply() {
  ldout(cct, 20) << __func__ << dendl;

  memset(&connect_reply, 0, sizeof(connect_reply));
  return READ(sizeof(connect_reply), handle_connect_reply_1);
}

CtPtr ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read connect reply failed" << dendl;
    return _fault();
  }

  connect_reply = *((ceph_msg_connect_reply *)buffer);

  ldout(cct, 20) << __func__ << " connect got reply tag "
                 << (int)connect_reply.tag << " connect_seq "
                 << connect_reply.connect_seq << " global_seq "
                 << connect_reply.global_seq << " proto "
                 << connect_reply.protocol_version << " flags "
                 << (int)connect_reply.flags << " features "
                 << connect_reply.features << dendl;

  if (connect_reply.authorizer_len) {
    return wait_connect_reply_auth();
  }

  return handle_connect_reply_2();
}

CtPtr ProtocolV1::wait_connect_reply_auth() {
  ldout(cct, 20) << __func__ << dendl;

  ldout(cct, 10) << __func__
                 << " reply.authorizer_len=" << connect_reply.authorizer_len
                 << dendl;

  ceph_assert(connect_reply.authorizer_len < 4096);

  return READ(connect_reply.authorizer_len, handle_connect_reply_auth);
}

CtPtr ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read connect reply authorizer failed"
                  << dendl;
    return _fault();
  }

  bufferlist authorizer_reply;
  authorizer_reply.append(buffer, connect_reply.authorizer_len);

  if (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
    ldout(cct, 10) << __func__ << " connect got auth challenge" << dendl;
    authorizer->add_challenge(cct, authorizer_reply);
    return CONTINUE(send_connect_message);
  }

  auto iter = authorizer_reply.cbegin();
  if (authorizer && !authorizer->verify_reply(iter,
                                              nullptr /* connection_secret */)) {
    ldout(cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
    return _fault();
  }

  return handle_connect_reply_2();
}

CtPtr ProtocolV1::handle_connect_reply_2() {
  ldout(cct, 20) << __func__ << dendl;

  if (connect_reply.tag == CEPH_MSGR_TAG_FEATURES) {
    ldout(cct, 0) << __func__ << " connect protocol feature mismatch, my "
                  << std::hex << connection->policy.features_supported
                  << " < peer " << connect_reply.features << " missing "
                  << (connect_reply.features &
                      ~connection->policy.features_supported)
                  << std::dec << dendl;
    return _fault();
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
    ldout(cct, 0) << __func__ << " connect protocol version mismatch, my "
                  << messenger->get_proto_version(connection->peer_type, true)
                  << " != " << connect_reply.protocol_version << dendl;
    return _fault();
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
    ldout(cct, 0) << __func__ << " connect got BADAUTHORIZER" << dendl;
    return _fault();
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
    ldout(cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
    session_reset();
    connect_seq = 0;

    // see session_reset
    connection->outcoming_bl.clear();

    return CONTINUE(send_connect_message);
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
    global_seq = messenger->get_global_seq(connect_reply.global_seq);
    ldout(cct, 5) << __func__ << " connect got RETRY_GLOBAL "
                  << connect_reply.global_seq << " chose new " << global_seq
                  << dendl;
    return CONTINUE(send_connect_message);
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
    ceph_assert(connect_reply.connect_seq > connect_seq);
    ldout(cct, 5) << __func__ << " connect got RETRY_SESSION " << connect_seq
                  << " -> " << connect_reply.connect_seq << dendl;
    connect_seq = connect_reply.connect_seq;
    return CONTINUE(send_connect_message);
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_WAIT) {
    ldout(cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
    state = WAIT;
    return _fault();
  }

  uint64_t feat_missing;
  feat_missing =
      connection->policy.features_required & ~(uint64_t)connect_reply.features;
  if (feat_missing) {
    ldout(cct, 1) << __func__ << " missing required features " << std::hex
                  << feat_missing << std::dec << dendl;
    return _fault();
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_SEQ) {
    ldout(cct, 10)
        << __func__
        << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
        << dendl;

    return wait_ack_seq();
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_READY) {
    ldout(cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
  }

  return client_ready();
}

CtPtr ProtocolV1::wait_ack_seq() {
  ldout(cct, 20) << __func__ << dendl;

  return READ(sizeof(uint64_t), handle_ack_seq);
}

CtPtr ProtocolV1::handle_ack_seq(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read connect ack seq failed" << dendl;
    return _fault();
  }

  uint64_t newly_acked_seq = 0;

  newly_acked_seq = *((uint64_t *)buffer);
  ldout(cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
                << " vs out_seq " << out_seq << dendl;
  out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);

  bufferlist bl;
  uint64_t s = in_seq;
  bl.append((char *)&s, sizeof(s));

  return WRITE(bl, handle_in_seq_write);
}

CtPtr ProtocolV1::handle_in_seq_write(int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 10) << __func__ << " failed to send in_seq " << dendl;
    return _fault();
  }

  ldout(cct, 10) << __func__ << " send in_seq done " << dendl;

  return client_ready();
}

CtPtr ProtocolV1::client_ready() {
  ldout(cct, 20) << __func__ << dendl;

  // hooray!
  peer_global_seq = connect_reply.global_seq;
  connection->policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY;

  once_ready = true;
  connect_seq += 1;
  ceph_assert(connect_seq == connect_reply.connect_seq);
  backoff = utime_t();
  connection->set_features((uint64_t)connect_reply.features &
                           (uint64_t)connection->policy.features_supported);
  ldout(cct, 10) << __func__ << " connect success " << connect_seq
                 << ", lossy = " << connection->policy.lossy << ", features "
                 << connection->get_features() << dendl;

  // If we have an authorizer, get a new AuthSessionHandler to deal with
  // ongoing security of the connection. PLR
  if (authorizer != NULL) {
    ldout(cct, 10) << __func__ << " setting up session_security with auth "
                   << authorizer << dendl;
    session_security.reset(get_auth_session_handler(
        cct, authorizer->protocol,
        authorizer->session_key,
        connection->get_features()));
  } else {
    // We have no authorizer, so we shouldn't be applying security to messages
    // in this AsyncConnection. PLR
    ldout(cct, 10) << __func__ << " no authorizer, clearing session_security"
                   << dendl;
    session_security.reset();
  }

  if (connection->delay_state) {
    ceph_assert(connection->delay_state->ready());
  }
  connection->dispatch_queue->queue_connect(connection);
  messenger->ms_deliver_handle_fast_connect(connection);

  return ready();
}

/**
 * Server Protocol V1
 **/

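// Accept-side handshake: send our banner, our legacy address, and the peer's
// address as seen from here, then read the peer's banner and address and
// process its ceph_msg_connect.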
CtPtr ProtocolV1::send_server_banner() {
  ldout(cct, 20) << __func__ << dendl;
  state = ACCEPTING;

  bufferlist bl;

  bl.append(CEPH_BANNER, strlen(CEPH_BANNER));

  // as a server, we should have a legacy addr if we accepted this connection.
  auto legacy = messenger->get_myaddrs().legacy_addr();
  encode(legacy, bl, 0);  // legacy
  connection->port = legacy.get_port();
  encode(connection->target_addr, bl, 0);  // legacy

  ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd()
                << " legacy " << legacy
                << " socket_addr " << connection->socket_addr
                << " target_addr " << connection->target_addr
                << dendl;

  return WRITE(bl, handle_server_banner_write);
}

CtPtr ProtocolV1::handle_server_banner_write(int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " write server banner failed" << dendl;
    return _fault();
  }
  ldout(cct, 10) << __func__ << " write banner and addr done: "
                 << connection->get_peer_addr() << dendl;

  return wait_client_banner();
}
1756
CtPtr ProtocolV1::wait_client_banner() {
  ldout(cct, 20) << __func__ << dendl;

  return READ(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr),
              handle_client_banner);
}

CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
    return _fault();
  }

  if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
    ldout(cct, 1) << __func__ << " accept peer sent bad banner '" << buffer
                  << "' (should be '" << CEPH_BANNER << "')" << dendl;
    return _fault();
  }

  bufferlist addr_bl;
  entity_addr_t peer_addr;

  addr_bl.append(buffer + strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
  try {
    auto ti = addr_bl.cbegin();
    decode(peer_addr, ti);
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << " decode peer_addr failed" << dendl;
    return _fault();
  }

  ldout(cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
  if (peer_addr.is_blank_ip()) {
    // peer apparently doesn't know what ip they have; figure it out for them.
    int port = peer_addr.get_port();
    peer_addr.set_sockaddr(connection->target_addr.get_sockaddr());
    peer_addr.set_port(port);

    ldout(cct, 0) << __func__ << " accept peer addr is really " << peer_addr
                  << " (socket is " << connection->target_addr << ")" << dendl;
  }
  connection->set_peer_addr(peer_addr);  // so that connection_state gets set up
  connection->target_addr = peer_addr;

  return CONTINUE(wait_connect_message);
}

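// A peer bound to a wildcard address advertises a "blank" IP in its banner;
// the branch above substitutes the address actually observed on the socket
// while keeping the advertised port. Purely illustrative example: an
// advertised 0.0.0.0:6801 arriving on a socket from 10.0.0.2 is rewritten
// to 10.0.0.2:6801.
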
CtPtr ProtocolV1::wait_connect_message() {
  ldout(cct, 20) << __func__ << dendl;

  memset(&connect_msg, 0, sizeof(connect_msg));
  return READ(sizeof(connect_msg), handle_connect_message_1);
}

CtPtr ProtocolV1::handle_connect_message_1(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read connect msg failed" << dendl;
    return _fault();
  }

  connect_msg = *((ceph_msg_connect *)buffer);

  state = ACCEPTING_WAIT_CONNECT_MSG_AUTH;

  if (connect_msg.authorizer_len) {
    return wait_connect_message_auth();
  }

  return handle_connect_message_2();
}

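// The fixed-size ceph_msg_connect header just copied out of the read buffer
// drives the rest of the accept path. Fields consulted below:
//
//   features            - feature bits the peer claims to support
//   host_type           - peer entity type; selects our messenger policy
//   global_seq          - peer's messenger-wide connection counter
//   connect_seq         - per-session reconnect counter
//   protocol_version    - must match ours for this peer type
//   authorizer_protocol - e.g. CEPH_AUTH_CEPHX
//   authorizer_len      - size of the authorizer payload (read next if set)
//   flags               - e.g. CEPH_MSG_CONNECT_LOSSY
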
CtPtr ProtocolV1::wait_connect_message_auth() {
  ldout(cct, 20) << __func__ << dendl;
  authorizer_buf.clear();
  authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));
  return READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
               handle_connect_message_auth);
}

CtPtr ProtocolV1::handle_connect_message_auth(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read connect authorizer failed" << dendl;
    return _fault();
  }

  return handle_connect_message_2();
}

CtPtr ProtocolV1::handle_connect_message_2() {
  ldout(cct, 20) << __func__ << dendl;

  ldout(cct, 20) << __func__ << " accept got peer connect_seq "
                 << connect_msg.connect_seq << " global_seq "
                 << connect_msg.global_seq << dendl;

  connection->set_peer_type(connect_msg.host_type);
  connection->policy = messenger->get_policy(connect_msg.host_type);

  ldout(cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type
                 << ", policy.lossy=" << connection->policy.lossy
                 << " policy.server=" << connection->policy.server
                 << " policy.standby=" << connection->policy.standby
                 << " policy.resetcheck=" << connection->policy.resetcheck
                 << " features 0x" << std::hex << (uint64_t)connect_msg.features
                 << std::dec << dendl;

  ceph_msg_connect_reply reply;
  bufferlist authorizer_reply;

  memset(&reply, 0, sizeof(reply));
  reply.protocol_version =
      messenger->get_proto_version(connection->peer_type, false);

  // mismatch?
  ldout(cct, 10) << __func__ << " accept my proto " << reply.protocol_version
                 << ", their proto " << connect_msg.protocol_version << dendl;

  if (connect_msg.protocol_version != reply.protocol_version) {
    return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER, reply,
                                      authorizer_reply);
  }

  // require signatures for cephx?
  if (connect_msg.authorizer_protocol == CEPH_AUTH_CEPHX) {
    if (connection->peer_type == CEPH_ENTITY_TYPE_OSD ||
        connection->peer_type == CEPH_ENTITY_TYPE_MDS) {
      if (cct->_conf->cephx_require_signatures ||
          cct->_conf->cephx_cluster_require_signatures) {
        ldout(cct, 10)
            << __func__
            << " using cephx, requiring MSG_AUTH feature bit for cluster"
            << dendl;
        connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
      }
    } else {
      if (cct->_conf->cephx_require_signatures ||
          cct->_conf->cephx_service_require_signatures) {
        ldout(cct, 10)
            << __func__
            << " using cephx, requiring MSG_AUTH feature bit for service"
            << dendl;
        connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
      }
    }
  }

  uint64_t feat_missing =
      connection->policy.features_required & ~(uint64_t)connect_msg.features;
  if (feat_missing) {
    ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
                  << feat_missing << std::dec << dendl;
    return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES, reply,
                                      authorizer_reply);
  }

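  // Illustration with hypothetical values: if our policy requires
  // CEPH_FEATURE_MSG_AUTH but the peer's feature word lacks that bit,
  //
  //   feat_missing = features_required & ~peer_features;  // nonzero
  //
  // and the peer gets CEPH_MSGR_TAG_FEATURES, failing fast rather than
  // negotiating a session it could not secure.
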
  bufferlist auth_bl_copy = authorizer_buf;
  connection->lock.unlock();
  ldout(cct, 10) << __func__ << " authorizer_protocol "
                 << connect_msg.authorizer_protocol
                 << " len " << auth_bl_copy.length()
                 << dendl;
  bool authorizer_valid;
  bool need_challenge = HAVE_FEATURE(connect_msg.features, CEPHX_V2);
  bool had_challenge = (bool)authorizer_challenge;
  if (!messenger->ms_deliver_verify_authorizer(
          connection, connection->peer_type, connect_msg.authorizer_protocol,
          auth_bl_copy, authorizer_reply, authorizer_valid, session_key,
          nullptr /* connection_secret */,
          need_challenge ? &authorizer_challenge : nullptr) ||
      !authorizer_valid) {
    connection->lock.lock();
    if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
      ldout(cct, 1) << __func__
                    << " state changed while accepting; it must have been mark_down"
                    << dendl;
      ceph_assert(state == CLOSED);
      return _fault();
    }

    if (need_challenge && !had_challenge && authorizer_challenge) {
      ldout(cct, 10) << __func__ << ": challenging authorizer" << dendl;
      ceph_assert(authorizer_reply.length());
      return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER,
                                        reply, authorizer_reply);
    } else {
      ldout(cct, 0) << __func__ << ": got bad authorizer, auth_reply_len="
                    << authorizer_reply.length() << dendl;
      session_security.reset();
      return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER, reply,
                                        authorizer_reply);
    }
  }

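  // Editorial note on the challenge path above: a peer advertising CEPHX_V2
  // is first answered with CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER carrying a
  // server-generated challenge, and must retry with an authorizer that binds
  // that challenge. Only the retry (had_challenge == true) can be rejected as
  // a genuinely bad authorizer; the scheme keeps a captured authorizer from
  // being replayed to open fresh sessions.
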
  // We've verified the authorizer for this AsyncConnection, so set up the
  // session security structure. PLR
  ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;

  // existing?
  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);

  connection->inject_delay();

  connection->lock.lock();
  if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
    ldout(cct, 1) << __func__
                  << " state changed while accepting; it must have been mark_down"
                  << dendl;
    ceph_assert(state == CLOSED);
    return _fault();
  }

  if (existing == connection) {
    existing = nullptr;
  }
  if (existing && existing->protocol->proto_type != 1) {
    ldout(cct, 1) << __func__ << " existing " << existing << " proto "
                  << existing->protocol.get() << " version is "
                  << existing->protocol->proto_type << ", marking down" << dendl;
    existing->mark_down();
    existing = nullptr;
  }

  if (existing) {
    // It is not possible for the existing connection to acquire this
    // connection's lock here, so taking a second AsyncConnection lock is safe.
    existing->lock.lock();  // skip lockdep check (we are locking a second
                            // AsyncConnection here)

    ldout(cct, 10) << __func__ << " existing=" << existing << " exproto="
                   << existing->protocol.get() << dendl;
    ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());
    ceph_assert(exproto);
    ceph_assert(exproto->proto_type == 1);

    if (exproto->state == CLOSED) {
      ldout(cct, 1) << __func__ << " existing " << existing
                    << " already closed." << dendl;
      existing->lock.unlock();
      existing = nullptr;

      return open(reply, authorizer_reply);
    }

    if (exproto->replacing) {
      ldout(cct, 1) << __func__
                    << " existing racing replace happened while replacing."
                    << " existing_state="
                    << connection->get_state_name(existing->state) << dendl;
      reply.global_seq = exproto->peer_global_seq;
      existing->lock.unlock();
      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
                                        authorizer_reply);
    }

    if (connect_msg.global_seq < exproto->peer_global_seq) {
      ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
                     << exproto->peer_global_seq << " > "
                     << connect_msg.global_seq << ", RETRY_GLOBAL" << dendl;
      reply.global_seq = exproto->peer_global_seq;  // so we can send it below..
      existing->lock.unlock();
      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
                                        authorizer_reply);
    } else {
      ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
                     << exproto->peer_global_seq
                     << " <= " << connect_msg.global_seq << ", looks ok"
                     << dendl;
    }

    if (existing->policy.lossy) {
      ldout(cct, 0)
          << __func__
          << " accept replacing existing (lossy) channel (new one lossy="
          << connection->policy.lossy << ")" << dendl;
      exproto->session_reset();
      return replace(existing, reply, authorizer_reply);
    }

    ldout(cct, 1) << __func__ << " accept connect_seq "
                  << connect_msg.connect_seq
                  << " vs existing csq=" << exproto->connect_seq
                  << " existing_state="
                  << connection->get_state_name(existing->state) << dendl;

    if (connect_msg.connect_seq == 0 && exproto->connect_seq > 0) {
      ldout(cct, 0)
          << __func__
          << " accept peer reset, then tried to connect to us, replacing"
          << dendl;
      // this is a hard reset from peer
      is_reset_from_peer = true;
      if (connection->policy.resetcheck) {
        exproto->session_reset();  // this resets out_queue, msg_ and
                                   // connect_seq #'s
      }
      return replace(existing, reply, authorizer_reply);
    }

    if (connect_msg.connect_seq < exproto->connect_seq) {
      // old attempt, or we sent READY but they didn't get it.
      ldout(cct, 10) << __func__ << " accept existing " << existing << ".cseq "
                     << exproto->connect_seq << " > " << connect_msg.connect_seq
                     << ", RETRY_SESSION" << dendl;
      reply.connect_seq = exproto->connect_seq + 1;
      existing->lock.unlock();
      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
                                        authorizer_reply);
    }

    if (connect_msg.connect_seq == exproto->connect_seq) {
      // if the existing connection successfully opened, and/or
      // subsequently went to standby, then the peer should bump
      // their connect_seq and retry: this is not a connection race
      // we need to resolve here.
      if (exproto->state == OPENED || exproto->state == STANDBY) {
        ldout(cct, 10) << __func__ << " accept connection race, existing "
                       << existing << ".cseq " << exproto->connect_seq
                       << " == " << connect_msg.connect_seq
                       << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
        // if both connect_seqs are zero, don't get stuck in a deadlock;
        // it's OK to replace
        if (connection->policy.resetcheck && exproto->connect_seq == 0) {
          return replace(existing, reply, authorizer_reply);
        }

        reply.connect_seq = exproto->connect_seq + 1;
        existing->lock.unlock();
        return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
                                          authorizer_reply);
      }

      // connection race?
      if (connection->peer_addrs->legacy_addr() <
              messenger->get_myaddr_legacy() ||
          existing->policy.server) {
        // incoming wins
        ldout(cct, 10) << __func__ << " accept connection race, existing "
                       << existing << ".cseq " << exproto->connect_seq
                       << " == " << connect_msg.connect_seq
                       << ", or we are server, replacing my attempt" << dendl;
        return replace(existing, reply, authorizer_reply);
      } else {
        // our existing outgoing wins
        ldout(messenger->cct, 10)
            << __func__ << " accept connection race, existing " << existing
            << ".cseq " << exproto->connect_seq
            << " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
        ceph_assert(connection->peer_addrs->legacy_addr() >
                    messenger->get_myaddr_legacy());
        existing->lock.unlock();
        // make sure we follow through with opening the existing
        // connection (if it isn't yet open) since we know the peer
        // has something to send to us.
        existing->send_keepalive();
        return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
                                          authorizer_reply);
      }
    }

    ceph_assert(connect_msg.connect_seq > exproto->connect_seq);
    ceph_assert(connect_msg.global_seq >= exproto->peer_global_seq);
    if (connection->policy.resetcheck &&  // RESETSESSION only used by servers;
                                          // peers do not reset each other
        exproto->connect_seq == 0) {
      ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
                    << connect_msg.connect_seq << ", " << existing
                    << ".cseq = " << exproto->connect_seq
                    << "), sending RESETSESSION" << dendl;
      existing->lock.unlock();
      return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
                                        authorizer_reply);
    }

    // reconnect
    ldout(cct, 10) << __func__ << " accept peer sent cseq "
                   << connect_msg.connect_seq << " > " << exproto->connect_seq
                   << dendl;
    return replace(existing, reply, authorizer_reply);
  }  // existing
  else if (!replacing && connect_msg.connect_seq > 0) {
    // we reset, and they are opening a new session
    ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
                  << connect_msg.connect_seq << "), sending RESETSESSION"
                  << dendl;
    return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
                                      authorizer_reply);
  } else {
    // new session
    ldout(cct, 10) << __func__ << " accept new session" << dendl;
    existing = nullptr;
    return open(reply, authorizer_reply);
  }
}

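// Editorial summary of the accept-side decisions above (tags are the
// CEPH_MSGR_TAG_* values carried in the connect reply):
//
//   BADPROTOVER   - protocol_version mismatch for this peer type
//   FEATURES      - peer lacks feature bits our policy requires
//   BADAUTHORIZER - authorizer failed verification (after any challenge)
//   RETRY_GLOBAL  - peer's global_seq is stale; retry with a newer one
//   RETRY_SESSION - peer's connect_seq is stale; retry with cseq + 1
//   WAIT          - peer lost the connection race and must wait for us
//   RESETSESSION  - we no longer recognize the session; peer must reset
//   READY / SEQ   - handshake accepted; see open() below
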
CtPtr ProtocolV1::send_connect_message_reply(char tag,
                                             ceph_msg_connect_reply &reply,
                                             bufferlist &authorizer_reply) {
  ldout(cct, 20) << __func__ << dendl;
  bufferlist reply_bl;
  reply.tag = tag;
  reply.features =
      ((uint64_t)connect_msg.features & connection->policy.features_supported) |
      connection->policy.features_required;
  reply.authorizer_len = authorizer_reply.length();
  reply_bl.append((char *)&reply, sizeof(reply));

  ldout(cct, 10) << __func__ << " reply features 0x" << std::hex
                 << reply.features << " = (policy sup 0x"
                 << connection->policy.features_supported
                 << " & connect 0x" << (uint64_t)connect_msg.features
                 << ") | policy req 0x"
                 << connection->policy.features_required
                 << dendl;

  if (reply.authorizer_len) {
    reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
    authorizer_reply.clear();
  }

  return WRITE(reply_bl, handle_connect_message_reply_write);
}

CtPtr ProtocolV1::handle_connect_message_reply_write(int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " write connect message reply failed" << dendl;
    connection->inject_delay();
    return _fault();
  }

  return CONTINUE(wait_connect_message);
}

CtPtr ProtocolV1::replace(AsyncConnectionRef existing,
                          ceph_msg_connect_reply &reply,
                          bufferlist &authorizer_reply) {
  ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;

  connection->inject_delay();
  if (existing->policy.lossy) {
    // disconnect from the Connection
    ldout(cct, 1) << __func__ << " replacing on lossy channel, failing existing"
                  << dendl;
    existing->protocol->stop();
    existing->dispatch_queue->queue_reset(existing.get());
  } else {
    ceph_assert(can_write == WriteStatus::NOWRITE);
    existing->write_lock.lock();

    ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());

    // reset the in_seq if this is a hard reset from peer,
    // otherwise we respect our original connection's value
    if (is_reset_from_peer) {
      exproto->is_reset_from_peer = true;
    }

    connection->center->delete_file_event(connection->cs.fd(),
                                          EVENT_READABLE | EVENT_WRITABLE);

    if (existing->delay_state) {
      existing->delay_state->flush();
      ceph_assert(!connection->delay_state);
    }
    exproto->reset_recv_state();

    exproto->connect_msg.features = connect_msg.features;

    auto temp_cs = std::move(connection->cs);
    EventCenter *new_center = connection->center;
    Worker *new_worker = connection->worker;
    // avoid _stop() shutting down the socket we are handing over;
    // queue a reset on this new connection, which we are dumping for the old
    stop();

    connection->dispatch_queue->queue_reset(connection);
    ldout(messenger->cct, 1)
        << __func__ << " stop myself to swap existing" << dendl;
    exproto->can_write = WriteStatus::REPLACING;
    exproto->replacing = true;
    existing->state_offset = 0;
    // avoid the previous thread modifying events
    exproto->state = NONE;
    existing->state = AsyncConnection::STATE_NONE;
    // discard the existing prefetch buffer in `recv_buf`
    existing->recv_start = existing->recv_end = 0;
    // there shouldn't be any buffered data on this new connection
    ceph_assert(connection->recv_start == connection->recv_end);

    exproto->authorizer_challenge.reset();

    auto deactivate_existing = std::bind(
        [existing, new_worker, new_center, exproto, reply,
         authorizer_reply](ConnectedSocket &cs) mutable {
          // we need to delete the time event in the original thread
          {
            std::lock_guard<std::mutex> l(existing->lock);
            existing->write_lock.lock();
            exproto->requeue_sent();
            existing->outcoming_bl.clear();
            existing->open_write = false;
            existing->write_lock.unlock();
            if (exproto->state == NONE) {
              existing->shutdown_socket();
              existing->cs = std::move(cs);
              existing->worker->references--;
              new_worker->references++;
              existing->logger = new_worker->get_perf_counter();
              existing->worker = new_worker;
              existing->center = new_center;
              if (existing->delay_state)
                existing->delay_state->set_center(new_center);
            } else if (exproto->state == CLOSED) {
              auto back_to_close =
                  std::bind([](ConnectedSocket &cs) mutable { cs.close(); },
                            std::move(cs));
              new_center->submit_to(new_center->get_id(),
                                    std::move(back_to_close), true);
              return;
            } else {
              ceph_abort();
            }
          }

          // Before we change existing->center there may already be events
          // queued on it. If we then mark `existing` down, cleanup runs in
          // another thread, and one of those earlier events could touch the
          // freed connection and segfault. So hand off via the center itself.
          auto transfer_existing = [existing, exproto, reply,
                                    authorizer_reply]() mutable {
            std::lock_guard<std::mutex> l(existing->lock);
            if (exproto->state == CLOSED) return;
            ceph_assert(exproto->state == NONE);

            existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
            exproto->state = ACCEPTING;

            existing->center->create_file_event(
                existing->cs.fd(), EVENT_READABLE, existing->read_handler);
            reply.global_seq = exproto->peer_global_seq;
            exproto->run_continuation(exproto->send_connect_message_reply(
                CEPH_MSGR_TAG_RETRY_GLOBAL, reply, authorizer_reply));
          };
          if (existing->center->in_thread())
            transfer_existing();
          else
            existing->center->submit_to(existing->center->get_id(),
                                        std::move(transfer_existing), true);
        },
        std::move(temp_cs));

    existing->center->submit_to(existing->center->get_id(),
                                std::move(deactivate_existing), true);
    existing->write_lock.unlock();
    existing->lock.unlock();
    return nullptr;
  }
  existing->lock.unlock();

  return open(reply, authorizer_reply);
}

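// replace() in a nutshell: the freshly accepted socket (temp_cs) is handed
// over to the *existing* AsyncConnection. deactivate_existing runs in the
// existing connection's event center, requeues unacked messages, and swaps
// in the new socket/worker/center; transfer_existing then re-arms the read
// event and replies CEPH_MSGR_TAG_RETRY_GLOBAL so the peer redrives its
// handshake on the surviving connection. This (new) connection stops itself
// and returns nullptr, ending its continuation chain.
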
CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
                       bufferlist &authorizer_reply) {
  ldout(cct, 20) << __func__ << dendl;

  connect_seq = connect_msg.connect_seq + 1;
  peer_global_seq = connect_msg.global_seq;
  ldout(cct, 10) << __func__ << " accept success, connect_seq = " << connect_seq
                 << " in_seq=" << in_seq << ", sending READY" << dendl;

  // if it is a hard reset from peer, we don't need a round-trip to negotiate
  // in/out sequence
  if ((connect_msg.features & CEPH_FEATURE_RECONNECT_SEQ) &&
      !is_reset_from_peer) {
    reply.tag = CEPH_MSGR_TAG_SEQ;
    wait_for_seq = true;
  } else {
    reply.tag = CEPH_MSGR_TAG_READY;
    wait_for_seq = false;
    out_seq = discard_requeued_up_to(out_seq, 0);
    is_reset_from_peer = false;
    in_seq = 0;
  }

  // send READY reply
  reply.features = connection->policy.features_supported;
  reply.global_seq = messenger->get_global_seq();
  reply.connect_seq = connect_seq;
  reply.flags = 0;
  reply.authorizer_len = authorizer_reply.length();
  if (connection->policy.lossy) {
    reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
  }

  connection->set_features((uint64_t)reply.features &
                           (uint64_t)connect_msg.features);
  ldout(cct, 10) << __func__ << " accept features "
                 << connection->get_features()
                 << " authorizer_protocol "
                 << connect_msg.authorizer_protocol << dendl;

  session_security.reset(
      get_auth_session_handler(cct, connect_msg.authorizer_protocol,
                               session_key, connection->get_features()));

  bufferlist reply_bl;
  reply_bl.append((char *)&reply, sizeof(reply));

  if (reply.authorizer_len) {
    reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
  }

  if (reply.tag == CEPH_MSGR_TAG_SEQ) {
    uint64_t s = in_seq;
    reply_bl.append((char *)&s, sizeof(s));
  }

  connection->lock.unlock();
  // Because `replacing` prevents other connections from preempting this addr,
  // it is safe not to hold the Connection's lock here.
  ssize_t r = messenger->accept_conn(connection);

  connection->inject_delay();

  connection->lock.lock();
  replacing = false;
  if (r < 0) {
    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
                  << connection->peer_addrs->legacy_addr()
                  << ", just fail later one (this)" << dendl;
    ldout(cct, 10) << "accept fault after register" << dendl;
    connection->inject_delay();
    return _fault();
  }
  if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
    ldout(cct, 1) << __func__
                  << " state changed while accept_conn; it must have been mark_down"
                  << dendl;
    ceph_assert(state == CLOSED || state == NONE);
    ldout(cct, 10) << "accept fault after register" << dendl;
    messenger->unregister_conn(connection);
    connection->inject_delay();
    return _fault();
  }

  return WRITE(reply_bl, handle_ready_connect_message_reply_write);
}

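// If reply.tag is CEPH_MSGR_TAG_SEQ, the reply queued above also carries our
// in_seq, and the peer will answer with the last sequence number it received
// from us; wait_seq()/handle_seq() below consume that answer before the
// session is READY. With CEPH_MSGR_TAG_READY (no CEPH_FEATURE_RECONNECT_SEQ,
// or a hard reset from the peer) we skip that round-trip, zero in_seq, and
// discard any requeued output.
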
CtPtr ProtocolV1::handle_ready_connect_message_reply_write(int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " write ready connect message reply failed"
                  << dendl;
    return _fault();
  }

  // notify
  connection->dispatch_queue->queue_accept(connection);
  messenger->ms_deliver_handle_fast_accept(connection);
  once_ready = true;

  state = ACCEPTING_HANDLED_CONNECT_MSG;

  if (wait_for_seq) {
    return wait_seq();
  }

  return server_ready();
}

CtPtr ProtocolV1::wait_seq() {
  ldout(cct, 20) << __func__ << dendl;

  return READ(sizeof(uint64_t), handle_seq);
}

CtPtr ProtocolV1::handle_seq(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
    return _fault();
  }

  uint64_t newly_acked_seq = *(uint64_t *)buffer;
  ldout(cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq
                << dendl;
  out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);

  return server_ready();
}

CtPtr ProtocolV1::server_ready() {
  ldout(cct, 20) << __func__ << " session_security is "
                 << session_security << dendl;

  ldout(cct, 20) << __func__ << " accept done" << dendl;
  memset(&connect_msg, 0, sizeof(connect_msg));

  if (connection->delay_state) {
    ceph_assert(connection->delay_state->ready());
  }

  return ready();
}