]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/Monitor.cc
update sources to 12.2.7
[ceph.git] / ceph / src / mon / Monitor.cc
index 9a7304fcab41be15130b3469e32d32c8733de436..6348f44c97b8cdc6fac8cb486f951492d74f9e6f 100644 (file)
@@ -45,6 +45,7 @@
 #include "messages/MMonPaxos.h"
 #include "messages/MRoute.h"
 #include "messages/MForward.h"
+#include "messages/MStatfs.h"
 
 #include "messages/MMonSubscribe.h"
 #include "messages/MMonSubscribeAck.h"
@@ -77,6 +78,7 @@
 #include "MgrMonitor.h"
 #include "MgrStatMonitor.h"
 #include "mon/QuorumService.h"
+#include "mon/OldHealthMonitor.h"
 #include "mon/HealthMonitor.h"
 #include "mon/ConfigKeyService.h"
 #include "common/config.h"
@@ -102,28 +104,20 @@ const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
 #undef FLAG
 #undef COMMAND
 #undef COMMAND_WITH_FLAG
-MonCommand mon_commands[] = {
 #define FLAG(f) (MonCommand::FLAG_##f)
 #define COMMAND(parsesig, helptext, modulename, req_perms, avail)      \
   {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
   {parsesig, helptext, modulename, req_perms, avail, flags},
+MonCommand mon_commands[] = {
 #include <mon/MonCommands.h>
+};
+MonCommand pgmonitor_commands[] = {
+#include <mon/PGMonitorCommands.h>
+};
 #undef COMMAND
 #undef COMMAND_WITH_FLAG
 
-  // FIXME: slurp up the Mgr commands too
-
-#define COMMAND(parsesig, helptext, modulename, req_perms, avail)      \
-  {parsesig, helptext, modulename, req_perms, avail, FLAG(MGR)},
-#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
-  {parsesig, helptext, modulename, req_perms, avail, flags | FLAG(MGR)},
-#include <mgr/MgrCommands.h>
-#undef COMMAND
-#undef COMMAND_WITH_FLAG
-
-};
-
 
 void C_MonContext::finish(int r) {
   if (mon->is_shutdown())
@@ -153,8 +147,6 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
   auth_service_required(cct,
                        cct->_conf->auth_supported.empty() ?
                        cct->_conf->auth_service_required : cct->_conf->auth_supported ),
-  leader_supported_mon_commands(NULL),
-  leader_supported_mon_commands_size(0),
   mgr_messenger(mgr_m),
   mgr_client(cct_, mgr_m),
   pgservice(nullptr),
@@ -204,8 +196,9 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
   paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
   paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
   paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
+  paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
 
-  health_monitor = new HealthMonitor(this);
+  health_monitor = new OldHealthMonitor(this);
   config_key_service = new ConfigKeyService(this, paxos);
 
   mon_caps = new MonCap();
@@ -214,40 +207,30 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
 
   exited_quorum = ceph_clock_now();
 
+  // prepare local commands
+  local_mon_commands.resize(ARRAY_SIZE(mon_commands));
+  for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
+    local_mon_commands[i] = mon_commands[i];
+  }
+  MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
+
+  local_upgrading_mon_commands = local_mon_commands;
+  for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
+    local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
+  }
+  MonCommand::encode_vector(local_upgrading_mon_commands,
+                           local_upgrading_mon_commands_bl);
+
   // assume our commands until we have an election.  this only means
   // we won't reply with EINVAL before the election; any command that
   // actually matters will wait until we have quorum etc and then
   // retry (and revalidate).
-  const MonCommand *cmds;
-  int cmdsize;
-  get_locally_supported_monitor_commands(&cmds, &cmdsize);
-  set_leader_supported_commands(cmds, cmdsize);
+  leader_mon_commands = local_mon_commands;
 
   // note: OSDMonitor may update this based on the luminous flag.
   pgservice = mgrstatmon()->get_pg_stat_service();
 }
 
-PaxosService *Monitor::get_paxos_service_by_name(const string& name)
-{
-  if (name == "mdsmap")
-    return paxos_service[PAXOS_MDSMAP];
-  if (name == "monmap")
-    return paxos_service[PAXOS_MONMAP];
-  if (name == "osdmap")
-    return paxos_service[PAXOS_OSDMAP];
-  if (name == "pgmap")
-    return paxos_service[PAXOS_PGMAP];
-  if (name == "logm")
-    return paxos_service[PAXOS_LOG];
-  if (name == "auth")
-    return paxos_service[PAXOS_AUTH];
-  if (name == "mgr")
-    return paxos_service[PAXOS_MGR];
-
-  assert(0 == "given name does not match known paxos service");
-  return NULL;
-}
-
 Monitor::~Monitor()
 {
   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
@@ -257,8 +240,6 @@ Monitor::~Monitor()
   delete paxos;
   assert(session_map.sessions.empty());
   delete mon_caps;
-  if (leader_supported_mon_commands != mon_commands)
-    delete[] leader_supported_mon_commands;
 }
 
 
@@ -296,7 +277,8 @@ void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
   bool read_only = (command == "mon_status" ||
                     command == "mon metadata" ||
                     command == "quorum_status" ||
-                    command == "ops");
+                    command == "ops" ||
+                    command == "sessions");
 
   (read_only ? audit_clog->debug() : audit_clog->info())
     << "from='admin socket' entity='admin socket' "
@@ -334,6 +316,17 @@ void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
     if (f) {
       f->flush(ss);
     }
+  } else if (command == "sessions") {
+
+    if (f) {
+      f->open_array_section("sessions");
+      for (auto p : session_map.sessions) {
+        f->dump_stream("session") << *p;
+      }
+      f->close_section();
+      f->flush(ss);
+    }
+
   } else {
     assert(0 == "bad AdminSocket command binding");
   }
@@ -378,6 +371,7 @@ CompatSet Monitor::get_supported_features()
   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
+  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
   return compat;
 }
 
@@ -578,14 +572,22 @@ int Monitor::preinit()
   assert(!logger);
   {
     PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
-    pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess");
-    pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", "sadd");
-    pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", "srm");
-    pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions");
-    pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in");
-    pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started");
-    pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won");
-    pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost");
+    pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
+        PerfCountersBuilder::PRIO_USEFUL);
+    pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
+        "sadd", PerfCountersBuilder::PRIO_INTERESTING);
+    pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
+        "srm", PerfCountersBuilder::PRIO_INTERESTING);
+    pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
+        "strm", PerfCountersBuilder::PRIO_USEFUL);
+    pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
+        "ecnt", PerfCountersBuilder::PRIO_USEFUL);
+    pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
+        "estt", PerfCountersBuilder::PRIO_INTERESTING);
+    pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
+        "ewon", PerfCountersBuilder::PRIO_INTERESTING);
+    pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
+        "elst", PerfCountersBuilder::PRIO_INTERESTING);
     logger = pcb.create_perf_counters();
     cct->get_perfcounters_collection()->add(logger);
   }
@@ -768,6 +770,11 @@ int Monitor::preinit()
                                      admin_hook,
                                      "show the ops currently in flight");
   assert(r == 0);
+  r = admin_socket->register_command("sessions",
+                                     "sessions",
+                                     admin_hook,
+                                     "list existing sessions");
+  assert(r == 0);
 
   lock.Lock();
 
@@ -799,13 +806,8 @@ int Monitor::init()
   mgr_messenger->add_dispatcher_tail(this);  // for auth ms_* calls
 
   bootstrap();
-
-  // encode command sets
-  const MonCommand *cmds;
-  int cmdsize;
-  get_locally_supported_monitor_commands(&cmds, &cmdsize);
-  MonCommand::encode_array(cmds, cmdsize, supported_commands_bl);
-
+  // add features of myself into feature_map
+  session_map.feature_map.add_mon(con_self->get_features());
   return 0;
 }
 
@@ -846,6 +848,7 @@ void Monitor::refresh_from_paxos(bool *need_bootstrap)
   for (int i = 0; i < PAXOS_NUM; ++i) {
     paxos_service[i]->post_refresh();
   }
+  load_metadata();
 }
 
 void Monitor::register_cluster_logger()
@@ -897,6 +900,7 @@ void Monitor::shutdown()
     admin_socket->unregister_command("quorum enter");
     admin_socket->unregister_command("quorum exit");
     admin_socket->unregister_command("ops");
+    admin_socket->unregister_command("sessions");
     delete admin_hook;
     admin_hook = NULL;
   }
@@ -1066,6 +1070,7 @@ void Monitor::_reset()
   cancel_probe_timeout();
   timecheck_finish();
   health_events_cleanup();
+  health_check_log_times.clear();
   scrub_event_cancel();
 
   leader_since = utime_t();
@@ -1246,10 +1251,11 @@ void Monitor::sync_reset_timeout()
   dout(10) << __func__ << dendl;
   if (sync_timeout_event)
     timer.cancel_event(sync_timeout_event);
-  sync_timeout_event = new C_MonContext(this, [this](int) {
-      sync_timeout();
-    });
-  timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event);
+  sync_timeout_event = timer.add_event_after(
+    g_conf->mon_sync_timeout,
+    new C_MonContext(this, [this](int) {
+       sync_timeout();
+      }));
 }
 
 void Monitor::sync_finish(version_t last_committed)
@@ -1421,8 +1427,10 @@ void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
   while (sp.last_committed < paxos->get_version() && left > 0) {
     bufferlist bl;
     sp.last_committed++;
-    store->get(paxos->get_name(), sp.last_committed, bl);
-    // TODO: what if store->get returns error or empty bl?
+
+    int err = store->get(paxos->get_name(), sp.last_committed, bl);
+    assert(err == 0);
+
     tx->put(paxos->get_name(), sp.last_committed, bl);
     left -= bl.length();
     dout(20) << __func__ << " including paxos state " << sp.last_committed
@@ -1590,8 +1598,12 @@ void Monitor::reset_probe_timeout()
       probe_timeout(r);
     });
   double t = g_conf->mon_probe_timeout;
-  timer.add_event_after(t, probe_timeout_event);
-  dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
+  if (timer.add_event_after(t, probe_timeout_event)) {
+    dout(10) << "reset_probe_timeout " << probe_timeout_event
+            << " after " << t << " seconds" << dendl;
+  } else {
+    probe_timeout_event = nullptr;
+  }
 }
 
 void Monitor::probe_timeout(int r)
@@ -1849,7 +1861,7 @@ void Monitor::start_election()
   logger->inc(l_mon_num_elections);
   logger->inc(l_mon_election_call);
 
-  clog->info() << "mon." << name << " calling new monitor election";
+  clog->info() << "mon." << name << " calling monitor election";
   elector.call_election();
 }
 
@@ -1867,13 +1879,13 @@ void Monitor::win_standalone_election()
   set<int> q;
   q.insert(rank);
 
-  const MonCommand *my_cmds;
-  int cmdsize;
-  get_locally_supported_monitor_commands(&my_cmds, &cmdsize);
+  map<int,Metadata> metadata;
+  collect_metadata(&metadata[0]);
+
   win_election(elector.get_epoch(), q,
                CEPH_FEATURES_ALL,
                ceph::features::mon::get_supported(),
-               my_cmds, cmdsize);
+              metadata);
 }
 
 const utime_t& Monitor::get_leader_since() const
@@ -1901,7 +1913,7 @@ void Monitor::_finish_svc_election()
 
 void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
                            const mon_feature_t& mon_features,
-                           const MonCommand *cmdset, int cmdsize)
+                          const map<int,Metadata>& metadata)
 {
   dout(10) << __func__ << " epoch " << epoch << " quorum " << active
           << " features " << features
@@ -1914,12 +1926,13 @@ void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
   quorum = active;
   quorum_con_features = features;
   quorum_mon_features = mon_features;
+  pending_metadata = metadata;
   outside_quorum.clear();
 
-  clog->info() << "mon." << name << "@" << rank
-               << " won leader election with quorum " << quorum;
+  clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names()
+      << " in quorum (ranks " << quorum << ")";
 
-  set_leader_supported_commands(cmdset, cmdsize);
+  set_leader_commands(get_local_commands(mon_features));
 
   paxos->leader_init();
   // NOTE: tell monmap monitor first.  This is important for the
@@ -1933,19 +1946,53 @@ void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
 
   logger->inc(l_mon_election_win);
 
+  // inject new metadata in first transaction.
+  {
+    // include previous metadata for missing mons (that aren't part of
+    // the current quorum).
+    map<int,Metadata> m = metadata;
+    for (unsigned rank = 0; rank < monmap->size(); ++rank) {
+      if (m.count(rank) == 0 &&
+         mon_metadata.count(rank)) {
+       m[rank] = mon_metadata[rank];
+      }
+    }
+
+    // FIXME: This is a bit sloppy because we aren't guaranteed to submit
+    // a new transaction immediately after the election finishes.  We should
+    // do that anyway for other reasons, though.
+    MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
+    bufferlist bl;
+    ::encode(m, bl);
+    t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
+  }
+
   finish_election();
   if (monmap->size() > 1 &&
       monmap->get_epoch() > 0) {
     timecheck_start();
     health_tick_start();
-    do_health_to_clog_interval();
+
+    // Freshen the health status before doing health_to_clog in case
+    // our just-completed election changed the health
+    healthmon()->wait_for_active_ctx(new FunctionContext([this](int r){
+      dout(20) << "healthmon now active" << dendl;
+      healthmon()->tick();
+      if (healthmon()->is_proposing()) {
+        dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
+        healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this,
+              [this](int r){
+                assert(lock.is_locked_by_me());
+                do_health_to_clog_interval();
+              }));
+
+      } else {
+        do_health_to_clog_interval();
+      }
+    }));
+
     scrub_event_start();
   }
-
-  Metadata my_meta;
-  collect_sys_info(&my_meta, g_ceph_context);
-  my_meta["addr"] = stringify(messenger->get_myaddr());
-  update_mon_metadata(rank, std::move(my_meta));
 }
 
 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
@@ -1972,14 +2019,22 @@ void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
 
   finish_election();
 
-  if (quorum_con_features & CEPH_FEATURE_MON_METADATA) {
+  if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
+      !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
+    // for pre-luminous mons only
     Metadata sys_info;
-    collect_sys_info(&sys_info, g_ceph_context);
+    collect_metadata(&sys_info);
     messenger->send_message(new MMonMetadata(sys_info),
                            monmap->get_inst(get_leader()));
   }
 }
 
+void Monitor::collect_metadata(Metadata *m)
+{
+  collect_sys_info(m, g_ceph_context);
+  (*m)["addr"] = stringify(messenger->get_myaddr());
+}
+
 void Monitor::finish_election()
 {
   apply_quorum_to_compatset_features();
@@ -2057,6 +2112,13 @@ void Monitor::apply_monmap_to_compatset_features()
     assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
     new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
   }
+  if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
+    assert(ceph::features::mon::get_persistent().contains_all(
+           ceph::features::mon::FEATURE_LUMINOUS));
+    // this feature should only ever be set if the quorum supports it.
+    assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
+    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
+  }
 
   dout(5) << __func__ << dendl;
   _apply_compatset_features(new_features);
@@ -2082,6 +2144,9 @@ void Monitor::calc_quorum_requirements()
   if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
     required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
   }
+  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
+    required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
+  }
 
   // monmap
   if (monmap->get_required_features().contains_all(
@@ -2258,14 +2323,13 @@ void Monitor::health_tick_start()
   dout(15) << __func__ << dendl;
 
   health_tick_stop();
-  health_tick_event = new C_MonContext(this, [this](int r) {
-      if (r < 0)
-        return;
-      do_health_to_clog();
-      health_tick_start();
-    });
-  timer.add_event_after(cct->_conf->mon_health_to_clog_tick_interval,
-                        health_tick_event);
+  health_tick_event = timer.add_event_after(
+    cct->_conf->mon_health_to_clog_tick_interval,
+    new C_MonContext(this, [this](int r) {
+       if (r < 0)
+         return;
+       health_tick_start();
+      }));
 }
 
 void Monitor::health_tick_stop()
@@ -2312,7 +2376,9 @@ void Monitor::health_interval_start()
         return;
       do_health_to_clog_interval();
     });
-  timer.add_event_at(next, health_interval_event);
+  if (!timer.add_event_at(next, health_interval_event)) {
+    health_interval_event = nullptr;
+  }
 }
 
 void Monitor::health_interval_stop()
@@ -2392,27 +2458,224 @@ void Monitor::do_health_to_clog(bool force)
 
   dout(10) << __func__ << (force ? " (force)" : "") << dendl;
 
-  list<string> status;
-  health_status_t overall = get_health(status, NULL, NULL);
+  if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+    string summary;
+    health_status_t level = get_health_status(false, nullptr, &summary);
+    if (!force &&
+       summary == health_status_cache.summary &&
+       level == health_status_cache.overall)
+      return;
+    clog->health(level) << "overall " << summary;
+    health_status_cache.summary = summary;
+    health_status_cache.overall = level;
+  } else {
+    // for jewel only
+    list<string> status;
+    health_status_t overall = get_health(status, NULL, NULL);
+    dout(25) << __func__
+            << (force ? " (force)" : "")
+            << dendl;
 
-  dout(25) << __func__
-           << (force ? " (force)" : "")
-           << dendl;
+    string summary = joinify(status.begin(), status.end(), string("; "));
+
+    if (!force &&
+       overall == health_status_cache.overall &&
+       !health_status_cache.summary.empty() &&
+       health_status_cache.summary == summary) {
+      // we got a dup!
+      return;
+    }
+
+    clog->info() << summary;
+
+    health_status_cache.overall = overall;
+    health_status_cache.summary = summary;
+  }
+}
+
+health_status_t Monitor::get_health_status(
+  bool want_detail,
+  Formatter *f,
+  std::string *plain,
+  const char *sep1,
+  const char *sep2)
+{
+  health_status_t r = HEALTH_OK;
+  bool compat = g_conf->mon_health_preluminous_compat;
+  bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
+  if (f) {
+    f->open_object_section("health");
+    f->open_object_section("checks");
+  }
+
+  string summary;
+  string *psummary = f ? nullptr : &summary;
+  for (auto& svc : paxos_service) {
+    r = std::min(r, svc->get_health_checks().dump_summary(
+                  f, psummary, sep2, want_detail));
+  }
+
+  if (f) {
+    f->close_section();
+    f->dump_stream("status") << r;
+  } else {
+    // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
+    *plain = stringify(r);
+    if (summary.size()) {
+      *plain += sep1;
+      *plain += summary;
+    }
+    *plain += "\n";
+  }
+
+  const std::string old_fields_message = "'ceph health' JSON format has "
+    "changed in luminous. If you see this your monitoring system is "
+    "scraping the wrong fields. Disable this with 'mon health preluminous "
+    "compat warning = false'";
+
+  if (f && (compat || compat_warn)) {
+    health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
+    f->open_array_section("summary");
+    if (compat_warn) {
+      f->open_object_section("item");
+      f->dump_stream("severity") << HEALTH_WARN;
+      f->dump_string("summary", old_fields_message);
+      f->close_section();
+    }
+    if (compat) {
+      for (auto& svc : paxos_service) {
+        svc->get_health_checks().dump_summary_compat(f);
+      }
+    }
+    f->close_section();
+    f->dump_stream("overall_status") << cr;
+  }
+
+  if (want_detail) {
+    if (f && (compat || compat_warn)) {
+      f->open_array_section("detail");
+      if (compat_warn) {
+       f->dump_string("item", old_fields_message);
+      }
+    }
 
-  string summary = joinify(status.begin(), status.end(), string("; "));
+    for (auto& svc : paxos_service) {
+      svc->get_health_checks().dump_detail(f, plain, compat);
+    }
 
-  if (!force &&
-      overall == health_status_cache.overall &&
-      !health_status_cache.summary.empty() &&
-      health_status_cache.summary == summary) {
-    // we got a dup!
+    if (f && (compat || compat_warn)) {
+      f->close_section();
+    }
+  }
+  if (f) {
+    f->close_section();
+  }
+  return r;
+}
+
+void Monitor::log_health(
+  const health_check_map_t& updated,
+  const health_check_map_t& previous,
+  MonitorDBStore::TransactionRef t)
+{
+  if (!g_conf->mon_health_to_clog) {
     return;
   }
 
-  clog->info() << summary;
+  const utime_t now = ceph_clock_now();
+
+  // FIXME: log atomically as part of @t instead of using clog.
+  dout(10) << __func__ << " updated " << updated.checks.size()
+          << " previous " << previous.checks.size()
+          << dendl;
+  const auto min_log_period = g_conf->get_val<int64_t>(
+      "mon_health_log_update_period");
+  for (auto& p : updated.checks) {
+    auto q = previous.checks.find(p.first);
+    bool logged = false;
+    if (q == previous.checks.end()) {
+      // new
+      ostringstream ss;
+      ss << "Health check failed: " << p.second.summary << " ("
+         << p.first << ")";
+      clog->health(p.second.severity) << ss.str();
+
+      logged = true;
+    } else {
+      if (p.second.summary != q->second.summary ||
+         p.second.severity != q->second.severity) {
+
+        auto status_iter = health_check_log_times.find(p.first);
+        if (status_iter != health_check_log_times.end()) {
+          if (p.second.severity == q->second.severity &&
+              now - status_iter->second.updated_at < min_log_period) {
+            // We already logged this recently and the severity is unchanged,
+            // so skip emitting an update of the summary string.
+            // We'll get an update out of tick() later if the check
+            // is still failing.
+            continue;
+          }
+        }
+
+        // summary or severity changed (ignore detail changes at this level)
+        ostringstream ss;
+        ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
+        clog->health(p.second.severity) << ss.str();
+
+        logged = true;
+      }
+    }
+    // Record the time at which we last logged, so that we can check this
+    // when considering whether/when to print update messages.
+    if (logged) {
+      auto iter = health_check_log_times.find(p.first);
+      if (iter == health_check_log_times.end()) {
+        health_check_log_times.emplace(p.first, HealthCheckLogStatus(
+          p.second.severity, p.second.summary, now));
+      } else {
+        iter->second = HealthCheckLogStatus(
+          p.second.severity, p.second.summary, now);
+      }
+    }
+  }
+  for (auto& p : previous.checks) {
+    if (!updated.checks.count(p.first)) {
+      // cleared
+      ostringstream ss;
+      if (p.first == "DEGRADED_OBJECTS") {
+        clog->info() << "All degraded objects recovered";
+      } else if (p.first == "OSD_FLAGS") {
+        clog->info() << "OSD flags cleared";
+      } else {
+        clog->info() << "Health check cleared: " << p.first << " (was: "
+                     << p.second.summary << ")";
+      }
+
+      if (health_check_log_times.count(p.first)) {
+        health_check_log_times.erase(p.first);
+      }
+    }
+  }
+
+  if (previous.checks.size() && updated.checks.size() == 0) {
+    // We might be going into a fully healthy state, check
+    // other subsystems
+    bool any_checks = false;
+    for (auto& svc : paxos_service) {
+      if (&(svc->get_health_checks()) == &(previous)) {
+        // Ignore the ones we're clearing right now
+        continue;
+      }
 
-  health_status_cache.overall = overall;
-  health_status_cache.summary = summary;
+      if (svc->get_health_checks().checks.size() > 0) {
+        any_checks = true;
+        break;
+      }
+    }
+    if (!any_checks) {
+      clog->info() << "Cluster is now healthy";
+    }
+  }
 }
 
 health_status_t Monitor::get_health(list<string>& status,
@@ -2432,52 +2695,29 @@ health_status_t Monitor::get_health(list<string>& status,
     s->get_health(summary, detailbl ? &detail : NULL, cct);
   }
 
-  health_monitor->get_health(f, summary, (detailbl ? &detail : NULL));
-
-  if (f) {
-    f->open_object_section("timechecks");
-    f->dump_unsigned("epoch", get_epoch());
-    f->dump_int("round", timecheck_round);
-    f->dump_stream("round_status")
-      << ((timecheck_round%2) ? "on-going" : "finished");
-  }
+  health_monitor->get_health(summary, (detailbl ? &detail : NULL));
 
   health_status_t overall = HEALTH_OK;
   if (!timecheck_skews.empty()) {
     list<string> warns;
-    if (f)
-      f->open_array_section("mons");
     for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
          i != timecheck_skews.end(); ++i) {
       entity_inst_t inst = i->first;
       double skew = i->second;
       double latency = timecheck_latencies[inst];
       string name = monmap->get_name(inst.addr);
-
       ostringstream tcss;
       health_status_t tcstatus = timecheck_status(tcss, skew, latency);
       if (tcstatus != HEALTH_OK) {
         if (overall > tcstatus)
           overall = tcstatus;
         warns.push_back(name);
-
         ostringstream tmp_ss;
         tmp_ss << "mon." << name
                << " addr " << inst.addr << " " << tcss.str()
               << " (latency " << latency << "s)";
         detail.push_back(make_pair(tcstatus, tmp_ss.str()));
       }
-
-      if (f) {
-        f->open_object_section("mon");
-        f->dump_string("name", name.c_str());
-        f->dump_float("skew", skew);
-        f->dump_float("latency", latency);
-        f->dump_stream("health") << tcstatus;
-        if (tcstatus != HEALTH_OK)
-          f->dump_stream("details") << tcss.str();
-        f->close_section();
-      }
     }
     if (!warns.empty()) {
       ostringstream ss;
@@ -2491,11 +2731,7 @@ health_status_t Monitor::get_health(list<string>& status,
       status.push_back(ss.str());
       summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
     }
-    if (f)
-      f->close_section();
   }
-  if (f)
-    f->close_section();
 
   if (f)
     f->open_array_section("summary");
@@ -2547,12 +2783,14 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
   if (f)
     f->open_object_section("status");
 
-  // reply with the status for all the components
-  list<string> health;
-  get_health(health, NULL, f);
-
   if (f) {
     f->dump_stream("fsid") << monmap->get_fsid();
+    if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+      get_health_status(false, f, nullptr);
+    } else {
+      list<string> health_str;
+      get_health(health_str, nullptr, f);
+    }
     f->dump_unsigned("election_epoch", get_epoch());
     {
       f->open_array_section("quorum");
@@ -2568,7 +2806,7 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
     monmap->dump(f);
     f->close_section();
     f->open_object_section("osdmap");
-    osdmon()->osdmap.print_summary(f, cout);
+    osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
     f->close_section();
     f->open_object_section("pgmap");
     pgservice->print_summary(f, NULL);
@@ -2576,43 +2814,67 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
     f->open_object_section("fsmap");
     mdsmon()->get_fsmap().print_summary(f, NULL);
     f->close_section();
-
     f->open_object_section("mgrmap");
     mgrmon()->get_map().print_summary(f, nullptr);
     f->close_section();
+
+    f->dump_object("servicemap", mgrstatmon()->get_service_map());
     f->close_section();
   } else {
-
     ss << "  cluster:\n";
     ss << "    id:     " << monmap->get_fsid() << "\n";
-    ss << "    health: " << joinify(health.begin(), health.end(), 
-                                 string("\n            ")) << "\n";
+
+    string health;
+    if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+      get_health_status(false, nullptr, &health,
+                       "\n            ", "\n            ");
+    } else {
+      list<string> ls;
+      get_health(ls, NULL, f);
+      health = joinify(ls.begin(), ls.end(),
+                      string("\n            "));
+    }
+    ss << "    health: " << health << "\n";
+
     ss << "\n \n  services:\n";
-    const auto quorum_names = get_quorum_names();
-    const auto mon_count = monmap->mon_info.size();
-    ss << "    mon: " << mon_count << " daemons, quorum "
-       << quorum_names;
-    if (quorum_names.size() != mon_count) {
-      std::list<std::string> out_of_q;
-      for (size_t i = 0; i < monmap->ranks.size(); ++i) {
-        if (quorum.count(i) == 0) {
-          out_of_q.push_back(monmap->ranks[i]);
-        }
+    {
+      size_t maxlen = 3;
+      auto& service_map = mgrstatmon()->get_service_map();
+      for (auto& p : service_map.services) {
+       maxlen = std::max(maxlen, p.first.size());
+      }
+      string spacing(maxlen - 3, ' ');
+      const auto quorum_names = get_quorum_names();
+      const auto mon_count = monmap->mon_info.size();
+      ss << "    mon: " << spacing << mon_count << " daemons, quorum "
+        << quorum_names;
+      if (quorum_names.size() != mon_count) {
+       std::list<std::string> out_of_q;
+       for (size_t i = 0; i < monmap->ranks.size(); ++i) {
+         if (quorum.count(i) == 0) {
+           out_of_q.push_back(monmap->ranks[i]);
+         }
+       }
+       ss << ", out of quorum: " << joinify(out_of_q.begin(),
+                                            out_of_q.end(), std::string(", "));
       }
-      ss << ", out of quorum: " << joinify(out_of_q.begin(),
-                                           out_of_q.end(), std::string(", "));
-    }
-    ss << "\n";
-    if (mgrmon()->in_use()) {
-      ss << "    mgr: ";
-      mgrmon()->get_map().print_summary(nullptr, &ss);
       ss << "\n";
+      if (mgrmon()->in_use()) {
+       ss << "    mgr: " << spacing;
+       mgrmon()->get_map().print_summary(nullptr, &ss);
+       ss << "\n";
+      }
+      if (mdsmon()->get_fsmap().filesystem_count() > 0) {
+       ss << "    mds: " << spacing << mdsmon()->get_fsmap() << "\n";
+      }
+      ss << "    osd: " << spacing;
+      osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
+      ss << "\n";
+      for (auto& p : service_map.services) {
+       ss << "    " << p.first << ": " << string(maxlen - p.first.size(), ' ')
+          << p.second.get_summary() << "\n";
+      }
     }
-    if (mdsmon()->get_fsmap().filesystem_count() > 0) {
-      ss << "    mds: " << mdsmon()->get_fsmap() << "\n";
-    }
-    ss << "    osd: ";
-    osdmon()->osdmap.print_summary(NULL, ss);
 
     ss << "\n \n  data:\n";
     pgservice->print_summary(NULL, &ss);
@@ -2642,18 +2904,16 @@ void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
   }
 }
 
-const MonCommand *Monitor::_get_moncommand(const string &cmd_prefix,
-                                           MonCommand *cmds, int cmds_size)
+const MonCommand *Monitor::_get_moncommand(
+  const string &cmd_prefix,
+  const vector<MonCommand>& cmds)
 {
-  MonCommand *this_cmd = NULL;
-  for (MonCommand *cp = cmds;
-       cp < &cmds[cmds_size]; cp++) {
-    if (cp->cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
-      this_cmd = cp;
-      break;
+  for (auto& c : cmds) {
+    if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
+      return &c;
     }
   }
-  return this_cmd;
+  return nullptr;
 }
 
 bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
@@ -2676,26 +2936,23 @@ bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
   return capable;
 }
 
-void Monitor::format_command_descriptions(const MonCommand *commands,
-                                         unsigned commands_size,
+void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
                                          Formatter *f,
                                          bufferlist *rdata,
                                          bool hide_mgr_flag)
 {
   int cmdnum = 0;
   f->open_object_section("command_descriptions");
-  for (const MonCommand *cp = commands;
-       cp < &commands[commands_size]; cp++) {
-
-    unsigned flags = cp->flags;
+  for (const auto &cmd : commands) {
+    unsigned flags = cmd.flags;
     if (hide_mgr_flag) {
       flags &= ~MonCommand::FLAG_MGR;
     }
     ostringstream secname;
     secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
     dump_cmddesc_to_json(f, secname.str(),
-                        cp->cmdstring, cp->helpstring, cp->module,
-                        cp->req_perms, cp->availability, flags);
+                        cmd.cmdstring, cmd.helpstring, cmd.module,
+                        cmd.req_perms, cmd.availability, flags);
     cmdnum++;
   }
   f->close_section();  // command_descriptions
@@ -2703,20 +2960,6 @@ void Monitor::format_command_descriptions(const MonCommand *commands,
   f->flush(*rdata);
 }
 
-void Monitor::get_locally_supported_monitor_commands(const MonCommand **cmds,
-                                                    int *count)
-{
-  *cmds = mon_commands;
-  *count = ARRAY_SIZE(mon_commands);
-}
-void Monitor::set_leader_supported_commands(const MonCommand *cmds, int size)
-{
-  if (leader_supported_mon_commands != mon_commands)
-    delete[] leader_supported_mon_commands;
-  leader_supported_mon_commands = cmds;
-  leader_supported_mon_commands_size = size;
-}
-
 bool Monitor::is_keyring_required()
 {
   string auth_cluster_required = g_conf->auth_supported.empty() ?
@@ -2806,9 +3049,21 @@ void Monitor::handle_command(MonOpRequestRef op)
     // hide mgr commands until luminous upgrade is complete
     bool hide_mgr_flag =
       osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
-    format_command_descriptions(leader_supported_mon_commands,
-                               leader_supported_mon_commands_size, f, &rdata,
-                               hide_mgr_flag);
+
+    std::vector<MonCommand> commands;
+
+    // only include mgr commands once all mons are upgrade (and we've dropped
+    // the hard-coded PGMonitor commands)
+    if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
+      commands = static_cast<MgrMonitor*>(
+        paxos_service[PAXOS_MGR])->get_command_descs();
+    }
+
+    for (auto& c : leader_mon_commands) {
+      commands.push_back(c);
+    }
+
+    format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
     delete f;
     reply_command(op, 0, "", rdata, 0);
     return;
@@ -2838,17 +3093,26 @@ void Monitor::handle_command(MonOpRequestRef op)
   // validate command is in leader map
 
   const MonCommand *leader_cmd;
-  leader_cmd = _get_moncommand(prefix,
-                               // the boost underlying this isn't const for some reason
-                               const_cast<MonCommand*>(leader_supported_mon_commands),
-                               leader_supported_mon_commands_size);
+  const auto& mgr_cmds = mgrmon()->get_command_descs();
+  const MonCommand *mgr_cmd = nullptr;
+  if (!mgr_cmds.empty()) {
+    mgr_cmd = _get_moncommand(prefix, mgr_cmds);
+  }
+  leader_cmd = _get_moncommand(prefix, leader_mon_commands);
   if (!leader_cmd) {
-    reply_command(op, -EINVAL, "command not known", 0);
-    return;
+    leader_cmd = mgr_cmd;
+    if (!leader_cmd) {
+      reply_command(op, -EINVAL, "command not known", 0);
+      return;
+    }
   }
   // validate command is in our map & matches, or forward if it is allowed
-  const MonCommand *mon_cmd = _get_moncommand(prefix, mon_commands,
-                                              ARRAY_SIZE(mon_commands));
+  const MonCommand *mon_cmd = _get_moncommand(
+    prefix,
+    get_local_commands(quorum_mon_features));
+  if (!mon_cmd) {
+    mon_cmd = mgr_cmd;
+  }
   if (!is_leader()) {
     if (!mon_cmd) {
       if (leader_cmd->is_noforward()) {
@@ -2923,8 +3187,8 @@ void Monitor::handle_command(MonOpRequestRef op)
       osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
     const auto& hdr = m->get_header();
     uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
-    uint64_t max =
-      g_conf->mon_client_bytes * g_conf->mon_mgr_proxy_client_bytes_ratio;
+    uint64_t max = g_conf->get_val<uint64_t>("mon_client_bytes")
+                 * g_conf->get_val<double>("mon_mgr_proxy_client_bytes_ratio");
     if (mgr_proxy_bytes + size > max) {
       dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
               << " + " << size << " > max " << max << dendl;
@@ -2943,7 +3207,8 @@ void Monitor::handle_command(MonOpRequestRef op)
     return;
   }
 
-  if (module == "mds" || module == "fs") {
+  if ((module == "mds" || module == "fs")  &&
+      prefix != "fs authorize") {
     mdsmon()->dispatch(op);
     return;
   }
@@ -2972,7 +3237,7 @@ void Monitor::handle_command(MonOpRequestRef op)
     monmon()->dispatch(op);
     return;
   }
-  if (module == "auth") {
+  if (module == "auth" || prefix == "fs authorize") {
     authmon()->dispatch(op);
     return;
   }
@@ -3026,7 +3291,7 @@ void Monitor::handle_command(MonOpRequestRef op)
     end -= start;
     dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
     ostringstream oss;
-    oss << "compacted leveldb in " << end;
+    oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
     rs = oss.str();
     r = 0;
   }
@@ -3044,6 +3309,51 @@ void Monitor::handle_command(MonOpRequestRef op)
       rs = "must supply options to be parsed in a single string";
       r = -EINVAL;
     }
+  } else if (prefix == "time-sync-status") {
+    if (!f)
+      f.reset(Formatter::create("json-pretty"));
+    f->open_object_section("time_sync");
+    if (!timecheck_skews.empty()) {
+      f->open_object_section("time_skew_status");
+      for (auto& i : timecheck_skews) {
+       entity_inst_t inst = i.first;
+       double skew = i.second;
+       double latency = timecheck_latencies[inst];
+       string name = monmap->get_name(inst.addr);
+       ostringstream tcss;
+       health_status_t tcstatus = timecheck_status(tcss, skew, latency);
+       f->open_object_section(name.c_str());
+       f->dump_float("skew", skew);
+       f->dump_float("latency", latency);
+       f->dump_stream("health") << tcstatus;
+       if (tcstatus != HEALTH_OK) {
+         f->dump_stream("details") << tcss.str();
+       }
+       f->close_section();
+      }
+      f->close_section();
+    }
+    f->open_object_section("timechecks");
+    f->dump_unsigned("epoch", get_epoch());
+    f->dump_int("round", timecheck_round);
+    f->dump_stream("round_status") << ((timecheck_round%2) ?
+                                      "on-going" : "finished");
+    f->close_section();
+    f->close_section();
+    f->flush(rdata);
+    r = 0;
+    rs = "";
+  } else if (prefix == "config set") {
+    std::string key;
+    cmd_getval(cct, cmdmap, "key", key);
+    std::string val;
+    cmd_getval(cct, cmdmap, "value", val);
+    r = g_conf->set_val(key, val, true, &ss);
+    if (r == 0) {
+      g_conf->apply_changes(nullptr);
+    }
+    rs = ss.str();
+    goto out;
   } else if (prefix == "status" ||
             prefix == "health" ||
             prefix == "df") {
@@ -3060,25 +3370,35 @@ void Monitor::handle_command(MonOpRequestRef op)
       }
       rdata.append(ds);
     } else if (prefix == "health") {
-      list<string> health_str;
-      get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
-      if (f) {
-        f->flush(ds);
-        ds << '\n';
+      if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+       string plain;
+       get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
+       if (f) {
+         f->flush(rdata);
+       } else {
+         rdata.append(plain);
+       }
       } else {
-       assert(!health_str.empty());
-       ds << health_str.front();
-       health_str.pop_front();
-       if (!health_str.empty()) {
-         ds << ' ';
-         ds << joinify(health_str.begin(), health_str.end(), string("; "));
+       list<string> health_str;
+       get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
+       if (f) {
+         f->flush(ds);
+         ds << '\n';
+       } else {
+         assert(!health_str.empty());
+         ds << health_str.front();
+         health_str.pop_front();
+         if (!health_str.empty()) {
+           ds << ' ';
+           ds << joinify(health_str.begin(), health_str.end(), string("; "));
+         }
        }
+       bufferlist comb;
+       comb.append(ds);
+       if (detail == "detail")
+         comb.append(rdata);
+       rdata = comb;
       }
-      bufferlist comb;
-      comb.append(ds);
-      if (detail == "detail")
-       comb.append(rdata);
-      rdata = comb;
     } else if (prefix == "df") {
       bool verbose = (detail == "detail");
       if (f)
@@ -3119,8 +3439,12 @@ void Monitor::handle_command(MonOpRequestRef op)
       tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
     f->dump_string("tag", tagstr);
 
-    list<string> hs;
-    get_health(hs, NULL, f.get());
+    if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+      get_health_status(true, f.get(), nullptr);
+    } else {
+      list<string> health_str;
+      get_health(health_str, nullptr, f.get());
+    }
 
     monmon()->dump_info(f.get());
     osdmon()->dump_info(f.get());
@@ -3329,6 +3653,65 @@ void Monitor::handle_command(MonOpRequestRef op)
     rdata.append(ds);
     rs = "";
     r = 0;
+  } else if (prefix == "versions") {
+    if (!f)
+      f.reset(Formatter::create("json-pretty"));
+    map<string,int> overall;
+    f->open_object_section("version");
+    map<string,int> mon, mgr, osd, mds;
+
+    count_metadata("ceph_version", &mon);
+    f->open_object_section("mon");
+    for (auto& p : mon) {
+      f->dump_int(p.first.c_str(), p.second);
+      overall[p.first] += p.second;
+    }
+    f->close_section();
+
+    mgrmon()->count_metadata("ceph_version", &mgr);
+    f->open_object_section("mgr");
+    for (auto& p : mgr) {
+      f->dump_int(p.first.c_str(), p.second);
+      overall[p.first] += p.second;
+    }
+    f->close_section();
+
+    osdmon()->count_metadata("ceph_version", &osd);
+    f->open_object_section("osd");
+    for (auto& p : osd) {
+      f->dump_int(p.first.c_str(), p.second);
+      overall[p.first] += p.second;
+    }
+    f->close_section();
+
+    mdsmon()->count_metadata("ceph_version", &mds);
+    f->open_object_section("mds");
+    for (auto& p : mds) {
+      f->dump_int(p.first.c_str(), p.second);
+      overall[p.first] += p.second;
+    }
+    f->close_section();
+
+    for (auto& p : mgrstatmon()->get_service_map().services) {
+      f->open_object_section(p.first.c_str());
+      map<string,int> m;
+      p.second.count_metadata("ceph_version", &m);
+      for (auto& q : m) {
+       f->dump_int(q.first.c_str(), q.second);
+       overall[q.first] += q.second;
+      }
+      f->close_section();
+    }
+
+    f->open_object_section("overall");
+    for (auto& p : overall) {
+      f->dump_int(p.first.c_str(), p.second);
+    }
+    f->close_section();
+    f->close_section();
+    f->flush(rdata);
+    rs = "";
+    r = 0;
   }
 
  out:
@@ -3648,7 +4031,8 @@ void Monitor::resend_routed_requests()
 
 void Monitor::remove_session(MonSession *s)
 {
-  dout(10) << "remove_session " << s << " " << s->inst << dendl;
+  dout(10) << "remove_session " << s << " " << s->inst
+          << " features 0x" << std::hex << s->con_features << std::dec << dendl;
   assert(s->con);
   assert(!s->closed);
   for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
@@ -3741,6 +4125,30 @@ void Monitor::_ms_dispatch(Message *m)
   if (s && s->closed) {
     return;
   }
+
+  if (src_is_mon && s) {
+    ConnectionRef con = m->get_connection();
+    if (con->get_messenger() && con->get_features() != s->con_features) {
+      // only update features if this is a non-anonymous connection
+      dout(10) << __func__ << " feature change for " << m->get_source_inst()
+               << " (was " << s->con_features
+               << ", now " << con->get_features() << ")" << dendl;
+      // connection features changed - recreate session.
+      if (s->con && s->con != con) {
+        dout(10) << __func__ << " connection for " << m->get_source_inst()
+                 << " changed from session; mark down and replace" << dendl;
+        s->con->mark_down();
+      }
+      if (s->item.is_on_list()) {
+        // forwarded messages' sessions are not in the sessions map and
+        // exist only while the op is being handled.
+        remove_session(s);
+      }
+      s->put();
+      s = nullptr;
+    }
+  }
+
   if (!s) {
     // if the sender is not a monitor, make sure their first message for a
     // session is an MAuth.  If it is not, assume it's a stray message,
@@ -3762,7 +4170,9 @@ void Monitor::_ms_dispatch(Message *m)
     }
     assert(s);
     con->set_priv(s->get());
-    dout(10) << __func__ << " new session " << s << " " << *s << dendl;
+    dout(10) << __func__ << " new session " << s << " " << *s
+            << " features 0x" << std::hex
+            << s->con_features << std::dec << dendl;
     op->set_session(s);
 
     logger->set(l_mon_num_sessions, session_map.get_size());
@@ -3882,8 +4292,16 @@ void Monitor::dispatch_op(MonOpRequestRef op)
       break;
 
     // MgrStat
-    case MSG_MON_MGR_REPORT:
     case CEPH_MSG_STATFS:
+      // this is an ugly hack, sorry!  force the version to 1 so that we do
+      // not run afoul of the is_readable() paxos check.  the client is going
+      // by the pgmonitor version and the MgrStatMonitor version will lag behind
+      // that until we complete the upgrade.  The paxos ordering crap really
+      // doesn't matter for statfs results, so just kludge around it here.
+      if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+       ((MStatfs*)op->get_req())->version = 1;
+      }
+    case MSG_MON_MGR_REPORT:
     case MSG_GETPOOLSTATS:
       paxos_service[PAXOS_MGRSTAT]->dispatch(op);
       break;
@@ -4040,6 +4458,11 @@ void Monitor::dispatch_op(MonOpRequestRef op)
       health_monitor->dispatch(op);
       break;
 
+    case MSG_MON_HEALTH_CHECKS:
+      op->set_type_service();
+      paxos_service[PAXOS_HEALTH]->dispatch(op);
+      break;
+
     default:
       dealt_with = false;
       break;
@@ -4064,8 +4487,13 @@ void Monitor::handle_ping(MonOpRequestRef op)
   boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
   f->open_object_section("pong");
 
-  list<string> health_str;
-  get_health(health_str, NULL, f.get());
+  if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+    get_health_status(false, f.get(), nullptr);
+  } else {
+    list<string> health_str;
+    get_health(health_str, nullptr, f.get());
+  }
+
   {
     stringstream ss;
     get_mon_status(f.get(), ss);
@@ -4196,10 +4624,11 @@ void Monitor::timecheck_reset_event()
            << " rounds_since_clean " << timecheck_rounds_since_clean
            << dendl;
 
-  timecheck_event = new C_MonContext(this, [this](int) {
-      timecheck_start_round();
-    });
-  timer.add_event_after(delay, timecheck_event);
+  timecheck_event = timer.add_event_after(
+    delay,
+    new C_MonContext(this, [this](int) {
+       timecheck_start_round();
+      }));
 }
 
 void Monitor::timecheck_check_skews()
@@ -4430,10 +4859,9 @@ void Monitor::handle_timecheck_leader(MonOpRequestRef op)
 
   ostringstream ss;
   health_status_t status = timecheck_status(ss, skew_bound, latency);
-  if (status == HEALTH_ERR)
-    clog->error() << other << " " << ss.str();
-  else if (status == HEALTH_WARN)
-    clog->warn() << other << " " << ss.str();
+  if (status != HEALTH_OK) {
+    clog->health(status) << other << " " << ss.str();
+  }
 
   dout(10) << __func__ << " from " << other << " ts " << m->timestamp
           << " delta " << delta << " skew_bound " << skew_bound
@@ -4583,6 +5011,8 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
       logmon()->check_sub(s->sub_map[p->first]);
     } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
       mgrmon()->check_sub(s->sub_map[p->first]);
+    } else if (p->first == "servicemap") {
+      mgrstatmon()->check_sub(s->sub_map[p->first]);
     }
   }
 
@@ -4705,67 +5135,59 @@ void Monitor::handle_mon_metadata(MonOpRequestRef op)
 
 void Monitor::update_mon_metadata(int from, Metadata&& m)
 {
-  pending_metadata.insert(make_pair(from, std::move(m)));
-
-  bufferlist bl;
-  int err = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
-  map<int, Metadata> last_metadata;
-  if (!err) {
-    bufferlist::iterator iter = bl.begin();
-    ::decode(last_metadata, iter);
-    pending_metadata.insert(last_metadata.begin(), last_metadata.end());
-  }
+  // NOTE: this is now for legacy (kraken or jewel) mons only.
+  pending_metadata[from] = std::move(m);
 
   MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
-  bl.clear();
+  bufferlist bl;
   ::encode(pending_metadata, bl);
   t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
   paxos->trigger_propose();
 }
 
-int Monitor::load_metadata(map<int, Metadata>& metadata)
+int Monitor::load_metadata()
 {
   bufferlist bl;
   int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
   if (r)
     return r;
   bufferlist::iterator it = bl.begin();
-  ::decode(metadata, it);
+  ::decode(mon_metadata, it);
+
+  pending_metadata = mon_metadata;
   return 0;
 }
 
 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
 {
   assert(f);
-  map<int, Metadata> last_metadata;
-  if (int r = load_metadata(last_metadata)) {
-    err << "Unable to load metadata: " << cpp_strerror(r);
-    return r;
-  }
-  if (!last_metadata.count(mon)) {
+  if (!mon_metadata.count(mon)) {
     err << "mon." << mon << " not found";
     return -EINVAL;
   }
-  const Metadata& m = last_metadata[mon];
+  const Metadata& m = mon_metadata[mon];
   for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
     f->dump_string(p->first.c_str(), p->second);
   }
   return 0;
 }
 
-void Monitor::count_metadata(const string& field, Formatter *f)
+void Monitor::count_metadata(const string& field, map<string,int> *out)
 {
-  map<int, Metadata> meta;
-  load_metadata(meta);
-  map<string,int> by_val;
-  for (auto& p : meta) {
+  for (auto& p : mon_metadata) {
     auto q = p.second.find(field);
     if (q == p.second.end()) {
-      by_val["unknown"]++;
+      (*out)["unknown"]++;
     } else {
-      by_val[q->second]++;
+      (*out)[q->second]++;
     }
   }
+}
+
+void Monitor::count_metadata(const string& field, Formatter *f)
+{
+  map<string,int> by_val;
+  count_metadata(field, &by_val);
   f->open_object_section(field.c_str());
   for (auto& p : by_val) {
     f->dump_int(p.first.c_str(), p.second);
@@ -4775,15 +5197,9 @@ void Monitor::count_metadata(const string& field, Formatter *f)
 
 int Monitor::print_nodes(Formatter *f, ostream& err)
 {
-  map<int, Metadata> metadata;
-  if (int r = load_metadata(metadata)) {
-    err << "Unable to load metadata.\n";
-    return r;
-  }
-
   map<string, list<int> > mons;        // hostname => mon
-  for (map<int, Metadata>::iterator it = metadata.begin();
-       it != metadata.end(); ++it) {
+  for (map<int, Metadata>::iterator it = mon_metadata.begin();
+       it != mon_metadata.end(); ++it) {
     const Metadata& m = it->second;
     Metadata::const_iterator hostname = m.find("hostname");
     if (hostname == m.end()) {
@@ -4950,14 +5366,16 @@ bool Monitor::_scrub(ScrubResult *r,
     }
 
     bufferlist bl;
-    //TODO: what when store->get returns error or empty bl?
-    store->get(k.first, k.second, bl);
+    int err = store->get(k.first, k.second, bl);
+    assert(err == 0);
+    
     uint32_t key_crc = bl.crc32c(0);
     dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
                                      << " crc " << key_crc << dendl;
     r->prefix_keys[k.first]++;
-    if (r->prefix_crc.count(k.first) == 0)
+    if (r->prefix_crc.count(k.first) == 0) {
       r->prefix_crc[k.first] = 0;
+    }
     r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
 
     if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
@@ -5000,7 +5418,7 @@ void Monitor::scrub_check_results()
     }
   }
   if (!errors)
-    clog->info() << "scrub ok on " << quorum << ": " << mine;
+    clog->debug() << "scrub ok on " << quorum << ": " << mine;
 }
 
 inline void Monitor::scrub_timeout()
@@ -5058,10 +5476,11 @@ void Monitor::scrub_event_start()
     return;
   }
 
-  scrub_event = new C_MonContext(this, [this](int) {
+  scrub_event = timer.add_event_after(
+    cct->_conf->mon_scrub_interval,
+    new C_MonContext(this, [this](int) {
       scrub_start();
-    });
-  timer.add_event_after(cct->_conf->mon_scrub_interval, scrub_event);
+      }));
 }
 
 void Monitor::scrub_event_cancel()
@@ -5085,11 +5504,11 @@ void Monitor::scrub_reset_timeout()
 {
   dout(15) << __func__ << " reset timeout event" << dendl;
   scrub_cancel_timeout();
-
-  scrub_timeout_event = new C_MonContext(this, [this](int) {
+  scrub_timeout_event = timer.add_event_after(
+    g_conf->mon_scrub_timeout,
+    new C_MonContext(this, [this](int) {
       scrub_timeout();
-    });
-  timer.add_event_after(g_conf->mon_scrub_timeout, scrub_timeout_event);
+    }));
 }
 
 /************ TICK ***************/
@@ -5104,14 +5523,49 @@ void Monitor::tick()
 {
   // ok go.
   dout(11) << "tick" << dendl;
+  const utime_t now = ceph_clock_now();
   
+  // Check if we need to emit any delayed health check updated messages
+  if (is_leader()) {
+    const auto min_period = g_conf->get_val<int64_t>(
+                              "mon_health_log_update_period");
+    for (auto& svc : paxos_service) {
+      auto health = svc->get_health_checks();
+
+      for (const auto &i : health.checks) {
+        const std::string &code = i.first;
+        const std::string &summary = i.second.summary;
+        const health_status_t severity = i.second.severity;
+
+        auto status_iter = health_check_log_times.find(code);
+        if (status_iter == health_check_log_times.end()) {
+          continue;
+        }
+
+        auto &log_status = status_iter->second;
+        bool const changed = log_status.last_message != summary
+                             || log_status.severity != severity;
+
+        if (changed && now - log_status.updated_at > min_period) {
+          log_status.last_message = summary;
+          log_status.updated_at = now;
+          log_status.severity = severity;
+
+          ostringstream ss;
+          ss << "Health check update: " << summary << " (" << code << ")";
+          clog->health(severity) << ss.str();
+        }
+      }
+    }
+  }
+
+
   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
     (*p)->tick();
     (*p)->maybe_trim();
   }
   
   // trim sessions
-  utime_t now = ceph_clock_now();
   {
     Mutex::Locker l(session_map_lock);
     auto p = session_map.sessions.begin();
@@ -5444,7 +5898,8 @@ bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
 bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
                                   int protocol, bufferlist& authorizer_data,
                                   bufferlist& authorizer_reply,
-                                  bool& isvalid, CryptoKey& session_key)
+                                  bool& isvalid, CryptoKey& session_key,
+                                  std::unique_ptr<AuthAuthorizerChallenge> *challenge)
 {
   dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
           << " " << ceph_entity_type_name(peer_type)
@@ -5463,7 +5918,7 @@ bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
       
       if (authorizer_data.length()) {
        bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
-                                         auth_ticket_info, authorizer_reply);
+                                          auth_ticket_info, challenge, authorizer_reply);
        if (ret) {
          session_key = auth_ticket_info.session_key;
          isvalid = true;