ceph/src/msg/async/ProtocolV1.cc (Ceph Pacific 16.2.2)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "ProtocolV1.h"
5
6 #include "common/errno.h"
7
8 #include "AsyncConnection.h"
9 #include "AsyncMessenger.h"
10 #include "common/EventTrace.h"
11 #include "include/random.h"
12 #include "auth/AuthClient.h"
13 #include "auth/AuthServer.h"
14
15 #define dout_subsys ceph_subsys_ms
16 #undef dout_prefix
17 #define dout_prefix _conn_prefix(_dout)
18 std::ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
19 return *_dout << "--1- " << messenger->get_myaddrs() << " >> "
20 << *connection->peer_addrs
21 << " conn("
22 << connection << " " << this
23 << " :" << connection->port << " s=" << get_state_name(state)
24 << " pgs=" << peer_global_seq << " cs=" << connect_seq
25 << " l=" << connection->policy.lossy << ").";
26 }
27
28 #define WRITE(B, C) write(CONTINUATION(C), B)
29
30 #define READ(L, C) read(CONTINUATION(C), L)
31
32 #define READB(L, B, C) read(CONTINUATION(C), L, B)
33
34 // Constant to keep the randomized starting sequence number below 2^31.
35 // Nothing special about it, just a big number. PLR
36 #define SEQ_MASK 0x7fffffff
37
38 const int ASYNC_COALESCE_THRESHOLD = 256;
39
40 using namespace std;
41
42 static void alloc_aligned_buffer(ceph::buffer::list &data, unsigned len, unsigned off) {
43 // create a buffer to read into that matches the data alignment
44 unsigned alloc_len = 0;
45 unsigned left = len;
46 unsigned head = 0;
47 if (off & ~CEPH_PAGE_MASK) {
48 // head
49 alloc_len += CEPH_PAGE_SIZE;
50 head = std::min<uint64_t>(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
51 left -= head;
52 }
53 alloc_len += left;
54 ceph::bufferptr ptr(ceph::buffer::create_small_page_aligned(alloc_len));
55 if (head) ptr.set_offset(CEPH_PAGE_SIZE - head);
56 data.push_back(std::move(ptr));
57 }
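// Editor's sketch (not part of the upstream file): the head/tail math above,
// worked with plain integers. Assumes a 4 KiB page, i.e. CEPH_PAGE_SIZE =
// 4096 and (off & ~CEPH_PAGE_MASK) == (off & 4095); typical, but platform
// dependent.
#if 0
static unsigned aligned_alloc_len_sketch(unsigned len, unsigned off) {
  const unsigned page = 4096;                  // stand-in for CEPH_PAGE_SIZE
  unsigned alloc_len = 0, left = len, head = 0;
  if (off & (page - 1)) {                      // data starts mid-page
    alloc_len += page;                         // spend a whole page on the head
    head = std::min(page - (off & (page - 1)), left);
    left -= head;
  }
  alloc_len += left;                           // the tail begins page aligned
  // e.g. len = 10000, off = 1000: head = 3096, alloc_len = 4096 + 6904.
  // set_offset(page - head) then places the first payload byte at in-page
  // offset 1000, matching its on-wire alignment, so the tail stays aligned.
  return alloc_len;
}
#endif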
58
59 /**
60 * Protocol V1
61 **/
62
63 ProtocolV1::ProtocolV1(AsyncConnection *connection)
64 : Protocol(1, connection),
65 temp_buffer(nullptr),
66 can_write(WriteStatus::NOWRITE),
67 keepalive(false),
68 connect_seq(0),
69 peer_global_seq(0),
70 msg_left(0),
71 cur_msg_size(0),
72 replacing(false),
73 is_reset_from_peer(false),
74 once_ready(false),
75 state(NONE),
76 global_seq(0),
77 wait_for_seq(false) {
78 temp_buffer = new char[4096];
79 }
80
81 ProtocolV1::~ProtocolV1() {
82 ceph_assert(out_q.empty());
83 ceph_assert(sent.empty());
84
85 delete[] temp_buffer;
86 }
87
88 void ProtocolV1::connect() {
89 this->state = START_CONNECT;
90
91 // reset connect state variables
92 authorizer_buf.clear();
93 // FIPS zeroization audit 20191115: these memsets are not security related.
94 memset(&connect_msg, 0, sizeof(connect_msg));
95 memset(&connect_reply, 0, sizeof(connect_reply));
96
97 global_seq = messenger->get_global_seq();
98 }
99
100 void ProtocolV1::accept() { this->state = START_ACCEPT; }
101
102 bool ProtocolV1::is_connected() {
103 return can_write.load() == WriteStatus::CANWRITE;
104 }
105
106 void ProtocolV1::stop() {
107 ldout(cct, 20) << __func__ << dendl;
108 if (state == CLOSED) {
109 return;
110 }
111
112 if (connection->delay_state) connection->delay_state->flush();
113
114 ldout(cct, 2) << __func__ << dendl;
115 std::lock_guard<std::mutex> l(connection->write_lock);
116
117 reset_recv_state();
118 discard_out_queue();
119
120 connection->_stop();
121
122 can_write = WriteStatus::CLOSED;
123 state = CLOSED;
124 }
125
126 void ProtocolV1::fault() {
127 ldout(cct, 20) << __func__ << dendl;
128
129 if (state == CLOSED || state == NONE) {
130 ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
131 return;
132 }
133
134 if (connection->policy.lossy && state != START_CONNECT &&
135 state != CONNECTING) {
136 ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl;
137 stop();
138 connection->dispatch_queue->queue_reset(connection);
139 return;
140 }
141
142 connection->write_lock.lock();
143 can_write = WriteStatus::NOWRITE;
144 is_reset_from_peer = false;
145
146 // requeue sent items
147 requeue_sent();
148
149 if (!once_ready && out_q.empty() && state >= START_ACCEPT &&
150 state <= ACCEPTING_WAIT_CONNECT_MSG_AUTH && !replacing) {
151     ldout(cct, 10) << __func__ << " with nothing to send and in the"
152                    << " half-accept state, just closing" << dendl;
153 connection->write_lock.unlock();
154 stop();
155 connection->dispatch_queue->queue_reset(connection);
156 return;
157 }
158 replacing = false;
159
160 connection->fault();
161
162 reset_recv_state();
163
164 if (connection->policy.standby && out_q.empty() && !keepalive &&
165 state != WAIT) {
166 ldout(cct, 10) << __func__ << " with nothing to send, going to standby"
167 << dendl;
168 state = STANDBY;
169 connection->write_lock.unlock();
170 return;
171 }
172
173 connection->write_lock.unlock();
174
175 if ((state >= START_CONNECT && state <= CONNECTING_SEND_CONNECT_MSG) ||
176 state == WAIT) {
177 // backoff!
178 if (state == WAIT) {
179 backoff.set_from_double(cct->_conf->ms_max_backoff);
180 } else if (backoff == utime_t()) {
181 backoff.set_from_double(cct->_conf->ms_initial_backoff);
182 } else {
183 backoff += backoff;
184 if (backoff > cct->_conf->ms_max_backoff)
185 backoff.set_from_double(cct->_conf->ms_max_backoff);
186 }
187
188 global_seq = messenger->get_global_seq();
189 state = START_CONNECT;
190 connection->state = AsyncConnection::STATE_CONNECTING;
191 ldout(cct, 10) << __func__ << " waiting " << backoff << dendl;
192       // wake up again once the backoff expires
193 connection->register_time_events.insert(
194 connection->center->create_time_event(backoff.to_nsec() / 1000,
195 connection->wakeup_handler));
196 } else {
197     // policy may be empty while still in the accept path
198 if (connection->policy.server) {
199 ldout(cct, 0) << __func__ << " server, going to standby" << dendl;
200 state = STANDBY;
201 } else {
202 ldout(cct, 0) << __func__ << " initiating reconnect" << dendl;
203 connect_seq++;
204 global_seq = messenger->get_global_seq();
205 state = START_CONNECT;
206 connection->state = AsyncConnection::STATE_CONNECTING;
207 }
208 backoff = utime_t();
209 connection->center->dispatch_event_external(connection->read_handler);
210 }
211 }
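// Editor's sketch (not part of the upstream file): the reconnect backoff
// schedule implemented above. With ms_initial_backoff = 0.2 s and
// ms_max_backoff = 15 s (the usual defaults; both are configurable), repeated
// faults while connecting wait roughly 0.2, 0.4, 0.8, ... up to 15 s, while
// losing a connection race (WAIT) jumps straight to the maximum.
#if 0
static double next_backoff_sketch(double cur, bool lost_race,
                                  double initial = 0.2, double max = 15.0) {
  if (lost_race) return max;         // WAIT: back off the full amount
  if (cur == 0.0) return initial;    // first fault: start small
  return std::min(cur + cur, max);   // otherwise double, capped at max
}
#endif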
212
213 void ProtocolV1::send_message(Message *m) {
214 ceph::buffer::list bl;
215 uint64_t f = connection->get_features();
216
217   // TODO: not all messages support re-encoding (e.g. MOSDMap), so only
218   // fast-dispatchable messages are prepared (encoded) ahead of time here
219 bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
220 if (can_fast_prepare) {
221 prepare_send_message(f, m, bl);
222 }
223
224 std::lock_guard<std::mutex> l(connection->write_lock);
225 // "features" changes will change the payload encoding
226 if (can_fast_prepare &&
227 (can_write == WriteStatus::NOWRITE || connection->get_features() != f)) {
228 // ensure the correctness of message encoding
229 bl.clear();
230 m->clear_payload();
231 ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f
232 << " != " << connection->get_features() << dendl;
233 }
234 if (can_write == WriteStatus::CLOSED) {
235 ldout(cct, 10) << __func__ << " connection closed."
236 << " Drop message " << m << dendl;
237 m->put();
238 } else {
239 m->queue_start = ceph::mono_clock::now();
240 m->trace.event("async enqueueing message");
241 out_q[m->get_priority()].emplace_back(std::move(bl), m);
242 ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
243 << dendl;
244 if (can_write != WriteStatus::REPLACING && !write_in_progress) {
245 write_in_progress = true;
246 connection->center->dispatch_event_external(connection->write_handler);
247 }
248 }
249 }
250
251 void ProtocolV1::prepare_send_message(uint64_t features, Message *m,
252 ceph::buffer::list &bl) {
253 ldout(cct, 20) << __func__ << " m " << *m << dendl;
254
255 // associate message with Connection (for benefit of encode_payload)
256 ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
257 << features << " " << m << " " << *m << dendl;
258
259 // encode and copy out of *m
260 // in write_message we update header.seq and need recalc crc
261 // so skip calc header in encode function.
262 m->encode(features, messenger->crcflags, true);
263
264 bl.append(m->get_payload());
265 bl.append(m->get_middle());
266 bl.append(m->get_data());
267 }
268
269 void ProtocolV1::send_keepalive() {
270 ldout(cct, 10) << __func__ << dendl;
271 std::lock_guard<std::mutex> l(connection->write_lock);
272 if (can_write != WriteStatus::CLOSED) {
273 keepalive = true;
274 connection->center->dispatch_event_external(connection->write_handler);
275 }
276 }
277
278 void ProtocolV1::read_event() {
279 ldout(cct, 20) << __func__ << dendl;
280 switch (state) {
281 case START_CONNECT:
282 CONTINUATION_RUN(CONTINUATION(send_client_banner));
283 break;
284 case START_ACCEPT:
285 CONTINUATION_RUN(CONTINUATION(send_server_banner));
286 break;
287 case OPENED:
288 CONTINUATION_RUN(CONTINUATION(wait_message));
289 break;
290 case THROTTLE_MESSAGE:
291 CONTINUATION_RUN(CONTINUATION(throttle_message));
292 break;
293 case THROTTLE_BYTES:
294 CONTINUATION_RUN(CONTINUATION(throttle_bytes));
295 break;
296 case THROTTLE_DISPATCH_QUEUE:
297 CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
298 break;
299 default:
300 break;
301 }
302 }
303
304 void ProtocolV1::write_event() {
305 ldout(cct, 10) << __func__ << dendl;
306 ssize_t r = 0;
307
308 connection->write_lock.lock();
309 if (can_write == WriteStatus::CANWRITE) {
310 if (keepalive) {
311 append_keepalive_or_ack();
312 keepalive = false;
313 }
314
315 auto start = ceph::mono_clock::now();
316 bool more;
317 do {
318 ceph::buffer::list data;
319 Message *m = _get_next_outgoing(&data);
320 if (!m) {
321 break;
322 }
323
324 if (!connection->policy.lossy) {
325 // put on sent list
326 sent.push_back(m);
327 m->get();
328 }
329 more = !out_q.empty();
330 connection->write_lock.unlock();
331
332 // send_message or requeue messages may not encode message
333 if (!data.length()) {
334 prepare_send_message(connection->get_features(), m, data);
335 }
336
337 if (m->queue_start != ceph::mono_time()) {
338 connection->logger->tinc(l_msgr_send_messages_queue_lat,
339 ceph::mono_clock::now() - m->queue_start);
340 }
341
342 r = write_message(m, data, more);
343
344 connection->write_lock.lock();
345 if (r == 0) {
346         ;  // message fully sent; keep draining the queue
347 } else if (r < 0) {
348 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
349 break;
350 } else if (r > 0) {
351 // Outbound message in-progress, thread will be re-awoken
352 // when the outbound socket is writeable again
353 break;
354 }
355 } while (can_write == WriteStatus::CANWRITE);
356 write_in_progress = false;
357 connection->write_lock.unlock();
358
359   // if r > 0, data is still pending in the buffer, so no need to call _try_send.
360 if (r == 0) {
361 uint64_t left = ack_left;
362 if (left) {
363 ceph_le64 s;
364 s = in_seq;
365 connection->outgoing_bl.append(CEPH_MSGR_TAG_ACK);
366 connection->outgoing_bl.append((char *)&s, sizeof(s));
367 ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
368 << " messages" << dendl;
369 ack_left -= left;
370 left = ack_left;
371 r = connection->_try_send(left);
372 } else if (is_queued()) {
373 r = connection->_try_send();
374 }
375 }
376
377 connection->logger->tinc(l_msgr_running_send_time,
378 ceph::mono_clock::now() - start);
379 if (r < 0) {
380 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
381 connection->lock.lock();
382 fault();
383 connection->lock.unlock();
384 return;
385 }
386 } else {
387 write_in_progress = false;
388 connection->write_lock.unlock();
389 connection->lock.lock();
390 connection->write_lock.lock();
391 if (state == STANDBY && !connection->policy.server && is_queued()) {
392 ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
393 connection->_connect();
394 } else if (connection->cs && state != NONE && state != CLOSED &&
395 state != START_CONNECT) {
396 r = connection->_try_send();
397 if (r < 0) {
398         ldout(cct, 1) << __func__ << " send outgoing bl failed" << dendl;
399 connection->write_lock.unlock();
400 fault();
401 connection->lock.unlock();
402 return;
403 }
404 }
405 connection->write_lock.unlock();
406 connection->lock.unlock();
407 }
408 }
409
410 bool ProtocolV1::is_queued() {
411 return !out_q.empty() || connection->is_queued();
412 }
413
414 void ProtocolV1::run_continuation(CtPtr pcontinuation) {
415 if (pcontinuation) {
416 CONTINUATION_RUN(*pcontinuation);
417 }
418 }
419
420 CtPtr ProtocolV1::read(CONTINUATION_RX_TYPE<ProtocolV1> &next,
421 int len, char *buffer) {
422 if (!buffer) {
423 buffer = temp_buffer;
424 }
425 ssize_t r = connection->read(len, buffer,
426 [&next, this](char *buffer, int r) {
427 next.setParams(buffer, r);
428 CONTINUATION_RUN(next);
429 });
430 if (r <= 0) {
431 next.setParams(buffer, r);
432 return &next;
433 }
434
435 return nullptr;
436 }
437
438 CtPtr ProtocolV1::write(CONTINUATION_TX_TYPE<ProtocolV1> &next,
439 ceph::buffer::list &buffer) {
440 ssize_t r = connection->write(buffer, [&next, this](int r) {
441 next.setParams(r);
442 CONTINUATION_RUN(next);
443 });
444 if (r <= 0) {
445 next.setParams(r);
446 return &next;
447 }
448
449 return nullptr;
450 }
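// Editor's sketch (not part of the upstream file): the shape of the
// continuation pattern used by the READ/WRITE macros and the two helpers
// above. A return of <= 0 from connection->read()/write() means the I/O
// completed (0) or failed (< 0) inline, so the next step is handed back to
// run_continuation() and runs synchronously; a positive return means the I/O
// is pending, and the lambda re-enters the state machine from the event loop
// once the socket is ready.
#if 0
struct StepSketch {
  std::function<StepSketch *(int)> fn;   // the next state handler
  int result = 0;                        // parameter delivered to it
};

static StepSketch *io_step_sketch(StepSketch &next, int r) {
  if (r <= 0) {            // finished or failed inline
    next.result = r;
    return &next;          // caller loops and runs the next step now
  }
  return nullptr;          // pending: the event loop resumes us later
}
#endif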
451
452 CtPtr ProtocolV1::ready() {
453 ldout(cct, 25) << __func__ << dendl;
454
455 // make sure no pending tick timer
456 if (connection->last_tick_id) {
457 connection->center->delete_time_event(connection->last_tick_id);
458 }
459 connection->last_tick_id = connection->center->create_time_event(
460 connection->inactive_timeout_us, connection->tick_handler);
461
462 connection->write_lock.lock();
463 can_write = WriteStatus::CANWRITE;
464 if (is_queued()) {
465 connection->center->dispatch_event_external(connection->write_handler);
466 }
467 connection->write_lock.unlock();
468 connection->maybe_start_delay_thread();
469
470 state = OPENED;
471 return wait_message();
472 }
473
474 CtPtr ProtocolV1::wait_message() {
475 if (state != OPENED) { // must have changed due to a replace
476 return nullptr;
477 }
478
479 ldout(cct, 20) << __func__ << dendl;
480
481 return READ(sizeof(char), handle_message);
482 }
483
484 CtPtr ProtocolV1::handle_message(char *buffer, int r) {
485 ldout(cct, 20) << __func__ << " r=" << r << dendl;
486
487 if (r < 0) {
488 ldout(cct, 1) << __func__ << " read tag failed" << dendl;
489 return _fault();
490 }
491
492 char tag = buffer[0];
493 ldout(cct, 20) << __func__ << " process tag " << (int)tag << dendl;
494
495 if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
496 ldout(cct, 20) << __func__ << " got KEEPALIVE" << dendl;
497 connection->set_last_keepalive(ceph_clock_now());
498 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
499 return READ(sizeof(ceph_timespec), handle_keepalive2);
500 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
501 return READ(sizeof(ceph_timespec), handle_keepalive2_ack);
502 } else if (tag == CEPH_MSGR_TAG_ACK) {
503 return READ(sizeof(ceph_le64), handle_tag_ack);
504 } else if (tag == CEPH_MSGR_TAG_MSG) {
505 recv_stamp = ceph_clock_now();
506 ldout(cct, 20) << __func__ << " begin MSG" << dendl;
507 return READ(sizeof(ceph_msg_header), handle_message_header);
508 } else if (tag == CEPH_MSGR_TAG_CLOSE) {
509 ldout(cct, 20) << __func__ << " got CLOSE" << dendl;
510 stop();
511 } else {
512 ldout(cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
513 return _fault();
514 }
515 return nullptr;
516 }
517
518 CtPtr ProtocolV1::handle_keepalive2(char *buffer, int r) {
519 ldout(cct, 20) << __func__ << " r=" << r << dendl;
520
521 if (r < 0) {
522     ldout(cct, 1) << __func__ << " read keepalive timespec failed" << dendl;
523 return _fault();
524 }
525
526 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
527
528 ceph_timespec *t;
529 t = (ceph_timespec *)buffer;
530 utime_t kp_t = utime_t(*t);
531 connection->write_lock.lock();
532 append_keepalive_or_ack(true, &kp_t);
533 connection->write_lock.unlock();
534
535 ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
536 connection->set_last_keepalive(ceph_clock_now());
537
538 if (is_connected()) {
539 connection->center->dispatch_event_external(connection->write_handler);
540 }
541
542 return CONTINUE(wait_message);
543 }
544
545 void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
546 ldout(cct, 10) << __func__ << dendl;
547 if (ack) {
548 ceph_assert(tp);
549 struct ceph_timespec ts;
550 tp->encode_timeval(&ts);
551 connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
552 connection->outgoing_bl.append((char *)&ts, sizeof(ts));
553 } else if (connection->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
554 struct ceph_timespec ts;
555 utime_t t = ceph_clock_now();
556 t.encode_timeval(&ts);
557 connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
558 connection->outgoing_bl.append((char *)&ts, sizeof(ts));
559 } else {
560 connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
561 }
562 }
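// Editor's note (not part of the upstream file): the frames appended above
// are tiny tag-prefixed blobs; assuming the usual packed wire struct
// ceph_timespec { __le32 tv_sec, tv_nsec; }, they look like:
//   KEEPALIVE        [tag]                     1 byte  (pre-KEEPALIVE2 peers)
//   KEEPALIVE2       [tag][sec:4][nsec:4]      9 bytes (sender's clock)
//   KEEPALIVE2_ACK   [tag][sec:4][nsec:4]      9 bytes (echo of peer's stamp)
// Echoing the peer's timestamp back lets the sender confirm liveness without
// any shared clock between the two ends.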
563
564 CtPtr ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
565 ldout(cct, 20) << __func__ << " r=" << r << dendl;
566
567 if (r < 0) {
568     ldout(cct, 1) << __func__ << " read keepalive timespec failed" << dendl;
569 return _fault();
570 }
571
572 ceph_timespec *t;
573 t = (ceph_timespec *)buffer;
574 connection->set_last_keepalive_ack(utime_t(*t));
575 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
576
577 return CONTINUE(wait_message);
578 }
579
580 CtPtr ProtocolV1::handle_tag_ack(char *buffer, int r) {
581 ldout(cct, 20) << __func__ << " r=" << r << dendl;
582
583 if (r < 0) {
584 ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
585 return _fault();
586 }
587
588 ceph_le64 seq;
589 seq = *(ceph_le64 *)buffer;
590 ldout(cct, 20) << __func__ << " got ACK" << dendl;
591
592 ldout(cct, 15) << __func__ << " got ack seq " << seq << dendl;
593 // trim sent list
594 static const int max_pending = 128;
595 int i = 0;
596 auto now = ceph::mono_clock::now();
597 Message *pending[max_pending];
598 connection->write_lock.lock();
599 while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
600 Message *m = sent.front();
601 sent.pop_front();
602 pending[i++] = m;
603 ldout(cct, 10) << __func__ << " got ack seq " << seq
604 << " >= " << m->get_seq() << " on " << m << " " << *m
605 << dendl;
606 }
607 connection->write_lock.unlock();
608 connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
609 for (int k = 0; k < i; k++) {
610 pending[k]->put();
611 }
612
613 return CONTINUE(wait_message);
614 }
615
616 CtPtr ProtocolV1::handle_message_header(char *buffer, int r) {
617 ldout(cct, 20) << __func__ << " r=" << r << dendl;
618
619 if (r < 0) {
620 ldout(cct, 1) << __func__ << " read message header failed" << dendl;
621 return _fault();
622 }
623
624 ldout(cct, 20) << __func__ << " got MSG header" << dendl;
625
626 current_header = *((ceph_msg_header *)buffer);
627
628 ldout(cct, 20) << __func__ << " got envelope type=" << current_header.type << " src "
629 << entity_name_t(current_header.src) << " front=" << current_header.front_len
630 << " data=" << current_header.data_len << " off " << current_header.data_off
631 << dendl;
632
633 if (messenger->crcflags & MSG_CRC_HEADER) {
634 __u32 header_crc = 0;
635 header_crc = ceph_crc32c(0, (unsigned char *)&current_header,
636 sizeof(current_header) - sizeof(current_header.crc));
637 // verify header crc
638 if (header_crc != current_header.crc) {
639 ldout(cct, 0) << __func__ << " got bad header crc " << header_crc
640 << " != " << current_header.crc << dendl;
641 return _fault();
642 }
643 }
644
645 // Reset state
646 data_buf.clear();
647 front.clear();
648 middle.clear();
649 data.clear();
650
651 state = THROTTLE_MESSAGE;
652 return CONTINUE(throttle_message);
653 }
654
655 CtPtr ProtocolV1::throttle_message() {
656 ldout(cct, 20) << __func__ << dendl;
657
658 if (connection->policy.throttler_messages) {
659 ldout(cct, 10) << __func__ << " wants " << 1
660 << " message from policy throttler "
661 << connection->policy.throttler_messages->get_current()
662 << "/" << connection->policy.throttler_messages->get_max()
663 << dendl;
664 if (!connection->policy.throttler_messages->get_or_fail()) {
665 ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
666 << connection->policy.throttler_messages->get_current()
667 << "/" << connection->policy.throttler_messages->get_max()
668 << " failed, just wait." << dendl;
669       // draining a full message queue through the dispatch thread pool can
670       // take a while, so just wait a millisecond and retry.
671 if (connection->register_time_events.empty()) {
672 connection->register_time_events.insert(
673 connection->center->create_time_event(1000,
674 connection->wakeup_handler));
675 }
676 return nullptr;
677 }
678 }
679
680 state = THROTTLE_BYTES;
681 return CONTINUE(throttle_bytes);
682 }
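// Editor's sketch (not part of the upstream file): get_or_fail() is an
// all-or-nothing, non-blocking reservation, so the reader thread never parks
// on throttle. On failure the state machine re-arms a 1 ms wakeup and retries
// from the same state; the matching put() happens after dispatch, or in
// reset_recv_state() if the connection faults mid-message.
#if 0
static bool try_reserve_sketch(Throttle &t, uint64_t amount) {
  if (!t.get_or_fail(amount))   // takes all of `amount` or none of it
    return false;               // schedule a wakeup and retry this state
  return true;                  // reserved; release later with t.put(amount)
}
#endif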
683
684 CtPtr ProtocolV1::throttle_bytes() {
685 ldout(cct, 20) << __func__ << dendl;
686
687 cur_msg_size = current_header.front_len + current_header.middle_len +
688 current_header.data_len;
689 if (cur_msg_size) {
690 if (connection->policy.throttler_bytes) {
691 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
692 << " bytes from policy throttler "
693 << connection->policy.throttler_bytes->get_current() << "/"
694 << connection->policy.throttler_bytes->get_max() << dendl;
695 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
696 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
697 << " bytes from policy throttler "
698 << connection->policy.throttler_bytes->get_current()
699 << "/" << connection->policy.throttler_bytes->get_max()
700 << " failed, just wait." << dendl;
701         // draining a full message queue through the dispatch thread pool
702         // can take a while, so just wait a millisecond and retry.
703 if (connection->register_time_events.empty()) {
704 connection->register_time_events.insert(
705 connection->center->create_time_event(
706 1000, connection->wakeup_handler));
707 }
708 return nullptr;
709 }
710 }
711 }
712
713 state = THROTTLE_DISPATCH_QUEUE;
714 return CONTINUE(throttle_dispatch_queue);
715 }
716
717 CtPtr ProtocolV1::throttle_dispatch_queue() {
718 ldout(cct, 20) << __func__ << dendl;
719
720 if (cur_msg_size) {
721 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
722 cur_msg_size)) {
723 ldout(cct, 10)
724 << __func__ << " wants " << cur_msg_size
725 << " bytes from dispatch throttle "
726 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
727 << connection->dispatch_queue->dispatch_throttler.get_max()
728 << " failed, just wait." << dendl;
729       // draining a full message queue through the dispatch thread pool can
730       // take a while, so just wait a millisecond and retry.
731 if (connection->register_time_events.empty()) {
732 connection->register_time_events.insert(
733 connection->center->create_time_event(1000,
734 connection->wakeup_handler));
735 }
736 return nullptr;
737 }
738 }
739
740 throttle_stamp = ceph_clock_now();
741
742 state = READ_MESSAGE_FRONT;
743 return read_message_front();
744 }
745
746 CtPtr ProtocolV1::read_message_front() {
747 ldout(cct, 20) << __func__ << dendl;
748
749 unsigned front_len = current_header.front_len;
750 if (front_len) {
751 if (!front.length()) {
752 front.push_back(ceph::buffer::create(front_len));
753 }
754 return READB(front_len, front.c_str(), handle_message_front);
755 }
756 return read_message_middle();
757 }
758
759 CtPtr ProtocolV1::handle_message_front(char *buffer, int r) {
760 ldout(cct, 20) << __func__ << " r=" << r << dendl;
761
762 if (r < 0) {
763 ldout(cct, 1) << __func__ << " read message front failed" << dendl;
764 return _fault();
765 }
766
767 ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
768
769 return read_message_middle();
770 }
771
772 CtPtr ProtocolV1::read_message_middle() {
773 ldout(cct, 20) << __func__ << dendl;
774
775 if (current_header.middle_len) {
776 if (!middle.length()) {
777 middle.push_back(ceph::buffer::create(current_header.middle_len));
778 }
779 return READB(current_header.middle_len, middle.c_str(),
780 handle_message_middle);
781 }
782
783 return read_message_data_prepare();
784 }
785
786 CtPtr ProtocolV1::handle_message_middle(char *buffer, int r) {
787   ldout(cct, 20) << __func__ << " r=" << r << dendl;
788
789 if (r < 0) {
790 ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
791 return _fault();
792 }
793
794 ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
795
796 return read_message_data_prepare();
797 }
798
799 CtPtr ProtocolV1::read_message_data_prepare() {
800 ldout(cct, 20) << __func__ << dendl;
801
802 unsigned data_len = current_header.data_len;
803 unsigned data_off = current_header.data_off;
804
805 if (data_len) {
806 // get a buffer
807 #if 0
808 // rx_buffers is broken by design... see
809 // http://tracker.ceph.com/issues/22480
810 map<ceph_tid_t, pair<ceph::buffer::list, int> >::iterator p =
811 connection->rx_buffers.find(current_header.tid);
812 if (p != connection->rx_buffers.end()) {
813     ldout(cct, 10) << __func__ << " selecting rx buffer v " << p->second.second
814 << " at offset " << data_off << " len "
815 << p->second.first.length() << dendl;
816 data_buf = p->second.first;
817 // make sure it's big enough
818 if (data_buf.length() < data_len)
819 data_buf.push_back(buffer::create(data_len - data_buf.length()));
820 data_blp = data_buf.begin();
821 } else {
822 ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
823 << data_off << dendl;
824 alloc_aligned_buffer(data_buf, data_len, data_off);
825 data_blp = data_buf.begin();
826 }
827 #else
828 ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
829 << data_off << dendl;
830 alloc_aligned_buffer(data_buf, data_len, data_off);
831 data_blp = data_buf.begin();
832 #endif
833 }
834
835 msg_left = data_len;
836
837 return CONTINUE(read_message_data);
838 }
839
840 CtPtr ProtocolV1::read_message_data() {
841 ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
842
843 if (msg_left > 0) {
844 auto bp = data_blp.get_current_ptr();
845 unsigned read_len = std::min(bp.length(), msg_left);
846
847 return READB(read_len, bp.c_str(), handle_message_data);
848 }
849
850 return read_message_footer();
851 }
852
853 CtPtr ProtocolV1::handle_message_data(char *buffer, int r) {
854 ldout(cct, 20) << __func__ << " r=" << r << dendl;
855
856 if (r < 0) {
857 ldout(cct, 1) << __func__ << " read data error " << dendl;
858 return _fault();
859 }
860
861 auto bp = data_blp.get_current_ptr();
862 unsigned read_len = std::min(bp.length(), msg_left);
863 ceph_assert(read_len <
864 static_cast<unsigned>(std::numeric_limits<int>::max()));
865 data_blp += read_len;
866 data.append(bp, 0, read_len);
867 msg_left -= read_len;
868
869 return CONTINUE(read_message_data);
870 }
871
872 CtPtr ProtocolV1::read_message_footer() {
873 ldout(cct, 20) << __func__ << dendl;
874
875 state = READ_FOOTER_AND_DISPATCH;
876
877 unsigned len;
878 if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
879 len = sizeof(ceph_msg_footer);
880 } else {
881 len = sizeof(ceph_msg_footer_old);
882 }
883
884 return READ(len, handle_message_footer);
885 }
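// Editor's note (not part of the upstream file): the two footer layouts,
// assuming the standard packed wire structs -- peers without
// CEPH_FEATURE_MSG_AUTH use the 13-byte footer, peers with it the 21-byte
// one, the difference being the 64-bit signature field:
#if 0
struct footer_old_sketch { __le32 front_crc, middle_crc, data_crc;
                           __u8 flags; };               // 13 bytes packed
struct footer_new_sketch { __le32 front_crc, middle_crc, data_crc;
                           __le64 sig; __u8 flags; };   // 21 bytes packed
#endif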
886
887 CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
888 ldout(cct, 20) << __func__ << " r=" << r << dendl;
889
890 if (r < 0) {
891 ldout(cct, 1) << __func__ << " read footer data error " << dendl;
892 return _fault();
893 }
894
895 ceph_msg_footer footer;
896 ceph_msg_footer_old old_footer;
897
898 if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
899 footer = *((ceph_msg_footer *)buffer);
900 } else {
901 old_footer = *((ceph_msg_footer_old *)buffer);
902 footer.front_crc = old_footer.front_crc;
903 footer.middle_crc = old_footer.middle_crc;
904 footer.data_crc = old_footer.data_crc;
905 footer.sig = 0;
906 footer.flags = old_footer.flags;
907 }
908
909 int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
910 ldout(cct, 10) << __func__ << " aborted = " << aborted << dendl;
911 if (aborted) {
912 ldout(cct, 0) << __func__ << " got " << front.length() << " + "
913 << middle.length() << " + " << data.length()
914 << " byte message.. ABORTED" << dendl;
915 return _fault();
916 }
917
918 ldout(cct, 20) << __func__ << " got " << front.length() << " + "
919 << middle.length() << " + " << data.length() << " byte message"
920 << dendl;
921 Message *message = decode_message(cct, messenger->crcflags, current_header,
922 footer, front, middle, data, connection);
923 if (!message) {
924 ldout(cct, 1) << __func__ << " decode message failed " << dendl;
925 return _fault();
926 }
927
928 //
929 // Check the signature if one should be present. A zero return indicates
930 // success. PLR
931 //
932
933 if (session_security.get() == NULL) {
934 ldout(cct, 10) << __func__ << " no session security set" << dendl;
935 } else {
936 if (session_security->check_message_signature(message)) {
937 ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
938 message->put();
939 return _fault();
940 }
941 }
942 message->set_byte_throttler(connection->policy.throttler_bytes);
943 message->set_message_throttler(connection->policy.throttler_messages);
944
945 // store reservation size in message, so we don't get confused
946 // by messages entering the dispatch queue through other paths.
947 message->set_dispatch_throttle_size(cur_msg_size);
948
949 message->set_recv_stamp(recv_stamp);
950 message->set_throttle_stamp(throttle_stamp);
951 message->set_recv_complete_stamp(ceph_clock_now());
952
953 // check received seq#. if it is old, drop the message.
954 // note that incoming messages may skip ahead. this is convenient for the
955 // client side queueing because messages can't be renumbered, but the (kernel)
956 // client will occasionally pull a message out of the sent queue to send
957 // elsewhere. in that case it doesn't matter if we "got" it or not.
958 uint64_t cur_seq = in_seq;
959 if (message->get_seq() <= cur_seq) {
960 ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
961 << " <= " << cur_seq << " " << message << " " << *message
962 << ", discarding" << dendl;
963 message->put();
964 if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
965 cct->_conf->ms_die_on_old_message) {
966 ceph_assert(0 == "old msgs despite reconnect_seq feature");
967 }
968 return nullptr;
969 }
970 if (message->get_seq() > cur_seq + 1) {
971 ldout(cct, 0) << __func__ << " missed message? skipped from seq "
972 << cur_seq << " to " << message->get_seq() << dendl;
973 if (cct->_conf->ms_die_on_skipped_message) {
974 ceph_assert(0 == "skipped incoming seq");
975 }
976 }
977
978 #if defined(WITH_EVENTTRACE)
979 if (message->get_type() == CEPH_MSG_OSD_OP ||
980 message->get_type() == CEPH_MSG_OSD_OPREPLY) {
981 utime_t ltt_processed_stamp = ceph_clock_now();
982 double usecs_elapsed =
983 ((double)(ltt_processed_stamp.to_nsec() - recv_stamp.to_nsec())) / 1000;
984 ostringstream buf;
985 if (message->get_type() == CEPH_MSG_OSD_OP)
986 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
987 false);
988 else
989 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
990 false);
991 }
992 #endif
993
994 // note last received message.
995 in_seq = message->get_seq();
996 ldout(cct, 5) << " rx " << message->get_source() << " seq "
997 << message->get_seq() << " " << message << " " << *message
998 << dendl;
999
1000 bool need_dispatch_writer = false;
1001 if (!connection->policy.lossy) {
1002 ack_left++;
1003 need_dispatch_writer = true;
1004 }
1005
1006 state = OPENED;
1007
1008 ceph::mono_time fast_dispatch_time;
1009
1010 if (connection->is_blackhole()) {
1011 ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
1012 message->put();
1013 goto out;
1014 }
1015
1016 connection->logger->inc(l_msgr_recv_messages);
1017 connection->logger->inc(
1018 l_msgr_recv_bytes,
1019 cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
1020
1021 messenger->ms_fast_preprocess(message);
1022 fast_dispatch_time = ceph::mono_clock::now();
1023 connection->logger->tinc(l_msgr_running_recv_time,
1024 fast_dispatch_time - connection->recv_start_time);
1025 if (connection->delay_state) {
1026 double delay_period = 0;
1027 if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1028 delay_period =
1029 cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1030 ldout(cct, 1) << "queue_received will delay after "
1031 << (ceph_clock_now() + delay_period) << " on " << message
1032 << " " << *message << dendl;
1033 }
1034 connection->delay_state->queue(delay_period, message);
1035 } else if (messenger->ms_can_fast_dispatch(message)) {
1036 connection->lock.unlock();
1037 connection->dispatch_queue->fast_dispatch(message);
1038 connection->recv_start_time = ceph::mono_clock::now();
1039 connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1040 connection->recv_start_time - fast_dispatch_time);
1041 connection->lock.lock();
1042 } else {
1043 connection->dispatch_queue->enqueue(message, message->get_priority(),
1044 connection->conn_id);
1045 }
1046
1047 out:
1048 // clean up local buffer references
1049 data_buf.clear();
1050 front.clear();
1051 middle.clear();
1052 data.clear();
1053
1054 if (need_dispatch_writer && connection->is_connected()) {
1055 connection->center->dispatch_event_external(connection->write_handler);
1056 }
1057
1058 return CONTINUE(wait_message);
1059 }
1060
1061 void ProtocolV1::session_reset() {
1062 ldout(cct, 10) << __func__ << " started" << dendl;
1063
1064 std::lock_guard<std::mutex> l(connection->write_lock);
1065 if (connection->delay_state) {
1066 connection->delay_state->discard();
1067 }
1068
1069 connection->dispatch_queue->discard_queue(connection->conn_id);
1070 discard_out_queue();
1071   // note: outgoing_bl needs clearing too, but session_reset may be called
1072   // from another thread, so the caller must clear it itself!
1073 // outgoing_bl.clear();
1074
1075 connection->dispatch_queue->queue_remote_reset(connection);
1076
1077 randomize_out_seq();
1078
1079 in_seq = 0;
1080 connect_seq = 0;
1081 // it's safe to directly set 0, double locked
1082 ack_left = 0;
1083 once_ready = false;
1084 can_write = WriteStatus::NOWRITE;
1085 }
1086
1087 void ProtocolV1::randomize_out_seq() {
1088 if (connection->get_features() & CEPH_FEATURE_MSG_AUTH) {
1089 // Set out_seq to a random value, so CRC won't be predictable.
1090 auto rand_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
1091 ldout(cct, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
1092 out_seq = rand_seq;
1093 } else {
1094 // previously, seq #'s always started at 0.
1095 out_seq = 0;
1096 }
1097 }
1098
1099 ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more) {
1100 FUNCTRACE(cct);
1101 ceph_assert(connection->center->in_thread());
1102 m->set_seq(++out_seq);
1103
1104 if (messenger->crcflags & MSG_CRC_HEADER) {
1105 m->calc_header_crc();
1106 }
1107
1108 ceph_msg_header &header = m->get_header();
1109 ceph_msg_footer &footer = m->get_footer();
1110
1111   // TODO: make sign_message reentrant?
1112 // Now that we have all the crcs calculated, handle the
1113 // digital signature for the message, if the AsyncConnection has session
1114 // security set up. Some session security options do not
1115 // actually calculate and check the signature, but they should
1116 // handle the calls to sign_message and check_signature. PLR
1117 if (session_security.get() == NULL) {
1118 ldout(cct, 20) << __func__ << " no session security" << dendl;
1119 } else {
1120 if (session_security->sign_message(m)) {
1121 ldout(cct, 20) << __func__ << " failed to sign m=" << m
1122                    << ": sig = " << footer.sig << dendl;
1123 } else {
1124 ldout(cct, 20) << __func__ << " signed m=" << m
1125                    << ": sig = " << footer.sig << dendl;
1126 }
1127 }
1128
1129 connection->outgoing_bl.append(CEPH_MSGR_TAG_MSG);
1130 connection->outgoing_bl.append((char *)&header, sizeof(header));
1131
1132 ldout(cct, 20) << __func__ << " sending message type=" << header.type
1133 << " src " << entity_name_t(header.src)
1134 << " front=" << header.front_len << " data=" << header.data_len
1135 << " off " << header.data_off << dendl;
1136
1137 if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.get_num_buffers() > 1)) {
1138 for (const auto &pb : bl.buffers()) {
1139 connection->outgoing_bl.append((char *)pb.c_str(), pb.length());
1140 }
1141 } else {
1142 connection->outgoing_bl.claim_append(bl);
1143 }
1144
1145 // send footer; if receiver doesn't support signatures, use the old footer
1146 // format
1147 ceph_msg_footer_old old_footer;
1148 if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
1149 connection->outgoing_bl.append((char *)&footer, sizeof(footer));
1150 } else {
1151 if (messenger->crcflags & MSG_CRC_HEADER) {
1152 old_footer.front_crc = footer.front_crc;
1153 old_footer.middle_crc = footer.middle_crc;
1154 } else {
1155 old_footer.front_crc = old_footer.middle_crc = 0;
1156 }
1157 old_footer.data_crc =
1158 messenger->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
1159 old_footer.flags = footer.flags;
1160 connection->outgoing_bl.append((char *)&old_footer, sizeof(old_footer));
1161 }
1162
1163 m->trace.event("async writing message");
1164 ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
1165 << dendl;
1166 ssize_t total_send_size = connection->outgoing_bl.length();
1167 ssize_t rc = connection->_try_send(more);
1168 if (rc < 0) {
1169 ldout(cct, 1) << __func__ << " error sending " << m << ", "
1170 << cpp_strerror(rc) << dendl;
1171 } else {
1172 connection->logger->inc(
1173 l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
1174 ldout(cct, 10) << __func__ << " sending " << m
1175                    << (rc ? " continuing." : " done.") << dendl;
1176 }
1177
1178 #if defined(WITH_EVENTTRACE)
1179 if (m->get_type() == CEPH_MSG_OSD_OP)
1180 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
1181 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
1182 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
1183 #endif
1184 m->put();
1185
1186 return rc;
1187 }
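// Editor's sketch (not part of the upstream file): the complete frame
// assembled above, assuming the standard packed wire structs (the V1 message
// header is 53 bytes):
//   [TAG_MSG:1][ceph_msg_header:53][front][middle][data][footer:13 or 21]
// The ASYNC_COALESCE_THRESHOLD branch trades one memcpy for fewer iovec
// entries: a small, fragmented payload is flattened into outgoing_bl, while
// anything larger is claimed by reference.
#if 0
static void frame_layout_sketch(ceph::buffer::list &out, ceph_msg_header &h,
                                ceph_msg_footer &f, ceph::buffer::list &body) {
  out.append(CEPH_MSGR_TAG_MSG);            // 1-byte tag
  out.append((char *)&h, sizeof(h));        // envelope; seq/crc already set
  if (body.length() <= ASYNC_COALESCE_THRESHOLD &&
      body.get_num_buffers() > 1) {
    for (const auto &pb : body.buffers())   // flatten: one buffer, one iovec
      out.append((char *)pb.c_str(), pb.length());
  } else {
    out.claim_append(body);                 // large: take ownership, no copy
  }
  out.append((char *)&f, sizeof(f));        // new-style 21-byte footer
}
#endif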
1188
1189 void ProtocolV1::requeue_sent() {
1190 write_in_progress = false;
1191 if (sent.empty()) {
1192 return;
1193 }
1194
1195 list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1196 out_seq -= sent.size();
1197 while (!sent.empty()) {
1198 Message *m = sent.back();
1199 sent.pop_back();
1200     ldout(cct, 10) << __func__ << " " << *m << " for resend"
1201                    << " (seq " << m->get_seq() << ")" << dendl;
1202 m->clear_payload();
1203 rq.push_front(make_pair(ceph::buffer::list(), m));
1204 }
1205 }
1206
1207 uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
1208 ldout(cct, 10) << __func__ << " " << seq << dendl;
1209 std::lock_guard<std::mutex> l(connection->write_lock);
1210 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
1211 return seq;
1212 }
1213 list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1214 uint64_t count = out_seq;
1215 while (!rq.empty()) {
1216 pair<ceph::buffer::list, Message *> p = rq.front();
1217 if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
1218 ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
1219 << p.second->get_seq() << " <= " << seq << ", discarding"
1220 << dendl;
1221 p.second->put();
1222 rq.pop_front();
1223 count++;
1224 }
1225 if (rq.empty()) out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1226 return count;
1227 }
1228
1229 /*
1230 * Tears down the message queues, and removes them from the
1231 * DispatchQueue Must hold write_lock prior to calling.
1232 */
1233 void ProtocolV1::discard_out_queue() {
1234 ldout(cct, 10) << __func__ << " started" << dendl;
1235
1236 for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
1237 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
1238 (*p)->put();
1239 }
1240 sent.clear();
1241 for (map<int, list<pair<ceph::buffer::list, Message *> > >::iterator p =
1242 out_q.begin();
1243 p != out_q.end(); ++p) {
1244 for (list<pair<ceph::buffer::list, Message *> >::iterator r = p->second.begin();
1245 r != p->second.end(); ++r) {
1246 ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
1247 r->second->put();
1248 }
1249 }
1250 out_q.clear();
1251 write_in_progress = false;
1252 }
1253
1254 void ProtocolV1::reset_security()
1255 {
1256 ldout(cct, 5) << __func__ << dendl;
1257
1258 auth_meta.reset(new AuthConnectionMeta);
1259 authorizer_more.clear();
1260 session_security.reset();
1261 }
1262
1263 void ProtocolV1::reset_recv_state()
1264 {
1265 ldout(cct, 5) << __func__ << dendl;
1266
1267 // execute in the same thread that uses the `session_security`.
1268   // We need to do the wrap because holding `write_lock` is not
1269   // enough, as `write_event()` releases it just before calling
1270 // `write_message()`. `submit_to()` here is NOT blocking.
1271 if (!connection->center->in_thread()) {
1272 connection->center->submit_to(connection->center->get_id(), [this] {
1273       ldout(cct, 5) << "reset_recv_state (wrapped) resetting security handlers"
1274 << dendl;
1275 // Possibly unnecessary. See the comment in `deactivate_existing`.
1276 std::lock_guard<std::mutex> l(connection->lock);
1277 std::lock_guard<std::mutex> wl(connection->write_lock);
1278 reset_security();
1279 }, /* always_async = */true);
1280 } else {
1281 reset_security();
1282 }
1283
1284 // clean read and write callbacks
1285 connection->pendingReadLen.reset();
1286 connection->writeCallback.reset();
1287
1288 if (state > THROTTLE_MESSAGE && state <= READ_FOOTER_AND_DISPATCH &&
1289 connection->policy.throttler_messages) {
1290 ldout(cct, 10) << __func__ << " releasing " << 1
1291 << " message to policy throttler "
1292 << connection->policy.throttler_messages->get_current()
1293 << "/" << connection->policy.throttler_messages->get_max()
1294 << dendl;
1295 connection->policy.throttler_messages->put();
1296 }
1297 if (state > THROTTLE_BYTES && state <= READ_FOOTER_AND_DISPATCH) {
1298 if (connection->policy.throttler_bytes) {
1299 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
1300 << " bytes to policy throttler "
1301 << connection->policy.throttler_bytes->get_current() << "/"
1302 << connection->policy.throttler_bytes->get_max() << dendl;
1303 connection->policy.throttler_bytes->put(cur_msg_size);
1304 }
1305 }
1306 if (state > THROTTLE_DISPATCH_QUEUE && state <= READ_FOOTER_AND_DISPATCH) {
1307 ldout(cct, 10)
1308 << __func__ << " releasing " << cur_msg_size
1309 << " bytes to dispatch_queue throttler "
1310 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1311 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
1312 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
1313 }
1314 }
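// Editor's sketch (not part of the upstream file): the submit_to() dance
// above is plain thread confinement. session_security may only be touched
// from the EventCenter thread that owns this connection, so any other thread
// posts the reset to that thread instead of performing it directly.
#if 0
static void confined_call_sketch(EventCenter *center,
                                 std::function<void()> fn) {
  if (!center->in_thread()) {
    // queue on the owning thread and return immediately (non-blocking)
    center->submit_to(center->get_id(), std::move(fn),
                      /* always_async = */ true);
  } else {
    fn();  // already on the owning thread: run inline
  }
}
#endif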
1315
1316 Message *ProtocolV1::_get_next_outgoing(ceph::buffer::list *bl) {
1317 Message *m = 0;
1318 if (!out_q.empty()) {
1319 map<int, list<pair<ceph::buffer::list, Message *> > >::reverse_iterator it =
1320 out_q.rbegin();
1321 ceph_assert(!it->second.empty());
1322 list<pair<ceph::buffer::list, Message *> >::iterator p = it->second.begin();
1323 m = p->second;
1324 if (p->first.length() && bl) {
1325 assert(bl->length() == 0);
1326 bl->swap(p->first);
1327 }
1328 it->second.erase(p);
1329 if (it->second.empty()) out_q.erase(it->first);
1330 }
1331 return m;
1332 }
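// Editor's note (not part of the upstream file): out_q is a map keyed by
// priority, so rbegin() always drains the highest non-empty priority, FIFO
// within each priority. requeue_sent() parks replayed messages under
// CEPH_MSG_PRIO_HIGHEST (255), so they always go out before new traffic.
#if 0
static void out_q_order_sketch() {
  std::map<int, std::list<const char *>> q;
  q[127].push_back("new message");        // CEPH_MSG_PRIO_DEFAULT
  q[255].push_back("requeued #1");        // CEPH_MSG_PRIO_HIGHEST
  q[255].push_back("requeued #2");
  // drain order: "requeued #1", "requeued #2", "new message"
}
#endif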
1333
1334 /**
1335 * Client Protocol V1
1336 **/
1337
1338 CtPtr ProtocolV1::send_client_banner() {
1339 ldout(cct, 20) << __func__ << dendl;
1340 state = CONNECTING;
1341
1342 ceph::buffer::list bl;
1343 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1344 return WRITE(bl, handle_client_banner_write);
1345 }
1346
1347 CtPtr ProtocolV1::handle_client_banner_write(int r) {
1348 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1349
1350 if (r < 0) {
1351 ldout(cct, 1) << __func__ << " write client banner failed" << dendl;
1352 return _fault();
1353 }
1354 ldout(cct, 10) << __func__ << " connect write banner done: "
1355 << connection->get_peer_addr() << dendl;
1356
1357 return wait_server_banner();
1358 }
1359
1360 CtPtr ProtocolV1::wait_server_banner() {
1361 state = CONNECTING_WAIT_BANNER_AND_IDENTIFY;
1362
1363 ldout(cct, 20) << __func__ << dendl;
1364
1365 ceph::buffer::list myaddrbl;
1366 unsigned banner_len = strlen(CEPH_BANNER);
1367 unsigned need_len = banner_len + sizeof(ceph_entity_addr) * 2;
1368 return READ(need_len, handle_server_banner_and_identify);
1369 }
1370
1371 CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
1372 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1373
1374 if (r < 0) {
1375 ldout(cct, 1) << __func__ << " read banner and identify addresses failed"
1376 << dendl;
1377 return _fault();
1378 }
1379
1380 unsigned banner_len = strlen(CEPH_BANNER);
1381 if (memcmp(buffer, CEPH_BANNER, banner_len)) {
1382 ldout(cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
1383 << connection->get_peer_addr() << dendl;
1384 return _fault();
1385 }
1386
1387 ceph::buffer::list bl;
1388 entity_addr_t paddr, peer_addr_for_me;
1389
1390 bl.append(buffer + banner_len, sizeof(ceph_entity_addr) * 2);
1391 auto p = bl.cbegin();
1392 try {
1393 decode(paddr, p);
1394 decode(peer_addr_for_me, p);
1395 } catch (const ceph::buffer::error &e) {
1396 lderr(cct) << __func__ << " decode peer addr failed " << dendl;
1397 return _fault();
1398 }
1399 ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
1400 << " on socket " << connection->cs.fd() << dendl;
1401
1402 entity_addr_t peer_addr = connection->peer_addrs->legacy_addr();
1403 if (peer_addr != paddr) {
1404 if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
1405 peer_addr.get_nonce() == paddr.get_nonce()) {
1406 ldout(cct, 0) << __func__ << " connect claims to be " << paddr << " not "
1407 << peer_addr << " - presumably this is the same node!"
1408 << dendl;
1409 } else {
1410 ldout(cct, 10) << __func__ << " connect claims to be " << paddr << " not "
1411 << peer_addr << dendl;
1412 return _fault();
1413 }
1414 }
1415
1416 ldout(cct, 20) << __func__ << " connect peer addr for me is "
1417 << peer_addr_for_me << dendl;
1418 if (messenger->get_myaddrs().empty() ||
1419 messenger->get_myaddrs().front().is_blank_ip()) {
1420 sockaddr_storage ss;
1421 socklen_t len = sizeof(ss);
1422 getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
1423 entity_addr_t a;
1424 if (cct->_conf->ms_learn_addr_from_peer) {
1425 ldout(cct, 1) << __func__ << " peer " << connection->target_addr
1426 << " says I am " << peer_addr_for_me << " (socket says "
1427 << (sockaddr*)&ss << ")" << dendl;
1428 a = peer_addr_for_me;
1429 } else {
1430 ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
1431 << " says I am " << (sockaddr*)&ss
1432 << " (peer says " << peer_addr_for_me << ")" << dendl;
1433 a.set_sockaddr((sockaddr *)&ss);
1434 }
1435 a.set_type(entity_addr_t::TYPE_LEGACY); // anything but NONE; learned_addr ignores this
1436 a.set_port(0);
1437 connection->lock.unlock();
1438 messenger->learned_addr(a);
1439 if (cct->_conf->ms_inject_internal_delays &&
1440 cct->_conf->ms_inject_socket_failures) {
1441 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
1442 ldout(cct, 10) << __func__ << " sleep for "
1443 << cct->_conf->ms_inject_internal_delays << dendl;
1444 utime_t t;
1445 t.set_from_double(cct->_conf->ms_inject_internal_delays);
1446 t.sleep();
1447 }
1448 }
1449 connection->lock.lock();
1450 if (state != CONNECTING_WAIT_BANNER_AND_IDENTIFY) {
1451 ldout(cct, 1) << __func__
1452                     << " state changed while in learned_addr; mark_down or"
1453                     << " replacing must have happened just now" << dendl;
1454 return nullptr;
1455 }
1456 }
1457
1458 ceph::buffer::list myaddrbl;
1459 encode(messenger->get_myaddr_legacy(), myaddrbl, 0); // legacy
1460 return WRITE(myaddrbl, handle_my_addr_write);
1461 }
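// Editor's note (not part of the upstream file): the V1 preamble as parsed
// above. The server speaks first, and each side's banner is the fixed string
// CEPH_BANNER ("ceph v027"):
//   server -> client: banner + server's legacy addr + client's addr as seen
//   client -> server: banner + client's legacy addr
// The echoed "as seen" address is what learned_addr() may adopt when
// ms_learn_addr_from_peer is set and we don't yet know our own address.
#if 0
static unsigned server_banner_read_len_sketch() {
  // what wait_server_banner() expects to read back:
  return strlen(CEPH_BANNER)        // "ceph v027"
       + sizeof(ceph_entity_addr)   // the server's own address
       + sizeof(ceph_entity_addr);  // our address, as the server sees it
}
#endif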
1462
1463 CtPtr ProtocolV1::handle_my_addr_write(int r) {
1464 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1465
1466 if (r < 0) {
1467 ldout(cct, 2) << __func__ << " connect couldn't write my addr, "
1468 << cpp_strerror(r) << dendl;
1469 return _fault();
1470 }
1471 ldout(cct, 10) << __func__ << " connect sent my addr "
1472 << messenger->get_myaddr_legacy() << dendl;
1473
1474 return CONTINUE(send_connect_message);
1475 }
1476
1477 CtPtr ProtocolV1::send_connect_message()
1478 {
1479 state = CONNECTING_SEND_CONNECT_MSG;
1480
1481 ldout(cct, 20) << __func__ << dendl;
1482 ceph_assert(messenger->auth_client);
1483
1484 ceph::buffer::list auth_bl;
1485 vector<uint32_t> preferred_modes;
1486
1487 if (connection->peer_type != CEPH_ENTITY_TYPE_MON ||
1488 messenger->get_myname().type() == CEPH_ENTITY_TYPE_MON) {
1489 if (authorizer_more.length()) {
1490 ldout(cct,10) << __func__ << " using augmented (challenge) auth payload"
1491 << dendl;
1492 auth_bl = authorizer_more;
1493 } else {
1494 auto am = auth_meta;
1495 authorizer_more.clear();
1496 connection->lock.unlock();
1497 int r = messenger->auth_client->get_auth_request(
1498 connection, am.get(),
1499 &am->auth_method, &preferred_modes, &auth_bl);
1500 connection->lock.lock();
1501 if (r < 0) {
1502 return _fault();
1503 }
1504 if (state != CONNECTING_SEND_CONNECT_MSG) {
1505 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1506 return _fault();
1507 }
1508 }
1509 }
1510
1511 ceph_msg_connect connect;
1512 connect.features = connection->policy.features_supported;
1513 connect.host_type = messenger->get_myname().type();
1514 connect.global_seq = global_seq;
1515 connect.connect_seq = connect_seq;
1516 connect.protocol_version =
1517 messenger->get_proto_version(connection->peer_type, true);
1518 if (auth_bl.length()) {
1519 ldout(cct, 10) << __func__
1520 << " connect_msg.authorizer_len=" << auth_bl.length()
1521 << " protocol=" << auth_meta->auth_method << dendl;
1522 connect.authorizer_protocol = auth_meta->auth_method;
1523 connect.authorizer_len = auth_bl.length();
1524 } else {
1525 connect.authorizer_protocol = 0;
1526 connect.authorizer_len = 0;
1527 }
1528
1529 connect.flags = 0;
1530 if (connection->policy.lossy) {
1531 connect.flags |=
1532 CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
1533 }
1534
1535 ceph::buffer::list bl;
1536 bl.append((char *)&connect, sizeof(connect));
1537 if (auth_bl.length()) {
1538 bl.append(auth_bl.c_str(), auth_bl.length());
1539 }
1540
1541 ldout(cct, 10) << __func__ << " connect sending gseq=" << global_seq
1542 << " cseq=" << connect_seq
1543 << " proto=" << connect.protocol_version << dendl;
1544
1545 return WRITE(bl, handle_connect_message_write);
1546 }
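// Editor's note (not part of the upstream file): the shape of the packed
// wire struct sent above. global_seq orders connection attempts across the
// whole process; connect_seq orders attempts within this session and only
// advances once a handshake actually succeeds.
#if 0
struct msg_connect_sketch {
  __le64 features;             // feature bits the client supports
  __le32 host_type;            // CEPH_ENTITY_TYPE_* of the sender
  __le32 global_seq;           // per-process connection attempt counter
  __le32 connect_seq;          // per-session handshake counter
  __le32 protocol_version;
  __le32 authorizer_protocol;
  __le32 authorizer_len;       // auth payload bytes that follow the struct
  __u8 flags;                  // e.g. CEPH_MSG_CONNECT_LOSSY (advisory)
};                             // the auth payload is appended right after
#endif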
1547
1548 CtPtr ProtocolV1::handle_connect_message_write(int r) {
1549 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1550
1551 if (r < 0) {
1552 ldout(cct, 2) << __func__ << " connect couldn't send reply "
1553 << cpp_strerror(r) << dendl;
1554 return _fault();
1555 }
1556
1557 ldout(cct, 20) << __func__
1558 << " connect wrote (self +) cseq, waiting for reply" << dendl;
1559
1560 return wait_connect_reply();
1561 }
1562
1563 CtPtr ProtocolV1::wait_connect_reply() {
1564 ldout(cct, 20) << __func__ << dendl;
1565
1566 // FIPS zeroization audit 20191115: this memset is not security related.
1567 memset(&connect_reply, 0, sizeof(connect_reply));
1568 return READ(sizeof(connect_reply), handle_connect_reply_1);
1569 }
1570
1571 CtPtr ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
1572 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1573
1574 if (r < 0) {
1575 ldout(cct, 1) << __func__ << " read connect reply failed" << dendl;
1576 return _fault();
1577 }
1578
1579 connect_reply = *((ceph_msg_connect_reply *)buffer);
1580
1581 ldout(cct, 20) << __func__ << " connect got reply tag "
1582 << (int)connect_reply.tag << " connect_seq "
1583 << connect_reply.connect_seq << " global_seq "
1584 << connect_reply.global_seq << " proto "
1585 << connect_reply.protocol_version << " flags "
1586 << (int)connect_reply.flags << " features "
1587 << connect_reply.features << dendl;
1588
1589 if (connect_reply.authorizer_len) {
1590 return wait_connect_reply_auth();
1591 }
1592
1593 return handle_connect_reply_2();
1594 }
1595
1596 CtPtr ProtocolV1::wait_connect_reply_auth() {
1597 ldout(cct, 20) << __func__ << dendl;
1598
1599 ldout(cct, 10) << __func__
1600 << " reply.authorizer_len=" << connect_reply.authorizer_len
1601 << dendl;
1602
1603 ceph_assert(connect_reply.authorizer_len < 4096);
1604
1605 return READ(connect_reply.authorizer_len, handle_connect_reply_auth);
1606 }
1607
1608 CtPtr ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
1609 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1610
1611 if (r < 0) {
1612 ldout(cct, 1) << __func__ << " read connect reply authorizer failed"
1613 << dendl;
1614 return _fault();
1615 }
1616
1617 ceph::buffer::list authorizer_reply;
1618 authorizer_reply.append(buffer, connect_reply.authorizer_len);
1619
1620 if (connection->peer_type != CEPH_ENTITY_TYPE_MON ||
1621 messenger->get_myname().type() == CEPH_ENTITY_TYPE_MON) {
1622 auto am = auth_meta;
1623 bool more = (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER);
1624 ceph::buffer::list auth_retry_bl;
1625 int r;
1626 connection->lock.unlock();
1627 if (more) {
1628 r = messenger->auth_client->handle_auth_reply_more(
1629 connection, am.get(), authorizer_reply, &auth_retry_bl);
1630 } else {
1631 // these aren't used for v1
1632 CryptoKey skey;
1633 string con_secret;
1634 r = messenger->auth_client->handle_auth_done(
1635 connection, am.get(),
1636 0 /* global id */, 0 /* con mode */,
1637 authorizer_reply,
1638 &skey, &con_secret);
1639 }
1640 connection->lock.lock();
1641 if (state != CONNECTING_SEND_CONNECT_MSG) {
1642 ldout(cct, 1) << __func__ << " state changed" << dendl;
1643 return _fault();
1644 }
1645 if (r < 0) {
1646 return _fault();
1647 }
1648 if (more && r == 0) {
1649 authorizer_more = auth_retry_bl;
1650 return CONTINUE(send_connect_message);
1651 }
1652 }
1653
1654 return handle_connect_reply_2();
1655 }
1656
1657 CtPtr ProtocolV1::handle_connect_reply_2() {
1658 ldout(cct, 20) << __func__ << dendl;
1659
1660 if (connect_reply.tag == CEPH_MSGR_TAG_FEATURES) {
1661 ldout(cct, 0) << __func__ << " connect protocol feature mismatch, my "
1662 << std::hex << connection->policy.features_supported
1663 << " < peer " << connect_reply.features << " missing "
1664 << (connect_reply.features &
1665 ~connection->policy.features_supported)
1666 << std::dec << dendl;
1667 return _fault();
1668 }
1669
1670 if (connect_reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1671 ldout(cct, 0) << __func__ << " connect protocol version mismatch, my "
1672 << messenger->get_proto_version(connection->peer_type, true)
1673 << " != " << connect_reply.protocol_version << dendl;
1674 return _fault();
1675 }
1676
1677 if (connect_reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1678 ldout(cct, 0) << __func__ << " connect got BADAUTHORIZER" << dendl;
1679 authorizer_more.clear();
1680 return _fault();
1681 }
1682
1683 if (connect_reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1684 ldout(cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
1685 session_reset();
1686 connect_seq = 0;
1687
1688 // see session_reset
1689 connection->outgoing_bl.clear();
1690
1691 return CONTINUE(send_connect_message);
1692 }
1693
1694 if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1695 global_seq = messenger->get_global_seq(connect_reply.global_seq);
1696 ldout(cct, 5) << __func__ << " connect got RETRY_GLOBAL "
1697 << connect_reply.global_seq << " chose new " << global_seq
1698 << dendl;
1699 return CONTINUE(send_connect_message);
1700 }
1701
1702 if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1703 ceph_assert(connect_reply.connect_seq > connect_seq);
1704 ldout(cct, 5) << __func__ << " connect got RETRY_SESSION " << connect_seq
1705 << " -> " << connect_reply.connect_seq << dendl;
1706 connect_seq = connect_reply.connect_seq;
1707 return CONTINUE(send_connect_message);
1708 }
1709
1710 if (connect_reply.tag == CEPH_MSGR_TAG_WAIT) {
1711 ldout(cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
1712 state = WAIT;
1713 return _fault();
1714 }
1715
1716 uint64_t feat_missing;
1717 feat_missing =
1718 connection->policy.features_required & ~(uint64_t)connect_reply.features;
1719 if (feat_missing) {
1720 ldout(cct, 1) << __func__ << " missing required features " << std::hex
1721 << feat_missing << std::dec << dendl;
1722 return _fault();
1723 }
1724
1725 if (connect_reply.tag == CEPH_MSGR_TAG_SEQ) {
1726 ldout(cct, 10)
1727 << __func__
1728 << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
1729 << dendl;
1730
1731 return wait_ack_seq();
1732 }
1733
1734 if (connect_reply.tag == CEPH_MSGR_TAG_READY) {
1735 ldout(cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
1736 }
1737
1738 return client_ready();
1739 }
1740
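// The TAG_SEQ path is a single 8-byte exchange in each direction. A minimal
// sketch of what the continuations below do (names as in the code; buffer
// handling simplified for illustration):
//
//   uint64_t newly_acked_seq;                    // read from the server
//   out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
//   uint64_t s = in_seq;                         // written back to the server,
//                                                // which discards everything
//                                                // up to s on its side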
1741 CtPtr ProtocolV1::wait_ack_seq() {
1742 ldout(cct, 20) << __func__ << dendl;
1743
1744 return READ(sizeof(uint64_t), handle_ack_seq);
1745 }
1746
1747 CtPtr ProtocolV1::handle_ack_seq(char *buffer, int r) {
1748 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1749
1750 if (r < 0) {
1751 ldout(cct, 1) << __func__ << " read connect ack seq failed" << dendl;
1752 return _fault();
1753 }
1754
1755 uint64_t newly_acked_seq = 0;
1756
1757 newly_acked_seq = *((uint64_t *)buffer);
1758 ldout(cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
1759 << " vs out_seq " << out_seq << dendl;
1760 out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
1761
1762 ceph::buffer::list bl;
1763 uint64_t s = in_seq;
1764 bl.append((char *)&s, sizeof(s));
1765
1766 return WRITE(bl, handle_in_seq_write);
1767 }
1768
1769 CtPtr ProtocolV1::handle_in_seq_write(int r) {
1770 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1771
1772 if (r < 0) {
1773 ldout(cct, 10) << __func__ << " failed to send in_seq " << dendl;
1774 return _fault();
1775 }
1776
1777 ldout(cct, 10) << __func__ << " send in_seq done " << dendl;
1778
1779 return client_ready();
1780 }
1781
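// client_ready() is the last client-side step: adopt the negotiated feature
// intersection, (re)build session_security from the authorizer's session key
// if one was used, and hand the connection to the dispatch queue.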
1782 CtPtr ProtocolV1::client_ready() {
1783 ldout(cct, 20) << __func__ << dendl;
1784
1785 // hooray!
1786 peer_global_seq = connect_reply.global_seq;
1787 connection->policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY;
1788
1789 once_ready = true;
1790 connect_seq += 1;
1791 ceph_assert(connect_seq == connect_reply.connect_seq);
1792 backoff = utime_t();
1793 connection->set_features((uint64_t)connect_reply.features &
1794 (uint64_t)connection->policy.features_supported);
1795 ldout(cct, 10) << __func__ << " connect success " << connect_seq
1796 << ", lossy = " << connection->policy.lossy << ", features "
1797 << connection->get_features() << dendl;
1798
1799 // If we have an authorizer, get a new AuthSessionHandler to deal with
1800 // ongoing security of the connection. PLR
1801 if (auth_meta->authorizer) {
1802 ldout(cct, 10) << __func__ << " setting up session_security with auth "
1803 << auth_meta->authorizer.get() << dendl;
1804 session_security.reset(get_auth_session_handler(
1805 cct, auth_meta->authorizer->protocol,
1806 auth_meta->session_key,
1807 connection->get_features()));
1808 } else {
1809 // We have no authorizer, so we shouldn't be applying security to messages
1810 // in this AsyncConnection. PLR
1811 ldout(cct, 10) << __func__ << " no authorizer, clearing session_security"
1812 << dendl;
1813 session_security.reset();
1814 }
1815
1816 if (connection->delay_state) {
1817 ceph_assert(connection->delay_state->ready());
1818 }
1819 connection->dispatch_queue->queue_connect(connection);
1820 messenger->ms_deliver_handle_fast_connect(connection);
1821
1822 return ready();
1823 }
1824
1825 /**
1826 * Server Protocol V1
1827 **/
1828
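// Accept-side flow, one line per continuation below (a summary of the code,
// not additional behavior):
//
//   send_server_banner       -> banner + our legacy addr + peer addr as seen
//   wait_client_banner       -> read banner + the peer's claimed addr
//   wait_connect_message     -> read ceph_msg_connect (+ optional authorizer)
//   handle_connect_message_2 -> proto/feature/auth checks, race resolution
//   open() or replace()      -> reply TAG_READY or TAG_SEQ, then server_ready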
1829 CtPtr ProtocolV1::send_server_banner() {
1830 ldout(cct, 20) << __func__ << dendl;
1831 state = ACCEPTING;
1832
1833 ceph::buffer::list bl;
1834
1835 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1836
1837 // as a server, we should have a legacy addr if we accepted this connection.
1838 auto legacy = messenger->get_myaddrs().legacy_addr();
1839 encode(legacy, bl, 0); // legacy
1840 connection->port = legacy.get_port();
1841 encode(connection->target_addr, bl, 0); // legacy
1842
1843 ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd()
1844 << " legacy " << legacy
1845 << " socket_addr " << connection->socket_addr
1846 << " target_addr " << connection->target_addr
1847 << dendl;
1848
1849 return WRITE(bl, handle_server_banner_write);
1850 }
1851
1852 CtPtr ProtocolV1::handle_server_banner_write(int r) {
1853 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1854
1855 if (r < 0) {
1856 ldout(cct, 1) << " write server banner failed" << dendl;
1857 return _fault();
1858 }
1859 ldout(cct, 10) << __func__ << " write banner and addr done: "
1860 << connection->get_peer_addr() << dendl;
1861
1862 return wait_client_banner();
1863 }
1864
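// Wire format of the banner exchange, assuming the v1 legacy entity_addr_t
// encoding (which matches struct ceph_entity_addr in include/msgr.h):
//
//   server -> client: "ceph v027" + addr(server, legacy) + addr(client as seen)
//   client -> server: "ceph v027" + addr(client)
//
// hence wait_client_banner() below reads exactly
// strlen(CEPH_BANNER) + sizeof(ceph_entity_addr) bytes.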
1865 CtPtr ProtocolV1::wait_client_banner() {
1866 ldout(cct, 20) << __func__ << dendl;
1867
1868 return READ(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr),
1869 handle_client_banner);
1870 }
1871
1872 CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) {
1873 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1874
1875 if (r < 0) {
1876 ldout(cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
1877 return _fault();
1878 }
1879
1880 if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
1881 ldout(cct, 1) << __func__ << " accept peer sent bad banner '" << buffer
1882 << "' (should be '" << CEPH_BANNER << "')" << dendl;
1883 return _fault();
1884 }
1885
1886 ceph::buffer::list addr_bl;
1887 entity_addr_t peer_addr;
1888
1889 addr_bl.append(buffer + strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
1890 try {
1891 auto ti = addr_bl.cbegin();
1892 decode(peer_addr, ti);
1893 } catch (const ceph::buffer::error &e) {
1894 lderr(cct) << __func__ << " decode peer_addr failed " << dendl;
1895 return _fault();
1896 }
1897
1898 ldout(cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
1899 if (peer_addr.is_blank_ip()) {
1900 // peer apparently doesn't know what ip they have; figure it out for them.
1901 int port = peer_addr.get_port();
1902 peer_addr.set_sockaddr(connection->target_addr.get_sockaddr());
1903 peer_addr.set_port(port);
1904
1905 ldout(cct, 0) << __func__ << " accept peer addr is really " << peer_addr
1906 << " (socket is " << connection->target_addr << ")" << dendl;
1907 }
1908 connection->set_peer_addr(peer_addr); // so that connection_state gets set up
1909 connection->target_addr = peer_addr;
1910
1911 return CONTINUE(wait_connect_message);
1912 }
1913
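// For reference, the fixed-size message read below, as defined in
// include/msgr.h (little-endian on the wire):
//
//   struct ceph_msg_connect {
//     __le64 features;            // supported feature bits
//     __le32 host_type;           // CEPH_ENTITY_TYPE_*
//     __le32 global_seq;          // connections initiated by this host
//     __le32 connect_seq;         // connections initiated in this session
//     __le32 protocol_version;
//     __le32 authorizer_protocol;
//     __le32 authorizer_len;
//     __u8  flags;                // CEPH_MSG_CONNECT_*
//   } __attribute__ ((packed));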
1914 CtPtr ProtocolV1::wait_connect_message() {
1915 ldout(cct, 20) << __func__ << dendl;
1916
1917 // FIPS zeroization audit 20191115: this memset is not security related.
1918 memset(&connect_msg, 0, sizeof(connect_msg));
1919 return READ(sizeof(connect_msg), handle_connect_message_1);
1920 }
1921
1922 CtPtr ProtocolV1::handle_connect_message_1(char *buffer, int r) {
1923 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1924
1925 if (r < 0) {
1926 ldout(cct, 1) << __func__ << " read connect msg failed" << dendl;
1927 return _fault();
1928 }
1929
1930 connect_msg = *((ceph_msg_connect *)buffer);
1931
1932 state = ACCEPTING_WAIT_CONNECT_MSG_AUTH;
1933
1934 if (connect_msg.authorizer_len) {
1935 return wait_connect_message_auth();
1936 }
1937
1938 return handle_connect_message_2();
1939 }
1940
1941 CtPtr ProtocolV1::wait_connect_message_auth() {
1942 ldout(cct, 20) << __func__ << dendl;
1943 authorizer_buf.clear();
1944 authorizer_buf.push_back(ceph::buffer::create(connect_msg.authorizer_len));
1945 return READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
1946 handle_connect_message_auth);
1947 }
1948
1949 CtPtr ProtocolV1::handle_connect_message_auth(char *buffer, int r) {
1950 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1951
1952 if (r < 0) {
1953 ldout(cct, 1) << __func__ << " read connect authorizer failed" << dendl;
1954 return _fault();
1955 }
1956
1957 return handle_connect_message_2();
1958 }
1959
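// handle_connect_message_2() is the heart of the v1 accept path. After the
// protocol, feature, and authorizer checks it resolves races against any
// existing connection to the same peer; a condensed decision table
// (summarizing the branches below):
//
//   no existing, or existing CLOSED           -> open()
//   existing->replacing                       -> RETRY_GLOBAL
//   msg.global_seq < existing peer_global_seq -> RETRY_GLOBAL
//   existing is lossy                         -> reset it, replace()
//   msg.cseq == 0, existing cseq > 0          -> hard reset from peer, replace()
//   msg.cseq <  existing cseq                 -> RETRY_SESSION
//   msg.cseq == existing cseq (OPENED/STANDBY)-> RETRY_SESSION (but replace()
//                                                if both cseqs are 0)
//   msg.cseq == existing cseq (otherwise)     -> race: replace() if incoming
//                                                wins, else WAIT
//   msg.cseq >  existing cseq                 -> replace() (or RESETSESSION
//                                                when resetcheck applies)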
1960 CtPtr ProtocolV1::handle_connect_message_2() {
1961 ldout(cct, 20) << __func__ << dendl;
1962
1963 ldout(cct, 20) << __func__ << " accept got peer connect_seq "
1964 << connect_msg.connect_seq << " global_seq "
1965 << connect_msg.global_seq << dendl;
1966
1967 connection->set_peer_type(connect_msg.host_type);
1968 connection->policy = messenger->get_policy(connect_msg.host_type);
1969
1970 ldout(cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type
1971 << ", policy.lossy=" << connection->policy.lossy
1972 << " policy.server=" << connection->policy.server
1973 << " policy.standby=" << connection->policy.standby
1974 << " policy.resetcheck=" << connection->policy.resetcheck
1975 << " features 0x" << std::hex << (uint64_t)connect_msg.features
1976 << std::dec
1977 << dendl;
1978
1979 ceph_msg_connect_reply reply;
1980 ceph::buffer::list authorizer_reply;
1981
1982 // FIPS zeroization audit 20191115: this memset is not security related.
1983 memset(&reply, 0, sizeof(reply));
1984 reply.protocol_version =
1985 messenger->get_proto_version(connection->peer_type, false);
1986
1987 // mismatch?
1988 ldout(cct, 10) << __func__ << " accept my proto " << reply.protocol_version
1989 << ", their proto " << connect_msg.protocol_version << dendl;
1990
1991 if (connect_msg.protocol_version != reply.protocol_version) {
1992 return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER, reply,
1993 authorizer_reply);
1994 }
1995
1996 // require signatures for cephx?
1997 if (connect_msg.authorizer_protocol == CEPH_AUTH_CEPHX) {
1998 if (connection->peer_type == CEPH_ENTITY_TYPE_OSD ||
1999 connection->peer_type == CEPH_ENTITY_TYPE_MDS ||
2000 connection->peer_type == CEPH_ENTITY_TYPE_MGR) {
2001 if (cct->_conf->cephx_require_signatures ||
2002 cct->_conf->cephx_cluster_require_signatures) {
2003 ldout(cct, 10)
2004 << __func__
2005 << " using cephx, requiring MSG_AUTH feature bit for cluster"
2006 << dendl;
2007 connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
2008 }
2009 if (cct->_conf->cephx_require_version >= 2 ||
2010 cct->_conf->cephx_cluster_require_version >= 2) {
2011 ldout(cct, 10)
2012 << __func__
2013 << " using cephx, requiring cephx v2 feature bit for cluster"
2014 << dendl;
2015 connection->policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
2016 }
2017 } else {
2018 if (cct->_conf->cephx_require_signatures ||
2019 cct->_conf->cephx_service_require_signatures) {
2020 ldout(cct, 10)
2021 << __func__
2022 << " using cephx, requiring MSG_AUTH feature bit for service"
2023 << dendl;
2024 connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
2025 }
2026 if (cct->_conf->cephx_require_version >= 2 ||
2027 cct->_conf->cephx_service_require_version >= 2) {
2028 ldout(cct, 10)
2029 << __func__
2030 << " using cephx, requiring cephx v2 feature bit for service"
2031 << dendl;
2032 connection->policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
2033 }
2034 }
2035 }
2036
2037 uint64_t feat_missing =
2038 connection->policy.features_required & ~(uint64_t)connect_msg.features;
2039 if (feat_missing) {
2040 ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
2041 << feat_missing << std::dec << dendl;
2042 return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES, reply,
2043 authorizer_reply);
2044 }
2045
2046 ceph::buffer::list auth_bl_copy = authorizer_buf;
2047 auto am = auth_meta;
2048 am->auth_method = connect_msg.authorizer_protocol;
2049 if (!HAVE_FEATURE((uint64_t)connect_msg.features, CEPHX_V2)) {
2050 // peer doesn't support it and we won't get here if we require it
2051 am->skip_authorizer_challenge = true;
2052 }
2053 connection->lock.unlock();
2054 ldout(cct, 10) << __func__ << " authorizer_protocol "
2055 << connect_msg.authorizer_protocol
2056 << " len " << auth_bl_copy.length()
2057 << dendl;
2058 bool more = (bool)auth_meta->authorizer_challenge;
2059 int r = messenger->auth_server->handle_auth_request(
2060 connection,
2061 am.get(),
2062 more,
2063 am->auth_method,
2064 auth_bl_copy,
2065 &authorizer_reply);
2066 if (r < 0) {
2067 connection->lock.lock();
2068 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2069 ldout(cct, 1) << __func__ << " state changed" << dendl;
2070 return _fault();
2071 }
2072 ldout(cct, 0) << __func__ << ": got bad authorizer, auth_reply_len="
2073 << authorizer_reply.length() << dendl;
2074 session_security.reset();
2075 return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER, reply,
2076 authorizer_reply);
2077 }
2078 if (r == 0) {
2079 connection->lock.lock();
2080 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2081 ldout(cct, 1) << __func__ << " state changed" << dendl;
2082 return _fault();
2083 }
2084 ldout(cct, 10) << __func__ << ": challenging authorizer" << dendl;
2085 ceph_assert(authorizer_reply.length());
2086 return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER,
2087 reply, authorizer_reply);
2088 }
2089
2090 // We've verified the authorizer for this AsyncConnection, so set up the
2091 // session security structure. PLR
2092 ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
2093
2094 if (connection->policy.server &&
2095 connection->policy.lossy &&
2096 !connection->policy.register_lossy_clients) {
2097 // incoming lossy client, no need to register this connection
2098 // new session
2099 ldout(cct, 10) << __func__ << " accept new session" << dendl;
2100 connection->lock.lock();
2101 return open(reply, authorizer_reply);
2102 }
2103
2104 AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2105
2106 connection->inject_delay();
2107
2108 connection->lock.lock();
2109 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2110 ldout(cct, 1) << __func__ << " state changed" << dendl;
2111 return _fault();
2112 }
2113
2114 if (existing == connection) {
2115 existing = nullptr;
2116 }
2117 if (existing && existing->protocol->proto_type != 1) {
2118 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2119 << existing->protocol.get() << " version is "
2120 << existing->protocol->proto_type << ", marking down" << dendl;
2121 existing->mark_down();
2122 existing = nullptr;
2123 }
2124
2125 if (existing) {
2126 // It is not possible for the existing connection to acquire this
2127 // connection's lock
2128 existing->lock.lock(); // skip lockdep check (we are locking a second
2129 // AsyncConnection here)
2130
2131 ldout(cct,10) << __func__ << " existing=" << existing << " exproto="
2132 << existing->protocol.get() << dendl;
2133 ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());
2134 ceph_assert(exproto);
2135 ceph_assert(exproto->proto_type == 1);
2136
2137 if (exproto->state == CLOSED) {
2138 ldout(cct, 1) << __func__ << " existing " << existing
2139 << " already closed." << dendl;
2140 existing->lock.unlock();
2141 existing = nullptr;
2142
2143 return open(reply, authorizer_reply);
2144 }
2145
2146 if (exproto->replacing) {
2147 ldout(cct, 1) << __func__
2148 << " existing racing replace happened while replacing."
2149 << " existing_state="
2150 << connection->get_state_name(existing->state) << dendl;
2151 reply.global_seq = exproto->peer_global_seq;
2152 existing->lock.unlock();
2153 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
2154 authorizer_reply);
2155 }
2156
2157 if (connect_msg.global_seq < exproto->peer_global_seq) {
2158 ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
2159 << exproto->peer_global_seq << " > "
2160 << connect_msg.global_seq << ", RETRY_GLOBAL" << dendl;
2161 reply.global_seq = exproto->peer_global_seq; // so we can send it below
2162 existing->lock.unlock();
2163 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
2164 authorizer_reply);
2165 } else {
2166 ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
2167 << exproto->peer_global_seq
2168 << " <= " << connect_msg.global_seq << ", looks ok"
2169 << dendl;
2170 }
2171
2172 if (existing->policy.lossy) {
2173 ldout(cct, 0)
2174 << __func__
2175 << " accept replacing existing (lossy) channel (new one lossy="
2176 << connection->policy.lossy << ")" << dendl;
2177 exproto->session_reset();
2178 return replace(existing, reply, authorizer_reply);
2179 }
2180
2181 ldout(cct, 1) << __func__ << " accept connect_seq "
2182 << connect_msg.connect_seq
2183 << " vs existing csq=" << exproto->connect_seq
2184 << " existing_state="
2185 << connection->get_state_name(existing->state) << dendl;
2186
2187 if (connect_msg.connect_seq == 0 && exproto->connect_seq > 0) {
2188 ldout(cct, 0)
2189 << __func__
2190 << " accept peer reset, then tried to connect to us, replacing"
2191 << dendl;
2192 // this is a hard reset from peer
2193 is_reset_from_peer = true;
2194 if (connection->policy.resetcheck) {
2195 exproto->session_reset(); // this resets out_queue, msg_ and
2196 // connect_seq #'s
2197 }
2198 return replace(existing, reply, authorizer_reply);
2199 }
2200
2201 if (connect_msg.connect_seq < exproto->connect_seq) {
2202 // old attempt, or we sent READY but they didn't get it.
2203 ldout(cct, 10) << __func__ << " accept existing " << existing << ".cseq "
2204 << exproto->connect_seq << " > " << connect_msg.connect_seq
2205 << ", RETRY_SESSION" << dendl;
2206 reply.connect_seq = exproto->connect_seq + 1;
2207 existing->lock.unlock();
2208 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
2209 authorizer_reply);
2210 }
2211
2212 if (connect_msg.connect_seq == exproto->connect_seq) {
2213 // if the existing connection successfully opened, and/or
2214 // subsequently went to standby, then the peer should bump
2215 // their connect_seq and retry: this is not a connection race
2216 // we need to resolve here.
2217 if (exproto->state == OPENED || exproto->state == STANDBY) {
2218 ldout(cct, 10) << __func__ << " accept connection race, existing "
2219 << existing << ".cseq " << exproto->connect_seq
2220 << " == " << connect_msg.connect_seq
2221 << ", OPEN|STANDBY, RETRY_SESSION " << dendl;
2222 // if both connect_seqs are zero, don't get stuck in a deadlock; it's
2223 // ok to replace
2224 if (connection->policy.resetcheck && exproto->connect_seq == 0) {
2225 return replace(existing, reply, authorizer_reply);
2226 }
2227
2228 reply.connect_seq = exproto->connect_seq + 1;
2229 existing->lock.unlock();
2230 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
2231 authorizer_reply);
2232 }
2233
2234 // connection race?
2235 if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr_legacy() ||
2236 existing->policy.server) {
2237 // incoming wins
2238 ldout(cct, 10) << __func__ << " accept connection race, existing "
2239 << existing << ".cseq " << exproto->connect_seq
2240 << " == " << connect_msg.connect_seq
2241 << ", or we are server, replacing my attempt" << dendl;
2242 return replace(existing, reply, authorizer_reply);
2243 } else {
2244 // our existing outgoing wins
2245 ldout(messenger->cct, 10)
2246 << __func__ << " accept connection race, existing " << existing
2247 << ".cseq " << exproto->connect_seq
2248 << " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
2249 ceph_assert(connection->peer_addrs->legacy_addr() >
2250 messenger->get_myaddr_legacy());
2251 existing->lock.unlock();
2252 // make sure we follow through with opening the existing
2253 // connection (if it isn't yet open) since we know the peer
2254 // has something to send to us.
2255 existing->send_keepalive();
2256 return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
2257 authorizer_reply);
2258 }
2259 }
2260
2261 ceph_assert(connect_msg.connect_seq > exproto->connect_seq);
2262 ceph_assert(connect_msg.global_seq >= exproto->peer_global_seq);
2263 if (connection->policy.resetcheck && // RESETSESSION only used by servers;
2264 // peers do not reset each other
2265 exproto->connect_seq == 0) {
2266 ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
2267 << connect_msg.connect_seq << ", " << existing
2268 << ".cseq = " << exproto->connect_seq
2269 << "), sending RESETSESSION " << dendl;
2270 existing->lock.unlock();
2271 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
2272 authorizer_reply);
2273 }
2274
2275 // reconnect
2276 ldout(cct, 10) << __func__ << " accept peer sent cseq "
2277 << connect_msg.connect_seq << " > " << exproto->connect_seq
2278 << dendl;
2279 return replace(existing, reply, authorizer_reply);
2280 } // existing
2281 else if (!replacing && connect_msg.connect_seq > 0) {
2282 // we reset, and they are opening a new session
2283 ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
2284 << connect_msg.connect_seq << "), sending RESETSESSION"
2285 << dendl;
2286 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
2287 authorizer_reply);
2288 } else {
2289 // new session
2290 ldout(cct, 10) << __func__ << " accept new session" << dendl;
2291 existing = nullptr;
2292 return open(reply, authorizer_reply);
2293 }
2294 }
2295
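// reply.features below is (peer's features & our supported) | our required.
// A worked example with hypothetical masks: supported = 0x0f, required =
// 0x01, peer sent 0x1e -> reply.features = (0x1e & 0x0f) | 0x01 = 0x0f.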
2296 CtPtr ProtocolV1::send_connect_message_reply(char tag,
2297 ceph_msg_connect_reply &reply,
2298 ceph::buffer::list &authorizer_reply) {
2299 ldout(cct, 20) << __func__ << dendl;
2300 ceph::buffer::list reply_bl;
2301 reply.tag = tag;
2302 reply.features =
2303 ((uint64_t)connect_msg.features & connection->policy.features_supported) |
2304 connection->policy.features_required;
2305 reply.authorizer_len = authorizer_reply.length();
2306 reply_bl.append((char *)&reply, sizeof(reply));
2307
2308 ldout(cct, 10) << __func__ << " reply features 0x" << std::hex
2309 << reply.features << " = (policy sup 0x"
2310 << connection->policy.features_supported
2311 << " & connect 0x" << (uint64_t)connect_msg.features
2312 << ") | policy req 0x"
2313 << connection->policy.features_required
2314 << dendl;
2315
2316 if (reply.authorizer_len) {
2317 reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
2318 authorizer_reply.clear();
2319 }
2320
2321 return WRITE(reply_bl, handle_connect_message_reply_write);
2322 }
2323
2324 CtPtr ProtocolV1::handle_connect_message_reply_write(int r) {
2325 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2326
2327 if (r < 0) {
2328 ldout(cct, 1) << " write connect message reply failed" << dendl;
2329 connection->inject_delay();
2330 return _fault();
2331 }
2332
2333 return CONTINUE(wait_connect_message);
2334 }
2335
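// replace() transplants the just-accepted socket into the pre-existing
// AsyncConnection for this peer. For a lossless peer that means: stop this
// connection, move its ConnectedSocket into `existing`, re-home `existing`
// onto this socket's Worker/EventCenter, and restart the handshake there
// with TAG_RETRY_GLOBAL. The nested lambdas below (deactivate_existing,
// transfer_existing) exist because `existing`'s socket and time events may
// only be touched from its own center's thread.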
2336 CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
2337 ceph_msg_connect_reply &reply,
2338 ceph::buffer::list &authorizer_reply) {
2339 ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;
2340
2341 connection->inject_delay();
2342 if (existing->policy.lossy) {
2343 // disconnect from the Connection
2344 ldout(cct, 1) << __func__ << " replacing on lossy channel, failing existing"
2345 << dendl;
2346 existing->protocol->stop();
2347 existing->dispatch_queue->queue_reset(existing.get());
2348 } else {
2349 ceph_assert(can_write == WriteStatus::NOWRITE);
2350 existing->write_lock.lock();
2351
2352 ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());
2353
2354 // reset the in_seq if this is a hard reset from peer,
2355 // otherwise we respect our original connection's value
2356 if (is_reset_from_peer) {
2357 exproto->is_reset_from_peer = true;
2358 }
2359
2360 connection->center->delete_file_event(connection->cs.fd(),
2361 EVENT_READABLE | EVENT_WRITABLE);
2362
2363 if (existing->delay_state) {
2364 existing->delay_state->flush();
2365 ceph_assert(!connection->delay_state);
2366 }
2367 exproto->reset_recv_state();
2368
2369 exproto->connect_msg.features = connect_msg.features;
2370
2371 auto temp_cs = std::move(connection->cs);
2372 EventCenter *new_center = connection->center;
2373 Worker *new_worker = connection->worker;
2374 // the socket was moved into temp_cs above so _stop won't shut it down;
2375 // queue a reset on the new connection, which we're dumping for the old
2376 stop();
2377
2378 connection->dispatch_queue->queue_reset(connection);
2379 ldout(messenger->cct, 1)
2380 << __func__ << " stop myself to swap existing" << dendl;
2381 exproto->can_write = WriteStatus::REPLACING;
2382 exproto->replacing = true;
2383 exproto->write_in_progress = false;
2384 existing->state_offset = 0;
2385 // avoid previous thread modify event
2386 exproto->state = NONE;
2387 existing->state = AsyncConnection::STATE_NONE;
2388 // Discard existing prefetch buffer in `recv_buf`
2389 existing->recv_start = existing->recv_end = 0;
2390 // there should be no buffered data left
2391 ceph_assert(connection->recv_start == connection->recv_end);
2392
2393 auto deactivate_existing = std::bind(
2394 [existing, new_worker, new_center, exproto, reply,
2395 authorizer_reply](ConnectedSocket &cs) mutable {
2396 // we need to delete the time event in the original thread
2397 {
2398 std::lock_guard<std::mutex> l(existing->lock);
2399 existing->write_lock.lock();
2400 exproto->requeue_sent();
2401 existing->outgoing_bl.clear();
2402 existing->open_write = false;
2403 existing->write_lock.unlock();
2404 if (exproto->state == NONE) {
2405 existing->shutdown_socket();
2406 existing->cs = std::move(cs);
2407 existing->worker->references--;
2408 new_worker->references++;
2409 existing->logger = new_worker->get_perf_counter();
2410 existing->worker = new_worker;
2411 existing->center = new_center;
2412 if (existing->delay_state)
2413 existing->delay_state->set_center(new_center);
2414 } else if (exproto->state == CLOSED) {
2415 auto back_to_close =
2416 std::bind([](ConnectedSocket &cs) mutable { cs.close(); },
2417 std::move(cs));
2418 new_center->submit_to(new_center->get_id(),
2419 std::move(back_to_close), true);
2420 return;
2421 } else {
2422 ceph_abort();
2423 }
2424 }
2425
2426 // Before we change existing->center, events may already be queued in
2427 // existing->center's queue. If we then mark down `existing`, cleanup
2428 // runs in another thread and tears down the connection, so a previously
2429 // queued event would touch freed state and segfault
2430 auto transfer_existing = [existing, exproto, reply,
2431 authorizer_reply]() mutable {
2432 std::lock_guard<std::mutex> l(existing->lock);
2433 if (exproto->state == CLOSED) return;
2434 ceph_assert(exproto->state == NONE);
2435
2436 // we have called shutdown_socket above
2437 ceph_assert(existing->last_tick_id == 0);
2438 // restart timer since we are going to re-build connection
2439 existing->last_connect_started = ceph::coarse_mono_clock::now();
2440 existing->last_tick_id = existing->center->create_time_event(
2441 existing->connect_timeout_us, existing->tick_handler);
2442 existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
2443 exproto->state = ACCEPTING;
2444
2445 existing->center->create_file_event(
2446 existing->cs.fd(), EVENT_READABLE, existing->read_handler);
2447 reply.global_seq = exproto->peer_global_seq;
2448 exproto->run_continuation(exproto->send_connect_message_reply(
2449 CEPH_MSGR_TAG_RETRY_GLOBAL, reply, authorizer_reply));
2450 };
2451 if (existing->center->in_thread())
2452 transfer_existing();
2453 else
2454 existing->center->submit_to(existing->center->get_id(),
2455 std::move(transfer_existing), true);
2456 },
2457 std::move(temp_cs));
2458
2459 existing->center->submit_to(existing->center->get_id(),
2460 std::move(deactivate_existing), true);
2461 existing->write_lock.unlock();
2462 existing->lock.unlock();
2463 return nullptr;
2464 }
2465 existing->lock.unlock();
2466
2467 return open(reply, authorizer_reply);
2468 }
2469
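// For reference, the reply written below, as defined in include/msgr.h.
// When the peer has CEPH_FEATURE_RECONNECT_SEQ and this is not a hard reset
// from the peer, the reply is tagged TAG_SEQ and a raw uint64_t (our in_seq)
// is appended after the authorizer, mirroring the client's wait_ack_seq()
// path:
//
//   struct ceph_msg_connect_reply {
//     __u8   tag;
//     __le64 features;
//     __le32 global_seq;
//     __le32 connect_seq;
//     __le32 protocol_version;
//     __le32 authorizer_len;
//     __u8   flags;
//   } __attribute__ ((packed));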
2470 CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
2471 ceph::buffer::list &authorizer_reply) {
2472 ldout(cct, 20) << __func__ << dendl;
2473
2474 connect_seq = connect_msg.connect_seq + 1;
2475 peer_global_seq = connect_msg.global_seq;
2476 ldout(cct, 10) << __func__ << " accept success, connect_seq = " << connect_seq
2477 << " in_seq=" << in_seq << ", sending READY" << dendl;
2478
2479 // if it is a hard reset from peer, we don't need a round-trip to negotiate
2480 // in/out sequence
2481 if ((connect_msg.features & CEPH_FEATURE_RECONNECT_SEQ) &&
2482 !is_reset_from_peer) {
2483 reply.tag = CEPH_MSGR_TAG_SEQ;
2484 wait_for_seq = true;
2485 } else {
2486 reply.tag = CEPH_MSGR_TAG_READY;
2487 wait_for_seq = false;
2488 out_seq = discard_requeued_up_to(out_seq, 0);
2489 is_reset_from_peer = false;
2490 in_seq = 0;
2491 }
2492
2493 // send READY reply
2494 reply.features = connection->policy.features_supported;
2495 reply.global_seq = messenger->get_global_seq();
2496 reply.connect_seq = connect_seq;
2497 reply.flags = 0;
2498 reply.authorizer_len = authorizer_reply.length();
2499 if (connection->policy.lossy) {
2500 reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
2501 }
2502
2503 connection->set_features((uint64_t)reply.features &
2504 (uint64_t)connect_msg.features);
2505 ldout(cct, 10) << __func__ << " accept features "
2506 << connection->get_features()
2507 << " authorizer_protocol "
2508 << connect_msg.authorizer_protocol << dendl;
2509
2510 session_security.reset(
2511 get_auth_session_handler(cct, auth_meta->auth_method,
2512 auth_meta->session_key,
2513 connection->get_features()));
2514
2515 ceph::buffer::list reply_bl;
2516 reply_bl.append((char *)&reply, sizeof(reply));
2517
2518 if (reply.authorizer_len) {
2519 reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
2520 }
2521
2522 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
2523 uint64_t s = in_seq;
2524 reply_bl.append((char *)&s, sizeof(s));
2525 }
2526
2527 connection->lock.unlock();
2528 // Because "replacing" will prevent other connections preempt this addr,
2529 // it's safe that here we don't acquire Connection's lock
2530 ssize_t r = messenger->accept_conn(connection);
2531
2532 connection->inject_delay();
2533
2534 connection->lock.lock();
2535 replacing = false;
2536 if (r < 0) {
2537 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2538 << connection->peer_addrs->legacy_addr()
2539 << " just fail later one(this)" << dendl;
2540 ldout(cct, 10) << "accept fault after register" << dendl;
2541 connection->inject_delay();
2542 return _fault();
2543 }
2544 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2545 ldout(cct, 1) << __func__
2546 << " state changed while accept_conn, it must be mark_down"
2547 << dendl;
2548 ceph_assert(state == CLOSED || state == NONE);
2549 ldout(cct, 10) << "accept fault after register" << dendl;
2550 messenger->unregister_conn(connection);
2551 connection->inject_delay();
2552 return _fault();
2553 }
2554
2555 return WRITE(reply_bl, handle_ready_connect_message_reply_write);
2556 }
2557
2558 CtPtr ProtocolV1::handle_ready_connect_message_reply_write(int r) {
2559 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2560
2561 if (r < 0) {
2562 ldout(cct, 1) << __func__ << " write ready connect message reply failed"
2563 << dendl;
2564 return _fault();
2565 }
2566
2567 // notify
2568 connection->dispatch_queue->queue_accept(connection);
2569 messenger->ms_deliver_handle_fast_accept(connection);
2570 once_ready = true;
2571
2572 state = ACCEPTING_HANDLED_CONNECT_MSG;
2573
2574 if (wait_for_seq) {
2575 return wait_seq();
2576 }
2577
2578 return server_ready();
2579 }
2580
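// Server half of the TAG_SEQ exchange: read the client's in_seq and drop any
// requeued messages the client has already received. Note that, as written,
// the 8 bytes are copied raw on both ends (no endian conversion), matching
// the raw append in open() and on the client side.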
2581 CtPtr ProtocolV1::wait_seq() {
2582 ldout(cct, 20) << __func__ << dendl;
2583
2584 return READ(sizeof(uint64_t), handle_seq);
2585 }
2586
2587 CtPtr ProtocolV1::handle_seq(char *buffer, int r) {
2588 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2589
2590 if (r < 0) {
2591 ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
2592 return _fault();
2593 }
2594
2595 uint64_t newly_acked_seq = *(uint64_t *)buffer;
2596 ldout(cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq
2597 << dendl;
2598 out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
2599
2600 return server_ready();
2601 }
2602
2603 CtPtr ProtocolV1::server_ready() {
2604 ldout(cct, 20) << __func__ << " session_security is "
2605 << session_security
2606 << dendl;
2607
2608 ldout(cct, 20) << __func__ << " accept done" << dendl;
2609 // FIPS zeroization audit 20191115: this memset is not security related.
2610 memset(&connect_msg, 0, sizeof(connect_msg));
2611
2612 if (connection->delay_state) {
2613 ceph_assert(connection->delay_state->ready());
2614 }
2615
2616 return ready();
2617 }