]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/AsyncConnection.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / msg / async / AsyncConnection.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include <unistd.h>
18
19#include "include/Context.h"
20#include "common/errno.h"
21#include "AsyncMessenger.h"
22#include "AsyncConnection.h"
23
24#include "messages/MOSDOp.h"
25#include "messages/MOSDOpReply.h"
26#include "common/EventTrace.h"
27
28// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
29#define SEQ_MASK 0x7fffffff
30
31#define dout_subsys ceph_subsys_ms
32#undef dout_prefix
33#define dout_prefix _conn_prefix(_dout)
34ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
35 return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
36 << " :" << port
37 << " s=" << get_state_name(state)
38 << " pgs=" << peer_global_seq
39 << " cs=" << connect_seq
40 << " l=" << policy.lossy
41 << ").";
42}
43
44// Notes:
45// 1. Don't dispatch any event when closed! It may cause AsyncConnection alive even if AsyncMessenger dead
46
47const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
48const int ASYNC_COALESCE_THRESHOLD = 256;
49
50class C_time_wakeup : public EventCallback {
51 AsyncConnectionRef conn;
52
53 public:
54 explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
55 void do_request(int fd_or_id) override {
56 conn->wakeup_from(fd_or_id);
57 }
58};
59
60class C_handle_read : public EventCallback {
61 AsyncConnectionRef conn;
62
63 public:
64 explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
65 void do_request(int fd_or_id) override {
66 conn->process();
67 }
68};
69
70class C_handle_write : public EventCallback {
71 AsyncConnectionRef conn;
72
73 public:
74 explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
75 void do_request(int fd) override {
76 conn->handle_write();
77 }
78};
79
80class C_clean_handler : public EventCallback {
81 AsyncConnectionRef conn;
82 public:
83 explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
84 void do_request(int id) override {
85 conn->cleanup();
86 delete this;
87 }
88};
89
90class C_tick_wakeup : public EventCallback {
91 AsyncConnectionRef conn;
92
93 public:
94 explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
95 void do_request(int fd_or_id) override {
96 conn->tick(fd_or_id);
97 }
98};
99
100static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
101{
102 // create a buffer to read into that matches the data alignment
103 unsigned left = len;
104 if (off & ~CEPH_PAGE_MASK) {
105 // head
106 unsigned head = 0;
107 head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
108 data.push_back(buffer::create(head));
109 left -= head;
110 }
111 unsigned middle = left & CEPH_PAGE_MASK;
112 if (middle > 0) {
113 data.push_back(buffer::create_page_aligned(middle));
114 left -= middle;
115 }
116 if (left) {
117 data.push_back(buffer::create(left));
118 }
119}
120
121AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
122 Worker *w)
123 : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
124 logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
125 out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
126 dispatch_queue(q), can_write(WriteStatus::NOWRITE),
127 open_write(false), keepalive(false), recv_buf(NULL),
128 recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
129 recv_start(0), recv_end(0),
130 last_active(ceph::coarse_mono_clock::now()),
131 inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
132 got_bad_auth(false), authorizer(NULL), replacing(false),
133 is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
134 worker(w), center(&w->center)
135{
136 read_handler = new C_handle_read(this);
137 write_handler = new C_handle_write(this);
138 wakeup_handler = new C_time_wakeup(this);
139 tick_handler = new C_tick_wakeup(this);
140 memset(msgvec, 0, sizeof(msgvec));
141 // double recv_max_prefetch see "read_until"
142 recv_buf = new char[2*recv_max_prefetch];
143 state_buffer = new char[4096];
144 logger->inc(l_msgr_created_connections);
145}
146
147AsyncConnection::~AsyncConnection()
148{
149 assert(out_q.empty());
150 assert(sent.empty());
151 delete authorizer;
152 if (recv_buf)
153 delete[] recv_buf;
154 if (state_buffer)
155 delete[] state_buffer;
156 assert(!delay_state);
157}
158
159void AsyncConnection::maybe_start_delay_thread()
160{
161 if (!delay_state) {
162 auto pos = async_msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(peer_type));
163 if (pos != string::npos) {
164 ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl;
165 delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue, conn_id);
166 }
167 }
168}
169
170/* return -1 means `fd` occurs error or closed, it should be closed
171 * return 0 means EAGAIN or EINTR */
172ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
173{
174 ssize_t nread;
175 again:
176 nread = cs.read(buf, len);
177 if (nread < 0) {
178 if (nread == -EAGAIN) {
179 nread = 0;
180 } else if (nread == -EINTR) {
181 goto again;
182 } else {
183 ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
184 << " : "<< strerror(nread) << dendl;
185 return -1;
186 }
187 } else if (nread == 0) {
188 ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
189 << cs.fd() << dendl;
190 return -1;
191 }
192 return nread;
193}
194
195// return the remaining bytes, it may larger than the length of ptr
196// else return < 0 means error
197ssize_t AsyncConnection::_try_send(bool more)
198{
199 if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
200 if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
201 ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
202 cs.shutdown();
203 }
204 }
205
206 ssize_t r = cs.send(outcoming_bl, more);
207 if (r < 0) {
208 ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
209 return r;
210 }
211
212 ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
213 << " remaining bytes " << outcoming_bl.length() << dendl;
214
215 if (!open_write && is_queued()) {
216 if (center->in_thread()) {
217 center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
218 open_write = true;
219 } else {
220 center->dispatch_event_external(write_handler);
221 }
222 }
223
224 if (open_write && !is_queued()) {
225 if (center->in_thread()) {
226 center->delete_file_event(cs.fd(), EVENT_WRITABLE);
227 open_write = false;
228 } else {
229 center->dispatch_event_external(write_handler);
230 }
231 if (state_after_send != STATE_NONE)
232 center->dispatch_event_external(read_handler);
233 }
234
235 return outcoming_bl.length();
236}
237
238// Because this func will be called multi times to populate
239// the needed buffer, so the passed in bufferptr must be the same.
240// Normally, only "read_message" will pass existing bufferptr in
241//
242// And it will uses readahead method to reduce small read overhead,
243// "recv_buf" is used to store read buffer
244//
245// return the remaining bytes, 0 means this buffer is finished
246// else return < 0 means error
247ssize_t AsyncConnection::read_until(unsigned len, char *p)
248{
249 ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
250 << state_offset << dendl;
251
252 if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
253 if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
254 ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
255 cs.shutdown();
256 }
257 }
258
259 ssize_t r = 0;
260 uint64_t left = len - state_offset;
261 if (recv_end > recv_start) {
262 uint64_t to_read = MIN(recv_end - recv_start, left);
263 memcpy(p, recv_buf+recv_start, to_read);
264 recv_start += to_read;
265 left -= to_read;
266 ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer "
267 << " left is " << left << " buffer still has "
268 << recv_end - recv_start << dendl;
269 if (left == 0) {
270 return 0;
271 }
272 state_offset += to_read;
273 }
274
275 recv_end = recv_start = 0;
276 /* nothing left in the prefetch buffer */
277 if (len > recv_max_prefetch) {
278 /* this was a large read, we don't prefetch for these */
279 do {
280 r = read_bulk(p+state_offset, left);
281 ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
282 if (r < 0) {
283 ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
284 return -1;
285 } else if (r == static_cast<int>(left)) {
286 state_offset = 0;
287 return 0;
288 }
289 state_offset += r;
290 left -= r;
291 } while (r > 0);
292 } else {
293 do {
294 r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
295 ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
296 << " left is " << left << " got " << r << dendl;
297 if (r < 0) {
298 ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
299 return -1;
300 }
301 recv_end += r;
302 if (r >= static_cast<int>(left)) {
303 recv_start = len - state_offset;
304 memcpy(p+state_offset, recv_buf, recv_start);
305 state_offset = 0;
306 return 0;
307 }
308 left -= r;
309 } while (r > 0);
310 memcpy(p+state_offset, recv_buf, recv_end-recv_start);
311 state_offset += (recv_end - recv_start);
312 recv_end = recv_start = 0;
313 }
314 ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining "
315 << len - state_offset << " bytes" << dendl;
316 return len - state_offset;
317}
318
319void AsyncConnection::inject_delay() {
320 if (async_msgr->cct->_conf->ms_inject_internal_delays) {
321 ldout(async_msgr->cct, 10) << __func__ << " sleep for " <<
322 async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
323 utime_t t;
324 t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
325 t.sleep();
326 }
327}
328
329void AsyncConnection::process()
330{
331 ssize_t r = 0;
332 int prev_state = state;
333#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
334 utime_t ltt_recv_stamp = ceph_clock_now();
335#endif
336 bool need_dispatch_writer = false;
337 std::lock_guard<std::mutex> l(lock);
338 last_active = ceph::coarse_mono_clock::now();
339 do {
340 ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
341 prev_state = state;
342 switch (state) {
343 case STATE_OPEN:
344 {
345 char tag = -1;
346 r = read_until(sizeof(tag), &tag);
347 if (r < 0) {
348 ldout(async_msgr->cct, 1) << __func__ << " read tag failed" << dendl;
349 goto fail;
350 } else if (r > 0) {
351 break;
352 }
353
354 if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
355 ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
356 set_last_keepalive(ceph_clock_now());
357 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
358 state = STATE_OPEN_KEEPALIVE2;
359 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
360 state = STATE_OPEN_KEEPALIVE2_ACK;
361 } else if (tag == CEPH_MSGR_TAG_ACK) {
362 state = STATE_OPEN_TAG_ACK;
363 } else if (tag == CEPH_MSGR_TAG_MSG) {
364 state = STATE_OPEN_MESSAGE_HEADER;
365 } else if (tag == CEPH_MSGR_TAG_CLOSE) {
366 state = STATE_OPEN_TAG_CLOSE;
367 } else {
368 ldout(async_msgr->cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
369 goto fail;
370 }
371
372 break;
373 }
374
375 case STATE_OPEN_KEEPALIVE2:
376 {
377 ceph_timespec *t;
378 r = read_until(sizeof(*t), state_buffer);
379 if (r < 0) {
380 ldout(async_msgr->cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
381 goto fail;
382 } else if (r > 0) {
383 break;
384 }
385
386 ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
387 t = (ceph_timespec*)state_buffer;
388 utime_t kp_t = utime_t(*t);
389 write_lock.lock();
390 _append_keepalive_or_ack(true, &kp_t);
391 write_lock.unlock();
392 ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
393 set_last_keepalive(ceph_clock_now());
394 need_dispatch_writer = true;
395 state = STATE_OPEN;
396 break;
397 }
398
399 case STATE_OPEN_KEEPALIVE2_ACK:
400 {
401 ceph_timespec *t;
402 r = read_until(sizeof(*t), state_buffer);
403 if (r < 0) {
404 ldout(async_msgr->cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
405 goto fail;
406 } else if (r > 0) {
407 break;
408 }
409
410 t = (ceph_timespec*)state_buffer;
411 set_last_keepalive_ack(utime_t(*t));
412 ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
413 state = STATE_OPEN;
414 break;
415 }
416
417 case STATE_OPEN_TAG_ACK:
418 {
419 ceph_le64 *seq;
420 r = read_until(sizeof(*seq), state_buffer);
421 if (r < 0) {
422 ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl;
423 goto fail;
424 } else if (r > 0) {
425 break;
426 }
427
428 seq = (ceph_le64*)state_buffer;
429 ldout(async_msgr->cct, 20) << __func__ << " got ACK" << dendl;
430 handle_ack(*seq);
431 state = STATE_OPEN;
432 break;
433 }
434
435 case STATE_OPEN_MESSAGE_HEADER:
436 {
437#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
438 ltt_recv_stamp = ceph_clock_now();
439#endif
440 ldout(async_msgr->cct, 20) << __func__ << " begin MSG" << dendl;
441 ceph_msg_header header;
442 ceph_msg_header_old oldheader;
443 __u32 header_crc = 0;
444 unsigned len;
445 if (has_feature(CEPH_FEATURE_NOSRCADDR))
446 len = sizeof(header);
447 else
448 len = sizeof(oldheader);
449
450 r = read_until(len, state_buffer);
451 if (r < 0) {
452 ldout(async_msgr->cct, 1) << __func__ << " read message header failed" << dendl;
453 goto fail;
454 } else if (r > 0) {
455 break;
456 }
457
458 ldout(async_msgr->cct, 20) << __func__ << " got MSG header" << dendl;
459
460 if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
461 header = *((ceph_msg_header*)state_buffer);
462 if (msgr->crcflags & MSG_CRC_HEADER)
463 header_crc = ceph_crc32c(0, (unsigned char *)&header,
464 sizeof(header) - sizeof(header.crc));
465 } else {
466 oldheader = *((ceph_msg_header_old*)state_buffer);
467 // this is fugly
468 memcpy(&header, &oldheader, sizeof(header));
469 header.src = oldheader.src.name;
470 header.reserved = oldheader.reserved;
471 if (msgr->crcflags & MSG_CRC_HEADER) {
472 header.crc = oldheader.crc;
473 header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
474 }
475 }
476
477 ldout(async_msgr->cct, 20) << __func__ << " got envelope type=" << header.type
478 << " src " << entity_name_t(header.src)
479 << " front=" << header.front_len
480 << " data=" << header.data_len
481 << " off " << header.data_off << dendl;
482
483 // verify header crc
484 if (msgr->crcflags & MSG_CRC_HEADER && header_crc != header.crc) {
485 ldout(async_msgr->cct,0) << __func__ << " got bad header crc "
486 << header_crc << " != " << header.crc << dendl;
487 goto fail;
488 }
489
490 // Reset state
491 data_buf.clear();
492 front.clear();
493 middle.clear();
494 data.clear();
495 recv_stamp = ceph_clock_now();
496 current_header = header;
497 state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE;
498 break;
499 }
500
501 case STATE_OPEN_MESSAGE_THROTTLE_MESSAGE:
502 {
503 if (policy.throttler_messages) {
504 ldout(async_msgr->cct, 10) << __func__ << " wants " << 1 << " message from policy throttler "
505 << policy.throttler_messages->get_current() << "/"
506 << policy.throttler_messages->get_max() << dendl;
507 if (!policy.throttler_messages->get_or_fail()) {
508 ldout(async_msgr->cct, 10) << __func__ << " wants 1 message from policy throttle "
509 << policy.throttler_messages->get_current() << "/"
510 << policy.throttler_messages->get_max() << " failed, just wait." << dendl;
511 // following thread pool deal with th full message queue isn't a
512 // short time, so we can wait a ms.
513 if (register_time_events.empty())
514 register_time_events.insert(center->create_time_event(1000, wakeup_handler));
515 break;
516 }
517 }
518
519 state = STATE_OPEN_MESSAGE_THROTTLE_BYTES;
520 break;
521 }
522
523 case STATE_OPEN_MESSAGE_THROTTLE_BYTES:
524 {
525 cur_msg_size = current_header.front_len + current_header.middle_len + current_header.data_len;
526 if (cur_msg_size) {
527 if (policy.throttler_bytes) {
528 ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler "
529 << policy.throttler_bytes->get_current() << "/"
530 << policy.throttler_bytes->get_max() << dendl;
531 if (!policy.throttler_bytes->get_or_fail(cur_msg_size)) {
532 ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler "
533 << policy.throttler_bytes->get_current() << "/"
534 << policy.throttler_bytes->get_max() << " failed, just wait." << dendl;
535 // following thread pool deal with th full message queue isn't a
536 // short time, so we can wait a ms.
537 if (register_time_events.empty())
538 register_time_events.insert(center->create_time_event(1000, wakeup_handler));
539 break;
540 }
541 }
542 }
543
544 state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE;
545 break;
546 }
547
548 case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE:
549 {
550 if (cur_msg_size) {
551 if (!dispatch_queue->dispatch_throttler.get_or_fail(cur_msg_size)) {
552 ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from dispatch throttle "
553 << dispatch_queue->dispatch_throttler.get_current() << "/"
554 << dispatch_queue->dispatch_throttler.get_max() << " failed, just wait." << dendl;
555 // following thread pool deal with th full message queue isn't a
556 // short time, so we can wait a ms.
557 if (register_time_events.empty())
558 register_time_events.insert(center->create_time_event(1000, wakeup_handler));
559 break;
560 }
561 }
562
563 throttle_stamp = ceph_clock_now();
564 state = STATE_OPEN_MESSAGE_READ_FRONT;
565 break;
566 }
567
568 case STATE_OPEN_MESSAGE_READ_FRONT:
569 {
570 // read front
571 unsigned front_len = current_header.front_len;
572 if (front_len) {
573 if (!front.length())
574 front.push_back(buffer::create(front_len));
575
576 r = read_until(front_len, front.c_str());
577 if (r < 0) {
578 ldout(async_msgr->cct, 1) << __func__ << " read message front failed" << dendl;
579 goto fail;
580 } else if (r > 0) {
581 break;
582 }
583
584 ldout(async_msgr->cct, 20) << __func__ << " got front " << front.length() << dendl;
585 }
586 state = STATE_OPEN_MESSAGE_READ_MIDDLE;
587 }
588
589 case STATE_OPEN_MESSAGE_READ_MIDDLE:
590 {
591 // read middle
592 unsigned middle_len = current_header.middle_len;
593 if (middle_len) {
594 if (!middle.length())
595 middle.push_back(buffer::create(middle_len));
596
597 r = read_until(middle_len, middle.c_str());
598 if (r < 0) {
599 ldout(async_msgr->cct, 1) << __func__ << " read message middle failed" << dendl;
600 goto fail;
601 } else if (r > 0) {
602 break;
603 }
604 ldout(async_msgr->cct, 20) << __func__ << " got middle " << middle.length() << dendl;
605 }
606
607 state = STATE_OPEN_MESSAGE_READ_DATA_PREPARE;
608 }
609
610 case STATE_OPEN_MESSAGE_READ_DATA_PREPARE:
611 {
612 // read data
613 unsigned data_len = le32_to_cpu(current_header.data_len);
614 unsigned data_off = le32_to_cpu(current_header.data_off);
615 if (data_len) {
616 // get a buffer
617 map<ceph_tid_t,pair<bufferlist,int> >::iterator p = rx_buffers.find(current_header.tid);
618 if (p != rx_buffers.end()) {
619 ldout(async_msgr->cct,10) << __func__ << " seleting rx buffer v " << p->second.second
620 << " at offset " << data_off
621 << " len " << p->second.first.length() << dendl;
622 data_buf = p->second.first;
623 // make sure it's big enough
624 if (data_buf.length() < data_len)
625 data_buf.push_back(buffer::create(data_len - data_buf.length()));
626 data_blp = data_buf.begin();
627 } else {
628 ldout(async_msgr->cct,20) << __func__ << " allocating new rx buffer at offset " << data_off << dendl;
629 alloc_aligned_buffer(data_buf, data_len, data_off);
630 data_blp = data_buf.begin();
631 }
632 }
633
634 msg_left = data_len;
635 state = STATE_OPEN_MESSAGE_READ_DATA;
636 }
637
638 case STATE_OPEN_MESSAGE_READ_DATA:
639 {
640 while (msg_left > 0) {
641 bufferptr bp = data_blp.get_current_ptr();
642 unsigned read = MIN(bp.length(), msg_left);
643 r = read_until(read, bp.c_str());
644 if (r < 0) {
645 ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl;
646 goto fail;
647 } else if (r > 0) {
648 break;
649 }
650
651 data_blp.advance(read);
652 data.append(bp, 0, read);
653 msg_left -= read;
654 }
655
656 if (msg_left > 0)
657 break;
658
659 state = STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH;
660 }
661
662 case STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH:
663 {
664 ceph_msg_footer footer;
665 ceph_msg_footer_old old_footer;
666 unsigned len;
667 // footer
668 if (has_feature(CEPH_FEATURE_MSG_AUTH))
669 len = sizeof(footer);
670 else
671 len = sizeof(old_footer);
672
673 r = read_until(len, state_buffer);
674 if (r < 0) {
675 ldout(async_msgr->cct, 1) << __func__ << " read footer data error " << dendl;
676 goto fail;
677 } else if (r > 0) {
678 break;
679 }
680
681 if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
682 footer = *((ceph_msg_footer*)state_buffer);
683 } else {
684 old_footer = *((ceph_msg_footer_old*)state_buffer);
685 footer.front_crc = old_footer.front_crc;
686 footer.middle_crc = old_footer.middle_crc;
687 footer.data_crc = old_footer.data_crc;
688 footer.sig = 0;
689 footer.flags = old_footer.flags;
690 }
691 int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
692 ldout(async_msgr->cct, 10) << __func__ << " aborted = " << aborted << dendl;
693 if (aborted) {
694 ldout(async_msgr->cct, 0) << __func__ << " got " << front.length() << " + " << middle.length() << " + " << data.length()
695 << " byte message.. ABORTED" << dendl;
696 goto fail;
697 }
698
699 ldout(async_msgr->cct, 20) << __func__ << " got " << front.length() << " + " << middle.length()
700 << " + " << data.length() << " byte message" << dendl;
701 Message *message = decode_message(async_msgr->cct, async_msgr->crcflags, current_header, footer,
702 front, middle, data, this);
703 if (!message) {
704 ldout(async_msgr->cct, 1) << __func__ << " decode message failed " << dendl;
705 goto fail;
706 }
707
708 //
709 // Check the signature if one should be present. A zero return indicates success. PLR
710 //
711
712 if (session_security.get() == NULL) {
713 ldout(async_msgr->cct, 10) << __func__ << " no session security set" << dendl;
714 } else {
715 if (session_security->check_message_signature(message)) {
716 ldout(async_msgr->cct, 0) << __func__ << " Signature check failed" << dendl;
717 message->put();
718 goto fail;
719 }
720 }
721 message->set_byte_throttler(policy.throttler_bytes);
722 message->set_message_throttler(policy.throttler_messages);
723
724 // store reservation size in message, so we don't get confused
725 // by messages entering the dispatch queue through other paths.
726 message->set_dispatch_throttle_size(cur_msg_size);
727
728 message->set_recv_stamp(recv_stamp);
729 message->set_throttle_stamp(throttle_stamp);
730 message->set_recv_complete_stamp(ceph_clock_now());
731
732 // check received seq#. if it is old, drop the message.
733 // note that incoming messages may skip ahead. this is convenient for the client
734 // side queueing because messages can't be renumbered, but the (kernel) client will
735 // occasionally pull a message out of the sent queue to send elsewhere. in that case
736 // it doesn't matter if we "got" it or not.
737 uint64_t cur_seq = in_seq.read();
738 if (message->get_seq() <= cur_seq) {
739 ldout(async_msgr->cct,0) << __func__ << " got old message "
740 << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message
741 << ", discarding" << dendl;
742 message->put();
743 if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message)
744 assert(0 == "old msgs despite reconnect_seq feature");
745 break;
746 }
747 if (message->get_seq() > cur_seq + 1) {
748 ldout(async_msgr->cct, 0) << __func__ << " missed message? skipped from seq "
749 << cur_seq << " to " << message->get_seq() << dendl;
750 if (async_msgr->cct->_conf->ms_die_on_skipped_message)
751 assert(0 == "skipped incoming seq");
752 }
753
754 message->set_connection(this);
755
756#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
757 if (message->get_type() == CEPH_MSG_OSD_OP || message->get_type() == CEPH_MSG_OSD_OPREPLY) {
758 utime_t ltt_processed_stamp = ceph_clock_now();
759 double usecs_elapsed = (ltt_processed_stamp.to_nsec()-ltt_recv_stamp.to_nsec())/1000;
760 ostringstream buf;
761 if (message->get_type() == CEPH_MSG_OSD_OP)
762 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP", false);
763 else
764 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY", false);
765 }
766#endif
767
768 // note last received message.
769 in_seq.set(message->get_seq());
770 ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq "
771 << message->get_seq() << " " << message
772 << " " << *message << dendl;
773
774 if (!policy.lossy) {
775 ack_left.inc();
776 need_dispatch_writer = true;
777 }
778 state = STATE_OPEN;
779
780 logger->inc(l_msgr_recv_messages);
781 logger->inc(l_msgr_recv_bytes, cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
782
783 async_msgr->ms_fast_preprocess(message);
784 if (delay_state) {
785 utime_t release = message->get_recv_stamp();
786 double delay_period = 0;
787 if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
788 delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
789 release += delay_period;
790 ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on "
791 << message << " " << *message << dendl;
792 }
793 delay_state->queue(delay_period, release, message);
794 } else if (async_msgr->ms_can_fast_dispatch(message)) {
795 lock.unlock();
796 dispatch_queue->fast_dispatch(message);
797 lock.lock();
798 } else {
799 dispatch_queue->enqueue(message, message->get_priority(), conn_id);
800 }
801
802 break;
803 }
804
805 case STATE_OPEN_TAG_CLOSE:
806 {
807 ldout(async_msgr->cct, 20) << __func__ << " got CLOSE" << dendl;
808 _stop();
809 return ;
810 }
811
812 case STATE_STANDBY:
813 {
814 ldout(async_msgr->cct, 20) << __func__ << " enter STANDY" << dendl;
815
816 break;
817 }
818
819 case STATE_NONE:
820 {
821 ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
822 break;
823 }
824
825 case STATE_CLOSED:
826 {
827 ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
828 break;
829 }
830
831 case STATE_WAIT:
832 {
833 ldout(async_msgr->cct, 1) << __func__ << " enter wait state, failing" << dendl;
834 goto fail;
835 }
836
837 default:
838 {
839 if (_process_connection() < 0)
840 goto fail;
841 break;
842 }
843 }
844 } while (prev_state != state);
845
846 if (need_dispatch_writer && is_connected())
847 center->dispatch_event_external(write_handler);
848 return;
849
850 fail:
851 fault();
852}
853
854ssize_t AsyncConnection::_process_connection()
855{
856 ssize_t r = 0;
857
858 switch(state) {
859 case STATE_WAIT_SEND:
860 {
861 std::lock_guard<std::mutex> l(write_lock);
862 if (!outcoming_bl.length()) {
863 assert(state_after_send);
864 state = state_after_send;
865 state_after_send = STATE_NONE;
866 }
867 break;
868 }
869
870 case STATE_CONNECTING:
871 {
872 assert(!policy.server);
873
874 // reset connect state variables
875 got_bad_auth = false;
876 delete authorizer;
877 authorizer = NULL;
878 authorizer_buf.clear();
879 memset(&connect_msg, 0, sizeof(connect_msg));
880 memset(&connect_reply, 0, sizeof(connect_reply));
881
882 global_seq = async_msgr->get_global_seq();
883 // close old socket. this is safe because we stopped the reader thread above.
884 if (cs) {
885 center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
886 cs.close();
887 }
888
889 SocketOptions opts;
890 opts.priority = async_msgr->get_socket_priority();
891 opts.connect_bind_addr = msgr->get_myaddr();
892 r = worker->connect(get_peer_addr(), opts, &cs);
893 if (r < 0)
894 goto fail;
895
896 center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
897 state = STATE_CONNECTING_RE;
898 break;
899 }
900
901 case STATE_CONNECTING_RE:
902 {
903 r = cs.is_connected();
904 if (r < 0) {
905 ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl;
906 if (r == -ECONNREFUSED) {
907 ldout(async_msgr->cct, 2) << __func__ << " connection refused!" << dendl;
908 dispatch_queue->queue_refused(this);
909 }
910 goto fail;
911 } else if (r == 0) {
912 ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl;
913 if (async_msgr->get_stack()->nonblock_connect_need_writable_event())
914 center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
915 break;
916 }
917
918 center->delete_file_event(cs.fd(), EVENT_WRITABLE);
919 ldout(async_msgr->cct, 10) << __func__ << " connect successfully, ready to send banner" << dendl;
920
921 bufferlist bl;
922 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
923 r = try_send(bl);
924 if (r == 0) {
925 state = STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY;
926 ldout(async_msgr->cct, 10) << __func__ << " connect write banner done: "
927 << get_peer_addr() << dendl;
928 } else if (r > 0) {
929 state = STATE_WAIT_SEND;
930 state_after_send = STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY;
931 ldout(async_msgr->cct, 10) << __func__ << " connect wait for write banner: "
932 << get_peer_addr() << dendl;
933 } else {
934 goto fail;
935 }
936
937 break;
938 }
939
940 case STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY:
941 {
942 entity_addr_t paddr, peer_addr_for_me;
943 bufferlist myaddrbl;
944 unsigned banner_len = strlen(CEPH_BANNER);
945 unsigned need_len = banner_len + sizeof(ceph_entity_addr)*2;
946 r = read_until(need_len, state_buffer);
947 if (r < 0) {
948 ldout(async_msgr->cct, 1) << __func__ << " read banner and identify addresses failed" << dendl;
949 goto fail;
950 } else if (r > 0) {
951 break;
952 }
953
954 if (memcmp(state_buffer, CEPH_BANNER, banner_len)) {
955 ldout(async_msgr->cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
956 << get_peer_addr() << dendl;
957 goto fail;
958 }
959
960 bufferlist bl;
961 bl.append(state_buffer+banner_len, sizeof(ceph_entity_addr)*2);
962 bufferlist::iterator p = bl.begin();
963 try {
964 ::decode(paddr, p);
965 ::decode(peer_addr_for_me, p);
966 } catch (const buffer::error& e) {
967 lderr(async_msgr->cct) << __func__ << " decode peer addr failed " << dendl;
968 goto fail;
969 }
970 ldout(async_msgr->cct, 20) << __func__ << " connect read peer addr "
971 << paddr << " on socket " << cs.fd() << dendl;
972 if (peer_addr != paddr) {
973 if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
974 peer_addr.get_nonce() == paddr.get_nonce()) {
975 ldout(async_msgr->cct, 0) << __func__ << " connect claims to be " << paddr
976 << " not " << peer_addr
977 << " - presumably this is the same node!" << dendl;
978 } else {
979 ldout(async_msgr->cct, 0) << __func__ << " connect claims to be "
980 << paddr << " not " << peer_addr << " - wrong node!" << dendl;
981 goto fail;
982 }
983 }
984
985 ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl;
986 lock.unlock();
987 async_msgr->learned_addr(peer_addr_for_me);
988 if (async_msgr->cct->_conf->ms_inject_internal_delays) {
989 if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
990 ldout(msgr->cct, 10) << __func__ << " sleep for "
991 << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
992 utime_t t;
993 t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
994 t.sleep();
995 }
996 }
997
998 lock.lock();
999 if (state != STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY) {
1000 ldout(async_msgr->cct, 1) << __func__ << " state changed while learned_addr, mark_down or "
1001 << " replacing must be happened just now" << dendl;
1002 return 0;
1003 }
1004
1005 ::encode(async_msgr->get_myaddr(), myaddrbl, 0); // legacy
1006 r = try_send(myaddrbl);
1007 if (r == 0) {
1008 state = STATE_CONNECTING_SEND_CONNECT_MSG;
1009 ldout(async_msgr->cct, 10) << __func__ << " connect sent my addr "
1010 << async_msgr->get_myaddr() << dendl;
1011 } else if (r > 0) {
1012 state = STATE_WAIT_SEND;
1013 state_after_send = STATE_CONNECTING_SEND_CONNECT_MSG;
1014 ldout(async_msgr->cct, 10) << __func__ << " connect send my addr done: "
1015 << async_msgr->get_myaddr() << dendl;
1016 } else {
1017 ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, "
1018 << cpp_strerror(r) << dendl;
1019 goto fail;
1020 }
1021
1022 break;
1023 }
1024
1025 case STATE_CONNECTING_SEND_CONNECT_MSG:
1026 {
1027 if (!got_bad_auth) {
1028 delete authorizer;
1029 authorizer = async_msgr->get_authorizer(peer_type, false);
1030 }
1031 bufferlist bl;
1032
1033 connect_msg.features = policy.features_supported;
1034 connect_msg.host_type = async_msgr->get_myinst().name.type();
1035 connect_msg.global_seq = global_seq;
1036 connect_msg.connect_seq = connect_seq;
1037 connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true);
1038 connect_msg.authorizer_protocol = authorizer ? authorizer->protocol : 0;
1039 connect_msg.authorizer_len = authorizer ? authorizer->bl.length() : 0;
1040 if (authorizer)
1041 ldout(async_msgr->cct, 10) << __func__ << " connect_msg.authorizer_len="
1042 << connect_msg.authorizer_len << " protocol="
1043 << connect_msg.authorizer_protocol << dendl;
1044 connect_msg.flags = 0;
1045 if (policy.lossy)
1046 connect_msg.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
1047 bl.append((char*)&connect_msg, sizeof(connect_msg));
1048 if (authorizer) {
1049 bl.append(authorizer->bl.c_str(), authorizer->bl.length());
1050 }
1051 ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq="
1052 << connect_seq << " proto=" << connect_msg.protocol_version << dendl;
1053
1054 r = try_send(bl);
1055 if (r == 0) {
1056 state = STATE_CONNECTING_WAIT_CONNECT_REPLY;
1057 ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl;
1058 } else if (r > 0) {
1059 state = STATE_WAIT_SEND;
1060 state_after_send = STATE_CONNECTING_WAIT_CONNECT_REPLY;
1061 ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl;
1062 } else {
1063 ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply "
1064 << cpp_strerror(r) << dendl;
1065 goto fail;
1066 }
1067
1068 break;
1069 }
1070
1071 case STATE_CONNECTING_WAIT_CONNECT_REPLY:
1072 {
1073 r = read_until(sizeof(connect_reply), state_buffer);
1074 if (r < 0) {
1075 ldout(async_msgr->cct, 1) << __func__ << " read connect reply failed" << dendl;
1076 goto fail;
1077 } else if (r > 0) {
1078 break;
1079 }
1080
1081 connect_reply = *((ceph_msg_connect_reply*)state_buffer);
1082
1083 ldout(async_msgr->cct, 20) << __func__ << " connect got reply tag " << (int)connect_reply.tag
1084 << " connect_seq " << connect_reply.connect_seq << " global_seq "
1085 << connect_reply.global_seq << " proto " << connect_reply.protocol_version
1086 << " flags " << (int)connect_reply.flags << " features "
1087 << connect_reply.features << dendl;
1088 state = STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH;
1089
1090 break;
1091 }
1092
1093 case STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH:
1094 {
1095 bufferlist authorizer_reply;
1096 if (connect_reply.authorizer_len) {
1097 ldout(async_msgr->cct, 10) << __func__ << " reply.authorizer_len=" << connect_reply.authorizer_len << dendl;
1098 assert(connect_reply.authorizer_len < 4096);
1099 r = read_until(connect_reply.authorizer_len, state_buffer);
1100 if (r < 0) {
1101 ldout(async_msgr->cct, 1) << __func__ << " read connect reply authorizer failed" << dendl;
1102 goto fail;
1103 } else if (r > 0) {
1104 break;
1105 }
1106
1107 authorizer_reply.append(state_buffer, connect_reply.authorizer_len);
1108 bufferlist::iterator iter = authorizer_reply.begin();
1109 if (authorizer && !authorizer->verify_reply(iter)) {
1110 ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
1111 goto fail;
1112 }
1113 }
1114 r = handle_connect_reply(connect_msg, connect_reply);
1115 if (r < 0)
1116 goto fail;
1117
1118 // state must be changed!
1119 assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH);
1120 break;
1121 }
1122
1123 case STATE_CONNECTING_WAIT_ACK_SEQ:
1124 {
1125 uint64_t newly_acked_seq = 0;
1126
1127 r = read_until(sizeof(newly_acked_seq), state_buffer);
1128 if (r < 0) {
1129 ldout(async_msgr->cct, 1) << __func__ << " read connect ack seq failed" << dendl;
1130 goto fail;
1131 } else if (r > 0) {
1132 break;
1133 }
1134
1135 newly_acked_seq = *((uint64_t*)state_buffer);
1136 ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
1137 << " vs out_seq " << out_seq.read() << dendl;
1138 discard_requeued_up_to(newly_acked_seq);
1139 //while (newly_acked_seq > out_seq.read()) {
1140 // Message *m = _get_next_outgoing(NULL);
1141 // assert(m);
1142 // ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq()
1143 // << " " << *m << dendl;
1144 // assert(m->get_seq() <= newly_acked_seq);
1145 // m->put();
1146 // out_seq.inc();
1147 //}
1148
1149 bufferlist bl;
1150 uint64_t s = in_seq.read();
1151 bl.append((char*)&s, sizeof(s));
1152 r = try_send(bl);
1153 if (r == 0) {
1154 state = STATE_CONNECTING_READY;
1155 ldout(async_msgr->cct, 10) << __func__ << " send in_seq done " << dendl;
1156 } else if (r > 0) {
1157 state_after_send = STATE_CONNECTING_READY;
1158 state = STATE_WAIT_SEND;
1159 ldout(async_msgr->cct, 10) << __func__ << " continue send in_seq " << dendl;
1160 } else {
1161 goto fail;
1162 }
1163 break;
1164 }
1165
1166 case STATE_CONNECTING_READY:
1167 {
1168 // hooray!
1169 peer_global_seq = connect_reply.global_seq;
1170 policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY;
1171 state = STATE_OPEN;
1172 once_ready = true;
1173 connect_seq += 1;
1174 assert(connect_seq == connect_reply.connect_seq);
1175 backoff = utime_t();
1176 set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features);
1177 ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq
1178 << ", lossy = " << policy.lossy << ", features "
1179 << get_features() << dendl;
1180
1181 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1182 // connection. PLR
1183 if (authorizer != NULL) {
1184 session_security.reset(
1185 get_auth_session_handler(async_msgr->cct,
1186 authorizer->protocol,
1187 authorizer->session_key,
1188 get_features()));
1189 } else {
1190 // We have no authorizer, so we shouldn't be applying security to messages in this AsyncConnection. PLR
1191 session_security.reset();
1192 }
1193
1194 if (delay_state)
1195 assert(delay_state->ready());
1196 dispatch_queue->queue_connect(this);
1197 async_msgr->ms_deliver_handle_fast_connect(this);
1198
1199 // make sure no pending tick timer
1200 if (last_tick_id)
1201 center->delete_time_event(last_tick_id);
1202 last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
1203
1204 // message may in queue between last _try_send and connection ready
1205 // write event may already notify and we need to force scheduler again
1206 write_lock.lock();
1207 can_write = WriteStatus::CANWRITE;
1208 if (is_queued())
1209 center->dispatch_event_external(write_handler);
1210 write_lock.unlock();
1211 maybe_start_delay_thread();
1212 break;
1213 }
1214
1215 case STATE_ACCEPTING:
1216 {
1217 bufferlist bl;
1218 center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
1219
1220 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1221
1222 ::encode(async_msgr->get_myaddr(), bl, 0); // legacy
1223 port = async_msgr->get_myaddr().get_port();
1224 ::encode(socket_addr, bl, 0); // legacy
1225 ldout(async_msgr->cct, 1) << __func__ << " sd=" << cs.fd() << " " << socket_addr << dendl;
1226
1227 r = try_send(bl);
1228 if (r == 0) {
1229 state = STATE_ACCEPTING_WAIT_BANNER_ADDR;
1230 ldout(async_msgr->cct, 10) << __func__ << " write banner and addr done: "
1231 << get_peer_addr() << dendl;
1232 } else if (r > 0) {
1233 state = STATE_WAIT_SEND;
1234 state_after_send = STATE_ACCEPTING_WAIT_BANNER_ADDR;
1235 ldout(async_msgr->cct, 10) << __func__ << " wait for write banner and addr: "
1236 << get_peer_addr() << dendl;
1237 } else {
1238 goto fail;
1239 }
1240
1241 break;
1242 }
1243 case STATE_ACCEPTING_WAIT_BANNER_ADDR:
1244 {
1245 bufferlist addr_bl;
1246 entity_addr_t peer_addr;
1247
1248 r = read_until(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr), state_buffer);
1249 if (r < 0) {
1250 ldout(async_msgr->cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
1251 goto fail;
1252 } else if (r > 0) {
1253 break;
1254 }
1255
1256 if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
1257 ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer
1258 << "' (should be '" << CEPH_BANNER << "')" << dendl;
1259 goto fail;
1260 }
1261
1262 addr_bl.append(state_buffer+strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
1263 {
1264 bufferlist::iterator ti = addr_bl.begin();
1265 ::decode(peer_addr, ti);
1266 }
1267
1268 ldout(async_msgr->cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
1269 if (peer_addr.is_blank_ip()) {
1270 // peer apparently doesn't know what ip they have; figure it out for them.
1271 int port = peer_addr.get_port();
1272 peer_addr.u = socket_addr.u;
1273 peer_addr.set_port(port);
1274 ldout(async_msgr->cct, 0) << __func__ << " accept peer addr is really " << peer_addr
1275 << " (socket is " << socket_addr << ")" << dendl;
1276 }
1277 set_peer_addr(peer_addr); // so that connection_state gets set up
1278 state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
1279 break;
1280 }
1281
1282 case STATE_ACCEPTING_WAIT_CONNECT_MSG:
1283 {
1284 r = read_until(sizeof(connect_msg), state_buffer);
1285 if (r < 0) {
1286 ldout(async_msgr->cct, 1) << __func__ << " read connect msg failed" << dendl;
1287 goto fail;
1288 } else if (r > 0) {
1289 break;
1290 }
1291
1292 connect_msg = *((ceph_msg_connect*)state_buffer);
1293 state = STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH;
1294 break;
1295 }
1296
1297 case STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH:
1298 {
1299 bufferlist authorizer_reply;
1300
1301 if (connect_msg.authorizer_len) {
1302 if (!authorizer_buf.length())
1303 authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));
1304
1305 r = read_until(connect_msg.authorizer_len, authorizer_buf.c_str());
1306 if (r < 0) {
1307 ldout(async_msgr->cct, 1) << __func__ << " read connect authorizer failed" << dendl;
1308 goto fail;
1309 } else if (r > 0) {
1310 break;
1311 }
1312 }
1313
1314 ldout(async_msgr->cct, 20) << __func__ << " accept got peer connect_seq "
1315 << connect_msg.connect_seq << " global_seq "
1316 << connect_msg.global_seq << dendl;
1317 set_peer_type(connect_msg.host_type);
1318 policy = async_msgr->get_policy(connect_msg.host_type);
1319 ldout(async_msgr->cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type
1320 << ", policy.lossy=" << policy.lossy << " policy.server="
1321 << policy.server << " policy.standby=" << policy.standby
1322 << " policy.resetcheck=" << policy.resetcheck << dendl;
1323
1324 r = handle_connect_msg(connect_msg, authorizer_buf, authorizer_reply);
1325 if (r < 0)
1326 goto fail;
1327
1328 // state is changed by "handle_connect_msg"
1329 assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH);
1330 break;
1331 }
1332
1333 case STATE_ACCEPTING_WAIT_SEQ:
1334 {
1335 uint64_t newly_acked_seq;
1336 r = read_until(sizeof(newly_acked_seq), state_buffer);
1337 if (r < 0) {
1338 ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl;
1339 goto fail_registered;
1340 } else if (r > 0) {
1341 break;
1342 }
1343
1344 newly_acked_seq = *((uint64_t*)state_buffer);
1345 ldout(async_msgr->cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq << dendl;
1346 discard_requeued_up_to(newly_acked_seq);
1347 state = STATE_ACCEPTING_READY;
1348 break;
1349 }
1350
1351 case STATE_ACCEPTING_READY:
1352 {
1353 ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
1354 state = STATE_OPEN;
1355 memset(&connect_msg, 0, sizeof(connect_msg));
1356
1357 if (delay_state)
1358 assert(delay_state->ready());
1359 // make sure no pending tick timer
1360 if (last_tick_id)
1361 center->delete_time_event(last_tick_id);
1362 last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
1363
1364 write_lock.lock();
1365 can_write = WriteStatus::CANWRITE;
1366 if (is_queued())
1367 center->dispatch_event_external(write_handler);
1368 write_lock.unlock();
1369 maybe_start_delay_thread();
1370 break;
1371 }
1372
1373 default:
1374 {
1375 lderr(async_msgr->cct) << __func__ << " bad state: " << state << dendl;
1376 ceph_abort();
1377 }
1378 }
1379
1380 return 0;
1381
1382fail_registered:
1383 ldout(async_msgr->cct, 10) << "accept fault after register" << dendl;
1384 inject_delay();
1385
1386fail:
1387 return -1;
1388}
1389
1390int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &reply)
1391{
1392 uint64_t feat_missing;
1393 if (reply.tag == CEPH_MSGR_TAG_FEATURES) {
1394 ldout(async_msgr->cct, 0) << __func__ << " connect protocol feature mismatch, my "
1395 << std::hex << connect.features << " < peer "
1396 << reply.features << " missing "
1397 << (reply.features & ~policy.features_supported)
1398 << std::dec << dendl;
1399 goto fail;
1400 }
1401
1402 if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1403 ldout(async_msgr->cct, 0) << __func__ << " connect protocol version mismatch, my "
1404 << connect.protocol_version << " != " << reply.protocol_version
1405 << dendl;
1406 goto fail;
1407 }
1408
1409 if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1410 ldout(async_msgr->cct,0) << __func__ << " connect got BADAUTHORIZER" << dendl;
1411 if (got_bad_auth)
1412 goto fail;
1413 got_bad_auth = true;
1414 delete authorizer;
1415 authorizer = async_msgr->get_authorizer(peer_type, true); // try harder
1416 state = STATE_CONNECTING_SEND_CONNECT_MSG;
1417 }
1418 if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1419 ldout(async_msgr->cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
1420 was_session_reset();
1421 state = STATE_CONNECTING_SEND_CONNECT_MSG;
1422 }
1423 if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1424 global_seq = async_msgr->get_global_seq(reply.global_seq);
1425 ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_GLOBAL "
1426 << reply.global_seq << " chose new "
1427 << global_seq << dendl;
1428 state = STATE_CONNECTING_SEND_CONNECT_MSG;
1429 }
1430 if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1431 assert(reply.connect_seq > connect_seq);
1432 ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_SESSION "
1433 << connect_seq << " -> "
1434 << reply.connect_seq << dendl;
1435 connect_seq = reply.connect_seq;
1436 state = STATE_CONNECTING_SEND_CONNECT_MSG;
1437 }
1438 if (reply.tag == CEPH_MSGR_TAG_WAIT) {
1439 ldout(async_msgr->cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
1440 state = STATE_WAIT;
1441 }
1442
1443 feat_missing = policy.features_required & ~(uint64_t)connect_reply.features;
1444 if (feat_missing) {
1445 ldout(async_msgr->cct, 1) << __func__ << " missing required features " << std::hex
1446 << feat_missing << std::dec << dendl;
1447 goto fail;
1448 }
1449
1450 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1451 ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
1452 state = STATE_CONNECTING_WAIT_ACK_SEQ;
1453 }
1454 if (reply.tag == CEPH_MSGR_TAG_READY) {
1455 ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
1456 state = STATE_CONNECTING_READY;
1457 }
1458
1459 return 0;
1460
1461 fail:
1462 return -1;
1463}
1464
1465ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
1466 bufferlist &authorizer_reply)
1467{
1468 ssize_t r = 0;
1469 ceph_msg_connect_reply reply;
1470 bufferlist reply_bl;
1471
1472 memset(&reply, 0, sizeof(reply));
1473 reply.protocol_version = async_msgr->get_proto_version(peer_type, false);
1474
1475 // mismatch?
1476 ldout(async_msgr->cct, 10) << __func__ << " accept my proto " << reply.protocol_version
1477 << ", their proto " << connect.protocol_version << dendl;
1478 if (connect.protocol_version != reply.protocol_version) {
1479 return _reply_accept(CEPH_MSGR_TAG_BADPROTOVER, connect, reply, authorizer_reply);
1480 }
1481 // require signatures for cephx?
1482 if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
1483 if (peer_type == CEPH_ENTITY_TYPE_OSD ||
1484 peer_type == CEPH_ENTITY_TYPE_MDS) {
1485 if (async_msgr->cct->_conf->cephx_require_signatures ||
1486 async_msgr->cct->_conf->cephx_cluster_require_signatures) {
1487 ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
1488 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1489 }
1490 } else {
1491 if (async_msgr->cct->_conf->cephx_require_signatures ||
1492 async_msgr->cct->_conf->cephx_service_require_signatures) {
1493 ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for service" << dendl;
1494 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1495 }
1496 }
1497 }
1498 uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features;
1499 if (feat_missing) {
1500 ldout(async_msgr->cct, 1) << __func__ << " peer missing required features "
1501 << std::hex << feat_missing << std::dec << dendl;
1502 return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply);
1503 }
1504
1505 lock.unlock();
1506
1507 bool authorizer_valid;
1508 if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl,
1509 authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) {
1510 lock.lock();
1511 ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
1512 session_security.reset();
1513 return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply);
1514 }
1515
1516 // We've verified the authorizer for this AsyncConnection, so set up the session security structure. PLR
1517 ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl;
1518
1519 // existing?
1520 AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
1521
1522 inject_delay();
1523
1524 lock.lock();
1525 if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
1526 ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl;
1527 assert(state == STATE_CLOSED);
1528 goto fail;
1529 }
1530
1531 if (existing == this)
1532 existing = NULL;
1533 if (existing) {
1534 // There is no possible that existing connection will acquire this
1535 // connection's lock
1536 existing->lock.lock(); // skip lockdep check (we are locking a second AsyncConnection here)
1537
1538 if (existing->replacing || existing->state == STATE_CLOSED) {
1539 ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
1540 << " existing_state=" << get_state_name(existing->state) << dendl;
1541 reply.global_seq = existing->peer_global_seq;
1542 r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
1543 existing->lock.unlock();
1544 if (r < 0)
1545 goto fail;
1546 return 0;
1547 }
1548
1549 if (connect.global_seq < existing->peer_global_seq) {
1550 ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
1551 << ".gseq " << existing->peer_global_seq << " > "
1552 << connect.global_seq << ", RETRY_GLOBAL" << dendl;
1553 reply.global_seq = existing->peer_global_seq; // so we can send it below..
1554 existing->lock.unlock();
1555 return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
1556 } else {
1557 ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
1558 << ".gseq " << existing->peer_global_seq
1559 << " <= " << connect.global_seq << ", looks ok" << dendl;
1560 }
1561
1562 if (existing->policy.lossy) {
1563 ldout(async_msgr->cct, 0) << __func__ << " accept replacing existing (lossy) channel (new one lossy="
1564 << policy.lossy << ")" << dendl;
1565 existing->was_session_reset();
1566 goto replace;
1567 }
1568
1569 ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq
1570 << " vs existing csq=" << existing->connect_seq << " existing_state="
1571 << get_state_name(existing->state) << dendl;
1572
1573 if (connect.connect_seq == 0 && existing->connect_seq > 0) {
1574 ldout(async_msgr->cct,0) << __func__ << " accept peer reset, then tried to connect to us, replacing" << dendl;
1575 // this is a hard reset from peer
1576 is_reset_from_peer = true;
1577 if (policy.resetcheck)
1578 existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
1579 goto replace;
1580 }
1581
1582 if (connect.connect_seq < existing->connect_seq) {
1583 // old attempt, or we sent READY but they didn't get it.
1584 ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing << ".cseq "
1585 << existing->connect_seq << " > " << connect.connect_seq
1586 << ", RETRY_SESSION" << dendl;
1587 reply.connect_seq = existing->connect_seq + 1;
1588 existing->lock.unlock();
1589 return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
1590 }
1591
1592 if (connect.connect_seq == existing->connect_seq) {
1593 // if the existing connection successfully opened, and/or
1594 // subsequently went to standby, then the peer should bump
1595 // their connect_seq and retry: this is not a connection race
1596 // we need to resolve here.
1597 if (existing->state == STATE_OPEN ||
1598 existing->state == STATE_STANDBY) {
1599 ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing
1600 << ".cseq " << existing->connect_seq << " == "
1601 << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
1602 reply.connect_seq = existing->connect_seq + 1;
1603 existing->lock.unlock();
1604 return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
1605 }
1606
1607 // connection race?
1608 if (peer_addr < async_msgr->get_myaddr() || existing->policy.server) {
1609 // incoming wins
1610 ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing
1611 << ".cseq " << existing->connect_seq << " == " << connect.connect_seq
1612 << ", or we are server, replacing my attempt" << dendl;
1613 goto replace;
1614 } else {
1615 // our existing outgoing wins
1616 ldout(async_msgr->cct,10) << __func__ << " accept connection race, existing "
1617 << existing << ".cseq " << existing->connect_seq
1618 << " == " << connect.connect_seq << ", sending WAIT" << dendl;
1619 assert(peer_addr > async_msgr->get_myaddr());
1620 existing->lock.unlock();
1621 return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply);
1622 }
1623 }
1624
1625 assert(connect.connect_seq > existing->connect_seq);
1626 assert(connect.global_seq >= existing->peer_global_seq);
1627 if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other
1628 existing->connect_seq == 0) {
1629 ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
1630 << connect.connect_seq << ", " << existing << ".cseq = "
1631 << existing->connect_seq << "), sending RESETSESSION" << dendl;
1632 existing->lock.unlock();
1633 return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
1634 }
1635
1636 // reconnect
1637 ldout(async_msgr->cct, 10) << __func__ << " accept peer sent cseq " << connect.connect_seq
1638 << " > " << existing->connect_seq << dendl;
1639 goto replace;
1640 } // existing
1641 else if (!replacing && connect.connect_seq > 0) {
1642 // we reset, and they are opening a new session
1643 ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
1644 << connect.connect_seq << "), sending RESETSESSION" << dendl;
1645 return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
1646 } else {
1647 // new session
1648 ldout(async_msgr->cct, 10) << __func__ << " accept new session" << dendl;
1649 existing = NULL;
1650 goto open;
1651 }
1652 ceph_abort();
1653
1654 replace:
1655 ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
1656
1657 inject_delay();
1658 if (existing->policy.lossy) {
1659 // disconnect from the Connection
1660 ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl;
1661 existing->_stop();
1662 existing->dispatch_queue->queue_reset(existing.get());
1663 } else {
1664 assert(can_write == WriteStatus::NOWRITE);
1665 existing->write_lock.lock();
1666
1667 // reset the in_seq if this is a hard reset from peer,
1668 // otherwise we respect our original connection's value
1669 if (is_reset_from_peer) {
1670 existing->is_reset_from_peer = true;
1671 }
1672
1673 center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
1674
1675 // Clean up output buffer
1676 existing->outcoming_bl.clear();
1677 if (existing->delay_state) {
1678 existing->delay_state->flush();
1679 assert(!delay_state);
1680 }
1681 existing->requeue_sent();
1682 existing->reset_recv_state();
1683
1684 auto temp_cs = std::move(cs);
1685 EventCenter *new_center = center;
1686 Worker *new_worker = worker;
1687 // avoid _stop shutdown replacing socket
1688 // queue a reset on the new connection, which we're dumping for the old
1689 _stop();
1690
1691 dispatch_queue->queue_reset(this);
1692 ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
1693 existing->can_write = WriteStatus::REPLACING;
1694 existing->open_write = false;
1695 existing->replacing = true;
1696 existing->state_offset = 0;
1697 // avoid previous thread modify event
1698 existing->state = STATE_NONE;
1699 // Discard existing prefetch buffer in `recv_buf`
1700 existing->recv_start = existing->recv_end = 0;
1701 // there shouldn't exist any buffer
1702 assert(recv_start == recv_end);
1703
1704 auto deactivate_existing = std::bind(
1705 [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable {
1706 // we need to delete time event in original thread
1707 {
1708 std::lock_guard<std::mutex> l(existing->lock);
1709 if (existing->state == STATE_NONE) {
1710 existing->shutdown_socket();
1711 existing->cs = std::move(cs);
1712 existing->worker->references--;
1713 new_worker->references++;
1714 existing->logger = new_worker->get_perf_counter();
1715 existing->worker = new_worker;
1716 existing->center = new_center;
1717 if (existing->delay_state)
1718 existing->delay_state->set_center(new_center);
1719 } else if (existing->state == STATE_CLOSED) {
1720 auto back_to_close = std::bind(
1721 [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
1722 new_center->submit_to(
1723 new_center->get_id(), std::move(back_to_close), true);
1724 return ;
1725 } else {
1726 ceph_abort();
1727 }
1728 }
1729
1730 // Before changing existing->center, it may already exists some events in existing->center's queue.
1731 // Then if we mark down `existing`, it will execute in another thread and clean up connection.
1732 // Previous event will result in segment fault
1733 auto transfer_existing = [existing, connect, reply, authorizer_reply]() mutable {
1734 std::lock_guard<std::mutex> l(existing->lock);
1735 if (existing->state == STATE_CLOSED)
1736 return ;
1737 assert(existing->state == STATE_NONE);
1738
1739 existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
1740 existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler);
1741 reply.global_seq = existing->peer_global_seq;
1742 if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
1743 // handle error
1744 existing->fault();
1745 }
1746 };
1747 if (existing->center->in_thread())
1748 transfer_existing();
1749 else
1750 existing->center->submit_to(
1751 existing->center->get_id(), std::move(transfer_existing), true);
1752 }, std::move(temp_cs));
1753
1754 existing->center->submit_to(
1755 existing->center->get_id(), std::move(deactivate_existing), true);
1756 existing->write_lock.unlock();
1757 existing->lock.unlock();
1758 return 0;
1759 }
1760 existing->lock.unlock();
1761
1762 open:
1763 connect_seq = connect.connect_seq + 1;
1764 peer_global_seq = connect.global_seq;
1765 ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
1766 << connect_seq << " in_seq=" << in_seq.read() << ", sending READY" << dendl;
1767
1768 int next_state;
1769
1770 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
1771 if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
1772 reply.tag = CEPH_MSGR_TAG_SEQ;
1773 next_state = STATE_ACCEPTING_WAIT_SEQ;
1774 } else {
1775 reply.tag = CEPH_MSGR_TAG_READY;
1776 next_state = STATE_ACCEPTING_READY;
1777 discard_requeued_up_to(0);
1778 is_reset_from_peer = false;
1779 in_seq.set(0);
1780 }
1781
1782 // send READY reply
1783 reply.features = policy.features_supported;
1784 reply.global_seq = async_msgr->get_global_seq();
1785 reply.connect_seq = connect_seq;
1786 reply.flags = 0;
1787 reply.authorizer_len = authorizer_reply.length();
1788 if (policy.lossy)
1789 reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
1790
1791 set_features((uint64_t)reply.features & (uint64_t)connect.features);
1792 ldout(async_msgr->cct, 10) << __func__ << " accept features " << get_features() << dendl;
1793
1794 session_security.reset(
1795 get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol,
1796 session_key, get_features()));
1797
1798 reply_bl.append((char*)&reply, sizeof(reply));
1799
1800 if (reply.authorizer_len)
1801 reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
1802
1803 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1804 uint64_t s = in_seq.read();
1805 reply_bl.append((char*)&s, sizeof(s));
1806 }
1807
1808 lock.unlock();
1809 // Because "replacing" will prevent other connections preempt this addr,
1810 // it's safe that here we don't acquire Connection's lock
1811 r = async_msgr->accept_conn(this);
1812
1813 inject_delay();
1814
1815 lock.lock();
1816 replacing = false;
1817 if (r < 0) {
1818 ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
1819 << " just fail later one(this)" << dendl;
1820 goto fail_registered;
1821 }
1822 if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
1823 ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl;
1824 assert(state == STATE_CLOSED);
1825 goto fail_registered;
1826 }
1827
1828 r = try_send(reply_bl);
1829 if (r < 0)
1830 goto fail_registered;
1831
1832 // notify
1833 dispatch_queue->queue_accept(this);
1834 async_msgr->ms_deliver_handle_fast_accept(this);
1835 once_ready = true;
1836
1837 if (r == 0) {
1838 state = next_state;
1839 ldout(async_msgr->cct, 2) << __func__ << " accept write reply msg done" << dendl;
1840 } else {
1841 state = STATE_WAIT_SEND;
1842 state_after_send = next_state;
1843 }
1844
1845 return 0;
1846
1847 fail_registered:
1848 ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl;
1849 inject_delay();
1850
1851 fail:
1852 ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl;
1853 return -1;
1854}
1855
1856void AsyncConnection::_connect()
1857{
1858 ldout(async_msgr->cct, 10) << __func__ << " csq=" << connect_seq << dendl;
1859
1860 state = STATE_CONNECTING;
1861 // rescheduler connection in order to avoid lock dep
1862 // may called by external thread(send_message)
1863 center->dispatch_event_external(read_handler);
1864}
1865
1866void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
1867{
1868 ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl;
1869 assert(socket.fd() >= 0);
1870
1871 std::lock_guard<std::mutex> l(lock);
1872 cs = std::move(socket);
1873 socket_addr = addr;
1874 state = STATE_ACCEPTING;
1875 // rescheduler connection in order to avoid lock dep
1876 center->dispatch_event_external(read_handler);
1877}
1878
1879int AsyncConnection::send_message(Message *m)
1880{
1881 FUNCTRACE();
1882 lgeneric_subdout(async_msgr->cct, ms,
1883 1) << "-- " << async_msgr->get_myaddr() << " --> "
1884 << get_peer_addr() << " -- "
1885 << *m << " -- " << m << " con "
1886 << m->get_connection().get()
1887 << dendl;
1888
1889 // optimistic think it's ok to encode(actually may broken now)
1890 if (!m->get_priority())
1891 m->set_priority(async_msgr->get_default_send_priority());
1892
1893 m->get_header().src = async_msgr->get_myname();
1894 m->set_connection(this);
1895
1896 if (m->get_type() == CEPH_MSG_OSD_OP)
1897 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
1898 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
1899 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
1900
1901 if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
1902 ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
1903 std::lock_guard<std::mutex> l(write_lock);
1904 if (can_write != WriteStatus::CLOSED) {
1905 dispatch_queue->local_delivery(m, m->get_priority());
1906 } else {
1907 ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
1908 << " Drop message " << m << dendl;
1909 m->put();
1910 }
1911 return 0;
1912 }
1913
1914 last_active = ceph::coarse_mono_clock::now();
1915 // we don't want to consider local message here, it's too lightweight which
1916 // may disturb users
1917 logger->inc(l_msgr_send_messages);
1918
1919 bufferlist bl;
1920 uint64_t f = get_features();
1921
1922 // TODO: Currently not all messages supports reencode like MOSDMap, so here
1923 // only let fast dispatch support messages prepare message
1924 bool can_fast_prepare = async_msgr->ms_can_fast_dispatch(m);
1925 if (can_fast_prepare)
1926 prepare_send_message(f, m, bl);
1927
1928 std::lock_guard<std::mutex> l(write_lock);
1929 // "features" changes will change the payload encoding
1930 if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) {
1931 // ensure the correctness of message encoding
1932 bl.clear();
1933 m->get_payload().clear();
1934 ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous "
1935 << f << " != " << get_features() << dendl;
1936 }
1937 if (!is_queued() && can_write == WriteStatus::CANWRITE && async_msgr->cct->_conf->ms_async_send_inline) {
1938 if (!bl.length())
1939 prepare_send_message(get_features(), m, bl);
1940 logger->inc(l_msgr_send_messages_inline);
1941 if (write_message(m, bl, false) < 0) {
1942 ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
1943 // we want to handle fault within internal thread
1944 center->dispatch_event_external(write_handler);
1945 }
1946 } else if (can_write == WriteStatus::CLOSED) {
1947 ldout(async_msgr->cct, 10) << __func__ << " connection closed."
1948 << " Drop message " << m << dendl;
1949 m->put();
1950 } else {
1951 m->trace.event("async enqueueing message");
1952 out_q[m->get_priority()].emplace_back(std::move(bl), m);
1953 ldout(async_msgr->cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl;
1954 if (can_write != WriteStatus::REPLACING)
1955 center->dispatch_event_external(write_handler);
1956 }
1957 return 0;
1958}
1959
1960void AsyncConnection::requeue_sent()
1961{
1962 if (sent.empty())
1963 return;
1964
1965 list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1966 while (!sent.empty()) {
1967 Message* m = sent.back();
1968 sent.pop_back();
1969 ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
1970 << " (" << m->get_seq() << ")" << dendl;
1971 rq.push_front(make_pair(bufferlist(), m));
1972 out_seq.dec();
1973 }
1974}
1975
1976void AsyncConnection::discard_requeued_up_to(uint64_t seq)
1977{
1978 ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl;
1979 std::lock_guard<std::mutex> l(write_lock);
1980 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
1981 return;
1982 list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1983 while (!rq.empty()) {
1984 pair<bufferlist, Message*> p = rq.front();
1985 if (p.second->get_seq() == 0 || p.second->get_seq() > seq)
1986 break;
1987 ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq " << p.second->get_seq()
1988 << " <= " << seq << ", discarding" << dendl;
1989 p.second->put();
1990 rq.pop_front();
1991 out_seq.inc();
1992 }
1993 if (rq.empty())
1994 out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1995}
1996
1997/*
1998 * Tears down the AsyncConnection's message queues, and removes them from the DispatchQueue
1999 * Must hold pipe_lock prior to calling.
2000 */
2001void AsyncConnection::discard_out_queue()
2002{
2003 ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
2004
2005 for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
2006 ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
2007 (*p)->put();
2008 }
2009 sent.clear();
2010 for (map<int, list<pair<bufferlist, Message*> > >::iterator p = out_q.begin(); p != out_q.end(); ++p)
2011 for (list<pair<bufferlist, Message*> >::iterator r = p->second.begin(); r != p->second.end(); ++r) {
2012 ldout(async_msgr->cct, 20) << __func__ << " discard " << r->second << dendl;
2013 r->second->put();
2014 }
2015 out_q.clear();
2016 outcoming_bl.clear();
2017}
2018
2019int AsyncConnection::randomize_out_seq()
2020{
2021 if (get_features() & CEPH_FEATURE_MSG_AUTH) {
2022 // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error
2023 // here. We'll check it on the call. PLR
2024 uint64_t rand_seq;
2025 int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq));
2026 rand_seq &= SEQ_MASK;
2027 lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
2028 out_seq.set(rand_seq);
2029 return seq_error;
2030 } else {
2031 // previously, seq #'s always started at 0.
2032 out_seq.set(0);
2033 return 0;
2034 }
2035}
2036
2037void AsyncConnection::fault()
2038{
2039 if (state == STATE_CLOSED || state == STATE_NONE) {
2040 ldout(async_msgr->cct, 10) << __func__ << " connection is already closed" << dendl;
2041 return ;
2042 }
2043
2044 if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) {
2045 ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl;
2046 _stop();
2047 dispatch_queue->queue_reset(this);
2048 return ;
2049 }
2050
2051 write_lock.lock();
2052 can_write = WriteStatus::NOWRITE;
2053 shutdown_socket();
2054 open_write = false;
2055
2056 // queue delayed items immediately
2057 if (delay_state)
2058 delay_state->flush();
2059 // requeue sent items
2060 requeue_sent();
2061 recv_start = recv_end = 0;
2062 state_offset = 0;
2063 replacing = false;
2064 is_reset_from_peer = false;
2065 outcoming_bl.clear();
2066 if (!once_ready && !is_queued() &&
2067 state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2068 ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
2069 << " accept state just closed" << dendl;
2070 write_lock.unlock();
2071 _stop();
2072 dispatch_queue->queue_reset(this);
2073 return ;
2074 }
2075 reset_recv_state();
2076 if (policy.standby && !is_queued() && state != STATE_WAIT) {
2077 ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
2078 state = STATE_STANDBY;
2079 write_lock.unlock();
2080 return;
2081 }
2082
2083 write_lock.unlock();
2084 if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY) &&
2085 state != STATE_WAIT) { // STATE_WAIT is coming from STATE_CONNECTING_*
2086 // policy maybe empty when state is in accept
2087 if (policy.server) {
2088 ldout(async_msgr->cct, 0) << __func__ << " server, going to standby" << dendl;
2089 state = STATE_STANDBY;
2090 } else {
2091 ldout(async_msgr->cct, 0) << __func__ << " initiating reconnect" << dendl;
2092 connect_seq++;
2093 state = STATE_CONNECTING;
2094 }
2095 backoff = utime_t();
2096 center->dispatch_event_external(read_handler);
2097 } else {
2098 if (state == STATE_WAIT) {
2099 backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
2100 } else if (backoff == utime_t()) {
2101 backoff.set_from_double(async_msgr->cct->_conf->ms_initial_backoff);
2102 } else {
2103 backoff += backoff;
2104 if (backoff > async_msgr->cct->_conf->ms_max_backoff)
2105 backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
2106 }
2107
2108 state = STATE_CONNECTING;
2109 ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
2110 // woke up again;
2111 register_time_events.insert(center->create_time_event(
2112 backoff.to_nsec()/1000, wakeup_handler));
2113 }
2114}
2115
2116void AsyncConnection::was_session_reset()
2117{
2118 ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
2119 std::lock_guard<std::mutex> l(write_lock);
2120 if (delay_state)
2121 delay_state->discard();
2122 dispatch_queue->discard_queue(conn_id);
2123 discard_out_queue();
2124
2125 dispatch_queue->queue_remote_reset(this);
2126
2127 if (randomize_out_seq()) {
2128 ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;
2129 }
2130
2131 in_seq.set(0);
2132 connect_seq = 0;
2133 // it's safe to directly set 0, double locked
2134 ack_left.set(0);
2135 once_ready = false;
2136 can_write = WriteStatus::NOWRITE;
2137}
2138
2139void AsyncConnection::_stop()
2140{
2141 if (state == STATE_CLOSED)
2142 return ;
2143
2144 if (delay_state)
2145 delay_state->flush();
2146
2147 ldout(async_msgr->cct, 2) << __func__ << dendl;
2148 std::lock_guard<std::mutex> l(write_lock);
2149
2150 reset_recv_state();
2151 dispatch_queue->discard_queue(conn_id);
2152 discard_out_queue();
2153 async_msgr->unregister_conn(this);
2154 worker->release_worker();
2155
2156 state = STATE_CLOSED;
2157 open_write = false;
2158 can_write = WriteStatus::CLOSED;
2159 state_offset = 0;
2160 // Make sure in-queue events will been processed
2161 center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
2162}
2163
2164void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
2165{
2166 ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl;
2167
2168 // associate message with Connection (for benefit of encode_payload)
2169 if (m->empty_payload())
2170 ldout(async_msgr->cct, 20) << __func__ << " encoding features "
2171 << features << " " << m << " " << *m << dendl;
2172 else
2173 ldout(async_msgr->cct, 20) << __func__ << " half-reencoding features "
2174 << features << " " << m << " " << *m << dendl;
2175
2176 // encode and copy out of *m
2177 m->encode(features, msgr->crcflags);
2178
2179 bl.append(m->get_payload());
2180 bl.append(m->get_middle());
2181 bl.append(m->get_data());
2182}
2183
2184ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
2185{
2186 FUNCTRACE();
2187 assert(can_write == WriteStatus::CANWRITE);
2188 m->set_seq(out_seq.inc());
2189
2190 if (!policy.lossy) {
2191 // put on sent list
2192 sent.push_back(m);
2193 m->get();
2194 }
2195
2196 if (msgr->crcflags & MSG_CRC_HEADER)
2197 m->calc_header_crc();
2198
2199 ceph_msg_header& header = m->get_header();
2200 ceph_msg_footer& footer = m->get_footer();
2201
2202 // TODO: let sign_message could be reentry?
2203 // Now that we have all the crcs calculated, handle the
2204 // digital signature for the message, if the AsyncConnection has session
2205 // security set up. Some session security options do not
2206 // actually calculate and check the signature, but they should
2207 // handle the calls to sign_message and check_signature. PLR
2208 if (session_security.get() == NULL) {
2209 ldout(async_msgr->cct, 20) << __func__ << " no session security" << dendl;
2210 } else {
2211 if (session_security->sign_message(m)) {
2212 ldout(async_msgr->cct, 20) << __func__ << " failed to sign m="
2213 << m << "): sig = " << footer.sig << dendl;
2214 } else {
2215 ldout(async_msgr->cct, 20) << __func__ << " signed m=" << m
2216 << "): sig = " << footer.sig << dendl;
2217 }
2218 }
2219
2220 unsigned original_bl_len = outcoming_bl.length();
2221
2222 outcoming_bl.append(CEPH_MSGR_TAG_MSG);
2223
2224 if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
2225 outcoming_bl.append((char*)&header, sizeof(header));
2226 } else {
2227 ceph_msg_header_old oldheader;
2228 memcpy(&oldheader, &header, sizeof(header));
2229 oldheader.src.name = header.src;
2230 oldheader.src.addr = get_peer_addr();
2231 oldheader.orig_src = oldheader.src;
2232 oldheader.reserved = header.reserved;
2233 oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
2234 sizeof(oldheader) - sizeof(oldheader.crc));
2235 outcoming_bl.append((char*)&oldheader, sizeof(oldheader));
2236 }
2237
2238 ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type
2239 << " src " << entity_name_t(header.src)
2240 << " front=" << header.front_len
2241 << " data=" << header.data_len
2242 << " off " << header.data_off << dendl;
2243
2244 if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
2245 std::list<buffer::ptr>::const_iterator pb;
2246 for (pb = bl.buffers().begin(); pb != bl.buffers().end(); ++pb) {
2247 outcoming_bl.append((char*)pb->c_str(), pb->length());
2248 }
2249 } else {
2250 outcoming_bl.claim_append(bl);
2251 }
2252
2253 // send footer; if receiver doesn't support signatures, use the old footer format
2254 ceph_msg_footer_old old_footer;
2255 if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
2256 outcoming_bl.append((char*)&footer, sizeof(footer));
2257 } else {
2258 if (msgr->crcflags & MSG_CRC_HEADER) {
2259 old_footer.front_crc = footer.front_crc;
2260 old_footer.middle_crc = footer.middle_crc;
2261 old_footer.data_crc = footer.data_crc;
2262 } else {
2263 old_footer.front_crc = old_footer.middle_crc = 0;
2264 }
2265 old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
2266 old_footer.flags = footer.flags;
2267 outcoming_bl.append((char*)&old_footer, sizeof(old_footer));
2268 }
2269
2270 m->trace.event("async writing message");
2271 logger->inc(l_msgr_send_bytes, outcoming_bl.length() - original_bl_len);
2272 ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
2273 << " " << m << dendl;
2274 ssize_t rc = _try_send(more);
2275 if (rc < 0) {
2276 ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
2277 << cpp_strerror(rc) << dendl;
2278 } else if (rc == 0) {
2279 ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
2280 } else {
2281 ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl;
2282 }
2283 if (m->get_type() == CEPH_MSG_OSD_OP)
2284 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
2285 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
2286 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
2287 m->put();
2288
2289 return rc;
2290}
2291
2292void AsyncConnection::reset_recv_state()
2293{
2294 // clean up state internal variables and states
2295 if (state >= STATE_CONNECTING_SEND_CONNECT_MSG &&
2296 state <= STATE_CONNECTING_READY) {
2297 delete authorizer;
2298 authorizer = NULL;
2299 got_bad_auth = false;
2300 }
2301
2302 if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE &&
2303 state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
2304 && policy.throttler_messages) {
2305 ldout(async_msgr->cct, 10) << __func__ << " releasing " << 1
2306 << " message to policy throttler "
2307 << policy.throttler_messages->get_current() << "/"
2308 << policy.throttler_messages->get_max() << dendl;
2309 policy.throttler_messages->put();
2310 }
2311 if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES &&
2312 state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
2313 if (policy.throttler_bytes) {
2314 ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
2315 << " bytes to policy throttler "
2316 << policy.throttler_bytes->get_current() << "/"
2317 << policy.throttler_bytes->get_max() << dendl;
2318 policy.throttler_bytes->put(cur_msg_size);
2319 }
2320 }
2321 if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE &&
2322 state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
2323 ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
2324 << " bytes to dispatch_queue throttler "
2325 << dispatch_queue->dispatch_throttler.get_current() << "/"
2326 << dispatch_queue->dispatch_throttler.get_max() << dendl;
2327 dispatch_queue->dispatch_throttle_release(cur_msg_size);
2328 }
2329}
2330
2331void AsyncConnection::handle_ack(uint64_t seq)
2332{
2333 ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
2334 // trim sent list
2335 std::lock_guard<std::mutex> l(write_lock);
2336 while (!sent.empty() && sent.front()->get_seq() <= seq) {
2337 Message* m = sent.front();
2338 sent.pop_front();
2339 ldout(async_msgr->cct, 10) << __func__ << " got ack seq "
2340 << seq << " >= " << m->get_seq() << " on "
2341 << m << " " << *m << dendl;
2342 m->put();
2343 }
2344}
2345
2346void AsyncConnection::DelayedDelivery::do_request(int id)
2347{
2348 Message *m = nullptr;
2349 {
2350 std::lock_guard<std::mutex> l(delay_lock);
2351 register_time_events.erase(id);
2352 if (stop_dispatch)
2353 return ;
2354 if (delay_queue.empty())
2355 return ;
2356 utime_t release = delay_queue.front().first;
2357 m = delay_queue.front().second;
2358 string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type;
2359 utime_t now = ceph_clock_now();
2360 if ((release > now &&
2361 (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
2362 utime_t t = release - now;
2363 t.sleep();
2364 }
2365 delay_queue.pop_front();
2366 }
2367 if (msgr->ms_can_fast_dispatch(m)) {
2368 dispatch_queue->fast_dispatch(m);
2369 } else {
2370 dispatch_queue->enqueue(m, m->get_priority(), conn_id);
2371 }
2372}
2373
2374void AsyncConnection::DelayedDelivery::flush() {
2375 stop_dispatch = true;
2376 center->submit_to(
2377 center->get_id(), [this] () mutable {
2378 std::lock_guard<std::mutex> l(delay_lock);
2379 while (!delay_queue.empty()) {
2380 Message *m = delay_queue.front().second;
2381 if (msgr->ms_can_fast_dispatch(m)) {
2382 dispatch_queue->fast_dispatch(m);
2383 } else {
2384 dispatch_queue->enqueue(m, m->get_priority(), conn_id);
2385 }
2386 delay_queue.pop_front();
2387 }
2388 for (auto i : register_time_events)
2389 center->delete_time_event(i);
2390 register_time_events.clear();
2391 stop_dispatch = false;
2392 }, true);
2393}
2394
2395void AsyncConnection::send_keepalive()
2396{
2397 ldout(async_msgr->cct, 10) << __func__ << dendl;
2398 std::lock_guard<std::mutex> l(write_lock);
2399 if (can_write != WriteStatus::CLOSED) {
2400 keepalive = true;
2401 center->dispatch_event_external(write_handler);
2402 }
2403}
2404
2405void AsyncConnection::mark_down()
2406{
2407 ldout(async_msgr->cct, 1) << __func__ << dendl;
2408 std::lock_guard<std::mutex> l(lock);
2409 _stop();
2410}
2411
2412void AsyncConnection::_append_keepalive_or_ack(bool ack, utime_t *tp)
2413{
2414 ldout(async_msgr->cct, 10) << __func__ << dendl;
2415 if (ack) {
2416 assert(tp);
2417 struct ceph_timespec ts;
2418 tp->encode_timeval(&ts);
2419 outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
2420 outcoming_bl.append((char*)&ts, sizeof(ts));
2421 } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
2422 struct ceph_timespec ts;
2423 utime_t t = ceph_clock_now();
2424 t.encode_timeval(&ts);
2425 outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
2426 outcoming_bl.append((char*)&ts, sizeof(ts));
2427 } else {
2428 outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
2429 }
2430}
2431
2432void AsyncConnection::handle_write()
2433{
2434 ldout(async_msgr->cct, 10) << __func__ << dendl;
2435 ssize_t r = 0;
2436
2437 write_lock.lock();
2438 if (can_write == WriteStatus::CANWRITE) {
2439 if (keepalive) {
2440 _append_keepalive_or_ack();
2441 keepalive = false;
2442 }
2443
2444 while (1) {
2445 bufferlist data;
2446 Message *m = _get_next_outgoing(&data);
2447 if (!m)
2448 break;
2449
2450 // send_message or requeue messages may not encode message
2451 if (!data.length())
2452 prepare_send_message(get_features(), m, data);
2453
2454 r = write_message(m, data, _has_next_outgoing());
2455 if (r < 0) {
2456 ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
2457 write_lock.unlock();
2458 goto fail;
2459 } else if (r > 0) {
2460 break;
2461 }
2462 }
2463
2464 uint64_t left = ack_left.read();
2465 if (left) {
2466 ceph_le64 s;
2467 s = in_seq.read();
2468 outcoming_bl.append(CEPH_MSGR_TAG_ACK);
2469 outcoming_bl.append((char*)&s, sizeof(s));
2470 ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
2471 ack_left.sub(left);
2472 left = ack_left.read();
2473 r = _try_send(left);
2474 } else if (is_queued()) {
2475 r = _try_send();
2476 }
2477
2478 write_lock.unlock();
2479 if (r < 0) {
2480 ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
2481 goto fail;
2482 }
2483 } else {
2484 write_lock.unlock();
2485 lock.lock();
2486 write_lock.lock();
2487 if (state == STATE_STANDBY && !policy.server && is_queued()) {
2488 ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
2489 _connect();
2490 } else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
2491 r = _try_send();
2492 if (r < 0) {
2493 ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
2494 write_lock.unlock();
2495 fault();
2496 lock.unlock();
2497 return ;
2498 }
2499 }
2500 write_lock.unlock();
2501 lock.unlock();
2502 }
2503
2504 return ;
2505
2506 fail:
2507 lock.lock();
2508 fault();
2509 lock.unlock();
2510}
2511
2512void AsyncConnection::wakeup_from(uint64_t id)
2513{
2514 lock.lock();
2515 register_time_events.erase(id);
2516 lock.unlock();
2517 process();
2518}
2519
2520void AsyncConnection::tick(uint64_t id)
2521{
2522 auto now = ceph::coarse_mono_clock::now();
2523 ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
2524 << " last_active" << last_active << dendl;
2525 assert(last_tick_id == id);
2526 std::lock_guard<std::mutex> l(lock);
2527 last_tick_id = 0;
2528 auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
2529 if (inactive_timeout_us < (uint64_t)idle_period) {
2530 ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
2531 << inactive_timeout_us
2532 << " us, mark self fault." << dendl;
2533 fault();
2534 } else if (is_connected()) {
2535 last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
2536 }
2537}