1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "ProtocolV1.h"
5
6 #include "common/errno.h"
7
8 #include "AsyncConnection.h"
9 #include "AsyncMessenger.h"
10 #include "common/EventTrace.h"
11 #include "include/random.h"
12
13 #define dout_subsys ceph_subsys_ms
14 #undef dout_prefix
15 #define dout_prefix _conn_prefix(_dout)
16 ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
17 return *_dout << "--1- " << messenger->get_myaddrs() << " >> "
18 << *connection->peer_addrs
19 << " conn("
20 << connection << " " << this
21 << " :" << connection->port << " s=" << get_state_name(state)
22 << " pgs=" << peer_global_seq << " cs=" << connect_seq
23 << " l=" << connection->policy.lossy << ").";
24 }
25
26 #define WRITE(B, C) write(CONTINUATION(C), B)
27
28 #define READ(L, C) read(CONTINUATION(C), L)
29
30 #define READB(L, B, C) read(CONTINUATION(C), L, B)
31
32 // Constant to limit starting sequence number to 2^31. Nothing special about
33 // it, just a big number. PLR
34 #define SEQ_MASK 0x7fffffff
35
36 const int ASYNC_COALESCE_THRESHOLD = 256;
37
38 using namespace std;
39
40 static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
41 // create a buffer to read into that matches the data alignment
42 unsigned alloc_len = 0;
43 unsigned left = len;
44 unsigned head = 0;
45 if (off & ~CEPH_PAGE_MASK) {
46 // head
47 alloc_len += CEPH_PAGE_SIZE;
48 head = std::min<uint64_t>(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
49 left -= head;
50 }
51 alloc_len += left;
52 bufferptr ptr(buffer::create_small_page_aligned(alloc_len));
53 if (head) ptr.set_offset(CEPH_PAGE_SIZE - head);
54 data.push_back(std::move(ptr));
55 }
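// A worked example of the head-alignment math above (a sketch, assuming
// CEPH_PAGE_SIZE == 4096, so (off & ~CEPH_PAGE_MASK) is the offset within a
// page):
//
//   off = 1000, len = 5000
//   off & ~CEPH_PAGE_MASK = 1000        -> not page aligned, add a head page
//   head      = min(4096 - 1000, 5000) = 3096
//   left      = 5000 - 3096            = 1904
//   alloc_len = 4096 + 1904            = 6000
//   ptr.set_offset(4096 - 3096)        = 1000
//
// Reading starts 1000 bytes into the page-aligned buffer, so once the 3096
// head bytes are consumed every remaining payload byte lands page-aligned,
// matching the sender's data alignment.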
56
57 /**
58 * Protocol V1
59 **/
60
61 ProtocolV1::ProtocolV1(AsyncConnection *connection)
62 : Protocol(1, connection),
63 temp_buffer(nullptr),
64 can_write(WriteStatus::NOWRITE),
65 keepalive(false),
66 connect_seq(0),
67 peer_global_seq(0),
68 msg_left(0),
69 cur_msg_size(0),
70 replacing(false),
71 is_reset_from_peer(false),
72 once_ready(false),
73 state(NONE),
74 global_seq(0),
75 authorizer(nullptr),
76 wait_for_seq(false) {
77 temp_buffer = new char[4096];
78 }
79
80 ProtocolV1::~ProtocolV1() {
81 ceph_assert(out_q.empty());
82 ceph_assert(sent.empty());
83
84 delete[] temp_buffer;
85
86 if (authorizer) {
87 delete authorizer;
88 }
89 }
90
91 void ProtocolV1::connect() {
92 this->state = START_CONNECT;
93
94 // reset connect state variables
95 if (authorizer) {
96 delete authorizer;
97 authorizer = nullptr;
98 }
99 authorizer_buf.clear();
100 memset(&connect_msg, 0, sizeof(connect_msg));
101 memset(&connect_reply, 0, sizeof(connect_reply));
102
103 global_seq = messenger->get_global_seq();
104 }
105
106 void ProtocolV1::accept() { this->state = START_ACCEPT; }
107
108 bool ProtocolV1::is_connected() {
109 return can_write.load() == WriteStatus::CANWRITE;
110 }
111
112 void ProtocolV1::stop() {
113 ldout(cct, 20) << __func__ << dendl;
114 if (state == CLOSED) {
115 return;
116 }
117
118 if (connection->delay_state) connection->delay_state->flush();
119
120 ldout(cct, 2) << __func__ << dendl;
121 std::lock_guard<std::mutex> l(connection->write_lock);
122
123 reset_recv_state();
124 discard_out_queue();
125
126 connection->_stop();
127
128 can_write = WriteStatus::CLOSED;
129 state = CLOSED;
130 }
131
132 void ProtocolV1::fault() {
133 ldout(cct, 20) << __func__ << dendl;
134
135 if (state == CLOSED || state == NONE) {
136 ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
137 return;
138 }
139
140 if (connection->policy.lossy && state != START_CONNECT &&
141 state != CONNECTING) {
142 ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl;
143 stop();
144 connection->dispatch_queue->queue_reset(connection);
145 return;
146 }
147
148 connection->write_lock.lock();
149 can_write = WriteStatus::NOWRITE;
150 is_reset_from_peer = false;
151
152 // requeue sent items
153 requeue_sent();
154
155 if (!once_ready && out_q.empty() && state >= START_ACCEPT &&
156 state <= ACCEPTING_WAIT_CONNECT_MSG_AUTH && !replacing) {
157     ldout(cct, 10) << __func__ << " with nothing to send and in the"
158                    << " half-accept state, just closed" << dendl;
159 connection->write_lock.unlock();
160 stop();
161 connection->dispatch_queue->queue_reset(connection);
162 return;
163 }
164 replacing = false;
165
166 connection->fault();
167
168 reset_recv_state();
169
170 if (connection->policy.standby && out_q.empty() && !keepalive &&
171 state != WAIT) {
172 ldout(cct, 10) << __func__ << " with nothing to send, going to standby"
173 << dendl;
174 state = STANDBY;
175 connection->write_lock.unlock();
176 return;
177 }
178
179 connection->write_lock.unlock();
180
181 if ((state >= START_CONNECT && state <= CONNECTING_SEND_CONNECT_MSG) ||
182 state == WAIT) {
183 // backoff!
184 if (state == WAIT) {
185 backoff.set_from_double(cct->_conf->ms_max_backoff);
186 } else if (backoff == utime_t()) {
187 backoff.set_from_double(cct->_conf->ms_initial_backoff);
188 } else {
189 backoff += backoff;
190 if (backoff > cct->_conf->ms_max_backoff)
191 backoff.set_from_double(cct->_conf->ms_max_backoff);
192 }
193
194 global_seq = messenger->get_global_seq();
195 state = START_CONNECT;
196 connection->state = AsyncConnection::STATE_CONNECTING;
197 ldout(cct, 10) << __func__ << " waiting " << backoff << dendl;
198 // woke up again;
199 connection->register_time_events.insert(
200 connection->center->create_time_event(backoff.to_nsec() / 1000,
201 connection->wakeup_handler));
202 } else {
203       // policy may be empty while the connection is still in an accept state
204 if (connection->policy.server) {
205 ldout(cct, 0) << __func__ << " server, going to standby" << dendl;
206 state = STANDBY;
207 } else {
208 ldout(cct, 0) << __func__ << " initiating reconnect" << dendl;
209 connect_seq++;
210 global_seq = messenger->get_global_seq();
211 state = START_CONNECT;
212 connection->state = AsyncConnection::STATE_CONNECTING;
213 }
214 backoff = utime_t();
215 connection->center->dispatch_event_external(connection->read_handler);
216 }
217 }
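// A sketch of the resulting reconnect backoff, assuming the default values
// ms_initial_backoff = 0.2s and ms_max_backoff = 15s (both configurable):
//
//   fault #1: 0.2s, #2: 0.4s, #3: 0.8s, ... doubling each time,
//   fault #8: 25.6s clamped to 15s, and 15s from then on.
//
// A connection in WAIT skips the ramp-up and waits ms_max_backoff directly.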
218
219 void ProtocolV1::send_message(Message *m) {
220 bufferlist bl;
221 uint64_t f = connection->get_features();
222
223   // TODO: Currently not all messages support re-encoding (e.g. MOSDMap), so
224   // only prepare the message here when it supports fast dispatch
225 bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
226 if (can_fast_prepare) {
227 prepare_send_message(f, m, bl);
228 }
229
230 std::lock_guard<std::mutex> l(connection->write_lock);
231 // "features" changes will change the payload encoding
232 if (can_fast_prepare &&
233 (can_write == WriteStatus::NOWRITE || connection->get_features() != f)) {
234 // ensure the correctness of message encoding
235 bl.clear();
236 m->clear_payload();
237 ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f
238 << " != " << connection->get_features() << dendl;
239 }
240 if (can_write == WriteStatus::CLOSED) {
241 ldout(cct, 10) << __func__ << " connection closed."
242 << " Drop message " << m << dendl;
243 m->put();
244 } else {
245 m->trace.event("async enqueueing message");
246 out_q[m->get_priority()].emplace_back(std::move(bl), m);
247 ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
248 << dendl;
249 if (can_write != WriteStatus::REPLACING) {
250 connection->center->dispatch_event_external(connection->write_handler);
251 }
252 }
253 }
254
255 void ProtocolV1::prepare_send_message(uint64_t features, Message *m,
256 bufferlist &bl) {
257 ldout(cct, 20) << __func__ << " m " << *m << dendl;
258
259 // associate message with Connection (for benefit of encode_payload)
260 if (m->empty_payload()) {
261 ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
262 << " " << *m << dendl;
263 } else {
264 ldout(cct, 20) << __func__ << " half-reencoding features " << features
265 << " " << m << " " << *m << dendl;
266 }
267
268 // encode and copy out of *m
269 m->encode(features, messenger->crcflags);
270
271 bl.append(m->get_payload());
272 bl.append(m->get_middle());
273 bl.append(m->get_data());
274 }
275
276 void ProtocolV1::send_keepalive() {
277 ldout(cct, 10) << __func__ << dendl;
278 std::lock_guard<std::mutex> l(connection->write_lock);
279 if (can_write != WriteStatus::CLOSED) {
280 keepalive = true;
281 connection->center->dispatch_event_external(connection->write_handler);
282 }
283 }
284
285 void ProtocolV1::read_event() {
286 ldout(cct, 20) << __func__ << dendl;
287 switch (state) {
288 case START_CONNECT:
289 CONTINUATION_RUN(CONTINUATION(send_client_banner));
290 break;
291 case START_ACCEPT:
292 CONTINUATION_RUN(CONTINUATION(send_server_banner));
293 break;
294 case OPENED:
295 CONTINUATION_RUN(CONTINUATION(wait_message));
296 break;
297 case THROTTLE_MESSAGE:
298 CONTINUATION_RUN(CONTINUATION(throttle_message));
299 break;
300 case THROTTLE_BYTES:
301 CONTINUATION_RUN(CONTINUATION(throttle_bytes));
302 break;
303 case THROTTLE_DISPATCH_QUEUE:
304 CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
305 break;
306 default:
307 break;
308 }
309 }
310
311 void ProtocolV1::write_event() {
312 ldout(cct, 10) << __func__ << dendl;
313 ssize_t r = 0;
314
315 connection->write_lock.lock();
316 if (can_write == WriteStatus::CANWRITE) {
317 if (keepalive) {
318 append_keepalive_or_ack();
319 keepalive = false;
320 }
321
322 auto start = ceph::mono_clock::now();
323 bool more;
324 do {
325 bufferlist data;
326 Message *m = _get_next_outgoing(&data);
327 if (!m) {
328 break;
329 }
330
331 if (!connection->policy.lossy) {
332 // put on sent list
333 sent.push_back(m);
334 m->get();
335 }
336 more = !out_q.empty();
337 connection->write_lock.unlock();
338
339       // send_message or requeue may have left the message unencoded
340 if (!data.length()) {
341 prepare_send_message(connection->get_features(), m, data);
342 }
343
344 r = write_message(m, data, more);
345
346 connection->write_lock.lock();
347 if (r == 0) {
348 ;
349 } else if (r < 0) {
350 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
351 break;
352 } else if (r > 0)
353 break;
354 } while (can_write == WriteStatus::CANWRITE);
355 connection->write_lock.unlock();
356
357     // if r > 0, some data is still left, so there is no need to call _try_send.
358 if (r == 0) {
359 uint64_t left = ack_left;
360 if (left) {
361 ceph_le64 s;
362 s = in_seq;
363 connection->outcoming_bl.append(CEPH_MSGR_TAG_ACK);
364 connection->outcoming_bl.append((char *)&s, sizeof(s));
365 ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
366 << " messages" << dendl;
367 ack_left -= left;
368 left = ack_left;
369 r = connection->_try_send(left);
370 } else if (is_queued()) {
371 r = connection->_try_send();
372 }
373 }
374
375 connection->logger->tinc(l_msgr_running_send_time,
376 ceph::mono_clock::now() - start);
377 if (r < 0) {
378 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
379 connection->lock.lock();
380 fault();
381 connection->lock.unlock();
382 return;
383 }
384 } else {
385 connection->write_lock.unlock();
386 connection->lock.lock();
387 connection->write_lock.lock();
388 if (state == STANDBY && !connection->policy.server && is_queued()) {
389 ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
390 connection->_connect();
391 } else if (connection->cs && state != NONE && state != CLOSED &&
392 state != START_CONNECT) {
393 r = connection->_try_send();
394 if (r < 0) {
395 ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
396 connection->write_lock.unlock();
397 fault();
398 connection->lock.unlock();
399 return;
400 }
401 }
402 connection->write_lock.unlock();
403 connection->lock.unlock();
404 }
405 }
406
407 bool ProtocolV1::is_queued() {
408 return !out_q.empty() || connection->is_queued();
409 }
410
411 void ProtocolV1::run_continuation(CtPtr pcontinuation) {
412 if (pcontinuation) {
413 CONTINUATION_RUN(*pcontinuation);
414 }
415 }
416
417 CtPtr ProtocolV1::read(CONTINUATION_RX_TYPE<ProtocolV1> &next,
418 int len, char *buffer) {
419 if (!buffer) {
420 buffer = temp_buffer;
421 }
422 ssize_t r = connection->read(len, buffer,
423 [&next, this](char *buffer, int r) {
424 next.setParams(buffer, r);
425 CONTINUATION_RUN(next);
426 });
427 if (r <= 0) {
428 next.setParams(buffer, r);
429 return &next;
430 }
431
432 return nullptr;
433 }
434
435 CtPtr ProtocolV1::write(CONTINUATION_TX_TYPE<ProtocolV1> &next,
436 bufferlist &buffer) {
437 ssize_t r = connection->write(buffer, [&next, this](int r) {
438 next.setParams(r);
439 CONTINUATION_RUN(next);
440 });
441 if (r <= 0) {
442 next.setParams(r);
443 return &next;
444 }
445
446 return nullptr;
447 }
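// How the READ/WRITE/READB helpers above are used: the macros near the top
// of this file wrap a handler into a continuation, e.g.
//
//   READ(sizeof(ceph_msg_header), handle_message_header)
//     => read(CONTINUATION(handle_message_header), sizeof(ceph_msg_header))
//
// If connection->read()/write() returns > 0 the operation is still pending
// and the continuation runs later from the completion callback; if it
// returns <= 0 (completed immediately, or failed) the continuation is
// returned and executed synchronously by the continuation loop.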
448
449 CtPtr ProtocolV1::ready() {
450 ldout(cct, 25) << __func__ << dendl;
451
452 // make sure no pending tick timer
453 if (connection->last_tick_id) {
454 connection->center->delete_time_event(connection->last_tick_id);
455 }
456 connection->last_tick_id = connection->center->create_time_event(
457 connection->inactive_timeout_us, connection->tick_handler);
458
459 connection->write_lock.lock();
460 can_write = WriteStatus::CANWRITE;
461 if (is_queued()) {
462 connection->center->dispatch_event_external(connection->write_handler);
463 }
464 connection->write_lock.unlock();
465 connection->maybe_start_delay_thread();
466
467 state = OPENED;
468 return wait_message();
469 }
470
471 CtPtr ProtocolV1::wait_message() {
472 if (state != OPENED) { // must have changed due to a replace
473 return nullptr;
474 }
475
476 ldout(cct, 20) << __func__ << dendl;
477
478 return READ(sizeof(char), handle_message);
479 }
480
481 CtPtr ProtocolV1::handle_message(char *buffer, int r) {
482 ldout(cct, 20) << __func__ << " r=" << r << dendl;
483
484 if (r < 0) {
485 ldout(cct, 1) << __func__ << " read tag failed" << dendl;
486 return _fault();
487 }
488
489 char tag = buffer[0];
490 ldout(cct, 20) << __func__ << " process tag " << (int)tag << dendl;
491
492 if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
493 ldout(cct, 20) << __func__ << " got KEEPALIVE" << dendl;
494 connection->set_last_keepalive(ceph_clock_now());
495 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
496 return READ(sizeof(ceph_timespec), handle_keepalive2);
497 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
498 return READ(sizeof(ceph_timespec), handle_keepalive2_ack);
499 } else if (tag == CEPH_MSGR_TAG_ACK) {
500 return READ(sizeof(ceph_le64), handle_tag_ack);
501 } else if (tag == CEPH_MSGR_TAG_MSG) {
502 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
503 ltt_recv_stamp = ceph_clock_now();
504 #endif
505 recv_stamp = ceph_clock_now();
506 ldout(cct, 20) << __func__ << " begin MSG" << dendl;
507 return READ(sizeof(ceph_msg_header), handle_message_header);
508 } else if (tag == CEPH_MSGR_TAG_CLOSE) {
509 ldout(cct, 20) << __func__ << " got CLOSE" << dendl;
510 stop();
511 } else {
512 ldout(cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
513 return _fault();
514 }
515 return nullptr;
516 }
517
518 CtPtr ProtocolV1::handle_keepalive2(char *buffer, int r) {
519 ldout(cct, 20) << __func__ << " r=" << r << dendl;
520
521 if (r < 0) {
522     ldout(cct, 1) << __func__ << " read keepalive timespec failed" << dendl;
523 return _fault();
524 }
525
526 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
527
528 ceph_timespec *t;
529 t = (ceph_timespec *)buffer;
530 utime_t kp_t = utime_t(*t);
531 connection->write_lock.lock();
532 append_keepalive_or_ack(true, &kp_t);
533 connection->write_lock.unlock();
534
535 ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
536 connection->set_last_keepalive(ceph_clock_now());
537
538 if (is_connected()) {
539 connection->center->dispatch_event_external(connection->write_handler);
540 }
541
542 return CONTINUE(wait_message);
543 }
544
545 void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
546 ldout(cct, 10) << __func__ << dendl;
547 if (ack) {
548 ceph_assert(tp);
549 struct ceph_timespec ts;
550 tp->encode_timeval(&ts);
551 connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
552 connection->outcoming_bl.append((char *)&ts, sizeof(ts));
553 } else if (connection->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
554 struct ceph_timespec ts;
555 utime_t t = ceph_clock_now();
556 t.encode_timeval(&ts);
557 connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
558 connection->outcoming_bl.append((char *)&ts, sizeof(ts));
559 } else {
560 connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
561 }
562 }
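// On-the-wire layout produced above (a sketch; ceph_timespec holds two
// little-endian 32-bit fields, tv_sec and tv_nsec):
//
//   KEEPALIVE      : [TAG_KEEPALIVE]                        1 byte
//   KEEPALIVE2     : [TAG_KEEPALIVE2][ceph_timespec]        1 + 8 bytes
//   KEEPALIVE2_ACK : [TAG_KEEPALIVE2_ACK][ceph_timespec]    1 + 8 bytes
//
// handle_keepalive2() echoes the peer's stamp back as a KEEPALIVE2_ACK, and
// handle_keepalive2_ack() records the echoed stamp as last_keepalive_ack.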
563
564 CtPtr ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
565 ldout(cct, 20) << __func__ << " r=" << r << dendl;
566
567 if (r < 0) {
568     ldout(cct, 1) << __func__ << " read keepalive timespec failed" << dendl;
569 return _fault();
570 }
571
572 ceph_timespec *t;
573 t = (ceph_timespec *)buffer;
574 connection->set_last_keepalive_ack(utime_t(*t));
575 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
576
577 return CONTINUE(wait_message);
578 }
579
580 CtPtr ProtocolV1::handle_tag_ack(char *buffer, int r) {
581 ldout(cct, 20) << __func__ << " r=" << r << dendl;
582
583 if (r < 0) {
584 ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
585 return _fault();
586 }
587
588 ceph_le64 seq;
589 seq = *(ceph_le64 *)buffer;
590 ldout(cct, 20) << __func__ << " got ACK" << dendl;
591
592 ldout(cct, 15) << __func__ << " got ack seq " << seq << dendl;
593 // trim sent list
594 static const int max_pending = 128;
595 int i = 0;
596 Message *pending[max_pending];
597 connection->write_lock.lock();
598 while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
599 Message *m = sent.front();
600 sent.pop_front();
601 pending[i++] = m;
602 ldout(cct, 10) << __func__ << " got ack seq " << seq
603 << " >= " << m->get_seq() << " on " << m << " " << *m
604 << dendl;
605 }
606 connection->write_lock.unlock();
607 for (int k = 0; k < i; k++) {
608 pending[k]->put();
609 }
610
611 return CONTINUE(wait_message);
612 }
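// Example of the trim above: with sent = [m5, m6, m7] (seqs 5..7) and an
// incoming ack for seq 6, m5 and m6 are popped under write_lock and their
// put() calls are batched after the lock is dropped, keeping potentially
// expensive Message destructors out of the critical section; m7 stays on
// the sent list until a later ack covers it.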
613
614 CtPtr ProtocolV1::handle_message_header(char *buffer, int r) {
615 ldout(cct, 20) << __func__ << " r=" << r << dendl;
616
617 if (r < 0) {
618 ldout(cct, 1) << __func__ << " read message header failed" << dendl;
619 return _fault();
620 }
621
622 ldout(cct, 20) << __func__ << " got MSG header" << dendl;
623
624 current_header = *((ceph_msg_header *)buffer);
625
626 ldout(cct, 20) << __func__ << " got envelope type=" << current_header.type << " src "
627 << entity_name_t(current_header.src) << " front=" << current_header.front_len
628 << " data=" << current_header.data_len << " off " << current_header.data_off
629 << dendl;
630
631 if (messenger->crcflags & MSG_CRC_HEADER) {
632 __u32 header_crc = 0;
633 header_crc = ceph_crc32c(0, (unsigned char *)&current_header,
634 sizeof(current_header) - sizeof(current_header.crc));
635 // verify header crc
636 if (header_crc != current_header.crc) {
637 ldout(cct, 0) << __func__ << " got bad header crc " << header_crc
638 << " != " << current_header.crc << dendl;
639 return _fault();
640 }
641 }
642
643 // Reset state
644 data_buf.clear();
645 front.clear();
646 middle.clear();
647 data.clear();
648
649 state = THROTTLE_MESSAGE;
650 return CONTINUE(throttle_message);
651 }
652
653 CtPtr ProtocolV1::throttle_message() {
654 ldout(cct, 20) << __func__ << dendl;
655
656 if (connection->policy.throttler_messages) {
657 ldout(cct, 10) << __func__ << " wants " << 1
658 << " message from policy throttler "
659 << connection->policy.throttler_messages->get_current()
660 << "/" << connection->policy.throttler_messages->get_max()
661 << dendl;
662 if (!connection->policy.throttler_messages->get_or_fail()) {
663 ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
664 << connection->policy.throttler_messages->get_current()
665 << "/" << connection->policy.throttler_messages->get_max()
666 << " failed, just wait." << dendl;
667       // draining the full message queue in the following thread pool can take
668       // a while, so wait a millisecond and retry.
669 if (connection->register_time_events.empty()) {
670 connection->register_time_events.insert(
671 connection->center->create_time_event(1000,
672 connection->wakeup_handler));
673 }
674 return nullptr;
675 }
676 }
677
678 state = THROTTLE_BYTES;
679 return CONTINUE(throttle_bytes);
680 }
681
682 CtPtr ProtocolV1::throttle_bytes() {
683 ldout(cct, 20) << __func__ << dendl;
684
685 cur_msg_size = current_header.front_len + current_header.middle_len +
686 current_header.data_len;
687 if (cur_msg_size) {
688 if (connection->policy.throttler_bytes) {
689 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
690 << " bytes from policy throttler "
691 << connection->policy.throttler_bytes->get_current() << "/"
692 << connection->policy.throttler_bytes->get_max() << dendl;
693 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
694 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
695 << " bytes from policy throttler "
696 << connection->policy.throttler_bytes->get_current()
697 << "/" << connection->policy.throttler_bytes->get_max()
698 << " failed, just wait." << dendl;
699         // draining the full message queue in the following thread pool can
700         // take a while, so wait a millisecond and retry.
701 if (connection->register_time_events.empty()) {
702 connection->register_time_events.insert(
703 connection->center->create_time_event(
704 1000, connection->wakeup_handler));
705 }
706 return nullptr;
707 }
708 }
709 }
710
711 state = THROTTLE_DISPATCH_QUEUE;
712 return CONTINUE(throttle_dispatch_queue);
713 }
714
715 CtPtr ProtocolV1::throttle_dispatch_queue() {
716 ldout(cct, 20) << __func__ << dendl;
717
718 if (cur_msg_size) {
719 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
720 cur_msg_size)) {
721 ldout(cct, 10)
722 << __func__ << " wants " << cur_msg_size
723 << " bytes from dispatch throttle "
724 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
725 << connection->dispatch_queue->dispatch_throttler.get_max()
726 << " failed, just wait." << dendl;
727       // draining the full message queue in the following thread pool can take
728       // a while, so wait a millisecond and retry.
729 if (connection->register_time_events.empty()) {
730 connection->register_time_events.insert(
731 connection->center->create_time_event(1000,
732 connection->wakeup_handler));
733 }
734 return nullptr;
735 }
736 }
737
738 throttle_stamp = ceph_clock_now();
739
740 state = READ_MESSAGE_FRONT;
741 return read_message_front();
742 }
743
744 CtPtr ProtocolV1::read_message_front() {
745 ldout(cct, 20) << __func__ << dendl;
746
747 unsigned front_len = current_header.front_len;
748 if (front_len) {
749 if (!front.length()) {
750 front.push_back(buffer::create(front_len));
751 }
752 return READB(front_len, front.c_str(), handle_message_front);
753 }
754 return read_message_middle();
755 }
756
757 CtPtr ProtocolV1::handle_message_front(char *buffer, int r) {
758 ldout(cct, 20) << __func__ << " r=" << r << dendl;
759
760 if (r < 0) {
761 ldout(cct, 1) << __func__ << " read message front failed" << dendl;
762 return _fault();
763 }
764
765 ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
766
767 return read_message_middle();
768 }
769
770 CtPtr ProtocolV1::read_message_middle() {
771 ldout(cct, 20) << __func__ << dendl;
772
773 if (current_header.middle_len) {
774 if (!middle.length()) {
775 middle.push_back(buffer::create(current_header.middle_len));
776 }
777 return READB(current_header.middle_len, middle.c_str(),
778 handle_message_middle);
779 }
780
781 return read_message_data_prepare();
782 }
783
784 CtPtr ProtocolV1::handle_message_middle(char *buffer, int r) {
785   ldout(cct, 20) << __func__ << " r=" << r << dendl;
786
787 if (r < 0) {
788 ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
789 return _fault();
790 }
791
792 ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
793
794 return read_message_data_prepare();
795 }
796
797 CtPtr ProtocolV1::read_message_data_prepare() {
798 ldout(cct, 20) << __func__ << dendl;
799
800 unsigned data_len = le32_to_cpu(current_header.data_len);
801 unsigned data_off = le32_to_cpu(current_header.data_off);
802
803 if (data_len) {
804 // get a buffer
805 #if 0
806 // rx_buffers is broken by design... see
807 // http://tracker.ceph.com/issues/22480
808 map<ceph_tid_t, pair<bufferlist, int> >::iterator p =
809 connection->rx_buffers.find(current_header.tid);
810 if (p != connection->rx_buffers.end()) {
811     ldout(cct, 10) << __func__ << " selecting rx buffer v " << p->second.second
812 << " at offset " << data_off << " len "
813 << p->second.first.length() << dendl;
814 data_buf = p->second.first;
815 // make sure it's big enough
816 if (data_buf.length() < data_len)
817 data_buf.push_back(buffer::create(data_len - data_buf.length()));
818 data_blp = data_buf.begin();
819 } else {
820 ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
821 << data_off << dendl;
822 alloc_aligned_buffer(data_buf, data_len, data_off);
823 data_blp = data_buf.begin();
824 }
825 #else
826 ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
827 << data_off << dendl;
828 alloc_aligned_buffer(data_buf, data_len, data_off);
829 data_blp = data_buf.begin();
830 #endif
831 }
832
833 msg_left = data_len;
834
835 return CONTINUE(read_message_data);
836 }
837
838 CtPtr ProtocolV1::read_message_data() {
839 ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
840
841 if (msg_left > 0) {
842 bufferptr bp = data_blp.get_current_ptr();
843 unsigned read_len = std::min(bp.length(), msg_left);
844
845 return READB(read_len, bp.c_str(), handle_message_data);
846 }
847
848 return read_message_footer();
849 }
850
851 CtPtr ProtocolV1::handle_message_data(char *buffer, int r) {
852 ldout(cct, 20) << __func__ << " r=" << r << dendl;
853
854 if (r < 0) {
855 ldout(cct, 1) << __func__ << " read data error " << dendl;
856 return _fault();
857 }
858
859 bufferptr bp = data_blp.get_current_ptr();
860 unsigned read_len = std::min(bp.length(), msg_left);
861 ceph_assert(read_len < std::numeric_limits<int>::max());
862 data_blp.advance(read_len);
863 data.append(bp, 0, read_len);
864 msg_left -= read_len;
865
866 return CONTINUE(read_message_data);
867 }
868
869 CtPtr ProtocolV1::read_message_footer() {
870 ldout(cct, 20) << __func__ << dendl;
871
872 state = READ_FOOTER_AND_DISPATCH;
873
874 unsigned len;
875 if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
876 len = sizeof(ceph_msg_footer);
877 } else {
878 len = sizeof(ceph_msg_footer_old);
879 }
880
881 return READ(len, handle_message_footer);
882 }
883
884 CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
885 ldout(cct, 20) << __func__ << " r=" << r << dendl;
886
887 if (r < 0) {
888 ldout(cct, 1) << __func__ << " read footer data error " << dendl;
889 return _fault();
890 }
891
892 ceph_msg_footer footer;
893 ceph_msg_footer_old old_footer;
894
895 if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
896 footer = *((ceph_msg_footer *)buffer);
897 } else {
898 old_footer = *((ceph_msg_footer_old *)buffer);
899 footer.front_crc = old_footer.front_crc;
900 footer.middle_crc = old_footer.middle_crc;
901 footer.data_crc = old_footer.data_crc;
902 footer.sig = 0;
903 footer.flags = old_footer.flags;
904 }
905
906 int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
907 ldout(cct, 10) << __func__ << " aborted = " << aborted << dendl;
908 if (aborted) {
909 ldout(cct, 0) << __func__ << " got " << front.length() << " + "
910 << middle.length() << " + " << data.length()
911 << " byte message.. ABORTED" << dendl;
912 return _fault();
913 }
914
915 ldout(cct, 20) << __func__ << " got " << front.length() << " + "
916 << middle.length() << " + " << data.length() << " byte message"
917 << dendl;
918 Message *message = decode_message(cct, messenger->crcflags, current_header,
919 footer, front, middle, data, connection);
920 if (!message) {
921 ldout(cct, 1) << __func__ << " decode message failed " << dendl;
922 return _fault();
923 }
924
925 //
926 // Check the signature if one should be present. A zero return indicates
927 // success. PLR
928 //
929
930 if (session_security.get() == NULL) {
931 ldout(cct, 10) << __func__ << " no session security set" << dendl;
932 } else {
933 if (session_security->check_message_signature(message)) {
934 ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
935 message->put();
936 return _fault();
937 }
938 }
939 message->set_byte_throttler(connection->policy.throttler_bytes);
940 message->set_message_throttler(connection->policy.throttler_messages);
941
942 // store reservation size in message, so we don't get confused
943 // by messages entering the dispatch queue through other paths.
944 message->set_dispatch_throttle_size(cur_msg_size);
945
946 message->set_recv_stamp(recv_stamp);
947 message->set_throttle_stamp(throttle_stamp);
948 message->set_recv_complete_stamp(ceph_clock_now());
949
950 // check received seq#. if it is old, drop the message.
951 // note that incoming messages may skip ahead. this is convenient for the
952 // client side queueing because messages can't be renumbered, but the (kernel)
953 // client will occasionally pull a message out of the sent queue to send
954 // elsewhere. in that case it doesn't matter if we "got" it or not.
955 uint64_t cur_seq = in_seq;
956 if (message->get_seq() <= cur_seq) {
957 ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
958 << " <= " << cur_seq << " " << message << " " << *message
959 << ", discarding" << dendl;
960 message->put();
961 if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
962 cct->_conf->ms_die_on_old_message) {
963 ceph_assert(0 == "old msgs despite reconnect_seq feature");
964 }
965 return nullptr;
966 }
967 if (message->get_seq() > cur_seq + 1) {
968 ldout(cct, 0) << __func__ << " missed message? skipped from seq "
969 << cur_seq << " to " << message->get_seq() << dendl;
970 if (cct->_conf->ms_die_on_skipped_message) {
971 ceph_assert(0 == "skipped incoming seq");
972 }
973 }
974
975 message->set_connection(connection);
976
977 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
978 if (message->get_type() == CEPH_MSG_OSD_OP ||
979 message->get_type() == CEPH_MSG_OSD_OPREPLY) {
980 utime_t ltt_processed_stamp = ceph_clock_now();
981 double usecs_elapsed =
982 (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
983 ostringstream buf;
984 if (message->get_type() == CEPH_MSG_OSD_OP)
985 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
986 false);
987 else
988 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
989 false);
990 }
991 #endif
992
993 // note last received message.
994 in_seq = message->get_seq();
995 ldout(cct, 5) << " rx " << message->get_source() << " seq "
996 << message->get_seq() << " " << message << " " << *message
997 << dendl;
998
999 bool need_dispatch_writer = false;
1000 if (!connection->policy.lossy) {
1001 ack_left++;
1002 need_dispatch_writer = true;
1003 }
1004
1005 state = OPENED;
1006
1007 connection->logger->inc(l_msgr_recv_messages);
1008 connection->logger->inc(
1009 l_msgr_recv_bytes,
1010 cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
1011
1012 messenger->ms_fast_preprocess(message);
1013 auto fast_dispatch_time = ceph::mono_clock::now();
1014 connection->logger->tinc(l_msgr_running_recv_time,
1015 fast_dispatch_time - connection->recv_start_time);
1016 if (connection->delay_state) {
1017 double delay_period = 0;
1018 if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1019 delay_period =
1020 cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1021 ldout(cct, 1) << "queue_received will delay after "
1022 << (ceph_clock_now() + delay_period) << " on " << message
1023 << " " << *message << dendl;
1024 }
1025 connection->delay_state->queue(delay_period, message);
1026 } else if (messenger->ms_can_fast_dispatch(message)) {
1027 connection->lock.unlock();
1028 connection->dispatch_queue->fast_dispatch(message);
1029 connection->recv_start_time = ceph::mono_clock::now();
1030 connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1031 connection->recv_start_time - fast_dispatch_time);
1032 connection->lock.lock();
1033 } else {
1034 connection->dispatch_queue->enqueue(message, message->get_priority(),
1035 connection->conn_id);
1036 }
1037
1038 // clean up local buffer references
1039 data_buf.clear();
1040 front.clear();
1041 middle.clear();
1042 data.clear();
1043
1044 if (need_dispatch_writer && connection->is_connected()) {
1045 connection->center->dispatch_event_external(connection->write_handler);
1046 }
1047
1048 return CONTINUE(wait_message);
1049 }
1050
1051 void ProtocolV1::session_reset() {
1052 ldout(cct, 10) << __func__ << " started" << dendl;
1053
1054 std::lock_guard<std::mutex> l(connection->write_lock);
1055 if (connection->delay_state) {
1056 connection->delay_state->discard();
1057 }
1058
1059 connection->dispatch_queue->discard_queue(connection->conn_id);
1060 discard_out_queue();
1061   // note: we need to clear outcoming_bl here, but session_reset may be
1062   // called by another thread, so let the caller clear it!
1063 // outcoming_bl.clear();
1064
1065 connection->dispatch_queue->queue_remote_reset(connection);
1066
1067 randomize_out_seq();
1068
1069 in_seq = 0;
1070 connect_seq = 0;
1071   // it's safe to set this to 0 directly; it is double locked
1072 ack_left = 0;
1073 once_ready = false;
1074 can_write = WriteStatus::NOWRITE;
1075 }
1076
1077 void ProtocolV1::randomize_out_seq() {
1078 if (connection->get_features() & CEPH_FEATURE_MSG_AUTH) {
1079 // Set out_seq to a random value, so CRC won't be predictable.
1080 auto rand_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
1081 ldout(cct, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
1082 out_seq = rand_seq;
1083 } else {
1084 // previously, seq #'s always started at 0.
1085 out_seq = 0;
1086 }
1087 }
1088
1089 ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
1090 FUNCTRACE(cct);
1091 ceph_assert(connection->center->in_thread());
1092 m->set_seq(++out_seq);
1093
1094 if (messenger->crcflags & MSG_CRC_HEADER) {
1095 m->calc_header_crc();
1096 }
1097
1098 ceph_msg_header &header = m->get_header();
1099 ceph_msg_footer &footer = m->get_footer();
1100
1101   // TODO: make sign_message reentrant?
1102 // Now that we have all the crcs calculated, handle the
1103 // digital signature for the message, if the AsyncConnection has session
1104 // security set up. Some session security options do not
1105 // actually calculate and check the signature, but they should
1106 // handle the calls to sign_message and check_signature. PLR
1107 if (session_security.get() == NULL) {
1108 ldout(cct, 20) << __func__ << " no session security" << dendl;
1109 } else {
1110 if (session_security->sign_message(m)) {
1111 ldout(cct, 20) << __func__ << " failed to sign m=" << m
1112 << "): sig = " << footer.sig << dendl;
1113 } else {
1114 ldout(cct, 20) << __func__ << " signed m=" << m
1115 << "): sig = " << footer.sig << dendl;
1116 }
1117 }
1118
1119 connection->outcoming_bl.append(CEPH_MSGR_TAG_MSG);
1120 connection->outcoming_bl.append((char *)&header, sizeof(header));
1121
1122 ldout(cct, 20) << __func__ << " sending message type=" << header.type
1123 << " src " << entity_name_t(header.src)
1124 << " front=" << header.front_len << " data=" << header.data_len
1125 << " off " << header.data_off << dendl;
1126
1127 if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
1128 for (const auto &pb : bl.buffers()) {
1129 connection->outcoming_bl.append((char *)pb.c_str(), pb.length());
1130 }
1131 } else {
1132 connection->outcoming_bl.claim_append(bl);
1133 }
1134
1135 // send footer; if receiver doesn't support signatures, use the old footer
1136 // format
1137 ceph_msg_footer_old old_footer;
1138 if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
1139 connection->outcoming_bl.append((char *)&footer, sizeof(footer));
1140 } else {
1141 if (messenger->crcflags & MSG_CRC_HEADER) {
1142 old_footer.front_crc = footer.front_crc;
1143 old_footer.middle_crc = footer.middle_crc;
1144       // data_crc is set unconditionally below, honoring MSG_CRC_DATA
1145 } else {
1146 old_footer.front_crc = old_footer.middle_crc = 0;
1147 }
1148 old_footer.data_crc =
1149 messenger->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
1150 old_footer.flags = footer.flags;
1151 connection->outcoming_bl.append((char *)&old_footer, sizeof(old_footer));
1152 }
1153
1154 m->trace.event("async writing message");
1155 ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
1156 << dendl;
1157 ssize_t total_send_size = connection->outcoming_bl.length();
1158 ssize_t rc = connection->_try_send(more);
1159 if (rc < 0) {
1160 ldout(cct, 1) << __func__ << " error sending " << m << ", "
1161 << cpp_strerror(rc) << dendl;
1162 } else {
1163 connection->logger->inc(
1164 l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
1165 ldout(cct, 10) << __func__ << " sending " << m
1166                    << (rc ? " continuing." : " done.") << dendl;
1167 }
1168 if (m->get_type() == CEPH_MSG_OSD_OP)
1169 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
1170 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
1171 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
1172 m->put();
1173
1174 return rc;
1175 }
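// On-the-wire layout written by write_message() for one message:
//
//   [CEPH_MSGR_TAG_MSG][ceph_msg_header][front|middle|data][footer]
//
// where footer is the ceph_msg_footer (with signature) if the peer has
// CEPH_FEATURE_MSG_AUTH and ceph_msg_footer_old otherwise. A payload of at
// most ASYNC_COALESCE_THRESHOLD (256) bytes spread over several buffers is
// copied flat into outcoming_bl to keep the eventual send iovec short;
// anything larger is claimed by reference instead of copied.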
1176
1177 void ProtocolV1::requeue_sent() {
1178 if (sent.empty()) {
1179 return;
1180 }
1181
1182 list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1183 out_seq -= sent.size();
1184 while (!sent.empty()) {
1185 Message *m = sent.back();
1186 sent.pop_back();
1187     ldout(cct, 10) << __func__ << " " << *m << " for resend"
1188                    << " (" << m->get_seq() << ")" << dendl;
1189 rq.push_front(make_pair(bufferlist(), m));
1190 }
1191 }
1192
1193 uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
1194 ldout(cct, 10) << __func__ << " " << seq << dendl;
1195 std::lock_guard<std::mutex> l(connection->write_lock);
1196 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
1197 return seq;
1198 }
1199 list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1200 uint64_t count = out_seq;
1201 while (!rq.empty()) {
1202 pair<bufferlist, Message *> p = rq.front();
1203 if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
1204 ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
1205 << p.second->get_seq() << " <= " << seq << ", discarding"
1206 << dendl;
1207 p.second->put();
1208 rq.pop_front();
1209 count++;
1210 }
1211 if (rq.empty()) out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1212 return count;
1213 }
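// Example: after a fault, requeue_sent() pushed messages with seqs 3, 4, 5
// back onto the CEPH_MSG_PRIO_HIGHEST queue. If the peer then acks up to
// seq 4, discard_requeued_up_to(out_seq, 4) drops 3 and 4, leaves 5 queued
// for resend, and returns out_seq advanced past the two discarded messages.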
1214
1215 /*
1216 * Tears down the message queues, and removes them from the
1217 * DispatchQueue Must hold write_lock prior to calling.
1218 */
1219 void ProtocolV1::discard_out_queue() {
1220 ldout(cct, 10) << __func__ << " started" << dendl;
1221
1222 for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
1223 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
1224 (*p)->put();
1225 }
1226 sent.clear();
1227 for (map<int, list<pair<bufferlist, Message *> > >::iterator p =
1228 out_q.begin();
1229 p != out_q.end(); ++p) {
1230 for (list<pair<bufferlist, Message *> >::iterator r = p->second.begin();
1231 r != p->second.end(); ++r) {
1232 ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
1233 r->second->put();
1234 }
1235 }
1236 out_q.clear();
1237 }
1238
1239 void ProtocolV1::reset_recv_state() {
1240 // clean up state internal variables and states
1241 if (state == CONNECTING_SEND_CONNECT_MSG) {
1242 if (authorizer) {
1243 delete authorizer;
1244 }
1245 authorizer = nullptr;
1246 }
1247
1248 // clean read and write callbacks
1249 connection->pendingReadLen.reset();
1250 connection->writeCallback.reset();
1251
1252 if (state > THROTTLE_MESSAGE && state <= READ_FOOTER_AND_DISPATCH &&
1253 connection->policy.throttler_messages) {
1254 ldout(cct, 10) << __func__ << " releasing " << 1
1255 << " message to policy throttler "
1256 << connection->policy.throttler_messages->get_current()
1257 << "/" << connection->policy.throttler_messages->get_max()
1258 << dendl;
1259 connection->policy.throttler_messages->put();
1260 }
1261 if (state > THROTTLE_BYTES && state <= READ_FOOTER_AND_DISPATCH) {
1262 if (connection->policy.throttler_bytes) {
1263 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
1264 << " bytes to policy throttler "
1265 << connection->policy.throttler_bytes->get_current() << "/"
1266 << connection->policy.throttler_bytes->get_max() << dendl;
1267 connection->policy.throttler_bytes->put(cur_msg_size);
1268 }
1269 }
1270 if (state > THROTTLE_DISPATCH_QUEUE && state <= READ_FOOTER_AND_DISPATCH) {
1271 ldout(cct, 10)
1272 << __func__ << " releasing " << cur_msg_size
1273 << " bytes to dispatch_queue throttler "
1274 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1275 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
1276 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
1277 }
1278 }
1279
1280 Message *ProtocolV1::_get_next_outgoing(bufferlist *bl) {
1281 Message *m = 0;
1282 if (!out_q.empty()) {
1283 map<int, list<pair<bufferlist, Message *> > >::reverse_iterator it =
1284 out_q.rbegin();
1285 ceph_assert(!it->second.empty());
1286 list<pair<bufferlist, Message *> >::iterator p = it->second.begin();
1287 m = p->second;
1288 if (bl) bl->swap(p->first);
1289 it->second.erase(p);
1290 if (it->second.empty()) out_q.erase(it->first);
1291 }
1292 return m;
1293 }
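// out_q is keyed by priority and only the highest key is consulted
// (rbegin()), so with out_q = {196: [mA], 127: [mB, mC]} successive calls
// return mA, then mB, then mC; an emptied priority bucket is erased so the
// map never holds empty lists (the assert above relies on that).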
1294
1295 /**
1296 * Client Protocol V1
1297 **/
1298
1299 CtPtr ProtocolV1::send_client_banner() {
1300 ldout(cct, 20) << __func__ << dendl;
1301 state = CONNECTING;
1302
1303 bufferlist bl;
1304 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1305 return WRITE(bl, handle_client_banner_write);
1306 }
1307
1308 CtPtr ProtocolV1::handle_client_banner_write(int r) {
1309 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1310
1311 if (r < 0) {
1312 ldout(cct, 1) << __func__ << " write client banner failed" << dendl;
1313 return _fault();
1314 }
1315 ldout(cct, 10) << __func__ << " connect write banner done: "
1316 << connection->get_peer_addr() << dendl;
1317
1318 return wait_server_banner();
1319 }
1320
1321 CtPtr ProtocolV1::wait_server_banner() {
1322 state = CONNECTING_WAIT_BANNER_AND_IDENTIFY;
1323
1324 ldout(cct, 20) << __func__ << dendl;
1325
1326   // we expect the banner plus two ceph_entity_addr structs back
1327 unsigned banner_len = strlen(CEPH_BANNER);
1328 unsigned need_len = banner_len + sizeof(ceph_entity_addr) * 2;
1329 return READ(need_len, handle_server_banner_and_identify);
1330 }
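// Expected reply layout, matching need_len above:
//
//   [CEPH_BANNER][server's ceph_entity_addr][our addr as the peer sees it]
//
// handle_server_banner_and_identify() validates the banner, decodes both
// addresses, and may adopt the second one via learned_addr() when we do not
// yet know our own address.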
1331
1332 CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
1333 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1334
1335 if (r < 0) {
1336 ldout(cct, 1) << __func__ << " read banner and identify addresses failed"
1337 << dendl;
1338 return _fault();
1339 }
1340
1341 unsigned banner_len = strlen(CEPH_BANNER);
1342 if (memcmp(buffer, CEPH_BANNER, banner_len)) {
1343 ldout(cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
1344 << connection->get_peer_addr() << dendl;
1345 return _fault();
1346 }
1347
1348 bufferlist bl;
1349 entity_addr_t paddr, peer_addr_for_me;
1350
1351 bl.append(buffer + banner_len, sizeof(ceph_entity_addr) * 2);
1352 auto p = bl.cbegin();
1353 try {
1354 decode(paddr, p);
1355 decode(peer_addr_for_me, p);
1356 } catch (const buffer::error &e) {
1357 lderr(cct) << __func__ << " decode peer addr failed " << dendl;
1358 return _fault();
1359 }
1360 ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
1361 << " on socket " << connection->cs.fd() << dendl;
1362
1363 entity_addr_t peer_addr = connection->peer_addrs->legacy_addr();
1364 if (peer_addr != paddr) {
1365 if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
1366 peer_addr.get_nonce() == paddr.get_nonce()) {
1367 ldout(cct, 0) << __func__ << " connect claims to be " << paddr << " not "
1368 << peer_addr << " - presumably this is the same node!"
1369 << dendl;
1370 } else {
1371 ldout(cct, 10) << __func__ << " connect claims to be " << paddr << " not "
1372 << peer_addr << dendl;
1373 return _fault();
1374 }
1375 }
1376
1377 ldout(cct, 20) << __func__ << " connect peer addr for me is "
1378 << peer_addr_for_me << dendl;
1379 if (messenger->get_myaddrs().empty() ||
1380 messenger->get_myaddrs().front().is_blank_ip()) {
1381 sockaddr_storage ss;
1382 socklen_t len = sizeof(ss);
1383 getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
1384 entity_addr_t a;
1385 if (cct->_conf->ms_learn_addr_from_peer) {
1386 ldout(cct, 1) << __func__ << " peer " << connection->target_addr
1387 << " says I am " << peer_addr_for_me << " (socket says "
1388 << (sockaddr*)&ss << ")" << dendl;
1389 a = peer_addr_for_me;
1390 } else {
1391 ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
1392 << " says I am " << (sockaddr*)&ss
1393 << " (peer says " << peer_addr_for_me << ")" << dendl;
1394 a.set_sockaddr((sockaddr *)&ss);
1395 }
1396 a.set_type(entity_addr_t::TYPE_LEGACY); // anything but NONE; learned_addr ignores this
1397 a.set_port(0);
1398 connection->lock.unlock();
1399 messenger->learned_addr(a);
1400 if (cct->_conf->ms_inject_internal_delays &&
1401 cct->_conf->ms_inject_socket_failures) {
1402 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
1403 ldout(cct, 10) << __func__ << " sleep for "
1404 << cct->_conf->ms_inject_internal_delays << dendl;
1405 utime_t t;
1406 t.set_from_double(cct->_conf->ms_inject_internal_delays);
1407 t.sleep();
1408 }
1409 }
1410 connection->lock.lock();
1411 if (state != CONNECTING_WAIT_BANNER_AND_IDENTIFY) {
1412 ldout(cct, 1) << __func__
1413 << " state changed while learned_addr, mark_down or "
1414 << " replacing must be happened just now" << dendl;
1415 return nullptr;
1416 }
1417 }
1418
1419 bufferlist myaddrbl;
1420 encode(messenger->get_myaddr_legacy(), myaddrbl, 0); // legacy
1421 return WRITE(myaddrbl, handle_my_addr_write);
1422 }
1423
1424 CtPtr ProtocolV1::handle_my_addr_write(int r) {
1425 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1426
1427 if (r < 0) {
1428 ldout(cct, 2) << __func__ << " connect couldn't write my addr, "
1429 << cpp_strerror(r) << dendl;
1430 return _fault();
1431 }
1432 ldout(cct, 10) << __func__ << " connect sent my addr "
1433 << messenger->get_myaddr_legacy() << dendl;
1434
1435 return CONTINUE(send_connect_message);
1436 }
1437
1438 CtPtr ProtocolV1::send_connect_message() {
1439 state = CONNECTING_SEND_CONNECT_MSG;
1440
1441 ldout(cct, 20) << __func__ << dendl;
1442
1443 if (!authorizer) {
1444 authorizer = messenger->ms_deliver_get_authorizer(connection->peer_type);
1445 }
1446
1447 ceph_msg_connect connect;
1448 connect.features = connection->policy.features_supported;
1449 connect.host_type = messenger->get_myname().type();
1450 connect.global_seq = global_seq;
1451 connect.connect_seq = connect_seq;
1452 connect.protocol_version =
1453 messenger->get_proto_version(connection->peer_type, true);
1454 connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
1455 connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
1456
1457 if (authorizer) {
1458 ldout(cct, 10) << __func__
1459 << " connect_msg.authorizer_len=" << connect.authorizer_len
1460 << " protocol=" << connect.authorizer_protocol << dendl;
1461 }
1462
1463 connect.flags = 0;
1464 if (connection->policy.lossy) {
1465 connect.flags |=
1466 CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
1467 }
1468
1469 bufferlist bl;
1470 bl.append((char *)&connect, sizeof(connect));
1471 if (authorizer) {
1472 bl.append(authorizer->bl.c_str(), authorizer->bl.length());
1473 }
1474
1475 ldout(cct, 10) << __func__ << " connect sending gseq=" << global_seq
1476 << " cseq=" << connect_seq
1477 << " proto=" << connect.protocol_version << dendl;
1478
1479 return WRITE(bl, handle_connect_message_write);
1480 }
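// The ceph_msg_connect filled in above carries: features_supported,
// host_type, global_seq, connect_seq, protocol_version, the authorizer
// protocol and length, and a flags byte (optionally CEPH_MSG_CONNECT_LOSSY),
// with the raw authorizer bytes appended directly after the struct. The tag
// in the server's ceph_msg_connect_reply then drives the retry/ready
// handling in handle_connect_reply_2().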
1481
1482 CtPtr ProtocolV1::handle_connect_message_write(int r) {
1483 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1484
1485 if (r < 0) {
1486 ldout(cct, 2) << __func__ << " connect couldn't send reply "
1487 << cpp_strerror(r) << dendl;
1488 return _fault();
1489 }
1490
1491 ldout(cct, 20) << __func__
1492 << " connect wrote (self +) cseq, waiting for reply" << dendl;
1493
1494 return wait_connect_reply();
1495 }
1496
1497 CtPtr ProtocolV1::wait_connect_reply() {
1498 ldout(cct, 20) << __func__ << dendl;
1499
1500 memset(&connect_reply, 0, sizeof(connect_reply));
1501 return READ(sizeof(connect_reply), handle_connect_reply_1);
1502 }
1503
1504 CtPtr ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
1505 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1506
1507 if (r < 0) {
1508 ldout(cct, 1) << __func__ << " read connect reply failed" << dendl;
1509 return _fault();
1510 }
1511
1512 connect_reply = *((ceph_msg_connect_reply *)buffer);
1513
1514 ldout(cct, 20) << __func__ << " connect got reply tag "
1515 << (int)connect_reply.tag << " connect_seq "
1516 << connect_reply.connect_seq << " global_seq "
1517 << connect_reply.global_seq << " proto "
1518 << connect_reply.protocol_version << " flags "
1519 << (int)connect_reply.flags << " features "
1520 << connect_reply.features << dendl;
1521
1522 if (connect_reply.authorizer_len) {
1523 return wait_connect_reply_auth();
1524 }
1525
1526 return handle_connect_reply_2();
1527 }
1528
1529 CtPtr ProtocolV1::wait_connect_reply_auth() {
1530 ldout(cct, 20) << __func__ << dendl;
1531
1532 ldout(cct, 10) << __func__
1533 << " reply.authorizer_len=" << connect_reply.authorizer_len
1534 << dendl;
1535
1536 ceph_assert(connect_reply.authorizer_len < 4096);
1537
1538 return READ(connect_reply.authorizer_len, handle_connect_reply_auth);
1539 }
1540
1541 CtPtr ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
1542 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1543
1544 if (r < 0) {
1545 ldout(cct, 1) << __func__ << " read connect reply authorizer failed"
1546 << dendl;
1547 return _fault();
1548 }
1549
1550 bufferlist authorizer_reply;
1551 authorizer_reply.append(buffer, connect_reply.authorizer_len);
1552
1553 if (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
1554 ldout(cct, 10) << __func__ << " connect got auth challenge" << dendl;
1555 authorizer->add_challenge(cct, authorizer_reply);
1556 return CONTINUE(send_connect_message);
1557 }
1558
1559 auto iter = authorizer_reply.cbegin();
1560 if (authorizer && !authorizer->verify_reply(iter,
1561 nullptr /* connection_secret */)) {
1562 ldout(cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
1563 return _fault();
1564 }
1565
1566 return handle_connect_reply_2();
1567 }
1568
1569 CtPtr ProtocolV1::handle_connect_reply_2() {
1570 ldout(cct, 20) << __func__ << dendl;
1571
1572 if (connect_reply.tag == CEPH_MSGR_TAG_FEATURES) {
1573 ldout(cct, 0) << __func__ << " connect protocol feature mismatch, my "
1574 << std::hex << connection->policy.features_supported
1575 << " < peer " << connect_reply.features << " missing "
1576 << (connect_reply.features &
1577 ~connection->policy.features_supported)
1578 << std::dec << dendl;
1579 return _fault();
1580 }
1581
1582 if (connect_reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1583 ldout(cct, 0) << __func__ << " connect protocol version mismatch, my "
1584 << messenger->get_proto_version(connection->peer_type, true)
1585 << " != " << connect_reply.protocol_version << dendl;
1586 return _fault();
1587 }
1588
1589 if (connect_reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1590 ldout(cct, 0) << __func__ << " connect got BADAUTHORIZER" << dendl;
1591 return _fault();
1592 }
1593
1594 if (connect_reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1595 ldout(cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
1596 session_reset();
1597 connect_seq = 0;
1598
1599 // see session_reset
1600 connection->outcoming_bl.clear();
1601
1602 return CONTINUE(send_connect_message);
1603 }
1604
1605 if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1606 global_seq = messenger->get_global_seq(connect_reply.global_seq);
1607 ldout(cct, 5) << __func__ << " connect got RETRY_GLOBAL "
1608 << connect_reply.global_seq << " chose new " << global_seq
1609 << dendl;
1610 return CONTINUE(send_connect_message);
1611 }
1612
1613 if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1614 ceph_assert(connect_reply.connect_seq > connect_seq);
1615 ldout(cct, 5) << __func__ << " connect got RETRY_SESSION " << connect_seq
1616 << " -> " << connect_reply.connect_seq << dendl;
1617 connect_seq = connect_reply.connect_seq;
1618 return CONTINUE(send_connect_message);
1619 }
1620
1621 if (connect_reply.tag == CEPH_MSGR_TAG_WAIT) {
1622 ldout(cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
1623 state = WAIT;
1624 return _fault();
1625 }
1626
1627 uint64_t feat_missing;
1628 feat_missing =
1629 connection->policy.features_required & ~(uint64_t)connect_reply.features;
1630 if (feat_missing) {
1631 ldout(cct, 1) << __func__ << " missing required features " << std::hex
1632 << feat_missing << std::dec << dendl;
1633 return _fault();
1634 }
1635
1636 if (connect_reply.tag == CEPH_MSGR_TAG_SEQ) {
1637 ldout(cct, 10)
1638 << __func__
1639 << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
1640 << dendl;
1641
1642 return wait_ack_seq();
1643 }
1644
1645 if (connect_reply.tag == CEPH_MSGR_TAG_READY) {
1646 ldout(cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
1647 }
1648
1649 return client_ready();
1650 }
1651
1652 CtPtr ProtocolV1::wait_ack_seq() {
1653 ldout(cct, 20) << __func__ << dendl;
1654
1655 return READ(sizeof(uint64_t), handle_ack_seq);
1656 }
1657
1658 CtPtr ProtocolV1::handle_ack_seq(char *buffer, int r) {
1659 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1660
1661 if (r < 0) {
1662 ldout(cct, 1) << __func__ << " read connect ack seq failed" << dendl;
1663 return _fault();
1664 }
1665
1666 uint64_t newly_acked_seq = 0;
1667
1668 newly_acked_seq = *((uint64_t *)buffer);
1669 ldout(cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
1670 << " vs out_seq " << out_seq << dendl;
1671 out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
1672
1673 bufferlist bl;
1674 uint64_t s = in_seq;
1675 bl.append((char *)&s, sizeof(s));
1676
1677 return WRITE(bl, handle_in_seq_write);
1678 }
1679
1680 CtPtr ProtocolV1::handle_in_seq_write(int r) {
1681 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1682
1683 if (r < 0) {
1684 ldout(cct, 10) << __func__ << " failed to send in_seq " << dendl;
1685 return _fault();
1686 }
1687
1688 ldout(cct, 10) << __func__ << " send in_seq done " << dendl;
1689
1690 return client_ready();
1691 }
1692
1693 CtPtr ProtocolV1::client_ready() {
1694 ldout(cct, 20) << __func__ << dendl;
1695
1696 // hooray!
1697 peer_global_seq = connect_reply.global_seq;
1698 connection->policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY;
1699
1700 once_ready = true;
1701 connect_seq += 1;
1702 ceph_assert(connect_seq == connect_reply.connect_seq);
1703 backoff = utime_t();
1704 connection->set_features((uint64_t)connect_reply.features &
1705 (uint64_t)connection->policy.features_supported);
1706 ldout(cct, 10) << __func__ << " connect success " << connect_seq
1707 << ", lossy = " << connection->policy.lossy << ", features "
1708 << connection->get_features() << dendl;
1709
1710 // If we have an authorizer, get a new AuthSessionHandler to deal with
1711 // ongoing security of the connection. PLR
1712 if (authorizer != NULL) {
1713 ldout(cct, 10) << __func__ << " setting up session_security with auth "
1714 << authorizer << dendl;
1715 session_security.reset(get_auth_session_handler(
1716 cct, authorizer->protocol,
1717 authorizer->session_key,
1718 connection->get_features()));
1719 } else {
1720 // We have no authorizer, so we shouldn't be applying security to messages
1721 // in this AsyncConnection. PLR
1722 ldout(cct, 10) << __func__ << " no authorizer, clearing session_security"
1723 << dendl;
1724 session_security.reset();
1725 }
1726
1727 if (connection->delay_state) {
1728 ceph_assert(connection->delay_state->ready());
1729 }
1730 connection->dispatch_queue->queue_connect(connection);
1731 messenger->ms_deliver_handle_fast_connect(connection);
1732
1733 return ready();
1734 }
1735
1736 /**
1737 * Server Protocol V1
1738 **/
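// Accept-side handshake, as implemented below:
//
//   1. send_server_banner()   : banner + our legacy addr + the peer's addr
//   2. wait_client_banner()   : read banner + the peer's ceph_entity_addr
//   3. wait_connect_message() : read ceph_msg_connect (+ authorizer, if any)
//   4. handle_connect_message_2() negotiates seq numbers and authorization
//      before replying with one of the CEPH_MSGR_TAG_* verdicts.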
1739
1740 CtPtr ProtocolV1::send_server_banner() {
1741 ldout(cct, 20) << __func__ << dendl;
1742 state = ACCEPTING;
1743
1744 bufferlist bl;
1745
1746 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1747
1748 // as a server, we should have a legacy addr if we accepted this connection.
1749 auto legacy = messenger->get_myaddrs().legacy_addr();
1750 encode(legacy, bl, 0); // legacy
1751 connection->port = legacy.get_port();
1752 encode(connection->target_addr, bl, 0); // legacy
1753
1754 ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd()
1755 << " legacy " << legacy
1756 << " socket_addr " << connection->socket_addr
1757 << " target_addr " << connection->target_addr
1758 << dendl;
1759
1760 return WRITE(bl, handle_server_banner_write);
1761 }
1762
1763 CtPtr ProtocolV1::handle_server_banner_write(int r) {
1764 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1765
1766 if (r < 0) {
1767 ldout(cct, 1) << __func__ << " write server banner failed" << dendl;
1768 return _fault();
1769 }
1770 ldout(cct, 10) << __func__ << " write banner and addr done: "
1771 << connection->get_peer_addr() << dendl;
1772
1773 return wait_client_banner();
1774 }
1775
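// The client is expected to echo the banner (CEPH_BANNER, "ceph v027")
// followed by its own address as a raw ceph_entity_addr, hence the size
// of the read below.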
1776 CtPtr ProtocolV1::wait_client_banner() {
1777 ldout(cct, 20) << __func__ << dendl;
1778
1779 return READ(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr),
1780 handle_client_banner);
1781 }
1782
1783 CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) {
1784 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1785
1786 if (r < 0) {
1787 ldout(cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
1788 return _fault();
1789 }
1790
1791 if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
1792 ldout(cct, 1) << __func__ << " accept peer sent bad banner '" << buffer
1793 << "' (should be '" << CEPH_BANNER << "')" << dendl;
1794 return _fault();
1795 }
1796
1797 bufferlist addr_bl;
1798 entity_addr_t peer_addr;
1799
1800 addr_bl.append(buffer + strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
1801 try {
1802 auto ti = addr_bl.cbegin();
1803 decode(peer_addr, ti);
1804 } catch (const buffer::error &e) {
1805 lderr(cct) << __func__ << " decode peer_addr failed " << dendl;
1806 return _fault();
1807 }
1808
1809 ldout(cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
1810 if (peer_addr.is_blank_ip()) {
1811 // peer apparently doesn't know what ip they have; figure it out for them.
1812 int port = peer_addr.get_port();
1813 peer_addr.set_sockaddr(connection->target_addr.get_sockaddr());
1814 peer_addr.set_port(port);
1815
1816 ldout(cct, 0) << __func__ << " accept peer addr is really " << peer_addr
1817 << " (socket is " << connection->target_addr << ")" << dendl;
1818 }
1819 connection->set_peer_addr(peer_addr); // so that connection_state gets set up
1820 connection->target_addr = peer_addr;
1821
1822 return CONTINUE(wait_connect_message);
1823 }
1824
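// connect_msg is the fixed-size v1 connect frame; roughly the following
// (see include/msgr.h for the authoritative definition):
//
//   struct ceph_msg_connect {
//     __le64 features;            // feature bits the client supports
//     __le32 host_type;           // CEPH_ENTITY_TYPE_*
//     __le32 global_seq;          // count of connections the peer has opened
//     __le32 connect_seq;         // count of sessions on this connection
//     __le32 protocol_version;
//     __le32 authorizer_protocol; // e.g. CEPH_AUTH_CEPHX
//     __le32 authorizer_len;      // authorizer bytes follow in a second read
//     __u8   flags;               // CEPH_MSG_CONNECT_LOSSY
//   } __attribute__((packed));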
1825 CtPtr ProtocolV1::wait_connect_message() {
1826 ldout(cct, 20) << __func__ << dendl;
1827
1828 memset(&connect_msg, 0, sizeof(connect_msg));
1829 return READ(sizeof(connect_msg), handle_connect_message_1);
1830 }
1831
1832 CtPtr ProtocolV1::handle_connect_message_1(char *buffer, int r) {
1833 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1834
1835 if (r < 0) {
1836 ldout(cct, 1) << __func__ << " read connect msg failed" << dendl;
1837 return _fault();
1838 }
1839
1840 connect_msg = *((ceph_msg_connect *)buffer);
1841
1842 state = ACCEPTING_WAIT_CONNECT_MSG_AUTH;
1843
1844 if (connect_msg.authorizer_len) {
1845 return wait_connect_message_auth();
1846 }
1847
1848 return handle_connect_message_2();
1849 }
1850
1851 CtPtr ProtocolV1::wait_connect_message_auth() {
1852 ldout(cct, 20) << __func__ << dendl;
1853 authorizer_buf.clear();
1854 authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));
1855 return READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
1856 handle_connect_message_auth);
1857 }
1858
1859 CtPtr ProtocolV1::handle_connect_message_auth(char *buffer, int r) {
1860 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1861
1862 if (r < 0) {
1863 ldout(cct, 1) << __func__ << " read connect authorizer failed" << dendl;
1864 return _fault();
1865 }
1866
1867 return handle_connect_message_2();
1868 }
1869
1870 CtPtr ProtocolV1::handle_connect_message_2() {
1871 ldout(cct, 20) << __func__ << dendl;
1872
1873 ldout(cct, 20) << __func__ << " accept got peer connect_seq "
1874 << connect_msg.connect_seq << " global_seq "
1875 << connect_msg.global_seq << dendl;
1876
1877 connection->set_peer_type(connect_msg.host_type);
1878 connection->policy = messenger->get_policy(connect_msg.host_type);
1879
1880 ldout(cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type
1881 << ", policy.lossy=" << connection->policy.lossy
1882 << " policy.server=" << connection->policy.server
1883 << " policy.standby=" << connection->policy.standby
1884 << " policy.resetcheck=" << connection->policy.resetcheck
1885 << " features 0x" << std::hex << (uint64_t)connect_msg.features
1886 << std::dec
1887 << dendl;
1888
1889 ceph_msg_connect_reply reply;
1890 bufferlist authorizer_reply;
1891
1892 memset(&reply, 0, sizeof(reply));
1893 reply.protocol_version =
1894 messenger->get_proto_version(connection->peer_type, false);
1895
1896 // mismatch?
1897 ldout(cct, 10) << __func__ << " accept my proto " << reply.protocol_version
1898 << ", their proto " << connect_msg.protocol_version << dendl;
1899
1900 if (connect_msg.protocol_version != reply.protocol_version) {
1901 return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER, reply,
1902 authorizer_reply);
1903 }
1904
1905 // require signatures for cephx?
1906 if (connect_msg.authorizer_protocol == CEPH_AUTH_CEPHX) {
1907 if (connection->peer_type == CEPH_ENTITY_TYPE_OSD ||
1908 connection->peer_type == CEPH_ENTITY_TYPE_MDS) {
1909 if (cct->_conf->cephx_require_signatures ||
1910 cct->_conf->cephx_cluster_require_signatures) {
1911 ldout(cct, 10)
1912 << __func__
1913 << " using cephx, requiring MSG_AUTH feature bit for cluster"
1914 << dendl;
1915 connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1916 }
1917 } else {
1918 if (cct->_conf->cephx_require_signatures ||
1919 cct->_conf->cephx_service_require_signatures) {
1920 ldout(cct, 10)
1921 << __func__
1922 << " using cephx, requiring MSG_AUTH feature bit for service"
1923 << dendl;
1924 connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1925 }
1926 }
1927 }
1928
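// Any required feature the peer failed to advertise is fatal for this
// session (this may now include MSG_AUTH, added just above), so reject
// with TAG_FEATURES if anything remains after masking.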
1929 uint64_t feat_missing =
1930 connection->policy.features_required & ~(uint64_t)connect_msg.features;
1931 if (feat_missing) {
1932 ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
1933 << feat_missing << std::dec << dendl;
1934 return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES, reply,
1935 authorizer_reply);
1936 }
1937
1938 bufferlist auth_bl_copy = authorizer_buf;
1939 connection->lock.unlock();
1940 ldout(cct, 10) << __func__ << " authorizer_protocol "
1941 << connect_msg.authorizer_protocol
1942 << " len " << auth_bl_copy.length()
1943 << dendl;
1944 bool authorizer_valid;
1945 bool need_challenge = HAVE_FEATURE(connect_msg.features, CEPHX_V2);
1946 bool had_challenge = (bool)authorizer_challenge;
1947 if (!messenger->ms_deliver_verify_authorizer(
1948 connection, connection->peer_type, connect_msg.authorizer_protocol,
1949 auth_bl_copy, authorizer_reply, authorizer_valid, session_key,
1950 nullptr /* connection_secret */,
1951 need_challenge ? &authorizer_challenge : nullptr) ||
1952 !authorizer_valid) {
1953 connection->lock.lock();
1954 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
1955 ldout(cct, 1) << __func__
1956 << " state changed while accept, it must be mark_down"
1957 << dendl;
1958 ceph_assert(state == CLOSED);
1959 return _fault();
1960 }
1961
1962 if (need_challenge && !had_challenge && authorizer_challenge) {
1963 ldout(cct, 10) << __func__ << ": challenging authorizer" << dendl;
1964 ceph_assert(authorizer_reply.length());
1965 return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER,
1966 reply, authorizer_reply);
1967 } else {
1968 ldout(cct, 0) << __func__ << ": got bad authorizer, auth_reply_len="
1969 << authorizer_reply.length() << dendl;
1970 session_security.reset();
1971 return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER, reply,
1972 authorizer_reply);
1973 }
1974 }
1975
1976 // We've verified the authorizer for this AsyncConnection, so set up the
1977 // session security structure. PLR
1978 ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
1979
1980 // existing?
1981 AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
1982
1983 connection->inject_delay();
1984
1985 connection->lock.lock();
1986 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
1987 ldout(cct, 1) << __func__
1988 << " state changed while accept, it must be mark_down"
1989 << dendl;
1990 ceph_assert(state == CLOSED);
1991 return _fault();
1992 }
1993
1994 if (existing == connection) {
1995 existing = nullptr;
1996 }
1997 if (existing && existing->protocol->proto_type != 1) {
1998 ldout(cct, 1) << __func__ << " existing " << existing << " proto "
1999 << existing->protocol.get() << " version is "
2000 << existing->protocol->proto_type << ", marking down" << dendl;
2001 existing->mark_down();
2002 existing = nullptr;
2003 }
2004
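// From here on we arbitrate between this incoming attempt and any existing
// session with the same peer: a stale global_seq gets RETRY_GLOBAL, a stale
// connect_seq gets RETRY_SESSION, a simultaneous-connect race is broken by
// comparing legacy addresses (the losing side is told to WAIT), and a
// genuinely newer connect_seq replaces the existing session.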
2005 if (existing) {
2006 // It is not possible for the existing connection to acquire this
2007 // connection's lock
2008 existing->lock.lock(); // skip lockdep check (we are locking a second
2009 // AsyncConnection here)
2010
2011 ldout(cct, 10) << __func__ << " existing=" << existing << " exproto="
2012 << existing->protocol.get() << dendl;
2013 ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());
2014 ceph_assert(exproto);
2015 ceph_assert(exproto->proto_type == 1);
2016
2017 if (exproto->state == CLOSED) {
2018 ldout(cct, 1) << __func__ << " existing " << existing
2019 << " already closed." << dendl;
2020 existing->lock.unlock();
2021 existing = nullptr;
2022
2023 return open(reply, authorizer_reply);
2024 }
2025
2026 if (exproto->replacing) {
2027 ldout(cct, 1) << __func__
2028 << " existing racing replace happened while replacing."
2029 << " existing_state="
2030 << connection->get_state_name(existing->state) << dendl;
2031 reply.global_seq = exproto->peer_global_seq;
2032 existing->lock.unlock();
2033 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
2034 authorizer_reply);
2035 }
2036
2037 if (connect_msg.global_seq < exproto->peer_global_seq) {
2038 ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
2039 << exproto->peer_global_seq << " > "
2040 << connect_msg.global_seq << ", RETRY_GLOBAL" << dendl;
2041 reply.global_seq = exproto->peer_global_seq; // so we can send it below..
2042 existing->lock.unlock();
2043 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
2044 authorizer_reply);
2045 } else {
2046 ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
2047 << exproto->peer_global_seq
2048 << " <= " << connect_msg.global_seq << ", looks ok"
2049 << dendl;
2050 }
2051
2052 if (existing->policy.lossy) {
2053 ldout(cct, 0)
2054 << __func__
2055 << " accept replacing existing (lossy) channel (new one lossy="
2056 << connection->policy.lossy << ")" << dendl;
2057 exproto->session_reset();
2058 return replace(existing, reply, authorizer_reply);
2059 }
2060
2061 ldout(cct, 1) << __func__ << " accept connect_seq "
2062 << connect_msg.connect_seq
2063 << " vs existing csq=" << exproto->connect_seq
2064 << " existing_state="
2065 << connection->get_state_name(existing->state) << dendl;
2066
2067 if (connect_msg.connect_seq == 0 && exproto->connect_seq > 0) {
2068 ldout(cct, 0)
2069 << __func__
2070 << " accept peer reset, then tried to connect to us, replacing"
2071 << dendl;
2072 // this is a hard reset from peer
2073 is_reset_from_peer = true;
2074 if (connection->policy.resetcheck) {
2075 exproto->session_reset(); // this resets out_queue, msg_ and
2076 // connect_seq #'s
2077 }
2078 return replace(existing, reply, authorizer_reply);
2079 }
2080
2081 if (connect_msg.connect_seq < exproto->connect_seq) {
2082 // old attempt, or we sent READY but they didn't get it.
2083 ldout(cct, 10) << __func__ << " accept existing " << existing << ".cseq "
2084 << exproto->connect_seq << " > " << connect_msg.connect_seq
2085 << ", RETRY_SESSION" << dendl;
2086 reply.connect_seq = exproto->connect_seq + 1;
2087 existing->lock.unlock();
2088 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
2089 authorizer_reply);
2090 }
2091
2092 if (connect_msg.connect_seq == exproto->connect_seq) {
2093 // if the existing connection successfully opened, and/or
2094 // subsequently went to standby, then the peer should bump
2095 // their connect_seq and retry: this is not a connection race
2096 // we need to resolve here.
2097 if (exproto->state == OPENED || exproto->state == STANDBY) {
2098 ldout(cct, 10) << __func__ << " accept connection race, existing "
2099 << existing << ".cseq " << exproto->connect_seq
2100 << " == " << connect_msg.connect_seq
2101 << ", OPEN|STANDBY, RETRY_SESSION " << dendl;
2102 // if both connect_seqs are zero, don't get stuck in a deadlock;
2103 // it's ok to replace
2104 if (connection->policy.resetcheck && exproto->connect_seq == 0) {
2105 return replace(existing, reply, authorizer_reply);
2106 }
2107
2108 reply.connect_seq = exproto->connect_seq + 1;
2109 existing->lock.unlock();
2110 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
2111 authorizer_reply);
2112 }
2113
2114 // connection race?
2115 if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr_legacy() ||
2116 existing->policy.server) {
2117 // incoming wins
2118 ldout(cct, 10) << __func__ << " accept connection race, existing "
2119 << existing << ".cseq " << exproto->connect_seq
2120 << " == " << connect_msg.connect_seq
2121 << ", or we are server, replacing my attempt" << dendl;
2122 return replace(existing, reply, authorizer_reply);
2123 } else {
2124 // our existing outgoing wins
2125 ldout(messenger->cct, 10)
2126 << __func__ << " accept connection race, existing " << existing
2127 << ".cseq " << exproto->connect_seq
2128 << " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
2129 ceph_assert(connection->peer_addrs->legacy_addr() >
2130 messenger->get_myaddr_legacy());
2131 existing->lock.unlock();
2132 // make sure we follow through with opening the existing
2133 // connection (if it isn't yet open) since we know the peer
2134 // has something to send to us.
2135 existing->send_keepalive();
2136 return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
2137 authorizer_reply);
2138 }
2139 }
2140
2141 ceph_assert(connect_msg.connect_seq > exproto->connect_seq);
2142 ceph_assert(connect_msg.global_seq >= exproto->peer_global_seq);
2143 if (connection->policy.resetcheck && // RESETSESSION only used by servers;
2144 // peers do not reset each other
2145 exproto->connect_seq == 0) {
2146 ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
2147 << connect_msg.connect_seq << ", " << existing
2148 << ".cseq = " << exproto->connect_seq
2149 << "), sending RESETSESSION " << dendl;
2150 existing->lock.unlock();
2151 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
2152 authorizer_reply);
2153 }
2154
2155 // reconnect
2156 ldout(cct, 10) << __func__ << " accept peer sent cseq "
2157 << connect_msg.connect_seq << " > " << exproto->connect_seq
2158 << dendl;
2159 return replace(existing, reply, authorizer_reply);
2160 } // existing
2161 else if (!replacing && connect_msg.connect_seq > 0) {
2162 // we reset, and they are opening a new session
2163 ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
2164 << connect_msg.connect_seq << "), sending RESETSESSION"
2165 << dendl;
2166 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
2167 authorizer_reply);
2168 } else {
2169 // new session
2170 ldout(cct, 10) << __func__ << " accept new session" << dendl;
2171 existing = nullptr;
2172 return open(reply, authorizer_reply);
2173 }
2174 }
2175
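// The reply mirrors the connect frame; roughly the following (again, see
// include/msgr.h for the authoritative definition):
//
//   struct ceph_msg_connect_reply {
//     __u8   tag;              // CEPH_MSGR_TAG_*
//     __le64 features;
//     __le32 global_seq;
//     __le32 connect_seq;
//     __le32 protocol_version;
//     __le32 authorizer_len;   // authorizer_reply appended when nonzero
//     __u8   flags;
//   } __attribute__((packed));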
2176 CtPtr ProtocolV1::send_connect_message_reply(char tag,
2177 ceph_msg_connect_reply &reply,
2178 bufferlist &authorizer_reply) {
2179 ldout(cct, 20) << __func__ << dendl;
2180 bufferlist reply_bl;
2181 reply.tag = tag;
2182 reply.features =
2183 ((uint64_t)connect_msg.features & connection->policy.features_supported) |
2184 connection->policy.features_required;
2185 reply.authorizer_len = authorizer_reply.length();
2186 reply_bl.append((char *)&reply, sizeof(reply));
2187
2188 ldout(cct, 10) << __func__ << " reply features 0x" << std::hex
2189 << reply.features << " = (policy sup 0x"
2190 << connection->policy.features_supported
2191 << " & connect 0x" << (uint64_t)connect_msg.features
2192 << ") | policy req 0x"
2193 << connection->policy.features_required
2194 << dendl;
2195
2196 if (reply.authorizer_len) {
2197 reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
2198 authorizer_reply.clear();
2199 }
2200
2201 return WRITE(reply_bl, handle_connect_message_reply_write);
2202 }
2203
2204 CtPtr ProtocolV1::handle_connect_message_reply_write(int r) {
2205 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2206
2207 if (r < 0) {
2208 ldout(cct, 1) << __func__ << " write connect message reply failed" << dendl;
2209 connection->inject_delay();
2210 return _fault();
2211 }
2212
2213 return CONTINUE(wait_connect_message);
2214 }
2215
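// Replace an existing session with this incoming connection. For lossy
// peers the existing connection is simply stopped and reset. For lossless
// peers the accepted socket is handed over instead: this (newer)
// connection stops itself, the existing AsyncConnection migrates to this
// socket's worker/center, and the accept handshake is re-run there with
// RETRY_GLOBAL so the peer retries against the surviving object.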
2216 CtPtr ProtocolV1::replace(AsyncConnectionRef existing,
2217 ceph_msg_connect_reply &reply,
2218 bufferlist &authorizer_reply) {
2219 ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;
2220
2221 connection->inject_delay();
2222 if (existing->policy.lossy) {
2223 // disconnect from the Connection
2224 ldout(cct, 1) << __func__ << " replacing on lossy channel, failing existing"
2225 << dendl;
2226 existing->protocol->stop();
2227 existing->dispatch_queue->queue_reset(existing.get());
2228 } else {
2229 ceph_assert(can_write == WriteStatus::NOWRITE);
2230 existing->write_lock.lock();
2231
2232 ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());
2233
2234 // reset the in_seq if this is a hard reset from peer,
2235 // otherwise we respect our original connection's value
2236 if (is_reset_from_peer) {
2237 exproto->is_reset_from_peer = true;
2238 }
2239
2240 connection->center->delete_file_event(connection->cs.fd(),
2241 EVENT_READABLE | EVENT_WRITABLE);
2242
2243 if (existing->delay_state) {
2244 existing->delay_state->flush();
2245 ceph_assert(!connection->delay_state);
2246 }
2247 exproto->reset_recv_state();
2248
2249 exproto->connect_msg.features = connect_msg.features;
2250
2251 auto temp_cs = std::move(connection->cs);
2252 EventCenter *new_center = connection->center;
2253 Worker *new_worker = connection->worker;
2254 // the socket was moved into temp_cs above, so _stop() won't shut it down;
2255 // queue a reset on the new connection, which we're dumping for the old
2256 stop();
2257
2258 connection->dispatch_queue->queue_reset(connection);
2259 ldout(messenger->cct, 1)
2260 << __func__ << " stop myself to swap existing" << dendl;
2261 exproto->can_write = WriteStatus::REPLACING;
2262 exproto->replacing = true;
2263 existing->state_offset = 0;
2264 // prevent the previous thread from modifying events
2265 exproto->state = NONE;
2266 existing->state = AsyncConnection::STATE_NONE;
2267 // Discard existing prefetch buffer in `recv_buf`
2268 existing->recv_start = existing->recv_end = 0;
2269 // there shouldn't be any buffered data left
2270 ceph_assert(connection->recv_start == connection->recv_end);
2271
2272 exproto->authorizer_challenge.reset();
2273
2274 auto deactivate_existing = std::bind(
2275 [existing, new_worker, new_center, exproto, reply,
2276 authorizer_reply](ConnectedSocket &cs) mutable {
2277 // we need to delete the time event in the original thread
2278 {
2279 std::lock_guard<std::mutex> l(existing->lock);
2280 existing->write_lock.lock();
2281 exproto->requeue_sent();
2282 existing->outcoming_bl.clear();
2283 existing->open_write = false;
2284 existing->write_lock.unlock();
2285 if (exproto->state == NONE) {
2286 existing->shutdown_socket();
2287 existing->cs = std::move(cs);
2288 existing->worker->references--;
2289 new_worker->references++;
2290 existing->logger = new_worker->get_perf_counter();
2291 existing->worker = new_worker;
2292 existing->center = new_center;
2293 if (existing->delay_state)
2294 existing->delay_state->set_center(new_center);
2295 } else if (exproto->state == CLOSED) {
2296 auto back_to_close =
2297 std::bind([](ConnectedSocket &cs) mutable { cs.close(); },
2298 std::move(cs));
2299 new_center->submit_to(new_center->get_id(),
2300 std::move(back_to_close), true);
2301 return;
2302 } else {
2303 ceph_abort();
2304 }
2305 }
2306
2307 // Before changing existing->center, events may already be queued
2308 // in existing->center. If we then mark down `existing`, the cleanup
2309 // runs in another thread and tears down the connection, so those
2310 // earlier queued events would dereference freed state and segfault.
2311 auto transfer_existing = [existing, exproto, reply,
2312 authorizer_reply]() mutable {
2313 std::lock_guard<std::mutex> l(existing->lock);
2314 if (exproto->state == CLOSED) return;
2315 ceph_assert(exproto->state == NONE);
2316
2317 // we have called shutdown_socket above
2318 ceph_assert(existing->last_tick_id == 0);
2319 // restart timer since we are going to re-build connection
2320 existing->last_connect_started = ceph::coarse_mono_clock::now();
2321 existing->last_tick_id = existing->center->create_time_event(
2322 existing->connect_timeout_us, existing->tick_handler);
2323 existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
2324 exproto->state = ACCEPTING;
2325
2326 existing->center->create_file_event(
2327 existing->cs.fd(), EVENT_READABLE, existing->read_handler);
2328 reply.global_seq = exproto->peer_global_seq;
2329 exproto->run_continuation(exproto->send_connect_message_reply(
2330 CEPH_MSGR_TAG_RETRY_GLOBAL, reply, authorizer_reply));
2331 };
2332 if (existing->center->in_thread())
2333 transfer_existing();
2334 else
2335 existing->center->submit_to(existing->center->get_id(),
2336 std::move(transfer_existing), true);
2337 },
2338 std::move(temp_cs));
2339
2340 existing->center->submit_to(existing->center->get_id(),
2341 std::move(deactivate_existing), true);
2342 existing->write_lock.unlock();
2343 existing->lock.unlock();
2344 return nullptr;
2345 }
2346 existing->lock.unlock();
2347
2348 return open(reply, authorizer_reply);
2349 }
2350
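// Finish the accept: adopt the peer's sequence numbers, answer READY (or
// SEQ when CEPH_FEATURE_RECONNECT_SEQ applies and this is not a hard reset
// from the peer), and register the connection with the messenger via
// accept_conn(), which may still lose a race and fault below.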
2351 CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
2352 bufferlist &authorizer_reply) {
2353 ldout(cct, 20) << __func__ << dendl;
2354
2355 connect_seq = connect_msg.connect_seq + 1;
2356 peer_global_seq = connect_msg.global_seq;
2357 ldout(cct, 10) << __func__ << " accept success, connect_seq = " << connect_seq
2358 << " in_seq=" << in_seq << ", sending READY" << dendl;
2359
2360 // if it is a hard reset from peer, we don't need a round-trip to negotiate
2361 // in/out sequence
2362 if ((connect_msg.features & CEPH_FEATURE_RECONNECT_SEQ) &&
2363 !is_reset_from_peer) {
2364 reply.tag = CEPH_MSGR_TAG_SEQ;
2365 wait_for_seq = true;
2366 } else {
2367 reply.tag = CEPH_MSGR_TAG_READY;
2368 wait_for_seq = false;
2369 out_seq = discard_requeued_up_to(out_seq, 0);
2370 is_reset_from_peer = false;
2371 in_seq = 0;
2372 }
2373
2374 // send the reply (READY, or SEQ when we still need to negotiate seqs)
2375 reply.features = connection->policy.features_supported;
2376 reply.global_seq = messenger->get_global_seq();
2377 reply.connect_seq = connect_seq;
2378 reply.flags = 0;
2379 reply.authorizer_len = authorizer_reply.length();
2380 if (connection->policy.lossy) {
2381 reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
2382 }
2383
2384 connection->set_features((uint64_t)reply.features &
2385 (uint64_t)connect_msg.features);
2386 ldout(cct, 10) << __func__ << " accept features "
2387 << connection->get_features()
2388 << " authorizer_protocol "
2389 << connect_msg.authorizer_protocol << dendl;
2390
2391 session_security.reset(
2392 get_auth_session_handler(cct, connect_msg.authorizer_protocol,
2393 session_key,
2394 connection->get_features()));
2395
2396 bufferlist reply_bl;
2397 reply_bl.append((char *)&reply, sizeof(reply));
2398
2399 if (reply.authorizer_len) {
2400 reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
2401 }
2402
2403 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
2404 uint64_t s = in_seq;
2405 reply_bl.append((char *)&s, sizeof(s));
2406 }
2407
2408 connection->lock.unlock();
2409 // Because "replacing" prevents other connections from preempting this
2410 // addr, it's safe not to acquire the Connection's lock here
2411 ssize_t r = messenger->accept_conn(connection);
2412
2413 connection->inject_delay();
2414
2415 connection->lock.lock();
2416 replacing = false;
2417 if (r < 0) {
2418 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2419 << connection->peer_addrs->legacy_addr()
2420 << " just fail later one(this)" << dendl;
2421 ldout(cct, 10) << "accept fault after register" << dendl;
2422 connection->inject_delay();
2423 return _fault();
2424 }
2425 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2426 ldout(cct, 1) << __func__
2427 << " state changed while accept_conn, it must be mark_down"
2428 << dendl;
2429 ceph_assert(state == CLOSED || state == NONE);
2430 ldout(cct, 10) << "accept fault after register" << dendl;
2431 messenger->unregister_conn(connection);
2432 connection->inject_delay();
2433 return _fault();
2434 }
2435
2436 return WRITE(reply_bl, handle_ready_connect_message_reply_write);
2437 }
2438
2439 CtPtr ProtocolV1::handle_ready_connect_message_reply_write(int r) {
2440 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2441
2442 if (r < 0) {
2443 ldout(cct, 1) << __func__ << " write ready connect message reply failed"
2444 << dendl;
2445 return _fault();
2446 }
2447
2448 // notify
2449 connection->dispatch_queue->queue_accept(connection);
2450 messenger->ms_deliver_handle_fast_accept(connection);
2451 once_ready = true;
2452
2453 state = ACCEPTING_HANDLED_CONNECT_MSG;
2454
2455 if (wait_for_seq) {
2456 return wait_seq();
2457 }
2458
2459 return server_ready();
2460 }
2461
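// CEPH_MSGR_TAG_SEQ handshake, server side: we appended our in_seq to the
// SEQ reply above; now read back the client's acked seq and drop messages
// it has already received before going ready.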
2462 CtPtr ProtocolV1::wait_seq() {
2463 ldout(cct, 20) << __func__ << dendl;
2464
2465 return READ(sizeof(uint64_t), handle_seq);
2466 }
2467
2468 CtPtr ProtocolV1::handle_seq(char *buffer, int r) {
2469 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2470
2471 if (r < 0) {
2472 ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
2473 return _fault();
2474 }
2475
2476 uint64_t newly_acked_seq = *(uint64_t *)buffer;
2477 ldout(cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq
2478 << dendl;
2479 out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
2480
2481 return server_ready();
2482 }
2483
2484 CtPtr ProtocolV1::server_ready() {
2485 ldout(cct, 20) << __func__ << " session_security is "
2486 << session_security
2487 << dendl;
2488
2489 ldout(cct, 20) << __func__ << " accept done" << dendl;
2490 memset(&connect_msg, 0, sizeof(connect_msg));
2491
2492 if (connection->delay_state) {
2493 ceph_assert(connection->delay_state->ready());
2494 }
2495
2496 return ready();
2497 }