]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mgr/MgrClient.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / mgr / MgrClient.cc
index 4737ea428d547f411d6e1daee899f20a5e4bc821..4903fa3a083ca4b305d7aa0a9557dbb4ab4966e7 100644 (file)
@@ -15,6 +15,7 @@
 #include "MgrClient.h"
 
 #include "mgr/MgrContext.h"
+#include "mon/MonMap.h"
 
 #include "msg/Messenger.h"
 #include "messages/MMgrMap.h"
 #include "messages/MMgrConfigure.h"
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
+#include "messages/MMgrCommand.h"
+#include "messages/MMgrCommandReply.h"
 #include "messages/MPGStats.h"
 
+using std::string;
+using std::vector;
+
+using ceph::bufferlist;
+
 #define dout_subsys ceph_subsys_mgrc
 #undef dout_prefix
 #define dout_prefix *_dout << "mgrc " << __func__ << " "
 
-MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_)
-    : Dispatcher(cct_), cct(cct_), msgr(msgr_),
-      timer(cct_, lock)
+MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_, MonMap *monmap_)
+  : Dispatcher(cct_),
+    cct(cct_),
+    msgr(msgr_),
+    monmap(monmap_),
+    timer(cct_, lock)
 {
   ceph_assert(cct != nullptr);
 }
@@ -44,11 +55,12 @@ void MgrClient::init()
   ceph_assert(msgr != nullptr);
 
   timer.init();
+  initialized = true;
 }
 
 void MgrClient::shutdown()
 {
-  std::lock_guard l(lock);
+  std::unique_lock l(lock);
   ldout(cct, 10) << dendl;
 
   if (connect_retry_callback) {
@@ -64,14 +76,13 @@ void MgrClient::shutdown()
       session->con &&
       HAVE_FEATURE(session->con->get_features(), SERVER_MIMIC)) {
     ldout(cct, 10) << "closing mgr session" << dendl;
-    MMgrClose *m = new MMgrClose();
+    auto m = make_message<MMgrClose>();
     m->daemon_name = daemon_name;
     m->service_name = service_name;
-    session->con->send_message(m);
-    utime_t timeout;
-    timeout.set_from_double(cct->_conf.get_val<double>(
+    session->con->send_message2(m);
+    auto timeout = ceph::make_timespan(cct->_conf.get_val<double>(
                              "mgr_client_service_daemon_unregister_timeout"));
-    shutdown_cond.WaitInterval(lock, timeout);
+    shutdown_cond.wait_for(l, timeout);
   }
 
   timer.shutdown();
@@ -81,20 +92,29 @@ void MgrClient::shutdown()
   }
 }
 
-bool MgrClient::ms_dispatch(Message *m)
+bool MgrClient::ms_dispatch2(const ref_t<Message>& m)
 {
   std::lock_guard l(lock);
 
   switch(m->get_type()) {
   case MSG_MGR_MAP:
-    return handle_mgr_map(static_cast<MMgrMap*>(m));
+    return handle_mgr_map(ref_cast<MMgrMap>(m));
   case MSG_MGR_CONFIGURE:
-    return handle_mgr_configure(static_cast<MMgrConfigure*>(m));
+    return handle_mgr_configure(ref_cast<MMgrConfigure>(m));
   case MSG_MGR_CLOSE:
-    return handle_mgr_close(static_cast<MMgrClose*>(m));
+    return handle_mgr_close(ref_cast<MMgrClose>(m));
   case MSG_COMMAND_REPLY:
     if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
-      handle_command_reply(static_cast<MCommandReply*>(m));
+      MCommandReply *c = static_cast<MCommandReply*>(m.get());
+      handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
+      return true;
+    } else {
+      return false;
+    }
+  case MSG_MGR_COMMAND_REPLY:
+    if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
+      MMgrCommandReply *c = static_cast<MMgrCommandReply*>(m.get());
+      handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
       return true;
     } else {
       return false;
@@ -107,7 +127,7 @@ bool MgrClient::ms_dispatch(Message *m)
 
 void MgrClient::reconnect()
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   if (session) {
     ldout(cct, 4) << "Terminating session with "
@@ -126,15 +146,16 @@ void MgrClient::reconnect()
     return;
   }
 
-  if (last_connect_attempt != utime_t()) {
-    utime_t now = ceph_clock_now();
-    utime_t when = last_connect_attempt;
-    when += cct->_conf.get_val<double>("mgr_connect_retry_interval");
+  if (!clock_t::is_zero(last_connect_attempt)) {
+    auto now = clock_t::now();
+    auto when = last_connect_attempt +
+      ceph::make_timespan(
+        cct->_conf.get_val<double>("mgr_connect_retry_interval"));
     if (now < when) {
       if (!connect_retry_callback) {
        connect_retry_callback = timer.add_event_at(
          when,
-         new FunctionContext([this](int r){
+         new LambdaContext([this](int r){
              connect_retry_callback = nullptr;
              reconnect();
            }));
@@ -151,7 +172,7 @@ void MgrClient::reconnect()
 
   ldout(cct, 4) << "Starting new session with " << map.get_active_addrs()
                << dendl;
-  last_connect_attempt = ceph_clock_now();
+  last_connect_attempt = clock_t::now();
 
   session.reset(new MgrSessionState());
   session->con = msgr->connect_to(CEPH_ENTITY_TYPE_MGR,
@@ -168,18 +189,42 @@ void MgrClient::reconnect()
   }
 
   // resend any pending commands
-  for (const auto &p : command_table.get_commands()) {
-    auto m = p.second.get_message({});
+  auto p = command_table.get_commands().begin();
+  while (p != command_table.get_commands().end()) {
+    auto tid = p->first;
+    auto& op = p->second;
+    ldout(cct,10) << "resending " << tid << (op.tell ? " (tell)":" (cli)") << dendl;
+    MessageRef m;
+    if (op.tell) {
+      if (op.name.size() && op.name != map.active_name) {
+       ldout(cct, 10) << "active mgr " << map.active_name << " != target "
+                      << op.name << dendl;
+       if (op.on_finish) {
+         op.on_finish->complete(-ENXIO);
+       }
+       ++p;
+       command_table.erase(tid);
+       continue;
+      }
+      // Set fsid argument to signal that this is really a tell message (and
+      // we are not a legacy client sending a non-tell command via MCommand).
+      m = op.get_message(monmap->fsid, false);
+    } else {
+      m = op.get_message(
+       {},
+       HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
+    }
     ceph_assert(session);
     ceph_assert(session->con);
     session->con->send_message2(std::move(m));
+    ++p;
   }
 }
 
 void MgrClient::_send_open()
 {
   if (session && session->con) {
-    auto open = new MMgrOpen();
+    auto open = make_message<MMgrOpen>();
     if (!service_name.empty()) {
       open->service_name = service_name;
       open->daemon_name = daemon_name;
@@ -192,19 +237,18 @@ void MgrClient::_send_open()
     }
     cct->_conf.get_config_bl(0, &open->config_bl, &last_config_bl_version);
     cct->_conf.get_defaults_bl(&open->config_defaults_bl);
-    session->con->send_message(open);
+    session->con->send_message2(open);
   }
 }
 
-bool MgrClient::handle_mgr_map(MMgrMap *m)
+bool MgrClient::handle_mgr_map(ref_t<MMgrMap> m)
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   ldout(cct, 20) << *m << dendl;
 
   map = m->get_map();
   ldout(cct, 4) << "Got map version " << map.epoch << dendl;
-  m->put();
 
   ldout(cct, 4) << "Active mgr is now " << map.get_active_addrs() << dendl;
 
@@ -241,7 +285,7 @@ void MgrClient::_send_stats()
   if (stats_period != 0) {
     report_callback = timer.add_event_after(
       stats_period,
-      new FunctionContext([this](int) {
+      new LambdaContext([this](int) {
          _send_stats();
        }));
   }
@@ -249,11 +293,11 @@ void MgrClient::_send_stats()
 
 void MgrClient::_send_report()
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
   ceph_assert(session);
   report_callback = nullptr;
 
-  auto report = new MMgrReport();
+  auto report = make_message<MMgrReport>();
   auto pcc = cct->get_perfcounters_collection();
 
   pcc->with_counters([this, report](
@@ -345,16 +389,23 @@ void MgrClient::_send_report()
     daemon_dirty_status = false;
   }
 
+  if (task_dirty_status) {
+    report->task_status = task_status;
+    task_dirty_status = false;
+  }
+
   report->daemon_health_metrics = std::move(daemon_health_metrics);
 
   cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
                            &last_config_bl_version);
 
   if (get_perf_report_cb) {
-    get_perf_report_cb(&report->osd_perf_metric_reports);
+    MetricPayload payload = get_perf_report_cb();
+    MetricReportMessage message(payload);
+    report->metric_report_message = message;
   }
 
-  session->con->send_message(report);
+  session->con->send_message2(report);
 }
 
 void MgrClient::send_pgstats()
@@ -370,15 +421,14 @@ void MgrClient::_send_pgstats()
   }
 }
 
-bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
+bool MgrClient::handle_mgr_configure(ref_t<MMgrConfigure> m)
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   ldout(cct, 20) << *m << dendl;
 
   if (!session) {
     lderr(cct) << "dropping unexpected configure message" << dendl;
-    m->put();
     return true;
   }
 
@@ -389,8 +439,11 @@ bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
     stats_threshold = m->stats_threshold;
   }
 
-  if (set_perf_queries_cb) {
-    set_perf_queries_cb(m->osd_perf_metric_queries);
+  if (!m->osd_perf_metric_queries.empty()) {
+    handle_config_payload(m->osd_perf_metric_queries);
+  } else if (m->metric_config_message) {
+    const MetricConfigMessage &message = *m->metric_config_message;
+    boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
   }
 
   bool starting = (stats_period == 0) && (m->stats_period != 0);
@@ -399,21 +452,19 @@ bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
     _send_stats();
   }
 
-  m->put();
   return true;
 }
 
-bool MgrClient::handle_mgr_close(MMgrClose *m)
+bool MgrClient::handle_mgr_close(ref_t<MMgrClose> m)
 {
   service_daemon = false;
-  shutdown_cond.Signal();
-  m->put();
+  shutdown_cond.notify_all();
   return true;
 }
 
 int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
-                  bufferlist *outbl, string *outs,
-                  Context *onfinish)
+                            bufferlist *outbl, string *outs,
+                            Context *onfinish)
 {
   std::lock_guard l(lock);
 
@@ -432,8 +483,11 @@ int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
   op.on_finish = onfinish;
 
   if (session && session->con) {
-    // Leaving fsid argument null because it isn't used.
-    auto m = op.get_message({});
+    // Leaving fsid argument null because it isn't used historically, and
+    // we can use it as a signal that we are sending a non-tell command.
+    auto m = op.get_message(
+      {},
+      HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
     session->con->send_message2(std::move(m));
   } else {
     ldout(cct, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl;
@@ -441,36 +495,72 @@ int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
   return 0;
 }
 
-bool MgrClient::handle_command_reply(MCommandReply *m)
+int MgrClient::start_tell_command(
+  const string& name,
+  const vector<string>& cmd, const bufferlist& inbl,
+  bufferlist *outbl, string *outs,
+  Context *onfinish)
 {
-  ceph_assert(lock.is_locked_by_me());
+  std::lock_guard l(lock);
 
-  ldout(cct, 20) << *m << dendl;
+  ldout(cct, 20) << "target: " << name << " cmd: " << cmd << dendl;
+
+  if (map.epoch == 0 && mgr_optional) {
+    ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
+    return -EACCES;
+  }
+
+  auto &op = command_table.start_command();
+  op.tell = true;
+  op.name = name;
+  op.cmd = cmd;
+  op.inbl = inbl;
+  op.outbl = outbl;
+  op.outs = outs;
+  op.on_finish = onfinish;
+
+  if (session && session->con && (name.size() == 0 || map.active_name == name)) {
+    // Set fsid argument to signal that this is really a tell message (and
+    // we are not a legacy client sending a non-tell command via MCommand).
+    auto m = op.get_message(monmap->fsid, false);
+    session->con->send_message2(std::move(m));
+  } else {
+    ldout(cct, 5) << "no mgr session (no running mgr daemon?), or "
+                 << name << " not active mgr, waiting" << dendl;
+  }
+  return 0;
+}
+
+bool MgrClient::handle_command_reply(
+  uint64_t tid,
+  bufferlist& data,
+  const std::string& rs,
+  int r)
+{
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
+
+  ldout(cct, 20) << "tid " << tid << " r " << r << dendl;
 
-  const auto tid = m->get_tid();
   if (!command_table.exists(tid)) {
-    ldout(cct, 4) << "handle_command_reply tid " << m->get_tid()
+    ldout(cct, 4) << "handle_command_reply tid " << tid
             << " not found" << dendl;
-    m->put();
     return true;
   }
 
   auto &op = command_table.get_command(tid);
   if (op.outbl) {
-    op.outbl->claim(m->get_data());
+    op.outbl->claim(data);
   }
 
   if (op.outs) {
-    *(op.outs) = m->rs;
+    *(op.outs) = rs;
   }
 
   if (op.on_finish) {
-    op.on_finish->complete(m->r);
+    op.on_finish->complete(r);
   }
 
   command_table.erase(tid);
-
-  m->put();
   return true;
 }
 
@@ -480,14 +570,6 @@ int MgrClient::service_daemon_register(
   const std::map<std::string,std::string>& metadata)
 {
   std::lock_guard l(lock);
-  if (service == "osd" ||
-      service == "mds" ||
-      service == "client" ||
-      service == "mon" ||
-      service == "mgr") {
-    // normal ceph entity types are not allowed!
-    return -EINVAL;
-  }
   if (service_daemon) {
     return -EEXIST;
   }
@@ -516,6 +598,15 @@ int MgrClient::service_daemon_update_status(
   return 0;
 }
 
+int MgrClient::service_daemon_update_task_status(
+  std::map<std::string,std::string> &&status) {
+  std::lock_guard l(lock);
+  ldout(cct,10) << status << dendl;
+  task_status = std::move(status);
+  task_dirty_status = true;
+  return 0;
+}
+
 void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
 {
   std::lock_guard l(lock);