// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <unistd.h>

#include "include/Context.h"
#include "include/random.h"
#include "common/errno.h"
#include "AsyncMessenger.h"
#include "AsyncConnection.h"

#include "ProtocolV1.h"
#include "ProtocolV2.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/EventTrace.h"

// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
#define SEQ_MASK 0x7fffffff

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
std::ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
  return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
                << *peer_addrs << " conn(" << this
                << (msgr2 ? " msgr2=" : " legacy=")
                << protocol.get()
                << " " << ceph_con_mode_name(protocol->auth_meta->con_mode)
                << " :" << port
                << " s=" << get_state_name(state)
                << " l=" << policy.lossy
                << ").";
}

// Notes:
// 1. Don't dispatch any event when closed! It may keep the AsyncConnection
//    alive even after the AsyncMessenger is gone.

const uint32_t AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;

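// The C_* classes below are thin EventCallback adapters: the EventCenter
// invokes do_request() and they simply forward into the owning
// AsyncConnection. C_clean_handler is the only one-shot handler; it deletes
// itself after running cleanup().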
class C_time_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->wakeup_from(fd_or_id);
  }
};

class C_handle_read : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->process();
  }
};

class C_handle_write : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd) override {
    conn->handle_write();
  }
};

class C_handle_write_callback : public EventCallback {
  AsyncConnectionRef conn;

public:
  explicit C_handle_write_callback(AsyncConnectionRef c) : conn(c) {}
  void do_request(uint64_t fd) override { conn->handle_write_callback(); }
};

class C_clean_handler : public EventCallback {
  AsyncConnectionRef conn;
 public:
  explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t id) override {
    conn->cleanup();
    delete this;
  }
};

class C_tick_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->tick(fd_or_id);
  }
};


AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
                                 Worker *w, bool m2, bool local)
  : Connection(cct, m),
    delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
    logger(w->get_perf_counter()),
    labeled_logger(w->get_labeled_perf_counter()),
    state(STATE_NONE), port(-1),
    dispatch_queue(q), recv_buf(NULL),
    recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
    recv_start(0), recv_end(0),
    last_active(ceph::coarse_mono_clock::now()),
    connect_timeout_us(cct->_conf->ms_connection_ready_timeout*1000*1000),
    inactive_timeout_us(cct->_conf->ms_connection_idle_timeout*1000*1000),
    msgr2(m2), state_offset(0),
    worker(w), center(&w->center), read_buffer(nullptr)
{
#ifdef UNIT_TESTS_BUILT
  this->interceptor = m->interceptor;
#endif
  read_handler = new C_handle_read(this);
  write_handler = new C_handle_write(this);
  write_callback_handler = new C_handle_write_callback(this);
  wakeup_handler = new C_time_wakeup(this);
  tick_handler = new C_tick_wakeup(this);
  // double recv_max_prefetch see "read_until"
  recv_buf = new char[2*recv_max_prefetch];
  if (local) {
    protocol = std::unique_ptr<Protocol>(new LoopbackProtocolV1(this));
  } else if (m2) {
    protocol = std::unique_ptr<Protocol>(new ProtocolV2(this));
  } else {
    protocol = std::unique_ptr<Protocol>(new ProtocolV1(this));
  }
  logger->inc(l_msgr_created_connections);
}

AsyncConnection::~AsyncConnection()
{
  if (recv_buf)
    delete[] recv_buf;
  ceph_assert(!delay_state);
}

int AsyncConnection::get_con_mode() const
{
  return protocol->get_con_mode();
}

bool AsyncConnection::is_msgr2() const
{
  return protocol->proto_type == 2;
}

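// Fault-injection helper: if the ms_inject_delay_type config value names this
// peer's entity type, attach a DelayedDelivery queue so that received
// messages can be artificially delayed before dispatch (see DelayedDelivery
// further below).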
void AsyncConnection::maybe_start_delay_thread()
{
  if (!delay_state) {
    async_msgr->cct->_conf.with_val<std::string>(
      "ms_inject_delay_type",
      [this](const std::string& s) {
        if (s.find(ceph_entity_type_name(peer_type)) != std::string::npos) {
          ldout(msgr->cct, 1) << __func__ << " setting up a delay queue"
                              << dendl;
          delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue,
                                            conn_id);
        }
      });
  }
}

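// Read exactly `len` bytes into `buffer`. A return of 0 means the data is
// already complete; a positive return is the number of bytes still missing,
// in which case the callback and buffer are remembered and process() finishes
// the read (and invokes `callback`) once more data arrives; < 0 is an error.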
ssize_t AsyncConnection::read(unsigned len, char *buffer,
                              std::function<void(char *, ssize_t)> callback) {
  ldout(async_msgr->cct, 20) << __func__
                             << (pendingReadLen ? " continue" : " start")
                             << " len=" << len << dendl;
  ssize_t r = read_until(len, buffer);
  if (r > 0) {
    readCallback = callback;
    pendingReadLen = len;
    read_buffer = buffer;
  }
  return r;
}

// Because this function may be called multiple times to populate
// the needed buffer, the passed-in buffer pointer must stay the same.
// Normally, only "read_message" will pass an existing bufferptr in.
//
// It uses readahead to reduce the overhead of small reads;
// "recv_buf" is used to store the prefetched data.
//
// Returns the remaining bytes: 0 means this buffer is finished,
// and a return < 0 means error.
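// For example, asking for len=64 when only 16 bytes are currently readable
// copies those 16 bytes, records the progress in state_offset and returns 48;
// the pending read is then resumed from process() until the return value
// reaches 0.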
ssize_t AsyncConnection::read_until(unsigned len, char *p)
{
  ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
                             << state_offset << dendl;

  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ssize_t r = 0;
  uint64_t left = len - state_offset;
  if (recv_end > recv_start) {
    uint64_t to_read = std::min<uint64_t>(recv_end - recv_start, left);
    memcpy(p, recv_buf+recv_start, to_read);
    recv_start += to_read;
    left -= to_read;
    ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer "
                               << " left is " << left << " buffer still has "
                               << recv_end - recv_start << dendl;
    if (left == 0) {
      state_offset = 0;
      return 0;
    }
    state_offset += to_read;
  }

  recv_end = recv_start = 0;
  /* nothing left in the prefetch buffer */
  if (left > (uint64_t)recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    do {
      r = read_bulk(p+state_offset, left);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      } else if (r == static_cast<int>(left)) {
        state_offset = 0;
        return 0;
      }
      state_offset += r;
      left -= r;
    } while (r > 0);
  } else {
    do {
      r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
                                 << " left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      }
      recv_end += r;
      if (r >= static_cast<int>(left)) {
        recv_start = len - state_offset;
        memcpy(p+state_offset, recv_buf, recv_start);
        state_offset = 0;
        return 0;
      }
      left -= r;
    } while (r > 0);
    memcpy(p+state_offset, recv_buf, recv_end-recv_start);
    state_offset += (recv_end - recv_start);
    recv_end = recv_start = 0;
  }
  ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining "
                             << len - state_offset << " bytes" << dendl;
  return len - state_offset;
}

/* return -1 when the socket hits an error or is closed by the peer (the
 * connection should then be closed); return 0 on EAGAIN or EINTR */
ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
{
  ssize_t nread;
 again:
  nread = cs.read(buf, len);
  if (nread < 0) {
    if (nread == -EAGAIN) {
      nread = 0;
    } else if (nread == -EINTR) {
      goto again;
    } else {
      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
                                << " : "<< nread << " " << strerror(nread) << dendl;
      return -1;
    }
  } else if (nread == 0) {
    ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
                              << cs.fd() << dendl;
    return -1;
  }
  return nread;
}

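// Queue `bl` for sending. The return value mirrors _try_send(): 0 means the
// data went out immediately, > 0 means that many bytes are still queued (the
// callback will then be invoked with 0 from handle_write_callback() once the
// backlog drains), and < 0 is an error.
//
// A minimal caller sketch (hypothetical; the protocol classes are the real
// users, and `frame` here is just some encoded bytes):
//   ceph::buffer::list bl;
//   bl.append(frame);
//   ssize_t r = connection->write(bl,
//                                 [](ssize_t rc) { /* rc == 0 on success */ },
//                                 false);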
ssize_t AsyncConnection::write(ceph::buffer::list &bl,
                               std::function<void(ssize_t)> callback,
                               bool more) {

  std::unique_lock<std::mutex> l(write_lock);
  outgoing_bl.claim_append(bl);
  ssize_t r = _try_send(more);
  if (r > 0) {
    writeCallback = callback;
  }
  return r;
}

// Returns the remaining bytes; this may be larger than the length of ptr.
// A return < 0 means error.
ssize_t AsyncConnection::_try_send(bool more)
{
  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ceph_assert(center->in_thread());
  ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length()
                             << " bytes" << dendl;
  // a blocked network would make ::send return EAGAIN; that would make it look
  // as if cs.send() had not been called and r = 0
  ssize_t r = 0;
  if (likely(!inject_network_congestion())) {
    r = cs.send(outgoing_bl, more);
  }
  if (r < 0) {
    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
    return r;
  }

  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
                             << " remaining bytes " << outgoing_bl.length() << dendl;

  if (!open_write && is_queued()) {
    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
    open_write = true;
  }

  if (open_write && !is_queued()) {
    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
    open_write = false;
    if (writeCallback) {
      center->dispatch_event_external(write_callback_handler);
    }
  }

  return outgoing_bl.length();
}

void AsyncConnection::inject_delay() {
  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(async_msgr->cct, 10) << __func__ << " sleep for " <<
      async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
    utime_t t;
    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }
}

bool AsyncConnection::inject_network_congestion() const {
  return (async_msgr->cct->_conf->ms_inject_network_congestion > 0 &&
          rand() % async_msgr->cct->_conf->ms_inject_network_congestion != 0);
}

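// Main read-side entry point, run in the EventCenter thread whenever the
// socket becomes readable (or the connection is re-driven externally). It
// advances the connection state machine: start a non-blocking connect
// (STATE_CONNECTING), poll for its completion (STATE_CONNECTING_RE), register
// the read event on an accepted socket (STATE_ACCEPTING), resume a pending
// read (STATE_CONNECTION_ESTABLISHED), and finally let the protocol consume
// whatever arrived via protocol->read_event().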
void AsyncConnection::process() {
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  recv_start_time = ceph::mono_clock::now();

  ldout(async_msgr->cct, 20) << __func__ << dendl;

  switch (state) {
    case STATE_NONE: {
      ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
      return;
    }
    case STATE_CLOSED: {
      ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
      return;
    }
    case STATE_CONNECTING: {
      ceph_assert(!policy.server);

      // clear timer (if any) since we are connecting/re-connecting
      if (last_tick_id) {
        center->delete_time_event(last_tick_id);
      }
      last_connect_started = ceph::coarse_mono_clock::now();
      last_tick_id = center->create_time_event(
        connect_timeout_us, tick_handler);

      if (cs) {
        center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
        cs.close();
      }

      SocketOptions opts;
      opts.priority = async_msgr->get_socket_priority();
      if (async_msgr->cct->_conf->mon_use_min_delay_socket) {
        if (async_msgr->get_mytype() == CEPH_ENTITY_TYPE_MON &&
            peer_is_mon()) {
          opts.priority = SOCKET_PRIORITY_MIN_DELAY;
        }
      }
      opts.connect_bind_addr = msgr->get_myaddrs().front();
      ssize_t r = worker->connect(target_addr, opts, &cs);
      if (r < 0) {
        protocol->fault();
        return;
      }

      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTING_RE;
    }
    case STATE_CONNECTING_RE: {
      ssize_t r = cs.is_connected();
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " reconnect failed to "
                                  << target_addr << dendl;
        if (r == -ECONNREFUSED) {
          ldout(async_msgr->cct, 2)
              << __func__ << " connection refused!" << dendl;
          dispatch_queue->queue_refused(this);
        }
        protocol->fault();
        return;
      } else if (r == 0) {
        ldout(async_msgr->cct, 10)
            << __func__ << " nonblock connect inprogress" << dendl;
        if (async_msgr->get_stack()->nonblock_connect_need_writable_event()) {
          center->create_file_event(cs.fd(), EVENT_WRITABLE,
                                    read_handler);
        }
        logger->tinc(l_msgr_running_recv_time,
                     ceph::mono_clock::now() - recv_start_time);
        return;
      }

      center->delete_file_event(cs.fd(), EVENT_WRITABLE);
      ldout(async_msgr->cct, 10)
          << __func__ << " connect successfully, ready to send banner" << dendl;
      state = STATE_CONNECTION_ESTABLISHED;
      break;
    }

    case STATE_ACCEPTING: {
      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTION_ESTABLISHED;
      if (async_msgr->cct->_conf->mon_use_min_delay_socket) {
        if (async_msgr->get_mytype() == CEPH_ENTITY_TYPE_MON &&
            peer_is_mon()) {
          cs.set_priority(cs.fd(), SOCKET_PRIORITY_MIN_DELAY,
                          target_addr.get_family());
        }
      }
      break;
    }

    case STATE_CONNECTION_ESTABLISHED: {
      if (pendingReadLen) {
        ssize_t r = read(*pendingReadLen, read_buffer, readCallback);
        if (r <= 0) { // read all bytes, or an error occurred
          pendingReadLen.reset();
          char *buf_tmp = read_buffer;
          read_buffer = nullptr;
          readCallback(buf_tmp, r);
        }
        logger->tinc(l_msgr_running_recv_time,
                     ceph::mono_clock::now() - recv_start_time);
        return;
      }
      break;
    }
  }

  protocol->read_event();

  logger->tinc(l_msgr_running_recv_time,
               ceph::mono_clock::now() - recv_start_time);
}

bool AsyncConnection::is_connected() {
  return protocol->is_connected();
}

void AsyncConnection::connect(const entity_addrvec_t &addrs, int type,
                              entity_addr_t &target) {

  std::lock_guard<std::mutex> l(lock);
  set_peer_type(type);
  set_peer_addrs(addrs);
  policy = msgr->get_policy(type);
  target_addr = target;
  _connect();
}

void AsyncConnection::_connect()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;

  state = STATE_CONNECTING;
  protocol->connect();
  // reschedule the connection in order to avoid a lock dependency;
  // this may be called by an external thread (send_message)
  center->dispatch_event_external(read_handler);
}

void AsyncConnection::accept(ConnectedSocket socket,
                             const entity_addr_t &listen_addr,
                             const entity_addr_t &peer_addr)
{
  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
                             << " listen_addr " << listen_addr
                             << " peer_addr " << peer_addr << dendl;
  ceph_assert(socket.fd() >= 0);

  std::lock_guard<std::mutex> l(lock);
  cs = std::move(socket);
  socket_addr = listen_addr;
  target_addr = peer_addr; // until we know better
  state = STATE_ACCEPTING;
  protocol->accept();
  // reschedule the connection in order to avoid a lock dependency
  center->dispatch_event_external(read_handler);
}

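// Entry point used by higher layers to queue an outgoing message. The
// connection takes over the caller's reference to `m` (note the m->put() on
// the blackhole and closed-loopback paths), stamps priority and source, and
// hands the message to the protocol for framing and transmission.
//
// A minimal caller sketch (hypothetical; real callers live in OSD/MON/client
// code):
//   ConnectionRef con = ...;   // obtained from the AsyncMessenger
//   con->send_message(m);      // m is a Message*, e.g. a new MOSDOp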
int AsyncConnection::send_message(Message *m)
{
  FUNCTRACE(async_msgr->cct);
  lgeneric_subdout(async_msgr->cct, ms,
                   1) << "-- " << async_msgr->get_myaddrs() << " --> "
                      << get_peer_addrs() << " -- "
                      << *m << " -- " << m << " con "
                      << this
                      << dendl;

  if (is_blackhole()) {
    lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type)
                                             << " blackhole " << *m << dendl;
    m->put();
    return 0;
  }

  // optimistically assume it's ok to encode (it may actually be broken now)
  if (!m->get_priority())
    m->set_priority(async_msgr->get_default_send_priority());

  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
#endif

  if (is_loopback) { //loopback connection
    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
    std::lock_guard<std::mutex> l(write_lock);
    if (protocol->is_connected()) {
      dispatch_queue->local_delivery(m, m->get_priority());
    } else {
      ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                 << " Drop message " << m << dendl;
      m->put();
    }
    return 0;
  }

  // we don't want to count local messages here; they're too lightweight and
  // would only disturb users
  logger->inc(l_msgr_send_messages);

  protocol->send_message(m);
  return 0;
}

entity_addr_t AsyncConnection::_infer_target_addr(const entity_addrvec_t& av)
{
  // pick the first addr of the same address family as socket_addr. it could be
  // an any: or v2: addr, we don't care. it should not be a v1 addr.
  for (auto& i : av.v) {
    if (i.is_legacy()) {
      continue;
    }
    if (i.get_family() == socket_addr.get_family()) {
      ldout(async_msgr->cct,10) << __func__ << " " << av << " -> " << i << dendl;
      return i;
    }
  }
  ldout(async_msgr->cct,10) << __func__ << " " << av << " -> nothing to match "
                            << socket_addr << dendl;
  return {};
}

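// Reset the transport-level state after an error: close the socket, push any
// artificially delayed messages out immediately so they are not lost, and
// clear the read/write buffering state. Reconnection policy is not decided
// here.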
void AsyncConnection::fault()
{
  shutdown_socket();
  open_write = false;

  // queue delayed items immediately
  if (delay_state)
    delay_state->flush();

  recv_start = recv_end = 0;
  state_offset = 0;
  outgoing_bl.clear();
}

void AsyncConnection::_stop() {
  writeCallback.reset();
  dispatch_queue->discard_queue(conn_id);
  async_msgr->unregister_conn(this);
  worker->release_worker();

  state = STATE_CLOSED;
  open_write = false;

  state_offset = 0;
  // Make sure in-queue events will be processed
  center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
}

bool AsyncConnection::is_queued() const {
  return outgoing_bl.length();
}

void AsyncConnection::shutdown_socket() {
  for (auto &&t : register_time_events) center->delete_time_event(t);
  register_time_events.clear();
  if (last_tick_id) {
    center->delete_time_event(last_tick_id);
    last_tick_id = 0;
  }
  if (cs) {
    center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
    cs.shutdown();
    cs.close();
  }
}

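// DelayedDelivery backs the ms_inject_delay_type fault injection (see
// maybe_start_delay_thread()): received messages are parked in delay_queue
// and handed to the dispatch queue later from a time event. flush() pushes
// everything out immediately (used by fault()), while discard() drops the
// queued messages and releases their dispatch throttle.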
void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
{
  Message *m = nullptr;
  {
    std::lock_guard<std::mutex> l(delay_lock);
    register_time_events.erase(id);
    if (stop_dispatch)
      return;
    if (delay_queue.empty())
      return;
    m = delay_queue.front();
    delay_queue.pop_front();
  }
  if (msgr->ms_can_fast_dispatch(m)) {
    dispatch_queue->fast_dispatch(m);
  } else {
    dispatch_queue->enqueue(m, m->get_priority(), conn_id);
  }
}

void AsyncConnection::DelayedDelivery::discard() {
  stop_dispatch = true;
  center->submit_to(center->get_id(),
                    [this]() mutable {
                      std::lock_guard<std::mutex> l(delay_lock);
                      while (!delay_queue.empty()) {
                        Message *m = delay_queue.front();
                        dispatch_queue->dispatch_throttle_release(
                            m->get_dispatch_throttle_size());
                        m->put();
                        delay_queue.pop_front();
                      }
                      for (auto i : register_time_events)
                        center->delete_time_event(i);
                      register_time_events.clear();
                      stop_dispatch = false;
                    },
                    true);
}

void AsyncConnection::DelayedDelivery::flush() {
  stop_dispatch = true;
  center->submit_to(
      center->get_id(), [this] () mutable {
        std::lock_guard<std::mutex> l(delay_lock);
        while (!delay_queue.empty()) {
          Message *m = delay_queue.front();
          if (msgr->ms_can_fast_dispatch(m)) {
            dispatch_queue->fast_dispatch(m);
          } else {
            dispatch_queue->enqueue(m, m->get_priority(), conn_id);
          }
          delay_queue.pop_front();
        }
        for (auto i : register_time_events)
          center->delete_time_event(i);
        register_time_events.clear();
        stop_dispatch = false;
      }, true);
}

void AsyncConnection::send_keepalive()
{
  protocol->send_keepalive();
}

void AsyncConnection::mark_down()
{
  ldout(async_msgr->cct, 1) << __func__ << dendl;
  std::lock_guard<std::mutex> l(lock);
  protocol->stop();
}

void AsyncConnection::handle_write()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;
  protocol->write_event();
}

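// Runs in the EventCenter thread after _try_send() has fully drained
// outgoing_bl: take the writeCallback registered by write() (if any), drop
// it, and invoke it with 0 outside write_lock.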
void AsyncConnection::handle_write_callback() {
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  recv_start_time = ceph::mono_clock::now();
  write_lock.lock();
  if (writeCallback) {
    auto callback = *writeCallback;
    writeCallback.reset();
    write_lock.unlock();
    callback(0);
    return;
  }
  write_lock.unlock();
}

void AsyncConnection::stop(bool queue_reset) {
  lock.lock();
  bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
  protocol->stop();
  lock.unlock();
  if (need_queue_reset) dispatch_queue->queue_reset(this);
}

void AsyncConnection::cleanup() {
  shutdown_socket();
  delete read_handler;
  delete write_handler;
  delete write_callback_handler;
  delete wakeup_handler;
  delete tick_handler;
  if (delay_state) {
    delete delay_state;
    delay_state = NULL;
  }
}

void AsyncConnection::wakeup_from(uint64_t id)
{
  lock.lock();
  register_time_events.erase(id);
  lock.unlock();
  process();
}

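// Watchdog driven by tick_handler time events. Before the connection is
// established it faults after connect_timeout_us without progress
// (ms_connection_ready_timeout); afterwards it faults once the connection has
// been idle for more than inactive_timeout_us (ms_connection_idle_timeout).
// Otherwise the timer is simply re-armed.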
void AsyncConnection::tick(uint64_t id)
{
  auto now = ceph::coarse_mono_clock::now();
  ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
                             << " last_active=" << last_active << dendl;
  std::lock_guard<std::mutex> l(lock);
  last_tick_id = 0;
  if (!is_connected()) {
    if (connect_timeout_us <=
        (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>
          (now - last_connect_started).count()) {
      ldout(async_msgr->cct, 1) << __func__ << " see no progress in more than "
                                << connect_timeout_us
                                << " us during connecting to "
                                << target_addr << ", fault."
                                << dendl;
      protocol->fault();
      labeled_logger->inc(l_msgr_connection_ready_timeouts);
    } else {
      last_tick_id = center->create_time_event(connect_timeout_us, tick_handler);
    }
  } else {
    auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>
      (now - last_active).count();
    if (inactive_timeout_us < (uint64_t)idle_period) {
      ldout(async_msgr->cct, 1) << __func__ << " idle (" << idle_period
                                << ") for more than " << inactive_timeout_us
                                << " us, fault."
                                << dendl;
      protocol->fault();
      labeled_logger->inc(l_msgr_connection_idle_timeouts);
    } else {
      last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
    }
  }
}