]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/SocketMessenger.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / net / SocketMessenger.cc
index 423e7d4edc987303681b0acb98945b958bb69a09..db9421e79e28a09aff172b7c95037201bafe93d5 100644 (file)
@@ -19,7 +19,6 @@
 
 #include "auth/Auth.h"
 #include "Errors.h"
-#include "Dispatcher.h"
 #include "Socket.h"
 
 namespace {
@@ -34,14 +33,14 @@ SocketMessenger::SocketMessenger(const entity_name_t& myname,
                                  const std::string& logic_name,
                                  uint32_t nonce)
   : Messenger{myname},
-    master_sid{seastar::engine().cpu_id()},
+    master_sid{seastar::this_shard_id()},
     logic_name{logic_name},
     nonce{nonce}
 {}
 
 seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
 {
-  assert(seastar::engine().cpu_id() == master_sid);
+  assert(seastar::this_shard_id() == master_sid);
   auto my_addrs = addrs;
   for (auto& addr : my_addrs.v) {
     addr.nonce = nonce;
@@ -49,9 +48,9 @@ seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
   return Messenger::set_myaddrs(my_addrs);
 }
 
-seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
+SocketMessenger::bind_ertr::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
 {
-  assert(seastar::engine().cpu_id() == master_sid);
+  assert(seastar::this_shard_id() == master_sid);
   ceph_assert(addrs.front().get_family() == AF_INET);
   return set_myaddrs(addrs).then([this] {
     if (!listener) {
@@ -61,69 +60,82 @@ seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
     } else {
       return seastar::now();
     }
-  }).then([this] {
-    auto listen_addr = get_myaddr();
-    logger().debug("{} do_bind: try listen {}...", *this, listen_addr.in4_addr());
+  }).then([this] () -> bind_ertr::future<> {
+    const entity_addr_t listen_addr = get_myaddr();
+    logger().debug("{} do_bind: try listen {}...", *this, listen_addr);
     if (!listener) {
       logger().warn("{} do_bind: listener doesn't exist", *this);
-      return seastar::now();
+      return bind_ertr::now();
     }
     return listener->listen(listen_addr);
   });
 }
 
-seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
+SocketMessenger::bind_ertr::future<>
+SocketMessenger::bind(const entity_addrvec_t& addrs)
 {
-  return do_bind(addrs).then([this] {
+  return do_bind(addrs).safe_then([this] {
     logger().info("{} bind: done", *this);
   });
 }
 
-seastar::future<>
+SocketMessenger::bind_ertr::future<>
 SocketMessenger::try_bind(const entity_addrvec_t& addrs,
                           uint32_t min_port, uint32_t max_port)
 {
   auto addr = addrs.front();
   if (addr.get_port() != 0) {
-    return do_bind(addrs).then([this] {
+    return do_bind(addrs).safe_then([this] {
       logger().info("{} try_bind: done", *this);
     });
   }
   ceph_assert(min_port <= max_port);
   return seastar::do_with(uint32_t(min_port),
                           [this, max_port, addr] (auto& port) {
-    return seastar::repeat([this, max_port, addr, &port] {
+    return seastar::repeat_until_value([this, max_port, addr, &port] {
       auto to_bind = addr;
       to_bind.set_port(port);
-      return do_bind(entity_addrvec_t{to_bind}).then([this] {
+      return do_bind(entity_addrvec_t{to_bind}
+      ).safe_then([this] () -> seastar::future<std::optional<bool>> {
         logger().info("{} try_bind: done", *this);
-        return stop_t::yes;
-      }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
-        assert(e.code() == std::errc::address_in_use);
+        return seastar::make_ready_future<std::optional<bool>>(
+            std::make_optional<bool>(true));
+      }, bind_ertr::all_same_way([this, max_port, &port]
+                                 (const std::error_code& e) mutable
+                                 -> seastar::future<std::optional<bool>> {
+        assert(e == std::errc::address_in_use);
         logger().trace("{} try_bind: {} already used", *this, port);
         if (port == max_port) {
-          throw;
+          return seastar::make_ready_future<std::optional<bool>>(
+              std::make_optional<bool>(false));
         }
         ++port;
-        return stop_t::no;
-      });
+        return seastar::make_ready_future<std::optional<bool>>();
+      }));
+    }).then([] (bool success) -> bind_ertr::future<> {
+      if (success) {
+        return bind_ertr::now();
+      } else {
+        return crimson::ct_error::address_in_use::make();
+      }
     });
   });
 }
 
-seastar::future<> SocketMessenger::start(Dispatcher *disp) {
-  assert(seastar::engine().cpu_id() == master_sid);
+seastar::future<> SocketMessenger::start(
+    const dispatchers_t& _dispatchers) {
+  assert(seastar::this_shard_id() == master_sid);
 
-  dispatcher = disp;
+  dispatchers.assign(_dispatchers);
   if (listener) {
     // make sure we have already bound to a valid address
     ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
     ceph_assert(get_myaddr().get_port() > 0);
 
     return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
-      assert(seastar::engine().cpu_id() == master_sid);
+      assert(seastar::this_shard_id() == master_sid);
       SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
-          *this, *dispatcher, get_myaddr().is_msgr2());
+          *this, dispatchers, get_myaddr().is_msgr2());
       conn->start_accept(std::move(socket), peer_addr);
       return seastar::now();
     });
@@ -132,27 +144,29 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) {
 }
 
 crimson::net::ConnectionRef
-SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
 {
-  assert(seastar::engine().cpu_id() == master_sid);
+  assert(seastar::this_shard_id() == master_sid);
 
   // make sure we connect to a valid peer_addr
   ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
   ceph_assert(peer_addr.get_port() > 0);
 
   if (auto found = lookup_conn(peer_addr); found) {
+    logger().debug("{} connect to existing", *found);
     return found->shared_from_this();
   }
   SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
-      *this, *dispatcher, peer_addr.is_msgr2());
-  conn->start_connect(peer_addr, peer_type);
+      *this, dispatchers, peer_addr.is_msgr2());
+  conn->start_connect(peer_addr, peer_name);
   return conn->shared_from_this();
 }
 
 seastar::future<> SocketMessenger::shutdown()
 {
-  assert(seastar::engine().cpu_id() == master_sid);
-  return seastar::futurize_apply([this] {
+  assert(seastar::this_shard_id() == master_sid);
+  return seastar::futurize_invoke([this] {
+    assert(dispatchers.empty());
     if (listener) {
       auto d_listener = listener;
       listener = nullptr;
@@ -163,12 +177,16 @@ seastar::future<> SocketMessenger::shutdown()
   // close all connections
   }).then([this] {
     return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
-      return conn->close();
+      return conn->close_clean(false);
     });
   }).then([this] {
     ceph_assert(accepting_conns.empty());
     return seastar::parallel_for_each(connections, [] (auto conn) {
-      return conn.second->close();
+      return conn.second->close_clean(false);
+    });
+  }).then([this] {
+    return seastar::parallel_for_each(closing_conns, [] (auto conn) {
+      return conn->close_clean(false);
     });
   }).then([this] {
     ceph_assert(connections.empty());
@@ -178,7 +196,7 @@ seastar::future<> SocketMessenger::shutdown()
 
 seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
 {
-  assert(seastar::engine().cpu_id() == master_sid);
+  assert(seastar::this_shard_id() == master_sid);
   if (!need_addr) {
     if ((!get_myaddr().is_any() &&
          get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
@@ -191,13 +209,13 @@ seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_f
     }
     return seastar::now();
   }
-  need_addr = false;
 
   if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
     // Not bound
     entity_addr_t addr = peer_addr_for_me;
     addr.set_type(entity_addr_t::TYPE_ANY);
     addr.set_port(0);
+    need_addr = false;
     return set_myaddrs(entity_addrvec_t{addr}
     ).then([this, &conn, peer_addr_for_me] {
       logger().info("{} learned myaddr={} (unbound) from {}",
@@ -222,6 +240,7 @@ seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_f
       entity_addr_t addr = peer_addr_for_me;
       addr.set_type(get_myaddr().get_type());
       addr.set_port(get_myaddr().get_port());
+      need_addr = false;
       return set_myaddrs(entity_addrvec_t{addr}
       ).then([this, &conn, peer_addr_for_me] {
         logger().info("{} learned myaddr={} (blank IP) from {}",
@@ -233,6 +252,7 @@ seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_f
       throw std::system_error(
           make_error_code(crimson::net::error::bad_peer_address));
     } else {
+      need_addr = false;
       return seastar::now();
     }
   }
@@ -302,6 +322,23 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn)
   connections.erase(found);
 }
 
+void SocketMessenger::closing_conn(SocketConnectionRef conn)
+{
+  closing_conns.push_back(conn);
+}
+
+void SocketMessenger::closed_conn(SocketConnectionRef conn)
+{
+  for (auto it = closing_conns.begin();
+       it != closing_conns.end();) {
+    if (*it == conn) {
+      it = closing_conns.erase(it);
+    } else {
+      it++;
+    }
+  }
+}
+
 seastar::future<uint32_t>
 SocketMessenger::get_global_seq(uint32_t old)
 {