1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
22 #include "AsyncMessenger.h"
24 #include "common/config.h"
25 #include "common/Timer.h"
26 #include "common/errno.h"
28 #include "messages/MOSDOp.h"
29 #include "messages/MOSDOpReply.h"
30 #include "common/EventTrace.h"
32 #define dout_subsys ceph_subsys_ms
34 #define dout_prefix _prefix(_dout, this)
// Debug-log prefix for AsyncMessenger-scoped dout lines: "-- <myaddrs> ".
// NOTE(review): extract has gaps (embedded line numbers jump; closing brace
// not visible) — code left byte-identical.
35 static std::ostream
& _prefix(std::ostream
*_dout
, AsyncMessenger
*m
) {
36 return *_dout
<< "-- " << m
->get_myaddrs() << " ";
// Debug-log prefix for Processor-scoped dout lines: " Processor -- ".
// NOTE(review): extract has gaps; code left byte-identical.
39 static std::ostream
& _prefix(std::ostream
*_dout
, Processor
*p
) {
40 return *_dout
<< " Processor -- ";
// EventCallback fired when a listen fd becomes readable; holds the owning
// Processor in `pro`. do_request body not visible in this extract.
// NOTE(review): extract has gaps; code left byte-identical.
48 class Processor::C_processor_accept
: public EventCallback
{
52 explicit C_processor_accept(Processor
*p
): pro(p
) {}
53 void do_request(uint64_t id
) override
{
// Processor ctor: wires up the owning messenger, its NetHandler (from the
// CephContext), the event-loop Worker, and allocates the accept callback.
// NOTE(review): extract has gaps; code left byte-identical.
58 Processor::Processor(AsyncMessenger
*r
, Worker
*w
, CephContext
*c
)
59 : msgr(r
), net(c
), worker(w
),
60 listen_handler(new C_processor_accept(this)) {}
// Bind/listen on every address in bind_addrs, writing the actually-bound
// addresses (including any randomly chosen ports) into *bound_addrs.
// Per address: retry up to ms_bind_retry_count times with
// ms_bind_retry_delay between attempts; if the address has an explicit
// port, listen on it directly, otherwise scan [ms_bind_port_min,
// ms_bind_port_max] skipping avoid_ports. Listening is submitted to the
// worker's event center so it runs on the owning event-loop thread.
// On total failure, previously-bound sockets are torn down via
// abort_accept(). NOTE(review): extract is missing interior lines
// (return paths, braces — embedded line numbers jump); code left
// byte-identical.
62 int Processor::bind(const entity_addrvec_t
&bind_addrs
,
63 const std::set
<int>& avoid_ports
,
64 entity_addrvec_t
* bound_addrs
)
66 const auto& conf
= msgr
->cct
->_conf
;
68 ldout(msgr
->cct
, 10) << __func__
<< " " << bind_addrs
<< dendl
;
71 opts
.nodelay
= msgr
->cct
->_conf
->ms_tcp_nodelay
;
72 opts
.rcbuf_size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
74 listen_sockets
.resize(bind_addrs
.v
.size());
75 *bound_addrs
= bind_addrs
;
77 for (unsigned k
= 0; k
< bind_addrs
.v
.size(); ++k
) {
78 auto& listen_addr
= bound_addrs
->v
[k
];
83 for (int i
= 0; i
< conf
->ms_bind_retry_count
; i
++) {
85 lderr(msgr
->cct
) << __func__
<< " was unable to bind. Trying again in "
86 << conf
->ms_bind_retry_delay
<< " seconds " << dendl
;
87 sleep(conf
->ms_bind_retry_delay
);
90 if (listen_addr
.get_port()) {
91 worker
->center
.submit_to(
92 worker
->center
.get_id(),
93 [this, k
, &listen_addr
, &opts
, &r
]() {
94 r
= worker
->listen(listen_addr
, k
, opts
, &listen_sockets
[k
]);
97 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
98 << ": " << cpp_strerror(r
) << dendl
;
102 // try a range of ports
103 for (int port
= msgr
->cct
->_conf
->ms_bind_port_min
;
104 port
<= msgr
->cct
->_conf
->ms_bind_port_max
;
106 if (avoid_ports
.count(port
))
109 listen_addr
.set_port(port
);
110 worker
->center
.submit_to(
111 worker
->center
.get_id(),
112 [this, k
, &listen_addr
, &opts
, &r
]() {
113 r
= worker
->listen(listen_addr
, k
, opts
, &listen_sockets
[k
]);
119 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
120 << " on any port in range "
121 << msgr
->cct
->_conf
->ms_bind_port_min
122 << "-" << msgr
->cct
->_conf
->ms_bind_port_max
<< ": "
123 << cpp_strerror(r
) << dendl
;
124 listen_addr
.set_port(0); // Clear port before retry, otherwise we shall fail again.
127 ldout(msgr
->cct
, 10) << __func__
<< " bound on random port "
128 << listen_addr
<< dendl
;
135 // It seems that binding completely failed, return with that exit status
137 lderr(msgr
->cct
) << __func__
<< " was unable to bind after "
138 << conf
->ms_bind_retry_count
139 << " attempts: " << cpp_strerror(r
) << dendl
;
140 for (unsigned j
= 0; j
< k
; ++j
) {
141 // clean up previous bind
142 listen_sockets
[j
].abort_accept();
148 ldout(msgr
->cct
, 10) << __func__
<< " bound to " << *bound_addrs
<< dendl
;
// Start accepting: on the worker's event-center thread, register a
// readable-file-event on each listen socket (guarding against a socket
// whose fd was already closed, i.e. restart after shutdown).
// NOTE(review): extract has gaps; code left byte-identical.
152 void Processor::start()
154 ldout(msgr
->cct
, 1) << __func__
<< dendl
;
157 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
158 for (auto& listen_socket
: listen_sockets
) {
160 if (listen_socket
.fd() == -1) {
161 ldout(msgr
->cct
, 1) << __func__
162 << " Error: processor restart after listen_socket.fd closed. "
166 worker
->center
.create_file_event(listen_socket
.fd(), EVENT_READABLE
,
// Accept loop: drain pending connections from every listen socket.
// Socket options (nodelay, rcvbuf, priority) come from configuration.
// Error handling visible here: -EAGAIN (no more pending), -EMFILE/-ENFILE
// (fd limit; abort after ms_max_accept_failures consecutive failures),
// -ECONNABORTED (peer reset before accept), and a default branch that also
// counts toward the abort threshold. A successful accept resets the error
// counter and hands the ConnectedSocket to the messenger (add_accept call
// is partially visible around the "198 w, std::move(cli_socket)" lines).
// NOTE(review): extract is missing interior lines; code left byte-identical.
172 void Processor::accept()
175 opts
.nodelay
= msgr
->cct
->_conf
->ms_tcp_nodelay
;
176 opts
.rcbuf_size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
177 opts
.priority
= msgr
->get_socket_priority();
179 for (auto& listen_socket
: listen_sockets
) {
180 ldout(msgr
->cct
, 10) << __func__
<< " listen_fd=" << listen_socket
.fd()
182 unsigned accept_error_num
= 0;
186 ConnectedSocket cli_socket
;
188 if (!msgr
->get_stack()->support_local_listen_table())
189 w
= msgr
->get_stack()->get_worker();
192 int r
= listen_socket
.accept(&cli_socket
, opts
, &addr
, w
);
194 ldout(msgr
->cct
, 10) << __func__
<< " accepted incoming on sd "
195 << cli_socket
.fd() << dendl
;
198 w
, std::move(cli_socket
),
199 msgr
->get_myaddrs().v
[listen_socket
.get_addr_slot()],
201 accept_error_num
= 0;
207 } else if (r
== -EAGAIN
) {
209 } else if (r
== -EMFILE
|| r
== -ENFILE
) {
210 lderr(msgr
->cct
) << __func__
<< " open file descriptions limit reached sd = " << listen_socket
.fd()
211 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
212 if (++accept_error_num
> msgr
->cct
->_conf
->ms_max_accept_failures
) {
213 lderr(msgr
->cct
) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl
;
217 } else if (r
== -ECONNABORTED
) {
218 ldout(msgr
->cct
, 0) << __func__
<< " it was closed because of rst arrived sd = " << listen_socket
.fd()
219 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
222 lderr(msgr
->cct
) << __func__
<< " no incoming connection?"
223 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
224 if (++accept_error_num
> msgr
->cct
->_conf
->ms_max_accept_failures
) {
225 lderr(msgr
->cct
) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl
;
// Stop accepting: on the event-center thread, unregister the readable
// event for each listen socket and abort it (abort_accept closes it).
// NOTE(review): extract has gaps; code left byte-identical.
235 void Processor::stop()
237 ldout(msgr
->cct
,10) << __func__
<< dendl
;
239 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
240 for (auto& listen_socket
: listen_sockets
) {
242 worker
->center
.delete_file_event(listen_socket
.fd(), EVENT_READABLE
);
243 listen_socket
.abort_accept();
// Per-CephContext singleton holding the shared NetworkStack instance;
// ready() lazily creates the stack for the requested transport type.
// NOTE(review): extract has gaps; code left byte-identical.
250 struct StackSingleton
{
252 std::shared_ptr
<NetworkStack
> stack
;
254 explicit StackSingleton(CephContext
*c
): cct(c
) {}
255 void ready(std::string
&type
) {
257 stack
= NetworkStack::create(cct
, type
);
// EventCallback that asks the messenger to reap dead connections;
// do_request body not visible in this extract.
// NOTE(review): extract has gaps; code left byte-identical.
265 class C_handle_reap
: public EventCallback
{
266 AsyncMessenger
*msgr
;
269 explicit C_handle_reap(AsyncMessenger
*m
): msgr(m
) {}
270 void do_request(uint64_t id
) override
{
271 // judge whether is a time event
// AsyncMessenger ctor: pick transport ("rdma"/"dpdk" if the messenger type
// string mentions them, otherwise "posix"), fetch/create the per-context
// NetworkStack singleton, build the loopback AsyncConnection on a local
// worker, and create one Processor per worker when the stack keeps a
// per-worker listen table (e.g. dpdk) or a single Processor otherwise.
// NOTE(review): extract is missing interior lines (initializer-list tail,
// braces); code left byte-identical.
280 AsyncMessenger::AsyncMessenger(CephContext
*cct
, entity_name_t name
,
281 const std::string
&type
, std::string mname
, uint64_t _nonce
)
282 : SimplePolicyMessenger(cct
, name
),
283 dispatch_queue(cct
, this, mname
),
286 std::string transport_type
= "posix";
287 if (type
.find("rdma") != std::string::npos
)
288 transport_type
= "rdma";
289 else if (type
.find("dpdk") != std::string::npos
)
290 transport_type
= "dpdk";
292 auto single
= &cct
->lookup_or_create_singleton_object
<StackSingleton
>(
293 "AsyncMessenger::NetworkStack::" + transport_type
, true, cct
);
294 single
->ready(transport_type
);
295 stack
= single
->stack
.get();
297 local_worker
= stack
->get_worker();
298 local_connection
= ceph::make_ref
<AsyncConnection
>(cct
, this, &dispatch_queue
,
299 local_worker
, true, true);
300 init_local_connection();
301 reap_handler
= new C_handle_reap(this);
302 unsigned processor_num
= 1;
303 if (stack
->support_local_listen_table())
304 processor_num
= stack
->get_num_worker();
305 for (unsigned i
= 0; i
< processor_num
; ++i
)
306 processors
.push_back(new Processor(this, stack
->get_worker(i
), cct
));
// Destructor: asserts shutdown already unbound (did_bind is false) and
// iterates processors (the delete in the loop body is not visible here).
// NOTE(review): extract has gaps; code left byte-identical.
310 * Destroy the AsyncMessenger. Pretty simple since all the work is done
313 AsyncMessenger::~AsyncMessenger()
316 ceph_assert(!did_bind
); // either we didn't bind or we shut down the Processor
317 for (auto &&p
: processors
)
// ready(): perform any postponed bind (pending_bind_addrs saved by bindv
// when the stack wasn't ready), then start every Processor and the
// dispatch queue under the messenger lock.
// NOTE(review): extract has gaps; code left byte-identical.
321 void AsyncMessenger::ready()
323 ldout(cct
,10) << __func__
<< " " << get_myaddrs() << dendl
;
327 int err
= bindv(pending_bind_addrs
, saved_public_addrs
);
329 lderr(cct
) << __func__
<< " postponed bind failed" << dendl
;
334 std::lock_guard l
{lock
};
335 for (auto &&p
: processors
)
337 dispatch_queue
.start();
// shutdown(): stop processors, break the loopback connection's ref cycle
// (clear_priv + mark_down), and wake anyone blocked in wait().
// NOTE(review): extract has gaps; code left byte-identical.
340 int AsyncMessenger::shutdown()
342 ldout(cct
,10) << __func__
<< " " << get_myaddrs() << dendl
;
345 for (auto &&p
: processors
)
348 // break ref cycles on the loopback connection
349 local_connection
->clear_priv();
350 local_connection
->mark_down();
353 stop_cond
.notify_all();
// Legacy single-address bind: a blank entity_addr_t is normalized to a
// LEGACY-type any-address (IPv6 when ms_bind_ipv6, else IPv4), then
// delegated to bindv().
// NOTE(review): extract has gaps; code left byte-identical.
360 int AsyncMessenger::bind(const entity_addr_t
&bind_addr
,
361 std::optional
<entity_addrvec_t
> public_addrs
)
363 ldout(cct
, 10) << __func__
<< " " << bind_addr
364 << " public " << public_addrs
<< dendl
;
365 // old bind() can take entity_addr_t(). new bindv() can take a
366 // 0.0.0.0-like address but needs type and family to be set.
368 if (a
== entity_addr_t()) {
369 a
.set_type(entity_addr_t::TYPE_LEGACY
);
370 if (cct
->_conf
->ms_bind_ipv6
) {
371 a
.set_family(AF_INET6
);
373 a
.set_family(AF_INET
);
376 return bindv(entity_addrvec_t(a
), public_addrs
);
// bindv(): bind to a vector of addresses. No-op if already started (and no
// pending bind); stores public_addrs when they differ from bind_addrs (for
// rebind/_finish_bind); postpones the bind when the NetworkStack isn't
// ready yet; otherwise binds each Processor and finishes via
// _finish_bind(). The long inline comment explains why a per-worker
// (local listen table) backend must not tolerate a later worker failing
// after an earlier one succeeded.
// NOTE(review): extract is missing interior lines; code left byte-identical.
379 int AsyncMessenger::bindv(const entity_addrvec_t
&bind_addrs
,
380 std::optional
<entity_addrvec_t
> public_addrs
)
384 if (!pending_bind
&& started
) {
385 ldout(cct
,10) << __func__
<< " already started" << dendl
;
390 ldout(cct
, 10) << __func__
<< " " << bind_addrs
391 << " public " << public_addrs
<< dendl
;
392 if (public_addrs
&& bind_addrs
!= public_addrs
) {
393 // for the sake of rebind() and the is-not-ready case let's
394 // store public_addrs. there is no point in that if public
395 // addrs are indifferent from bind_addrs.
396 saved_public_addrs
= std::move(public_addrs
);
399 if (!stack
->is_ready()) {
400 ldout(cct
, 10) << __func__
<< " Network Stack is not ready for bind yet - postponed" << dendl
;
401 pending_bind_addrs
= bind_addrs
;
410 std::set
<int> avoid_ports
;
411 entity_addrvec_t bound_addrs
;
413 for (auto &&p
: processors
) {
414 int r
= p
->bind(bind_addrs
, avoid_ports
, &bound_addrs
);
416 // Note: this is related to local tcp listen table problem.
417 // Posix(default kernel implementation) backend shares listen table
418 // in the kernel, so all threads can use the same listen table naturally
419 // and only one thread need to bind. But other backends(like dpdk) uses local
420 // listen table, we need to bind/listen tcp port for each worker. So if the
421 // first worker failed to bind, it could be think the normal error then handle
422 // it, like port is used case. But if the first worker successfully to bind
423 // but the second worker failed, it's not expected and we need to assert
430 _finish_bind(bind_addrs
, bound_addrs
);
// rebind(): requires a prior bind (asserts did_bind). Stops processors,
// bumps the nonce for uniqueness, then rebinds every Processor to the
// current addresses while avoiding both caller-supplied ports and the
// ports we previously used (new_avoid), and restarts processors after
// _finish_bind. NOTE(review): the second bind loop passes `avoid_ports`
// rather than `new_avoid` — looks suspicious but interior lines are
// missing from this extract, so left untouched; verify against upstream.
434 int AsyncMessenger::rebind(const std::set
<int>& avoid_ports
)
436 ldout(cct
,1) << __func__
<< " rebind avoid " << avoid_ports
<< dendl
;
437 ceph_assert(did_bind
);
439 for (auto &&p
: processors
)
443 // adjust the nonce; we want our entity_addr_t to be truly unique.
445 ldout(cct
, 10) << __func__
<< " new nonce " << nonce
446 << " and addr " << get_myaddrs() << dendl
;
448 entity_addrvec_t bound_addrs
;
449 entity_addrvec_t bind_addrs
= get_myaddrs();
450 std::set
<int> new_avoid(avoid_ports
);
451 for (auto& a
: bind_addrs
.v
) {
452 new_avoid
.insert(a
.get_port());
455 ldout(cct
, 10) << __func__
<< " will try " << bind_addrs
456 << " and avoid ports " << new_avoid
<< dendl
;
458 for (auto &&p
: processors
) {
459 int r
= p
->bind(bind_addrs
, avoid_ports
, &bound_addrs
);
466 _finish_bind(bind_addrs
, bound_addrs
);
467 for (auto &&p
: processors
) {
// client_bind(): only takes effect when ms_bind_before_connect is set;
// records the given address as our own under the lock (no listening).
// NOTE(review): extract has gaps; code left byte-identical.
473 int AsyncMessenger::client_bind(const entity_addr_t
&bind_addr
)
475 if (!cct
->_conf
->ms_bind_before_connect
)
477 std::lock_guard l
{lock
};
482 ldout(cct
, 10) << __func__
<< " already started" << dendl
;
485 ldout(cct
, 10) << __func__
<< " " << bind_addr
<< dendl
;
487 set_myaddrs(entity_addrvec_t(bind_addr
));
// _finish_bind(): reconcile requested bind addresses with what we actually
// listened on. Falls back to listen_addrs when we bound to port 0, then —
// if explicit public addresses were saved — transplants the network-layer
// address per address-type while keeping the bound port whenever the
// public address leaves the port unspecified (0).
// NOTE(review): extract has gaps; code left byte-identical.
491 void AsyncMessenger::_finish_bind(const entity_addrvec_t
& bind_addrs
,
492 const entity_addrvec_t
& listen_addrs
)
494 set_myaddrs(bind_addrs
);
495 for (auto& a
: bind_addrs
.v
) {
496 if (!a
.is_blank_ip()) {
501 if (get_myaddrs().front().get_port() == 0) {
502 set_myaddrs(listen_addrs
);
504 entity_addrvec_t newaddrs
= *my_addrs
;
505 for (auto& a
: newaddrs
.v
) {
507 if (saved_public_addrs
) {
508 // transplantate network layer addresses while keeping ports
509 // (as they can be figured out by msgr from the allowed range [1])
510 // unless they are explicitly specified (NATing both IP/port?)
512 // [1]: the low-level `Processor::bind` scans for free ports in
513 // a range controlled by ms_bind_port_min and ms_bind_port_max
514 const auto& public_addr
=
515 saved_public_addrs
->addr_of_type(a
.get_type());
516 const auto public_port
= public_addr
.get_port();
517 const auto bound_port
= a
.get_port();
518 a
.set_sockaddr(public_addr
.get_sockaddr());
519 a
.set_port(public_port
== 0 ? bound_port
: public_port
);
522 set_myaddrs(newaddrs
);
524 init_local_connection();
526 ldout(cct
,1) << __func__
<< " bind my_addrs is " << get_myaddrs() << dendl
;
// client_reset(): under the lock, bump the nonce (loop body updating each
// address is not visible here) and re-init the local connection so our
// entity_addr_t becomes unique again.
// NOTE(review): extract has gaps; code left byte-identical.
530 int AsyncMessenger::client_reset()
534 std::scoped_lock l
{lock
};
535 // adjust the nonce; we want our entity_addr_t to be truly unique.
537 ldout(cct
, 10) << __func__
<< " new nonce " << nonce
<< dendl
;
539 entity_addrvec_t newaddrs
= *my_addrs
;
540 for (auto& a
: newaddrs
.v
) {
543 set_myaddrs(newaddrs
);
544 _init_local_connection();
// start(): asserts an entity name was registered and that we haven't
// started; updates addresses (loop body not visible) and re-inits the
// local connection under the lock.
// NOTE(review): extract has gaps; code left byte-identical.
548 int AsyncMessenger::start()
550 std::scoped_lock l
{lock
};
551 ldout(cct
,1) << __func__
<< " start" << dendl
;
553 // register at least one entity, first!
554 ceph_assert(my_name
.type() >= 0);
556 ceph_assert(!started
);
561 entity_addrvec_t newaddrs
= *my_addrs
;
562 for (auto& a
: newaddrs
.v
) {
565 set_myaddrs(newaddrs
);
566 _init_local_connection();
// wait(): block on stop_cond until shutdown, then drain and stop the
// dispatch queue and close all connections.
// NOTE(review): extract has gaps; code left byte-identical.
572 void AsyncMessenger::wait()
575 std::unique_lock locker
{lock
};
580 stop_cond
.wait(locker
);
582 dispatch_queue
.shutdown();
583 if (dispatch_queue
.is_started()) {
584 ldout(cct
, 10) << __func__
<< ": waiting for dispatch queue" << dendl
;
585 dispatch_queue
.wait();
586 dispatch_queue
.discard_local();
587 ldout(cct
, 10) << __func__
<< ": dispatch queue is stopped" << dendl
;
590 // close all connections
591 shutdown_connections(false);
594 ldout(cct
, 10) << __func__
<< ": done." << dendl
;
595 ldout(cct
, 1) << __func__
<< " complete." << dendl
;
// add_accept(): wrap an accepted socket in a new AsyncConnection (msgr2
// decided by the listen address), start its accept protocol, and track it
// in accepting_conns — all under the messenger lock.
// NOTE(review): extract has gaps; code left byte-identical.
599 void AsyncMessenger::add_accept(Worker
*w
, ConnectedSocket cli_socket
,
600 const entity_addr_t
&listen_addr
,
601 const entity_addr_t
&peer_addr
)
603 std::lock_guard l
{lock
};
604 auto conn
= ceph::make_ref
<AsyncConnection
>(cct
, this, &dispatch_queue
, w
,
605 listen_addr
.is_msgr2(), false);
606 conn
->accept(std::move(cli_socket
), listen_addr
, peer_addr
);
607 accepting_conns
.insert(conn
);
// create_connect(): caller must hold `lock`. Picks a target address
// (skipping entries that are neither msgr2 nor legacy), creates an
// outgoing AsyncConnection on a stack worker, starts the connect, and
// registers it (anon connections go to anon_conns; the conns-map insert
// for the non-anon path is not fully visible here). Bumps the worker's
// active-connections perf counter.
// NOTE(review): extract has gaps; code left byte-identical.
610 AsyncConnectionRef
AsyncMessenger::create_connect(
611 const entity_addrvec_t
& addrs
, int type
, bool anon
)
613 ceph_assert(ceph_mutex_is_locked(lock
));
615 ldout(cct
, 10) << __func__
<< " " << addrs
616 << ", creating connection and registering" << dendl
;
618 // here is where we decide which of the addrs to connect to. always prefer
619 // the first one, if we support it.
620 entity_addr_t target
;
621 for (auto& a
: addrs
.v
) {
622 if (!a
.is_msgr2() && !a
.is_legacy()) {
625 // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
626 // trying it? for now, just pick whichever is listed first.
632 Worker
*w
= stack
->get_worker();
633 auto conn
= ceph::make_ref
<AsyncConnection
>(cct
, this, &dispatch_queue
, w
,
634 target
.is_msgr2(), false);
636 conn
->connect(addrs
, type
, target
);
638 anon_conns
.insert(conn
);
640 ceph_assert(!conns
.count(addrs
));
641 ldout(cct
, 10) << __func__
<< " " << conn
<< " " << addrs
<< " "
642 << *conn
->peer_addrs
<< dendl
;
645 w
->get_perf_counter()->inc(l_msgr_active_connections
);
// Returns the messenger's loopback connection (created in the ctor).
651 ConnectionRef
AsyncMessenger::get_loopback_connection()
653 return local_connection
;
// True when it is legitimate to dial a peer's v2 address: either we never
// bound (clients) or our own bound addresses advertise msgr2. The inline
// comment explains the asymmetric-connection / race-detection rationale.
656 bool AsyncMessenger::should_use_msgr2()
658 // if we are bound to v1 only, and we are connecting to a v2 peer,
659 // we cannot use the peer's v2 address. otherwise the connection
660 // is assymetrical, because they would have to use v1 to connect
661 // to us, and we would use v2, and connection race detection etc
662 // would totally break down (among other things). or, the other
663 // end will be confused that we advertise ourselve with a v1
664 // address only (that we bound to) but connected with protocol v2.
665 return !did_bind
|| get_myaddrs().has_msgr2();
// _filter_addrs(): when msgr2 must not be used, restrict the candidate
// address vector to v1 entries (filter-loop body not visible here).
// NOTE(review): extract has gaps; code left byte-identical.
668 entity_addrvec_t
AsyncMessenger::_filter_addrs(const entity_addrvec_t
& addrs
)
670 if (!should_use_msgr2()) {
671 ldout(cct
, 10) << __func__
<< " " << addrs
<< " limiting to v1 ()" << dendl
;
673 for (auto& i
: addrs
.v
) {
// send_to(): optional OID event-tracing for OSD op/reply messages, debug
// logging, optional full-message hexdump when ms_dump_on_send is set, an
// empty-destination guard, then connect_to(...)->send_message(m).
// NOTE(review): extract has gaps; code left byte-identical.
685 int AsyncMessenger::send_to(Message
*m
, int type
, const entity_addrvec_t
& addrs
)
690 #if defined(WITH_EVENTTRACE)
691 if (m
->get_type() == CEPH_MSG_OSD_OP
)
692 OID_EVENT_TRACE(((MOSDOp
*)m
)->get_oid().name
.c_str(), "SEND_MSG_OSD_OP");
693 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
694 OID_EVENT_TRACE(((MOSDOpReply
*)m
)->get_oid().name
.c_str(), "SEND_MSG_OSD_OP_REPLY");
697 ldout(cct
, 1) << __func__
<< "--> " << ceph_entity_type_name(type
) << " "
698 << addrs
<< " -- " << *m
<< " -- ?+"
699 << m
->get_data().length() << " " << m
<< dendl
;
702 ldout(cct
,0) << __func__
<< " message " << *m
703 << " with empty dest " << addrs
<< dendl
;
708 if (cct
->_conf
->ms_dump_on_send
) {
709 m
->encode(-1, MSG_CRC_ALL
);
710 ldout(cct
, 0) << __func__
<< " submit_message " << *m
<< "\n";
711 m
->get_payload().hexdump(*_dout
);
712 if (m
->get_data().length() > 0) {
713 *_dout
<< " data:\n";
714 m
->get_data().hexdump(*_dout
);
720 connect_to(type
, addrs
, false)->send_message(m
);
// connect_to(): short-circuits to the loopback connection when the target
// is ourselves (unless not_local_dest). Filters addresses for msgr2
// compatibility, then under the lock either creates an anon connection,
// reuses an existing registered connection, or creates a new one.
// NOTE(review): extract has gaps; code left byte-identical.
724 ConnectionRef
AsyncMessenger::connect_to(int type
,
725 const entity_addrvec_t
& addrs
,
726 bool anon
, bool not_local_dest
)
728 if (!not_local_dest
) {
729 if (*my_addrs
== addrs
||
730 (addrs
.v
.size() == 1 &&
731 my_addrs
->contains(addrs
.front()))) {
733 return local_connection
;
737 auto av
= _filter_addrs(addrs
);
738 std::lock_guard l
{lock
};
740 return create_connect(av
, type
, anon
);
743 AsyncConnectionRef conn
= _lookup_conn(av
);
745 ldout(cct
, 10) << __func__
<< " " << av
<< " existing " << conn
<< dendl
;
747 conn
= create_connect(av
, type
, false);
748 ldout(cct
, 10) << __func__
<< " " << av
<< " new " << conn
<< dendl
;
// set_addr_unknowns(): for each of our blank-IP addresses, adopt the IP
// from a provided address of the same family while preserving our own
// type/port/nonce (the adopting assignments are partially visible), then
// publish the new address vector and re-init the local connection.
// NOTE(review): extract has gaps; code left byte-identical.
755 * If my_addr doesn't have an IP set, this function
756 * will fill it in from the passed addr. Otherwise it does nothing and returns.
758 bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t
&addrs
)
760 ldout(cct
,1) << __func__
<< " " << addrs
<< dendl
;
762 std::lock_guard l
{lock
};
764 entity_addrvec_t newaddrs
= *my_addrs
;
765 for (auto& a
: newaddrs
.v
) {
766 if (a
.is_blank_ip()) {
767 int type
= a
.get_type();
768 int port
= a
.get_port();
769 uint32_t nonce
= a
.get_nonce();
770 for (auto& b
: addrs
.v
) {
771 if (a
.get_family() == b
.get_family()) {
772 ldout(cct
,1) << __func__
<< " assuming my addr " << a
773 << " matches provided addr " << b
<< dendl
;
784 set_myaddrs(newaddrs
);
786 _init_local_connection();
788 ldout(cct
,1) << __func__
<< " now " << *my_addrs
<< dendl
;
// shutdown_connections(): under `lock`, stop and clear accepting,
// registered, and anonymous connections (optionally queueing reset events
// to dispatchers), then — under deleted_lock — decrement perf counters for
// lazily-deleted connections and clear that set.
// NOTE(review): extract has gaps; code left byte-identical.
792 void AsyncMessenger::shutdown_connections(bool queue_reset
)
794 ldout(cct
,1) << __func__
<< " " << dendl
;
795 std::lock_guard l
{lock
};
796 for (const auto& c
: accepting_conns
) {
797 ldout(cct
, 5) << __func__
<< " accepting_conn " << c
<< dendl
;
798 c
->stop(queue_reset
);
800 accepting_conns
.clear();
802 for (const auto& [e
, c
] : conns
) {
803 ldout(cct
, 5) << __func__
<< " mark down " << e
<< " " << c
<< dendl
;
804 c
->stop(queue_reset
);
808 for (const auto& c
: anon_conns
) {
809 ldout(cct
, 5) << __func__
<< " mark down " << c
<< dendl
;
810 c
->stop(queue_reset
);
815 std::lock_guard l
{deleted_lock
};
816 for (const auto& c
: deleted_conns
) {
817 ldout(cct
, 5) << __func__
<< " delete " << c
<< dendl
;
818 c
->get_perf_counter()->dec(l_msgr_active_connections
);
820 deleted_conns
.clear();
// mark_down_addrs(): look up the registered connection for these addrs
// under the lock; log whether it exists (the stop call on the found
// connection is not visible in this extract).
// NOTE(review): extract has gaps; code left byte-identical.
824 void AsyncMessenger::mark_down_addrs(const entity_addrvec_t
& addrs
)
826 std::lock_guard l
{lock
};
827 const AsyncConnectionRef
& conn
= _lookup_conn(addrs
);
829 ldout(cct
, 1) << __func__
<< " " << addrs
<< " -- " << conn
<< dendl
;
832 ldout(cct
, 1) << __func__
<< " " << addrs
<< " -- connection dne" << dendl
;
// get_proto_version(): same entity type talks the cluster protocol;
// otherwise the OSD/MDS/MON client protocol is selected by the peer's
// type when connecting, or our own type when replying.
// NOTE(review): extract has gaps; code left byte-identical.
836 int AsyncMessenger::get_proto_version(int peer_type
, bool connect
) const
838 int my_type
= my_name
.type();
840 // set reply protocol version
841 if (peer_type
== my_type
) {
843 return cluster_protocol
;
846 switch (connect
? peer_type
: my_type
) {
847 case CEPH_ENTITY_TYPE_OSD
: return CEPH_OSDC_PROTOCOL
;
848 case CEPH_ENTITY_TYPE_MDS
: return CEPH_MDSC_PROTOCOL
;
849 case CEPH_ENTITY_TYPE_MON
: return CEPH_MONC_PROTOCOL
;
// accept_conn(): under `lock`, lossy server connections that opt out of
// registration go straight into anon_conns. Otherwise: if a connection is
// already registered for the peer, resurrect-or-replace it (lazy-delete
// bookkeeping via deleted_conns, with perf-counter decrement); then
// register this connection in conns, bump the counter, and drop it from
// accepting_conns.
// NOTE(review): extract has gaps; code left byte-identical.
855 int AsyncMessenger::accept_conn(const AsyncConnectionRef
& conn
)
857 std::lock_guard l
{lock
};
858 if (conn
->policy
.server
&&
859 conn
->policy
.lossy
&&
860 !conn
->policy
.register_lossy_clients
) {
861 anon_conns
.insert(conn
);
862 conn
->get_perf_counter()->inc(l_msgr_active_connections
);
865 auto it
= conns
.find(*conn
->peer_addrs
);
866 if (it
!= conns
.end()) {
867 auto& existing
= it
->second
;
869 // lazy delete, see "deleted_conns"
870 // If conn already in, we will return 0
871 std::lock_guard l
{deleted_lock
};
872 if (deleted_conns
.erase(existing
)) {
873 it
->second
->get_perf_counter()->dec(l_msgr_active_connections
);
875 } else if (conn
!= existing
) {
879 ldout(cct
, 10) << __func__
<< " " << conn
<< " " << *conn
->peer_addrs
<< dendl
;
880 conns
[*conn
->peer_addrs
] = conn
;
881 conn
->get_perf_counter()->inc(l_msgr_active_connections
);
882 accepting_conns
.erase(conn
);
// learned_addr(): adopt the address a peer observed for us. If we have no
// addresses yet, take peer_addr_for_me (re-typed TYPE_ANY) wholesale;
// otherwise fix every blank-IP address of the same family, keeping each
// entry's own type, port, and nonce. Publishes the result and re-inits
// the local connection.
// NOTE(review): extract has gaps; code left byte-identical.
887 bool AsyncMessenger::learned_addr(const entity_addr_t
&peer_addr_for_me
)
889 // be careful here: multiple threads may block here, and readers of
890 // my_addr do NOT hold any lock.
892 // this always goes from true -> false under the protection of the
893 // mutex. if it is already false, we need not retake the mutex at
897 std::lock_guard
l(lock
);
899 if (my_addrs
->empty()) {
900 auto a
= peer_addr_for_me
;
901 a
.set_type(entity_addr_t::TYPE_ANY
);
906 set_myaddrs(entity_addrvec_t(a
));
907 ldout(cct
,10) << __func__
<< " had no addrs" << dendl
;
909 // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
910 entity_addrvec_t newaddrs
= *my_addrs
;
911 for (auto& a
: newaddrs
.v
) {
912 if (a
.is_blank_ip() &&
913 a
.get_family() == peer_addr_for_me
.get_family()) {
914 entity_addr_t t
= peer_addr_for_me
;
916 t
.set_type(entity_addr_t::TYPE_ANY
);
919 t
.set_type(a
.get_type());
920 t
.set_port(a
.get_port());
922 t
.set_nonce(a
.get_nonce());
923 ldout(cct
,10) << __func__
<< " " << a
<< " -> " << t
<< dendl
;
927 set_myaddrs(newaddrs
);
929 ldout(cct
, 1) << __func__
<< " learned my addr " << *my_addrs
930 << " (peer_addr_for_me " << peer_addr_for_me
<< ")" << dendl
;
931 _init_local_connection();
938 void AsyncMessenger::reap_dead()
940 ldout(cct
, 1) << __func__
<< " start" << dendl
;
942 std::lock_guard l1
{lock
};
945 std::lock_guard l2
{deleted_lock
};
946 for (auto& c
: deleted_conns
) {
947 ldout(cct
, 5) << __func__
<< " delete " << c
<< dendl
;
948 auto conns_it
= conns
.find(*c
->peer_addrs
);
949 if (conns_it
!= conns
.end() && conns_it
->second
== c
)
950 conns
.erase(conns_it
);
951 accepting_conns
.erase(c
);
953 c
->get_perf_counter()->dec(l_msgr_active_connections
);
955 deleted_conns
.clear();