]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/AsyncConnection.cc
import ceph 12.2.12
[ceph.git] / ceph / src / msg / async / AsyncConnection.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <unistd.h>
18
19 #include "include/Context.h"
20 #include "common/errno.h"
21 #include "AsyncMessenger.h"
22 #include "AsyncConnection.h"
23
24 #include "messages/MOSDOp.h"
25 #include "messages/MOSDOpReply.h"
26 #include "common/EventTrace.h"
27
28 // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
29 #define SEQ_MASK 0x7fffffff
30
31 #define dout_subsys ceph_subsys_ms
32 #undef dout_prefix
33 #define dout_prefix _conn_prefix(_dout)
34 ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
35 return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
36 << " :" << port
37 << " s=" << get_state_name(state)
38 << " pgs=" << peer_global_seq
39 << " cs=" << connect_seq
40 << " l=" << policy.lossy
41 << ").";
42 }
43
44 // Notes:
45 // 1. Don't dispatch any event when closed! It may cause AsyncConnection alive even if AsyncMessenger dead
46
// Lower bound (bytes) for the socket prefetch buffer; the configured
// ms_tcp_prefetch_max_size is clamped up to this in the constructor.
const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
// Threshold for coalescing small buffers — presumably used when deciding
// whether to copy small fragments together before sending; its use is
// outside this chunk, so confirm against the rest of the file.
const int ASYNC_COALESCE_THRESHOLD = 256;
49
50 class C_time_wakeup : public EventCallback {
51 AsyncConnectionRef conn;
52
53 public:
54 explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
55 void do_request(int fd_or_id) override {
56 conn->wakeup_from(fd_or_id);
57 }
58 };
59
60 class C_handle_read : public EventCallback {
61 AsyncConnectionRef conn;
62
63 public:
64 explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
65 void do_request(int fd_or_id) override {
66 conn->process();
67 }
68 };
69
70 class C_handle_write : public EventCallback {
71 AsyncConnectionRef conn;
72
73 public:
74 explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
75 void do_request(int fd) override {
76 conn->handle_write();
77 }
78 };
79
80 class C_clean_handler : public EventCallback {
81 AsyncConnectionRef conn;
82 public:
83 explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
84 void do_request(int id) override {
85 conn->cleanup();
86 delete this;
87 }
88 };
89
90 class C_tick_wakeup : public EventCallback {
91 AsyncConnectionRef conn;
92
93 public:
94 explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
95 void do_request(int fd_or_id) override {
96 conn->tick(fd_or_id);
97 }
98 };
99
100 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
101 {
102 // create a buffer to read into that matches the data alignment
103 unsigned left = len;
104 if (off & ~CEPH_PAGE_MASK) {
105 // head
106 unsigned head = 0;
107 head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
108 data.push_back(buffer::create(head));
109 left -= head;
110 }
111 unsigned middle = left & CEPH_PAGE_MASK;
112 if (middle > 0) {
113 data.push_back(buffer::create_page_aligned(middle));
114 left -= middle;
115 }
116 if (left) {
117 data.push_back(buffer::create(left));
118 }
119 }
120
// Construct a connection owned by messenger `m`.  All socket events for
// this connection are serviced by worker `w`'s event center; received
// messages that are not fast-dispatched go to dispatch queue `q`.
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
                                 Worker *w)
  : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
    logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
    state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
    dispatch_queue(q), can_write(WriteStatus::NOWRITE),
    keepalive(false), recv_buf(NULL),
    // prefetch size is configurable but never below TCP_PREFETCH_MIN_SIZE
    recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
    recv_start(0), recv_end(0),
    last_active(ceph::coarse_mono_clock::now()),
    inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
    authorizer(NULL), replacing(false),
    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
    worker(w), center(&w->center)
{
  // Event callbacks are reference-holding functors dispatched by the
  // event center; freed when the connection is cleaned up.
  read_handler = new C_handle_read(this);
  write_handler = new C_handle_write(this);
  wakeup_handler = new C_time_wakeup(this);
  tick_handler = new C_tick_wakeup(this);
  memset(msgvec, 0, sizeof(msgvec));
  // double recv_max_prefetch see "read_until"
  recv_buf = new char[2*recv_max_prefetch];
  state_buffer = new char[4096];
  logger->inc(l_msgr_created_connections);
}
146
147 AsyncConnection::~AsyncConnection()
148 {
149 assert(out_q.empty());
150 assert(sent.empty());
151 delete authorizer;
152 if (recv_buf)
153 delete[] recv_buf;
154 if (state_buffer)
155 delete[] state_buffer;
156 assert(!delay_state);
157 }
158
159 void AsyncConnection::maybe_start_delay_thread()
160 {
161 if (!delay_state) {
162 auto pos = async_msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(peer_type));
163 if (pos != string::npos) {
164 ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl;
165 delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue, conn_id);
166 }
167 }
168 }
169
170 /* return -1 means `fd` occurs error or closed, it should be closed
171 * return 0 means EAGAIN or EINTR */
172 ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
173 {
174 ssize_t nread;
175 again:
176 nread = cs.read(buf, len);
177 if (nread < 0) {
178 if (nread == -EAGAIN) {
179 nread = 0;
180 } else if (nread == -EINTR) {
181 goto again;
182 } else {
183 ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
184 << " : "<< strerror(nread) << dendl;
185 return -1;
186 }
187 } else if (nread == 0) {
188 ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
189 << cs.fd() << dendl;
190 return -1;
191 }
192 return nread;
193 }
194
// Push as much of outcoming_bl to the socket as it will accept, and keep
// the EVENT_WRITABLE registration consistent with whether data remains.
//
// return the remaining bytes, it may larger than the length of ptr
// else return < 0 means error
ssize_t AsyncConnection::_try_send(bool more)
{
  // debug hook: randomly shut the socket down to exercise failure paths
  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  // must run on this connection's event-center thread
  assert(center->in_thread());
  ssize_t r = cs.send(outcoming_bl, more);
  if (r < 0) {
    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
    return r;
  }

  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
                             << " remaining bytes " << outcoming_bl.length() << dendl;

  // data still queued but no writable event registered: register one so
  // the remainder is flushed when the socket becomes writable again
  if (!open_write && is_queued()) {
    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
    open_write = true;
  }

  // everything flushed: drop the writable event, and if a state
  // transition was parked behind this send (state_after_send), poke the
  // read handler so the state machine resumes
  if (open_write && !is_queued()) {
    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
    open_write = false;
    if (state_after_send != STATE_NONE)
      center->dispatch_event_external(read_handler);
  }

  return outcoming_bl.length();
}
230
// Fill `p` with exactly `len` bytes, possibly over several calls: on a
// short read we remember progress in `state_offset` and return the byte
// count still outstanding, so the caller must invoke us again with the
// SAME buffer until we return 0.  Normally only "read_message" passes an
// existing bufferptr in.
//
// Reads no larger than recv_max_prefetch are served through the prefetch
// buffer "recv_buf" (read ahead up to recv_max_prefetch bytes per syscall
// to amortize small-read overhead); larger reads go straight into `p`.
//
// return the remaining bytes, 0 means this buffer is finished
// else return < 0 means error
ssize_t AsyncConnection::read_until(unsigned len, char *p)
{
  ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
                             << state_offset << dendl;

  // debug hook: randomly shut the socket down to exercise failure paths
  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ssize_t r = 0;
  uint64_t left = len - state_offset;
  // first drain whatever a previous prefetch left in recv_buf
  if (recv_end > recv_start) {
    uint64_t to_read = MIN(recv_end - recv_start, left);
    memcpy(p, recv_buf+recv_start, to_read);
    recv_start += to_read;
    left -= to_read;
    ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer "
                               << " left is " << left << " buffer still has "
                               << recv_end - recv_start << dendl;
    if (left == 0) {
      return 0;
    }
    state_offset += to_read;
  }

  recv_end = recv_start = 0;
  /* nothing left in the prefetch buffer */
  if (len > recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    do {
      r = read_bulk(p+state_offset, left);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      } else if (r == static_cast<int>(left)) {
        // request fully satisfied; reset progress for the next caller
        state_offset = 0;
        return 0;
      }
      state_offset += r;
      left -= r;
    } while (r > 0);
  } else {
    // small read: prefetch up to recv_max_prefetch bytes into recv_buf,
    // then copy out what this request needs
    do {
      r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
                                 << " left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      }
      recv_end += r;
      if (r >= static_cast<int>(left)) {
        // got at least what we needed; leftover bytes stay in recv_buf
        // (recv_start..recv_end) for the next call
        recv_start = len - state_offset;
        memcpy(p+state_offset, recv_buf, recv_start);
        state_offset = 0;
        return 0;
      }
      left -= r;
    } while (r > 0);
    // socket drained before the request completed: move the partial data
    // out of the prefetch buffer and record progress
    memcpy(p+state_offset, recv_buf, recv_end-recv_start);
    state_offset += (recv_end - recv_start);
    recv_end = recv_start = 0;
  }
  ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining "
                             << len - state_offset << " bytes" << dendl;
  return len - state_offset;
}
311
312 void AsyncConnection::inject_delay() {
313 if (async_msgr->cct->_conf->ms_inject_internal_delays) {
314 ldout(async_msgr->cct, 10) << __func__ << " sleep for " <<
315 async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
316 utime_t t;
317 t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
318 t.sleep();
319 }
320 }
321
// Read-side state machine, invoked by the event center when the socket is
// readable (and re-dispatched internally).  Runs on the connection's
// worker thread with `lock` held.  Each loop iteration tries to advance
// `state` one step; the loop exits when a full pass leaves the state
// unchanged (more socket data or throttle budget is needed).  Fatal
// protocol or socket errors jump to `fail`, which calls fault().
void AsyncConnection::process()
{
  ssize_t r = 0;
  int prev_state = state;
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
  utime_t ltt_recv_stamp = ceph_clock_now();
#endif
  bool need_dispatch_writer = false;
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  auto recv_start_time = ceph::mono_clock::now();
  do {
    ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
    prev_state = state;
    switch (state) {
      case STATE_OPEN:
        {
          // read the one-byte tag announcing what the peer sends next
          char tag = -1;
          r = read_until(sizeof(tag), &tag);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read tag failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;  // partial read — wait for more data
          }

          if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
            ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
            set_last_keepalive(ceph_clock_now());
          } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
            state = STATE_OPEN_KEEPALIVE2;
          } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
            state = STATE_OPEN_KEEPALIVE2_ACK;
          } else if (tag == CEPH_MSGR_TAG_ACK) {
            state = STATE_OPEN_TAG_ACK;
          } else if (tag == CEPH_MSGR_TAG_MSG) {
            state = STATE_OPEN_MESSAGE_HEADER;
          } else if (tag == CEPH_MSGR_TAG_CLOSE) {
            state = STATE_OPEN_TAG_CLOSE;
          } else {
            ldout(async_msgr->cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
            goto fail;
          }

          break;
        }

      case STATE_OPEN_KEEPALIVE2:
        {
          // KEEPALIVE2 carries the sender's timestamp, which we must echo
          // back as a KEEPALIVE2_ACK
          ceph_timespec *t;
          r = read_until(sizeof(*t), state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
          t = (ceph_timespec*)state_buffer;
          utime_t kp_t = utime_t(*t);
          write_lock.lock();
          _append_keepalive_or_ack(true, &kp_t);
          write_lock.unlock();
          ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
          set_last_keepalive(ceph_clock_now());
          need_dispatch_writer = true;  // the ack must be flushed out
          state = STATE_OPEN;
          break;
        }

      case STATE_OPEN_KEEPALIVE2_ACK:
        {
          // peer echoed our keepalive timestamp; record it
          ceph_timespec *t;
          r = read_until(sizeof(*t), state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          t = (ceph_timespec*)state_buffer;
          set_last_keepalive_ack(utime_t(*t));
          ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
          state = STATE_OPEN;
          break;
        }

      case STATE_OPEN_TAG_ACK:
        {
          // peer acknowledged messages up to *seq; we can drop them from
          // the sent list
          ceph_le64 *seq;
          r = read_until(sizeof(*seq), state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          seq = (ceph_le64*)state_buffer;
          ldout(async_msgr->cct, 20) << __func__ << " got ACK" << dendl;
          handle_ack(*seq);
          state = STATE_OPEN;
          break;
        }

      case STATE_OPEN_MESSAGE_HEADER:
        {
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
          ltt_recv_stamp = ceph_clock_now();
#endif
          recv_stamp = ceph_clock_now();
          ldout(async_msgr->cct, 20) << __func__ << " begin MSG" << dendl;
          ceph_msg_header header;
          ceph_msg_header_old oldheader;
          __u32 header_crc = 0;
          unsigned len;
          // peers without NOSRCADDR use the older, larger header layout
          if (has_feature(CEPH_FEATURE_NOSRCADDR))
            len = sizeof(header);
          else
            len = sizeof(oldheader);

          r = read_until(len, state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read message header failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          ldout(async_msgr->cct, 20) << __func__ << " got MSG header" << dendl;

          if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
            header = *((ceph_msg_header*)state_buffer);
            if (msgr->crcflags & MSG_CRC_HEADER)
              header_crc = ceph_crc32c(0, (unsigned char *)&header,
                                       sizeof(header) - sizeof(header.crc));
          } else {
            oldheader = *((ceph_msg_header_old*)state_buffer);
            // this is fugly
            memcpy(&header, &oldheader, sizeof(header));
            header.src = oldheader.src.name;
            header.reserved = oldheader.reserved;
            if (msgr->crcflags & MSG_CRC_HEADER) {
              header.crc = oldheader.crc;
              header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
            }
          }

          ldout(async_msgr->cct, 20) << __func__ << " got envelope type=" << header.type
                                     << " src " << entity_name_t(header.src)
                                     << " front=" << header.front_len
                                     << " data=" << header.data_len
                                     << " off " << header.data_off << dendl;

          // verify header crc
          if (msgr->crcflags & MSG_CRC_HEADER && header_crc != header.crc) {
            ldout(async_msgr->cct,0) << __func__ << " got bad header crc "
                                     << header_crc << " != " << header.crc << dendl;
            goto fail;
          }

          // Reset state
          data_buf.clear();
          front.clear();
          middle.clear();
          data.clear();
          current_header = header;
          state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE;
          break;
        }

      case STATE_OPEN_MESSAGE_THROTTLE_MESSAGE:
        {
          // acquire one message's worth of the policy message throttle;
          // on failure, retry via a 1ms time event rather than spinning
          if (policy.throttler_messages) {
            ldout(async_msgr->cct, 10) << __func__ << " wants " << 1 << " message from policy throttler "
                                       << policy.throttler_messages->get_current() << "/"
                                       << policy.throttler_messages->get_max() << dendl;
            if (!policy.throttler_messages->get_or_fail()) {
              ldout(async_msgr->cct, 10) << __func__ << " wants 1 message from policy throttle "
                                         << policy.throttler_messages->get_current() << "/"
                                         << policy.throttler_messages->get_max() << " failed, just wait." << dendl;
              // following thread pool deal with th full message queue isn't a
              // short time, so we can wait a ms.
              if (register_time_events.empty())
                register_time_events.insert(center->create_time_event(1000, wakeup_handler));
              break;
            }
          }

          state = STATE_OPEN_MESSAGE_THROTTLE_BYTES;
          break;
        }

      case STATE_OPEN_MESSAGE_THROTTLE_BYTES:
        {
          // acquire cur_msg_size bytes of the policy byte throttle
          cur_msg_size = current_header.front_len + current_header.middle_len + current_header.data_len;
          if (cur_msg_size) {
            if (policy.throttler_bytes) {
              ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler "
                                         << policy.throttler_bytes->get_current() << "/"
                                         << policy.throttler_bytes->get_max() << dendl;
              if (!policy.throttler_bytes->get_or_fail(cur_msg_size)) {
                ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler "
                                           << policy.throttler_bytes->get_current() << "/"
                                           << policy.throttler_bytes->get_max() << " failed, just wait." << dendl;
                // following thread pool deal with th full message queue isn't a
                // short time, so we can wait a ms.
                if (register_time_events.empty())
                  register_time_events.insert(center->create_time_event(1000, wakeup_handler));
                break;
              }
            }
          }

          state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE;
          break;
        }

      case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE:
        {
          // acquire dispatch-queue throttle before actually reading the body
          if (cur_msg_size) {
            if (!dispatch_queue->dispatch_throttler.get_or_fail(cur_msg_size)) {
              ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from dispatch throttle "
                                         << dispatch_queue->dispatch_throttler.get_current() << "/"
                                         << dispatch_queue->dispatch_throttler.get_max() << " failed, just wait." << dendl;
              // following thread pool deal with th full message queue isn't a
              // short time, so we can wait a ms.
              if (register_time_events.empty())
                register_time_events.insert(center->create_time_event(1000, wakeup_handler));
              break;
            }
          }

          throttle_stamp = ceph_clock_now();
          state = STATE_OPEN_MESSAGE_READ_FRONT;
          break;
        }

      // NOTE: the next four cases deliberately fall through to each other
      // when their portion completes within this pass.
      case STATE_OPEN_MESSAGE_READ_FRONT:
        {
          // read front
          unsigned front_len = current_header.front_len;
          if (front_len) {
            if (!front.length())
              front.push_back(buffer::create(front_len));

            r = read_until(front_len, front.c_str());
            if (r < 0) {
              ldout(async_msgr->cct, 1) << __func__ << " read message front failed" << dendl;
              goto fail;
            } else if (r > 0) {
              break;
            }

            ldout(async_msgr->cct, 20) << __func__ << " got front " << front.length() << dendl;
          }
          state = STATE_OPEN_MESSAGE_READ_MIDDLE;
        }
        // FALLTHROUGH

      case STATE_OPEN_MESSAGE_READ_MIDDLE:
        {
          // read middle
          unsigned middle_len = current_header.middle_len;
          if (middle_len) {
            if (!middle.length())
              middle.push_back(buffer::create(middle_len));

            r = read_until(middle_len, middle.c_str());
            if (r < 0) {
              ldout(async_msgr->cct, 1) << __func__ << " read message middle failed" << dendl;
              goto fail;
            } else if (r > 0) {
              break;
            }
            ldout(async_msgr->cct, 20) << __func__ << " got middle " << middle.length() << dendl;
          }

          state = STATE_OPEN_MESSAGE_READ_DATA_PREPARE;
        }
        // FALLTHROUGH

      case STATE_OPEN_MESSAGE_READ_DATA_PREPARE:
        {
          // read data
          unsigned data_len = le32_to_cpu(current_header.data_len);
          unsigned data_off = le32_to_cpu(current_header.data_off);
          if (data_len) {
            // get a buffer: prefer a caller-registered rx buffer for this
            // tid, otherwise allocate page-aligned storage
            map<ceph_tid_t,pair<bufferlist,int> >::iterator p = rx_buffers.find(current_header.tid);
            if (p != rx_buffers.end()) {
              ldout(async_msgr->cct,10) << __func__ << " seleting rx buffer v " << p->second.second
                                        << " at offset " << data_off
                                        << " len " << p->second.first.length() << dendl;
              data_buf = p->second.first;
              // make sure it's big enough
              if (data_buf.length() < data_len)
                data_buf.push_back(buffer::create(data_len - data_buf.length()));
              data_blp = data_buf.begin();
            } else {
              ldout(async_msgr->cct,20) << __func__ << " allocating new rx buffer at offset " << data_off << dendl;
              alloc_aligned_buffer(data_buf, data_len, data_off);
              data_blp = data_buf.begin();
            }
          }

          msg_left = data_len;
          state = STATE_OPEN_MESSAGE_READ_DATA;
        }
        // FALLTHROUGH

      case STATE_OPEN_MESSAGE_READ_DATA:
        {
          // stream the payload into data_buf, chunk by chunk
          while (msg_left > 0) {
            bufferptr bp = data_blp.get_current_ptr();
            unsigned read = MIN(bp.length(), msg_left);
            r = read_until(read, bp.c_str());
            if (r < 0) {
              ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl;
              goto fail;
            } else if (r > 0) {
              break;  // partial chunk; resume on next readable event
            }

            data_blp.advance(read);
            data.append(bp, 0, read);
            msg_left -= read;
          }

          if (msg_left > 0)
            break;

          state = STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH;
        }
        // FALLTHROUGH

      case STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH:
        {
          ceph_msg_footer footer;
          ceph_msg_footer_old old_footer;
          unsigned len;
          // footer
          if (has_feature(CEPH_FEATURE_MSG_AUTH))
            len = sizeof(footer);
          else
            len = sizeof(old_footer);

          r = read_until(len, state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read footer data error " << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
            footer = *((ceph_msg_footer*)state_buffer);
          } else {
            // upgrade the legacy footer (no signature field)
            old_footer = *((ceph_msg_footer_old*)state_buffer);
            footer.front_crc = old_footer.front_crc;
            footer.middle_crc = old_footer.middle_crc;
            footer.data_crc = old_footer.data_crc;
            footer.sig = 0;
            footer.flags = old_footer.flags;
          }
          int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
          ldout(async_msgr->cct, 10) << __func__ << " aborted = " << aborted << dendl;
          if (aborted) {
            ldout(async_msgr->cct, 0) << __func__ << " got " << front.length() << " + " << middle.length() << " + " << data.length()
                                      << " byte message.. ABORTED" << dendl;
            goto fail;
          }

          ldout(async_msgr->cct, 20) << __func__ << " got " << front.length() << " + " << middle.length()
                                     << " + " << data.length() << " byte message" << dendl;
          Message *message = decode_message(async_msgr->cct, async_msgr->crcflags, current_header, footer,
                                            front, middle, data, this);
          if (!message) {
            ldout(async_msgr->cct, 1) << __func__ << " decode message failed " << dendl;
            goto fail;
          }

          //
          // Check the signature if one should be present. A zero return indicates success. PLR
          //

          if (session_security.get() == NULL) {
            ldout(async_msgr->cct, 10) << __func__ << " no session security set" << dendl;
          } else {
            if (session_security->check_message_signature(message)) {
              ldout(async_msgr->cct, 0) << __func__ << " Signature check failed" << dendl;
              message->put();
              goto fail;
            }
          }
          message->set_byte_throttler(policy.throttler_bytes);
          message->set_message_throttler(policy.throttler_messages);

          // store reservation size in message, so we don't get confused
          // by messages entering the dispatch queue through other paths.
          message->set_dispatch_throttle_size(cur_msg_size);

          message->set_recv_stamp(recv_stamp);
          message->set_throttle_stamp(throttle_stamp);
          message->set_recv_complete_stamp(ceph_clock_now());

          // check received seq#. if it is old, drop the message.
          // note that incoming messages may skip ahead. this is convenient for the client
          // side queueing because messages can't be renumbered, but the (kernel) client will
          // occasionally pull a message out of the sent queue to send elsewhere. in that case
          // it doesn't matter if we "got" it or not.
          uint64_t cur_seq = in_seq;
          if (message->get_seq() <= cur_seq) {
            ldout(async_msgr->cct,0) << __func__ << " got old message "
                                     << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message
                                     << ", discarding" << dendl;
            message->put();
            if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message)
              assert(0 == "old msgs despite reconnect_seq feature");
            break;
          }
          if (message->get_seq() > cur_seq + 1) {
            ldout(async_msgr->cct, 0) << __func__ << " missed message? skipped from seq "
                                      << cur_seq << " to " << message->get_seq() << dendl;
            if (async_msgr->cct->_conf->ms_die_on_skipped_message)
              assert(0 == "skipped incoming seq");
          }

          message->set_connection(this);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
          if (message->get_type() == CEPH_MSG_OSD_OP || message->get_type() == CEPH_MSG_OSD_OPREPLY) {
            utime_t ltt_processed_stamp = ceph_clock_now();
            double usecs_elapsed = (ltt_processed_stamp.to_nsec()-ltt_recv_stamp.to_nsec())/1000;
            ostringstream buf;
            if (message->get_type() == CEPH_MSG_OSD_OP)
              OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP", false);
            else
              OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY", false);
          }
#endif

          // note last received message.
          in_seq = message->get_seq();
          ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq "
                                    << message->get_seq() << " " << message
                                    << " " << *message << dendl;

          // lossless peers expect an ack for every message
          if (!policy.lossy) {
            ack_left++;
            need_dispatch_writer = true;
          }
          state = STATE_OPEN;

          logger->inc(l_msgr_recv_messages);
          logger->inc(l_msgr_recv_bytes, cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));

          async_msgr->ms_fast_preprocess(message);
          auto fast_dispatch_time = ceph::mono_clock::now();
          logger->tinc(l_msgr_running_recv_time, fast_dispatch_time - recv_start_time);
          if (delay_state) {
            // debug delayed-delivery path: maybe hold the message back
            utime_t release = message->get_recv_stamp();
            double delay_period = 0;
            if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
              delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
              release += delay_period;
              ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on "
                                        << message << " " << *message << dendl;
            }
            delay_state->queue(delay_period, release, message);
          } else if (async_msgr->ms_can_fast_dispatch(message)) {
            // fast dispatch runs inline on this thread; drop the
            // connection lock while user code runs
            lock.unlock();
            dispatch_queue->fast_dispatch(message);
            recv_start_time = ceph::mono_clock::now();
            logger->tinc(l_msgr_running_fast_dispatch_time,
                         recv_start_time - fast_dispatch_time);
            lock.lock();
          } else {
            dispatch_queue->enqueue(message, message->get_priority(), conn_id);
          }

          // clean up local buffer references
          data_buf.clear();
          front.clear();
          middle.clear();
          data.clear();

          break;
        }

      case STATE_OPEN_TAG_CLOSE:
        {
          ldout(async_msgr->cct, 20) << __func__ << " got CLOSE" << dendl;
          _stop();
          return ;
        }

      case STATE_STANDBY:
        {
          ldout(async_msgr->cct, 20) << __func__ << " enter STANDY" << dendl;

          break;
        }

      case STATE_NONE:
        {
          ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
          break;
        }

      case STATE_CLOSED:
        {
          ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
          break;
        }

      case STATE_WAIT:
        {
          ldout(async_msgr->cct, 1) << __func__ << " enter wait state, failing" << dendl;
          goto fail;
        }

      default:
        {
          // connect/accept handshake states live in _process_connection()
          if (_process_connection() < 0)
            goto fail;
          break;
        }
    }
  } while (prev_state != state);

  if (need_dispatch_writer && is_connected())
    center->dispatch_event_external(write_handler);

  logger->tinc(l_msgr_running_recv_time, ceph::mono_clock::now() - recv_start_time);
  return;

 fail:
  fault();
}
860
861 ssize_t AsyncConnection::_process_connection()
862 {
863 ssize_t r = 0;
864
865 switch(state) {
866 case STATE_WAIT_SEND:
867 {
868 std::lock_guard<std::mutex> l(write_lock);
869 if (!outcoming_bl.length()) {
870 assert(state_after_send);
871 state = state_after_send;
872 state_after_send = STATE_NONE;
873 }
874 break;
875 }
876
877 case STATE_CONNECTING:
878 {
879 assert(!policy.server);
880
881 // reset connect state variables
882 delete authorizer;
883 authorizer = NULL;
884 authorizer_buf.clear();
885 memset(&connect_msg, 0, sizeof(connect_msg));
886 memset(&connect_reply, 0, sizeof(connect_reply));
887
888 global_seq = async_msgr->get_global_seq();
889 // close old socket. this is safe because we stopped the reader thread above.
890 if (cs) {
891 center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
892 cs.close();
893 }
894
895 SocketOptions opts;
896 opts.priority = async_msgr->get_socket_priority();
897 opts.connect_bind_addr = msgr->get_myaddr();
898 r = worker->connect(get_peer_addr(), opts, &cs);
899 if (r < 0)
900 goto fail;
901
902 center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
903 state = STATE_CONNECTING_RE;
904 break;
905 }
906
907 case STATE_CONNECTING_RE:
908 {
909 r = cs.is_connected();
910 if (r < 0) {
911 ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl;
912 if (r == -ECONNREFUSED) {
913 ldout(async_msgr->cct, 2) << __func__ << " connection refused!" << dendl;
914 dispatch_queue->queue_refused(this);
915 }
916 goto fail;
917 } else if (r == 0) {
918 ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl;
919 if (async_msgr->get_stack()->nonblock_connect_need_writable_event())
920 center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
921 break;
922 }
923
924 center->delete_file_event(cs.fd(), EVENT_WRITABLE);
925 ldout(async_msgr->cct, 10) << __func__ << " connect successfully, ready to send banner" << dendl;
926
927 bufferlist bl;
928 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
929 r = try_send(bl);
930 if (r == 0) {
931 state = STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY;
932 ldout(async_msgr->cct, 10) << __func__ << " connect write banner done: "
933 << get_peer_addr() << dendl;
934 } else if (r > 0) {
935 state = STATE_WAIT_SEND;
936 state_after_send = STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY;
937 ldout(async_msgr->cct, 10) << __func__ << " connect wait for write banner: "
938 << get_peer_addr() << dendl;
939 } else {
940 goto fail;
941 }
942
943 break;
944 }
945
946 case STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY:
947 {
948 entity_addr_t paddr, peer_addr_for_me;
949 bufferlist myaddrbl;
950 unsigned banner_len = strlen(CEPH_BANNER);
951 unsigned need_len = banner_len + sizeof(ceph_entity_addr)*2;
952 r = read_until(need_len, state_buffer);
953 if (r < 0) {
954 ldout(async_msgr->cct, 1) << __func__ << " read banner and identify addresses failed" << dendl;
955 goto fail;
956 } else if (r > 0) {
957 break;
958 }
959
960 if (memcmp(state_buffer, CEPH_BANNER, banner_len)) {
961 ldout(async_msgr->cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
962 << get_peer_addr() << dendl;
963 goto fail;
964 }
965
966 bufferlist bl;
967 bl.append(state_buffer+banner_len, sizeof(ceph_entity_addr)*2);
968 bufferlist::iterator p = bl.begin();
969 try {
970 ::decode(paddr, p);
971 ::decode(peer_addr_for_me, p);
972 } catch (const buffer::error& e) {
973 lderr(async_msgr->cct) << __func__ << " decode peer addr failed " << dendl;
974 goto fail;
975 }
976 ldout(async_msgr->cct, 20) << __func__ << " connect read peer addr "
977 << paddr << " on socket " << cs.fd() << dendl;
978 if (peer_addr != paddr) {
979 if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
980 peer_addr.get_nonce() == paddr.get_nonce()) {
981 ldout(async_msgr->cct, 0) << __func__ << " connect claims to be " << paddr
982 << " not " << peer_addr
983 << " - presumably this is the same node!" << dendl;
984 } else {
985 ldout(async_msgr->cct, 10) << __func__ << " connect claims to be "
986 << paddr << " not " << peer_addr << dendl;
987 goto fail;
988 }
989 }
990
991 ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl;
992 lock.unlock();
993 async_msgr->learned_addr(peer_addr_for_me);
994 if (async_msgr->cct->_conf->ms_inject_internal_delays
995 && async_msgr->cct->_conf->ms_inject_socket_failures) {
996 if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
997 ldout(msgr->cct, 10) << __func__ << " sleep for "
998 << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
999 utime_t t;
1000 t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
1001 t.sleep();
1002 }
1003 }
1004
1005 lock.lock();
1006 if (state != STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY) {
1007 ldout(async_msgr->cct, 1) << __func__ << " state changed while learned_addr, mark_down or "
1008 << " replacing must be happened just now" << dendl;
1009 return 0;
1010 }
1011
1012 ::encode(async_msgr->get_myaddr(), myaddrbl, 0); // legacy
1013 r = try_send(myaddrbl);
1014 if (r == 0) {
1015 state = STATE_CONNECTING_SEND_CONNECT_MSG;
1016 ldout(async_msgr->cct, 10) << __func__ << " connect sent my addr "
1017 << async_msgr->get_myaddr() << dendl;
1018 } else if (r > 0) {
1019 state = STATE_WAIT_SEND;
1020 state_after_send = STATE_CONNECTING_SEND_CONNECT_MSG;
1021 ldout(async_msgr->cct, 10) << __func__ << " connect send my addr done: "
1022 << async_msgr->get_myaddr() << dendl;
1023 } else {
1024 ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, "
1025 << cpp_strerror(r) << dendl;
1026 goto fail;
1027 }
1028
1029 break;
1030 }
1031
1032 case STATE_CONNECTING_SEND_CONNECT_MSG:
1033 {
1034 if (!authorizer) {
1035 authorizer = async_msgr->get_authorizer(peer_type, false);
1036 }
1037 bufferlist bl;
1038
1039 connect_msg.features = policy.features_supported;
1040 connect_msg.host_type = async_msgr->get_myinst().name.type();
1041 connect_msg.global_seq = global_seq;
1042 connect_msg.connect_seq = connect_seq;
1043 connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true);
1044 connect_msg.authorizer_protocol = authorizer ? authorizer->protocol : 0;
1045 connect_msg.authorizer_len = authorizer ? authorizer->bl.length() : 0;
1046 if (authorizer)
1047 ldout(async_msgr->cct, 10) << __func__ << " connect_msg.authorizer_len="
1048 << connect_msg.authorizer_len << " protocol="
1049 << connect_msg.authorizer_protocol << dendl;
1050 connect_msg.flags = 0;
1051 if (policy.lossy)
1052 connect_msg.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
1053 bl.append((char*)&connect_msg, sizeof(connect_msg));
1054 if (authorizer) {
1055 bl.append(authorizer->bl.c_str(), authorizer->bl.length());
1056 }
1057 ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq="
1058 << connect_seq << " proto=" << connect_msg.protocol_version << dendl;
1059
1060 r = try_send(bl);
1061 if (r == 0) {
1062 state = STATE_CONNECTING_WAIT_CONNECT_REPLY;
1063 ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl;
1064 } else if (r > 0) {
1065 state = STATE_WAIT_SEND;
1066 state_after_send = STATE_CONNECTING_WAIT_CONNECT_REPLY;
1067 ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl;
1068 } else {
1069 ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply "
1070 << cpp_strerror(r) << dendl;
1071 goto fail;
1072 }
1073
1074 break;
1075 }
1076
1077 case STATE_CONNECTING_WAIT_CONNECT_REPLY:
1078 {
1079 r = read_until(sizeof(connect_reply), state_buffer);
1080 if (r < 0) {
1081 ldout(async_msgr->cct, 1) << __func__ << " read connect reply failed" << dendl;
1082 goto fail;
1083 } else if (r > 0) {
1084 break;
1085 }
1086
1087 connect_reply = *((ceph_msg_connect_reply*)state_buffer);
1088
1089 ldout(async_msgr->cct, 20) << __func__ << " connect got reply tag " << (int)connect_reply.tag
1090 << " connect_seq " << connect_reply.connect_seq << " global_seq "
1091 << connect_reply.global_seq << " proto " << connect_reply.protocol_version
1092 << " flags " << (int)connect_reply.flags << " features "
1093 << connect_reply.features << dendl;
1094 state = STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH;
1095
1096 break;
1097 }
1098
1099 case STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH:
1100 {
1101 bufferlist authorizer_reply;
1102 if (connect_reply.authorizer_len) {
1103 ldout(async_msgr->cct, 10) << __func__ << " reply.authorizer_len=" << connect_reply.authorizer_len << dendl;
1104 assert(connect_reply.authorizer_len < 4096);
1105 r = read_until(connect_reply.authorizer_len, state_buffer);
1106 if (r < 0) {
1107 ldout(async_msgr->cct, 1) << __func__ << " read connect reply authorizer failed" << dendl;
1108 goto fail;
1109 } else if (r > 0) {
1110 break;
1111 }
1112
1113 authorizer_reply.append(state_buffer, connect_reply.authorizer_len);
1114
1115 if (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
1116 ldout(async_msgr->cct,10) << __func__ << " connect got auth challenge" << dendl;
1117 authorizer->add_challenge(async_msgr->cct, authorizer_reply);
1118 state = STATE_CONNECTING_SEND_CONNECT_MSG;
1119 break;
1120 }
1121
1122 auto iter = authorizer_reply.begin();
1123 if (authorizer && !authorizer->verify_reply(iter)) {
1124 ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
1125 goto fail;
1126 }
1127 }
1128 r = handle_connect_reply(connect_msg, connect_reply);
1129 if (r < 0)
1130 goto fail;
1131
1132 // state must be changed!
1133 assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH);
1134 break;
1135 }
1136
1137 case STATE_CONNECTING_WAIT_ACK_SEQ:
1138 {
1139 uint64_t newly_acked_seq = 0;
1140
1141 r = read_until(sizeof(newly_acked_seq), state_buffer);
1142 if (r < 0) {
1143 ldout(async_msgr->cct, 1) << __func__ << " read connect ack seq failed" << dendl;
1144 goto fail;
1145 } else if (r > 0) {
1146 break;
1147 }
1148
1149 newly_acked_seq = *((uint64_t*)state_buffer);
1150 ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
1151 << " vs out_seq " << out_seq << dendl;
1152 discard_requeued_up_to(newly_acked_seq);
1153 //while (newly_acked_seq > out_seq.read()) {
1154 // Message *m = _get_next_outgoing(NULL);
1155 // assert(m);
1156 // ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq()
1157 // << " " << *m << dendl;
1158 // assert(m->get_seq() <= newly_acked_seq);
1159 // m->put();
1160 // out_seq.inc();
1161 //}
1162
1163 bufferlist bl;
1164 uint64_t s = in_seq;
1165 bl.append((char*)&s, sizeof(s));
1166 r = try_send(bl);
1167 if (r == 0) {
1168 state = STATE_CONNECTING_READY;
1169 ldout(async_msgr->cct, 10) << __func__ << " send in_seq done " << dendl;
1170 } else if (r > 0) {
1171 state_after_send = STATE_CONNECTING_READY;
1172 state = STATE_WAIT_SEND;
1173 ldout(async_msgr->cct, 10) << __func__ << " continue send in_seq " << dendl;
1174 } else {
1175 goto fail;
1176 }
1177 break;
1178 }
1179
1180 case STATE_CONNECTING_READY:
1181 {
1182 // hooray!
1183 peer_global_seq = connect_reply.global_seq;
1184 policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY;
1185 state = STATE_OPEN;
1186 once_ready = true;
1187 connect_seq += 1;
1188 assert(connect_seq == connect_reply.connect_seq);
1189 backoff = utime_t();
1190 set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features);
1191 ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq
1192 << ", lossy = " << policy.lossy << ", features "
1193 << get_features() << dendl;
1194
1195 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1196 // connection. PLR
1197 if (authorizer != NULL) {
1198 session_security.reset(
1199 get_auth_session_handler(async_msgr->cct,
1200 authorizer->protocol,
1201 authorizer->session_key,
1202 get_features()));
1203 } else {
1204 // We have no authorizer, so we shouldn't be applying security to messages in this AsyncConnection. PLR
1205 session_security.reset();
1206 }
1207
1208 if (delay_state)
1209 assert(delay_state->ready());
1210 dispatch_queue->queue_connect(this);
1211 async_msgr->ms_deliver_handle_fast_connect(this);
1212
1213 // make sure no pending tick timer
1214 if (last_tick_id)
1215 center->delete_time_event(last_tick_id);
1216 last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
1217
1218 // message may in queue between last _try_send and connection ready
1219 // write event may already notify and we need to force scheduler again
1220 write_lock.lock();
1221 can_write = WriteStatus::CANWRITE;
1222 if (is_queued())
1223 center->dispatch_event_external(write_handler);
1224 write_lock.unlock();
1225 maybe_start_delay_thread();
1226 break;
1227 }
1228
1229 case STATE_ACCEPTING:
1230 {
1231 bufferlist bl;
1232 center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
1233
1234 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1235
1236 ::encode(async_msgr->get_myaddr(), bl, 0); // legacy
1237 port = async_msgr->get_myaddr().get_port();
1238 ::encode(socket_addr, bl, 0); // legacy
1239 ldout(async_msgr->cct, 1) << __func__ << " sd=" << cs.fd() << " " << socket_addr << dendl;
1240
1241 r = try_send(bl);
1242 if (r == 0) {
1243 state = STATE_ACCEPTING_WAIT_BANNER_ADDR;
1244 ldout(async_msgr->cct, 10) << __func__ << " write banner and addr done: "
1245 << get_peer_addr() << dendl;
1246 } else if (r > 0) {
1247 state = STATE_WAIT_SEND;
1248 state_after_send = STATE_ACCEPTING_WAIT_BANNER_ADDR;
1249 ldout(async_msgr->cct, 10) << __func__ << " wait for write banner and addr: "
1250 << get_peer_addr() << dendl;
1251 } else {
1252 goto fail;
1253 }
1254
1255 break;
1256 }
1257 case STATE_ACCEPTING_WAIT_BANNER_ADDR:
1258 {
1259 bufferlist addr_bl;
1260 entity_addr_t peer_addr;
1261
1262 r = read_until(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr), state_buffer);
1263 if (r < 0) {
1264 ldout(async_msgr->cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
1265 goto fail;
1266 } else if (r > 0) {
1267 break;
1268 }
1269
1270 if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
1271 ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer
1272 << "' (should be '" << CEPH_BANNER << "')" << dendl;
1273 goto fail;
1274 }
1275
1276 addr_bl.append(state_buffer+strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
1277 try {
1278 bufferlist::iterator ti = addr_bl.begin();
1279 ::decode(peer_addr, ti);
1280 } catch (const buffer::error& e) {
1281 lderr(async_msgr->cct) << __func__ << " decode peer_addr failed " << dendl;
1282 goto fail;
1283 }
1284
1285 ldout(async_msgr->cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
1286 if (peer_addr.is_blank_ip()) {
1287 // peer apparently doesn't know what ip they have; figure it out for them.
1288 int port = peer_addr.get_port();
1289 peer_addr.u = socket_addr.u;
1290 peer_addr.set_port(port);
1291 ldout(async_msgr->cct, 0) << __func__ << " accept peer addr is really " << peer_addr
1292 << " (socket is " << socket_addr << ")" << dendl;
1293 }
1294 set_peer_addr(peer_addr); // so that connection_state gets set up
1295 state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
1296 break;
1297 }
1298
1299 case STATE_ACCEPTING_WAIT_CONNECT_MSG:
1300 {
1301 r = read_until(sizeof(connect_msg), state_buffer);
1302 if (r < 0) {
1303 ldout(async_msgr->cct, 1) << __func__ << " read connect msg failed" << dendl;
1304 goto fail;
1305 } else if (r > 0) {
1306 break;
1307 }
1308
1309 connect_msg = *((ceph_msg_connect*)state_buffer);
1310 state = STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH;
1311 break;
1312 }
1313
1314 case STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH:
1315 {
1316 bufferlist authorizer_reply;
1317
1318 if (connect_msg.authorizer_len) {
1319 if (!authorizer_buf.length())
1320 authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));
1321
1322 r = read_until(connect_msg.authorizer_len, authorizer_buf.c_str());
1323 if (r < 0) {
1324 ldout(async_msgr->cct, 1) << __func__ << " read connect authorizer failed" << dendl;
1325 goto fail;
1326 } else if (r > 0) {
1327 break;
1328 }
1329 }
1330
1331 ldout(async_msgr->cct, 20) << __func__ << " accept got peer connect_seq "
1332 << connect_msg.connect_seq << " global_seq "
1333 << connect_msg.global_seq << dendl;
1334 set_peer_type(connect_msg.host_type);
1335 policy = async_msgr->get_policy(connect_msg.host_type);
1336 ldout(async_msgr->cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type
1337 << ", policy.lossy=" << policy.lossy << " policy.server="
1338 << policy.server << " policy.standby=" << policy.standby
1339 << " policy.resetcheck=" << policy.resetcheck << dendl;
1340
1341 r = handle_connect_msg(connect_msg, authorizer_buf, authorizer_reply);
1342 if (r < 0)
1343 goto fail;
1344
1345 // state is changed by "handle_connect_msg"
1346 assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH);
1347 break;
1348 }
1349
1350 case STATE_ACCEPTING_WAIT_SEQ:
1351 {
1352 uint64_t newly_acked_seq;
1353 r = read_until(sizeof(newly_acked_seq), state_buffer);
1354 if (r < 0) {
1355 ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl;
1356 goto fail_registered;
1357 } else if (r > 0) {
1358 break;
1359 }
1360
1361 newly_acked_seq = *((uint64_t*)state_buffer);
1362 ldout(async_msgr->cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq << dendl;
1363 discard_requeued_up_to(newly_acked_seq);
1364 state = STATE_ACCEPTING_READY;
1365 break;
1366 }
1367
1368 case STATE_ACCEPTING_READY:
1369 {
1370 ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
1371 state = STATE_OPEN;
1372 memset(&connect_msg, 0, sizeof(connect_msg));
1373
1374 if (delay_state)
1375 assert(delay_state->ready());
1376 // make sure no pending tick timer
1377 if (last_tick_id)
1378 center->delete_time_event(last_tick_id);
1379 last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
1380
1381 write_lock.lock();
1382 can_write = WriteStatus::CANWRITE;
1383 if (is_queued())
1384 center->dispatch_event_external(write_handler);
1385 write_lock.unlock();
1386 maybe_start_delay_thread();
1387 break;
1388 }
1389
1390 default:
1391 {
1392 lderr(async_msgr->cct) << __func__ << " bad state: " << state << dendl;
1393 ceph_abort();
1394 }
1395 }
1396
1397 return 0;
1398
1399 fail_registered:
1400 ldout(async_msgr->cct, 10) << "accept fault after register" << dendl;
1401 inject_delay();
1402
1403 fail:
1404 return -1;
1405 }
1406
1407 int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &reply)
1408 {
1409 uint64_t feat_missing;
1410 if (reply.tag == CEPH_MSGR_TAG_FEATURES) {
1411 ldout(async_msgr->cct, 0) << __func__ << " connect protocol feature mismatch, my "
1412 << std::hex << connect.features << " < peer "
1413 << reply.features << " missing "
1414 << (reply.features & ~policy.features_supported)
1415 << std::dec << dendl;
1416 goto fail;
1417 }
1418
1419 if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1420 ldout(async_msgr->cct, 0) << __func__ << " connect protocol version mismatch, my "
1421 << connect.protocol_version << " != " << reply.protocol_version
1422 << dendl;
1423 goto fail;
1424 }
1425
1426 if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1427 ldout(async_msgr->cct,0) << __func__ << " connect got BADAUTHORIZER" << dendl;
1428 goto fail;
1429 }
1430 if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1431 ldout(async_msgr->cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
1432 was_session_reset();
1433 // see was_session_reset
1434 outcoming_bl.clear();
1435 state = STATE_CONNECTING_SEND_CONNECT_MSG;
1436 }
1437 if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1438 global_seq = async_msgr->get_global_seq(reply.global_seq);
1439 ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_GLOBAL "
1440 << reply.global_seq << " chose new "
1441 << global_seq << dendl;
1442 state = STATE_CONNECTING_SEND_CONNECT_MSG;
1443 }
1444 if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1445 assert(reply.connect_seq > connect_seq);
1446 ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_SESSION "
1447 << connect_seq << " -> "
1448 << reply.connect_seq << dendl;
1449 connect_seq = reply.connect_seq;
1450 state = STATE_CONNECTING_SEND_CONNECT_MSG;
1451 }
1452 if (reply.tag == CEPH_MSGR_TAG_WAIT) {
1453 ldout(async_msgr->cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
1454 state = STATE_WAIT;
1455 }
1456
1457 feat_missing = policy.features_required & ~(uint64_t)connect_reply.features;
1458 if (feat_missing) {
1459 ldout(async_msgr->cct, 1) << __func__ << " missing required features " << std::hex
1460 << feat_missing << std::dec << dendl;
1461 goto fail;
1462 }
1463
1464 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1465 ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
1466 state = STATE_CONNECTING_WAIT_ACK_SEQ;
1467 }
1468 if (reply.tag == CEPH_MSGR_TAG_READY) {
1469 ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
1470 state = STATE_CONNECTING_READY;
1471 }
1472
1473 return 0;
1474
1475 fail:
1476 return -1;
1477 }
1478
1479 ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
1480 bufferlist &authorizer_reply)
1481 {
1482 ssize_t r = 0;
1483 ceph_msg_connect_reply reply;
1484 bufferlist reply_bl;
1485
1486 memset(&reply, 0, sizeof(reply));
1487 reply.protocol_version = async_msgr->get_proto_version(peer_type, false);
1488
1489 // mismatch?
1490 ldout(async_msgr->cct, 10) << __func__ << " accept my proto " << reply.protocol_version
1491 << ", their proto " << connect.protocol_version << dendl;
1492 if (connect.protocol_version != reply.protocol_version) {
1493 return _reply_accept(CEPH_MSGR_TAG_BADPROTOVER, connect, reply, authorizer_reply);
1494 }
1495 // require signatures for cephx?
1496 if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
1497 if (peer_type == CEPH_ENTITY_TYPE_OSD ||
1498 peer_type == CEPH_ENTITY_TYPE_MDS ||
1499 peer_type == CEPH_ENTITY_TYPE_MGR) {
1500 if (async_msgr->cct->_conf->cephx_require_signatures ||
1501 async_msgr->cct->_conf->cephx_cluster_require_signatures) {
1502 ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
1503 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1504 }
1505 if (async_msgr->cct->_conf->cephx_require_version >= 2 ||
1506 async_msgr->cct->_conf->cephx_cluster_require_version >= 2) {
1507 ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring cephx v2 feature bit for cluster" << dendl;
1508 policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
1509 }
1510 } else {
1511 if (async_msgr->cct->_conf->cephx_require_signatures ||
1512 async_msgr->cct->_conf->cephx_service_require_signatures) {
1513 ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for service" << dendl;
1514 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1515 }
1516 if (async_msgr->cct->_conf->cephx_require_version >= 2 ||
1517 async_msgr->cct->_conf->cephx_service_require_version >= 2) {
1518 ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring cephx v2 feature bit for service" << dendl;
1519 policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
1520 }
1521 }
1522 }
1523
1524 uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features;
1525 if (feat_missing) {
1526 ldout(async_msgr->cct, 1) << __func__ << " peer missing required features "
1527 << std::hex << feat_missing << std::dec << dendl;
1528 return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply);
1529 }
1530
1531 lock.unlock();
1532
1533 bool authorizer_valid;
1534 bool need_challenge = HAVE_FEATURE(connect.features, CEPHX_V2);
1535 bool had_challenge = (bool)authorizer_challenge;
1536 if (!async_msgr->verify_authorizer(
1537 this, peer_type, connect.authorizer_protocol, authorizer_bl,
1538 authorizer_reply, authorizer_valid, session_key,
1539 need_challenge ? &authorizer_challenge : nullptr) ||
1540 !authorizer_valid) {
1541 lock.lock();
1542 if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
1543 ldout(async_msgr->cct, 1) << __func__
1544 << " state changed while verify_authorizer,"
1545 << " it must be mark_down"
1546 << dendl;
1547 ceph_assert(state == STATE_CLOSED);
1548 return -1;
1549 }
1550 char tag;
1551 if (need_challenge && !had_challenge && authorizer_challenge) {
1552 ldout(async_msgr->cct,10) << __func__ << ": challenging authorizer"
1553 << dendl;
1554 assert(authorizer_reply.length());
1555 tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
1556 } else {
1557 ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
1558 tag = CEPH_MSGR_TAG_BADAUTHORIZER;
1559 }
1560 session_security.reset();
1561 return _reply_accept(tag, connect, reply, authorizer_reply);
1562 }
1563
1564 // We've verified the authorizer for this AsyncConnection, so set up the session security structure. PLR
1565 ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl;
1566
1567 // existing?
1568 AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
1569
1570 inject_delay();
1571
1572 lock.lock();
1573 if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
1574 ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl;
1575 assert(state == STATE_CLOSED);
1576 goto fail;
1577 }
1578
1579 if (existing == this)
1580 existing = NULL;
1581 if (existing) {
1582 // There is no possible that existing connection will acquire this
1583 // connection's lock
1584 existing->lock.lock(); // skip lockdep check (we are locking a second AsyncConnection here)
1585
1586 if (existing->state == STATE_CLOSED) {
1587 ldout(async_msgr->cct, 1) << __func__ << " existing already closed." << dendl;
1588 existing->lock.unlock();
1589 existing = NULL;
1590 goto open;
1591 }
1592
1593 if (existing->replacing) {
1594 ldout(async_msgr->cct, 1) << __func__ << " existing racing replace happened while replacing."
1595 << " existing_state=" << get_state_name(existing->state) << dendl;
1596 reply.global_seq = existing->peer_global_seq;
1597 r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
1598 existing->lock.unlock();
1599 if (r < 0)
1600 goto fail;
1601 return 0;
1602 }
1603
1604 if (connect.global_seq < existing->peer_global_seq) {
1605 ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
1606 << ".gseq " << existing->peer_global_seq << " > "
1607 << connect.global_seq << ", RETRY_GLOBAL" << dendl;
1608 reply.global_seq = existing->peer_global_seq; // so we can send it below..
1609 existing->lock.unlock();
1610 return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
1611 } else {
1612 ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
1613 << ".gseq " << existing->peer_global_seq
1614 << " <= " << connect.global_seq << ", looks ok" << dendl;
1615 }
1616
1617 if (existing->policy.lossy) {
1618 ldout(async_msgr->cct, 0) << __func__ << " accept replacing existing (lossy) channel (new one lossy="
1619 << policy.lossy << ")" << dendl;
1620 existing->was_session_reset();
1621 goto replace;
1622 }
1623
1624 ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq
1625 << " vs existing csq=" << existing->connect_seq << " existing_state="
1626 << get_state_name(existing->state) << dendl;
1627
1628 if (connect.connect_seq == 0 && existing->connect_seq > 0) {
1629 ldout(async_msgr->cct,0) << __func__ << " accept peer reset, then tried to connect to us, replacing" << dendl;
1630 // this is a hard reset from peer
1631 is_reset_from_peer = true;
1632 if (policy.resetcheck)
1633 existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
1634 goto replace;
1635 }
1636
1637 if (connect.connect_seq < existing->connect_seq) {
1638 // old attempt, or we sent READY but they didn't get it.
1639 ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing << ".cseq "
1640 << existing->connect_seq << " > " << connect.connect_seq
1641 << ", RETRY_SESSION" << dendl;
1642 reply.connect_seq = existing->connect_seq + 1;
1643 existing->lock.unlock();
1644 return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
1645 }
1646
1647 if (connect.connect_seq == existing->connect_seq) {
1648 // if the existing connection successfully opened, and/or
1649 // subsequently went to standby, then the peer should bump
1650 // their connect_seq and retry: this is not a connection race
1651 // we need to resolve here.
1652 if (existing->state == STATE_OPEN ||
1653 existing->state == STATE_STANDBY) {
1654 ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing
1655 << ".cseq " << existing->connect_seq << " == "
1656 << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
1657 reply.connect_seq = existing->connect_seq + 1;
1658 existing->lock.unlock();
1659 return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
1660 }
1661
1662 // connection race?
1663 if (peer_addr < async_msgr->get_myaddr() || existing->policy.server) {
1664 // incoming wins
1665 ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing
1666 << ".cseq " << existing->connect_seq << " == " << connect.connect_seq
1667 << ", or we are server, replacing my attempt" << dendl;
1668 goto replace;
1669 } else {
1670 // our existing outgoing wins
1671 ldout(async_msgr->cct,10) << __func__ << " accept connection race, existing "
1672 << existing << ".cseq " << existing->connect_seq
1673 << " == " << connect.connect_seq << ", sending WAIT" << dendl;
1674 assert(peer_addr > async_msgr->get_myaddr());
1675 existing->lock.unlock();
1676 return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply);
1677 }
1678 }
1679
1680 assert(connect.connect_seq > existing->connect_seq);
1681 assert(connect.global_seq >= existing->peer_global_seq);
1682 if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other
1683 existing->connect_seq == 0) {
1684 ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
1685 << connect.connect_seq << ", " << existing << ".cseq = "
1686 << existing->connect_seq << "), sending RESETSESSION" << dendl;
1687 existing->lock.unlock();
1688 return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
1689 }
1690
1691 // reconnect
1692 ldout(async_msgr->cct, 10) << __func__ << " accept peer sent cseq " << connect.connect_seq
1693 << " > " << existing->connect_seq << dendl;
1694 goto replace;
1695 } // existing
1696 else if (!replacing && connect.connect_seq > 0) {
1697 // we reset, and they are opening a new session
1698 ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
1699 << connect.connect_seq << "), sending RESETSESSION" << dendl;
1700 return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
1701 } else {
1702 // new session
1703 ldout(async_msgr->cct, 10) << __func__ << " accept new session" << dendl;
1704 existing = NULL;
1705 goto open;
1706 }
1707 ceph_abort();
1708
1709 replace:
1710 ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
1711
1712 inject_delay();
1713 if (existing->policy.lossy) {
1714 // disconnect from the Connection
1715 ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl;
1716 existing->_stop();
1717 existing->dispatch_queue->queue_reset(existing.get());
1718 } else {
1719 assert(can_write == WriteStatus::NOWRITE);
1720 existing->write_lock.lock();
1721
1722 // reset the in_seq if this is a hard reset from peer,
1723 // otherwise we respect our original connection's value
1724 if (is_reset_from_peer) {
1725 existing->is_reset_from_peer = true;
1726 }
1727
1728 center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
1729
1730 if (existing->delay_state) {
1731 existing->delay_state->flush();
1732 assert(!delay_state);
1733 }
1734 existing->reset_recv_state();
1735
1736 auto temp_cs = std::move(cs);
1737 EventCenter *new_center = center;
1738 Worker *new_worker = worker;
1739 // avoid _stop shutdown replacing socket
1740 // queue a reset on the new connection, which we're dumping for the old
1741 _stop();
1742
1743 dispatch_queue->queue_reset(this);
1744 ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
1745 existing->can_write = WriteStatus::REPLACING;
1746 existing->replacing = true;
1747 existing->state_offset = 0;
1748 // avoid previous thread modify event
1749 existing->state = STATE_NONE;
1750 // Discard existing prefetch buffer in `recv_buf`
1751 existing->recv_start = existing->recv_end = 0;
1752 // there shouldn't exist any buffer
1753 assert(recv_start == recv_end);
1754
1755 existing->authorizer_challenge.reset();
1756
1757 auto deactivate_existing = std::bind(
1758 [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable {
1759 // we need to delete time event in original thread
1760 {
1761 std::lock_guard<std::mutex> l(existing->lock);
1762 existing->write_lock.lock();
1763 existing->requeue_sent();
1764 existing->outcoming_bl.clear();
1765 existing->open_write = false;
1766 existing->write_lock.unlock();
1767 if (existing->state == STATE_NONE) {
1768 existing->shutdown_socket();
1769 existing->cs = std::move(cs);
1770 existing->worker->references--;
1771 new_worker->references++;
1772 existing->logger = new_worker->get_perf_counter();
1773 existing->worker = new_worker;
1774 existing->center = new_center;
1775 if (existing->delay_state)
1776 existing->delay_state->set_center(new_center);
1777 } else if (existing->state == STATE_CLOSED) {
1778 auto back_to_close = std::bind(
1779 [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
1780 new_center->submit_to(
1781 new_center->get_id(), std::move(back_to_close), true);
1782 return ;
1783 } else {
1784 ceph_abort();
1785 }
1786 }
1787
1788 // Before changing existing->center, it may already exists some events in existing->center's queue.
1789 // Then if we mark down `existing`, it will execute in another thread and clean up connection.
1790 // Previous event will result in segment fault
1791 auto transfer_existing = [existing, connect, reply, authorizer_reply]() mutable {
1792 std::lock_guard<std::mutex> l(existing->lock);
1793 if (existing->state == STATE_CLOSED)
1794 return ;
1795 assert(existing->state == STATE_NONE);
1796
1797 existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
1798 existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler);
1799 reply.global_seq = existing->peer_global_seq;
1800 if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
1801 // handle error
1802 existing->fault();
1803 }
1804 };
1805 if (existing->center->in_thread())
1806 transfer_existing();
1807 else
1808 existing->center->submit_to(
1809 existing->center->get_id(), std::move(transfer_existing), true);
1810 }, std::move(temp_cs));
1811
1812 existing->center->submit_to(
1813 existing->center->get_id(), std::move(deactivate_existing), true);
1814 existing->write_lock.unlock();
1815 existing->lock.unlock();
1816 return 0;
1817 }
1818 existing->lock.unlock();
1819
1820 open:
1821 connect_seq = connect.connect_seq + 1;
1822 peer_global_seq = connect.global_seq;
1823 ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
1824 << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl;
1825
1826 int next_state;
1827
1828 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
1829 if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
1830 reply.tag = CEPH_MSGR_TAG_SEQ;
1831 next_state = STATE_ACCEPTING_WAIT_SEQ;
1832 } else {
1833 reply.tag = CEPH_MSGR_TAG_READY;
1834 next_state = STATE_ACCEPTING_READY;
1835 discard_requeued_up_to(0);
1836 is_reset_from_peer = false;
1837 in_seq = 0;
1838 }
1839
1840 // send READY reply
1841 reply.features = policy.features_supported;
1842 reply.global_seq = async_msgr->get_global_seq();
1843 reply.connect_seq = connect_seq;
1844 reply.flags = 0;
1845 reply.authorizer_len = authorizer_reply.length();
1846 if (policy.lossy)
1847 reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
1848
1849 set_features((uint64_t)reply.features & (uint64_t)connect.features);
1850 ldout(async_msgr->cct, 10) << __func__ << " accept features " << get_features() << dendl;
1851
1852 session_security.reset(
1853 get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol,
1854 session_key, get_features()));
1855
1856 reply_bl.append((char*)&reply, sizeof(reply));
1857
1858 if (reply.authorizer_len)
1859 reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
1860
1861 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1862 uint64_t s = in_seq;
1863 reply_bl.append((char*)&s, sizeof(s));
1864 }
1865
1866 lock.unlock();
1867 // Because "replacing" will prevent other connections preempt this addr,
1868 // it's safe that here we don't acquire Connection's lock
1869 r = async_msgr->accept_conn(this);
1870
1871 inject_delay();
1872
1873 lock.lock();
1874 replacing = false;
1875 if (r < 0) {
1876 ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
1877 << " just fail later one(this)" << dendl;
1878 goto fail_registered;
1879 }
1880 if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
1881 ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl;
1882 assert(state == STATE_CLOSED || state == STATE_NONE);
1883 async_msgr->unregister_conn(this);
1884 goto fail_registered;
1885 }
1886
1887 r = try_send(reply_bl);
1888 if (r < 0)
1889 goto fail_registered;
1890
1891 // notify
1892 dispatch_queue->queue_accept(this);
1893 async_msgr->ms_deliver_handle_fast_accept(this);
1894 once_ready = true;
1895
1896 if (r == 0) {
1897 state = next_state;
1898 ldout(async_msgr->cct, 2) << __func__ << " accept write reply msg done" << dendl;
1899 } else {
1900 state = STATE_WAIT_SEND;
1901 state_after_send = next_state;
1902 }
1903
1904 return 0;
1905
1906 fail_registered:
1907 ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl;
1908 inject_delay();
1909
1910 fail:
1911 ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl;
1912 return -1;
1913 }
1914
// Kick off (or restart) the client-side connect state machine. The actual
// socket work is performed on this connection's event center thread, driven
// by read_handler/process().
void AsyncConnection::_connect()
{
  ldout(async_msgr->cct, 10) << __func__ << " csq=" << connect_seq << dendl;

  state = STATE_CONNECTING;
  // rescheduler connection in order to avoid lock dep
  // may called by external thread(send_message)
  center->dispatch_event_external(read_handler);
}
1924
// Adopt a freshly accepted server-side socket and start the accepting state
// machine; the handshake itself runs later on the event center thread.
void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
{
  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl;
  assert(socket.fd() >= 0);

  std::lock_guard<std::mutex> l(lock);
  // Take ownership of the socket and remember the peer's socket-level address.
  cs = std::move(socket);
  socket_addr = addr;
  state = STATE_ACCEPTING;
  // rescheduler connection in order to avoid lock dep
  center->dispatch_event_external(read_handler);
}
1937
// Queue a message for delivery to the peer, or deliver it locally when this
// is a loopback connection. Ownership of *m passes to the connection (it is
// put() on drop paths). Always returns 0.
int AsyncConnection::send_message(Message *m)
{
  FUNCTRACE();
  lgeneric_subdout(async_msgr->cct, ms,
                   1) << "-- " << async_msgr->get_myaddr() << " --> "
                      << get_peer_addr() << " -- "
                      << *m << " -- " << m << " con "
                      << m->get_connection().get()
                      << dendl;

  // optimistic think it's ok to encode(actually may broken now)
  if (!m->get_priority())
    m->set_priority(async_msgr->get_default_send_priority());

  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);

  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);

  if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
    // Short-circuit: hand the message straight to the local dispatch queue
    // without touching the wire path.
    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
    std::lock_guard<std::mutex> l(write_lock);
    if (can_write != WriteStatus::CLOSED) {
      dispatch_queue->local_delivery(m, m->get_priority());
    } else {
      ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                 << " Drop message " << m << dendl;
      m->put();
    }
    return 0;
  }

  last_active = ceph::coarse_mono_clock::now();
  // we don't want to consider local message here, it's too lightweight which
  // may disturb users
  logger->inc(l_msgr_send_messages);

  bufferlist bl;
  uint64_t f = get_features();

  // TODO: Currently not all messages supports reencode like MOSDMap, so here
  // only let fast dispatch support messages prepare message
  bool can_fast_prepare = async_msgr->ms_can_fast_dispatch(m);
  if (can_fast_prepare)
    prepare_send_message(f, m, bl);

  std::lock_guard<std::mutex> l(write_lock);
  // "features" changes will change the payload encoding
  if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) {
    // ensure the correctness of message encoding: the speculative encode
    // above used a feature set that is no longer valid, so discard it and
    // let the write path re-encode under the current features.
    bl.clear();
    m->get_payload().clear();
    ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous "
                              << f << " != " << get_features() << dendl;
  }
  if (can_write == WriteStatus::CLOSED) {
    ldout(async_msgr->cct, 10) << __func__ << " connection closed."
                               << " Drop message " << m << dendl;
    m->put();
  } else {
    m->trace.event("async enqueueing message");
    // Queue (encoded-bl, message) by priority; handle_write drains out_q.
    out_q[m->get_priority()].emplace_back(std::move(bl), m);
    ldout(async_msgr->cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl;
    // While REPLACING, the replacement path owns the write handler scheduling.
    if (can_write != WriteStatus::REPLACING)
      center->dispatch_event_external(write_handler);
  }
  return 0;
}
2009
2010 void AsyncConnection::requeue_sent()
2011 {
2012 if (sent.empty())
2013 return;
2014
2015 list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
2016 while (!sent.empty()) {
2017 Message* m = sent.back();
2018 sent.pop_back();
2019 ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
2020 << " (" << m->get_seq() << ")" << dendl;
2021 rq.push_front(make_pair(bufferlist(), m));
2022 out_seq--;
2023 }
2024 }
2025
2026 void AsyncConnection::discard_requeued_up_to(uint64_t seq)
2027 {
2028 ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl;
2029 std::lock_guard<std::mutex> l(write_lock);
2030 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
2031 return;
2032 list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
2033 while (!rq.empty()) {
2034 pair<bufferlist, Message*> p = rq.front();
2035 if (p.second->get_seq() == 0 || p.second->get_seq() > seq)
2036 break;
2037 ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq " << p.second->get_seq()
2038 << " <= " << seq << ", discarding" << dendl;
2039 p.second->put();
2040 rq.pop_front();
2041 out_seq++;
2042 }
2043 if (rq.empty())
2044 out_q.erase(CEPH_MSG_PRIO_HIGHEST);
2045 }
2046
2047 /*
2048 * Tears down the AsyncConnection's message queues, and removes them from the DispatchQueue
2049 * Must hold write_lock prior to calling.
2050 */
2051 void AsyncConnection::discard_out_queue()
2052 {
2053 ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
2054
2055 for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
2056 ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
2057 (*p)->put();
2058 }
2059 sent.clear();
2060 for (map<int, list<pair<bufferlist, Message*> > >::iterator p = out_q.begin(); p != out_q.end(); ++p)
2061 for (list<pair<bufferlist, Message*> >::iterator r = p->second.begin(); r != p->second.end(); ++r) {
2062 ldout(async_msgr->cct, 20) << __func__ << " discard " << r->second << dendl;
2063 r->second->put();
2064 }
2065 out_q.clear();
2066 }
2067
2068 int AsyncConnection::randomize_out_seq()
2069 {
2070 if (get_features() & CEPH_FEATURE_MSG_AUTH) {
2071 // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error
2072 // here. We'll check it on the call. PLR
2073 uint64_t rand_seq;
2074 int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq));
2075 rand_seq &= SEQ_MASK;
2076 lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
2077 out_seq = rand_seq;
2078 return seq_error;
2079 } else {
2080 // previously, seq #'s always started at 0.
2081 out_seq = 0;
2082 return 0;
2083 }
2084 }
2085
// Handle a connection error. Depending on policy and current state this
// either tears the connection down (lossy), parks it in standby, or
// schedules a reconnect with exponential backoff. Caller holds `lock`
// (see handle_write/tick/mark_down call sites).
void AsyncConnection::fault()
{
  if (state == STATE_CLOSED || state == STATE_NONE) {
    ldout(async_msgr->cct, 10) << __func__ << " connection is already closed" << dendl;
    return ;
  }

  // Lossy connections (outside the initial connecting window) are not worth
  // recovering: destroy and report the reset upward.
  if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) {
    ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl;
    _stop();
    dispatch_queue->queue_reset(this);
    return ;
  }

  write_lock.lock();
  can_write = WriteStatus::NOWRITE;
  shutdown_socket();
  open_write = false;

  // queue delayed items immediately
  if (delay_state)
    delay_state->flush();
  // requeue sent items
  requeue_sent();
  recv_start = recv_end = 0;
  state_offset = 0;
  replacing = false;
  is_reset_from_peer = false;
  outcoming_bl.clear();
  // A half-accepted connection (never reached ready) with nothing queued is
  // not worth retrying: just close it and report the reset.
  if (!once_ready && !is_queued() &&
      state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
    ldout(async_msgr->cct, 10) << __func__ << " with nothing to send and in the half "
                              << " accept state just closed" << dendl;
    write_lock.unlock();
    _stop();
    dispatch_queue->queue_reset(this);
    return ;
  }
  // Release any throttle budget held by a partially-read message.
  reset_recv_state();
  if (policy.standby && !is_queued() && state != STATE_WAIT) {
    ldout(async_msgr->cct, 10) << __func__ << " with nothing to send, going to standby" << dendl;
    state = STATE_STANDBY;
    write_lock.unlock();
    return;
  }

  write_lock.unlock();
  if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY) &&
      state != STATE_WAIT) { // STATE_WAIT is coming from STATE_CONNECTING_*
    // policy maybe empty when state is in accept
    if (policy.server) {
      ldout(async_msgr->cct, 0) << __func__ << " server, going to standby" << dendl;
      state = STATE_STANDBY;
    } else {
      ldout(async_msgr->cct, 0) << __func__ << " initiating reconnect" << dendl;
      connect_seq++;
      state = STATE_CONNECTING;
    }
    // Immediate retry: clear backoff and wake the state machine now.
    backoff = utime_t();
    center->dispatch_event_external(read_handler);
  } else {
    // Failed while connecting (or in WAIT): retry later, doubling the
    // backoff each time up to ms_max_backoff.
    if (state == STATE_WAIT) {
      backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
    } else if (backoff == utime_t()) {
      backoff.set_from_double(async_msgr->cct->_conf->ms_initial_backoff);
    } else {
      backoff += backoff;
      if (backoff > async_msgr->cct->_conf->ms_max_backoff)
        backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
    }

    state = STATE_CONNECTING;
    ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
    // woke up again;
    register_time_events.insert(center->create_time_event(
            backoff.to_nsec()/1000, wakeup_handler));
  }
}
2164
// Reset per-session state after the peer reported a session reset: discard
// delayed/queued messages, notify dispatchers of the remote reset, and
// restart sequence numbering.
void AsyncConnection::was_session_reset()
{
  ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
  std::lock_guard<std::mutex> l(write_lock);
  if (delay_state)
    delay_state->discard();
  dispatch_queue->discard_queue(conn_id);
  discard_out_queue();
  // note: we need to clear outcoming_bl here, but was_session_reset may be
  // called by other thread, so let caller clear this itself!
  // outcoming_bl.clear();

  dispatch_queue->queue_remote_reset(this);

  // randomize_out_seq() returns non-zero only when it could not obtain
  // random bytes; out_seq is still usable, so just log it.
  if (randomize_out_seq()) {
    ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
  }

  in_seq = 0;
  connect_seq = 0;
  // it's safe to directly set 0, double locked
  ack_left = 0;
  once_ready = false;
  can_write = WriteStatus::NOWRITE;
}
2190
// Fully tear the connection down: flush delayed delivery, drop all queued
// messages, unregister from the messenger, and mark the connection CLOSED.
// Idempotent once state is STATE_CLOSED. Caller must hold `lock` (see
// mark_down()).
void AsyncConnection::_stop()
{
  if (state == STATE_CLOSED)
    return ;

  if (delay_state)
    delay_state->flush();

  ldout(async_msgr->cct, 2) << __func__ << dendl;
  std::lock_guard<std::mutex> l(write_lock);

  // Give back any throttle budget held by a partially-read message.
  reset_recv_state();
  dispatch_queue->discard_queue(conn_id);
  discard_out_queue();
  // Remove from the messenger's connection tracking and return the worker.
  async_msgr->unregister_conn(this);
  worker->release_worker();

  state = STATE_CLOSED;
  open_write = false;
  can_write = WriteStatus::CLOSED;
  state_offset = 0;
  // Make sure in-queue events will been processed
  center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
}
2215
2216 void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
2217 {
2218 ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl;
2219
2220 // associate message with Connection (for benefit of encode_payload)
2221 if (m->empty_payload())
2222 ldout(async_msgr->cct, 20) << __func__ << " encoding features "
2223 << features << " " << m << " " << *m << dendl;
2224 else
2225 ldout(async_msgr->cct, 20) << __func__ << " half-reencoding features "
2226 << features << " " << m << " " << *m << dendl;
2227
2228 // encode and copy out of *m
2229 m->encode(features, msgr->crcflags);
2230
2231 bl.append(m->get_payload());
2232 bl.append(m->get_middle());
2233 bl.append(m->get_data());
2234 }
2235
// Serialize one message (tag + header + body + footer) into outcoming_bl and
// attempt to flush it. Returns <0 on send error, 0 when fully flushed, >0
// when data remains buffered. Must run on the event center thread. Consumes
// the queue's reference on *m (a copy was retained on `sent` by the caller
// when !policy.lossy).
ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
{
  FUNCTRACE();
  assert(center->in_thread());
  // Assign the next outgoing sequence number.
  m->set_seq(++out_seq);

  if (msgr->crcflags & MSG_CRC_HEADER)
    m->calc_header_crc();

  ceph_msg_header& header = m->get_header();
  ceph_msg_footer& footer = m->get_footer();

  // TODO: let sign_message could be reentry?
  // Now that we have all the crcs calculated, handle the
  // digital signature for the message, if the AsyncConnection has session
  // security set up.  Some session security options do not
  // actually calculate and check the signature, but they should
  // handle the calls to sign_message and check_signature.  PLR
  if (session_security.get() == NULL) {
    ldout(async_msgr->cct, 20) << __func__ << " no session security" << dendl;
  } else {
    if (session_security->sign_message(m)) {
      ldout(async_msgr->cct, 20) << __func__ << " failed to sign m="
                                 << m << "): sig = " << footer.sig << dendl;
    } else {
      ldout(async_msgr->cct, 20) << __func__ << " signed m=" << m
                                 << "): sig = " << footer.sig << dendl;
    }
  }

  // Remember how much was already pending so the byte counter below can
  // account only for what this message added.
  unsigned original_bl_len = outcoming_bl.length();

  outcoming_bl.append(CEPH_MSGR_TAG_MSG);

  if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
    outcoming_bl.append((char*)&header, sizeof(header));
  } else {
    // Legacy peer: rebuild the old-format header, which carries the source
    // name/addr inline, and recompute its crc over everything but the crc
    // field itself.
    ceph_msg_header_old oldheader;
    memcpy(&oldheader, &header, sizeof(header));
    oldheader.src.name = header.src;
    oldheader.src.addr = get_peer_addr();
    oldheader.orig_src = oldheader.src;
    oldheader.reserved = header.reserved;
    oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
                                sizeof(oldheader) - sizeof(oldheader.crc));
    outcoming_bl.append((char*)&oldheader, sizeof(oldheader));
  }

  ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type
                             << " src " << entity_name_t(header.src)
                             << " off " << header.data_off << dendl;

  // Small messages fragmented across several buffers are copied into one
  // contiguous chunk; larger ones have their buffers claimed without copying.
  if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
    for (const auto &pb : bl.buffers()) {
      outcoming_bl.append((char*)pb.c_str(), pb.length());
    }
  } else {
    outcoming_bl.claim_append(bl);
  }

  // send footer; if receiver doesn't support signatures, use the old footer format
  ceph_msg_footer_old old_footer;
  if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
    outcoming_bl.append((char*)&footer, sizeof(footer));
  } else {
    if (msgr->crcflags & MSG_CRC_HEADER) {
      old_footer.front_crc = footer.front_crc;
      old_footer.middle_crc = footer.middle_crc;
      old_footer.data_crc = footer.data_crc;
    } else {
      old_footer.front_crc = old_footer.middle_crc = 0;
    }
    old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
    old_footer.flags = footer.flags;
    outcoming_bl.append((char*)&old_footer, sizeof(old_footer));
  }

  m->trace.event("async writing message");
  ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
                             << " " << m << dendl;
  ssize_t total_send_size = outcoming_bl.length();
  ssize_t rc = _try_send(more);
  if (rc < 0) {
    ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
                              << cpp_strerror(rc) << dendl;
  } else if (rc == 0) {
    // Fully flushed: count all newly appended bytes.
    logger->inc(l_msgr_send_bytes, total_send_size - original_bl_len);
    ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
  } else {
    // Partial send: count only what actually left the buffer.
    logger->inc(l_msgr_send_bytes, total_send_size - outcoming_bl.length());
    ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl;
  }
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
  m->put();

  return rc;
}
2338
// Reset the receive side of the state machine, releasing any resources the
// interrupted read path still holds. Which releases apply depends on how far
// through message reception `state` had progressed.
void AsyncConnection::reset_recv_state()
{
  // clean up state internal variables and states
  // Drop the authorizer if we were mid-connect; it is rebuilt on the next
  // connect attempt.
  if (state >= STATE_CONNECTING_SEND_CONNECT_MSG &&
      state <= STATE_CONNECTING_READY) {
    delete authorizer;
    authorizer = NULL;
  }

  // Past the message-throttle stage but before dispatch: give back the one
  // message slot acquired from the policy throttler.
  if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE &&
      state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
      && policy.throttler_messages) {
    ldout(async_msgr->cct, 10) << __func__ << " releasing " << 1
                               << " message to policy throttler "
                               << policy.throttler_messages->get_current() << "/"
                               << policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->put();
  }
  // Likewise return the byte budget taken for the in-flight message.
  if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES &&
      state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
    if (policy.throttler_bytes) {
      ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
                                 << " bytes to policy throttler "
                                 << policy.throttler_bytes->get_current() << "/"
                                 << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->put(cur_msg_size);
    }
  }
  // And the dispatch-queue throttle taken just before dispatch.
  if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE &&
      state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
    ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
                               << " bytes to dispatch_queue throttler "
                               << dispatch_queue->dispatch_throttler.get_current() << "/"
                               << dispatch_queue->dispatch_throttler.get_max() << dendl;
    dispatch_queue->dispatch_throttle_release(cur_msg_size);
  }
}
2376
2377 void AsyncConnection::handle_ack(uint64_t seq)
2378 {
2379 ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
2380 // trim sent list
2381 std::lock_guard<std::mutex> l(write_lock);
2382 while (!sent.empty() && sent.front()->get_seq() <= seq) {
2383 Message* m = sent.front();
2384 sent.pop_front();
2385 ldout(async_msgr->cct, 10) << __func__ << " got ack seq "
2386 << seq << " >= " << m->get_seq() << " on "
2387 << m << " " << *m << dendl;
2388 m->put();
2389 }
2390 }
2391
// Time-event callback for delay-injection testing: pop the head of the delay
// queue (sleeping until its release time if needed) and deliver it through
// the normal fast/slow dispatch paths.
void AsyncConnection::DelayedDelivery::do_request(int id)
{
  Message *m = nullptr;
  {
    std::lock_guard<std::mutex> l(delay_lock);
    register_time_events.erase(id);
    if (stop_dispatch)
      return ;
    if (delay_queue.empty())
      return ;
    utime_t release = delay_queue.front().first;
    m = delay_queue.front().second;
    string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type;
    utime_t now = ceph_clock_now();
    // If the head message's release time is still in the future (and it
    // matches the configured type filter, if any), block for the remainder.
    // NOTE: the sleep intentionally happens with delay_lock held.
    if ((release > now &&
        (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
      utime_t t = release - now;
      t.sleep();
    }
    delay_queue.pop_front();
  }
  // Deliver outside the lock via the appropriate dispatch path.
  if (msgr->ms_can_fast_dispatch(m)) {
    dispatch_queue->fast_dispatch(m);
  } else {
    dispatch_queue->enqueue(m, m->get_priority(), conn_id);
  }
}
2419
// Deliver every delayed message immediately. The drain runs on the owning
// event-center thread; stop_dispatch blocks do_request() from racing with it
// until the flush completes.
void AsyncConnection::DelayedDelivery::flush() {
  stop_dispatch = true;
  center->submit_to(
      center->get_id(), [this] () mutable {
    std::lock_guard<std::mutex> l(delay_lock);
    while (!delay_queue.empty()) {
      Message *m = delay_queue.front().second;
      if (msgr->ms_can_fast_dispatch(m)) {
        dispatch_queue->fast_dispatch(m);
      } else {
        dispatch_queue->enqueue(m, m->get_priority(), conn_id);
      }
      delay_queue.pop_front();
    }
    // Cancel the timers that were armed for the messages just flushed.
    for (auto i : register_time_events)
      center->delete_time_event(i);
    register_time_events.clear();
    stop_dispatch = false;
  }, true);
}
2440
2441 void AsyncConnection::send_keepalive()
2442 {
2443 ldout(async_msgr->cct, 10) << __func__ << dendl;
2444 std::lock_guard<std::mutex> l(write_lock);
2445 if (can_write != WriteStatus::CLOSED) {
2446 keepalive = true;
2447 center->dispatch_event_external(write_handler);
2448 }
2449 }
2450
// Administratively close this connection. Takes `lock` because _stop()
// requires it held.
void AsyncConnection::mark_down()
{
  ldout(async_msgr->cct, 1) << __func__ << dendl;
  std::lock_guard<std::mutex> l(lock);
  _stop();
}
2457
2458 void AsyncConnection::_append_keepalive_or_ack(bool ack, utime_t *tp)
2459 {
2460 ldout(async_msgr->cct, 10) << __func__ << dendl;
2461 if (ack) {
2462 assert(tp);
2463 struct ceph_timespec ts;
2464 tp->encode_timeval(&ts);
2465 outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
2466 outcoming_bl.append((char*)&ts, sizeof(ts));
2467 } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
2468 struct ceph_timespec ts;
2469 utime_t t = ceph_clock_now();
2470 t.encode_timeval(&ts);
2471 outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
2472 outcoming_bl.append((char*)&ts, sizeof(ts));
2473 } else {
2474 outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
2475 }
2476 }
2477
// Write-event handler: drain the outgoing queue while the connection is
// writeable, piggyback pending acks, and otherwise (not yet writeable)
// either trigger a reconnect from standby or flush leftover buffered bytes.
// Any send error funnels through fault().
void AsyncConnection::handle_write()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;
  ssize_t r = 0;

  write_lock.lock();
  if (can_write == WriteStatus::CANWRITE) {
    // A pending keepalive/ack goes on the wire ahead of queued messages.
    if (keepalive) {
      _append_keepalive_or_ack();
      keepalive = false;
    }

    auto start = ceph::mono_clock::now();
    bool more;
    do {
      bufferlist data;
      Message *m = _get_next_outgoing(&data);
      if (!m)
        break;

      if (!policy.lossy) {
        // put on sent list
        sent.push_back(m);
        m->get();
      }
      more = _has_next_outgoing();
      // Drop write_lock while encoding/sending; send_message may enqueue
      // more messages concurrently.
      write_lock.unlock();

      // send_message or requeue messages may not encode message
      if (!data.length())
        prepare_send_message(get_features(), m, data);

      r = write_message(m, data, more);
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
        goto fail;
      }
      write_lock.lock();
      // r > 0 means the socket buffer is full; the rest goes out on the
      // next writable event.
      if (r > 0)
        break;
    } while (can_write == WriteStatus::CANWRITE);
    write_lock.unlock();

    // If acks are owed for received messages, append one TAG_ACK covering
    // everything up to in_seq.
    uint64_t left = ack_left;
    if (left) {
      ceph_le64 s;
      s = in_seq;
      outcoming_bl.append(CEPH_MSGR_TAG_ACK);
      outcoming_bl.append((char*)&s, sizeof(s));
      ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
      ack_left -= left;
      left = ack_left;
      r = _try_send(left);
    } else if (is_queued()) {
      r = _try_send();
    }

    logger->tinc(l_msgr_running_send_time, ceph::mono_clock::now() - start);
    if (r < 0) {
      ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
      goto fail;
    }
  } else {
    // Not writeable: re-acquire with the connection lock first so the state
    // checks below are stable.
    write_lock.unlock();
    lock.lock();
    write_lock.lock();
    if (state == STATE_STANDBY && !policy.server && is_queued()) {
      // Client side sitting in standby with queued data: reconnect.
      ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
      _connect();
    } else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
      // Socket exists and state permits: flush whatever is already buffered.
      r = _try_send();
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
        write_lock.unlock();
        fault();
        lock.unlock();
        return ;
      }
    }
    write_lock.unlock();
    lock.unlock();
  }

  return ;

 fail:
  lock.lock();
  fault();
  lock.unlock();
}
2568
2569 void AsyncConnection::wakeup_from(uint64_t id)
2570 {
2571 lock.lock();
2572 register_time_events.erase(id);
2573 lock.unlock();
2574 process();
2575 }
2576
2577 void AsyncConnection::tick(uint64_t id)
2578 {
2579 auto now = ceph::coarse_mono_clock::now();
2580 ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
2581 << " last_active" << last_active << dendl;
2582 std::lock_guard<std::mutex> l(lock);
2583 last_tick_id = 0;
2584 auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
2585 if (inactive_timeout_us < (uint64_t)idle_period) {
2586 ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
2587 << inactive_timeout_us
2588 << " us, mark self fault." << dendl;
2589 fault();
2590 } else if (is_connected()) {
2591 last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
2592 }
2593 }