-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
- return *_dout << "-- " << m->get_myaddr() << " ";
+ return *_dout << "-- " << m->get_myaddrs() << " ";
}
static ostream& _prefix(std::ostream *_dout, Processor *p) {
public:
explicit C_processor_accept(Processor *p): pro(p) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
pro->accept();
}
};
: msgr(r), net(c), worker(w),
listen_handler(new C_processor_accept(this)) {}
-int Processor::bind(const entity_addr_t &bind_addr,
+int Processor::bind(const entity_addrvec_t &bind_addrs,
const set<int>& avoid_ports,
- entity_addr_t* bound_addr)
+ entity_addrvec_t* bound_addrs)
{
- const md_config_t *conf = msgr->cct->_conf;
- // bind to a socket
- ldout(msgr->cct, 10) << __func__ << dendl;
-
- int family;
- switch (bind_addr.get_family()) {
- case AF_INET:
- case AF_INET6:
- family = bind_addr.get_family();
- break;
-
- default:
- // bind_addr is empty
- family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
- }
+ const auto& conf = msgr->cct->_conf;
+ // bind to socket(s)
+ ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl;
SocketOptions opts;
opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
- // use whatever user specified (if anything)
- entity_addr_t listen_addr = bind_addr;
- if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) {
- listen_addr.set_type(entity_addr_t::TYPE_LEGACY);
- }
- listen_addr.set_family(family);
+ listen_sockets.resize(bind_addrs.v.size());
+ *bound_addrs = bind_addrs;
- /* bind to port */
- int r = -1;
+ for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
+ auto& listen_addr = bound_addrs->v[k];
- for (int i = 0; i < conf->ms_bind_retry_count; i++) {
- if (i > 0) {
- lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
- << conf->ms_bind_retry_delay << " seconds " << dendl;
- sleep(conf->ms_bind_retry_delay);
- }
+ /* bind to port */
+ int r = -1;
- if (listen_addr.get_port()) {
- worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
- r = worker->listen(listen_addr, opts, &listen_socket);
- }, false);
- if (r < 0) {
- lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
- << ": " << cpp_strerror(r) << dendl;
- continue;
+ for (int i = 0; i < conf->ms_bind_retry_count; i++) {
+ if (i > 0) {
+ lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
+ << conf->ms_bind_retry_delay << " seconds " << dendl;
+ sleep(conf->ms_bind_retry_delay);
}
- } else {
- // try a range of ports
- for (int port = msgr->cct->_conf->ms_bind_port_min; port <= msgr->cct->_conf->ms_bind_port_max; port++) {
- if (avoid_ports.count(port))
- continue;
-
- listen_addr.set_port(port);
- worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
- r = worker->listen(listen_addr, opts, &listen_socket);
- }, false);
- if (r == 0)
- break;
+
+ if (listen_addr.get_port()) {
+ worker->center.submit_to(
+ worker->center.get_id(),
+ [this, k, &listen_addr, &opts, &r]() {
+ r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
+ }, false);
+ if (r < 0) {
+ lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
+ << ": " << cpp_strerror(r) << dendl;
+ continue;
+ }
+ } else {
+ // try a range of ports
+ for (int port = msgr->cct->_conf->ms_bind_port_min;
+ port <= msgr->cct->_conf->ms_bind_port_max;
+ port++) {
+ if (avoid_ports.count(port))
+ continue;
+
+ listen_addr.set_port(port);
+ worker->center.submit_to(
+ worker->center.get_id(),
+ [this, k, &listen_addr, &opts, &r]() {
+ r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
+ }, false);
+ if (r == 0)
+ break;
+ }
+ if (r < 0) {
+ lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
+ << " on any port in range "
+ << msgr->cct->_conf->ms_bind_port_min
+ << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
+ << cpp_strerror(r) << dendl;
+ listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
+ continue;
+ }
+ ldout(msgr->cct, 10) << __func__ << " bound on random port "
+ << listen_addr << dendl;
}
- if (r < 0) {
- lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
- << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
- << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
- << cpp_strerror(r) << dendl;
- listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
- continue;
+ if (r == 0) {
+ break;
}
- ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl;
}
- if (r == 0)
- break;
- }
- // It seems that binding completely failed, return with that exit status
- if (r < 0) {
- lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
- << " attempts: " << cpp_strerror(r) << dendl;
- return r;
+
+ // It seems that binding completely failed, return with that exit status
+ if (r < 0) {
+ lderr(msgr->cct) << __func__ << " was unable to bind after "
+ << conf->ms_bind_retry_count
+ << " attempts: " << cpp_strerror(r) << dendl;
+ for (unsigned j = 0; j < k; ++j) {
+ // clean up previous bind
+ listen_sockets[j].abort_accept();
+ }
+ return r;
+ }
}
- ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
- *bound_addr = listen_addr;
+ ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl;
return 0;
}
ldout(msgr->cct, 1) << __func__ << dendl;
// start thread
- if (listen_socket) {
- worker->center.submit_to(worker->center.get_id(), [this]() {
- worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false);
- }
+ worker->center.submit_to(worker->center.get_id(), [this]() {
+ for (auto& l : listen_sockets) {
+ if (l) {
+ worker->center.create_file_event(l.fd(), EVENT_READABLE,
+ listen_handler); }
+ }
+ }, false);
}
void Processor::accept()
{
- ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl;
SocketOptions opts;
opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
opts.priority = msgr->get_socket_priority();
- unsigned accept_error_num = 0;
-
- while (true) {
- entity_addr_t addr;
- ConnectedSocket cli_socket;
- Worker *w = worker;
- if (!msgr->get_stack()->support_local_listen_table())
- w = msgr->get_stack()->get_worker();
- int r = listen_socket.accept(&cli_socket, opts, &addr, w);
- if (r == 0) {
- ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl;
-
- msgr->add_accept(w, std::move(cli_socket), addr);
- continue;
- } else {
- if (r == -EINTR) {
- continue;
- } else if (r == -EAGAIN) {
- break;
- } else if (r == -EMFILE || r == -ENFILE) {
- lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
- << " errno " << r << " " << cpp_strerror(r) << dendl;
- if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
- lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
- ceph_abort();
- }
+
+ for (auto& listen_socket : listen_sockets) {
+ ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd()
+ << dendl;
+ unsigned accept_error_num = 0;
+
+ while (true) {
+ entity_addr_t addr;
+ ConnectedSocket cli_socket;
+ Worker *w = worker;
+ if (!msgr->get_stack()->support_local_listen_table())
+ w = msgr->get_stack()->get_worker();
+ else
+ ++w->references;
+ int r = listen_socket.accept(&cli_socket, opts, &addr, w);
+ if (r == 0) {
+ ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd "
+ << cli_socket.fd() << dendl;
+
+ msgr->add_accept(
+ w, std::move(cli_socket),
+ msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
+ addr);
+ accept_error_num = 0;
continue;
- } else if (r == -ECONNABORTED) {
- ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
- << " errno " << r << " " << cpp_strerror(r) << dendl;
- continue;
} else {
- lderr(msgr->cct) << __func__ << " no incoming connection?"
- << " errno " << r << " " << cpp_strerror(r) << dendl;
- if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
- lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
- ceph_abort();
+ if (r == -EINTR) {
+ continue;
+ } else if (r == -EAGAIN) {
+ break;
+ } else if (r == -EMFILE || r == -ENFILE) {
+ lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
+ << " errno " << r << " " << cpp_strerror(r) << dendl;
+ if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
+ lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
+ ceph_abort();
+ }
+ continue;
+ } else if (r == -ECONNABORTED) {
+ ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
+ << " errno " << r << " " << cpp_strerror(r) << dendl;
+ continue;
+ } else {
+ lderr(msgr->cct) << __func__ << " no incoming connection?"
+ << " errno " << r << " " << cpp_strerror(r) << dendl;
+ if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
+ lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
+ ceph_abort();
+ }
+ continue;
}
- continue;
}
}
}
{
ldout(msgr->cct,10) << __func__ << dendl;
- if (listen_socket) {
- worker->center.submit_to(worker->center.get_id(), [this]() {
- worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
- listen_socket.abort_accept();
+ worker->center.submit_to(worker->center.get_id(), [this]() {
+ for (auto& listen_socket : listen_sockets) {
+ if (listen_socket) {
+ worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
+ listen_socket.abort_accept();
+ }
+ }
}, false);
- }
}
CephContext *cct;
std::shared_ptr<NetworkStack> stack;
- StackSingleton(CephContext *c): cct(c) {}
+ explicit StackSingleton(CephContext *c): cct(c) {}
void ready(std::string &type) {
if (!stack)
stack = NetworkStack::create(cct, type);
public:
explicit C_handle_reap(AsyncMessenger *m): msgr(m) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
// judge whether is a time event
msgr->reap_dead();
}
else if (type.find("dpdk") != std::string::npos)
transport_type = "dpdk";
- ceph_spin_init(&global_seq_lock);
- StackSingleton *single;
- cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack::"+transport_type);
+ auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
+ "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
single->ready(transport_type);
stack = single->stack.get();
stack->start();
local_worker = stack->get_worker();
- local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
+ local_connection = new AsyncConnection(cct, this, &dispatch_queue,
+ local_worker, true, true);
init_local_connection();
reap_handler = new C_handle_reap(this);
unsigned processor_num = 1;
AsyncMessenger::~AsyncMessenger()
{
delete reap_handler;
- assert(!did_bind); // either we didn't bind or we shut down the Processor
+ ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
local_connection->mark_down();
for (auto &&p : processors)
delete p;
void AsyncMessenger::ready()
{
- ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+ ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
stack->ready();
if (pending_bind) {
- int err = bind(pending_bind_addr);
+ int err = bindv(pending_bind_addrs);
if (err) {
lderr(cct) << __func__ << " postponed bind failed" << dendl;
ceph_abort();
int AsyncMessenger::shutdown()
{
- ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+ ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
// done! clean up.
for (auto &&p : processors)
return 0;
}
-
int AsyncMessenger::bind(const entity_addr_t &bind_addr)
+{
+ ldout(cct,10) << __func__ << " " << bind_addr << dendl;
+ // old bind() can take entity_addr_t(). new bindv() can take a
+ // 0.0.0.0-like address but needs type and family to be set.
+ auto a = bind_addr;
+ if (a == entity_addr_t()) {
+ a.set_type(entity_addr_t::TYPE_LEGACY);
+ if (cct->_conf->ms_bind_ipv6) {
+ a.set_family(AF_INET6);
+ } else {
+ a.set_family(AF_INET);
+ }
+ }
+ return bindv(entity_addrvec_t(a));
+}
+
+int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
{
lock.Lock();
return -1;
}
- ldout(cct,10) << __func__ << " bind " << bind_addr << dendl;
+ ldout(cct,10) << __func__ << " " << bind_addrs << dendl;
if (!stack->is_ready()) {
ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
- pending_bind_addr = bind_addr;
+ pending_bind_addrs = bind_addrs;
pending_bind = true;
lock.Unlock();
return 0;
// bind to a socket
set<int> avoid_ports;
- entity_addr_t bound_addr;
+ entity_addrvec_t bound_addrs;
unsigned i = 0;
for (auto &&p : processors) {
- int r = p->bind(bind_addr, avoid_ports, &bound_addr);
+ int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
if (r) {
// Note: this is related to local tcp listen table problem.
// Posix(default kernel implementation) backend shares listen table
// it, like port is used case. But if the first worker successfully to bind
// but the second worker failed, it's not expected and we need to assert
// here
- assert(i == 0);
+ ceph_assert(i == 0);
return r;
}
++i;
}
- _finish_bind(bind_addr, bound_addr);
+ _finish_bind(bind_addrs, bound_addrs);
return 0;
}
int AsyncMessenger::rebind(const set<int>& avoid_ports)
{
ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
- assert(did_bind);
+ ceph_assert(did_bind);
for (auto &&p : processors)
p->stop();
// adjust the nonce; we want our entity_addr_t to be truly unique.
nonce += 1000000;
ldout(cct, 10) << __func__ << " new nonce " << nonce
- << " and inst " << get_myinst() << dendl;
+ << " and addr " << get_myaddrs() << dendl;
- entity_addr_t bound_addr;
- entity_addr_t bind_addr = get_myaddr();
- bind_addr.set_port(0);
+ entity_addrvec_t bound_addrs;
+ entity_addrvec_t bind_addrs = get_myaddrs();
set<int> new_avoid(avoid_ports);
- new_avoid.insert(bind_addr.get_port());
- ldout(cct, 10) << __func__ << " will try " << bind_addr
+ for (auto& a : bind_addrs.v) {
+ new_avoid.insert(a.get_port());
+ a.set_port(0);
+ }
+ ldout(cct, 10) << __func__ << " will try " << bind_addrs
<< " and avoid ports " << new_avoid << dendl;
unsigned i = 0;
for (auto &&p : processors) {
- int r = p->bind(bind_addr, avoid_ports, &bound_addr);
+ int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
if (r) {
- assert(i == 0);
+ ceph_assert(i == 0);
return r;
}
++i;
}
- _finish_bind(bind_addr, bound_addr);
+ _finish_bind(bind_addrs, bound_addrs);
for (auto &&p : processors) {
p->start();
}
return 0;
Mutex::Locker l(lock);
if (did_bind) {
- assert(my_inst.addr == bind_addr);
return 0;
}
if (started) {
}
ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
- set_myaddr(bind_addr);
+ set_myaddrs(entity_addrvec_t(bind_addr));
return 0;
}
-void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
- const entity_addr_t& listen_addr)
+void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs,
+ const entity_addrvec_t& listen_addrs)
{
- set_myaddr(bind_addr);
- if (bind_addr != entity_addr_t())
- learned_addr(bind_addr);
+ set_myaddrs(bind_addrs);
+ for (auto& a : bind_addrs.v) {
+ if (!a.is_blank_ip()) {
+ learned_addr(a);
+ }
+ }
- if (get_myaddr().get_port() == 0) {
- set_myaddr(listen_addr);
+ if (get_myaddrs().front().get_port() == 0) {
+ set_myaddrs(listen_addrs);
}
- entity_addr_t addr = get_myaddr();
- addr.set_nonce(nonce);
- set_myaddr(addr);
+ entity_addrvec_t newaddrs = *my_addrs;
+ for (auto& a : newaddrs.v) {
+ a.set_nonce(nonce);
+ }
+ set_myaddrs(newaddrs);
init_local_connection();
- ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl;
+ ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl;
did_bind = true;
}
ldout(cct,1) << __func__ << " start" << dendl;
// register at least one entity, first!
- assert(my_inst.name.type() >= 0);
+ ceph_assert(my_name.type() >= 0);
- assert(!started);
+ ceph_assert(!started);
started = true;
stopped = false;
if (!did_bind) {
- my_inst.addr.nonce = nonce;
+ entity_addrvec_t newaddrs = *my_addrs;
+ for (auto& a : newaddrs.v) {
+ a.nonce = nonce;
+ }
+ set_myaddrs(newaddrs);
_init_local_connection();
}
started = false;
}
-void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
+void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
+ const entity_addr_t &listen_addr,
+ const entity_addr_t &peer_addr)
{
lock.Lock();
- AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
- conn->accept(std::move(cli_socket), addr);
+ AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
+ listen_addr.is_msgr2(), false);
+ conn->accept(std::move(cli_socket), listen_addr, peer_addr);
accepting_conns.insert(conn);
lock.Unlock();
}
-AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
+AsyncConnectionRef AsyncMessenger::create_connect(
+ const entity_addrvec_t& addrs, int type)
{
- assert(lock.is_locked());
- assert(addr != my_inst.addr);
+ ceph_assert(lock.is_locked());
- ldout(cct, 10) << __func__ << " " << addr
+ ldout(cct, 10) << __func__ << " " << addrs
<< ", creating connection and registering" << dendl;
+ // here is where we decide which of the addrs to connect to. always prefer
+ // the first one, if we support it.
+ entity_addr_t target;
+ for (auto& a : addrs.v) {
+ if (!a.is_msgr2() && !a.is_legacy()) {
+ continue;
+ }
+ // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
+ // trying it? for now, just pick whichever is listed first.
+ target = a;
+ break;
+ }
+
// create connection
Worker *w = stack->get_worker();
- AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
- conn->connect(addr, type);
- assert(!conns.count(addr));
- conns[addr] = conn;
+ AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
+ target.is_msgr2(), false);
+ conn->connect(addrs, type, target);
+ ceph_assert(!conns.count(addrs));
+ ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
+ << *conn->peer_addrs << dendl;
+ conns[addrs] = conn;
w->get_perf_counter()->inc(l_msgr_active_connections);
return conn;
}
-ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
-{
- Mutex::Locker l(lock);
- if (my_inst.addr == dest.addr) {
- // local
- return local_connection;
- }
- AsyncConnectionRef conn = _lookup_conn(dest.addr);
- if (conn) {
- ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl;
- } else {
- conn = create_connect(dest.addr, dest.name.type());
- ldout(cct, 10) << __func__ << " " << dest << " new " << conn << dendl;
- }
+ConnectionRef AsyncMessenger::get_loopback_connection()
+{
+ return local_connection;
+}
- return conn;
+bool AsyncMessenger::should_use_msgr2()
+{
+ // if we are bound to v1 only, and we are connecting to a v2 peer,
+ // we cannot use the peer's v2 address. otherwise the connection
+ // is assymetrical, because they would have to use v1 to connect
+ // to us, and we would use v2, and connection race detection etc
+ // would totally break down (among other things). or, the other
+ // end will be confused that we advertise ourselve with a v1
+ // address only (that we bound to) but connected with protocol v2.
+ return !did_bind || get_myaddrs().has_msgr2();
}
-ConnectionRef AsyncMessenger::get_loopback_connection()
+entity_addrvec_t AsyncMessenger::_filter_addrs(int type,
+ const entity_addrvec_t& addrs)
{
- return local_connection;
+ if (!should_use_msgr2()) {
+ ldout(cct, 10) << __func__ << " " << addrs << " type " << type
+ << " limiting to v1 ()" << dendl;
+ entity_addrvec_t r;
+ for (auto& i : addrs.v) {
+ if (i.is_msgr2()) {
+ continue;
+ }
+ r.v.push_back(i);
+ }
+ return r;
+ } else {
+ return addrs;
+ }
}
-int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
+int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
{
- FUNCTRACE();
- assert(m);
+ Mutex::Locker l(lock);
+
+ FUNCTRACE(cct);
+ ceph_assert(m);
if (m->get_type() == CEPH_MSG_OSD_OP)
OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");
- ldout(cct, 1) << __func__ << "--> " << dest.name << " "
- << dest.addr << " -- " << *m << " -- ?+"
+ ldout(cct, 1) << __func__ << "--> " << ceph_entity_type_name(type) << " "
+ << addrs << " -- " << *m << " -- ?+"
<< m->get_data().length() << " " << m << dendl;
- if (dest.addr == entity_addr_t()) {
+ if (addrs.empty()) {
ldout(cct,0) << __func__ << " message " << *m
- << " with empty dest " << dest.addr << dendl;
+ << " with empty dest " << addrs << dendl;
m->put();
return -EINVAL;
}
- AsyncConnectionRef conn = _lookup_conn(dest.addr);
- submit_message(m, conn, dest.addr, dest.name.type());
+ auto av = _filter_addrs(type, addrs);
+ AsyncConnectionRef conn = _lookup_conn(av);
+ submit_message(m, conn, av, type);
return 0;
}
+ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs)
+{
+ Mutex::Locker l(lock);
+ if (*my_addrs == addrs ||
+ (addrs.v.size() == 1 &&
+ my_addrs->contains(addrs.front()))) {
+ // local
+ return local_connection;
+ }
+
+ auto av = _filter_addrs(type, addrs);
+
+ AsyncConnectionRef conn = _lookup_conn(av);
+ if (conn) {
+ ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl;
+ } else {
+ conn = create_connect(av, type);
+ ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl;
+ }
+
+ return conn;
+}
+
void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
- const entity_addr_t& dest_addr, int dest_type)
+ const entity_addrvec_t& dest_addrs,
+ int dest_type)
{
if (cct->_conf->ms_dump_on_send) {
m->encode(-1, MSG_CRC_ALL);
- ldout(cct, 0) << __func__ << "submit_message " << *m << "\n";
+ ldout(cct, 0) << __func__ << " submit_message " << *m << "\n";
m->get_payload().hexdump(*_dout);
if (m->get_data().length() > 0) {
*_dout << " data:\n";
}
// local?
- if (my_inst.addr == dest_addr) {
+ if (*my_addrs == dest_addrs ||
+ (dest_addrs.v.size() == 1 &&
+ my_addrs->contains(dest_addrs.front()))) {
// local
local_connection->send_message(m);
return ;
// remote, no existing connection.
const Policy& policy = get_policy(dest_type);
if (policy.server) {
- ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addr
+ ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addrs
<< ", lossy server for target type "
<< ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
m->put();
} else {
- ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl;
- con = create_connect(dest_addr, dest_type);
+ ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs
+ << ", new connection." << dendl;
+ con = create_connect(dest_addrs, dest_type);
con->send_message(m);
}
}
/**
- * If my_inst.addr doesn't have an IP set, this function
+ * If my_addr doesn't have an IP set, this function
* will fill it in from the passed addr. Otherwise it does nothing and returns.
*/
-void AsyncMessenger::set_addr_unknowns(const entity_addr_t &addr)
+bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
{
+ ldout(cct,1) << __func__ << " " << addrs << dendl;
+ bool ret = false;
Mutex::Locker l(lock);
- if (my_inst.addr.is_blank_ip()) {
- int port = my_inst.addr.get_port();
- my_inst.addr.u = addr.u;
- my_inst.addr.set_port(port);
+
+ entity_addrvec_t newaddrs = *my_addrs;
+ for (auto& a : newaddrs.v) {
+ if (a.is_blank_ip()) {
+ int type = a.get_type();
+ int port = a.get_port();
+ uint32_t nonce = a.get_nonce();
+ for (auto& b : addrs.v) {
+ if (a.get_family() == b.get_family()) {
+ ldout(cct,1) << __func__ << " assuming my addr " << a
+ << " matches provided addr " << b << dendl;
+ a = b;
+ a.set_nonce(nonce);
+ a.set_type(type);
+ a.set_port(port);
+ ret = true;
+ break;
+ }
+ }
+ }
+ }
+ set_myaddrs(newaddrs);
+ if (ret) {
_init_local_connection();
}
+ ldout(cct,1) << __func__ << " now " << *my_addrs << dendl;
+ return ret;
}
-void AsyncMessenger::set_addr(const entity_addr_t &addr)
+void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
{
Mutex::Locker l(lock);
- entity_addr_t t = addr;
- t.set_nonce(nonce);
- set_myaddr(t);
+ auto t = addrs;
+ for (auto& a : t.v) {
+ a.set_nonce(nonce);
+ }
+ set_myaddrs(t);
_init_local_connection();
}
accepting_conns.clear();
while (!conns.empty()) {
- ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
+ auto it = conns.begin();
AsyncConnectionRef p = it->second;
ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
conns.erase(it);
lock.Unlock();
}
-void AsyncMessenger::mark_down(const entity_addr_t& addr)
+void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
{
lock.Lock();
- AsyncConnectionRef p = _lookup_conn(addr);
+ AsyncConnectionRef p = _lookup_conn(addrs);
if (p) {
- ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
+ ldout(cct, 1) << __func__ << " " << addrs << " -- " << p << dendl;
p->stop(true);
} else {
- ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
+ ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
}
lock.Unlock();
}
int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
{
- int my_type = my_inst.name.type();
+ int my_type = my_name.type();
// set reply protocol version
if (peer_type == my_type) {
return 0;
}
-void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
+{
+ Mutex::Locker l(lock);
+ auto it = conns.find(*conn->peer_addrs);
+ if (it != conns.end()) {
+ AsyncConnectionRef existing = it->second;
+
+ // lazy delete, see "deleted_conns"
+ // If conn already in, we will return 0
+ Mutex::Locker l(deleted_lock);
+ if (deleted_conns.erase(existing)) {
+ existing->get_perf_counter()->dec(l_msgr_active_connections);
+ conns.erase(it);
+ } else if (conn != existing) {
+ return -1;
+ }
+ }
+ ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
+ conns[*conn->peer_addrs] = conn;
+ conn->get_perf_counter()->inc(l_msgr_active_connections);
+ accepting_conns.erase(conn);
+ return 0;
+}
+
+
+bool AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
// be careful here: multiple threads may block here, and readers of
- // my_inst.addr do NOT hold any lock.
+ // my_addrs do NOT hold any lock.
// this always goes from true -> false under the protection of the
// mutex. if it is already false, we need not retake the mutex at
// all.
if (!need_addr)
- return ;
- lock.Lock();
+ return false;
+ std::lock_guard l(lock);
if (need_addr) {
- need_addr = false;
- entity_addr_t t = peer_addr_for_me;
- t.set_port(my_inst.addr.get_port());
- t.set_nonce(my_inst.addr.get_nonce());
- my_inst.addr = t;
- ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
+ if (my_addrs->empty()) {
+ auto a = peer_addr_for_me;
+ a.set_type(entity_addr_t::TYPE_ANY);
+ a.set_nonce(nonce);
+ if (!did_bind) {
+ a.set_port(0);
+ }
+ set_myaddrs(entity_addrvec_t(a));
+ ldout(cct,10) << __func__ << " had no addrs" << dendl;
+ } else {
+ // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
+ entity_addrvec_t newaddrs = *my_addrs;
+ for (auto& a : newaddrs.v) {
+ if (a.is_blank_ip() &&
+ a.get_family() == peer_addr_for_me.get_family()) {
+ entity_addr_t t = peer_addr_for_me;
+ if (!did_bind) {
+ t.set_type(entity_addr_t::TYPE_ANY);
+ t.set_port(0);
+ } else {
+ t.set_type(a.get_type());
+ t.set_port(a.get_port());
+ }
+ t.set_nonce(a.get_nonce());
+ ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl;
+ a = t;
+ }
+ }
+ set_myaddrs(newaddrs);
+ }
+ ldout(cct, 1) << __func__ << " learned my addr " << *my_addrs
+ << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl;
_init_local_connection();
+ need_addr = false;
+ return true;
}
- lock.Unlock();
+ return false;
}
int AsyncMessenger::reap_dead()
auto it = deleted_conns.begin();
AsyncConnectionRef p = *it;
ldout(cct, 5) << __func__ << " delete " << p << dendl;
- auto conns_it = conns.find(p->peer_addr);
+ auto conns_it = conns.find(*p->peer_addrs);
if (conns_it != conns.end() && conns_it->second == p)
conns.erase(conns_it);
accepting_conns.erase(p);