]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mgr/MgrClient.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / mgr / MgrClient.cc
index 38f0c313c15db56d2115182c8175cac79ba59e2a..4ac88dd99d4627073c46137026071f7910e7edc8 100644 (file)
 #include "MgrClient.h"
 
 #include "mgr/MgrContext.h"
+#include "mon/MonMap.h"
 
 #include "msg/Messenger.h"
 #include "messages/MMgrMap.h"
 #include "messages/MMgrReport.h"
 #include "messages/MMgrOpen.h"
+#include "messages/MMgrClose.h"
 #include "messages/MMgrConfigure.h"
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
+#include "messages/MMgrCommand.h"
+#include "messages/MMgrCommandReply.h"
 #include "messages/MPGStats.h"
 
+using std::string;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::make_message;
+using ceph::ref_cast;
+using ceph::ref_t;
+
 #define dout_subsys ceph_subsys_mgrc
 #undef dout_prefix
 #define dout_prefix *_dout << "mgrc " << __func__ << " "
 
-MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_)
-    : Dispatcher(cct_), cct(cct_), msgr(msgr_),
-      timer(cct_, lock)
+MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_, MonMap *monmap_)
+  : Dispatcher(cct_),
+    cct(cct_),
+    msgr(msgr_),
+    monmap(monmap_),
+    timer(cct_, lock)
 {
-  assert(cct != nullptr);
+  ceph_assert(cct != nullptr);
 }
 
 void MgrClient::init()
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
-  assert(msgr != nullptr);
+  ceph_assert(msgr != nullptr);
 
   timer.init();
+  initialized = true;
 }
 
 void MgrClient::shutdown()
 {
-  Mutex::Locker l(lock);
+  std::unique_lock l(lock);
+  ldout(cct, 10) << dendl;
 
   if (connect_retry_callback) {
     timer.cancel_event(connect_retry_callback);
@@ -57,6 +74,19 @@ void MgrClient::shutdown()
   // forget about in-flight commands if we are prematurely shut down
   // (e.g., by control-C)
   command_table.clear();
+  if (service_daemon &&
+      session &&
+      session->con &&
+      HAVE_FEATURE(session->con->get_features(), SERVER_MIMIC)) {
+    ldout(cct, 10) << "closing mgr session" << dendl;
+    auto m = make_message<MMgrClose>();
+    m->daemon_name = daemon_name;
+    m->service_name = service_name;
+    session->con->send_message2(m);
+    auto timeout = ceph::make_timespan(cct->_conf.get_val<double>(
+                             "mgr_client_service_daemon_unregister_timeout"));
+    shutdown_cond.wait_for(l, timeout);
+  }
 
   timer.shutdown();
   if (session) {
@@ -65,18 +95,29 @@ void MgrClient::shutdown()
   }
 }
 
-bool MgrClient::ms_dispatch(Message *m)
+bool MgrClient::ms_dispatch2(const ref_t<Message>& m)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
   switch(m->get_type()) {
   case MSG_MGR_MAP:
-    return handle_mgr_map(static_cast<MMgrMap*>(m));
+    return handle_mgr_map(ref_cast<MMgrMap>(m));
   case MSG_MGR_CONFIGURE:
-    return handle_mgr_configure(static_cast<MMgrConfigure*>(m));
+    return handle_mgr_configure(ref_cast<MMgrConfigure>(m));
+  case MSG_MGR_CLOSE:
+    return handle_mgr_close(ref_cast<MMgrClose>(m));
   case MSG_COMMAND_REPLY:
     if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
-      handle_command_reply(static_cast<MCommandReply*>(m));
+      MCommandReply *c = static_cast<MCommandReply*>(m.get());
+      handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
+      return true;
+    } else {
+      return false;
+    }
+  case MSG_MGR_COMMAND_REPLY:
+    if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
+      MMgrCommandReply *c = static_cast<MMgrCommandReply*>(m.get());
+      handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
       return true;
     } else {
       return false;
@@ -89,7 +130,7 @@ bool MgrClient::ms_dispatch(Message *m)
 
 void MgrClient::reconnect()
 {
-  assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   if (session) {
     ldout(cct, 4) << "Terminating session with "
@@ -108,15 +149,16 @@ void MgrClient::reconnect()
     return;
   }
 
-  if (last_connect_attempt != utime_t()) {
-    utime_t now = ceph_clock_now();
-    utime_t when = last_connect_attempt;
-    when += cct->_conf->get_val<double>("mgr_connect_retry_interval");
+  if (!clock_t::is_zero(last_connect_attempt)) {
+    auto now = clock_t::now();
+    auto when = last_connect_attempt +
+      ceph::make_timespan(
+        cct->_conf.get_val<double>("mgr_connect_retry_interval"));
     if (now < when) {
       if (!connect_retry_callback) {
        connect_retry_callback = timer.add_event_at(
          when,
-         new FunctionContext([this](int r){
+         new LambdaContext([this](int r){
              connect_retry_callback = nullptr;
              reconnect();
            }));
@@ -131,39 +173,62 @@ void MgrClient::reconnect()
     connect_retry_callback = nullptr;
   }
 
-  ldout(cct, 4) << "Starting new session with " << map.get_active_addr()
+  ldout(cct, 4) << "Starting new session with " << map.get_active_addrs()
                << dendl;
-  entity_inst_t inst;
-  inst.addr = map.get_active_addr();
-  inst.name = entity_name_t::MGR(map.get_active_gid());
-  last_connect_attempt = ceph_clock_now();
+  last_connect_attempt = clock_t::now();
 
   session.reset(new MgrSessionState());
-  session->con = msgr->get_connection(inst);
+  session->con = msgr->connect_to(CEPH_ENTITY_TYPE_MGR,
+                                 map.get_active_addrs());
 
   if (service_daemon) {
     daemon_dirty_status = true;
   }
+  task_dirty_status = true;
 
   // Don't send an open if we're just a client (i.e. doing
   // command-sending, not stats etc)
-  if (!cct->_conf->name.is_client() || service_daemon) {
+  if (msgr->get_mytype() != CEPH_ENTITY_TYPE_CLIENT || service_daemon) {
     _send_open();
   }
 
   // resend any pending commands
-  for (const auto &p : command_table.get_commands()) {
-    MCommand *m = p.second.get_message({});
-    assert(session);
-    assert(session->con);
-    session->con->send_message(m);
+  auto p = command_table.get_commands().begin();
+  while (p != command_table.get_commands().end()) {
+    auto tid = p->first;
+    auto& op = p->second;
+    ldout(cct,10) << "resending " << tid << (op.tell ? " (tell)":" (cli)") << dendl;
+    MessageRef m;
+    if (op.tell) {
+      if (op.name.size() && op.name != map.active_name) {
+       ldout(cct, 10) << "active mgr " << map.active_name << " != target "
+                      << op.name << dendl;
+       if (op.on_finish) {
+         op.on_finish->complete(-ENXIO);
+       }
+       ++p;
+       command_table.erase(tid);
+       continue;
+      }
+      // Set fsid argument to signal that this is really a tell message (and
+      // we are not a legacy client sending a non-tell command via MCommand).
+      m = op.get_message(monmap->fsid, false);
+    } else {
+      m = op.get_message(
+       {},
+       HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
+    }
+    ceph_assert(session);
+    ceph_assert(session->con);
+    session->con->send_message2(std::move(m));
+    ++p;
   }
 }
 
 void MgrClient::_send_open()
 {
   if (session && session->con) {
-    auto open = new MMgrOpen();
+    auto open = make_message<MMgrOpen>();
     if (!service_name.empty()) {
       open->service_name = service_name;
       open->daemon_name = daemon_name;
@@ -174,25 +239,26 @@ void MgrClient::_send_open()
       open->service_daemon = service_daemon;
       open->daemon_metadata = daemon_metadata;
     }
-    session->con->send_message(open);
+    cct->_conf.get_config_bl(0, &open->config_bl, &last_config_bl_version);
+    cct->_conf.get_defaults_bl(&open->config_defaults_bl);
+    session->con->send_message2(open);
   }
 }
 
-bool MgrClient::handle_mgr_map(MMgrMap *m)
+bool MgrClient::handle_mgr_map(ref_t<MMgrMap> m)
 {
-  assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   ldout(cct, 20) << *m << dendl;
 
   map = m->get_map();
   ldout(cct, 4) << "Got map version " << map.epoch << dendl;
-  m->put();
 
-  ldout(cct, 4) << "Active mgr is now " << map.get_active_addr() << dendl;
+  ldout(cct, 4) << "Active mgr is now " << map.get_active_addrs() << dendl;
 
   // Reset session?
   if (!session ||
-      session->con->get_peer_addr() != map.get_active_addr()) {
+      session->con->get_peer_addrs() != map.get_active_addrs()) {
     reconnect();
   }
 
@@ -201,7 +267,7 @@ bool MgrClient::handle_mgr_map(MMgrMap *m)
 
 bool MgrClient::ms_handle_reset(Connection *con)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
   if (session && con == session->con) {
     ldout(cct, 4) << __func__ << " con " << con << dendl;
     reconnect();
@@ -223,7 +289,7 @@ void MgrClient::_send_stats()
   if (stats_period != 0) {
     report_callback = timer.add_event_after(
       stats_period,
-      new FunctionContext([this](int) {
+      new LambdaContext([this](int) {
          _send_stats();
        }));
   }
@@ -231,15 +297,15 @@ void MgrClient::_send_stats()
 
 void MgrClient::_send_report()
 {
-  assert(lock.is_locked_by_me());
-  assert(session);
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
+  ceph_assert(session);
   report_callback = nullptr;
 
-  auto report = new MMgrReport();
+  auto report = make_message<MMgrReport>();
   auto pcc = cct->get_perfcounters_collection();
 
   pcc->with_counters([this, report](
-        const PerfCountersCollection::CounterMap &by_path)
+        const PerfCountersCollectionImpl::CounterMap &by_path)
   {
     // Helper for checking whether a counter should be included
     auto include_counter = [this](
@@ -298,10 +364,10 @@ void MgrClient::_send_report()
        session->declared.insert(path);
       }
 
-      ::encode(static_cast<uint64_t>(data.u64), report->packed);
+      encode(static_cast<uint64_t>(data.u64), report->packed);
       if (data.type & PERFCOUNTER_LONGRUNAVG) {
-        ::encode(static_cast<uint64_t>(data.avgcount), report->packed);
-        ::encode(static_cast<uint64_t>(data.avgcount2), report->packed);
+        encode(static_cast<uint64_t>(data.avgcount), report->packed);
+        encode(static_cast<uint64_t>(data.avgcount2), report->packed);
       }
     }
     ENCODE_FINISH(report->packed);
@@ -327,13 +393,28 @@ void MgrClient::_send_report()
     daemon_dirty_status = false;
   }
 
-  report->osd_health_metrics = std::move(osd_health_metrics);
-  session->con->send_message(report);
+  if (task_dirty_status) {
+    report->task_status = task_status;
+    task_dirty_status = false;
+  }
+
+  report->daemon_health_metrics = std::move(daemon_health_metrics);
+
+  cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
+                           &last_config_bl_version);
+
+  if (get_perf_report_cb) {
+    MetricPayload payload = get_perf_report_cb();
+    MetricReportMessage message(payload);
+    report->metric_report_message = message;
+  }
+
+  session->con->send_message2(report);
 }
 
 void MgrClient::send_pgstats()
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
   _send_pgstats();
 }
 
@@ -344,15 +425,14 @@ void MgrClient::_send_pgstats()
   }
 }
 
-bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
+bool MgrClient::handle_mgr_configure(ref_t<MMgrConfigure> m)
 {
-  assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   ldout(cct, 20) << *m << dendl;
 
   if (!session) {
     lderr(cct) << "dropping unexpected configure message" << dendl;
-    m->put();
     return true;
   }
 
@@ -363,25 +443,38 @@ bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
     stats_threshold = m->stats_threshold;
   }
 
+  if (!m->osd_perf_metric_queries.empty()) {
+    handle_config_payload(m->osd_perf_metric_queries);
+  } else if (m->metric_config_message) {
+    const MetricConfigMessage &message = *m->metric_config_message;
+    boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
+  }
+
   bool starting = (stats_period == 0) && (m->stats_period != 0);
   stats_period = m->stats_period;
   if (starting) {
     _send_stats();
   }
 
-  m->put();
+  return true;
+}
+
+bool MgrClient::handle_mgr_close(ref_t<MMgrClose> m)
+{
+  service_daemon = false;
+  shutdown_cond.notify_all();
   return true;
 }
 
 int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
-                  bufferlist *outbl, string *outs,
-                  Context *onfinish)
+                            bufferlist *outbl, string *outs,
+                            Context *onfinish)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
   ldout(cct, 20) << "cmd: " << cmd << dendl;
 
-  if (map.epoch == 0) {
+  if (map.epoch == 0 && mgr_optional) {
     ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
     return -EACCES;
   }
@@ -394,43 +487,84 @@ int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
   op.on_finish = onfinish;
 
   if (session && session->con) {
-    // Leaving fsid argument null because it isn't used.
-    MCommand *m = op.get_message({});
-    session->con->send_message(m);
+    // Leaving fsid argument null because it isn't used historically, and
+    // we can use it as a signal that we are sending a non-tell command.
+    auto m = op.get_message(
+      {},
+      HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
+    session->con->send_message2(std::move(m));
+  } else {
+    ldout(cct, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl;
   }
   return 0;
 }
 
-bool MgrClient::handle_command_reply(MCommandReply *m)
+int MgrClient::start_tell_command(
+  const string& name,
+  const vector<string>& cmd, const bufferlist& inbl,
+  bufferlist *outbl, string *outs,
+  Context *onfinish)
 {
-  assert(lock.is_locked_by_me());
+  std::lock_guard l(lock);
 
-  ldout(cct, 20) << *m << dendl;
+  ldout(cct, 20) << "target: " << name << " cmd: " << cmd << dendl;
+
+  if (map.epoch == 0 && mgr_optional) {
+    ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
+    return -EACCES;
+  }
+
+  auto &op = command_table.start_command();
+  op.tell = true;
+  op.name = name;
+  op.cmd = cmd;
+  op.inbl = inbl;
+  op.outbl = outbl;
+  op.outs = outs;
+  op.on_finish = onfinish;
+
+  if (session && session->con && (name.size() == 0 || map.active_name == name)) {
+    // Set fsid argument to signal that this is really a tell message (and
+    // we are not a legacy client sending a non-tell command via MCommand).
+    auto m = op.get_message(monmap->fsid, false);
+    session->con->send_message2(std::move(m));
+  } else {
+    ldout(cct, 5) << "no mgr session (no running mgr daemon?), or "
+                 << name << " not active mgr, waiting" << dendl;
+  }
+  return 0;
+}
+
+bool MgrClient::handle_command_reply(
+  uint64_t tid,
+  bufferlist& data,
+  const std::string& rs,
+  int r)
+{
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
+
+  ldout(cct, 20) << "tid " << tid << " r " << r << dendl;
 
-  const auto tid = m->get_tid();
   if (!command_table.exists(tid)) {
-    ldout(cct, 4) << "handle_command_reply tid " << m->get_tid()
+    ldout(cct, 4) << "handle_command_reply tid " << tid
             << " not found" << dendl;
-    m->put();
     return true;
   }
 
   auto &op = command_table.get_command(tid);
   if (op.outbl) {
-    op.outbl->claim(m->get_data());
+    *op.outbl = std::move(data);
   }
 
   if (op.outs) {
-    *(op.outs) = m->rs;
+    *(op.outs) = rs;
   }
 
   if (op.on_finish) {
-    op.on_finish->complete(m->r);
+    op.on_finish->complete(r);
   }
 
   command_table.erase(tid);
-
-  m->put();
   return true;
 }
 
@@ -439,15 +573,7 @@ int MgrClient::service_daemon_register(
   const std::string& name,
   const std::map<std::string,std::string>& metadata)
 {
-  Mutex::Locker l(lock);
-  if (name == "osd" ||
-      name == "mds" ||
-      name == "client" ||
-      name == "mon" ||
-      name == "mgr") {
-    // normal ceph entity types are not allowed!
-    return -EINVAL;
-  }
+  std::lock_guard l(lock);
   if (service_daemon) {
     return -EEXIST;
   }
@@ -459,7 +585,7 @@ int MgrClient::service_daemon_register(
   daemon_dirty_status = true;
 
   // late register?
-  if (cct->_conf->name.is_client() && session && session->con) {
+  if (msgr->get_mytype() == CEPH_ENTITY_TYPE_CLIENT && session && session->con) {
     _send_open();
   }
 
@@ -467,17 +593,27 @@ int MgrClient::service_daemon_register(
 }
 
 int MgrClient::service_daemon_update_status(
-  const std::map<std::string,std::string>& status)
+  std::map<std::string,std::string>&& status)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
   ldout(cct,10) << status << dendl;
-  daemon_status = status;
+  daemon_status = std::move(status);
   daemon_dirty_status = true;
   return 0;
 }
 
-void MgrClient::update_osd_health(std::vector<OSDHealthMetric>&& metrics)
+int MgrClient::service_daemon_update_task_status(
+  std::map<std::string,std::string> &&status) {
+  std::lock_guard l(lock);
+  ldout(cct,10) << status << dendl;
+  task_status = std::move(status);
+  task_dirty_status = true;
+  return 0;
+}
+
+void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
 {
-  Mutex::Locker l(lock);
-  osd_health_metrics = std::move(metrics);
+  std::lock_guard l(lock);
+  daemon_health_metrics = std::move(metrics);
 }
+