]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/MDSDaemon.cc
import ceph 15.2.14
[ceph.git] / ceph / src / mds / MDSDaemon.cc
index 452d5bbf6602be0125b1b629a084790380251b82..f875715009170c37b793e3355d0b6bd6b33e3f34 100644 (file)
@@ -21,7 +21,6 @@
 #include "common/Clock.h"
 #include "common/HeartbeatMap.h"
 #include "common/Timer.h"
-#include "common/backport14.h"
 #include "common/ceph_argparse.h"
 #include "common/config.h"
 #include "common/entity_name.h"
 #include "events/ESession.h"
 #include "events/ESubtreeMap.h"
 
-#include "messages/MMDSMap.h"
-
-#include "messages/MGenericMessage.h"
-
-#include "messages/MMonCommand.h"
-#include "messages/MCommand.h"
-#include "messages/MCommandReply.h"
-
 #include "auth/AuthAuthorizeHandler.h"
 #include "auth/RotatingKeyRing.h"
 #include "auth/KeyRing.h"
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
 #define dout_prefix *_dout << "mds." << name << ' '
-
+using TOPNSPC::common::cmd_getval;
 // cons/des
-MDSDaemon::MDSDaemon(boost::string_view n, Messenger *m, MonClient *mc) :
+MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc) :
   Dispatcher(m->cct),
-  mds_lock("MDSDaemon::mds_lock"),
-  stopping(false),
   timer(m->cct, mds_lock),
+  gss_ktfile_client(m->cct->_conf.get_val<std::string>("gss_ktab_client_file")),
   beacon(m->cct, mc, n),
-  authorize_handler_cluster_registry(new AuthAuthorizeHandlerRegistry(m->cct,
-                                                                     m->cct->_conf->auth_supported.empty() ?
-                                                                     m->cct->_conf->auth_cluster_required :
-                                                                     m->cct->_conf->auth_supported)),
-  authorize_handler_service_registry(new AuthAuthorizeHandlerRegistry(m->cct,
-                                                                     m->cct->_conf->auth_supported.empty() ?
-                                                                     m->cct->_conf->auth_service_required :
-                                                                     m->cct->_conf->auth_supported)),
   name(n),
   messenger(m),
   monc(mc),
-  mgrc(m->cct, m),
+  mgrc(m->cct, m, &mc->monmap),
   log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
-  mds_rank(NULL),
-  asok_hook(NULL),
   starttime(mono_clock::now())
 {
   orig_argc = 0;
   orig_argv = NULL;
 
   clog = log_client.create_channel();
+  if (!gss_ktfile_client.empty()) {
+    // Assert we can export environment variable 
+    /* 
+        The default client keytab is used, if it is present and readable,
+        to automatically obtain initial credentials for GSSAPI client
+        applications. The principal name of the first entry in the client
+        keytab is used by default when obtaining initial credentials.
+        1. The KRB5_CLIENT_KTNAME environment variable.
+        2. The default_client_keytab_name profile variable in [libdefaults].
+        3. The hardcoded default, DEFCKTNAME.
+    */
+    const int32_t set_result(setenv("KRB5_CLIENT_KTNAME", 
+                                    gss_ktfile_client.c_str(), 1));
+    ceph_assert(set_result == 0);
+  }
 
-  monc->set_messenger(messenger);
-
-  mdsmap = new MDSMap;
+  mdsmap.reset(new MDSMap);
 }
 
 MDSDaemon::~MDSDaemon() {
-  Mutex::Locker lock(mds_lock);
+  std::lock_guard lock(mds_lock);
 
   delete mds_rank;
   mds_rank = NULL;
-  delete mdsmap;
-  mdsmap = NULL;
-
-  delete authorize_handler_service_registry;
-  delete authorize_handler_cluster_registry;
 }
 
+// Admin-socket hook that forwards every command to MDSDaemon::asok_command().
 class MDSSocketHook : public AdminSocketHook {
   MDSDaemon *mds;
 public:
   explicit MDSSocketHook(MDSDaemon *m) : mds(m) {}
-  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
-           bufferlist& out) override {
-    stringstream ss;
-    bool r = mds->asok_command(command, cmdmap, format, ss);
-    out.append(ss);
-    return r;
+  // Synchronous entry point: not supported by this hook, aborts if reached.
+  int call(
+    std::string_view command,
+    const cmdmap_t& cmdmap,
+    Formatter *f,
+    std::ostream& errss,
+    ceph::buffer::list& out) override {
+    ceph_abort("should go to call_async");
+  }
+  // Asynchronous entry point: hands the command and completion callback
+  // straight to the daemon.
+  void call_async(
+    std::string_view command,
+    const cmdmap_t& cmdmap,
+    Formatter *f,
+    const bufferlist& inbl,
+    std::function<void(int,const std::string&,bufferlist&)> on_finish) override {
+    mds->asok_command(command, cmdmap, f, inbl, on_finish);
   }
 };
 
-bool MDSDaemon::asok_command(string command, cmdmap_t& cmdmap, string format,
-                   ostream& ss)
+// Dispatch one admin-socket command.
+//
+// "status", "exit", "respawn", "heap" and "cpu_profiler" are handled here;
+// anything else is forwarded to mds_rank->handle_asok_command() together
+// with on_finish, in which case this function returns without invoking
+// on_finish itself. In every other path on_finish(r, ss.str(), outbl) is
+// called exactly once at the end; r starts as -ENOSYS and is set to 0 by
+// each branch that handled the command successfully.
+void MDSDaemon::asok_command(
+  std::string_view command,
+  const cmdmap_t& cmdmap,
+  Formatter *f,
+  const bufferlist& inbl,
+  std::function<void(int,const std::string&,bufferlist&)> on_finish)
 {
-  dout(1) << "asok_command: " << command << " (starting...)" << dendl;
+  dout(1) << "asok_command: " << command << " " << cmdmap
+         << " (starting...)" << dendl;
 
-  Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
-  bool handled = false;
+  int r = -ENOSYS;
+  bufferlist outbl;
+  stringstream ss;
   if (command == "status") {
     dump_status(f);
-    handled = true;
+    r = 0;
+  } else if (command == "exit") {
+    outbl.append("Exiting...\n");
+    r = 0;
+    std::thread t([this](){
+                   // Wait a little to improve chances of caller getting
+                   // our response before seeing us disappear from mdsmap
+                   sleep(1);
+                   std::lock_guard l(mds_lock);
+                   suicide();
+                 });
+    t.detach();
+  } else if (command == "respawn") {
+    outbl.append("Respawning...\n");
+    r = 0;
+    std::thread t([this](){
+                   // Wait a little to improve chances of caller getting
+                   // our response before seeing us disappear from mdsmap
+                   sleep(1);
+                   std::lock_guard l(mds_lock);
+                   respawn();
+                 });
+    t.detach();
+  } else if (command == "heap") {
+    if (!ceph_using_tcmalloc()) {
+      ss << "not using tcmalloc";
+      r = -EOPNOTSUPP;
+    } else {
+      string heapcmd;
+      cmd_getval(cmdmap, "heapcmd", heapcmd);
+      vector<string> heapcmd_vec;
+      get_str_vec(heapcmd, heapcmd_vec);
+      string value;
+      if (cmd_getval(cmdmap, "value", value)) {
+       heapcmd_vec.push_back(value);
+      }
+      ceph_heap_profiler_handle_command(heapcmd_vec, ss);
+      // Report success; without this the caller would see -ENOSYS even
+      // though the heap command was handled (cpu_profiler below does the same).
+      r = 0;
+    }
+  } else if (command == "cpu_profiler") {
+    string arg;
+    cmd_getval(cmdmap, "arg", arg);
+    vector<string> argvec;
+    get_str_vec(arg, argvec);
+    cpu_profiler_handle_command(argvec, ss);
+    r = 0;
   } else {
     if (mds_rank == NULL) {
       dout(1) << "Can't run that command on an inactive MDS!" << dendl;
       f->dump_string("error", "mds_not_active");
     } else {
-      handled = mds_rank->handle_asok_command(command, cmdmap, f, ss);
+      try {
+       mds_rank->handle_asok_command(command, cmdmap, f, inbl, on_finish);
+       return;
+      } catch (const TOPNSPC::common::bad_cmd_get& e) {
+       ss << e.what();
+       r = -EINVAL;
+      }
     }
   }
-  f->flush(ss);
-  delete f;
-
-  dout(1) << "asok_command: " << command << " (complete)" << dendl;
-
-  return handled;
+  on_finish(r, ss.str(), outbl);
 }
 
 void MDSDaemon::dump_status(Formatter *f)
@@ -169,7 +217,7 @@ void MDSDaemon::dump_status(Formatter *f)
   f->dump_string("state", ceph_mds_state_name(mdsmap->get_state_gid(mds_gid_t(
            monc->get_global_id()))));
   if (mds_rank) {
-    Mutex::Locker l(mds_lock);
+    std::lock_guard l(mds_lock);
     mds_rank->dump_status(f);
   }
 
@@ -191,247 +239,225 @@ void MDSDaemon::set_up_admin_socket()
 {
   int r;
   AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
-  assert(asok_hook == nullptr);
+  ceph_assert(asok_hook == nullptr);
   asok_hook = new MDSSocketHook(this);
-  r = admin_socket->register_command("status", "status", asok_hook,
+  r = admin_socket->register_command("status", asok_hook,
                                     "high-level status of MDS");
-  assert(r == 0);
-  r = admin_socket->register_command("dump_ops_in_flight",
-                                    "dump_ops_in_flight", asok_hook,
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_ops_in_flight", asok_hook,
                                     "show the ops currently in flight");
-  assert(r == 0);
-  r = admin_socket->register_command("ops",
-                                    "ops", asok_hook,
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("ops", asok_hook,
                                     "show the ops currently in flight");
-  assert(r == 0);
-  r = admin_socket->register_command("dump_blocked_ops", "dump_blocked_ops",
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_blocked_ops",
       asok_hook,
       "show the blocked ops currently in flight");
-  assert(r == 0);
-  r = admin_socket->register_command("dump_historic_ops", "dump_historic_ops",
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_historic_ops",
                                     asok_hook,
-                                    "show slowest recent ops");
-  assert(r == 0);
-  r = admin_socket->register_command("dump_historic_ops_by_duration", "dump_historic_ops_by_duration",
+                                    "show recent ops");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_historic_ops_by_duration",
                                     asok_hook,
-                                    "show slowest recent ops, sorted by op duration");
-  assert(r == 0);
-  r = admin_socket->register_command("scrub_path",
-                                    "scrub_path name=path,type=CephString "
+                                    "show recent ops, sorted by op duration");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("scrub_path name=path,type=CephString "
                                     "name=scrubops,type=CephChoices,"
-                                    "strings=force|recursive|repair,n=N,req=false",
+                                    "strings=force|recursive|repair,n=N,req=false "
+                                    "name=tag,type=CephString,req=false",
                                      asok_hook,
                                      "scrub an inode and output results");
-  assert(r == 0);
-  r = admin_socket->register_command("tag path",
-                                     "tag path name=path,type=CephString"
+  ceph_assert(r == 0);
+  // "scrub" command family (start/abort/pause/resume/status).
+  r = admin_socket->register_command("scrub start "
+                                    "name=path,type=CephString "
+                                    "name=scrubops,type=CephChoices,strings=force|recursive|repair,n=N,req=false "
+                                    "name=tag,type=CephString,req=false",
+                                    asok_hook,
+                                    "scrub an inode and output results");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("scrub abort",
+                                     asok_hook,
+                                     "Abort in progress scrub operation(s)");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("scrub pause",
+                                     asok_hook,
+                                     "Pause in progress scrub operation(s)");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("scrub resume",
+                                     asok_hook,
+                                     "Resume paused scrub operation(s)");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("scrub status",
+                                     asok_hook,
+                                     "Status of scrub operation(s)");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("tag path name=path,type=CephString"
                                      " name=tag,type=CephString",
                                      asok_hook,
                                      "Apply scrub tag recursively");
-   assert(r == 0);
-  r = admin_socket->register_command("flush_path",
-                                     "flush_path name=path,type=CephString",
+   ceph_assert(r == 0);
+  r = admin_socket->register_command("flush_path name=path,type=CephString",
                                      asok_hook,
                                      "flush an inode (and its dirfrags)");
-  assert(r == 0);
-  r = admin_socket->register_command("export dir",
-                                     "export dir "
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("export dir "
                                      "name=path,type=CephString "
                                      "name=rank,type=CephInt",
                                      asok_hook,
                                      "migrate a subtree to named MDS");
-  assert(r == 0);
-  r = admin_socket->register_command("dump cache",
-                                     "dump cache name=path,type=CephString,req=false",
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump cache name=path,type=CephString,req=false",
                                      asok_hook,
                                      "dump metadata cache (optionally to a file)");
-  assert(r == 0);
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("cache drop "
+                                    "name=timeout,type=CephInt,range=0,req=false",
+                                    asok_hook,
+                                    "trim cache and optionally request client to release all caps and flush the journal");
+  ceph_assert(r == 0);
   r = admin_socket->register_command("cache status",
-                                     "cache status",
                                      asok_hook,
                                      "show cache status");
-  assert(r == 0);
-  r = admin_socket->register_command("dump tree",
-                                    "dump tree "
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump tree "
                                     "name=root,type=CephString,req=true "
                                     "name=depth,type=CephInt,req=false ",
                                     asok_hook,
                                     "dump metadata cache for subtree");
-  assert(r == 0);
+  ceph_assert(r == 0);
   r = admin_socket->register_command("dump loads",
-                                     "dump loads",
                                      asok_hook,
                                      "dump metadata loads");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump snaps name=server,type=CephChoices,strings=--server,req=false",
+                                     asok_hook,
+                                     "dump snapshots");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("session ls "
+                                    "name=cap_dump,type=CephBool,req=false "
+                                    "name=filters,type=CephString,n=N,req=false ",
+                                    asok_hook,
+                                    "List client sessions based on a filter");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("client ls "
+                                    "name=cap_dump,type=CephBool,req=false "
+                                    "name=filters,type=CephString,n=N,req=false ",
+                                    asok_hook,
+                                    "List client sessions based on a filter");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("session evict name=filters,type=CephString,n=N,req=false",
+                                    asok_hook,
+                                    "Evict client session(s) based on a filter");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("client evict name=filters,type=CephString,n=N,req=false",
+                                    asok_hook,
+                                    "Evict client session(s) based on a filter");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("session kill name=client_id,type=CephString",
+                                    asok_hook,
+                                    "Evict a client session by id");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("session ls name=cap_dump,type=CephBool,req=false",
+                                    asok_hook,
+                                    "Enumerate connected CephFS clients");
+  ceph_assert(r == 0);
+  // Registration results below are checked with ceph_assert, matching the
+  // other registrations in this function.
+  r = admin_socket->register_command("session config "
+                                    "name=client_id,type=CephInt,req=true "
+                                    "name=option,type=CephString,req=true "
+                                    "name=value,type=CephString,req=false ",
+                                    asok_hook,
+                                    "Config a CephFS client session");
-  assert(r == 0);
+  ceph_assert(r == 0);
-  r = admin_socket->register_command("session evict",
-                                    "session evict name=client_id,type=CephString",
+  r = admin_socket->register_command("client config "
+                                    "name=client_id,type=CephInt,req=true "
+                                    "name=option,type=CephString,req=true "
+                                    "name=value,type=CephString,req=false ",
                                     asok_hook,
-                                    "Evict a CephFS client");
+                                    "Config a CephFS client session");
-  assert(r == 0);
+  ceph_assert(r == 0);
-  r = admin_socket->register_command("osdmap barrier",
-                                    "osdmap barrier name=target_epoch,type=CephInt",
+  r = admin_socket->register_command("damage ls",
                                     asok_hook,
-                                    "Wait until the MDS has this OSD map epoch");
+                                    "List detected metadata damage");
-  assert(r == 0);
+  ceph_assert(r == 0);
-  r = admin_socket->register_command("session ls",
-                                    "session ls",
+  r = admin_socket->register_command("damage rm "
+                                    "name=damage_id,type=CephInt",
                                     asok_hook,
-                                    "Enumerate connected CephFS clients");
+                                    "Remove a damage table entry");
-  assert(r == 0);
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("osdmap barrier name=target_epoch,type=CephInt",
+                                    asok_hook,
+                                    "Wait until the MDS has this OSD map epoch");
+  ceph_assert(r == 0);
   r = admin_socket->register_command("flush journal",
-                                    "flush journal",
                                     asok_hook,
                                     "Flush the journal to the backing store");
-  assert(r == 0);
+  ceph_assert(r == 0);
   r = admin_socket->register_command("force_readonly",
-                                    "force_readonly",
                                     asok_hook,
                                     "Force MDS to read-only mode");
-  assert(r == 0);
+  ceph_assert(r == 0);
   r = admin_socket->register_command("get subtrees",
-                                    "get subtrees",
                                     asok_hook,
                                     "Return the subtree map");
-  assert(r == 0);
-  r = admin_socket->register_command("dirfrag split",
-                                    "dirfrag split "
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dirfrag split "
                                      "name=path,type=CephString,req=true "
                                      "name=frag,type=CephString,req=true "
                                      "name=bits,type=CephInt,req=true ",
                                     asok_hook,
                                     "Fragment directory by path");
-  assert(r == 0);
-  r = admin_socket->register_command("dirfrag merge",
-                                    "dirfrag merge "
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dirfrag merge "
                                      "name=path,type=CephString,req=true "
                                      "name=frag,type=CephString,req=true",
                                     asok_hook,
                                     "De-fragment directory by path");
-  assert(r == 0);
-  r = admin_socket->register_command("dirfrag ls",
-                                    "dirfrag ls "
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dirfrag ls "
                                      "name=path,type=CephString,req=true",
                                     asok_hook,
                                     "List fragments in directory");
-  assert(r == 0);
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("openfiles ls",
+                                     asok_hook,
+                                     "List the opening files and their caps");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump inode "
+                                     "name=number,type=CephInt,req=true",
+                                    asok_hook,
+                                    "dump inode by inode number");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("exit",
+                                    asok_hook,
+                                    "Terminate this MDS");
+  // Check this result before r is overwritten by the next registration.
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("respawn",
+                                    asok_hook,
+                                    "Respawn this MDS");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "heap " \
+    "name=heapcmd,type=CephChoices,strings="                           \
+    "dump|start_profiler|stop_profiler|release|get_release_rate|set_release_rate|stats " \
+    "name=value,type=CephString,req=false",
+    asok_hook,
+    "show heap usage info (available only if compiled with tcmalloc)");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "cpu_profiler " \
+    "name=arg,type=CephChoices,strings=status|flush",
+    asok_hook,
+    "run cpu profiling on daemon");
+  ceph_assert(r == 0);
 }
 
 void MDSDaemon::clean_up_admin_socket()
 {
-  AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
-  admin_socket->unregister_command("status");
-  admin_socket->unregister_command("dump_ops_in_flight");
-  admin_socket->unregister_command("ops");
-  admin_socket->unregister_command("dump_blocked_ops");
-  admin_socket->unregister_command("dump_historic_ops");
-  admin_socket->unregister_command("dump_historic_ops_by_duration");
-  admin_socket->unregister_command("scrub_path");
-  admin_socket->unregister_command("tag path");
-  admin_socket->unregister_command("flush_path");
-  admin_socket->unregister_command("export dir");
-  admin_socket->unregister_command("dump cache");
-  admin_socket->unregister_command("cache status");
-  admin_socket->unregister_command("dump tree");
-  admin_socket->unregister_command("dump loads");
-  admin_socket->unregister_command("session evict");
-  admin_socket->unregister_command("osdmap barrier");
-  admin_socket->unregister_command("session ls");
-  admin_socket->unregister_command("flush journal");
-  admin_socket->unregister_command("force_readonly");
-  admin_socket->unregister_command("get subtrees");
-  admin_socket->unregister_command("dirfrag split");
-  admin_socket->unregister_command("dirfrag merge");
-  admin_socket->unregister_command("dirfrag ls");
+  g_ceph_context->get_admin_socket()->unregister_commands(asok_hook);
   delete asok_hook;
   asok_hook = NULL;
 }
 
-const char** MDSDaemon::get_tracked_conf_keys() const
-{
-  static const char* KEYS[] = {
-    "mds_op_complaint_time", "mds_op_log_threshold",
-    "mds_op_history_size", "mds_op_history_duration",
-    "mds_enable_op_tracker",
-    "mds_log_pause",
-    // clog & admin clog
-    "clog_to_monitors",
-    "clog_to_syslog",
-    "clog_to_syslog_facility",
-    "clog_to_syslog_level",
-    // PurgeQueue
-    "mds_max_purge_ops",
-    "mds_max_purge_ops_per_pg",
-    "mds_max_purge_files",
-    "mds_inject_migrator_session_race",
-    "clog_to_graylog",
-    "clog_to_graylog_host",
-    "clog_to_graylog_port",
-    "host",
-    "fsid",
-    NULL
-  };
-  return KEYS;
-}
-
-void MDSDaemon::handle_conf_change(const struct md_config_t *conf,
-                            const std::set <std::string> &changed)
-{
-  // We may be called within mds_lock (via `tell`) or outwith the
-  // lock (via admin socket `config set`), so handle either case.
-  const bool initially_locked = mds_lock.is_locked_by_me();
-  if (!initially_locked) {
-    mds_lock.Lock();
-  }
-
-  if (changed.count("mds_op_complaint_time") ||
-      changed.count("mds_op_log_threshold")) {
-    if (mds_rank) {
-      mds_rank->op_tracker.set_complaint_and_threshold(conf->mds_op_complaint_time,
-                                             conf->mds_op_log_threshold);
-    }
-  }
-  if (changed.count("mds_op_history_size") ||
-      changed.count("mds_op_history_duration")) {
-    if (mds_rank) {
-      mds_rank->op_tracker.set_history_size_and_duration(conf->mds_op_history_size,
-                                               conf->mds_op_history_duration);
-    }
-  }
-  if (changed.count("mds_enable_op_tracker")) {
-    if (mds_rank) {
-      mds_rank->op_tracker.set_tracking(conf->mds_enable_op_tracker);
-    }
-  }
-  if (changed.count("clog_to_monitors") ||
-      changed.count("clog_to_syslog") ||
-      changed.count("clog_to_syslog_level") ||
-      changed.count("clog_to_syslog_facility") ||
-      changed.count("clog_to_graylog") ||
-      changed.count("clog_to_graylog_host") ||
-      changed.count("clog_to_graylog_port") ||
-      changed.count("host") ||
-      changed.count("fsid")) {
-    if (mds_rank) {
-      mds_rank->update_log_config();
-    }
-  }
-
-  if (!g_conf->mds_log_pause && changed.count("mds_log_pause")) {
-    if (mds_rank) {
-      mds_rank->mdlog->kick_submitter();
-    }
-  }
-
-  if (mds_rank) {
-    mds_rank->handle_conf_change(conf, changed);
-  }
-
-  if (!initially_locked) {
-    mds_lock.Unlock();
-  }
-}
-
-
 int MDSDaemon::init()
 {
   dout(10) << sizeof(MDSCacheObject) << "\tMDSCacheObject" << dendl;
@@ -457,7 +483,7 @@ int MDSDaemon::init()
   messenger->add_dispatcher_tail(&beacon);
   messenger->add_dispatcher_tail(this);
 
-  // get monmap
+  // init monc
   monc->set_messenger(messenger);
 
   monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD |
@@ -465,76 +491,73 @@ int MDSDaemon::init()
   int r = 0;
   r = monc->init();
   if (r < 0) {
-    derr << "ERROR: failed to get monmap: " << cpp_strerror(-r) << dendl;
-    mds_lock.Lock();
+    derr << "ERROR: failed to init monc: " << cpp_strerror(-r) << dendl;
+    mds_lock.lock();
     suicide();
-    mds_lock.Unlock();
+    mds_lock.unlock();
     return r;
   }
 
+  messenger->set_auth_client(monc);
+  messenger->set_auth_server(monc);
+  monc->set_handle_authentication_dispatcher(this);
+
   // tell monc about log_client so it will know about mon session resets
   monc->set_log_client(&log_client);
 
   r = monc->authenticate();
   if (r < 0) {
     derr << "ERROR: failed to authenticate: " << cpp_strerror(-r) << dendl;
-    mds_lock.Lock();
+    mds_lock.lock();
     suicide();
-    mds_lock.Unlock();
+    mds_lock.unlock();
     return r;
   }
 
   int rotating_auth_attempts = 0;
-  while (monc->wait_auth_rotating(30.0) < 0) {
-    if (++rotating_auth_attempts <= g_conf->max_rotating_auth_attempts) {
+  auto rotating_auth_timeout =
+    g_conf().get_val<int64_t>("rotating_keys_bootstrap_timeout");
+  while (monc->wait_auth_rotating(rotating_auth_timeout) < 0) {
+    if (++rotating_auth_attempts <= g_conf()->max_rotating_auth_attempts) {
       derr << "unable to obtain rotating service keys; retrying" << dendl;
       continue;
     }
     derr << "ERROR: failed to refresh rotating keys, "
          << "maximum retry time reached." << dendl;
-    mds_lock.Lock();
+    std::lock_guard locker{mds_lock};
     suicide();
-    mds_lock.Unlock();
     return -ETIMEDOUT;
   }
 
-  mgrc.init();
-  messenger->add_dispatcher_head(&mgrc);
-
-  mds_lock.Lock();
+  mds_lock.lock();
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
     dout(4) << __func__ << ": terminated already, dropping out" << dendl;
-    mds_lock.Unlock();
+    mds_lock.unlock();
     return 0;
   }
 
   monc->sub_want("mdsmap", 0, 0);
-  monc->sub_want("mgrmap", 0, 0);
   monc->renew_subs();
 
-  mds_lock.Unlock();
+  mds_lock.unlock();
 
   // Set up admin socket before taking mds_lock, so that ordering
   // is consistent (later we take mds_lock within asok callbacks)
   set_up_admin_socket();
-  g_conf->add_observer(this);
-  mds_lock.Lock();
+  std::lock_guard locker{mds_lock};
   if (beacon.get_want_state() == MDSMap::STATE_DNE) {
     suicide();  // we could do something more graceful here
     dout(4) << __func__ << ": terminated already, dropping out" << dendl;
-    mds_lock.Unlock();
     return 0; 
   }
 
   timer.init();
 
-  beacon.init(mdsmap);
+  beacon.init(*mdsmap);
   messenger->set_myname(entity_name_t::MDS(MDS_RANK_NONE));
 
   // schedule tick
   reset_tick();
-  mds_lock.Unlock();
-
   return 0;
 }
 
@@ -545,9 +568,9 @@ void MDSDaemon::reset_tick()
 
   // schedule
   tick_event = timer.add_event_after(
-    g_conf->mds_tick_interval,
-    new FunctionContext([this](int) {
-       assert(mds_lock.is_locked_by_me());
+    g_conf()->mds_tick_interval,
+    new LambdaContext([this](int) {
+       ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
        tick();
       }));
 }
@@ -563,12 +586,18 @@ void MDSDaemon::tick()
   }
 }
 
-void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
-                                  int r, bufferlist outbl,
-                                  boost::string_view outs)
+void MDSDaemon::handle_command(const cref_t<MCommand> &m)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-  assert(session != NULL);
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<Session *>(priv.get());
+  ceph_assert(session != NULL);
+
+  int r = 0;
+  cmdmap_t cmdmap;
+  std::stringstream ss;
+  std::string outs;
+  bufferlist outbl;
+
   // If someone is using a closed session for sending commands (e.g.
   // the ceph CLI) then we should feel free to clean up this connection
   // as soon as we've sent them a response.
@@ -580,318 +609,61 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
   if (!live_session) {
     // This session only existed to issue commands, so terminate it
     // as soon as we can.
-    assert(session->is_closed());
-    session->connection->mark_disposable();
+    ceph_assert(session->is_closed());
+    session->get_connection()->mark_disposable();
   }
-  session->put();
-
-  MCommandReply *reply = new MCommandReply(r, outs);
-  reply->set_tid(m->get_tid());
-  reply->set_data(outbl);
-  m->get_connection()->send_message(reply);
-}
-
-/* This function DOES put the passed message before returning*/
-void MDSDaemon::handle_command(MCommand *m)
-{
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-  assert(session != NULL);
-
-  int r = 0;
-  cmdmap_t cmdmap;
-  std::stringstream ss;
-  std::string outs;
-  bufferlist outbl;
-  Context *run_after = NULL;
-  bool need_reply = true;
+  priv.reset();
 
   if (!session->auth_caps.allow_all()) {
     dout(1) << __func__
       << ": received command from client without `tell` capability: "
-      << m->get_connection()->peer_addr << dendl;
+      << *m->get_connection()->peer_addrs << dendl;
 
     ss << "permission denied";
-    r = -EPERM;
+    r = -EACCES;
   } else if (m->cmd.empty()) {
     r = -EINVAL;
     ss << "no command given";
     outs = ss.str();
-  } else if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
+  } else if (!TOPNSPC::common::cmdmap_from_json(m->cmd, &cmdmap, ss)) {
     r = -EINVAL;
     outs = ss.str();
   } else {
-    r = _handle_command(cmdmap, m, &outbl, &outs, &run_after, &need_reply);
-  }
-  session->put();
-
-  if (need_reply) {
-    send_command_reply(m, mds_rank, r, outbl, outs);
-  }
-
-  if (run_after) {
-    run_after->complete(0);
-  }
-
-  m->put();
-}
-
-
-struct MDSCommand {
-  string cmdstring;
-  string helpstring;
-  string module;
-  string perm;
-  string availability;
-} mds_commands[] = {
-
-#define COMMAND(parsesig, helptext, module, perm, availability) \
-  {parsesig, helptext, module, perm, availability},
-
-COMMAND("injectargs " \
-       "name=injected_args,type=CephString,n=N",
-       "inject configuration arguments into running MDS",
-       "mds", "*", "cli,rest")
-COMMAND("config set " \
-       "name=key,type=CephString name=value,type=CephString",
-       "Set a configuration option at runtime (not persistent)",
-       "mds", "*", "cli,rest")
-COMMAND("exit",
-       "Terminate this MDS",
-       "mds", "*", "cli,rest")
-COMMAND("respawn",
-       "Restart this MDS",
-       "mds", "*", "cli,rest")
-COMMAND("session kill " \
-        "name=session_id,type=CephInt",
-       "End a client session",
-       "mds", "*", "cli,rest")
-COMMAND("cpu_profiler " \
-       "name=arg,type=CephChoices,strings=status|flush",
-       "run cpu profiling on daemon", "mds", "rw", "cli,rest")
-COMMAND("session ls " \
-       "name=filters,type=CephString,n=N,req=false",
-       "List client sessions", "mds", "r", "cli,rest")
-COMMAND("client ls " \
-       "name=filters,type=CephString,n=N,req=false",
-       "List client sessions", "mds", "r", "cli,rest")
-COMMAND("session evict " \
-       "name=filters,type=CephString,n=N,req=false",
-       "Evict client session(s)", "mds", "rw", "cli,rest")
-COMMAND("client evict " \
-       "name=filters,type=CephString,n=N,req=false",
-       "Evict client session(s)", "mds", "rw", "cli,rest")
-COMMAND("damage ls",
-       "List detected metadata damage", "mds", "r", "cli,rest")
-COMMAND("damage rm name=damage_id,type=CephInt",
-       "Remove a damage table entry", "mds", "rw", "cli,rest")
-COMMAND("version", "report version of MDS", "mds", "r", "cli,rest")
-COMMAND("heap " \
-       "name=heapcmd,type=CephChoices,strings=dump|start_profiler|stop_profiler|release|stats", \
-       "show heap usage info (available only if compiled with tcmalloc)", \
-       "mds", "*", "cli,rest")
-};
-
-
-int MDSDaemon::_handle_command(
-    const cmdmap_t &cmdmap,
-    MCommand *m,
-    bufferlist *outbl,
-    std::string *outs,
-    Context **run_later,
-    bool *need_reply)
-{
-  assert(outbl != NULL);
-  assert(outs != NULL);
-
-  class SuicideLater : public Context
-  {
-    MDSDaemon *mds;
-
-    public:
-    explicit SuicideLater(MDSDaemon *mds_) : mds(mds_) {}
-    void finish(int r) override {
-      // Wait a little to improve chances of caller getting
-      // our response before seeing us disappear from mdsmap
-      sleep(1);
-
-      mds->suicide();
-    }
-  };
-
-
-  class RespawnLater : public Context
-  {
-    MDSDaemon *mds;
-
-    public:
-
-    explicit RespawnLater(MDSDaemon *mds_) : mds(mds_) {}
-    void finish(int r) override {
-      // Wait a little to improve chances of caller getting
-      // our response before seeing us disappear from mdsmap
-      sleep(1);
-
-      mds->respawn();
-    }
-  };
-
-  std::stringstream ds;
-  std::stringstream ss;
-  std::string prefix;
-  std::string format;
-  std::unique_ptr<Formatter> f(Formatter::create(format));
-  cmd_getval(cct, cmdmap, "prefix", prefix);
-
-  int r = 0;
-
-  if (prefix == "get_command_descriptions") {
-    int cmdnum = 0;
-    std::unique_ptr<JSONFormatter> f(ceph::make_unique<JSONFormatter>());
-    f->open_object_section("command_descriptions");
-    for (MDSCommand *cp = mds_commands;
-        cp < &mds_commands[ARRAY_SIZE(mds_commands)]; cp++) {
-
-      ostringstream secname;
-      secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
-      dump_cmddesc_to_json(f.get(), secname.str(), cp->cmdstring, cp->helpstring,
-                          cp->module, cp->perm, cp->availability, 0);
-      cmdnum++;
-    }
-    f->close_section();        // command_descriptions
-
-    f->flush(ds);
-    goto out; 
-  }
-
-  cmd_getval(cct, cmdmap, "format", format);
-  if (prefix == "version") {
-    if (f) {
-      f->open_object_section("version");
-      f->dump_string("version", pretty_version_to_str());
-      f->close_section();
-      f->flush(ds);
-    } else {
-      ds << pretty_version_to_str();
-    }
-  } else if (prefix == "injectargs") {
-    vector<string> argsvec;
-    cmd_getval(cct, cmdmap, "injected_args", argsvec);
-
-    if (argsvec.empty()) {
-      r = -EINVAL;
-      ss << "ignoring empty injectargs";
-      goto out;
-    }
-    string args = argsvec.front();
-    for (vector<string>::iterator a = ++argsvec.begin(); a != argsvec.end(); ++a)
-      args += " " + *a;
-    r = cct->_conf->injectargs(args, &ss);
-  } else if (prefix == "config set") {
-    std::string key;
-    cmd_getval(cct, cmdmap, "key", key);
-    std::string val;
-    cmd_getval(cct, cmdmap, "value", val);
-    r = cct->_conf->set_val(key, val, true, &ss);
-    if (r == 0) {
-      cct->_conf->apply_changes(nullptr);
-    }
-  } else if (prefix == "exit") {
-    // We will send response before executing
-    ss << "Exiting...";
-    *run_later = new SuicideLater(this);
-  } else if (prefix == "respawn") {
-    // We will send response before executing
-    ss << "Respawning...";
-    *run_later = new RespawnLater(this);
-  } else if (prefix == "session kill") {
-    if (mds_rank == NULL) {
-      r = -EINVAL;
-      ss << "MDS not active";
-      goto out;
-    }
-    // FIXME harmonize `session kill` with admin socket session evict
-    int64_t session_id = 0;
-    bool got = cmd_getval(cct, cmdmap, "session_id", session_id);
-    assert(got);
-    bool killed = mds_rank->evict_client(session_id, false,
-                                         g_conf->mds_session_blacklist_on_evict,
-                                         ss);
-    if (!killed)
-      r = -ENOENT;
-  } else if (prefix == "heap") {
-    if (!ceph_using_tcmalloc()) {
-      r = -EOPNOTSUPP;
-      ss << "could not issue heap profiler command -- not using tcmalloc!";
-    } else {
-      string heapcmd;
-      cmd_getval(cct, cmdmap, "heapcmd", heapcmd);
-      vector<string> heapcmd_vec;
-      get_str_vec(heapcmd, heapcmd_vec);
-      ceph_heap_profiler_handle_command(heapcmd_vec, ds);
-    }
-  } else if (prefix == "cpu_profiler") {
-    string arg;
-    cmd_getval(cct, cmdmap, "arg", arg);
-    vector<string> argvec;
-    get_str_vec(arg, argvec);
-    cpu_profiler_handle_command(argvec, ds);
-  } else {
-    // Give MDSRank a shot at the command
-    if (!mds_rank) {
-      ss << "MDS not active";
-      r = -EINVAL;
-    }
-    else {
-      bool handled = mds_rank->handle_command(cmdmap, m, &r, &ds, &ss,
-                                             need_reply);
-      if (!handled) {
-        // MDSDaemon doesn't know this command
-        ss << "unrecognized command! " << prefix;
-        r = -EINVAL;
-      }
-    }
+    cct->get_admin_socket()->queue_tell_command(m);
+    return;
   }
 
-out:
-  *outs = ss.str();
-  outbl->append(ds);
-  return r;
+  auto reply = make_message<MCommandReply>(r, outs);
+  reply->set_tid(m->get_tid());
+  reply->set_data(outbl);
+  m->get_connection()->send_message2(reply);
 }
 
-/* This function deletes the passed message before returning. */
-
-void MDSDaemon::handle_mds_map(MMDSMap *m)
+void MDSDaemon::handle_mds_map(const cref_t<MMDSMap> &m)
 {
   version_t epoch = m->get_epoch();
-  dout(5) << "handle_mds_map epoch " << epoch << " from " << m->get_source() << dendl;
 
   // is it new?
   if (epoch <= mdsmap->get_epoch()) {
-    dout(5) << " old map epoch " << epoch << " <= " << mdsmap->get_epoch()
-           << ", discarding" << dendl;
-    m->put();
+    dout(5) << "handle_mds_map old map epoch " << epoch << " <= "
+            << mdsmap->get_epoch() << ", discarding" << dendl;
     return;
   }
 
-  entity_addr_t addr;
+  dout(1) << "Updating MDS map to version " << epoch << " from " << m->get_source() << dendl;
 
   // keep old map, for a moment
-  MDSMap *oldmap = mdsmap;
+  std::unique_ptr<MDSMap> oldmap;
+  oldmap.swap(mdsmap);
 
   // decode and process
-  mdsmap = new MDSMap;
+  mdsmap.reset(new MDSMap);
   mdsmap->decode(m->get_encoded());
-  const MDSMap::DaemonState new_state = mdsmap->get_state_gid(mds_gid_t(monc->get_global_id()));
-  const int incarnation = mdsmap->get_inc_gid(mds_gid_t(monc->get_global_id()));
 
   monc->sub_got("mdsmap", mdsmap->get_epoch());
 
-  // Calculate my effective rank (either my owned rank or my
-  // standby_for_rank if in standby replay)
-  mds_rank_t whoami = mdsmap->get_rank_gid(mds_gid_t(monc->get_global_id()));
-
   // verify compatset
-  CompatSet mdsmap_compat(get_mdsmap_compat_set_all());
+  CompatSet mdsmap_compat(MDSMap::get_compat_set_all());
   dout(10) << "     my compat " << mdsmap_compat << dendl;
   dout(10) << " mdsmap compat " << mdsmap->compat << dendl;
   if (!mdsmap_compat.writeable(mdsmap->compat)) {
@@ -899,61 +671,71 @@ void MDSDaemon::handle_mds_map(MMDSMap *m)
            << " not writeable with daemon features " << mdsmap_compat
            << ", killing myself" << dendl;
     suicide();
-    goto out;
+    return;
   }
 
-  // mark down any failed peers
-  for (map<mds_gid_t,MDSMap::mds_info_t>::const_iterator p = oldmap->get_mds_info().begin();
-       p != oldmap->get_mds_info().end();
-       ++p) {
-    if (mdsmap->get_mds_info().count(p->first) == 0) {
-      dout(10) << " peer mds gid " << p->first << " removed from map" << dendl;
-      messenger->mark_down(p->second.addr);
-    }
+  // Calculate my effective rank (either my owned rank or the rank I'm following if STATE_STANDBY_REPLAY
+  const auto addrs = messenger->get_myaddrs();
+  const auto myid = monc->get_global_id();
+  const auto mygid = mds_gid_t(myid);
+  const auto whoami = mdsmap->get_rank_gid(mygid);
+  const auto old_state = oldmap->get_state_gid(mygid);
+  const auto new_state = mdsmap->get_state_gid(mygid);
+  const auto incarnation = mdsmap->get_inc_gid(mygid);
+  dout(10) << "my gid is " << myid << dendl;
+  dout(10) << "map says I am mds." << whoami << "." << incarnation
+          << " state " << ceph_mds_state_name(new_state) << dendl;
+  dout(10) << "msgr says I am " << addrs << dendl;
+
+  // If we're removed from the MDSMap, stop all processing.
+  using DS = MDSMap::DaemonState;
+  if (old_state != DS::STATE_NULL && new_state == DS::STATE_NULL) {
+    const auto& oldinfo = oldmap->get_info_gid(mygid);
+    dout(1) << "Map removed me " << oldinfo
+            << " from cluster; respawning! See cluster/monitor logs for details." << dendl;
+    respawn();
   }
 
-  if (whoami == MDS_RANK_NONE && 
-      new_state == MDSMap::STATE_STANDBY_REPLAY) {
-    whoami = mdsmap->get_mds_info_gid(mds_gid_t(monc->get_global_id())).standby_for_rank;
+  if (old_state == DS::STATE_NULL && new_state != DS::STATE_NULL) {
+    /* The MDS has been added to the FSMap, now we can init the MgrClient */
+    mgrc.init();
+    messenger->add_dispatcher_tail(&mgrc);
+    monc->sub_want("mgrmap", 0, 0);
+    monc->renew_subs(); /* MgrMap receipt drives connection to ceph-mgr */
   }
 
-  // see who i am
-  addr = messenger->get_myaddr();
-  dout(10) << "map says I am " << addr << " mds." << whoami << "." << incarnation
-          << " state " << ceph_mds_state_name(new_state) << dendl;
+  // mark down any failed peers
+  for (const auto& [gid, info] : oldmap->get_mds_info()) {
+    if (mdsmap->get_mds_info().count(gid) == 0) {
+      dout(10) << " peer mds gid " << gid << " removed from map" << dendl;
+      messenger->mark_down_addrs(info.addrs);
+    }
+  }
 
   if (whoami == MDS_RANK_NONE) {
-    if (mds_rank != NULL) {
-      const auto myid = monc->get_global_id();
-      // We have entered a rank-holding state, we shouldn't be back
-      // here!
-      if (g_conf->mds_enforce_unique_name) {
-        if (mds_gid_t existing = mdsmap->find_mds_gid_by_name(name)) {
-          const MDSMap::mds_info_t& i = mdsmap->get_info_gid(existing);
-          if (i.global_id > myid) {
-            dout(1) << "map replaced me with another mds." << whoami
-                    << " with gid (" << i.global_id << ") larger than myself ("
-                    << myid << "); quitting!" << dendl;
-            // Call suicide() rather than respawn() because if someone else
-            // has taken our ID, we don't want to keep restarting and
-            // fighting them for the ID.
-            suicide();
-            m->put();
-            return;
-          }
-        }
-      }
+    // We do not hold a rank:
+    dout(10) <<  __func__ << ": handling map in rankless mode" << dendl;
 
-      dout(1) << "map removed me (mds." << whoami << " gid:"
-              << myid << ") from cluster due to lost contact; respawning" << dendl;
-      respawn();
+    if (new_state == DS::STATE_STANDBY) {
+      /* Note: STATE_BOOT is never an actual state in the FSMap. The Monitors
+       * generally mark a new MDS as STANDBY (although it's possible to
+       * immediately be assigned a rank).
+       */
+      if (old_state == DS::STATE_NULL) {
+        dout(1) << "Monitors have assigned me to become a standby." << dendl;
+        beacon.set_want_state(*mdsmap, new_state);
+      } else if (old_state == DS::STATE_STANDBY) {
+        dout(5) << "I am still standby" << dendl;
+      }
+    } else if (new_state == DS::STATE_NULL) {
+      /* We are not in the MDSMap yet! Keep waiting: */
+      ceph_assert(beacon.get_want_state() == DS::STATE_BOOT);
+      dout(10) << "not in map yet" << dendl;
+    } else {
+      /* We moved to standby somehow from another state */
+      ceph_abort("invalid transition to standby");
     }
-    // MDSRank not active: process the map here to see if we have
-    // been assigned a rank.
-    dout(10) <<  __func__ << ": handling map in rankless mode" << dendl;
-    _handle_mds_map(oldmap);
   } else {
-
     // Did we already hold a different rank?  MDSMonitor shouldn't try
     // to change that out from under me!
     if (mds_rank && whoami != mds_rank->get_nodeid()) {
@@ -965,9 +747,9 @@ void MDSDaemon::handle_mds_map(MMDSMap *m)
     // Did I previously not hold a rank?  Initialize!
     if (mds_rank == NULL) {
       mds_rank = new MDSRankDispatcher(whoami, mds_lock, clog,
-          timer, beacon, mdsmap, messenger, monc,
-          new FunctionContext([this](int r){respawn();}),
-          new FunctionContext([this](int r){suicide();}));
+          timer, beacon, mdsmap, messenger, monc, &mgrc,
+          new LambdaContext([this](int r){respawn();}),
+          new LambdaContext([this](int r){suicide();}));
       dout(10) <<  __func__ << ": initializing MDS rank "
                << mds_rank->get_nodeid() << dendl;
       mds_rank->init();
@@ -976,47 +758,18 @@ void MDSDaemon::handle_mds_map(MMDSMap *m)
     // MDSRank is active: let him process the map, we have no say.
     dout(10) <<  __func__ << ": handling map as rank "
              << mds_rank->get_nodeid() << dendl;
-    mds_rank->handle_mds_map(m, oldmap);
+    mds_rank->handle_mds_map(m, *oldmap);
   }
 
-out:
-  beacon.notify_mdsmap(mdsmap);
-  m->put();
-  delete oldmap;
-}
-
-void MDSDaemon::_handle_mds_map(MDSMap *oldmap)
-{
-  MDSMap::DaemonState new_state = mdsmap->get_state_gid(mds_gid_t(monc->get_global_id()));
-
-  // Normal rankless case, we're marked as standby
-  if (new_state == MDSMap::STATE_STANDBY) {
-    beacon.set_want_state(mdsmap, new_state);
-    dout(1) << "handle_mds_map standby" << dendl;
-
-    return;
-  }
-
-  // Case where we thought we were standby, but MDSMap disagrees
-  if (beacon.get_want_state() == MDSMap::STATE_STANDBY) {
-    dout(10) << "dropped out of mdsmap, try to re-add myself" << dendl;
-    new_state = MDSMap::STATE_BOOT;
-    beacon.set_want_state(mdsmap, new_state);
-    return;
-  }
-
-  // Case where we have sent a boot beacon that isn't reflected yet
-  if (beacon.get_want_state() == MDSMap::STATE_BOOT) {
-    dout(10) << "not in map yet" << dendl;
-  }
+  beacon.notify_mdsmap(*mdsmap);
 }
 
 void MDSDaemon::handle_signal(int signum)
 {
-  assert(signum == SIGINT || signum == SIGTERM);
+  ceph_assert(signum == SIGINT || signum == SIGTERM);
   derr << "*** got signal " << sig_str(signum) << " ***" << dendl;
   {
-    Mutex::Locker l(mds_lock);
+    std::lock_guard l(mds_lock);
     if (stopping) {
       return;
     }
@@ -1026,13 +779,13 @@ void MDSDaemon::handle_signal(int signum)
 
 void MDSDaemon::suicide()
 {
-  assert(mds_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(mds_lock));
   
   // make sure we don't suicide twice
-  assert(stopping == false);
+  ceph_assert(stopping == false);
   stopping = true;
 
-  dout(1) << "suicide.  wanted state "
+  dout(1) << "suicide! Wanted state "
           << ceph_mds_state_name(beacon.get_want_state()) << dendl;
 
   if (tick_event) {
@@ -1040,24 +793,19 @@ void MDSDaemon::suicide()
     tick_event = 0;
   }
 
-  //because add_observer is called after set_up_admin_socket
-  //so we can use asok_hook to avoid assert in the remove_observer
-  if (asok_hook != NULL)
-    g_conf->remove_observer(this);
-
   clean_up_admin_socket();
 
-  // Inform MDS we are going away, then shut down beacon
-  beacon.set_want_state(mdsmap, MDSMap::STATE_DNE);
+  // Notify the Monitors (MDSMonitor) that we're dying, so that it doesn't have
+  // to wait for us to go laggy. Only do this if we're actually in the MDSMap,
+  // because otherwise the MDSMonitor will drop our message.
+  beacon.set_want_state(*mdsmap, MDSMap::STATE_DNE);
   if (!mdsmap->is_dne_gid(mds_gid_t(monc->get_global_id()))) {
-    // Notify the MDSMonitor that we're dying, so that it doesn't have to
-    // wait for us to go laggy.  Only do this if we're actually in the
-    // MDSMap, because otherwise the MDSMonitor will drop our message.
     beacon.send_and_wait(1);
   }
   beacon.shutdown();
 
-  mgrc.shutdown();
+  if (mgrc.is_initialized())
+    mgrc.shutdown();
 
   if (mds_rank) {
     mds_rank->shutdown();
@@ -1071,7 +819,19 @@ void MDSDaemon::suicide()
 
 void MDSDaemon::respawn()
 {
-  dout(1) << "respawn" << dendl;
+  // --- WARNING TO FUTURE COPY/PASTERS ---
+  // You must also add a call like
+  //
+  //   ceph_pthread_setname(pthread_self(), "ceph-mds");
+  //
+  // to main() so that /proc/$pid/stat field 2 contains "(ceph-mds)"
+  // instead of "(exe)", so that killall (and log rotation) will work.
+
+  dout(1) << "respawn!" << dendl;
+
+  /* Dump recent in case the MDS was stuck doing something which caused it to
+   * be removed from the MDSMap leading to respawn. */
+  g_ceph_context->_log->dump_recent();
 
   char *new_argv[orig_argc+1];
   dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
@@ -1086,18 +846,22 @@ void MDSDaemon::respawn()
    * unlinked.
    */
   char exe_path[PATH_MAX] = "";
-  if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) == -1) {
+#ifdef PROCPREFIX
+  if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
+    dout(1) << "respawning with exe " << exe_path << dendl;
+    strcpy(exe_path, PROCPREFIX "/proc/self/exe");
+  } else {
+#else
+  {
+#endif
     /* Print CWD for the user's interest */
     char buf[PATH_MAX];
     char *cwd = getcwd(buf, sizeof(buf));
-    assert(cwd);
+    ceph_assert(cwd);
     dout(1) << " cwd " << cwd << dendl;
 
     /* Fall back to a best-effort: just running in our CWD */
     strncpy(exe_path, orig_argv[0], PATH_MAX-1);
-  } else {
-    dout(1) << "respawning with exe " << exe_path << dendl;
-    strcpy(exe_path, PROCPREFIX "/proc/self/exe");
   }
 
   dout(1) << " exe_path " << exe_path << dendl;
@@ -1115,9 +879,9 @@ void MDSDaemon::respawn()
 
 
 
-bool MDSDaemon::ms_dispatch(Message *m)
+bool MDSDaemon::ms_dispatch2(const ref_t<Message> &m)
 {
-  Mutex::Locker l(mds_lock);
+  std::lock_guard l(mds_lock);
   if (stopping) {
     return false;
   }
@@ -1125,7 +889,6 @@ bool MDSDaemon::ms_dispatch(Message *m)
   // Drop out early if shutting down
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
     dout(10) << " stopping, discarding " << *m << dendl;
-    m->put();
     return true;
   }
 
@@ -1143,45 +906,41 @@ bool MDSDaemon::ms_dispatch(Message *m)
   }
 }
 
-bool MDSDaemon::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
-{
-  dout(10) << "MDSDaemon::ms_get_authorizer type="
-           << ceph_entity_type_name(dest_type) << dendl;
-
-  /* monitor authorization is being handled on different layer */
-  if (dest_type == CEPH_ENTITY_TYPE_MON)
-    return true;
-
-  if (force_new) {
-    if (monc->wait_auth_rotating(10) < 0)
-      return false;
-  }
-
-  *authorizer = monc->build_authorizer(dest_type);
-  return *authorizer != NULL;
-}
-
-
 /*
  * high priority messages we always process
  */
-bool MDSDaemon::handle_core_message(Message *m)
+
+#define ALLOW_MESSAGES_FROM(peers)                                      \
+  do {                                                                  \
+    if (m->get_connection() && (m->get_connection()->get_peer_type() & (peers)) == 0) { \
+      dout(0) << __FILE__ << "." << __LINE__ << ": filtered out request, peer=" \
+              << m->get_connection()->get_peer_type() << " allowing="   \
+              << #peers << " message=" << *m << dendl;                  \
+      return true;                                                      \
+    }                                                                   \
+  } while (0)
+
+bool MDSDaemon::handle_core_message(const cref_t<Message> &m)
 {
   switch (m->get_type()) {
   case CEPH_MSG_MON_MAP:
     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
-    m->put();
     break;
 
     // MDS
   case CEPH_MSG_MDS_MAP:
     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_MDS);
-    handle_mds_map(static_cast<MMDSMap*>(m));
+    handle_mds_map(ref_cast<MMDSMap>(m));
+    break;
+
+  case MSG_REMOVE_SNAPS:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
+    mds_rank->snapserver->handle_remove_snaps(ref_cast<MRemoveSnaps>(m));
     break;
 
     // OSD
   case MSG_COMMAND:
-    handle_command(static_cast<MCommand*>(m));
+    handle_command(ref_cast<MCommand>(m));
     break;
   case CEPH_MSG_OSD_MAP:
     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
@@ -1189,13 +948,11 @@ bool MDSDaemon::handle_core_message(Message *m)
     if (mds_rank) {
       mds_rank->handle_osd_map();
     }
-    m->put();
     break;
 
   case MSG_MON_COMMAND:
     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
     clog->warn() << "dropping `mds tell` command from legacy monitor";
-    m->put();
     break;
 
   default:
@@ -1213,22 +970,21 @@ bool MDSDaemon::ms_handle_reset(Connection *con)
   if (con->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT)
     return false;
 
-  Mutex::Locker l(mds_lock);
+  std::lock_guard l(mds_lock);
   if (stopping) {
     return false;
   }
-  dout(5) << "ms_handle_reset on " << con->get_peer_addr() << dendl;
+  dout(5) << "ms_handle_reset on " << con->get_peer_socket_addr() << dendl;
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
     return false;
 
-  Session *session = static_cast<Session *>(con->get_priv());
-  if (session) {
+  auto priv = con->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     if (session->is_closed()) {
       dout(3) << "ms_handle_reset closing connection for session " << session->info.inst << dendl;
       con->mark_down();
-      con->set_priv(NULL);
+      con->set_priv(nullptr);
     }
-    session->put();
   } else {
     con->mark_down();
   }
@@ -1241,23 +997,22 @@ void MDSDaemon::ms_handle_remote_reset(Connection *con)
   if (con->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT)
     return;
 
-  Mutex::Locker l(mds_lock);
+  std::lock_guard l(mds_lock);
   if (stopping) {
     return;
   }
 
-  dout(5) << "ms_handle_remote_reset on " << con->get_peer_addr() << dendl;
+  dout(5) << "ms_handle_remote_reset on " << con->get_peer_socket_addr() << dendl;
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
     return;
 
-  Session *session = static_cast<Session *>(con->get_priv());
-  if (session) {
+  auto priv = con->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     if (session->is_closed()) {
       dout(3) << "ms_handle_remote_reset closing connection for session " << session->info.inst << dendl;
       con->mark_down();
-      con->set_priv(NULL);
+      con->set_priv(nullptr);
     }
-    session->put();
   }
 }
 
@@ -1267,148 +1022,92 @@ bool MDSDaemon::ms_handle_refused(Connection *con)
   return false;
 }
 
-bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
-                              int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
-                                    bool& is_valid, CryptoKey& session_key,
-                                    std::unique_ptr<AuthAuthorizerChallenge> *challenge)
+bool MDSDaemon::parse_caps(const AuthCapsInfo& info, MDSAuthCaps& caps)
 {
-  Mutex::Locker l(mds_lock);
-  if (stopping) {
-    return false;
-  }
-  if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
-    return false;
-
-  AuthAuthorizeHandler *authorize_handler = 0;
-  switch (peer_type) {
-  case CEPH_ENTITY_TYPE_MDS:
-    authorize_handler = authorize_handler_cluster_registry->get_handler(protocol);
-    break;
-  default:
-    authorize_handler = authorize_handler_service_registry->get_handler(protocol);
-  }
-  if (!authorize_handler) {
-    dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
-    is_valid = false;
+  caps.clear();
+  if (info.allow_all) {
+    caps.set_allow_all();
     return true;
-  }
-
-  AuthCapsInfo caps_info;
-  EntityName name;
-  uint64_t global_id;
-
-  RotatingKeyRing *keys = monc->rotating_secrets.get();
-  if (keys) {
-    is_valid = authorize_handler->verify_authorizer(
-      cct, keys,
-      authorizer_data, authorizer_reply, name, global_id, caps_info,
-      session_key, nullptr, challenge);
   } else {
-    dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
-    is_valid = false;
-  }
-
-  if (is_valid) {
-    entity_name_t n(con->get_peer_type(), global_id);
-
-    // We allow connections and assign Session instances to connections
-    // even if we have not been assigned a rank, because clients with
-    // "allow *" are allowed to connect and do 'tell' operations before
-    // we have a rank.
-    Session *s = NULL;
-    if (mds_rank) {
-      // If we do hold a rank, see if this is an existing client establishing
-      // a new connection, rather than a new client
-      s = mds_rank->sessionmap.get_session(n);
-    }
-
-    // Wire up a Session* to this connection
-    // It doesn't go into a SessionMap instance until it sends an explicit
-    // request to open a session (initial state of Session is `closed`)
-    if (!s) {
-      s = new Session;
-      s->info.auth_name = name;
-      s->info.inst.addr = con->get_peer_addr();
-      s->info.inst.name = n;
-      dout(10) << " new session " << s << " for " << s->info.inst << " con " << con << dendl;
-      con->set_priv(s);
-      s->connection = con;
-      if (mds_rank) {
-        mds_rank->kick_waiters_for_any_client_connection();
-      }
-    } else {
-      dout(10) << " existing session " << s << " for " << s->info.inst << " existing con " << s->connection
-              << ", new/authorizing con " << con << dendl;
-      con->set_priv(s->get());
-
-
-
-      // Wait until we fully accept the connection before setting
-      // s->connection.  In particular, if there are multiple incoming
-      // connection attempts, they will all get their authorizer
-      // validated, but some of them may "lose the race" and get
-      // dropped.  We only want to consider the winner(s).  See
-      // ms_handle_accept().  This is important for Sessions we replay
-      // from the journal on recovery that don't have established
-      // messenger state; we want the con from only the winning
-      // connect attempt(s).  (Normal reconnects that don't follow MDS
-      // recovery are reconnected to the existing con by the
-      // messenger.)
+    auto it = info.caps.begin();
+    string auth_cap_str;
+    try {
+      decode(auth_cap_str, it);
+    } catch (const buffer::error& e) {
+      dout(1) << __func__ << ": cannot decode auth caps buffer of length " << info.caps.length() << dendl;
+      return false;
     }
 
-    if (caps_info.allow_all) {
-      // Flag for auth providers that don't provide cap strings
-      s->auth_caps.set_allow_all();
+    dout(10) << __func__ << ": parsing auth_cap_str='" << auth_cap_str << "'" << dendl;
+    CachedStackStringStream cs;
+    if (caps.parse(g_ceph_context, auth_cap_str, cs.get())) {
+      return true;
     } else {
-      bufferlist::iterator p = caps_info.caps.begin();
-      string auth_cap_str;
-      try {
-        ::decode(auth_cap_str, p);
-
-        dout(10) << __func__ << ": parsing auth_cap_str='" << auth_cap_str << "'" << dendl;
-        std::ostringstream errstr;
-        if (!s->auth_caps.parse(g_ceph_context, auth_cap_str, &errstr)) {
-          dout(1) << __func__ << ": auth cap parse error: " << errstr.str()
-                 << " parsing '" << auth_cap_str << "'" << dendl;
-         clog->warn() << name << " mds cap '" << auth_cap_str
-                      << "' does not parse: " << errstr.str();
-          is_valid = false;
-        }
-      } catch (buffer::error& e) {
-        // Assume legacy auth, defaults to:
-        //  * permit all filesystem ops
-        //  * permit no `tell` ops
-        dout(1) << __func__ << ": cannot decode auth caps bl of length " << caps_info.caps.length() << dendl;
-        is_valid = false;
-      }
+      dout(1) << __func__ << ": auth cap parse error: " << cs->strv() << " parsing '" << auth_cap_str << "'" << dendl;
+      return false;
     }
   }
-
-  return true;  // we made a decision (see is_valid)
 }
 
+int MDSDaemon::ms_handle_authentication(Connection *con)
+{
+  /* N.B. without mds_lock! */
+  MDSAuthCaps caps;
+  return parse_caps(con->get_peer_caps_info(), caps) ? 0 : -1;
+}
 
 void MDSDaemon::ms_handle_accept(Connection *con)
 {
-  Mutex::Locker l(mds_lock);
+  entity_name_t n(con->get_peer_type(), con->get_peer_global_id());
+  std::lock_guard l(mds_lock);
   if (stopping) {
     return;
   }
 
-  Session *s = static_cast<Session *>(con->get_priv());
-  dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl;
+  // We allow connections and assign Session instances to connections
+  // even if we have not been assigned a rank, because clients with
+  // "allow *" are allowed to connect and do 'tell' operations before
+  // we have a rank.
+  Session *s = NULL;
+  if (mds_rank) {
+    // If we do hold a rank, see if this is an existing client establishing
+    // a new connection, rather than a new client
+    s = mds_rank->sessionmap.get_session(n);
+  }
+
+  // Wire up a Session* to this connection
+  // It doesn't go into a SessionMap instance until it sends an explicit
+  // request to open a session (initial state of Session is `closed`)
+  if (!s) {
+    s = new Session(con);
+    dout(10) << " new session " << s << " for " << s->info.inst
+            << " con " << con << dendl;
+    con->set_priv(RefCountedPtr{s, false});
+    if (mds_rank) {
+      mds_rank->kick_waiters_for_any_client_connection();
+    }
+  } else {
+    dout(10) << " existing session " << s << " for " << s->info.inst
+            << " existing con " << s->get_connection()
+            << ", new/authorizing con " << con << dendl;
+    con->set_priv(RefCountedPtr{s});
+  }
+
+  parse_caps(con->get_peer_caps_info(), s->auth_caps);
+
+  dout(10) << "ms_handle_accept " << con->get_peer_socket_addr() << " con " << con << " session " << s << dendl;
   if (s) {
-    if (s->connection != con) {
-      dout(10) << " session connection " << s->connection << " -> " << con << dendl;
-      s->connection = con;
+    if (s->get_connection() != con) {
+      dout(10) << " session connection " << s->get_connection()
+              << " -> " << con << dendl;
+      s->set_connection(con);
 
       // send out any queued messages
       while (!s->preopen_out_queue.empty()) {
-       con->send_message(s->preopen_out_queue.front());
+       con->send_message2(s->preopen_out_queue.front());
        s->preopen_out_queue.pop_front();
       }
     }
-    s->put();
   }
 }