]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/OSDMonitor.cc
update sources to v12.1.0
[ceph.git] / ceph / src / mon / OSDMonitor.cc
index 9b82b4a9f993975b2270c4b964f857a504aa1129..ddfeb2a2933824e94601fc17181314a5edce8a88 100644 (file)
 #include <algorithm>
 #include <sstream>
 
-#include "OSDMonitor.h"
-#include "Monitor.h"
-#include "MDSMonitor.h"
-#include "PGMonitor.h"
+#include "mon/OSDMonitor.h"
+#include "mon/Monitor.h"
+#include "mon/MDSMonitor.h"
+#include "mon/PGMonitor.h"
+#include "mon/MgrStatMonitor.h"
+#include "mon/AuthMonitor.h"
+#include "mon/ConfigKeyService.h"
 
-#include "MonitorDBStore.h"
-#include "Session.h"
+#include "mon/MonitorDBStore.h"
+#include "mon/Session.h"
 
 #include "crush/CrushWrapper.h"
 #include "crush/CrushTester.h"
@@ -220,17 +223,23 @@ void OSDMonitor::create_initial()
   newmap.set_flag(CEPH_OSDMAP_SORTBITWISE);
 
   // new cluster should require latest by default
-  newmap.set_flag(CEPH_OSDMAP_REQUIRE_JEWEL);
-  newmap.set_flag(CEPH_OSDMAP_REQUIRE_KRAKEN);
-  if (!g_conf->mon_debug_no_require_luminous) {
-    newmap.set_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+  if (g_conf->mon_debug_no_require_luminous) {
+    newmap.require_osd_release = CEPH_RELEASE_KRAKEN;
+    derr << __func__ << " mon_debug_no_require_luminous=true" << dendl;
+  } else {
+    newmap.require_osd_release = CEPH_RELEASE_LUMINOUS;
     newmap.full_ratio = g_conf->mon_osd_full_ratio;
     if (newmap.full_ratio > 1.0) newmap.full_ratio /= 100;
     newmap.backfillfull_ratio = g_conf->mon_osd_backfillfull_ratio;
     if (newmap.backfillfull_ratio > 1.0) newmap.backfillfull_ratio /= 100;
     newmap.nearfull_ratio = g_conf->mon_osd_nearfull_ratio;
     if (newmap.nearfull_ratio > 1.0) newmap.nearfull_ratio /= 100;
-    newmap.require_min_compat_client = g_conf->mon_osd_initial_require_min_compat_client;
+    int r = ceph_release_from_name(
+      g_conf->mon_osd_initial_require_min_compat_client.c_str());
+    if (r <= 0) {
+      assert(0 == "mon_osd_initial_require_min_compat_client is not valid");
+    }
+    newmap.require_min_compat_client = r;
   }
 
   // encode into pending incremental
@@ -256,6 +265,15 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap)
   dout(15) << "update_from_paxos paxos e " << version
           << ", my e " << osdmap.epoch << dendl;
 
+  if (mapping_job) {
+    if (!mapping_job->is_done()) {
+      dout(1) << __func__ << " mapping job "
+             << mapping_job.get() << " did not complete, "
+             << mapping_job->shards << " left, canceling" << dendl;
+      mapping_job->abort();
+    }
+    mapping_job.reset();
+  }
 
   /*
    * We will possibly have a stashed latest that *we* wrote, and we will
@@ -325,6 +343,15 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap)
     }
   }
 
+  // make sure we're using the right pg service.. remove me post-luminous!
+  if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+    dout(10) << __func__ << " pgservice is mgrstat" << dendl;
+    mon->pgservice = mon->mgrstatmon()->get_pg_stat_service();
+  } else {
+    dout(10) << __func__ << " pgservice is pg" << dendl;
+    mon->pgservice = mon->pgmon()->get_pg_stat_service();
+  }
+
   // walk through incrementals
   MonitorDBStore::TransactionRef t;
   size_t tx_size = 0;
@@ -386,6 +413,15 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap)
       t->erase("mkfs", "osdmap");
     }
 
+    // make sure we're using the right pg service.. remove me post-luminous!
+    if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+      dout(10) << __func__ << " pgservice is mgrstat" << dendl;
+      mon->pgservice = mon->mgrstatmon()->get_pg_stat_service();
+    } else {
+      dout(10) << __func__ << " pgservice is pg" << dendl;
+      mon->pgservice = mon->pgmon()->get_pg_stat_service();
+    }
+
     if (tx_size > g_conf->mon_sync_max_payload_size*2) {
       mon->store->apply_transaction(t);
       t = MonitorDBStore::TransactionRef();
@@ -393,7 +429,6 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap)
     }
     if (mon->monmap->get_required_features().contains_all(
           ceph::features::mon::FEATURE_LUMINOUS)) {
-      creating_pgs = update_pending_pgs(inc);
       for (const auto &osd_state : inc.new_state) {
        if (osd_state.second & CEPH_OSD_UP) {
          // could be marked up *or* down, but we're too lazy to check which
@@ -509,6 +544,23 @@ void OSDMonitor::on_active()
 void OSDMonitor::on_restart()
 {
   last_osd_report.clear();
+
+  if (mon->is_leader()) {
+    // fix ruleset != ruleid
+    if (osdmap.crush->has_legacy_rulesets() &&
+       !osdmap.crush->has_multirule_rulesets()) {
+      CrushWrapper newcrush;
+      _get_pending_crush(newcrush);
+      int r = newcrush.renumber_rules_by_ruleset();
+      if (r >= 0) {
+       dout(1) << __func__ << " crush map has ruleset != rule id; fixing" << dendl;
+       pending_inc.crush.clear();
+       newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+      } else {
+       dout(10) << __func__ << " unable to renumber rules by ruleset" << dendl;
+      }
+    }
+  }
 }
 
 void OSDMonitor::on_shutdown()
@@ -536,326 +588,6 @@ void OSDMonitor::update_logger()
   mon->cluster_logger->set(l_cluster_osd_epoch, osdmap.get_epoch());
 }
 
-template <typename F>
-class OSDUtilizationDumper : public CrushTreeDumper::Dumper<F> {
-public:
-  typedef CrushTreeDumper::Dumper<F> Parent;
-
-  OSDUtilizationDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
-                      const PGMap *pgm_, bool tree_) :
-    Parent(crush),
-    osdmap(osdmap_),
-    pgm(pgm_),
-    tree(tree_),
-    average_util(average_utilization()),
-    min_var(-1),
-    max_var(-1),
-    stddev(0),
-    sum(0) {
-  }
-
-protected:
-  void dump_stray(F *f) {
-    for (int i = 0; i < osdmap->get_max_osd(); i++) {
-      if (osdmap->exists(i) && !this->is_touched(i))
-       dump_item(CrushTreeDumper::Item(i, 0, 0), f);
-    }
-  }
-
-  void dump_item(const CrushTreeDumper::Item &qi, F *f) override {
-    if (!tree && qi.is_bucket())
-      return;
-
-    float reweight = qi.is_bucket() ? -1 : osdmap->get_weightf(qi.id);
-    int64_t kb = 0, kb_used = 0, kb_avail = 0;
-    double util = 0;
-    if (get_bucket_utilization(qi.id, &kb, &kb_used, &kb_avail))
-      if (kb_used && kb)
-        util = 100.0 * (double)kb_used / (double)kb;
-
-    double var = 1.0;
-    if (average_util)
-      var = util / average_util;
-
-    size_t num_pgs = qi.is_bucket() ? 0 : pgm->get_num_pg_by_osd(qi.id);
-
-    dump_item(qi, reweight, kb, kb_used, kb_avail, util, var, num_pgs, f);
-
-    if (!qi.is_bucket() && reweight > 0) {
-      if (min_var < 0 || var < min_var)
-       min_var = var;
-      if (max_var < 0 || var > max_var)
-       max_var = var;
-
-      double dev = util - average_util;
-      dev *= dev;
-      stddev += reweight * dev;
-      sum += reweight;
-    }
-  }
-
-  virtual void dump_item(const CrushTreeDumper::Item &qi,
-                        float &reweight,
-                        int64_t kb,
-                        int64_t kb_used,
-                        int64_t kb_avail,
-                        double& util,
-                        double& var,
-                        const size_t num_pgs,
-                        F *f) = 0;
-
-  double dev() {
-    return sum > 0 ? sqrt(stddev / sum) : 0;
-  }
-
-  double average_utilization() {
-    int64_t kb = 0, kb_used = 0;
-    for (int i = 0; i < osdmap->get_max_osd(); i++) {
-      if (!osdmap->exists(i) || osdmap->get_weight(i) == 0)
-       continue;
-      int64_t kb_i, kb_used_i, kb_avail_i;
-      if (get_osd_utilization(i, &kb_i, &kb_used_i, &kb_avail_i)) {
-       kb += kb_i;
-       kb_used += kb_used_i;
-      }
-    }
-    return kb > 0 ? 100.0 * (double)kb_used / (double)kb : 0;
-  }
-
-  bool get_osd_utilization(int id, int64_t* kb, int64_t* kb_used,
-                          int64_t* kb_avail) const {
-    typedef ceph::unordered_map<int32_t,osd_stat_t> OsdStat;
-    OsdStat::const_iterator p = pgm->osd_stat.find(id);
-    if (p == pgm->osd_stat.end())
-      return false;
-    *kb = p->second.kb;
-    *kb_used = p->second.kb_used;
-    *kb_avail = p->second.kb_avail;
-    return *kb > 0;
-  }
-
-  bool get_bucket_utilization(int id, int64_t* kb, int64_t* kb_used,
-                             int64_t* kb_avail) const {
-    if (id >= 0) {
-      if (osdmap->is_out(id)) {
-        *kb = 0;
-        *kb_used = 0;
-        *kb_avail = 0;
-        return true;
-      }
-      return get_osd_utilization(id, kb, kb_used, kb_avail);
-    }
-
-    *kb = 0;
-    *kb_used = 0;
-    *kb_avail = 0;
-
-    for (int k = osdmap->crush->get_bucket_size(id) - 1; k >= 0; k--) {
-      int item = osdmap->crush->get_bucket_item(id, k);
-      int64_t kb_i = 0, kb_used_i = 0, kb_avail_i = 0;
-      if (!get_bucket_utilization(item, &kb_i, &kb_used_i, &kb_avail_i))
-       return false;
-      *kb += kb_i;
-      *kb_used += kb_used_i;
-      *kb_avail += kb_avail_i;
-    }
-    return *kb > 0;
-  }
-
-protected:
-  const OSDMap *osdmap;
-  const PGMap *pgm;
-  bool tree;
-  double average_util;
-  double min_var;
-  double max_var;
-  double stddev;
-  double sum;
-};
-
-class OSDUtilizationPlainDumper : public OSDUtilizationDumper<TextTable> {
-public:
-  typedef OSDUtilizationDumper<TextTable> Parent;
-
-  OSDUtilizationPlainDumper(const CrushWrapper *crush, const OSDMap *osdmap,
-                    const PGMap *pgm, bool tree) :
-    Parent(crush, osdmap, pgm, tree) {}
-
-  void dump(TextTable *tbl) {
-    tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
-    tbl->define_column("WEIGHT", TextTable::LEFT, TextTable::RIGHT);
-    tbl->define_column("REWEIGHT", TextTable::LEFT, TextTable::RIGHT);
-    tbl->define_column("SIZE", TextTable::LEFT, TextTable::RIGHT);
-    tbl->define_column("USE", TextTable::LEFT, TextTable::RIGHT);
-    tbl->define_column("AVAIL", TextTable::LEFT, TextTable::RIGHT);
-    tbl->define_column("%USE", TextTable::LEFT, TextTable::RIGHT);
-    tbl->define_column("VAR", TextTable::LEFT, TextTable::RIGHT);
-    tbl->define_column("PGS", TextTable::LEFT, TextTable::RIGHT);
-    if (tree)
-      tbl->define_column("TYPE NAME", TextTable::LEFT, TextTable::LEFT);
-
-    Parent::dump(tbl);
-
-    dump_stray(tbl);
-
-    *tbl << "" << "" << "TOTAL"
-        << si_t(pgm->osd_sum.kb << 10)
-        << si_t(pgm->osd_sum.kb_used << 10)
-        << si_t(pgm->osd_sum.kb_avail << 10)
-        << lowprecision_t(average_util)
-        << ""
-        << TextTable::endrow;
-  }
-
-protected:
-  struct lowprecision_t {
-    float v;
-    explicit lowprecision_t(float _v) : v(_v) {}
-  };
-  friend std::ostream &operator<<(ostream& out, const lowprecision_t& v);
-
-  using OSDUtilizationDumper<TextTable>::dump_item;
-  void dump_item(const CrushTreeDumper::Item &qi,
-                        float &reweight,
-                        int64_t kb,
-                        int64_t kb_used,
-                        int64_t kb_avail,
-                        double& util,
-                        double& var,
-                        const size_t num_pgs,
-                        TextTable *tbl) override {
-    *tbl << qi.id
-        << weightf_t(qi.weight)
-        << weightf_t(reweight)
-        << si_t(kb << 10)
-        << si_t(kb_used << 10)
-        << si_t(kb_avail << 10)
-        << lowprecision_t(util)
-        << lowprecision_t(var);
-
-    if (qi.is_bucket()) {
-      *tbl << "-";
-    } else {
-      *tbl << num_pgs;
-    }
-
-    if (tree) {
-      ostringstream name;
-      for (int k = 0; k < qi.depth; k++)
-       name << "    ";
-      if (qi.is_bucket()) {
-       int type = crush->get_bucket_type(qi.id);
-       name << crush->get_type_name(type) << " "
-            << crush->get_item_name(qi.id);
-      } else {
-       name << "osd." << qi.id;
-      }
-      *tbl << name.str();
-    }
-
-    *tbl << TextTable::endrow;
-  }
-
-public:
-  string summary() {
-    ostringstream out;
-    out << "MIN/MAX VAR: " << lowprecision_t(min_var)
-       << "/" << lowprecision_t(max_var) << "  "
-       << "STDDEV: " << lowprecision_t(dev());
-    return out.str();
-  }
-};
-
-ostream& operator<<(ostream& out,
-                   const OSDUtilizationPlainDumper::lowprecision_t& v)
-{
-  if (v.v < -0.01) {
-    return out << "-";
-  } else if (v.v < 0.001) {
-    return out << "0";
-  } else {
-    std::streamsize p = out.precision();
-    return out << std::fixed << std::setprecision(2) << v.v << std::setprecision(p);
-  }
-}
-
-class OSDUtilizationFormatDumper : public OSDUtilizationDumper<Formatter> {
-public:
-  typedef OSDUtilizationDumper<Formatter> Parent;
-
-  OSDUtilizationFormatDumper(const CrushWrapper *crush, const OSDMap *osdmap,
-                            const PGMap *pgm, bool tree) :
-    Parent(crush, osdmap, pgm, tree) {}
-
-  void dump(Formatter *f) {
-    f->open_array_section("nodes");
-    Parent::dump(f);
-    f->close_section();
-
-    f->open_array_section("stray");
-    dump_stray(f);
-    f->close_section();
-  }
-
-protected:
-  using OSDUtilizationDumper<Formatter>::dump_item;
-  void dump_item(const CrushTreeDumper::Item &qi,
-                        float &reweight,
-                        int64_t kb,
-                        int64_t kb_used,
-                        int64_t kb_avail,
-                        double& util,
-                        double& var,
-                        const size_t num_pgs,
-                        Formatter *f) override {
-    f->open_object_section("item");
-    CrushTreeDumper::dump_item_fields(crush, qi, f);
-    f->dump_float("reweight", reweight);
-    f->dump_int("kb", kb);
-    f->dump_int("kb_used", kb_used);
-    f->dump_int("kb_avail", kb_avail);
-    f->dump_float("utilization", util);
-    f->dump_float("var", var);
-    f->dump_unsigned("pgs", num_pgs);
-    CrushTreeDumper::dump_bucket_children(crush, qi, f);
-    f->close_section();
-  }
-
-public:
-  void summary(Formatter *f) {
-    f->open_object_section("summary");
-    f->dump_int("total_kb", pgm->osd_sum.kb);
-    f->dump_int("total_kb_used", pgm->osd_sum.kb_used);
-    f->dump_int("total_kb_avail", pgm->osd_sum.kb_avail);
-    f->dump_float("average_utilization", average_util);
-    f->dump_float("min_var", min_var);
-    f->dump_float("max_var", max_var);
-    f->dump_float("dev", dev());
-    f->close_section();
-  }
-};
-
-void OSDMonitor::print_utilization(ostream &out, Formatter *f, bool tree) const
-{
-  const PGMap *pgm = &mon->pgmon()->pg_map;
-  const CrushWrapper *crush = osdmap.crush.get();
-
-  if (f) {
-    f->open_object_section("df");
-    OSDUtilizationFormatDumper d(crush, &osdmap, pgm, tree);
-    d.dump(f);
-    d.summary(f);
-    f->close_section();
-    f->flush(out);
-  } else {
-    OSDUtilizationPlainDumper d(crush, &osdmap, pgm, tree);
-    TextTable tbl;
-    d.dump(&tbl);
-    out << tbl
-       << d.summary() << "\n";
-  }
-}
-
 void OSDMonitor::create_pending()
 {
   pending_inc = OSDMap::Incremental(osdmap.epoch+1);
@@ -876,18 +608,20 @@ void OSDMonitor::create_pending()
     dout(1) << __func__ << " setting backfillfull_ratio = "
            << pending_inc.new_backfillfull_ratio << dendl;
   }
-  if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+  if (osdmap.get_epoch() > 0 &&
+      osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
     // transition full ratios from PGMap to OSDMap (on upgrade)
-    PGMap *pg_map = &mon->pgmon()->pg_map;
-    if (osdmap.full_ratio != pg_map->full_ratio) {
+    float full_ratio = mon->pgservice->get_full_ratio();
+    float nearfull_ratio = mon->pgservice->get_nearfull_ratio();
+    if (osdmap.full_ratio != full_ratio) {
       dout(10) << __func__ << " full_ratio " << osdmap.full_ratio
-              << " -> " << pg_map->full_ratio << " (from pgmap)" << dendl;
-      pending_inc.new_full_ratio = pg_map->full_ratio;
+              << " -> " << full_ratio << " (from pgmap)" << dendl;
+      pending_inc.new_full_ratio = full_ratio;
     }
-    if (osdmap.nearfull_ratio != pg_map->nearfull_ratio) {
+    if (osdmap.nearfull_ratio != nearfull_ratio) {
       dout(10) << __func__ << " nearfull_ratio " << osdmap.nearfull_ratio
-              << " -> " << pg_map->nearfull_ratio << " (from pgmap)" << dendl;
-      pending_inc.new_nearfull_ratio = pg_map->nearfull_ratio;
+              << " -> " << nearfull_ratio << " (from pgmap)" << dendl;
+      pending_inc.new_nearfull_ratio = nearfull_ratio;
     }
   } else {
     // safety check (this shouldn't really happen)
@@ -911,51 +645,93 @@ void OSDMonitor::create_pending()
 creating_pgs_t
 OSDMonitor::update_pending_pgs(const OSDMap::Incremental& inc)
 {
+  dout(10) << __func__ << dendl;
   creating_pgs_t pending_creatings;
   {
     std::lock_guard<std::mutex> l(creating_pgs_lock);
     pending_creatings = creating_pgs;
   }
-  if (pending_creatings.last_scan_epoch > inc.epoch) {
-    return pending_creatings;
-  }
-  for (auto& pg : pending_created_pgs) {
-    pending_creatings.created_pools.insert(pg.pool());
-    pending_creatings.pgs.erase(pg);
-  }
-  pending_created_pgs.clear();
-  // PAXOS_PGMAP is less than PAXOS_OSDMAP, so PGMonitor::update_from_paxos()
-  // should have prepared the latest pgmap if any
-  const auto& pgm = mon->pgmon()->pg_map;
-  if (pgm.last_pg_scan >= creating_pgs.last_scan_epoch) {
-    // TODO: please stop updating pgmap with pgstats once the upgrade is completed
-    for (auto& pgid : pgm.creating_pgs) {
-      auto st = pgm.pg_stat.find(pgid);
-      assert(st != pgm.pg_stat.end());
-      auto created = make_pair(st->second.created, st->second.last_scrub_stamp);
-      // no need to add the pg, if it already exists in creating_pgs
-      pending_creatings.pgs.emplace(pgid, created);
-    }
-  }
-  for (auto old_pool : inc.old_pools) {
-    pending_creatings.created_pools.erase(old_pool);
-    const auto removed_pool = (uint64_t)old_pool;
-    auto first =
-      pending_creatings.pgs.lower_bound(pg_t{0, removed_pool});
-    auto last =
-      pending_creatings.pgs.lower_bound(pg_t{0, removed_pool + 1});
-    pending_creatings.pgs.erase(first, last);
-    last_epoch_clean.remove_pool(removed_pool);
-  }
-  scan_for_creating_pgs(osdmap.get_pools(),
-                       inc.old_pools,
-                       inc.modified,
-                       &pending_creatings);
-  scan_for_creating_pgs(inc.new_pools,
-                       inc.old_pools,
-                       inc.modified,
-                       &pending_creatings);
-  pending_creatings.last_scan_epoch = osdmap.get_epoch();
+  // check for new or old pools
+  if (pending_creatings.last_scan_epoch < inc.epoch) {
+    if (osdmap.get_epoch() &&
+       osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+      auto added =
+       mon->pgservice->maybe_add_creating_pgs(creating_pgs.last_scan_epoch,
+                                              osdmap.get_pools(),
+                                              &pending_creatings);
+      dout(7) << __func__ << " " << added << " pgs added from pgmap" << dendl;
+    }
+    unsigned queued = 0;
+    queued += scan_for_creating_pgs(osdmap.get_pools(),
+                                   inc.old_pools,
+                                   inc.modified,
+                                   &pending_creatings);
+    queued += scan_for_creating_pgs(inc.new_pools,
+                                   inc.old_pools,
+                                   inc.modified,
+                                   &pending_creatings);
+    dout(10) << __func__ << " " << queued << " pools queued" << dendl;
+    for (auto deleted_pool : inc.old_pools) {
+      auto removed = pending_creatings.remove_pool(deleted_pool);
+      dout(10) << __func__ << " " << removed
+               << " pg removed because containing pool deleted: "
+               << deleted_pool << dendl;
+      last_epoch_clean.remove_pool(deleted_pool);
+    }
+    // pgmon updates its creating_pgs in check_osd_map() which is called by
+    // on_active() and check_osd_map() could be delayed if lease expires, so its
+    // creating_pgs could be stale in comparison with the one of osdmon. let's
+    // trim them here. otherwise, they will be added back after being erased.
+    unsigned removed = 0;
+    for (auto& pg : pending_created_pgs) {
+      dout(20) << __func__ << " noting created pg " << pg << dendl;
+      pending_creatings.created_pools.insert(pg.pool());
+      removed += pending_creatings.pgs.erase(pg);
+    }
+    pending_created_pgs.clear();
+    dout(10) << __func__ << " " << removed
+            << " pgs removed because they're created" << dendl;
+    pending_creatings.last_scan_epoch = osdmap.get_epoch();
+  }
+
+  // process queue
+  unsigned max = MAX(1, g_conf->mon_osd_max_creating_pgs);
+  const auto total = pending_creatings.pgs.size();
+  while (pending_creatings.pgs.size() < max &&
+        !pending_creatings.queue.empty()) {
+    auto p = pending_creatings.queue.begin();
+    int64_t poolid = p->first;
+    dout(10) << __func__ << " pool " << poolid
+            << " created " << p->second.created
+            << " modified " << p->second.modified
+            << " [" << p->second.start << "-" << p->second.end << ")"
+            << dendl;
+    int n = MIN(max - pending_creatings.pgs.size(),
+               p->second.end - p->second.start);
+    ps_t first = p->second.start;
+    ps_t end = first + n;
+    for (ps_t ps = first; ps < end; ++ps) {
+      const pg_t pgid{ps, static_cast<uint64_t>(poolid)};
+      // NOTE: use the *current* epoch as the PG creation epoch so that the
+      // OSD does not have to generate a long set of PastIntervals.
+      pending_creatings.pgs.emplace(pgid, make_pair(inc.epoch,
+                                                   p->second.modified));
+      dout(10) << __func__ << " adding " << pgid << dendl;
+    }
+    p->second.start = end;
+    if (p->second.done()) {
+      dout(10) << __func__ << " done with queue for " << poolid << dendl;
+      pending_creatings.queue.erase(p);
+    } else {
+      dout(10) << __func__ << " pool " << poolid
+              << " now [" << p->second.start << "-" << p->second.end << ")"
+              << dendl;
+    }
+  }
+  dout(10) << __func__ << " queue remaining: " << pending_creatings.queue.size()
+          << " pools" << dendl;
+  dout(10) << __func__ << " " << pending_creatings.pgs.size() - total
+          << " pgs added from queued pools" << dendl;
   return pending_creatings;
 }
 
@@ -974,7 +750,7 @@ void OSDMonitor::maybe_prime_pg_temp()
 
   // check for interesting OSDs
   set<int> osds;
-  for (map<int32_t,uint8_t>::iterator p = pending_inc.new_state.begin();
+  for (auto p = pending_inc.new_state.begin();
        !all && p != pending_inc.new_state.end();
        ++p) {
     if ((p->second & CEPH_OSD_UP) &&
@@ -1066,12 +842,12 @@ void OSDMonitor::prime_pg_temp(
 {
   if (mon->monmap->get_required_features().contains_all(
         ceph::features::mon::FEATURE_LUMINOUS)) {
+    // TODO: remove this creating_pgs direct access?
     if (creating_pgs.pgs.count(pgid)) {
       return;
     }
   } else {
-    const auto& pg_map = mon->pgmon()->pg_map;
-    if (pg_map.creating_pgs.count(pgid)) {
+    if (mon->pgservice->is_creating_pg(pgid)) {
       return;
     }
   }
@@ -1151,7 +927,7 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     tmp.deepish_copy_from(osdmap);
     tmp.apply_incremental(pending_inc);
 
-    if (tmp.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+    if (tmp.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
       // set or clear full/nearfull?
       int full, backfill, nearfull;
       tmp.count_full_nearfull_osds(&full, &backfill, &nearfull);
@@ -1180,19 +956,19 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
       }
 
       // min_compat_client?
-      if (tmp.require_min_compat_client.empty()) {
+      if (tmp.require_min_compat_client == 0) {
        auto mv = tmp.get_min_compat_client();
-       dout(1) << __func__ << " setting require_min_compat_client to current " << mv
-               << dendl;
-       mon->clog->info() << "setting require_min_compat_client to currently required "
-                         << mv;
-       pending_inc.new_require_min_compat_client = mv.first;
+       dout(1) << __func__ << " setting require_min_compat_client to currently "
+               << "required " << ceph_release_name(mv) << dendl;
+       mon->clog->info() << "setting require_min_compat_client to currently "
+                         << "required " << ceph_release_name(mv);
+       pending_inc.new_require_min_compat_client = mv;
       }
     }
   }
 
   // tell me about it
-  for (map<int32_t,uint8_t>::iterator i = pending_inc.new_state.begin();
+  for (auto i = pending_inc.new_state.begin();
        i != pending_inc.new_state.end();
        ++i) {
     int s = i->second ? i->second : CEPH_OSD_UP;
@@ -1229,21 +1005,21 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     tmp.apply_incremental(pending_inc);
 
     // determine appropriate features
-    if (!tmp.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+    if (tmp.require_osd_release < CEPH_RELEASE_LUMINOUS) {
       dout(10) << __func__ << " encoding without feature SERVER_LUMINOUS"
               << dendl;
       features &= ~CEPH_FEATURE_SERVER_LUMINOUS;
     }
-    if (!tmp.test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
-      dout(10) << __func__ << " encoding without feature SERVER_JEWEL" << dendl;
-      features &= ~CEPH_FEATURE_SERVER_JEWEL;
-    }
-    if (!tmp.test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+    if (tmp.require_osd_release < CEPH_RELEASE_KRAKEN) {
       dout(10) << __func__ << " encoding without feature SERVER_KRAKEN | "
               << "MSG_ADDR2" << dendl;
       features &= ~(CEPH_FEATURE_SERVER_KRAKEN |
                    CEPH_FEATURE_MSG_ADDR2);
     }
+    if (tmp.require_osd_release < CEPH_RELEASE_JEWEL) {
+      dout(10) << __func__ << " encoding without feature SERVER_JEWEL" << dendl;
+      features &= ~CEPH_FEATURE_SERVER_JEWEL;
+    }
     dout(10) << __func__ << " encoding full map with " << features << dendl;
 
     bufferlist fullbl;
@@ -1283,10 +1059,11 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   if (mon->monmap->get_required_features().contains_all(
        ceph::features::mon::FEATURE_LUMINOUS)) {
     auto pending_creatings = update_pending_pgs(pending_inc);
-    if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+    if (osdmap.get_epoch() &&
+       osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
       dout(7) << __func__ << " in the middle of upgrading, "
              << " trimming pending creating_pgs using pgmap" << dendl;
-      trim_creating_pgs(&pending_creatings, mon->pgmon()->pg_map);
+      mon->pgservice->maybe_trim_creating_pgs(&pending_creatings);
     }
     bufferlist creatings_bl;
     ::encode(pending_creatings, creatings_bl);
@@ -1295,17 +1072,16 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
 }
 
 void OSDMonitor::trim_creating_pgs(creating_pgs_t* creating_pgs,
-                                  const PGMap& pgm)
+                                  const ceph::unordered_map<pg_t,pg_stat_t>& pg_stat)
 {
   auto p = creating_pgs->pgs.begin();
   while (p != creating_pgs->pgs.end()) {
-    auto q = pgm.pg_stat.find(p->first);
-    if (q != pgm.pg_stat.end() &&
+    auto q = pg_stat.find(p->first);
+    if (q != pg_stat.end() &&
        !(q->second.state & PG_STATE_CREATING)) {
       dout(20) << __func__ << " pgmap shows " << p->first << " is created"
               << dendl;
       p = creating_pgs->pgs.erase(p);
-      creating_pgs->created_pools.insert(q->first.pool());
     } else {
       ++p;
     }
@@ -1330,6 +1106,28 @@ int OSDMonitor::load_metadata(int osd, map<string, string>& m, ostream *err)
   return 0;
 }
 
+void OSDMonitor::count_metadata(const string& field, Formatter *f)
+{
+  map<string,int> by_val;
+  for (int osd = 0; osd < osdmap.get_max_osd(); ++osd) {
+    if (osdmap.is_up(osd)) {
+      map<string,string> meta;
+      load_metadata(osd, meta, nullptr);
+      auto p = meta.find(field);
+      if (p == meta.end()) {
+       by_val["unknown"]++;
+      } else {
+       by_val[p->second]++;
+      }
+    }
+  }
+  f->open_object_section(field.c_str());
+  for (auto& p : by_val) {
+    f->dump_int(p.first.c_str(), p.second);
+  }
+  f->close_section();
+}
+
 int OSDMonitor::get_osd_objectstore_type(int osd, string *type)
 {
   map<string, string> metadata;
@@ -1425,11 +1223,16 @@ void OSDMonitor::share_map_with_random_osd()
 
 version_t OSDMonitor::get_trim_to()
 {
-  epoch_t floor;
+  if (mon->get_quorum().empty()) {
+    dout(10) << __func__ << ": quorum not formed" << dendl;
+    return 0;
+  }
 
+  epoch_t floor;
   if (mon->monmap->get_required_features().contains_all(
         ceph::features::mon::FEATURE_LUMINOUS)) {
     {
+      // TODO: Get this hidden in PGStatService
       std::lock_guard<std::mutex> l(creating_pgs_lock);
       if (!creating_pgs.pgs.empty()) {
        return 0;
@@ -1437,12 +1240,12 @@ version_t OSDMonitor::get_trim_to()
     }
     floor = get_min_last_epoch_clean();
   } else {
-    if (!mon->pgmon()->is_readable())
+    if (!mon->pgservice->is_readable())
       return 0;
-    if (mon->pgmon()->pg_map.creating_pgs.empty()) {
+    if (mon->pgservice->have_creating_pgs()) {
       return 0;
     }
-    floor = mon->pgmon()->pg_map.get_min_last_epoch_clean();
+    floor = mon->pgservice->get_min_last_epoch_clean();
   }
   {
     dout(10) << " min_last_epoch_clean " << floor << dendl;
@@ -1807,18 +1610,26 @@ bool OSDMonitor::prepare_mark_me_down(MonOpRequestRef op)
 bool OSDMonitor::can_mark_down(int i)
 {
   if (osdmap.test_flag(CEPH_OSDMAP_NODOWN)) {
-    dout(5) << "can_mark_down NODOWN flag set, will not mark osd." << i << " down" << dendl;
+    dout(5) << __func__ << " NODOWN flag set, will not mark osd." << i
+            << " down" << dendl;
+    return false;
+  }
+
+  if (osdmap.is_nodown(i)) {
+    dout(5) << __func__ << " osd." << i << " is marked as nodown, "
+            << "will not mark it down" << dendl;
     return false;
   }
+
   int num_osds = osdmap.get_num_osds();
   if (num_osds == 0) {
-    dout(5) << "can_mark_down no osds" << dendl;
+    dout(5) << __func__ << " no osds" << dendl;
     return false;
   }
   int up = osdmap.get_num_up_osds() - pending_inc.get_net_marked_down(&osdmap);
   float up_ratio = (float)up / (float)num_osds;
   if (up_ratio < g_conf->mon_osd_min_up_ratio) {
-    dout(2) << "can_mark_down current up_ratio " << up_ratio << " < min "
+    dout(2) << __func__ << " current up_ratio " << up_ratio << " < min "
            << g_conf->mon_osd_min_up_ratio
            << ", will not mark osd." << i << " down" << dendl;
     return false;
@@ -1829,9 +1640,17 @@ bool OSDMonitor::can_mark_down(int i)
 bool OSDMonitor::can_mark_up(int i)
 {
   if (osdmap.test_flag(CEPH_OSDMAP_NOUP)) {
-    dout(5) << "can_mark_up NOUP flag set, will not mark osd." << i << " up" << dendl;
+    dout(5) << __func__ << " NOUP flag set, will not mark osd." << i
+            << " up" << dendl;
+    return false;
+  }
+
+  if (osdmap.is_noup(i)) {
+    dout(5) << __func__ << " osd." << i << " is marked as noup, "
+            << "will not mark it up" << dendl;
     return false;
   }
+
   return true;
 }
 
@@ -1845,6 +1664,13 @@ bool OSDMonitor::can_mark_out(int i)
     dout(5) << __func__ << " NOOUT flag set, will not mark osds out" << dendl;
     return false;
   }
+
+  if (osdmap.is_noout(i)) {
+    dout(5) << __func__ << " osd." << i << " is marked as noout, "
+            << "will not mark it out" << dendl;
+    return false;
+  }
+
   int num_osds = osdmap.get_num_osds();
   if (num_osds == 0) {
     dout(5) << __func__ << " no osds" << dendl;
@@ -1870,9 +1696,17 @@ bool OSDMonitor::can_mark_out(int i)
 bool OSDMonitor::can_mark_in(int i)
 {
   if (osdmap.test_flag(CEPH_OSDMAP_NOIN)) {
-    dout(5) << "can_mark_in NOIN flag set, will not mark osd." << i << " in" << dendl;
+    dout(5) << __func__ << " NOIN flag set, will not mark osd." << i
+            << " in" << dendl;
+    return false;
+  }
+
+  if (osdmap.is_noin(i)) {
+    dout(5) << __func__ << " osd." << i << " is marked as noin, "
+            << "will not mark it in" << dendl;
     return false;
   }
+
   return true;
 }
 
@@ -1964,8 +1798,12 @@ bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
            << " down" << dendl;
     pending_inc.new_state[target_osd] = CEPH_OSD_UP;
 
-    mon->clog->info() << osdmap.get_inst(target_osd) << " failed ("
-                     << (int)reporters_by_subtree.size() << " reporters from different "
+    mon->clog->info() << "osd." << target_osd << " failed ("
+                     << osdmap.crush->get_full_location_ordered_string(
+                       target_osd)
+                     << ") ("
+                     << (int)reporters_by_subtree.size()
+                     << " reporters from different "
                      << reporter_subtree_level << " after "
                      << failed_for << " >= grace " << grace << ")";
     return true;
@@ -1973,7 +1811,7 @@ bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
   return false;
 }
 
-void OSDMonitor::force_failure(utime_t now, int target_osd)
+void OSDMonitor::force_failure(utime_t now, int target_osd, int by)
 {
   // already pending failure?
   if (pending_inc.new_state.count(target_osd) &&
@@ -1985,7 +1823,9 @@ void OSDMonitor::force_failure(utime_t now, int target_osd)
   dout(1) << " we're forcing failure of osd." << target_osd << dendl;
   pending_inc.new_state[target_osd] = CEPH_OSD_UP;
 
-  mon->clog->info() << osdmap.get_inst(target_osd) << " failed (forced)";
+  mon->clog->info() << "osd." << target_osd << " failed ("
+                   << osdmap.crush->get_full_location_ordered_string(target_osd)
+                   << ") (connection refused reported by osd." << by << ")";
   return;
 }
 
@@ -2012,7 +1852,7 @@ bool OSDMonitor::prepare_failure(MonOpRequestRef op)
     if (m->is_immediate()) {
       mon->clog->debug() << m->get_target() << " reported immediately failed by "
             << m->get_orig_source_inst();
-      force_failure(now, target_osd);
+      force_failure(now, target_osd, reporter);
       return true;
     }
     mon->clog->debug() << m->get_target() << " reported failed by "
@@ -2149,7 +1989,7 @@ bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
     goto ignore;
   }
 
-  if (osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS) &&
+  if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS &&
       !HAVE_FEATURE(m->osd_features, SERVER_LUMINOUS)) {
     mon->clog->info() << "disallowing boot of OSD "
                      << m->get_orig_source_inst()
@@ -2159,7 +1999,7 @@ bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
     goto ignore;
   }
 
-  if (osdmap.test_flag(CEPH_OSDMAP_REQUIRE_JEWEL) &&
+  if (osdmap.require_osd_release >= CEPH_RELEASE_JEWEL &&
       !(m->osd_features & CEPH_FEATURE_SERVER_JEWEL)) {
     mon->clog->info() << "disallowing boot of OSD "
                      << m->get_orig_source_inst()
@@ -2169,7 +2009,7 @@ bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
     goto ignore;
   }
 
-  if (osdmap.test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN) &&
+  if (osdmap.require_osd_release >= CEPH_RELEASE_KRAKEN &&
       !HAVE_FEATURE(m->osd_features, SERVER_KRAKEN)) {
     mon->clog->info() << "disallowing boot of OSD "
                      << m->get_orig_source_inst()
@@ -2203,19 +2043,19 @@ bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
 
   // make sure upgrades stop at luminous
   if (HAVE_FEATURE(m->osd_features, SERVER_M) &&
-      !osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+      osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
     mon->clog->info() << "disallowing boot of post-luminous OSD "
                      << m->get_orig_source_inst()
-                     << " because require_luminous_osds is not set";
+                     << " because require_osd_release < luminous";
     goto ignore;
   }
 
   // make sure upgrades stop at jewel
   if (HAVE_FEATURE(m->osd_features, SERVER_KRAKEN) &&
-      !osdmap.test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+      osdmap.require_osd_release < CEPH_RELEASE_JEWEL) {
     mon->clog->info() << "disallowing boot of post-jewel OSD "
                      << m->get_orig_source_inst()
-                     << " because require_jewel_osds is not set";
+                     << " because require_osd_release < jewel";
     goto ignore;
   }
 
@@ -2362,6 +2202,7 @@ bool OSDMonitor::prepare_boot(MonOpRequestRef op)
     bufferlist osd_metadata;
     ::encode(m->metadata, osd_metadata);
     pending_metadata[from] = osd_metadata;
+    pending_metadata_rm.erase(from);
 
     // adjust last clean unmount epoch?
     const osd_info_t& info = osdmap.get_info(from);
@@ -2674,7 +2515,7 @@ bool OSDMonitor::preprocess_pgtemp(MonOpRequestRef op)
 
   for (auto p = m->pg_temp.begin(); p != m->pg_temp.end(); ++p) {
     dout(20) << " " << p->first
-            << (osdmap.pg_temp->count(p->first) ? (*osdmap.pg_temp)[p->first] : empty)
+            << (osdmap.pg_temp->count(p->first) ? osdmap.pg_temp->get(p->first) : empty)
              << " -> " << p->second << dendl;
 
     // does the pool exist?
@@ -2718,7 +2559,7 @@ bool OSDMonitor::preprocess_pgtemp(MonOpRequestRef op)
     //        an existing pg_primary field to imply a change
     if (p->second.size() &&
        (osdmap.pg_temp->count(p->first) == 0 ||
-        !vectors_equal((*osdmap.pg_temp)[p->first], p->second) ||
+        !vectors_equal(osdmap.pg_temp->get(p->first), p->second) ||
         osdmap.primary_temp->count(p->first)))
       return false;
   }
@@ -3007,7 +2848,7 @@ void OSDMonitor::send_incremental(epoch_t first,
          << " to " << session->inst << dendl;
 
   if (first <= session->osd_epoch) {
-    dout(10) << __func__ << session->inst << " should already have epoch "
+    dout(10) << __func__ << " " << session->inst << " should already have epoch "
             << session->osd_epoch << dendl;
     first = session->osd_epoch + 1;
   }
@@ -3159,16 +3000,17 @@ void OSDMonitor::check_pg_creates_sub(Subscription *sub)
   }
 }
 
-void OSDMonitor::scan_for_creating_pgs(
+unsigned OSDMonitor::scan_for_creating_pgs(
   const mempool::osdmap::map<int64_t,pg_pool_t>& pools,
   const mempool::osdmap::set<int64_t>& removed_pools,
   utime_t modified,
   creating_pgs_t* creating_pgs) const
 {
+  unsigned queued = 0;
   for (auto& p : pools) {
     int64_t poolid = p.first;
     const pg_pool_t& pool = p.second;
-    int ruleno = osdmap.crush->find_rule(pool.get_crush_ruleset(),
+    int ruleno = osdmap.crush->find_rule(pool.get_crush_rule(),
                                         pool.get_type(), pool.get_size());
     if (ruleno < 0 || !osdmap.crush->rule_exists(ruleno))
       continue;
@@ -3185,34 +3027,27 @@ void OSDMonitor::scan_for_creating_pgs(
               << " " << pool << dendl;
       continue;
     }
-    dout(10) << __func__ << " scanning pool " << poolid
+    dout(10) << __func__ << " queueing pool create for " << poolid
             << " " << pool << dendl;
-    if (creating_pgs->created_pools.count(poolid)) {
-      // split pgs are skipped by OSD, so drop it early.
-      continue;
-    }
-    // first pgs in this pool
-    for (ps_t ps = 0; ps < pool.get_pg_num(); ps++) {
-      const pg_t pgid{ps, static_cast<uint64_t>(poolid)};
-      if (creating_pgs->pgs.count(pgid)) {
-       dout(20) << __func__ << " already have " << pgid << dendl;
-       continue;
-      }
-      creating_pgs->pgs.emplace(pgid, make_pair(created, modified));
-      dout(10) << __func__ << " adding " << pgid
-              << " at " << osdmap.get_epoch() << dendl;
+    if (creating_pgs->create_pool(poolid, pool.get_pg_num(),
+                                 created, modified)) {
+      queued++;
     }
   }
+  return queued;
 }
 
 void OSDMonitor::update_creating_pgs()
 {
+  dout(10) << __func__ << " " << creating_pgs.pgs.size() << " pgs creating, "
+          << creating_pgs.queue.size() << " pools in queue" << dendl;
   decltype(creating_pgs_by_osd_epoch) new_pgs_by_osd_epoch;
   std::lock_guard<std::mutex> l(creating_pgs_lock);
   for (auto& pg : creating_pgs.pgs) {
     int acting_primary = -1;
     auto pgid = pg.first;
     auto mapped = pg.second.first;
+    dout(20) << __func__ << " looking up " << pgid << dendl;
     mapping.get(pgid, nullptr, nullptr, nullptr, &acting_primary);
     // check the previous creating_pgs, look for the target to whom the pg was
     // previously mapped
@@ -3230,7 +3065,10 @@ void OSDMonitor::update_creating_pgs()
            mapped = mapping.get_epoch();
           }
           break;
-        }
+        } else {
+         // newly creating
+         mapped = mapping.get_epoch();
+       }
       }
     }
     dout(10) << __func__ << " will instruct osd." << acting_primary
@@ -3296,7 +3134,7 @@ void OSDMonitor::tick()
   bool do_propose = false;
   utime_t now = ceph_clock_now();
 
-  if (osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS) &&
+  if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS &&
       mon->monmap->get_required_features().contains_all(
        ceph::features::mon::FEATURE_LUMINOUS)) {
     if (handle_osd_timeouts(now, last_osd_report)) {
@@ -3399,10 +3237,10 @@ void OSDMonitor::tick()
   }
 
   // if map full setting has changed, get that info out there!
-  if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS) &&
-      mon->pgmon()->is_readable()) {
+  if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS &&
+      mon->pgservice->is_readable()) {
     // for pre-luminous compat only!
-    if (!mon->pgmon()->pg_map.full_osds.empty()) {
+    if (mon->pgservice->have_full_osds()) {
       dout(5) << "There are full osds, setting full flag" << dendl;
       add_flag(CEPH_OSDMAP_FULL);
     } else if (osdmap.test_flag(CEPH_OSDMAP_FULL)){
@@ -3410,7 +3248,7 @@ void OSDMonitor::tick()
       remove_flag(CEPH_OSDMAP_FULL);
     }
 
-    if (!mon->pgmon()->pg_map.nearfull_osds.empty()) {
+    if (mon->pgservice->have_nearfull_osds()) {
       dout(5) << "There are near full osds, setting nearfull flag" << dendl;
       add_flag(CEPH_OSDMAP_NEARFULL);
     } else if (osdmap.test_flag(CEPH_OSDMAP_NEARFULL)){
@@ -3458,9 +3296,10 @@ bool OSDMonitor::handle_osd_timeouts(const utime_t &now,
     } else if (can_mark_down(i)) {
       utime_t diff = now - t->second;
       if (diff > timeo) {
-       mon->clog->info() << "osd." << i << " marked down after no pg stats for " << diff << "seconds";
-       derr << "no osd or pg stats from osd." << i << " since " << t->second << ", " << diff
-            << " seconds ago.  marking down" << dendl;
+       mon->clog->info() << "osd." << i << " marked down after no beacon for "
+                         << diff << " seconds";
+       derr << "no beacon from osd." << i << " since " << t->second
+            << ", " << diff << " seconds ago.  marking down" << dendl;
        pending_inc.new_state[i] = CEPH_OSD_UP;
        new_down = true;
       }
@@ -3481,45 +3320,149 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
     int num_in_osds = 0;
     int num_down_in_osds = 0;
     set<int> osds;
+    set<int> down_in_osds;
+    set<int> up_in_osds;
+    set<int> subtree_up;
+    unordered_map<int, set<int> > subtree_type_down;
+    unordered_map<int, int> num_osds_subtree;
+    int max_type = osdmap.crush->get_max_type_id();
+
     for (int i = 0; i < osdmap.get_max_osd(); i++) {
       if (!osdmap.exists(i)) {
         if (osdmap.crush->item_exists(i)) {
           osds.insert(i);
         }
-        continue;
+       continue;
       } 
       if (osdmap.is_out(i))
         continue;
       ++num_in_osds;
+      if (down_in_osds.count(i) || up_in_osds.count(i))
+       continue;
       if (!osdmap.is_up(i)) {
-       ++num_down_in_osds;
-       if (detail) {
-         const osd_info_t& info = osdmap.get_info(i);
-         ostringstream ss;
-         ss << "osd." << i << " is down since epoch " << info.down_at
-            << ", last address " << osdmap.get_addr(i);
-         detail->push_back(make_pair(HEALTH_WARN, ss.str()));
+       down_in_osds.insert(i);
+       int parent_id = 0;
+       int current = i;
+       for (int type = 0; type <= max_type; type++) {
+         if (!osdmap.crush->get_type_name(type))
+           continue;
+         int r = osdmap.crush->get_immediate_parent_id(current, &parent_id);
+         if (r == -ENOENT)
+           break;
+         // break early if this parent is already marked as up
+         if (subtree_up.count(parent_id))
+           break;
+         type = osdmap.crush->get_bucket_type(parent_id);
+         if (!osdmap.subtree_type_is_down(
+               g_ceph_context, parent_id, type,
+               &down_in_osds, &up_in_osds, &subtree_up, &subtree_type_down))
+           break;
+         current = parent_id;
+       }
+      }
+    }
+
+    // calculate the number of down osds in each down subtree and
+    // store it in num_osds_subtree
+    for (int type = 1; type <= max_type; type++) {
+      if (!osdmap.crush->get_type_name(type))
+       continue;
+      for (auto j = subtree_type_down[type].begin();
+          j != subtree_type_down[type].end();
+          ++j) {
+       if (type == 1) {
+          list<int> children;
+          int num = osdmap.crush->get_children(*j, &children);
+          num_osds_subtree[*j] = num;
+        } else {
+          list<int> children;
+          int num = 0;
+          int num_children = osdmap.crush->get_children(*j, &children);
+          if (num_children == 0)
+           continue;
+          for (auto l = children.begin(); l != children.end(); ++l) {
+            if (num_osds_subtree[*l] > 0) {
+              num = num + num_osds_subtree[*l];
+            }
+          }
+          num_osds_subtree[*j] = num;
        }
       }
     }
+    num_down_in_osds = down_in_osds.size();
     assert(num_down_in_osds <= num_in_osds);
     if (num_down_in_osds > 0) {
+      // summary of down subtree types and osds
+      for (int type = max_type; type > 0; type--) {
+       if (!osdmap.crush->get_type_name(type))
+         continue;
+       if (subtree_type_down[type].size() > 0) {
+         ostringstream ss;
+         ss << subtree_type_down[type].size() << " "
+            << osdmap.crush->get_type_name(type);
+         if (subtree_type_down[type].size() > 1) {
+           ss << "s";
+         }
+         int sum_down_osds = 0;
+         for (auto j = subtree_type_down[type].begin();
+              j != subtree_type_down[type].end();
+              ++j) {
+           sum_down_osds = sum_down_osds + num_osds_subtree[*j];
+         }
+          ss << " (" << sum_down_osds << " osds) down";
+         summary.push_back(make_pair(HEALTH_WARN, ss.str()));
+       }
+      }
       ostringstream ss;
-      ss << num_down_in_osds << "/" << num_in_osds << " in osds are down";
+      ss << down_in_osds.size() << " osds down";
       summary.push_back(make_pair(HEALTH_WARN, ss.str()));
+
+      if (detail) {
+       // details of down subtree types
+       for (int type = max_type; type > 0; type--) {
+         if (!osdmap.crush->get_type_name(type))
+           continue;
+         for (auto j = subtree_type_down[type].rbegin();
+              j != subtree_type_down[type].rend();
+              ++j) {
+           ostringstream ss;
+           ss << osdmap.crush->get_type_name(type);
+           ss << " ";
+           ss << osdmap.crush->get_item_name(*j);
+           // at the top level, do not print location
+           if (type != max_type) {
+              ss << " (";
+              ss << osdmap.crush->get_full_location_ordered_string(*j);
+              ss << ")";
+           }
+           int num = num_osds_subtree[*j];
+           ss << " (" << num << " osds)";
+           ss << " is down";
+           detail->push_back(make_pair(HEALTH_WARN, ss.str()));
+         }
+        }
+       // details of down osds
+       for (auto it = down_in_osds.begin(); it != down_in_osds.end(); ++it) {
+         ostringstream ss;
+         ss << "osd." << *it << " (";
+         ss << osdmap.crush->get_full_location_ordered_string(*it);
+          ss << ") is down";
+         detail->push_back(make_pair(HEALTH_WARN, ss.str()));
+       }
+      }
     }
 
     if (!osds.empty()) {
       ostringstream ss;
-      ss << "osds were removed from osdmap, but still kept in crushmap";
+      ss << osds.size() << " osds exist in the crush map but not in the osdmap";
       summary.push_back(make_pair(HEALTH_WARN, ss.str()));
       if (detail) {
-        ss << " osds: [" << osds << "]";
+        ss << " (osds: " << osds << ")";
         detail->push_back(make_pair(HEALTH_WARN, ss.str()));
       }
     }
 
-    if (osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+    if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
       // An osd could configure failsafe ratio, to something different
       // but for now assume it is the same here.
       float fsr = g_conf->osd_failsafe_full_ratio;
@@ -3563,8 +3506,8 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
        summary.push_back(make_pair(HEALTH_ERR, ss.str()));
       }
 
-      map<int, float> full, backfillfull, nearfull;
-      osdmap.get_full_osd_util(mon->pgmon()->pg_map.osd_stat, &full, &backfillfull, &nearfull);
+      set<int> full, backfillfull, nearfull;
+      osdmap.get_full_osd_counts(&full, &backfillfull, &nearfull);
       if (full.size()) {
        ostringstream ss;
        ss << full.size() << " full osd(s)";
@@ -3583,20 +3526,72 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
       if (detail) {
         for (auto& i: full) {
           ostringstream ss;
-          ss << "osd." << i.first << " is full at " << roundf(i.second * 100) << "%";
+          ss << "osd." << i << " is full";
          detail->push_back(make_pair(HEALTH_ERR, ss.str()));
         }
         for (auto& i: backfillfull) {
           ostringstream ss;
-          ss << "osd." << i.first << " is backfill full at " << roundf(i.second * 100) << "%";
+          ss << "osd." << i << " is backfill full";
          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
         }
         for (auto& i: nearfull) {
           ostringstream ss;
-          ss << "osd." << i.first << " is near full at " << roundf(i.second * 100) << "%";
+          ss << "osd." << i << " is near full";
          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
         }
       }
+
+      // warn if there is any noup osds.
+      vector<int> noup_osds;
+      osdmap.get_noup_osds(&noup_osds);
+      if (noup_osds.size()) {
+        ostringstream ss;
+        ss << noup_osds.size() << " noup osd(s)";
+        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
+        if (detail) {
+          ss << ": " << noup_osds;
+          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
+        }
+      }
+
+      // warn if there is any nodown osds.
+      vector<int> nodown_osds;
+      osdmap.get_nodown_osds(&nodown_osds);
+      if (nodown_osds.size()) {
+        ostringstream ss;
+        ss << nodown_osds.size() << " nodown osd(s)";
+        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
+        if (detail) {
+          ss << ": " << nodown_osds;
+          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
+        }
+      }
+
+      // warn if there is any noin osds.
+      vector<int> noin_osds;
+      osdmap.get_noin_osds(&noin_osds);
+      if (noin_osds.size()) {
+        ostringstream ss;
+        ss << noin_osds.size() << " noin osd(s)";
+        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
+        if (detail) {
+          ss << ": " << noin_osds;
+          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
+        }
+      }
+
+      // warn if there is any noout osds.
+      vector<int> noout_osds;
+      osdmap.get_noout_osds(&noout_osds);
+      if (noout_osds.size()) {
+        ostringstream ss;
+        ss << noout_osds.size() << " noout osd(s)";
+        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
+        if (detail) {
+          ss << ": " << noout_osds;
+          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
+        }
+      }
     }
     // note: we leave it to ceph-mgr to generate details health warnings
     // with actual osd utilizations
@@ -3678,6 +3673,16 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
       }
     }
 
+    if (osdmap.crush->has_multirule_rulesets()) {
+      ostringstream ss;
+      ss << "CRUSH map contains multirule rulesets";
+      summary.push_back(make_pair(HEALTH_WARN, ss.str()));
+      if (detail) {
+       ss << "; please manually fix the map";
+       detail->push_back(make_pair(HEALTH_WARN, ss.str()));
+      }
+    }
+
     // Not using 'sortbitwise' and should be?
     if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE) &&
         (osdmap.get_up_osd_features() &
@@ -3714,25 +3719,25 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
     if (g_conf->mon_debug_no_require_luminous) {
       // ignore these checks
     } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_LUMINOUS) &&
-       !osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
-      string msg = "all OSDs are running luminous or later but the"
-       " 'require_luminous_osds' osdmap flag is not set";
+              osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+      string msg = "all OSDs are running luminous or later but"
+       " require_osd_release < luminous";
       summary.push_back(make_pair(HEALTH_WARN, msg));
       if (detail) {
        detail->push_back(make_pair(HEALTH_WARN, msg));
       }
     } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_KRAKEN) &&
-       !osdmap.test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
-      string msg = "all OSDs are running kraken or later but the"
-       " 'require_kraken_osds' osdmap flag is not set";
+              osdmap.require_osd_release < CEPH_RELEASE_KRAKEN) {
+      string msg = "all OSDs are running kraken or later but"
+       " require_osd_release < kraken";
       summary.push_back(make_pair(HEALTH_WARN, msg));
       if (detail) {
        detail->push_back(make_pair(HEALTH_WARN, msg));
       }
     } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_JEWEL) &&
-              !osdmap.test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
-      string msg = "all OSDs are running jewel or later but the"
-       " 'require_jewel_osds' osdmap flag is not set";
+              osdmap.require_osd_release < CEPH_RELEASE_JEWEL) {
+      string msg = "all OSDs are running jewel or later but"
+       " require_osd_release < jewel";
       summary.push_back(make_pair(HEALTH_WARN, msg));
       if (detail) {
        detail->push_back(make_pair(HEALTH_WARN, msg));
@@ -3771,7 +3776,7 @@ void OSDMonitor::dump_info(Formatter *f)
 namespace {
   enum osd_pool_get_choices {
     SIZE, MIN_SIZE, CRASH_REPLAY_INTERVAL,
-    PG_NUM, PGP_NUM, CRUSH_RULE, CRUSH_RULESET, HASHPSPOOL,
+    PG_NUM, PGP_NUM, CRUSH_RULE, HASHPSPOOL,
     NODELETE, NOPGCHANGE, NOSIZECHANGE,
     WRITE_FADVISE_DONTNEED, NOSCRUB, NODEEP_SCRUB,
     HIT_SET_TYPE, HIT_SET_PERIOD, HIT_SET_COUNT, HIT_SET_FPP,
@@ -3838,15 +3843,15 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
   }
   else if (prefix == "osd perf" ||
           prefix == "osd blocked-by") {
-    const PGMap &pgm = mon->pgmon()->pg_map;
-    r = process_pg_map_command(prefix, cmdmap, pgm, osdmap,
-                              f.get(), &ss, &rdata);
+    r = mon->pgservice->process_pg_command(prefix, cmdmap,
+                                          osdmap, f.get(), &ss, &rdata);
   }
   else if (prefix == "osd dump" ||
           prefix == "osd tree" ||
           prefix == "osd ls" ||
           prefix == "osd getmap" ||
-          prefix == "osd getcrushmap") {
+          prefix == "osd getcrushmap" ||
+          prefix == "osd ls-tree") {
     string val;
 
     epoch_t epoch = 0;
@@ -3908,13 +3913,39 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       }
       rdata.append(ds);
     } else if (prefix == "osd tree") {
+      vector<string> states;
+      cmd_getval(g_ceph_context, cmdmap, "states", states);
+      unsigned filter = 0;
+      for (auto& s : states) {
+       if (s == "up") {
+         filter |= OSDMap::DUMP_UP;
+       } else if (s == "down") {
+         filter |= OSDMap::DUMP_DOWN;
+       } else if (s == "in") {
+         filter |= OSDMap::DUMP_IN;
+       } else if (s == "out") {
+         filter |= OSDMap::DUMP_OUT;
+       } else {
+         ss << "unrecognized state '" << s << "'";
+         r = -EINVAL;
+         goto reply;
+       }
+      }
+      if ((filter & (OSDMap::DUMP_IN|OSDMap::DUMP_OUT)) ==
+         (OSDMap::DUMP_IN|OSDMap::DUMP_OUT) ||
+         (filter & (OSDMap::DUMP_UP|OSDMap::DUMP_DOWN)) ==
+         (OSDMap::DUMP_UP|OSDMap::DUMP_DOWN)) {
+       ss << "cannot specify both up and down or both in and out";
+       r = -EINVAL;
+       goto reply;
+      }
       if (f) {
        f->open_object_section("tree");
-       p->print_tree(f.get(), NULL);
+       p->print_tree(f.get(), NULL, filter);
        f->close_section();
        f->flush(ds);
       } else {
-       p->print_tree(NULL, &ds);
+       p->print_tree(NULL, &ds, filter);
       }
       rdata.append(ds);
     } else if (prefix == "osd getmap") {
@@ -3922,14 +3953,50 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       ss << "got osdmap epoch " << p->get_epoch();
     } else if (prefix == "osd getcrushmap") {
       p->crush->encode(rdata, mon->get_quorum_con_features());
-      ss << "got crush map from osdmap epoch " << p->get_epoch();
+      ss << p->get_crush_version();
+    } else if (prefix == "osd ls-tree") {
+      string bucket_name;
+      cmd_getval(g_ceph_context, cmdmap, "name", bucket_name);
+      set<int> osds;
+      r = p->get_osds_by_bucket_name(bucket_name, &osds);
+      if (r == -ENOENT) {
+        ss << "\"" << bucket_name << "\" does not exist";
+        goto reply;
+      } else if (r < 0) {
+        ss << "can not parse bucket name:\"" << bucket_name << "\"";
+        goto reply;
+      }
+
+      if (f) {
+        f->open_array_section("osds");
+        for (auto &i : osds) {
+          if (osdmap.exists(i)) {
+            f->dump_int("osd", i);
+          }
+        }
+        f->close_section();
+        f->flush(ds);
+      } else {
+        bool first = true;
+        for (auto &i : osds) {
+          if (osdmap.exists(i)) {
+            if (!first)
+              ds << "\n";
+            first = false;
+            ds << i;
+          }
+        }
+      }
+
+      rdata.append(ds);
     }
     if (p != &osdmap)
       delete p;
   } else if (prefix == "osd df") {
     string method;
     cmd_getval(g_ceph_context, cmdmap, "output_method", method);
-    print_utilization(ds, f ? f.get() : NULL, method == "tree");
+    print_osd_utilization(osdmap, mon->pgservice, ds,
+                         f.get(), method == "tree");
     rdata.append(ds);
   } else if (prefix == "osd getmaxosd") {
     if (f) {
@@ -4023,6 +4090,20 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       f->close_section();
     }
     f->flush(rdata);
+  } else if (prefix == "osd versions") {
+    if (!f)
+      f.reset(Formatter::create("json-pretty"));
+    count_metadata("ceph_version", f.get());
+    f->flush(rdata);
+    r = 0;
+  } else if (prefix == "osd count-metadata") {
+    if (!f)
+      f.reset(Formatter::create("json-pretty"));
+    string field;
+    cmd_getval(g_ceph_context, cmdmap, "property", field);
+    count_metadata(field, f.get());
+    f->flush(rdata);
+    r = 0;
   } else if (prefix == "osd map") {
     string poolstr, objstr, namespacestr;
     cmd_getval(g_ceph_context, cmdmap, "pool", poolstr);
@@ -4283,7 +4364,6 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       {"crash_replay_interval", CRASH_REPLAY_INTERVAL},
       {"pg_num", PG_NUM}, {"pgp_num", PGP_NUM},
       {"crush_rule", CRUSH_RULE},
-      {"crush_ruleset", CRUSH_RULESET},
       {"hashpspool", HASHPSPOOL}, {"nodelete", NODELETE},
       {"nopgchange", NOPGCHANGE}, {"nosizechange", NOSIZECHANGE},
       {"noscrub", NOSCRUB}, {"nodeep-scrub", NODEEP_SCRUB},
@@ -4402,16 +4482,13 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
                        p->get_crash_replay_interval());
            break;
          case CRUSH_RULE:
-           if (osdmap.crush->rule_exists(p->get_crush_ruleset())) {
+           if (osdmap.crush->rule_exists(p->get_crush_rule())) {
              f->dump_string("crush_rule", osdmap.crush->get_rule_name(
-                              p->get_crush_ruleset()));
+                              p->get_crush_rule()));
            } else {
-             f->dump_string("crush_rule", stringify(p->get_crush_ruleset()));
+             f->dump_string("crush_rule", stringify(p->get_crush_rule()));
            }
            break;
-         case CRUSH_RULESET:
-           f->dump_int("crush_ruleset", p->get_crush_ruleset());
-           break;
          case HASHPSPOOL:
          case NODELETE:
          case NOPGCHANGE:
@@ -4566,16 +4643,13 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
              p->get_crash_replay_interval() << "\n";
            break;
          case CRUSH_RULE:
-           if (osdmap.crush->rule_exists(p->get_crush_ruleset())) {
+           if (osdmap.crush->rule_exists(p->get_crush_rule())) {
              ss << "crush_rule: " << osdmap.crush->get_rule_name(
-               p->get_crush_ruleset()) << "\n";
+               p->get_crush_rule()) << "\n";
            } else {
-             ss << "crush_rule: " << p->get_crush_ruleset() << "\n";
+             ss << "crush_rule: " << p->get_crush_rule() << "\n";
            }
            break;
-         case CRUSH_RULESET:
-           ss << "crush_ruleset: " << p->get_crush_ruleset() << "\n";
-           break;
          case HIT_SET_PERIOD:
            ss << "hit_set_period: " << p->hit_set_period << "\n";
            break;
@@ -4704,9 +4778,8 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     }
     r = 0;
   } else if (prefix == "osd pool stats") {
-    const auto &pgm = mon->pgmon()->pg_map;
-    r = process_pg_map_command(prefix, cmdmap, pgm, osdmap,
-                              f.get(), &ss, &rdata);
+    r = mon->pgservice->process_pg_command(prefix, cmdmap,
+                                          osdmap, f.get(), &ss, &rdata);
   } else if (prefix == "osd pool get-quota") {
     string pool_name;
     cmd_getval(g_ceph_context, cmdmap, "pool", pool_name);
@@ -4771,7 +4844,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     } else {
       int ruleno = osdmap.crush->get_rule_id(name);
       if (ruleno < 0) {
-       ss << "unknown crush ruleset '" << name << "'";
+       ss << "unknown crush rule '" << name << "'";
        r = ruleno;
        goto reply;
       }
@@ -4879,17 +4952,17 @@ void OSDMonitor::update_pool_flags(int64_t pool_id, uint64_t flags)
 
 bool OSDMonitor::update_pools_status()
 {
-  if (!mon->pgmon()->is_readable())
+  if (!mon->pgservice->is_readable())
     return false;
 
   bool ret = false;
 
   auto& pools = osdmap.get_pools();
   for (auto it = pools.begin(); it != pools.end(); ++it) {
-    if (!mon->pgmon()->pg_map.pg_pool_sum.count(it->first))
+    const pool_stat_t *pstat = mon->pgservice->get_pool_stat(it->first);
+    if (!pstat)
       continue;
-    pool_stat_t& stats = mon->pgmon()->pg_map.pg_pool_sum[it->first];
-    object_stat_sum_t& sum = stats.stats.sum;
+    const object_stat_sum_t& sum = pstat->stats.sum;
     const pg_pool_t &pool = it->second;
     const string& pool_name = osdmap.get_pool_name(it->first);
 
@@ -4935,10 +5008,10 @@ void OSDMonitor::get_pools_health(
 {
   auto& pools = osdmap.get_pools();
   for (auto it = pools.begin(); it != pools.end(); ++it) {
-    if (!mon->pgmon()->pg_map.pg_pool_sum.count(it->first))
+    const pool_stat_t *pstat = mon->pgservice->get_pool_stat(it->first);
+    if (!pstat)
       continue;
-    pool_stat_t& stats = mon->pgmon()->pg_map.pg_pool_sum[it->first];
-    object_stat_sum_t& sum = stats.stats.sum;
+    const object_stat_sum_t& sum = pstat->stats.sum;
     const pg_pool_t &pool = it->second;
     const string& pool_name = osdmap.get_pool_name(it->first);
 
@@ -5023,14 +5096,14 @@ int OSDMonitor::prepare_new_pool(MonOpRequestRef op)
     return -EPERM;
   string erasure_code_profile;
   stringstream ss;
-  string ruleset_name;
+  string rule_name;
   if (m->auid)
-    return prepare_new_pool(m->name, m->auid, m->crush_rule, ruleset_name,
+    return prepare_new_pool(m->name, m->auid, m->crush_rule, rule_name,
                            0, 0,
                             erasure_code_profile,
                            pg_pool_t::TYPE_REPLICATED, 0, FAST_READ_OFF, &ss);
   else
-    return prepare_new_pool(m->name, session->auid, m->crush_rule, ruleset_name,
+    return prepare_new_pool(m->name, session->auid, m->crush_rule, rule_name,
                            0, 0,
                             erasure_code_profile,
                            pg_pool_t::TYPE_REPLICATED, 0, FAST_READ_OFF, &ss);
@@ -5138,14 +5211,14 @@ int OSDMonitor::normalize_profile(const string& profilename,
   return 0;
 }
 
-int OSDMonitor::crush_ruleset_create_erasure(const string &name,
+int OSDMonitor::crush_rule_create_erasure(const string &name,
                                             const string &profile,
-                                            int *ruleset,
+                                            int *rule,
                                             ostream *ss)
 {
   int ruleid = osdmap.crush->get_rule_id(name);
   if (ruleid != -ENOENT) {
-    *ruleset = osdmap.crush->get_rule_mask_ruleset(ruleid);
+    *rule = osdmap.crush->get_rule_mask_ruleset(ruleid);
     return -EEXIST;
   }
 
@@ -5154,7 +5227,7 @@ int OSDMonitor::crush_ruleset_create_erasure(const string &name,
 
   ruleid = newcrush.get_rule_id(name);
   if (ruleid != -ENOENT) {
-    *ruleset = newcrush.get_rule_mask_ruleset(ruleid);
+    *rule = newcrush.get_rule_mask_ruleset(ruleid);
     return -EALREADY;
   } else {
     ErasureCodeInterfaceRef erasure_code;
@@ -5168,7 +5241,7 @@ int OSDMonitor::crush_ruleset_create_erasure(const string &name,
     erasure_code.reset();
     if (err < 0)
       return err;
-    *ruleset = err;
+    *rule = err;
     pending_inc.crush.clear();
     newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
     return 0;
@@ -5252,12 +5325,12 @@ bool OSDMonitor::validate_crush_against_features(const CrushWrapper *newcrush,
   newmap.apply_incremental(new_pending);
 
   // client compat
-  if (newmap.require_min_compat_client.length()) {
+  if (newmap.require_min_compat_client > 0) {
     auto mv = newmap.get_min_compat_client();
-    if (mv.first > newmap.require_min_compat_client) {
-      ss << "new crush map requires client version " << mv
+    if (mv > newmap.require_min_compat_client) {
+      ss << "new crush map requires client version " << ceph_release_name(mv)
         << " but require_min_compat_client is "
-        << newmap.require_min_compat_client;
+        << ceph_release_name(newmap.require_min_compat_client);
       return false;
     }
   }
@@ -5398,40 +5471,40 @@ int OSDMonitor::prepare_pool_stripe_width(const unsigned pool_type,
   return err;
 }
 
-int OSDMonitor::prepare_pool_crush_ruleset(const unsigned pool_type,
+int OSDMonitor::prepare_pool_crush_rule(const unsigned pool_type,
                                           const string &erasure_code_profile,
-                                          const string &ruleset_name,
-                                          int *crush_ruleset,
+                                          const string &rule_name,
+                                          int *crush_rule,
                                           ostream *ss)
 {
 
-  if (*crush_ruleset < 0) {
+  if (*crush_rule < 0) {
     switch (pool_type) {
     case pg_pool_t::TYPE_REPLICATED:
       {
-       if (ruleset_name == "") {
-         //Use default ruleset
-         *crush_ruleset = osdmap.crush->get_osd_pool_default_crush_replicated_ruleset(g_ceph_context);
-         if (*crush_ruleset < 0) {
-           // Errors may happen e.g. if no valid ruleset is available
-           *ss << "No suitable CRUSH ruleset exists, check "
+       if (rule_name == "") {
+         //Use default rule
+         *crush_rule = osdmap.crush->get_osd_pool_default_crush_replicated_ruleset(g_ceph_context);
+         if (*crush_rule < 0) {
+           // Errors may happen e.g. if no valid rule is available
+           *ss << "No suitable CRUSH rule exists, check "
                 << "'osd pool default crush *' config options";
            return -ENOENT;
          }
        } else {
-         return get_crush_ruleset(ruleset_name, crush_ruleset, ss);
+         return get_crush_rule(rule_name, crush_rule, ss);
        }
       }
       break;
     case pg_pool_t::TYPE_ERASURE:
       {
-       int err = crush_ruleset_create_erasure(ruleset_name,
+       int err = crush_rule_create_erasure(rule_name,
                                               erasure_code_profile,
-                                              crush_ruleset, ss);
+                                              crush_rule, ss);
        switch (err) {
        case -EALREADY:
-         dout(20) << "prepare_pool_crush_ruleset: ruleset "
-                  << ruleset_name << " try again" << dendl;
+         dout(20) << "prepare_pool_crush_rule: rule "
+                  << rule_name << " try again" << dendl;
          // fall through
        case 0:
          // need to wait for the crush rule to be proposed before proceeding
@@ -5445,14 +5518,14 @@ int OSDMonitor::prepare_pool_crush_ruleset(const unsigned pool_type,
       }
       break;
     default:
-      *ss << "prepare_pool_crush_ruleset: " << pool_type
+      *ss << "prepare_pool_crush_rule: " << pool_type
         << " is not a known pool type";
       return -EINVAL;
       break;
     }
   } else {
-    if (!osdmap.crush->ruleset_exists(*crush_ruleset)) {
-      *ss << "CRUSH ruleset " << *crush_ruleset << " not found";
+    if (!osdmap.crush->ruleset_exists(*crush_rule)) {
+      *ss << "CRUSH rule " << *crush_rule << " not found";
       return -ENOENT;
     }
   }
@@ -5460,28 +5533,28 @@ int OSDMonitor::prepare_pool_crush_ruleset(const unsigned pool_type,
   return 0;
 }
 
-int OSDMonitor::get_crush_ruleset(const string &ruleset_name,
-                                 int *crush_ruleset,
+int OSDMonitor::get_crush_rule(const string &rule_name,
+                                 int *crush_rule,
                                  ostream *ss)
 {
   int ret;
-  ret = osdmap.crush->get_rule_id(ruleset_name);
+  ret = osdmap.crush->get_rule_id(rule_name);
   if (ret != -ENOENT) {
     // found it, use it
-    *crush_ruleset = ret;
+    *crush_rule = ret;
   } else {
     CrushWrapper newcrush;
     _get_pending_crush(newcrush);
 
-    ret = newcrush.get_rule_id(ruleset_name);
+    ret = newcrush.get_rule_id(rule_name);
     if (ret != -ENOENT) {
       // found it, wait for it to be proposed
-      dout(20) << __func__ << ": ruleset " << ruleset_name
+      dout(20) << __func__ << ": rule " << rule_name
               << " try again" << dendl;
       return -EAGAIN;
     } else {
       //Cannot find it , return error
-      *ss << "specified ruleset " << ruleset_name << " doesn't exist";
+      *ss << "specified rule " << rule_name << " doesn't exist";
       return ret;
     }
   }
@@ -5491,8 +5564,8 @@ int OSDMonitor::get_crush_ruleset(const string &ruleset_name,
 /**
  * @param name The name of the new pool
  * @param auid The auid of the pool owner. Can be -1
- * @param crush_ruleset The crush rule to use. If <0, will use the system default
- * @param crush_ruleset_name The crush rule to use, if crush_rulset <0
+ * @param crush_rule The crush rule to use. If <0, will use the system default
+ * @param crush_rule_name The crush rule to use, if crush_rulset <0
  * @param pg_num The pg_num to use. If set to 0, will use the system default
  * @param pgp_num The pgp_num to use. If set to 0, will use the system default
  * @param erasure_code_profile The profile name in OSDMap to be used for erasure code
@@ -5504,8 +5577,8 @@ int OSDMonitor::get_crush_ruleset(const string &ruleset_name,
  * @return 0 on success, negative errno on failure.
  */
 int OSDMonitor::prepare_new_pool(string& name, uint64_t auid,
-                                int crush_ruleset,
-                                const string &crush_ruleset_name,
+                                int crush_rule,
+                                const string &crush_rule_name,
                                  unsigned pg_num, unsigned pgp_num,
                                 const string &erasure_code_profile,
                                  const unsigned pool_type,
@@ -5535,10 +5608,10 @@ int OSDMonitor::prepare_new_pool(string& name, uint64_t auid,
     return -EINVAL;
   }
   int r;
-  r = prepare_pool_crush_ruleset(pool_type, erasure_code_profile,
-                                crush_ruleset_name, &crush_ruleset, ss);
+  r = prepare_pool_crush_rule(pool_type, erasure_code_profile,
+                                crush_rule_name, &crush_rule, ss);
   if (r) {
-    dout(10) << " prepare_pool_crush_ruleset returns " << r << dendl;
+    dout(10) << " prepare_pool_crush_rule returns " << r << dendl;
     return r;
   }
   CrushWrapper newcrush;
@@ -5552,7 +5625,7 @@ int OSDMonitor::prepare_new_pool(string& name, uint64_t auid,
     r = tester.test_with_crushtool(g_conf->crushtool.c_str(),
                                 osdmap.get_max_osd(),
                                 g_conf->mon_lease,
-                                crush_ruleset);
+                                crush_rule);
   }
   if (r) {
     dout(10) << " tester.test_with_crushtool returns " << r
@@ -5567,7 +5640,7 @@ int OSDMonitor::prepare_new_pool(string& name, uint64_t auid,
     return r;
   }
 
-  if (!osdmap.crush->check_crush_rule(crush_ruleset, pool_type, size, *ss)) {
+  if (!osdmap.crush->check_crush_rule(crush_rule, pool_type, size, *ss)) {
     return -EINVAL;
   }
 
@@ -5627,7 +5700,7 @@ int OSDMonitor::prepare_new_pool(string& name, uint64_t auid,
 
   pi->size = size;
   pi->min_size = min_size;
-  pi->crush_ruleset = crush_ruleset;
+  pi->crush_rule = crush_rule;
   pi->expected_num_objects = expected_num_objects;
   pi->object_hash = CEPH_STR_HASH_RJENKINS;
   pi->set_pg_num(pg_num);
@@ -5674,28 +5747,6 @@ bool OSDMonitor::prepare_unset_flag(MonOpRequestRef op, int flag)
   return true;
 }
 
-int OSDMonitor::parse_osd_id(const char *s, stringstream *pss)
-{
-  // osd.NNN?
-  if (strncmp(s, "osd.", 4) == 0) {
-    s += 4;
-  }
-
-  // NNN?
-  ostringstream ss;
-  long id = parse_pos_long(s, &ss);
-  if (id < 0) {
-    *pss << ss.str();
-    return id;
-  }
-  if (id > 0xffff) {
-    *pss << "osd id " << id << " is too large";
-    return -ERANGE;
-  }
-  return id;
-}
-
-
 int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
                                          stringstream& ss)
 {
@@ -5878,21 +5929,7 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
     if (!osdmap.crush->check_crush_rule(id, p.get_type(), p.get_size(), ss)) {
       return -EINVAL;
     }
-    p.crush_ruleset = id;
-  } else if (var == "crush_ruleset") {
-    if (interr.length()) {
-      ss << "error parsing integer value '" << val << "': " << interr;
-      return -EINVAL;
-    }
-    if (!osdmap.crush->ruleset_exists(n)) {
-      ss << "crush ruleset " << n << " does not exist";
-      return -ENOENT;
-    }
-
-    if (!osdmap.crush->check_crush_rule(n, p.get_type(), p.get_size(), ss)) {
-      return -EINVAL;
-    }
-    p.crush_ruleset = n;
+    p.crush_rule = id;
   } else if (var == "nodelete" || var == "nopgchange" ||
             var == "nosizechange" || var == "write_fadvise_dontneed" ||
             var == "noscrub" || var == "nodeep-scrub") {
@@ -5994,7 +6031,8 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
       return -EINVAL;
     }
     stringstream err;
-    if (!is_pool_currently_all_bluestore(pool, p, &err)) {
+    if (!g_conf->mon_debug_no_require_bluestore_for_ec_overwrites &&
+       !is_pool_currently_all_bluestore(pool, p, &err)) {
       ss << "pool must only be stored on bluestore for scrubbing to work: " << err.str();
       return -EINVAL;
     }
@@ -6182,6 +6220,482 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
   return 0;
 }
 
+int OSDMonitor::_prepare_command_osd_crush_remove(
+    CrushWrapper &newcrush,
+    int32_t id,
+    int32_t ancestor,
+    bool has_ancestor,
+    bool unlink_only)
+{
+  int err = 0;
+
+  if (has_ancestor) {
+    err = newcrush.remove_item_under(g_ceph_context, id, ancestor,
+        unlink_only);
+  } else {
+    err = newcrush.remove_item(g_ceph_context, id, unlink_only);
+  }
+  return err;
+}
+
+void OSDMonitor::do_osd_crush_remove(CrushWrapper& newcrush)
+{
+  pending_inc.crush.clear();
+  newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+}
+
+int OSDMonitor::prepare_command_osd_crush_remove(
+    CrushWrapper &newcrush,
+    int32_t id,
+    int32_t ancestor,
+    bool has_ancestor,
+    bool unlink_only)
+{
+  int err = _prepare_command_osd_crush_remove(
+      newcrush, id, ancestor,
+      has_ancestor, unlink_only);
+
+  if (err < 0)
+    return err;
+
+  assert(err == 0);
+  do_osd_crush_remove(newcrush);
+
+  return 0;
+}
+
+int OSDMonitor::prepare_command_osd_remove(int32_t id)
+{
+  if (osdmap.is_up(id)) {
+    return -EBUSY;
+  }
+
+  pending_inc.new_state[id] = osdmap.get_state(id);
+  pending_inc.new_uuid[id] = uuid_d();
+  pending_metadata_rm.insert(id);
+  pending_metadata.erase(id);
+
+  return 0;
+}
+
+int32_t OSDMonitor::_allocate_osd_id(int32_t* existing_id)
+{
+  assert(existing_id);
+  *existing_id = -1;
+
+  for (int32_t i = 0; i < osdmap.get_max_osd(); ++i) {
+    if (!osdmap.exists(i) &&
+        pending_inc.new_up_client.count(i) == 0 &&
+        (pending_inc.new_state.count(i) == 0 ||
+         (pending_inc.new_state[i] & CEPH_OSD_EXISTS) == 0)) {
+      *existing_id = i;
+      return -1;
+    }
+  }
+
+  if (pending_inc.new_max_osd < 0) {
+    return osdmap.get_max_osd();
+  }
+  return pending_inc.new_max_osd;
+}
+
+void OSDMonitor::do_osd_create(
+    const int32_t id,
+    const uuid_d& uuid,
+    int32_t* new_id)
+{
+  dout(10) << __func__ << " uuid " << uuid << dendl;
+  assert(new_id);
+
+  // We presume validation has been performed prior to calling this
+  // function. We assert with prejudice.
+
+  int32_t allocated_id = -1; // declare here so we can jump
+  int32_t existing_id = -1;
+  if (!uuid.is_zero()) {
+    existing_id = osdmap.identify_osd(uuid);
+    if (existing_id >= 0) {
+      assert(id < 0 || id == existing_id);
+      *new_id = existing_id;
+      goto out;
+    } else if (id >= 0) {
+      // uuid does not exist, and id has been provided, so just create
+      // the new osd.id
+      *new_id = id;
+      goto out;
+    }
+  }
+
+  // allocate a new id
+  allocated_id = _allocate_osd_id(&existing_id);
+  dout(10) << __func__ << " allocated id " << allocated_id
+           << " existing id " << existing_id << dendl;
+  if (existing_id >= 0) {
+    assert(existing_id < osdmap.get_max_osd());
+    assert(allocated_id < 0);
+    pending_inc.new_weight[existing_id] = CEPH_OSD_OUT;
+    *new_id = existing_id;
+
+  } else if (allocated_id >= 0) {
+    assert(existing_id < 0);
+    // raise max_osd
+    if (pending_inc.new_max_osd < 0) {
+      pending_inc.new_max_osd = osdmap.get_max_osd() + 1;
+    } else {
+      ++pending_inc.new_max_osd;
+    }
+    *new_id = pending_inc.new_max_osd - 1;
+    assert(*new_id == allocated_id);
+  } else {
+    assert(0 == "unexpected condition");
+  }
+
+out:
+  dout(10) << __func__ << " using id " << *new_id << dendl;
+  if (osdmap.get_max_osd() <= *new_id && pending_inc.new_max_osd <= *new_id) {
+    pending_inc.new_max_osd = *new_id + 1;
+  }
+
+  pending_inc.new_state[*new_id] |= CEPH_OSD_EXISTS | CEPH_OSD_NEW;
+  if (!uuid.is_zero())
+    pending_inc.new_uuid[*new_id] = uuid;
+}
+
+int OSDMonitor::validate_osd_create(
+    const int32_t id,
+    const uuid_d& uuid,
+    const bool check_osd_exists,
+    int32_t* existing_id,
+    stringstream& ss)
+{
+
+  dout(10) << __func__ << " id " << id << " uuid " << uuid
+           << " check_osd_exists " << check_osd_exists << dendl;
+
+  assert(existing_id);
+
+  if (id < 0 && uuid.is_zero()) {
+    // we have nothing to validate
+    *existing_id = -1;
+    return 0;
+  } else if (uuid.is_zero()) {
+    // we have an id but we will ignore it - because that's what
+    // `osd create` does.
+    return 0;
+  }
+
+  /*
+   * This function will be used to validate whether we are able to
+   * create a new osd when the `uuid` is specified.
+   *
+   * It will be used by both `osd create` and `osd new`, as the checks
+   * are basically the same when it pertains to osd id and uuid validation.
+   * However, `osd create` presumes an `uuid` is optional, for legacy
+   * reasons, while `osd new` requires the `uuid` to be provided. This
+   * means that `osd create` will not be idempotent if an `uuid` is not
+   * provided, but we will always guarantee the idempotency of `osd new`.
+   */
+
+  assert(!uuid.is_zero());
+  if (pending_inc.identify_osd(uuid) >= 0) {
+    // osd is about to exist
+    return -EAGAIN;
+  }
+
+  int32_t i = osdmap.identify_osd(uuid);
+  if (i >= 0) {
+    // osd already exists
+    if (id >= 0 && i != id) {
+      ss << "uuid " << uuid << " already in use for different id " << i;
+      return -EEXIST;
+    }
+    // return a positive errno to distinguish between a blocking error
+    // and an error we consider to not be a problem (i.e., this would be
+    // an idempotent operation).
+    *existing_id = i;
+    return EEXIST;
+  }
+  // i < 0
+  if (id >= 0) {
+    if (pending_inc.new_state.count(id)) {
+      // osd is about to exist
+      return -EAGAIN;
+    }
+    // we may not care if an osd exists if we are recreating a previously
+    // destroyed osd.
+    if (check_osd_exists && osdmap.exists(id)) {
+      ss << "id " << id << " already in use and does not match uuid "
+         << uuid;
+      return -EINVAL;
+    }
+  }
+  return 0;
+}
+
+int OSDMonitor::prepare_command_osd_create(
+    const int32_t id,
+    const uuid_d& uuid,
+    int32_t* existing_id,
+    stringstream& ss)
+{
+  dout(10) << __func__ << " id " << id << " uuid " << uuid << dendl;
+  assert(existing_id);
+
+  if (uuid.is_zero()) {
+    dout(10) << __func__ << " no uuid; assuming legacy `osd create`" << dendl;
+  }
+
+  return validate_osd_create(id, uuid, true, existing_id, ss);
+}
+
+int OSDMonitor::prepare_command_osd_new(
+    MonOpRequestRef op,
+    const map<string,cmd_vartype>& cmdmap,
+    const map<string,string>& secrets,
+    stringstream &ss,
+    Formatter *f)
+{
+  uuid_d uuid;
+  string uuidstr;
+  int64_t id = -1;
+
+  assert(paxos->is_plugged());
+
+  dout(10) << __func__ << " " << op << dendl;
+
+  /* validate command. abort now if something's wrong. */
+
+  /* `osd new` will expect a `uuid` to be supplied; `id` is optional.
+   *
+   * If `id` is not specified, we will identify any existing osd based
+   * on `uuid`. Operation will be idempotent iff secrets match.
+   *
+   * If `id` is specified, we will identify any existing osd based on
+   * `uuid` and match against `id`. If they match, operation will be
+   * idempotent iff secrets match.
+   *
+   * `-i secrets.json` will be optional. If supplied, will be used
+   * to check for idempotency when `id` and `uuid` match.
+   *
+   * If `id` is not specified, and `uuid` does not exist, an id will
+   * be found or allocated for the osd.
+   *
+   * If `id` is specified, and the osd has been previously marked
+   * as destroyed, then the `id` will be reused.
+   */
+  if (!cmd_getval(g_ceph_context, cmdmap, "uuid", uuidstr)) {
+    ss << "requires the OSD's UUID to be specified.";
+    return -EINVAL;
+  } else if (!uuid.parse(uuidstr.c_str())) {
+    ss << "invalid UUID value '" << uuidstr << "'.";
+    return -EINVAL;
+  }
+
+  if (cmd_getval(g_ceph_context, cmdmap, "id", id) &&
+      (id < 0)) {
+    ss << "invalid OSD id; must be greater or equal than zero.";
+    return -EINVAL;
+  }
+
+  // are we running an `osd create`-like command, or recreating
+  // a previously destroyed osd?
+
+  bool is_recreate_destroyed = (id >= 0 && osdmap.is_destroyed(id));
+
+  // we will care about `id` to assess whether osd is `destroyed`, or
+  // to create a new osd.
+  // we will need an `id` by the time we reach auth.
+
+  int32_t existing_id = -1;
+  int err = validate_osd_create(id, uuid, !is_recreate_destroyed,
+                                &existing_id, ss);
+
+  bool may_be_idempotent = false;
+  if (err == EEXIST) {
+    // this is idempotent from the osdmon's point-of-view
+    may_be_idempotent = true;
+    assert(existing_id >= 0);
+    id = existing_id;
+  } else if (err < 0) {
+    return err;
+  }
+
+  if (!may_be_idempotent) {
+    // idempotency is out of the window. We are either creating a new
+    // osd or recreating a destroyed osd.
+    //
+    // We now need to figure out if we have an `id` (and if it's valid),
+    // of find an `id` if we don't have one.
+
+    // NOTE: we need to consider the case where the `id` is specified for
+    // `osd create`, and we must honor it. So this means checking if
+    // the `id` is destroyed, and if so assume the destroy; otherwise,
+    // check if it `exists` - in which case we complain about not being
+    // `destroyed`. In the end, if nothing fails, we must allow the
+    // creation, so that we are compatible with `create`.
+    if (id >= 0 && osdmap.exists(id) && !osdmap.is_destroyed(id)) {
+      dout(10) << __func__ << " osd." << id << " isn't destroyed" << dendl;
+      ss << "OSD " << id << " has not yet been destroyed";
+      return -EINVAL;
+    } else if (id < 0) {
+      // find an `id`
+      id = _allocate_osd_id(&existing_id);
+      if (id < 0) {
+        assert(existing_id >= 0);
+        id = existing_id;
+      }
+      dout(10) << __func__ << " found id " << id << " to use" << dendl;
+    } else if (id >= 0 && osdmap.is_destroyed(id)) {
+      dout(10) << __func__ << " recreating osd." << id << dendl;
+    } else {
+      dout(10) << __func__ << " creating new osd." << id << dendl;
+    }
+  } else {
+    assert(id >= 0);
+    assert(osdmap.exists(id));
+  }
+
+  // we are now able to either create a brand new osd or reuse an existing
+  // osd that has been previously destroyed.
+
+  dout(10) << __func__ << " id " << id << " uuid " << uuid << dendl;
+
+  if (may_be_idempotent && secrets.empty()) {
+    // nothing to do, really.
+    dout(10) << __func__ << " idempotent and no secrets -- no op." << dendl;
+    assert(id >= 0);
+    if (f) {
+      f->open_object_section("created_osd");
+      f->dump_int("osdid", id);
+      f->close_section();
+    } else {
+      ss << id;
+    }
+    return EEXIST;
+  }
+
+  string cephx_secret, lockbox_secret, dmcrypt_key;
+  bool has_lockbox = false;
+  bool has_secrets = (!secrets.empty());
+
+  ConfigKeyService *svc = nullptr;
+  AuthMonitor::auth_entity_t cephx_entity, lockbox_entity;
+
+  if (has_secrets) {
+    if (secrets.count("cephx_secret") == 0) {
+      ss << "requires a cephx secret.";
+      return -EINVAL;
+    }
+    cephx_secret = secrets.at("cephx_secret");
+
+    bool has_lockbox_secret = (secrets.count("cephx_lockbox_secret") > 0);
+    bool has_dmcrypt_key = (secrets.count("dmcrypt_key") > 0);
+
+    dout(10) << __func__ << " has lockbox " << has_lockbox_secret
+             << " dmcrypt " << has_dmcrypt_key << dendl;
+
+    if (has_lockbox_secret && has_dmcrypt_key) {
+      has_lockbox = true;
+      lockbox_secret = secrets.at("cephx_lockbox_secret");
+      dmcrypt_key = secrets.at("dmcrypt_key");
+    } else if (!has_lockbox_secret != !has_dmcrypt_key) {
+      ss << "requires both a cephx lockbox secret and a dm-crypt key.";
+      return -EINVAL;
+    }
+
+    dout(10) << __func__ << " validate secrets using osd id " << id << dendl;
+
+    err = mon->authmon()->validate_osd_new(id, uuid,
+        cephx_secret,
+        lockbox_secret,
+        cephx_entity,
+        lockbox_entity,
+        ss);
+    if (err < 0) {
+      return err;
+    } else if (may_be_idempotent && err != EEXIST) {
+      // for this to be idempotent, `id` should already be >= 0; no need
+      // to use validate_id.
+      assert(id >= 0);
+      ss << "osd." << id << " exists but secrets do not match";
+      return -EEXIST;
+    }
+
+    if (has_lockbox) {
+      svc = (ConfigKeyService*)mon->config_key_service;
+      err = svc->validate_osd_new(uuid, dmcrypt_key, ss);
+      if (err < 0) {
+        return err;
+      } else if (may_be_idempotent && err != EEXIST) {
+        assert(id >= 0);
+        ss << "osd." << id << " exists but dm-crypt key does not match.";
+        return -EEXIST;
+      }
+    }
+  }
+  assert(!has_secrets || !cephx_secret.empty());
+  assert(!has_lockbox || !lockbox_secret.empty());
+
+  if (may_be_idempotent) {
+    // we have nothing to do for either the osdmon or the authmon,
+    // and we have no lockbox - so the config key service will not be
+    // touched. This is therefore an idempotent operation, and we can
+    // just return right away.
+    dout(10) << __func__ << " idempotent -- no op." << dendl;
+    assert(id >= 0);
+    if (f) {
+      f->open_object_section("created_osd");
+      f->dump_int("osdid", id);
+      f->close_section();
+    } else {
+      ss << id;
+    }
+    return EEXIST;
+  }
+  assert(!may_be_idempotent);
+
+  // perform updates.
+  if (has_secrets) {
+    assert(!cephx_secret.empty());
+    assert((lockbox_secret.empty() && dmcrypt_key.empty()) ||
+           (!lockbox_secret.empty() && !dmcrypt_key.empty()));
+
+    err = mon->authmon()->do_osd_new(cephx_entity,
+        lockbox_entity,
+        has_lockbox);
+    assert(0 == err);
+
+    if (has_lockbox) {
+      assert(nullptr != svc);
+      svc->do_osd_new(uuid, dmcrypt_key);
+    }
+  }
+
+  if (is_recreate_destroyed) {
+    assert(id >= 0);
+    assert(osdmap.is_destroyed(id));
+    pending_inc.new_weight[id] = CEPH_OSD_OUT;
+    pending_inc.new_state[id] |= CEPH_OSD_DESTROYED | CEPH_OSD_NEW;
+    pending_inc.new_uuid[id] = uuid;
+  } else {
+    assert(id >= 0);
+    int32_t new_id = -1;
+    do_osd_create(id, uuid, &new_id);
+    assert(new_id >= 0);
+    assert(id == new_id);
+  }
+
+  if (f) {
+    f->open_object_section("created_osd");
+    f->dump_int("osdid", id);
+    f->close_section();
+  } else {
+    ss << id;
+  }
+
+  return 0;
+}
+
 bool OSDMonitor::prepare_command(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
@@ -6239,6 +6753,148 @@ static int parse_reweights(CephContext *cct,
   return 0;
 }
 
+int OSDMonitor::prepare_command_osd_destroy(
+    int32_t id,
+    stringstream& ss)
+{
+  assert(paxos->is_plugged());
+
+  // we check if the osd exists for the benefit of `osd purge`, which may
+  // have previously removed the osd. If the osd does not exist, return
+  // -ENOENT to convey this, and let the caller deal with it.
+  //
+  // we presume that all auth secrets and config keys were removed prior
+  // to this command being called. if they exist by now, we also assume
+  // they must have been created by some other command and do not pertain
+  // to this non-existent osd.
+  if (!osdmap.exists(id)) {
+    dout(10) << __func__ << " osd." << id << " does not exist." << dendl;
+    return -ENOENT;
+  }
+
+  uuid_d uuid = osdmap.get_uuid(id);
+  dout(10) << __func__ << " destroying osd." << id
+           << " uuid " << uuid << dendl;
+
+  // if it has been destroyed, we assume our work here is done.
+  if (osdmap.is_destroyed(id)) {
+    ss << "destroyed osd." << id;
+    return 0;
+  }
+
+  EntityName cephx_entity, lockbox_entity;
+  bool idempotent_auth = false, idempotent_cks = false;
+
+  int err = mon->authmon()->validate_osd_destroy(id, uuid,
+                                                 cephx_entity,
+                                                 lockbox_entity,
+                                                 ss);
+  if (err < 0) {
+    if (err == -ENOENT) {
+      idempotent_auth = true;
+      err = 0;
+    } else {
+      return err;
+    }
+  }
+
+  ConfigKeyService *svc = (ConfigKeyService*)mon->config_key_service;
+  err = svc->validate_osd_destroy(id, uuid);
+  if (err < 0) {
+    assert(err == -ENOENT);
+    err = 0;
+    idempotent_cks = true;
+  }
+
+  if (!idempotent_auth) {
+    err = mon->authmon()->do_osd_destroy(cephx_entity, lockbox_entity);
+    assert(0 == err);
+  }
+
+  if (!idempotent_cks) {
+    svc->do_osd_destroy(id, uuid);
+  }
+
+  pending_inc.new_state[id] = CEPH_OSD_DESTROYED;
+  pending_inc.new_uuid[id] = uuid_d();
+
+  // we can only propose_pending() once per service, otherwise we'll be
+  // defying PaxosService and all laws of nature. Therefore, as we may
+  // be used during 'osd purge', let's keep the caller responsible for
+  // proposing.
+  assert(err == 0);
+  return 0;
+}
+
+int OSDMonitor::prepare_command_osd_purge(
+    int32_t id,
+    stringstream& ss)
+{
+  assert(paxos->is_plugged());
+  dout(10) << __func__ << " purging osd." << id << dendl;
+
+  assert(!osdmap.is_up(id));
+
+  /*
+   * This may look a bit weird, but this is what's going to happen:
+   *
+   *  1. we make sure that removing from crush works
+   *  2. we call `prepare_command_osd_destroy()`. If it returns an
+   *     error, then we abort the whole operation, as no updates
+   *     have been made. However, we this function will have
+   *     side-effects, thus we need to make sure that all operations
+   *     performed henceforth will *always* succeed.
+   *  3. we call `prepare_command_osd_remove()`. Although this
+   *     function can return an error, it currently only checks if the
+   *     osd is up - and we have made sure that it is not so, so there
+   *     is no conflict, and it is effectively an update.
+   *  4. finally, we call `do_osd_crush_remove()`, which will perform
+   *     the crush update we delayed from before.
+   */
+
+  CrushWrapper newcrush;
+  _get_pending_crush(newcrush);
+
+  bool may_be_idempotent = false;
+
+  int err = _prepare_command_osd_crush_remove(newcrush, id, 0, false, false);
+  if (err == -ENOENT) {
+    err = 0;
+    may_be_idempotent = true;
+  } else if (err < 0) {
+    ss << "error removing osd." << id << " from crush";
+    return err;
+  }
+
+  // no point destroying the osd again if it has already been marked destroyed
+  if (!osdmap.is_destroyed(id)) {
+    err = prepare_command_osd_destroy(id, ss);
+    if (err < 0) {
+      if (err == -ENOENT) {
+        err = 0;
+      } else {
+        return err;
+      }
+    } else {
+      may_be_idempotent = false;
+    }
+  }
+  assert(0 == err);
+
+  if (may_be_idempotent && !osdmap.exists(id)) {
+    dout(10) << __func__ << " osd." << id << " does not exist and "
+             << "we are idempotent." << dendl;
+    return -ENOENT;
+  }
+
+  err = prepare_command_osd_remove(id);
+  // we should not be busy, as we should have made sure this id is not up.
+  assert(0 == err);
+
+  do_osd_crush_remove(newcrush);
+  return 0;
+}
+
 bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
                                      map<string,cmd_vartype> &cmdmap)
 {
@@ -6298,6 +6954,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
  
   if (prefix == "osd setcrushmap" ||
       (prefix == "osd crush set" && !osdid_present)) {
+    if (pending_inc.crush.length()) {
+      dout(10) << __func__ << " waiting for pending crush update " << dendl;
+      wait_for_finished_proposal(op, new C_RetryMessage(this, op));
+      return true;
+    }
     dout(10) << "prepare_command setting new crush map" << dendl;
     bufferlist data(m->get_data());
     CrushWrapper crush;
@@ -6310,17 +6971,48 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       ss << "Failed to parse crushmap: " << e.what();
       goto reply;
     }
+  
+    int64_t prior_version = 0;
+    if (cmd_getval(g_ceph_context, cmdmap, "prior_version", prior_version)) {
+      if (prior_version == osdmap.get_crush_version() - 1) {
+       // see if we are a resend of the last update.  this is imperfect
+       // (multiple racing updaters may not both get reliable success)
+       // but we expect crush updaters (via this interface) to be rare-ish.
+       bufferlist current, proposed;
+       osdmap.crush->encode(current, mon->get_quorum_con_features());
+       crush.encode(proposed, mon->get_quorum_con_features());
+       if (current.contents_equal(proposed)) {
+         dout(10) << __func__
+                  << " proposed matches current and version equals previous"
+                  << dendl;
+         err = 0;
+         ss << osdmap.get_crush_version();
+         goto reply;
+       }
+      }
+      if (prior_version != osdmap.get_crush_version()) {
+       err = -EPERM;
+       ss << "prior_version " << prior_version << " != crush version "
+          << osdmap.get_crush_version();
+       goto reply;
+      }
+    }
 
+    if (crush.has_legacy_rulesets()) {
+      err = -EINVAL;
+      ss << "crush maps with ruleset != ruleid are no longer allowed";
+      goto reply;
+    }
     if (!validate_crush_against_features(&crush, ss)) {
       err = -EINVAL;
       goto reply;
     }
-    
+
     const auto& osdmap_pools = osdmap.get_pools();
     for (auto pit = osdmap_pools.begin(); pit != osdmap_pools.end(); ++pit) {
       const int64_t pool_id = pit->first;
       const pg_pool_t &pool = pit->second;
-      int ruleno = pool.get_crush_ruleset();
+      int ruleno = pool.get_crush_rule();
       if (!crush.rule_exists(ruleno)) {
        ss << " the crush rule no "<< ruleno << " for pool id " << pool_id << " is in use";
        err = -EINVAL;
@@ -6349,7 +7041,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     dout(10) << " result " << ess.str() << dendl;
 
     pending_inc.crush = data;
-    ss << "set crush map";
+    ss << osdmap.get_crush_version() + 1;
     goto update;
 
   } else if (prefix == "osd crush set-device-class") {
@@ -6685,8 +7377,53 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        err = 0;
       }
     } while (false);
-
-  } else if (prefix == "osd crush link") {
+  } else if (prefix == "osd crush swap-bucket") {
+    string source, dest, force;
+    cmd_getval(g_ceph_context, cmdmap, "source", source);
+    cmd_getval(g_ceph_context, cmdmap, "dest", dest);
+    cmd_getval(g_ceph_context, cmdmap, "force", force);
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    if (!newcrush.name_exists(source)) {
+      ss << "source item " << source << " does not exist";
+      err = -ENOENT;
+      goto reply;
+    }
+    if (!newcrush.name_exists(dest)) {
+      ss << "dest item " << dest << " does not exist";
+      err = -ENOENT;
+      goto reply;
+    }
+    int sid = newcrush.get_item_id(source);
+    int did = newcrush.get_item_id(dest);
+    int sparent;
+    if (newcrush.get_immediate_parent_id(sid, &sparent) == 0 &&
+       force != "--yes-i-really-mean-it") {
+      ss << "source item " << source << " is not an orphan bucket; pass --yes-i-really-mean-it to proceed anyway";
+      err = -EPERM;
+      goto reply;
+    }
+    if (newcrush.get_bucket_alg(sid) != newcrush.get_bucket_alg(did) &&
+       force != "--yes-i-really-mean-it") {
+      ss << "source bucket alg " << crush_alg_name(newcrush.get_bucket_alg(sid)) << " != "
+        << "dest bucket alg " << crush_alg_name(newcrush.get_bucket_alg(did))
+        << "; pass --yes-i-really-mean-it to proceed anyway";
+      err = -EPERM;
+      goto reply;
+    }
+    int r = newcrush.swap_bucket(g_ceph_context, sid, did);
+    if (r < 0) {
+      ss << "failed to swap bucket contents: " << cpp_strerror(r);
+      goto reply;
+    }
+    ss << "swapped bucket of " << source << " to " << dest;
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    wait_for_finished_proposal(op,
+                              new Monitor::C_Command(mon, op, err, ss.str(),
+                                                     get_last_committed() + 1));
+    return true;
+  } else if (prefix == "osd crush link") {
     // osd crush link <name> <loc1> [<loc2> ...]
     string name;
     cmd_getval(g_ceph_context, cmdmap, "name", name);
@@ -6768,6 +7505,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        return true;
       }
       int id = newcrush.get_item_id(name);
+      int ancestor = 0;
+
       bool unlink_only = prefix == "osd crush unlink";
       string ancestor_str;
       if (cmd_getval(g_ceph_context, cmdmap, "ancestor", ancestor_str)) {
@@ -6777,20 +7516,20 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
             << "' does not appear in the crush map";
          break;
        }
-       int ancestor = newcrush.get_item_id(ancestor_str);
-       err = newcrush.remove_item_under(g_ceph_context, id, ancestor,
-                                        unlink_only);
-      } else {
-       err = newcrush.remove_item(g_ceph_context, id, unlink_only);
+        ancestor = newcrush.get_item_id(ancestor_str);
       }
+
+      err = prepare_command_osd_crush_remove(
+          newcrush,
+          id, ancestor,
+          (ancestor < 0), unlink_only);
+
       if (err == -ENOENT) {
        ss << "item " << id << " does not appear in that position";
        err = 0;
        break;
       }
       if (err == 0) {
-       pending_inc.crush.clear();
-       newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
        ss << "removed item id " << id << " name '" << name << "' from crush map";
        getline(ss, rs);
        wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
@@ -6978,9 +7717,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       mode = "firstn";
 
     if (osdmap.crush->rule_exists(name)) {
-      // The name is uniquely associated to a ruleid and the ruleset it contains
-      // From the user point of view, the ruleset is more meaningfull.
-      ss << "ruleset " << name << " already exists";
+      // The name is uniquely associated to a ruleid and the rule it contains
+      // From the user point of view, the rule is more meaningfull.
+      ss << "rule " << name << " already exists";
       err = 0;
       goto reply;
     }
@@ -6989,12 +7728,12 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     _get_pending_crush(newcrush);
 
     if (newcrush.rule_exists(name)) {
-      // The name is uniquely associated to a ruleid and the ruleset it contains
-      // From the user point of view, the ruleset is more meaningfull.
-      ss << "ruleset " << name << " already exists";
+      // The name is uniquely associated to a ruleid and the rule it contains
+      // From the user point of view, the rule is more meaningfull.
+      ss << "rule " << name << " already exists";
       err = 0;
     } else {
-      int ruleno = newcrush.add_simple_ruleset(name, root, type, mode,
+      int ruleno = newcrush.add_simple_rule(name, root, type, mode,
                                               pg_pool_t::TYPE_REPLICATED, &ss);
       if (ruleno < 0) {
        err = ruleno;
@@ -7152,8 +7891,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
     }
 
-    int ruleset;
-    err = crush_ruleset_create_erasure(name, profile, &ruleset, &ss);
+    int rule;
+    err = crush_rule_create_erasure(name, profile, &rule, &ss);
     if (err < 0) {
       switch(err) {
       case -EEXIST: // return immediately
@@ -7170,7 +7909,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        break;
       }
     } else {
-      ss << "created ruleset " << name << " at " << ruleset;
+      ss << "created rule " << name << " at " << rule;
     }
 
     getline(ss, rs);
@@ -7264,9 +8003,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "osd set-full-ratio" ||
             prefix == "osd set-backfillfull-ratio" ||
              prefix == "osd set-nearfull-ratio") {
-    if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
-      ss << "you must complete the upgrade and set require_luminous_osds before"
-        << " using the new interface";
+    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+      ss << "you must complete the upgrade and set require_osd_release ="
+        << "luminous before using the new interface";
       err = -EPERM;
       goto reply;
     }
@@ -7289,17 +8028,16 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
                                              get_last_committed() + 1));
     return true;
   } else if (prefix == "osd set-require-min-compat-client") {
-    if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
-      ss << "you must complete the upgrade and set require_luminous_osds before"
-        << " using the new interface";
+    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+      ss << "you must complete the upgrade and set require_osd_release ="
+        << "luminous before using the new interface";
       err = -EPERM;
       goto reply;
     }
     string v;
     cmd_getval(g_ceph_context, cmdmap, "version", v);
-    if (v != "luminous" && v != "kraken" && v != "jewel" && v != "infernalis" &&
-       v != "hammer" && v != "giant" && v != "firefly" && v != "emperor" &&
-       v != "dumpling" && v != "cuttlefish" && v != "bobtail" && v != "argonaut") {
+    int vno = ceph_release_from_name(v.c_str());
+    if (vno <= 0) {
       ss << "version " << v << " is not recognized";
       err = -EINVAL;
       goto reply;
@@ -7307,16 +8045,57 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     OSDMap newmap;
     newmap.deepish_copy_from(osdmap);
     newmap.apply_incremental(pending_inc);
-    newmap.require_min_compat_client = v;
-    auto mv = newmap.get_min_compat_client();
-    if (v < mv.first) {
-      ss << "osdmap current utilizes features that require " << mv
-        << "; cannot set require_min_compat_client below that to " << v;
+    newmap.require_min_compat_client = vno;
+    auto mvno = newmap.get_min_compat_client();
+    if (vno < mvno) {
+      ss << "osdmap current utilizes features that require "
+        << ceph_release_name(mvno)
+        << "; cannot set require_min_compat_client below that to "
+        << ceph_release_name(vno);
       err = -EPERM;
       goto reply;
     }
-    ss << "set require_min_compat_client to " << v;
-    pending_inc.new_require_min_compat_client = v;
+    string sure;
+    cmd_getval(g_ceph_context, cmdmap, "sure", sure);
+    if (sure != "--yes-i-really-mean-it") {
+      FeatureMap m;
+      mon->get_combined_feature_map(&m);
+      uint64_t features = ceph_release_features(vno);
+      bool first = true;
+      bool ok = true;
+      for (int type : {
+           CEPH_ENTITY_TYPE_CLIENT,
+           CEPH_ENTITY_TYPE_MDS,
+           CEPH_ENTITY_TYPE_MGR }) {
+       auto p = m.m.find(type);
+       if (p == m.m.end()) {
+         continue;
+       }
+       for (auto& q : p->second) {
+         uint64_t missing = ~q.first & features;
+         if (missing) {
+           if (first) {
+             ss << "cannot set require_min_compat_client to " << v << ": ";
+           } else {
+             ss << "; ";
+           }
+           first = false;
+           ss << q.second << " connected " << ceph_entity_type_name(type)
+              << "(s) look like " << ceph_release_name(
+                ceph_release_from_features(q.first))
+              << " (missing 0x" << std::hex << missing << std::dec << ")";
+           ok = false;
+         }
+       }
+      }
+      if (!ok) {
+       ss << "; add --yes-i-really-mean-it to do it anyway";
+       err = -EPERM;
+       goto reply;
+      }
+    }
+    ss << "set require_min_compat_client to " << ceph_release_name(vno);
+    pending_inc.new_require_min_compat_client = vno;
     getline(ss, rs);
     wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
                                                          get_last_committed() + 1));
@@ -7360,11 +8139,17 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       } else {
        ss << "not all up OSDs have OSD_BITWISE_HOBJ_SORT feature";
        err = -EPERM;
+       goto reply;
       }
     } else if (key == "require_jewel_osds") {
       if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
        ss << "the sortbitwise flag must be set before require_jewel_osds";
        err = -EPERM;
+       goto reply;
+      } else if (osdmap.require_osd_release >= CEPH_RELEASE_JEWEL) {
+       ss << "require_osd_release is already >= jewel";
+       err = 0;
+       goto reply;
       } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_JEWEL)) {
        return prepare_set_flag(op, CEPH_OSDMAP_REQUIRE_JEWEL);
       } else {
@@ -7375,6 +8160,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
        ss << "the sortbitwise flag must be set before require_kraken_osds";
        err = -EPERM;
+       goto reply;
+      } else if (osdmap.require_osd_release >= CEPH_RELEASE_KRAKEN) {
+       ss << "require_osd_release is already >= kraken";
+       err = 0;
+       goto reply;
       } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_KRAKEN)) {
        bool r = prepare_set_flag(op, CEPH_OSDMAP_REQUIRE_KRAKEN);
        // ensure JEWEL is also set
@@ -7384,20 +8174,6 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        ss << "not all up OSDs have CEPH_FEATURE_SERVER_KRAKEN feature";
        err = -EPERM;
       }
-    } else if (key == "require_luminous_osds") {
-      if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
-       ss << "the sortbitwise flag must be set before require_luminous_osds";
-       err = -EPERM;
-      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_LUMINOUS)) {
-       bool r = prepare_set_flag(op, CEPH_OSDMAP_REQUIRE_LUMINOUS);
-       // ensure JEWEL and KRAKEN are also set
-       pending_inc.new_flags |= CEPH_OSDMAP_REQUIRE_JEWEL;
-       pending_inc.new_flags |= CEPH_OSDMAP_REQUIRE_KRAKEN;
-       return r;
-      } else {
-       ss << "not all up OSDs have CEPH_FEATURE_SERVER_LUMINOUS feature";
-       err = -EPERM;
-      }
     } else {
       ss << "unrecognized flag '" << key << "'";
       err = -EINVAL;
@@ -7438,6 +8214,43 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       err = -EINVAL;
     }
 
+  } else if (prefix == "osd require-osd-release") {
+    string release;
+    cmd_getval(g_ceph_context, cmdmap, "release", release);
+    if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
+      ss << "the sortbitwise flag must be set first";
+      err = -EPERM;
+      goto reply;
+    }
+    int rel = ceph_release_from_name(release.c_str());
+    if (rel <= 0) {
+      ss << "unrecognized release " << release;
+      err = -EINVAL;
+      goto reply;
+    }
+    if (rel < CEPH_RELEASE_LUMINOUS) {
+      ss << "use this command only for luminous and later";
+      err = -EINVAL;
+      goto reply;
+    }
+    if (rel == CEPH_RELEASE_LUMINOUS) {
+      if (!HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_LUMINOUS)) {
+       ss << "not all up OSDs have CEPH_FEATURE_SERVER_LUMINOUS feature";
+       err = -EPERM;
+       goto reply;
+      }
+    } else {
+      ss << "not supported for this release yet";
+      err = -EPERM;
+      goto reply;
+    }
+    if (rel < osdmap.require_osd_release) {
+      ss << "require_osd_release cannot be lowered once it has been set";
+      err = -EPERM;
+      goto reply;
+    }
+    pending_inc.new_require_osd_release = rel;
+    goto update;
   } else if (prefix == "osd cluster_snap") {
     // ** DISABLE THIS FOR NOW **
     ss << "cluster snapshot currently disabled (broken implementation)";
@@ -7449,80 +8262,406 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
             prefix == "osd rm") {
 
     bool any = false;
+    bool stop = false;
+    bool verbose = true;
 
     vector<string> idvec;
     cmd_getval(g_ceph_context, cmdmap, "ids", idvec);
-    for (unsigned j = 0; j < idvec.size(); j++) {
-      long osd = parse_osd_id(idvec[j].c_str(), &ss);
-      if (osd < 0) {
-       ss << "invalid osd id" << osd;
-       err = -EINVAL;
-       continue;
-      } else if (!osdmap.exists(osd)) {
-       ss << "osd." << osd << " does not exist. ";
-       continue;
+    for (unsigned j = 0; j < idvec.size() && !stop; j++) {
+      set<int> osds;
+
+      // wildcard?
+      if (j == 0 &&
+          (idvec[0] == "any" || idvec[0] == "all" || idvec[0] == "*")) {
+        if (prefix == "osd in") {
+          // touch out osds only
+          osdmap.get_out_osds(osds);
+        } else {
+          osdmap.get_all_osds(osds);
+        }
+        stop = true;
+        verbose = false; // so the output is less noisy.
+      } else {
+        long osd = parse_osd_id(idvec[j].c_str(), &ss);
+        if (osd < 0) {
+          ss << "invalid osd id" << osd;
+          err = -EINVAL;
+          continue;
+        } else if (!osdmap.exists(osd)) {
+          ss << "osd." << osd << " does not exist. ";
+          continue;
+        }
+
+        osds.insert(osd);
       }
-      if (prefix == "osd down") {
-       if (osdmap.is_down(osd)) {
-         ss << "osd." << osd << " is already down. ";
-       } else {
-         pending_inc.new_state[osd] = CEPH_OSD_UP;
-         ss << "marked down osd." << osd << ". ";
-         any = true;
-       }
-      } else if (prefix == "osd out") {
-       if (osdmap.is_out(osd)) {
-         ss << "osd." << osd << " is already out. ";
-       } else {
-         pending_inc.new_weight[osd] = CEPH_OSD_OUT;
-         if (osdmap.osd_weight[osd]) {
-           if (pending_inc.new_xinfo.count(osd) == 0) {
-             pending_inc.new_xinfo[osd] = osdmap.osd_xinfo[osd];
+
+      for (auto &osd : osds) {
+        if (prefix == "osd down") {
+         if (osdmap.is_down(osd)) {
+            if (verbose)
+             ss << "osd." << osd << " is already down. ";
+         } else {
+            pending_inc.pending_osd_state_set(osd, CEPH_OSD_UP);
+           ss << "marked down osd." << osd << ". ";
+           any = true;
+         }
+        } else if (prefix == "osd out") {
+         if (osdmap.is_out(osd)) {
+            if (verbose)
+             ss << "osd." << osd << " is already out. ";
+         } else {
+           pending_inc.new_weight[osd] = CEPH_OSD_OUT;
+           if (osdmap.osd_weight[osd]) {
+             if (pending_inc.new_xinfo.count(osd) == 0) {
+               pending_inc.new_xinfo[osd] = osdmap.osd_xinfo[osd];
+             }
+             pending_inc.new_xinfo[osd].old_weight = osdmap.osd_weight[osd];
            }
-           pending_inc.new_xinfo[osd].old_weight = osdmap.osd_weight[osd];
+           ss << "marked out osd." << osd << ". ";
+           any = true;
          }
-         ss << "marked out osd." << osd << ". ";
-         any = true;
-       }
-      } else if (prefix == "osd in") {
-       if (osdmap.is_in(osd)) {
-         ss << "osd." << osd << " is already in. ";
-       } else {
-         if (osdmap.osd_xinfo[osd].old_weight > 0) {
-           pending_inc.new_weight[osd] = osdmap.osd_xinfo[osd].old_weight;
-           if (pending_inc.new_xinfo.count(osd) == 0) {
-             pending_inc.new_xinfo[osd] = osdmap.osd_xinfo[osd];
+        } else if (prefix == "osd in") {
+         if (osdmap.is_in(osd)) {
+            if (verbose)
+             ss << "osd." << osd << " is already in. ";
+         } else {
+           if (osdmap.osd_xinfo[osd].old_weight > 0) {
+             pending_inc.new_weight[osd] = osdmap.osd_xinfo[osd].old_weight;
+             if (pending_inc.new_xinfo.count(osd) == 0) {
+               pending_inc.new_xinfo[osd] = osdmap.osd_xinfo[osd];
+             }
+             pending_inc.new_xinfo[osd].old_weight = 0;
+           } else {
+             pending_inc.new_weight[osd] = CEPH_OSD_IN;
            }
-           pending_inc.new_xinfo[osd].old_weight = 0;
+           ss << "marked in osd." << osd << ". ";
+           any = true;
+         }
+        } else if (prefix == "osd rm") {
+          err = prepare_command_osd_remove(osd);
+
+          if (err == -EBUSY) {
+           if (any)
+             ss << ", ";
+            ss << "osd." << osd << " is still up; must be down before removal. ";
          } else {
-           pending_inc.new_weight[osd] = CEPH_OSD_IN;
+            assert(err == 0);
+           if (any) {
+             ss << ", osd." << osd;
+            } else {
+             ss << "removed osd." << osd;
+            }
+           any = true;
          }
-         ss << "marked in osd." << osd << ". ";
-         any = true;
-       }
-      } else if (prefix == "osd rm") {
-       if (osdmap.is_up(osd)) {
-         if (any)
-           ss << ", ";
-          ss << "osd." << osd << " is still up; must be down before removal. ";
-         err = -EBUSY;
-       } else {
-         pending_inc.new_state[osd] = osdmap.get_state(osd);
-          pending_inc.new_uuid[osd] = uuid_d();
-         pending_metadata_rm.insert(osd);
-         if (any) {
-           ss << ", osd." << osd;
+        }
+      }
+    }
+    if (any) {
+      getline(ss, rs);
+      wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, err, rs,
+                                               get_last_committed() + 1));
+      return true;
+    }
+  } else if (prefix == "osd add-noup" ||
+             prefix == "osd add-nodown" ||
+             prefix == "osd add-noin" ||
+             prefix == "osd add-noout") {
+
+    enum {
+      OP_NOUP,
+      OP_NODOWN,
+      OP_NOIN,
+      OP_NOOUT,
+    } option;
+
+    if (prefix == "osd add-noup") {
+      option = OP_NOUP;
+    } else if (prefix == "osd add-nodown") {
+      option = OP_NODOWN;
+    } else if (prefix == "osd add-noin") {
+      option = OP_NOIN;
+    } else {
+      option = OP_NOOUT;
+    }
+
+    bool any = false;
+    bool stop = false;
+
+    vector<string> idvec;
+    cmd_getval(g_ceph_context, cmdmap, "ids", idvec);
+    for (unsigned j = 0; j < idvec.size() && !stop; j++) {
+
+      set<int> osds;
+
+      // wildcard?
+      if (j == 0 &&
+          (idvec[0] == "any" || idvec[0] == "all" || idvec[0] == "*")) {
+        osdmap.get_all_osds(osds);
+        stop = true;
+      } else {
+        // try traditional single osd way
+
+        long osd = parse_osd_id(idvec[j].c_str(), &ss);
+        if (osd < 0) {
+          // ss has reason for failure
+          ss << ", unable to parse osd id:\"" << idvec[j] << "\". ";
+          err = -EINVAL;
+          continue;
+        }
+
+        osds.insert(osd);
+      }
+
+      for (auto &osd : osds) {
+
+        if (!osdmap.exists(osd)) {
+          ss << "osd." << osd << " does not exist. ";
+          continue;
+        }
+
+        switch (option) {
+        case OP_NOUP:
+          if (osdmap.is_up(osd)) {
+            ss << "osd." << osd << " is already up. ";
+            continue;
+          }
+
+          if (osdmap.is_noup(osd)) {
+            if (pending_inc.pending_osd_state_clear(osd, CEPH_OSD_NOUP))
+              any = true;
           } else {
-           ss << "removed osd." << osd;
+            pending_inc.pending_osd_state_set(osd, CEPH_OSD_NOUP);
+            any = true;
           }
-         any = true;
-       }
+
+          break;
+
+        case OP_NODOWN:
+          if (osdmap.is_down(osd)) {
+            ss << "osd." << osd << " is already down. ";
+            continue;
+          }
+
+          if (osdmap.is_nodown(osd)) {
+            if (pending_inc.pending_osd_state_clear(osd, CEPH_OSD_NODOWN))
+              any = true;
+          } else {
+            pending_inc.pending_osd_state_set(osd, CEPH_OSD_NODOWN);
+            any = true;
+          }
+
+          break;
+
+        case OP_NOIN:
+          if (osdmap.is_in(osd)) {
+            ss << "osd." << osd << " is already in. ";
+            continue;
+          }
+
+          if (osdmap.is_noin(osd)) {
+            if (pending_inc.pending_osd_state_clear(osd, CEPH_OSD_NOIN))
+              any = true;
+          } else {
+            pending_inc.pending_osd_state_set(osd, CEPH_OSD_NOIN);
+            any = true;
+          }
+
+          break;
+
+        case OP_NOOUT:
+          if (osdmap.is_out(osd)) {
+            ss << "osd." << osd << " is already out. ";
+            continue;
+          }
+
+          if (osdmap.is_noout(osd)) {
+            if (pending_inc.pending_osd_state_clear(osd, CEPH_OSD_NOOUT))
+              any = true;
+          } else {
+            pending_inc.pending_osd_state_set(osd, CEPH_OSD_NOOUT);
+            any = true;
+          }
+
+          break;
+
+        default:
+         assert(0 == "invalid option");
+        }
       }
     }
+
     if (any) {
       getline(ss, rs);
       wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, err, rs,
-                                               get_last_committed() + 1));
+                                 get_last_committed() + 1));
+      return true;
+    }
+  } else if (prefix == "osd rm-noup" ||
+             prefix == "osd rm-nodown" ||
+             prefix == "osd rm-noin" ||
+             prefix == "osd rm-noout") {
+
+    enum {
+      OP_NOUP,
+      OP_NODOWN,
+      OP_NOIN,
+      OP_NOOUT,
+    } option;
+
+    if (prefix == "osd rm-noup") {
+      option = OP_NOUP;
+    } else if (prefix == "osd rm-nodown") {
+      option = OP_NODOWN;
+    } else if (prefix == "osd rm-noin") {
+      option = OP_NOIN;
+    } else {
+      option = OP_NOOUT;
+    }
+
+    bool any = false;
+    bool stop = false;
+
+    vector<string> idvec;
+    cmd_getval(g_ceph_context, cmdmap, "ids", idvec);
+
+    for (unsigned j = 0; j < idvec.size() && !stop; j++) {
+
+      vector<int> osds;
+
+      // wildcard?
+      if (j == 0 &&
+          (idvec[0] == "any" || idvec[0] == "all" || idvec[0] == "*")) {
+
+        // touch previous noup/nodown/noin/noout osds only
+        switch (option) {
+        case OP_NOUP:
+          osdmap.get_noup_osds(&osds);
+          break;
+        case OP_NODOWN:
+          osdmap.get_nodown_osds(&osds);
+          break;
+        case OP_NOIN:
+          osdmap.get_noin_osds(&osds);
+          break;
+        case OP_NOOUT:
+          osdmap.get_noout_osds(&osds);
+          break;
+        default:
+          assert(0 == "invalid option");
+        }
+
+        // cancel any pending noup/nodown/noin/noout requests too
+        vector<int> pending_state_osds;
+        (void) pending_inc.get_pending_state_osds(&pending_state_osds);
+        for (auto &p : pending_state_osds) {
+
+          switch (option) {
+          case OP_NOUP:
+            if (!osdmap.is_noup(p) &&
+                pending_inc.pending_osd_state_clear(p, CEPH_OSD_NOUP)) {
+              any = true;
+            }
+            break;
+
+          case OP_NODOWN:
+            if (!osdmap.is_nodown(p) &&
+                pending_inc.pending_osd_state_clear(p, CEPH_OSD_NODOWN)) {
+              any = true;
+            }
+            break;
+
+          case OP_NOIN:
+            if (!osdmap.is_noin(p) &&
+                pending_inc.pending_osd_state_clear(p, CEPH_OSD_NOIN)) {
+              any = true;
+            }
+            break;
+
+          case OP_NOOUT:
+            if (!osdmap.is_noout(p) &&
+                pending_inc.pending_osd_state_clear(p, CEPH_OSD_NOOUT)) {
+              any = true;
+            }
+            break;
+
+          default:
+            assert(0 == "invalid option");
+          }
+        }
+
+        stop = true;
+      } else {
+        // try traditional single osd way
+
+        long osd = parse_osd_id(idvec[j].c_str(), &ss);
+        if (osd < 0) {
+          // ss has reason for failure
+          ss << ", unable to parse osd id:\"" << idvec[j] << "\". ";
+          err = -EINVAL;
+          continue;
+        }
+
+        osds.push_back(osd);
+      }
+
+      for (auto &osd : osds) {
+
+        if (!osdmap.exists(osd)) {
+          ss << "osd." << osd << " does not exist. ";
+          continue;
+        }
+
+        switch (option) {
+          case OP_NOUP:
+            if (osdmap.is_noup(osd)) {
+              pending_inc.pending_osd_state_set(osd, CEPH_OSD_NOUP);
+              any = true;
+            } else if (pending_inc.pending_osd_state_clear(
+              osd, CEPH_OSD_NOUP)) {
+              any = true;
+            }
+            break;
+
+          case OP_NODOWN:
+            if (osdmap.is_nodown(osd)) {
+              pending_inc.pending_osd_state_set(osd, CEPH_OSD_NODOWN);
+              any = true;
+            } else if (pending_inc.pending_osd_state_clear(
+              osd, CEPH_OSD_NODOWN)) {
+              any = true;
+            }
+            break;
+
+          case OP_NOIN:
+            if (osdmap.is_noin(osd)) {
+              pending_inc.pending_osd_state_set(osd, CEPH_OSD_NOIN);
+              any = true;
+            } else if (pending_inc.pending_osd_state_clear(
+              osd, CEPH_OSD_NOIN)) {
+              any = true;
+            }
+            break;
+
+          case OP_NOOUT:
+            if (osdmap.is_noout(osd)) {
+              pending_inc.pending_osd_state_set(osd, CEPH_OSD_NOOUT);
+              any = true;
+            } else if (pending_inc.pending_osd_state_clear(
+              osd, CEPH_OSD_NOOUT)) {
+              any = true;
+            }
+            break;
+
+          default:
+            assert(0 == "invalid option");
+        }
+      }
+    }
+
+    if (any) {
+      getline(ss, rs);
+      wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, err, rs,
+                                 get_last_committed() + 1));
       return true;
     }
   } else if (prefix == "osd pg-temp") {
@@ -7604,9 +8743,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    if (osdmap.require_min_compat_client.length() &&
-       osdmap.require_min_compat_client < "firefly") {
-      ss << "require_min_compat_client " << osdmap.require_min_compat_client
+    if (osdmap.require_min_compat_client > 0 &&
+       osdmap.require_min_compat_client < CEPH_RELEASE_FIREFLY) {
+      ss << "require_min_compat_client "
+        << ceph_release_name(osdmap.require_min_compat_client)
         << " < firefly, which is required for primary-temp";
       err = -EPERM;
       goto reply;
@@ -7620,13 +8760,15 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     ss << "set " << pgid << " primary_temp mapping to " << osd;
     goto update;
   } else if (prefix == "osd pg-upmap") {
-    if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
-      ss << "you must set the require_luminous_osds flag to use this feature";
+    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+      ss << "you must complete the upgrade and set require_osd_release ="
+        << "luminous before using the new interface";
       err = -EPERM;
       goto reply;
     }
-    if (osdmap.require_min_compat_client < "luminous") {
-      ss << "min_compat_client " << osdmap.require_min_compat_client
+    if (osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
+      ss << "min_compat_client "
+        << ceph_release_name(osdmap.require_min_compat_client)
         << " < luminous, which is required for pg-upmap";
       err = -EPERM;
       goto reply;
@@ -7682,13 +8824,15 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     ss << "set " << pgid << " pg_upmap mapping to " << new_pg_upmap;
     goto update;
   } else if (prefix == "osd rm-pg-upmap") {
-    if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
-      ss << "you must set the require_luminous_osds flag to use this feature";
+    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+      ss << "you must complete the upgrade and set require_osd_release ="
+        << "luminous before using the new interface";
       err = -EPERM;
       goto reply;
     }
-    if (osdmap.require_min_compat_client < "luminous") {
-      ss << "require_min_compat_client " << osdmap.require_min_compat_client
+    if (osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
+      ss << "require_min_compat_client "
+        << ceph_release_name(osdmap.require_min_compat_client)
         << " < luminous, which is required for pg-upmap";
       err = -EPERM;
       goto reply;
@@ -7711,11 +8855,6 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       err = -EINVAL;
       goto reply;
     }
-    if (!osdmap.pg_exists(pgid)) {
-      ss << "pg " << pgid << " does not exist";
-      err = -ENOENT;
-      goto reply;
-    }
     if (pending_inc.new_pg_upmap.count(pgid) ||
        pending_inc.old_pg_upmap.count(pgid)) {
       dout(10) << __func__ << " waiting for pending update on " << pgid << dendl;
@@ -7727,13 +8866,15 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     ss << "clear " << pgid << " pg_upmap mapping";
     goto update;
   } else if (prefix == "osd pg-upmap-items") {
-    if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
-      ss << "you must set the require_luminous_osds flag to use this feature";
+    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+      ss << "you must complete the upgrade and set require_osd_release ="
+        << "luminous before using the new interface";
       err = -EPERM;
       goto reply;
     }
-    if (osdmap.require_min_compat_client < "luminous") {
-      ss << "require_min_compat_client " << osdmap.require_min_compat_client
+    if (osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
+      ss << "require_min_compat_client "
+        << ceph_release_name(osdmap.require_min_compat_client)
         << " < luminous, which is required for pg-upmap";
       err = -EPERM;
       goto reply;
@@ -7802,13 +8943,15 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     ss << "set " << pgid << " pg_upmap_items mapping to " << new_pg_upmap_items;
     goto update;
   } else if (prefix == "osd rm-pg-upmap-items") {
-    if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
-      ss << "you must set the require_luminous_osds flag to use this feature";
+    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+      ss << "you must complete the upgrade and set require_osd_release ="
+        << "luminous before using the new interface";
       err = -EPERM;
       goto reply;
     }
-    if (osdmap.require_min_compat_client < "luminous") {
-      ss << "require_min_compat_client " << osdmap.require_min_compat_client
+    if (osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
+      ss << "require_min_compat_client "
+        << ceph_release_name(osdmap.require_min_compat_client)
         << " < luminous, which is required for pg-upmap";
       err = -EPERM;
       goto reply;
@@ -7831,11 +8974,6 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       err = -EINVAL;
       goto reply;
     }
-    if (!osdmap.pg_exists(pgid)) {
-      ss << "pg " << pgid << " does not exist";
-      err = -ENOENT;
-      goto reply;
-    }
     if (pending_inc.new_pg_upmap_items.count(pgid) ||
        pending_inc.old_pg_upmap_items.count(pgid)) {
       dout(10) << __func__ << " waiting for pending update on " << pgid << dendl;
@@ -7867,9 +9005,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       err = -EINVAL;
       goto reply;
     }
-    if (osdmap.require_min_compat_client.length() &&
-       osdmap.require_min_compat_client < "firefly") {
-      ss << "require_min_compat_client " << osdmap.require_min_compat_client
+    if (osdmap.require_min_compat_client > 0 &&
+       osdmap.require_min_compat_client < CEPH_RELEASE_FIREFLY) {
+      ss << "require_min_compat_client "
+        << ceph_release_name(osdmap.require_min_compat_client)
         << " < firefly, which is required for primary-affinity";
       err = -EPERM;
       goto reply;
@@ -7973,112 +9112,208 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       return true;
     }
 
+  } else if (prefix == "osd destroy" || prefix == "osd purge") {
+    /* Destroying an OSD means that we don't expect to further make use of
+     * the OSDs data (which may even become unreadable after this operation),
+     * and that we are okay with scrubbing all its cephx keys and config-key
+     * data (which may include lockbox keys, thus rendering the osd's data
+     * unreadable).
+     *
+     * The OSD will not be removed. Instead, we will mark it as destroyed,
+     * such that a subsequent call to `create` will not reuse the osd id.
+     * This will play into being able to recreate the OSD, at the same
+     * crush location, with minimal data movement.
+     */
+
+    // make sure authmon is writeable.
+    if (!mon->authmon()->is_writeable()) {
+      dout(10) << __func__ << " waiting for auth mon to be writeable for "
+               << "osd destroy" << dendl;
+      mon->authmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
+      return false;
+    }
+
+    int64_t id;
+    if (!cmd_getval(g_ceph_context, cmdmap, "id", id)) {
+      ss << "unable to parse osd id value '"
+         << cmd_vartype_stringify(cmdmap["id"]) << "";
+      err = -EINVAL;
+      goto reply;
+    }
+
+    bool is_destroy = (prefix == "osd destroy");
+    if (!is_destroy) {
+      assert("osd purge" == prefix);
+    }
+
+    string sure;
+    if (!cmd_getval(g_ceph_context, cmdmap, "sure", sure) ||
+        sure != "--yes-i-really-mean-it") {
+      ss << "Are you SURE? This will mean real, permanent data loss, as well "
+         << "as cephx and lockbox keys. Pass --yes-i-really-mean-it if you "
+         << "really do.";
+      err = -EPERM;
+      goto reply;
+    } else if (is_destroy && !osdmap.exists(id)) {
+      ss << "osd." << id << " does not exist";
+      err = -ENOENT;
+      goto reply;
+    } else if (osdmap.is_up(id)) {
+      ss << "osd." << id << " is not `down`.";
+      err = -EBUSY;
+      goto reply;
+    } else if (is_destroy && osdmap.is_destroyed(id)) {
+      ss << "destroyed osd." << id;
+      err = 0;
+      goto reply;
+    }
+
+    bool goto_reply = false;
+
+    paxos->plug();
+    if (is_destroy) {
+      err = prepare_command_osd_destroy(id, ss);
+      // we checked above that it should exist.
+      assert(err != -ENOENT);
+    } else {
+      err = prepare_command_osd_purge(id, ss);
+      if (err == -ENOENT) {
+        err = 0;
+        ss << "osd." << id << " does not exist.";
+        goto_reply = true;
+      }
+    }
+    paxos->unplug();
+
+    if (err < 0 || goto_reply) {
+      goto reply;
+    }
+
+    if (is_destroy) {
+      ss << "destroyed osd." << id;
+    } else {
+      ss << "purged osd." << id;
+    }
+
+    getline(ss, rs);
+    wait_for_finished_proposal(op,
+        new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1));
+    force_immediate_propose();
+    return true;
+
+  } else if (prefix == "osd new") {
+
+    // make sure authmon is writeable.
+    if (!mon->authmon()->is_writeable()) {
+      dout(10) << __func__ << " waiting for auth mon to be writeable for "
+               << "osd destroy" << dendl;
+      mon->authmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
+      return false;
+    }
+
+    map<string,string> secrets_map;
+
+    bufferlist bl = m->get_data();
+    string secrets_json = bl.to_str();
+    dout(20) << __func__ << " osd new json = " << secrets_json << dendl;
+
+    err = get_json_str_map(secrets_json, ss, &secrets_map);
+    if (err < 0)
+      goto reply;
+
+    dout(20) << __func__ << " osd new secrets " << secrets_map << dendl;
+
+    paxos->plug();
+    err = prepare_command_osd_new(op, cmdmap, secrets_map, ss, f.get());
+    paxos->unplug();
+
+    if (err < 0) {
+      goto reply;
+    }
+
+    if (f) {
+      f->flush(rdata);
+    } else {
+      rdata.append(ss);
+    }
+
+    if (err == EEXIST) {
+      // idempotent operation
+      err = 0;
+      goto reply;
+    }
+
+    wait_for_finished_proposal(op,
+        new Monitor::C_Command(mon, op, 0, rs, rdata,
+                               get_last_committed() + 1));
+    force_immediate_propose();
+    return true;
+
   } else if (prefix == "osd create") {
-    int i = -1;
 
     // optional id provided?
-    int64_t id = -1;
-    if (cmd_getval(g_ceph_context, cmdmap, "id", id)) {
-      if (id < 0) {
-       ss << "invalid osd id value '" << id << "'";
+    int64_t id = -1, cmd_id = -1;
+    if (cmd_getval(g_ceph_context, cmdmap, "id", cmd_id)) {
+      if (cmd_id < 0) {
+       ss << "invalid osd id value '" << cmd_id << "'";
        err = -EINVAL;
        goto reply;
       }
-      dout(10) << " osd create got id " << id << dendl;
+      dout(10) << " osd create got id " << cmd_id << dendl;
     }
 
-    // optional uuid provided?
     uuid_d uuid;
     string uuidstr;
     if (cmd_getval(g_ceph_context, cmdmap, "uuid", uuidstr)) {
       if (!uuid.parse(uuidstr.c_str())) {
-       ss << "invalid uuid value '" << uuidstr << "'";
-       err = -EINVAL;
-       goto reply;
-      }
-      dout(10) << " osd create got uuid " << uuid << dendl;
-      i = osdmap.identify_osd(uuid);
-      if (i >= 0) {
-       // osd already exists
-       if (id >= 0 && i != id) {
-         ss << "uuid " << uuidstr << " already in use for different id " << i;
-         err = -EINVAL;
-         goto reply;
-       }
-       err = 0;
-       if (f) {
-         f->open_object_section("created_osd");
-         f->dump_int("osdid", i);
-         f->close_section();
-         f->flush(rdata);
-       } else {
-         ss << i;
-         rdata.append(ss);
-       }
-       goto reply;
-      }
-      // i < 0
-      if (id >= 0) {
-       if (osdmap.exists(id)) {
-         ss << "id " << id << " already in use and does not match uuid "
-            << uuid;
-         err = -EINVAL;
-         goto reply;
-       }
-       if (pending_inc.new_state.count(id)) {
-         // osd is about to exist
-         wait_for_finished_proposal(op, new C_RetryMessage(this, op));
-         return true;
-       }
-       i = id;
-      }
-      if (pending_inc.identify_osd(uuid) >= 0) {
-       // osd is about to exist
-       wait_for_finished_proposal(op, new C_RetryMessage(this, op));
-       return true;
-      }
-      if (i >= 0) {
-       // raise max_osd
-       if (osdmap.get_max_osd() <= i && pending_inc.new_max_osd <= i)
-         pending_inc.new_max_osd = i + 1;
-       goto done;
+        ss << "invalid uuid value '" << uuidstr << "'";
+        err = -EINVAL;
+        goto reply;
       }
+      // we only care about the id if we also have the uuid, to
+      // ensure the operation's idempotency.
+      id = cmd_id;
     }
 
-    // allocate a new id
-    for (i=0; i < osdmap.get_max_osd(); i++) {
-      if (!osdmap.exists(i) &&
-         pending_inc.new_up_client.count(i) == 0 &&
-         (pending_inc.new_state.count(i) == 0 ||
-          (pending_inc.new_state[i] & CEPH_OSD_EXISTS) == 0)) {
-        pending_inc.new_weight[i] = CEPH_OSD_OUT;
-       goto done;
+    int32_t new_id = -1;
+    err = prepare_command_osd_create(id, uuid, &new_id, ss);
+    if (err < 0) {
+      if (err == -EAGAIN) {
+        wait_for_finished_proposal(op, new C_RetryMessage(this, op));
+        return true;
       }
+      // a check has failed; reply to the user.
+      goto reply;
+
+    } else if (err == EEXIST) {
+      // this is an idempotent operation; we can go ahead and reply.
+      if (f) {
+        f->open_object_section("created_osd");
+        f->dump_int("osdid", new_id);
+        f->close_section();
+        f->flush(rdata);
+      } else {
+        ss << new_id;
+        rdata.append(ss);
+      }
+      err = 0;
+      goto reply;
     }
 
-    // raise max_osd
-    if (pending_inc.new_max_osd < 0)
-      pending_inc.new_max_osd = osdmap.get_max_osd() + 1;
-    else
-      pending_inc.new_max_osd++;
-    i = pending_inc.new_max_osd - 1;
-
-done:
-    dout(10) << " creating osd." << i << dendl;
-    pending_inc.new_state[i] |= CEPH_OSD_EXISTS | CEPH_OSD_NEW;
-    if (!uuid.is_zero())
-      pending_inc.new_uuid[i] = uuid;
+    do_osd_create(id, uuid, &new_id);
+
     if (f) {
       f->open_object_section("created_osd");
-      f->dump_int("osdid", i);
+      f->dump_int("osdid", new_id);
       f->close_section();
       f->flush(rdata);
     } else {
-      ss << i;
+      ss << new_id;
       rdata.append(ss);
     }
-    wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs, rdata,
-                                             get_last_committed() + 1));
+    wait_for_finished_proposal(op,
+        new Monitor::C_Command(mon, op, 0, rs, rdata,
+                               get_last_committed() + 1));
     return true;
 
   } else if (prefix == "osd blacklist clear") {
@@ -8263,9 +9498,9 @@ done:
       goto reply;
     }
 
-    bool implicit_ruleset_creation = false;
-    string ruleset_name;
-    cmd_getval(g_ceph_context, cmdmap, "ruleset", ruleset_name);
+    bool implicit_rule_creation = false;
+    string rule_name;
+    cmd_getval(g_ceph_context, cmdmap, "rule", rule_name);
     string erasure_code_profile;
     cmd_getval(g_ceph_context, cmdmap, "erasure_code_profile", erasure_code_profile);
 
@@ -8291,24 +9526,24 @@ done:
          goto wait;
        }
       }
-      if (ruleset_name == "") {
-       implicit_ruleset_creation = true;
+      if (rule_name == "") {
+       implicit_rule_creation = true;
        if (erasure_code_profile == "default") {
-         ruleset_name = "erasure-code";
+         rule_name = "erasure-code";
        } else {
-         dout(1) << "implicitly use ruleset named after the pool: "
+         dout(1) << "implicitly use rule named after the pool: "
                << poolstr << dendl;
-         ruleset_name = poolstr;
+         rule_name = poolstr;
        }
       }
     } else {
-      //NOTE:for replicated pool,cmd_map will put ruleset_name to erasure_code_profile field
-      ruleset_name = erasure_code_profile;
+      //NOTE:for replicated pool,cmd_map will put rule_name to erasure_code_profile field
+      rule_name = erasure_code_profile;
     }
 
-    if (!implicit_ruleset_creation && ruleset_name != "") {
-      int ruleset;
-      err = get_crush_ruleset(ruleset_name, &ruleset, &ss);
+    if (!implicit_rule_creation && rule_name != "") {
+      int rule;
+      err = get_crush_rule(rule_name, &rule, &ss);
       if (err == -EAGAIN) {
        wait_for_finished_proposal(op, new C_RetryMessage(this, op));
        return true;
@@ -8335,7 +9570,7 @@ done:
     
     err = prepare_new_pool(poolstr, 0, // auid=0 for admin created pool
                           -1, // default crush rule
-                          ruleset_name,
+                          rule_name,
                           pg_num, pgp_num,
                           erasure_code_profile, pool_type,
                            (uint64_t)expected_num_objects,
@@ -8481,9 +9716,8 @@ done:
     // make sure new tier is empty
     string force_nonempty;
     cmd_getval(g_ceph_context, cmdmap, "force_nonempty", force_nonempty);
-    const pool_stat_t& tier_stats =
-      mon->pgmon()->pg_map.get_pg_pool_sum_stat(tierpool_id);
-    if (tier_stats.stats.sum.num_objects != 0 &&
+    const pool_stat_t *pstats = mon->pgservice->get_pool_stat(tierpool_id);
+    if (pstats && pstats->stats.sum.num_objects != 0 &&
        force_nonempty != "--force-nonempty") {
       ss << "tier pool '" << tierpoolstr << "' is not empty; --force-nonempty to force";
       err = -ENOTEMPTY;
@@ -8795,10 +10029,10 @@ done:
          mode != pg_pool_t::CACHEMODE_PROXY &&
          mode != pg_pool_t::CACHEMODE_READPROXY))) {
 
-      const pool_stat_t& tier_stats =
-        mon->pgmon()->pg_map.get_pg_pool_sum_stat(pool_id);
+      const pool_stat_t* pstats =
+        mon->pgservice->get_pool_stat(pool_id);
 
-      if (tier_stats.stats.sum.num_objects_dirty > 0) {
+      if (pstats && pstats->stats.sum.num_objects_dirty > 0) {
         ss << "unable to set cache-mode '"
            << pg_pool_t::get_cache_mode_name(mode) << "' on pool '" << poolstr
            << "': dirty objects found";
@@ -8863,9 +10097,9 @@ done:
       goto reply;
     }
     // make sure new tier is empty
-    const pool_stat_t& tier_stats =
-      mon->pgmon()->pg_map.get_pg_pool_sum_stat(tierpool_id);
-    if (tier_stats.stats.sum.num_objects != 0) {
+    const pool_stat_t *pstats =
+      mon->pgservice->get_pool_stat(tierpool_id);
+    if (pstats && pstats->stats.sum.num_objects != 0) {
       ss << "tier pool '" << tierpoolstr << "' is not empty";
       err = -ENOTEMPTY;
       goto reply;
@@ -9003,16 +10237,15 @@ done:
     cmd_getval(g_ceph_context, cmdmap, "no_increasing", no_increasing);
     string out_str;
     mempool::osdmap::map<int32_t, uint32_t> new_weights;
-    err = reweight::by_utilization(osdmap,
-                                  mon->pgmon()->pg_map,
-                                  oload,
-                                  max_change,
-                                  max_osds,
-                                  by_pg,
-                                  pools.empty() ? NULL : &pools,
-                                  no_increasing == "--no-increasing",
-                                  &new_weights,
-                                  &ss, &out_str, f.get());
+    err = mon->pgservice->reweight_by_utilization(osdmap,
+                                            oload,
+                                            max_change,
+                                            max_osds,
+                                            by_pg,
+                                            pools.empty() ? NULL : &pools,
+                                            no_increasing == "--no-increasing",
+                                            &new_weights,
+                                            &ss, &out_str, f.get());
     if (err >= 0) {
       dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
     }