]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mgr/DaemonServer.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / mgr / DaemonServer.cc
index e0da9d03bd544a379c6211f8aa2ca7a111c4e7be..48816578ccb52959e3264e74f42c9c98f9f167ce 100644 (file)
@@ -12,6 +12,7 @@
  */
 
 #include "DaemonServer.h"
+#include <boost/algorithm/string.hpp>
 #include "mgr/Mgr.h"
 
 #include "include/stringify.h"
@@ -22,6 +23,7 @@
 #include "mgr/mgr_commands.h"
 #include "mgr/DaemonHealthMetricCollector.h"
 #include "mgr/OSDPerfMetricCollector.h"
+#include "mgr/MDSPerfMetricCollector.h"
 #include "mon/MonCommand.h"
 
 #include "messages/MMgrOpen.h"
@@ -30,6 +32,8 @@
 #include "messages/MMonMgrReport.h"
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
+#include "messages/MMgrCommand.h"
+#include "messages/MMgrCommandReply.h"
 #include "messages/MPGStats.h"
 #include "messages/MOSDScrub.h"
 #include "messages/MOSDScrub2.h"
 #define dout_subsys ceph_subsys_mgr
 #undef dout_prefix
 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
-
-
+using namespace TOPNSPC::common;
+namespace {
+  template <typename Map>
+  bool map_compare(Map const &lhs, Map const &rhs) {
+    return lhs.size() == rhs.size()
+      && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
+                    [] (auto a, auto b) { return a.first == b.first && a.second == b.second; });
+  }
+}
 
 DaemonServer::DaemonServer(MonClient *monc_,
                            Finisher &finisher_,
@@ -76,13 +87,14 @@ DaemonServer::DaemonServer(MonClient *monc_,
       py_modules(py_modules_),
       clog(clog_),
       audit_clog(audit_clog_),
-      lock("DaemonServer"),
       pgmap_ready(false),
       timer(g_ceph_context, lock),
       shutting_down(false),
       tick_event(nullptr),
       osd_perf_metric_collector_listener(this),
-      osd_perf_metric_collector(osd_perf_metric_collector_listener)
+      osd_perf_metric_collector(osd_perf_metric_collector_listener),
+      mds_perf_metric_collector_listener(this),
+      mds_perf_metric_collector(mds_perf_metric_collector_listener)
 {
   g_conf().add_observer(this);
 }
@@ -100,7 +112,7 @@ int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
   msgr = Messenger::create(g_ceph_context, public_msgr_type,
                           entity_name_t::MGR(gid),
                           "mgr",
-                          getpid(), 0);
+                          Messenger::get_pid_nonce());
   msgr->set_default_policy(Messenger::Policy::stateless_server(0));
 
   msgr->set_auth_client(monc);
@@ -158,15 +170,9 @@ entity_addrvec_t DaemonServer::get_myaddrs() const
   return msgr->get_myaddrs();
 }
 
-KeyStore *DaemonServer::ms_get_auth1_authorizer_keystore()
-{
-  return monc->rotating_secrets.get();
-}
-
 int DaemonServer::ms_handle_authentication(Connection *con)
 {
-  int ret = 0;
-  MgrSession *s = new MgrSession(cct);
+  auto s = ceph::make_ref<MgrSession>(cct);
   con->set_priv(s);
   s->inst.addr = con->get_peer_addr();
   s->entity_name = con->peer_name;
@@ -180,27 +186,24 @@ int DaemonServer::ms_handle_authentication(Connection *con)
     dout(10) << " session " << s << " " << s->entity_name
             << " allow_all" << dendl;
     s->caps.set_allow_all();
-  }
-
-  if (caps_info.caps.length() > 0) {
+  } else if (caps_info.caps.length() > 0) {
     auto p = caps_info.caps.cbegin();
     string str;
     try {
       decode(str, p);
     }
     catch (buffer::error& e) {
-      ret = -EPERM;
-    }
-    bool success = s->caps.parse(str);
-    if (success) {
       dout(10) << " session " << s << " " << s->entity_name
-              << " has caps " << s->caps << " '" << str << "'" << dendl;
-      ret = 1;
-    } else {
+               << " failed to decode caps" << dendl;
+      return -EACCES;
+    }
+    if (!s->caps.parse(str)) {
       dout(10) << " session " << s << " " << s->entity_name
               << " failed to parse caps '" << str << "'" << dendl;
-      ret = -EPERM;
+      return -EACCES;
     }
+    dout(10) << " session " << s << " " << s->entity_name
+             << " has caps " << s->caps << " '" << str << "'" << dendl;
   }
 
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
@@ -211,22 +214,7 @@ int DaemonServer::ms_handle_authentication(Connection *con)
     osd_cons[s->osd_id].insert(con);
   }
 
-  return ret;
-}
-
-bool DaemonServer::ms_get_authorizer(
-  int dest_type,
-  AuthAuthorizer **authorizer)
-{
-  dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
-
-  if (dest_type == CEPH_ENTITY_TYPE_MON) {
-    return true;
-  }
-
-  *authorizer = monc->build_authorizer(dest_type);
-  dout(20) << "got authorizer " << *authorizer << dendl;
-  return *authorizer != NULL;
+  return 1;
 }
 
 bool DaemonServer::ms_handle_reset(Connection *con)
@@ -256,31 +244,37 @@ bool DaemonServer::ms_handle_refused(Connection *con)
   return false;
 }
 
-bool DaemonServer::ms_dispatch(Message *m)
+bool DaemonServer::ms_dispatch2(const ref_t<Message>& m)
 {
   // Note that we do *not* take ::lock here, in order to avoid
   // serializing all message handling.  It's up to each handler
   // to take whatever locks it needs.
   switch (m->get_type()) {
     case MSG_PGSTATS:
-      cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
+      cluster_state.ingest_pgstats(ref_cast<MPGStats>(m));
       maybe_ready(m->get_source().num());
-      m->put();
       return true;
     case MSG_MGR_REPORT:
-      return handle_report(static_cast<MMgrReport*>(m));
+      return handle_report(ref_cast<MMgrReport>(m));
     case MSG_MGR_OPEN:
-      return handle_open(static_cast<MMgrOpen*>(m));
+      return handle_open(ref_cast<MMgrOpen>(m));
     case MSG_MGR_CLOSE:
-      return handle_close(static_cast<MMgrClose*>(m));
+      return handle_close(ref_cast<MMgrClose>(m));
     case MSG_COMMAND:
-      return handle_command(static_cast<MCommand*>(m));
+      return handle_command(ref_cast<MCommand>(m));
+    case MSG_MGR_COMMAND:
+      return handle_command(ref_cast<MMgrCommand>(m));
     default:
       dout(1) << "Unhandled message type " << m->get_type() << dendl;
       return false;
   };
 }
 
+void DaemonServer::dump_pg_ready(ceph::Formatter *f)
+{
+  f->dump_bool("pg_ready", pgmap_ready.load());
+}
+
 void DaemonServer::maybe_ready(int32_t osd_id)
 {
   if (pgmap_ready.load()) {
@@ -333,7 +327,7 @@ void DaemonServer::tick()
 // fire after all modules have had a chance to set their health checks.
 void DaemonServer::schedule_tick_locked(double delay_sec)
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   if (tick_event) {
     timer.cancel_event(tick_event);
@@ -346,7 +340,7 @@ void DaemonServer::schedule_tick_locked(double delay_sec)
     return;
 
   tick_event = timer.add_event_after(delay_sec,
-    new FunctionContext([this](int r) {
+    new LambdaContext([this](int r) {
       tick();
   }));
 }
@@ -363,7 +357,7 @@ void DaemonServer::handle_osd_perf_metric_query_updated()
 
   // Send a fresh MMgrConfigure to all clients, so that they can follow
   // the new policy for transmitting stats
-  finisher.queue(new FunctionContext([this](int r) {
+  finisher.queue(new LambdaContext([this](int r) {
         std::lock_guard l(lock);
         for (auto &c : daemon_connections) {
           if (c->peer_is_osd()) {
@@ -373,6 +367,22 @@ void DaemonServer::handle_osd_perf_metric_query_updated()
       }));
 }
 
+void DaemonServer::handle_mds_perf_metric_query_updated()
+{
+  dout(10) << dendl;
+
+  // Send a fresh MMgrConfigure to all clients, so that they can follow
+  // the new policy for transmitting stats
+  finisher.queue(new LambdaContext([this](int r) {
+        std::lock_guard l(lock);
+        for (auto &c : daemon_connections) {
+          if (c->peer_is_mds()) {
+            _send_configure(c);
+          }
+        }
+      }));
+}
+
 void DaemonServer::shutdown()
 {
   dout(10) << "begin" << dendl;
@@ -392,35 +402,46 @@ static DaemonKey key_from_service(
   const std::string& daemon_name)
 {
   if (!service_name.empty()) {
-    return DaemonKey(service_name, daemon_name);
+    return DaemonKey{service_name, daemon_name};
   } else {
-    return DaemonKey(ceph_entity_type_name(peer_type), daemon_name);
+    return DaemonKey{ceph_entity_type_name(peer_type), daemon_name};
   }
 }
 
-static bool key_from_string(
-  const std::string& name,
-  DaemonKey *out)
+void DaemonServer::fetch_missing_metadata(const DaemonKey& key,
+                                         const entity_addr_t& addr)
 {
-  auto p = name.find('.');
-  if (p == std::string::npos) {
-    return false;
+  if (!daemon_state.is_updating(key) &&
+      (key.type == "osd" || key.type == "mds" || key.type == "mon")) {
+    std::ostringstream oss;
+    auto c = new MetadataUpdate(daemon_state, key);
+    if (key.type == "osd") {
+      oss << "{\"prefix\": \"osd metadata\", \"id\": "
+         << key.name<< "}";
+    } else if (key.type == "mds") {
+      c->set_default("addr", stringify(addr));
+      oss << "{\"prefix\": \"mds metadata\", \"who\": \""
+         << key.name << "\"}";
+    } else if (key.type == "mon") {
+      oss << "{\"prefix\": \"mon metadata\", \"id\": \""
+         << key.name << "\"}";
+    } else {
+      ceph_abort();
+    }
+    monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
   }
-  out->first = name.substr(0, p);
-  out->second = name.substr(p + 1);
-  return true;
 }
 
-bool DaemonServer::handle_open(MMgrOpen *m)
+bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
 {
-  std::lock_guard l(lock);
+  std::unique_lock l(lock);
 
   DaemonKey key = key_from_service(m->service_name,
                                   m->get_connection()->get_peer_type(),
                                   m->daemon_name);
 
   auto con = m->get_connection();
-  dout(4) << "from " << key << " " << con << dendl;
+  dout(10) << "from " << key << " " << con->get_peer_addr() << dendl;
 
   _send_configure(con);
 
@@ -443,7 +464,8 @@ bool DaemonServer::handle_open(MMgrOpen *m)
       dout(2) << "ignoring open from " << key << " " << con->get_peer_addr()
               << "; not ready for session (expect reconnect)" << dendl;
       con->mark_down();
-      m->put();
+      l.unlock();
+      fetch_missing_metadata(key, m->get_source_addr());
       return true;
     }
   }
@@ -457,13 +479,14 @@ bool DaemonServer::handle_open(MMgrOpen *m)
     std::lock_guard l(daemon->lock);
     daemon->perf_counters.clear();
 
+    daemon->service_daemon = m->service_daemon;
     if (m->service_daemon) {
       daemon->service_status = m->daemon_status;
 
       utime_t now = ceph_clock_now();
-      auto d = pending_service_map.get_daemon(m->service_name,
-                                             m->daemon_name);
-      if (d->gid != (uint64_t)m->get_source().num()) {
+      auto [d, added] = pending_service_map.get_daemon(m->service_name,
+                                                      m->daemon_name);
+      if (added || d->gid != (uint64_t)m->get_source().num()) {
        dout(10) << "registering " << key << " in pending_service_map" << dendl;
        d->gid = m->get_source().num();
        d->addr = m->get_source_addr();
@@ -496,11 +519,10 @@ bool DaemonServer::handle_open(MMgrOpen *m)
     daemon_connections.insert(con);
   }
 
-  m->put();
   return true;
 }
 
-bool DaemonServer::handle_close(MMgrClose *m)
+bool DaemonServer::handle_close(const ref_t<MMgrClose>& m)
 {
   std::lock_guard l(lock);
 
@@ -522,139 +544,144 @@ bool DaemonServer::handle_close(MMgrClose *m)
   }
 
   // send same message back as a reply
-  m->get_connection()->send_message(m);
+  m->get_connection()->send_message2(m);
   return true;
 }
 
-bool DaemonServer::handle_report(MMgrReport *m)
+void DaemonServer::update_task_status(
+  DaemonKey key,
+  const std::map<std::string,std::string>& task_status)
+{
+  dout(10) << "got task status from " << key << dendl;
+
+  [[maybe_unused]] auto [daemon, added] =
+    pending_service_map.get_daemon(key.type, key.name);
+  if (daemon->task_status != task_status) {
+    daemon->task_status = task_status;
+    pending_service_map_dirty = pending_service_map.epoch;
+  }
+}
+
+bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
 {
   DaemonKey key;
   if (!m->service_name.empty()) {
-    key.first = m->service_name;
+    key.type = m->service_name;
   } else {
-    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
+    key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
   }
-  key.second = m->daemon_name;
+  key.name = m->daemon_name;
 
-  dout(4) << "from " << m->get_connection() << " " << key << dendl;
+  dout(10) << "from " << m->get_connection() << " " << key << dendl;
 
   if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
       m->service_name.empty()) {
     // Clients should not be sending us stats unless they are declaring
     // themselves to be a daemon for some service.
-    dout(4) << "rejecting report from non-daemon client " << m->daemon_name
-           << dendl;
+    dout(10) << "rejecting report from non-daemon client " << m->daemon_name
+            << dendl;
+    clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
+                << " at " << m->get_connection()->get_peer_addrs();
     m->get_connection()->mark_down();
-    m->put();
     return true;
   }
 
-  // Look up the DaemonState
-  DaemonStatePtr daemon;
-  if (daemon_state.exists(key)) {
-    dout(20) << "updating existing DaemonState for " << key << dendl;
-    daemon = daemon_state.get(key);
-  } else {
-    // we don't know the hostname at this stage, reject MMgrReport here.
-    dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
-           << dendl;
-
-    // issue metadata request in background
-    if (!daemon_state.is_updating(key) && 
-       (key.first == "osd" || key.first == "mds" || key.first == "mon")) {
-
-      std::ostringstream oss;
-      auto c = new MetadataUpdate(daemon_state, key);
-      if (key.first == "osd") {
-        oss << "{\"prefix\": \"osd metadata\", \"id\": "
-            << key.second<< "}";
-
-      } else if (key.first == "mds") {
-        c->set_default("addr", stringify(m->get_source_addr()));
-        oss << "{\"prefix\": \"mds metadata\", \"who\": \""
-            << key.second << "\"}";
-      } else if (key.first == "mon") {
-        oss << "{\"prefix\": \"mon metadata\", \"id\": \""
-            << key.second << "\"}";
-      } else {
-       ceph_abort();
-      }
 
-      monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
-    }
-    
-    {
-      std::lock_guard l(lock);
+  {
+    std::unique_lock locker(lock);
+
+    DaemonStatePtr daemon;
+    // Look up the DaemonState
+    if (daemon_state.exists(key)) {
+      dout(20) << "updating existing DaemonState for " << key << dendl;
+      daemon = daemon_state.get(key);
+    } else {
+      locker.unlock();
+
+      // we don't know the hostname at this stage, reject MMgrReport here.
+      dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
+              << dendl;
+      // issue metadata request in background
+      fetch_missing_metadata(key, m->get_source_addr());
+
+      locker.lock();
+
       // kill session
       auto priv = m->get_connection()->get_priv();
       auto session = static_cast<MgrSession*>(priv.get());
       if (!session) {
-       return false;
+        return false;
       }
       m->get_connection()->mark_down();
 
       dout(10) << "unregistering osd." << session->osd_id
-              << "  session " << session << " con " << m->get_connection() << dendl;
+               << "  session " << session << " con " << m->get_connection() << dendl;
       
       if (osd_cons.find(session->osd_id) != osd_cons.end()) {
-          osd_cons[session->osd_id].erase(m->get_connection());
-      } 
+        osd_cons[session->osd_id].erase(m->get_connection());
+      }
 
       auto iter = daemon_connections.find(m->get_connection());
       if (iter != daemon_connections.end()) {
-       daemon_connections.erase(iter);
+        daemon_connections.erase(iter);
       }
-    }
-
-    return false;
-  }
 
-  // Update the DaemonState
-  ceph_assert(daemon != nullptr);
-  {
-    std::lock_guard l(daemon->lock);
-    auto &daemon_counters = daemon->perf_counters;
-    daemon_counters.update(m);
-
-    auto p = m->config_bl.cbegin();
-    if (p != m->config_bl.end()) {
-      decode(daemon->config, p);
-      decode(daemon->ignored_mon_config, p);
-      dout(20) << " got config " << daemon->config
-              << " ignored " << daemon->ignored_mon_config << dendl;
+      return false;
     }
 
-    if (daemon->service_daemon) {
+    // Update the DaemonState
+    ceph_assert(daemon != nullptr);
+    {
+      std::lock_guard l(daemon->lock);
+      auto &daemon_counters = daemon->perf_counters;
+      daemon_counters.update(*m.get());
+
+      auto p = m->config_bl.cbegin();
+      if (p != m->config_bl.end()) {
+        decode(daemon->config, p);
+        decode(daemon->ignored_mon_config, p);
+        dout(20) << " got config " << daemon->config
+                 << " ignored " << daemon->ignored_mon_config << dendl;
+      }
+
       utime_t now = ceph_clock_now();
-      if (m->daemon_status) {
-        daemon->service_status = *m->daemon_status;
-        daemon->service_status_stamp = now;
+      if (daemon->service_daemon) {
+        if (m->daemon_status) {
+          daemon->service_status_stamp = now;
+          daemon->service_status = *m->daemon_status;
+        }
+        daemon->last_service_beacon = now;
+      } else if (m->daemon_status) {
+        derr << "got status from non-daemon " << key << dendl;
+      }
+      // update task status
+      if (m->task_status) {
+        update_task_status(key, *m->task_status);
+        daemon->last_service_beacon = now;
+      }
+      if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
+        // only OSD and MON send health_checks to me now
+        daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
+        dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
+                 << dendl;
       }
-      daemon->last_service_beacon = now;
-    } else if (m->daemon_status) {
-      derr << "got status from non-daemon " << key << dendl;
-    }
-    if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
-      // only OSD and MON send health_checks to me now
-      daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
-      dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
-              << dendl;
     }
   }
 
   // if there are any schema updates, notify the python modules
   if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
-    ostringstream oss;
-    oss << key.first << '.' << key.second;
-    py_modules.notify_all("perf_schema_update", oss.str());
+    py_modules.notify_all("perf_schema_update", ceph::to_string(key));
   }
 
   if (m->get_connection()->peer_is_osd()) {
     osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
   }
 
-  m->put();
+  if (m->metric_report_message) {
+    const MetricReportMessage &message = *m->metric_report_message;
+    boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
+  }
+
   return true;
 }
 
@@ -669,7 +696,7 @@ void DaemonServer::_generate_command_map(
       continue;
     if (p->first == "caps") {
       vector<string> cv;
-      if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
+      if (cmd_getval(cmdmap, "caps", cv) &&
          cv.size() % 2 == 0) {
        for (unsigned i = 0; i < cv.size(); i += 2) {
          string k = string("caps_") + cv[i];
@@ -734,16 +761,22 @@ bool DaemonServer::_allowed_command(
  */
 class CommandContext {
 public:
-  MCommand *m;
+  ceph::ref_t<MCommand> m_tell;
+  ceph::ref_t<MMgrCommand> m_mgr;
+  const std::vector<std::string>& cmd;  ///< ref into m_tell or m_mgr
+  const bufferlist& data;               ///< ref into m_tell or m_mgr
   bufferlist odata;
   cmdmap_t cmdmap;
 
-  explicit CommandContext(MCommand *m_)
-    : m(m_) {
+  explicit CommandContext(ceph::ref_t<MCommand> m)
+    : m_tell{std::move(m)},
+      cmd(m_tell->cmd),
+      data(m_tell->get_data()) {
   }
-
-  ~CommandContext() {
-    m->put();
+  explicit CommandContext(ceph::ref_t<MMgrCommand> m)
+    : m_mgr{std::move(m)},
+      cmd(m_mgr->cmd),
+      data(m_mgr->get_data()) {
   }
 
   void reply(int r, const std::stringstream &ss) {
@@ -752,21 +785,29 @@ public:
 
   void reply(int r, const std::string &rs) {
     // Let the connection drop as soon as we've sent our response
-    ConnectionRef con = m->get_connection();
+    ConnectionRef con = m_tell ? m_tell->get_connection()
+      : m_mgr->get_connection();
     if (con) {
       con->mark_disposable();
     }
 
     if (r == 0) {
-      dout(4) << __func__ << " success" << dendl;
+      dout(20) << "success" << dendl;
     } else {
       derr << __func__ << " " << cpp_strerror(r) << " " << rs << dendl;
     }
     if (con) {
-      MCommandReply *reply = new MCommandReply(r, rs);
-      reply->set_tid(m->get_tid());
-      reply->set_data(odata);
-      con->send_message(reply);
+      if (m_tell) {
+       MCommandReply *reply = new MCommandReply(r, rs);
+       reply->set_tid(m_tell->get_tid());
+       reply->set_data(odata);
+       con->send_message(reply);
+      } else {
+       MMgrCommandReply *reply = new MMgrCommandReply(r, rs);
+       reply->set_tid(m_mgr->get_tid());
+       reply->set_data(odata);
+       con->send_message(reply);
+      }
     }
   }
 };
@@ -791,12 +832,24 @@ public:
   }
 };
 
-bool DaemonServer::handle_command(MCommand *m)
+bool DaemonServer::handle_command(const ref_t<MCommand>& m)
 {
   std::lock_guard l(lock);
-  std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
+  auto cmdctx = std::make_shared<CommandContext>(m);
   try {
-    return _handle_command(m, cmdctx);
+    return _handle_command(cmdctx);
+  } catch (const bad_cmd_get& e) {
+    cmdctx->reply(-EINVAL, e.what());
+    return true;
+  }
+}
+
+bool DaemonServer::handle_command(const ref_t<MMgrCommand>& m)
+{
+  std::lock_guard l(lock);
+  auto cmdctx = std::make_shared<CommandContext>(m);
+  try {
+    return _handle_command(cmdctx);
   } catch (const bad_cmd_get& e) {
     cmdctx->reply(-EINVAL, e.what());
     return true;
@@ -809,47 +862,199 @@ void DaemonServer::log_access_denied(
   dout(1) << " access denied" << dendl;
   audit_clog->info() << "from='" << session->inst << "' "
                      << "entity='" << session->entity_name << "' "
-                     << "cmd=" << cmdctx->cmdmap << ":  access denied";
+                     << "cmd=" << cmdctx->cmd << ":  access denied";
   ss << "access denied: does your client key have mgr caps? "
-        "See http://docs.ceph.com/docs/master/mgr/administrator/"
+        "See http://docs.ceph.com/en/latest/mgr/administrator/"
         "#client-authentication";
 }
 
+void DaemonServer::_check_offlines_pgs(
+  const set<int>& osds,
+  const OSDMap& osdmap,
+  const PGMap& pgmap,
+  offline_pg_report *report)
+{
+  // reset output
+  *report = offline_pg_report();
+  report->osds = osds;
+
+  for (const auto& q : pgmap.pg_stat) {
+    set<int32_t> pg_acting;  // net acting sets (with no missing if degraded)
+    bool found = false;
+    if (q.second.state == 0) {
+      report->unknown.insert(q.first);
+      continue;
+    }
+    if (q.second.state & PG_STATE_DEGRADED) {
+      for (auto& anm : q.second.avail_no_missing) {
+       if (osds.count(anm.osd)) {
+         found = true;
+         continue;
+       }
+       if (anm.osd != CRUSH_ITEM_NONE) {
+         pg_acting.insert(anm.osd);
+       }
+      }
+    } else {
+      for (auto& a : q.second.acting) {
+       if (osds.count(a)) {
+         found = true;
+         continue;
+       }
+       if (a != CRUSH_ITEM_NONE) {
+         pg_acting.insert(a);
+       }
+      }
+    }
+    if (!found) {
+      continue;
+    }
+    const pg_pool_t *pi = osdmap.get_pg_pool(q.first.pool());
+    bool dangerous = false;
+    if (!pi) {
+      report->bad_no_pool.insert(q.first); // pool is creating or deleting
+      dangerous = true;
+    }
+    if (!(q.second.state & PG_STATE_ACTIVE)) {
+      report->bad_already_inactive.insert(q.first);
+      dangerous = true;
+    }
+    if (pg_acting.size() < pi->min_size) {
+      report->bad_become_inactive.insert(q.first);
+      dangerous = true;
+    }
+    if (dangerous) {
+      report->not_ok.insert(q.first);
+    } else {
+      report->ok.insert(q.first);
+      if (q.second.state & PG_STATE_DEGRADED) {
+       report->ok_become_more_degraded.insert(q.first);
+      } else {
+       report->ok_become_degraded.insert(q.first);
+      }
+    }
+  }
+  dout(20) << osds << " -> " << report->ok.size() << " ok, "
+          << report->not_ok.size() << " not ok, "
+          << report->unknown.size() << " unknown"
+          << dendl;
+}
+
+void DaemonServer::_maximize_ok_to_stop_set(
+  const set<int>& orig_osds,
+  unsigned max,
+  const OSDMap& osdmap,
+  const PGMap& pgmap,
+  offline_pg_report *out_report)
+{
+  dout(20) << "orig_osds " << orig_osds << " max " << max << dendl;
+  _check_offlines_pgs(orig_osds, osdmap, pgmap, out_report);
+  if (!out_report->ok_to_stop()) {
+    return;
+  }
+  if (orig_osds.size() >= max) {
+    // already at max
+    return;
+  }
+
+  // semi-arbitrarily start with the first osd in the set
+  offline_pg_report report;
+  set<int> osds = orig_osds;
+  int parent = *osds.begin();
+  set<int> children;
+
+  while (true) {
+    // identify the next parent
+    int r = osdmap.crush->get_immediate_parent_id(parent, &parent);
+    if (r < 0) {
+      return;  // just go with what we have so far!
+    }
+
+    // get candidate additions that are beneath this point in the tree
+    children.clear();
+    r = osdmap.crush->get_all_children(parent, &children);
+    if (r < 0) {
+      return;  // just go with what we have so far!
+    }
+    dout(20) << "  parent " << parent << " children " << children << dendl;
+
+    // try adding in more osds
+    int failed = 0;  // how many children we failed to add to our set
+    for (auto o : children) {
+      if (o >= 0 && osdmap.is_up(o) && osds.count(o) == 0) {
+       osds.insert(o);
+       _check_offlines_pgs(osds, osdmap, pgmap, &report);
+       if (!report.ok_to_stop()) {
+         osds.erase(o);
+         ++failed;
+         continue;
+       }
+       *out_report = report;
+       if (osds.size() == max) {
+         dout(20) << " hit max" << dendl;
+         return;  // yay, we hit the max
+       }
+      }
+    }
+
+    if (failed) {
+      // we hit some failures; go with what we have
+      dout(20) << " hit some peer failures" << dendl;
+      return;
+    }
+  }
+}
+
 bool DaemonServer::_handle_command(
-  MCommand *m,
   std::shared_ptr<CommandContext>& cmdctx)
 {
+  MessageRef m;
+  bool admin_socket_cmd = false;
+  if (cmdctx->m_tell) {
+    m = cmdctx->m_tell;
+    // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
+    // command.
+    admin_socket_cmd = (cmdctx->m_tell->fsid != uuid_d());
+  } else {
+    m = cmdctx->m_mgr;
+  }
   auto priv = m->get_connection()->get_priv();
   auto session = static_cast<MgrSession*>(priv.get());
   if (!session) {
     return true;
   }
-  if (session->inst.name == entity_name_t())
+  if (session->inst.name == entity_name_t()) {
     session->inst.name = m->get_source();
+  }
 
-  std::string format;
-  boost::scoped_ptr<Formatter> f;
   map<string,string> param_str_map;
   std::stringstream ss;
   int r = 0;
 
-  if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
+  if (!cmdmap_from_json(cmdctx->cmd, &(cmdctx->cmdmap), ss)) {
     cmdctx->reply(-EINVAL, ss);
     return true;
   }
 
+  string prefix;
+  cmd_getval(cmdctx->cmdmap, "prefix", prefix);
+  dout(10) << "decoded-size=" << cmdctx->cmdmap.size() << " prefix=" << prefix << dendl;
+
+  boost::scoped_ptr<Formatter> f;
   {
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
+    std::string format;
+    if (boost::algorithm::ends_with(prefix, "_json")) {
+      format = "json";
+    } else {
+      cmd_getval(cmdctx->cmdmap, "format", format, string("plain"));
+    }
     f.reset(Formatter::create(format));
   }
 
-  string prefix;
-  cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
-
-  dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
-  dout(4) << "prefix=" << prefix << dendl;
-
-  if (prefix == "get_command_descriptions") {
+  // this is just for mgr commands - admin socket commands will fall
+  // through and use the admin socket version of
+  // get_command_descriptions
+  if (prefix == "get_command_descriptions" && !admin_socket_cmd) {
     dout(10) << "reading commands from python modules" << dendl;
     const auto py_commands = py_modules.get_commands();
 
@@ -886,7 +1091,10 @@ bool DaemonServer::_handle_command(
 
   bool is_allowed = false;
   ModuleCommand py_command;
-  if (!mgr_cmd) {
+  if (admin_socket_cmd) {
+    // admin socket commands require all capabilities
+    is_allowed = session->caps.is_allow_all();
+  } else if (!mgr_cmd) {
     // Resolve the command to the name of the module that will
     // handle it (if the command exists)
     auto py_commands = py_modules.get_py_commands();
@@ -917,7 +1125,12 @@ bool DaemonServer::_handle_command(
   audit_clog->debug()
     << "from='" << session->inst << "' "
     << "entity='" << session->entity_name << "' "
-    << "cmd=" << m->cmd << ": dispatch";
+    << "cmd=" << cmdctx->cmd << ": dispatch";
+
+  if (admin_socket_cmd) {
+    cct->get_admin_socket()->queue_tell_command(cmdctx->m_tell);
+    return true;
+  }
 
   // ----------------
   // service map commands
@@ -936,11 +1149,15 @@ bool DaemonServer::_handle_command(
       f.reset(Formatter::create("json-pretty"));
     // only include state from services that are in the persisted service map
     f->open_object_section("service_status");
-    for (auto& p : pending_service_map.services) {
-      f->open_object_section(p.first.c_str());
-      for (auto& q : p.second.daemons) {
+    for (auto& [type, service] : pending_service_map.services) {
+      if (ServiceMap::is_normal_ceph_entity(type)) {
+        continue;
+      }
+
+      f->open_object_section(type.c_str());
+      for (auto& q : service.daemons) {
        f->open_object_section(q.first.c_str());
-       DaemonKey key(p.first, q.first);
+       DaemonKey key{type, q.first};
        ceph_assert(daemon_state.exists(key));
        auto daemon = daemon_state.get(key);
        std::lock_guard l(daemon->lock);
@@ -964,8 +1181,8 @@ bool DaemonServer::_handle_command(
   if (prefix == "config set") {
     std::string key;
     std::string val;
-    cmd_getval(cct, cmdctx->cmdmap, "key", key);
-    cmd_getval(cct, cmdctx->cmdmap, "value", val);
+    cmd_getval(cmdctx->cmdmap, "key", key);
+    cmd_getval(cmdctx->cmdmap, "value", val);
     r = cct->_conf.set_val(key, val, &ss);
     if (r == 0) {
       cct->_conf.apply_changes(nullptr);
@@ -984,7 +1201,7 @@ bool DaemonServer::_handle_command(
     pg_t pgid;
     spg_t spgid;
     string pgidstr;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
+    cmd_getval(cmdctx->cmdmap, "pgid", pgidstr);
     if (!pgid.parse(pgidstr.c_str())) {
       ss << "invalid pgid '" << pgidstr << "'";
       cmdctx->reply(-EINVAL, ss);
@@ -1041,7 +1258,7 @@ bool DaemonServer::_handle_command(
              prefix == "osd deep-scrub" ||
              prefix == "osd repair") {
     string whostr;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
+    cmd_getval(cmdctx->cmdmap, "who", whostr);
     vector<string> pvec;
     get_str_vec(prefix, pvec);
 
@@ -1126,7 +1343,7 @@ bool DaemonServer::_handle_command(
              prefix == "osd pool deep-scrub" ||
              prefix == "osd pool repair") {
     vector<string> pool_names;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", pool_names);
+    cmd_getval(cmdctx->cmdmap, "who", pool_names);
     if (pool_names.empty()) {
       ss << "must specify one or more pool names";
       cmdctx->reply(-EINVAL, ss);
@@ -1199,10 +1416,10 @@ bool DaemonServer::_handle_command(
       prefix == "osd test-reweight-by-pg" ||
       prefix == "osd test-reweight-by-utilization";
     int64_t oload;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
+    cmd_getval(cmdctx->cmdmap, "oload", oload, int64_t(120));
     set<int64_t> pools;
     vector<string> poolnames;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
+    cmd_getval(cmdctx->cmdmap, "pools", poolnames);
     cluster_state.with_osdmap([&](const OSDMap& osdmap) {
        for (const auto& poolname : poolnames) {
          int64_t pool = osdmap.lookup_pg_pool_name(poolname);
@@ -1219,21 +1436,21 @@ bool DaemonServer::_handle_command(
     }
     
     double max_change = g_conf().get_val<double>("mon_reweight_max_change");
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
+    cmd_getval(cmdctx->cmdmap, "max_change", max_change);
     if (max_change <= 0.0) {
       ss << "max_change " << max_change << " must be positive";
       cmdctx->reply(-EINVAL, ss);
       return true;
     }
     int64_t max_osds = g_conf().get_val<int64_t>("mon_reweight_max_osds");
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
+    cmd_getval(cmdctx->cmdmap, "max_osds", max_osds);
     if (max_osds <= 0) {
       ss << "max_osds " << max_osds << " must be positive";
       cmdctx->reply(-EINVAL, ss);
       return true;
     }
     bool no_increasing = false;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
+    cmd_getval(cmdctx->cmdmap, "no_increasing", no_increasing);
     string out_str;
     mempool::osdmap::map<int32_t, uint32_t> new_weights;
     r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap &osdmap, const PGMap& pgmap) {
@@ -1283,39 +1500,21 @@ bool DaemonServer::_handle_command(
       return true;
     }
   } else if (prefix == "osd df") {
-    string method;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
-    string filter_by;
-    string filter;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "filter_by", filter_by);
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "filter", filter);
-    if (filter_by.empty() != filter.empty()) {
-      cmdctx->reply(-EINVAL, "you must specify both 'filter_by' and 'filter'");
-      return true;
-    }
+    string method, filter;
+    cmd_getval(cmdctx->cmdmap, "output_method", method);
+    cmd_getval(cmdctx->cmdmap, "filter", filter);
     stringstream rs;
     r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
-        string class_name;
-        string item_name;
         // sanity check filter(s)
-        if (filter_by == "class") {
-          if (!osdmap.crush->class_exists(filter)) {
-            rs << "specified class '" << filter << "' does not exist";
-            return -EINVAL;
-          }
-          class_name = filter;
-        }
-        if (filter_by == "name") {
-          if (!osdmap.crush->name_exists(filter)) {
-            rs << "specified name '" << filter << "' does not exist";
-            return -EINVAL;
-          }
-          item_name = filter;
+        if (!filter.empty() &&
+             osdmap.lookup_pg_pool_name(filter) < 0 &&
+            !osdmap.crush->class_exists(filter) &&
+            !osdmap.crush->name_exists(filter)) {
+          rs << "'" << filter << "' not a pool, crush node or device class name";
+          return -EINVAL;
         }
        print_osd_utilization(osdmap, pgmap, ss,
-                              f.get(), method == "tree",
-                              class_name, item_name);
-       
+                              f.get(), method == "tree", filter);
        cmdctx->odata.append(ss);
        return 0;
       });
@@ -1323,7 +1522,7 @@ bool DaemonServer::_handle_command(
     return true;
   } else if (prefix == "osd pool stats") {
     string pool_name;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "pool_name", pool_name);
+    cmd_getval(cmdctx->cmdmap, "pool_name", pool_name);
     int64_t poolid = -ENOENT;
     bool one_pool = false;
     r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
@@ -1374,7 +1573,7 @@ bool DaemonServer::_handle_command(
     int r = 0;
     if (prefix == "osd safe-to-destroy") {
       vector<string> ids;
-      cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
+      cmd_getval(cmdctx->cmdmap, "ids", ids);
       cluster_state.with_osdmap([&](const OSDMap& osdmap) {
                                  r = osdmap.parse_osd_id_list(ids, &osds, &ss);
                                });
@@ -1384,7 +1583,7 @@ bool DaemonServer::_handle_command(
       }
     } else {
       int64_t id;
-      if (!cmd_getval(g_ceph_context, cmdctx->cmdmap, "id", id)) {
+      if (!cmd_getval(cmdctx->cmdmap, "id", id)) {
        r = -EINVAL;
        ss << "must specify OSD id";
       } else {
@@ -1503,10 +1702,10 @@ bool DaemonServer::_handle_command(
 
     if (r) {
       bool force = false;
-      cmd_getval(cct, cmdctx->cmdmap, "force", force);
+      cmd_getval(cmdctx->cmdmap, "force", force);
       if (!force) {
         // Backward compat
-        cmd_getval(cct, cmdctx->cmdmap, "yes_i_really_mean_it", force);
+        cmd_getval(cmdctx->cmdmap, "yes_i_really_mean_it", force);
       }
       if (!force) {
         ss << "\nYou can proceed by passing --force, but be warned that"
@@ -1530,8 +1729,10 @@ bool DaemonServer::_handle_command(
     return true;
   } else if (prefix == "osd ok-to-stop") {
     vector<string> ids;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
+    cmd_getval(cmdctx->cmdmap, "ids", ids);
     set<int> osds;
+    int64_t max = 1;
+    cmd_getval(cmdctx->cmdmap, "max", max);
     int r;
     cluster_state.with_osdmap([&](const OSDMap& osdmap) {
        r = osdmap.parse_osd_id_list(ids, &osds, &ss);
@@ -1540,78 +1741,36 @@ bool DaemonServer::_handle_command(
       ss << "must specify one or more OSDs";
       r = -EINVAL;
     }
+    if (max < (int)osds.size()) {
+      max = osds.size();
+    }
     if (r < 0) {
       cmdctx->reply(r, ss);
       return true;
     }
-    int touched_pgs = 0;
-    int dangerous_pgs = 0;
+    offline_pg_report out_report;
     cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
-       if (pg_map.num_pg_unknown > 0) {
-         ss << pg_map.num_pg_unknown << " pgs have unknown state; "
-            << "cannot draw any conclusions";
-         r = -EAGAIN;
-         return;
-       }
-       for (const auto& q : pg_map.pg_stat) {
-          set<int32_t> pg_acting;  // net acting sets (with no missing if degraded)
-         bool found = false;
-         if (q.second.state & PG_STATE_DEGRADED) {
-           for (auto& anm : q.second.avail_no_missing) {
-             if (osds.count(anm.osd)) {
-               found = true;
-               continue;
-             }
-             if (anm.osd != CRUSH_ITEM_NONE) {
-               pg_acting.insert(anm.osd);
-             }
-           }
-         } else {
-           for (auto& a : q.second.acting) {
-             if (osds.count(a)) {
-               found = true;
-               continue;
-             }
-             if (a != CRUSH_ITEM_NONE) {
-               pg_acting.insert(a);
-             }
-           }
-         }
-         if (!found) {
-           continue;
-         }
-         touched_pgs++;
-         if (!(q.second.state & PG_STATE_ACTIVE) ||
-             (q.second.state & PG_STATE_DEGRADED)) {
-           ++dangerous_pgs;
-           continue;
-         }
-         const pg_pool_t *pi = osdmap.get_pg_pool(q.first.pool());
-         if (!pi) {
-           ++dangerous_pgs; // pool is creating or deleting
-         } else {
-           if (pg_acting.size() < pi->min_size) {
-             ++dangerous_pgs;
-           }
-         }
-       }
+       _maximize_ok_to_stop_set(
+         osds, max, osdmap, pg_map,
+         &out_report);
       });
-    if (r) {
-      cmdctx->reply(r, ss);
-      return true;
+    if (!f) {
+      f.reset(Formatter::create("json"));
+    }
+    f->dump_object("ok_to_stop", out_report);
+    f->flush(cmdctx->odata);
+    cmdctx->odata.append("\n");
+    if (!out_report.unknown.empty()) {
+      ss << out_report.unknown.size() << " pgs have unknown state; "
+        << "cannot draw any conclusions";
+      cmdctx->reply(-EAGAIN, ss);
     }
-    if (dangerous_pgs) {
-      ss << dangerous_pgs << " PGs are already too degraded, would become"
-        << " too degraded or might become unavailable";
+    if (!out_report.ok_to_stop()) {
+      ss << "unsafe to stop osd(s) at this time (" << out_report.not_ok.size() << " PGs are or would become offline)";
       cmdctx->reply(-EBUSY, ss);
-      return true;
+    } else {
+      cmdctx->reply(0, ss);
     }
-    ss << "OSD(s) " << osds << " are ok to stop without reducing"
-       << " availability or risking data, provided there are no other concurrent failures"
-       << " or interventions." << std::endl;
-    ss << touched_pgs << " PGs are likely to be"
-       << " degraded (but remain available) as a result.";
-    cmdctx->reply(0, ss);
     return true;
   } else if (prefix == "pg force-recovery" ||
             prefix == "pg force-backfill" ||
@@ -1643,7 +1802,7 @@ bool DaemonServer::_handle_command(
     if (granularity == "pg") {
       // covnert pg names to pgs, discard any invalid ones while at it
       vector<string> pgids;
-      cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgids);
+      cmd_getval(cmdctx->cmdmap, "pgid", pgids);
       for (auto& i : pgids) {
         pg_t pgid;
         if (!pgid.parse(i.c_str())) {
@@ -1656,7 +1815,7 @@ bool DaemonServer::_handle_command(
     } else {
       // per pool
       vector<string> pool_names;
-      cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", pool_names);
+      cmd_getval(cmdctx->cmdmap, "who", pool_names);
       if (pool_names.empty()) {
         ss << "must specify one or more pool names";
         cmdctx->reply(-EINVAL, ss);
@@ -1781,14 +1940,14 @@ bool DaemonServer::_handle_command(
   } else if (prefix == "config show" ||
             prefix == "config show-with-defaults") {
     string who;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", who);
-    int r = 0;
-    auto dot = who.find('.');
-    DaemonKey key;
-    key.first = who.substr(0, dot);
-    key.second = who.substr(dot + 1);
+    cmd_getval(cmdctx->cmdmap, "who", who);
+    auto [key, valid] = DaemonKey::parse(who);
+    if (!valid) {
+      ss << "invalid daemon name: use <type>.<id>";
+      cmdctx->reply(-EINVAL, ss);
+      return true;
+    }
     DaemonStatePtr daemon = daemon_state.get(key);
-    string name;
     if (!daemon) {
       ss << "no config state for daemon " << who;
       cmdctx->reply(-ENOENT, ss);
@@ -1797,7 +1956,15 @@ bool DaemonServer::_handle_command(
 
     std::lock_guard l(daemon->lock);
 
-    if (cmd_getval(g_ceph_context, cmdctx->cmdmap, "key", name)) {
+    int r = 0;
+    string name;
+    if (cmd_getval(cmdctx->cmdmap, "key", name)) {
+      // handle special options
+      if (name == "fsid") {
+       cmdctx->odata.append(stringify(monc->get_fsid()) + "\n");
+       cmdctx->reply(r, ss);
+       return true;
+      }
       auto p = daemon->config.find(name);
       if (p != daemon->config.end() &&
          !p->second.empty()) {
@@ -1970,15 +2137,16 @@ bool DaemonServer::_handle_command(
       tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
       tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
       tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
+      tbl.define_column("WEAR", TextTable::RIGHT, TextTable::RIGHT);
       tbl.define_column("LIFE EXPECTANCY", TextTable::LEFT, TextTable::LEFT);
       auto now = ceph_clock_now();
       daemon_state.with_devices([&tbl, now](const DeviceState& dev) {
          string h;
-         for (auto& i : dev.devnames) {
+         for (auto& i : dev.attachments) {
            if (h.size()) {
              h += " ";
            }
-           h += i.first + ":" + i.second;
+           h += std::get<0>(i) + ":" + std::get<1>(i);
          }
          string d;
          for (auto& i : dev.daemons) {
@@ -1987,9 +2155,15 @@ bool DaemonServer::_handle_command(
            }
            d += to_string(i);
          }
+         char wear_level_str[16] = {0};
+         if (dev.wear_level >= 0) {
+           snprintf(wear_level_str, sizeof(wear_level_str)-1, "%d%%",
+                    (int)(100.1 * dev.wear_level));
+         }
          tbl << dev.devid
              << h
              << d
+             << wear_level_str
              << dev.get_life_expectancy_str(now)
              << TextTable::endrow;
        });
@@ -1999,9 +2173,8 @@ bool DaemonServer::_handle_command(
     return true;
   } else if (prefix == "device ls-by-daemon") {
     string who;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", who);
-    DaemonKey k;
-    if (!key_from_string(who, &k)) {
+    cmd_getval(cmdctx->cmdmap, "who", who);
+    if (auto [k, valid] = DaemonKey::parse(who); !valid) {
       ss << who << " is not a valid daemon name";
       r = -EINVAL;
     } else {
@@ -2027,11 +2200,11 @@ bool DaemonServer::_handle_command(
            daemon_state.with_device(
              i.first, [&tbl, now] (const DeviceState& dev) {
                string h;
-               for (auto& i : dev.devnames) {
+               for (auto& i : dev.attachments) {
                  if (h.size()) {
                    h += " ";
                  }
-                 h += i.first + ":" + i.second;
+                 h += std::get<0>(i) + ":" + std::get<1>(i);
                }
                tbl << dev.devid
                    << h
@@ -2049,7 +2222,7 @@ bool DaemonServer::_handle_command(
     }
   } else if (prefix == "device ls-by-host") {
     string host;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "host", host);
+    cmd_getval(cmdctx->cmdmap, "host", host);
     set<string> devids;
     daemon_state.list_devids_by_server(host, &devids);
     if (f) {
@@ -2073,12 +2246,12 @@ bool DaemonServer::_handle_command(
        daemon_state.with_device(
          devid, [&tbl, &host, now] (const DeviceState& dev) {
            string n;
-           for (auto& j : dev.devnames) {
-             if (j.first == host) {
+           for (auto& j : dev.attachments) {
+             if (std::get<0>(j) == host) {
                if (n.size()) {
                  n += " ";
                }
-               n += j.second;
+               n += std::get<1>(j);
              }
            }
            string d;
@@ -2101,7 +2274,7 @@ bool DaemonServer::_handle_command(
     return true;
   } else if (prefix == "device info") {
     string devid;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
+    cmd_getval(cmdctx->cmdmap, "devid", devid);
     int r = 0;
     ostringstream rs;
     if (!daemon_state.with_device(devid,
@@ -2125,10 +2298,10 @@ bool DaemonServer::_handle_command(
     return true;
   } else if (prefix == "device set-life-expectancy") {
     string devid;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
+    cmd_getval(cmdctx->cmdmap, "devid", devid);
     string from_str, to_str;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "from", from_str);
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "to", to_str);
+    cmd_getval(cmdctx->cmdmap, "from", from_str);
+    cmd_getval(cmdctx->cmdmap, "to", to_str);
     utime_t from, to;
     if (!from.parse(from_str)) {
       ss << "unable to parse datetime '" << from_str << "'";
@@ -2163,7 +2336,7 @@ bool DaemonServer::_handle_command(
     return true;
   } else if (prefix == "device rm-life-expectancy") {
     string devid;
-    cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
+    cmd_getval(cmdctx->cmdmap, "devid", devid);
     map<string,string> meta;
     if (daemon_state.with_device_write(devid, [&meta] (DeviceState& dev) {
          dev.rm_life_expectancy();
@@ -2230,11 +2403,13 @@ bool DaemonServer::_handle_command(
     return true;
   }
 
-  dout(10) << "passing through " << cmdctx->cmdmap.size() << dendl;
-  finisher.queue(new FunctionContext([this, cmdctx, session, py_command, prefix]
-                                     (int r_) mutable {
+  dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
+  finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]
+                                   (int r_) mutable {
     std::stringstream ss;
 
+    dout(10) << "dispatching command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
+
     // Validate that the module is enabled
     auto& py_handler_name = py_command.module_name;
     PyModuleRef module = py_modules.get_module(py_handler_name);
@@ -2281,7 +2456,7 @@ bool DaemonServer::_handle_command(
     }
 
     std::stringstream ds;
-    bufferlist inbl = cmdctx->m->get_data();
+    bufferlist inbl = cmdctx->data;
     int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
                                       inbl, &ds, &ss);
     if (r == -EACCES) {
@@ -2290,6 +2465,7 @@ bool DaemonServer::_handle_command(
 
     cmdctx->odata.append(ds);
     cmdctx->reply(r, ss);
+    dout(10) << " command returned " << r << dendl;
   }));
   return true;
 }
@@ -2302,12 +2478,21 @@ void DaemonServer::_prune_pending_service_map()
   while (p != pending_service_map.services.end()) {
     auto q = p->second.daemons.begin();
     while (q != p->second.daemons.end()) {
-      DaemonKey key(p->first, q->first);
+      DaemonKey key{p->first, q->first};
       if (!daemon_state.exists(key)) {
-       derr << "missing key " << key << dendl;
-       ++q;
-       continue;
+        if (ServiceMap::is_normal_ceph_entity(p->first)) {
+          dout(10) << "daemon " << key << " in service map but not in daemon state "
+                   << "index -- force pruning" << dendl;
+          q = p->second.daemons.erase(q);
+          pending_service_map_dirty = pending_service_map.epoch;
+        } else {
+          derr << "missing key " << key << dendl;
+          ++q;
+        }
+
+        continue;
       }
+
       auto daemon = daemon_state.get(key);
       std::lock_guard l(daemon->lock);
       if (daemon->last_service_beacon == utime_t()) {
@@ -2349,7 +2534,8 @@ void DaemonServer::send_report()
     }
   }
 
-  auto m = new MMonMgrReport();
+  auto m = ceph::make_message<MMonMgrReport>();
+  m->gid = monc->get_global_id();
   py_modules.get_health_checks(&m->health_checks);
   py_modules.get_progress_events(&m->progress_events);
 
@@ -2383,7 +2569,7 @@ void DaemonServer::send_report()
          jf.dump_object("health_checks", m->health_checks);
          jf.flush(*_dout);
          *_dout << dendl;
-          if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+          if (osdmap.require_osd_release >= ceph_release_t::luminous) {
               clog->debug() << "pgmap v" << pg_map.version << ": " << pg_map;
           }
        });
@@ -2399,7 +2585,7 @@ void DaemonServer::send_report()
         if (acc == accumulated.end()) {
           auto collector = DaemonHealthMetricCollector::create(metric.get_type());
           if (!collector) {
-            derr << __func__ << " " << key.first << "." << key.second
+            derr << __func__ << " " << key
                 << " sent me an unknown health metric: "
                 << std::hex << static_cast<uint8_t>(metric.get_type())
                 << std::dec << dendl;
@@ -2420,7 +2606,7 @@ void DaemonServer::send_report()
   // TODO? We currently do not notify the PyModules
   // TODO: respect needs_send, so we send the report only if we are asked to do
   //       so, or the state is updated.
-  monc->send_mon_message(m);
+  monc->send_mon_message(std::move(m));
 }
 
 void DaemonServer::adjust_pgs()
@@ -2502,6 +2688,7 @@ void DaemonServer::adjust_pgs()
                       << dendl;
              ok = false;
            }
+           vector<int32_t> source_acting;
             for (auto &merge_participant : {merge_source, merge_target}) {
               bool is_merge_source = merge_participant == merge_source;
               if (osdmap.have_pg_upmaps(merge_participant)) {
@@ -2534,6 +2721,19 @@ void DaemonServer::adjust_pgs()
                         << ")" << dendl;
                ok = false;
              }
+             if (is_merge_source) {
+               source_acting = q->second.acting;
+             } else if (ok && q->second.acting != source_acting) {
+               dout(10) << "pool " << i.first
+                        << " pg_num_target " << p.get_pg_num_target()
+                        << " pg_num " << p.get_pg_num()
+                        << (is_merge_source ? " - merge source " : " - merge target ")
+                         << merge_participant
+                        << " acting does not match (source " << source_acting
+                        << " != target " << q->second.acting
+                        << ")" << dendl;
+               ok = false;
+             }
             }
 
            if (ok) {
@@ -2546,6 +2746,7 @@ void DaemonServer::adjust_pgs()
                       << " and " << merge_target
                       << ")" << dendl;
              pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
+              continue;
            }
          } else if (p.get_pg_num_target() > p.get_pg_num()) {
            // pg_num increase (split)
@@ -2624,12 +2825,13 @@ void DaemonServer::adjust_pgs()
            // max_misplaced, to somewhat limit the magnitude of
            // our potential error here.
            int next;
-
+           static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP = 1;
            pool_stat_t s = pg_map.get_pg_pool_sum_stat(i.first);
            if (aggro ||
                // pool is (virtually) empty; just jump to final pgp_num?
                (p.get_pgp_num_target() > p.get_pgp_num() &&
-                s.stats.sum.num_objects <= p.get_pgp_num_target())) {
+                s.stats.sum.num_objects <= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP *
+                                            p.get_pgp_num_target()))) {
              next = target;
            } else {
              double room =
@@ -2637,15 +2839,16 @@ void DaemonServer::adjust_pgs()
                                 max_misplaced / 2.0);
              unsigned estmax = std::max<unsigned>(
                (double)p.get_pg_num() * room, 1u);
-             int delta = target - p.get_pgp_num();
-             next = p.get_pgp_num();
-             if (delta < 0) {
-               next += std::max<int>(-estmax, delta);
-             } else {
-               next += std::min<int>(estmax, delta);
+             unsigned next_min = 0;
+             if (p.get_pgp_num() > estmax) {
+               next_min = p.get_pgp_num() - estmax;
              }
+             next = std::clamp(target,
+                               next_min,
+                               p.get_pgp_num() + estmax);
              dout(20) << " room " << room << " estmax " << estmax
-                      << " delta " << delta << " next " << next << dendl;
+                      << " delta " << (target-p.get_pgp_num())
+                      << " next " << next << dendl;
              if (p.get_pgp_num_target() == p.get_pg_num_target() &&
                  p.get_pgp_num_target() < p.get_pg_num()) {
                // since pgp_num is tracking pg_num, ceph is handling
@@ -2721,23 +2924,28 @@ void DaemonServer::got_service_map()
        // we just started up
        dout(10) << "got initial map e" << service_map.epoch << dendl;
        pending_service_map = service_map;
+       pending_service_map.epoch = service_map.epoch + 1;
       } else {
        // we we already active and therefore must have persisted it,
        // which means ours is the same or newer.
        dout(10) << "got updated map e" << service_map.epoch << dendl;
+       ceph_assert(pending_service_map.epoch > service_map.epoch);
       }
-      pending_service_map.epoch = service_map.epoch + 1;
     });
 
   // cull missing daemons, populate new ones
   std::set<std::string> types;
-  for (auto& p : pending_service_map.services) {
-    types.insert(p.first);
+  for (auto& [type, service] : pending_service_map.services) {
+    if (ServiceMap::is_normal_ceph_entity(type)) {
+      continue;
+    }
+
+    types.insert(type);
 
     std::set<std::string> names;
-    for (auto& q : p.second.daemons) {
+    for (auto& q : service.daemons) {
       names.insert(q.first);
-      DaemonKey key(p.first, q.first);
+      DaemonKey key{type, q.first};
       if (!daemon_state.exists(key)) {
        auto daemon = std::make_shared<DaemonState>(daemon_state.types);
        daemon->key = key;
@@ -2747,7 +2955,7 @@ void DaemonServer::got_service_map()
        dout(10) << "added missing " << key << dendl;
       }
     }
-    daemon_state.cull(p.first, names);
+    daemon_state.cull(type, names);
   }
   daemon_state.cull_services(types);
 }
@@ -2762,11 +2970,11 @@ void DaemonServer::got_mgr_map()
         auto c = new MetadataUpdate(daemon_state, key);
        // FIXME remove post-nautilus: include 'id' for luminous mons
         oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
-           << key.second << "\", \"id\": \"" << key.second << "\"}";
+           << key.name << "\", \"id\": \"" << key.name << "\"}";
         monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
       };
       if (mgrmap.active_name.size()) {
-       DaemonKey key("mgr", mgrmap.active_name);
+       DaemonKey key{"mgr", mgrmap.active_name};
        have.insert(mgrmap.active_name);
        if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
          md_update(key);
@@ -2774,7 +2982,7 @@ void DaemonServer::got_mgr_map()
        }
       }
       for (auto& i : mgrmap.standbys) {
-        DaemonKey key("mgr", i.second.name);
+        DaemonKey key{"mgr", i.second.name};
        have.insert(i.second.name);
        if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
          md_update(key);
@@ -2805,7 +3013,7 @@ void DaemonServer::handle_conf_change(const ConfigProxy& conf,
             << daemon_connections.size() << " clients" << dendl;
     // Send a fresh MMgrConfigure to all clients, so that they can follow
     // the new policy for transmitting stats
-    finisher.queue(new FunctionContext([this](int r) {
+    finisher.queue(new LambdaContext([this](int r) {
       std::lock_guard l(lock);
       for (auto &c : daemon_connections) {
         _send_configure(c);
@@ -2816,35 +3024,53 @@ void DaemonServer::handle_conf_change(const ConfigProxy& conf,
 
 void DaemonServer::_send_configure(ConnectionRef c)
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
-  auto configure = new MMgrConfigure();
+  auto configure = make_message<MMgrConfigure>();
   configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
   configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");
 
   if (c->peer_is_osd()) {
     configure->osd_perf_metric_queries =
         osd_perf_metric_collector.get_queries();
+  } else if (c->peer_is_mds()) {
+    configure->metric_config_message =
+      MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector.get_queries()));
   }
 
-  c->send_message(configure);
+  c->send_message2(configure);
 }
 
-OSDPerfMetricQueryID DaemonServer::add_osd_perf_query(
+MetricQueryID DaemonServer::add_osd_perf_query(
     const OSDPerfMetricQuery &query,
     const std::optional<OSDPerfMetricLimit> &limit)
 {
   return osd_perf_metric_collector.add_query(query, limit);
 }
 
-int DaemonServer::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
+int DaemonServer::remove_osd_perf_query(MetricQueryID query_id)
 {
   return osd_perf_metric_collector.remove_query(query_id);
 }
 
-int DaemonServer::get_osd_perf_counters(
-    OSDPerfMetricQueryID query_id,
-    std::map<OSDPerfMetricKey, PerformanceCounters> *counters)
+int DaemonServer::get_osd_perf_counters(OSDPerfCollector *collector)
+{
+  return osd_perf_metric_collector.get_counters(collector);
+}
+
+MetricQueryID DaemonServer::add_mds_perf_query(
+    const MDSPerfMetricQuery &query,
+    const std::optional<MDSPerfMetricLimit> &limit)
+{
+  return mds_perf_metric_collector.add_query(query, limit);
+}
+
+int DaemonServer::remove_mds_perf_query(MetricQueryID query_id)
+{
+  return mds_perf_metric_collector.remove_query(query_id);
+}
+
+int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
 {
-  return osd_perf_metric_collector.get_counters(query_id, counters);
+  return mds_perf_metric_collector.get_counters(collector);
 }