]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/Monitor.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mon / Monitor.cc
index 76c3c77081b8845f50856acb82d464803ff95489..37af8574bab48a557cdc0482e8d9eb4500dd3309 100644 (file)
@@ -13,7 +13,9 @@
  */
 
 
+#include <iterator>
 #include <sstream>
+#include <tuple>
 #include <stdlib.h>
 #include <signal.h>
 #include <limits.h>
 #include <boost/scope_exit.hpp>
 #include <boost/algorithm/string/predicate.hpp>
 
+#include "json_spirit/json_spirit_reader.h"
+#include "json_spirit/json_spirit_writer.h"
+
 #include "Monitor.h"
 #include "common/version.h"
+#include "common/blkdev.h"
+#include "common/cmdparse.h"
+#include "common/signal.h"
 
 #include "osd/OSDMap.h"
 
@@ -36,7 +44,6 @@
 #include "messages/MGenericMessage.h"
 #include "messages/MMonCommand.h"
 #include "messages/MMonCommandAck.h"
-#include "messages/MMonHealth.h"
 #include "messages/MMonMetadata.h"
 #include "messages/MMonSync.h"
 #include "messages/MMonScrub.h"
 #include "messages/MMonPaxos.h"
 #include "messages/MRoute.h"
 #include "messages/MForward.h"
-#include "messages/MStatfs.h"
 
 #include "messages/MMonSubscribe.h"
 #include "messages/MMonSubscribeAck.h"
 
 #include "messages/MAuthReply.h"
 
-#include "messages/MTimeCheck.h"
+#include "messages/MTimeCheck2.h"
 #include "messages/MPing.h"
 
 #include "common/strtol.h"
 #include "OSDMonitor.h"
 #include "MDSMonitor.h"
 #include "MonmapMonitor.h"
-#include "PGMonitor.h"
 #include "LogMonitor.h"
 #include "AuthMonitor.h"
 #include "MgrMonitor.h"
 #include "MgrStatMonitor.h"
+#include "ConfigMonitor.h"
 #include "mon/QuorumService.h"
-#include "mon/OldHealthMonitor.h"
 #include "mon/HealthMonitor.h"
 #include "mon/ConfigKeyService.h"
 #include "common/config.h"
 #include "common/cmdparse.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 #include "include/compat.h"
 #include "perfglue/heap_profiler.h"
 
@@ -105,20 +110,18 @@ const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
 #undef COMMAND
 #undef COMMAND_WITH_FLAG
 #define FLAG(f) (MonCommand::FLAG_##f)
-#define COMMAND(parsesig, helptext, modulename, req_perms, avail)      \
-  {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
-#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
-  {parsesig, helptext, modulename, req_perms, avail, flags},
+#define COMMAND(parsesig, helptext, modulename, req_perms)     \
+  {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
+#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
+  {parsesig, helptext, modulename, req_perms, flags},
 MonCommand mon_commands[] = {
 #include <mon/MonCommands.h>
 };
-MonCommand pgmonitor_commands[] = {
-#include <mon/PGMonitorCommands.h>
-};
 #undef COMMAND
 #undef COMMAND_WITH_FLAG
 
 
+
 void C_MonContext::finish(int r) {
   if (mon->is_shutdown())
     return;
@@ -128,6 +131,7 @@ void C_MonContext::finish(int r) {
 Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
                 Messenger *m, Messenger *mgr_m, MonMap *map) :
   Dispatcher(cct_),
+  AuthServer(cct_),
   name(nm),
   rank(-1), 
   messenger(m),
@@ -135,7 +139,7 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
   lock("Monitor::lock"),
   timer(cct_, lock),
   finisher(cct_, "mon_finisher", "fin"),
-  cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
+  cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
   has_ever_joined(false),
   logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
   monmap(map),
@@ -146,14 +150,12 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
                        cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
   auth_service_required(cct,
                        cct->_conf->auth_supported.empty() ?
-                       cct->_conf->auth_service_required : cct->_conf->auth_supported ),
+                       cct->_conf->auth_service_required : cct->_conf->auth_supported),
   mgr_messenger(mgr_m),
   mgr_client(cct_, mgr_m),
-  pgservice(nullptr),
+  gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
   store(s),
   
-  state(STATE_PROBING),
-  
   elector(this),
   required_features(0),
   leader(0),
@@ -179,67 +181,90 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
   paxos_service(PAXOS_NUM),
   admin_hook(NULL),
   routed_request_tid(0),
-  op_tracker(cct, true, 1)
+  op_tracker(cct, g_conf().get_val<bool>("mon_enable_op_tracker"), 1)
 {
   clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
   audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
 
   update_log_clients();
 
+  if (!gss_ktfile_client.empty()) {
+    // Assert we can export environment variable 
+    /* 
+        The default client keytab is used, if it is present and readable,
+        to automatically obtain initial credentials for GSSAPI client
+        applications. The principal name of the first entry in the client
+        keytab is used by default when obtaining initial credentials.
+        1. The KRB5_CLIENT_KTNAME environment variable.
+        2. The default_client_keytab_name profile variable in [libdefaults].
+        3. The hardcoded default, DEFCKTNAME.
+    */
+    const int32_t set_result(setenv("KRB5_CLIENT_KTNAME", 
+                                    gss_ktfile_client.c_str(), 1));
+    ceph_assert(set_result == 0);
+  }
+
+  op_tracker.set_complaint_and_threshold(
+      g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count(),
+      g_conf().get_val<int64_t>("mon_op_log_threshold"));
+  op_tracker.set_history_size_and_duration(
+      g_conf().get_val<uint64_t>("mon_op_history_size"),
+      g_conf().get_val<std::chrono::seconds>("mon_op_history_duration").count());
+  op_tracker.set_history_slow_op_size_and_threshold(
+      g_conf().get_val<uint64_t>("mon_op_history_slow_op_size"),
+      g_conf().get_val<std::chrono::seconds>("mon_op_history_slow_op_threshold").count());
+
   paxos = new Paxos(this, "paxos");
 
-  paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
-  paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
-  paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
-  paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
-  paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
-  paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
-  paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
-  paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
-  paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
-
-  health_monitor = new OldHealthMonitor(this);
+  paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(this, paxos, "mdsmap"));
+  paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(this, paxos, "monmap"));
+  paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, this, paxos, "osdmap"));
+  paxos_service[PAXOS_LOG].reset(new LogMonitor(this, paxos, "logm"));
+  paxos_service[PAXOS_AUTH].reset(new AuthMonitor(this, paxos, "auth"));
+  paxos_service[PAXOS_MGR].reset(new MgrMonitor(this, paxos, "mgr"));
+  paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(this, paxos, "mgrstat"));
+  paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(this, paxos, "health"));
+  paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(this, paxos, "config"));
+
   config_key_service = new ConfigKeyService(this, paxos);
 
-  mon_caps = new MonCap();
-  bool r = mon_caps->parse("allow *", NULL);
-  assert(r);
+  bool r = mon_caps.parse("allow *", NULL);
+  ceph_assert(r);
 
   exited_quorum = ceph_clock_now();
 
   // prepare local commands
-  local_mon_commands.resize(ARRAY_SIZE(mon_commands));
-  for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
+  local_mon_commands.resize(std::size(mon_commands));
+  for (unsigned i = 0; i < std::size(mon_commands); ++i) {
     local_mon_commands[i] = mon_commands[i];
   }
   MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
 
-  local_upgrading_mon_commands = local_mon_commands;
-  for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
-    local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
+  prenautilus_local_mon_commands = local_mon_commands;
+  for (auto& i : prenautilus_local_mon_commands) {
+    std::string n = cmddesc_get_prenautilus_compat(i.cmdstring);
+    if (n != i.cmdstring) {
+      dout(20) << " pre-nautilus cmd " << i.cmdstring << " -> " << n << dendl;
+      i.cmdstring = n;
+    }
   }
-  MonCommand::encode_vector(local_upgrading_mon_commands,
-                           local_upgrading_mon_commands_bl);
+  MonCommand::encode_vector(prenautilus_local_mon_commands, prenautilus_local_mon_commands_bl);
 
   // assume our commands until we have an election.  this only means
   // we won't reply with EINVAL before the election; any command that
   // actually matters will wait until we have quorum etc and then
   // retry (and revalidate).
   leader_mon_commands = local_mon_commands;
-
-  // note: OSDMonitor may update this based on the luminous flag.
-  pgservice = mgrstatmon()->get_pg_stat_service();
 }
 
 Monitor::~Monitor()
 {
-  for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
-    delete *p;
-  delete health_monitor;
+  op_tracker.on_shutdown();
+
+  paxos_service.clear();
   delete config_key_service;
   delete paxos;
-  assert(session_map.sessions.empty());
-  delete mon_caps;
+  ceph_assert(session_map.sessions.empty());
 }
 
 
@@ -247,8 +272,8 @@ class AdminHook : public AdminSocketHook {
   Monitor *mon;
 public:
   explicit AdminHook(Monitor *m) : mon(m) {}
-  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
-           bufferlist& out) override {
+  bool call(std::string_view command, const cmdmap_t& cmdmap,
+           std::string_view format, bufferlist& out) override {
     stringstream ss;
     mon->do_admin_command(command, cmdmap, format, ss);
     out.append(ss);
@@ -256,15 +281,15 @@ public:
   }
 };
 
-void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
-                              ostream& ss)
+void Monitor::do_admin_command(std::string_view command, const cmdmap_t& cmdmap,
+                              std::string_view format, std::ostream& ss)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
   boost::scoped_ptr<Formatter> f(Formatter::create(format));
 
   string args;
-  for (cmdmap_t::iterator p = cmdmap.begin();
+  for (auto p = cmdmap.begin();
        p != cmdmap.end(); ++p) {
     if (p->first == "prefix")
       continue;
@@ -300,7 +325,8 @@ void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
       goto abort;
     }
     sync_force(f.get(), ss);
-  } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
+  } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
+            command.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
     if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
       goto abort;
   } else if (command == "quorum enter") {
@@ -327,8 +353,29 @@ void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
       f->flush(ss);
     }
 
+  } else if (command == "dump_historic_ops") {
+    if (op_tracker.dump_historic_ops(f.get())) {
+      f->flush(ss);
+    } else {
+      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
+        please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+    }
+  } else if (command == "dump_historic_ops_by_duration" ) {
+    if (op_tracker.dump_historic_ops(f.get(), true)) {
+      f->flush(ss);
+    } else {
+      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
+        please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+    }
+  } else if (command == "dump_historic_slow_ops") {
+    if (op_tracker.dump_historic_slow_ops(f.get(), {})) {
+      f->flush(ss);
+    } else {
+      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
+        please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+    }
   } else {
-    assert(0 == "bad AdminSocket command binding");
+    ceph_abort_msg("bad AdminSocket command binding");
   }
   (read_only ? audit_clog->debug() : audit_clog->info())
     << "from='admin socket' "
@@ -347,7 +394,7 @@ abort:
 
 void Monitor::handle_signal(int signum)
 {
-  assert(signum == SIGINT || signum == SIGTERM);
+  ceph_assert(signum == SIGINT || signum == SIGTERM);
   derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
   shutdown();
 }
@@ -372,6 +419,8 @@ CompatSet Monitor::get_supported_features()
   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
+  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
+  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
   return compat;
 }
 
@@ -418,7 +467,7 @@ void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
     t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
     store->apply_transaction(t);
   } else {
-    bufferlist::iterator it = featuresbl.begin();
+    auto it = featuresbl.cbegin();
     features->decode(it);
   }
 }
@@ -464,12 +513,21 @@ const char** Monitor::get_tracked_conf_keys() const
     "mon_health_to_clog_tick_interval",
     // scrub interval
     "mon_scrub_interval",
+    "mon_allow_pool_delete",
+    // osdmap pruning - observed, not handled.
+    "mon_osdmap_full_prune_enabled",
+    "mon_osdmap_full_prune_min",
+    "mon_osdmap_full_prune_interval",
+    "mon_osdmap_full_prune_txsize",
+    // debug options - observed, not handled
+    "mon_debug_extra_checks",
+    "mon_debug_block_osdmap_trim",
     NULL
   };
   return KEYS;
 }
 
-void Monitor::handle_conf_change(const struct md_config_t *conf,
+void Monitor::handle_conf_change(const ConfigProxy& conf,
                                  const std::set<std::string> &changed)
 {
   sanitize_options();
@@ -534,9 +592,9 @@ int Monitor::sanitize_options()
 
   // mon_lease must be greater than mon_lease_renewal; otherwise we
   // may incur in leases expiring before they are renewed.
-  if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
+  if (g_conf()->mon_lease_renew_interval_factor >= 1.0) {
     clog->error() << "mon_lease_renew_interval_factor ("
-                 << g_conf->mon_lease_renew_interval_factor
+                 << g_conf()->mon_lease_renew_interval_factor
                  << ") must be less than 1.0";
     r = -EINVAL;
   }
@@ -546,9 +604,9 @@ int Monitor::sanitize_options()
   // with the same value, for a given small vale, could mean timing out if
   // the monitors happened to be overloaded -- or even under normal load for
   // a small enough value.
-  if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
+  if (g_conf()->mon_lease_ack_timeout_factor <= 1.0) {
     clog->error() << "mon_lease_ack_timeout_factor ("
-                 << g_conf->mon_lease_ack_timeout_factor
+                 << g_conf()->mon_lease_ack_timeout_factor
                  << ") must be greater than 1.0";
     r = -EINVAL;
   }
@@ -569,7 +627,7 @@ int Monitor::preinit()
     return r;
   }
 
-  assert(!logger);
+  ceph_assert(!logger);
   {
     PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
     pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
@@ -592,7 +650,7 @@ int Monitor::preinit()
     cct->get_perfcounters_collection()->add(logger);
   }
 
-  assert(!cluster_logger);
+  ceph_assert(!cluster_logger);
   {
     PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
     pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
@@ -601,9 +659,9 @@ int Monitor::preinit()
     pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
     pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
     pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
-    pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(BYTES));
-    pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(BYTES));
-    pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(BYTES));
+    pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(UNIT_BYTES));
+    pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(UNIT_BYTES));
+    pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(UNIT_BYTES));
     pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
     pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
     pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
@@ -613,11 +671,7 @@ int Monitor::preinit()
     pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
     pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
     pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
-    pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
-    pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
-    pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
-    pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
-    pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
+    pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects", NULL, 0, unit_t(UNIT_BYTES));
     cluster_logger = pcb.create_perf_counters();
   }
 
@@ -644,13 +698,14 @@ int Monitor::preinit()
   if (!has_ever_joined) {
     // impose initial quorum restrictions?
     list<string> initial_members;
-    get_str_list(g_conf->mon_initial_members, initial_members);
+    get_str_list(g_conf()->mon_initial_members, initial_members);
 
     if (!initial_members.empty()) {
       dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
 
-      monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
-                                 &extra_probe_peers);
+      monmap->set_initial_members(
+       g_ceph_context, initial_members, name, messenger->get_myaddrs(),
+       &extra_probe_peers);
 
       dout(10) << " monmap is " << *monmap << dendl;
       dout(10) << " extra probe peers " << extra_probe_peers << dendl;
@@ -658,7 +713,7 @@ int Monitor::preinit()
   } else if (!monmap->contains(name)) {
     derr << "not in monmap and have been in a quorum before; "
          << "must have been removed" << dendl;
-    if (g_conf->mon_force_quorum_join) {
+    if (g_conf()->mon_force_quorum_join) {
       dout(0) << "we should have died but "
               << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
     } else {
@@ -693,7 +748,6 @@ int Monitor::preinit()
   dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
 
   init_paxos();
-  health_monitor->init();
 
   if (is_keyring_required()) {
     // we need to bootstrap authentication keys so we can form an
@@ -705,13 +759,13 @@ int Monitor::preinit()
       if (err == 0 && bl.length() > 0) {
         // Attempt to decode and extract keyring only if it is found.
         KeyRing keyring;
-        bufferlist::iterator p = bl.begin();
-        ::decode(keyring, p);
+        auto p = bl.cbegin();
+        decode(keyring, p);
         extract_save_mon_key(keyring);
       }
     }
 
-    string keyring_loc = g_conf->mon_data + "/keyring";
+    string keyring_loc = g_conf()->mon_data + "/keyring";
 
     r = keyring.load(cct, keyring_loc);
     if (r < 0) {
@@ -725,7 +779,7 @@ int Monitor::preinit()
        keyring.encode_plaintext(bl);
        write_default_keyring(bl);
       } else {
-       derr << "unable to load initial keyring " << g_conf->keyring << dendl;
+       derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
        lock.Unlock();
        return r;
       }
@@ -739,47 +793,72 @@ int Monitor::preinit()
   lock.Unlock();
   r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
                                     "show current monitor status");
-  assert(r == 0);
+  ceph_assert(r == 0);
   r = admin_socket->register_command("quorum_status", "quorum_status",
                                     admin_hook, "show current quorum status");
-  assert(r == 0);
+  ceph_assert(r == 0);
   r = admin_socket->register_command("sync_force",
                                     "sync_force name=validate,"
                                     "type=CephChoices,"
                                     "strings=--yes-i-really-mean-it",
                                     admin_hook,
                                     "force sync of and clear monitor store");
-  assert(r == 0);
+  ceph_assert(r == 0);
   r = admin_socket->register_command("add_bootstrap_peer_hint",
                                     "add_bootstrap_peer_hint name=addr,"
                                     "type=CephIPAddr",
                                     admin_hook,
                                     "add peer address as potential bootstrap"
                                     " peer for cluster bringup");
-  assert(r == 0);
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("add_bootstrap_peer_hintv",
+                                    "add_bootstrap_peer_hintv name=addrv,"
+                                    "type=CephString",
+                                    admin_hook,
+                                    "add peer address vector as potential bootstrap"
+                                    " peer for cluster bringup");
+  ceph_assert(r == 0);
   r = admin_socket->register_command("quorum enter", "quorum enter",
                                      admin_hook,
                                      "force monitor back into quorum");
-  assert(r == 0);
+  ceph_assert(r == 0);
   r = admin_socket->register_command("quorum exit", "quorum exit",
                                      admin_hook,
                                      "force monitor out of the quorum");
-  assert(r == 0);
+  ceph_assert(r == 0);
   r = admin_socket->register_command("ops",
                                      "ops",
                                      admin_hook,
                                      "show the ops currently in flight");
-  assert(r == 0);
+  ceph_assert(r == 0);
   r = admin_socket->register_command("sessions",
                                      "sessions",
                                      admin_hook,
                                      "list existing sessions");
-  assert(r == 0);
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_historic_ops", "dump_historic_ops",
+                                     admin_hook,
+                                    "show recent ops");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_historic_ops_by_duration", "dump_historic_ops_by_duration",
+                                     admin_hook,
+                                    "show recent ops, sorted by duration");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_historic_slow_ops", "dump_historic_slow_ops",
+                                     admin_hook,
+                                    "show recent slow ops");
+  ceph_assert(r == 0);
 
   lock.Lock();
 
   // add ourselves as a conf observer
-  g_conf->add_observer(this);
+  g_conf().add_observer(this);
+
+  messenger->set_auth_client(this);
+  messenger->set_auth_server(this);
+  mgr_messenger->set_auth_client(this);
+
+  auth_registry.refresh_config();
 
   lock.Unlock();
   return 0;
@@ -788,7 +867,7 @@ int Monitor::preinit()
 int Monitor::init()
 {
   dout(2) << "init" << dendl;
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
   finisher.start();
 
@@ -801,10 +880,13 @@ int Monitor::init()
   // i'm ready!
   messenger->add_dispatcher_tail(this);
 
+  // kickstart pet mgrclient
   mgr_client.init();
   mgr_messenger->add_dispatcher_tail(&mgr_client);
   mgr_messenger->add_dispatcher_tail(this);  // for auth ms_* calls
+  mgrmon()->prime_mgr_client();
 
+  state = STATE_PROBING;
   bootstrap();
   // add features of myself into feature_map
   session_map.feature_map.add_mon(con_self->get_features());
@@ -817,8 +899,8 @@ void Monitor::init_paxos()
   paxos->init();
 
   // init services
-  for (int i = 0; i < PAXOS_NUM; ++i) {
-    paxos_service[i]->init();
+  for (auto& svc : paxos_service) {
+    svc->init();
   }
 
   refresh_from_paxos(NULL);
@@ -832,8 +914,8 @@ void Monitor::refresh_from_paxos(bool *need_bootstrap)
   int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
   if (r >= 0) {
     try {
-      bufferlist::iterator p = bl.begin();
-      ::decode(fingerprint, p);
+      auto p = bl.cbegin();
+      decode(fingerprint, p);
     }
     catch (buffer::error& e) {
       dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
@@ -842,11 +924,11 @@ void Monitor::refresh_from_paxos(bool *need_bootstrap)
     dout(10) << __func__ << " no cluster_fingerprint" << dendl;
   }
 
-  for (int i = 0; i < PAXOS_NUM; ++i) {
-    paxos_service[i]->refresh(need_bootstrap);
+  for (auto& svc : paxos_service) {
+    svc->refresh(need_bootstrap);
   }
-  for (int i = 0; i < PAXOS_NUM; ++i) {
-    paxos_service[i]->post_refresh();
+  for (auto& svc : paxos_service) {
+    svc->post_refresh();
   }
   load_metadata();
 }
@@ -887,22 +969,19 @@ void Monitor::shutdown()
 
   wait_for_paxos_write();
 
+  {
+    std::lock_guard l(auth_lock);
+    authmon()->_set_mon_num_rank(0, 0);
+  }
+
   state = STATE_SHUTDOWN;
 
   lock.Unlock();
-  g_conf->remove_observer(this);
+  g_conf().remove_observer(this);
   lock.Lock();
 
   if (admin_hook) {
-    AdminSocket* admin_socket = cct->get_admin_socket();
-    admin_socket->unregister_command("mon_status");
-    admin_socket->unregister_command("quorum_status");
-    admin_socket->unregister_command("sync_force");
-    admin_socket->unregister_command("add_bootstrap_peer_hint");
-    admin_socket->unregister_command("quorum enter");
-    admin_socket->unregister_command("quorum exit");
-    admin_socket->unregister_command("ops");
-    admin_socket->unregister_command("sessions");
+    cct->get_admin_socket()->unregister_commands(admin_hook);
     delete admin_hook;
     admin_hook = NULL;
   }
@@ -918,9 +997,9 @@ void Monitor::shutdown()
 
   // clean up
   paxos->shutdown();
-  for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
-    (*p)->shutdown();
-  health_monitor->shutdown();
+  for (auto& svc : paxos_service) {
+    svc->shutdown();
+  }
 
   finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
   finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
@@ -965,6 +1044,62 @@ void Monitor::wait_for_paxos_write()
   }
 }
 
+void Monitor::respawn()
+{
+  // --- WARNING TO FUTURE COPY/PASTERS ---
+  // You must also add a call like
+  //
+  //   ceph_pthread_setname(pthread_self(), "ceph-mon");
+  //
+  // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
+  // instead of "(exe)", so that killall (and log rotation) will work.
+
+  dout(0) << __func__ << dendl;
+
+  char *new_argv[orig_argc+1];
+  dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
+  for (int i=0; i<orig_argc; i++) {
+    new_argv[i] = (char *)orig_argv[i];
+    dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
+  }
+  new_argv[orig_argc] = NULL;
+
+  /* Determine the path to our executable, test if Linux /proc/self/exe exists.
+   * This allows us to exec the same executable even if it has since been
+   * unlinked.
+   */
+  char exe_path[PATH_MAX] = "";
+#ifdef PROCPREFIX
+  if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
+    dout(1) << "respawning with exe " << exe_path << dendl;
+    strcpy(exe_path, PROCPREFIX "/proc/self/exe");
+  } else {
+#else
+  {
+#endif
+    /* Print CWD for the user's interest */
+    char buf[PATH_MAX];
+    char *cwd = getcwd(buf, sizeof(buf));
+    ceph_assert(cwd);
+    dout(1) << " cwd " << cwd << dendl;
+
+    /* Fall back to a best-effort: just running in our CWD */
+    strncpy(exe_path, orig_argv[0], PATH_MAX-1);
+  }
+
+  dout(1) << " exe_path " << exe_path << dendl;
+
+  unblock_all_signals(NULL);
+  execv(exe_path, new_argv);
+
+  dout(0) << "respawn execv " << orig_argv[0]
+         << " failed with " << cpp_strerror(errno) << dendl;
+
+  // We have to assert out here, because suicide() returns, and callers
+  // to respawn expect it never to return.
+  ceph_abort();
+}
+
 void Monitor::bootstrap()
 {
   dout(10) << "bootstrap" << dendl;
@@ -974,8 +1109,25 @@ void Monitor::bootstrap()
   unregister_cluster_logger();
   cancel_probe_timeout();
 
+  if (monmap->get_epoch() == 0) {
+    dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl;
+    monmap->calc_legacy_ranks();
+  }
+  dout(10) << "monmap " << *monmap << dendl;
+
+  if (monmap->min_mon_release &&
+      monmap->min_mon_release + 2 < (int)ceph_release()) {
+    derr << "current monmap has min_mon_release "
+        << (int)monmap->min_mon_release
+        << " (" << ceph_release_name(monmap->min_mon_release)
+        << ") which is >2 releases older than me " << ceph_release()
+        << " (" << ceph_release_name(ceph_release()) << "), stopping."
+        << dendl;
+    exit(0);
+  }
+
   // note my rank
-  int newrank = monmap->get_rank(messenger->get_myaddr());
+  int newrank = monmap->get_rank(messenger->get_myaddrs());
   if (newrank < 0 && rank >= 0) {
     // was i ever part of the quorum?
     if (has_ever_joined) {
@@ -983,6 +1135,14 @@ void Monitor::bootstrap()
       exit(0);
     }
   }
+  if (newrank >= 0 &&
+      monmap->get_addrs(newrank) != messenger->get_myaddrs()) {
+    dout(0) << " monmap addrs for rank " << newrank << " changed, i am "
+           << messenger->get_myaddrs()
+           << ", monmap is " << monmap->get_addrs(newrank) << ", respawning"
+           << dendl;
+    respawn();
+  }
   if (newrank != rank) {
     dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
     messenger->set_myname(entity_name_t::MON(newrank));
@@ -998,7 +1158,7 @@ void Monitor::bootstrap()
   _reset();
 
   // sync store
-  if (g_conf->mon_compact_on_bootstrap) {
+  if (g_conf()->mon_compact_on_bootstrap) {
     dout(10) << "bootstrap -- triggering compaction" << dendl;
     store->compact();
     dout(10) << "bootstrap -- finished compaction" << dendl;
@@ -1020,49 +1180,74 @@ void Monitor::bootstrap()
   dout(10) << "probing other monitors" << dendl;
   for (unsigned i = 0; i < monmap->size(); i++) {
     if ((int)i != rank)
-      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
-                             monmap->get_inst(i));
-  }
-  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
-       p != extra_probe_peers.end();
-       ++p) {
-    if (*p != messenger->get_myaddr()) {
-      entity_inst_t i;
-      i.name = entity_name_t::MON(-1);
-      i.addr = *p;
-      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
+      send_mon_message(
+       new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
+                     ceph_release()),
+       i);
+  }
+  for (auto& av : extra_probe_peers) {
+    if (av != messenger->get_myaddrs()) {
+      messenger->send_to_mon(
+       new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
+                     ceph_release()),
+       av);
     }
   }
 }
 
-bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
+bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd,
+                                      const cmdmap_t& cmdmap,
+                                      ostream& ss)
 {
-  string addrstr;
-  if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
-    ss << "unable to parse address string value '"
-         << cmd_vartype_stringify(cmdmap["addr"]) << "'";
-    return false;
-  }
-  dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
-           << addrstr << "'" << dendl;
-
-  entity_addr_t addr;
-  const char *end = 0;
-  if (!addr.parse(addrstr.c_str(), &end)) {
-    ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
-    return false;
-  }
-
   if (is_leader() || is_peon()) {
     ss << "mon already active; ignoring bootstrap hint";
     return true;
   }
 
-  if (addr.get_port() == 0)
-    addr.set_port(CEPH_MON_PORT);
+  entity_addrvec_t addrs;
+  string addrstr;
+  if (cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
+    dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' addr '"
+            << addrstr << "'" << dendl;
+
+    entity_addr_t addr;
+    const char *end = 0;
+    if (!addr.parse(addrstr.c_str(), &end, entity_addr_t::TYPE_ANY)) {
+      ss << "failed to parse addrs '" << addrstr
+        << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
+      return false;
+    }
+
+    addrs.v.push_back(addr);
+    if (addr.get_port() == 0) {
+      addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
+      addrs.v[0].set_port(CEPH_MON_PORT_IANA);
+      addrs.v.push_back(addr);
+      addrs.v[1].set_type(entity_addr_t::TYPE_LEGACY);
+      addrs.v[1].set_port(CEPH_MON_PORT_LEGACY);
+    } else if (addr.get_type() == entity_addr_t::TYPE_ANY) {
+      if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
+       addrs.v[0].set_type(entity_addr_t::TYPE_LEGACY);
+      } else {
+       addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
+      }
+    }
+  } else if (cmd_getval(g_ceph_context, cmdmap, "addrv", addrstr)) {
+    dout(10) << "_add_bootstrap_peer_hintv '" << cmd << "' addrv '"
+            << addrstr << "'" << dendl;
+    const char *end = 0;
+    if (!addrs.parse(addrstr.c_str(), &end)) {
+      ss << "failed to parse addrs '" << addrstr
+        << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
+      return false;
+    }
+  } else {
+    ss << "no addr or addrv provided";
+    return false;
+  }
 
-  extra_probe_peers.insert(addr);
-  ss << "adding peer " << addr << " to list: " << extra_probe_peers;
+  extra_probe_peers.insert(addrs);
+  ss << "adding peer " << addrs << " to list: " << extra_probe_peers;
   return true;
 }
 
@@ -1071,6 +1256,12 @@ void Monitor::_reset()
 {
   dout(10) << __func__ << dendl;
 
+  // disable authentication
+  {
+    std::lock_guard l(auth_lock);
+    authmon()->_set_mon_num_rank(0, 0);
+  }
+
   cancel_probe_timeout();
   timecheck_finish();
   health_events_cleanup();
@@ -1078,6 +1269,7 @@ void Monitor::_reset()
   scrub_event_cancel();
 
   leader_since = utime_t();
+  quorum_since = {};
   if (!quorum.empty()) {
     exited_quorum = ceph_clock_now();
   }
@@ -1089,9 +1281,9 @@ void Monitor::_reset()
 
   paxos->restart();
 
-  for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
-    (*p)->restart();
-  health_monitor->finish();
+  for (auto& svc : paxos_service) {
+    svc->restart();
+  }
 }
 
 
@@ -1102,10 +1294,11 @@ set<string> Monitor::get_sync_targets_names()
 {
   set<string> targets;
   targets.insert(paxos->get_name());
-  for (int i = 0; i < PAXOS_NUM; ++i)
-    paxos_service[i]->get_store_prefixes(targets);
+  for (auto& svc : paxos_service) {
+    svc->get_store_prefixes(targets);
+  }
   ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
-  assert(config_key_service_ptr);
+  ceph_assert(config_key_service_ptr);
   config_key_service_ptr->get_store_prefixes(targets);
   return targets;
 }
@@ -1114,7 +1307,7 @@ set<string> Monitor::get_sync_targets_names()
 void Monitor::sync_timeout()
 {
   dout(10) << __func__ << dendl;
-  assert(state == STATE_SYNCHRONIZING);
+  ceph_assert(state == STATE_SYNCHRONIZING);
   bootstrap();
 }
 
@@ -1132,7 +1325,7 @@ void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
       derr << __func__
            << " something wrong happened while reading the store: "
            << cpp_strerror(err) << dendl;
-      assert(0 == "error reading the store");
+      ceph_abort_msg("error reading the store");
     }
   } else {
     latest_monmap.decode(monmon_bl);
@@ -1146,9 +1339,9 @@ void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
       derr << __func__
            << " something wrong happened while reading the store: "
            << cpp_strerror(err) << dendl;
-      assert(0 == "error reading the store");
+      ceph_abort_msg("error reading the store");
     }
-    assert(backup_bl.length() > 0);
+    ceph_assert(backup_bl.length() > 0);
 
     MonMap backup_monmap;
     backup_monmap.decode(backup_bl);
@@ -1176,7 +1369,7 @@ void Monitor::sync_reset_requester()
     sync_timeout_event = NULL;
   }
 
-  sync_provider = entity_inst_t();
+  sync_provider = entity_addrvec_t();
   sync_cookie = 0;
   sync_full = false;
   sync_start_version = 0;
@@ -1188,11 +1381,11 @@ void Monitor::sync_reset_provider()
   sync_providers.clear();
 }
 
-void Monitor::sync_start(entity_inst_t &other, bool full)
+void Monitor::sync_start(entity_addrvec_t &addrs, bool full)
 {
-  dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
+  dout(10) << __func__ << " " << addrs << (full ? " full" : " recent") << dendl;
 
-  assert(state == STATE_PROBING ||
+  ceph_assert(state == STATE_PROBING ||
         state == STATE_SYNCHRONIZING);
   state = STATE_SYNCHRONIZING;
 
@@ -1207,14 +1400,14 @@ void Monitor::sync_start(entity_inst_t &other, bool full)
     sync_stash_critical_state(t);
     t->put("mon_sync", "in_sync", 1);
 
-    sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
+    sync_last_committed_floor = std::max(sync_last_committed_floor, paxos->get_version());
     dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
             << sync_last_committed_floor << dendl;
     t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
 
     store->apply_transaction(t);
 
-    assert(g_conf->mon_sync_requester_kill_at != 1);
+    ceph_assert(g_conf()->mon_sync_requester_kill_at != 1);
 
     // clear the underlying store
     set<string> targets = get_sync_targets_names();
@@ -1226,19 +1419,19 @@ void Monitor::sync_start(entity_inst_t &other, bool full)
     // deciding a partial or no sync is needed.
     paxos->init();
 
-    assert(g_conf->mon_sync_requester_kill_at != 2);
+    ceph_assert(g_conf()->mon_sync_requester_kill_at != 2);
   }
 
   // assume 'other' as the leader. We will update the leader once we receive
   // a reply to the sync start.
-  sync_provider = other;
+  sync_provider = addrs;
 
   sync_reset_timeout();
 
   MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
   if (!sync_full)
     m->last_committed = paxos->get_version();
-  messenger->send_message(m, sync_provider);
+  messenger->send_to_mon(m, sync_provider);
 }
 
 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
@@ -1246,7 +1439,7 @@ void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
   dout(10) << __func__ << dendl;
   bufferlist backup_monmap;
   sync_obtain_latest_monmap(backup_monmap);
-  assert(backup_monmap.length() > 0);
+  ceph_assert(backup_monmap.length() > 0);
   t->put("mon_sync", "latest_monmap", backup_monmap);
 }
 
@@ -1256,7 +1449,7 @@ void Monitor::sync_reset_timeout()
   if (sync_timeout_event)
     timer.cancel_event(sync_timeout_event);
   sync_timeout_event = timer.add_event_after(
-    g_conf->mon_sync_timeout,
+    g_conf()->mon_sync_timeout,
     new C_MonContext(this, [this](int) {
        sync_timeout();
       }));
@@ -1266,7 +1459,7 @@ void Monitor::sync_finish(version_t last_committed)
 {
   dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
 
-  assert(g_conf->mon_sync_requester_kill_at != 7);
+  ceph_assert(g_conf()->mon_sync_requester_kill_at != 7);
 
   if (sync_full) {
     // finalize the paxos commits
@@ -1284,7 +1477,7 @@ void Monitor::sync_finish(version_t last_committed)
     store->apply_transaction(tx);
   }
 
-  assert(g_conf->mon_sync_requester_kill_at != 8);
+  ceph_assert(g_conf()->mon_sync_requester_kill_at != 8);
 
   auto t(std::make_shared<MonitorDBStore::Transaction>());
   t->erase("mon_sync", "in_sync");
@@ -1292,11 +1485,11 @@ void Monitor::sync_finish(version_t last_committed)
   t->erase("mon_sync", "last_committed_floor");
   store->apply_transaction(t);
 
-  assert(g_conf->mon_sync_requester_kill_at != 9);
+  ceph_assert(g_conf()->mon_sync_requester_kill_at != 9);
 
   init_paxos();
 
-  assert(g_conf->mon_sync_requester_kill_at != 10);
+  ceph_assert(g_conf()->mon_sync_requester_kill_at != 10);
 
   bootstrap();
 }
@@ -1333,7 +1526,7 @@ void Monitor::handle_sync(MonOpRequestRef op)
 
   default:
     dout(0) << __func__ << " unknown op " << m->op << dendl;
-    assert(0 == "unknown op");
+    ceph_abort_msg("unknown op");
   }
 }
 
@@ -1354,7 +1547,7 @@ void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
     return;
   }
 
-  assert(g_conf->mon_sync_provider_kill_at != 1);
+  ceph_assert(g_conf()->mon_sync_provider_kill_at != 1);
 
   // make sure they can understand us.
   if ((required_features ^ m->get_connection()->get_features()) &
@@ -1371,14 +1564,14 @@ void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
   // process instance.  there is no need to be unique *across*
   // monitors, though.
   uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
-  assert(sync_providers.count(cookie) == 0);
+  ceph_assert(sync_providers.count(cookie) == 0);
 
   dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
 
   SyncProvider& sp = sync_providers[cookie];
   sp.cookie = cookie;
-  sp.entity = m->get_source_inst();
-  sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
+  sp.addrs = m->get_source_addrs();
+  sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
 
   set<string> sync_targets;
   if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
@@ -1410,10 +1603,10 @@ void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
     return;
   }
 
-  assert(g_conf->mon_sync_provider_kill_at != 2);
+  ceph_assert(g_conf()->mon_sync_provider_kill_at != 2);
 
   SyncProvider& sp = sync_providers[m->cookie];
-  sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
+  sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
 
   if (sp.last_committed < paxos->get_first_committed() &&
       paxos->get_first_committed() > 1) {
@@ -1427,13 +1620,13 @@ void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
   MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
   auto tx(std::make_shared<MonitorDBStore::Transaction>());
 
-  int left = g_conf->mon_sync_max_payload_size;
+  int left = g_conf()->mon_sync_max_payload_size;
   while (sp.last_committed < paxos->get_version() && left > 0) {
     bufferlist bl;
     sp.last_committed++;
 
     int err = store->get(paxos->get_name(), sp.last_committed, bl);
-    assert(err == 0);
+    ceph_assert(err == 0);
 
     tx->put(paxos->get_name(), sp.last_committed, bl);
     left -= bl.length();
@@ -1457,13 +1650,13 @@ void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
             << " key " << sp.last_key << dendl;
     reply->op = MMonSync::OP_LAST_CHUNK;
 
-    assert(g_conf->mon_sync_provider_kill_at != 3);
+    ceph_assert(g_conf()->mon_sync_provider_kill_at != 3);
 
     // clean up our local state
     sync_providers.erase(sp.cookie);
   }
 
-  ::encode(*tx, reply->chunk_bl);
+  encode(*tx, reply->chunk_bl);
 
   m->get_connection()->send_message(reply);
 }
@@ -1478,7 +1671,7 @@ void Monitor::handle_sync_cookie(MonOpRequestRef op)
     dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
     return;
   }
-  if (m->get_source_inst() != sync_provider) {
+  if (m->get_source_addrs() != sync_provider) {
     dout(10) << __func__ << " source does not match, discarding" << dendl;
     return;
   }
@@ -1488,20 +1681,20 @@ void Monitor::handle_sync_cookie(MonOpRequestRef op)
   sync_reset_timeout();
   sync_get_next_chunk();
 
-  assert(g_conf->mon_sync_requester_kill_at != 3);
+  ceph_assert(g_conf()->mon_sync_requester_kill_at != 3);
 }
 
 void Monitor::sync_get_next_chunk()
 {
   dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
-  if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
-    dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
-    usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
+  if (g_conf()->mon_inject_sync_get_chunk_delay > 0) {
+    dout(20) << __func__ << " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay << dendl;
+    usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay * 1000000.0));
   }
   MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
-  messenger->send_message(r, sync_provider);
+  messenger->send_to_mon(r, sync_provider);
 
-  assert(g_conf->mon_sync_requester_kill_at != 4);
+  ceph_assert(g_conf()->mon_sync_requester_kill_at != 4);
 }
 
 void Monitor::handle_sync_chunk(MonOpRequestRef op)
@@ -1513,13 +1706,13 @@ void Monitor::handle_sync_chunk(MonOpRequestRef op)
     dout(10) << __func__ << " cookie does not match, discarding" << dendl;
     return;
   }
-  if (m->get_source_inst() != sync_provider) {
+  if (m->get_source_addrs() != sync_provider) {
     dout(10) << __func__ << " source does not match, discarding" << dendl;
     return;
   }
 
-  assert(state == STATE_SYNCHRONIZING);
-  assert(g_conf->mon_sync_requester_kill_at != 5);
+  ceph_assert(state == STATE_SYNCHRONIZING);
+  ceph_assert(g_conf()->mon_sync_requester_kill_at != 5);
 
   auto tx(std::make_shared<MonitorDBStore::Transaction>());
   tx->append_from_encoded(m->chunk_bl);
@@ -1532,7 +1725,7 @@ void Monitor::handle_sync_chunk(MonOpRequestRef op)
 
   store->apply_transaction(tx);
 
-  assert(g_conf->mon_sync_requester_kill_at != 6);
+  ceph_assert(g_conf()->mon_sync_requester_kill_at != 6);
 
   if (!sync_full) {
     dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
@@ -1573,7 +1766,8 @@ void Monitor::sync_trim_providers()
   map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
   while (p != sync_providers.end()) {
     if (now > p->second.timeout) {
-      dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
+      dout(10) << __func__ << " expiring cookie " << p->second.cookie
+              << " for " << p->second.addrs << dendl;
       sync_providers.erase(p++);
     } else {
       ++p;
@@ -1601,7 +1795,7 @@ void Monitor::reset_probe_timeout()
   probe_timeout_event = new C_MonContext(this, [this](int r) {
       probe_timeout(r);
     });
-  double t = g_conf->mon_probe_timeout;
+  double t = g_conf()->mon_probe_timeout;
   if (timer.add_event_after(t, probe_timeout_event)) {
     dout(10) << "reset_probe_timeout " << probe_timeout_event
             << " after " << t << " seconds" << dendl;
@@ -1613,8 +1807,8 @@ void Monitor::reset_probe_timeout()
 void Monitor::probe_timeout(int r)
 {
   dout(4) << "probe_timeout " << probe_timeout_event << dendl;
-  assert(is_probing() || is_synchronizing());
-  assert(probe_timeout_event);
+  ceph_assert(is_probing() || is_synchronizing());
+  ceph_assert(probe_timeout_event);
   probe_timeout_event = NULL;
   bootstrap();
 }
@@ -1639,9 +1833,10 @@ void Monitor::handle_probe(MonOpRequestRef op)
     break;
 
   case MMonProbe::OP_MISSING_FEATURES:
-    derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
+    derr << __func__ << " require release " << m->mon_release << " > "
+        << ceph_release() << ", or missing features (have " << CEPH_FEATURES_ALL
         << ", required " << m->required_features
-        << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
+        << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << ")"
         << dendl;
     break;
   }
@@ -1654,15 +1849,14 @@ void Monitor::handle_probe_probe(MonOpRequestRef op)
   dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
           << " features " << m->get_connection()->get_features() << dendl;
   uint64_t missing = required_features & ~m->get_connection()->get_features();
-  if (missing) {
-    dout(1) << " peer " << m->get_source_addr() << " missing features "
-           << missing << dendl;
-    if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
-      MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
-                                  name, has_ever_joined);
-      m->required_features = required_features;
-      m->get_connection()->send_message(r);
-    }
+  if (m->mon_release < monmap->min_mon_release || missing) {
+    dout(1) << " peer " << m->get_source_addr() << " release " << m->mon_release
+           << " < min_mon_release " << monmap->min_mon_release
+           << ", or missing features " << missing << dendl;
+    MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
+                                name, has_ever_joined, monmap->min_mon_release);
+    m->required_features = required_features;
+    m->get_connection()->send_message(r);
     goto out;
   }
 
@@ -1683,7 +1877,8 @@ void Monitor::handle_probe_probe(MonOpRequestRef op)
   }
   
   MMonProbe *r;
-  r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
+  r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined,
+                   ceph_release());
   r->name = name;
   r->quorum = quorum;
   monmap->encode(r->monmap_bl, m->get_connection()->get_features());
@@ -1693,9 +1888,9 @@ void Monitor::handle_probe_probe(MonOpRequestRef op)
 
   // did we discover a peer here?
   if (!monmap->contains(m->get_source_addr())) {
-    dout(1) << " adding peer " << m->get_source_addr()
+    dout(1) << " adding peer " << m->get_source_addrs()
            << " to list of hints" << dendl;
-    extra_probe_peers.insert(m->get_source_addr());
+    extra_probe_peers.insert(m->get_source_addrs());
   }
 
  out:
@@ -1705,7 +1900,8 @@ void Monitor::handle_probe_probe(MonOpRequestRef op)
 void Monitor::handle_probe_reply(MonOpRequestRef op)
 {
   MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
-  dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
+  dout(10) << "handle_probe_reply " << m->get_source_inst()
+          << " " << *m << dendl;
   dout(10) << " monmap is " << *monmap << dendl;
 
   // discover name and addrs during probing or electing states.
@@ -1746,16 +1942,19 @@ void Monitor::handle_probe_reply(MonOpRequestRef op)
       bootstrap();
       return;
     }
-  } else {
+  } else if (peer_name.size()) {
     dout(10) << " peer name is " << peer_name << dendl;
+  } else {
+    dout(10) << " peer " << m->get_source_addr() << " not in map" << dendl;
   }
 
   // new initial peer?
   if (monmap->get_epoch() == 0 &&
       monmap->contains(m->name) &&
-      monmap->get_addr(m->name).is_blank_ip()) {
-    dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
-    monmap->set_addr(m->name, m->get_source_addr());
+      monmap->get_addrs(m->name).front().is_blank_ip()) {
+    dout(1) << " learned initial mon " << m->name
+           << " addrs " << m->get_source_addrs() << dendl;
+    monmap->set_addrvec(m->name, m->get_source_addrs());
 
     bootstrap();
     return;
@@ -1766,14 +1965,14 @@ void Monitor::handle_probe_reply(MonOpRequestRef op)
     return;
   }
 
-  assert(paxos != NULL);
+  ceph_assert(paxos != NULL);
 
   if (is_synchronizing()) {
     dout(10) << " currently syncing" << dendl;
     return;
   }
 
-  entity_inst_t other = m->get_source_inst();
+  entity_addrvec_t other = m->get_source_addrs();
 
   if (m->paxos_last_version < sync_last_committed_floor) {
     dout(10) << " peer paxos versions [" << m->paxos_first_version
@@ -1792,7 +1991,7 @@ void Monitor::handle_probe_reply(MonOpRequestRef op)
       sync_start(other, true);
       return;
     }
-    if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
+    if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) {
       dout(10) << " peer paxos last version " << m->paxos_last_version
               << " vs my version " << paxos->get_version()
               << " (too far ahead)"
@@ -1803,6 +2002,23 @@ void Monitor::handle_probe_reply(MonOpRequestRef op)
     }
   }
 
+  // did the existing cluster complete upgrade to luminous?
+  if (osdmon()->osdmap.get_epoch()) {
+    if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+      derr << __func__ << " existing cluster has not completed upgrade to"
+          << " luminous; 'ceph osd require_osd_release luminous' before"
+          << " upgrading" << dendl;
+      exit(0);
+    }
+    if (!osdmon()->osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) ||
+       !osdmon()->osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
+      derr << __func__ << " existing cluster has not completed a full luminous"
+          << " scrub to purge legacy snapdir objects; please scrub before"
+          << " upgrading beyond luminous." << dendl;
+      exit(0);
+    }
+  }
+
   // is there an existing quorum?
   if (m->quorum.size()) {
     dout(10) << " existing quorum " << m->quorum << dendl;
@@ -1813,13 +2029,14 @@ void Monitor::handle_probe_reply(MonOpRequestRef op)
              << dendl;
 
     if (monmap->contains(name) &&
-        !monmap->get_addr(name).is_blank_ip()) {
+        !monmap->get_addrs(name).front().is_blank_ip()) {
       // i'm part of the cluster; just initiate a new election
       start_election();
     } else {
       dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
-      messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
-                              monmap->get_inst(*m->quorum.begin()));
+      send_mon_message(
+       new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
+       *m->quorum.begin());
     }
   } else {
     if (monmap->contains(m->name)) {
@@ -1830,7 +2047,7 @@ void Monitor::handle_probe_reply(MonOpRequestRef op)
       return;
     }
 
-    unsigned need = monmap->size() / 2 + 1;
+    unsigned need = monmap->min_quorum_size();
     dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
     if (outside_quorum.size() >= need) {
       if (outside_quorum.count(name)) {
@@ -1879,7 +2096,7 @@ void Monitor::win_standalone_election()
   elector.advance_epoch();
 
   rank = monmap->get_rank(name);
-  assert(rank == 0);
+  ceph_assert(rank == 0);
   set<int> q;
   q.insert(rank);
 
@@ -1889,12 +2106,13 @@ void Monitor::win_standalone_election()
   win_election(elector.get_epoch(), q,
                CEPH_FEATURES_ALL,
                ceph::features::mon::get_supported(),
+              ceph_release(),
               metadata);
 }
 
 const utime_t& Monitor::get_leader_since() const
 {
-  assert(state == STATE_LEADER);
+  ceph_assert(state == STATE_LEADER);
   return leader_since;
 }
 
@@ -1905,31 +2123,35 @@ epoch_t Monitor::get_epoch()
 
 void Monitor::_finish_svc_election()
 {
-  assert(state == STATE_LEADER || state == STATE_PEON);
+  ceph_assert(state == STATE_LEADER || state == STATE_PEON);
 
-  for (auto p : paxos_service) {
+  for (auto& svc : paxos_service) {
     // we already called election_finished() on monmon(); avoid callig twice
-    if (state == STATE_LEADER && p == monmon())
+    if (state == STATE_LEADER && svc.get() == monmon())
       continue;
-    p->election_finished();
+    svc->election_finished();
   }
 }
 
 void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
                            const mon_feature_t& mon_features,
+                          int min_mon_release,
                           const map<int,Metadata>& metadata)
 {
   dout(10) << __func__ << " epoch " << epoch << " quorum " << active
           << " features " << features
            << " mon_features " << mon_features
+          << " min_mon_release " << min_mon_release
            << dendl;
-  assert(is_electing());
+  ceph_assert(is_electing());
   state = STATE_LEADER;
   leader_since = ceph_clock_now();
+  quorum_since = mono_clock::now();
   leader = rank;
   quorum = active;
   quorum_con_features = features;
   quorum_mon_features = mon_features;
+  quorum_min_mon_release = min_mon_release;
   pending_metadata = metadata;
   outside_quorum.clear();
 
@@ -1946,7 +2168,6 @@ void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
   // round without agreeing on who the participants are.
   monmon()->election_finished();
   _finish_svc_election();
-  health_monitor->start(epoch);
 
   logger->inc(l_mon_election_win);
 
@@ -1967,7 +2188,7 @@ void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
     // do that anyway for other reasons, though.
     MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
     bufferlist bl;
-    ::encode(m, bl);
+    encode(m, bl);
     t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
   }
 
@@ -1986,7 +2207,7 @@ void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
         dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
         healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this,
               [this](int r){
-                assert(lock.is_locked_by_me());
+                ceph_assert(lock.is_locked_by_me());
                 do_health_to_clog_interval();
               }));
 
@@ -2001,42 +2222,75 @@ void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
 
 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
                             uint64_t features,
-                            const mon_feature_t& mon_features)
+                            const mon_feature_t& mon_features,
+                           int min_mon_release)
 {
   state = STATE_PEON;
   leader_since = utime_t();
+  quorum_since = mono_clock::now();
   leader = l;
   quorum = q;
   outside_quorum.clear();
   quorum_con_features = features;
   quorum_mon_features = mon_features;
+  quorum_min_mon_release = min_mon_release;
   dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
           << " quorum is " << quorum << " features are " << quorum_con_features
            << " mon_features are " << quorum_mon_features
+          << " min_mon_release " << min_mon_release
            << dendl;
 
   paxos->peon_init();
   _finish_svc_election();
-  health_monitor->start(epoch);
 
   logger->inc(l_mon_election_lose);
 
   finish_election();
+}
 
-  if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
-      !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
-    // for pre-luminous mons only
-    Metadata sys_info;
-    collect_metadata(&sys_info);
-    messenger->send_message(new MMonMetadata(sys_info),
-                           monmap->get_inst(get_leader()));
+namespace {
+std::string collect_compression_algorithms()
+{
+  ostringstream os;
+  bool printed = false;
+  for (auto [name, key] : Compressor::compression_algorithms) {
+    if (printed) {
+      os << ", ";
+    } else {
+      printed = true;
+    }
+    std::ignore = key;
+    os << name;
   }
+  return os.str();
+}
 }
 
 void Monitor::collect_metadata(Metadata *m)
 {
   collect_sys_info(m, g_ceph_context);
-  (*m)["addr"] = stringify(messenger->get_myaddr());
+  (*m)["addrs"] = stringify(messenger->get_myaddrs());
+  (*m)["compression_algorithms"] = collect_compression_algorithms();
+
+  // infer storage device
+  string devname = store->get_devname();
+  set<string> devnames;
+  get_raw_devices(devname, &devnames);
+  (*m)["devices"] = stringify(devnames);
+  string devids;
+  for (auto& devname : devnames) {
+    string err;
+    string id = get_device_id(devname, &err);
+    if (id.size()) {
+      if (!devids.empty()) {
+       devids += ",";
+      }
+      devids += devname + "=" + id;
+    } else {
+      derr << "failed to get devid for " << devname << ": " << err << dendl;
+    }
+  }
+  (*m)["device_ids"] = devids;
 }
 
 void Monitor::finish_election()
@@ -2051,12 +2305,19 @@ void Monitor::finish_election()
   update_logger();
   register_cluster_logger();
 
+  // enable authentication
+  {
+    std::lock_guard l(auth_lock);
+    authmon()->_set_mon_num_rank(monmap->size(), rank);
+  }
+
   // am i named properly?
-  string cur_name = monmap->get_name(messenger->get_myaddr());
+  string cur_name = monmap->get_name(messenger->get_myaddrs());
   if (cur_name != name) {
     dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
-    messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
-                           monmap->get_inst(*quorum.begin()));
+    send_mon_message(
+      new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
+      *quorum.begin());
   }
 }
 
@@ -2078,18 +2339,12 @@ void Monitor::_apply_compatset_features(CompatSet &new_features)
 void Monitor::apply_quorum_to_compatset_features()
 {
   CompatSet new_features(features);
-  if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
-    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
-  }
+  new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
   if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
     new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
   }
-  if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
-    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
-  }
-  if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
-    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
-  }
+  new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
+  new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
   dout(5) << __func__ << dendl;
   _apply_compatset_features(new_features);
 }
@@ -2110,19 +2365,33 @@ void Monitor::apply_monmap_to_compatset_features()
    *   once you unset it.
    */
   if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
-    assert(ceph::features::mon::get_persistent().contains_all(
+    ceph_assert(ceph::features::mon::get_persistent().contains_all(
            ceph::features::mon::FEATURE_KRAKEN));
     // this feature should only ever be set if the quorum supports it.
-    assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
+    ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
     new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
   }
   if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
-    assert(ceph::features::mon::get_persistent().contains_all(
+    ceph_assert(ceph::features::mon::get_persistent().contains_all(
            ceph::features::mon::FEATURE_LUMINOUS));
     // this feature should only ever be set if the quorum supports it.
-    assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
+    ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
     new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
   }
+  if (monmap_features.contains_all(ceph::features::mon::FEATURE_MIMIC)) {
+    ceph_assert(ceph::features::mon::get_persistent().contains_all(
+           ceph::features::mon::FEATURE_MIMIC));
+    // this feature should only ever be set if the quorum supports it.
+    ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_MIMIC));
+    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
+  }
+  if (monmap_features.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) {
+    ceph_assert(ceph::features::mon::get_persistent().contains_all(
+           ceph::features::mon::FEATURE_NAUTILUS));
+    // this feature should only ever be set if the quorum supports it.
+    ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_NAUTILUS));
+    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
+  }
 
   dout(5) << __func__ << dendl;
   _apply_compatset_features(new_features);
@@ -2133,24 +2402,22 @@ void Monitor::calc_quorum_requirements()
   required_features = 0;
 
   // compatset
-  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
-    required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
-  }
   if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
     required_features |= CEPH_FEATURE_OSDMAP_ENC;
   }
-  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
-    required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
-  }
-  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
-    required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
-  }
   if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
     required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
   }
   if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
     required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
   }
+  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC)) {
+    required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
+  }
+  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS)) {
+    required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
+      CEPH_FEATUREMASK_CEPHX_V2;
+  }
 
   // monmap
   if (monmap->get_required_features().contains_all(
@@ -2161,6 +2428,15 @@ void Monitor::calc_quorum_requirements()
        ceph::features::mon::FEATURE_LUMINOUS)) {
     required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
   }
+  if (monmap->get_required_features().contains_all(
+       ceph::features::mon::FEATURE_MIMIC)) {
+    required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
+  }
+  if (monmap->get_required_features().contains_all(
+       ceph::features::mon::FEATURE_NAUTILUS)) {
+    required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
+      CEPH_FEATUREMASK_CEPHX_V2;
+  }
   dout(10) << __func__ << " required_features " << required_features << dendl;
 }
 
@@ -2223,6 +2499,13 @@ void Monitor::_quorum_status(Formatter *f, ostream& ss)
 
   f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
 
+  if (!quorum.empty()) {
+    f->dump_int(
+      "quorum_age",
+      std::chrono::duration_cast<std::chrono::seconds>(
+       mono_clock::now() - quorum_since).count());
+  }
+
   f->open_object_section("monmap");
   monmap->dump(f);
   f->close_section(); // monmap
@@ -2253,9 +2536,15 @@ void Monitor::get_mon_status(Formatter *f, ostream& ss)
   for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
     f->dump_int("mon", *p);
   }
-
   f->close_section(); // quorum
 
+  if (!quorum.empty()) {
+    f->dump_int(
+      "quorum_age",
+      std::chrono::duration_cast<std::chrono::seconds>(
+       mono_clock::now() - quorum_since).count());
+  }
+
   f->open_object_section("features");
   f->dump_stream("required_con") << required_features;
   mon_feature_t req_mon_features = get_required_mon_features();
@@ -2270,10 +2559,11 @@ void Monitor::get_mon_status(Formatter *f, ostream& ss)
   f->close_section(); // outside_quorum
 
   f->open_array_section("extra_probe_peers");
-  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
+  for (set<entity_addrvec_t>::iterator p = extra_probe_peers.begin();
        p != extra_probe_peers.end();
-       ++p)
-    f->dump_stream("peer") << *p;
+       ++p) {
+    f->dump_object("peer", *p);
+  }
   f->close_section(); // extra_probe_peers
 
   f->open_array_section("sync_provider");
@@ -2281,7 +2571,7 @@ void Monitor::get_mon_status(Formatter *f, ostream& ss)
        p != sync_providers.end();
        ++p) {
     f->dump_unsigned("cookie", p->second.cookie);
-    f->dump_stream("entity") << p->second.entity;
+    f->dump_object("addrs", p->second.addrs);
     f->dump_stream("timeout") << p->second.timeout;
     f->dump_unsigned("last_committed", p->second.last_committed);
     f->dump_stream("last_key") << p->second.last_key;
@@ -2296,10 +2586,10 @@ void Monitor::get_mon_status(Formatter *f, ostream& ss)
     f->close_section();
   }
 
-  if (g_conf->mon_sync_provider_kill_at > 0)
-    f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
-  if (g_conf->mon_sync_requester_kill_at > 0)
-    f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
+  if (g_conf()->mon_sync_provider_kill_at > 0)
+    f->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at);
+  if (g_conf()->mon_sync_requester_kill_at > 0)
+    f->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at);
 
   f->open_object_section("monmap");
   monmap->dump(f);
@@ -2408,6 +2698,7 @@ void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
   if (changed.count("mon_health_to_clog")) {
     if (!cct->_conf->mon_health_to_clog) {
       health_events_cleanup();
+      return;
     } else {
       if (!health_tick_event) {
         health_tick_start();
@@ -2462,39 +2753,15 @@ void Monitor::do_health_to_clog(bool force)
 
   dout(10) << __func__ << (force ? " (force)" : "") << dendl;
 
-  if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-    string summary;
-    health_status_t level = get_health_status(false, nullptr, &summary);
-    if (!force &&
-       summary == health_status_cache.summary &&
-       level == health_status_cache.overall)
-      return;
-    clog->health(level) << "overall " << summary;
-    health_status_cache.summary = summary;
-    health_status_cache.overall = level;
-  } else {
-    // for jewel only
-    list<string> status;
-    health_status_t overall = get_health(status, NULL, NULL);
-    dout(25) << __func__
-            << (force ? " (force)" : "")
-            << dendl;
-
-    string summary = joinify(status.begin(), status.end(), string("; "));
-
-    if (!force &&
-       overall == health_status_cache.overall &&
-       !health_status_cache.summary.empty() &&
-       health_status_cache.summary == summary) {
-      // we got a dup!
-      return;
-    }
-
-    clog->info() << summary;
-
-    health_status_cache.overall = overall;
-    health_status_cache.summary = summary;
-  }
+  string summary;
+  health_status_t level = get_health_status(false, nullptr, &summary);
+  if (!force &&
+      summary == health_status_cache.summary &&
+      level == health_status_cache.overall)
+    return;
+  clog->health(level) << "overall " << summary;
+  health_status_cache.summary = summary;
+  health_status_cache.overall = level;
 }
 
 health_status_t Monitor::get_health_status(
@@ -2505,8 +2772,6 @@ health_status_t Monitor::get_health_status(
   const char *sep2)
 {
   health_status_t r = HEALTH_OK;
-  bool compat = g_conf->mon_health_preluminous_compat;
-  bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
   if (f) {
     f->open_object_section("health");
     f->open_object_section("checks");
@@ -2522,6 +2787,7 @@ health_status_t Monitor::get_health_status(
   if (f) {
     f->close_section();
     f->dump_stream("status") << r;
+    f->close_section();
   } else {
     // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
     *plain = stringify(r);
@@ -2532,48 +2798,12 @@ health_status_t Monitor::get_health_status(
     *plain += "\n";
   }
 
-  const std::string old_fields_message = "'ceph health' JSON format has "
-    "changed in luminous. If you see this your monitoring system is "
-    "scraping the wrong fields. Disable this with 'mon health preluminous "
-    "compat warning = false'";
-
-  if (f && (compat || compat_warn)) {
-    health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
-    f->open_array_section("summary");
-    if (compat_warn) {
-      f->open_object_section("item");
-      f->dump_stream("severity") << HEALTH_WARN;
-      f->dump_string("summary", old_fields_message);
-      f->close_section();
-    }
-    if (compat) {
-      for (auto& svc : paxos_service) {
-        svc->get_health_checks().dump_summary_compat(f);
-      }
-    }
-    f->close_section();
-    f->dump_stream("overall_status") << cr;
-  }
-
-  if (want_detail) {
-    if (f && (compat || compat_warn)) {
-      f->open_array_section("detail");
-      if (compat_warn) {
-       f->dump_string("item", old_fields_message);
-      }
-    }
-
+  if (want_detail && !f) {
     for (auto& svc : paxos_service) {
-      svc->get_health_checks().dump_detail(f, plain, compat);
-    }
-
-    if (f && (compat || compat_warn)) {
-      f->close_section();
+      svc->get_health_checks().dump_detail(plain);
     }
   }
-  if (f) {
-    f->close_section();
-  }
+
   return r;
 }
 
@@ -2582,7 +2812,7 @@ void Monitor::log_health(
   const health_check_map_t& previous,
   MonitorDBStore::TransactionRef t)
 {
-  if (!g_conf->mon_health_to_clog) {
+  if (!g_conf()->mon_health_to_clog) {
     return;
   }
 
@@ -2592,7 +2822,7 @@ void Monitor::log_health(
   dout(10) << __func__ << " updated " << updated.checks.size()
           << " previous " << previous.checks.size()
           << dendl;
-  const auto min_log_period = g_conf->get_val<int64_t>(
+  const auto min_log_period = g_conf().get_val<int64_t>(
       "mon_health_log_update_period");
   for (auto& p : updated.checks) {
     auto q = previous.checks.find(p.first);
@@ -2682,119 +2912,15 @@ void Monitor::log_health(
   }
 }
 
-health_status_t Monitor::get_health(list<string>& status,
-                                    bufferlist *detailbl,
-                                    Formatter *f)
-{
-  list<pair<health_status_t,string> > summary;
-  list<pair<health_status_t,string> > detail;
-
-  if (f)
-    f->open_object_section("health");
-
-  for (vector<PaxosService*>::iterator p = paxos_service.begin();
-       p != paxos_service.end();
-       ++p) {
-    PaxosService *s = *p;
-    s->get_health(summary, detailbl ? &detail : NULL, cct);
-  }
-
-  health_monitor->get_health(summary, (detailbl ? &detail : NULL));
-
-  health_status_t overall = HEALTH_OK;
-  if (!timecheck_skews.empty()) {
-    list<string> warns;
-    for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
-         i != timecheck_skews.end(); ++i) {
-      entity_inst_t inst = i->first;
-      double skew = i->second;
-      double latency = timecheck_latencies[inst];
-      string name = monmap->get_name(inst.addr);
-      ostringstream tcss;
-      health_status_t tcstatus = timecheck_status(tcss, skew, latency);
-      if (tcstatus != HEALTH_OK) {
-        if (overall > tcstatus)
-          overall = tcstatus;
-        warns.push_back(name);
-        ostringstream tmp_ss;
-        tmp_ss << "mon." << name
-               << " addr " << inst.addr << " " << tcss.str()
-              << " (latency " << latency << "s)";
-        detail.push_back(make_pair(tcstatus, tmp_ss.str()));
-      }
-    }
-    if (!warns.empty()) {
-      ostringstream ss;
-      ss << "clock skew detected on";
-      while (!warns.empty()) {
-        ss << " mon." << warns.front();
-        warns.pop_front();
-        if (!warns.empty())
-          ss << ",";
-      }
-      status.push_back(ss.str());
-      summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
-    }
-  }
-
-  if (f)
-    f->open_array_section("summary");
-  if (!summary.empty()) {
-    while (!summary.empty()) {
-      if (overall > summary.front().first)
-       overall = summary.front().first;
-      status.push_back(summary.front().second);
-      if (f) {
-        f->open_object_section("item");
-        f->dump_stream("severity") <<  summary.front().first;
-        f->dump_string("summary", summary.front().second);
-        f->close_section();
-      }
-      summary.pop_front();
-    }
-  }
-  if (f)
-    f->close_section();
-
-  stringstream fss;
-  fss << overall;
-  status.push_front(fss.str());
-  if (f)
-    f->dump_stream("overall_status") << overall;
-
-  if (f)
-    f->open_array_section("detail");
-  while (!detail.empty()) {
-    if (f)
-      f->dump_string("item", detail.front().second);
-    else if (detailbl != NULL) {
-      detailbl->append(detail.front().second);
-      detailbl->append('\n');
-    }
-    detail.pop_front();
-  }
-  if (f)
-    f->close_section();
-
-  if (f)
-    f->close_section();
-
-  return overall;
-}
-
 void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
 {
   if (f)
     f->open_object_section("status");
 
+  mono_clock::time_point now = mono_clock::now();
   if (f) {
     f->dump_stream("fsid") << monmap->get_fsid();
-    if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-      get_health_status(false, f, nullptr);
-    } else {
-      list<string> health_str;
-      get_health(health_str, nullptr, f);
-    }
+    get_health_status(false, f, nullptr);
     f->dump_unsigned("election_epoch", get_epoch());
     {
       f->open_array_section("quorum");
@@ -2805,6 +2931,10 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
       for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
        f->dump_string("id", monmap->get_name(*p));
       f->close_section();
+      f->dump_int(
+       "quorum_age",
+       std::chrono::duration_cast<std::chrono::seconds>(
+         mono_clock::now() - quorum_since).count());
     }
     f->open_object_section("monmap");
     monmap->dump(f);
@@ -2813,7 +2943,7 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
     osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
     f->close_section();
     f->open_object_section("pgmap");
-    pgservice->print_summary(f, NULL);
+    mgrstatmon()->print_summary(f, NULL);
     f->close_section();
     f->open_object_section("fsmap");
     mdsmon()->get_fsmap().print_summary(f, NULL);
@@ -2823,21 +2953,21 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
     f->close_section();
 
     f->dump_object("servicemap", mgrstatmon()->get_service_map());
+
+    f->open_object_section("progress_events");
+    for (auto& i : mgrstatmon()->get_progress_events()) {
+      f->dump_object(i.first.c_str(), i.second);
+    }
+    f->close_section();
+
     f->close_section();
   } else {
     ss << "  cluster:\n";
     ss << "    id:     " << monmap->get_fsid() << "\n";
 
     string health;
-    if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-      get_health_status(false, nullptr, &health,
-                       "\n            ", "\n            ");
-    } else {
-      list<string> ls;
-      get_health(ls, NULL, f);
-      health = joinify(ls.begin(), ls.end(),
-                      string("\n            "));
-    }
+    get_health_status(false, nullptr, &health,
+                     "\n            ", "\n            ");
     ss << "    health: " << health << "\n";
 
     ss << "\n \n  services:\n";
@@ -2851,7 +2981,7 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
       const auto quorum_names = get_quorum_names();
       const auto mon_count = monmap->mon_info.size();
       ss << "    mon: " << spacing << mon_count << " daemons, quorum "
-        << quorum_names;
+        << quorum_names << " (age " << timespan_str(now - quorum_since) << ")";
       if (quorum_names.size() != mon_count) {
        std::list<std::string> out_of_q;
        for (size_t i = 0; i < monmap->ranks.size(); ++i) {
@@ -2881,16 +3011,32 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
     }
 
     ss << "\n \n  data:\n";
-    pgservice->print_summary(NULL, &ss);
+    mgrstatmon()->print_summary(NULL, &ss);
+
+    auto& pem = mgrstatmon()->get_progress_events();
+    if (!pem.empty()) {
+      ss << "\n \n  progress:\n";
+      for (auto& i : pem) {
+       ss << "    " << i.second.message << "\n";
+       ss << "      [";
+       unsigned j;
+       for (j=0; j < i.second.progress * 30; ++j) {
+         ss << '=';
+       }
+       for (; j < 30; ++j) {
+         ss << '.';
+       }
+       ss << "]\n";
+      }
+    }
     ss << "\n ";
   }
 }
 
-void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
+void Monitor::_generate_command_map(cmdmap_t& cmdmap,
                                     map<string,string> &param_str_map)
 {
-  for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
-       p != cmdmap.end(); ++p) {
+  for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) {
     if (p->first == "prefix")
       continue;
     if (p->first == "caps") {
@@ -2920,8 +3066,8 @@ const MonCommand *Monitor::_get_moncommand(
   return nullptr;
 }
 
-bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
-                               const map<string,cmd_vartype>& cmdmap,
+bool Monitor::_allowed_command(MonSession *s, const string &module,
+                              const string &prefix, const cmdmap_t& cmdmap,
                                const map<string,string>& param_str_map,
                                const MonCommand *this_cmd) {
 
@@ -2934,7 +3080,8 @@ bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
     CEPH_ENTITY_TYPE_MON,
     s->entity_name,
     module, prefix, param_str_map,
-    cmd_r, cmd_w, cmd_x);
+    cmd_r, cmd_w, cmd_x,
+    s->get_peer_socket_addr());
 
   dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
   return capable;
@@ -2942,21 +3089,18 @@ bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
 
 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
                                          Formatter *f,
-                                         bufferlist *rdata,
-                                         bool hide_mgr_flag)
+                                         uint64_t features,
+                                         bufferlist *rdata)
 {
   int cmdnum = 0;
   f->open_object_section("command_descriptions");
   for (const auto &cmd : commands) {
     unsigned flags = cmd.flags;
-    if (hide_mgr_flag) {
-      flags &= ~MonCommand::FLAG_MGR;
-    }
     ostringstream secname;
     secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
-    dump_cmddesc_to_json(f, secname.str(),
+    dump_cmddesc_to_json(f, features, secname.str(),
                         cmd.cmdstring, cmd.helpstring, cmd.module,
-                        cmd.req_perms, cmd.availability, flags);
+                        cmd.req_perms, flags);
     cmdnum++;
   }
   f->close_section();  // command_descriptions
@@ -2966,13 +3110,10 @@ void Monitor::format_command_descriptions(const std::vector<MonCommand> &command
 
 bool Monitor::is_keyring_required()
 {
-  string auth_cluster_required = g_conf->auth_supported.empty() ?
-    g_conf->auth_cluster_required : g_conf->auth_supported;
-  string auth_service_required = g_conf->auth_supported.empty() ?
-    g_conf->auth_service_required : g_conf->auth_supported;
-
-  return auth_service_required == "cephx" ||
-    auth_cluster_required == "cephx";
+  return auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX) || 
+         auth_service_required.is_supported_auth(CEPH_AUTH_CEPHX) || 
+         auth_cluster_required.is_supported_auth(CEPH_AUTH_GSS)   || 
+         auth_service_required.is_supported_auth(CEPH_AUTH_GSS);
 }
 
 struct C_MgrProxyCommand : public Context {
@@ -2984,7 +3125,7 @@ struct C_MgrProxyCommand : public Context {
   C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
     : mon(mon), op(op), size(s) { }
   void finish(int r) {
-    Mutex::Locker l(mon->lock);
+    std::lock_guard l(mon->lock);
     mon->mgr_proxy_bytes -= size;
     mon->reply_command(op, r, outs, outbl, 0);
   }
@@ -2992,7 +3133,7 @@ struct C_MgrProxyCommand : public Context {
 
 void Monitor::handle_command(MonOpRequestRef op)
 {
-  assert(op->is_type_command());
+  ceph_assert(op->is_type_command());
   MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
   if (m->fsid != monmap->fsid) {
     dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
@@ -3000,15 +3141,11 @@ void Monitor::handle_command(MonOpRequestRef op)
     return;
   }
 
-  MonSession *session = static_cast<MonSession *>(
-    m->get_connection()->get_priv());
+  MonSession *session = op->get_session();
   if (!session) {
     dout(5) << __func__ << " dropping stray message " << *m << dendl;
     return;
   }
-  BOOST_SCOPE_EXIT_ALL(=) {
-    session->put();
-  };
 
   if (m->cmd.empty()) {
     string rs = "No command supplied";
@@ -3018,7 +3155,7 @@ void Monitor::handle_command(MonOpRequestRef op)
 
   string prefix;
   vector<string> fullcmd;
-  map<string, cmd_vartype> cmdmap;
+  cmdmap_t cmdmap;
   stringstream ss, ds;
   bufferlist rdata;
   string rs;
@@ -3050,24 +3187,16 @@ void Monitor::handle_command(MonOpRequestRef op)
   if (prefix == "get_command_descriptions") {
     bufferlist rdata;
     Formatter *f = Formatter::create("json");
-    // hide mgr commands until luminous upgrade is complete
-    bool hide_mgr_flag =
-      osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
-
-    std::vector<MonCommand> commands;
 
-    // only include mgr commands once all mons are upgrade (and we've dropped
-    // the hard-coded PGMonitor commands)
-    if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
-      commands = static_cast<MgrMonitor*>(
-        paxos_service[PAXOS_MGR])->get_command_descs();
-    }
+    std::vector<MonCommand> commands = static_cast<MgrMonitor*>(
+        paxos_service[PAXOS_MGR].get())->get_command_descs();
 
     for (auto& c : leader_mon_commands) {
       commands.push_back(c);
     }
 
-    format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
+    auto features = m->get_connection()->get_features();
+    format_command_descriptions(commands, f, features, &rdata);
     delete f;
     reply_command(op, 0, "", rdata, 0);
     return;
@@ -3175,7 +3304,7 @@ void Monitor::handle_command(MonOpRequestRef op)
                         param_str_map, mon_cmd)) {
     dout(1) << __func__ << " access denied" << dendl;
     (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
-      << "from='" << session->inst << "' "
+      << "from='" << session->name << " " << session->addrs << "' "
       << "entity='" << session->entity_name << "' "
       << "cmd=" << m->cmd << ":  access denied";
     reply_command(op, -EACCES, "access denied", 0);
@@ -3183,16 +3312,15 @@ void Monitor::handle_command(MonOpRequestRef op)
   }
 
   (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
-    << "from='" << session->inst << "' "
-    << "entity='" << session->entity_name << "' "
-    << "cmd=" << m->cmd << ": dispatch";
+      << "from='" << session->name << " " << session->addrs << "' "
+      << "entity='" << session->entity_name << "' "
+      << "cmd=" << m->cmd << ": dispatch";
 
-  if (mon_cmd->is_mgr() &&
-      osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+  if (mon_cmd->is_mgr()) {
     const auto& hdr = m->get_header();
     uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
-    uint64_t max = g_conf->get_val<uint64_t>("mon_client_bytes")
-                 * g_conf->get_val<double>("mon_mgr_proxy_client_bytes_ratio");
+    uint64_t max = g_conf().get_val<Option::size_t>("mon_client_bytes")
+                 * g_conf().get_val<double>("mon_mgr_proxy_client_bytes_ratio");
     if (mgr_proxy_bytes + size > max) {
       dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
               << " + " << size << " > max " << max << dendl;
@@ -3216,16 +3344,18 @@ void Monitor::handle_command(MonOpRequestRef op)
     mdsmon()->dispatch(op);
     return;
   }
-  if ((module == "osd" || prefix == "pg map") &&
+  if ((module == "osd" ||
+       prefix == "pg map" ||
+       prefix == "pg repeer") &&
       prefix != "osd last-stat-seq") {
     osdmon()->dispatch(op);
     return;
   }
-
-  if (module == "pg") {
-    pgmon()->dispatch(op);
+  if (module == "config") {
+    configmon()->dispatch(op);
     return;
   }
+
   if (module == "mon" &&
       /* Let the Monitor class handle the following commands:
        *  'mon compact'
@@ -3237,7 +3367,10 @@ void Monitor::handle_command(MonOpRequestRef op)
       prefix != "mon sync force" &&
       prefix != "mon metadata" &&
       prefix != "mon versions" &&
-      prefix != "mon count-metadata") {
+      prefix != "mon count-metadata" &&
+      prefix != "mon ok-to-stop" &&
+      prefix != "mon ok-to-add-offline" &&
+      prefix != "mon ok-to-rm") {
     monmon()->dispatch(op);
     return;
   }
@@ -3289,13 +3422,13 @@ void Monitor::handle_command(MonOpRequestRef op)
 
   if (prefix == "compact" || prefix == "mon compact") {
     dout(1) << "triggering manual compaction" << dendl;
-    utime_t start = ceph_clock_now();
-    store->compact();
-    utime_t end = ceph_clock_now();
-    end -= start;
-    dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
+    auto start = ceph::coarse_mono_clock::now();
+    store->compact_async();
+    auto end = ceph::coarse_mono_clock::now();
+    double duration = std::chrono::duration<double>(end-start).count();
+    dout(1) << "finished manual compaction in " << duration << " seconds" << dendl;
     ostringstream oss;
-    oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
+    oss << "compacted " << g_conf().get_val<std::string>("mon_keyvaluedb") << " in " << duration << " seconds";
     rs = oss.str();
     r = 0;
   }
@@ -3305,7 +3438,7 @@ void Monitor::handle_command(MonOpRequestRef op)
     if (!injected_args.empty()) {
       dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
       ostringstream oss;
-      r = g_conf->injectargs(str_join(injected_args, " "), &oss);
+      r = g_conf().injectargs(str_join(injected_args, " "), &oss);
       ss << "injectargs:"  << oss.str();
       rs = ss.str();
       goto out;
@@ -3320,10 +3453,9 @@ void Monitor::handle_command(MonOpRequestRef op)
     if (!timecheck_skews.empty()) {
       f->open_object_section("time_skew_status");
       for (auto& i : timecheck_skews) {
-       entity_inst_t inst = i.first;
        double skew = i.second;
-       double latency = timecheck_latencies[inst];
-       string name = monmap->get_name(inst.addr);
+       double latency = timecheck_latencies[i.first];
+       string name = monmap->get_name(i.first);
        ostringstream tcss;
        health_status_t tcstatus = timecheck_status(tcss, skew, latency);
        f->open_object_section(name.c_str());
@@ -3352,9 +3484,9 @@ void Monitor::handle_command(MonOpRequestRef op)
     cmd_getval(cct, cmdmap, "key", key);
     std::string val;
     cmd_getval(cct, cmdmap, "value", val);
-    r = g_conf->set_val(key, val, true, &ss);
+    r = g_conf().set_val(key, val, &ss);
     if (r == 0) {
-      g_conf->apply_changes(nullptr);
+      g_conf().apply_changes(nullptr);
     }
     rs = ss.str();
     goto out;
@@ -3374,44 +3506,23 @@ void Monitor::handle_command(MonOpRequestRef op)
       }
       rdata.append(ds);
     } else if (prefix == "health") {
-      if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-       string plain;
-       get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
-       if (f) {
-         f->flush(rdata);
-       } else {
-         rdata.append(plain);
-       }
+      string plain;
+      get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
+      if (f) {
+       f->flush(rdata);
       } else {
-       list<string> health_str;
-       get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
-       if (f) {
-         f->flush(ds);
-         ds << '\n';
-       } else {
-         assert(!health_str.empty());
-         ds << health_str.front();
-         health_str.pop_front();
-         if (!health_str.empty()) {
-           ds << ' ';
-           ds << joinify(health_str.begin(), health_str.end(), string("; "));
-         }
-       }
-       bufferlist comb;
-       comb.append(ds);
-       if (detail == "detail")
-         comb.append(rdata);
-       rdata = comb;
+       rdata.append(plain);
       }
     } else if (prefix == "df") {
       bool verbose = (detail == "detail");
       if (f)
         f->open_object_section("stats");
 
-      pgservice->dump_fs_stats(&ds, f.get(), verbose);
-      if (!f)
-        ds << '\n';
-      pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
+      mgrstatmon()->dump_cluster_stats(&ds, f.get(), verbose);
+      if (!f) {
+       ds << "\n \n";
+      }
+      mgrstatmon()->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
 
       if (f) {
         f->close_section();
@@ -3419,7 +3530,7 @@ void Monitor::handle_command(MonOpRequestRef op)
         ds << '\n';
       }
     } else {
-      assert(0 == "We should never get here!");
+      ceph_abort_msg("We should never get here!");
       return;
     }
     rdata.append(ds);
@@ -3443,18 +3554,13 @@ void Monitor::handle_command(MonOpRequestRef op)
       tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
     f->dump_string("tag", tagstr);
 
-    if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-      get_health_status(true, f.get(), nullptr);
-    } else {
-      list<string> health_str;
-      get_health(health_str, nullptr, f.get());
-    }
+    get_health_status(true, f.get(), nullptr);
 
     monmon()->dump_info(f.get());
     osdmon()->dump_info(f.get());
     mdsmon()->dump_info(f.get());
     authmon()->dump_info(f.get());
-    pgservice->dump_info(f.get());
+    mgrstatmon()->dump_info(f.get());
 
     paxos->dump_info(f.get());
 
@@ -3462,7 +3568,7 @@ void Monitor::handle_command(MonOpRequestRef op)
     f->flush(rdata);
 
     ostringstream ss2;
-    ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
+    ss2 << "report " << rdata.crc32c(CEPH_MON_PORT_LEGACY);
     rs = ss2.str();
     r = 0;
   } else if (prefix == "osd last-stat-seq") {
@@ -3488,6 +3594,7 @@ void Monitor::handle_command(MonOpRequestRef op)
       print_nodes(f.get(), ds);
       osdmon()->print_nodes(f.get());
       mdsmon()->print_nodes(f.get());
+      mgrmon()->print_nodes(f.get());
       f->close_section();
     } else if (node_type == "mon") {
       print_nodes(f.get(), ds);
@@ -3495,6 +3602,8 @@ void Monitor::handle_command(MonOpRequestRef op)
       osdmon()->print_nodes(f.get());
     } else if (node_type == "mds") {
       mdsmon()->print_nodes(f.get());
+    } else if (node_type == "mgr") {
+      mgrmon()->print_nodes(f.get());
     }
     f->flush(ds);
     rdata.append(ds);
@@ -3591,6 +3700,59 @@ void Monitor::handle_command(MonOpRequestRef op)
     rdata.append(ds);
     rs = "";
     r = 0;
+  } else if (prefix == "mon ok-to-stop") {
+    vector<string> ids;
+    if (!cmd_getval(g_ceph_context, cmdmap, "ids", ids)) {
+      r = -EINVAL;
+      goto out;
+    }
+    set<string> wouldbe;
+    for (auto rank : quorum) {
+      wouldbe.insert(monmap->get_name(rank));
+    }
+    for (auto& n : ids) {
+      if (monmap->contains(n)) {
+       wouldbe.erase(n);
+      }
+    }
+    if (wouldbe.size() < monmap->min_quorum_size()) {
+      r = -EBUSY;
+      rs = "not enough monitors would be available (" + stringify(wouldbe) +
+       ") after stopping mons " + stringify(ids);
+      goto out;
+    }
+    r = 0;
+    rs = "quorum should be preserved (" + stringify(wouldbe) +
+      ") after stopping " + stringify(ids);
+  } else if (prefix == "mon ok-to-add-offline") {
+    if (quorum.size() < monmap->min_quorum_size(monmap->size() + 1)) {
+      rs = "adding a monitor may break quorum (until that monitor starts)";
+      r = -EBUSY;
+      goto out;
+    }
+    rs = "adding another mon that is not yet online will not break quorum";
+    r = 0;
+  } else if (prefix == "mon ok-to-rm") {
+    string id;
+    if (!cmd_getval(g_ceph_context, cmdmap, "id", id)) {
+      r = -EINVAL;
+      rs = "must specify a monitor id";
+      goto out;
+    }
+    if (!monmap->contains(id)) {
+      r = 0;
+      rs = "mon." + id + " does not exist";
+      goto out;
+    }
+    int rank = monmap->get_rank(id);
+    if (quorum.count(rank) &&
+       quorum.size() - 1 < monmap->min_quorum_size(monmap->size() - 1)) {
+      r = -EBUSY;
+      rs = "removing mon." + id + " would break quorum";
+      goto out;
+    }
+    r = 0;
+    rs = "safe to remove mon." + id;
   } else if (prefix == "mon_status") {
     get_mon_status(f.get(), ds);
     if (f)
@@ -3600,11 +3762,12 @@ void Monitor::handle_command(MonOpRequestRef op)
     r = 0;
   } else if (prefix == "sync force" ||
              prefix == "mon sync force") {
-    string validate1, validate2;
-    cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
-    cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
-    if (validate1 != "--yes-i-really-mean-it" ||
-       validate2 != "--i-know-what-i-am-doing") {
+    bool validate1 = false;
+    cmd_getval(g_ceph_context, cmdmap, "yes_i_really_mean_it", validate1);
+    bool validate2 = false;
+    cmd_getval(g_ceph_context, cmdmap, "i_know_what_i_am_doing", validate2);
+
+    if (!validate1 || !validate2) {
       r = -EINVAL;
       rs = "are you SURE? this will mean the monitor store will be "
           "erased.  pass '--yes-i-really-mean-it "
@@ -3623,6 +3786,9 @@ void Monitor::handle_command(MonOpRequestRef op)
       // XXX 1-element vector, change at callee or make vector here?
       vector<string> heapcmd_vec;
       get_str_vec(heapcmd, heapcmd_vec);
+      string value;
+      if (cmd_getval(g_ceph_context, cmdmap, "value", value))
+        heapcmd_vec.push_back(value);
       ceph_heap_profiler_handle_command(heapcmd_vec, ds);
       rdata.append(ds);
       rs = "";
@@ -3716,6 +3882,37 @@ void Monitor::handle_command(MonOpRequestRef op)
     f->flush(rdata);
     rs = "";
     r = 0;
+  } else if (prefix == "smart") {
+    string want_devid;
+    cmd_getval(cct, cmdmap, "devid", want_devid);
+
+    string devname = store->get_devname();
+    set<string> devnames;
+    get_raw_devices(devname, &devnames);
+    json_spirit::mObject json_map;
+    uint64_t smart_timeout = cct->_conf.get_val<uint64_t>(
+      "mon_smart_report_timeout");
+    for (auto& devname : devnames) {
+      string err;
+      string devid = get_device_id(devname, &err);
+      if (want_devid.size() && want_devid != devid) {
+       derr << "get_device_id failed on " << devname << ": " << err << dendl;
+       continue;
+      }
+      json_spirit::mValue smart_json;
+      if (block_device_get_metrics(devname, smart_timeout,
+                                  &smart_json)) {
+       dout(10) << "block_device_get_metrics failed for /dev/" << devname
+                << dendl;
+       continue;
+      }
+      json_map[devid] = smart_json;
+    }
+    ostringstream ss;
+    json_spirit::write(json_map, ss, json_spirit::pretty_print);
+    rdata.append(ss.str());
+    r = 0;
+    rs = "";
   }
 
  out:
@@ -3733,7 +3930,7 @@ void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
                             bufferlist& rdata, version_t version)
 {
   MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
-  assert(m->get_type() == MSG_MON_COMMAND);
+  ceph_assert(m->get_type() == MSG_MON_COMMAND);
   MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
   reply->set_tid(m->get_tid());
   reply->set_data(rdata);
@@ -3757,14 +3954,13 @@ void Monitor::forward_request_leader(MonOpRequestRef op)
   MonSession *session = op->get_session();
   PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
   
-  if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
+  if (req->get_source().is_mon() && req->get_source_addrs() != messenger->get_myaddrs()) {
     dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
   } else if (session->proxy_con) {
     dout(10) << "forward_request won't double fwd request " << *req << dendl;
   } else if (!session->closed) {
     RoutedRequest *rr = new RoutedRequest;
     rr->tid = ++routed_request_tid;
-    rr->client_inst = req->get_source_inst();
     rr->con = req->get_connection();
     rr->con_features = rr->con->get_features();
     encode_message(req, CEPH_FEATURES_ALL, rr->request_bl);   // for my use only; use all features
@@ -3786,9 +3982,9 @@ void Monitor::forward_request_leader(MonOpRequestRef op)
     } else if (req->get_source().is_mon()) {
       forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
     }
-    messenger->send_message(forward, monmap->get_inst(mon));
+    send_mon_message(forward, mon);
     op->mark_forwarded();
-    assert(op->get_req()->get_type() != 0);
+    ceph_assert(op->get_req()->get_type() != 0);
   } else {
     dout(10) << "forward_request no session for request " << *req << dendl;
   }
@@ -3796,13 +3992,17 @@ void Monitor::forward_request_leader(MonOpRequestRef op)
 
 // fake connection attached to forwarded messages
 struct AnonConnection : public Connection {
-  explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
+  entity_addr_t socket_addr;
+  explicit AnonConnection(CephContext *cct,
+                         const entity_addr_t& sa)
+    : Connection(cct, NULL),
+      socket_addr(sa) {}
 
   int send_message(Message *m) override {
-    assert(!"send_message on anonymous connection");
+    ceph_assert(!"send_message on anonymous connection");
   }
   void send_keepalive() override {
-    assert(!"send_keepalive on anonymous connection");
+    ceph_assert(!"send_keepalive on anonymous connection");
   }
   void mark_down() override {
     // silently ignore
@@ -3811,16 +4011,21 @@ struct AnonConnection : public Connection {
     // silengtly ignore
   }
   bool is_connected() override { return false; }
+  entity_addr_t get_peer_socket_addr() const override {
+    return socket_addr;
+  }
 };
 
 //extract the original message and put it into the regular dispatch function
 void Monitor::handle_forward(MonOpRequestRef op)
 {
   MForward *m = static_cast<MForward*>(op->get_req());
-  dout(10) << "received forwarded message from " << m->client
+  dout(10) << "received forwarded message from "
+          << ceph_entity_type_name(m->client_type)
+          << " " << m->client_addrs
           << " via " << m->get_source_inst() << dendl;
   MonSession *session = op->get_session();
-  assert(session);
+  ceph_assert(session);
 
   if (!session->is_capable("mon", MON_CAP_X)) {
     dout(0) << "forward from entity with insufficient caps! " 
@@ -3829,16 +4034,18 @@ void Monitor::handle_forward(MonOpRequestRef op)
     // see PaxosService::dispatch(); we rely on this being anon
     // (c->msgr == NULL)
     PaxosServiceMessage *req = m->claim_message();
-    assert(req != NULL);
-
-    ConnectionRef c(new AnonConnection(cct));
-    MonSession *s = new MonSession(req->get_source_inst(),
-                                  static_cast<Connection*>(c.get()));
-    c->set_priv(s->get());
-    c->set_peer_addr(m->client.addr);
-    c->set_peer_type(m->client.name.type());
+    ceph_assert(req != NULL);
+
+    ConnectionRef c(new AnonConnection(cct, m->client_socket_addr));
+    MonSession *s = new MonSession(static_cast<Connection*>(c.get()));
+    s->_ident(req->get_source(),
+             req->get_source_addrs());
+    c->set_priv(RefCountedPtr{s, false});
+    c->set_peer_addrs(m->client_addrs);
+    c->set_peer_type(m->client_type);
     c->set_features(m->con_features);
 
+    s->authenticated = true;
     s->caps = m->client_caps;
     dout(10) << " caps are " << s->caps << dendl;
     s->entity_name = m->entity_name;
@@ -3859,34 +4066,13 @@ void Monitor::handle_forward(MonOpRequestRef op)
      */
     req->rx_election_epoch = get_epoch();
 
-    /* Because this is a special fake connection, we need to break
-       the ref loop between Connection and MonSession differently
-       than we normally do. Here, the Message refers to the Connection
-       which refers to the Session, and nobody else refers to the Connection
-       or the Session. And due to the special nature of this message,
-       nobody refers to the Connection via the Session. So, clear out that
-       half of the ref loop.*/
-    s->con.reset(NULL);
-
     dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
-
     _ms_dispatch(req);
-    s->put();
-  }
-}
-
-void Monitor::try_send_message(Message *m, const entity_inst_t& to)
-{
-  dout(10) << "try_send_message " << *m << " to " << to << dendl;
-
-  bufferlist bl;
-  encode_message(m, quorum_con_features, bl);
-
-  messenger->send_message(m, to);
 
-  for (int i=0; i<(int)monmap->size(); i++) {
-    if (i != rank)
-      messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
+    // break the session <-> con ref loop by removing the con->session
+    // reference, which is no longer needed once the MonOpRequest is
+    // set up.
+    c->set_priv(NULL);
   }
 }
 
@@ -3895,7 +4081,7 @@ void Monitor::send_reply(MonOpRequestRef op, Message *reply)
   op->mark_event(__func__);
 
   MonSession *session = op->get_session();
-  assert(session);
+  ceph_assert(session);
   Message *req = op->get_req();
   ConnectionRef con = op->get_connection();
 
@@ -3959,9 +4145,10 @@ void Monitor::handle_route(MonOpRequestRef op)
     return;
   }
   if (m->msg)
-    dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
+    dout(10) << "handle_route tid " << m->session_mon_tid << " " << *m->msg
+            << dendl;
   else
-    dout(10) << "handle_route null to " << m->dest << dendl;
+    dout(10) << "handle_route tid " << m->session_mon_tid << " null" << dendl;
   
   // look it up
   if (m->session_mon_tid) {
@@ -3979,7 +4166,7 @@ void Monitor::handle_route(MonOpRequestRef op)
        osdmon()->send_incremental(m->send_osdmap_first, rr->session,
                                   true, MonOpRequestRef());
       }
-      assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
+      ceph_assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
       routed_requests.erase(m->session_mon_tid);
       rr->session->routed_request_tids.erase(m->session_mon_tid);
       delete rr;
@@ -3987,11 +4174,7 @@ void Monitor::handle_route(MonOpRequestRef op)
       dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
     }
   } else {
-    dout(10) << " not a routed request, trying to send anyway" << dendl;
-    if (m->msg) {
-      messenger->send_message(m->msg, m->dest);
-      m->msg = NULL;
-    }
+    dout(10) << " not a routed request, ignoring" << dendl;
   }
 }
 
@@ -4010,21 +4193,27 @@ void Monitor::resend_routed_requests()
       rr->op->mark_event("retry routed request");
       retry.push_back(new C_RetryMessage(this, rr->op));
       if (rr->session) {
-        assert(rr->session->routed_request_tids.count(p->first));
+        ceph_assert(rr->session->routed_request_tids.count(p->first));
         rr->session->routed_request_tids.erase(p->first);
       }
       delete rr;
     } else {
-      bufferlist::iterator q = rr->request_bl.begin();
-      PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
+      auto q = rr->request_bl.cbegin();
+      PaxosServiceMessage *req =
+       (PaxosServiceMessage *)decode_message(cct, 0, q);
       rr->op->mark_event("resend forwarded message to leader");
-      dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
-      MForward *forward = new MForward(rr->tid, req, rr->con_features,
+      dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req
+              << dendl;
+      MForward *forward = new MForward(rr->tid,
+                                      req,
+                                      rr->con_features,
                                       rr->session->caps);
       req->put();  // forward takes its own ref; drop ours.
-      forward->client = rr->client_inst;
+      forward->client_type = rr->con->get_peer_type();
+      forward->client_addrs = rr->con->get_peer_addrs();
+      forward->client_socket_addr = rr->con->get_peer_socket_addr();
       forward->set_priority(req->get_priority());
-      messenger->send_message(forward, monmap->get_inst(mon));
+      send_mon_message(forward, mon);
     }
   }
   if (mon == rank) {
@@ -4035,21 +4224,21 @@ void Monitor::resend_routed_requests()
 
 void Monitor::remove_session(MonSession *s)
 {
-  dout(10) << "remove_session " << s << " " << s->inst
+  dout(10) << "remove_session " << s << " " << s->name << " " << s->addrs
           << " features 0x" << std::hex << s->con_features << std::dec << dendl;
-  assert(s->con);
-  assert(!s->closed);
+  ceph_assert(s->con);
+  ceph_assert(!s->closed);
   for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
        p != s->routed_request_tids.end();
        ++p) {
-    assert(routed_requests.count(*p));
+    ceph_assert(routed_requests.count(*p));
     RoutedRequest *rr = routed_requests[*p];
     dout(10) << " dropping routed request " << rr->tid << dendl;
     delete rr;
     routed_requests.erase(*p);
   }
   s->routed_request_tids.clear();
-  s->con->set_priv(NULL);
+  s->con->set_priv(nullptr);
   session_map.remove_session(s);
   logger->set(l_mon_num_sessions, session_map.get_size());
   logger->inc(l_mon_session_rm);
@@ -4057,24 +4246,19 @@ void Monitor::remove_session(MonSession *s)
 
 void Monitor::remove_all_sessions()
 {
-  Mutex::Locker l(session_map_lock);
+  std::lock_guard l(session_map_lock);
   while (!session_map.sessions.empty()) {
     MonSession *s = session_map.sessions.front();
     remove_session(s);
-    if (logger)
-      logger->inc(l_mon_session_rm);
+    logger->inc(l_mon_session_rm);
   }
   if (logger)
     logger->set(l_mon_num_sessions, session_map.get_size());
 }
 
-void Monitor::send_command(const entity_inst_t& inst,
-                          const vector<string>& com)
+void Monitor::send_mon_message(Message *m, int rank)
 {
-  dout(10) << "send_command " << inst << "" << com << dendl;
-  MMonCommand *c = new MMonCommand(monmap->fsid);
-  c->cmd = com;
-  try_send_message(c, inst);
+  messenger->send_to_mon(m, monmap->get_addrs(rank));
 }
 
 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
@@ -4108,7 +4292,7 @@ void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
     // proxied sessions aren't registered and don't have a con; don't remove
     // those.
     if (!s->proxy_con) {
-      Mutex::Locker l(session_map_lock);
+      std::lock_guard l(session_map_lock);
       remove_session(s);
     }
     op->mark_zap();
@@ -4169,11 +4353,13 @@ void Monitor::_ms_dispatch(Message *m)
 
     ConnectionRef con = m->get_connection();
     {
-      Mutex::Locker l(session_map_lock);
-      s = session_map.new_session(m->get_source_inst(), con.get());
+      std::lock_guard l(session_map_lock);
+      s = session_map.new_session(m->get_source(),
+                                 m->get_source_addrs(),
+                                 con.get());
     }
-    assert(s);
-    con->set_priv(s->get());
+    ceph_assert(s);
+    con->set_priv(RefCountedPtr{s, false});
     dout(10) << __func__ << " new session " << s << " " << *s
             << " features 0x" << std::hex
             << s->con_features << std::dec << dendl;
@@ -4186,18 +4372,18 @@ void Monitor::_ms_dispatch(Message *m)
       // give it monitor caps; the peer type has been authenticated
       dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
       if (!s->caps.is_allow_all()) // but no need to repeatedly copy
-        s->caps = *mon_caps;
+        s->caps = mon_caps;
+      s->authenticated = true;
     }
-    s->put();
   } else {
-    dout(20) << __func__ << " existing session " << s << " for " << s->inst
+    dout(20) << __func__ << " existing session " << s << " for " << s->name
             << dendl;
   }
 
-  assert(s);
+  ceph_assert(s);
 
   s->session_timeout = ceph_clock_now();
-  s->session_timeout += g_conf->mon_session_timeout;
+  s->session_timeout += g_conf()->mon_session_timeout;
 
   if (s->auth_handler) {
     s->entity_name = s->auth_handler->get_entity_name();
@@ -4205,7 +4391,7 @@ void Monitor::_ms_dispatch(Message *m)
   dout(20) << " caps " << s->caps.get_str() << dendl;
 
   if ((is_synchronizing() ||
-       (s->global_id == 0 && !exited_quorum.is_zero())) &&
+       (!s->authenticated && !exited_quorum.is_zero())) &&
       !src_is_mon &&
       m->get_type() != CEPH_MSG_PING) {
     waitlist_or_zap_client(op);
@@ -4219,7 +4405,7 @@ void Monitor::dispatch_op(MonOpRequestRef op)
 {
   op->mark_event("mon:dispatch_op");
   MonSession *s = op->get_session();
-  assert(s);
+  ceph_assert(s);
   if (s->closed) {
     dout(10) << " session closed, dropping " << op->get_req() << dendl;
     return;
@@ -4228,7 +4414,6 @@ void Monitor::dispatch_op(MonOpRequestRef op)
   /* we will consider the default type as being 'monitor' until proven wrong */
   op->set_type_monitor();
   /* deal with all messages that do not necessarily need caps */
-  bool dealt_with = true;
   switch (op->get_req()->get_type()) {
     // auth
     case MSG_MON_GLOBAL_ID:
@@ -4236,37 +4421,41 @@ void Monitor::dispatch_op(MonOpRequestRef op)
       op->set_type_service();
       /* no need to check caps here */
       paxos_service[PAXOS_AUTH]->dispatch(op);
-      break;
+      return;
 
     case CEPH_MSG_PING:
       handle_ping(op);
-      break;
+      return;
+  }
 
-    /* MMonGetMap may be used by clients to obtain a monmap *before*
-     * authenticating with the monitor.  We need to handle these without
-     * checking caps because, even on a cluster without cephx, we only set
-     * session caps *after* the auth handshake.  A good example of this
-     * is when a client calls MonClient::get_monmap_privately(), which does
-     * not authenticate when obtaining a monmap.
-     */
+  if (!op->get_session()->authenticated) {
+    dout(5) << __func__ << " " << op->get_req()->get_source_inst()
+            << " is not authenticated, dropping " << *(op->get_req())
+            << dendl;
+    return;
+  }
+
+  switch (op->get_req()->get_type()) {
     case CEPH_MSG_MON_GET_MAP:
       handle_mon_get_map(op);
-      break;
+      return;
+
+    case MSG_GET_CONFIG:
+      configmon()->handle_get_config(op);
+      return;
 
     case CEPH_MSG_MON_METADATA:
       return handle_mon_metadata(op);
 
-    default:
-      dealt_with = false;
-      break;
+    case CEPH_MSG_MON_SUBSCRIBE:
+      /* FIXME: check what's being subscribed, filter accordingly */
+      handle_subscribe(op);
+      return;
   }
-  if (dealt_with)
-    return;
 
   /* well, maybe the op belongs to a service... */
   op->set_type_service();
   /* deal with all messages which caps should be checked somewhere else */
-  dealt_with = true;
   switch (op->get_req()->get_type()) {
 
     // OSDs
@@ -4281,57 +4470,39 @@ void Monitor::dispatch_op(MonOpRequestRef op)
     case MSG_OSD_PGTEMP:
     case MSG_OSD_PG_CREATED:
     case MSG_REMOVE_SNAPS:
+    case MSG_OSD_PG_READY_TO_MERGE:
       paxos_service[PAXOS_OSDMAP]->dispatch(op);
-      break;
+      return;
 
     // MDSs
     case MSG_MDS_BEACON:
     case MSG_MDS_OFFLOAD_TARGETS:
       paxos_service[PAXOS_MDSMAP]->dispatch(op);
-      break;
+      return;
 
     // Mgrs
     case MSG_MGR_BEACON:
       paxos_service[PAXOS_MGR]->dispatch(op);
-      break;
+      return;
 
     // MgrStat
-    case CEPH_MSG_STATFS:
-      // this is an ugly hack, sorry!  force the version to 1 so that we do
-      // not run afoul of the is_readable() paxos check.  the client is going
-      // by the pgmonitor version and the MgrStatMonitor version will lag behind
-      // that until we complete the upgrade.  The paxos ordering crap really
-      // doesn't matter for statfs results, so just kludge around it here.
-      if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
-       ((MStatfs*)op->get_req())->version = 1;
-      }
     case MSG_MON_MGR_REPORT:
+    case CEPH_MSG_STATFS:
     case MSG_GETPOOLSTATS:
       paxos_service[PAXOS_MGRSTAT]->dispatch(op);
-      break;
-
-    // pg
-    case MSG_PGSTATS:
-      paxos_service[PAXOS_PGMAP]->dispatch(op);
-      break;
+      return;
 
-    // log
+      // log
     case MSG_LOG:
       paxos_service[PAXOS_LOG]->dispatch(op);
-      break;
+      return;
 
     // handle_command() does its own caps checking
     case MSG_MON_COMMAND:
       op->set_type_command();
       handle_command(op);
-      break;
-
-    default:
-      dealt_with = false;
-      break;
+      return;
   }
-  if (dealt_with)
-    return;
 
   /* nop, looks like it's not a service message; revert back to monitor */
   op->set_type_monitor();
@@ -4343,67 +4514,53 @@ void Monitor::dispatch_op(MonOpRequestRef op)
     dout(5) << __func__ << " " << op->get_req()->get_source_inst()
             << " not enough caps for " << *(op->get_req()) << " -- dropping"
             << dendl;
-    goto drop;
+    return;
   }
 
-  dealt_with = true;
   switch (op->get_req()->get_type()) {
-
     // misc
     case CEPH_MSG_MON_GET_VERSION:
       handle_get_version(op);
-      break;
-
-    case CEPH_MSG_MON_SUBSCRIBE:
-      /* FIXME: check what's being subscribed, filter accordingly */
-      handle_subscribe(op);
-      break;
-
-    default:
-      dealt_with = false;
-      break;
+      return;
   }
-  if (dealt_with)
-    return;
 
   if (!op->is_src_mon()) {
     dout(1) << __func__ << " unexpected monitor message from"
             << " non-monitor entity " << op->get_req()->get_source_inst()
             << " " << *(op->get_req()) << " -- dropping" << dendl;
-    goto drop;
+    return;
   }
 
   /* messages that should only be sent by another monitor */
-  dealt_with = true;
   switch (op->get_req()->get_type()) {
 
     case MSG_ROUTE:
       handle_route(op);
-      break;
+      return;
 
     case MSG_MON_PROBE:
       handle_probe(op);
-      break;
+      return;
 
     // Sync (i.e., the new slurp, but on steroids)
     case MSG_MON_SYNC:
       handle_sync(op);
-      break;
+      return;
     case MSG_MON_SCRUB:
       handle_scrub(op);
-      break;
+      return;
 
     /* log acks are sent from a monitor we sent the MLog to, and are
        never sent by clients to us. */
     case MSG_LOGACK:
       log_client.handle_log_ack((MLogAck*)op->get_req());
-      break;
+      return;
 
     // monmap
     case MSG_MON_JOIN:
       op->set_type_service();
       paxos_service[PAXOS_MONMAP]->dispatch(op);
-      break;
+      return;
 
     // paxos
     case MSG_MON_PAXOS:
@@ -4412,7 +4569,7 @@ void Monitor::dispatch_op(MonOpRequestRef op)
         MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
         if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
           //can't send these!
-          break;
+          return;
         }
 
         if (state == STATE_SYNCHRONIZING) {
@@ -4420,21 +4577,21 @@ void Monitor::dispatch_op(MonOpRequestRef op)
           // good, thus just drop them and ignore them.
           dout(10) << __func__ << " ignore paxos msg from "
             << pm->get_source_inst() << dendl;
-          break;
+          return;
         }
 
         // sanitize
         if (pm->epoch > get_epoch()) {
           bootstrap();
-          break;
+          return;
         }
         if (pm->epoch != get_epoch()) {
-          break;
+          return;
         }
 
         paxos->dispatch(op);
       }
-      break;
+      return;
 
     // elector messages
     case MSG_MON_ELECTION:
@@ -4443,41 +4600,34 @@ void Monitor::dispatch_op(MonOpRequestRef op)
       if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
         dout(0) << "MMonElection received from entity without enough caps!"
           << op->get_session()->caps << dendl;
-        break;
+        return;;
       }
       if (!is_probing() && !is_synchronizing()) {
         elector.dispatch(op);
       }
-      break;
+      return;
 
     case MSG_FORWARD:
       handle_forward(op);
-      break;
+      return;
 
     case MSG_TIMECHECK:
+      dout(5) << __func__ << " ignoring " << op << dendl;
+      return;
+    case MSG_TIMECHECK2:
       handle_timecheck(op);
-      break;
+      return;
 
     case MSG_MON_HEALTH:
-      health_monitor->dispatch(op);
+      dout(5) << __func__ << " dropping deprecated message: "
+             << *op->get_req() << dendl;
       break;
-
     case MSG_MON_HEALTH_CHECKS:
       op->set_type_service();
       paxos_service[PAXOS_HEALTH]->dispatch(op);
-      break;
-
-    default:
-      dealt_with = false;
-      break;
-  }
-  if (!dealt_with) {
-    dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
-    goto drop;
+      return;
   }
-  return;
-
-drop:
+  dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
   return;
 }
 
@@ -4486,18 +4636,11 @@ void Monitor::handle_ping(MonOpRequestRef op)
   MPing *m = static_cast<MPing*>(op->get_req());
   dout(10) << __func__ << " " << *m << dendl;
   MPing *reply = new MPing;
-  entity_inst_t inst = m->get_source_inst();
   bufferlist payload;
   boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
   f->open_object_section("pong");
 
-  if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-    get_health_status(false, f.get(), nullptr);
-  } else {
-    list<string> health_str;
-    get_health(health_str, nullptr, f.get());
-  }
-
+  get_health_status(false, f.get(), nullptr);
   {
     stringstream ss;
     get_mon_status(f.get(), ss);
@@ -4506,17 +4649,20 @@ void Monitor::handle_ping(MonOpRequestRef op)
   f->close_section();
   stringstream ss;
   f->flush(ss);
-  ::encode(ss.str(), payload);
+  encode(ss.str(), payload);
   reply->set_payload(payload);
   dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
-  messenger->send_message(reply, inst);
+  m->get_connection()->send_message(reply);
 }
 
 void Monitor::timecheck_start()
 {
   dout(10) << __func__ << dendl;
   timecheck_cleanup();
-  timecheck_start_round();
+  if (get_quorum_mon_features().contains_all(
+       ceph::features::mon::FEATURE_NAUTILUS)) {
+    timecheck_start_round();
+  }
 }
 
 void Monitor::timecheck_finish()
@@ -4528,17 +4674,17 @@ void Monitor::timecheck_finish()
 void Monitor::timecheck_start_round()
 {
   dout(10) << __func__ << " curr " << timecheck_round << dendl;
-  assert(is_leader());
+  ceph_assert(is_leader());
 
   if (monmap->size() == 1) {
-    assert(0 == "We are alone; this shouldn't have been scheduled!");
+    ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
     return;
   }
 
   if (timecheck_round % 2) {
     dout(10) << __func__ << " there's a timecheck going on" << dendl;
     utime_t curr_time = ceph_clock_now();
-    double max = g_conf->mon_timecheck_interval*3;
+    double max = g_conf()->mon_timecheck_interval*3;
     if (curr_time - timecheck_round_start < max) {
       dout(10) << __func__ << " keep current round going" << dendl;
       goto out;
@@ -4549,7 +4695,7 @@ void Monitor::timecheck_start_round()
     }
   }
 
-  assert(timecheck_round % 2 == 0);
+  ceph_assert(timecheck_round % 2 == 0);
   timecheck_acks = 0;
   timecheck_round ++;
   timecheck_round_start = ceph_clock_now();
@@ -4564,13 +4710,13 @@ out:
 void Monitor::timecheck_finish_round(bool success)
 {
   dout(10) << __func__ << " curr " << timecheck_round << dendl;
-  assert(timecheck_round % 2);
+  ceph_assert(timecheck_round % 2);
   timecheck_round ++;
   timecheck_round_start = utime_t();
 
   if (success) {
-    assert(timecheck_waiting.empty());
-    assert(timecheck_acks == quorum.size());
+    ceph_assert(timecheck_waiting.empty());
+    ceph_assert(timecheck_acks == quorum.size());
     timecheck_report();
     timecheck_check_skews();
     return;
@@ -4578,9 +4724,8 @@ void Monitor::timecheck_finish_round(bool success)
 
   dout(10) << __func__ << " " << timecheck_waiting.size()
            << " peers still waiting:";
-  for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
-      p != timecheck_waiting.end(); ++p) {
-    *_dout << " " << p->first.name;
+  for (auto& p : timecheck_waiting) {
+    *_dout << " mon." << p.first;
   }
   *_dout << dendl;
   timecheck_waiting.clear();
@@ -4638,22 +4783,20 @@ void Monitor::timecheck_reset_event()
 void Monitor::timecheck_check_skews()
 {
   dout(10) << __func__ << dendl;
-  assert(is_leader());
-  assert((timecheck_round % 2) == 0);
+  ceph_assert(is_leader());
+  ceph_assert((timecheck_round % 2) == 0);
   if (monmap->size() == 1) {
-    assert(0 == "We are alone; we shouldn't have gotten here!");
+    ceph_abort_msg("We are alone; we shouldn't have gotten here!");
     return;
   }
-  assert(timecheck_latencies.size() == timecheck_skews.size());
+  ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
 
   bool found_skew = false;
-  for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
-       p != timecheck_skews.end(); ++p) {
-
+  for (auto& p : timecheck_skews) {
     double abs_skew;
-    if (timecheck_has_skew(p->second, &abs_skew)) {
+    if (timecheck_has_skew(p.second, &abs_skew)) {
       dout(10) << __func__
-               << " " << p->first << " skew " << abs_skew << dendl;
+               << " " << p.first << " skew " << abs_skew << dendl;
       found_skew = true;
     }
   }
@@ -4677,53 +4820,51 @@ void Monitor::timecheck_check_skews()
 void Monitor::timecheck_report()
 {
   dout(10) << __func__ << dendl;
-  assert(is_leader());
-  assert((timecheck_round % 2) == 0);
+  ceph_assert(is_leader());
+  ceph_assert((timecheck_round % 2) == 0);
   if (monmap->size() == 1) {
-    assert(0 == "We are alone; we shouldn't have gotten here!");
+    ceph_abort_msg("We are alone; we shouldn't have gotten here!");
     return;
   }
 
-  assert(timecheck_latencies.size() == timecheck_skews.size());
+  ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
   bool do_output = true; // only output report once
   for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
     if (monmap->get_name(*q) == name)
       continue;
 
-    MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
+    MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_REPORT);
     m->epoch = get_epoch();
     m->round = timecheck_round;
 
-    for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
-         it != timecheck_skews.end(); ++it) {
-      double skew = it->second;
-      double latency = timecheck_latencies[it->first];
+    for (auto& it : timecheck_skews) {
+      double skew = it.second;
+      double latency = timecheck_latencies[it.first];
 
-      m->skews[it->first] = skew;
-      m->latencies[it->first] = latency;
+      m->skews[it.first] = skew;
+      m->latencies[it.first] = latency;
 
       if (do_output) {
-        dout(25) << __func__ << " " << it->first
+        dout(25) << __func__ << " mon." << it.first
                  << " latency " << latency
                  << " skew " << skew << dendl;
       }
     }
     do_output = false;
-    entity_inst_t inst = monmap->get_inst(*q);
-    dout(10) << __func__ << " send report to " << inst << dendl;
-    messenger->send_message(m, inst);
+    dout(10) << __func__ << " send report to mon." << *q << dendl;
+    send_mon_message(m, *q);
   }
 }
 
 void Monitor::timecheck()
 {
   dout(10) << __func__ << dendl;
-  assert(is_leader());
+  ceph_assert(is_leader());
   if (monmap->size() == 1) {
-    assert(0 == "We are alone; we shouldn't have gotten here!");
+    ceph_abort_msg("We are alone; we shouldn't have gotten here!");
     return;
   }
-  assert(timecheck_round % 2 != 0);
+  ceph_assert(timecheck_round % 2 != 0);
 
   timecheck_acks = 1; // we ack ourselves
 
@@ -4731,21 +4872,20 @@ void Monitor::timecheck()
            << " round " << timecheck_round << dendl;
 
   // we are at the eye of the storm; the point of reference
-  timecheck_skews[messenger->get_myinst()] = 0.0;
-  timecheck_latencies[messenger->get_myinst()] = 0.0;
+  timecheck_skews[rank] = 0.0;
+  timecheck_latencies[rank] = 0.0;
 
   for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
     if (monmap->get_name(*it) == name)
       continue;
 
-    entity_inst_t inst = monmap->get_inst(*it);
     utime_t curr_time = ceph_clock_now();
-    timecheck_waiting[inst] = curr_time;
-    MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
+    timecheck_waiting[*it] = curr_time;
+    MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING);
     m->epoch = get_epoch();
     m->round = timecheck_round;
-    dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
-    messenger->send_message(m, inst);
+    dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl;
+    send_mon_message(m, *it);
   }
 }
 
@@ -4754,13 +4894,13 @@ health_status_t Monitor::timecheck_status(ostringstream &ss,
                                           const double latency)
 {
   health_status_t status = HEALTH_OK;
-  assert(latency >= 0);
+  ceph_assert(latency >= 0);
 
   double abs_skew;
   if (timecheck_has_skew(skew_bound, &abs_skew)) {
     status = HEALTH_WARN;
     ss << "clock skew " << abs_skew << "s"
-       << " > max " << g_conf->mon_clock_drift_allowed << "s";
+       << " > max " << g_conf()->mon_clock_drift_allowed << "s";
   }
 
   return status;
@@ -4768,12 +4908,12 @@ health_status_t Monitor::timecheck_status(ostringstream &ss,
 
 void Monitor::handle_timecheck_leader(MonOpRequestRef op)
 {
-  MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
+  MTimeCheck2 *m = static_cast<MTimeCheck2*>(op->get_req());
   dout(10) << __func__ << " " << *m << dendl;
   /* handles PONG's */
-  assert(m->op == MTimeCheck::OP_PONG);
+  ceph_assert(m->op == MTimeCheck2::OP_PONG);
 
-  entity_inst_t other = m->get_source_inst();
+  int other = m->get_source().num();
   if (m->epoch < get_epoch()) {
     dout(1) << __func__ << " got old timecheck epoch " << m->epoch
             << " from " << other
@@ -4781,7 +4921,7 @@ void Monitor::handle_timecheck_leader(MonOpRequestRef op)
             << " -- severely lagged? discard" << dendl;
     return;
   }
-  assert(m->epoch == get_epoch());
+  ceph_assert(m->epoch == get_epoch());
 
   if (m->round < timecheck_round) {
     dout(1) << __func__ << " got old round " << m->round
@@ -4792,7 +4932,7 @@ void Monitor::handle_timecheck_leader(MonOpRequestRef op)
 
   utime_t curr_time = ceph_clock_now();
 
-  assert(timecheck_waiting.count(other) > 0);
+  ceph_assert(timecheck_waiting.count(other) > 0);
   utime_t timecheck_sent = timecheck_waiting[other];
   timecheck_waiting.erase(other);
   if (curr_time < timecheck_sent) {
@@ -4851,7 +4991,7 @@ void Monitor::handle_timecheck_leader(MonOpRequestRef op)
    * may be masked by an even higher latency, but with high latencies
    * we probably have worse issues to deal with than just skewed clocks.
    */
-  assert(latency >= 0);
+  ceph_assert(latency >= 0);
 
   double delta = ((double) m->timestamp) - ((double) curr_time);
   double abs_delta = (delta > 0 ? delta : -delta);
@@ -4877,8 +5017,8 @@ void Monitor::handle_timecheck_leader(MonOpRequestRef op)
   if (timecheck_acks == quorum.size()) {
     dout(10) << __func__ << " got pongs from everybody ("
              << timecheck_acks << " total)" << dendl;
-    assert(timecheck_skews.size() == timecheck_acks);
-    assert(timecheck_waiting.empty());
+    ceph_assert(timecheck_skews.size() == timecheck_acks);
+    ceph_assert(timecheck_waiting.empty());
     // everyone has acked, so bump the round to finish it.
     timecheck_finish_round();
   }
@@ -4886,11 +5026,11 @@ void Monitor::handle_timecheck_leader(MonOpRequestRef op)
 
 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
 {
-  MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
+  MTimeCheck2 *m = static_cast<MTimeCheck2*>(op->get_req());
   dout(10) << __func__ << " " << *m << dendl;
 
-  assert(is_peon());
-  assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
+  ceph_assert(is_peon());
+  ceph_assert(m->op == MTimeCheck2::OP_PING || m->op == MTimeCheck2::OP_REPORT);
 
   if (m->epoch != get_epoch()) {
     dout(1) << __func__ << " got wrong epoch "
@@ -4908,15 +5048,15 @@ void Monitor::handle_timecheck_peon(MonOpRequestRef op)
 
   timecheck_round = m->round;
 
-  if (m->op == MTimeCheck::OP_REPORT) {
-    assert((timecheck_round % 2) == 0);
+  if (m->op == MTimeCheck2::OP_REPORT) {
+    ceph_assert((timecheck_round % 2) == 0);
     timecheck_latencies.swap(m->latencies);
     timecheck_skews.swap(m->skews);
     return;
   }
 
-  assert((timecheck_round % 2) != 0);
-  MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
+  ceph_assert((timecheck_round % 2) != 0);
+  MTimeCheck2 *reply = new MTimeCheck2(MTimeCheck2::OP_PONG);
   utime_t curr_time = ceph_clock_now();
   reply->timestamp = curr_time;
   reply->epoch = m->epoch;
@@ -4928,17 +5068,17 @@ void Monitor::handle_timecheck_peon(MonOpRequestRef op)
 
 void Monitor::handle_timecheck(MonOpRequestRef op)
 {
-  MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
+  MTimeCheck2 *m = static_cast<MTimeCheck2*>(op->get_req());
   dout(10) << __func__ << " " << *m << dendl;
 
   if (is_leader()) {
-    if (m->op != MTimeCheck::OP_PONG) {
+    if (m->op != MTimeCheck2::OP_PONG) {
       dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
     } else {
       handle_timecheck_leader(op);
     }
   } else if (is_peon()) {
-    if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
+    if (m->op != MTimeCheck2::OP_PING && m->op != MTimeCheck2::OP_REPORT) {
       dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
     } else {
       handle_timecheck_peon(op);
@@ -4956,11 +5096,24 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
   bool reply = false;
 
   MonSession *s = op->get_session();
-  assert(s);
+  ceph_assert(s);
+
+  if (m->hostname.size()) {
+    s->remote_host = m->hostname;
+  }
 
   for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
        p != m->what.end();
        ++p) {
+    if (p->first == "monmap" || p->first == "config") {
+      // these require no caps
+    } else if (!s->is_capable("mon", MON_CAP_R)) {
+      dout(5) << __func__ << " " << op->get_req()->get_source_inst()
+             << " not enough caps for " << *(op->get_req()) << " -- dropping"
+             << dendl;
+      continue;
+    }
+
     // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
     if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
       reply = true;
@@ -4970,7 +5123,7 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
       for (map<string, Subscription*>::iterator it = s->sub_map.begin();
           it != s->sub_map.end(); ) {
        if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
-         Mutex::Locker l(session_map_lock);
+         std::lock_guard l(session_map_lock);
          session_map.remove_sub((it++)->second);
        } else {
          ++it;
@@ -4979,7 +5132,7 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
     }
 
     {
-      Mutex::Locker l(session_map_lock);
+      std::lock_guard l(session_map_lock);
       session_map.add_update_sub(s, p->first, p->second.start,
                                 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
                                 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
@@ -4989,7 +5142,7 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
       dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
       if ((int)s->is_capable("mds", MON_CAP_R)) {
         Subscription *sub = s->sub_map[p->first];
-        assert(sub != nullptr);
+        ceph_assert(sub != nullptr);
         mdsmon()->check_sub(sub);
       }
     } else if (p->first == "osdmap") {
@@ -5002,12 +5155,7 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
       }
     } else if (p->first == "osd_pg_creates") {
       if ((int)s->is_capable("osd", MON_CAP_W)) {
-       if (monmap->get_required_features().contains_all(
-             ceph::features::mon::FEATURE_LUMINOUS)) {
-         osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
-       } else {
-         pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
-       }
+       osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
       }
     } else if (p->first == "monmap") {
       monmon()->check_sub(s->sub_map[p->first]);
@@ -5017,6 +5165,8 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
       mgrmon()->check_sub(s->sub_map[p->first]);
     } else if (p->first == "servicemap") {
       mgrstatmon()->check_sub(s->sub_map[p->first]);
+    } else if (p->first == "config") {
+      configmon()->check_sub(s);
     }
   }
 
@@ -5026,7 +5176,7 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
     ConnectionRef con = m->get_connection();
     if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
       m->get_connection()->send_message(new MMonSubscribeAck(
-       monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
+       monmap->get_fsid(), (int)g_conf()->mon_subscribe_interval));
   }
 
 }
@@ -5038,7 +5188,7 @@ void Monitor::handle_get_version(MonOpRequestRef op)
   PaxosService *svc = NULL;
 
   MonSession *s = op->get_session();
-  assert(s);
+  ceph_assert(s);
 
   if (!is_leader() && !is_peon()) {
     dout(10) << " waiting for quorum" << dendl;
@@ -5084,24 +5234,24 @@ bool Monitor::ms_handle_reset(Connection *con)
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
     return false;
 
-  MonSession *s = static_cast<MonSession *>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<MonSession*>(priv.get());
   if (!s)
     return false;
 
   // break any con <-> session ref cycle
-  s->con->set_priv(NULL);
+  s->con->set_priv(nullptr);
 
   if (is_shutdown())
     return false;
 
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
-  dout(10) << "reset/close on session " << s->inst << dendl;
-  if (!s->closed) {
-    Mutex::Locker l(session_map_lock);
+  dout(10) << "reset/close on session " << s->name << " " << s->addrs << dendl;
+  if (!s->closed && s->item.is_on_list()) {
+    std::lock_guard l(session_map_lock);
     remove_session(s);
   }
-  s->put();
   return true;
 }
 
@@ -5144,7 +5294,7 @@ void Monitor::update_mon_metadata(int from, Metadata&& m)
 
   MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
   bufferlist bl;
-  ::encode(pending_metadata, bl);
+  encode(pending_metadata, bl);
   t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
   paxos->trigger_propose();
 }
@@ -5155,8 +5305,8 @@ int Monitor::load_metadata()
   int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
   if (r)
     return r;
-  bufferlist::iterator it = bl.begin();
-  ::decode(mon_metadata, it);
+  auto it = bl.cbegin();
+  decode(mon_metadata, it);
 
   pending_metadata = mon_metadata;
   return 0;
@@ -5164,7 +5314,7 @@ int Monitor::load_metadata()
 
 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
 {
-  assert(f);
+  ceph_assert(f);
   if (!mon_metadata.count(mon)) {
     err << "mon." << mon << " not found";
     return -EINVAL;
@@ -5201,7 +5351,7 @@ void Monitor::count_metadata(const string& field, Formatter *f)
 
 int Monitor::print_nodes(Formatter *f, ostream& err)
 {
-  map<string, list<int> > mons;        // hostname => mon
+  map<string, list<string> > mons;     // hostname => mon
   for (map<int, Metadata>::iterator it = mon_metadata.begin();
        it != mon_metadata.end(); ++it) {
     const Metadata& m = it->second;
@@ -5210,7 +5360,7 @@ int Monitor::print_nodes(Formatter *f, ostream& err)
       // not likely though
       continue;
     }
-    mons[hostname->second].push_back(it->first);
+    mons[hostname->second].push_back(monmap->get_name(it->first));
   }
 
   dump_services(f, mons, "mon");
@@ -5223,7 +5373,7 @@ int Monitor::print_nodes(Formatter *f, ostream& err)
 int Monitor::scrub_start()
 {
   dout(10) << __func__ << dendl;
-  assert(is_leader());
+  ceph_assert(is_leader());
 
   if (!scrub_result.empty()) {
     clog->info() << "scrub already in progress";
@@ -5240,8 +5390,8 @@ int Monitor::scrub_start()
 
 int Monitor::scrub()
 {
-  assert(is_leader());
-  assert(scrub_state);
+  ceph_assert(is_leader());
+  ceph_assert(scrub_state);
 
   scrub_cancel_timeout();
   wait_for_paxos_write();
@@ -5260,7 +5410,7 @@ int Monitor::scrub()
     MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
                                  num_keys);
     r->key = scrub_state->last_key;
-    messenger->send_message(r, monmap->get_inst(*p));
+    send_mon_message(r, *p);
   }
 
   // scrub my keys
@@ -5277,7 +5427,7 @@ int Monitor::scrub()
   scrub_reset_timeout();
 
   if (quorum.size() == 1) {
-    assert(scrub_state->finished == true);
+    ceph_assert(scrub_state->finished == true);
     scrub_finish();
   }
   return 0;
@@ -5318,7 +5468,7 @@ void Monitor::handle_scrub(MonOpRequestRef op)
       scrub_reset_timeout();
 
       int from = m->get_source().num();
-      assert(scrub_result.count(from) == 0);
+      ceph_assert(scrub_result.count(from) == 0);
       scrub_result[from] = m->result;
 
       if (scrub_result.size() == quorum.size()) {
@@ -5338,9 +5488,9 @@ bool Monitor::_scrub(ScrubResult *r,
                      pair<string,string> *start,
                      int *num_keys)
 {
-  assert(r != NULL);
-  assert(start != NULL);
-  assert(num_keys != NULL);
+  ceph_assert(r != NULL);
+  ceph_assert(start != NULL);
+  ceph_assert(num_keys != NULL);
 
   set<string> prefixes = get_sync_targets_names();
   prefixes.erase("paxos");  // exclude paxos, as this one may have extra states for proposals, etc.
@@ -5371,7 +5521,7 @@ bool Monitor::_scrub(ScrubResult *r,
 
     bufferlist bl;
     int err = store->get(k.first, k.second, bl);
-    assert(err == 0);
+    ceph_assert(err == 0);
     
     uint32_t key_crc = bl.crc32c(0);
     dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
@@ -5509,7 +5659,7 @@ void Monitor::scrub_reset_timeout()
   dout(15) << __func__ << " reset timeout event" << dendl;
   scrub_cancel_timeout();
   scrub_timeout_event = timer.add_event_after(
-    g_conf->mon_scrub_timeout,
+    g_conf()->mon_scrub_timeout,
     new C_MonContext(this, [this](int) {
       scrub_timeout();
     }));
@@ -5518,7 +5668,7 @@ void Monitor::scrub_reset_timeout()
 /************ TICK ***************/
 void Monitor::new_tick()
 {
-  timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
+  timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext(this, [this](int) {
        tick();
       }));
 }
@@ -5531,7 +5681,7 @@ void Monitor::tick()
   
   // Check if we need to emit any delayed health check updated messages
   if (is_leader()) {
-    const auto min_period = g_conf->get_val<int64_t>(
+    const auto min_period = g_conf().get_val<int64_t>(
                               "mon_health_log_update_period");
     for (auto& svc : paxos_service) {
       auto health = svc->get_health_checks();
@@ -5564,39 +5714,40 @@ void Monitor::tick()
   }
 
 
-  for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
-    (*p)->tick();
-    (*p)->maybe_trim();
+  for (auto& svc : paxos_service) {
+    svc->tick();
+    svc->maybe_trim();
   }
   
   // trim sessions
   {
-    Mutex::Locker l(session_map_lock);
+    std::lock_guard l(session_map_lock);
     auto p = session_map.sessions.begin();
 
     bool out_for_too_long = (!exited_quorum.is_zero() &&
-                            now > (exited_quorum + 2*g_conf->mon_lease));
+                            now > (exited_quorum + 2*g_conf()->mon_lease));
 
     while (!p.end()) {
       MonSession *s = *p;
       ++p;
     
       // don't trim monitors
-      if (s->inst.name.is_mon())
+      if (s->name.is_mon())
        continue;
 
       if (s->session_timeout < now && s->con) {
        // check keepalive, too
        s->session_timeout = s->con->get_last_keepalive();
-       s->session_timeout += g_conf->mon_session_timeout;
+       s->session_timeout += g_conf()->mon_session_timeout;
       }
       if (s->session_timeout < now) {
-       dout(10) << " trimming session " << s->con << " " << s->inst
+       dout(10) << " trimming session " << s->con << " " << s->name
+                << " " << s->addrs
                 << " (timeout " << s->session_timeout
                 << " < now " << now << ")" << dendl;
       } else if (out_for_too_long) {
        // boot the client Session because we've taken too long getting back in
-       dout(10) << " trimming session " << s->con << " " << s->inst
+       dout(10) << " trimming session " << s->con << " " << s->name
                 << " because we've been out of quorum too long" << dendl;
       } else {
        continue;
@@ -5620,9 +5771,43 @@ void Monitor::tick()
     paxos->trigger_propose();
   }
 
+  mgr_client.update_daemon_health(get_health_metrics());
   new_tick();
 }
 
+vector<DaemonHealthMetric> Monitor::get_health_metrics() 
+{
+  vector<DaemonHealthMetric> metrics;
+
+  utime_t oldest_secs;
+  const utime_t now = ceph_clock_now();
+  auto too_old = now;
+  too_old -= g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count();
+  int slow = 0;
+  TrackedOpRef oldest_op;
+  auto count_slow_ops = [&](TrackedOp& op) {
+    if (op.get_initiated() < too_old) {
+      slow++;
+      if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
+       oldest_op = &op;
+      }
+      return true;
+    } else {
+      return false;
+    }
+  };
+  if (op_tracker.visit_ops_in_flight(&oldest_secs, count_slow_ops)) {
+    if (slow) {
+      derr << __func__ << " reporting " << slow << " slow ops, oldest is "
+          << oldest_op->get_desc() << dendl;
+    }
+    metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
+  } else {
+    metrics.emplace_back(daemon_metric::SLOW_OPS, 0, 0);
+  }
+  return metrics;
+}
+
 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
 {
   uuid_d nf;
@@ -5630,7 +5815,7 @@ void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
   dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
 
   bufferlist bl;
-  ::encode(nf, bl);
+  encode(nf, bl);
   t->put(MONITOR_NAME, "cluster_fingerprint", bl);
 }
 
@@ -5640,7 +5825,7 @@ int Monitor::check_fsid()
   int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
   if (r == -ENOENT)
     return r;
-  assert(r == 0);
+  ceph_assert(r == 0);
 
   string es(ebl.c_str(), ebl.length());
 
@@ -5731,17 +5916,17 @@ int Monitor::mkfs(bufferlist& osdmapbl)
     KeyRing keyring;
     string keyring_filename;
 
-    r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
+    r = ceph_resolve_file_search(g_conf()->keyring, keyring_filename);
     if (r) {
-      derr << "unable to find a keyring file on " << g_conf->keyring
+      derr << "unable to find a keyring file on " << g_conf()->keyring
           << ": " << cpp_strerror(r) << dendl;
-      if (g_conf->key != "") {
-       string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
+      if (g_conf()->key != "") {
+       string keyring_plaintext = "[mon.]\n\tkey = " + g_conf()->key +
          "\n\tcaps mon = \"allow *\"\n";
        bufferlist bl;
        bl.append(keyring_plaintext);
        try {
-         bufferlist::iterator i = bl.begin();
+         auto i = bl.cbegin();
          keyring.decode_plaintext(i);
        }
        catch (const buffer::error& e) {
@@ -5755,7 +5940,7 @@ int Monitor::mkfs(bufferlist& osdmapbl)
     } else {
       r = keyring.load(g_ceph_context, keyring_filename);
       if (r < 0) {
-       derr << "unable to load initial keyring " << g_conf->keyring << dendl;
+       derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
        return r;
       }
     }
@@ -5776,7 +5961,7 @@ int Monitor::mkfs(bufferlist& osdmapbl)
 int Monitor::write_default_keyring(bufferlist& bl)
 {
   ostringstream os;
-  os << g_conf->mon_data << "/keyring";
+  os << g_conf()->mon_data << "/keyring";
 
   int err = 0;
   int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600);
@@ -5811,8 +5996,82 @@ void Monitor::extract_save_mon_key(KeyRing& keyring)
   }
 }
 
-bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
-                               bool force_new)
+// AuthClient methods -- for mon <-> mon communication
+int Monitor::get_auth_request(
+  Connection *con,
+  AuthConnectionMeta *auth_meta,
+  uint32_t *method,
+  vector<uint32_t> *preferred_modes,
+  bufferlist *out)
+{
+  std::scoped_lock l(auth_lock);
+  if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
+      con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
+    return -EACCES;
+  }
+  AuthAuthorizer *auth;
+  if (!ms_get_authorizer(con->get_peer_type(), &auth)) {
+    return -EACCES;
+  }
+  auth_meta->authorizer.reset(auth);
+  auth_registry.get_supported_modes(con->get_peer_type(),
+                                   auth->protocol,
+                                   preferred_modes);
+  *method = auth->protocol;
+  *out = auth->bl;
+  return 0;
+}
+
+int Monitor::handle_auth_reply_more(
+  Connection *con,
+  AuthConnectionMeta *auth_meta,
+  const bufferlist& bl,
+  bufferlist *reply)
+{
+  std::scoped_lock l(auth_lock);
+  if (!auth_meta->authorizer) {
+    derr << __func__ << " no authorizer?" << dendl;
+    return -EACCES;
+  }
+  auth_meta->authorizer->add_challenge(cct, bl);
+  *reply = auth_meta->authorizer->bl;
+  return 0;
+}
+
+int Monitor::handle_auth_done(
+  Connection *con,
+  AuthConnectionMeta *auth_meta,
+  uint64_t global_id,
+  uint32_t con_mode,
+  const bufferlist& bl,
+  CryptoKey *session_key,
+  std::string *connection_secret)
+{
+  std::scoped_lock l(auth_lock);
+  // verify authorizer reply
+  auto p = bl.begin();
+  if (!auth_meta->authorizer->verify_reply(p, connection_secret)) {
+    dout(0) << __func__ << " failed verifying authorizer reply" << dendl;
+    return -EACCES;
+  }
+  auth_meta->session_key = auth_meta->authorizer->session_key;
+  return 0;
+}
+
+int Monitor::handle_auth_bad_method(
+  Connection *con,
+  AuthConnectionMeta *auth_meta,
+  uint32_t old_auth_method,
+  int result,
+  const std::vector<uint32_t>& allowed_methods,
+  const std::vector<uint32_t>& allowed_modes)
+{
+  derr << __func__ << " hmm, they didn't like " << old_auth_method
+       << " result " << cpp_strerror(result) << dendl;
+  return -EACCES;
+}
+
+bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer)
 {
   dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
           << dendl;
@@ -5828,7 +6087,7 @@ bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
   if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
     // auth_none
     dout(20) << __func__ << " building auth_none authorizer" << dendl;
-    AuthNoneClientHandler handler(g_ceph_context, nullptr);
+    AuthNoneClientHandler handler{g_ceph_context};
     handler.set_global_id(0);
     *authorizer = handler.build_authorizer(service_id);
     return true;
@@ -5861,8 +6120,8 @@ bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
       return false;
     }
 
-    ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
-                                            secret, (uint64_t)-1);
+    ret = key_server.build_session_auth_info(
+      service_id, auth_ticket_info.ticket, info, secret, (uint64_t)-1);
     if (ret < 0) {
       dout(0) << __func__ << " failed to build mon session_auth_info "
              << cpp_strerror(ret) << dendl;
@@ -5870,7 +6129,8 @@ bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
     }
   } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
     // mgr
-    ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
+    ret = key_server.build_session_auth_info(
+      service_id, auth_ticket_info.ticket, info);
     if (ret < 0) {
       derr << __func__ << " failed to build mgr service session_auth_info "
           << cpp_strerror(ret) << dendl;
@@ -5886,11 +6146,11 @@ bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
     return false;
   }
   bufferlist ticket_data;
-  ::encode(blob, ticket_data);
+  encode(blob, ticket_data);
 
-  bufferlist::iterator iter = ticket_data.begin();
+  auto iter = ticket_data.cbegin();
   CephXTicketHandler handler(g_ceph_context, service_id);
-  ::decode(handler.ticket, iter);
+  decode(handler.ticket, iter);
 
   handler.session_key = info.session_key;
 
@@ -5899,43 +6159,267 @@ bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
   return true;
 }
 
-bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
-                                  int protocol, bufferlist& authorizer_data,
-                                  bufferlist& authorizer_reply,
-                                  bool& isvalid, CryptoKey& session_key,
-                                  std::unique_ptr<AuthAuthorizerChallenge> *challenge)
+KeyStore *Monitor::ms_get_auth1_authorizer_keystore()
 {
-  dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
-          << " " << ceph_entity_type_name(peer_type)
-          << " protocol " << protocol << dendl;
+  return &keyring;
+}
 
-  if (is_shutdown())
-    return false;
+int Monitor::handle_auth_request(
+  Connection *con,
+  AuthConnectionMeta *auth_meta,
+  bool more,
+  uint32_t auth_method,
+  const bufferlist &payload,
+  bufferlist *reply)
+{
+  std::scoped_lock l(auth_lock);
 
-  if (peer_type == CEPH_ENTITY_TYPE_MON &&
-      auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
-    // monitor, and cephx is enabled
-    isvalid = false;
-    if (protocol == CEPH_AUTH_CEPHX) {
-      bufferlist::iterator iter = authorizer_data.begin();
-      CephXServiceTicketInfo auth_ticket_info;
-      
-      if (authorizer_data.length()) {
-       bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
-                                          auth_ticket_info, challenge, authorizer_reply);
-       if (ret) {
-         session_key = auth_ticket_info.session_key;
-         isvalid = true;
-       } else {
-         dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
-        }
+  // NOTE: be careful, the Connection hasn't fully negotiated yet, so
+  // e.g., peer_features, peer_addrs, and others are still unknown.
+
+  dout(10) << __func__ << " con " << con << (more ? " (more)":" (start)")
+          << " method " << auth_method
+          << " payload " << payload.length()
+          << dendl;
+  if (!more) {
+    auth_meta->auth_mode = payload[0];
+  }
+
+  if (auth_meta->auth_mode >= AUTH_MODE_AUTHORIZER &&
+      auth_meta->auth_mode <= AUTH_MODE_AUTHORIZER_MAX) {
+    AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
+                                                         auth_method);
+    if (!ah) {
+      lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
+                << auth_method << dendl;
+      return -EOPNOTSUPP;
+    }
+    bool was_challenge = (bool)auth_meta->authorizer_challenge;
+    bool isvalid = ah->verify_authorizer(
+      cct,
+      &keyring,
+      payload,
+      auth_meta->get_connection_secret_length(),
+      reply,
+      &con->peer_name,
+      &con->peer_global_id,
+      &con->peer_caps_info,
+      &auth_meta->session_key,
+      &auth_meta->connection_secret,
+      &auth_meta->authorizer_challenge);
+    if (isvalid) {
+      ms_handle_authentication(con);
+      return 1;
+    }
+    if (!more && !was_challenge && auth_meta->authorizer_challenge) {
+      return 0;
+    }
+    dout(10) << __func__ << " bad authorizer on " << con << dendl;
+    return -EACCES;
+  } else if (auth_meta->auth_mode < AUTH_MODE_MON &&
+            auth_meta->auth_mode > AUTH_MODE_MON_MAX) {
+    derr << __func__ << " unrecognized auth mode " << auth_meta->auth_mode
+        << dendl;
+    return -EACCES;
+  }
+
+  // wait until we've formed an initial quorum on mkfs so that we have
+  // the initial keys (e.g., client.admin).
+  if (authmon()->get_last_committed() == 0) {
+    dout(10) << __func__ << " haven't formed initial quorum, EBUSY" << dendl;
+    return -EBUSY;
+  }
+
+  RefCountedPtr priv;
+  MonSession *s;
+  int32_t r = 0;
+  auto p = payload.begin();
+  if (!more) {
+    if (con->get_priv()) {
+      return -EACCES; // wtf
+    }
+
+    // handler?
+    unique_ptr<AuthServiceHandler> auth_handler{get_auth_service_handler(
+      auth_method, g_ceph_context, &key_server)};
+    if (!auth_handler) {
+      dout(1) << __func__ << " auth_method " << auth_method << " not supported"
+             << dendl;
+      return -EOPNOTSUPP;
+    }
+
+    uint8_t mode;
+    EntityName entity_name;
+
+    try {
+      decode(mode, p);
+      if (mode < AUTH_MODE_MON ||
+         mode > AUTH_MODE_MON_MAX) {
+       dout(1) << __func__ << " invalid mode " << (int)mode << dendl;
+       return -EACCES;
+      }
+      assert(mode >= AUTH_MODE_MON && mode <= AUTH_MODE_MON_MAX);
+      decode(entity_name, p);
+      decode(con->peer_global_id, p);
+    } catch (buffer::error& e) {
+      dout(1) << __func__ << " failed to decode, " << e.what() << dendl;
+      return -EACCES;
+    }
+
+    // supported method?
+    if (entity_name.get_type() == CEPH_ENTITY_TYPE_MON ||
+       entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
+       entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
+       entity_name.get_type() == CEPH_ENTITY_TYPE_MGR) {
+      if (!auth_cluster_required.is_supported_auth(auth_method)) {
+       dout(10) << __func__ << " entity " << entity_name << " method "
+                << auth_method << " not among supported "
+                << auth_cluster_required.get_supported_set() << dendl;
+       return -EOPNOTSUPP;
       }
     } else {
-      dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
+      if (!auth_service_required.is_supported_auth(auth_method)) {
+       dout(10) << __func__ << " entity " << entity_name << " method "
+                << auth_method << " not among supported "
+                << auth_cluster_required.get_supported_set() << dendl;
+       return -EOPNOTSUPP;
+      }
+    }
+
+    // for msgr1 we would do some weirdness here to ensure signatures
+    // are supported by the client if we require it.  for msgr2 that
+    // is not necessary.
+
+    if (!con->peer_global_id) {
+      con->peer_global_id = authmon()->_assign_global_id();
+      if (!con->peer_global_id) {
+       dout(1) << __func__ << " failed to assign global_id" << dendl;
+       return -EBUSY;
+      }
+      dout(10) << __func__ << "  assigned global_id " << con->peer_global_id
+              << dendl;
+    }
+
+    // set up partial session
+    s = new MonSession(con);
+    s->auth_handler = auth_handler.release();
+    con->set_priv(RefCountedPtr{s, false});
+
+    r = s->auth_handler->start_session(
+      entity_name,
+      auth_meta->get_connection_secret_length(),
+      reply,
+      &con->peer_caps_info,
+      &auth_meta->session_key,
+      &auth_meta->connection_secret);
+  } else {
+    priv = con->get_priv();
+    if (!priv) {
+      // this can happen if the async ms_handle_reset event races with
+      // the unlocked call into handle_auth_request
+      return -EACCES;
     }
+    s = static_cast<MonSession*>(priv.get());
+    r = s->auth_handler->handle_request(
+      p,
+      auth_meta->get_connection_secret_length(),
+      reply,
+      &con->peer_global_id,
+      &con->peer_caps_info,
+      &auth_meta->session_key,
+      &auth_meta->connection_secret);
+  }
+  if (r > 0 &&
+      !s->authenticated) {
+    ms_handle_authentication(con);
+  }
+
+  dout(30) << " r " << r << " reply:\n";
+  reply->hexdump(*_dout);
+  *_dout << dendl;
+  return r;
+}
+
+void Monitor::ms_handle_accept(Connection *con)
+{
+  auto priv = con->get_priv();
+  MonSession *s = static_cast<MonSession*>(priv.get());
+  if (!s) {
+    // legacy protocol v1?
+    dout(10) << __func__ << " con " << con << " no session" << dendl;
+    return;
+  }
+
+  if (s->item.is_on_list()) {
+    dout(10) << __func__ << " con " << con << " session " << s
+            << " already on list" << dendl;
   } else {
-    // who cares.
-    isvalid = true;
+    dout(10) << __func__ << " con " << con << " session " << s
+            << " registering session for "
+            << con->get_peer_addrs() << dendl;
+    s->_ident(entity_name_t(con->get_peer_type(), con->get_peer_id()),
+             con->get_peer_addrs());
+    std::lock_guard l(session_map_lock);
+    session_map.add_session(s);
   }
-  return true;
+}
+
+int Monitor::ms_handle_authentication(Connection *con)
+{
+  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+    // mon <-> mon connections need no Session, and setting one up
+    // creates an awkward ref cycle between Session and Connection.
+    return 1;
+  }
+
+  auto priv = con->get_priv();
+  MonSession *s = static_cast<MonSession*>(priv.get());
+  if (!s) {
+    // must be msgr2, otherwise dispatch would have set up the session.
+    s = session_map.new_session(
+      entity_name_t(con->get_peer_type(), -1),  // we don't know yet
+      con->get_peer_addrs(),
+      con);
+    assert(s);
+    dout(10) << __func__ << " adding session " << s << " to con " << con
+            << dendl;
+    con->set_priv(s);
+    logger->set(l_mon_num_sessions, session_map.get_size());
+    logger->inc(l_mon_session_add);
+  }
+  dout(10) << __func__ << " session " << s << " con " << con
+          << " addr " << s->con->get_peer_addr()
+          << " " << *s << dendl;
+
+  AuthCapsInfo &caps_info = con->get_peer_caps_info();
+  int ret = 0;
+  if (caps_info.allow_all) {
+    s->caps.set_allow_all();
+    s->authenticated = true;
+    ret = 1;
+  }
+  if (caps_info.caps.length()) {
+    bufferlist::const_iterator p = caps_info.caps.cbegin();
+    string str;
+    try {
+      decode(str, p);
+    } catch (const buffer::error &err) {
+      derr << __func__ << " corrupt cap data for " << con->get_peer_entity_name()
+          << " in auth db" << dendl;
+      str.clear();
+      ret = -EPERM;
+    }
+    if (ret >= 0) {
+      if (s->caps.parse(str, NULL)) {
+       s->authenticated = true;
+       ret = 1;
+      } else {
+       derr << __func__ << " unparseable caps '" << str << "' for "
+            << con->get_peer_entity_name() << dendl;
+       ret = -EPERM;
+      }
+    }
+  }
+
+  return ret;
 }