]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/AsyncMessenger.cc
update sources to v12.1.1
[ceph.git] / ceph / src / msg / async / AsyncMessenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include "acconfig.h"
18
19 #include <iostream>
20 #include <fstream>
21
22 #include "AsyncMessenger.h"
23
24 #include "common/config.h"
25 #include "common/Timer.h"
26 #include "common/errno.h"
27
28 #include "messages/MOSDOp.h"
29 #include "messages/MOSDOpReply.h"
30 #include "common/EventTrace.h"
31
32 #define dout_subsys ceph_subsys_ms
33 #undef dout_prefix
34 #define dout_prefix _prefix(_dout, this)
35 static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
36 return *_dout << "-- " << m->get_myaddr() << " ";
37 }
38
39 static ostream& _prefix(std::ostream *_dout, Processor *p) {
40 return *_dout << " Processor -- ";
41 }
42
43
44 /*******************
45 * Processor
46 */
47
48 class Processor::C_processor_accept : public EventCallback {
49 Processor *pro;
50
51 public:
52 explicit C_processor_accept(Processor *p): pro(p) {}
53 void do_request(int id) override {
54 pro->accept();
55 }
56 };
57
58 Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
59 : msgr(r), net(c), worker(w),
60 listen_handler(new C_processor_accept(this)) {}
61
62 int Processor::bind(const entity_addr_t &bind_addr,
63 const set<int>& avoid_ports,
64 entity_addr_t* bound_addr)
65 {
66 const md_config_t *conf = msgr->cct->_conf;
67 // bind to a socket
68 ldout(msgr->cct, 10) << __func__ << dendl;
69
70 int family;
71 switch (bind_addr.get_family()) {
72 case AF_INET:
73 case AF_INET6:
74 family = bind_addr.get_family();
75 break;
76
77 default:
78 // bind_addr is empty
79 family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
80 }
81
82 SocketOptions opts;
83 opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
84 opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
85
86 // use whatever user specified (if anything)
87 entity_addr_t listen_addr = bind_addr;
88 if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) {
89 listen_addr.set_type(entity_addr_t::TYPE_LEGACY);
90 }
91 listen_addr.set_family(family);
92
93 /* bind to port */
94 int r = -1;
95
96 for (int i = 0; i < conf->ms_bind_retry_count; i++) {
97 if (i > 0) {
98 lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
99 << conf->ms_bind_retry_delay << " seconds " << dendl;
100 sleep(conf->ms_bind_retry_delay);
101 }
102
103 if (listen_addr.get_port()) {
104 worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
105 r = worker->listen(listen_addr, opts, &listen_socket);
106 }, false);
107 if (r < 0) {
108 lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
109 << ": " << cpp_strerror(r) << dendl;
110 continue;
111 }
112 } else {
113 // try a range of ports
114 for (int port = msgr->cct->_conf->ms_bind_port_min; port <= msgr->cct->_conf->ms_bind_port_max; port++) {
115 if (avoid_ports.count(port))
116 continue;
117
118 listen_addr.set_port(port);
119 worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
120 r = worker->listen(listen_addr, opts, &listen_socket);
121 }, false);
122 if (r == 0)
123 break;
124 }
125 if (r < 0) {
126 lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
127 << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
128 << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
129 << cpp_strerror(r) << dendl;
130 listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
131 continue;
132 }
133 ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl;
134 }
135 if (r == 0)
136 break;
137 }
138 // It seems that binding completely failed, return with that exit status
139 if (r < 0) {
140 lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
141 << " attempts: " << cpp_strerror(r) << dendl;
142 return r;
143 }
144
145 ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
146 *bound_addr = listen_addr;
147 return 0;
148 }
149
150 void Processor::start()
151 {
152 ldout(msgr->cct, 1) << __func__ << dendl;
153
154 // start thread
155 if (listen_socket) {
156 worker->center.submit_to(worker->center.get_id(), [this]() {
157 worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false);
158 }
159 }
160
161 void Processor::accept()
162 {
163 ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl;
164 SocketOptions opts;
165 opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
166 opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
167 opts.priority = msgr->get_socket_priority();
168 while (true) {
169 entity_addr_t addr;
170 ConnectedSocket cli_socket;
171 Worker *w = worker;
172 if (!msgr->get_stack()->support_local_listen_table())
173 w = msgr->get_stack()->get_worker();
174 int r = listen_socket.accept(&cli_socket, opts, &addr, w);
175 if (r == 0) {
176 ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl;
177
178 msgr->add_accept(w, std::move(cli_socket), addr);
179 continue;
180 } else {
181 if (r == -EINTR) {
182 continue;
183 } else if (r == -EAGAIN) {
184 break;
185 } else if (r == -EMFILE || r == -ENFILE) {
186 lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
187 << " errno " << r << " " << cpp_strerror(r) << dendl;
188 break;
189 } else if (r == -ECONNABORTED) {
190 ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
191 << " errno " << r << " " << cpp_strerror(r) << dendl;
192 continue;
193 } else {
194 lderr(msgr->cct) << __func__ << " no incoming connection?"
195 << " errno " << r << " " << cpp_strerror(r) << dendl;
196 break;
197 }
198 }
199 }
200 }
201
202 void Processor::stop()
203 {
204 ldout(msgr->cct,10) << __func__ << dendl;
205
206 if (listen_socket) {
207 worker->center.submit_to(worker->center.get_id(), [this]() {
208 worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
209 listen_socket.abort_accept();
210 }, false);
211 }
212 }
213
214
215 struct StackSingleton {
216 CephContext *cct;
217 std::shared_ptr<NetworkStack> stack;
218
219 StackSingleton(CephContext *c): cct(c) {}
220 void ready(std::string &type) {
221 if (!stack)
222 stack = NetworkStack::create(cct, type);
223 }
224 ~StackSingleton() {
225 stack->stop();
226 }
227 };
228
229
230 class C_handle_reap : public EventCallback {
231 AsyncMessenger *msgr;
232
233 public:
234 explicit C_handle_reap(AsyncMessenger *m): msgr(m) {}
235 void do_request(int id) override {
236 // judge whether is a time event
237 msgr->reap_dead();
238 }
239 };
240
241 /*******************
242 * AsyncMessenger
243 */
244
245 AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
246 const std::string &type, string mname, uint64_t _nonce)
247 : SimplePolicyMessenger(cct, name,mname, _nonce),
248 dispatch_queue(cct, this, mname),
249 lock("AsyncMessenger::lock"),
250 nonce(_nonce), need_addr(true), did_bind(false),
251 global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
252 cluster_protocol(0), stopped(true)
253 {
254 std::string transport_type = "posix";
255 if (type.find("rdma") != std::string::npos)
256 transport_type = "rdma";
257 else if (type.find("dpdk") != std::string::npos)
258 transport_type = "dpdk";
259
260 ceph_spin_init(&global_seq_lock);
261 StackSingleton *single;
262 cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack::"+transport_type);
263 single->ready(transport_type);
264 stack = single->stack.get();
265 stack->start();
266 local_worker = stack->get_worker();
267 local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
268 init_local_connection();
269 reap_handler = new C_handle_reap(this);
270 unsigned processor_num = 1;
271 if (stack->support_local_listen_table())
272 processor_num = stack->get_num_worker();
273 for (unsigned i = 0; i < processor_num; ++i)
274 processors.push_back(new Processor(this, stack->get_worker(i), cct));
275 }
276
277 /**
278 * Destroy the AsyncMessenger. Pretty simple since all the work is done
279 * elsewhere.
280 */
281 AsyncMessenger::~AsyncMessenger()
282 {
283 delete reap_handler;
284 assert(!did_bind); // either we didn't bind or we shut down the Processor
285 local_connection->mark_down();
286 for (auto &&p : processors)
287 delete p;
288 }
289
290 void AsyncMessenger::ready()
291 {
292 ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
293
294 stack->ready();
295 if (pending_bind) {
296 int err = bind(pending_bind_addr);
297 if (err) {
298 lderr(cct) << __func__ << " postponed bind failed" << dendl;
299 ceph_abort();
300 }
301 }
302
303 Mutex::Locker l(lock);
304 for (auto &&p : processors)
305 p->start();
306 dispatch_queue.start();
307 }
308
309 int AsyncMessenger::shutdown()
310 {
311 ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
312
313 // done! clean up.
314 for (auto &&p : processors)
315 p->stop();
316 mark_down_all();
317 // break ref cycles on the loopback connection
318 local_connection->set_priv(NULL);
319 did_bind = false;
320 lock.Lock();
321 stop_cond.Signal();
322 stopped = true;
323 lock.Unlock();
324 stack->drain();
325 return 0;
326 }
327
328
329 int AsyncMessenger::bind(const entity_addr_t &bind_addr)
330 {
331 lock.Lock();
332
333 if (!pending_bind && started) {
334 ldout(cct,10) << __func__ << " already started" << dendl;
335 lock.Unlock();
336 return -1;
337 }
338
339 ldout(cct,10) << __func__ << " bind " << bind_addr << dendl;
340
341 if (!stack->is_ready()) {
342 ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
343 pending_bind_addr = bind_addr;
344 pending_bind = true;
345 lock.Unlock();
346 return 0;
347 }
348
349 lock.Unlock();
350
351 // bind to a socket
352 set<int> avoid_ports;
353 entity_addr_t bound_addr;
354 unsigned i = 0;
355 for (auto &&p : processors) {
356 int r = p->bind(bind_addr, avoid_ports, &bound_addr);
357 if (r) {
358 // Note: this is related to local tcp listen table problem.
359 // Posix(default kernel implementation) backend shares listen table
360 // in the kernel, so all threads can use the same listen table naturally
361 // and only one thread need to bind. But other backends(like dpdk) uses local
362 // listen table, we need to bind/listen tcp port for each worker. So if the
363 // first worker failed to bind, it could be think the normal error then handle
364 // it, like port is used case. But if the first worker successfully to bind
365 // but the second worker failed, it's not expected and we need to assert
366 // here
367 assert(i == 0);
368 return r;
369 }
370 ++i;
371 }
372 _finish_bind(bind_addr, bound_addr);
373 return 0;
374 }
375
376 int AsyncMessenger::rebind(const set<int>& avoid_ports)
377 {
378 ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
379 assert(did_bind);
380
381 for (auto &&p : processors)
382 p->stop();
383 mark_down_all();
384
385 // adjust the nonce; we want our entity_addr_t to be truly unique.
386 nonce += 1000000;
387 ldout(cct, 10) << __func__ << " new nonce " << nonce
388 << " and inst " << get_myinst() << dendl;
389
390 entity_addr_t bound_addr;
391 entity_addr_t bind_addr = get_myaddr();
392 bind_addr.set_port(0);
393 set<int> new_avoid(avoid_ports);
394 new_avoid.insert(bind_addr.get_port());
395 ldout(cct, 10) << __func__ << " will try " << bind_addr
396 << " and avoid ports " << new_avoid << dendl;
397 unsigned i = 0;
398 for (auto &&p : processors) {
399 int r = p->bind(bind_addr, avoid_ports, &bound_addr);
400 if (r) {
401 assert(i == 0);
402 return r;
403 }
404 ++i;
405 }
406 _finish_bind(bind_addr, bound_addr);
407 for (auto &&p : processors) {
408 p->start();
409 }
410 return 0;
411 }
412
413 int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
414 {
415 if (!cct->_conf->ms_bind_before_connect)
416 return 0;
417 Mutex::Locker l(lock);
418 if (did_bind) {
419 assert(my_inst.addr == bind_addr);
420 return 0;
421 }
422 if (started) {
423 ldout(cct, 10) << __func__ << " already started" << dendl;
424 return -1;
425 }
426 ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
427
428 set_myaddr(bind_addr);
429 return 0;
430 }
431
432 void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
433 const entity_addr_t& listen_addr)
434 {
435 set_myaddr(bind_addr);
436 if (bind_addr != entity_addr_t())
437 learned_addr(bind_addr);
438
439 if (get_myaddr().get_port() == 0) {
440 set_myaddr(listen_addr);
441 }
442 entity_addr_t addr = get_myaddr();
443 addr.set_nonce(nonce);
444 set_myaddr(addr);
445
446 init_local_connection();
447
448 ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl;
449 did_bind = true;
450 }
451
452 int AsyncMessenger::start()
453 {
454 lock.Lock();
455 ldout(cct,1) << __func__ << " start" << dendl;
456
457 // register at least one entity, first!
458 assert(my_inst.name.type() >= 0);
459
460 assert(!started);
461 started = true;
462 stopped = false;
463
464 if (!did_bind) {
465 my_inst.addr.nonce = nonce;
466 _init_local_connection();
467 }
468
469 lock.Unlock();
470 return 0;
471 }
472
473 void AsyncMessenger::wait()
474 {
475 lock.Lock();
476 if (!started) {
477 lock.Unlock();
478 return;
479 }
480 if (!stopped)
481 stop_cond.Wait(lock);
482
483 lock.Unlock();
484
485 dispatch_queue.shutdown();
486 if (dispatch_queue.is_started()) {
487 ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
488 dispatch_queue.wait();
489 dispatch_queue.discard_local();
490 ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
491 }
492
493 // close all connections
494 shutdown_connections(false);
495 stack->drain();
496
497 ldout(cct, 10) << __func__ << ": done." << dendl;
498 ldout(cct, 1) << __func__ << " complete." << dendl;
499 started = false;
500 }
501
502 void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
503 {
504 lock.Lock();
505 AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
506 conn->accept(std::move(cli_socket), addr);
507 accepting_conns.insert(conn);
508 lock.Unlock();
509 }
510
511 AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
512 {
513 assert(lock.is_locked());
514 assert(addr != my_inst.addr);
515
516 ldout(cct, 10) << __func__ << " " << addr
517 << ", creating connection and registering" << dendl;
518
519 // create connection
520 Worker *w = stack->get_worker();
521 AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
522 conn->connect(addr, type);
523 assert(!conns.count(addr));
524 conns[addr] = conn;
525 w->get_perf_counter()->inc(l_msgr_active_connections);
526
527 return conn;
528 }
529
530 ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
531 {
532 Mutex::Locker l(lock);
533 if (my_inst.addr == dest.addr) {
534 // local
535 return local_connection;
536 }
537
538 AsyncConnectionRef conn = _lookup_conn(dest.addr);
539 if (conn) {
540 ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl;
541 } else {
542 conn = create_connect(dest.addr, dest.name.type());
543 ldout(cct, 10) << __func__ << " " << dest << " new " << conn << dendl;
544 }
545
546 return conn;
547 }
548
549 ConnectionRef AsyncMessenger::get_loopback_connection()
550 {
551 return local_connection;
552 }
553
554 int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
555 {
556 FUNCTRACE();
557 assert(m);
558
559 if (m->get_type() == CEPH_MSG_OSD_OP)
560 OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
561 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
562 OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");
563
564 ldout(cct, 1) << __func__ << "--> " << dest.name << " "
565 << dest.addr << " -- " << *m << " -- ?+"
566 << m->get_data().length() << " " << m << dendl;
567
568 if (dest.addr == entity_addr_t()) {
569 ldout(cct,0) << __func__ << " message " << *m
570 << " with empty dest " << dest.addr << dendl;
571 m->put();
572 return -EINVAL;
573 }
574
575 AsyncConnectionRef conn = _lookup_conn(dest.addr);
576 submit_message(m, conn, dest.addr, dest.name.type());
577 return 0;
578 }
579
580 void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
581 const entity_addr_t& dest_addr, int dest_type)
582 {
583 if (cct->_conf->ms_dump_on_send) {
584 m->encode(-1, MSG_CRC_ALL);
585 ldout(cct, 0) << __func__ << "submit_message " << *m << "\n";
586 m->get_payload().hexdump(*_dout);
587 if (m->get_data().length() > 0) {
588 *_dout << " data:\n";
589 m->get_data().hexdump(*_dout);
590 }
591 *_dout << dendl;
592 m->clear_payload();
593 }
594
595 // existing connection?
596 if (con) {
597 con->send_message(m);
598 return ;
599 }
600
601 // local?
602 if (my_inst.addr == dest_addr) {
603 // local
604 local_connection->send_message(m);
605 return ;
606 }
607
608 // remote, no existing connection.
609 const Policy& policy = get_policy(dest_type);
610 if (policy.server) {
611 ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addr
612 << ", lossy server for target type "
613 << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
614 m->put();
615 } else {
616 ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl;
617 con = create_connect(dest_addr, dest_type);
618 con->send_message(m);
619 }
620 }
621
622 /**
623 * If my_inst.addr doesn't have an IP set, this function
624 * will fill it in from the passed addr. Otherwise it does nothing and returns.
625 */
626 void AsyncMessenger::set_addr_unknowns(const entity_addr_t &addr)
627 {
628 Mutex::Locker l(lock);
629 if (my_inst.addr.is_blank_ip()) {
630 int port = my_inst.addr.get_port();
631 my_inst.addr.u = addr.u;
632 my_inst.addr.set_port(port);
633 _init_local_connection();
634 }
635 }
636
637 void AsyncMessenger::set_addr(const entity_addr_t &addr)
638 {
639 Mutex::Locker l(lock);
640 entity_addr_t t = addr;
641 t.set_nonce(nonce);
642 set_myaddr(t);
643 _init_local_connection();
644 }
645
646 void AsyncMessenger::shutdown_connections(bool queue_reset)
647 {
648 ldout(cct,1) << __func__ << " " << dendl;
649 lock.Lock();
650 for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
651 q != accepting_conns.end(); ++q) {
652 AsyncConnectionRef p = *q;
653 ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl;
654 p->stop(queue_reset);
655 }
656 accepting_conns.clear();
657
658 while (!conns.empty()) {
659 ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
660 AsyncConnectionRef p = it->second;
661 ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
662 conns.erase(it);
663 p->get_perf_counter()->dec(l_msgr_active_connections);
664 p->stop(queue_reset);
665 }
666
667 {
668 Mutex::Locker l(deleted_lock);
669 while (!deleted_conns.empty()) {
670 set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
671 AsyncConnectionRef p = *it;
672 ldout(cct, 5) << __func__ << " delete " << p << dendl;
673 deleted_conns.erase(it);
674 }
675 }
676 lock.Unlock();
677 }
678
679 void AsyncMessenger::mark_down(const entity_addr_t& addr)
680 {
681 lock.Lock();
682 AsyncConnectionRef p = _lookup_conn(addr);
683 if (p) {
684 ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
685 p->stop(true);
686 } else {
687 ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
688 }
689 lock.Unlock();
690 }
691
692 int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
693 {
694 int my_type = my_inst.name.type();
695
696 // set reply protocol version
697 if (peer_type == my_type) {
698 // internal
699 return cluster_protocol;
700 } else {
701 // public
702 switch (connect ? peer_type : my_type) {
703 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
704 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
705 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
706 }
707 }
708 return 0;
709 }
710
711 void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
712 {
713 // be careful here: multiple threads may block here, and readers of
714 // my_inst.addr do NOT hold any lock.
715
716 // this always goes from true -> false under the protection of the
717 // mutex. if it is already false, we need not retake the mutex at
718 // all.
719 if (!need_addr)
720 return ;
721 lock.Lock();
722 if (need_addr) {
723 need_addr = false;
724 entity_addr_t t = peer_addr_for_me;
725 t.set_port(my_inst.addr.get_port());
726 t.set_nonce(my_inst.addr.get_nonce());
727 my_inst.addr = t;
728 ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
729 _init_local_connection();
730 }
731 lock.Unlock();
732 }
733
734 int AsyncMessenger::reap_dead()
735 {
736 ldout(cct, 1) << __func__ << " start" << dendl;
737 int num = 0;
738
739 Mutex::Locker l1(lock);
740 Mutex::Locker l2(deleted_lock);
741
742 while (!deleted_conns.empty()) {
743 auto it = deleted_conns.begin();
744 AsyncConnectionRef p = *it;
745 ldout(cct, 5) << __func__ << " delete " << p << dendl;
746 auto conns_it = conns.find(p->peer_addr);
747 if (conns_it != conns.end() && conns_it->second == p)
748 conns.erase(conns_it);
749 accepting_conns.erase(p);
750 deleted_conns.erase(it);
751 ++num;
752 }
753
754 return num;
755 }