]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/AsyncMessenger.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / AsyncMessenger.cc
index abe0b474170ec42ca4139a9d6c892e27777ab43a..f6bcaa9b794eac33d001b89b96152b948471d98b 100644 (file)
@@ -1,4 +1,4 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 /*
  * Ceph - scalable distributed file system
@@ -33,7 +33,7 @@
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, this)
 static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
-  return *_dout << "-- " << m->get_myaddr() << " ";
+  return *_dout << "-- " << m->get_myaddrs() << " ";
 }
 
 static ostream& _prefix(std::ostream *_dout, Processor *p) {
@@ -50,7 +50,7 @@ class Processor::C_processor_accept : public EventCallback {
 
  public:
   explicit C_processor_accept(Processor *p): pro(p) {}
-  void do_request(int id) override {
+  void do_request(uint64_t id) override {
     pro->accept();
   }
 };
@@ -59,91 +59,93 @@ Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
   : msgr(r), net(c), worker(w),
     listen_handler(new C_processor_accept(this)) {}
 
-int Processor::bind(const entity_addr_t &bind_addr,
+int Processor::bind(const entity_addrvec_t &bind_addrs,
                    const set<int>& avoid_ports,
-                   entity_addr_t* bound_addr)
+                   entity_addrvec_t* bound_addrs)
 {
-  const md_config_t *conf = msgr->cct->_conf;
-  // bind to a socket
-  ldout(msgr->cct, 10) << __func__ << dendl;
-
-  int family;
-  switch (bind_addr.get_family()) {
-    case AF_INET:
-    case AF_INET6:
-      family = bind_addr.get_family();
-      break;
-
-    default:
-      // bind_addr is empty
-      family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
-  }
+  const auto& conf = msgr->cct->_conf;
+  // bind to socket(s)
+  ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl;
 
   SocketOptions opts;
   opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
   opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
 
-  // use whatever user specified (if anything)
-  entity_addr_t listen_addr = bind_addr;
-  if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) {
-    listen_addr.set_type(entity_addr_t::TYPE_LEGACY);
-  }
-  listen_addr.set_family(family);
+  listen_sockets.resize(bind_addrs.v.size());
+  *bound_addrs = bind_addrs;
 
-  /* bind to port */
-  int r = -1;
+  for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
+    auto& listen_addr = bound_addrs->v[k];
 
-  for (int i = 0; i < conf->ms_bind_retry_count; i++) {
-    if (i > 0) {
-      lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
-                       << conf->ms_bind_retry_delay << " seconds " << dendl;
-      sleep(conf->ms_bind_retry_delay);
-    }
+    /* bind to port */
+    int r = -1;
 
-    if (listen_addr.get_port()) {
-      worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
-        r = worker->listen(listen_addr, opts, &listen_socket);
-      }, false);
-      if (r < 0) {
-        lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
-                         << ": " << cpp_strerror(r) << dendl;
-        continue;
+    for (int i = 0; i < conf->ms_bind_retry_count; i++) {
+      if (i > 0) {
+       lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
+                        << conf->ms_bind_retry_delay << " seconds " << dendl;
+       sleep(conf->ms_bind_retry_delay);
       }
-    } else {
-      // try a range of ports
-      for (int port = msgr->cct->_conf->ms_bind_port_min; port <= msgr->cct->_conf->ms_bind_port_max; port++) {
-        if (avoid_ports.count(port))
-          continue;
-
-        listen_addr.set_port(port);
-        worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
-          r = worker->listen(listen_addr, opts, &listen_socket);
-        }, false);
-        if (r == 0)
-          break;
+
+      if (listen_addr.get_port()) {
+       worker->center.submit_to(
+         worker->center.get_id(),
+         [this, k, &listen_addr, &opts, &r]() {
+           r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
+         }, false);
+       if (r < 0) {
+         lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
+                          << ": " << cpp_strerror(r) << dendl;
+         continue;
+       }
+      } else {
+       // try a range of ports
+       for (int port = msgr->cct->_conf->ms_bind_port_min;
+            port <= msgr->cct->_conf->ms_bind_port_max;
+            port++) {
+         if (avoid_ports.count(port))
+           continue;
+
+         listen_addr.set_port(port);
+         worker->center.submit_to(
+           worker->center.get_id(),
+           [this, k, &listen_addr, &opts, &r]() {
+             r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
+           }, false);
+         if (r == 0)
+           break;
+       }
+       if (r < 0) {
+         lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
+                          << " on any port in range "
+                          << msgr->cct->_conf->ms_bind_port_min
+                          << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
+                          << cpp_strerror(r) << dendl;
+         listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
+         continue;
+       }
+       ldout(msgr->cct, 10) << __func__ << " bound on random port "
+                            << listen_addr << dendl;
       }
-      if (r < 0) {
-        lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
-                         << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
-                         << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
-                         << cpp_strerror(r) << dendl;
-        listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
-        continue;
+      if (r == 0) {
+       break;
       }
-      ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl;
     }
-    if (r == 0)
-      break;
-  }
-  // It seems that binding completely failed, return with that exit status
-  if (r < 0) {
-    lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
-                     << " attempts: " << cpp_strerror(r) << dendl;
-    return r;
+
+    // It seems that binding completely failed, return with that exit status
+    if (r < 0) {
+      lderr(msgr->cct) << __func__ << " was unable to bind after "
+                      << conf->ms_bind_retry_count
+                      << " attempts: " << cpp_strerror(r) << dendl;
+      for (unsigned j = 0; j < k; ++j) {
+       // clean up previous bind
+       listen_sockets[j].abort_accept();
+      }
+      return r;
+    }
   }
 
-  ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
-  *bound_addr = listen_addr;
+  ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl;
   return 0;
 }
 
@@ -152,58 +154,72 @@ void Processor::start()
   ldout(msgr->cct, 1) << __func__ << dendl;
 
   // start thread
-  if (listen_socket) {
-    worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false);
-  }
+  worker->center.submit_to(worker->center.get_id(), [this]() {
+      for (auto& l : listen_sockets) {
+       if (l) {
+         worker->center.create_file_event(l.fd(), EVENT_READABLE,
+                                          listen_handler); }
+      }
+    }, false);
 }
 
 void Processor::accept()
 {
-  ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl;
   SocketOptions opts;
   opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
   opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
   opts.priority = msgr->get_socket_priority();
-  unsigned accept_error_num = 0;
-
-  while (true) {
-    entity_addr_t addr;
-    ConnectedSocket cli_socket;
-    Worker *w = worker;
-    if (!msgr->get_stack()->support_local_listen_table())
-      w = msgr->get_stack()->get_worker();
-    int r = listen_socket.accept(&cli_socket, opts, &addr, w);
-    if (r == 0) {
-      ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl;
-
-      msgr->add_accept(w, std::move(cli_socket), addr);
-      continue;
-    } else {
-      if (r == -EINTR) {
-        continue;
-      } else if (r == -EAGAIN) {
-        break;
-      } else if (r == -EMFILE || r == -ENFILE) {
-        lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
-                         << " errno " << r << " " << cpp_strerror(r) << dendl;
-       if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
-         lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
-         ceph_abort();
-       }
+
+  for (auto& listen_socket : listen_sockets) {
+    ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd()
+                        << dendl;
+    unsigned accept_error_num = 0;
+
+    while (true) {
+      entity_addr_t addr;
+      ConnectedSocket cli_socket;
+      Worker *w = worker;
+      if (!msgr->get_stack()->support_local_listen_table())
+       w = msgr->get_stack()->get_worker();
+      else
+       ++w->references;
+      int r = listen_socket.accept(&cli_socket, opts, &addr, w);
+      if (r == 0) {
+       ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd "
+                            << cli_socket.fd() << dendl;
+
+       msgr->add_accept(
+         w, std::move(cli_socket),
+         msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
+         addr);
+       accept_error_num = 0;
        continue;
-      } else if (r == -ECONNABORTED) {
-        ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
-                            << " errno " << r << " " << cpp_strerror(r) << dendl;
-        continue;
       } else {
-        lderr(msgr->cct) << __func__ << " no incoming connection?"
-                         << " errno " << r << " " << cpp_strerror(r) << dendl;
-       if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
-          lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
-          ceph_abort();
+       if (r == -EINTR) {
+         continue;
+       } else if (r == -EAGAIN) {
+         break;
+       } else if (r == -EMFILE || r == -ENFILE) {
+         lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
+                          << " errno " << r << " " << cpp_strerror(r) << dendl;
+         if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
+           lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
+           ceph_abort();
+         }
+         continue;
+       } else if (r == -ECONNABORTED) {
+         ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
+                             << " errno " << r << " " << cpp_strerror(r) << dendl;
+         continue;
+       } else {
+         lderr(msgr->cct) << __func__ << " no incoming connection?"
+                          << " errno " << r << " " << cpp_strerror(r) << dendl;
+         if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
+           lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
+           ceph_abort();
+         }
+         continue;
        }
-       continue;
       }
     }
   }
@@ -213,12 +229,14 @@ void Processor::stop()
 {
   ldout(msgr->cct,10) << __func__ << dendl;
 
-  if (listen_socket) {
-    worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
-      listen_socket.abort_accept();
+  worker->center.submit_to(worker->center.get_id(), [this]() {
+      for (auto& listen_socket : listen_sockets) {
+       if (listen_socket) {
+         worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
+         listen_socket.abort_accept();
+       }
+      }
     }, false);
-  }
 }
 
 
@@ -226,7 +244,7 @@ struct StackSingleton {
   CephContext *cct;
   std::shared_ptr<NetworkStack> stack;
 
-  StackSingleton(CephContext *c): cct(c) {}
+  explicit StackSingleton(CephContext *c): cct(c) {}
   void ready(std::string &type) {
     if (!stack)
       stack = NetworkStack::create(cct, type);
@@ -242,7 +260,7 @@ class C_handle_reap : public EventCallback {
 
   public:
   explicit C_handle_reap(AsyncMessenger *m): msgr(m) {}
-  void do_request(int id) override {
+  void do_request(uint64_t id) override {
     // judge whether is a time event
     msgr->reap_dead();
   }
@@ -267,14 +285,14 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
   else if (type.find("dpdk") != std::string::npos)
     transport_type = "dpdk";
 
-  ceph_spin_init(&global_seq_lock);
-  StackSingleton *single;
-  cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack::"+transport_type);
+  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
+    "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
   single->ready(transport_type);
   stack = single->stack.get();
   stack->start();
   local_worker = stack->get_worker();
-  local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
+  local_connection = new AsyncConnection(cct, this, &dispatch_queue,
+                                        local_worker, true, true);
   init_local_connection();
   reap_handler = new C_handle_reap(this);
   unsigned processor_num = 1;
@@ -291,7 +309,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
 AsyncMessenger::~AsyncMessenger()
 {
   delete reap_handler;
-  assert(!did_bind); // either we didn't bind or we shut down the Processor
+  ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
   local_connection->mark_down();
   for (auto &&p : processors)
     delete p;
@@ -299,11 +317,11 @@ AsyncMessenger::~AsyncMessenger()
 
 void AsyncMessenger::ready()
 {
-  ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
 
   stack->ready();
   if (pending_bind) {
-    int err = bind(pending_bind_addr);
+    int err = bindv(pending_bind_addrs);
     if (err) {
       lderr(cct) << __func__ << " postponed bind failed" << dendl;
       ceph_abort();
@@ -318,7 +336,7 @@ void AsyncMessenger::ready()
 
 int AsyncMessenger::shutdown()
 {
-  ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
 
   // done!  clean up.
   for (auto &&p : processors)
@@ -335,8 +353,24 @@ int AsyncMessenger::shutdown()
   return 0;
 }
 
-
 int AsyncMessenger::bind(const entity_addr_t &bind_addr)
+{
+  ldout(cct,10) << __func__ << " " << bind_addr << dendl;
+  // old bind() can take entity_addr_t(). new bindv() can take a
+  // 0.0.0.0-like address but needs type and family to be set.
+  auto a = bind_addr;
+  if (a == entity_addr_t()) {
+    a.set_type(entity_addr_t::TYPE_LEGACY);
+    if (cct->_conf->ms_bind_ipv6) {
+      a.set_family(AF_INET6);
+    } else {
+      a.set_family(AF_INET);
+    }
+  }
+  return bindv(entity_addrvec_t(a));
+}
+
+int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
 {
   lock.Lock();
 
@@ -346,11 +380,11 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
     return -1;
   }
 
-  ldout(cct,10) << __func__ << " bind " << bind_addr << dendl;
+  ldout(cct,10) << __func__ << " " << bind_addrs << dendl;
 
   if (!stack->is_ready()) {
     ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
-    pending_bind_addr = bind_addr;
+    pending_bind_addrs = bind_addrs;
     pending_bind = true;
     lock.Unlock();
     return 0;
@@ -360,10 +394,10 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
 
   // bind to a socket
   set<int> avoid_ports;
-  entity_addr_t bound_addr;
+  entity_addrvec_t bound_addrs;
   unsigned i = 0;
   for (auto &&p : processors) {
-    int r = p->bind(bind_addr, avoid_ports, &bound_addr);
+    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
     if (r) {
       // Note: this is related to local tcp listen table problem.
       // Posix(default kernel implementation) backend shares listen table
@@ -374,19 +408,19 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
       // it, like port is used case. But if the first worker successfully to bind
       // but the second worker failed, it's not expected and we need to assert
       // here
-      assert(i == 0);
+      ceph_assert(i == 0);
       return r;
     }
     ++i;
   }
-  _finish_bind(bind_addr, bound_addr);
+  _finish_bind(bind_addrs, bound_addrs);
   return 0;
 }
 
 int AsyncMessenger::rebind(const set<int>& avoid_ports)
 {
   ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
-  assert(did_bind);
+  ceph_assert(did_bind);
 
   for (auto &&p : processors)
     p->stop();
@@ -395,25 +429,27 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
   // adjust the nonce; we want our entity_addr_t to be truly unique.
   nonce += 1000000;
   ldout(cct, 10) << __func__ << " new nonce " << nonce
-                << " and inst " << get_myinst() << dendl;
+                << " and addr " << get_myaddrs() << dendl;
 
-  entity_addr_t bound_addr;
-  entity_addr_t bind_addr = get_myaddr();
-  bind_addr.set_port(0);
+  entity_addrvec_t bound_addrs;
+  entity_addrvec_t bind_addrs = get_myaddrs();
   set<int> new_avoid(avoid_ports);
-  new_avoid.insert(bind_addr.get_port());
-  ldout(cct, 10) << __func__ << " will try " << bind_addr
+  for (auto& a : bind_addrs.v) {
+    new_avoid.insert(a.get_port());
+    a.set_port(0);
+  }
+  ldout(cct, 10) << __func__ << " will try " << bind_addrs
                 << " and avoid ports " << new_avoid << dendl;
   unsigned i = 0;
   for (auto &&p : processors) {
-    int r = p->bind(bind_addr, avoid_ports, &bound_addr);
+    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
     if (r) {
-      assert(i == 0);
+      ceph_assert(i == 0);
       return r;
     }
     ++i;
   }
-  _finish_bind(bind_addr, bound_addr);
+  _finish_bind(bind_addrs, bound_addrs);
   for (auto &&p : processors) {
     p->start();
   }
@@ -426,7 +462,6 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
     return 0;
   Mutex::Locker l(lock);
   if (did_bind) {
-    assert(my_inst.addr == bind_addr);
     return 0;
   }
   if (started) {
@@ -435,27 +470,32 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
   }
   ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
 
-  set_myaddr(bind_addr);
+  set_myaddrs(entity_addrvec_t(bind_addr));
   return 0;
 }
 
-void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
-                                 const entity_addr_t& listen_addr)
+void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs,
+                                 const entity_addrvec_t& listen_addrs)
 {
-  set_myaddr(bind_addr);
-  if (bind_addr != entity_addr_t())
-    learned_addr(bind_addr);
+  set_myaddrs(bind_addrs);
+  for (auto& a : bind_addrs.v) {
+    if (!a.is_blank_ip()) {
+      learned_addr(a);
+    }
+  }
 
-  if (get_myaddr().get_port() == 0) {
-    set_myaddr(listen_addr);
+  if (get_myaddrs().front().get_port() == 0) {
+    set_myaddrs(listen_addrs);
   }
-  entity_addr_t addr = get_myaddr();
-  addr.set_nonce(nonce);
-  set_myaddr(addr);
+  entity_addrvec_t newaddrs = *my_addrs;
+  for (auto& a : newaddrs.v) {
+    a.set_nonce(nonce);
+  }
+  set_myaddrs(newaddrs);
 
   init_local_connection();
 
-  ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl;
+  ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl;
   did_bind = true;
 }
 
@@ -465,14 +505,18 @@ int AsyncMessenger::start()
   ldout(cct,1) << __func__ << " start" << dendl;
 
   // register at least one entity, first!
-  assert(my_inst.name.type() >= 0);
+  ceph_assert(my_name.type() >= 0);
 
-  assert(!started);
+  ceph_assert(!started);
   started = true;
   stopped = false;
 
   if (!did_bind) {
-    my_inst.addr.nonce = nonce;
+    entity_addrvec_t newaddrs = *my_addrs;
+    for (auto& a : newaddrs.v) {
+      a.nonce = nonce;
+    }
+    set_myaddrs(newaddrs);
     _init_local_connection();
   }
 
@@ -509,90 +553,149 @@ void AsyncMessenger::wait()
   started = false;
 }
 
-void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
+void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
+                               const entity_addr_t &listen_addr,
+                               const entity_addr_t &peer_addr)
 {
   lock.Lock();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
-  conn->accept(std::move(cli_socket), addr);
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
+                                               listen_addr.is_msgr2(), false);
+  conn->accept(std::move(cli_socket), listen_addr, peer_addr);
   accepting_conns.insert(conn);
   lock.Unlock();
 }
 
-AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
+AsyncConnectionRef AsyncMessenger::create_connect(
+  const entity_addrvec_t& addrs, int type)
 {
-  assert(lock.is_locked());
-  assert(addr != my_inst.addr);
+  ceph_assert(lock.is_locked());
 
-  ldout(cct, 10) << __func__ << " " << addr
+  ldout(cct, 10) << __func__ << " " << addrs
       << ", creating connection and registering" << dendl;
 
+  // here is where we decide which of the addrs to connect to.  always prefer
+  // the first one, if we support it.
+  entity_addr_t target;
+  for (auto& a : addrs.v) {
+    if (!a.is_msgr2() && !a.is_legacy()) {
+      continue;
+    }
+    // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
+    // trying it?  for now, just pick whichever is listed first.
+    target = a;
+    break;
+  }
+
   // create connection
   Worker *w = stack->get_worker();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
-  conn->connect(addr, type);
-  assert(!conns.count(addr));
-  conns[addr] = conn;
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
+                                               target.is_msgr2(), false);
+  conn->connect(addrs, type, target);
+  ceph_assert(!conns.count(addrs));
+  ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
+                << *conn->peer_addrs << dendl;
+  conns[addrs] = conn;
   w->get_perf_counter()->inc(l_msgr_active_connections);
 
   return conn;
 }
 
-ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
-{
-  Mutex::Locker l(lock);
-  if (my_inst.addr == dest.addr) {
-    // local
-    return local_connection;
-  }
 
-  AsyncConnectionRef conn = _lookup_conn(dest.addr);
-  if (conn) {
-    ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl;
-  } else {
-    conn = create_connect(dest.addr, dest.name.type());
-    ldout(cct, 10) << __func__ << " " << dest << " new " << conn << dendl;
-  }
+ConnectionRef AsyncMessenger::get_loopback_connection()
+{
+  return local_connection;
+}
 
-  return conn;
+bool AsyncMessenger::should_use_msgr2()
+{
+  // if we are bound to v1 only, and we are connecting to a v2 peer,
+  // we cannot use the peer's v2 address. otherwise the connection
+  // is assymetrical, because they would have to use v1 to connect
+  // to us, and we would use v2, and connection race detection etc
+  // would totally break down (among other things).  or, the other
+  // end will be confused that we advertise ourselve with a v1
+  // address only (that we bound to) but connected with protocol v2.
+  return !did_bind || get_myaddrs().has_msgr2();
 }
 
-ConnectionRef AsyncMessenger::get_loopback_connection()
+entity_addrvec_t AsyncMessenger::_filter_addrs(int type,
+                                              const entity_addrvec_t& addrs)
 {
-  return local_connection;
+  if (!should_use_msgr2()) {
+    ldout(cct, 10) << __func__ << " " << addrs << " type " << type
+                  << " limiting to v1 ()" << dendl;
+    entity_addrvec_t r;
+    for (auto& i : addrs.v) {
+      if (i.is_msgr2()) {
+       continue;
+      }
+      r.v.push_back(i);
+    }
+    return r;
+  } else {
+    return addrs;
+  }
 }
 
-int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
+int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
 {
-  FUNCTRACE();
-  assert(m);
+  Mutex::Locker l(lock);
+
+  FUNCTRACE(cct);
+  ceph_assert(m);
 
   if (m->get_type() == CEPH_MSG_OSD_OP)
     OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
   else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
     OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");
 
-  ldout(cct, 1) << __func__ << "--> " << dest.name << " "
-      << dest.addr << " -- " << *m << " -- ?+"
+  ldout(cct, 1) << __func__ << "--> " << ceph_entity_type_name(type) << " "
+      << addrs << " -- " << *m << " -- ?+"
       << m->get_data().length() << " " << m << dendl;
 
-  if (dest.addr == entity_addr_t()) {
+  if (addrs.empty()) {
     ldout(cct,0) << __func__ <<  " message " << *m
-        << " with empty dest " << dest.addr << dendl;
+        << " with empty dest " << addrs << dendl;
     m->put();
     return -EINVAL;
   }
 
-  AsyncConnectionRef conn = _lookup_conn(dest.addr);
-  submit_message(m, conn, dest.addr, dest.name.type());
+  auto av = _filter_addrs(type, addrs);
+  AsyncConnectionRef conn = _lookup_conn(av);
+  submit_message(m, conn, av, type);
   return 0;
 }
 
+ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs)
+{
+  Mutex::Locker l(lock);
+  if (*my_addrs == addrs ||
+      (addrs.v.size() == 1 &&
+       my_addrs->contains(addrs.front()))) {
+    // local
+    return local_connection;
+  }
+
+  auto av = _filter_addrs(type, addrs);
+
+  AsyncConnectionRef conn = _lookup_conn(av);
+  if (conn) {
+    ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl;
+  } else {
+    conn = create_connect(av, type);
+    ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl;
+  }
+
+  return conn;
+}
+
 void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
-                                    const entity_addr_t& dest_addr, int dest_type)
+                                    const entity_addrvec_t& dest_addrs,
+                                   int dest_type)
 {
   if (cct->_conf->ms_dump_on_send) {
     m->encode(-1, MSG_CRC_ALL);
-    ldout(cct, 0) << __func__ << "submit_message " << *m << "\n";
+    ldout(cct, 0) << __func__ << " submit_message " << *m << "\n";
     m->get_payload().hexdump(*_dout);
     if (m->get_data().length() > 0) {
       *_dout << " data:\n";
@@ -609,7 +712,9 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
   }
 
   // local?
-  if (my_inst.addr == dest_addr) {
+  if (*my_addrs == dest_addrs ||
+      (dest_addrs.v.size() == 1 &&
+       my_addrs->contains(dest_addrs.front()))) {
     // local
     local_connection->send_message(m);
     return ;
@@ -618,38 +723,64 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
   // remote, no existing connection.
   const Policy& policy = get_policy(dest_type);
   if (policy.server) {
-    ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addr
+    ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addrs
         << ", lossy server for target type "
         << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
     m->put();
   } else {
-    ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl;
-    con = create_connect(dest_addr, dest_type);
+    ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs
+                 << ", new connection." << dendl;
+    con = create_connect(dest_addrs, dest_type);
     con->send_message(m);
   }
 }
 
 /**
- * If my_inst.addr doesn't have an IP set, this function
+ * If my_addr doesn't have an IP set, this function
  * will fill it in from the passed addr. Otherwise it does nothing and returns.
  */
-void AsyncMessenger::set_addr_unknowns(const entity_addr_t &addr)
+bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
 {
+  ldout(cct,1) << __func__ << " " << addrs << dendl;
+  bool ret = false;
   Mutex::Locker l(lock);
-  if (my_inst.addr.is_blank_ip()) {
-    int port = my_inst.addr.get_port();
-    my_inst.addr.u = addr.u;
-    my_inst.addr.set_port(port);
+
+  entity_addrvec_t newaddrs = *my_addrs;
+  for (auto& a : newaddrs.v) {
+    if (a.is_blank_ip()) {
+      int type = a.get_type();
+      int port = a.get_port();
+      uint32_t nonce = a.get_nonce();
+      for (auto& b : addrs.v) {
+       if (a.get_family() == b.get_family()) {
+         ldout(cct,1) << __func__ << " assuming my addr " << a
+                      << " matches provided addr " << b << dendl;
+         a = b;
+         a.set_nonce(nonce);
+         a.set_type(type);
+         a.set_port(port);
+         ret = true;
+         break;
+       }
+      }
+    }
+  }
+  set_myaddrs(newaddrs);
+  if (ret) {
     _init_local_connection();
   }
+  ldout(cct,1) << __func__ << " now " << *my_addrs << dendl;
+  return ret;
 }
 
-void AsyncMessenger::set_addr(const entity_addr_t &addr)
+void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
 {
   Mutex::Locker l(lock);
-  entity_addr_t t = addr;
-  t.set_nonce(nonce);
-  set_myaddr(t);
+  auto t = addrs;
+  for (auto& a : t.v) {
+    a.set_nonce(nonce);
+  }
+  set_myaddrs(t);
   _init_local_connection();
 }
 
@@ -666,7 +797,7 @@ void AsyncMessenger::shutdown_connections(bool queue_reset)
   accepting_conns.clear();
 
   while (!conns.empty()) {
-    ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
+    auto it = conns.begin();
     AsyncConnectionRef p = it->second;
     ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
     conns.erase(it);
@@ -686,22 +817,22 @@ void AsyncMessenger::shutdown_connections(bool queue_reset)
   lock.Unlock();
 }
 
-void AsyncMessenger::mark_down(const entity_addr_t& addr)
+void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
 {
   lock.Lock();
-  AsyncConnectionRef p = _lookup_conn(addr);
+  AsyncConnectionRef p = _lookup_conn(addrs);
   if (p) {
-    ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
+    ldout(cct, 1) << __func__ << " " << addrs << " -- " << p << dendl;
     p->stop(true);
   } else {
-    ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
+    ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
   }
   lock.Unlock();
 }
 
 int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
 {
-  int my_type = my_inst.name.type();
+  int my_type = my_name.type();
 
   // set reply protocol version
   if (peer_type == my_type) {
@@ -718,27 +849,80 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
   return 0;
 }
 
-void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
+{
+  Mutex::Locker l(lock);
+  auto it = conns.find(*conn->peer_addrs);
+  if (it != conns.end()) {
+    AsyncConnectionRef existing = it->second;
+
+    // lazy delete, see "deleted_conns"
+    // If conn already in, we will return 0
+    Mutex::Locker l(deleted_lock);
+    if (deleted_conns.erase(existing)) {
+      existing->get_perf_counter()->dec(l_msgr_active_connections);
+      conns.erase(it);
+    } else if (conn != existing) {
+      return -1;
+    }
+  }
+  ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
+  conns[*conn->peer_addrs] = conn;
+  conn->get_perf_counter()->inc(l_msgr_active_connections);
+  accepting_conns.erase(conn);
+  return 0;
+}
+
+
+bool AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
 {
   // be careful here: multiple threads may block here, and readers of
-  // my_inst.addr do NOT hold any lock.
+  // my_addr do NOT hold any lock.
 
   // this always goes from true -> false under the protection of the
   // mutex.  if it is already false, we need not retake the mutex at
   // all.
   if (!need_addr)
-    return ;
-  lock.Lock();
+    return false;
+  std::lock_guard l(lock);
   if (need_addr) {
-    need_addr = false;
-    entity_addr_t t = peer_addr_for_me;
-    t.set_port(my_inst.addr.get_port());
-    t.set_nonce(my_inst.addr.get_nonce());
-    my_inst.addr = t;
-    ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
+    if (my_addrs->empty()) {
+      auto a = peer_addr_for_me;
+      a.set_type(entity_addr_t::TYPE_ANY);
+      a.set_nonce(nonce);
+      if (!did_bind) {
+       a.set_port(0);
+      }
+      set_myaddrs(entity_addrvec_t(a));
+      ldout(cct,10) << __func__ << " had no addrs" << dendl;
+    } else {
+      // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
+      entity_addrvec_t newaddrs = *my_addrs;
+      for (auto& a : newaddrs.v) {
+       if (a.is_blank_ip() &&
+           a.get_family() == peer_addr_for_me.get_family()) {
+         entity_addr_t t = peer_addr_for_me;
+         if (!did_bind) {
+           t.set_type(entity_addr_t::TYPE_ANY);
+           t.set_port(0);
+         } else {        
+           t.set_type(a.get_type());
+           t.set_port(a.get_port());
+         }
+         t.set_nonce(a.get_nonce());
+         ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl;
+         a = t;
+       }
+      }
+      set_myaddrs(newaddrs);
+    }
+    ldout(cct, 1) << __func__ << " learned my addr " << *my_addrs
+                 << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl;
     _init_local_connection();
+    need_addr = false;
+    return true;
   }
-  lock.Unlock();
+  return false;
 }
 
 int AsyncMessenger::reap_dead()
@@ -753,7 +937,7 @@ int AsyncMessenger::reap_dead()
     auto it = deleted_conns.begin();
     AsyncConnectionRef p = *it;
     ldout(cct, 5) << __func__ << " delete " << p << dendl;
-    auto conns_it = conns.find(p->peer_addr);
+    auto conns_it = conns.find(*p->peer_addrs);
     if (conns_it != conns.end() && conns_it->second == p)
       conns.erase(conns_it);
     accepting_conns.erase(p);