1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
22 #include "AsyncMessenger.h"
24 #include "common/config.h"
25 #include "common/Timer.h"
26 #include "common/errno.h"
28 #include "messages/MOSDOp.h"
29 #include "messages/MOSDOpReply.h"
30 #include "common/EventTrace.h"
32 #define dout_subsys ceph_subsys_ms
34 #define dout_prefix _prefix(_dout, this)
35 static std::ostream
& _prefix(std::ostream
*_dout
, AsyncMessenger
*m
) {
36 return *_dout
<< "-- " << m
->get_myaddrs() << " ";
39 static std::ostream
& _prefix(std::ostream
*_dout
, Processor
*p
) {
40 return *_dout
<< " Processor -- ";
48 class Processor::C_processor_accept
: public EventCallback
{
52 explicit C_processor_accept(Processor
*p
): pro(p
) {}
53 void do_request(uint64_t id
) override
{
58 Processor::Processor(AsyncMessenger
*r
, Worker
*w
, CephContext
*c
)
59 : msgr(r
), net(c
), worker(w
),
60 listen_handler(new C_processor_accept(this)) {}
// Bind + listen on every address in bind_addrs, recording the ports actually
// chosen in *bound_addrs.  Retries up to ms_bind_retry_count times (sleeping
// ms_bind_retry_delay seconds between attempts); when no explicit port is
// requested (or the requested one fails) it scans
// [ms_bind_port_min, ms_bind_port_max], skipping avoid_ports.  The listen()
// itself is submitted to the owning worker's event center.  On overall
// failure, previously-bound sockets are aborted before returning the error.
62 int Processor::bind(const entity_addrvec_t
&bind_addrs
,
63 const std::set
<int>& avoid_ports
,
64 entity_addrvec_t
* bound_addrs
)
66 const auto& conf
= msgr
->cct
->_conf
;
68 ldout(msgr
->cct
, 10) << __func__
<< " " << bind_addrs
<< dendl
;
// socket options applied to every listening socket
71 opts
.nodelay
= msgr
->cct
->_conf
->ms_tcp_nodelay
;
72 opts
.rcbuf_size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
74 listen_sockets
.resize(bind_addrs
.v
.size());
75 *bound_addrs
= bind_addrs
;
// one listen socket per requested address
77 for (unsigned k
= 0; k
< bind_addrs
.v
.size(); ++k
) {
78 auto& listen_addr
= bound_addrs
->v
[k
];
// retry loop; note sleep() blocks the calling thread between attempts
83 for (int i
= 0; i
< conf
->ms_bind_retry_count
; i
++) {
85 lderr(msgr
->cct
) << __func__
<< " was unable to bind. Trying again in "
86 << conf
->ms_bind_retry_delay
<< " seconds " << dendl
;
87 sleep(conf
->ms_bind_retry_delay
);
// an explicit port was requested: listen on exactly that port, on the
// worker's event-center thread (r is captured by reference for the result)
90 if (listen_addr
.get_port()) {
91 worker
->center
.submit_to(
92 worker
->center
.get_id(),
93 [this, k
, &listen_addr
, &opts
, &r
]() {
94 r
= worker
->listen(listen_addr
, k
, opts
, &listen_sockets
[k
]);
97 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
98 << ": " << cpp_strerror(r
) << dendl
;
102 // try a range of ports
103 for (int port
= msgr
->cct
->_conf
->ms_bind_port_min
;
104 port
<= msgr
->cct
->_conf
->ms_bind_port_max
;
106 if (avoid_ports
.count(port
))
109 listen_addr
.set_port(port
);
110 worker
->center
.submit_to(
111 worker
->center
.get_id(),
112 [this, k
, &listen_addr
, &opts
, &r
]() {
113 r
= worker
->listen(listen_addr
, k
, opts
, &listen_sockets
[k
]);
// exhausted the whole port range for this attempt
119 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
120 << " on any port in range "
121 << msgr
->cct
->_conf
->ms_bind_port_min
122 << "-" << msgr
->cct
->_conf
->ms_bind_port_max
<< ": "
123 << cpp_strerror(r
) << dendl
;
124 listen_addr
.set_port(0); // Clear port before retry, otherwise we shall fail again.
127 ldout(msgr
->cct
, 10) << __func__
<< " bound on random port "
128 << listen_addr
<< dendl
;
135 // It seems that binding completely failed, return with that exit status
137 lderr(msgr
->cct
) << __func__
<< " was unable to bind after "
138 << conf
->ms_bind_retry_count
139 << " attempts: " << cpp_strerror(r
) << dendl
;
// roll back the sockets bound in earlier iterations
140 for (unsigned j
= 0; j
< k
; ++j
) {
141 // clean up previous bind
142 listen_sockets
[j
].abort_accept();
148 ldout(msgr
->cct
, 10) << __func__
<< " bound to " << *bound_addrs
<< dendl
;
// Arm a READABLE file event on every open listen socket (executed on the
// worker's event-center thread) so incoming connections trigger
// listen_handler, i.e. accept().
152 void Processor::start()
154 ldout(msgr
->cct
, 1) << __func__
<< dendl
;
157 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
158 for (auto& listen_socket
: listen_sockets
) {
// a closed fd here indicates a restart after the socket was torn down
160 if (listen_socket
.fd() == -1) {
161 ldout(msgr
->cct
, 1) << __func__
162 << " Error: processor restart after listen_socket.fd closed. "
166 worker
->center
.create_file_event(listen_socket
.fd(), EVENT_READABLE
,
// Drain pending connections from every listening socket.  Each accepted
// socket is assigned a worker (the stack chooses one unless it keeps a
// per-worker local listen table) and handed to the messenger via add_accept
// (presumably — the call itself is elided in this view).  -EAGAIN means the
// socket is drained; -EMFILE/-ENFILE (fd exhaustion) and unexpected errors
// count toward ms_max_accept_failures, after which the process aborts;
// -ECONNABORTED (peer reset before accept) is merely logged.
172 void Processor::accept()
175 opts
.nodelay
= msgr
->cct
->_conf
->ms_tcp_nodelay
;
176 opts
.rcbuf_size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
177 opts
.priority
= msgr
->get_socket_priority();
179 for (auto& listen_socket
: listen_sockets
) {
180 ldout(msgr
->cct
, 10) << __func__
<< " listen_fd=" << listen_socket
.fd()
// consecutive-failure counter; reset on every successful accept
182 unsigned accept_error_num
= 0;
186 ConnectedSocket cli_socket
;
// stacks with a shared (kernel) listen table can pick any worker
188 if (!msgr
->get_stack()->support_local_listen_table())
189 w
= msgr
->get_stack()->get_worker();
192 int r
= listen_socket
.accept(&cli_socket
, opts
, &addr
, w
);
194 ldout(msgr
->cct
, 10) << __func__
<< " accepted incoming on sd "
195 << cli_socket
.fd() << dendl
;
198 w
, std::move(cli_socket
),
199 msgr
->get_myaddrs().v
[listen_socket
.get_addr_slot()],
201 accept_error_num
= 0;
207 } else if (r
== -EAGAIN
) {
209 } else if (r
== -EMFILE
|| r
== -ENFILE
) {
210 lderr(msgr
->cct
) << __func__
<< " open file descriptions limit reached sd = " << listen_socket
.fd()
211 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
212 if (++accept_error_num
> msgr
->cct
->_conf
->ms_max_accept_failures
) {
213 lderr(msgr
->cct
) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl
;
217 } else if (r
== -ECONNABORTED
) {
218 ldout(msgr
->cct
, 0) << __func__
<< " it was closed because of rst arrived sd = " << listen_socket
.fd()
219 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
// any other error is unexpected; bail out after too many in a row
222 lderr(msgr
->cct
) << __func__
<< " no incoming connection?"
223 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
224 if (++accept_error_num
> msgr
->cct
->_conf
->ms_max_accept_failures
) {
225 lderr(msgr
->cct
) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl
;
// Tear down listening: from the event-center thread, remove the READABLE
// events and abort every listen socket.
235 void Processor::stop()
237 ldout(msgr
->cct
,10) << __func__
<< dendl
;
239 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
240 for (auto& listen_socket
: listen_sockets
) {
242 worker
->center
.delete_file_event(listen_socket
.fd(), EVENT_READABLE
);
243 listen_socket
.abort_accept();
// Per-CephContext singleton that owns the shared NetworkStack for one
// transport type; ready() lazily creates the stack on first use.
250 struct StackSingleton
{
252 std::shared_ptr
<NetworkStack
> stack
;
254 explicit StackSingleton(CephContext
*c
): cct(c
) {}
255 void ready(std::string
&type
) {
// only the first caller actually creates the stack (guard elided here)
257 stack
= NetworkStack::create(cct
, type
);
// Event callback that asks the owning AsyncMessenger to reap dead
// connections (presumably reap_dead() — the body is elided in this view).
265 class C_handle_reap
: public EventCallback
{
266 AsyncMessenger
*msgr
;
269 explicit C_handle_reap(AsyncMessenger
*m
): msgr(m
) {}
270 void do_request(uint64_t id
) override
{
271 // judge whether is a time event
// Construct the messenger: pick the transport from the type string ("rdma"
// or "dpdk" if mentioned, else "posix"), fetch/create the per-context
// NetworkStack singleton for that transport, set up the loopback connection
// on a local worker, and create one Processor per worker — or a single
// Processor when the stack shares one kernel listen table.
280 AsyncMessenger::AsyncMessenger(CephContext
*cct
, entity_name_t name
,
281 const std::string
&type
, std::string mname
, uint64_t _nonce
)
282 : SimplePolicyMessenger(cct
, name
),
283 dispatch_queue(cct
, this, mname
),
286 std::string transport_type
= "posix";
287 if (type
.find("rdma") != std::string::npos
)
288 transport_type
= "rdma";
289 else if (type
.find("dpdk") != std::string::npos
)
290 transport_type
= "dpdk";
// one stack instance is shared per (context, transport) pair
292 auto single
= &cct
->lookup_or_create_singleton_object
<StackSingleton
>(
293 "AsyncMessenger::NetworkStack::" + transport_type
, true, cct
);
294 single
->ready(transport_type
);
295 stack
= single
->stack
.get();
297 local_worker
= stack
->get_worker();
298 local_connection
= ceph::make_ref
<AsyncConnection
>(cct
, this, &dispatch_queue
,
299 local_worker
, true, true);
300 init_local_connection();
301 reap_handler
= new C_handle_reap(this);
// local listen tables require a Processor (and bind) per worker
302 unsigned processor_num
= 1;
303 if (stack
->support_local_listen_table())
304 processor_num
= stack
->get_num_worker();
305 for (unsigned i
= 0; i
< processor_num
; ++i
)
306 processors
.push_back(new Processor(this, stack
->get_worker(i
), cct
));
310 * Destroy the AsyncMessenger. Pretty simple since all the work is done
313 AsyncMessenger::~AsyncMessenger()
// shutdown must have stopped the Processors first; the loop presumably
// deletes each Processor (its body is elided in this view)
316 ceph_assert(!did_bind
); // either we didn't bind or we shut down the Processor
317 for (auto &&p
: processors
)
// Called once dispatchers are registered: perform any bind that was
// postponed while the stack was not ready, then (under lock) start every
// Processor and the dispatch queue.
321 void AsyncMessenger::ready()
323 ldout(cct
,10) << __func__
<< " " << get_myaddrs() << dendl
;
327 int err
= bindv(pending_bind_addrs
);
329 lderr(cct
) << __func__
<< " postponed bind failed" << dendl
;
334 std::lock_guard l
{lock
};
335 for (auto &&p
: processors
)
337 dispatch_queue
.start();
// Stop all Processors, break the loopback connection's reference cycles so
// it can be destroyed, and wake any thread blocked in wait().
340 int AsyncMessenger::shutdown()
342 ldout(cct
,10) << __func__
<< " " << get_myaddrs() << dendl
;
345 for (auto &&p
: processors
)
348 // break ref cycles on the loopback connection
349 local_connection
->clear_priv();
350 local_connection
->mark_down();
353 stop_cond
.notify_all();
// Legacy single-address bind.  A default-constructed (blank) address gets
// TYPE_LEGACY and an address family chosen by ms_bind_ipv6 before being
// forwarded to bindv().
360 int AsyncMessenger::bind(const entity_addr_t
&bind_addr
)
362 ldout(cct
,10) << __func__
<< " " << bind_addr
<< dendl
;
363 // old bind() can take entity_addr_t(). new bindv() can take a
364 // 0.0.0.0-like address but needs type and family to be set.
366 if (a
== entity_addr_t()) {
367 a
.set_type(entity_addr_t::TYPE_LEGACY
);
368 if (cct
->_conf
->ms_bind_ipv6
) {
369 a
.set_family(AF_INET6
);
371 a
.set_family(AF_INET
);
374 return bindv(entity_addrvec_t(a
));
// Bind to a vector of addresses.  Fails if the messenger already started
// with no pending bind; if the network stack is not ready yet, the request
// is stashed in pending_bind_addrs and retried from ready().  Otherwise
// every Processor binds in turn (see the local-listen-table note below).
377 int AsyncMessenger::bindv(const entity_addrvec_t
&bind_addrs
)
381 if (!pending_bind
&& started
) {
382 ldout(cct
,10) << __func__
<< " already started" << dendl
;
387 ldout(cct
,10) << __func__
<< " " << bind_addrs
<< dendl
;
389 if (!stack
->is_ready()) {
390 ldout(cct
, 10) << __func__
<< " Network Stack is not ready for bind yet - postponed" << dendl
;
391 pending_bind_addrs
= bind_addrs
;
400 std::set
<int> avoid_ports
;
401 entity_addrvec_t bound_addrs
;
403 for (auto &&p
: processors
) {
404 int r
= p
->bind(bind_addrs
, avoid_ports
, &bound_addrs
);
406 // Note: this is related to local tcp listen table problem.
407 // Posix(default kernel implementation) backend shares listen table
408 // in the kernel, so all threads can use the same listen table naturally
409 // and only one thread need to bind. But other backends(like dpdk) uses local
410 // listen table, we need to bind/listen tcp port for each worker. So if the
411 // first worker failed to bind, it could be think the normal error then handle
412 // it, like port is used case. But if the first worker successfully to bind
413 // but the second worker failed, it's not expected and we need to assert
420 _finish_bind(bind_addrs
, bound_addrs
);
// Rebind after a failure: stop the Processors, take a new nonce so our
// entity_addr_t stays unique, avoid both the caller's ports and the ports we
// previously used, bind everything again and restart the Processors.
424 int AsyncMessenger::rebind(const std::set
<int>& avoid_ports
)
426 ldout(cct
,1) << __func__
<< " rebind avoid " << avoid_ports
<< dendl
;
427 ceph_assert(did_bind
);
429 for (auto &&p
: processors
)
433 // adjust the nonce; we want our entity_addr_t to be truly unique.
435 ldout(cct
, 10) << __func__
<< " new nonce " << nonce
436 << " and addr " << get_myaddrs() << dendl
;
438 entity_addrvec_t bound_addrs
;
439 entity_addrvec_t bind_addrs
= get_myaddrs();
440 std::set
<int> new_avoid(avoid_ports
);
// never re-pick a port we were just using
441 for (auto& a
: bind_addrs
.v
) {
442 new_avoid
.insert(a
.get_port());
445 ldout(cct
, 10) << __func__
<< " will try " << bind_addrs
446 << " and avoid ports " << new_avoid
<< dendl
;
448 for (auto &&p
: processors
) {
449 int r
= p
->bind(bind_addrs
, avoid_ports
, &bound_addrs
);
456 _finish_bind(bind_addrs
, bound_addrs
);
457 for (auto &&p
: processors
) {
// Client-side bind: only when ms_bind_before_connect is enabled, record the
// given address as ours (no listening socket is created here).
463 int AsyncMessenger::client_bind(const entity_addr_t
&bind_addr
)
465 if (!cct
->_conf
->ms_bind_before_connect
)
467 std::lock_guard l
{lock
};
472 ldout(cct
, 10) << __func__
<< " already started" << dendl
;
475 ldout(cct
, 10) << __func__
<< " " << bind_addr
<< dendl
;
477 set_myaddrs(entity_addrvec_t(bind_addr
));
// Commit the result of a bind: adopt bind_addrs as our addresses, fall back
// to the actual listen_addrs when port 0 (ephemeral) was requested, then
// refresh the loopback connection with the final address set.
481 void AsyncMessenger::_finish_bind(const entity_addrvec_t
& bind_addrs
,
482 const entity_addrvec_t
& listen_addrs
)
484 set_myaddrs(bind_addrs
);
485 for (auto& a
: bind_addrs
.v
) {
486 if (!a
.is_blank_ip()) {
// an ephemeral port was requested: use what we actually bound
491 if (get_myaddrs().front().get_port() == 0) {
492 set_myaddrs(listen_addrs
);
494 entity_addrvec_t newaddrs
= *my_addrs
;
495 for (auto& a
: newaddrs
.v
) {
498 set_myaddrs(newaddrs
);
500 init_local_connection();
502 ldout(cct
,1) << __func__
<< " bind my_addrs is " << get_myaddrs() << dendl
;
// Reset a client messenger: take a fresh nonce and rewrite our addresses
// with it so the entity_addr_t stays globally unique after the reset.
506 int AsyncMessenger::client_reset()
510 std::scoped_lock l
{lock
};
511 // adjust the nonce; we want our entity_addr_t to be truly unique.
513 ldout(cct
, 10) << __func__
<< " new nonce " << nonce
<< dendl
;
515 entity_addrvec_t newaddrs
= *my_addrs
;
516 for (auto& a
: newaddrs
.v
) {
519 set_myaddrs(newaddrs
);
520 _init_local_connection();
// Mark the messenger started.  Requires a registered entity name and that
// start() has not run before; if no address was learned yet, our (blank)
// addrs are rewritten (presumably stamping the nonce) and the loopback
// connection refreshed.
524 int AsyncMessenger::start()
526 std::scoped_lock l
{lock
};
527 ldout(cct
,1) << __func__
<< " start" << dendl
;
529 // register at least one entity, first!
530 ceph_assert(my_name
.type() >= 0);
532 ceph_assert(!started
);
537 entity_addrvec_t newaddrs
= *my_addrs
;
538 for (auto& a
: newaddrs
.v
) {
541 set_myaddrs(newaddrs
);
542 _init_local_connection();
// Block the caller until shutdown() signals stop_cond, then drain and stop
// the dispatch queue and close every remaining connection.
548 void AsyncMessenger::wait()
551 std::unique_lock locker
{lock
};
556 stop_cond
.wait(locker
);
558 dispatch_queue
.shutdown();
559 if (dispatch_queue
.is_started()) {
560 ldout(cct
, 10) << __func__
<< ": waiting for dispatch queue" << dendl
;
561 dispatch_queue
.wait();
562 dispatch_queue
.discard_local();
563 ldout(cct
, 10) << __func__
<< ": dispatch queue is stopped" << dendl
;
566 // close all connections
567 shutdown_connections(false);
570 ldout(cct
, 10) << __func__
<< ": done." << dendl
;
571 ldout(cct
, 1) << __func__
<< " complete." << dendl
;
// Wrap an accepted socket in a new AsyncConnection (msgr2 iff the listening
// address is msgr2), start its server-side accept path, and track it in
// accepting_conns until the handshake identifies the peer.
575 void AsyncMessenger::add_accept(Worker
*w
, ConnectedSocket cli_socket
,
576 const entity_addr_t
&listen_addr
,
577 const entity_addr_t
&peer_addr
)
579 std::lock_guard l
{lock
};
580 auto conn
= ceph::make_ref
<AsyncConnection
>(cct
, this, &dispatch_queue
, w
,
581 listen_addr
.is_msgr2(), false);
582 conn
->accept(std::move(cli_socket
), listen_addr
, peer_addr
);
583 accepting_conns
.insert(conn
);
// Create and register a new outgoing connection to addrs; the caller must
// hold `lock`.  Picks the first address whose protocol (msgr2 or legacy) we
// can speak, creates the connection on a stack-chosen worker, and registers
// it either anonymously (anon_conns) or keyed by peer addrs in conns.
586 AsyncConnectionRef
AsyncMessenger::create_connect(
587 const entity_addrvec_t
& addrs
, int type
, bool anon
)
589 ceph_assert(ceph_mutex_is_locked(lock
));
591 ldout(cct
, 10) << __func__
<< " " << addrs
592 << ", creating connection and registering" << dendl
;
594 // here is where we decide which of the addrs to connect to. always prefer
595 // the first one, if we support it.
596 entity_addr_t target
;
597 for (auto& a
: addrs
.v
) {
// skip addrs that are neither msgr2 nor legacy (we can't speak them)
598 if (!a
.is_msgr2() && !a
.is_legacy()) {
601 // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
602 // trying it? for now, just pick whichever is listed first.
608 Worker
*w
= stack
->get_worker();
609 auto conn
= ceph::make_ref
<AsyncConnection
>(cct
, this, &dispatch_queue
, w
,
610 target
.is_msgr2(), false);
612 conn
->connect(addrs
, type
, target
);
// anonymous connections are not looked up by address
614 anon_conns
.insert(conn
);
616 ceph_assert(!conns
.count(addrs
));
617 ldout(cct
, 10) << __func__
<< " " << conn
<< " " << addrs
<< " "
618 << *conn
->peer_addrs
<< dendl
;
621 w
->get_perf_counter()->inc(l_msgr_active_connections
);
627 ConnectionRef
AsyncMessenger::get_loopback_connection()
629 return local_connection
;
632 bool AsyncMessenger::should_use_msgr2()
634 // if we are bound to v1 only, and we are connecting to a v2 peer,
635 // we cannot use the peer's v2 address. otherwise the connection
636 // is assymetrical, because they would have to use v1 to connect
637 // to us, and we would use v2, and connection race detection etc
638 // would totally break down (among other things). or, the other
639 // end will be confused that we advertise ourselve with a v1
640 // address only (that we bound to) but connected with protocol v2.
641 return !did_bind
|| get_myaddrs().has_msgr2();
// Restrict a target address vector to protocols we may use: when msgr2 is
// not allowed (see should_use_msgr2), only v1 addresses are kept.
644 entity_addrvec_t
AsyncMessenger::_filter_addrs(const entity_addrvec_t
& addrs
)
646 if (!should_use_msgr2()) {
647 ldout(cct
, 10) << __func__
<< " " << addrs
<< " limiting to v1 ()" << dendl
;
649 for (auto& i
: addrs
.v
) {
// Submit message m to the entity of `type` at `addrs`, creating (or
// reusing) a connection via connect_to().  Empty destinations are logged
// and rejected; with ms_dump_on_send the encoded payload/data are hexdumped.
661 int AsyncMessenger::send_to(Message
*m
, int type
, const entity_addrvec_t
& addrs
)
666 #if defined(WITH_EVENTTRACE)
667 if (m
->get_type() == CEPH_MSG_OSD_OP
)
668 OID_EVENT_TRACE(((MOSDOp
*)m
)->get_oid().name
.c_str(), "SEND_MSG_OSD_OP");
669 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
670 OID_EVENT_TRACE(((MOSDOpReply
*)m
)->get_oid().name
.c_str(), "SEND_MSG_OSD_OP_REPLY");
673 ldout(cct
, 1) << __func__
<< "--> " << ceph_entity_type_name(type
) << " "
674 << addrs
<< " -- " << *m
<< " -- ?+"
675 << m
->get_data().length() << " " << m
<< dendl
;
678 ldout(cct
,0) << __func__
<< " message " << *m
<< " with empty dest " << addrs
<< dendl
;
// debugging aid: encode and hexdump the message before sending
684 if (cct
->_conf
->ms_dump_on_send
) {
685 m
->encode(-1, MSG_CRC_ALL
);
686 ldout(cct
, 0) << __func__
<< " submit_message " << *m
<< "\n";
687 m
->get_payload().hexdump(*_dout
);
688 if (m
->get_data().length() > 0) {
689 *_dout
<< " data:\n";
690 m
->get_data().hexdump(*_dout
);
696 connect_to(type
, addrs
, false)->send_message(m
);
// Return a connection to addrs.  Unless not_local_dest is set, a destination
// matching our own address short-circuits to the loopback connection.
// Otherwise the addrs are protocol-filtered and, under lock, either an
// anonymous connection is created, an existing one reused, or a new one
// registered.
700 ConnectionRef
AsyncMessenger::connect_to(int type
,
701 const entity_addrvec_t
& addrs
,
702 bool anon
, bool not_local_dest
)
704 if (!not_local_dest
) {
705 if (*my_addrs
== addrs
||
706 (addrs
.v
.size() == 1 &&
707 my_addrs
->contains(addrs
.front()))) {
709 return local_connection
;
713 auto av
= _filter_addrs(addrs
);
714 std::lock_guard l
{lock
};
// anonymous connections are never shared/looked up
716 return create_connect(av
, type
, anon
);
719 AsyncConnectionRef conn
= _lookup_conn(av
);
721 ldout(cct
, 10) << __func__
<< " " << av
<< " existing " << conn
<< dendl
;
723 conn
= create_connect(av
, type
, false);
724 ldout(cct
, 10) << __func__
<< " " << av
<< " new " << conn
<< dendl
;
731 * If my_addr doesn't have an IP set, this function
732 * will fill it in from the passed addr. Otherwise it does nothing and returns.
734 bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t
&addrs
)
736 ldout(cct
,1) << __func__
<< " " << addrs
<< dendl
;
738 std::lock_guard l
{lock
};
740 entity_addrvec_t newaddrs
= *my_addrs
;
// for each of our blank addrs, borrow the IP from a provided addr of the
// same address family while keeping our own type/port/nonce
741 for (auto& a
: newaddrs
.v
) {
742 if (a
.is_blank_ip()) {
743 int type
= a
.get_type();
744 int port
= a
.get_port();
745 uint32_t nonce
= a
.get_nonce();
746 for (auto& b
: addrs
.v
) {
747 if (a
.get_family() == b
.get_family()) {
748 ldout(cct
,1) << __func__
<< " assuming my addr " << a
749 << " matches provided addr " << b
<< dendl
;
760 set_myaddrs(newaddrs
);
762 _init_local_connection();
764 ldout(cct
,1) << __func__
<< " now " << *my_addrs
<< dendl
;
// Replace our address vector wholesale (the per-addr loop body, elided in
// this view, presumably re-applies our nonce), then refresh the loopback
// connection.
768 void AsyncMessenger::set_addrs(const entity_addrvec_t
&addrs
)
770 std::lock_guard l
{lock
};
772 for (auto& a
: t
.v
) {
776 _init_local_connection();
// Stop every connection this messenger tracks — half-accepted, registered
// and anonymous — optionally queueing reset events for dispatchers, then
// drop the lazily-deleted connections and their perf counters.
779 void AsyncMessenger::shutdown_connections(bool queue_reset
)
781 ldout(cct
,1) << __func__
<< " " << dendl
;
782 std::lock_guard l
{lock
};
783 for (const auto& c
: accepting_conns
) {
784 ldout(cct
, 5) << __func__
<< " accepting_conn " << c
<< dendl
;
785 c
->stop(queue_reset
);
787 accepting_conns
.clear();
789 for (const auto& [e
, c
] : conns
) {
790 ldout(cct
, 5) << __func__
<< " mark down " << e
<< " " << c
<< dendl
;
791 c
->stop(queue_reset
);
795 for (const auto& c
: anon_conns
) {
796 ldout(cct
, 5) << __func__
<< " mark down " << c
<< dendl
;
797 c
->stop(queue_reset
);
// finally release connections already queued for lazy deletion
802 std::lock_guard l
{deleted_lock
};
803 for (const auto& c
: deleted_conns
) {
804 ldout(cct
, 5) << __func__
<< " delete " << c
<< dendl
;
805 c
->get_perf_counter()->dec(l_msgr_active_connections
);
807 deleted_conns
.clear();
// Stop the registered connection to addrs, if one exists; otherwise just
// log that there is nothing to mark down.
811 void AsyncMessenger::mark_down_addrs(const entity_addrvec_t
& addrs
)
813 std::lock_guard l
{lock
};
814 const AsyncConnectionRef
& conn
= _lookup_conn(addrs
);
816 ldout(cct
, 1) << __func__
<< " " << addrs
<< " -- " << conn
<< dendl
;
819 ldout(cct
, 1) << __func__
<< " " << addrs
<< " -- connection dne" << dendl
;
823 int AsyncMessenger::get_proto_version(int peer_type
, bool connect
) const
825 int my_type
= my_name
.type();
827 // set reply protocol version
828 if (peer_type
== my_type
) {
830 return cluster_protocol
;
833 switch (connect
? peer_type
: my_type
) {
834 case CEPH_ENTITY_TYPE_OSD
: return CEPH_OSDC_PROTOCOL
;
835 case CEPH_ENTITY_TYPE_MDS
: return CEPH_MDSC_PROTOCOL
;
836 case CEPH_ENTITY_TYPE_MON
: return CEPH_MONC_PROTOCOL
;
// Register a connection that completed its accept handshake.  Lossy server
// connections that opt out of client registration go to anon_conns.
// Otherwise, an existing entry for the same peer is either resurrected from
// the lazy-delete set or detected as a conflicting live connection; finally
// the connection is keyed by peer addrs and removed from accepting_conns.
842 int AsyncMessenger::accept_conn(const AsyncConnectionRef
& conn
)
844 std::lock_guard l
{lock
};
845 if (conn
->policy
.server
&&
846 conn
->policy
.lossy
&&
847 !conn
->policy
.register_lossy_clients
) {
848 anon_conns
.insert(conn
);
849 conn
->get_perf_counter()->inc(l_msgr_active_connections
);
852 auto it
= conns
.find(*conn
->peer_addrs
);
853 if (it
!= conns
.end()) {
854 auto& existing
= it
->second
;
856 // lazy delete, see "deleted_conns"
857 // If conn already in, we will return 0
858 std::lock_guard l
{deleted_lock
};
859 if (deleted_conns
.erase(existing
)) {
860 it
->second
->get_perf_counter()->dec(l_msgr_active_connections
);
862 } else if (conn
!= existing
) {
866 ldout(cct
, 10) << __func__
<< " " << conn
<< " " << *conn
->peer_addrs
<< dendl
;
867 conns
[*conn
->peer_addrs
] = conn
;
868 conn
->get_perf_counter()->inc(l_msgr_active_connections
);
869 accepting_conns
.erase(conn
);
// Adopt the address a peer observed for us: if we have no addrs yet, take
// it wholesale (as TYPE_ANY); otherwise use its IP to fill in our blank
// addrs of the same family, preserving each addr's own type, port and
// nonce.  Finally refresh the loopback connection.
874 bool AsyncMessenger::learned_addr(const entity_addr_t
&peer_addr_for_me
)
876 // be careful here: multiple threads may block here, and readers of
877 // my_addr do NOT hold any lock.
879 // this always goes from true -> false under the protection of the
880 // mutex. if it is already false, we need not retake the mutex at
884 std::lock_guard
l(lock
);
886 if (my_addrs
->empty()) {
887 auto a
= peer_addr_for_me
;
888 a
.set_type(entity_addr_t::TYPE_ANY
);
893 set_myaddrs(entity_addrvec_t(a
));
894 ldout(cct
,10) << __func__
<< " had no addrs" << dendl
;
896 // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
897 entity_addrvec_t newaddrs
= *my_addrs
;
898 for (auto& a
: newaddrs
.v
) {
899 if (a
.is_blank_ip() &&
900 a
.get_family() == peer_addr_for_me
.get_family()) {
901 entity_addr_t t
= peer_addr_for_me
;
903 t
.set_type(entity_addr_t::TYPE_ANY
);
906 t
.set_type(a
.get_type());
907 t
.set_port(a
.get_port());
909 t
.set_nonce(a
.get_nonce());
910 ldout(cct
,10) << __func__
<< " " << a
<< " -> " << t
<< dendl
;
914 set_myaddrs(newaddrs
);
916 ldout(cct
, 1) << __func__
<< " learned my addr " << *my_addrs
917 << " (peer_addr_for_me " << peer_addr_for_me
<< ")" << dendl
;
918 _init_local_connection();
// Reap connections queued for lazy deletion: under lock (then deleted_lock),
// remove each dead connection from conns (only if the map still points at
// it) and from accepting_conns, decrement its active-connections counter,
// and clear the deleted set.  (Definition continues past this view.)
925 void AsyncMessenger::reap_dead()
927 ldout(cct
, 1) << __func__
<< " start" << dendl
;
929 std::lock_guard l1
{lock
};
932 std::lock_guard l2
{deleted_lock
};
933 for (auto& c
: deleted_conns
) {
934 ldout(cct
, 5) << __func__
<< " delete " << c
<< dendl
;
// only erase the map entry if it still refers to this connection — it may
// already have been replaced by a newer connection to the same peer
935 auto conns_it
= conns
.find(*c
->peer_addrs
);
936 if (conns_it
!= conns
.end() && conns_it
->second
== c
)
937 conns
.erase(conns_it
);
938 accepting_conns
.erase(c
);
940 c
->get_perf_counter()->dec(l_msgr_active_connections
);
942 deleted_conns
.clear();