]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/ProtocolV1.cc
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / msg / async / ProtocolV1.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "ProtocolV1.h"
5
6 #include "common/errno.h"
7
8 #include "AsyncConnection.h"
9 #include "AsyncMessenger.h"
10 #include "common/EventTrace.h"
11 #include "include/random.h"
12 #include "auth/AuthClient.h"
13 #include "auth/AuthServer.h"
14
15 #define dout_subsys ceph_subsys_ms
16 #undef dout_prefix
17 #define dout_prefix _conn_prefix(_dout)
18 std::ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
19 return *_dout << "--1- " << messenger->get_myaddrs() << " >> "
20 << *connection->peer_addrs
21 << " conn("
22 << connection << " " << this
23 << " :" << connection->port << " s=" << get_state_name(state)
24 << " pgs=" << peer_global_seq << " cs=" << connect_seq
25 << " l=" << connection->policy.lossy << ").";
26 }
27
// Convenience wrappers around the read()/write() continuation helpers below:
// WRITE sends buffer B and resumes continuation C when the write completes;
// READ reads L bytes into the internal temp buffer; READB reads L bytes into
// the caller-supplied buffer B.
#define WRITE(B, C) write(CONTINUATION(C), B)

#define READ(L, C) read(CONTINUATION(C), L)

#define READB(L, B, C) read(CONTINUATION(C), L, B)

// Constant to limit starting sequence number to 2^31. Nothing special about
// it, just a big number. PLR
#define SEQ_MASK 0x7fffffff

// NOTE(review): threshold for coalescing fragmented outgoing payloads; its
// use is not visible in this chunk — presumably consumed by write_message().
const int ASYNC_COALESCE_THRESHOLD = 256;

using namespace std;
42 static void alloc_aligned_buffer(ceph::buffer::list &data, unsigned len, unsigned off) {
43 // create a buffer to read into that matches the data alignment
44 unsigned alloc_len = 0;
45 unsigned left = len;
46 unsigned head = 0;
47 if (off & ~CEPH_PAGE_MASK) {
48 // head
49 alloc_len += CEPH_PAGE_SIZE;
50 head = std::min<uint64_t>(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
51 left -= head;
52 }
53 alloc_len += left;
54 ceph::bufferptr ptr(ceph::buffer::create_small_page_aligned(alloc_len));
55 if (head) ptr.set_offset(CEPH_PAGE_SIZE - head);
56 data.push_back(std::move(ptr));
57 }
58
59 /**
60 * Protocol V1
61 **/
62
// Construct a v1 protocol handler bound to `connection`.  All handshake and
// sequence state starts cleared; `temp_buffer` is the scratch destination
// used by READ() when the caller does not supply its own buffer.
ProtocolV1::ProtocolV1(AsyncConnection *connection)
    : Protocol(1, connection),
      temp_buffer(nullptr),
      can_write(WriteStatus::NOWRITE),
      keepalive(false),
      connect_seq(0),
      peer_global_seq(0),
      msg_left(0),
      cur_msg_size(0),
      replacing(false),
      is_reset_from_peer(false),
      once_ready(false),
      state(NONE),
      global_seq(0),
      wait_for_seq(false) {
  // 4 KiB covers the largest fixed-size structure READ() pulls in
  // (banner, headers, footers, timespecs)
  temp_buffer = new char[4096];
}
80
// Destructor: stop()/discard_out_queue() must already have drained the
// outgoing queue and the sent list — queued Messages hold references that
// would otherwise leak.
ProtocolV1::~ProtocolV1() {
  ceph_assert(out_q.empty());
  ceph_assert(sent.empty());

  delete[] temp_buffer;
}
87
88 void ProtocolV1::connect() {
89 this->state = START_CONNECT;
90
91 // reset connect state variables
92 authorizer_buf.clear();
93 // FIPS zeroization audit 20191115: these memsets are not security related.
94 memset(&connect_msg, 0, sizeof(connect_msg));
95 memset(&connect_reply, 0, sizeof(connect_reply));
96
97 global_seq = messenger->get_global_seq();
98 }
99
100 void ProtocolV1::accept() { this->state = START_ACCEPT; }
101
102 bool ProtocolV1::is_connected() {
103 return can_write.load() == WriteStatus::CANWRITE;
104 }
105
// Tear the session down immediately: flush the delayed-delivery state,
// discard receive state and queued output under write_lock, close the
// underlying connection, and mark the protocol CLOSED.  Idempotent.
void ProtocolV1::stop() {
  ldout(cct, 20) << __func__ << dendl;
  if (state == CLOSED) {
    return;
  }

  // flush before taking write_lock — the delay thread may need it
  if (connection->delay_state) connection->delay_state->flush();

  ldout(cct, 2) << __func__ << dendl;
  std::lock_guard<std::mutex> l(connection->write_lock);

  reset_recv_state();
  discard_out_queue();

  connection->_stop();

  can_write = WriteStatus::CLOSED;
  state = CLOSED;
}
125
// Error path of the state machine.  Depending on policy and current state
// this either tears the connection down (lossy), parks it in STANDBY, or
// schedules a reconnect with exponential backoff.
void ProtocolV1::fault() {
  ldout(cct, 20) << __func__ << dendl;

  if (state == CLOSED || state == NONE) {
    ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
    return;
  }

  // lossy sessions are never recovered once past the connect phase
  if (connection->policy.lossy && state != START_CONNECT &&
      state != CONNECTING) {
    ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return;
  }

  connection->write_lock.lock();
  can_write = WriteStatus::NOWRITE;
  is_reset_from_peer = false;

  // requeue sent items
  requeue_sent();

  // a half-open accept with nothing queued is cheaper to drop than resume
  if (!once_ready && out_q.empty() && state >= START_ACCEPT &&
      state <= ACCEPTING_WAIT_CONNECT_MSG_AUTH && !replacing) {
    ldout(cct, 10) << __func__ << " with nothing to send and in the half "
                   << " accept state just closed" << dendl;
    connection->write_lock.unlock();
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return;
  }
  replacing = false;

  connection->fault();

  reset_recv_state();

  // idle standby-capable sessions go quiescent instead of reconnecting
  if (connection->policy.standby && out_q.empty() && !keepalive &&
      state != WAIT) {
    ldout(cct, 10) << __func__ << " with nothing to send, going to standby"
                   << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return;
  }

  connection->write_lock.unlock();

  if ((state >= START_CONNECT && state <= CONNECTING_SEND_CONNECT_MSG) ||
      state == WAIT) {
    // backoff!  WAIT jumps straight to the maximum; otherwise the delay
    // doubles per attempt, capped at ms_max_backoff
    if (state == WAIT) {
      backoff.set_from_double(cct->_conf->ms_max_backoff);
    } else if (backoff == utime_t()) {
      backoff.set_from_double(cct->_conf->ms_initial_backoff);
    } else {
      backoff += backoff;
      if (backoff > cct->_conf->ms_max_backoff)
        backoff.set_from_double(cct->_conf->ms_max_backoff);
    }

    global_seq = messenger->get_global_seq();
    state = START_CONNECT;
    connection->state = AsyncConnection::STATE_CONNECTING;
    ldout(cct, 10) << __func__ << " waiting " << backoff << dendl;
    // woke up again;
    connection->register_time_events.insert(
        connection->center->create_time_event(backoff.to_nsec() / 1000,
                                              connection->wakeup_handler));
  } else {
    // policy maybe empty when state is in accept
    if (connection->policy.server) {
      ldout(cct, 0) << __func__ << " server, going to standby" << dendl;
      state = STANDBY;
    } else {
      ldout(cct, 0) << __func__ << " initiating reconnect" << dendl;
      connect_seq++;
      global_seq = messenger->get_global_seq();
      state = START_CONNECT;
      connection->state = AsyncConnection::STATE_CONNECTING;
    }
    backoff = utime_t();
    connection->center->dispatch_event_external(connection->read_handler);
  }
}
212
// Queue `m` for transmission.  Fast-dispatchable messages are pre-encoded
// outside the lock; if the negotiated features changed in the meantime (or
// writes are blocked), the pre-encoded payload is discarded so write_event()
// re-encodes with the correct features.
void ProtocolV1::send_message(Message *m) {
  ceph::buffer::list bl;
  uint64_t f = connection->get_features();

  // TODO: Currently not all messages supports reencode like MOSDMap, so here
  // only let fast dispatch support messages prepare message
  bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
  if (can_fast_prepare) {
    prepare_send_message(f, m, bl);
  }

  std::lock_guard<std::mutex> l(connection->write_lock);
  // "features" changes will change the payload encoding
  if (can_fast_prepare &&
      (can_write == WriteStatus::NOWRITE || connection->get_features() != f)) {
    // ensure the correctness of message encoding
    bl.clear();
    m->clear_payload();
    ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f
                  << " != " << connection->get_features() << dendl;
  }
  if (can_write == WriteStatus::CLOSED) {
    ldout(cct, 10) << __func__ << " connection closed."
                   << " Drop message " << m << dendl;
    m->put();
  } else {
    m->queue_start = ceph::mono_clock::now();
    m->trace.event("async enqueueing message");
    out_q[m->get_priority()].emplace_back(std::move(bl), m);
    ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                   << dendl;
    // kick the write path unless a replace is under way or a writer
    // event is already scheduled
    if (can_write != WriteStatus::REPLACING && !write_in_progress) {
      write_in_progress = true;
      connection->center->dispatch_event_external(connection->write_handler);
    }
  }
}
250
251 void ProtocolV1::prepare_send_message(uint64_t features, Message *m,
252 ceph::buffer::list &bl) {
253 ldout(cct, 20) << __func__ << " m " << *m << dendl;
254
255 // associate message with Connection (for benefit of encode_payload)
256 ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
257 << features << " " << m << " " << *m << dendl;
258
259 // encode and copy out of *m
260 // in write_message we update header.seq and need recalc crc
261 // so skip calc header in encode function.
262 m->encode(features, messenger->crcflags, true);
263
264 bl.append(m->get_payload());
265 bl.append(m->get_middle());
266 bl.append(m->get_data());
267 }
268
269 void ProtocolV1::send_keepalive() {
270 ldout(cct, 10) << __func__ << dendl;
271 std::lock_guard<std::mutex> l(connection->write_lock);
272 if (can_write != WriteStatus::CLOSED) {
273 keepalive = true;
274 connection->center->dispatch_event_external(connection->write_handler);
275 }
276 }
277
278 void ProtocolV1::read_event() {
279 ldout(cct, 20) << __func__ << dendl;
280 switch (state) {
281 case START_CONNECT:
282 CONTINUATION_RUN(CONTINUATION(send_client_banner));
283 break;
284 case START_ACCEPT:
285 CONTINUATION_RUN(CONTINUATION(send_server_banner));
286 break;
287 case OPENED:
288 CONTINUATION_RUN(CONTINUATION(wait_message));
289 break;
290 case THROTTLE_MESSAGE:
291 CONTINUATION_RUN(CONTINUATION(throttle_message));
292 break;
293 case THROTTLE_BYTES:
294 CONTINUATION_RUN(CONTINUATION(throttle_bytes));
295 break;
296 case THROTTLE_DISPATCH_QUEUE:
297 CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
298 break;
299 default:
300 break;
301 }
302 }
303
// Socket-writable callback.  With the write gate open: drain the priority
// queue, encoding on demand, then piggyback any pending acks.  Otherwise:
// handle STANDBY reconnect or flush leftover buffered bytes.  Note the lock
// dance — write_lock is dropped around write_message() so send_message()
// can keep enqueueing concurrently.
void ProtocolV1::write_event() {
  ldout(cct, 10) << __func__ << dendl;
  ssize_t r = 0;

  connection->write_lock.lock();
  if (can_write == WriteStatus::CANWRITE) {
    if (keepalive) {
      append_keepalive_or_ack();
      keepalive = false;
    }

    auto start = ceph::mono_clock::now();
    bool more;
    do {
      // finish any partially-sent buffer before dequeuing a new message
      if (connection->is_queued()) {
        if (r = connection->_try_send(); r!= 0) {
          // either fails to send or not all queued buffer is sent
          break;
        }
      }

      ceph::buffer::list data;
      Message *m = _get_next_outgoing(&data);
      if (!m) {
        break;
      }

      if (!connection->policy.lossy) {
        // put on sent list; kept until the peer acks it (handle_tag_ack)
        sent.push_back(m);
        m->get();
      }
      more = !out_q.empty();
      connection->write_lock.unlock();

      // send_message or requeue messages may not encode message
      if (!data.length()) {
        prepare_send_message(connection->get_features(), m, data);
      }

      if (m->queue_start != ceph::mono_time()) {
        connection->logger->tinc(l_msgr_send_messages_queue_lat,
                                 ceph::mono_clock::now() - m->queue_start);
      }

      r = write_message(m, data, more);

      connection->write_lock.lock();
      if (r == 0) {
        ;
      } else if (r < 0) {
        ldout(cct, 1) << __func__ << " send msg failed" << dendl;
        break;
      } else if (r > 0) {
        // Outbound message in-progress, thread will be re-awoken
        // when the outbound socket is writeable again
        break;
      }
    } while (can_write == WriteStatus::CANWRITE);
    write_in_progress = false;
    connection->write_lock.unlock();

    // if r > 0 mean data still lefted, so no need _try_send.
    if (r == 0) {
      uint64_t left = ack_left;
      if (left) {
        // piggyback one ACK covering everything received since the last
        ceph_le64 s;
        s = in_seq;
        connection->outgoing_bl.append(CEPH_MSGR_TAG_ACK);
        connection->outgoing_bl.append((char *)&s, sizeof(s));
        ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                       << " messages" << dendl;
        ack_left -= left;
        left = ack_left;
        r = connection->_try_send(left);
      } else if (is_queued()) {
        r = connection->_try_send();
      }
    }

    connection->logger->tinc(l_msgr_running_send_time,
                             ceph::mono_clock::now() - start);
    if (r < 0) {
      ldout(cct, 1) << __func__ << " send msg failed" << dendl;
      connection->lock.lock();
      fault();
      connection->lock.unlock();
      return;
    }
  } else {
    write_in_progress = false;
    connection->write_lock.unlock();
    // reacquire in conn-lock -> write-lock order (same order the state
    // machine uses) before touching connection state
    connection->lock.lock();
    connection->write_lock.lock();
    if (state == STANDBY && !connection->policy.server && is_queued()) {
      ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
      connection->_connect();
    } else if (connection->cs && state != NONE && state != CLOSED &&
               state != START_CONNECT) {
      r = connection->_try_send();
      if (r < 0) {
        ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
        connection->write_lock.unlock();
        fault();
        connection->lock.unlock();
        return;
      }
    }
    connection->write_lock.unlock();
    connection->lock.unlock();
  }
}
416
417 bool ProtocolV1::is_queued() {
418 return !out_q.empty() || connection->is_queued();
419 }
420
421 void ProtocolV1::run_continuation(CtPtr pcontinuation) {
422 if (pcontinuation) {
423 CONTINUATION_RUN(*pcontinuation);
424 }
425 }
426
// Read `len` bytes into `buffer` (or temp_buffer when the caller passes
// nullptr).  If connection->read() completes immediately (r <= 0;
// NOTE(review): r==0 appears to mean "data already available" and r<0 a
// failure the continuation inspects — confirm against AsyncConnection::read)
// the continuation is returned so the caller resumes it inline; otherwise
// the lambda resumes it when the data arrives and nullptr tells the caller
// to yield.
CtPtr ProtocolV1::read(CONTINUATION_RX_TYPE<ProtocolV1> &next,
                       int len, char *buffer) {
  if (!buffer) {
    buffer = temp_buffer;
  }
  ssize_t r = connection->read(len, buffer,
                               [&next, this](char *buffer, int r) {
                                 next.setParams(buffer, r);
                                 CONTINUATION_RUN(next);
                               });
  if (r <= 0) {
    // completed (or failed) synchronously; caller runs the continuation
    next.setParams(buffer, r);
    return &next;
  }

  return nullptr;
}
444
// Write `buffer` to the socket.  Same completion contract as read():
// if connection->write() finishes immediately (r <= 0) the continuation is
// returned for inline resumption; otherwise the lambda resumes it when the
// socket drains and nullptr tells the caller to yield.
CtPtr ProtocolV1::write(CONTINUATION_TX_TYPE<ProtocolV1> &next,
                        ceph::buffer::list &buffer) {
  ssize_t r = connection->write(buffer, [&next, this](int r) {
    next.setParams(r);
    CONTINUATION_RUN(next);
  });
  if (r <= 0) {
    // completed (or failed) synchronously; caller runs the continuation
    next.setParams(r);
    return &next;
  }

  return nullptr;
}
458
// Handshake complete: (re)arm the inactivity tick timer, open the write
// gate, kick the writer if anything queued up during the handshake, and
// start reading message tags.
CtPtr ProtocolV1::ready() {
  ldout(cct, 25) << __func__ << dendl;

  // make sure no pending tick timer
  if (connection->last_tick_id) {
    connection->center->delete_time_event(connection->last_tick_id);
  }
  connection->last_tick_id = connection->center->create_time_event(
      connection->inactive_timeout_us, connection->tick_handler);

  connection->write_lock.lock();
  can_write = WriteStatus::CANWRITE;
  if (is_queued()) {
    connection->center->dispatch_event_external(connection->write_handler);
  }
  connection->write_lock.unlock();
  connection->maybe_start_delay_thread();

  state = OPENED;
  return wait_message();
}
480
481 CtPtr ProtocolV1::wait_message() {
482 if (state != OPENED) { // must have changed due to a replace
483 return nullptr;
484 }
485
486 ldout(cct, 20) << __func__ << dendl;
487
488 return READ(sizeof(char), handle_message);
489 }
490
491 CtPtr ProtocolV1::handle_message(char *buffer, int r) {
492 ldout(cct, 20) << __func__ << " r=" << r << dendl;
493
494 if (r < 0) {
495 ldout(cct, 1) << __func__ << " read tag failed" << dendl;
496 return _fault();
497 }
498
499 char tag = buffer[0];
500 ldout(cct, 20) << __func__ << " process tag " << (int)tag << dendl;
501
502 if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
503 ldout(cct, 20) << __func__ << " got KEEPALIVE" << dendl;
504 connection->set_last_keepalive(ceph_clock_now());
505 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
506 return READ(sizeof(ceph_timespec), handle_keepalive2);
507 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
508 return READ(sizeof(ceph_timespec), handle_keepalive2_ack);
509 } else if (tag == CEPH_MSGR_TAG_ACK) {
510 return READ(sizeof(ceph_le64), handle_tag_ack);
511 } else if (tag == CEPH_MSGR_TAG_MSG) {
512 recv_stamp = ceph_clock_now();
513 ldout(cct, 20) << __func__ << " begin MSG" << dendl;
514 return READ(sizeof(ceph_msg_header), handle_message_header);
515 } else if (tag == CEPH_MSGR_TAG_CLOSE) {
516 ldout(cct, 20) << __func__ << " got CLOSE" << dendl;
517 stop();
518 } else {
519 ldout(cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
520 return _fault();
521 }
522 return nullptr;
523 }
524
525 CtPtr ProtocolV1::handle_keepalive2(char *buffer, int r) {
526 ldout(cct, 20) << __func__ << " r=" << r << dendl;
527
528 if (r < 0) {
529 ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
530 return _fault();
531 }
532
533 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
534
535 ceph_timespec *t;
536 t = (ceph_timespec *)buffer;
537 utime_t kp_t = utime_t(*t);
538 connection->write_lock.lock();
539 append_keepalive_or_ack(true, &kp_t);
540 connection->write_lock.unlock();
541
542 ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
543 connection->set_last_keepalive(ceph_clock_now());
544
545 if (is_connected()) {
546 connection->center->dispatch_event_external(connection->write_handler);
547 }
548
549 return CONTINUE(wait_message);
550 }
551
552 void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
553 ldout(cct, 10) << __func__ << dendl;
554 if (ack) {
555 ceph_assert(tp);
556 struct ceph_timespec ts;
557 tp->encode_timeval(&ts);
558 connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
559 connection->outgoing_bl.append((char *)&ts, sizeof(ts));
560 } else if (connection->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
561 struct ceph_timespec ts;
562 utime_t t = ceph_clock_now();
563 t.encode_timeval(&ts);
564 connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
565 connection->outgoing_bl.append((char *)&ts, sizeof(ts));
566 } else {
567 connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
568 }
569 }
570
571 CtPtr ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
572 ldout(cct, 20) << __func__ << " r=" << r << dendl;
573
574 if (r < 0) {
575 ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
576 return _fault();
577 }
578
579 ceph_timespec *t;
580 t = (ceph_timespec *)buffer;
581 connection->set_last_keepalive_ack(utime_t(*t));
582 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
583
584 return CONTINUE(wait_message);
585 }
586
// Continuation: the peer acknowledged all messages up to `seq`.  Release
// our references on the acked entries of the sent list.  At most 128 are
// trimmed per ACK; NOTE(review): the remainder waits for the next ACK —
// appears intentional to bound work done under write_lock, confirm.
CtPtr ProtocolV1::handle_tag_ack(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
    return _fault();
  }

  ceph_le64 seq;
  seq = *(ceph_le64 *)buffer;
  ldout(cct, 20) << __func__ << " got ACK" << dendl;

  ldout(cct, 15) << __func__ << " got ack seq " << seq << dendl;
  // trim sent list
  static const int max_pending = 128;
  int i = 0;
  auto now = ceph::mono_clock::now();
  Message *pending[max_pending];
  connection->write_lock.lock();
  while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
    Message *m = sent.front();
    sent.pop_front();
    pending[i++] = m;
    ldout(cct, 10) << __func__ << " got ack seq " << seq
                   << " >= " << m->get_seq() << " on " << m << " " << *m
                   << dendl;
  }
  connection->write_lock.unlock();
  connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
  // drop the references outside the lock: put() may trigger destructors
  for (int k = 0; k < i; k++) {
    pending[k]->put();
  }

  return CONTINUE(wait_message);
}
622
623 CtPtr ProtocolV1::handle_message_header(char *buffer, int r) {
624 ldout(cct, 20) << __func__ << " r=" << r << dendl;
625
626 if (r < 0) {
627 ldout(cct, 1) << __func__ << " read message header failed" << dendl;
628 return _fault();
629 }
630
631 ldout(cct, 20) << __func__ << " got MSG header" << dendl;
632
633 current_header = *((ceph_msg_header *)buffer);
634
635 ldout(cct, 20) << __func__ << " got envelope type=" << current_header.type << " src "
636 << entity_name_t(current_header.src) << " front=" << current_header.front_len
637 << " data=" << current_header.data_len << " off " << current_header.data_off
638 << dendl;
639
640 if (messenger->crcflags & MSG_CRC_HEADER) {
641 __u32 header_crc = 0;
642 header_crc = ceph_crc32c(0, (unsigned char *)&current_header,
643 sizeof(current_header) - sizeof(current_header.crc));
644 // verify header crc
645 if (header_crc != current_header.crc) {
646 ldout(cct, 0) << __func__ << " got bad header crc " << header_crc
647 << " != " << current_header.crc << dendl;
648 return _fault();
649 }
650 }
651
652 // Reset state
653 data_buf.clear();
654 front.clear();
655 middle.clear();
656 data.clear();
657
658 state = THROTTLE_MESSAGE;
659 return CONTINUE(throttle_message);
660 }
661
// Acquire one slot from the policy's per-message throttler.  If it is full,
// arm a 1 ms wakeup and park; read_event() re-enters this state on wakeup.
CtPtr ProtocolV1::throttle_message() {
  ldout(cct, 20) << __func__ << dendl;

  if (connection->policy.throttler_messages) {
    ldout(cct, 10) << __func__ << " wants " << 1
                   << " message from policy throttler "
                   << connection->policy.throttler_messages->get_current()
                   << "/" << connection->policy.throttler_messages->get_max()
                   << dendl;
    if (!connection->policy.throttler_messages->get_or_fail()) {
      ldout(cct, 1) << __func__ << " wants 1 message from policy throttle "
                    << connection->policy.throttler_messages->get_current()
                    << "/" << connection->policy.throttler_messages->get_max()
                    << " failed, just wait." << dendl;
      // draining the full dispatch queue can take a while, so retrying
      // after a millisecond is acceptable
      if (connection->register_time_events.empty()) {
        connection->register_time_events.insert(
            connection->center->create_time_event(1000,
                                                  connection->wakeup_handler));
      }
      return nullptr;
    }
  }

  state = THROTTLE_BYTES;
  return CONTINUE(throttle_bytes);
}
690
// Reserve the message's total byte size from the policy byte throttler.
// Like throttle_message(): on failure, arm a 1 ms wakeup and park.
CtPtr ProtocolV1::throttle_bytes() {
  ldout(cct, 20) << __func__ << dendl;

  cur_msg_size = current_header.front_len + current_header.middle_len +
                 current_header.data_len;
  if (cur_msg_size) {
    if (connection->policy.throttler_bytes) {
      ldout(cct, 10) << __func__ << " wants " << cur_msg_size
                     << " bytes from policy throttler "
                     << connection->policy.throttler_bytes->get_current() << "/"
                     << connection->policy.throttler_bytes->get_max() << dendl;
      if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
        ldout(cct, 1) << __func__ << " wants " << cur_msg_size
                      << " bytes from policy throttler "
                      << connection->policy.throttler_bytes->get_current()
                      << "/" << connection->policy.throttler_bytes->get_max()
                      << " failed, just wait." << dendl;
        // draining the full dispatch queue can take a while, so retrying
        // after a millisecond is acceptable
        if (connection->register_time_events.empty()) {
          connection->register_time_events.insert(
              connection->center->create_time_event(
                  1000, connection->wakeup_handler));
        }
        return nullptr;
      }
    }
  }

  state = THROTTLE_DISPATCH_QUEUE;
  return CONTINUE(throttle_dispatch_queue);
}
723
// Final throttling stage: reserve the message's size from the dispatch
// queue's own throttler, then start reading the message body.  The
// reservation is released later via the size recorded in
// set_dispatch_throttle_size().
CtPtr ProtocolV1::throttle_dispatch_queue() {
  ldout(cct, 20) << __func__ << dendl;

  if (cur_msg_size) {
    if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
            cur_msg_size)) {
      ldout(cct, 1)
          << __func__ << " wants " << cur_msg_size
          << " bytes from dispatch throttle "
          << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
          << connection->dispatch_queue->dispatch_throttler.get_max()
          << " failed, just wait." << dendl;
      // draining the full dispatch queue can take a while, so retrying
      // after a millisecond is acceptable
      if (connection->register_time_events.empty()) {
        connection->register_time_events.insert(
            connection->center->create_time_event(1000,
                                                  connection->wakeup_handler));
      }
      return nullptr;
    }
  }

  throttle_stamp = ceph_clock_now();

  state = READ_MESSAGE_FRONT;
  return read_message_front();
}
752
753 CtPtr ProtocolV1::read_message_front() {
754 ldout(cct, 20) << __func__ << dendl;
755
756 unsigned front_len = current_header.front_len;
757 if (front_len) {
758 if (!front.length()) {
759 front.push_back(ceph::buffer::create(front_len));
760 }
761 return READB(front_len, front.c_str(), handle_message_front);
762 }
763 return read_message_middle();
764 }
765
766 CtPtr ProtocolV1::handle_message_front(char *buffer, int r) {
767 ldout(cct, 20) << __func__ << " r=" << r << dendl;
768
769 if (r < 0) {
770 ldout(cct, 1) << __func__ << " read message front failed" << dendl;
771 return _fault();
772 }
773
774 ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
775
776 return read_message_middle();
777 }
778
779 CtPtr ProtocolV1::read_message_middle() {
780 ldout(cct, 20) << __func__ << dendl;
781
782 if (current_header.middle_len) {
783 if (!middle.length()) {
784 middle.push_back(ceph::buffer::create(current_header.middle_len));
785 }
786 return READB(current_header.middle_len, middle.c_str(),
787 handle_message_middle);
788 }
789
790 return read_message_data_prepare();
791 }
792
793 CtPtr ProtocolV1::handle_message_middle(char *buffer, int r) {
794 ldout(cct, 20) << __func__ << " r" << r << dendl;
795
796 if (r < 0) {
797 ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
798 return _fault();
799 }
800
801 ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
802
803 return read_message_data_prepare();
804 }
805
// Set up the receive buffer for the data section: allocate a buffer whose
// alignment matches the wire offset (see alloc_aligned_buffer), initialize
// the fragment iterator, and enter the data-read loop.  The rx_buffers
// zero-copy path is intentionally disabled (see tracker issue below).
CtPtr ProtocolV1::read_message_data_prepare() {
  ldout(cct, 20) << __func__ << dendl;

  unsigned data_len = current_header.data_len;
  unsigned data_off = current_header.data_off;

  if (data_len) {
    // get a buffer
#if 0
    // rx_buffers is broken by design... see
    //  http://tracker.ceph.com/issues/22480
    map<ceph_tid_t, pair<ceph::buffer::list, int> >::iterator p =
        connection->rx_buffers.find(current_header.tid);
    if (p != connection->rx_buffers.end()) {
      ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second
                     << " at offset " << data_off << " len "
                     << p->second.first.length() << dendl;
      data_buf = p->second.first;
      // make sure it's big enough
      if (data_buf.length() < data_len)
        data_buf.push_back(buffer::create(data_len - data_buf.length()));
      data_blp = data_buf.begin();
    } else {
      ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
                     << data_off << dendl;
      alloc_aligned_buffer(data_buf, data_len, data_off);
      data_blp = data_buf.begin();
    }
#else
    ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
                   << data_off << dendl;
    alloc_aligned_buffer(data_buf, data_len, data_off);
    data_blp = data_buf.begin();
#endif
  }

  // bytes of the data section still to read
  msg_left = data_len;

  return CONTINUE(read_message_data);
}
846
847 CtPtr ProtocolV1::read_message_data() {
848 ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
849
850 if (msg_left > 0) {
851 auto bp = data_blp.get_current_ptr();
852 unsigned read_len = std::min(bp.length(), msg_left);
853
854 return READB(read_len, bp.c_str(), handle_message_data);
855 }
856
857 return read_message_footer();
858 }
859
// Continuation: one fragment of the data section arrived.  Append the
// fragment to the message's data list, advance the iterator and loop back
// into read_message_data() until msg_left reaches zero.
CtPtr ProtocolV1::handle_message_data(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read data error " << dendl;
    return _fault();
  }

  // the fragment just filled by READB in read_message_data()
  auto bp = data_blp.get_current_ptr();
  unsigned read_len = std::min(bp.length(), msg_left);
  // READB takes an int length; guard against overflow of the cast
  ceph_assert(read_len <
              static_cast<unsigned>(std::numeric_limits<int>::max()));
  data_blp += read_len;
  data.append(bp, 0, read_len);
  msg_left -= read_len;

  return CONTINUE(read_message_data);
}
878
879 CtPtr ProtocolV1::read_message_footer() {
880 ldout(cct, 20) << __func__ << dendl;
881
882 state = READ_FOOTER_AND_DISPATCH;
883
884 unsigned len;
885 if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
886 len = sizeof(ceph_msg_footer);
887 } else {
888 len = sizeof(ceph_msg_footer_old);
889 }
890
891 return READ(len, handle_message_footer);
892 }
893
// Continuation: the footer arrived.  Decode and verify the complete message
// (crc via decode_message, signature, sequence-number sanity), attach
// throttler accounting, then deliver it via the delay-injection queue, fast
// dispatch, or the ordered dispatch queue.  Returns to tag-reading.
CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read footer data error " << dendl;
    return _fault();
  }

  ceph_msg_footer footer;
  ceph_msg_footer_old old_footer;

  // pre-MSG_AUTH peers send the legacy footer without a signature field
  if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    footer = *((ceph_msg_footer *)buffer);
  } else {
    old_footer = *((ceph_msg_footer_old *)buffer);
    footer.front_crc = old_footer.front_crc;
    footer.middle_crc = old_footer.middle_crc;
    footer.data_crc = old_footer.data_crc;
    footer.sig = 0;
    footer.flags = old_footer.flags;
  }

  // a missing COMPLETE flag means the sender gave up mid-message
  int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
  ldout(cct, 10) << __func__ << " aborted = " << aborted << dendl;
  if (aborted) {
    ldout(cct, 0) << __func__ << " got " << front.length() << " + "
                  << middle.length() << " + " << data.length()
                  << " byte message.. ABORTED" << dendl;
    return _fault();
  }

  ldout(cct, 20) << __func__ << " got " << front.length() << " + "
                 << middle.length() << " + " << data.length() << " byte message"
                 << dendl;
  Message *message = decode_message(cct, messenger->crcflags, current_header,
                                    footer, front, middle, data, connection);
  if (!message) {
    ldout(cct, 1) << __func__ << " decode message failed " << dendl;
    return _fault();
  }

  //
  // Check the signature if one should be present. A zero return indicates
  // success. PLR
  //

  if (session_security.get() == NULL) {
    ldout(cct, 10) << __func__ << " no session security set" << dendl;
  } else {
    if (session_security->check_message_signature(message)) {
      ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
      message->put();
      return _fault();
    }
  }
  message->set_byte_throttler(connection->policy.throttler_bytes);
  message->set_message_throttler(connection->policy.throttler_messages);

  // store reservation size in message, so we don't get confused
  // by messages entering the dispatch queue through other paths.
  message->set_dispatch_throttle_size(cur_msg_size);

  message->set_recv_stamp(recv_stamp);
  message->set_throttle_stamp(throttle_stamp);
  message->set_recv_complete_stamp(ceph_clock_now());

  // check received seq#. if it is old, drop the message.
  // note that incoming messages may skip ahead. this is convenient for the
  // client side queueing because messages can't be renumbered, but the (kernel)
  // client will occasionally pull a message out of the sent queue to send
  // elsewhere. in that case it doesn't matter if we "got" it or not.
  uint64_t cur_seq = in_seq;
  if (message->get_seq() <= cur_seq) {
    ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
                  << " <= " << cur_seq << " " << message << " " << *message
                  << ", discarding" << dendl;
    message->put();
    if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
        cct->_conf->ms_die_on_old_message) {
      ceph_assert(0 == "old msgs despite reconnect_seq feature");
    }
    return nullptr;
  }
  if (message->get_seq() > cur_seq + 1) {
    ldout(cct, 0) << __func__ << " missed message? skipped from seq "
                  << cur_seq << " to " << message->get_seq() << dendl;
    if (cct->_conf->ms_die_on_skipped_message) {
      ceph_assert(0 == "skipped incoming seq");
    }
  }

#if defined(WITH_EVENTTRACE)
  if (message->get_type() == CEPH_MSG_OSD_OP ||
      message->get_type() == CEPH_MSG_OSD_OPREPLY) {
    utime_t ltt_processed_stamp = ceph_clock_now();
    double usecs_elapsed =
        ((double)(ltt_processed_stamp.to_nsec() - recv_stamp.to_nsec())) / 1000;
    ostringstream buf;
    if (message->get_type() == CEPH_MSG_OSD_OP)
      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
                           false);
    else
      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
                           false);
  }
#endif

  // note last received message.
  in_seq = message->get_seq();
  ldout(cct, 5) << " rx " << message->get_source() << " seq "
                << message->get_seq() << " " << message << " " << *message
                << dendl;

  // non-lossy peers expect an ack for every received message
  bool need_dispatch_writer = false;
  if (!connection->policy.lossy) {
    ack_left++;
    need_dispatch_writer = true;
  }

  state = OPENED;

  ceph::mono_time fast_dispatch_time;

  if (connection->is_blackhole()) {
    ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
    message->put();
    goto out;
  }

  connection->logger->inc(l_msgr_recv_messages);
  connection->logger->inc(
      l_msgr_recv_bytes,
      cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));

  messenger->ms_fast_preprocess(message);
  fast_dispatch_time = ceph::mono_clock::now();
  connection->logger->tinc(l_msgr_running_recv_time,
                           fast_dispatch_time - connection->recv_start_time);
  if (connection->delay_state) {
    // debug hook: randomly delay delivery (ms_inject_delay_* options)
    double delay_period = 0;
    if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
      delay_period =
          cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
      ldout(cct, 1) << "queue_received will delay after "
                    << (ceph_clock_now() + delay_period) << " on " << message
                    << " " << *message << dendl;
    }
    connection->delay_state->queue(delay_period, message);
  } else if (messenger->ms_can_fast_dispatch(message)) {
    // fast dispatch runs inline in this thread; drop the conn lock while
    // the handler runs so it may call back into the messenger
    connection->lock.unlock();
    connection->dispatch_queue->fast_dispatch(message);
    connection->recv_start_time = ceph::mono_clock::now();
    connection->logger->tinc(l_msgr_running_fast_dispatch_time,
                             connection->recv_start_time - fast_dispatch_time);
    connection->lock.lock();
  } else {
    connection->dispatch_queue->enqueue(message, message->get_priority(),
                                        connection->conn_id);
  }

 out:
  // clean up local buffer references
  data_buf.clear();
  front.clear();
  middle.clear();
  data.clear();

  // flush the pending ack (and anything else queued) if still connected
  if (need_dispatch_writer && connection->is_connected()) {
    connection->center->dispatch_event_external(connection->write_handler);
  }

  return CONTINUE(wait_message);
}
1067
// Tear down all per-session state (queued messages, sequence numbers,
// write permission) after the session with the peer has been reset,
// e.g. on receiving RESETSESSION. Takes write_lock internally.
void ProtocolV1::session_reset() {
  ldout(cct, 10) << __func__ << " started" << dendl;

  std::lock_guard<std::mutex> l(connection->write_lock);
  if (connection->delay_state) {
    // drop anything still parked in the delay-injection queue
    connection->delay_state->discard();
  }

  // drop everything queued for dispatch and everything queued for sending
  connection->dispatch_queue->discard_queue(connection->conn_id);
  discard_out_queue();
  // note: we need to clear outgoing_bl here, but session_reset may be
  // called by other thread, so let caller clear this itself!
  // outgoing_bl.clear();

  // notify dispatchers that the remote end reset the session
  connection->dispatch_queue->queue_remote_reset(connection);

  randomize_out_seq();

  in_seq = 0;
  connect_seq = 0;
  // it's safe to directly set 0, double locked
  ack_left = 0;
  once_ready = false;
  can_write = WriteStatus::NOWRITE;
}
1093
1094 void ProtocolV1::randomize_out_seq() {
1095 if (connection->get_features() & CEPH_FEATURE_MSG_AUTH) {
1096 // Set out_seq to a random value, so CRC won't be predictable.
1097 auto rand_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
1098 ldout(cct, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
1099 out_seq = rand_seq;
1100 } else {
1101 // previously, seq #'s always started at 0.
1102 out_seq = 0;
1103 }
1104 }
1105
// Serialize message m (whose already-encoded payload is in bl) onto the
// connection's outgoing buffer and attempt to send it. Returns the result
// of _try_send(): <0 on error, 0 when everything was written, >0 when
// bytes remain queued. Consumes one reference on m. Must run on the
// connection's event-center thread (asserted below).
ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more) {
  FUNCTRACE(cct);
  ceph_assert(connection->center->in_thread());
  m->set_seq(++out_seq);

  if (messenger->crcflags & MSG_CRC_HEADER) {
    m->calc_header_crc();
  }

  ceph_msg_header &header = m->get_header();
  ceph_msg_footer &footer = m->get_footer();

  // TODO: let sign_message could be reentry?
  // Now that we have all the crcs calculated, handle the
  // digital signature for the message, if the AsyncConnection has session
  // security set up. Some session security options do not
  // actually calculate and check the signature, but they should
  // handle the calls to sign_message and check_signature. PLR
  if (session_security.get() == NULL) {
    ldout(cct, 20) << __func__ << " no session security" << dendl;
  } else {
    if (session_security->sign_message(m)) {
      ldout(cct, 20) << __func__ << " failed to sign m=" << m
                     << "): sig = " << footer.sig << dendl;
    } else {
      ldout(cct, 20) << __func__ << " signed m=" << m
                     << "): sig = " << footer.sig << dendl;
    }
  }

  // wire format: tag byte, header, payload, footer
  connection->outgoing_bl.append(CEPH_MSGR_TAG_MSG);
  connection->outgoing_bl.append((char *)&header, sizeof(header));

  ldout(cct, 20) << __func__ << " sending message type=" << header.type
                 << " src " << entity_name_t(header.src)
                 << " front=" << header.front_len << " data=" << header.data_len
                 << " off " << header.data_off << dendl;

  // small payloads spread across several buffers are flattened by copying
  // (cheaper than many tiny iovecs); larger ones are claimed zero-copy
  if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.get_num_buffers() > 1)) {
    for (const auto &pb : bl.buffers()) {
      connection->outgoing_bl.append((char *)pb.c_str(), pb.length());
    }
  } else {
    connection->outgoing_bl.claim_append(bl);
  }

  // send footer; if receiver doesn't support signatures, use the old footer
  // format
  ceph_msg_footer_old old_footer;
  if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    connection->outgoing_bl.append((char *)&footer, sizeof(footer));
  } else {
    if (messenger->crcflags & MSG_CRC_HEADER) {
      old_footer.front_crc = footer.front_crc;
      old_footer.middle_crc = footer.middle_crc;
    } else {
      old_footer.front_crc = old_footer.middle_crc = 0;
    }
    old_footer.data_crc =
        messenger->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
    old_footer.flags = footer.flags;
    connection->outgoing_bl.append((char *)&old_footer, sizeof(old_footer));
  }

  m->trace.event("async writing message");
  ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
                 << dendl;
  // bytes actually sent = queued-before minus still-queued-after
  ssize_t total_send_size = connection->outgoing_bl.length();
  ssize_t rc = connection->_try_send(more);
  if (rc < 0) {
    ldout(cct, 1) << __func__ << " error sending " << m << ", "
                  << cpp_strerror(rc) << dendl;
  } else {
    connection->logger->inc(
        l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
    ldout(cct, 10) << __func__ << " sending " << m
                   << (rc ? " continuely." : " done.") << dendl;
  }

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
#endif
  m->put();

  return rc;
}
1195
1196 void ProtocolV1::requeue_sent() {
1197 write_in_progress = false;
1198 if (sent.empty()) {
1199 return;
1200 }
1201
1202 list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1203 out_seq -= sent.size();
1204 while (!sent.empty()) {
1205 Message *m = sent.back();
1206 sent.pop_back();
1207 ldout(cct, 10) << __func__ << " " << *m << " for resend "
1208 << " (" << m->get_seq() << ")" << dendl;
1209 m->clear_payload();
1210 rq.push_front(make_pair(ceph::buffer::list(), m));
1211 }
1212 }
1213
1214 uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
1215 ldout(cct, 10) << __func__ << " " << seq << dendl;
1216 std::lock_guard<std::mutex> l(connection->write_lock);
1217 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
1218 return seq;
1219 }
1220 list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1221 uint64_t count = out_seq;
1222 while (!rq.empty()) {
1223 pair<ceph::buffer::list, Message *> p = rq.front();
1224 if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
1225 ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
1226 << p.second->get_seq() << " <= " << seq << ", discarding"
1227 << dendl;
1228 p.second->put();
1229 rq.pop_front();
1230 count++;
1231 }
1232 if (rq.empty()) out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1233 return count;
1234 }
1235
/*
 * Tears down the message queues, and removes them from the
 * DispatchQueue. Must hold write_lock prior to calling.
 */
1240 void ProtocolV1::discard_out_queue() {
1241 ldout(cct, 10) << __func__ << " started" << dendl;
1242
1243 for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
1244 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
1245 (*p)->put();
1246 }
1247 sent.clear();
1248 for (map<int, list<pair<ceph::buffer::list, Message *> > >::iterator p =
1249 out_q.begin();
1250 p != out_q.end(); ++p) {
1251 for (list<pair<ceph::buffer::list, Message *> >::iterator r = p->second.begin();
1252 r != p->second.end(); ++r) {
1253 ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
1254 r->second->put();
1255 }
1256 }
1257 out_q.clear();
1258 write_in_progress = false;
1259 }
1260
// Drop all authentication/session-security state so the next connect
// attempt starts from a clean slate.
void ProtocolV1::reset_security()
{
  ldout(cct, 5) << __func__ << dendl;

  auth_meta.reset(new AuthConnectionMeta);
  authorizer_more.clear();
  session_security.reset();
}
1269
// Undo the side effects of a partially received message: reset the
// security handlers (on the owning event-center thread), clear pending
// read/write callbacks, and return any throttle budget that was taken
// while reading the in-flight message.
void ProtocolV1::reset_recv_state()
{
  ldout(cct, 5) << __func__ << dendl;

  // execute in the same thread that uses the `session_security`.
  // We need to do the warp because holding `write_lock` is not
  // enough as `write_event()` releases it just before calling
  // `write_message()`. `submit_to()` here is NOT blocking.
  if (!connection->center->in_thread()) {
    connection->center->submit_to(connection->center->get_id(), [this] {
      ldout(cct, 5) << "reset_recv_state (warped) reseting security handlers"
                    << dendl;
      // Possibly unnecessary. See the comment in `deactivate_existing`.
      std::lock_guard<std::mutex> l(connection->lock);
      std::lock_guard<std::mutex> wl(connection->write_lock);
      reset_security();
    }, /* always_async = */true);
  } else {
    reset_security();
  }

  // clean read and write callbacks
  connection->pendingReadLen.reset();
  connection->writeCallback.reset();

  // If state is inside the window where throttle budget has been acquired
  // but the message was not yet dispatched (past THROTTLE_* but not past
  // READ_FOOTER_AND_DISPATCH), give the budget back so it doesn't leak.
  if (state > THROTTLE_MESSAGE && state <= READ_FOOTER_AND_DISPATCH &&
      connection->policy.throttler_messages) {
    ldout(cct, 10) << __func__ << " releasing " << 1
                   << " message to policy throttler "
                   << connection->policy.throttler_messages->get_current()
                   << "/" << connection->policy.throttler_messages->get_max()
                   << dendl;
    connection->policy.throttler_messages->put();
  }
  if (state > THROTTLE_BYTES && state <= READ_FOOTER_AND_DISPATCH) {
    if (connection->policy.throttler_bytes) {
      ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
                     << " bytes to policy throttler "
                     << connection->policy.throttler_bytes->get_current() << "/"
                     << connection->policy.throttler_bytes->get_max() << dendl;
      connection->policy.throttler_bytes->put(cur_msg_size);
    }
  }
  if (state > THROTTLE_DISPATCH_QUEUE && state <= READ_FOOTER_AND_DISPATCH) {
    ldout(cct, 10)
        << __func__ << " releasing " << cur_msg_size
        << " bytes to dispatch_queue throttler "
        << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
        << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
    connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
  }
}
1322
1323 Message *ProtocolV1::_get_next_outgoing(ceph::buffer::list *bl) {
1324 Message *m = 0;
1325 if (!out_q.empty()) {
1326 map<int, list<pair<ceph::buffer::list, Message *> > >::reverse_iterator it =
1327 out_q.rbegin();
1328 ceph_assert(!it->second.empty());
1329 list<pair<ceph::buffer::list, Message *> >::iterator p = it->second.begin();
1330 m = p->second;
1331 if (p->first.length() && bl) {
1332 assert(bl->length() == 0);
1333 bl->swap(p->first);
1334 }
1335 it->second.erase(p);
1336 if (it->second.empty()) out_q.erase(it->first);
1337 }
1338 return m;
1339 }
1340
1341 /**
1342 * Client Protocol V1
1343 **/
1344
1345 CtPtr ProtocolV1::send_client_banner() {
1346 ldout(cct, 20) << __func__ << dendl;
1347 state = CONNECTING;
1348
1349 ceph::buffer::list bl;
1350 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1351 return WRITE(bl, handle_client_banner_write);
1352 }
1353
1354 CtPtr ProtocolV1::handle_client_banner_write(int r) {
1355 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1356
1357 if (r < 0) {
1358 ldout(cct, 1) << __func__ << " write client banner failed" << dendl;
1359 return _fault();
1360 }
1361 ldout(cct, 10) << __func__ << " connect write banner done: "
1362 << connection->get_peer_addr() << dendl;
1363
1364 return wait_server_banner();
1365 }
1366
1367 CtPtr ProtocolV1::wait_server_banner() {
1368 state = CONNECTING_WAIT_BANNER_AND_IDENTIFY;
1369
1370 ldout(cct, 20) << __func__ << dendl;
1371
1372 ceph::buffer::list myaddrbl;
1373 unsigned banner_len = strlen(CEPH_BANNER);
1374 unsigned need_len = banner_len + sizeof(ceph_entity_addr) * 2;
1375 return READ(need_len, handle_server_banner_and_identify);
1376 }
1377
// Client side: parse the server's banner and the two legacy addrs that
// follow it (the server's address and the server's view of our address),
// validate the peer identity against what we dialed, optionally learn our
// own address, then reply with our legacy addr.
CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read banner and identify addresses failed"
                  << dendl;
    return _fault();
  }

  unsigned banner_len = strlen(CEPH_BANNER);
  if (memcmp(buffer, CEPH_BANNER, banner_len)) {
    ldout(cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
                  << connection->get_peer_addr() << dendl;
    return _fault();
  }

  ceph::buffer::list bl;
  entity_addr_t paddr, peer_addr_for_me;

  // the two encoded addrs sit immediately after the banner
  bl.append(buffer + banner_len, sizeof(ceph_entity_addr) * 2);
  auto p = bl.cbegin();
  try {
    decode(paddr, p);
    decode(peer_addr_for_me, p);
  } catch (const ceph::buffer::error &e) {
    lderr(cct) << __func__ << " decode peer addr failed " << dendl;
    return _fault();
  }
  ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
                 << " on socket " << connection->cs.fd() << dendl;

  // the server must claim the identity we dialed (a blank IP with matching
  // port+nonce is tolerated as "same node, unbound address")
  entity_addr_t peer_addr = connection->peer_addrs->legacy_addr();
  if (peer_addr != paddr) {
    if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
        peer_addr.get_nonce() == paddr.get_nonce()) {
      ldout(cct, 0) << __func__ << " connect claims to be " << paddr << " not "
                    << peer_addr << " - presumably this is the same node!"
                    << dendl;
    } else {
      ldout(cct, 10) << __func__ << " connect claims to be " << paddr << " not "
                     << peer_addr << dendl;
      return _fault();
    }
  }

  ldout(cct, 20) << __func__ << " connect peer addr for me is "
                 << peer_addr_for_me << dendl;
  // if we don't know our own address yet, learn it — either from the
  // peer's view of us or from getsockname(), depending on config
  if (messenger->get_myaddrs().empty() ||
      messenger->get_myaddrs().front().is_blank_ip()) {
    sockaddr_storage ss;
    socklen_t len = sizeof(ss);
    getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
    entity_addr_t a;
    if (cct->_conf->ms_learn_addr_from_peer) {
      ldout(cct, 1) << __func__ << " peer " << connection->target_addr
                    << " says I am " << peer_addr_for_me << " (socket says "
                    << (sockaddr*)&ss << ")" << dendl;
      a = peer_addr_for_me;
    } else {
      ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
                    << " says I am " << (sockaddr*)&ss
                    << " (peer says " << peer_addr_for_me << ")" << dendl;
      a.set_sockaddr((sockaddr *)&ss);
    }
    a.set_type(entity_addr_t::TYPE_LEGACY); // anything but NONE; learned_addr ignores this
    a.set_port(0);
    // learned_addr may block/take other locks, so drop ours around it
    connection->lock.unlock();
    messenger->learned_addr(a);
    if (cct->_conf->ms_inject_internal_delays &&
        cct->_conf->ms_inject_socket_failures) {
      if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
        ldout(cct, 10) << __func__ << " sleep for "
                       << cct->_conf->ms_inject_internal_delays << dendl;
        utime_t t;
        t.set_from_double(cct->_conf->ms_inject_internal_delays);
        t.sleep();
      }
    }
    connection->lock.lock();
    // the lock was dropped above; someone may have torn down or replaced
    // this connection in the meantime, so re-validate state
    if (state != CONNECTING_WAIT_BANNER_AND_IDENTIFY) {
      ldout(cct, 1) << __func__
                    << " state changed while learned_addr, mark_down or "
                    << " replacing must be happened just now" << dendl;
      return nullptr;
    }
  }

  ceph::buffer::list myaddrbl;
  encode(messenger->get_myaddr_legacy(), myaddrbl, 0); // legacy
  return WRITE(myaddrbl, handle_my_addr_write);
}
1469
1470 CtPtr ProtocolV1::handle_my_addr_write(int r) {
1471 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1472
1473 if (r < 0) {
1474 ldout(cct, 2) << __func__ << " connect couldn't write my addr, "
1475 << cpp_strerror(r) << dendl;
1476 return _fault();
1477 }
1478 ldout(cct, 10) << __func__ << " connect sent my addr "
1479 << messenger->get_myaddr_legacy() << dendl;
1480
1481 return CONTINUE(send_connect_message);
1482 }
1483
// Client side: build and send the ceph_msg_connect for this attempt,
// including the auth payload obtained from auth_client (or the saved
// challenge response in authorizer_more when retrying a challenge).
CtPtr ProtocolV1::send_connect_message()
{
  state = CONNECTING_SEND_CONNECT_MSG;

  ldout(cct, 20) << __func__ << dendl;
  ceph_assert(messenger->auth_client);

  ceph::buffer::list auth_bl;
  vector<uint32_t> preferred_modes;

  if (connection->peer_type != CEPH_ENTITY_TYPE_MON ||
      messenger->get_myname().type() == CEPH_ENTITY_TYPE_MON) {
    if (authorizer_more.length()) {
      ldout(cct,10) << __func__ << " using augmented (challenge) auth payload"
                    << dendl;
      auth_bl = authorizer_more;
    } else {
      // get_auth_request may block, so drop the connection lock around it
      // and hold auth_meta alive via a local copy of the shared pointer
      auto am = auth_meta;
      authorizer_more.clear();
      connection->lock.unlock();
      int r = messenger->auth_client->get_auth_request(
        connection, am.get(),
        &am->auth_method, &preferred_modes, &auth_bl);
      connection->lock.lock();
      if (r < 0) {
        return _fault();
      }
      // lock was dropped; the connection may have been reset/replaced
      if (state != CONNECTING_SEND_CONNECT_MSG) {
        ldout(cct, 1) << __func__ << " state changed!" << dendl;
        return _fault();
      }
    }
  }

  ceph_msg_connect connect;
  connect.features = connection->policy.features_supported;
  connect.host_type = messenger->get_myname().type();
  connect.global_seq = global_seq;
  connect.connect_seq = connect_seq;
  connect.protocol_version =
      messenger->get_proto_version(connection->peer_type, true);
  if (auth_bl.length()) {
    ldout(cct, 10) << __func__
                   << " connect_msg.authorizer_len=" << auth_bl.length()
                   << " protocol=" << auth_meta->auth_method << dendl;
    connect.authorizer_protocol = auth_meta->auth_method;
    connect.authorizer_len = auth_bl.length();
  } else {
    connect.authorizer_protocol = 0;
    connect.authorizer_len = 0;
  }

  connect.flags = 0;
  if (connection->policy.lossy) {
    connect.flags |=
        CEPH_MSG_CONNECT_LOSSY;  // this is fyi, actually, server decides!
  }

  // wire format: the fixed-size connect struct followed by the raw
  // authorizer payload (if any)
  ceph::buffer::list bl;
  bl.append((char *)&connect, sizeof(connect));
  if (auth_bl.length()) {
    bl.append(auth_bl.c_str(), auth_bl.length());
  }

  ldout(cct, 10) << __func__ << " connect sending gseq=" << global_seq
                 << " cseq=" << connect_seq
                 << " proto=" << connect.protocol_version << dendl;

  return WRITE(bl, handle_connect_message_write);
}
1554
1555 CtPtr ProtocolV1::handle_connect_message_write(int r) {
1556 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1557
1558 if (r < 0) {
1559 ldout(cct, 2) << __func__ << " connect couldn't send reply "
1560 << cpp_strerror(r) << dendl;
1561 return _fault();
1562 }
1563
1564 ldout(cct, 20) << __func__
1565 << " connect wrote (self +) cseq, waiting for reply" << dendl;
1566
1567 return wait_connect_reply();
1568 }
1569
// Client side: read the fixed-size ceph_msg_connect_reply from the server.
CtPtr ProtocolV1::wait_connect_reply() {
  ldout(cct, 20) << __func__ << dendl;

  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&connect_reply, 0, sizeof(connect_reply));
  return READ(sizeof(connect_reply), handle_connect_reply_1);
}
1577
1578 CtPtr ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
1579 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1580
1581 if (r < 0) {
1582 ldout(cct, 1) << __func__ << " read connect reply failed" << dendl;
1583 return _fault();
1584 }
1585
1586 connect_reply = *((ceph_msg_connect_reply *)buffer);
1587
1588 ldout(cct, 20) << __func__ << " connect got reply tag "
1589 << (int)connect_reply.tag << " connect_seq "
1590 << connect_reply.connect_seq << " global_seq "
1591 << connect_reply.global_seq << " proto "
1592 << connect_reply.protocol_version << " flags "
1593 << (int)connect_reply.flags << " features "
1594 << connect_reply.features << dendl;
1595
1596 if (connect_reply.authorizer_len) {
1597 return wait_connect_reply_auth();
1598 }
1599
1600 return handle_connect_reply_2();
1601 }
1602
// Client side: read the server's authorizer reply payload; its length
// came from the connect reply we just parsed. The 4096 bound guards
// against a corrupt/hostile length before we allocate and read.
CtPtr ProtocolV1::wait_connect_reply_auth() {
  ldout(cct, 20) << __func__ << dendl;

  ldout(cct, 10) << __func__
                 << " reply.authorizer_len=" << connect_reply.authorizer_len
                 << dendl;

  ceph_assert(connect_reply.authorizer_len < 4096);

  return READ(connect_reply.authorizer_len, handle_connect_reply_auth);
}
1614
// Client side: feed the server's authorizer reply to the auth client —
// either another challenge round (TAG_CHALLENGE_AUTHORIZER, which loops
// back to send_connect_message with the challenge response) or the final
// auth_done payload.
CtPtr ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read connect reply authorizer failed"
                  << dendl;
    return _fault();
  }

  ceph::buffer::list authorizer_reply;
  authorizer_reply.append(buffer, connect_reply.authorizer_len);

  if (connection->peer_type != CEPH_ENTITY_TYPE_MON ||
      messenger->get_myname().type() == CEPH_ENTITY_TYPE_MON) {
    // hold auth_meta alive locally; the auth calls below may block, so
    // the connection lock is dropped around them
    auto am = auth_meta;
    bool more = (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER);
    ceph::buffer::list auth_retry_bl;
    int r;
    connection->lock.unlock();
    if (more) {
      r = messenger->auth_client->handle_auth_reply_more(
        connection, am.get(), authorizer_reply, &auth_retry_bl);
    } else {
      // these aren't used for v1
      CryptoKey skey;
      string con_secret;
      r = messenger->auth_client->handle_auth_done(
        connection, am.get(),
        0 /* global id */, 0 /* con mode */,
        authorizer_reply,
        &skey, &con_secret);
    }
    connection->lock.lock();
    // lock was dropped; the connection may have been reset/replaced
    if (state != CONNECTING_SEND_CONNECT_MSG) {
      ldout(cct, 1) << __func__ << " state changed" << dendl;
      return _fault();
    }
    if (r < 0) {
      return _fault();
    }
    if (more && r == 0) {
      // retry the connect with the challenge response attached
      authorizer_more = auth_retry_bl;
      return CONTINUE(send_connect_message);
    }
  }

  return handle_connect_reply_2();
}
1663
// Client side: act on the tag in connect_reply — terminal failures
// (FEATURES/BADPROTOVER/BADAUTHORIZER/WAIT), retry variants that loop
// back to send_connect_message (RESETSESSION/RETRY_GLOBAL/RETRY_SESSION),
// or SEQ/READY which complete the handshake.
CtPtr ProtocolV1::handle_connect_reply_2() {
  ldout(cct, 20) << __func__ << dendl;

  if (connect_reply.tag == CEPH_MSGR_TAG_FEATURES) {
    ldout(cct, 0) << __func__ << " connect protocol feature mismatch, my "
                  << std::hex << connection->policy.features_supported
                  << " < peer " << connect_reply.features << " missing "
                  << (connect_reply.features &
                      ~connection->policy.features_supported)
                  << std::dec << dendl;
    return _fault();
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
    ldout(cct, 0) << __func__ << " connect protocol version mismatch, my "
                  << messenger->get_proto_version(connection->peer_type, true)
                  << " != " << connect_reply.protocol_version << dendl;
    return _fault();
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
    ldout(cct, 0) << __func__ << " connect got BADAUTHORIZER" << dendl;
    // drop any saved challenge response; the next attempt starts fresh
    authorizer_more.clear();
    return _fault();
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
    ldout(cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
    session_reset();
    connect_seq = 0;

    // see session_reset
    connection->outgoing_bl.clear();

    return CONTINUE(send_connect_message);
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
    // bump our global_seq past the peer's and retry
    global_seq = messenger->get_global_seq(connect_reply.global_seq);
    ldout(cct, 5) << __func__ << " connect got RETRY_GLOBAL "
                  << connect_reply.global_seq << " chose new " << global_seq
                  << dendl;
    return CONTINUE(send_connect_message);
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
    // adopt the peer's (higher) connect_seq and retry
    ceph_assert(connect_reply.connect_seq > connect_seq);
    ldout(cct, 5) << __func__ << " connect got RETRY_SESSION " << connect_seq
                  << " -> " << connect_reply.connect_seq << dendl;
    connect_seq = connect_reply.connect_seq;
    return CONTINUE(send_connect_message);
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_WAIT) {
    // connection race lost: the peer has its own connection to us in
    // progress; stand by and let that one win
    ldout(cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
    state = WAIT;
    return _fault();
  }

  uint64_t feat_missing;
  feat_missing =
      connection->policy.features_required & ~(uint64_t)connect_reply.features;
  if (feat_missing) {
    ldout(cct, 1) << __func__ << " missing required features " << std::hex
                  << feat_missing << std::dec << dendl;
    return _fault();
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_SEQ) {
    ldout(cct, 10)
        << __func__
        << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
        << dendl;

    return wait_ack_seq();
  }

  if (connect_reply.tag == CEPH_MSGR_TAG_READY) {
    ldout(cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
  }

  return client_ready();
}
1747
// Client side: after TAG_SEQ, read the peer's acked sequence number
// (a raw uint64_t on the wire).
CtPtr ProtocolV1::wait_ack_seq() {
  ldout(cct, 20) << __func__ << dendl;

  return READ(sizeof(uint64_t), handle_ack_seq);
}
1753
1754 CtPtr ProtocolV1::handle_ack_seq(char *buffer, int r) {
1755 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1756
1757 if (r < 0) {
1758 ldout(cct, 1) << __func__ << " read connect ack seq failed" << dendl;
1759 return _fault();
1760 }
1761
1762 uint64_t newly_acked_seq = 0;
1763
1764 newly_acked_seq = *((uint64_t *)buffer);
1765 ldout(cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
1766 << " vs out_seq " << out_seq << dendl;
1767 out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
1768
1769 ceph::buffer::list bl;
1770 uint64_t s = in_seq;
1771 bl.append((char *)&s, sizeof(s));
1772
1773 return WRITE(bl, handle_in_seq_write);
1774 }
1775
1776 CtPtr ProtocolV1::handle_in_seq_write(int r) {
1777 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1778
1779 if (r < 0) {
1780 ldout(cct, 10) << __func__ << " failed to send in_seq " << dendl;
1781 return _fault();
1782 }
1783
1784 ldout(cct, 10) << __func__ << " send in_seq done " << dendl;
1785
1786 return client_ready();
1787 }
1788
// Client side: the handshake succeeded — adopt the negotiated parameters
// (peer_global_seq, lossiness, feature intersection), set up session
// security if an authorizer was used, and notify dispatchers before
// entering the ready state.
CtPtr ProtocolV1::client_ready() {
  ldout(cct, 20) << __func__ << dendl;

  // hooray!
  peer_global_seq = connect_reply.global_seq;
  // server has the final say on lossiness
  connection->policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY;

  once_ready = true;
  connect_seq += 1;
  ceph_assert(connect_seq == connect_reply.connect_seq);
  backoff = utime_t();
  connection->set_features((uint64_t)connect_reply.features &
                           (uint64_t)connection->policy.features_supported);
  ldout(cct, 10) << __func__ << " connect success " << connect_seq
                 << ", lossy = " << connection->policy.lossy << ", features "
                 << connection->get_features() << dendl;

  // If we have an authorizer, get a new AuthSessionHandler to deal with
  // ongoing security of the connection.  PLR
  if (auth_meta->authorizer) {
    ldout(cct, 10) << __func__ << " setting up session_security with auth "
                   << auth_meta->authorizer.get() << dendl;
    session_security.reset(get_auth_session_handler(
        cct, auth_meta->authorizer->protocol,
        auth_meta->session_key,
        connection->get_features()));
  } else {
    // We have no authorizer, so we shouldn't be applying security to messages
    // in this AsyncConnection.  PLR
    ldout(cct, 10) << __func__ << " no authorizer, clearing session_security"
                   << dendl;
    session_security.reset();
  }

  if (connection->delay_state) {
    ceph_assert(connection->delay_state->ready());
  }
  // let dispatchers (fast and regular) know the connection is up
  connection->dispatch_queue->queue_connect(connection);
  messenger->ms_deliver_handle_fast_connect(connection);

  return ready();
}
1831
1832 /**
1833 * Server Protocol V1
1834 **/
1835
1836 CtPtr ProtocolV1::send_server_banner() {
1837 ldout(cct, 20) << __func__ << dendl;
1838 state = ACCEPTING;
1839
1840 ceph::buffer::list bl;
1841
1842 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1843
1844 // as a server, we should have a legacy addr if we accepted this connection.
1845 auto legacy = messenger->get_myaddrs().legacy_addr();
1846 encode(legacy, bl, 0); // legacy
1847 connection->port = legacy.get_port();
1848 encode(connection->target_addr, bl, 0); // legacy
1849
1850 ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd()
1851 << " legacy " << legacy
1852 << " socket_addr " << connection->socket_addr
1853 << " target_addr " << connection->target_addr
1854 << dendl;
1855
1856 return WRITE(bl, handle_server_banner_write);
1857 }
1858
1859 CtPtr ProtocolV1::handle_server_banner_write(int r) {
1860 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1861
1862 if (r < 0) {
1863 ldout(cct, 1) << " write server banner failed" << dendl;
1864 return _fault();
1865 }
1866 ldout(cct, 10) << __func__ << " write banner and addr done: "
1867 << connection->get_peer_addr() << dendl;
1868
1869 return wait_client_banner();
1870 }
1871
// Server side: read the client's banner followed by its single legacy addr.
CtPtr ProtocolV1::wait_client_banner() {
  ldout(cct, 20) << __func__ << dendl;

  return READ(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr),
              handle_client_banner);
}
1878
// Server side: validate the client's banner, decode its claimed address,
// fill in the IP from the socket if the client sent a blank one, and move
// on to reading the connect message.
CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
    return _fault();
  }

  if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
    // NOTE(review): `buffer` is streamed as a C string here but nothing
    // guarantees it is NUL-terminated within the read length — confirm
    // the read buffer is zero-padded upstream
    ldout(cct, 1) << __func__ << " accept peer sent bad banner '" << buffer
                  << "' (should be '" << CEPH_BANNER << "')" << dendl;
    return _fault();
  }

  ceph::buffer::list addr_bl;
  entity_addr_t peer_addr;

  // the encoded addr sits immediately after the banner
  addr_bl.append(buffer + strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
  try {
    auto ti = addr_bl.cbegin();
    decode(peer_addr, ti);
  } catch (const ceph::buffer::error &e) {
    lderr(cct) << __func__ << " decode peer_addr failed " << dendl;
    return _fault();
  }

  ldout(cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
  if (peer_addr.is_blank_ip()) {
    // peer apparently doesn't know what ip they have; figure it out for them.
    int port = peer_addr.get_port();
    peer_addr.set_sockaddr(connection->target_addr.get_sockaddr());
    peer_addr.set_port(port);

    ldout(cct, 0) << __func__ << " accept peer addr is really " << peer_addr
                  << " (socket is " << connection->target_addr << ")" << dendl;
  }
  connection->set_peer_addr(peer_addr);  // so that connection_state gets set up
  connection->target_addr = peer_addr;

  return CONTINUE(wait_connect_message);
}
1920
// Server side: read the client's fixed-size ceph_msg_connect.
CtPtr ProtocolV1::wait_connect_message() {
  ldout(cct, 20) << __func__ << dendl;

  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&connect_msg, 0, sizeof(connect_msg));
  return READ(sizeof(connect_msg), handle_connect_message_1);
}
1928
1929 CtPtr ProtocolV1::handle_connect_message_1(char *buffer, int r) {
1930 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1931
1932 if (r < 0) {
1933 ldout(cct, 1) << __func__ << " read connect msg failed" << dendl;
1934 return _fault();
1935 }
1936
1937 connect_msg = *((ceph_msg_connect *)buffer);
1938
1939 state = ACCEPTING_WAIT_CONNECT_MSG_AUTH;
1940
1941 if (connect_msg.authorizer_len) {
1942 return wait_connect_message_auth();
1943 }
1944
1945 return handle_connect_message_2();
1946 }
1947
// Server side: read the client's authorizer payload (length taken from
// the connect message) directly into authorizer_buf.
CtPtr ProtocolV1::wait_connect_message_auth() {
  ldout(cct, 20) << __func__ << dendl;
  authorizer_buf.clear();
  authorizer_buf.push_back(ceph::buffer::create(connect_msg.authorizer_len));
  return READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
               handle_connect_message_auth);
}
1955
// Completion for the authorizer read issued by wait_connect_message_auth().
// The payload was read straight into authorizer_buf, so `buffer` is not
// consulted here; only the result code matters.
CtPtr ProtocolV1::handle_connect_message_auth(char *buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read connect authorizer failed" << dendl;
    return _fault();
  }

  return handle_connect_message_2();
}
1966
// Core of the server-side V1 handshake.  With connect_msg (and any authorizer
// payload) in hand: validate the protocol version and required feature bits,
// verify the authorizer with connection->lock dropped, then resolve races
// against any existing registered connection to the same peer using the
// peer's global_seq/connect_seq versus the existing protocol's counters.
// Every early exit replies with a CEPH_MSGR_TAG_* via
// send_connect_message_reply(); success paths go through open() or replace().
CtPtr ProtocolV1::handle_connect_message_2() {
  ldout(cct, 20) << __func__ << dendl;

  ldout(cct, 20) << __func__ << " accept got peer connect_seq "
                 << connect_msg.connect_seq << " global_seq "
                 << connect_msg.global_seq << dendl;

  connection->set_peer_type(connect_msg.host_type);
  connection->policy = messenger->get_policy(connect_msg.host_type);

  ldout(cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type
                 << ", policy.lossy=" << connection->policy.lossy
                 << " policy.server=" << connection->policy.server
                 << " policy.standby=" << connection->policy.standby
                 << " policy.resetcheck=" << connection->policy.resetcheck
                 << " features 0x" << std::hex << (uint64_t)connect_msg.features
                 << std::dec
                 << dendl;

  ceph_msg_connect_reply reply;
  ceph::buffer::list authorizer_reply;

  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&reply, 0, sizeof(reply));
  reply.protocol_version =
      messenger->get_proto_version(connection->peer_type, false);

  // mismatch?
  ldout(cct, 10) << __func__ << " accept my proto " << reply.protocol_version
                 << ", their proto " << connect_msg.protocol_version << dendl;

  if (connect_msg.protocol_version != reply.protocol_version) {
    return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER, reply,
                                      authorizer_reply);
  }

  // require signatures for cephx?
  // Cluster-internal daemons (osd/mds/mgr) and external services use
  // separate config knobs; either side may force MSG_AUTH and/or CEPHX_V2
  // into the required feature mask before the feature check below.
  if (connect_msg.authorizer_protocol == CEPH_AUTH_CEPHX) {
    if (connection->peer_type == CEPH_ENTITY_TYPE_OSD ||
        connection->peer_type == CEPH_ENTITY_TYPE_MDS ||
        connection->peer_type == CEPH_ENTITY_TYPE_MGR) {
      if (cct->_conf->cephx_require_signatures ||
          cct->_conf->cephx_cluster_require_signatures) {
        ldout(cct, 10)
            << __func__
            << " using cephx, requiring MSG_AUTH feature bit for cluster"
            << dendl;
        connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
      }
      if (cct->_conf->cephx_require_version >= 2 ||
          cct->_conf->cephx_cluster_require_version >= 2) {
        ldout(cct, 10)
            << __func__
            << " using cephx, requiring cephx v2 feature bit for cluster"
            << dendl;
        connection->policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
      }
    } else {
      if (cct->_conf->cephx_require_signatures ||
          cct->_conf->cephx_service_require_signatures) {
        ldout(cct, 10)
            << __func__
            << " using cephx, requiring MSG_AUTH feature bit for service"
            << dendl;
        connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
      }
      if (cct->_conf->cephx_require_version >= 2 ||
          cct->_conf->cephx_service_require_version >= 2) {
        ldout(cct, 10)
            << __func__
            << " using cephx, requiring cephx v2 feature bit for service"
            << dendl;
        connection->policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
      }
    }
  }

  // Reject peers that lack any feature bit our policy requires.
  uint64_t feat_missing =
      connection->policy.features_required & ~(uint64_t)connect_msg.features;
  if (feat_missing) {
    ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
                  << feat_missing << std::dec << dendl;
    return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES, reply,
                                      authorizer_reply);
  }

  // Authorizer verification happens with connection->lock released (the auth
  // server may block); copy what we need first, and re-check `state` after
  // every re-acquisition since the connection can be torn down meanwhile.
  ceph::buffer::list auth_bl_copy = authorizer_buf;
  auto am = auth_meta;
  am->auth_method = connect_msg.authorizer_protocol;
  if (!HAVE_FEATURE((uint64_t)connect_msg.features, CEPHX_V2)) {
    // peer doesn't support it and we won't get here if we require it
    am->skip_authorizer_challenge = true;
  }
  connection->lock.unlock();
  ldout(cct,10) << __func__ << " authorizor_protocol "
                << connect_msg.authorizer_protocol
                << " len " << auth_bl_copy.length()
                << dendl;
  // A pending challenge means this is the peer's second attempt (more=true).
  bool more = (bool)auth_meta->authorizer_challenge;
  int r = messenger->auth_server->handle_auth_request(
      connection,
      am.get(),
      more,
      am->auth_method,
      auth_bl_copy,
      &authorizer_reply);
  if (r < 0) {
    connection->lock.lock();
    if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
      ldout(cct, 1) << __func__ << " state changed" << dendl;
      return _fault();
    }
    ldout(cct, 0) << __func__ << ": got bad authorizer, auth_reply_len="
                  << authorizer_reply.length() << dendl;
    session_security.reset();
    return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER, reply,
                                      authorizer_reply);
  }
  if (r == 0) {
    // r == 0: not rejected, but the peer must answer a challenge first.
    connection->lock.lock();
    if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
      ldout(cct, 1) << __func__ << " state changed" << dendl;
      return _fault();
    }
    ldout(cct, 10) << __func__ << ": challenging authorizer" << dendl;
    ceph_assert(authorizer_reply.length());
    return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER,
                                      reply, authorizer_reply);
  }

  // We've verified the authorizer for this AsyncConnection, so set up the
  // session security structure. PLR
  ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;

  if (connection->policy.server &&
      connection->policy.lossy &&
      !connection->policy.register_lossy_clients) {
    // incoming lossy client, no need to register this connection
    // new session
    ldout(cct, 10) << __func__ << " accept new session" << dendl;
    connection->lock.lock();
    return open(reply, authorizer_reply);
  }

  // lookup_conn() is called without holding our lock; re-check state below.
  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);

  connection->inject_delay();

  connection->lock.lock();
  if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
    ldout(cct, 1) << __func__ << " state changed" << dendl;
    return _fault();
  }

  if (existing == connection) {
    existing = nullptr;
  }
  // A registered connection speaking a different protocol version cannot be
  // raced against; tear it down and treat this as a fresh session.
  if (existing && existing->protocol->proto_type != 1) {
    ldout(cct,1) << __func__ << " existing " << existing << " proto "
                 << existing->protocol.get() << " version is "
                 << existing->protocol->proto_type << ", marking down" << dendl;
    existing->mark_down();
    existing = nullptr;
  }

  if (existing) {
    // There is no possible that existing connection will acquire this
    // connection's lock
    existing->lock.lock();  // skip lockdep check (we are locking a second
                            // AsyncConnection here)

    ldout(cct,10) << __func__ << " existing=" << existing << " exproto="
                  << existing->protocol.get() << dendl;
    ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());
    ceph_assert(exproto);
    ceph_assert(exproto->proto_type == 1);

    if (exproto->state == CLOSED) {
      ldout(cct, 1) << __func__ << " existing " << existing
                    << " already closed." << dendl;
      existing->lock.unlock();
      existing = nullptr;

      return open(reply, authorizer_reply);
    }

    if (exproto->replacing) {
      // The existing connection is itself mid-replace; tell the peer to retry
      // with a fresh global_seq rather than racing two replacements.
      ldout(cct, 1) << __func__
                    << " existing racing replace happened while replacing."
                    << " existing_state="
                    << connection->get_state_name(existing->state) << dendl;
      reply.global_seq = exproto->peer_global_seq;
      existing->lock.unlock();
      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
                                        authorizer_reply);
    }

    if (connect_msg.global_seq < exproto->peer_global_seq) {
      // Stale attempt from an older peer incarnation.
      ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
                     << exproto->peer_global_seq << " > "
                     << connect_msg.global_seq << ", RETRY_GLOBAL" << dendl;
      reply.global_seq = exproto->peer_global_seq;  // so we can send it below..
      existing->lock.unlock();
      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
                                        authorizer_reply);
    } else {
      ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
                     << exproto->peer_global_seq
                     << " <= " << connect_msg.global_seq << ", looks ok"
                     << dendl;
    }

    if (existing->policy.lossy) {
      // Lossy channels carry no session state worth preserving: reset the
      // old session and take over immediately.
      ldout(cct, 0)
          << __func__
          << " accept replacing existing (lossy) channel (new one lossy="
          << connection->policy.lossy << ")" << dendl;
      exproto->session_reset();
      return replace(existing, reply, authorizer_reply);
    }

    ldout(cct, 1) << __func__ << " accept connect_seq "
                  << connect_msg.connect_seq
                  << " vs existing csq=" << exproto->connect_seq
                  << " existing_state="
                  << connection->get_state_name(existing->state) << dendl;

    if (connect_msg.connect_seq == 0 && exproto->connect_seq > 0) {
      ldout(cct, 0)
          << __func__
          << " accept peer reset, then tried to connect to us, replacing"
          << dendl;
      // this is a hard reset from peer
      is_reset_from_peer = true;
      if (connection->policy.resetcheck) {
        exproto->session_reset();  // this resets out_queue, msg_ and
                                   // connect_seq #'s
      }
      return replace(existing, reply, authorizer_reply);
    }

    if (connect_msg.connect_seq < exproto->connect_seq) {
      // old attempt, or we sent READY but they didn't get it.
      ldout(cct, 10) << __func__ << " accept existing " << existing << ".cseq "
                     << exproto->connect_seq << " > " << connect_msg.connect_seq
                     << ", RETRY_SESSION" << dendl;
      reply.connect_seq = exproto->connect_seq + 1;
      existing->lock.unlock();
      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
                                        authorizer_reply);
    }

    if (connect_msg.connect_seq == exproto->connect_seq) {
      // if the existing connection successfully opened, and/or
      // subsequently went to standby, then the peer should bump
      // their connect_seq and retry: this is not a connection race
      // we need to resolve here.
      if (exproto->state == OPENED || exproto->state == STANDBY) {
        ldout(cct, 10) << __func__ << " accept connection race, existing "
                       << existing << ".cseq " << exproto->connect_seq
                       << " == " << connect_msg.connect_seq
                       << ", OPEN|STANDBY, RETRY_SESSION " << dendl;
        // if connect_seq both zero, dont stuck into dead lock. it's ok to
        // replace
        if (connection->policy.resetcheck && exproto->connect_seq == 0) {
          return replace(existing, reply, authorizer_reply);
        }

        reply.connect_seq = exproto->connect_seq + 1;
        existing->lock.unlock();
        return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
                                          authorizer_reply);
      }

      // connection race?
      // Tie-break by address ordering (or server policy): lower address's
      // outgoing attempt loses, so here the incoming connection wins.
      if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr_legacy() ||
          existing->policy.server) {
        // incoming wins
        ldout(cct, 10) << __func__ << " accept connection race, existing "
                       << existing << ".cseq " << exproto->connect_seq
                       << " == " << connect_msg.connect_seq
                       << ", or we are server, replacing my attempt" << dendl;
        return replace(existing, reply, authorizer_reply);
      } else {
        // our existing outgoing wins
        ldout(messenger->cct, 10)
            << __func__ << " accept connection race, existing " << existing
            << ".cseq " << exproto->connect_seq
            << " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
        ceph_assert(connection->peer_addrs->legacy_addr() >
                    messenger->get_myaddr_legacy());
        existing->lock.unlock();
        // make sure we follow through with opening the existing
        // connection (if it isn't yet open) since we know the peer
        // has something to send to us.
        existing->send_keepalive();
        return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
                                          authorizer_reply);
      }
    }

    ceph_assert(connect_msg.connect_seq > exproto->connect_seq);
    ceph_assert(connect_msg.global_seq >= exproto->peer_global_seq);
    if (connection->policy.resetcheck &&  // RESETSESSION only used by servers;
                                          // peers do not reset each other
        exproto->connect_seq == 0) {
      ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
                    << connect_msg.connect_seq << ", " << existing
                    << ".cseq = " << exproto->connect_seq
                    << "), sending RESETSESSION " << dendl;
      existing->lock.unlock();
      return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
                                        authorizer_reply);
    }

    // reconnect
    ldout(cct, 10) << __func__ << " accept peer sent cseq "
                   << connect_msg.connect_seq << " > " << exproto->connect_seq
                   << dendl;
    return replace(existing, reply, authorizer_reply);
  }  // existing
  else if (!replacing && connect_msg.connect_seq > 0) {
    // we reset, and they are opening a new session
    ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
                  << connect_msg.connect_seq << "), sending RESETSESSION"
                  << dendl;
    return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
                                      authorizer_reply);
  } else {
    // new session
    ldout(cct, 10) << __func__ << " accept new session" << dendl;
    existing = nullptr;
    return open(reply, authorizer_reply);
  }
}
2302
2303 CtPtr ProtocolV1::send_connect_message_reply(char tag,
2304 ceph_msg_connect_reply &reply,
2305 ceph::buffer::list &authorizer_reply) {
2306 ldout(cct, 20) << __func__ << dendl;
2307 ceph::buffer::list reply_bl;
2308 reply.tag = tag;
2309 reply.features =
2310 ((uint64_t)connect_msg.features & connection->policy.features_supported) |
2311 connection->policy.features_required;
2312 reply.authorizer_len = authorizer_reply.length();
2313 reply_bl.append((char *)&reply, sizeof(reply));
2314
2315 ldout(cct, 10) << __func__ << " reply features 0x" << std::hex
2316 << reply.features << " = (policy sup 0x"
2317 << connection->policy.features_supported
2318 << " & connect 0x" << (uint64_t)connect_msg.features
2319 << ") | policy req 0x"
2320 << connection->policy.features_required
2321 << dendl;
2322
2323 if (reply.authorizer_len) {
2324 reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
2325 authorizer_reply.clear();
2326 }
2327
2328 return WRITE(reply_bl, handle_connect_message_reply_write);
2329 }
2330
2331 CtPtr ProtocolV1::handle_connect_message_reply_write(int r) {
2332 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2333
2334 if (r < 0) {
2335 ldout(cct, 1) << " write connect message reply failed" << dendl;
2336 connection->inject_delay();
2337 return _fault();
2338 }
2339
2340 return CONTINUE(wait_connect_message);
2341 }
2342
// Replace `existing` (a registered connection to the same peer) with this
// newly accepted one.  For lossy channels the old connection is simply
// failed and we open a new session.  Otherwise this connection donates its
// socket, worker and event center to `existing` and stops itself; the
// handover must run in the existing connection's event-center thread, hence
// the deactivate/transfer lambdas submitted below.  Caller holds both
// connection->lock and existing->lock.
CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
                          ceph_msg_connect_reply &reply,
                          ceph::buffer::list &authorizer_reply) {
  ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;

  connection->inject_delay();
  if (existing->policy.lossy) {
    // disconnect from the Connection
    ldout(cct, 1) << __func__ << " replacing on lossy channel, failing existing"
                  << dendl;
    existing->protocol->stop();
    existing->dispatch_queue->queue_reset(existing.get());
  } else {
    ceph_assert(can_write == WriteStatus::NOWRITE);
    existing->write_lock.lock();

    ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());

    // reset the in_seq if this is a hard reset from peer,
    // otherwise we respect our original connection's value
    if (is_reset_from_peer) {
      exproto->is_reset_from_peer = true;
    }

    // Detach our socket from our event center before handing it over.
    connection->center->delete_file_event(connection->cs.fd(),
                                          EVENT_READABLE | EVENT_WRITABLE);

    if (existing->delay_state) {
      existing->delay_state->flush();
      ceph_assert(!connection->delay_state);
    }
    exproto->reset_recv_state();

    exproto->connect_msg.features = connect_msg.features;

    // Save the socket/worker/center we are donating before stop() tears this
    // connection down.
    auto temp_cs = std::move(connection->cs);
    EventCenter *new_center = connection->center;
    Worker *new_worker = connection->worker;
    // avoid _stop shutdown replacing socket
    // queue a reset on the new connection, which we're dumping for the old
    stop();

    connection->dispatch_queue->queue_reset(connection);
    ldout(messenger->cct, 1)
        << __func__ << " stop myself to swap existing" << dendl;
    exproto->can_write = WriteStatus::REPLACING;
    exproto->replacing = true;
    exproto->write_in_progress = false;
    existing->state_offset = 0;
    // avoid previous thread modify event
    exproto->state = NONE;
    existing->state = AsyncConnection::STATE_NONE;
    // Discard existing prefetch buffer in `recv_buf`
    existing->recv_start = existing->recv_end = 0;
    // there shouldn't exist any buffer
    ceph_assert(connection->recv_start == connection->recv_end);

    // Runs in existing's (old) event-center thread: requeue anything sent but
    // unacked, install the donated socket/worker/center into `existing`, then
    // arrange for the RETRY_GLOBAL reply to be sent from the new center.
    auto deactivate_existing = std::bind(
        [existing, new_worker, new_center, exproto, reply,
         authorizer_reply](ConnectedSocket &cs) mutable {
          // we need to delete time event in original thread
          {
            std::lock_guard<std::mutex> l(existing->lock);
            existing->write_lock.lock();
            exproto->requeue_sent();
            existing->outgoing_bl.clear();
            existing->open_write = false;
            existing->write_lock.unlock();
            if (exproto->state == NONE) {
              existing->shutdown_socket();
              existing->cs = std::move(cs);
              existing->worker->references--;
              new_worker->references++;
              existing->logger = new_worker->get_perf_counter();
              existing->worker = new_worker;
              existing->center = new_center;
              if (existing->delay_state)
                existing->delay_state->set_center(new_center);
            } else if (exproto->state == CLOSED) {
              // existing closed in the meantime: just close the donated
              // socket in its owning (new) center's thread.
              auto back_to_close =
                  std::bind([](ConnectedSocket &cs) mutable { cs.close(); },
                            std::move(cs));
              new_center->submit_to(new_center->get_id(),
                                    std::move(back_to_close), true);
              return;
            } else {
              ceph_abort();
            }
          }

          // Before changing existing->center, it may already exists some
          // events in existing->center's queue. Then if we mark down
          // `existing`, it will execute in another thread and clean up
          // connection. Previous event will result in segment fault
          auto transfer_existing = [existing, exproto, reply,
                                    authorizer_reply]() mutable {
            std::lock_guard<std::mutex> l(existing->lock);
            if (exproto->state == CLOSED) return;
            ceph_assert(exproto->state == NONE);

            // we have called shutdown_socket above
            ceph_assert(existing->last_tick_id == 0);
            // restart timer since we are going to re-build connection
            existing->last_connect_started = ceph::coarse_mono_clock::now();
            existing->last_tick_id = existing->center->create_time_event(
                existing->connect_timeout_us, existing->tick_handler);
            existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
            exproto->state = ACCEPTING;

            existing->center->create_file_event(
                existing->cs.fd(), EVENT_READABLE, existing->read_handler);
            reply.global_seq = exproto->peer_global_seq;
            exproto->run_continuation(exproto->send_connect_message_reply(
                CEPH_MSGR_TAG_RETRY_GLOBAL, reply, authorizer_reply));
          };
          if (existing->center->in_thread())
            transfer_existing();
          else
            existing->center->submit_to(existing->center->get_id(),
                                        std::move(transfer_existing), true);
        },
        std::move(temp_cs));

    existing->center->submit_to(existing->center->get_id(),
                                std::move(deactivate_existing), true);
    existing->write_lock.unlock();
    existing->lock.unlock();
    // This connection is stopped; the continuation chain ends here.
    return nullptr;
  }
  existing->lock.unlock();

  // Lossy path: old connection failed above; proceed as a brand-new session.
  return open(reply, authorizer_reply);
}
2476
// Finalize the server-side handshake: advance sequence numbers, build the
// READY (or SEQ) reply, set up session security, register the connection
// with the messenger (with connection->lock dropped), and write the reply.
// Caller holds connection->lock.
CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
                       ceph::buffer::list &authorizer_reply) {
  ldout(cct, 20) << __func__ << dendl;

  connect_seq = connect_msg.connect_seq + 1;
  peer_global_seq = connect_msg.global_seq;
  ldout(cct, 10) << __func__ << " accept success, connect_seq = " << connect_seq
                 << " in_seq=" << in_seq << ", sending READY" << dendl;

  // if it is a hard reset from peer, we don't need a round-trip to negotiate
  // in/out sequence
  if ((connect_msg.features & CEPH_FEATURE_RECONNECT_SEQ) &&
      !is_reset_from_peer) {
    // TAG_SEQ: we append our in_seq below and wait for the peer's ack seq.
    reply.tag = CEPH_MSGR_TAG_SEQ;
    wait_for_seq = true;
  } else {
    reply.tag = CEPH_MSGR_TAG_READY;
    wait_for_seq = false;
    out_seq = discard_requeued_up_to(out_seq, 0);
    is_reset_from_peer = false;
    in_seq = 0;
  }

  // send READY reply
  reply.features = connection->policy.features_supported;
  reply.global_seq = messenger->get_global_seq();
  reply.connect_seq = connect_seq;
  reply.flags = 0;
  reply.authorizer_len = authorizer_reply.length();
  if (connection->policy.lossy) {
    reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
  }

  connection->set_features((uint64_t)reply.features &
                           (uint64_t)connect_msg.features);
  ldout(cct, 10) << __func__ << " accept features "
                 << connection->get_features()
                 << " authorizer_protocol "
                 << connect_msg.authorizer_protocol << dendl;

  // Session security signs/encrypts subsequent messages using the key
  // negotiated during authorizer verification.
  session_security.reset(
      get_auth_session_handler(cct, auth_meta->auth_method,
                               auth_meta->session_key,
                               connection->get_features()));

  ceph::buffer::list reply_bl;
  reply_bl.append((char *)&reply, sizeof(reply));

  if (reply.authorizer_len) {
    reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
  }

  if (reply.tag == CEPH_MSGR_TAG_SEQ) {
    uint64_t s = in_seq;
    reply_bl.append((char *)&s, sizeof(s));
  }

  connection->lock.unlock();
  // Because "replacing" will prevent other connections preempt this addr,
  // it's safe that here we don't acquire Connection's lock
  ssize_t r = messenger->accept_conn(connection);

  connection->inject_delay();

  connection->lock.lock();
  replacing = false;
  if (r < 0) {
    // Lost a registration race with another accept for the same address.
    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
                  << connection->peer_addrs->legacy_addr()
                  << " just fail later one(this)" << dendl;
    ldout(cct, 10) << "accept fault after register" << dendl;
    connection->inject_delay();
    return _fault();
  }
  if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
    // Connection was torn down while the lock was dropped; undo the
    // registration done above.
    ldout(cct, 1) << __func__
                  << " state changed while accept_conn, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED || state == NONE);
    ldout(cct, 10) << "accept fault after register" << dendl;
    messenger->unregister_conn(connection);
    connection->inject_delay();
    return _fault();
  }

  return WRITE(reply_bl, handle_ready_connect_message_reply_write);
}
2564
2565 CtPtr ProtocolV1::handle_ready_connect_message_reply_write(int r) {
2566 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2567
2568 if (r < 0) {
2569 ldout(cct, 1) << __func__ << " write ready connect message reply failed"
2570 << dendl;
2571 return _fault();
2572 }
2573
2574 // notify
2575 connection->dispatch_queue->queue_accept(connection);
2576 messenger->ms_deliver_handle_fast_accept(connection);
2577 once_ready = true;
2578
2579 state = ACCEPTING_HANDLED_CONNECT_MSG;
2580
2581 if (wait_for_seq) {
2582 return wait_seq();
2583 }
2584
2585 return server_ready();
2586 }
2587
CtPtr ProtocolV1::wait_seq() {
  ldout(cct, 20) << __func__ << dendl;

  // TAG_SEQ path: read the peer's 8-byte acked sequence number; handle_seq()
  // resumes when it arrives.
  return READ(sizeof(uint64_t), handle_seq);
}
2593
2594 CtPtr ProtocolV1::handle_seq(char *buffer, int r) {
2595 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2596
2597 if (r < 0) {
2598 ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
2599 return _fault();
2600 }
2601
2602 uint64_t newly_acked_seq = *(uint64_t *)buffer;
2603 ldout(cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq
2604 << dendl;
2605 out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
2606
2607 return server_ready();
2608 }
2609
// Final server-side step: the handshake is complete, so discard the connect
// message state and hand off to the common ready() path.
CtPtr ProtocolV1::server_ready() {
  ldout(cct, 20) << __func__ << " session_security is "
                 << session_security
                 << dendl;

  ldout(cct, 20) << __func__ << " accept done" << dendl;
  // Handshake state is no longer needed once the session is up.
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&connect_msg, 0, sizeof(connect_msg));

  if (connection->delay_state) {
    ceph_assert(connection->delay_state->ready());
  }

  return ready();
}