// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <unistd.h>

#include "include/Context.h"
#include "include/random.h"
#include "common/errno.h"
#include "AsyncMessenger.h"
#include "AsyncConnection.h"

#include "ProtocolV1.h"
#include "ProtocolV2.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/EventTrace.h"

// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
#define SEQ_MASK 0x7fffffff

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
  return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
                << *peer_addrs << " conn(" << this
                << (msgr2 ? " msgr2=" : " legacy=")
                << protocol.get()
                << " " << ceph_con_mode_name(protocol->auth_meta->con_mode)
                << " :" << port
                << " s=" << get_state_name(state)
                << " l=" << policy.lossy
                << ").";
}

// Notes:
// 1. Don't dispatch any event when closed! It may keep the AsyncConnection
//    alive even after the AsyncMessenger is gone.

const uint32_t AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;

class C_time_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->wakeup_from(fd_or_id);
  }
};

class C_handle_read : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->process();
  }
};

class C_handle_write : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd) override {
    conn->handle_write();
  }
};

class C_handle_write_callback : public EventCallback {
  AsyncConnectionRef conn;

public:
  explicit C_handle_write_callback(AsyncConnectionRef c) : conn(c) {}
  void do_request(uint64_t fd) override { conn->handle_write_callback(); }
};

class C_clean_handler : public EventCallback {
  AsyncConnectionRef conn;
 public:
  explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t id) override {
    conn->cleanup();
    delete this;
  }
};

class C_tick_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->tick(fd_or_id);
  }
};

AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
                                 Worker *w, bool m2, bool local)
  : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
    logger(w->get_perf_counter()),
    state(STATE_NONE), port(-1),
    dispatch_queue(q), recv_buf(NULL),
    recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
    recv_start(0), recv_end(0),
    last_active(ceph::coarse_mono_clock::now()),
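    // ms_tcp_read_timeout is in seconds; convert it to microseconds here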
    inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
    msgr2(m2), state_offset(0),
    worker(w), center(&w->center), read_buffer(nullptr)
{
#ifdef UNIT_TESTS_BUILT
  this->interceptor = m->interceptor;
#endif
  read_handler = new C_handle_read(this);
  write_handler = new C_handle_write(this);
  write_callback_handler = new C_handle_write_callback(this);
  wakeup_handler = new C_time_wakeup(this);
  tick_handler = new C_tick_wakeup(this);
  // recv_buf is allocated at double recv_max_prefetch; see "read_until"
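  // Why 2x is enough: read_until() only prefetches when the bytes still
  // needed fit within recv_max_prefetch, and each read_bulk() pass writes at
  // most recv_max_prefetch bytes starting at recv_buf+recv_end, with recv_end
  // staying below the outstanding count; so the buffer's high-water mark
  // remains under 2*recv_max_prefetch.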
  recv_buf = new char[2*recv_max_prefetch];
  if (local) {
    protocol = std::unique_ptr<Protocol>(new LoopbackProtocolV1(this));
  } else if (m2) {
    protocol = std::unique_ptr<Protocol>(new ProtocolV2(this));
  } else {
    protocol = std::unique_ptr<Protocol>(new ProtocolV1(this));
  }
  logger->inc(l_msgr_created_connections);
}

AsyncConnection::~AsyncConnection()
{
  if (recv_buf)
    delete[] recv_buf;
  ceph_assert(!delay_state);
}

int AsyncConnection::get_con_mode() const {
  return protocol->get_con_mode();
}

void AsyncConnection::maybe_start_delay_thread()
{
  if (!delay_state) {
    async_msgr->cct->_conf.with_val<std::string>(
      "ms_inject_delay_type",
      [this](const string& s) {
        if (s.find(ceph_entity_type_name(peer_type)) != string::npos) {
          ldout(msgr->cct, 1) << __func__ << " setting up a delay queue"
                              << dendl;
          delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue,
                                            conn_id);
        }
      });
  }
}

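// Read exactly `len` bytes into `buffer`. On a short read, the callback and
// pending length are remembered; process() resumes the read when the socket
// becomes readable again and invokes `callback` once it completes (or fails).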
ssize_t AsyncConnection::read(unsigned len, char *buffer,
                              std::function<void(char *, ssize_t)> callback) {
  ldout(async_msgr->cct, 20) << __func__
                             << (pendingReadLen ? " continue" : " start")
                             << " len=" << len << dendl;
  ssize_t r = read_until(len, buffer);
  if (r > 0) {
    readCallback = callback;
    pendingReadLen = len;
    read_buffer = buffer;
  }
  return r;
}

// Because this function may be called multiple times to populate
// the needed buffer, the passed-in bufferptr must stay the same.
// Normally, only "read_message" passes an existing bufferptr in.
//
// It uses readahead to reduce the overhead of small reads;
// "recv_buf" stores the prefetched bytes.
//
// Returns the remaining byte count: 0 means this buffer is finished,
// and a return < 0 means error.
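//
// Typical calling pattern (a sketch): loop on read_until() with the same
// `p` until it returns 0 (filled) or < 0 (error); a positive return is the
// number of bytes still outstanding.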
ssize_t AsyncConnection::read_until(unsigned len, char *p)
{
  ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
                             << state_offset << dendl;

  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ssize_t r = 0;
  uint64_t left = len - state_offset;
  if (recv_end > recv_start) {
    uint64_t to_read = std::min<uint64_t>(recv_end - recv_start, left);
    memcpy(p, recv_buf+recv_start, to_read);
    recv_start += to_read;
    left -= to_read;
    ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer "
                               << " left is " << left << " buffer still has "
                               << recv_end - recv_start << dendl;
    if (left == 0) {
      return 0;
    }
    state_offset += to_read;
  }

  recv_end = recv_start = 0;
  /* nothing left in the prefetch buffer */
  if (left > (uint64_t)recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    do {
      r = read_bulk(p+state_offset, left);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      } else if (r == static_cast<int>(left)) {
        state_offset = 0;
        return 0;
      }
      state_offset += r;
      left -= r;
    } while (r > 0);
  } else {
    do {
      r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
                                 << " left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      }
      recv_end += r;
      if (r >= static_cast<int>(left)) {
        recv_start = len - state_offset;
        memcpy(p+state_offset, recv_buf, recv_start);
        state_offset = 0;
        return 0;
      }
      left -= r;
    } while (r > 0);
    memcpy(p+state_offset, recv_buf, recv_end-recv_start);
    state_offset += (recv_end - recv_start);
    recv_end = recv_start = 0;
  }
  ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining "
                             << len - state_offset << " bytes" << dendl;
  return len - state_offset;
}

/* return -1 means `fd` hit an error or was closed and should be closed;
 * return 0 means EAGAIN or EINTR */
ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
{
  ssize_t nread;
 again:
  nread = cs.read(buf, len);
  if (nread < 0) {
    if (nread == -EAGAIN) {
      nread = 0;
    } else if (nread == -EINTR) {
      goto again;
    } else {
      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
                                << " : "<< strerror(nread) << dendl;
      return -1;
    }
  } else if (nread == 0) {
    ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
                              << cs.fd() << dendl;
    return -1;
  }
  return nread;
}

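// Queue `bl` onto outcoming_bl and try to send it right away. A positive
// return means bytes are still queued; `callback` is then remembered and
// invoked from handle_write_callback() once the backlog fully drains.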
ssize_t AsyncConnection::write(bufferlist &bl,
                               std::function<void(ssize_t)> callback,
                               bool more) {

  std::unique_lock<std::mutex> l(write_lock);
  outcoming_bl.claim_append(bl);
  ssize_t r = _try_send(more);
  if (r > 0) {
    writeCallback = callback;
  }
  return r;
}

// return the remaining bytes, which may be larger than the length of the
// buffer just queued; a return < 0 means error
ssize_t AsyncConnection::_try_send(bool more)
{
  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ceph_assert(center->in_thread());
  ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outcoming_bl.length()
                             << " bytes" << dendl;
  ssize_t r = cs.send(outcoming_bl, more);
  if (r < 0) {
    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
    return r;
  }

  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
                             << " remaining bytes " << outcoming_bl.length() << dendl;

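  // Arm EVENT_WRITABLE only while data is still queued, and disarm it
  // (dispatching any pending writeCallback) once the backlog drains, so the
  // event loop does not spin on an idle, always-writable socket.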
  if (!open_write && is_queued()) {
    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
    open_write = true;
  }

  if (open_write && !is_queued()) {
    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
    open_write = false;
    if (writeCallback) {
      center->dispatch_event_external(write_callback_handler);
    }
  }

  return outcoming_bl.length();
}

void AsyncConnection::inject_delay() {
  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(async_msgr->cct, 10) << __func__ << " sleep for " <<
      async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
    utime_t t;
    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }
}

void AsyncConnection::process() {
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  recv_start_time = ceph::mono_clock::now();

  ldout(async_msgr->cct, 20) << __func__ << dendl;

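  // State-machine walk-through: STATE_CONNECTING opens the socket and falls
  // through to STATE_CONNECTING_RE, which polls the nonblocking connect();
  // STATE_ACCEPTING just arms the read event. Both paths land in
  // STATE_CONNECTION_ESTABLISHED, where any pending partial read is resumed
  // before control is handed to protocol->read_event().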
  switch (state) {
    case STATE_NONE: {
      ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
      return;
    }
    case STATE_CLOSED: {
      ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
      return;
    }
    case STATE_CONNECTING: {
      ceph_assert(!policy.server);

      if (cs) {
        center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
        cs.close();
      }

      SocketOptions opts;
      opts.priority = async_msgr->get_socket_priority();
      opts.connect_bind_addr = msgr->get_myaddrs().front();
      ssize_t r = worker->connect(target_addr, opts, &cs);
      if (r < 0) {
        protocol->fault();
        return;
      }

      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTING_RE;
    }
    case STATE_CONNECTING_RE: {
      ssize_t r = cs.is_connected();
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " reconnect failed to "
                                  << target_addr << dendl;
        if (r == -ECONNREFUSED) {
          ldout(async_msgr->cct, 2)
            << __func__ << " connection refused!" << dendl;
          dispatch_queue->queue_refused(this);
        }
        protocol->fault();
        return;
      } else if (r == 0) {
        ldout(async_msgr->cct, 10)
          << __func__ << " nonblock connect inprogress" << dendl;
        if (async_msgr->get_stack()->nonblock_connect_need_writable_event()) {
          center->create_file_event(cs.fd(), EVENT_WRITABLE,
                                    read_handler);
        }
        logger->tinc(l_msgr_running_recv_time,
                     ceph::mono_clock::now() - recv_start_time);
        return;
      }

      center->delete_file_event(cs.fd(), EVENT_WRITABLE);
      ldout(async_msgr->cct, 10)
        << __func__ << " connect successfully, ready to send banner" << dendl;
      state = STATE_CONNECTION_ESTABLISHED;
      break;
    }

    case STATE_ACCEPTING: {
      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTION_ESTABLISHED;

      break;
    }

    case STATE_CONNECTION_ESTABLISHED: {
      if (pendingReadLen) {
        ssize_t r = read(*pendingReadLen, read_buffer, readCallback);
        if (r <= 0) { // read all bytes, or an error occurred
          pendingReadLen.reset();
          char *buf_tmp = read_buffer;
          read_buffer = nullptr;
          readCallback(buf_tmp, r);
        }
        return;
      }
      break;
    }
  }

  protocol->read_event();

  logger->tinc(l_msgr_running_recv_time,
               ceph::mono_clock::now() - recv_start_time);
}

bool AsyncConnection::is_connected() {
  return protocol->is_connected();
}

void AsyncConnection::connect(const entity_addrvec_t &addrs, int type,
                              entity_addr_t &target) {

  std::lock_guard<std::mutex> l(lock);
  set_peer_type(type);
  set_peer_addrs(addrs);
  policy = msgr->get_policy(type);
  target_addr = target;
  _connect();
}

void AsyncConnection::_connect()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;

  state = STATE_CONNECTING;
  protocol->connect();
  // reschedule the connection in order to avoid a lock dependency;
  // this may be called by an external thread (send_message)
  center->dispatch_event_external(read_handler);
}

void AsyncConnection::accept(ConnectedSocket socket,
                             const entity_addr_t &listen_addr,
                             const entity_addr_t &peer_addr)
{
  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
                             << " listen_addr " << listen_addr
                             << " peer_addr " << peer_addr << dendl;
  ceph_assert(socket.fd() >= 0);

  std::lock_guard<std::mutex> l(lock);
  cs = std::move(socket);
  socket_addr = listen_addr;
  target_addr = peer_addr; // until we know better
  state = STATE_ACCEPTING;
  protocol->accept();
  // reschedule the connection in order to avoid a lock dependency
  center->dispatch_event_external(read_handler);
}

int AsyncConnection::send_message(Message *m)
{
  FUNCTRACE(async_msgr->cct);
  lgeneric_subdout(async_msgr->cct, ms,
                   1) << "-- " << async_msgr->get_myaddrs() << " --> "
                      << get_peer_addrs() << " -- "
                      << *m << " -- " << m << " con "
                      << this
                      << dendl;

  // optimistically assume it's ok to encode (it may actually be broken now)
  if (!m->get_priority())
    m->set_priority(async_msgr->get_default_send_priority());

  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);

  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);

  if (async_msgr->get_myaddrs() == get_peer_addrs()) { //loopback connection
    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
    std::lock_guard<std::mutex> l(write_lock);
    if (protocol->is_connected()) {
      dispatch_queue->local_delivery(m, m->get_priority());
    } else {
      ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                 << " Drop message " << m << dendl;
      m->put();
    }
    return 0;
  }

  // we don't want to count local messages here; they're too lightweight
  // and may disturb users
  logger->inc(l_msgr_send_messages);

  protocol->send_message(m);
  return 0;
}

entity_addr_t AsyncConnection::_infer_target_addr(const entity_addrvec_t& av)
{
  // pick the first addr of the same address family as socket_addr. it could be
  // an any: or v2: addr, we don't care. it should not be a v1 addr.
  for (auto& i : av.v) {
    if (i.is_legacy()) {
      continue;
    }
    if (i.get_family() == socket_addr.get_family()) {
      ldout(async_msgr->cct,10) << __func__ << " " << av << " -> " << i << dendl;
      return i;
    }
  }
  ldout(async_msgr->cct,10) << __func__ << " " << av << " -> nothing to match "
                            << socket_addr << dendl;
  return {};
}

void AsyncConnection::fault()
{
  shutdown_socket();
  open_write = false;

  // queue delayed items immediately
  if (delay_state)
    delay_state->flush();

  recv_start = recv_end = 0;
  state_offset = 0;
  outcoming_bl.clear();
}

void AsyncConnection::_stop() {
  writeCallback.reset();
  dispatch_queue->discard_queue(conn_id);
  async_msgr->unregister_conn(this);
  worker->release_worker();

  state = STATE_CLOSED;
  open_write = false;

  state_offset = 0;
  // Make sure in-queue events will be processed
  center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
}

bool AsyncConnection::is_queued() const {
  return outcoming_bl.length();
}

void AsyncConnection::shutdown_socket() {
  for (auto &&t : register_time_events) center->delete_time_event(t);
  register_time_events.clear();
  if (last_tick_id) {
    center->delete_time_event(last_tick_id);
    last_tick_id = 0;
  }
  if (cs) {
    center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
    cs.shutdown();
    cs.close();
  }
}

void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
{
  Message *m = nullptr;
  {
    std::lock_guard<std::mutex> l(delay_lock);
    register_time_events.erase(id);
    if (stop_dispatch)
      return;
    if (delay_queue.empty())
      return;
    m = delay_queue.front();
    delay_queue.pop_front();
  }
  if (msgr->ms_can_fast_dispatch(m)) {
    dispatch_queue->fast_dispatch(m);
  } else {
    dispatch_queue->enqueue(m, m->get_priority(), conn_id);
  }
}

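// discard() drops every queued message (releasing its dispatch-throttle
// budget), while flush() below delivers them; both hand the work to the
// event center thread via submit_to().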
void AsyncConnection::DelayedDelivery::discard() {
  stop_dispatch = true;
  center->submit_to(center->get_id(),
                    [this]() mutable {
                      std::lock_guard<std::mutex> l(delay_lock);
                      while (!delay_queue.empty()) {
                        Message *m = delay_queue.front();
                        dispatch_queue->dispatch_throttle_release(
                            m->get_dispatch_throttle_size());
                        m->put();
                        delay_queue.pop_front();
                      }
                      for (auto i : register_time_events)
                        center->delete_time_event(i);
                      register_time_events.clear();
                      stop_dispatch = false;
                    },
                    true);
}

void AsyncConnection::DelayedDelivery::flush() {
  stop_dispatch = true;
  center->submit_to(
      center->get_id(), [this] () mutable {
    std::lock_guard<std::mutex> l(delay_lock);
    while (!delay_queue.empty()) {
      Message *m = delay_queue.front();
      if (msgr->ms_can_fast_dispatch(m)) {
        dispatch_queue->fast_dispatch(m);
      } else {
        dispatch_queue->enqueue(m, m->get_priority(), conn_id);
      }
      delay_queue.pop_front();
    }
    for (auto i : register_time_events)
      center->delete_time_event(i);
    register_time_events.clear();
    stop_dispatch = false;
  }, true);
}

void AsyncConnection::send_keepalive()
{
  protocol->send_keepalive();
}

void AsyncConnection::mark_down()
{
  ldout(async_msgr->cct, 1) << __func__ << dendl;
  std::lock_guard<std::mutex> l(lock);
  protocol->stop();
}

void AsyncConnection::handle_write()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;
  protocol->write_event();
}

void AsyncConnection::handle_write_callback() {
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  recv_start_time = ceph::mono_clock::now();
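  // Hold write_lock only long enough to steal the callback; it is released
  // before the callback runs, presumably so the callback can safely re-enter
  // write(), which takes write_lock itself.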
  write_lock.lock();
  if (writeCallback) {
    auto callback = *writeCallback;
    writeCallback.reset();
    write_lock.unlock();
    callback(0);
    return;
  }
  write_lock.unlock();
}

void AsyncConnection::stop(bool queue_reset) {
  lock.lock();
  bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
  protocol->stop();
  lock.unlock();
  if (need_queue_reset) dispatch_queue->queue_reset(this);
}

void AsyncConnection::cleanup() {
  shutdown_socket();
  delete read_handler;
  delete write_handler;
  delete write_callback_handler;
  delete wakeup_handler;
  delete tick_handler;
  if (delay_state) {
    delete delay_state;
    delay_state = NULL;
  }
}

void AsyncConnection::wakeup_from(uint64_t id)
{
  lock.lock();
  register_time_events.erase(id);
  lock.unlock();
  process();
}

void AsyncConnection::tick(uint64_t id)
{
  auto now = ceph::coarse_mono_clock::now();
  ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
                             << " last_active=" << last_active << dendl;
  std::lock_guard<std::mutex> l(lock);
  last_tick_id = 0;
  auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
  if (inactive_timeout_us < (uint64_t)idle_period) {
    ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
                              << inactive_timeout_us
                              << " us, mark self fault." << dendl;
    protocol->fault();
  } else if (is_connected()) {
    last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
  }
}