]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/MDSMonitor.cc
update ceph source to reef 18.2.0
[ceph.git] / ceph / src / mon / MDSMonitor.cc
index 436c9ff42a3b96be81ebf431caa2c77500bbe7d8..091206a684a9194b78a93d90febeaa57a24f647d 100644 (file)
  * 
  */
 
+#include <regex>
 #include <sstream>
 #include <boost/utility.hpp>
-#include <boost/regex.hpp>
 
 #include "MDSMonitor.h"
 #include "FSCommands.h"
 #include "Monitor.h"
 #include "MonitorDBStore.h"
 #include "OSDMonitor.h"
-#include "PGMonitor.h"
 
 #include "common/strtol.h"
 #include "common/perf_counters.h"
 #include "messages/MMonCommand.h"
 #include "messages/MGenericMessage.h"
 
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 #include "include/str_list.h"
 #include "include/stringify.h"
 #include "mds/mdstypes.h"
 #include "Session.h"
 
+using namespace TOPNSPC::common;
+
+using std::dec;
+using std::hex;
+using std::list;
+using std::map;
+using std::make_pair;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::string_view;
+using std::stringstream;
+using std::to_string;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+using ceph::ErasureCodeInterfaceRef;
+using ceph::ErasureCodeProfile;
+using ceph::Formatter;
+using ceph::JSONFormatter;
+using ceph::make_message;
+using ceph::mono_clock;
+using ceph::mono_time;
+
 #define dout_subsys ceph_subsys_mon
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, mon, get_fsmap())
-static ostream& _prefix(std::ostream *_dout, Monitor *mon, const FSMap& fsmap) {
-  return *_dout << "mon." << mon->name << "@" << mon->rank
-               << "(" << mon->get_state_name()
+static ostream& _prefix(std::ostream *_dout, Monitor &mon, const FSMap& fsmap) {
+  return *_dout << "mon." << mon.name << "@" << mon.rank
+               << "(" << mon.get_state_name()
                << ").mds e" << fsmap.get_epoch() << " ";
 }
 
@@ -57,29 +84,31 @@ static const string MDS_HEALTH_PREFIX("mds_health");
  * Specialized implementation of cmd_getval to allow us to parse
  * out strongly-typedef'd types
  */
-template<> bool cmd_getval(CephContext *cct, const cmdmap_t& cmdmap,
-                          const std::string& k, mds_gid_t &val)
+namespace TOPNSPC::common {
+template<> bool cmd_getval(const cmdmap_t& cmdmap,
+                          std::string_view k, mds_gid_t &val)
 {
-  return cmd_getval(cct, cmdmap, k, (int64_t&)val);
+  return cmd_getval(cmdmap, k, (int64_t&)val);
 }
 
-template<> bool cmd_getval(CephContext *cct, const cmdmap_t& cmdmap,
-                          const std::string& k, mds_rank_t &val)
+template<> bool cmd_getval(const cmdmap_t& cmdmap,
+                          std::string_view k, mds_rank_t &val)
 {
-  return cmd_getval(cct, cmdmap, k, (int64_t&)val);
+  return cmd_getval(cmdmap, k, (int64_t&)val);
 }
 
-template<> bool cmd_getval(CephContext *cct, const cmdmap_t& cmdmap,
-                          const std::string& k, MDSMap::DaemonState &val)
+template<> bool cmd_getval(const cmdmap_t& cmdmap,
+                          std::string_view k, MDSMap::DaemonState &val)
 {
-  return cmd_getval(cct, cmdmap, k, (int64_t&)val);
+  return cmd_getval(cmdmap, k, (int64_t&)val);
+}
 }
-
 // my methods
 
-void MDSMonitor::print_map(const FSMap &m, int dbl)
+template <int dblV>
+void MDSMonitor::print_map(const FSMap& m)
 {
-  dout(dbl) << "print_map\n";
+  dout(dblV) << "print_map\n";
   m.print(*_dout);
   *_dout << dendl;
 }
@@ -90,7 +119,7 @@ void MDSMonitor::create_initial()
   dout(10) << "create_initial" << dendl;
 }
 
-void MDSMonitor::get_store_prefixes(std::set<string>& s)
+void MDSMonitor::get_store_prefixes(std::set<string>& s) const
 {
   s.insert(service_name);
   s.insert(MDS_METADATA_PREFIX);
@@ -105,7 +134,7 @@ void MDSMonitor::update_from_paxos(bool *need_bootstrap)
 
   dout(10) << __func__ << " version " << version
           << ", my e " << get_fsmap().epoch << dendl;
-  assert(version > get_fsmap().epoch);
+  ceph_assert(version > get_fsmap().epoch);
 
   load_health();
 
@@ -113,21 +142,25 @@ void MDSMonitor::update_from_paxos(bool *need_bootstrap)
   bufferlist fsmap_bl;
   fsmap_bl.clear();
   int err = get_version(version, fsmap_bl);
-  assert(err == 0);
+  ceph_assert(err == 0);
 
-  assert(fsmap_bl.length() > 0);
+  ceph_assert(fsmap_bl.length() > 0);
   dout(10) << __func__ << " got " << version << dendl;
-  PaxosFSMap::decode(fsmap_bl);
+  try {
+    PaxosFSMap::decode(fsmap_bl);
+  } catch (const ceph::buffer::malformed_input& e) {
+    derr << "unable to decode FSMap: " << e.what() << dendl;
+    throw;
+  }
 
   // new map
   dout(0) << "new map" << dendl;
-  print_map(get_fsmap(), 0);
-  if (!g_conf->mon_mds_skip_sanity) {
+  print_map<0>(get_fsmap());
+  if (!g_conf()->mon_mds_skip_sanity) {
     get_fsmap().sanity();
   }
 
   check_subs();
-  update_logger();
 }
 
 void MDSMonitor::init()
@@ -139,8 +172,8 @@ void MDSMonitor::create_pending()
 {
   auto &fsmap = PaxosFSMap::create_pending();
 
-  if (mon->osdmon()->is_readable()) {
-    const auto &osdmap = mon->osdmon()->osdmap;
+  if (mon.osdmon()->is_readable()) {
+    const auto &osdmap = mon.osdmon()->osdmap;
     fsmap.sanitize([&osdmap](int64_t pool){return osdmap.have_pg_pool(pool);});
   }
 
@@ -155,9 +188,9 @@ void MDSMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   dout(10) << "encode_pending e" << epoch << dendl;
 
   // print map iff 'debug mon = 30' or higher
-  print_map(get_pending_fsmap(), 30);
-  if (!g_conf->mon_mds_skip_sanity) {
-    pending.sanity();
+  print_map<30>(pending);
+  if (!g_conf()->mon_mds_skip_sanity) {
+    pending.sanity(true);
   }
 
   // Set 'modified' on maps modified this epoch
@@ -168,9 +201,9 @@ void MDSMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   }
 
   // apply to paxos
-  assert(get_last_committed() + 1 == pending.epoch);
+  ceph_assert(get_last_committed() + 1 == pending.epoch);
   bufferlist pending_bl;
-  pending.encode(pending_bl, mon->get_quorum_con_features());
+  pending.encode(pending_bl, mon.get_quorum_con_features());
 
   /* put everything in the transaction */
   put_version(t, pending.epoch, pending_bl);
@@ -206,22 +239,26 @@ void MDSMonitor::encode_pending(MonitorDBStore::TransactionRef t)
       health = p->second;
     } else {
       bufferlist bl;
-      mon->store->get(MDS_HEALTH_PREFIX, stringify(gid), bl);
+      mon.store->get(MDS_HEALTH_PREFIX, stringify(gid), bl);
       if (!bl.length()) {
        derr << "Missing health data for MDS " << gid << dendl;
        continue;
       }
-      bufferlist::iterator bl_i = bl.begin();
+      auto bl_i = bl.cbegin();
       health.decode(bl_i);
     }
     for (const auto &metric : health.metrics) {
-      const int rank = info.rank;
+      if (metric.type == MDS_HEALTH_DUMMY) {
+        continue;
+      }
+      const auto rank = info.rank;
       health_check_t *check = &new_checks.get_or_add(
        mds_metric_name(metric.type),
        metric.sev,
-       mds_metric_summary(metric.type));
+       mds_metric_summary(metric.type),
+       1);
       ostringstream ss;
-      ss << "mds" << info.name << "(mds." << rank << "): " << metric.message;
+      ss << "mds." << info.name << "(mds." << rank << "): " << metric.message;
       bool first = true;
       for (auto &p : metric.metadata) {
        if (first) {
@@ -237,71 +274,53 @@ void MDSMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   }
   pending.get_health_checks(&new_checks);
   for (auto& p : new_checks.checks) {
-    p.second.summary = boost::regex_replace(
+    p.second.summary = std::regex_replace(
       p.second.summary,
-      boost::regex("%num%"),
+      std::regex("%num%"),
       stringify(p.second.detail.size()));
-    p.second.summary = boost::regex_replace(
+    p.second.summary = std::regex_replace(
       p.second.summary,
-      boost::regex("%plurals%"),
+      std::regex("%plurals%"),
       p.second.detail.size() > 1 ? "s" : "");
-    p.second.summary = boost::regex_replace(
+    p.second.summary = std::regex_replace(
       p.second.summary,
-      boost::regex("%isorare%"),
+      std::regex("%isorare%"),
       p.second.detail.size() > 1 ? "are" : "is");
-    p.second.summary = boost::regex_replace(
+    p.second.summary = std::regex_replace(
       p.second.summary,
-      boost::regex("%hasorhave%"),
+      std::regex("%hasorhave%"),
       p.second.detail.size() > 1 ? "have" : "has");
   }
   encode_health(new_checks, t);
 }
 
-version_t MDSMonitor::get_trim_to()
+version_t MDSMonitor::get_trim_to() const
 {
   version_t floor = 0;
-  if (g_conf->mon_mds_force_trim_to > 0 &&
-      g_conf->mon_mds_force_trim_to < (int)get_last_committed()) {
-    floor = g_conf->mon_mds_force_trim_to;
+  if (g_conf()->mon_mds_force_trim_to > 0 &&
+      g_conf()->mon_mds_force_trim_to <= (int)get_last_committed()) {
+    floor = g_conf()->mon_mds_force_trim_to;
     dout(10) << __func__ << " explicit mon_mds_force_trim_to = "
              << floor << dendl;
   }
 
-  unsigned max = g_conf->mon_max_mdsmap_epochs;
+  unsigned max = g_conf()->mon_max_mdsmap_epochs;
   version_t last = get_last_committed();
 
-  if (last - get_first_committed() > max && floor < last - max)
-    return last - max;
-  return floor;
-}
-
-void MDSMonitor::update_logger()
-{
-  dout(10) << "update_logger" << dendl;
-
-  const auto &fsmap = get_fsmap();
-
-  uint64_t up = 0;
-  uint64_t in = 0;
-  uint64_t failed = 0;
-  for (const auto &i : fsmap.filesystems) {
-    const MDSMap &mds_map = i.second->mds_map;
-
-    up += mds_map.get_num_up_mds();
-    in += mds_map.get_num_in_mds();
-    failed += mds_map.get_num_failed_mds();
+  if (last - get_first_committed() > max && floor < last - max) {
+    floor = last-max;
   }
-  mon->cluster_logger->set(l_cluster_num_mds_up, up);
-  mon->cluster_logger->set(l_cluster_num_mds_in, in);
-  mon->cluster_logger->set(l_cluster_num_mds_failed, failed);
-  mon->cluster_logger->set(l_cluster_mds_epoch, fsmap.get_epoch());
+
+  dout(20) << __func__ << " = " << floor << dendl;
+  return floor;
 }
 
 bool MDSMonitor::preprocess_query(MonOpRequestRef op)
 {
   op->mark_mdsmon_event(__func__);
-  PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
-  dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
+  auto m = op->get_req<PaxosServiceMessage>();
+  dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source()
+          << " " << m->get_orig_source_addrs() << dendl;
 
   switch (m->get_type()) {
     
@@ -311,10 +330,9 @@ bool MDSMonitor::preprocess_query(MonOpRequestRef op)
   case MSG_MON_COMMAND:
     try {
       return preprocess_command(op);
-    }
-    catch (const bad_cmd_get& e) {
+    } catch (const bad_cmd_get& e) {
       bufferlist bl;
-      mon->reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
+      mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
       return true;
     }
 
@@ -341,7 +359,7 @@ void MDSMonitor::_note_beacon(MMDSBeacon *m)
 bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
 {
   op->mark_mdsmon_event(__func__);
-  MMDSBeacon *m = static_cast<MMDSBeacon*>(op->get_req());
+  auto m = op->get_req<MMDSBeacon>();
   MDSMap::DaemonState state = m->get_state();
   mds_gid_t gid = m->get_global_id();
   version_t seq = m->get_seq();
@@ -351,21 +369,23 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
   const auto &fsmap = get_fsmap();
 
   // check privileges, ignore if fails
-  MonSession *session = m->get_session();
-  assert(session);
+  MonSession *session = op->get_session();
+  if (!session)
+    goto ignore;
   if (!session->is_capable("mds", MON_CAP_X)) {
     dout(0) << "preprocess_beacon got MMDSBeacon from entity with insufficient privileges "
            << session->caps << dendl;
     goto ignore;
   }
 
-  if (m->get_fsid() != mon->monmap->fsid) {
-    dout(0) << "preprocess_beacon on fsid " << m->get_fsid() << " != " << mon->monmap->fsid << dendl;
+  if (m->get_fsid() != mon.monmap->fsid) {
+    dout(0) << "preprocess_beacon on fsid " << m->get_fsid() << " != " << mon.monmap->fsid << dendl;
     goto ignore;
   }
 
   dout(5)  << "preprocess_beacon " << *m
-          << " from " << m->get_orig_source_inst()
+          << " from " << m->get_orig_source()
+          << " " << m->get_orig_source_addrs()
           << " " << m->get_compat()
           << dendl;
 
@@ -375,12 +395,6 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
     goto ignore;
   }
 
-  // check compat
-  if (!m->get_compat().writeable(fsmap.compat)) {
-    dout(1) << " mds " << m->get_source_inst() << " can't write to fsmap " << fsmap.compat << dendl;
-    goto ignore;
-  }
-
   // fw to leader?
   if (!is_leader())
     return false;
@@ -395,18 +409,27 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
        * know which FS it was part of. Nor does this matter. Sending an empty
        * MDSMap is sufficient for getting the MDS to respawn.
        */
-      MDSMap null_map;
-      null_map.epoch = fsmap.epoch;
-      null_map.compat = fsmap.compat;
-      mon->send_reply(op, new MMDSMap(mon->monmap->fsid, &null_map));
+      auto m = make_message<MMDSMap>(mon.monmap->fsid, MDSMap::create_null_mdsmap());
+      mon.send_reply(op, m.detach());
       return true;
     } else {
-      return false;  // not booted yet.
+      /* check if we've already recorded its entry in pending */
+      const auto& pending = get_pending_fsmap();
+      if (pending.gid_exists(gid)) {
+        /* MDS is already booted. */
+        goto ignore;
+      } else {
+        return false;  // not booted yet.
+      }
     }
   }
   dout(10) << __func__ << ": GID exists in map: " << gid << dendl;
   info = fsmap.get_info_gid(gid);
 
+  if (state == MDSMap::STATE_DNE) {
+    return false;
+  }
+
   // old seq?
   if (info.state_seq > seq) {
     dout(7) << "mds_beacon " << *m << " has old seq, ignoring" << dendl;
@@ -436,25 +459,30 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
     // ignore, already booted.
     goto ignore;
   }
-  // is there a state change here?
-  if (info.state != state) {
-    // legal state change?
-    if ((info.state == MDSMap::STATE_STANDBY ||
-        info.state == MDSMap::STATE_STANDBY_REPLAY) && state > 0) {
-      dout(10) << "mds_beacon mds can't activate itself (" << ceph_mds_state_name(info.state)
-              << " -> " << ceph_mds_state_name(state) << ")" << dendl;
-      goto reply;
-    }
 
-    if ((state == MDSMap::STATE_STANDBY || state == MDSMap::STATE_STANDBY_REPLAY)
-        && info.rank != MDS_RANK_NONE)
-    {
-      dout(4) << "mds_beacon MDS can't go back into standby after taking rank: "
-                 "held rank " << info.rank << " while requesting state "
-              << ceph_mds_state_name(state) << dendl;
-      goto reply;
+  // did the join_fscid change
+  if (m->get_fs().size()) {
+    fs_cluster_id_t fscid = FS_CLUSTER_ID_NONE;
+    auto f = fsmap.get_filesystem(m->get_fs());
+    if (f) {
+      fscid = f->fscid;
     }
-    
+    if (info.join_fscid != fscid) {
+      dout(10) << __func__ << " standby mds_join_fs changed to " << fscid
+               << " (" << m->get_fs() << ")" << dendl;
+      _note_beacon(m);
+      return false;
+    }
+  } else {
+    if (info.join_fscid != FS_CLUSTER_ID_NONE) {
+      dout(10) << __func__ << " standby mds_join_fs was cleared" << dendl;
+      _note_beacon(m);
+      return false;
+    }
+  }
+
+  // is there a state change here?
+  if (info.state != state) {
     _note_beacon(m);
     return false;
   }
@@ -470,30 +498,32 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
 
  reply:
   // note time and reply
-  assert(effective_epoch > 0);
+  ceph_assert(effective_epoch > 0);
   _note_beacon(m);
-  mon->send_reply(op,
-                 new MMDSBeacon(mon->monmap->fsid, m->get_global_id(), m->get_name(),
-                                effective_epoch, state, seq,
-                                CEPH_FEATURES_SUPPORTED_DEFAULT));
+  {
+    auto beacon = make_message<MMDSBeacon>(mon.monmap->fsid,
+        m->get_global_id(), m->get_name(), effective_epoch,
+        state, seq, CEPH_FEATURES_SUPPORTED_DEFAULT);
+    mon.send_reply(op, beacon.detach());
+  }
   return true;
 
  ignore:
   // I won't reply this beacon, drop it.
-  mon->no_reply(op);
+  mon.no_reply(op);
   return true;
 }
 
 bool MDSMonitor::preprocess_offload_targets(MonOpRequestRef op)
 {
   op->mark_mdsmon_event(__func__);
-  MMDSLoadTargets *m = static_cast<MMDSLoadTargets*>(op->get_req());
+  auto m = op->get_req<MMDSLoadTargets>();
   dout(10) << "preprocess_offload_targets " << *m << " from " << m->get_orig_source() << dendl;
 
   const auto &fsmap = get_fsmap();
   
   // check privileges, ignore message if fails
-  MonSession *session = m->get_session();
+  MonSession *session = op->get_session();
   if (!session)
     goto ignore;
   if (!session->is_capable("mds", MON_CAP_X)) {
@@ -509,7 +539,7 @@ bool MDSMonitor::preprocess_offload_targets(MonOpRequestRef op)
   return false;
 
  ignore:
-  mon->no_reply(op);
+  mon.no_reply(op);
   return true;
 }
 
@@ -517,7 +547,7 @@ bool MDSMonitor::preprocess_offload_targets(MonOpRequestRef op)
 bool MDSMonitor::prepare_update(MonOpRequestRef op)
 {
   op->mark_mdsmon_event(__func__);
-  PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
+  auto m = op->get_req<PaxosServiceMessage>();
   dout(7) << "prepare_update " << *m << dendl;
 
   switch (m->get_type()) {
@@ -528,11 +558,10 @@ bool MDSMonitor::prepare_update(MonOpRequestRef op)
   case MSG_MON_COMMAND:
     try {
       return prepare_command(op);
-    }
-    catch (const bad_cmd_get& e) {
+    } catch (const bad_cmd_get& e) {
       bufferlist bl;
-      mon->reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
-      return true;
+      mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
+      return false; /* nothing to propose */
     }
 
   case MSG_MDS_OFFLOAD_TARGETS:
@@ -542,16 +571,17 @@ bool MDSMonitor::prepare_update(MonOpRequestRef op)
     ceph_abort();
   }
 
-  return true;
+  return false; /* nothing to propose! */
 }
 
 bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
 {
   op->mark_mdsmon_event(__func__);
-  MMDSBeacon *m = static_cast<MMDSBeacon*>(op->get_req());
+  auto m = op->get_req<MMDSBeacon>();
   // -- this is an update --
-  dout(12) << "prepare_beacon " << *m << " from " << m->get_orig_source_inst() << dendl;
-  entity_addr_t addr = m->get_orig_source_inst().addr;
+  dout(12) << "prepare_beacon " << *m << " from " << m->get_orig_source()
+          << " " << m->get_orig_source_addrs() << dendl;
+  entity_addrvec_t addrs = m->get_orig_source_addrs();
   mds_gid_t gid = m->get_global_id();
   MDSMap::DaemonState state = m->get_state();
   version_t seq = m->get_seq();
@@ -573,12 +603,18 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
 
   std::set<mds_metric_t> new_types;
   for (const auto &i : new_health) {
+    if (i.type == MDS_HEALTH_DUMMY) {
+      continue;
+    }
     new_types.insert(i.type);
   }
 
   for (const auto &new_metric: new_health) {
+    if (new_metric.type == MDS_HEALTH_DUMMY) {
+      continue;
+    }
     if (old_types.count(new_metric.type) == 0) {
-      dout(10) << "MDS health message (" << m->get_orig_source_inst().name
+      dout(10) << "MDS health message (" << m->get_orig_source()
               << "): " << new_metric.sev << " " << new_metric.message << dendl;
     }
   }
@@ -586,33 +622,32 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
   // Log the disappearance of health messages at INFO
   for (const auto &old_metric : old_health) {
     if (new_types.count(old_metric.type) == 0) {
-      mon->clog->info() << "MDS health message cleared ("
-        << m->get_orig_source_inst().name << "): " << old_metric.message;
+      mon.clog->info() << "MDS health message cleared ("
+        << m->get_orig_source() << "): " << old_metric.message;
     }
   }
 
   // Store health
   pending_daemon_health[gid] = m->get_health();
 
-  // boot?
+  const auto& cs = m->get_compat();
   if (state == MDSMap::STATE_BOOT) {
     // zap previous instance of this name?
-    if (g_conf->mds_enforce_unique_name) {
+    if (g_conf()->mds_enforce_unique_name) {
       bool failed_mds = false;
       while (mds_gid_t existing = pending.find_mds_gid_by_name(m->get_name())) {
-        if (!mon->osdmon()->is_writeable()) {
-          mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
+        if (!mon.osdmon()->is_writeable()) {
+          mon.osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
           return false;
         }
-        const MDSMap::mds_info_t &existing_info =
-          pending.get_info_gid(existing);
-        mon->clog->info() << existing_info.human_name() << " restarted";
+        const auto& existing_info = pending.get_info_gid(existing);
+        mon.clog->info() << existing_info.human_name() << " restarted";
        fail_mds_gid(pending, existing);
         failed_mds = true;
       }
       if (failed_mds) {
-        assert(mon->osdmon()->is_writeable());
-        request_proposal(mon->osdmon());
+        ceph_assert(mon.osdmon()->is_writeable());
+        request_proposal(mon.osdmon());
       }
     }
 
@@ -621,31 +656,20 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
       MDSMap::mds_info_t new_info;
       new_info.global_id = gid;
       new_info.name = m->get_name();
-      new_info.addr = addr;
+      new_info.addrs = addrs;
       new_info.mds_features = m->get_mds_features();
       new_info.state = MDSMap::STATE_STANDBY;
       new_info.state_seq = seq;
-      new_info.standby_for_rank = m->get_standby_for_rank();
-      new_info.standby_for_name = m->get_standby_for_name();
-      new_info.standby_for_fscid = m->get_standby_for_fscid();
-      new_info.standby_replay = m->get_standby_replay();
-      pending.insert(new_info);
-    }
-
-    // Resolve standby_for_name to a rank
-    const MDSMap::mds_info_t &info = pending.get_info_gid(gid);
-    if (!info.standby_for_name.empty()) {
-      const MDSMap::mds_info_t *leaderinfo = pending.find_by_name(
-          info.standby_for_name);
-      if (leaderinfo && (leaderinfo->rank >= 0)) {
-        const auto &fscid = pending.mds_roles.at(leaderinfo->global_id);
-
-        pending.modify_daemon(gid, [fscid, leaderinfo](
-              MDSMap::mds_info_t *info) {
-            info->standby_for_rank = leaderinfo->rank;
-            info->standby_for_fscid = fscid;
-        });
+      new_info.compat = cs;
+      if (m->get_fs().size()) {
+       fs_cluster_id_t fscid = FS_CLUSTER_ID_NONE;
+       auto f = pending.get_filesystem(m->get_fs());
+       if (f) {
+         fscid = f->fscid;
+       }
+        new_info.join_fscid = fscid;
       }
+      pending.insert(new_info);
     }
 
     // initialize the beacon timer
@@ -653,15 +677,6 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
     beacon.stamp = mono_clock::now();
     beacon.seq = seq;
 
-    // new incompat?
-    if (!pending.compat.writeable(m->get_compat())) {
-      dout(10) << " fsmap " << pending.compat
-               << " can't write to new mds' " << m->get_compat()
-              << ", updating fsmap and killing old mds's"
-              << dendl;
-      pending.update_compat(m->get_compat());
-    }
-
     update_metadata(m->get_global_id(), m->get_sys_info());
   } else {
     // state update
@@ -675,148 +690,138 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
        * know which FS it was part of. Nor does this matter. Sending an empty
        * MDSMap is sufficient for getting the MDS to respawn.
        */
-      wait_for_finished_proposal(op, new FunctionContext([op, this](int r){
-        if (r >= 0) {
-          const auto& fsmap = get_fsmap();
-          MDSMap null_map;
-          null_map.epoch = fsmap.epoch;
-          null_map.compat = fsmap.compat;
-          mon->send_reply(op, new MMDSMap(mon->monmap->fsid, &null_map));
-        } else {
-          dispatch(op);        // try again
-        }
-      }));
+      goto null;
+    }
+
+    const auto& info = pending.get_info_gid(gid);
+
+    // did the reported compat change? That's illegal!
+    if (cs.compare(info.compat) != 0) {
+      if (!mon.osdmon()->is_writeable()) {
+        mon.osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
+        return false;
+      }
+      mon.clog->warn() << info.human_name() << " compat changed unexpectedly";
+      fail_mds_gid(pending, gid);
+      request_proposal(mon.osdmon());
       return true;
     }
 
-    const MDSMap::mds_info_t &info = pending.get_info_gid(gid);
-    // Old MDS daemons don't mention that they're standby replay until
-    // after they've sent their boot beacon, so update this field.
-    if (info.standby_replay != m->get_standby_replay()) {
-      pending.modify_daemon(info.global_id, [&m](
-            MDSMap::mds_info_t *i)
-        {
-          i->standby_replay = m->get_standby_replay();
-        });
+    if (state == MDSMap::STATE_DNE) {
+      dout(1) << __func__ << ": DNE from " << info << dendl;
+      goto evict;
     }
 
-    if (info.state == MDSMap::STATE_STOPPING &&
-        state != MDSMap::STATE_STOPPING &&
-        state != MDSMap::STATE_STOPPED) {
-      // we can't transition to any other states from STOPPING
-      dout(0) << "got beacon for MDS in STATE_STOPPING, ignoring requested state change"
-              << dendl;
-      _note_beacon(m);
-      return true;
+    // legal state change?
+    if ((info.state == MDSMap::STATE_STANDBY && state != info.state) ||
+        (info.state == MDSMap::STATE_STANDBY_REPLAY && state != info.state && state != MDSMap::STATE_DAMAGED)) {
+      // Standby daemons should never modify their own state.
+      // Except that standby-replay can indicate the rank is damaged due to failure to replay.
+      // Reject any attempts to do so.
+      derr << "standby " << gid << " attempted to change state to "
+           << ceph_mds_state_name(state) << ", rejecting" << dendl;
+      goto evict;
+    } else if (info.state != MDSMap::STATE_STANDBY && state != info.state &&
+               !MDSMap::state_transition_valid(info.state, state)) {
+      // Validate state transitions for daemons that hold a rank
+      derr << "daemon " << gid << " (rank " << info.rank << ") "
+           << "reported invalid state transition "
+           << ceph_mds_state_name(info.state) << " -> "
+           << ceph_mds_state_name(state) << dendl;
+      goto evict;
     }
 
     if (info.laggy()) {
-      dout(1) << "prepare_beacon clearing laggy flag on " << addr << dendl;
-      pending.modify_daemon(info.global_id, [](MDSMap::mds_info_t *info)
+      dout(1) << "prepare_beacon clearing laggy flag on " << addrs << dendl;
+      pending.modify_daemon(info.global_id, [](auto& info)
         {
-          info->clear_laggy();
+          info.clear_laggy();
         }
       );
     }
-  
+
     dout(5)  << "prepare_beacon mds." << info.rank
             << " " << ceph_mds_state_name(info.state)
             << " -> " << ceph_mds_state_name(state)
-            << "  standby_for_rank=" << m->get_standby_for_rank()
             << dendl;
+
+    fs_cluster_id_t fscid = FS_CLUSTER_ID_NONE;
+    if (m->get_fs().size()) {
+      auto f = pending.get_filesystem(m->get_fs());
+      if (f) {
+        fscid = f->fscid;
+      }
+    }
+    pending.modify_daemon(gid, [fscid](auto& info) {
+      info.join_fscid = fscid;
+    });
+
     if (state == MDSMap::STATE_STOPPED) {
       const auto fscid = pending.mds_roles.at(gid);
       const auto &fs = pending.get_filesystem(fscid);
 
-      mon->clog->info() << info.human_name() << " finished "
-                        << "deactivating rank " << info.rank << " in filesystem "
+      mon.clog->info() << info.human_name() << " finished "
+                        << "stopping rank " << info.rank << " in filesystem "
                         << fs->mds_map.fs_name << " (now has "
                         << fs->mds_map.get_num_in_mds() - 1 << " ranks)";
 
       auto erased = pending.stop(gid);
       erased.push_back(gid);
 
-      for (const auto &erased_gid : erased) {
+      for (const autoerased_gid : erased) {
         last_beacon.erase(erased_gid);
         if (pending_daemon_health.count(erased_gid)) {
           pending_daemon_health.erase(erased_gid);
           pending_daemon_health_rm.insert(erased_gid);
         }
       }
-
-
     } else if (state == MDSMap::STATE_DAMAGED) {
-      if (!mon->osdmon()->is_writeable()) {
+      if (!mon.osdmon()->is_writeable()) {
         dout(1) << __func__ << ": DAMAGED from rank " << info.rank
-                << " waiting for osdmon writeable to blacklist it" << dendl;
-        mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
+                << " waiting for osdmon writeable to blocklist it" << dendl;
+        mon.osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
         return false;
       }
 
+      auto rank = info.rank;
+
       // Record this MDS rank as damaged, so that other daemons
       // won't try to run it.
-      dout(0) << __func__ << ": marking rank "
-              << info.rank << " damaged" << dendl;
+      dout(0) << __func__ << ": marking rank " << rank << " damaged" << dendl;
+
+      auto fs = pending.get_filesystem(gid);
+      auto rankgid = fs->mds_map.get_gid(rank);
+      auto rankinfo = pending.get_info_gid(rankgid);
+      auto followergid = fs->mds_map.get_standby_replay(rank);
+
+      ceph_assert(gid == rankgid || gid == followergid);
 
       utime_t until = ceph_clock_now();
-      until += g_conf->get_val<double>("mon_mds_blacklist_interval");
-      const auto blacklist_epoch = mon->osdmon()->blacklist(info.addr, until);
-      request_proposal(mon->osdmon());
-      pending.damaged(gid, blacklist_epoch);
-      last_beacon.erase(gid);
-
-      // Respond to MDS, so that it knows it can continue to shut down
-      mon->send_reply(op,
-                     new MMDSBeacon(
-                       mon->monmap->fsid, m->get_global_id(),
-                       m->get_name(), pending.get_epoch(), state, seq,
-                       CEPH_FEATURES_SUPPORTED_DEFAULT));
-    } else if (state == MDSMap::STATE_DNE) {
-      if (!mon->osdmon()->is_writeable()) {
-        dout(1) << __func__ << ": DNE from rank " << info.rank
-                << " waiting for osdmon writeable to blacklist it" << dendl;
-        mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
-        return false;
+      until += g_conf().get_val<double>("mon_mds_blocklist_interval");
+      const auto blocklist_epoch = mon.osdmon()->blocklist(rankinfo.addrs, until);
+      if (followergid != MDS_GID_NONE) {
+        fail_mds_gid(pending, followergid);
+        last_beacon.erase(followergid);
       }
+      request_proposal(mon.osdmon());
+      pending.damaged(rankgid, blocklist_epoch);
+      last_beacon.erase(rankgid);
 
-      fail_mds_gid(pending, gid);
-      assert(mon->osdmon()->is_writeable());
-      request_proposal(mon->osdmon());
-
-      // Respond to MDS, so that it knows it can continue to shut down
-      mon->send_reply(op,
-                     new MMDSBeacon(
-                       mon->monmap->fsid, m->get_global_id(),
-                       m->get_name(), pending.get_epoch(), state, seq,
-                       CEPH_FEATURES_SUPPORTED_DEFAULT));
-    } else if (info.state == MDSMap::STATE_STANDBY && state != info.state) {
-      // Standby daemons should never modify their own
-      // state.  Reject any attempts to do so.
-      derr << "standby " << gid << " attempted to change state to "
-           << ceph_mds_state_name(state) << ", rejecting" << dendl;
-      return true;
-    } else if (info.state != MDSMap::STATE_STANDBY && state != info.state &&
-               !MDSMap::state_transition_valid(info.state, state)) {
-      // Validate state transitions for daemons that hold a rank
-      derr << "daemon " << gid << " (rank " << info.rank << ") "
-           << "reported invalid state transition "
-           << ceph_mds_state_name(info.state) << " -> "
-           << ceph_mds_state_name(state) << dendl;
-      return true;
+      /* MDS expects beacon reply back */
     } else {
       if (info.state != MDSMap::STATE_ACTIVE && state == MDSMap::STATE_ACTIVE) {
         const auto &fscid = pending.mds_roles.at(gid);
         const auto &fs = pending.get_filesystem(fscid);
-        mon->clog->info() << info.human_name() << " is now active in "
+        mon.clog->info() << info.human_name() << " is now active in "
                           << "filesystem " << fs->mds_map.fs_name << " as rank "
                           << info.rank;
       }
 
       // Made it through special cases and validations, record the
       // daemon's reported state to the FSMap.
-      pending.modify_daemon(gid, [state, seq](MDSMap::mds_info_t *info) {
-        info->state = state;
-        info->state_seq = seq;
+      pending.modify_daemon(gid, [state, seq](auto& info) {
+        info.state = state;
+        info.state_seq = seq;
       });
     }
   }
@@ -824,11 +829,37 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
   dout(5) << "prepare_beacon pending map now:" << dendl;
   print_map(pending);
   
-  wait_for_finished_proposal(op, new FunctionContext([op, this](int r){
+  wait_for_finished_proposal(op, new LambdaContext([op, this](int r){
     if (r >= 0)
       _updated(op);   // success
     else if (r == -ECANCELED) {
-      mon->no_reply(op);
+      mon.no_reply(op);
+    } else {
+      dispatch(op);        // try again
+    }
+  }));
+
+  return true;
+
+evict:
+  if (!mon.osdmon()->is_writeable()) {
+    dout(1) << __func__ << ": waiting for writeable OSDMap to evict" << dendl;
+    mon.osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
+    return false;
+  }
+
+  fail_mds_gid(pending, gid);
+  request_proposal(mon.osdmon());
+  dout(5) << __func__ << ": pending map now:" << dendl;
+  print_map(pending);
+
+  goto null;
+
+null:
+  wait_for_finished_proposal(op, new LambdaContext([op, this](int r){
+    if (r >= 0) {
+      auto m = make_message<MMDSMap>(mon.monmap->fsid, MDSMap::create_null_mdsmap());
+      mon.send_reply(op, m.detach());
     } else {
       dispatch(op);        // try again
     }
@@ -840,18 +871,20 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
 bool MDSMonitor::prepare_offload_targets(MonOpRequestRef op)
 {
   auto &pending = get_pending_fsmap_writeable();
+  bool propose = false;
 
   op->mark_mdsmon_event(__func__);
-  MMDSLoadTargets *m = static_cast<MMDSLoadTargets*>(op->get_req());
+  auto m = op->get_req<MMDSLoadTargets>();
   mds_gid_t gid = m->global_id;
   if (pending.gid_has_rank(gid)) {
     dout(10) << "prepare_offload_targets " << gid << " " << m->targets << dendl;
     pending.update_export_targets(gid, m->targets);
+    propose = true;
   } else {
     dout(10) << "prepare_offload_targets " << gid << " not in map" << dendl;
   }
-  mon->no_reply(op);
-  return true;
+  mon.no_reply(op);
+  return propose;
 }
 
 bool MDSMonitor::should_propose(double& delay)
@@ -864,89 +897,30 @@ void MDSMonitor::_updated(MonOpRequestRef op)
 {
   const auto &fsmap = get_fsmap();
   op->mark_mdsmon_event(__func__);
-  MMDSBeacon *m = static_cast<MMDSBeacon*>(op->get_req());
+  auto m = op->get_req<MMDSBeacon>();
   dout(10) << "_updated " << m->get_orig_source() << " " << *m << dendl;
-  mon->clog->debug() << m->get_orig_source_inst() << " "
-         << ceph_mds_state_name(m->get_state());
+  mon.clog->debug() << m->get_orig_source() << " "
+                    << m->get_orig_source_addrs() << " "
+                    << ceph_mds_state_name(m->get_state());
 
   if (m->get_state() == MDSMap::STATE_STOPPED) {
     // send the map manually (they're out of the map, so they won't get it automatic)
-    MDSMap null_map;
-    null_map.epoch = fsmap.epoch;
-    null_map.compat = fsmap.compat;
-    mon->send_reply(op, new MMDSMap(mon->monmap->fsid, &null_map));
+    auto m = make_message<MMDSMap>(mon.monmap->fsid, MDSMap::create_null_mdsmap());
+    mon.send_reply(op, m.detach());
   } else {
-    mon->send_reply(op, new MMDSBeacon(mon->monmap->fsid,
-                                      m->get_global_id(),
-                                      m->get_name(),
-                                      fsmap.get_epoch(),
-                                      m->get_state(),
-                                      m->get_seq(),
-                                      CEPH_FEATURES_SUPPORTED_DEFAULT));
+    auto beacon = make_message<MMDSBeacon>(mon.monmap->fsid,
+        m->get_global_id(), m->get_name(), fsmap.get_epoch(),
+        m->get_state(), m->get_seq(), CEPH_FEATURES_SUPPORTED_DEFAULT);
+    mon.send_reply(op, beacon.detach());
   }
 }
 
 void MDSMonitor::on_active()
 {
   tick();
-  update_logger();
 
   if (is_leader()) {
-    mon->clog->debug() << "fsmap " << get_fsmap();
-  }
-}
-
-void MDSMonitor::get_health(list<pair<health_status_t, string> >& summary,
-                           list<pair<health_status_t, string> > *detail,
-                           CephContext* cct) const
-{
-  const auto &fsmap = get_fsmap();
-
-  fsmap.get_health(summary, detail);
-
-  // For each MDS GID...
-  const auto &info_map = fsmap.get_mds_info();
-  for (const auto &i : info_map) {
-    const auto &gid = i.first;
-    const auto &info = i.second;
-
-    // Decode MDSHealth
-    bufferlist bl;
-    mon->store->get(MDS_HEALTH_PREFIX, stringify(gid), bl);
-    if (!bl.length()) {
-      derr << "Missing health data for MDS " << gid << dendl;
-      continue;
-    }
-    MDSHealth health;
-    bufferlist::iterator bl_i = bl.begin();
-    health.decode(bl_i);
-
-    for (const auto &metric : health.metrics) {
-      const int rank = info.rank;
-      std::ostringstream message;
-      message << "mds" << rank << ": " << metric.message;
-      summary.push_back(std::make_pair(metric.sev, message.str()));
-
-      if (detail) {
-        // There is no way for us to clealy associate detail entries with summary entries (#7192), so
-        // we duplicate the summary message in the detail string and tag the metadata on.
-        std::ostringstream detail_message;
-        detail_message << message.str();
-        if (metric.metadata.size()) {
-          detail_message << "(";
-          auto k = metric.metadata.begin();
-          while (k != metric.metadata.end()) {
-            detail_message << k->first << ": " << k->second;
-            if (boost::next(k) != metric.metadata.end()) {
-              detail_message << ", ";
-            }
-            ++k;
-          }
-          detail_message << ")";
-        }
-        detail->push_back(std::make_pair(metric.sev, detail_message.str()));
-      }
-    }
+    mon.clog->debug() << "fsmap " << get_fsmap();
   }
 }
 
@@ -963,33 +937,35 @@ void MDSMonitor::dump_info(Formatter *f)
 bool MDSMonitor::preprocess_command(MonOpRequestRef op)
 {
   op->mark_mdsmon_event(__func__);
-  MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+  auto m = op->get_req<MMonCommand>();
   int r = -1;
   bufferlist rdata;
   stringstream ss, ds;
 
-  map<string, cmd_vartype> cmdmap;
-  const auto &fsmap = get_fsmap();
-
+  cmdmap_t cmdmap;
   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
     // ss has reason for failure
     string rs = ss.str();
-    mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
+    mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed());
     return true;
   }
 
   string prefix;
-  cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
-  string format;
-  cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
+  cmd_getval(cmdmap, "prefix", prefix);
+  string format = cmd_getval_or<string>(cmdmap, "format", "plain");
   std::unique_ptr<Formatter> f(Formatter::create(format));
 
-  MonSession *session = m->get_session();
+  MonSession *session = op->get_session();
   if (!session) {
-    mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
+    mon.reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
     return true;
   }
 
+  // to use const qualifier filter fsmap beforehand
+  FSMap _fsmap_copy = get_fsmap();
+  _fsmap_copy.filter(session->get_allowed_fs_names());
+  const auto& fsmap = _fsmap_copy;
+
   if (prefix == "mds stat") {
     if (f) {
       f->open_object_section("mds_stat");
@@ -1000,56 +976,64 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
       ds << fsmap;
     }
     r = 0;
-  } else if (prefix == "mds dump") {
-    int64_t epocharg;
-    epoch_t epoch;
-
-    const FSMap *fsmapp = &get_fsmap();
-    FSMap dummy;
-    if (cmd_getval(g_ceph_context, cmdmap, "epoch", epocharg)) {
-      epoch = epocharg;
-      bufferlist b;
-      int err = get_version(epoch, b);
-      if (err == -ENOENT) {
-       r = -ENOENT;
-        goto out;
+  } else if (prefix == "mds ok-to-stop") {
+    vector<string> ids;
+    if (!cmd_getval(cmdmap, "ids", ids)) {
+      r = -EINVAL;
+      ss << "must specify mds id";
+      goto out;
+    }
+    if (fsmap.is_any_degraded()) {
+      ss << "one or more filesystems is currently degraded";
+      r = -EBUSY;
+      goto out;
+    }
+    set<mds_gid_t> stopping;
+    for (auto& id : ids) {
+      ostringstream ess;
+      mds_gid_t gid = gid_from_arg(fsmap, id, ess);
+      if (gid == MDS_GID_NONE) {
+       // the mds doesn't exist, but no file systems are unhappy, so losing it
+       // can't have any effect.
+       continue;
+      }
+      stopping.insert(gid);
+    }
+    set<mds_gid_t> active;
+    set<mds_gid_t> standby;
+    for (auto gid : stopping) {
+      if (fsmap.gid_has_rank(gid)) {
+       // ignore standby-replay daemons (at this level)
+       if (!fsmap.is_standby_replay(gid)) {
+         auto standby = fsmap.get_standby_replay(gid);
+         if (standby == MDS_GID_NONE ||
+             stopping.count(standby)) {
+           // no standby-replay, or we're also stopping the standby-replay
+           // for this mds
+           active.insert(gid);
+         }
+       }
       } else {
-       assert(err == 0);
-       assert(b.length());
-        dummy.decode(b);
-        fsmapp = &dummy;
+       // net loss of a standby
+       standby.insert(gid);
       }
     }
-
-    stringstream ds;
-    const MDSMap *mdsmapp = nullptr;
-    MDSMap blank;
-    blank.epoch = fsmapp->epoch;
-    if (fsmapp->legacy_client_fscid != FS_CLUSTER_ID_NONE) {
-      mdsmapp = &fsmapp->filesystems.at(fsmapp->legacy_client_fscid)->mds_map;
-    } else {
-      mdsmapp = &blank;
-    }
-    if (f != NULL) {
-      f->open_object_section("mdsmap");
-      mdsmapp->dump(f.get());
-      f->close_section();
-      f->flush(ds);
-      r = 0;
-    } else {
-      mdsmapp->print(ds);
-      r = 0;
+    if (fsmap.get_num_standby() - standby.size() < active.size()) {
+      r = -EBUSY;
+      ss << "insufficent standby MDS daemons to stop active gids "
+        << stringify(active)
+        << " and/or standby gids " << stringify(standby);;
+      goto out;
     }
-
-    rdata.append(ds);
-    ss << "dumped fsmap epoch " << fsmapp->get_epoch();
+    r = 0;
+    ss << "should be safe to stop " << ids;
   } else if (prefix == "fs dump") {
     int64_t epocharg;
     epoch_t epoch;
 
     const FSMap *fsmapp = &fsmap;
     FSMap dummy;
-    if (cmd_getval(g_ceph_context, cmdmap, "epoch", epocharg)) {
+    if (cmd_getval(cmdmap, "epoch", epocharg)) {
       epoch = epocharg;
       bufferlist b;
       int err = get_version(epoch, b);
@@ -1057,8 +1041,8 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
        r = -ENOENT;
         goto out;
       } else {
-       assert(err == 0);
-       assert(b.length());
+       ceph_assert(err == 0);
+       ceph_assert(b.length());
        dummy.decode(b);
         fsmapp = &dummy;
       }
@@ -1083,7 +1067,7 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
       f.reset(Formatter::create("json-pretty"));
 
     string who;
-    bool all = !cmd_getval(g_ceph_context, cmdmap, "who", who);
+    bool all = !cmd_getval(cmdmap, "who", who);
     dout(1) << "all = " << all << dendl;
     if (all) {
       r = 0;
@@ -1129,46 +1113,42 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
     if (!f)
       f.reset(Formatter::create("json-pretty"));
     string field;
-    cmd_getval(g_ceph_context, cmdmap, "property", field);
+    cmd_getval(cmdmap, "property", field);
     count_metadata(field, f.get());
     f->flush(ds);
     r = 0;
-  } else if (prefix == "mds getmap") {
-    epoch_t e;
-    int64_t epocharg;
-    bufferlist b;
-    if (cmd_getval(g_ceph_context, cmdmap, "epoch", epocharg)) {
-      e = epocharg;
-      int err = get_version(e, b);
-      if (err == -ENOENT) {
-       r = -ENOENT;
-      } else {
-       assert(err == 0);
-       assert(b.length());
-       FSMap mm;
-       mm.decode(b);
-       mm.encode(rdata, m->get_connection()->get_features());
-       ss << "got fsmap epoch " << mm.get_epoch();
-       r = 0;
-      }
+  } else if (prefix == "fs compat show") {
+    string fs_name;
+    cmd_getval(cmdmap, "fs_name", fs_name);
+    const auto &fs = fsmap.get_filesystem(fs_name);
+    if (fs == nullptr) {
+      ss << "filesystem '" << fs_name << "' not found";
+      r = -ENOENT;
+      goto out;
+    }
+
+    if (f) {
+      f->open_object_section("mds_compat");
+      fs->mds_map.compat.dump(f.get());
+      f->close_section();
+      f->flush(ds);
     } else {
-      fsmap.encode(rdata, m->get_connection()->get_features());
-      ss << "got fsmap epoch " << fsmap.get_epoch();
-      r = 0;
+      ds << fs->mds_map.compat;
     }
+    r = 0;
   } else if (prefix == "mds compat show") {
       if (f) {
        f->open_object_section("mds_compat");
-       fsmap.compat.dump(f.get());
+       fsmap.default_compat.dump(f.get());
        f->close_section();
        f->flush(ds);
       } else {
-       ds << fsmap.compat;
+       ds << fsmap.default_compat;
       }
       r = 0;
   } else if (prefix == "fs get") {
     string fs_name;
-    cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name);
+    cmd_getval(cmdmap, "fs_name", fs_name);
     const auto &fs = fsmap.get_filesystem(fs_name);
     if (fs == nullptr) {
       ss << "filesystem '" << fs_name << "' not found";
@@ -1196,7 +1176,7 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
           f->dump_string("name", mds_map.fs_name);
           /* Output both the names and IDs of pools, for use by
            * humans and machines respectively */
-          f->dump_string("metadata_pool", mon->osdmon()->osdmap.get_pool_name(
+          f->dump_string("metadata_pool", mon.osdmon()->osdmap.get_pool_name(
                 mds_map.metadata_pool));
           f->dump_int("metadata_pool_id", mds_map.metadata_pool);
           f->open_array_section("data_pool_ids");
@@ -1207,7 +1187,7 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
 
           f->open_array_section("data_pools");
           for (const auto &id : mds_map.data_pools) {
-            const auto &name = mon->osdmon()->osdmap.get_pool_name(id);
+            const auto &name = mon.osdmon()->osdmap.get_pool_name(id);
             f->dump_string("data_pool", name);
           }
           f->close_section();
@@ -1220,13 +1200,13 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
       for (const auto &p : fsmap.filesystems) {
         const auto &fs = p.second;
         const MDSMap &mds_map = fs->mds_map;
-        const string &md_pool_name = mon->osdmon()->osdmap.get_pool_name(
+        const string &md_pool_name = mon.osdmon()->osdmap.get_pool_name(
             mds_map.metadata_pool);
         
         ds << "name: " << mds_map.fs_name << ", metadata pool: "
            << md_pool_name << ", data pools: [";
         for (const auto &id : mds_map.data_pools) {
-          const string &pool_name = mon->osdmon()->osdmap.get_pool_name(id);
+          const string &pool_name = mon.osdmon()->osdmap.get_pool_name(id);
           ds << pool_name << " ";
         }
         ds << "]" << std::endl;
@@ -1237,6 +1217,41 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
       }
     }
     r = 0;
+  } else if (prefix == "fs feature ls") {
+    if (f) {
+      f->open_array_section("cephfs_features");
+      for (size_t i = 0; i <= CEPHFS_FEATURE_MAX; ++i) {
+       f->open_object_section("feature");
+       f->dump_int("index", i);
+       f->dump_string("name", cephfs_feature_name(i));
+       f->close_section();
+      }
+      f->close_section();
+      f->flush(ds);
+    } else {
+      for (size_t i = 0; i <= CEPHFS_FEATURE_MAX; ++i) {
+        ds << i << " " << cephfs_feature_name(i) << std::endl;
+      }
+    }
+    r = 0;
+  } else if (prefix == "fs lsflags") {
+    string fs_name;
+    cmd_getval(cmdmap, "fs_name", fs_name);
+    const auto &fs = fsmap.get_filesystem(fs_name);
+    if (!fs) {
+      ss << "filesystem '" << fs_name << "' not found";
+      r = -ENOENT;
+    } else {
+      const MDSMap &mds_map = fs->mds_map;
+      if (f) {
+        mds_map.dump_flags_state(f.get());
+        f->flush(ds);
+      }
+      else {
+        mds_map.print_flags(ds);
+      }
+      r = 0;
+    }
   }
 
 out:
@@ -1244,7 +1259,7 @@ out:
     rdata.append(ds);
     string rs;
     getline(ss, rs);
-    mon->reply_command(op, r, rs, rdata, get_last_committed());
+    mon.reply_command(op, r, rs, rdata, get_last_committed());
     return true;
   } else
     return false;
@@ -1252,26 +1267,26 @@ out:
 
 bool MDSMonitor::fail_mds_gid(FSMap &fsmap, mds_gid_t gid)
 {
-  const MDSMap::mds_info_t &info = fsmap.get_info_gid(gid);
+  const auto& info = fsmap.get_info_gid(gid);
   dout(1) << "fail_mds_gid " << gid << " mds." << info.name << " role " << info.rank << dendl;
 
-  ceph_assert(mon->osdmon()->is_writeable());
+  ceph_assert(mon.osdmon()->is_writeable());
 
-  epoch_t blacklist_epoch = 0;
+  epoch_t blocklist_epoch = 0;
   if (info.rank >= 0 && info.state != MDSMap::STATE_STANDBY_REPLAY) {
     utime_t until = ceph_clock_now();
-    until += g_conf->get_val<double>("mon_mds_blacklist_interval");
-    blacklist_epoch = mon->osdmon()->blacklist(info.addr, until);
+    until += g_conf().get_val<double>("mon_mds_blocklist_interval");
+    blocklist_epoch = mon.osdmon()->blocklist(info.addrs, until);
   }
 
-  fsmap.erase(gid, blacklist_epoch);
+  fsmap.erase(gid, blocklist_epoch);
   last_beacon.erase(gid);
   if (pending_daemon_health.count(gid)) {
     pending_daemon_health.erase(gid);
     pending_daemon_health_rm.insert(gid);
   }
 
-  return blacklist_epoch != 0;
+  return blocklist_epoch != 0;
 }
 
 mds_gid_t MDSMonitor::gid_from_arg(const FSMap &fsmap, const std::string &arg, std::ostream &ss)
@@ -1283,7 +1298,7 @@ mds_gid_t MDSMonitor::gid_from_arg(const FSMap &fsmap, const std::string &arg, s
   if (r == 0) {
     // See if a GID is assigned to this role
     const auto &fs = fsmap.get_filesystem(role.fscid);
-    assert(fs != nullptr);  // parse_role ensures it exists
+    ceph_assert(fs != nullptr);  // parse_role ensures it exists
     if (fs->mds_map.is_up(role.rank)) {
       dout(10) << __func__ << ": validated rank/GID " << role
                << " as a rank" << dendl;
@@ -1323,13 +1338,13 @@ mds_gid_t MDSMonitor::gid_from_arg(const FSMap &fsmap, const std::string &arg, s
 int MDSMonitor::fail_mds(FSMap &fsmap, std::ostream &ss,
     const std::string &arg, MDSMap::mds_info_t *failed_info)
 {
-  assert(failed_info != nullptr);
+  ceph_assert(failed_info != nullptr);
 
   mds_gid_t gid = gid_from_arg(fsmap, arg, ss);
   if (gid == MDS_GID_NONE) {
     return 0;
   }
-  if (!mon->osdmon()->is_writeable()) {
+  if (!mon.osdmon()->is_writeable()) {
     return -EAGAIN;
   }
 
@@ -1339,62 +1354,69 @@ int MDSMonitor::fail_mds(FSMap &fsmap, std::ostream &ss,
 
   fail_mds_gid(fsmap, gid);
   ss << "failed mds gid " << gid;
-  assert(mon->osdmon()->is_writeable());
-  request_proposal(mon->osdmon());
+  ceph_assert(mon.osdmon()->is_writeable());
+  request_proposal(mon.osdmon());
   return 0;
 }
 
 bool MDSMonitor::prepare_command(MonOpRequestRef op)
 {
   op->mark_mdsmon_event(__func__);
-  MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+  auto m = op->get_req<MMonCommand>();
   int r = -EINVAL;
   stringstream ss;
   bufferlist rdata;
 
-  map<string, cmd_vartype> cmdmap;
+  cmdmap_t cmdmap;
   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
     string rs = ss.str();
-    mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
-    return true;
+    mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed());
+    return false;
   }
 
   string prefix;
-  cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
+  cmd_getval(cmdmap, "prefix", prefix);
 
   /* Refuse access if message not associated with a valid session */
-  MonSession *session = m->get_session();
+  MonSession *session = op->get_session();
   if (!session) {
-    mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
-    return true;
+    mon.reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
+    return false;
   }
 
   auto &pending = get_pending_fsmap_writeable();
 
   bool batched_propose = false;
   for (const auto &h : handlers) {
-    if (h->can_handle(prefix)) {
-      batched_propose = h->batched_propose();
-      if (batched_propose) {
-        paxos->plug();
-      }
-      r = h->handle(mon, pending, op, cmdmap, ss);
-      if (batched_propose) {
-        paxos->unplug();
-      }
+    r = h->can_handle(prefix, op, pending, cmdmap, ss);
+    if (r == 1) {
+      ; // pass, since we got the right handler.
+    } else if (r == 0) {
+      continue;
+    } else {
+      goto out;
+    }
 
-      if (r == -EAGAIN) {
-        // message has been enqueued for retry; return.
-        dout(4) << __func__ << " enqueue for retry by prepare_command" << dendl;
-        return false;
-      } else {
-        if (r == 0) {
-          // On successful updates, print the updated map
-          print_map(pending);
-        }
-        // Successful or not, we're done: respond.
-        goto out;
+    batched_propose = h->batched_propose();
+    if (batched_propose) {
+      paxos.plug();
+    }
+    r = h->handle(&mon, pending, op, cmdmap, ss);
+    if (batched_propose) {
+      paxos.unplug();
+    }
+
+    if (r == -EAGAIN) {
+      // message has been enqueued for retry; return.
+      dout(4) << __func__ << " enqueue for retry by prepare_command" << dendl;
+      return false;
+    } else {
+      if (r == 0) {
+       // On successful updates, print the updated map
+       print_map(pending);
       }
+      // Successful or not, we're done: respond.
+      goto out;
     }
   }
 
@@ -1409,19 +1431,6 @@ bool MDSMonitor::prepare_command(MonOpRequestRef op)
     goto out;
   }
 
-  // Only handle legacy commands if there is a filesystem configured
-  if (pending.legacy_client_fscid == FS_CLUSTER_ID_NONE) {
-    if (pending.filesystems.size() == 0) {
-      ss << "No filesystem configured: use `ceph fs new` to create a filesystem";
-    } else {
-      ss << "No filesystem set for use with legacy commands";
-    }
-    r = -EINVAL;
-    goto out;
-  }
-
-  r = legacy_filesystem_command(pending, op, prefix, cmdmap, ss);
-
   if (r == -ENOSYS && ss.str().empty()) {
     ss << "unrecognized command";
   }
@@ -1442,7 +1451,7 @@ out:
     return true;
   } else {
     // reply immediately
-    mon->reply_command(op, r, rs, rdata, get_last_committed());
+    mon.reply_command(op, r, rs, rdata, get_last_committed());
     return false;
   }
 }
@@ -1451,67 +1460,31 @@ int MDSMonitor::filesystem_command(
     FSMap &fsmap,
     MonOpRequestRef op,
     std::string const &prefix,
-    map<string, cmd_vartype> &cmdmap,
+    const cmdmap_t& cmdmap,
     std::stringstream &ss)
 {
   dout(4) << __func__ << " prefix='" << prefix << "'" << dendl;
   op->mark_mdsmon_event(__func__);
   int r = 0;
   string whostr;
-  cmd_getval(g_ceph_context, cmdmap, "who", whostr);
-
-  if (prefix == "mds stop" ||
-      prefix == "mds deactivate") {
-    mds_role_t role;
-    r = fsmap.parse_role(whostr, &role, ss);
-    if (r < 0 ) {
-      return r;
-    }
-    const auto &fs = fsmap.get_filesystem(role.fscid);
-
-    if (!fs->mds_map.is_active(role.rank)) {
-      r = -EEXIST;
-      ss << "mds." << role << " not active (" 
-        << ceph_mds_state_name(fs->mds_map.get_state(role.rank)) << ")";
-    } else if (fs->mds_map.get_root() == role.rank ||
-               fs->mds_map.get_tableserver() == role.rank) {
-      r = -EINVAL;
-      ss << "can't tell the root (" << fs->mds_map.get_root()
-        << ") or tableserver (" << fs->mds_map.get_tableserver()
-        << ") to deactivate";
-    } else if (role.rank != fs->mds_map.get_last_in_mds()) {
-      r = -EINVAL;
-      ss << "mds." << role << " doesn't have the max rank ("
-        << fs->mds_map.get_last_in_mds() << ")";
-    } else if (fs->mds_map.get_num_in_mds() <= size_t(fs->mds_map.get_max_mds())) {
-      r = -EBUSY;
-      ss << "must decrease max_mds or else MDS will immediately reactivate";
-    } else {
-      r = 0;
-      mds_gid_t gid = fs->mds_map.up.at(role.rank);
-      ss << "telling mds." << role << " "
-         << fsmap.get_info_gid(gid).addr << " to deactivate";
+  cmd_getval(cmdmap, "role", whostr);
 
-      fsmap.modify_daemon(gid, [](MDSMap::mds_info_t *info) {
-        info->state = MDSMap::STATE_STOPPING;
-      });
-    }
-  } else if (prefix == "mds set_state") {
+  if (prefix == "mds set_state") {
     mds_gid_t gid;
-    if (!cmd_getval(g_ceph_context, cmdmap, "gid", gid)) {
+    if (!cmd_getval(cmdmap, "gid", gid)) {
       ss << "error parsing 'gid' value '"
-         << cmd_vartype_stringify(cmdmap["gid"]) << "'";
+         << cmd_vartype_stringify(cmdmap.at("gid")) << "'";
       return -EINVAL;
     }
     MDSMap::DaemonState state;
-    if (!cmd_getval(g_ceph_context, cmdmap, "state", state)) {
+    if (!cmd_getval(cmdmap, "state", state)) {
       ss << "error parsing 'state' string value '"
-         << cmd_vartype_stringify(cmdmap["state"]) << "'";
+         << cmd_vartype_stringify(cmdmap.at("state")) << "'";
       return -EINVAL;
     }
-    if (fsmap.gid_exists(gid)) {
-      fsmap.modify_daemon(gid, [state](MDSMap::mds_info_t *info) {
-        info->state = state;
+    if (fsmap.gid_exists(gid, op->get_session()->get_allowed_fs_names())) {
+      fsmap.modify_daemon(gid, [state](auto& info) {
+        info.state = state;
       });
       ss << "set mds gid " << gid << " to state " << state << " "
          << ceph_mds_state_name(state);
@@ -1519,60 +1492,87 @@ int MDSMonitor::filesystem_command(
     }
   } else if (prefix == "mds fail") {
     string who;
-    cmd_getval(g_ceph_context, cmdmap, "who", who);
+    cmd_getval(cmdmap, "role_or_gid", who);
 
     MDSMap::mds_info_t failed_info;
+    mds_gid_t gid = gid_from_arg(fsmap, who, ss);
+    if (gid == MDS_GID_NONE) {
+      ss << "MDS named '" << who << "' does not exist, is not up or you "
+        << "lack the permission to see.";
+      return 0;
+    }
+    if(!fsmap.gid_exists(gid, op->get_session()->get_allowed_fs_names())) {
+      ss << "MDS named '" << who << "' does not exist, is not up or you "
+        << "lack the permission to see.";
+      return -EINVAL;
+    }
+    string_view fs_name = fsmap.fs_name_from_gid(gid);
+    if (!op->get_session()->fs_name_capable(fs_name, MON_CAP_W)) {
+      ss << "Permission denied.";
+      return -EPERM;
+    }
+
     r = fail_mds(fsmap, ss, who, &failed_info);
     if (r < 0 && r == -EAGAIN) {
-      mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
+      mon.osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
       return -EAGAIN; // don't propose yet; wait for message to be retried
     } else if (r == 0) {
       // Only log if we really did something (not when was already gone)
       if (failed_info.global_id != MDS_GID_NONE) {
-        mon->clog->info() << failed_info.human_name() << " marked failed by "
+        mon.clog->info() << failed_info.human_name() << " marked failed by "
                           << op->get_session()->entity_name;
       }
     }
   } else if (prefix == "mds rm") {
     mds_gid_t gid;
-    if (!cmd_getval(g_ceph_context, cmdmap, "gid", gid)) {
+    if (!cmd_getval(cmdmap, "gid", gid)) {
       ss << "error parsing 'gid' value '"
-         << cmd_vartype_stringify(cmdmap["gid"]) << "'";
+         << cmd_vartype_stringify(cmdmap.at("gid")) << "'";
       return -EINVAL;
     }
-    if (!fsmap.gid_exists(gid)) {
-      ss << "mds gid " << gid << " dne";
-      r = 0;
+    if (!fsmap.gid_exists(gid, op->get_session()->get_allowed_fs_names())) {
+      ss << "mds gid " << gid << " does not exist";
+      return 0;
+    }
+    string_view fs_name = fsmap.fs_name_from_gid(gid);
+    if (!op->get_session()->fs_name_capable(fs_name, MON_CAP_W)) {
+      ss << "Permission denied.";
+      return -EPERM;
+    }
+    const auto &info = fsmap.get_info_gid(gid);
+    MDSMap::DaemonState state = info.state;
+    if (state > 0) {
+    ss << "cannot remove active mds." << info.name
+       << " rank " << info.rank;
+    return -EBUSY;
     } else {
-      const auto &info = fsmap.get_info_gid(gid);
-      MDSMap::DaemonState state = info.state;
-      if (state > 0) {
-        ss << "cannot remove active mds." << info.name
-           << " rank " << info.rank;
-        return -EBUSY;
-      } else {
-        fsmap.erase(gid, {});
-        ss << "removed mds gid " << gid;
-        return 0;
-      }
+    fsmap.erase(gid, {});
+    ss << "removed mds gid " << gid;
+    return 0;
     }
   } else if (prefix == "mds rmfailed") {
-    string confirm;
-    if (!cmd_getval(g_ceph_context, cmdmap, "confirm", confirm) ||
-       confirm != "--yes-i-really-mean-it") {
+    bool confirm = false;
+    cmd_getval(cmdmap, "yes_i_really_mean_it", confirm);
+    if (!confirm) {
          ss << "WARNING: this can make your filesystem inaccessible! "
                "Add --yes-i-really-mean-it if you are sure you wish to continue.";
          return -EPERM;
     }
     
     std::string role_str;
-    cmd_getval(g_ceph_context, cmdmap, "who", role_str);
+    cmd_getval(cmdmap, "role", role_str);
     mds_role_t role;
-    int r = fsmap.parse_role(role_str, &role, ss);
+    const auto fs_names = op->get_session()->get_allowed_fs_names();
+    int r = fsmap.parse_role(role_str, &role, ss, fs_names);
     if (r < 0) {
       ss << "invalid role '" << role_str << "'";
       return -EINVAL;
     }
+    string_view fs_name = fsmap.get_filesystem(role.fscid)->mds_map.get_fs_name();
+    if (!op->get_session()->fs_name_capable(fs_name, MON_CAP_W)) {
+      ss << "Permission denied.";
+      return -EPERM;
+    }
 
     fsmap.modify_filesystem(
         role.fscid,
@@ -1583,147 +1583,91 @@ int MDSMonitor::filesystem_command(
 
     ss << "removed failed mds." << role;
     return 0;
+    /* TODO: convert to fs commands to update defaults */
   } else if (prefix == "mds compat rm_compat") {
     int64_t f;
-    if (!cmd_getval(g_ceph_context, cmdmap, "feature", f)) {
+    if (!cmd_getval(cmdmap, "feature", f)) {
       ss << "error parsing feature value '"
-         << cmd_vartype_stringify(cmdmap["feature"]) << "'";
+         << cmd_vartype_stringify(cmdmap.at("feature")) << "'";
       return -EINVAL;
     }
-    if (fsmap.compat.compat.contains(f)) {
+    if (fsmap.default_compat.compat.contains(f)) {
       ss << "removing compat feature " << f;
-      CompatSet modified = fsmap.compat;
-      modified.compat.remove(f);
-      fsmap.update_compat(modified);
+      fsmap.default_compat.compat.remove(f);
     } else {
-      ss << "compat feature " << f << " not present in " << fsmap.compat;
+      ss << "compat feature " << f << " not present in " << fsmap.default_compat;
     }
     r = 0;
   } else if (prefix == "mds compat rm_incompat") {
     int64_t f;
-    if (!cmd_getval(g_ceph_context, cmdmap, "feature", f)) {
+    if (!cmd_getval(cmdmap, "feature", f)) {
       ss << "error parsing feature value '"
-         << cmd_vartype_stringify(cmdmap["feature"]) << "'";
+         << cmd_vartype_stringify(cmdmap.at("feature")) << "'";
       return -EINVAL;
     }
-    if (fsmap.compat.incompat.contains(f)) {
+    if (fsmap.default_compat.incompat.contains(f)) {
       ss << "removing incompat feature " << f;
-      CompatSet modified = fsmap.compat;
-      modified.incompat.remove(f);
-      fsmap.update_compat(modified);
+      fsmap.default_compat.incompat.remove(f);
     } else {
-      ss << "incompat feature " << f << " not present in " << fsmap.compat;
+      ss << "incompat feature " << f << " not present in " << fsmap.default_compat;
     }
     r = 0;
   } else if (prefix == "mds repaired") {
     std::string role_str;
-    cmd_getval(g_ceph_context, cmdmap, "rank", role_str);
+    cmd_getval(cmdmap, "role", role_str);
     mds_role_t role;
-    r = fsmap.parse_role(role_str, &role, ss);
+    const auto fs_names = op->get_session()->get_allowed_fs_names();
+    r = fsmap.parse_role(role_str, &role, ss, fs_names);
     if (r < 0) {
       return r;
     }
+    string_view fs_name = fsmap.get_filesystem(role.fscid)->mds_map.get_fs_name();
+    if (!op->get_session()->fs_name_capable(fs_name, MON_CAP_W)) {
+      ss << "Permission denied.";
+      return -EPERM;
+    }
 
     bool modified = fsmap.undamaged(role.fscid, role.rank);
     if (modified) {
-      dout(1) << "repaired: restoring rank " << role << dendl;
+      ss << "repaired: restoring rank " << role;
     } else {
-      dout(1) << "repaired: no-op on rank " << role << dendl;
+      ss << "nothing to do: rank is not damaged";
     }
 
     r = 0;
-  } else {
-    return -ENOSYS;
-  }
-
-  return r;
-}
-
-/**
- * Helper to legacy_filesystem_command
- */
-void MDSMonitor::modify_legacy_filesystem(FSMap &fsmap,
-    std::function<void(std::shared_ptr<Filesystem> )> fn)
-{
-  fsmap.modify_filesystem(
-    fsmap.legacy_client_fscid,
-    fn
-  );
-}
-
-
-
-/**
- * Handle a command that affects the filesystem (i.e. a filesystem
- * must exist for the command to act upon).
- *
- * @retval 0        Command was successfully handled and has side effects
- * @retval -EAGAIN  Messages has been requeued for retry
- * @retval -ENOSYS  Unknown command
- * @retval < 0      An error has occurred; **ss** may have been set.
- */
-int MDSMonitor::legacy_filesystem_command(
-    FSMap &fsmap,
-    MonOpRequestRef op,
-    std::string const &prefix,
-    map<string, cmd_vartype> &cmdmap,
-    std::stringstream &ss)
-{
-  dout(4) << __func__ << " prefix='" << prefix << "'" << dendl;
-  op->mark_mdsmon_event(__func__);
-  int r = 0;
-  string whostr;
-  cmd_getval(g_ceph_context, cmdmap, "who", whostr);
-
-  assert (fsmap.legacy_client_fscid != FS_CLUSTER_ID_NONE);
-
-  if (prefix == "mds set_max_mds") {
-    // NOTE: deprecated by "fs set max_mds"
-    int64_t maxmds;
-    if (!cmd_getval(g_ceph_context, cmdmap, "maxmds", maxmds) || maxmds <= 0) {
+  } else if (prefix == "mds freeze") {
+    std::string who;
+    cmd_getval(cmdmap, "role_or_gid", who);
+    mds_gid_t gid = gid_from_arg(fsmap, who, ss);
+    if (gid == MDS_GID_NONE) {
       return -EINVAL;
     }
 
-    const MDSMap& mdsmap =
-      fsmap.filesystems.at(fsmap.legacy_client_fscid)->mds_map;
-      
-    if (!mdsmap.allows_multimds() &&
-       maxmds > mdsmap.get_max_mds() &&
-       maxmds > 1) {
-      ss << "multi-MDS clusters are not enabled; set 'allow_multimds' to enable";
-      return -EINVAL;
-    }
-
-    if (maxmds > MAX_MDS) {
-      ss << "may not have more than " << MAX_MDS << " MDS ranks";
-      return -EINVAL;
+    string_view fs_name = fsmap.fs_name_from_gid(gid);
+    if (!op->get_session()->fs_name_capable(fs_name, MON_CAP_W)) {
+      ss << "Permission denied.";
+      return -EPERM;
     }
 
-    modify_legacy_filesystem(fsmap,
-        [maxmds](std::shared_ptr<Filesystem> fs)
+    bool freeze = false;
     {
-      fs->mds_map.set_max_mds(maxmds);
-    });
+      std::string str;
+      cmd_getval(cmdmap, "val", str);
+      if ((r = parse_bool(str, &freeze, ss)) != 0) {
+        return r;
+      }
+    }
 
-    r = 0;
-    ss << "max_mds = " << maxmds;
-  } else if (prefix == "mds cluster_down") {
-    // NOTE: deprecated by "fs set cluster_down"
-    modify_legacy_filesystem(fsmap,
-        [](std::shared_ptr<Filesystem> fs)
-    {
-      fs->mds_map.set_flag(CEPH_MDSMAP_DOWN);
-    });
-    ss << "marked fsmap DOWN";
-    r = 0;
-  } else if (prefix == "mds cluster_up") {
-    // NOTE: deprecated by "fs set cluster_up"
-    modify_legacy_filesystem(fsmap,
-        [](std::shared_ptr<Filesystem> fs)
-    {
-      fs->mds_map.clear_flag(CEPH_MDSMAP_DOWN);
-    });
-    ss << "unmarked fsmap DOWN";
+    auto f = [freeze,gid,&ss](auto& info) {
+      if (freeze) {
+        ss << "freezing mds." << gid;
+        info.freeze();
+      } else {
+        ss << "unfreezing mds." << gid;
+        info.unfreeze();
+      }
+    };
+    fsmap.modify_daemon(gid, f);
     r = 0;
   } else {
     return -ENOSYS;
@@ -1732,32 +1676,35 @@ int MDSMonitor::legacy_filesystem_command(
   return r;
 }
 
-
 void MDSMonitor::check_subs()
 {
-  std::list<std::string> types;
-
   // Subscriptions may be to "mdsmap" (MDS and legacy clients),
   // "mdsmap.<namespace>", or to "fsmap" for the full state of all
   // filesystems.  Build a list of all the types we service
   // subscriptions for.
-  types.push_back("fsmap");
-  types.push_back("fsmap.user");
-  types.push_back("mdsmap");
+
+  std::vector<std::string> types = {
+    "fsmap",
+    "fsmap.user",
+    "mdsmap",
+  };
+
   for (const auto &p : get_fsmap().filesystems) {
     const auto &fscid = p.first;
-    std::ostringstream oss;
-    oss << "mdsmap." << fscid;
-    types.push_back(oss.str());
+    CachedStackStringStream cos;
+    *cos << "mdsmap." << fscid;
+    types.push_back(std::string(cos->strv()));
   }
 
   for (const auto &type : types) {
-    if (mon->session_map.subs.count(type) == 0)
+    auto& subs = mon.session_map.subs;
+    auto subs_it = subs.find(type);
+    if (subs_it == subs.end())
       continue;
-    xlist<Subscription*>::iterator p = mon->session_map.subs[type]->begin();
-    while (!p.end()) {
-      Subscription *sub = *p;
-      ++p;
+    auto sub_it = subs_it->second->begin();
+    while (!sub_it.end()) {
+      auto sub = *sub_it;
+      ++sub_it; // N.B. check_sub may remove sub!
       check_sub(sub);
     }
   }
@@ -1768,47 +1715,45 @@ void MDSMonitor::check_sub(Subscription *sub)
 {
   dout(20) << __func__ << ": " << sub->type << dendl;
 
-  const auto &fsmap = get_fsmap();
+  // to use const qualifier filter fsmap beforehand
+  FSMap _fsmap_copy = get_fsmap();
+  _fsmap_copy.filter(sub->session->get_allowed_fs_names());
+  const auto& fsmap = _fsmap_copy;
+  if (sub->next > fsmap.get_epoch()) {
+    return;
+  }
 
   if (sub->type == "fsmap") {
-    if (sub->next <= fsmap.get_epoch()) {
-      sub->session->con->send_message(new MFSMap(mon->monmap->fsid, fsmap));
-      if (sub->onetime) {
-        mon->session_map.remove_sub(sub);
-      } else {
-        sub->next = fsmap.get_epoch() + 1;
-      }
+    sub->session->con->send_message(new MFSMap(mon.monmap->fsid, fsmap));
+    if (sub->onetime) {
+      mon.session_map.remove_sub(sub);
+    } else {
+      sub->next = fsmap.get_epoch() + 1;
     }
   } else if (sub->type == "fsmap.user") {
-    if (sub->next <= fsmap.get_epoch()) {
-      FSMapUser fsmap_u;
-      fsmap_u.epoch = fsmap.get_epoch();
-      fsmap_u.legacy_client_fscid = fsmap.legacy_client_fscid;
-      for (const auto &p : fsmap.filesystems) {
-       FSMapUser::fs_info_t& fs_info = fsmap_u.filesystems[p.second->fscid];
-       fs_info.cid = p.second->fscid;
-       fs_info.name = p.second->mds_map.fs_name;
-      }
-      sub->session->con->send_message(new MFSMapUser(mon->monmap->fsid, fsmap_u));
-      if (sub->onetime) {
-       mon->session_map.remove_sub(sub);
-      } else {
-       sub->next = fsmap.get_epoch() + 1;
-      }
+    FSMapUser fsmap_u;
+    fsmap_u.epoch = fsmap.get_epoch();
+    fsmap_u.legacy_client_fscid = fsmap.legacy_client_fscid;
+    for (const auto &p : fsmap.filesystems) {
+      FSMapUser::fs_info_t& fs_info = fsmap_u.filesystems[p.second->fscid];
+      fs_info.cid = p.second->fscid;
+      fs_info.name = p.second->mds_map.fs_name;
+    }
+    sub->session->con->send_message(new MFSMapUser(mon.monmap->fsid, fsmap_u));
+    if (sub->onetime) {
+      mon.session_map.remove_sub(sub);
+    } else {
+      sub->next = fsmap.get_epoch() + 1;
     }
   } else if (sub->type.compare(0, 6, "mdsmap") == 0) {
-    if (sub->next > fsmap.get_epoch()) {
-      return;
-    }
-
-    const bool is_mds = sub->session->inst.name.is_mds();
+    const bool is_mds = sub->session->name.is_mds();
     mds_gid_t mds_gid = MDS_GID_NONE;
     fs_cluster_id_t fscid = FS_CLUSTER_ID_NONE;
     if (is_mds) {
       // What (if any) namespace are you assigned to?
       auto mds_info = fsmap.get_mds_info();
       for (const auto &p : mds_info) {
-        if (p.second.addr == sub->session->inst.addr) {
+        if (p.second.addrs == sub->session->addrs) {
           mds_gid = p.first;
           fscid = fsmap.mds_roles.at(mds_gid);
         }
@@ -1816,7 +1761,7 @@ void MDSMonitor::check_sub(Subscription *sub)
     } else {
       // You're a client.  Did you request a particular
       // namespace?
-      if (sub->type.find("mdsmap.") == 0) {
+      if (sub->type.compare(0, 7, "mdsmap.") == 0) {
         auto namespace_id_str = sub->type.substr(std::string("mdsmap.").size());
         dout(10) << __func__ << ": namespace_id " << namespace_id_str << dendl;
         std::string err;
@@ -1827,15 +1772,6 @@ void MDSMonitor::check_sub(Subscription *sub)
                   << "'" << dendl;
           return;
         }
-        if (fsmap.filesystems.count(fscid) == 0) {
-          // Client asked for a non-existent namespace, send them nothing
-          // TODO: something more graceful for when a client has a filesystem
-          // mounted, and the fileysstem is deleted.  Add a "shut down you fool"
-          // flag to MMDSMap?
-          dout(1) << "Client subscribed to non-existent namespace '" <<
-                  fscid << "'" << dendl;
-          return;
-        }
       } else {
         // Unqualified request for "mdsmap": give it the one marked
         // for use by legacy clients.
@@ -1847,16 +1783,24 @@ void MDSMonitor::check_sub(Subscription *sub)
           return;
         }
       }
+      if (!fsmap.filesystem_exists(fscid)) {
+        // Client asked for a non-existent namespace, send them nothing
+        // TODO: something more graceful for when a client has a filesystem
+        // mounted, and the fileysstem is deleted.  Add a "shut down you fool"
+        // flag to MMDSMap?
+        dout(1) << "Client subscribed to non-existent namespace '" <<
+                fscid << "'" << dendl;
+        return;
+      }
     }
-    dout(10) << __func__ << ": is_mds=" << is_mds << ", fscid= " << fscid << dendl;
+    dout(10) << __func__ << ": is_mds=" << is_mds << ", fscid=" << fscid << dendl;
 
     // Work out the effective latest epoch
     const MDSMap *mds_map = nullptr;
-    MDSMap null_map;
-    null_map.compat = fsmap.compat;
+    MDSMap null_map = MDSMap::create_null_mdsmap();
     if (fscid == FS_CLUSTER_ID_NONE) {
       // For a client, we should have already dropped out
-      assert(is_mds);
+      ceph_assert(is_mds);
 
       auto it = fsmap.standby_daemons.find(mds_gid);
       if (it != fsmap.standby_daemons.end()) {
@@ -1872,19 +1816,19 @@ void MDSMonitor::check_sub(Subscription *sub)
       mds_map = &fsmap.get_filesystem(fscid)->mds_map;
     }
 
-    assert(mds_map != nullptr);
+    ceph_assert(mds_map != nullptr);
     dout(10) << __func__ << " selected MDS map epoch " <<
       mds_map->epoch << " for namespace " << fscid << " for subscriber "
-      << sub->session->inst.name << " who wants epoch " << sub->next << dendl;
+      << sub->session->name << " who wants epoch " << sub->next << dendl;
 
     if (sub->next > mds_map->epoch) {
       return;
     }
-    auto msg = new MMDSMap(mon->monmap->fsid, mds_map);
+    auto msg = make_message<MMDSMap>(mon.monmap->fsid, *mds_map);
 
-    sub->session->con->send_message(msg);
+    sub->session->con->send_message(msg.detach());
     if (sub->onetime) {
-      mon->session_map.remove_sub(sub);
+      mon.session_map.remove_sub(sub);
     } else {
       sub->next = mds_map->get_epoch() + 1;
     }
@@ -1895,16 +1839,17 @@ void MDSMonitor::check_sub(Subscription *sub)
 void MDSMonitor::update_metadata(mds_gid_t gid,
                                 const map<string, string>& metadata)
 {
+  dout(20) << __func__ <<  ": mds." << gid << ": " << metadata << dendl;
   if (metadata.empty()) {
+    dout(5) << __func__ << ": mds." << gid << ": no metadata!" << dendl;
     return;
   }
   pending_metadata[gid] = metadata;
 
-  MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
+  MonitorDBStore::TransactionRef t = paxos.get_pending_transaction();
   bufferlist bl;
-  ::encode(pending_metadata, bl);
+  encode(pending_metadata, bl);
   t->put(MDS_METADATA_PREFIX, "last_metadata", bl);
-  paxos->trigger_propose();
 }
 
 void MDSMonitor::remove_from_metadata(const FSMap &fsmap, MonitorDBStore::TransactionRef t)
@@ -1921,21 +1866,21 @@ void MDSMonitor::remove_from_metadata(const FSMap &fsmap, MonitorDBStore::Transa
   if (!update)
     return;
   bufferlist bl;
-  ::encode(pending_metadata, bl);
+  encode(pending_metadata, bl);
   t->put(MDS_METADATA_PREFIX, "last_metadata", bl);
 }
 
 int MDSMonitor::load_metadata(map<mds_gid_t, Metadata>& m)
 {
   bufferlist bl;
-  int r = mon->store->get(MDS_METADATA_PREFIX, "last_metadata", bl);
+  int r = mon.store->get(MDS_METADATA_PREFIX, "last_metadata", bl);
   if (r) {
-    dout(1) << "Unable to load 'last_metadata'" << dendl;
+    dout(5) << "Unable to load 'last_metadata'" << dendl;
     return r;
   }
 
-  bufferlist::iterator it = bl.begin();
-  ::decode(m, it);
+  auto it = bl.cbegin();
+  ceph::decode(m, it);
   return 0;
 }
 
@@ -1964,10 +1909,24 @@ void MDSMonitor::count_metadata(const std::string &field, Formatter *f)
   f->close_section();
 }
 
+void MDSMonitor::get_versions(std::map<string, list<string> > &versions)
+{
+  map<mds_gid_t,Metadata> meta;
+  load_metadata(meta);
+  const auto &fsmap = get_fsmap();
+  std::map<mds_gid_t, mds_info_t> map = fsmap.get_mds_info();
+  dout(10) << __func__ << " mds meta=" << meta << dendl;
+  for (auto& p : meta) {
+    auto q = p.second.find("ceph_version_short");
+    if (q == p.second.end()) continue;
+    versions[q->second].push_back(string("mds.") + map[p.first].name);
+  }
+}
+
 int MDSMonitor::dump_metadata(const FSMap& fsmap, const std::string &who,
     Formatter *f, ostream& err)
 {
-  assert(f);
+  ceph_assert(f);
 
   mds_gid_t gid = gid_from_arg(fsmap, who, err);
   if (gid == MDS_GID_NONE) {
@@ -1992,7 +1951,7 @@ int MDSMonitor::dump_metadata(const FSMap& fsmap, const std::string &who,
 
 int MDSMonitor::print_nodes(Formatter *f)
 {
-  assert(f);
+  ceph_assert(f);
 
   const auto &fsmap = get_fsmap();
 
@@ -2001,7 +1960,7 @@ int MDSMonitor::print_nodes(Formatter *f)
     return r;
   }
 
-  map<string, list<int> > mdses; // hostname => rank
+  map<string, list<string> > mdses; // hostname => mds
   for (const auto &p : metadata) {
     const mds_gid_t& gid = p.first;
     const Metadata& m = p.second;
@@ -2015,8 +1974,7 @@ int MDSMonitor::print_nodes(Formatter *f)
       continue;
     }
     const MDSMap::mds_info_t& mds_info = fsmap.get_info_gid(gid);
-    // FIXME: include filesystem name with rank here
-    mdses[hostname->second].push_back(mds_info.rank);
+    mdses[hostname->second].push_back(mds_info.name);
   }
 
   dump_services(f, mdses, "mds");
@@ -2025,44 +1983,63 @@ int MDSMonitor::print_nodes(Formatter *f)
 
 /**
  * If a cluster is undersized (with respect to max_mds), then
- * attempt to find daemons to grow it.
+ * attempt to find daemons to grow it. If the cluster is oversized
+ * (with respect to max_mds) then shrink it by stopping its highest rank.
  */
-bool MDSMonitor::maybe_expand_cluster(FSMap &fsmap, fs_cluster_id_t fscid)
+bool MDSMonitor::maybe_resize_cluster(FSMap &fsmap, fs_cluster_id_t fscid)
 {
-  auto fs = fsmap.get_filesystem(fscid);
+  auto&& fs = fsmap.get_filesystem(fscid);
   auto &mds_map = fs->mds_map;
 
-  if (fs->mds_map.test_flag(CEPH_MDSMAP_DOWN)) {
-    return false;
-  }
-
   int in = mds_map.get_num_in_mds();
   int max = mds_map.get_max_mds();
 
   dout(20) << __func__ << " in " << in << " max " << max << dendl;
 
-  if (in < max) {
+  /* Check that both the current epoch mds_map is resizeable as well as the
+   * current batch of changes in pending. This is important if an MDS is
+   * becoming active in the next epoch.
+   */
+  if (!get_fsmap().filesystem_exists(fscid) ||
+      !get_fsmap().get_filesystem(fscid)->mds_map.is_resizeable() ||
+      !mds_map.is_resizeable()) {
+    dout(5) << __func__ << " mds_map is not currently resizeable" << dendl;
+    return false;
+  }
+
+  if (in < max && !mds_map.test_flag(CEPH_MDSMAP_NOT_JOINABLE)) {
     mds_rank_t mds = mds_rank_t(0);
-    string name;
     while (mds_map.is_in(mds)) {
       mds++;
     }
-    mds_gid_t newgid = fsmap.find_replacement_for({fscid, mds},
-                         name, g_conf->mon_force_standby_active);
-    if (newgid == MDS_GID_NONE) {
+    auto info = fsmap.find_replacement_for({fscid, mds});
+    if (!info) {
       return false;
     }
 
-    const auto &new_info = fsmap.get_info_gid(newgid);
-    dout(1) << "assigned standby " << new_info.addr
+    dout(1) << "assigned standby " << info->addrs
             << " as mds." << mds << dendl;
-
-    mon->clog->info() << new_info.human_name() << " assigned to "
+    mon.clog->info() << info->human_name() << " assigned to "
                          "filesystem " << mds_map.fs_name << " as rank "
                       << mds << " (now has " << mds_map.get_num_in_mds() + 1
                       << " ranks)";
-    fsmap.promote(newgid, fs, mds);
+    fsmap.promote(info->global_id, *fs, mds);
     return true;
+  } else if (in > max) {
+    mds_rank_t target = in - 1;
+    const auto &info = mds_map.get_info(target);
+    if (mds_map.is_active(target)) {
+      dout(1) << "stopping " << target << dendl;
+      mon.clog->info() << "stopping " << info.human_name();
+      auto f = [](auto& info) {
+        info.state = MDSMap::STATE_STOPPING;
+      };
+      fsmap.modify_daemon(info.global_id, f);
+      return true;
+    } else {
+      dout(20) << "skipping stop of " << target << dendl;
+      return false;
+    }
   }
 
   return false;
@@ -2070,323 +2047,344 @@ bool MDSMonitor::maybe_expand_cluster(FSMap &fsmap, fs_cluster_id_t fscid)
 
 
 /**
- * If a daemon is laggy, and a suitable replacement
- * is available, fail this daemon (remove from map) and pass its
- * role to another daemon.
+ * Fail a daemon and replace it with a suitable standby.
  */
-void MDSMonitor::maybe_replace_gid(FSMap &fsmap, mds_gid_t gid,
-    const MDSMap::mds_info_t& info, bool *mds_propose, bool *osd_propose)
+bool MDSMonitor::drop_mds(FSMap &fsmap, mds_gid_t gid, const mds_info_t* rep_info, bool *osd_propose)
 {
-  assert(mds_propose != nullptr);
-  assert(osd_propose != nullptr);
+  ceph_assert(osd_propose != nullptr);
 
   const auto fscid = fsmap.mds_roles.at(gid);
+  const auto& info = fsmap.get_info_gid(gid);
+  const auto rank = info.rank;
+  const auto state = info.state;
 
-  // We will only take decisive action (replacing/removing a daemon)
-  // if we have some indicating that some other daemon(s) are successfully
-  // getting beacons through recently.
-  mono_time latest_beacon = mono_clock::zero();
-  for (const auto &p : last_beacon) {
-    latest_beacon = std::max(p.second.stamp, latest_beacon);
-  }
-  mono_time now = mono_clock::now();
-  chrono::duration<double> since = now-latest_beacon;
-  const bool may_replace = since.count() <
-      std::max(g_conf->mds_beacon_interval, g_conf->mds_beacon_grace * 0.5);
-
-  // are we in?
-  // and is there a non-laggy standby that can take over for us?
-  mds_gid_t sgid;
-  if (info.rank >= 0 &&
-      info.state != MDSMap::STATE_STANDBY &&
-      info.state != MDSMap::STATE_STANDBY_REPLAY &&
-      may_replace &&
-      !fsmap.get_filesystem(fscid)->mds_map.test_flag(CEPH_MDSMAP_DOWN) &&
-      (sgid = fsmap.find_replacement_for({fscid, info.rank}, info.name,
-                g_conf->mon_force_standby_active)) != MDS_GID_NONE)
-  {
-    
-    MDSMap::mds_info_t si = fsmap.get_info_gid(sgid);
-    dout(1) << " replacing " << gid << " " << info.addr << " mds."
-      << info.rank << "." << info.inc
-      << " " << ceph_mds_state_name(info.state)
-      << " with " << sgid << "/" << si.name << " " << si.addr << dendl;
-
-    mon->clog->warn() << info.human_name() 
-                      << " is not responding, replacing it "
-                      << "as rank " << info.rank
-                      << " with standby " << si.human_name();
+  if (info.is_frozen()) {
+    return false;
+  } else if (state == MDSMap::STATE_STANDBY_REPLAY ||
+             state == MDSMap::STATE_STANDBY) {
+    dout(1)  << " failing and removing standby " << gid << " " << info.addrs
+            << " mds." << rank
+            << "." << info.inc << " " << ceph_mds_state_name(state)
+            << dendl;
+    *osd_propose |= fail_mds_gid(fsmap, gid);
+    return true;
+  } else if (rank >= 0 && rep_info) {
+    auto fs = fsmap.filesystems.at(fscid);
+    if (fs->mds_map.test_flag(CEPH_MDSMAP_NOT_JOINABLE)) {
+      return false;
+    }
+    // are we in?
+    // and is there a non-laggy standby that can take over for us?
+    dout(1)  << " replacing " << gid << " " << info.addrs
+            << " mds." << rank << "." << info.inc
+            << " " << ceph_mds_state_name(state)
+            << " with " << rep_info->global_id << "/" << rep_info->name << " " << rep_info->addrs
+            << dendl;
 
-    // Remember what NS the old one was in
-    const fs_cluster_id_t fscid = fsmap.mds_roles.at(gid);
+    mon.clog->warn() << "Replacing " << info.human_name()
+                      << " as rank " << rank
+                      << " with standby " << rep_info->human_name();
 
     // Remove the old one
     *osd_propose |= fail_mds_gid(fsmap, gid);
 
     // Promote the replacement
-    auto fs = fsmap.filesystems.at(fscid);
-    fsmap.promote(sgid, fs, info.rank);
-
-    *mds_propose = true;
-  } else if ((info.state == MDSMap::STATE_STANDBY_REPLAY ||
-             info.state == MDSMap::STATE_STANDBY) && may_replace) {
-    dout(1) << " failing and removing " << gid << " " << info.addr << " mds." << info.rank 
-      << "." << info.inc << " " << ceph_mds_state_name(info.state)
-      << dendl;
-    mon->clog->info() << "Standby " << info.human_name() << " is not "
-                         "responding, dropping it";
-    fail_mds_gid(fsmap, gid);
-    *mds_propose = true;
-  } else if (!info.laggy()) {
-      dout(1) << " marking " << gid << " " << info.addr << " mds." << info.rank << "." << info.inc
-        << " " << ceph_mds_state_name(info.state)
-        << " laggy" << dendl;
-      fsmap.modify_daemon(info.global_id, [](MDSMap::mds_info_t *info) {
-          info->laggy_since = ceph_clock_now();
-      });
-      *mds_propose = true;
+    fsmap.promote(rep_info->global_id, *fs, rank);
+
+    return true;
   }
+  return false;
 }
 
-bool MDSMonitor::maybe_promote_standby(FSMap &fsmap, std::shared_ptr<Filesystem> &fs)
+bool MDSMonitor::check_health(FSMap& fsmap, bool* propose_osdmap)
 {
-  assert(!fs->mds_map.test_flag(CEPH_MDSMAP_DOWN));
-
   bool do_propose = false;
+  const auto now = mono_clock::now();
+  const bool osdmap_writeable = mon.osdmon()->is_writeable();
+  const auto mds_beacon_grace = g_conf().get_val<double>("mds_beacon_grace");
+  const auto mds_beacon_interval = g_conf().get_val<double>("mds_beacon_interval");
 
-  // have a standby take over?
-  set<mds_rank_t> failed;
-  fs->mds_map.get_failed_mds_set(failed);
-  if (!failed.empty()) {
-    set<mds_rank_t>::iterator p = failed.begin();
-    while (p != failed.end()) {
-      mds_rank_t f = *p++;
-      mds_gid_t sgid = fsmap.find_replacement_for({fs->fscid, f}, {},
-          g_conf->mon_force_standby_active);
-      if (sgid) {
-        const MDSMap::mds_info_t si = fsmap.get_info_gid(sgid);
-        dout(1) << " taking over failed mds." << f << " with " << sgid
-                << "/" << si.name << " " << si.addr << dendl;
-        mon->clog->info() << "Standby " << si.human_name()
-                          << " assigned to filesystem " << fs->mds_map.fs_name
-                          << " as rank " << f;
-
-        fsmap.promote(sgid, fs, f);
-       do_propose = true;
-      }
-    }
-  } else if (!fs->mds_map.is_degraded()) {
-    // There were no failures to replace, so try using any available standbys
-    // as standby-replay daemons. Don't do this when the cluster is degraded
-    // as a standby-replay daemon may try to read a journal being migrated.
+  if (mono_clock::is_zero(last_tick)) {
+    last_tick = now;
+  }
 
-    // Take a copy of the standby GIDs so that we can iterate over
-    // them while perhaps-modifying standby_daemons during the loop
-    // (if we promote anyone they are removed from standby_daemons)
-    std::vector<mds_gid_t> standby_gids;
-    for (const auto &j : fsmap.standby_daemons) {
-      standby_gids.push_back(j.first);
+  {
+    auto since_last = std::chrono::duration<double>(now-last_tick);
+
+    if (since_last.count() > (mds_beacon_grace-mds_beacon_interval)) {
+      // This case handles either local slowness (calls being delayed
+      // for whatever reason) or cluster election slowness (a long gap
+      // between calls while an election happened)
+      dout(1) << __func__ << ": resetting beacon timeouts due to mon delay "
+              "(slow election?) of " << since_last.count() << " seconds" << dendl;
+      for (auto& p : last_beacon) {
+        p.second.stamp = now;
+      }
     }
+  }
 
-    for (const auto &gid : standby_gids) {
-      const auto &info = fsmap.standby_daemons.at(gid);
-      assert(info.state == MDSMap::STATE_STANDBY);
+  // make sure last_beacon is fully populated
+  for (auto& p : fsmap.mds_roles) {
+    auto& gid = p.first;
+    last_beacon.emplace(std::piecewise_construct,
+        std::forward_as_tuple(gid),
+        std::forward_as_tuple(now, 0));
+  }
 
-      if (!info.standby_replay) {
-        continue;
-      }
+  // We will only take decisive action (replacing/removing a daemon)
+  // if we have some indication that some other daemon(s) are successfully
+  // getting beacons through recently.
+  mono_time latest_beacon = mono_clock::zero();
+  for (const auto& p : last_beacon) {
+    latest_beacon = std::max(p.second.stamp, latest_beacon);
+  }
+  auto since = std::chrono::duration<double>(now-latest_beacon);
+  const bool may_replace = since.count() <
+      std::max(g_conf()->mds_beacon_interval, g_conf()->mds_beacon_grace * 0.5);
 
-      /*
-       * This mds is standby but has no rank assigned.
-       * See if we can find it somebody to shadow
-       */
-      dout(20) << "gid " << gid << " is standby and following nobody" << dendl;
-      
-      // standby for someone specific?
-      if (info.standby_for_rank >= 0) {
-        // The mds_info_t may or may not tell us exactly which filesystem
-        // the standby_for_rank refers to: lookup via legacy_client_fscid
-        mds_role_t target_role = {
-          info.standby_for_fscid == FS_CLUSTER_ID_NONE ?
-            fsmap.legacy_client_fscid : info.standby_for_fscid,
-          info.standby_for_rank};
-
-        // It is possible that the map contains a standby_for_fscid
-        // that doesn't correspond to an existing filesystem, especially
-        // if we loaded from a version with a bug (#17466)
-        if (info.standby_for_fscid != FS_CLUSTER_ID_NONE
-            && !fsmap.filesystem_exists(info.standby_for_fscid)) {
-          derr << "gid " << gid << " has invalid standby_for_fscid "
-               << info.standby_for_fscid << dendl;
-          continue;
-        }
+  // check beacon timestamps
+  std::vector<mds_gid_t> to_remove;
+  const bool mon_down = mon.is_mon_down();
+  const auto mds_beacon_mon_down_grace =
+      g_conf().get_val<std::chrono::seconds>("mds_beacon_mon_down_grace");
+  const auto quorum_age = std::chrono::seconds(mon.quorum_age());
+  const bool new_quorum = quorum_age < mds_beacon_mon_down_grace;
+  for (auto it = last_beacon.begin(); it != last_beacon.end(); ) {
+    auto& [gid, beacon_info] = *it;
+    auto since_last = std::chrono::duration<double>(now-beacon_info.stamp);
 
-        // If we managed to resolve a full target role
-        if (target_role.fscid != FS_CLUSTER_ID_NONE) {
-          const auto &fs = fsmap.get_filesystem(target_role.fscid);
-          if (fs->mds_map.is_followable(target_role.rank)) {
-            do_propose |= try_standby_replay(fsmap, info, *fs,
-                fs->mds_map.get_info(target_role.rank));
-          }
-        }
+    if (!fsmap.gid_exists(gid)) {
+      // gid no longer exists, remove from tracked beacons
+      it = last_beacon.erase(it);
+      continue;
+    }
 
-       continue;
+    if (since_last.count() >= g_conf()->mds_beacon_grace) {
+      auto& info = fsmap.get_info_gid(gid);
+      dout(1) << "no beacon from mds." << info.rank << "." << info.inc
+              << " (gid: " << gid << " addr: " << info.addrs
+              << " state: " << ceph_mds_state_name(info.state) << ")"
+              << " since " << since_last.count() << dendl;
+      if ((mon_down || new_quorum) && since_last < mds_beacon_mon_down_grace) {
+        /* The MDS may be sending beacons to a monitor not yet in quorum or
+         * temporarily partitioned. Hold off on removal for a little longer...
+         */
+        dout(10) << "deferring removal for mds_beacon_mon_down_grace during MON_DOWN" << dendl;
+        ++it;
+        continue;
       }
+      // If the OSDMap is writeable, we can blocklist things, so we can
+      // try failing any laggy MDS daemons.  Consider each one for failure.
+      if (!info.laggy()) {
+        dout(1)  << " marking " << gid << " " << info.addrs
+                << " mds." << info.rank << "." << info.inc
+                << " " << ceph_mds_state_name(info.state)
+                << " laggy" << dendl;
+        fsmap.modify_daemon(info.global_id, [](auto& info) {
+            info.laggy_since = ceph_clock_now();
+        });
+        do_propose = true;
+      }
+      if (osdmap_writeable && may_replace) {
+        to_remove.push_back(gid); // drop_mds may invalidate iterator
+      }
+    }
 
-      // check everyone
-      for (const auto &p : fsmap.filesystems) {
-       if (info.standby_for_fscid != FS_CLUSTER_ID_NONE &&
-           info.standby_for_fscid != p.first)
-         continue;
-
-       bool assigned = false;
-        const auto &fs = p.second;
-        const MDSMap &mds_map = fs->mds_map;
-        for (const auto &mds_i : mds_map.mds_info) {
-          const MDSMap::mds_info_t &cand_info = mds_i.second;
-          if (cand_info.rank >= 0 && mds_map.is_followable(cand_info.rank)) {
-            if ((info.standby_for_name.length() && info.standby_for_name != cand_info.name) ||
-                info.standby_for_rank != MDS_RANK_NONE) {
-              continue;   // we're supposed to follow someone else
-            }
+    ++it;
+  }
 
-            if (try_standby_replay(fsmap, info, *fs, cand_info)) {
-             assigned = true;
-              break;
+  for (const auto& gid : to_remove) {
+    auto info = fsmap.get_info_gid(gid);
+    const mds_info_t* rep_info = nullptr;
+    if (info.rank >= 0) {
+      auto fscid = fsmap.fscid_from_gid(gid);
+      rep_info = fsmap.find_replacement_for({fscid, info.rank});
+    }
+    bool dropped = drop_mds(fsmap, gid, rep_info, propose_osdmap);
+    if (dropped) {
+      mon.clog->info() << "MDS " << info.human_name()
+                        << " is removed because it is dead or otherwise unavailable.";
+      do_propose = true;
+    }
+  }
+
+  if (osdmap_writeable) {
+    for (auto& [fscid, fs] : fsmap.filesystems) {
+      if (!fs->mds_map.test_flag(CEPH_MDSMAP_NOT_JOINABLE) &&
+          fs->mds_map.is_resizeable()) {
+        // Check if a rank or standby-replay should be replaced with a stronger
+        // affinity standby. This looks at ranks and standby-replay:
+        for (const auto& [gid, info] : fs->mds_map.get_mds_info()) {
+          const auto join_fscid = info.join_fscid;
+          if (join_fscid == fscid)
+            continue;
+          const auto rank = info.rank;
+          const auto state = info.state;
+          const mds_info_t* rep_info = nullptr;
+          if (state == MDSMap::STATE_STANDBY_REPLAY) {
+            rep_info = fsmap.get_available_standby(*fs);
+          } else if (state == MDSMap::STATE_ACTIVE) {
+            rep_info = fsmap.find_replacement_for({fscid, rank});
+          } else {
+            /* N.B. !is_degraded() */
+            ceph_abort_msg("invalid state in MDSMap");
+          }
+          if (!rep_info) {
+            break;
+          }
+          bool better_affinity = false;
+          if (join_fscid == FS_CLUSTER_ID_NONE) {
+            better_affinity = (rep_info->join_fscid == fscid);
+          } else {
+            better_affinity = (rep_info->join_fscid == fscid) ||
+                              (rep_info->join_fscid == FS_CLUSTER_ID_NONE);
+          }
+          if (better_affinity) {
+            if (state == MDSMap::STATE_STANDBY_REPLAY) {
+              mon.clog->info() << "Dropping low affinity standby-replay "
+                                << info.human_name()
+                                << " in favor of higher affinity standby.";
+              *propose_osdmap |= fail_mds_gid(fsmap, gid);
+              /* Now let maybe_promote_standby do the promotion. */
+            } else {
+              mon.clog->info() << "Dropping low affinity active "
+                                << info.human_name()
+                                << " in favor of higher affinity standby.";
+              do_propose |= drop_mds(fsmap, gid, rep_info, propose_osdmap);
             }
+            break; /* don't replace more than one per tick per fs */
           }
         }
-       if (assigned) {
-         do_propose = true;
-         break;
-       }
       }
     }
   }
-
   return do_propose;
 }
 
-void MDSMonitor::tick()
+bool MDSMonitor::maybe_promote_standby(FSMap &fsmap, Filesystem& fs)
 {
-  // make sure mds's are still alive
-  // ...if i am an active leader
-
-  if (!is_active() || !is_leader()) return;
-
-  auto &pending = get_pending_fsmap_writeable();
+  if (fs.mds_map.test_flag(CEPH_MDSMAP_NOT_JOINABLE)) {
+    return false;
+  }
 
   bool do_propose = false;
 
-  do_propose |= pending.check_health();
-
-  // expand mds cluster (add new nodes to @in)?
-  for (auto &p : pending.filesystems) {
-    do_propose |= maybe_expand_cluster(pending, p.second->fscid);
-  }
+  // have a standby take over?
+  set<mds_rank_t> failed;
+  fs.mds_map.get_failed_mds_set(failed);
+  for (const auto& rank : failed) {
+    auto info = fsmap.find_replacement_for({fs.fscid, rank});
+    if (info) {
+      dout(1) << " taking over failed mds." << rank << " with " << info->global_id
+              << "/" << info->name << " " << info->addrs << dendl;
+      mon.clog->info() << "Standby " << info->human_name()
+                        << " assigned to filesystem " << fs.mds_map.fs_name
+                        << " as rank " << rank;
 
-  mono_time now = mono_clock::now();
-  if (last_tick == decltype(last_tick)::min()) {
-    last_tick = now;
+      fsmap.promote(info->global_id, fs, rank);
+      do_propose = true;
+    }
   }
-  chrono::duration<double> since_last = now-last_tick;
 
-  if (since_last.count() >
-      (g_conf->mds_beacon_grace - g_conf->mds_beacon_interval)) {
-    // This case handles either local slowness (calls being delayed
-    // for whatever reason) or cluster election slowness (a long gap
-    // between calls while an election happened)
-    dout(1) << __func__ << ": resetting beacon timeouts due to mon delay "
-            "(slow election?) of " << now - last_tick << " seconds" << dendl;
-    for (auto &p : last_beacon) {
-      p.second.stamp = now;
+  if (fs.mds_map.is_resizeable() && fs.mds_map.allows_standby_replay()) {
+    // There were no failures to replace, so try using any available standbys
+    // as standby-replay daemons. Don't do this when the cluster is degraded
+    // as a standby-replay daemon may try to read a journal being migrated.
+    for (;;) {
+      auto info = fsmap.get_available_standby(fs);
+      if (!info) break;
+      dout(20) << "standby available mds." << info->global_id << dendl;
+      bool changed = false;
+      for (const auto& rank : fs.mds_map.in) {
+        dout(20) << "examining " << rank << dendl;
+        if (fs.mds_map.is_followable(rank)) {
+          dout(1) << "  setting mds." << info->global_id
+                  << " to follow mds rank " << rank << dendl;
+          fsmap.assign_standby_replay(info->global_id, fs.fscid, rank);
+          do_propose = true;
+          changed = true;
+          break;
+        }
+      }
+      if (!changed) break;
     }
   }
 
-  last_tick = now;
+  return do_propose;
+}
 
-  // make sure last_beacon is fully populated
-  for (auto &p : pending.mds_roles) {
-    auto &gid = p.first;
-    last_beacon.emplace(std::piecewise_construct,
-        std::forward_as_tuple(gid),
-        std::forward_as_tuple(mono_clock::now(), 0));
-  }
+void MDSMonitor::tick()
+{
+  if (!is_active() || !is_leader()) return;
 
+  auto &pending = get_pending_fsmap_writeable();
 
-  // check beacon timestamps
+  bool do_propose = false;
   bool propose_osdmap = false;
-  bool osdmap_writeable = mon->osdmon()->is_writeable();
-  for (auto it = last_beacon.begin(); it != last_beacon.end(); ) {
-    mds_gid_t gid = it->first;
-    auto beacon_info = it->second;
-    chrono::duration<double> since_last = now-beacon_info.stamp;
-
-    if (!pending.gid_exists(gid)) {
-      // clean it out
-      it = last_beacon.erase(it);
-      continue;
-    }
 
+  if (check_fsmap_struct_version) {
+    /* Allow time for trimming otherwise PaxosService::is_writeable will always
+     * be false.
+     */
 
-    if (since_last.count() >= g_conf->mds_beacon_grace) {
-      auto &info = pending.get_info_gid(gid);
-      dout(1) << "no beacon from mds." << info.rank << "." << info.inc
-              << " (gid: " << gid << " addr: " << info.addr
-              << " state: " << ceph_mds_state_name(info.state) << ")"
-              << " since " << since_last.count() << "s" << dendl;
-      // If the OSDMap is writeable, we can blacklist things, so we can
-      // try failing any laggy MDS daemons.  Consider each one for failure.
-      if (osdmap_writeable) {
-        maybe_replace_gid(pending, gid, info, &do_propose, &propose_osdmap);
+    auto now = clock::now();
+    auto elapsed = now - last_fsmap_struct_flush;
+    if (elapsed > std::chrono::seconds(30)) {
+      FSMap fsmap;
+      bufferlist bl;
+      auto v = get_first_committed();
+      int err = get_version(v, bl);
+      if (err) {
+        derr << "could not get version " << v << dendl;
+        ceph_abort();
+      }
+      try {
+        fsmap.decode(bl);
+      } catch (const ceph::buffer::malformed_input& e) {
+        dout(5) << "flushing old fsmap struct because unable to decode FSMap: " << e.what() << dendl;
+      }
+      /* N.B. FSMap::is_struct_old is also true for undecoded (failed to decode) FSMap */
+      if (fsmap.is_struct_old()) {
+        dout(5) << "fsmap struct is too old; proposing to flush out old versions" << dendl;
+        do_propose = true;
+        last_fsmap_struct_flush = now;
+      } else {
+        dout(20) << "struct is recent" << dendl;
+        check_fsmap_struct_version = false;
       }
     }
+  }
 
-    ++it;
+  do_propose |= pending.check_health();
+
+  /* Check health and affinity of ranks */
+  do_propose |= check_health(pending, &propose_osdmap);
+
+  /* Resize the cluster according to max_mds. */
+  for (auto& p : pending.filesystems) {
+    do_propose |= maybe_resize_cluster(pending, p.second->fscid);
   }
-  if (propose_osdmap) {
-    request_proposal(mon->osdmon());
+
+  /* Replace any failed ranks. */
+  for (auto& p : pending.filesystems) {
+    do_propose |= maybe_promote_standby(pending, *p.second);
   }
 
-  for (auto &p : pending.filesystems) {
-    auto &fs = p.second;
-    if (!fs->mds_map.test_flag(CEPH_MDSMAP_DOWN)) {
-      do_propose |= maybe_promote_standby(pending, fs);
-    }
+  if (propose_osdmap) {
+    request_proposal(mon.osdmon());
   }
 
   if (do_propose) {
     propose_pending();
   }
-}
 
-/**
- * finfo: the would-be follower
- * leader_fs: the Filesystem containing the would-be leader
- * ainfo: the would-be leader
- */
-bool MDSMonitor::try_standby_replay(
-    FSMap &fsmap,
-    const MDSMap::mds_info_t& finfo,
-    const Filesystem &leader_fs,
-    const MDSMap::mds_info_t& ainfo)
-{
-  // someone else already following?
-  if (leader_fs.has_standby_replay(ainfo.global_id)) {
-    dout(20) << " mds." << ainfo.rank << " already has a follower" << dendl;
-    return false;
-  } else {
-    // Assign the new role to the standby
-    dout(10) << "  setting to follow mds rank " << ainfo.rank << dendl;
-    fsmap.assign_standby_replay(finfo.global_id, leader_fs.fscid, ainfo.rank);
-    return true;
-  }
+  last_tick = mono_clock::now();
 }
 
-MDSMonitor::MDSMonitor(Monitor *mn, Paxos *p, string service_name)
+MDSMonitor::MDSMonitor(Monitor &mn, Paxos &p, string service_name)
   : PaxosService(mn, p, service_name)
 {
-  handlers = FileSystemCommandHandler::load(p);
+  handlers = FileSystemCommandHandler::load(&p);
 }
 
 void MDSMonitor::on_restart()