1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
22 #include "AsyncMessenger.h"
24 #include "common/config.h"
25 #include "common/Timer.h"
26 #include "common/errno.h"
28 #include "messages/MOSDOp.h"
29 #include "messages/MOSDOpReply.h"
30 #include "common/EventTrace.h"
32 #define dout_subsys ceph_subsys_ms
34 #define dout_prefix _prefix(_dout, this)
35 static ostream
& _prefix(std::ostream
*_dout
, AsyncMessenger
*m
) {
36 return *_dout
<< "-- " << m
->get_myaddrs() << " ";
39 static ostream
& _prefix(std::ostream
*_dout
, Processor
*p
) {
40 return *_dout
<< " Processor -- ";
48 class Processor::C_processor_accept
: public EventCallback
{
52 explicit C_processor_accept(Processor
*p
): pro(p
) {}
53 void do_request(uint64_t id
) override
{
58 Processor::Processor(AsyncMessenger
*r
, Worker
*w
, CephContext
*c
)
59 : msgr(r
), net(c
), worker(w
),
60 listen_handler(new C_processor_accept(this)) {}
62 int Processor::bind(const entity_addrvec_t
&bind_addrs
,
63 const set
<int>& avoid_ports
,
64 entity_addrvec_t
* bound_addrs
)
66 const auto& conf
= msgr
->cct
->_conf
;
68 ldout(msgr
->cct
, 10) << __func__
<< " " << bind_addrs
<< dendl
;
71 opts
.nodelay
= msgr
->cct
->_conf
->ms_tcp_nodelay
;
72 opts
.rcbuf_size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
74 listen_sockets
.resize(bind_addrs
.v
.size());
75 *bound_addrs
= bind_addrs
;
77 for (unsigned k
= 0; k
< bind_addrs
.v
.size(); ++k
) {
78 auto& listen_addr
= bound_addrs
->v
[k
];
83 for (int i
= 0; i
< conf
->ms_bind_retry_count
; i
++) {
85 lderr(msgr
->cct
) << __func__
<< " was unable to bind. Trying again in "
86 << conf
->ms_bind_retry_delay
<< " seconds " << dendl
;
87 sleep(conf
->ms_bind_retry_delay
);
90 if (listen_addr
.get_port()) {
91 worker
->center
.submit_to(
92 worker
->center
.get_id(),
93 [this, k
, &listen_addr
, &opts
, &r
]() {
94 r
= worker
->listen(listen_addr
, k
, opts
, &listen_sockets
[k
]);
97 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
98 << ": " << cpp_strerror(r
) << dendl
;
102 // try a range of ports
103 for (int port
= msgr
->cct
->_conf
->ms_bind_port_min
;
104 port
<= msgr
->cct
->_conf
->ms_bind_port_max
;
106 if (avoid_ports
.count(port
))
109 listen_addr
.set_port(port
);
110 worker
->center
.submit_to(
111 worker
->center
.get_id(),
112 [this, k
, &listen_addr
, &opts
, &r
]() {
113 r
= worker
->listen(listen_addr
, k
, opts
, &listen_sockets
[k
]);
119 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
120 << " on any port in range "
121 << msgr
->cct
->_conf
->ms_bind_port_min
122 << "-" << msgr
->cct
->_conf
->ms_bind_port_max
<< ": "
123 << cpp_strerror(r
) << dendl
;
124 listen_addr
.set_port(0); // Clear port before retry, otherwise we shall fail again.
127 ldout(msgr
->cct
, 10) << __func__
<< " bound on random port "
128 << listen_addr
<< dendl
;
135 // It seems that binding completely failed, return with that exit status
137 lderr(msgr
->cct
) << __func__
<< " was unable to bind after "
138 << conf
->ms_bind_retry_count
139 << " attempts: " << cpp_strerror(r
) << dendl
;
140 for (unsigned j
= 0; j
< k
; ++j
) {
141 // clean up previous bind
142 listen_sockets
[j
].abort_accept();
148 ldout(msgr
->cct
, 10) << __func__
<< " bound to " << *bound_addrs
<< dendl
;
152 void Processor::start()
154 ldout(msgr
->cct
, 1) << __func__
<< dendl
;
157 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
158 for (auto& listen_socket
: listen_sockets
) {
160 if (listen_socket
.fd() == -1) {
161 ldout(msgr
->cct
, 1) << __func__
162 << " Error: processor restart after listen_socket.fd closed. "
166 worker
->center
.create_file_event(listen_socket
.fd(), EVENT_READABLE
,
172 void Processor::accept()
175 opts
.nodelay
= msgr
->cct
->_conf
->ms_tcp_nodelay
;
176 opts
.rcbuf_size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
177 opts
.priority
= msgr
->get_socket_priority();
179 for (auto& listen_socket
: listen_sockets
) {
180 ldout(msgr
->cct
, 10) << __func__
<< " listen_fd=" << listen_socket
.fd()
182 unsigned accept_error_num
= 0;
186 ConnectedSocket cli_socket
;
188 if (!msgr
->get_stack()->support_local_listen_table())
189 w
= msgr
->get_stack()->get_worker();
192 int r
= listen_socket
.accept(&cli_socket
, opts
, &addr
, w
);
194 ldout(msgr
->cct
, 10) << __func__
<< " accepted incoming on sd "
195 << cli_socket
.fd() << dendl
;
198 w
, std::move(cli_socket
),
199 msgr
->get_myaddrs().v
[listen_socket
.get_addr_slot()],
201 accept_error_num
= 0;
207 } else if (r
== -EAGAIN
) {
209 } else if (r
== -EMFILE
|| r
== -ENFILE
) {
210 lderr(msgr
->cct
) << __func__
<< " open file descriptions limit reached sd = " << listen_socket
.fd()
211 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
212 if (++accept_error_num
> msgr
->cct
->_conf
->ms_max_accept_failures
) {
213 lderr(msgr
->cct
) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl
;
217 } else if (r
== -ECONNABORTED
) {
218 ldout(msgr
->cct
, 0) << __func__
<< " it was closed because of rst arrived sd = " << listen_socket
.fd()
219 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
222 lderr(msgr
->cct
) << __func__
<< " no incoming connection?"
223 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
224 if (++accept_error_num
> msgr
->cct
->_conf
->ms_max_accept_failures
) {
225 lderr(msgr
->cct
) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl
;
235 void Processor::stop()
237 ldout(msgr
->cct
,10) << __func__
<< dendl
;
239 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
240 for (auto& listen_socket
: listen_sockets
) {
242 worker
->center
.delete_file_event(listen_socket
.fd(), EVENT_READABLE
);
243 listen_socket
.abort_accept();
250 struct StackSingleton
{
252 std::shared_ptr
<NetworkStack
> stack
;
254 explicit StackSingleton(CephContext
*c
): cct(c
) {}
255 void ready(std::string
&type
) {
257 stack
= NetworkStack::create(cct
, type
);
265 class C_handle_reap
: public EventCallback
{
266 AsyncMessenger
*msgr
;
269 explicit C_handle_reap(AsyncMessenger
*m
): msgr(m
) {}
270 void do_request(uint64_t id
) override
{
271 // judge whether is a time event
280 AsyncMessenger::AsyncMessenger(CephContext
*cct
, entity_name_t name
,
281 const std::string
&type
, string mname
, uint64_t _nonce
)
282 : SimplePolicyMessenger(cct
, name
),
283 dispatch_queue(cct
, this, mname
),
286 std::string transport_type
= "posix";
287 if (type
.find("rdma") != std::string::npos
)
288 transport_type
= "rdma";
289 else if (type
.find("dpdk") != std::string::npos
)
290 transport_type
= "dpdk";
292 auto single
= &cct
->lookup_or_create_singleton_object
<StackSingleton
>(
293 "AsyncMessenger::NetworkStack::" + transport_type
, true, cct
);
294 single
->ready(transport_type
);
295 stack
= single
->stack
.get();
297 local_worker
= stack
->get_worker();
298 local_connection
= ceph::make_ref
<AsyncConnection
>(cct
, this, &dispatch_queue
,
299 local_worker
, true, true);
300 init_local_connection();
301 reap_handler
= new C_handle_reap(this);
302 unsigned processor_num
= 1;
303 if (stack
->support_local_listen_table())
304 processor_num
= stack
->get_num_worker();
305 for (unsigned i
= 0; i
< processor_num
; ++i
)
306 processors
.push_back(new Processor(this, stack
->get_worker(i
), cct
));
310 * Destroy the AsyncMessenger. Pretty simple since all the work is done
313 AsyncMessenger::~AsyncMessenger()
316 ceph_assert(!did_bind
); // either we didn't bind or we shut down the Processor
317 for (auto &&p
: processors
)
321 void AsyncMessenger::ready()
323 ldout(cct
,10) << __func__
<< " " << get_myaddrs() << dendl
;
327 int err
= bindv(pending_bind_addrs
);
329 lderr(cct
) << __func__
<< " postponed bind failed" << dendl
;
334 std::lock_guard l
{lock
};
335 for (auto &&p
: processors
)
337 dispatch_queue
.start();
340 int AsyncMessenger::shutdown()
342 ldout(cct
,10) << __func__
<< " " << get_myaddrs() << dendl
;
345 for (auto &&p
: processors
)
348 // break ref cycles on the loopback connection
349 local_connection
->clear_priv();
350 local_connection
->mark_down();
353 stop_cond
.notify_all();
360 int AsyncMessenger::bind(const entity_addr_t
&bind_addr
)
362 ldout(cct
,10) << __func__
<< " " << bind_addr
<< dendl
;
363 // old bind() can take entity_addr_t(). new bindv() can take a
364 // 0.0.0.0-like address but needs type and family to be set.
366 if (a
== entity_addr_t()) {
367 a
.set_type(entity_addr_t::TYPE_LEGACY
);
368 if (cct
->_conf
->ms_bind_ipv6
) {
369 a
.set_family(AF_INET6
);
371 a
.set_family(AF_INET
);
374 return bindv(entity_addrvec_t(a
));
377 int AsyncMessenger::bindv(const entity_addrvec_t
&bind_addrs
)
381 if (!pending_bind
&& started
) {
382 ldout(cct
,10) << __func__
<< " already started" << dendl
;
387 ldout(cct
,10) << __func__
<< " " << bind_addrs
<< dendl
;
389 if (!stack
->is_ready()) {
390 ldout(cct
, 10) << __func__
<< " Network Stack is not ready for bind yet - postponed" << dendl
;
391 pending_bind_addrs
= bind_addrs
;
400 set
<int> avoid_ports
;
401 entity_addrvec_t bound_addrs
;
403 for (auto &&p
: processors
) {
404 int r
= p
->bind(bind_addrs
, avoid_ports
, &bound_addrs
);
406 // Note: this is related to local tcp listen table problem.
407 // Posix(default kernel implementation) backend shares listen table
408 // in the kernel, so all threads can use the same listen table naturally
409 // and only one thread need to bind. But other backends(like dpdk) uses local
410 // listen table, we need to bind/listen tcp port for each worker. So if the
411 // first worker failed to bind, it could be think the normal error then handle
412 // it, like port is used case. But if the first worker successfully to bind
413 // but the second worker failed, it's not expected and we need to assert
420 _finish_bind(bind_addrs
, bound_addrs
);
424 int AsyncMessenger::rebind(const set
<int>& avoid_ports
)
426 ldout(cct
,1) << __func__
<< " rebind avoid " << avoid_ports
<< dendl
;
427 ceph_assert(did_bind
);
429 for (auto &&p
: processors
)
433 // adjust the nonce; we want our entity_addr_t to be truly unique.
435 ldout(cct
, 10) << __func__
<< " new nonce " << nonce
436 << " and addr " << get_myaddrs() << dendl
;
438 entity_addrvec_t bound_addrs
;
439 entity_addrvec_t bind_addrs
= get_myaddrs();
440 set
<int> new_avoid(avoid_ports
);
441 for (auto& a
: bind_addrs
.v
) {
442 new_avoid
.insert(a
.get_port());
445 ldout(cct
, 10) << __func__
<< " will try " << bind_addrs
446 << " and avoid ports " << new_avoid
<< dendl
;
448 for (auto &&p
: processors
) {
449 int r
= p
->bind(bind_addrs
, avoid_ports
, &bound_addrs
);
456 _finish_bind(bind_addrs
, bound_addrs
);
457 for (auto &&p
: processors
) {
463 int AsyncMessenger::client_bind(const entity_addr_t
&bind_addr
)
465 if (!cct
->_conf
->ms_bind_before_connect
)
467 std::lock_guard l
{lock
};
472 ldout(cct
, 10) << __func__
<< " already started" << dendl
;
475 ldout(cct
, 10) << __func__
<< " " << bind_addr
<< dendl
;
477 set_myaddrs(entity_addrvec_t(bind_addr
));
481 void AsyncMessenger::_finish_bind(const entity_addrvec_t
& bind_addrs
,
482 const entity_addrvec_t
& listen_addrs
)
484 set_myaddrs(bind_addrs
);
485 for (auto& a
: bind_addrs
.v
) {
486 if (!a
.is_blank_ip()) {
491 if (get_myaddrs().front().get_port() == 0) {
492 set_myaddrs(listen_addrs
);
494 entity_addrvec_t newaddrs
= *my_addrs
;
495 for (auto& a
: newaddrs
.v
) {
498 set_myaddrs(newaddrs
);
500 init_local_connection();
502 ldout(cct
,1) << __func__
<< " bind my_addrs is " << get_myaddrs() << dendl
;
506 int AsyncMessenger::start()
508 std::scoped_lock l
{lock
};
509 ldout(cct
,1) << __func__
<< " start" << dendl
;
511 // register at least one entity, first!
512 ceph_assert(my_name
.type() >= 0);
514 ceph_assert(!started
);
519 entity_addrvec_t newaddrs
= *my_addrs
;
520 for (auto& a
: newaddrs
.v
) {
523 set_myaddrs(newaddrs
);
524 _init_local_connection();
530 void AsyncMessenger::wait()
533 std::unique_lock locker
{lock
};
538 stop_cond
.wait(locker
);
540 dispatch_queue
.shutdown();
541 if (dispatch_queue
.is_started()) {
542 ldout(cct
, 10) << __func__
<< ": waiting for dispatch queue" << dendl
;
543 dispatch_queue
.wait();
544 dispatch_queue
.discard_local();
545 ldout(cct
, 10) << __func__
<< ": dispatch queue is stopped" << dendl
;
548 // close all connections
549 shutdown_connections(false);
552 ldout(cct
, 10) << __func__
<< ": done." << dendl
;
553 ldout(cct
, 1) << __func__
<< " complete." << dendl
;
557 void AsyncMessenger::add_accept(Worker
*w
, ConnectedSocket cli_socket
,
558 const entity_addr_t
&listen_addr
,
559 const entity_addr_t
&peer_addr
)
561 std::lock_guard l
{lock
};
562 auto conn
= ceph::make_ref
<AsyncConnection
>(cct
, this, &dispatch_queue
, w
,
563 listen_addr
.is_msgr2(), false);
564 conn
->accept(std::move(cli_socket
), listen_addr
, peer_addr
);
565 accepting_conns
.insert(conn
);
568 AsyncConnectionRef
AsyncMessenger::create_connect(
569 const entity_addrvec_t
& addrs
, int type
, bool anon
)
571 ceph_assert(ceph_mutex_is_locked(lock
));
573 ldout(cct
, 10) << __func__
<< " " << addrs
574 << ", creating connection and registering" << dendl
;
576 // here is where we decide which of the addrs to connect to. always prefer
577 // the first one, if we support it.
578 entity_addr_t target
;
579 for (auto& a
: addrs
.v
) {
580 if (!a
.is_msgr2() && !a
.is_legacy()) {
583 // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
584 // trying it? for now, just pick whichever is listed first.
590 Worker
*w
= stack
->get_worker();
591 auto conn
= ceph::make_ref
<AsyncConnection
>(cct
, this, &dispatch_queue
, w
,
592 target
.is_msgr2(), false);
594 conn
->connect(addrs
, type
, target
);
596 anon_conns
.insert(conn
);
598 ceph_assert(!conns
.count(addrs
));
599 ldout(cct
, 10) << __func__
<< " " << conn
<< " " << addrs
<< " "
600 << *conn
->peer_addrs
<< dendl
;
603 w
->get_perf_counter()->inc(l_msgr_active_connections
);
609 ConnectionRef
AsyncMessenger::get_loopback_connection()
611 return local_connection
;
614 bool AsyncMessenger::should_use_msgr2()
616 // if we are bound to v1 only, and we are connecting to a v2 peer,
617 // we cannot use the peer's v2 address. otherwise the connection
618 // is assymetrical, because they would have to use v1 to connect
619 // to us, and we would use v2, and connection race detection etc
620 // would totally break down (among other things). or, the other
621 // end will be confused that we advertise ourselve with a v1
622 // address only (that we bound to) but connected with protocol v2.
623 return !did_bind
|| get_myaddrs().has_msgr2();
626 entity_addrvec_t
AsyncMessenger::_filter_addrs(const entity_addrvec_t
& addrs
)
628 if (!should_use_msgr2()) {
629 ldout(cct
, 10) << __func__
<< " " << addrs
<< " limiting to v1 ()" << dendl
;
631 for (auto& i
: addrs
.v
) {
643 int AsyncMessenger::send_to(Message
*m
, int type
, const entity_addrvec_t
& addrs
)
648 #if defined(WITH_EVENTTRACE)
649 if (m
->get_type() == CEPH_MSG_OSD_OP
)
650 OID_EVENT_TRACE(((MOSDOp
*)m
)->get_oid().name
.c_str(), "SEND_MSG_OSD_OP");
651 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
652 OID_EVENT_TRACE(((MOSDOpReply
*)m
)->get_oid().name
.c_str(), "SEND_MSG_OSD_OP_REPLY");
655 ldout(cct
, 1) << __func__
<< "--> " << ceph_entity_type_name(type
) << " "
656 << addrs
<< " -- " << *m
<< " -- ?+"
657 << m
->get_data().length() << " " << m
<< dendl
;
660 ldout(cct
,0) << __func__
<< " message " << *m
661 << " with empty dest " << addrs
<< dendl
;
666 if (cct
->_conf
->ms_dump_on_send
) {
667 m
->encode(-1, MSG_CRC_ALL
);
668 ldout(cct
, 0) << __func__
<< " submit_message " << *m
<< "\n";
669 m
->get_payload().hexdump(*_dout
);
670 if (m
->get_data().length() > 0) {
671 *_dout
<< " data:\n";
672 m
->get_data().hexdump(*_dout
);
678 connect_to(type
, addrs
, false)->send_message(m
);
682 ConnectionRef
AsyncMessenger::connect_to(int type
,
683 const entity_addrvec_t
& addrs
,
684 bool anon
, bool not_local_dest
)
686 if (!not_local_dest
) {
687 if (*my_addrs
== addrs
||
688 (addrs
.v
.size() == 1 &&
689 my_addrs
->contains(addrs
.front()))) {
691 return local_connection
;
695 auto av
= _filter_addrs(addrs
);
696 std::lock_guard l
{lock
};
698 return create_connect(av
, type
, anon
);
701 AsyncConnectionRef conn
= _lookup_conn(av
);
703 ldout(cct
, 10) << __func__
<< " " << av
<< " existing " << conn
<< dendl
;
705 conn
= create_connect(av
, type
, false);
706 ldout(cct
, 10) << __func__
<< " " << av
<< " new " << conn
<< dendl
;
713 * If my_addr doesn't have an IP set, this function
714 * will fill it in from the passed addr. Otherwise it does nothing and returns.
716 bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t
&addrs
)
718 ldout(cct
,1) << __func__
<< " " << addrs
<< dendl
;
720 std::lock_guard l
{lock
};
722 entity_addrvec_t newaddrs
= *my_addrs
;
723 for (auto& a
: newaddrs
.v
) {
724 if (a
.is_blank_ip()) {
725 int type
= a
.get_type();
726 int port
= a
.get_port();
727 uint32_t nonce
= a
.get_nonce();
728 for (auto& b
: addrs
.v
) {
729 if (a
.get_family() == b
.get_family()) {
730 ldout(cct
,1) << __func__
<< " assuming my addr " << a
731 << " matches provided addr " << b
<< dendl
;
742 set_myaddrs(newaddrs
);
744 _init_local_connection();
746 ldout(cct
,1) << __func__
<< " now " << *my_addrs
<< dendl
;
750 void AsyncMessenger::set_addrs(const entity_addrvec_t
&addrs
)
752 std::lock_guard l
{lock
};
754 for (auto& a
: t
.v
) {
758 _init_local_connection();
761 void AsyncMessenger::shutdown_connections(bool queue_reset
)
763 ldout(cct
,1) << __func__
<< " " << dendl
;
764 std::lock_guard l
{lock
};
765 for (const auto& c
: accepting_conns
) {
766 ldout(cct
, 5) << __func__
<< " accepting_conn " << c
<< dendl
;
767 c
->stop(queue_reset
);
769 accepting_conns
.clear();
771 for (const auto& [e
, c
] : conns
) {
772 ldout(cct
, 5) << __func__
<< " mark down " << e
<< " " << c
<< dendl
;
773 c
->get_perf_counter()->dec(l_msgr_active_connections
);
774 c
->stop(queue_reset
);
778 for (const auto& c
: anon_conns
) {
779 ldout(cct
, 5) << __func__
<< " mark down " << c
<< dendl
;
780 c
->get_perf_counter()->dec(l_msgr_active_connections
);
781 c
->stop(queue_reset
);
786 std::lock_guard l
{deleted_lock
};
787 if (cct
->_conf
->subsys
.should_gather
<ceph_subsys_ms
, 5>()) {
788 for (const auto& c
: deleted_conns
) {
789 ldout(cct
, 5) << __func__
<< " delete " << c
<< dendl
;
792 deleted_conns
.clear();
796 void AsyncMessenger::mark_down_addrs(const entity_addrvec_t
& addrs
)
798 std::lock_guard l
{lock
};
799 const AsyncConnectionRef
& conn
= _lookup_conn(addrs
);
801 ldout(cct
, 1) << __func__
<< " " << addrs
<< " -- " << conn
<< dendl
;
804 ldout(cct
, 1) << __func__
<< " " << addrs
<< " -- connection dne" << dendl
;
808 int AsyncMessenger::get_proto_version(int peer_type
, bool connect
) const
810 int my_type
= my_name
.type();
812 // set reply protocol version
813 if (peer_type
== my_type
) {
815 return cluster_protocol
;
818 switch (connect
? peer_type
: my_type
) {
819 case CEPH_ENTITY_TYPE_OSD
: return CEPH_OSDC_PROTOCOL
;
820 case CEPH_ENTITY_TYPE_MDS
: return CEPH_MDSC_PROTOCOL
;
821 case CEPH_ENTITY_TYPE_MON
: return CEPH_MONC_PROTOCOL
;
827 int AsyncMessenger::accept_conn(const AsyncConnectionRef
& conn
)
829 std::lock_guard l
{lock
};
830 if (conn
->policy
.server
&&
831 conn
->policy
.lossy
&&
832 !conn
->policy
.register_lossy_clients
) {
833 anon_conns
.insert(conn
);
834 conn
->get_perf_counter()->inc(l_msgr_active_connections
);
837 auto it
= conns
.find(*conn
->peer_addrs
);
838 if (it
!= conns
.end()) {
839 auto& existing
= it
->second
;
841 // lazy delete, see "deleted_conns"
842 // If conn already in, we will return 0
843 std::lock_guard l
{deleted_lock
};
844 if (deleted_conns
.erase(existing
)) {
846 } else if (conn
!= existing
) {
850 ldout(cct
, 10) << __func__
<< " " << conn
<< " " << *conn
->peer_addrs
<< dendl
;
851 conns
[*conn
->peer_addrs
] = conn
;
852 conn
->get_perf_counter()->inc(l_msgr_active_connections
);
853 accepting_conns
.erase(conn
);
858 bool AsyncMessenger::learned_addr(const entity_addr_t
&peer_addr_for_me
)
860 // be careful here: multiple threads may block here, and readers of
861 // my_addr do NOT hold any lock.
863 // this always goes from true -> false under the protection of the
864 // mutex. if it is already false, we need not retake the mutex at
868 std::lock_guard
l(lock
);
870 if (my_addrs
->empty()) {
871 auto a
= peer_addr_for_me
;
872 a
.set_type(entity_addr_t::TYPE_ANY
);
877 set_myaddrs(entity_addrvec_t(a
));
878 ldout(cct
,10) << __func__
<< " had no addrs" << dendl
;
880 // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
881 entity_addrvec_t newaddrs
= *my_addrs
;
882 for (auto& a
: newaddrs
.v
) {
883 if (a
.is_blank_ip() &&
884 a
.get_family() == peer_addr_for_me
.get_family()) {
885 entity_addr_t t
= peer_addr_for_me
;
887 t
.set_type(entity_addr_t::TYPE_ANY
);
890 t
.set_type(a
.get_type());
891 t
.set_port(a
.get_port());
893 t
.set_nonce(a
.get_nonce());
894 ldout(cct
,10) << __func__
<< " " << a
<< " -> " << t
<< dendl
;
898 set_myaddrs(newaddrs
);
900 ldout(cct
, 1) << __func__
<< " learned my addr " << *my_addrs
901 << " (peer_addr_for_me " << peer_addr_for_me
<< ")" << dendl
;
902 _init_local_connection();
909 void AsyncMessenger::reap_dead()
911 ldout(cct
, 1) << __func__
<< " start" << dendl
;
913 std::lock_guard l1
{lock
};
916 std::lock_guard l2
{deleted_lock
};
917 for (auto& c
: deleted_conns
) {
918 ldout(cct
, 5) << __func__
<< " delete " << c
<< dendl
;
919 auto conns_it
= conns
.find(*c
->peer_addrs
);
920 if (conns_it
!= conns
.end() && conns_it
->second
== c
)
921 conns
.erase(conns_it
);
922 accepting_conns
.erase(c
);
925 deleted_conns
.clear();