]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/mgr/client.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / mgr / client.cc
index 60fbfdf683629ee5977895011619e67328963650..5aa8a88ba214a7eb59ba7734411264321b662ed6 100644 (file)
@@ -3,6 +3,8 @@
 
 #include "client.h"
 
+#include <seastar/core/sleep.hh>
+
 #include "crimson/common/log.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Messenger.h"
@@ -26,7 +28,7 @@ Client::Client(crimson::net::Messenger& msgr,
                  WithStats& with_stats)
   : msgr{msgr},
     with_stats{with_stats},
-    tick_timer{[this] {tick();}}
+    report_timer{[this] {report();}}
 {}
 
 seastar::future<> Client::start()
@@ -36,56 +38,86 @@ seastar::future<> Client::start()
 
 seastar::future<> Client::stop()
 {
-  return gate.close().then([this] {
-    if (conn) {
-      return conn->close();
-    } else {
+  logger().info("{}", __func__);
+  report_timer.cancel();
+  auto fut = gate.close();
+  if (conn) {
+    conn->mark_down();
+  }
+  return fut;
+}
+
+std::optional<seastar::future<>>
+Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
+{
+  bool dispatched = true;
+  gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
+    switch(m->get_type()) {
+    case MSG_MGR_MAP:
+      return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m));
+    case MSG_MGR_CONFIGURE:
+      return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m));
+    default:
+      dispatched = false;
       return seastar::now();
     }
   });
+  return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
 }
 
-seastar::future<> Client::ms_dispatch(crimson::net::Connection* conn,
-                                      MessageRef m)
+void Client::ms_handle_connect(crimson::net::ConnectionRef c)
 {
-  switch(m->get_type()) {
-  case MSG_MGR_MAP:
-    return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m));
-  case MSG_MGR_CONFIGURE:
-    return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m));
-  default:
-    return seastar::now();
-  }
+  gate.dispatch_in_background(__func__, *this, [this, c] {
+    if (conn == c) {
+      // ask for the mgrconfigure message
+      auto m = ceph::make_message<MMgrOpen>();
+      m->daemon_name = local_conf()->name.get_id();
+      return conn->send(std::move(m));
+    } else {
+      return seastar::now();
+    }
+  });
 }
 
-seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c)
+void Client::ms_handle_reset(crimson::net::ConnectionRef c, bool /* is_replace */)
 {
-  if (conn == c) {
-    conn = nullptr;
-    tick_timer.cancel();
-  }
-  return seastar::now();
+  gate.dispatch_in_background(__func__, *this, [this, c] {
+    if (conn == c) {
+      report_timer.cancel();
+      return reconnect();
+    } else {
+      return seastar::now();
+    }
+  });
 }
 
 seastar::future<> Client::reconnect()
 {
   if (conn) {
-    // crimson::net::Protocol::close() is able to close() in background
-    (void)conn->close();
+    conn->mark_down();
+    conn = {};
   }
   if (!mgrmap.get_available()) {
     logger().warn("No active mgr available yet");
     return seastar::now();
   }
-  auto peer = mgrmap.get_active_addrs().front();
-  conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MGR);
-  // ask for the mgrconfigure message
-  auto m = ceph::make_message<MMgrOpen>();
-  m->daemon_name = local_conf()->name.get_id();
-  return conn->send(std::move(m));
+  auto retry_interval = std::chrono::duration<double>(
+    local_conf().get_val<double>("mgr_connect_retry_interval"));
+  auto a_while = std::chrono::duration_cast<seastar::steady_clock_type::duration>(
+    retry_interval);
+  return seastar::sleep(a_while).then([this] {
+    auto peer = mgrmap.get_active_addrs().pick_addr(msgr.get_myaddr().get_type());
+    if (peer == entity_addr_t{}) {
+      // crimson msgr only uses the first bound addr
+      logger().error("mgr.{} does not have an addr compatible with me",
+                     mgrmap.get_active_name());
+      return;
+    }
+    conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MGR);
+  });
 }
 
-seastar::future<> Client::handle_mgr_map(crimson::net::Connection*,
+seastar::future<> Client::handle_mgr_map(crimson::net::ConnectionRef,
                                          Ref<MMgrMap> m)
 {
   mgrmap = m->get_map();
@@ -99,34 +131,36 @@ seastar::future<> Client::handle_mgr_map(crimson::net::Connection*,
   }
 }
 
-seastar::future<> Client::handle_mgr_conf(crimson::net::Connection* conn,
+seastar::future<> Client::handle_mgr_conf(crimson::net::ConnectionRef,
                                           Ref<MMgrConfigure> m)
 {
   logger().info("{} {}", __func__, *m);
 
-  auto tick_period = std::chrono::seconds{m->stats_period};
-  if (tick_period.count()) {
-    if (tick_timer.armed()) {
-      tick_timer.rearm(tick_timer.get_timeout(), tick_period);
+  auto report_period = std::chrono::seconds{m->stats_period};
+  if (report_period.count()) {
+    if (report_timer.armed()) {
+      report_timer.rearm(report_timer.get_timeout(), report_period);
     } else {
-      tick_timer.arm_periodic(tick_period);
+      report_timer.arm_periodic(report_period);
     }
   } else {
-    tick_timer.cancel();
+    report_timer.cancel();
   }
   return seastar::now();
 }
 
-void Client::tick()
+void Client::report()
 {
-  (void) seastar::with_gate(gate, [this] {
-    if (conn) {
-      auto pg_stats = with_stats.get_stats();
-      return conn->send(std::move(pg_stats));
-    } else {
-      return reconnect();
-    }
+  gate.dispatch_in_background(__func__, *this, [this] {
+    assert(conn);
+    auto pg_stats = with_stats.get_stats();
+    return conn->send(std::move(pg_stats));
   });
 }
 
+void Client::print(std::ostream& out) const
+{
+  out << "mgrc ";
+}
+
 }