]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/OSDMonitor.cc
import ceph 15.2.14
[ceph.git] / ceph / src / mon / OSDMonitor.cc
index 64907018b3e176e690417b9c7636620bef0b6be8..877ce53a6fd4a288d2e9d6ae1421119563274161 100644 (file)
@@ -39,6 +39,7 @@
 #include "messages/MOSDBeacon.h"
 #include "messages/MOSDFailure.h"
 #include "messages/MOSDMarkMeDown.h"
+#include "messages/MOSDMarkMeDead.h"
 #include "messages/MOSDFull.h"
 #include "messages/MOSDMap.h"
 #include "messages/MMonGetOSDMap.h"
@@ -55,6 +56,8 @@
 #include "messages/MRemoveSnaps.h"
 #include "messages/MOSDScrub.h"
 #include "messages/MRoute.h"
+#include "messages/MMonGetPurgedSnaps.h"
+#include "messages/MMonGetPurgedSnapsReply.h"
 
 #include "common/TextTable.h"
 #include "common/Timer.h"
@@ -93,6 +96,38 @@ static const string OSD_PG_CREATING_PREFIX("osd_pg_creating");
 static const string OSD_METADATA_PREFIX("osd_metadata");
 static const string OSD_SNAP_PREFIX("osd_snap");
 
+/*
+
+  OSD snapshot metadata
+  ---------------------
+
+  -- starting with mimic, removed in octopus --
+
+  "removed_epoch_%llu_%08lx" % (pool, epoch)
+   -> interval_set<snapid_t>
+
+  "removed_snap_%llu_%016llx" % (pool, last_snap)
+   -> { first_snap, end_snap, epoch }   (last_snap = end_snap - 1)
+
+
+  -- starting with mimic --
+
+  "purged_snap_%llu_%016llx" % (pool, last_snap)
+   -> { first_snap, end_snap, epoch }   (last_snap = end_snap - 1)
+
+  - note that the {removed,purged}_snap put the last snap in they key so
+    that we can use forward iteration only to search for an epoch in an
+    interval.  e.g., to test if epoch N is removed/purged, we'll find a key
+    >= N that either does or doesn't contain the given snap.
+
+
+  -- starting with octopus --
+
+  "purged_epoch_%08lx" % epoch
+  -> map<int64_t,interval_set<snapid_t>>
+
+  */
+using namespace TOPNSPC::common;
 namespace {
 
 struct OSDMemCache : public PriorityCache::PriCache {
@@ -343,6 +378,19 @@ epoch_t LastEpochClean::get_lower_bound(const OSDMap& latest) const
   return floor;
 }
 
+void LastEpochClean::dump(Formatter *f) const
+{
+  f->open_array_section("per_pool");
+
+  for (auto& it : report_by_pool) {
+    f->open_object_section("pool");
+    f->dump_unsigned("poolid", it.first);
+    f->dump_unsigned("floor", it.second.floor);
+    f->close_section();
+  }
+
+  f->close_section();
+}
 
 class C_UpdateCreatingPGs : public Context {
 public:
@@ -589,19 +637,19 @@ void OSDMonitor::create_initial()
   if (newmap.nearfull_ratio > 1.0) newmap.nearfull_ratio /= 100;
 
   // new cluster should require latest by default
-  if (g_conf().get_val<bool>("mon_debug_no_require_nautilus")) {
-    if (g_conf()->mon_debug_no_require_mimic) {
-      derr << __func__ << " mon_debug_no_require_mimic=true and nautilus=true" << dendl;
-      newmap.require_osd_release = CEPH_RELEASE_LUMINOUS;
+  if (g_conf().get_val<bool>("mon_debug_no_require_octopus")) {
+    if (g_conf().get_val<bool>("mon_debug_no_require_nautilus")) {
+      derr << __func__ << " mon_debug_no_require_octopus and nautilus=true" << dendl;
+      newmap.require_osd_release = ceph_release_t::mimic;
     } else {
-      derr << __func__ << " mon_debug_no_require_nautilus=true" << dendl;
-      newmap.require_osd_release = CEPH_RELEASE_MIMIC;
+      derr << __func__ << " mon_debug_no_require_octopus=true" << dendl;
+      newmap.require_osd_release = ceph_release_t::nautilus;
     }
   } else {
-    newmap.require_osd_release = CEPH_RELEASE_NAUTILUS;
-    int r = ceph_release_from_name(
-      g_conf()->mon_osd_initial_require_min_compat_client.c_str());
-    if (r <= 0) {
+    newmap.require_osd_release = ceph_release_t::octopus;
+    ceph_release_t r = ceph_release_from_name(
+      g_conf()->mon_osd_initial_require_min_compat_client);
+    if (!r) {
       ceph_abort_msg("mon_osd_initial_require_min_compat_client is not valid");
     }
     newmap.require_min_compat_client = r;
@@ -1036,6 +1084,7 @@ void OSDMonitor::create_pending()
   pending_inc.fsid = mon->monmap->fsid;
   pending_metadata.clear();
   pending_metadata_rm.clear();
+  pending_pseudo_purged_snaps.clear();
 
   dout(10) << "create_pending e " << pending_inc.epoch << dendl;
 
@@ -1178,8 +1227,10 @@ OSDMonitor::update_pending_pgs(const OSDMap::Incremental& inc,
       const pg_t pgid{ps, static_cast<uint64_t>(poolid)};
       // NOTE: use the *current* epoch as the PG creation epoch so that the
       // OSD does not have to generate a long set of PastIntervals.
-      pending_creatings.pgs.emplace(pgid, make_pair(inc.epoch,
-                                                   p->second.modified));
+      pending_creatings.pgs.emplace(
+       pgid,
+       creating_pgs_t::pg_create_info(inc.epoch,
+                                      p->second.modified));
       dout(10) << __func__ << " adding " << pgid << dendl;
     }
     p->second.start = end;
@@ -1194,6 +1245,89 @@ OSDMonitor::update_pending_pgs(const OSDMap::Incremental& inc,
   }
   dout(10) << __func__ << " queue remaining: " << pending_creatings.queue.size()
           << " pools" << dendl;
+
+  if (mon->monmap->min_mon_release >= ceph_release_t::octopus) {
+    // walk creating pgs' history and past_intervals forward
+    for (auto& i : pending_creatings.pgs) {
+      // this mirrors PG::start_peering_interval()
+      pg_t pgid = i.first;
+
+      // this is a bit imprecise, but sufficient?
+      struct min_size_predicate_t : public IsPGRecoverablePredicate {
+       const pg_pool_t *pi;
+       bool operator()(const set<pg_shard_t> &have) const {
+         return have.size() >= pi->min_size;
+       }
+       explicit min_size_predicate_t(const pg_pool_t *i) : pi(i) {}
+      } min_size_predicate(nextmap.get_pg_pool(pgid.pool()));
+
+      vector<int> up, acting;
+      int up_primary, acting_primary;
+      nextmap.pg_to_up_acting_osds(
+       pgid, &up, &up_primary, &acting, &acting_primary);
+      if (i.second.history.epoch_created == 0) {
+       // new pg entry, set it up
+       i.second.up = up;
+       i.second.acting = acting;
+       i.second.up_primary = up_primary;
+       i.second.acting_primary = acting_primary;
+       i.second.history = pg_history_t(i.second.create_epoch,
+                                       i.second.create_stamp);
+       dout(10) << __func__ << "  pg " << pgid << " just added, "
+                << " up " << i.second.up
+                << " p " << i.second.up_primary
+                << " acting " << i.second.acting
+                << " p " << i.second.acting_primary
+                << " history " << i.second.history
+                << " past_intervals " << i.second.past_intervals
+                << dendl;
+     } else {
+       std::stringstream debug;
+       if (PastIntervals::check_new_interval(
+             i.second.acting_primary, acting_primary,
+             i.second.acting, acting,
+             i.second.up_primary, up_primary,
+             i.second.up, up,
+             i.second.history.same_interval_since,
+             i.second.history.last_epoch_clean,
+             &nextmap,
+             &osdmap,
+             pgid,
+             min_size_predicate,
+             &i.second.past_intervals,
+             &debug)) {
+         epoch_t e = inc.epoch;
+         i.second.history.same_interval_since = e;
+         if (i.second.up != up) {
+           i.second.history.same_up_since = e;
+         }
+         if (i.second.acting_primary != acting_primary) {
+           i.second.history.same_primary_since = e;
+         }
+         if (pgid.is_split(
+               osdmap.get_pg_num(pgid.pool()),
+               nextmap.get_pg_num(pgid.pool()),
+               nullptr)) {
+           i.second.history.last_epoch_split = e;
+         }
+         dout(10) << __func__ << "  pg " << pgid << " new interval,"
+                  << " up " << i.second.up << " -> " << up
+                  << " p " << i.second.up_primary << " -> " << up_primary
+                  << " acting " << i.second.acting << " -> " << acting
+                  << " p " << i.second.acting_primary << " -> "
+                  << acting_primary
+                  << " history " << i.second.history
+                  << " past_intervals " << i.second.past_intervals
+                  << dendl;
+         dout(20) << "  debug: " << debug.str() << dendl;
+         i.second.up = up;
+         i.second.acting = acting;
+         i.second.up_primary = up_primary;
+         i.second.acting_primary = acting_primary;
+       }
+      }
+    }
+  }
   dout(10) << __func__
           << " " << (pending_creatings.pgs.size() - total)
           << "/" << pending_creatings.pgs.size()
@@ -1369,7 +1503,7 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   // finalize up pending_inc
   pending_inc.modified = ceph_clock_now();
 
-  int r = pending_inc.propagate_snaps_to_tiers(cct, osdmap);
+  int r = pending_inc.propagate_base_properties_to_tiers(cct, osdmap);
   ceph_assert(r == 0);
 
   if (mapping_job) {
@@ -1411,14 +1545,16 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     pending_inc.new_last_up_change = pending_inc.modified;
   }
   for (auto& i : pending_inc.new_weight) {
-    if (i.first > osdmap.max_osd) {
+    if (i.first >= osdmap.max_osd) {
       if (i.second) {
        // new osd is already marked in
        pending_inc.new_last_in_change = pending_inc.modified;
+        break;
       }
     } else if (!!i.second != !!osdmap.osd_weight[i.first]) {
       // existing osd marked in or out
       pending_inc.new_last_in_change = pending_inc.modified;
+      break;
     }
   }
 
@@ -1437,7 +1573,8 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
       // which is obviously the hard part TBD..
       vector<pg_t> pgs_to_check;
       tmp.get_upmap_pgs(&pgs_to_check);
-      if (pgs_to_check.size() < g_conf()->mon_clean_pg_upmaps_per_chunk * 2) {
+      if (pgs_to_check.size() <
+         static_cast<uint64_t>(g_conf()->mon_clean_pg_upmaps_per_chunk * 2)) {
         // not enough pgs, do it inline
         tmp.clean_pg_upmaps(cct, &pending_inc);
       } else {
@@ -1451,12 +1588,18 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     // process the pool flag removal below in the same osdmap epoch.
     auto pending_creatings = update_pending_pgs(pending_inc, tmp);
     bufferlist creatings_bl;
-    encode(pending_creatings, creatings_bl);
+    uint64_t features = CEPH_FEATURES_ALL;
+    if (mon->monmap->min_mon_release < ceph_release_t::octopus) {
+      dout(20) << __func__ << " encoding pending pgs without octopus features"
+              << dendl;
+      features &= ~CEPH_FEATURE_SERVER_OCTOPUS;
+    }
+    encode(pending_creatings, creatings_bl, features);
     t->put(OSD_PG_CREATING_PREFIX, "creating", creatings_bl);
 
     // remove any old (or incompat) POOL_CREATING flags
     for (auto& i : tmp.get_pools()) {
-      if (tmp.require_osd_release < CEPH_RELEASE_NAUTILUS) {
+      if (tmp.require_osd_release < ceph_release_t::nautilus) {
        // pre-nautilus OSDMaps shouldn't get this flag.
        if (pending_inc.new_pools.count(i.first)) {
          pending_inc.new_pools[i.first].flags &= ~pg_pool_t::FLAG_CREATING;
@@ -1473,15 +1616,6 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
       }
     }
 
-    // remove any legacy osdmap nearfull/full flags
-    {
-      if (tmp.test_flag(CEPH_OSDMAP_FULL | CEPH_OSDMAP_NEARFULL)) {
-       dout(10) << __func__ << " clearing legacy osdmap nearfull/full flag"
-                << dendl;
-       remove_flag(CEPH_OSDMAP_NEARFULL);
-       remove_flag(CEPH_OSDMAP_FULL);
-      }
-    }
     // collect which pools are currently affected by
     // the near/backfill/full osd(s),
     // and set per-pool near/backfill/full flag instead
@@ -1660,80 +1794,17 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     }
 
     // min_compat_client?
-    if (tmp.require_min_compat_client == 0) {
+    if (!tmp.require_min_compat_client) {
       auto mv = tmp.get_min_compat_client();
       dout(1) << __func__ << " setting require_min_compat_client to currently "
-             << "required " << ceph_release_name(mv) << dendl;
+             << "required " << mv << dendl;
       mon->clog->info() << "setting require_min_compat_client to currently "
-                       << "required " << ceph_release_name(mv);
+                       << "required " << mv;
       pending_inc.new_require_min_compat_client = mv;
     }
 
-    // upgrade to mimic?
-    if (osdmap.require_osd_release < CEPH_RELEASE_MIMIC &&
-       tmp.require_osd_release >= CEPH_RELEASE_MIMIC) {
-      dout(10) << __func__ << " first mimic+ epoch" << dendl;
-      // record this epoch as the deletion for all legacy removed_snaps
-      for (auto& p : tmp.get_pools()) {
-       // update every pool
-       if (pending_inc.new_pools.count(p.first) == 0) {
-         pending_inc.new_pools[p.first] = p.second;
-       }
-       auto& pi = pending_inc.new_pools[p.first];
-       if (pi.snap_seq == 0) {
-         // no snaps on this pool
-         continue;
-       }
-       if ((pi.flags & (pg_pool_t::FLAG_SELFMANAGED_SNAPS |
-                        pg_pool_t::FLAG_POOL_SNAPS)) == 0) {
-         if (!pi.removed_snaps.empty()) {
-           pi.flags |= pg_pool_t::FLAG_SELFMANAGED_SNAPS;
-         } else {
-           pi.flags |= pg_pool_t::FLAG_POOL_SNAPS;
-         }
-       }
-
-       // Make all previously removed snaps appear to be removed in this
-       // epoch.  this populates removed_snaps_queue.  The OSD will subtract
-       // off its purged_snaps, as before, and this set will shrink over the
-       // following epochs as the purged snaps are reported back through the
-       // mgr.
-       OSDMap::snap_interval_set_t removed;
-       if (!p.second.removed_snaps.empty()) {
-         // different flavor of interval_set :(
-         for (auto q = p.second.removed_snaps.begin();
-              q != p.second.removed_snaps.end();
-              ++q) {
-           removed.insert(q.get_start(), q.get_len());
-         }
-       } else {
-         for (snapid_t s = 1; s <= pi.get_snap_seq(); s = s + 1) {
-           if (pi.snaps.count(s) == 0) {
-             removed.insert(s);
-           }
-         }
-       }
-       pending_inc.new_removed_snaps[p.first].union_of(removed);
-
-       dout(10) << __func__ << " converting pool " << p.first
-                << " with " << p.second.removed_snaps.size()
-                << " legacy removed_snaps" << dendl;
-       string k = make_snap_epoch_key(p.first, pending_inc.epoch);
-       bufferlist v;
-       encode(p.second.removed_snaps, v);
-       t->put(OSD_SNAP_PREFIX, k, v);
-       for (auto q = p.second.removed_snaps.begin();
-            q != p.second.removed_snaps.end();
-            ++q) {
-         bufferlist v;
-         string k = make_snap_key_value(p.first, q.get_start(),
-                                        q.get_len(), pending_inc.epoch, &v);
-         t->put(OSD_SNAP_PREFIX, k, v);
-       }
-      }
-    }
-    if (osdmap.require_osd_release < CEPH_RELEASE_NAUTILUS &&
-       tmp.require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+    if (osdmap.require_osd_release < ceph_release_t::nautilus &&
+       tmp.require_osd_release >= ceph_release_t::nautilus) {
       dout(10) << __func__ << " first nautilus+ epoch" << dendl;
       // add creating flags?
       for (auto& i : tmp.get_pools()) {
@@ -1754,6 +1825,89 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
        pending_inc.old_blacklist.push_back(i.first);
       }
     }
+
+    if (osdmap.require_osd_release < ceph_release_t::octopus &&
+       tmp.require_osd_release >= ceph_release_t::octopus) {
+      dout(10) << __func__ << " first octopus+ epoch" << dendl;
+
+      // adjust obsoleted cache modes
+      for (auto& [poolid, pi] : tmp.pools) {
+       if (pi.cache_mode == pg_pool_t::CACHEMODE_FORWARD) {
+         if (pending_inc.new_pools.count(poolid) == 0) {
+           pending_inc.new_pools[poolid] = pi;
+         }
+         dout(10) << __func__ << " switching pool " << poolid
+                  << " cachemode from forward -> proxy" << dendl;
+         pending_inc.new_pools[poolid].cache_mode = pg_pool_t::CACHEMODE_PROXY;
+       }
+       if (pi.cache_mode == pg_pool_t::CACHEMODE_READFORWARD) {
+         if (pending_inc.new_pools.count(poolid) == 0) {
+           pending_inc.new_pools[poolid] = pi;
+         }
+         dout(10) << __func__ << " switching pool " << poolid
+                  << " cachemode from readforward -> readproxy" << dendl;
+         pending_inc.new_pools[poolid].cache_mode =
+           pg_pool_t::CACHEMODE_READPROXY;
+       }
+      }
+
+      // clear removed_snaps for every pool
+      for (auto& [poolid, pi] : tmp.pools) {
+       if (pi.removed_snaps.empty()) {
+         continue;
+       }
+       if (pending_inc.new_pools.count(poolid) == 0) {
+         pending_inc.new_pools[poolid] = pi;
+       }
+       dout(10) << __func__ << " clearing pool " << poolid << " removed_snaps"
+                << dendl;
+       pending_inc.new_pools[poolid].removed_snaps.clear();
+      }
+
+      // create a combined purged snap epoch key for all purged snaps
+      // prior to this epoch, and store it in the current epoch (i.e.,
+      // the last pre-octopus epoch, just prior to the one we're
+      // encoding now).
+      auto it = mon->store->get_iterator(OSD_SNAP_PREFIX);
+      it->lower_bound("purged_snap_");
+      map<int64_t,snap_interval_set_t> combined;
+      while (it->valid()) {
+       if (it->key().find("purged_snap_") != 0) {
+         break;
+       }
+       string k = it->key();
+       long long unsigned pool;
+       int n = sscanf(k.c_str(), "purged_snap_%llu_", &pool);
+       if (n != 1) {
+         derr << __func__ << " invalid purged_snaps key '" << k << "'" << dendl;
+       } else {
+         bufferlist v = it->value();
+         auto p = v.cbegin();
+         snapid_t begin, end;
+         ceph::decode(begin, p);
+         ceph::decode(end, p);
+         combined[pool].insert(begin, end - begin);
+       }
+       it->next();
+      }
+      if (!combined.empty()) {
+       string k = make_purged_snap_epoch_key(pending_inc.epoch - 1);
+       bufferlist v;
+       ceph::encode(combined, v);
+       t->put(OSD_SNAP_PREFIX, k, v);
+       dout(10) << __func__ << " recording pre-octopus purged_snaps in epoch "
+                << (pending_inc.epoch - 1) << ", " << v.length() << " bytes"
+                << dendl;
+      } else {
+       dout(10) << __func__ << " there were no pre-octopus purged snaps"
+                << dendl;
+      }
+
+      // clean out the old removed_snap_ and removed_epoch keys
+      // ('`' is ASCII '_' + 1)
+      t->erase_range(OSD_SNAP_PREFIX, "removed_snap_", "removed_snap`");
+      t->erase_range(OSD_SNAP_PREFIX, "removed_epoch_", "removed_epoch`");
+    }
   }
 
   // tell me about it
@@ -1761,8 +1915,17 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
        i != pending_inc.new_state.end();
        ++i) {
     int s = i->second ? i->second : CEPH_OSD_UP;
-    if (s & CEPH_OSD_UP)
+    if (s & CEPH_OSD_UP) {
       dout(2) << " osd." << i->first << " DOWN" << dendl;
+      // Reset laggy parameters if failure interval exceeds a threshold.
+      const osd_xinfo_t& xi = osdmap.get_xinfo(i->first);
+      if ((xi.laggy_probability || xi.laggy_interval) && xi.down_stamp.sec()) {
+        int last_failure_interval = pending_inc.modified.sec() - xi.down_stamp.sec();
+        if (grace_interval_threshold_exceeded(last_failure_interval)) {
+          set_default_laggy_params(i->first);
+        }
+      }
+    }
     if (s & CEPH_OSD_EXISTS)
       dout(2) << " osd." << i->first << " DNE" << dendl;
   }
@@ -1796,7 +1959,7 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     // determine appropriate features
     features = tmp.get_encoding_features();
     dout(10) << __func__ << " encoding full map with "
-            << ceph_release_name(tmp.require_osd_release)
+            << tmp.require_osd_release
             << " features " << features << dendl;
 
     // the features should be a subset of the mon quorum's features!
@@ -1836,35 +1999,29 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   pending_metadata.clear();
   pending_metadata_rm.clear();
 
-  // removed_snaps
-  if (tmp.require_osd_release >= CEPH_RELEASE_MIMIC) {
-    for (auto& i : pending_inc.new_removed_snaps) {
-      {
-       // all snaps removed this epoch
-       string k = make_snap_epoch_key(i.first, pending_inc.epoch);
-       bufferlist v;
-       encode(i.second, v);
-       t->put(OSD_SNAP_PREFIX, k, v);
-      }
-      for (auto q = i.second.begin();
-          q != i.second.end();
-          ++q) {
-       bufferlist v;
-       string k = make_snap_key_value(i.first, q.get_start(),
-                                      q.get_len(), pending_inc.epoch, &v);
-       t->put(OSD_SNAP_PREFIX, k, v);
-      }
+  // purged_snaps
+  if (tmp.require_osd_release >= ceph_release_t::octopus &&
+      !pending_inc.new_purged_snaps.empty()) {
+    // all snaps purged this epoch (across all pools)
+    string k = make_purged_snap_epoch_key(pending_inc.epoch);
+    bufferlist v;
+    encode(pending_inc.new_purged_snaps, v);
+    t->put(OSD_SNAP_PREFIX, k, v);
+  }
+  for (auto& i : pending_inc.new_purged_snaps) {
+    for (auto q = i.second.begin();
+        q != i.second.end();
+        ++q) {
+      insert_purged_snap_update(i.first, q.get_start(), q.get_end(),
+                               pending_inc.epoch,
+                               t);
     }
-    for (auto& i : pending_inc.new_purged_snaps) {
-      for (auto q = i.second.begin();
-          q != i.second.end();
-          ++q) {
-       bufferlist v;
-       string k = make_snap_purged_key_value(i.first, q.get_start(),
-                                             q.get_len(), pending_inc.epoch,
-                                             &v);
-       t->put(OSD_SNAP_PREFIX, k, v);
-      }
+  }
+  for (auto& [pool, snaps] : pending_pseudo_purged_snaps) {
+    for (auto snap : snaps) {
+      insert_purged_snap_update(pool, snap, snap + 1,
+                               pending_inc.epoch,
+                               t);
     }
   }
 
@@ -2067,7 +2224,8 @@ epoch_t OSDMonitor::get_min_last_epoch_clean() const
   // also scan osd epochs
   // don't trim past the oldest reported osd epoch
   for (auto& osd_epoch : osd_epochs) {
-    if (osd_epoch.second < floor) {
+    if (osd_epoch.second < floor &&
+        osdmap.is_in(osd_epoch.first)) {
       floor = osd_epoch.second;
     }
   }
@@ -2472,6 +2630,8 @@ bool OSDMonitor::preprocess_query(MonOpRequestRef op)
     // damp updates
   case MSG_OSD_MARK_ME_DOWN:
     return preprocess_mark_me_down(op);
+  case MSG_OSD_MARK_ME_DEAD:
+    return preprocess_mark_me_dead(op);
   case MSG_OSD_FULL:
     return preprocess_full(op);
   case MSG_OSD_FAILURE:
@@ -2495,6 +2655,9 @@ bool OSDMonitor::preprocess_query(MonOpRequestRef op)
   case MSG_REMOVE_SNAPS:
     return preprocess_remove_snaps(op);
 
+  case MSG_MON_GET_PURGED_SNAPS:
+    return preprocess_get_purged_snaps(op);
+
   default:
     ceph_abort();
     return true;
@@ -2511,6 +2674,8 @@ bool OSDMonitor::prepare_update(MonOpRequestRef op)
     // damp updates
   case MSG_OSD_MARK_ME_DOWN:
     return prepare_mark_me_down(op);
+  case MSG_OSD_MARK_ME_DEAD:
+    return prepare_mark_me_dead(op);
   case MSG_OSD_FULL:
     return prepare_full(op);
   case MSG_OSD_FAILURE:
@@ -2580,7 +2745,7 @@ bool OSDMonitor::should_propose(double& delay)
 bool OSDMonitor::preprocess_get_osdmap(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MMonGetOSDMap *m = static_cast<MMonGetOSDMap*>(op->get_req());
+  auto m = op->get_req<MMonGetOSDMap>();
 
   uint64_t features = mon->get_quorum_con_features();
   if (op->get_session() && op->get_session()->con_features)
@@ -2642,7 +2807,7 @@ bool OSDMonitor::check_source(MonOpRequestRef op, uuid_d fsid) {
 bool OSDMonitor::preprocess_failure(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MOSDFailure *m = static_cast<MOSDFailure*>(op->get_req());
+  auto m = op->get_req<MOSDFailure>();
   // who is target_osd
   int badboy = m->get_target_osd();
 
@@ -2721,7 +2886,7 @@ public:
 
   void _finish(int r) override {
     if (r == 0) {
-      MOSDMarkMeDown *m = static_cast<MOSDMarkMeDown*>(op->get_req());
+      auto m = op->get_req<MOSDMarkMeDown>();
       osdmon->mon->send_reply(
         op,
         new MOSDMarkMeDown(
@@ -2743,7 +2908,7 @@ public:
 bool OSDMonitor::preprocess_mark_me_down(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MOSDMarkMeDown *m = static_cast<MOSDMarkMeDown*>(op->get_req());
+  auto m = op->get_req<MOSDMarkMeDown>();
   int from = m->target_osd;
 
   // check permissions
@@ -2782,7 +2947,7 @@ bool OSDMonitor::preprocess_mark_me_down(MonOpRequestRef op)
 bool OSDMonitor::prepare_mark_me_down(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MOSDMarkMeDown *m = static_cast<MOSDMarkMeDown*>(op->get_req());
+  auto m = op->get_req<MOSDMarkMeDown>();
   int target_osd = m->target_osd;
 
   ceph_assert(osdmap.is_up(target_osd));
@@ -2795,6 +2960,62 @@ bool OSDMonitor::prepare_mark_me_down(MonOpRequestRef op)
   return true;
 }
 
+bool OSDMonitor::preprocess_mark_me_dead(MonOpRequestRef op)
+{
+  op->mark_osdmon_event(__func__);
+  auto m = op->get_req<MOSDMarkMeDead>();
+  int from = m->target_osd;
+
+  // check permissions
+  if (check_source(op, m->fsid)) {
+    mon->no_reply(op);
+    return true;
+  }
+
+  // first, verify the reporting host is valid
+  if (!m->get_orig_source().is_osd()) {
+    mon->no_reply(op);
+    return true;
+  }
+
+  if (!osdmap.exists(from) ||
+      !osdmap.is_down(from)) {
+    dout(5) << __func__ << " from nonexistent or up osd." << from
+           << ", ignoring" << dendl;
+    send_incremental(op, m->get_epoch()+1);
+    mon->no_reply(op);
+    return true;
+  }
+
+  return false;
+}
+
+bool OSDMonitor::prepare_mark_me_dead(MonOpRequestRef op)
+{
+  op->mark_osdmon_event(__func__);
+  auto m = op->get_req<MOSDMarkMeDead>();
+  int target_osd = m->target_osd;
+
+  ceph_assert(osdmap.is_down(target_osd));
+
+  mon->clog->info() << "osd." << target_osd << " marked itself dead as of e"
+                   << m->get_epoch();
+  if (!pending_inc.new_xinfo.count(target_osd)) {
+    pending_inc.new_xinfo[target_osd] = osdmap.osd_xinfo[target_osd];
+  }
+  pending_inc.new_xinfo[target_osd].dead_epoch = m->get_epoch();
+  wait_for_finished_proposal(
+    op,
+    new LambdaContext(
+      [op, this] (int r) {
+       if (r >= 0) {
+         mon->no_reply(op);      // ignore on success
+       }
+      }
+      ));
+  return true;
+}
+
 bool OSDMonitor::can_mark_down(int i)
 {
   if (osdmap.is_nodown(i)) {
@@ -2878,16 +3099,72 @@ bool OSDMonitor::can_mark_in(int i)
 bool OSDMonitor::check_failures(utime_t now)
 {
   bool found_failure = false;
-  for (map<int,failure_info_t>::iterator p = failure_info.begin();
-       p != failure_info.end();
-       ++p) {
-    if (can_mark_down(p->first)) {
-      found_failure |= check_failure(now, p->first, p->second);
+  auto p = failure_info.begin();
+  while (p != failure_info.end()) {
+    auto& [target_osd, fi] = *p;
+    if (can_mark_down(target_osd) &&
+       check_failure(now, target_osd, fi)) {
+      found_failure = true;
+      ++p;
+    } else if (is_failure_stale(now, fi)) {
+      dout(10) << " dropping stale failure_info for osd." << target_osd
+              << " from " << fi.reporters.size() << " reporters"
+              << dendl;
+      p = failure_info.erase(p);
+    } else {
+      ++p;
     }
   }
   return found_failure;
 }
 
+utime_t OSDMonitor::get_grace_time(utime_t now,
+                                  int target_osd,
+                                  failure_info_t& fi) const
+{
+  utime_t orig_grace(g_conf()->osd_heartbeat_grace, 0);
+  if (!g_conf()->mon_osd_adjust_heartbeat_grace) {
+    return orig_grace;
+  }
+  utime_t grace = orig_grace;
+  double halflife = (double)g_conf()->mon_osd_laggy_halflife;
+  double decay_k = ::log(.5) / halflife;
+
+  // scale grace period based on historical probability of 'lagginess'
+  // (false positive failures due to slowness).
+  const osd_xinfo_t& xi = osdmap.get_xinfo(target_osd);
+  const utime_t failed_for = now - fi.get_failed_since();
+  double decay = exp((double)failed_for * decay_k);
+  dout(20) << " halflife " << halflife << " decay_k " << decay_k
+          << " failed_for " << failed_for << " decay " << decay << dendl;
+  double my_grace = decay * (double)xi.laggy_interval * xi.laggy_probability;
+  grace += my_grace;
+
+  // consider the peers reporting a failure a proxy for a potential
+  // 'subcluster' over the overall cluster that is similarly
+  // laggy.  this is clearly not true in all cases, but will sometimes
+  // help us localize the grace correction to a subset of the system
+  // (say, a rack with a bad switch) that is unhappy.
+  double peer_grace = 0;
+  for (auto& [reporter, report] : fi.reporters) {
+    if (osdmap.exists(reporter)) {
+      const osd_xinfo_t& xi = osdmap.get_xinfo(reporter);
+      utime_t elapsed = now - xi.down_stamp;
+      double decay = exp((double)elapsed * decay_k);
+      peer_grace += decay * (double)xi.laggy_interval * xi.laggy_probability;
+    }
+  }
+  peer_grace /= (double)fi.reporters.size();
+  grace += peer_grace;
+  dout(10) << " osd." << target_osd << " has "
+          << fi.reporters.size() << " reporters, "
+          << grace << " grace (" << orig_grace << " + " << my_grace
+          << " + " << peer_grace << "), max_failed_since " << fi.get_failed_since()
+          << dendl;
+
+  return grace;
+}
+
 bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
 {
   // already pending failure?
@@ -2899,32 +3176,6 @@ bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
 
   set<string> reporters_by_subtree;
   auto reporter_subtree_level = g_conf().get_val<string>("mon_osd_reporter_subtree_level");
-  utime_t orig_grace(g_conf()->osd_heartbeat_grace, 0);
-  utime_t max_failed_since = fi.get_failed_since();
-  utime_t failed_for = now - max_failed_since;
-
-  utime_t grace = orig_grace;
-  double my_grace = 0, peer_grace = 0;
-  double decay_k = 0;
-  if (g_conf()->mon_osd_adjust_heartbeat_grace) {
-    double halflife = (double)g_conf()->mon_osd_laggy_halflife;
-    decay_k = ::log(.5) / halflife;
-
-    // scale grace period based on historical probability of 'lagginess'
-    // (false positive failures due to slowness).
-    const osd_xinfo_t& xi = osdmap.get_xinfo(target_osd);
-    double decay = exp((double)failed_for * decay_k);
-    dout(20) << " halflife " << halflife << " decay_k " << decay_k
-            << " failed_for " << failed_for << " decay " << decay << dendl;
-    my_grace = decay * (double)xi.laggy_interval * xi.laggy_probability;
-    grace += my_grace;
-  }
-
-  // consider the peers reporting a failure a proxy for a potential
-  // 'subcluster' over the overall cluster that is similarly
-  // laggy.  this is clearly not true in all cases, but will sometimes
-  // help us localize the grace correction to a subset of the system
-  // (say, a rack with a bad switch) that is unhappy.
   ceph_assert(fi.reporters.size());
   for (auto p = fi.reporters.begin(); p != fi.reporters.end();) {
     // get the parent bucket whose type matches with "reporter_subtree_level".
@@ -2937,32 +3188,18 @@ bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
       } else {
         reporters_by_subtree.insert(iter->second);
       }
-      if (g_conf()->mon_osd_adjust_heartbeat_grace) {
-        const osd_xinfo_t& xi = osdmap.get_xinfo(p->first);
-        utime_t elapsed = now - xi.down_stamp;
-        double decay = exp((double)elapsed * decay_k);
-        peer_grace += decay * (double)xi.laggy_interval * xi.laggy_probability;
-      }
       ++p;
     } else {
       fi.cancel_report(p->first);;
       p = fi.reporters.erase(p);
     }
   }
-  
-  if (g_conf()->mon_osd_adjust_heartbeat_grace) {
-    peer_grace /= (double)fi.reporters.size();
-    grace += peer_grace;
+  if (reporters_by_subtree.size() < g_conf().get_val<uint64_t>("mon_osd_min_down_reporters")) {
+    return false;
   }
-
-  dout(10) << " osd." << target_osd << " has "
-          << fi.reporters.size() << " reporters, "
-          << grace << " grace (" << orig_grace << " + " << my_grace
-          << " + " << peer_grace << "), max_failed_since " << max_failed_since
-          << dendl;
-
-  if (failed_for >= grace &&
-      reporters_by_subtree.size() >= g_conf().get_val<uint64_t>("mon_osd_min_down_reporters")) {
+  const utime_t failed_for = now - fi.get_failed_since();
+  const utime_t grace = get_grace_time(now, target_osd, fi);
+  if (failed_for >= grace) {
     dout(1) << " we have enough reporters to mark osd." << target_osd
            << " down" << dendl;
     pending_inc.new_state[target_osd] = CEPH_OSD_UP;
@@ -2980,6 +3217,17 @@ bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
   return false;
 }
 
+bool OSDMonitor::is_failure_stale(utime_t now, failure_info_t& fi) const
+{
+  // if it takes too long to either cancel the report to mark the osd down,
+  // some reporters must have failed to cancel their reports. let's just
+  // forget these reports.
+  const utime_t failed_for = now - fi.get_failed_since();
+  auto heartbeat_grace = cct->_conf.get_val<int64_t>("osd_heartbeat_grace");
+  auto heartbeat_stale = cct->_conf.get_val<int64_t>("osd_heartbeat_stale");
+  return failed_for >= (heartbeat_grace + heartbeat_stale);
+}
+
 void OSDMonitor::force_failure(int target_osd, int by)
 {
   // already pending failure?
@@ -2991,6 +3239,10 @@ void OSDMonitor::force_failure(int target_osd, int by)
 
   dout(1) << " we're forcing failure of osd." << target_osd << dendl;
   pending_inc.new_state[target_osd] = CEPH_OSD_UP;
+  if (!pending_inc.new_xinfo.count(target_osd)) {
+    pending_inc.new_xinfo[target_osd] = osdmap.osd_xinfo[target_osd];
+  }
+  pending_inc.new_xinfo[target_osd].dead_epoch = pending_inc.epoch;
 
   mon->clog->info() << "osd." << target_osd << " failed ("
                    << osdmap.crush->get_full_location_ordered_string(target_osd)
@@ -3001,7 +3253,7 @@ void OSDMonitor::force_failure(int target_osd, int by)
 bool OSDMonitor::prepare_failure(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MOSDFailure *m = static_cast<MOSDFailure*>(op->get_req());
+  auto m = op->get_req<MOSDFailure>();
   dout(1) << "prepare_failure osd." << m->get_target_osd()
          << " " << m->get_target_addrs()
          << " from " << m->get_orig_source()
@@ -3032,11 +3284,7 @@ bool OSDMonitor::prepare_failure(MonOpRequestRef op)
                      << m->get_orig_source();
 
     failure_info_t& fi = failure_info[target_osd];
-    MonOpRequestRef old_op = fi.add_report(reporter, failed_since, op);
-    if (old_op) {
-      mon->no_reply(old_op);
-    }
-
+    fi.add_report(reporter, failed_since, op);
     return check_failure(now, target_osd, fi);
   } else {
     // remove the report
@@ -3045,10 +3293,7 @@ bool OSDMonitor::prepare_failure(MonOpRequestRef op)
                       << m->get_orig_source();
     if (failure_info.count(target_osd)) {
       failure_info_t& fi = failure_info[target_osd];
-      MonOpRequestRef report_op = fi.cancel_report(reporter);
-      if (report_op) {
-        mon->no_reply(report_op);
-      }
+      fi.cancel_report(reporter);
       if (fi.reporters.empty()) {
        dout(10) << " removing last failure_info for osd." << target_osd
                 << dendl;
@@ -3103,13 +3348,46 @@ void OSDMonitor::take_all_failures(list<MonOpRequestRef>& ls)
   failure_info.clear();
 }
 
+int OSDMonitor::get_grace_interval_threshold()
+{
+  int halflife = g_conf()->mon_osd_laggy_halflife;
+  // Scale the halflife period (default: 1_hr) by
+  // a factor (48) to calculate the threshold.
+  int grace_threshold_factor = 48;
+  return halflife * grace_threshold_factor;
+}
+
+bool OSDMonitor::grace_interval_threshold_exceeded(int last_failed_interval)
+{
+  int grace_interval_threshold_secs = get_grace_interval_threshold();
+  if (last_failed_interval > grace_interval_threshold_secs) {
+    dout(1) << " last_failed_interval " << last_failed_interval
+            << " > grace_interval_threshold_secs " << grace_interval_threshold_secs
+            << dendl;
+    return true;
+  }
+  return false;
+}
+
+void OSDMonitor::set_default_laggy_params(int target_osd)
+{
+  if (pending_inc.new_xinfo.count(target_osd) == 0) {
+    pending_inc.new_xinfo[target_osd] = osdmap.osd_xinfo[target_osd];
+  }
+  osd_xinfo_t& xi = pending_inc.new_xinfo[target_osd];
+  xi.down_stamp = pending_inc.modified;
+  xi.laggy_probability = 0.0;
+  xi.laggy_interval = 0;
+  dout(20) << __func__ << " reset laggy, now xi " << xi << dendl;
+}
+
 
 // boot --
 
 bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MOSDBoot *m = static_cast<MOSDBoot*>(op->get_req());
+  auto m = op->get_req<MOSDBoot>();
   int from = m->get_orig_source_inst().name.num();
 
   // check permissions, ignore if failed (no response expected)
@@ -3164,18 +3442,18 @@ bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
     }
   }
 
-  // make sure upgrades stop at nautilus
-  if (HAVE_FEATURE(m->osd_features, SERVER_O) &&
-      osdmap.require_osd_release < CEPH_RELEASE_NAUTILUS) {
-    mon->clog->info() << "disallowing boot of post-nautilus OSD "
+  // make sure osd versions do not span more than 3 releases
+  if (HAVE_FEATURE(m->osd_features, SERVER_OCTOPUS) &&
+      osdmap.require_osd_release < ceph_release_t::mimic) {
+    mon->clog->info() << "disallowing boot of octopus+ OSD "
                      << m->get_orig_source_inst()
-                     << " because require_osd_release < nautilus";
+                     << " because require_osd_release < mimic";
     goto ignore;
   }
 
   // The release check here is required because for OSD_PGLOG_HARDLIMIT,
   // we are reusing a jewel feature bit that was retired in luminous.
-  if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS &&
+  if (osdmap.require_osd_release >= ceph_release_t::luminous &&
       osdmap.test_flag(CEPH_OSDMAP_PGLOG_HARDLIMIT) &&
       !(m->osd_features & CEPH_FEATURE_OSD_PGLOG_HARDLIMIT)) {
     mon->clog->info() << "disallowing boot of OSD "
@@ -3232,7 +3510,7 @@ bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
 bool OSDMonitor::prepare_boot(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MOSDBoot *m = static_cast<MOSDBoot*>(op->get_req());
+  auto m = op->get_req<MOSDBoot>();
   dout(7) << __func__ << " from " << m->get_source()
          << " sb " << m->sb
          << " client_addrs" << m->get_connection()->get_peer_addrs()
@@ -3329,7 +3607,9 @@ bool OSDMonitor::prepare_boot(MonOpRequestRef op)
        pair<epoch_t,epoch_t>(begin, end);
     }
 
-    osd_xinfo_t xi = osdmap.get_xinfo(from);
+    if (pending_inc.new_xinfo.count(from) == 0)
+      pending_inc.new_xinfo[from] = osdmap.osd_xinfo[from];
+    osd_xinfo_t& xi = pending_inc.new_xinfo[from];
     if (m->boot_epoch == 0) {
       xi.laggy_probability *= (1.0 - g_conf()->mon_osd_laggy_weight);
       xi.laggy_interval *= (1.0 - g_conf()->mon_osd_laggy_weight);
@@ -3364,8 +3644,8 @@ bool OSDMonitor::prepare_boot(MonOpRequestRef op)
        (g_conf()->mon_osd_auto_mark_new_in && (oldstate & CEPH_OSD_NEW)) ||
        (g_conf()->mon_osd_auto_mark_in)) {
       if (can_mark_in(from)) {
-       if (osdmap.osd_xinfo[from].old_weight > 0) {
-         pending_inc.new_weight[from] = osdmap.osd_xinfo[from].old_weight;
+       if (xi.old_weight > 0) {
+         pending_inc.new_weight[from] = xi.old_weight;
          xi.old_weight = 0;
        } else {
          pending_inc.new_weight[from] = CEPH_OSD_IN;
@@ -3376,8 +3656,6 @@ bool OSDMonitor::prepare_boot(MonOpRequestRef op)
       }
     }
 
-    pending_inc.new_xinfo[from] = xi;
-
     // wait
     wait_for_finished_proposal(op, new C_Booted(this, op));
   }
@@ -3387,7 +3665,7 @@ bool OSDMonitor::prepare_boot(MonOpRequestRef op)
 void OSDMonitor::_booted(MonOpRequestRef op, bool logit)
 {
   op->mark_osdmon_event(__func__);
-  MOSDBoot *m = static_cast<MOSDBoot*>(op->get_req());
+  auto m = op->get_req<MOSDBoot>();
   dout(7) << "_booted " << m->get_orig_source_inst() 
          << " w " << m->sb.weight << " from " << m->sb.current_epoch << dendl;
 
@@ -3406,7 +3684,7 @@ void OSDMonitor::_booted(MonOpRequestRef op, bool logit)
 bool OSDMonitor::preprocess_full(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MOSDFull *m = static_cast<MOSDFull*>(op->get_req());
+  auto m = op->get_req<MOSDFull>();
   int from = m->get_orig_source().num();
   set<string> state;
   unsigned mask = CEPH_OSD_NEARFULL | CEPH_OSD_BACKFILLFULL | CEPH_OSD_FULL;
@@ -3457,7 +3735,7 @@ bool OSDMonitor::preprocess_full(MonOpRequestRef op)
 bool OSDMonitor::prepare_full(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  const MOSDFull *m = static_cast<MOSDFull*>(op->get_req());
+  auto m = op->get_req<MOSDFull>();
   const int from = m->get_orig_source().num();
 
   const unsigned mask = CEPH_OSD_NEARFULL | CEPH_OSD_BACKFILLFULL | CEPH_OSD_FULL;
@@ -3498,7 +3776,7 @@ bool OSDMonitor::prepare_full(MonOpRequestRef op)
 bool OSDMonitor::preprocess_alive(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MOSDAlive *m = static_cast<MOSDAlive*>(op->get_req());
+  auto m = op->get_req<MOSDAlive>();
   int from = m->get_orig_source().num();
 
   // check permissions, ignore if failed
@@ -3537,7 +3815,7 @@ bool OSDMonitor::preprocess_alive(MonOpRequestRef op)
 bool OSDMonitor::prepare_alive(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MOSDAlive *m = static_cast<MOSDAlive*>(op->get_req());
+  auto m = op->get_req<MOSDAlive>();
   int from = m->get_orig_source().num();
 
   if (0) {  // we probably don't care much about these
@@ -3565,7 +3843,7 @@ void OSDMonitor::_reply_map(MonOpRequestRef op, epoch_t e)
 bool OSDMonitor::preprocess_pg_created(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  auto m = static_cast<MOSDPGCreated*>(op->get_req());
+  auto m  = op->get_req<MOSDPGCreated>();
   dout(10) << __func__ << " " << *m << dendl;
   auto session = op->get_session();
   mon->no_reply(op);
@@ -3585,7 +3863,7 @@ bool OSDMonitor::preprocess_pg_created(MonOpRequestRef op)
 bool OSDMonitor::prepare_pg_created(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  auto m = static_cast<MOSDPGCreated*>(op->get_req());
+  auto m = op->get_req<MOSDPGCreated>();
   dout(10) << __func__ << " " << *m << dendl;
   auto src = m->get_orig_source();
   auto from = src.num();
@@ -3603,7 +3881,7 @@ bool OSDMonitor::prepare_pg_created(MonOpRequestRef op)
 bool OSDMonitor::preprocess_pg_ready_to_merge(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  auto m = static_cast<MOSDPGReadyToMerge*>(op->get_req());
+  auto m = op->get_req<MOSDPGReadyToMerge>();
   dout(10) << __func__ << " " << *m << dendl;
   const pg_pool_t *pi;
   auto session = op->get_session();
@@ -3643,7 +3921,7 @@ bool OSDMonitor::preprocess_pg_ready_to_merge(MonOpRequestRef op)
 bool OSDMonitor::prepare_pg_ready_to_merge(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  auto m = static_cast<MOSDPGReadyToMerge*>(op->get_req());
+  auto m  = op->get_req<MOSDPGReadyToMerge>();
   dout(10) << __func__ << " " << *m << dendl;
   pg_pool_t p;
   if (pending_inc.new_pools.count(m->pgid.pool()))
@@ -3704,7 +3982,7 @@ bool OSDMonitor::prepare_pg_ready_to_merge(MonOpRequestRef op)
 
 bool OSDMonitor::preprocess_pgtemp(MonOpRequestRef op)
 {
-  MOSDPGTemp *m = static_cast<MOSDPGTemp*>(op->get_req());
+  auto m = op->get_req<MOSDPGTemp>();
   dout(10) << "preprocess_pgtemp " << *m << dendl;
   mempool::osdmap::vector<int> empty;
   int from = m->get_orig_source().num();
@@ -3792,6 +4070,7 @@ bool OSDMonitor::preprocess_pgtemp(MonOpRequestRef op)
   return true;
 
  ignore:
+  mon->no_reply(op);
   return true;
 }
 
@@ -3811,7 +4090,7 @@ void OSDMonitor::update_up_thru(int from, epoch_t up_thru)
 bool OSDMonitor::prepare_pgtemp(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MOSDPGTemp *m = static_cast<MOSDPGTemp*>(op->get_req());
+  auto m = op->get_req<MOSDPGTemp>();
   int from = m->get_orig_source().num();
   dout(7) << "prepare_pgtemp e" << m->map_epoch << " from " << m->get_orig_source_inst() << dendl;
   for (map<pg_t,vector<int32_t> >::iterator p = m->pg_temp.begin(); p != m->pg_temp.end(); ++p) {
@@ -3850,7 +4129,7 @@ bool OSDMonitor::prepare_pgtemp(MonOpRequestRef op)
 bool OSDMonitor::preprocess_remove_snaps(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MRemoveSnaps *m = static_cast<MRemoveSnaps*>(op->get_req());
+  auto m = op->get_req<MRemoveSnaps>();
   dout(7) << "preprocess_remove_snaps " << *m << dendl;
 
   // check privilege, ignore if failed
@@ -3872,7 +4151,8 @@ bool OSDMonitor::preprocess_remove_snaps(MonOpRequestRef op)
        q != m->snaps.end();
        ++q) {
     if (!osdmap.have_pg_pool(q->first)) {
-      dout(10) << " ignoring removed_snaps " << q->second << " on non-existent pool " << q->first << dendl;
+      dout(10) << " ignoring removed_snaps " << q->second
+              << " on non-existent pool " << q->first << dendl;
       continue;
     }
     const pg_pool_t *pi = osdmap.get_pg_pool(q->first);
@@ -3880,11 +4160,18 @@ bool OSDMonitor::preprocess_remove_snaps(MonOpRequestRef op)
         p != q->second.end();
         ++p) {
       if (*p > pi->get_snap_seq() ||
-         !pi->removed_snaps.contains(*p))
+         !_is_removed_snap(q->first, *p)) {
        return false;
+      }
     }
   }
 
+  if (HAVE_FEATURE(m->get_connection()->get_features(), SERVER_OCTOPUS)) {
+    auto reply = make_message<MRemoveSnaps>();
+    reply->snaps = m->snaps;
+    mon->send_reply(op, reply.detach());
+  }
+
  ignore:
   return true;
 }
@@ -3892,40 +4179,99 @@ bool OSDMonitor::preprocess_remove_snaps(MonOpRequestRef op)
 bool OSDMonitor::prepare_remove_snaps(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MRemoveSnaps *m = static_cast<MRemoveSnaps*>(op->get_req());
+  auto m = op->get_req<MRemoveSnaps>();
   dout(7) << "prepare_remove_snaps " << *m << dendl;
 
-  for (map<int, vector<snapid_t> >::iterator p = m->snaps.begin();
-       p != m->snaps.end();
-       ++p) {
-
-    if (!osdmap.have_pg_pool(p->first)) {
-      dout(10) << " ignoring removed_snaps " << p->second << " on non-existent pool " << p->first << dendl;
+  for (auto& [pool, snaps] : m->snaps) {
+    if (!osdmap.have_pg_pool(pool)) {
+      dout(10) << " ignoring removed_snaps " << snaps
+              << " on non-existent pool " << pool << dendl;
       continue;
     }
 
-    pg_pool_t& pi = osdmap.pools[p->first];
-    for (vector<snapid_t>::iterator q = p->second.begin();
-        q != p->second.end();
-        ++q) {
-      if (!pi.removed_snaps.contains(*q) &&
-         (!pending_inc.new_pools.count(p->first) ||
-          !pending_inc.new_pools[p->first].removed_snaps.contains(*q))) {
-       pg_pool_t *newpi = pending_inc.get_new_pool(p->first, &pi);
-       newpi->removed_snaps.insert(*q);
+    pg_pool_t& pi = osdmap.pools[pool];
+    for (auto s : snaps) {
+      if (!_is_removed_snap(pool, s) &&
+         (!pending_inc.new_pools.count(pool) ||
+          !pending_inc.new_pools[pool].removed_snaps.contains(s)) &&
+         (!pending_inc.new_removed_snaps.count(pool) ||
+          !pending_inc.new_removed_snaps[pool].contains(s))) {
+       pg_pool_t *newpi = pending_inc.get_new_pool(pool, &pi);
+       if (osdmap.require_osd_release < ceph_release_t::octopus) {
+         newpi->removed_snaps.insert(s);
+         dout(10) << " pool " << pool << " removed_snaps added " << s
+                  << " (now " << newpi->removed_snaps << ")" << dendl;
+       }
        newpi->flags |= pg_pool_t::FLAG_SELFMANAGED_SNAPS;
-       dout(10) << " pool " << p->first << " removed_snaps added " << *q
-                << " (now " << newpi->removed_snaps << ")" << dendl;
-       if (*q > newpi->get_snap_seq()) {
-         dout(10) << " pool " << p->first << " snap_seq "
-                  << newpi->get_snap_seq() << " -> " << *q << dendl;
-         newpi->set_snap_seq(*q);
+       if (s > newpi->get_snap_seq()) {
+         dout(10) << " pool " << pool << " snap_seq "
+                  << newpi->get_snap_seq() << " -> " << s << dendl;
+         newpi->set_snap_seq(s);
        }
        newpi->set_snap_epoch(pending_inc.epoch);
-       pending_inc.new_removed_snaps[p->first].insert(*q);
+       dout(10) << " added pool " << pool << " snap " << s
+                << " to removed_snaps queue" << dendl;
+       pending_inc.new_removed_snaps[pool].insert(s);
+      }
+    }
+  }
+
+  if (HAVE_FEATURE(m->get_connection()->get_features(), SERVER_OCTOPUS)) {
+    auto reply = make_message<MRemoveSnaps>();
+    reply->snaps = m->snaps;
+    wait_for_finished_proposal(op, new C_ReplyOp(this, op, reply));
+  }
+
+  return true;
+}
+
+bool OSDMonitor::preprocess_get_purged_snaps(MonOpRequestRef op)
+{
+  op->mark_osdmon_event(__func__);
+  auto m = op->get_req<MMonGetPurgedSnaps>();
+  dout(7) << __func__ << " " << *m << dendl;
+
+  map<epoch_t,mempool::osdmap::map<int64_t,snap_interval_set_t>> r;
+
+  string k = make_purged_snap_epoch_key(m->start);
+  auto it = mon->store->get_iterator(OSD_SNAP_PREFIX);
+  it->upper_bound(k);
+  unsigned long epoch = m->last;
+  while (it->valid()) {
+    if (it->key().find("purged_epoch_") != 0) {
+      break;
+    }
+    string k = it->key();
+    int n = sscanf(k.c_str(), "purged_epoch_%lx", &epoch);
+    if (n != 1) {
+      derr << __func__ << " unable to parse key '" << it->key() << "'" << dendl;
+    } else if (epoch > m->last) {
+      break;
+    } else {
+      bufferlist bl = it->value();
+      auto p = bl.cbegin();
+      auto &v = r[epoch];
+      try {
+       ceph::decode(v, p);
+      } catch (buffer::error& e) {
+       derr << __func__ << " unable to parse value for key '" << it->key()
+            << "': \n";
+       bl.hexdump(*_dout);
+       *_dout << dendl;
       }
+      n += 4 + v.size() * 16;
+    }
+    if (n > 1048576) {
+      // impose a semi-arbitrary limit to message size
+      break;
     }
+    it->next();
   }
+
+  auto reply = make_message<MMonGetPurgedSnapsReply>(m->start, epoch);
+  reply->purged_snaps.swap(r);
+  mon->send_reply(op, reply.detach());
+
   return true;
 }
 
@@ -3954,7 +4300,7 @@ bool OSDMonitor::preprocess_beacon(MonOpRequestRef op)
 bool OSDMonitor::prepare_beacon(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  const auto beacon = static_cast<MOSDBeacon*>(op->get_req());
+  const auto beacon = op->get_req<MOSDBeacon>();
   const auto src = beacon->get_orig_source();
   dout(10) << __func__ << " " << *beacon
           << " from " << src << dendl;
@@ -3978,7 +4324,18 @@ bool OSDMonitor::prepare_beacon(MonOpRequestRef op)
   for (const auto& pg : beacon->pgs) {
     last_epoch_clean.report(pg, beacon->min_last_epoch_clean);
   }
-  return false;
+
+  if (osdmap.osd_xinfo[from].last_purged_snaps_scrub <
+      beacon->last_purged_snaps_scrub) {
+    if (pending_inc.new_xinfo.count(from) == 0) {
+      pending_inc.new_xinfo[from] = osdmap.osd_xinfo[from];
+    }
+    pending_inc.new_xinfo[from].last_purged_snaps_scrub =
+      beacon->last_purged_snaps_scrub;
+    return true;
+  } else {
+    return false;
+  }
 }
 
 // ---------------
@@ -4091,9 +4448,6 @@ void OSDMonitor::send_incremental(epoch_t first,
     m->oldest_map = get_first_committed();
     m->newest_map = osdmap.get_epoch();
 
-    // share removed snaps during the gap
-    get_removed_snaps_range(first, m->oldest_map, &m->gap_removed_snaps);
-
     first = get_first_committed();
     bufferlist bl;
     int err = get_version_full(first, features, bl);
@@ -4133,28 +4487,6 @@ void OSDMonitor::send_incremental(epoch_t first,
   }
 }
 
-void OSDMonitor::get_removed_snaps_range(
-  epoch_t start, epoch_t end,
-  mempool::osdmap::map<int64_t,OSDMap::snap_interval_set_t> *gap_removed_snaps)
-{
-  // we only care about pools that exist now.
-  for (auto& p : osdmap.get_pools()) {
-    auto& t = (*gap_removed_snaps)[p.first];
-    for (epoch_t epoch = start; epoch < end; ++epoch) {
-      string k = make_snap_epoch_key(p.first, epoch);
-      bufferlist v;
-      mon->store->get(OSD_SNAP_PREFIX, k, v);
-      if (v.length()) {
-       auto q = v.cbegin();
-       OSDMap::snap_interval_set_t snaps;
-       decode(snaps, q);
-       t.union_of(snaps);
-      }
-    }
-    dout(10) << __func__ << " " << p.first << " " << t << dendl;
-  }
-}
-
 int OSDMonitor::get_version(version_t ver, bufferlist& bl)
 {
   return get_version(ver, mon->get_quorum_con_features(), bl);
@@ -4376,7 +4708,7 @@ epoch_t OSDMonitor::blacklist(const entity_addrvec_t& av, utime_t until)
 {
   dout(10) << "blacklist " << av << " until " << until << dendl;
   for (auto a : av.v) {
-    if (osdmap.require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+    if (osdmap.require_osd_release >= ceph_release_t::nautilus) {
       a.set_type(entity_addr_t::TYPE_ANY);
     } else {
       a.set_type(entity_addr_t::TYPE_LEGACY);
@@ -4388,7 +4720,7 @@ epoch_t OSDMonitor::blacklist(const entity_addrvec_t& av, utime_t until)
 
 epoch_t OSDMonitor::blacklist(entity_addr_t a, utime_t until)
 {
-  if (osdmap.require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+  if (osdmap.require_osd_release >= ceph_release_t::nautilus) {
     a.set_type(entity_addr_t::TYPE_ANY);
   } else {
     a.set_type(entity_addr_t::TYPE_LEGACY);
@@ -4467,14 +4799,15 @@ void OSDMonitor::check_pg_creates_sub(Subscription *sub)
 void OSDMonitor::do_application_enable(int64_t pool_id,
                                        const std::string &app_name,
                                       const std::string &app_key,
-                                      const std::string &app_value)
+                                      const std::string &app_value,
+                                      bool force)
 {
   ceph_assert(paxos->is_plugged() && is_writeable());
 
   dout(20) << __func__ << ": pool_id=" << pool_id << ", app_name=" << app_name
            << dendl;
 
-  ceph_assert(osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS);
+  ceph_assert(osdmap.require_osd_release >= ceph_release_t::luminous);
 
   auto pp = osdmap.get_pg_pool(pool_id);
   ceph_assert(pp != nullptr);
@@ -4487,7 +4820,11 @@ void OSDMonitor::do_application_enable(int64_t pool_id,
   if (app_key.empty()) {
     p.application_metadata.insert({app_name, {}});
   } else {
-    p.application_metadata.insert({app_name, {{app_key, app_value}}});
+    if (force) {
+      p.application_metadata[app_name][app_key] = app_value;
+    } else {
+      p.application_metadata.insert({app_name, {{app_key, app_value}}});
+    }
   }
   p.last_change = pending_inc.epoch;
   pending_inc.new_pools[pool_id] = p;
@@ -4556,7 +4893,7 @@ void OSDMonitor::update_creating_pgs()
               << dendl;
       continue;
     }
-    auto mapped = pg.second.first;
+    auto mapped = pg.second.create_epoch;
     dout(20) << __func__ << " looking up " << pgid << "@" << mapped << dendl;
     spg_t spgid(pgid);
     mapping.get_primary_and_shard(pgid, &acting_primary, &spgid);
@@ -4609,7 +4946,7 @@ epoch_t OSDMonitor::send_pg_creates(int osd, Connection *con, epoch_t next) cons
   MOSDPGCreate *oldm = nullptr; // for pre-mimic OSD compat
   MOSDPGCreate2 *m = nullptr;
 
-  bool old = osdmap.require_osd_release < CEPH_RELEASE_NAUTILUS;
+  bool old = osdmap.require_osd_release < ceph_release_t::nautilus;
 
   epoch_t last = 0;
   for (auto epoch_pgs = creating_pgs_by_epoch->second.lower_bound(next);
@@ -4629,16 +4966,23 @@ epoch_t OSDMonitor::send_pg_creates(int osd, Connection *con, epoch_t next) cons
          oldm = new MOSDPGCreate(creating_pgs_epoch);
        }
        oldm->mkpg.emplace(pg.pgid,
-                          pg_create_t{create->second.first, pg.pgid, 0});
-       oldm->ctimes.emplace(pg.pgid, create->second.second);
+                          pg_create_t{create->second.create_epoch, pg.pgid, 0});
+       oldm->ctimes.emplace(pg.pgid, create->second.create_stamp);
       } else {
        if (!m) {
          m = new MOSDPGCreate2(creating_pgs_epoch);
        }
-       m->pgs.emplace(pg, create->second);
+       m->pgs.emplace(pg, make_pair(create->second.create_epoch,
+                                    create->second.create_stamp));
+       if (create->second.history.epoch_created) {
+         dout(20) << __func__ << "   " << pg << " " << create->second.history
+                  << " " << create->second.past_intervals << dendl;
+         m->pg_extra.emplace(pg, make_pair(create->second.history,
+                                           create->second.past_intervals));
+       }
       }
       dout(20) << __func__ << " will create " << pg
-              << " at " << create->second.first << dendl;
+              << " at " << create->second.create_epoch << dendl;
     }
   }
   if (m) {
@@ -4667,6 +5011,28 @@ void OSDMonitor::tick()
   // always update osdmap manifest, regardless of being the leader.
   load_osdmap_manifest();
 
+  // always tune priority cache manager memory on leader and peons
+  if (ceph_using_tcmalloc() && mon_memory_autotune) {
+    std::lock_guard l(balancer_lock);
+    if (pcm != nullptr) {
+      pcm->tune_memory();
+      pcm->balance();
+      _set_new_cache_sizes();
+      dout(10) << "tick balancer "
+               << " inc cache_bytes: " << inc_cache->get_cache_bytes()
+               << " inc comtd_bytes: " << inc_cache->get_committed_size()
+               << " inc used_bytes: " << inc_cache->_get_used_bytes()
+               << " inc num_osdmaps: " << inc_cache->_get_num_osdmaps()
+               << dendl;
+      dout(10) << "tick balancer "
+               << " full cache_bytes: " << full_cache->get_cache_bytes()
+               << " full comtd_bytes: " << full_cache->get_committed_size()
+               << " full used_bytes: " << full_cache->_get_used_bytes()
+               << " full num_osdmaps: " << full_cache->_get_num_osdmaps()
+               << dendl;
+    }
+  }
+
   if (!mon->is_leader()) return;
 
   bool do_propose = false;
@@ -4799,27 +5165,6 @@ void OSDMonitor::tick()
   if (do_propose ||
       !pending_inc.new_pg_temp.empty())  // also propose if we adjusted pg_temp
     propose_pending();
-
-  {
-    std::lock_guard l(balancer_lock);
-    if (ceph_using_tcmalloc() && mon_memory_autotune && pcm != nullptr) {
-      pcm->tune_memory();
-      pcm->balance();
-      _set_new_cache_sizes();
-      dout(10) << "tick balancer "
-               << " inc cache_bytes: " << inc_cache->get_cache_bytes()
-               << " inc comtd_bytes: " << inc_cache->get_committed_size()
-               << " inc used_bytes: " << inc_cache->_get_used_bytes()
-               << " inc num_osdmaps: " << inc_cache->_get_num_osdmaps()
-               << dendl;
-      dout(10) << "tick balancer "
-               << " full cache_bytes: " << full_cache->get_cache_bytes()
-               << " full comtd_bytes: " << full_cache->get_committed_size()
-               << " full used_bytes: " << full_cache->_get_used_bytes()
-               << " full num_osdmaps: " << full_cache->_get_num_osdmaps()
-               << dendl;
-    }
-  }
 }
 
 void OSDMonitor::_set_new_cache_sizes()
@@ -4918,6 +5263,24 @@ void OSDMonitor::dump_info(Formatter *f)
   }
   f->close_section();
 
+  f->open_object_section("osdmap_clean_epochs");
+  f->dump_unsigned("min_last_epoch_clean", get_min_last_epoch_clean());
+
+  f->open_object_section("last_epoch_clean");
+  last_epoch_clean.dump(f);
+  f->close_section();
+
+  f->open_array_section("osd_epochs");
+  for (auto& osd_epoch : osd_epochs) {
+    f->open_object_section("osd");
+    f->dump_unsigned("id", osd_epoch.first);
+    f->dump_unsigned("epoch", osd_epoch.second);
+    f->close_section();
+  }
+  f->close_section(); // osd_epochs
+
+  f->close_section(); // osd_clean_epochs
+
   f->dump_unsigned("osdmap_first_committed", get_first_committed());
   f->dump_unsigned("osdmap_last_committed", get_last_committed());
 
@@ -4970,7 +5333,7 @@ namespace {
 bool OSDMonitor::preprocess_command(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+  auto m = op->get_req<MMonCommand>();
   int r = 0;
   bufferlist rdata;
   stringstream ss, ds;
@@ -4990,10 +5353,10 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
   }
 
   string prefix;
-  cmd_getval(cct, cmdmap, "prefix", prefix);
+  cmd_getval(cmdmap, "prefix", prefix);
 
   string format;
-  cmd_getval(cct, cmdmap, "format", format, string("plain"));
+  cmd_getval(cmdmap, "format", format, string("plain"));
   boost::scoped_ptr<Formatter> f(Formatter::create(format));
 
   if (prefix == "osd stat") {
@@ -5013,12 +5376,13 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
           prefix == "osd ls" ||
           prefix == "osd getmap" ||
           prefix == "osd getcrushmap" ||
-          prefix == "osd ls-tree") {
+          prefix == "osd ls-tree" ||
+          prefix == "osd info") {
     string val;
 
     epoch_t epoch = 0;
     int64_t epochnum;
-    cmd_getval(cct, cmdmap, "epoch", epochnum, (int64_t)osdmap.get_epoch());
+    cmd_getval(cmdmap, "epoch", epochnum, (int64_t)osdmap.get_epoch());
     epoch = epochnum;
     
     bufferlist osdmap_bl;
@@ -5080,10 +5444,38 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
        }
       }
       rdata.append(ds);
+    } else if (prefix == "osd info") {
+      int64_t osd_id;
+      bool do_single_osd = true;
+      if (!cmd_getval(cmdmap, "id", osd_id)) {
+       do_single_osd = false;
+      }
+
+      if (do_single_osd && !osdmap.exists(osd_id)) {
+       ss << "osd." << osd_id << " does not exist";
+       r = -EINVAL;
+       goto reply;
+      }
+
+      if (f) {
+       if (do_single_osd) {
+         osdmap.dump_osd(osd_id, f.get());
+       } else {
+         osdmap.dump_osds(f.get());
+       }
+       f->flush(ds);
+      } else {
+       if (do_single_osd) {
+         osdmap.print_osd(osd_id, ds);
+       } else {
+         osdmap.print_osds(ds);
+       }
+      }
+      rdata.append(ds);
     } else if (prefix == "osd tree" || prefix == "osd tree-from") {
       string bucket;
       if (prefix == "osd tree-from") {
-        cmd_getval(cct, cmdmap, "bucket", bucket);
+        cmd_getval(cmdmap, "bucket", bucket);
         if (!osdmap.crush->name_exists(bucket)) {
           ss << "bucket '" << bucket << "' does not exist";
           r = -ENOENT;
@@ -5098,7 +5490,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       }
 
       vector<string> states;
-      cmd_getval(cct, cmdmap, "states", states);
+      cmd_getval(cmdmap, "states", states);
       unsigned filter = 0;
       for (auto& s : states) {
        if (s == "up") {
@@ -5150,7 +5542,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       ss << p->get_crush_version();
     } else if (prefix == "osd ls-tree") {
       string bucket_name;
-      cmd_getval(cct, cmdmap, "name", bucket_name);
+      cmd_getval(cmdmap, "name", bucket_name);
       set<int> osds;
       r = p->get_osds_by_bucket_name(bucket_name, &osds);
       if (r == -ENOENT) {
@@ -5206,7 +5598,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     goto reply;
   } else if (prefix  == "osd find") {
     int64_t osd;
-    if (!cmd_getval(cct, cmdmap, "id", osd)) {
+    if (!cmd_getval(cmdmap, "id", osd)) {
       ss << "unable to parse osd id value '"
          << cmd_vartype_stringify(cmdmap["id"]) << "'";
       r = -EINVAL;
@@ -5218,7 +5610,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       goto reply;
     }
     string format;
-    cmd_getval(cct, cmdmap, "format", format);
+    cmd_getval(cmdmap, "format", format);
     boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
     f->open_object_section("osd_location");
     f->dump_int("osd", osd);
@@ -5233,7 +5625,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     }
     for (auto& k : {
        "pod_name", "pod_namespace", // set by rook
-       "container_name"             // set by ceph-ansible
+       "container_name"             // set by cephadm, ceph-ansible
        }) {
       if (auto p = m.find(k); p != m.end()) {
        f->dump_string(k, p->second);
@@ -5251,7 +5643,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
   } else if (prefix == "osd metadata") {
     int64_t osd = -1;
     if (cmd_vartype_stringify(cmdmap["id"]).size() &&
-        !cmd_getval(cct, cmdmap, "id", osd)) {
+        !cmd_getval(cmdmap, "id", osd)) {
       ss << "unable to parse osd id value '"
          << cmd_vartype_stringify(cmdmap["id"]) << "'";
       r = -EINVAL;
@@ -5263,7 +5655,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       goto reply;
     }
     string format;
-    cmd_getval(cct, cmdmap, "format", format);
+    cmd_getval(cmdmap, "format", format);
     boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
     if (osd >= 0) {
       f->open_object_section("osd_metadata");
@@ -5304,7 +5696,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     if (!f)
       f.reset(Formatter::create("json-pretty"));
     string field;
-    cmd_getval(cct, cmdmap, "property", field);
+    cmd_getval(cmdmap, "property", field);
     count_metadata(field, f.get());
     f->flush(rdata);
     r = 0;
@@ -5397,9 +5789,9 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     }
   } else if (prefix == "osd map") {
     string poolstr, objstr, namespacestr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
-    cmd_getval(cct, cmdmap, "object", objstr);
-    cmd_getval(cct, cmdmap, "nspace", namespacestr);
+    cmd_getval(cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "object", objstr);
+    cmd_getval(cmdmap, "nspace", namespacestr);
 
     int64_t pool = osdmap.lookup_pg_pool_name(poolstr.c_str());
     if (pool < 0) {
@@ -5453,7 +5845,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
   } else if (prefix == "pg map") {
     pg_t pgid;
     string pgidstr;
-    cmd_getval(cct, cmdmap, "pgid", pgidstr);
+    cmd_getval(cmdmap, "pgid", pgidstr);
     if (!pgid.parse(pgidstr.c_str())) {
       ss << "invalid pgid '" << pgidstr << "'";
       r = -EINVAL;
@@ -5544,7 +5936,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
 
   } else if (prefix == "osd pool ls") {
     string detail;
-    cmd_getval(cct, cmdmap, "detail", detail);
+    cmd_getval(cmdmap, "detail", detail);
     if (!f && detail == "detail") {
       ostringstream ss;
       osdmap.print_pools(ss);
@@ -5577,7 +5969,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
 
   } else if (prefix == "osd crush get-tunable") {
     string tunable;
-    cmd_getval(cct, cmdmap, "tunable", tunable);
+    cmd_getval(cmdmap, "tunable", tunable);
     ostringstream rss;
     if (f)
       f->open_object_section("tunable");
@@ -5600,7 +5992,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
 
   } else if (prefix == "osd pool get") {
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool = osdmap.lookup_pg_pool_name(poolstr.c_str());
     if (pool < 0) {
       ss << "unrecognized pool '" << poolstr << "'";
@@ -5610,7 +6002,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
 
     const pg_pool_t *p = osdmap.get_pg_pool(pool);
     string var;
-    cmd_getval(cct, cmdmap, "var", var);
+    cmd_getval(cmdmap, "var", var);
 
     typedef std::map<std::string, osd_pool_get_choices> choices_map_t;
     const choices_map_t ALL_CHOICES = {
@@ -6057,7 +6449,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     r = 0;
   } else if (prefix == "osd pool get-quota") {
     string pool_name;
-    cmd_getval(cct, cmdmap, "pool", pool_name);
+    cmd_getval(cmdmap, "pool", pool_name);
 
     int64_t poolid = osdmap.lookup_pg_pool_name(pool_name);
     if (poolid < 0) {
@@ -6067,13 +6459,16 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       goto reply;
     }
     const pg_pool_t *p = osdmap.get_pg_pool(poolid);
-
+    const pool_stat_t* pstat = mon->mgrstatmon()->get_pool_stat(poolid);
+    const object_stat_sum_t& sum = pstat->stats.sum;
     if (f) {
       f->open_object_section("pool_quotas");
       f->dump_string("pool_name", pool_name);
       f->dump_unsigned("pool_id", poolid);
       f->dump_unsigned("quota_max_objects", p->quota_max_objects);
+      f->dump_int("current_num_objects", sum.num_objects);
       f->dump_unsigned("quota_max_bytes", p->quota_max_bytes);
+      f->dump_int("current_num_bytes", sum.num_bytes);
       f->close_section();
       f->flush(rdata);
     } else {
@@ -6082,14 +6477,18 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
          << "  max objects: ";
       if (p->quota_max_objects == 0)
         rs << "N/A";
-      else
+      else {
         rs << si_u_t(p->quota_max_objects) << " objects";
+        rs << "  (current num objects: " << sum.num_objects << " objects)";
+      }
       rs << "\n"
          << "  max bytes  : ";
       if (p->quota_max_bytes == 0)
         rs << "N/A";
-      else
+      else {
         rs << byte_u_t(p->quota_max_bytes);
+        rs << "  (current num bytes: " << sum.num_bytes << " bytes)";
+      }
       rdata.append(rs.str());
     }
     rdata.append("\n");
@@ -6108,7 +6507,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     }
   } else if (prefix == "osd crush rule ls-by-class") {
     string class_name;
-    cmd_getval(cct, cmdmap, "class", class_name);
+    cmd_getval(cmdmap, "class", class_name);
     if (class_name.empty()) {
       ss << "no class specified";
       r = -EINVAL;
@@ -6136,9 +6535,9 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     }
   } else if (prefix == "osd crush rule dump") {
     string name;
-    cmd_getval(cct, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
     string format;
-    cmd_getval(cct, cmdmap, "format", format);
+    cmd_getval(cmdmap, "format", format);
     boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
     if (name == "") {
       f->open_array_section("rules");
@@ -6159,7 +6558,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     rdata.append(rs.str());
   } else if (prefix == "osd crush dump") {
     string format;
-    cmd_getval(cct, cmdmap, "format", format);
+    cmd_getval(cmdmap, "format", format);
     boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
     f->open_object_section("crush_map");
     osdmap.crush->dump(f.get());
@@ -6170,7 +6569,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     rdata.append(rs.str());
   } else if (prefix == "osd crush show-tunables") {
     string format;
-    cmd_getval(cct, cmdmap, "format", format);
+    cmd_getval(cmdmap, "format", format);
     boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
     f->open_object_section("crush_map_tunables");
     osdmap.crush->dump_tunables(f.get());
@@ -6181,7 +6580,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     rdata.append(rs.str());
   } else if (prefix == "osd crush tree") {
     string shadow;
-    cmd_getval(cct, cmdmap, "shadow", shadow);
+    cmd_getval(cmdmap, "shadow", shadow);
     bool show_shadow = shadow == "--show-shadow";
     boost::scoped_ptr<Formatter> f(Formatter::create(format));
     if (f) {
@@ -6202,7 +6601,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     }
   } else if (prefix == "osd crush ls") {
     string name;
-    if (!cmd_getval(cct, cmdmap, "node", name)) {
+    if (!cmd_getval(cmdmap, "node", name)) {
       ss << "no node specified";
       r = -EINVAL;
       goto reply;
@@ -6246,7 +6645,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     f->flush(rdata);
   } else if (prefix == "osd crush class ls-osd") {
     string name;
-    cmd_getval(cct, cmdmap, "class", name);
+    cmd_getval(cmdmap, "class", name);
     set<int> osds;
     osdmap.crush->get_devices_by_class(name, &osds);
     if (f) {
@@ -6267,7 +6666,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     }
   } else if (prefix == "osd crush get-device-class") {
     vector<string> idvec;
-    cmd_getval(cct, cmdmap, "ids", idvec);
+    cmd_getval(cmdmap, "ids", idvec);
     map<int, string> class_by_osd;
     for (auto& id : idvec) {
       ostringstream ts;
@@ -6359,7 +6758,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     f->flush(rdata);
   } else if (prefix == "osd erasure-code-profile get") {
     string name;
-    cmd_getval(cct, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
     if (!osdmap.has_erasure_code_profile(name)) {
       ss << "unknown erasure code profile '" << name << "'";
       r = -ENOENT;
@@ -6387,11 +6786,11 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty",
                                                      "json-pretty"));
     string pool_name;
-    cmd_getval(cct, cmdmap, "pool", pool_name);
+    cmd_getval(cmdmap, "pool", pool_name);
     string app;
-    cmd_getval(cct, cmdmap, "app", app);
+    cmd_getval(cmdmap, "app", app);
     string key;
-    cmd_getval(cct, cmdmap, "key", key);
+    cmd_getval(cmdmap, "key", key);
 
     if (pool_name.empty()) {
       // all
@@ -6465,7 +6864,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       ss.str("");
     }
   } else if (prefix == "osd get-require-min-compat-client") {
-    ss << ceph_release_name(osdmap.require_min_compat_client) << std::endl;
+    ss << osdmap.require_min_compat_client << std::endl;
     rdata.append(ss.str());
     ss.str("");
     goto reply;
@@ -6513,24 +6912,22 @@ void OSDMonitor::clear_pool_flags(int64_t pool_id, uint64_t flags)
   pool->unset_flag(flags);
 }
 
-string OSDMonitor::make_snap_epoch_key(int64_t pool, epoch_t epoch)
+string OSDMonitor::make_purged_snap_epoch_key(epoch_t epoch)
 {
   char k[80];
-  snprintf(k, sizeof(k), "removed_epoch_%llu_%08lx",
-          (unsigned long long)pool, (unsigned long)epoch);
+  snprintf(k, sizeof(k), "purged_epoch_%08lx", (unsigned long)epoch);
   return k;
 }
 
-string OSDMonitor::make_snap_key(int64_t pool, snapid_t snap)
+string OSDMonitor::make_purged_snap_key(int64_t pool, snapid_t snap)
 {
   char k[80];
-  snprintf(k, sizeof(k), "removed_snap_%llu_%016llx",
+  snprintf(k, sizeof(k), "purged_snap_%llu_%016llx",
           (unsigned long long)pool, (unsigned long long)snap);
   return k;
 }
 
-
-string OSDMonitor::make_snap_key_value(
+string OSDMonitor::make_purged_snap_key_value(
   int64_t pool, snapid_t snap, snapid_t num,
   epoch_t epoch, bufferlist *v)
 {
@@ -6539,38 +6936,44 @@ string OSDMonitor::make_snap_key_value(
   encode(snap, *v);
   encode(snap + num, *v);
   encode(epoch, *v);
-  return make_snap_key(pool, snap + num - 1);
+  return make_purged_snap_key(pool, snap + num - 1);
 }
 
-string OSDMonitor::make_snap_purged_key(int64_t pool, snapid_t snap)
-{
-  char k[80];
-  snprintf(k, sizeof(k), "purged_snap_%llu_%016llx",
-          (unsigned long long)pool, (unsigned long long)snap);
-  return k;
-}
-string OSDMonitor::make_snap_purged_key_value(
-  int64_t pool, snapid_t snap, snapid_t num,
-  epoch_t epoch, bufferlist *v)
-{
-  // encode the *last* epoch in the key so that we can use forward
-  // iteration only to search for an epoch in an interval.
-  encode(snap, *v);
-  encode(snap + num, *v);
-  encode(epoch, *v);
-  return make_snap_purged_key(pool, snap + num - 1);
-}
 
-int OSDMonitor::lookup_pruned_snap(int64_t pool, snapid_t snap,
-                                  snapid_t *begin, snapid_t *end)
+int OSDMonitor::lookup_purged_snap(
+  int64_t pool, snapid_t snap,
+  snapid_t *begin, snapid_t *end)
 {
-  string k = make_snap_key(pool, snap);
+  string k = make_purged_snap_key(pool, snap);
   auto it = mon->store->get_iterator(OSD_SNAP_PREFIX);
   it->lower_bound(k);
   if (!it->valid()) {
+    dout(20) << __func__
+            << " pool " << pool << " snap " << snap
+            << " - key '" << k << "' not found" << dendl;
     return -ENOENT;
   }
-  if (it->key().find(OSD_SNAP_PREFIX) != 0) {
+  if (it->key().find("purged_snap_") != 0) {
+    dout(20) << __func__
+            << " pool " << pool << " snap " << snap
+            << " - key '" << k << "' got '" << it->key()
+            << "', wrong prefix" << dendl;
+    return -ENOENT;
+  }
+  string gotk = it->key();
+  const char *format = "purged_snap_%llu_";
+  long long int keypool;
+  int n = sscanf(gotk.c_str(), format, &keypool);
+  if (n != 1) {
+    derr << __func__ << " invalid k '" << gotk << "'" << dendl;
+    return -ENOENT;
+  }
+  if (pool != keypool) {
+    dout(20) << __func__
+            << " pool " << pool << " snap " << snap
+            << " - key '" << k << "' got '" << gotk
+            << "', wrong pool " << keypool
+            << dendl;
     return -ENOENT;
   }
   bufferlist v = it->value();
@@ -6578,19 +6981,76 @@ int OSDMonitor::lookup_pruned_snap(int64_t pool, snapid_t snap,
   decode(*begin, p);
   decode(*end, p);
   if (snap < *begin || snap >= *end) {
+    dout(20) << __func__
+            << " pool " << pool << " snap " << snap
+            << " - found [" << *begin << "," << *end << "), no overlap"
+            << dendl;
     return -ENOENT;
   }
   return 0;
 }
 
+void OSDMonitor::insert_purged_snap_update(
+  int64_t pool,
+  snapid_t start, snapid_t end,
+  epoch_t epoch,
+  MonitorDBStore::TransactionRef t)
+{
+  snapid_t before_begin, before_end;
+  snapid_t after_begin, after_end;
+  int b = lookup_purged_snap(pool, start - 1,
+                            &before_begin, &before_end);
+  int a = lookup_purged_snap(pool, end,
+                            &after_begin, &after_end);
+  if (!b && !a) {
+    dout(10) << __func__
+            << " [" << start << "," << end << ") - joins ["
+            << before_begin << "," << before_end << ") and ["
+            << after_begin << "," << after_end << ")" << dendl;
+    // erase only the begin record; we'll overwrite the end one.
+    t->erase(OSD_SNAP_PREFIX, make_purged_snap_key(pool, before_end - 1));
+    bufferlist v;
+    string k = make_purged_snap_key_value(pool,
+                                         before_begin, after_end - before_begin,
+                                         pending_inc.epoch, &v);
+    t->put(OSD_SNAP_PREFIX, k, v);
+  } else if (!b) {
+    dout(10) << __func__
+            << " [" << start << "," << end << ") - join with earlier ["
+            << before_begin << "," << before_end << ")" << dendl;
+    t->erase(OSD_SNAP_PREFIX, make_purged_snap_key(pool, before_end - 1));
+    bufferlist v;
+    string k = make_purged_snap_key_value(pool,
+                                         before_begin, end - before_begin,
+                                         pending_inc.epoch, &v);
+    t->put(OSD_SNAP_PREFIX, k, v);
+  } else if (!a) {
+    dout(10) << __func__
+            << " [" << start << "," << end << ") - join with later ["
+            << after_begin << "," << after_end << ")" << dendl;
+    // overwrite after record
+    bufferlist v;
+    string k = make_purged_snap_key_value(pool,
+                                         start, after_end - start,
+                                         pending_inc.epoch, &v);
+    t->put(OSD_SNAP_PREFIX, k, v);
+  } else {
+    dout(10) << __func__
+            << " [" << start << "," << end << ") - new"
+            << dendl;
+    bufferlist v;
+    string k = make_purged_snap_key_value(pool,
+                                         start, end - start,
+                                         pending_inc.epoch, &v);
+    t->put(OSD_SNAP_PREFIX, k, v);
+  }
+}
+
 bool OSDMonitor::try_prune_purged_snaps()
 {
   if (!mon->mgrstatmon()->is_readable()) {
     return false;
   }
-  if (osdmap.require_osd_release < CEPH_RELEASE_MIMIC) {
-    return false;
-  }
   if (!pending_inc.new_purged_snaps.empty()) {
     return false;  // we already pruned for this epoch
   }
@@ -6615,26 +7075,25 @@ bool OSDMonitor::try_prune_purged_snaps()
       continue;
     }
     dout(20) << __func__ << " pool " << p.first << " purged " << purged << dendl;
-    OSDMap::snap_interval_set_t to_prune;
+    snap_interval_set_t to_prune;
     unsigned maybe_pruned = actually_pruned;
     for (auto i = purged.begin(); i != purged.end(); ++i) {
       snapid_t begin = i.get_start();
       auto end = i.get_start() + i.get_len();
       snapid_t pbegin = 0, pend = 0;
-      int r = lookup_pruned_snap(p.first, begin, &pbegin, &pend);
+      int r = lookup_purged_snap(p.first, begin, &pbegin, &pend);
       if (r == 0) {
        // already purged.
        // be a bit aggressive about backing off here, because the mon may
        // do a lot of work going through this set, and if we know the
        // purged set from the OSDs is at least *partly* stale we may as
        // well wait for it to be fresh.
-       dout(20) << __func__ << "  we've already pruned " << pbegin
+       dout(20) << __func__ << "  we've already purged " << pbegin
                 << "~" << (pend - pbegin) << dendl;
        break;  // next pool
       }
-      if (pbegin && pbegin < end) {
+      if (pbegin && pbegin > begin && pbegin < end) {
        // the tail of [begin,end) is purged; shorten the range
-       ceph_assert(pbegin > begin);
        end = pbegin;
       }
       to_prune.insert(begin, end - begin);
@@ -6646,7 +7105,7 @@ bool OSDMonitor::try_prune_purged_snaps()
     if (!to_prune.empty()) {
       // PGs may still be reporting things as purged that we have already
       // pruned from removed_snaps_queue.
-      OSDMap::snap_interval_set_t actual;
+      snap_interval_set_t actual;
       auto r = osdmap.removed_snaps_queue.find(p.first);
       if (r != osdmap.removed_snaps_queue.end()) {
        actual.intersection_of(to_prune, r->second);
@@ -6730,7 +7189,7 @@ bool OSDMonitor::update_pools_status()
 int OSDMonitor::prepare_new_pool(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
+  auto m = op->get_req<MPoolOp>();
   dout(10) << "prepare_new_pool from " << m->get_connection() << dendl;
   MonSession *session = op->get_session();
   if (!session)
@@ -6742,7 +7201,8 @@ int OSDMonitor::prepare_new_pool(MonOpRequestRef op)
   ret = prepare_new_pool(m->name, m->crush_rule, rule_name,
                         0, 0, 0, 0, 0, 0.0,
                         erasure_code_profile,
-                        pg_pool_t::TYPE_REPLICATED, 0, FAST_READ_OFF, &ss);
+                        pg_pool_t::TYPE_REPLICATED, 0, FAST_READ_OFF, {},
+                        &ss);
 
   if (ret < 0) {
     dout(10) << __func__ << " got " << ret << " " << ss.str() << dendl;
@@ -6966,12 +7426,12 @@ bool OSDMonitor::validate_crush_against_features(const CrushWrapper *newcrush,
   newmap.apply_incremental(new_pending);
 
   // client compat
-  if (newmap.require_min_compat_client > 0) {
+  if (newmap.require_min_compat_client != ceph_release_t::unknown) {
     auto mv = newmap.get_min_compat_client();
     if (mv > newmap.require_min_compat_client) {
-      ss << "new crush map requires client version " << ceph_release_name(mv)
+      ss << "new crush map requires client version " << mv
         << " but require_min_compat_client is "
-        << ceph_release_name(newmap.require_min_compat_client);
+        << newmap.require_min_compat_client;
       return false;
     }
   }
@@ -7274,6 +7734,7 @@ int OSDMonitor::prepare_new_pool(string& name,
                                  const unsigned pool_type,
                                  const uint64_t expected_num_objects,
                                  FastReadType fast_read,
+                                const string& pg_autoscale_mode,
                                 ostream *ss)
 {
   if (name.length() == 0)
@@ -7404,10 +7865,12 @@ int OSDMonitor::prepare_new_pool(string& name,
   pi->expected_num_objects = expected_num_objects;
   pi->object_hash = CEPH_STR_HASH_RJENKINS;
 
-  {
-    auto m = pg_pool_t::get_pg_autoscale_mode_by_name(
-      g_conf().get_val<string>("osd_pool_default_pg_autoscale_mode"));
-    pi->pg_autoscale_mode = m >= 0 ? m : 0;
+  if (auto m = pg_pool_t::get_pg_autoscale_mode_by_name(
+        g_conf().get_val<string>("osd_pool_default_pg_autoscale_mode"));
+      m != pg_pool_t::pg_autoscale_mode_t::UNKNOWN) {
+    pi->pg_autoscale_mode = m;
+  } else {
+    pi->pg_autoscale_mode = pg_pool_t::pg_autoscale_mode_t::OFF;
   }
   auto max = g_conf().get_val<int64_t>("mon_osd_max_initial_pgs");
   pi->set_pg_num(
@@ -7417,10 +7880,14 @@ int OSDMonitor::prepare_new_pool(string& name,
   pi->set_pg_num_target(pg_num);
   pi->set_pgp_num(pi->get_pg_num());
   pi->set_pgp_num_target(pgp_num);
-  if (osdmap.require_osd_release >= CEPH_RELEASE_NAUTILUS &&
+  if (osdmap.require_osd_release >= ceph_release_t::nautilus &&
       pg_num_min) {
     pi->opts.set(pool_opts_t::PG_NUM_MIN, static_cast<int64_t>(pg_num_min));
   }
+  if (auto m = pg_pool_t::get_pg_autoscale_mode_by_name(
+       pg_autoscale_mode); m != pg_pool_t::pg_autoscale_mode_t::UNKNOWN) {
+    pi->pg_autoscale_mode = m;
+  }
 
   pi->last_change = pending_inc.epoch;
   pi->auid = 0;
@@ -7432,14 +7899,14 @@ int OSDMonitor::prepare_new_pool(string& name,
   }
   pi->stripe_width = stripe_width;
 
-  if (osdmap.require_osd_release >= CEPH_RELEASE_NAUTILUS &&
+  if (osdmap.require_osd_release >= ceph_release_t::nautilus &&
       target_size_bytes) {
     // only store for nautilus+ because TARGET_SIZE_BYTES may be
     // larger than int32_t max.
     pi->opts.set(pool_opts_t::TARGET_SIZE_BYTES, static_cast<int64_t>(target_size_bytes));
   }
   if (target_size_ratio > 0.0 &&
-    osdmap.require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+      osdmap.require_osd_release >= ceph_release_t::nautilus) {
     // only store for nautilus+, just to be consistent and tidy.
     pi->opts.set(pool_opts_t::TARGET_SIZE_RATIO, target_size_ratio);
   }
@@ -7487,14 +7954,14 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
                                          stringstream& ss)
 {
   string poolstr;
-  cmd_getval(cct, cmdmap, "pool", poolstr);
+  cmd_getval(cmdmap, "pool", poolstr);
   int64_t pool = osdmap.lookup_pg_pool_name(poolstr.c_str());
   if (pool < 0) {
     ss << "unrecognized pool '" << poolstr << "'";
     return -ENOENT;
   }
   string var;
-  cmd_getval(cct, cmdmap, "var", var);
+  cmd_getval(cmdmap, "var", var);
 
   pg_pool_t p = *osdmap.get_pg_pool(pool);
   if (pending_inc.new_pools.count(pool))
@@ -7510,16 +7977,22 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
   int64_t n = 0;
   double f = 0;
   int64_t uf = 0;  // micro-f
-  cmd_getval(cct, cmdmap, "val", val);
-
-  // create list of properties that use SI units
-  std::set<std::string> si_items = {};
-  // create list of properties that use IEC units
-  std::set<std::string> iec_items = {"target_size_bytes"};
+  cmd_getval(cmdmap, "val", val);
 
-  if (si_items.count(var)) {
+  auto si_options = {
+    "target_max_objects"
+  };
+  auto iec_options = {
+    "target_max_bytes",
+    "target_size_bytes",
+    "compression_max_blob_size",
+    "compression_min_blob_size",
+    "csum_max_block",
+    "csum_min_block",
+  };
+  if (count(begin(si_options), end(si_options), var)) {
     n = strict_si_cast<int64_t>(val.c_str(), &interr);
-  } else if (iec_items.count(var)) {
+  } else if (count(begin(iec_options), end(iec_options), var)) {
     n = strict_iec_cast<int64_t>(val.c_str(), &interr);
   } else {
     // parse string as both int and float; different fields use different types.
@@ -7565,8 +8038,7 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
       return r;
     }
     p.size = n;
-    if (n < p.min_size)
-      p.min_size = n;
+    p.min_size = g_conf().get_osd_pool_default_min_size(p.size);
   } else if (var == "min_size") {
     if (p.has_flag(pg_pool_t::FLAG_NOSIZECHANGE)) {
       ss << "pool min size change is disabled; you must unset nosizechange flag for the pool first";
@@ -7626,7 +8098,7 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
       }
       p.set_pg_num(n);
     } else {
-      if (osdmap.require_osd_release < CEPH_RELEASE_NAUTILUS) {
+      if (osdmap.require_osd_release < ceph_release_t::nautilus) {
        ss << "nautilus OSDs are required to adjust pg_num_pending";
        return -EPERM;
       }
@@ -7672,18 +8144,18 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
        return r;
       }
       bool force = false;
-      cmd_getval(cct,cmdmap, "yes_i_really_mean_it", force);
+      cmd_getval(cmdmap, "yes_i_really_mean_it", force);
       if (p.cache_mode != pg_pool_t::CACHEMODE_NONE && !force) {
        ss << "splits in cache pools must be followed by scrubs and leave sufficient free space to avoid overfilling.  use --yes-i-really-mean-it to force.";
        return -EPERM;
       }
     } else {
-      if (osdmap.require_osd_release < CEPH_RELEASE_NAUTILUS) {
+      if (osdmap.require_osd_release < ceph_release_t::nautilus) {
        ss << "nautilus OSDs are required to decrease pg_num";
        return -EPERM;
       }
     }
-    if (osdmap.require_osd_release < CEPH_RELEASE_NAUTILUS) {
+    if (osdmap.require_osd_release < ceph_release_t::nautilus) {
       // pre-nautilus osdmap format; increase pg_num directly
       assert(n > (int)p.get_pg_num());
       // force pre-nautilus clients to resend their ops, since they
@@ -7743,23 +8215,23 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
       ss << "specified pgp_num " << n << " > pg_num " << p.get_pg_num_target();
       return -EINVAL;
     }
-    if (osdmap.require_osd_release < CEPH_RELEASE_NAUTILUS) {
+    if (osdmap.require_osd_release < ceph_release_t::nautilus) {
       // pre-nautilus osdmap format; increase pgp_num directly
       p.set_pgp_num(n);
     } else {
       p.set_pgp_num_target(n);
     }
   } else if (var == "pg_autoscale_mode") {
-    n = pg_pool_t::get_pg_autoscale_mode_by_name(val);
-    if (n < 0) {
+    auto m = pg_pool_t::get_pg_autoscale_mode_by_name(val);
+    if (m == pg_pool_t::pg_autoscale_mode_t::UNKNOWN) {
       ss << "specified invalid mode " << val;
       return -EINVAL;
     }
-    if (osdmap.require_osd_release < CEPH_RELEASE_NAUTILUS) {
+    if (osdmap.require_osd_release < ceph_release_t::nautilus) {
       ss << "must set require_osd_release to nautilus or later before setting pg_autoscale_mode";
       return -EINVAL;
     }
-    p.pg_autoscale_mode = n;
+    p.pg_autoscale_mode = m;
   } else if (var == "crush_rule") {
     int id = osdmap.crush->get_rule_id(val);
     if (id == -ENOENT) {
@@ -7790,7 +8262,7 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
   } else if (var == "hashpspool") {
     uint64_t flag = pg_pool_t::get_flag_by_name(var);
     bool force = false;
-    cmd_getval(cct, cmdmap, "yes_i_really_mean_it", force);
+    cmd_getval(cmdmap, "yes_i_really_mean_it", force);
 
     if (!force) {
       ss << "are you SURE?  this will remap all placement groups in this pool,"
@@ -8042,7 +8514,7 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
         ss << "error parsing unit value '" << val << "': " << interr;
         return -EINVAL;
       }
-      if (osdmap.require_osd_release < CEPH_RELEASE_NAUTILUS) {
+      if (osdmap.require_osd_release < ceph_release_t::nautilus) {
         ss << "must set require_osd_release to nautilus or "
            << "later before setting target_size_bytes";
         return -EINVAL;
@@ -8153,7 +8625,7 @@ int OSDMonitor::_command_pool_application(const string &prefix,
                                           bool preparing)
 {
   string pool_name;
-  cmd_getval(cct, cmdmap, "pool", pool_name);
+  cmd_getval(cmdmap, "pool", pool_name);
   int64_t pool = osdmap.lookup_pg_pool_name(pool_name.c_str());
   if (pool < 0) {
     ss << "unrecognized pool '" << pool_name << "'";
@@ -8168,18 +8640,18 @@ int OSDMonitor::_command_pool_application(const string &prefix,
   }
 
   string app;
-  cmd_getval(cct, cmdmap, "app", app);
+  cmd_getval(cmdmap, "app", app);
   bool app_exists = (p.application_metadata.count(app) > 0);
 
   string key;
-  cmd_getval(cct, cmdmap, "key", key);
+  cmd_getval(cmdmap, "key", key);
   if (key == "all") {
     ss << "key cannot be 'all'";
     return -EINVAL;
   }
 
   string value;
-  cmd_getval(cct, cmdmap, "value", value);
+  cmd_getval(cmdmap, "value", value);
   if (value == "all") {
     ss << "value cannot be 'all'";
     return -EINVAL;
@@ -8197,7 +8669,7 @@ int OSDMonitor::_command_pool_application(const string &prefix,
     }
 
     bool force = false;
-    cmd_getval(cct, cmdmap, "yes_i_really_mean_it", force);
+    cmd_getval(cmdmap, "yes_i_really_mean_it", force);
 
     if (!app_exists && !p.application_metadata.empty() && !force) {
       ss << "Are you SURE? Pool '" << pool_name << "' already has an enabled "
@@ -8224,7 +8696,7 @@ int OSDMonitor::_command_pool_application(const string &prefix,
 
   } else if (boost::algorithm::ends_with(prefix, "disable")) {
     bool force = false;
-    cmd_getval(cct, cmdmap, "yes_i_really_mean_it", force);
+    cmd_getval(cmdmap, "yes_i_really_mean_it", force);
 
     if (!force) {
       ss << "Are you SURE? Disabling an application within a pool might result "
@@ -8255,7 +8727,7 @@ int OSDMonitor::_command_pool_application(const string &prefix,
     }
 
     string key;
-    cmd_getval(cct, cmdmap, "key", key);
+    cmd_getval(cmdmap, "key", key);
 
     if (key.empty()) {
       ss << "key must be provided";
@@ -8277,7 +8749,7 @@ int OSDMonitor::_command_pool_application(const string &prefix,
     }
 
     string value;
-    cmd_getval(cct, cmdmap, "value", value);
+    cmd_getval(cmdmap, "value", value);
     if (value.length() > MAX_POOL_APPLICATION_LENGTH) {
       ss << "value '" << value << "' too long; max length "
          << MAX_POOL_APPLICATION_LENGTH;
@@ -8295,7 +8767,7 @@ int OSDMonitor::_command_pool_application(const string &prefix,
     }
 
     string key;
-    cmd_getval(cct, cmdmap, "key", key);
+    cmd_getval(cmdmap, "key", key);
     auto it = p.application_metadata[app].find(key);
     if (it == p.application_metadata[app].end()) {
       ss << "application '" << app << "' on pool '" << pool_name
@@ -8619,7 +9091,7 @@ int OSDMonitor::prepare_command_osd_new(
    * If `id` is specified, and the osd has been previously marked
    * as destroyed, then the `id` will be reused.
    */
-  if (!cmd_getval(cct, cmdmap, "uuid", uuidstr)) {
+  if (!cmd_getval(cmdmap, "uuid", uuidstr)) {
     ss << "requires the OSD's UUID to be specified.";
     return -EINVAL;
   } else if (!uuid.parse(uuidstr.c_str())) {
@@ -8627,7 +9099,7 @@ int OSDMonitor::prepare_command_osd_new(
     return -EINVAL;
   }
 
-  if (cmd_getval(cct, cmdmap, "id", id) &&
+  if (cmd_getval(cmdmap, "id", id) &&
       (id < 0)) {
     ss << "invalid OSD id; must be greater or equal than zero.";
     return -EINVAL;
@@ -8852,7 +9324,7 @@ int OSDMonitor::prepare_command_osd_new(
 bool OSDMonitor::prepare_command(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+  auto m = op->get_req<MMonCommand>();
   stringstream ss;
   cmdmap_t cmdmap;
   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
@@ -8877,7 +9349,7 @@ static int parse_reweights(CephContext *cct,
                           map<int32_t, uint32_t>* weights)
 {
   string weights_str;
-  if (!cmd_getval(cct, cmdmap, "weights", weights_str)) {
+  if (!cmd_getval(cmdmap, "weights", weights_str)) {
     return -EINVAL;
   }
   std::replace(begin(weights_str), end(weights_str), '\'', '"');
@@ -9052,7 +9524,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
                                      const cmdmap_t& cmdmap)
 {
   op->mark_osdmon_event(__func__);
-  MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+  auto m = op->get_req<MMonCommand>();
   bool ret = false;
   stringstream ss;
   string rs;
@@ -9060,11 +9532,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   int err = 0;
 
   string format;
-  cmd_getval(cct, cmdmap, "format", format, string("plain"));
+  cmd_getval(cmdmap, "format", format, string("plain"));
   boost::scoped_ptr<Formatter> f(Formatter::create(format));
 
   string prefix;
-  cmd_getval(cct, cmdmap, "prefix", prefix);
+  cmd_getval(cmdmap, "prefix", prefix);
 
   int64_t osdid;
   string osd_name;
@@ -9072,7 +9544,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   if (prefix != "osd pg-temp" &&
       prefix != "osd pg-upmap" &&
       prefix != "osd pg-upmap-items") {  // avoid commands with non-int id arg
-    osdid_present = cmd_getval(cct, cmdmap, "id", osdid);
+    osdid_present = cmd_getval(cmdmap, "id", osdid);
   }
   if (osdid_present) {
     ostringstream oss;
@@ -9131,7 +9603,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
   
     int64_t prior_version = 0;
-    if (cmd_getval(cct, cmdmap, "prior_version", prior_version)) {
+    if (cmd_getval(cmdmap, "prior_version", prior_version)) {
       if (prior_version == osdmap.get_crush_version() - 1) {
        // see if we are a resend of the last update.  this is imperfect
        // (multiple racing updaters may not both get reliable success)
@@ -9219,14 +9691,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return true;
   } else if (prefix == "osd crush set-device-class") {
     string device_class;
-    if (!cmd_getval(cct, cmdmap, "class", device_class)) {
+    if (!cmd_getval(cmdmap, "class", device_class)) {
       err = -EINVAL; // no value!
       goto reply;
     }
 
     bool stop = false;
     vector<string> idvec;
-    cmd_getval(cct, cmdmap, "ids", idvec);
+    cmd_getval(cmdmap, "ids", idvec);
     CrushWrapper newcrush;
     _get_pending_crush(newcrush);
     set<int> updated;
@@ -9289,20 +9761,18 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
     }
 
-    if (!updated.empty()) {
-      pending_inc.crush.clear();
-      newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
-      ss << "set osd(s) " << updated << " to class '" << device_class << "'";
-      getline(ss, rs);
-      wait_for_finished_proposal(op,
-        new Monitor::C_Command(mon,op, 0, rs, get_last_committed() + 1));
-      return true;
-    }
-
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    ss << "set osd(s) " << updated << " to class '" << device_class << "'";
+    getline(ss, rs);
+    wait_for_finished_proposal(
+      op,
+      new Monitor::C_Command(mon,op, 0, rs, get_last_committed() + 1));
+    return true;
  } else if (prefix == "osd crush rm-device-class") {
     bool stop = false;
     vector<string> idvec;
-    cmd_getval(cct, cmdmap, "ids", idvec);
+    cmd_getval(cmdmap, "ids", idvec);
     CrushWrapper newcrush;
     _get_pending_crush(newcrush);
     set<int> updated;
@@ -9351,22 +9821,21 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
     }
 
-    if (!updated.empty()) {
-      pending_inc.crush.clear();
-      newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
-      ss << "done removing class of osd(s): " << updated;
-      getline(ss, rs);
-      wait_for_finished_proposal(op,
-        new Monitor::C_Command(mon,op, 0, rs, get_last_committed() + 1));
-      return true;
-    }
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    ss << "done removing class of osd(s): " << updated;
+    getline(ss, rs);
+    wait_for_finished_proposal(
+      op,
+      new Monitor::C_Command(mon,op, 0, rs, get_last_committed() + 1));
+    return true;
   } else if (prefix == "osd crush class create") {
     string device_class;
-    if (!cmd_getval(g_ceph_context, cmdmap, "class", device_class)) {
+    if (!cmd_getval(cmdmap, "class", device_class)) {
       err = -EINVAL; // no value!
       goto reply;
     }
-    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+    if (osdmap.require_osd_release < ceph_release_t::luminous) {
       ss << "you must complete the upgrade and 'ceph osd require-osd-release "
          << "luminous' before using crush device classes";
       err = -EPERM;
@@ -9391,11 +9860,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     goto update;
   } else if (prefix == "osd crush class rm") {
     string device_class;
-    if (!cmd_getval(g_ceph_context, cmdmap, "class", device_class)) {
+    if (!cmd_getval(cmdmap, "class", device_class)) {
        err = -EINVAL; // no value!
        goto reply;
      }
-     if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+    if (osdmap.require_osd_release < ceph_release_t::luminous) {
        ss << "you must complete the upgrade and 'ceph osd require-osd-release "
          << "luminous' before using crush device classes";
        err = -EPERM;
@@ -9473,11 +9942,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
      goto update;
   } else if (prefix == "osd crush class rename") {
     string srcname, dstname;
-    if (!cmd_getval(cct, cmdmap, "srcname", srcname)) {
+    if (!cmd_getval(cmdmap, "srcname", srcname)) {
       err = -EINVAL;
       goto reply;
     }
-    if (!cmd_getval(cct, cmdmap, "dstname", dstname)) {
+    if (!cmd_getval(cmdmap, "dstname", dstname)) {
       err = -EINVAL;
       goto reply;
     }
@@ -9507,9 +9976,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     // os crush add-bucket <name> <type>
     string name, typestr;
     vector<string> argvec;
-    cmd_getval(cct, cmdmap, "name", name);
-    cmd_getval(cct, cmdmap, "type", typestr);
-    cmd_getval(cct, cmdmap, "args", argvec);
+    cmd_getval(cmdmap, "name", name);
+    cmd_getval(cmdmap, "type", typestr);
+    cmd_getval(cmdmap, "args", argvec);
     map<string,string> loc;
     if (!argvec.empty()) {
       CrushWrapper::parse_loc_map(argvec, &loc);
@@ -9581,8 +10050,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     goto update;
   } else if (prefix == "osd crush rename-bucket") {
     string srcname, dstname;
-    cmd_getval(cct, cmdmap, "srcname", srcname);
-    cmd_getval(cct, cmdmap, "dstname", dstname);
+    cmd_getval(cmdmap, "srcname", srcname);
+    cmd_getval(cmdmap, "dstname", dstname);
 
     err = crush_rename_bucket(srcname, dstname, &ss);
     if (err == -EALREADY) // equivalent to success for idempotency
@@ -9603,10 +10072,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     if (prefix == "osd crush weight-set create") {
-      if (osdmap.require_min_compat_client > 0 &&
-         osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
+      if (osdmap.require_min_compat_client != ceph_release_t::unknown &&
+         osdmap.require_min_compat_client < ceph_release_t::luminous) {
        ss << "require_min_compat_client "
-          << ceph_release_name(osdmap.require_min_compat_client)
+          << osdmap.require_min_compat_client
           << " < luminous, which is required for per-pool weight-sets. "
            << "Try 'ceph osd set-require-min-compat-client luminous' "
            << "before using the new interface";
@@ -9614,14 +10083,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        goto reply;
       }
       string poolname, mode;
-      cmd_getval(cct, cmdmap, "pool", poolname);
+      cmd_getval(cmdmap, "pool", poolname);
       pool = osdmap.lookup_pg_pool_name(poolname.c_str());
       if (pool < 0) {
        ss << "pool '" << poolname << "' not found";
        err = -ENOENT;
        goto reply;
       }
-      cmd_getval(cct, cmdmap, "mode", mode);
+      cmd_getval(cmdmap, "mode", mode);
       if (mode != "flat" && mode != "positional") {
        ss << "unrecognized weight-set mode '" << mode << "'";
        err = -EINVAL;
@@ -9652,7 +10121,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     int64_t pool;
     if (prefix == "osd crush weight-set rm") {
       string poolname;
-      cmd_getval(cct, cmdmap, "pool", poolname);
+      cmd_getval(cmdmap, "pool", poolname);
       pool = osdmap.lookup_pg_pool_name(poolname.c_str());
       if (pool < 0) {
        ss << "pool '" << poolname << "' not found";
@@ -9671,9 +10140,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
             prefix == "osd crush weight-set reweight-compat") {
     string poolname, item;
     vector<double> weight;
-    cmd_getval(cct, cmdmap, "pool", poolname);
-    cmd_getval(cct, cmdmap, "item", item);
-    cmd_getval(cct, cmdmap, "weight", weight);
+    cmd_getval(cmdmap, "pool", poolname);
+    cmd_getval(cmdmap, "item", item);
+    cmd_getval(cmdmap, "weight", weight);
     CrushWrapper newcrush;
     _get_pending_crush(newcrush);
     int64_t pool;
@@ -9736,7 +10205,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     double weight;
-    if (!cmd_getval(cct, cmdmap, "weight", weight)) {
+    if (!cmd_getval(cmdmap, "weight", weight)) {
       ss << "unable to parse weight value '"
          << cmd_vartype_stringify(cmdmap.at("weight")) << "'";
       err = -EINVAL;
@@ -9745,7 +10214,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     string args;
     vector<string> argvec;
-    cmd_getval(cct, cmdmap, "args", argvec);
+    cmd_getval(cmdmap, "args", argvec);
     map<string,string> loc;
     CrushWrapper::parse_loc_map(argvec, &loc);
 
@@ -9805,7 +10274,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
 
       double weight;
-      if (!cmd_getval(cct, cmdmap, "weight", weight)) {
+      if (!cmd_getval(cmdmap, "weight", weight)) {
         ss << "unable to parse weight value '"
            << cmd_vartype_stringify(cmdmap.at("weight")) << "'";
         err = -EINVAL;
@@ -9814,7 +10283,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
       string args;
       vector<string> argvec;
-      cmd_getval(cct, cmdmap, "args", argvec);
+      cmd_getval(cmdmap, "args", argvec);
       map<string,string> loc;
       CrushWrapper::parse_loc_map(argvec, &loc);
 
@@ -9851,8 +10320,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       // osd crush move <name> <loc1> [<loc2> ...]
       string name;
       vector<string> argvec;
-      cmd_getval(cct, cmdmap, "name", name);
-      cmd_getval(cct, cmdmap, "args", argvec);
+      cmd_getval(cmdmap, "name", name);
+      cmd_getval(cmdmap, "args", argvec);
       map<string,string> loc;
       CrushWrapper::parse_loc_map(argvec, &loc);
 
@@ -9891,11 +10360,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     } while (false);
   } else if (prefix == "osd crush swap-bucket") {
     string source, dest;
-    cmd_getval(cct, cmdmap, "source", source);
-    cmd_getval(cct, cmdmap, "dest", dest);
+    cmd_getval(cmdmap, "source", source);
+    cmd_getval(cmdmap, "dest", dest);
 
     bool force = false;
-    cmd_getval(cct, cmdmap, "yes_i_really_mean_it", force);
+    cmd_getval(cmdmap, "yes_i_really_mean_it", force);
 
     CrushWrapper newcrush;
     _get_pending_crush(newcrush);
@@ -9941,9 +10410,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "osd crush link") {
     // osd crush link <name> <loc1> [<loc2> ...]
     string name;
-    cmd_getval(cct, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
     vector<string> argvec;
-    cmd_getval(cct, cmdmap, "args", argvec);
+    cmd_getval(cmdmap, "args", argvec);
     map<string,string> loc;
     CrushWrapper::parse_loc_map(argvec, &loc);
 
@@ -10004,7 +10473,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       _get_pending_crush(newcrush);
 
       string name;
-      cmd_getval(cct, cmdmap, "name", name);
+      cmd_getval(cmdmap, "name", name);
 
       if (!osdmap.crush->name_exists(name)) {
        err = 0;
@@ -10024,7 +10493,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
       bool unlink_only = prefix == "osd crush unlink";
       string ancestor_str;
-      if (cmd_getval(cct, cmdmap, "ancestor", ancestor_str)) {
+      if (cmd_getval(cmdmap, "ancestor", ancestor_str)) {
        if (!newcrush.name_exists(ancestor_str)) {
          err = -ENOENT;
          ss << "ancestor item '" << ancestor_str
@@ -10073,7 +10542,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     _get_pending_crush(newcrush);
 
     string name;
-    cmd_getval(cct, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
     if (!newcrush.name_exists(name)) {
       err = -ENOENT;
       ss << "device '" << name << "' does not appear in the crush map";
@@ -10087,7 +10556,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     double w;
-    if (!cmd_getval(cct, cmdmap, "weight", w)) {
+    if (!cmd_getval(cmdmap, "weight", w)) {
       ss << "unable to parse weight value '"
         << cmd_vartype_stringify(cmdmap.at("weight")) << "'";
       err = -EINVAL;
@@ -10112,7 +10581,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     _get_pending_crush(newcrush);
 
     string name;
-    cmd_getval(cct, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
     if (!newcrush.name_exists(name)) {
       err = -ENOENT;
       ss << "device '" << name << "' does not appear in the crush map";
@@ -10126,7 +10595,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     double w;
-    if (!cmd_getval(cct, cmdmap, "weight", w)) {
+    if (!cmd_getval(cmdmap, "weight", w)) {
       ss << "unable to parse weight value '"
         << cmd_vartype_stringify(cmdmap.at("weight")) << "'";
       err = -EINVAL;
@@ -10151,7 +10620,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     err = 0;
     string profile;
-    cmd_getval(cct, cmdmap, "profile", profile);
+    cmd_getval(cmdmap, "profile", profile);
     if (profile == "legacy" || profile == "argonaut") {
       newcrush.set_tunables_legacy();
     } else if (profile == "bobtail") {
@@ -10190,10 +10659,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     err = 0;
     string tunable;
-    cmd_getval(cct, cmdmap, "tunable", tunable);
+    cmd_getval(cmdmap, "tunable", tunable);
 
     int64_t value = -1;
-    if (!cmd_getval(cct, cmdmap, "value", value)) {
+    if (!cmd_getval(cmdmap, "value", value)) {
       err = -EINVAL;
       ss << "failed to parse integer value "
         << cmd_vartype_stringify(cmdmap.at("value"));
@@ -10228,10 +10697,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   } else if (prefix == "osd crush rule create-simple") {
     string name, root, type, mode;
-    cmd_getval(cct, cmdmap, "name", name);
-    cmd_getval(cct, cmdmap, "root", root);
-    cmd_getval(cct, cmdmap, "type", type);
-    cmd_getval(cct, cmdmap, "mode", mode);
+    cmd_getval(cmdmap, "name", name);
+    cmd_getval(cmdmap, "root", root);
+    cmd_getval(cmdmap, "type", type);
+    cmd_getval(cmdmap, "mode", mode);
     if (mode == "")
       mode = "firstn";
 
@@ -10269,10 +10738,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   } else if (prefix == "osd crush rule create-replicated") {
     string name, root, type, device_class;
-    cmd_getval(cct, cmdmap, "name", name);
-    cmd_getval(cct, cmdmap, "root", root);
-    cmd_getval(cct, cmdmap, "type", type);
-    cmd_getval(cct, cmdmap, "class", device_class);
+    cmd_getval(cmdmap, "name", name);
+    cmd_getval(cmdmap, "root", root);
+    cmd_getval(cmdmap, "type", type);
+    cmd_getval(cmdmap, "class", device_class);
 
     if (osdmap.crush->rule_exists(name)) {
       // The name is uniquely associated to a ruleid and the rule it contains
@@ -10309,7 +10778,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   } else if (prefix == "osd erasure-code-profile rm") {
     string name;
-    cmd_getval(cct, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
 
     if (erasure_code_profile_in_use(pending_inc.new_pools, name, &ss))
       goto wait;
@@ -10340,17 +10809,29 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   } else if (prefix == "osd erasure-code-profile set") {
     string name;
-    cmd_getval(cct, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
     vector<string> profile;
-    cmd_getval(cct, cmdmap, "profile", profile);
+    cmd_getval(cmdmap, "profile", profile);
 
     bool force = false;
-    cmd_getval(cct, cmdmap, "force", force);
+    cmd_getval(cmdmap, "force", force);
 
     map<string,string> profile_map;
     err = parse_erasure_code_profile(profile, &profile_map, &ss);
     if (err)
       goto reply;
+    if (auto found = profile_map.find("crush-failure-domain");
+       found != profile_map.end()) {
+      const auto& failure_domain = found->second;
+      int failure_domain_type = osdmap.crush->get_type_id(failure_domain);
+      if (failure_domain_type < 0) {
+       ss << "erasure-code-profile " << profile_map
+         << " contains an invalid failure-domain " << std::quoted(failure_domain);
+       err = -EINVAL;
+       goto reply;
+      }
+    }
+
     if (profile_map.find("plugin") == profile_map.end()) {
       ss << "erasure-code-profile " << profile_map
         << " must contain a plugin entry" << std::endl;
@@ -10406,9 +10887,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     if (err)
       goto reply;
     string name, poolstr;
-    cmd_getval(cct, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
     string profile;
-    cmd_getval(cct, cmdmap, "profile", profile);
+    cmd_getval(cmdmap, "profile", profile);
     if (profile == "")
       profile = "default";
     if (profile == "default") {
@@ -10462,7 +10943,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   } else if (prefix == "osd crush rule rm") {
     string name;
-    cmd_getval(cct, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
 
     if (!osdmap.crush->rule_exists(name)) {
       ss << "rule " << name << " does not exist";
@@ -10506,8 +10987,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "osd crush rule rename") {
     string srcname;
     string dstname;
-    cmd_getval(cct, cmdmap, "srcname", srcname);
-    cmd_getval(cct, cmdmap, "dstname", dstname);
+    cmd_getval(cmdmap, "srcname", srcname);
+    cmd_getval(cmdmap, "dstname", dstname);
     if (srcname.empty() || dstname.empty()) {
       ss << "must specify both source rule name and destination rule name";
       err = -EINVAL;
@@ -10544,7 +11025,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   } else if (prefix == "osd setmaxosd") {
     int64_t newmax;
-    if (!cmd_getval(cct, cmdmap, "newmax", newmax)) {
+    if (!cmd_getval(cmdmap, "newmax", newmax)) {
       ss << "unable to parse 'newmax' value '"
          << cmd_vartype_stringify(cmdmap.at("newmax")) << "'";
       err = -EINVAL;
@@ -10586,7 +11067,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
             prefix == "osd set-backfillfull-ratio" ||
              prefix == "osd set-nearfull-ratio") {
     double n;
-    if (!cmd_getval(cct, cmdmap, "ratio", n)) {
+    if (!cmd_getval(cmdmap, "ratio", n)) {
       ss << "unable to parse 'ratio' value '"
          << cmd_vartype_stringify(cmdmap.at("ratio")) << "'";
       err = -EINVAL;
@@ -10605,9 +11086,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return true;
   } else if (prefix == "osd set-require-min-compat-client") {
     string v;
-    cmd_getval(cct, cmdmap, "version", v);
-    int vno = ceph_release_from_name(v.c_str());
-    if (vno <= 0) {
+    cmd_getval(cmdmap, "version", v);
+    ceph_release_t vno = ceph_release_from_name(v);
+    if (!vno) {
       ss << "version " << v << " is not recognized";
       err = -EINVAL;
       goto reply;
@@ -10618,19 +11099,17 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     newmap.require_min_compat_client = vno;
     auto mvno = newmap.get_min_compat_client();
     if (vno < mvno) {
-      ss << "osdmap current utilizes features that require "
-        << ceph_release_name(mvno)
-        << "; cannot set require_min_compat_client below that to "
-        << ceph_release_name(vno);
+      ss << "osdmap current utilizes features that require " << mvno
+        << "; cannot set require_min_compat_client below that to " << vno;
       err = -EPERM;
       goto reply;
     }
     bool sure = false;
-    cmd_getval(cct, cmdmap, "yes_i_really_mean_it", sure);
+    cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
     if (!sure) {
       FeatureMap m;
       mon->get_combined_feature_map(&m);
-      uint64_t features = ceph_release_features(vno);
+      uint64_t features = ceph_release_features(ceph::to_integer<int>(vno));
       bool first = true;
       bool ok = true;
       for (int type : {
@@ -10664,7 +11143,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        goto reply;
       }
     }
-    ss << "set require_min_compat_client to " << ceph_release_name(vno);
+    ss << "set require_min_compat_client to " << vno;
     pending_inc.new_require_min_compat_client = vno;
     getline(ss, rs);
     wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
@@ -10678,13 +11157,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   } else if (prefix == "osd set") {
     bool sure = false;
-    cmd_getval(g_ceph_context, cmdmap, "yes_i_really_mean_it", sure);
+    cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
 
     string key;
-    cmd_getval(cct, cmdmap, "key", key);
-    if (key == "full")
-      return prepare_set_flag(op, CEPH_OSDMAP_FULL);
-    else if (key == "pause")
+    cmd_getval(cmdmap, "key", key);
+    if (key == "pause")
       return prepare_set_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
     else if (key == "noup")
       return prepare_set_flag(op, CEPH_OSDMAP_NOUP);
@@ -10717,7 +11194,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
       // The release check here is required because for OSD_PGLOG_HARDLIMIT,
       // we are reusing a jewel feature bit that was retired in luminous.
-      if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS &&
+      if (osdmap.require_osd_release >= ceph_release_t::luminous &&
          (HAVE_FEATURE(osdmap.get_up_osd_features(), OSD_PGLOG_HARDLIMIT)
           || sure)) {
        return prepare_set_flag(op, CEPH_OSDMAP_PGLOG_HARDLIMIT);
@@ -10733,10 +11210,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   } else if (prefix == "osd unset") {
     string key;
-    cmd_getval(cct, cmdmap, "key", key);
-    if (key == "full")
-      return prepare_unset_flag(op, CEPH_OSDMAP_FULL);
-    else if (key == "pause")
+    cmd_getval(cmdmap, "key", key);
+    if (key == "pause")
       return prepare_unset_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
     else if (key == "noup")
       return prepare_unset_flag(op, CEPH_OSDMAP_NOUP);
@@ -10767,11 +11242,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   } else if (prefix == "osd require-osd-release") {
     string release;
-    cmd_getval(cct, cmdmap, "release", release);
+    cmd_getval(cmdmap, "release", release);
     bool sure = false;
-    cmd_getval(cct, cmdmap, "yes_i_really_mean_it", sure);
-    int rel = ceph_release_from_name(release.c_str());
-    if (rel <= 0) {
+    cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
+    ceph_release_t rel = ceph_release_from_name(release.c_str());
+    if (!rel) {
       ss << "unrecognized release " << release;
       err = -EINVAL;
       goto reply;
@@ -10781,14 +11256,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       err = 0;
       goto reply;
     }
-    ceph_assert(osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS);
+    ceph_assert(osdmap.require_osd_release >= ceph_release_t::luminous);
     if (!osdmap.get_num_up_osds() && !sure) {
       ss << "Not advisable to continue since no OSDs are up. Pass "
         << "--yes-i-really-mean-it if you really wish to continue.";
       err = -EPERM;
       goto reply;
     }
-    if (rel == CEPH_RELEASE_MIMIC) {
+    if (rel == ceph_release_t::mimic) {
       if (!mon->monmap->get_required_features().contains_all(
            ceph::features::mon::FEATURE_MIMIC)) {
        ss << "not all mons are mimic";
@@ -10801,7 +11276,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        err = -EPERM;
        goto reply;
       }
-    } else if (rel == CEPH_RELEASE_NAUTILUS) {
+    } else if (rel == ceph_release_t::nautilus) {
       if (!mon->monmap->get_required_features().contains_all(
            ceph::features::mon::FEATURE_NAUTILUS)) {
        ss << "not all mons are nautilus";
@@ -10814,6 +11289,19 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        err = -EPERM;
        goto reply;
       }
+    } else if (rel == ceph_release_t::octopus) {
+      if (!mon->monmap->get_required_features().contains_all(
+           ceph::features::mon::FEATURE_OCTOPUS)) {
+       ss << "not all mons are octopus";
+       err = -EPERM;
+       goto reply;
+      }
+      if ((!HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_OCTOPUS))
+           && !sure) {
+       ss << "not all up OSDs have CEPH_FEATURE_SERVER_OCTOPUS feature";
+       err = -EPERM;
+       goto reply;
+      }
     } else {
       ss << "not supported for this release yet";
       err = -EPERM;
@@ -10827,16 +11315,20 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     pending_inc.new_require_osd_release = rel;
     goto update;
   } else if (prefix == "osd down" ||
-            prefix == "osd out" ||
-            prefix == "osd in" ||
-            prefix == "osd rm") {
+             prefix == "osd out" ||
+             prefix == "osd in" ||
+             prefix == "osd rm" ||
+             prefix == "osd stop") {
 
     bool any = false;
     bool stop = false;
     bool verbose = true;
+    bool definitely_dead = false;
 
     vector<string> idvec;
-    cmd_getval(cct, cmdmap, "ids", idvec);
+    cmd_getval(cmdmap, "ids", idvec);
+    cmd_getval(cmdmap, "definitely_dead", definitely_dead);
+    derr << "definitely_dead " << (int)definitely_dead << dendl;
     for (unsigned j = 0; j < idvec.size() && !stop; j++) {
       set<int> osds;
 
@@ -10875,6 +11367,15 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
            ss << "marked down osd." << osd << ". ";
            any = true;
          }
+         if (definitely_dead) {
+           if (!pending_inc.new_xinfo.count(osd)) {
+             pending_inc.new_xinfo[osd] = osdmap.osd_xinfo[osd];
+           }
+           if (pending_inc.new_xinfo[osd].dead_epoch < pending_inc.epoch) {
+             any = true;
+           }
+           pending_inc.new_xinfo[osd].dead_epoch = pending_inc.epoch;
+         }
         } else if (prefix == "osd out") {
          if (osdmap.is_out(osd)) {
             if (verbose)
@@ -10935,6 +11436,19 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
             }
            any = true;
          }
+        } else if (prefix == "osd stop") {
+          if (osdmap.is_stop(osd)) {
+            if (verbose)
+              ss << "osd." << osd << " is already stopped. ";
+          } else if (osdmap.is_down(osd)) {
+            pending_inc.pending_osd_state_set(osd, CEPH_OSD_STOP);
+            ss << "stop down osd." << osd << ". ";
+            any = true;
+          } else {
+            pending_inc.pending_osd_state_set(osd, CEPH_OSD_UP | CEPH_OSD_STOP);
+            ss << "stop osd." << osd << ". ";
+            any = true;
+          }
         }
       }
     }
@@ -10960,8 +11474,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     unsigned flags = 0;
     vector<string> who;
     if (prefix == "osd set-group" || prefix == "osd unset-group") {
-      cmd_getval(cct, cmdmap, "flags", flag_str);
-      cmd_getval(cct, cmdmap, "who", who);
+      cmd_getval(cmdmap, "flags", flag_str);
+      cmd_getval(cmdmap, "who", who);
       vector<string> raw_flags;
       boost::split(raw_flags, flag_str, boost::is_any_of(","));
       for (auto& f : raw_flags) {
@@ -10981,7 +11495,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
         }
       }
     } else {
-      cmd_getval(cct, cmdmap, "ids", who);
+      cmd_getval(cmdmap, "ids", who);
       if (prefix.find("noup") != string::npos)
         flags = CEPH_OSD_NOUP;
       else if (prefix.find("nodown") != string::npos)
@@ -11108,7 +11622,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
   } else if (prefix == "osd pg-temp") {
     string pgidstr;
-    if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) {
+    if (!cmd_getval(cmdmap, "pgid", pgidstr)) {
       ss << "unable to parse 'pgid' value '"
          << cmd_vartype_stringify(cmdmap.at("pgid")) << "'";
       err = -EINVAL;
@@ -11133,7 +11647,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     vector<int64_t> id_vec;
     vector<int32_t> new_pg_temp;
-    cmd_getval(cct, cmdmap, "id", id_vec);
+    cmd_getval(cmdmap, "id", id_vec);
     if (id_vec.empty())  {
       pending_inc.new_pg_temp[pgid] = mempool::osdmap::vector<int>();
       ss << "done cleaning up pg_temp of " << pgid;
@@ -11170,7 +11684,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     goto update;
   } else if (prefix == "osd primary-temp") {
     string pgidstr;
-    if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) {
+    if (!cmd_getval(cmdmap, "pgid", pgidstr)) {
       ss << "unable to parse 'pgid' value '"
          << cmd_vartype_stringify(cmdmap.at("pgid")) << "'";
       err = -EINVAL;
@@ -11189,7 +11703,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     int64_t osd;
-    if (!cmd_getval(cct, cmdmap, "id", osd)) {
+    if (!cmd_getval(cmdmap, "id", osd)) {
       ss << "unable to parse 'id' value '"
          << cmd_vartype_stringify(cmdmap.at("id")) << "'";
       err = -EINVAL;
@@ -11201,10 +11715,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    if (osdmap.require_min_compat_client > 0 &&
-       osdmap.require_min_compat_client < CEPH_RELEASE_FIREFLY) {
+    if (osdmap.require_min_compat_client != ceph_release_t::unknown &&
+       osdmap.require_min_compat_client < ceph_release_t::firefly) {
       ss << "require_min_compat_client "
-        << ceph_release_name(osdmap.require_min_compat_client)
+        << osdmap.require_min_compat_client
         << " < firefly, which is required for primary-temp";
       err = -EPERM;
       goto reply;
@@ -11216,7 +11730,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "pg repeer") {
     pg_t pgid;
     string pgidstr;
-    cmd_getval(cct, cmdmap, "pgid", pgidstr);
+    cmd_getval(cmdmap, "pgid", pgidstr);
     if (!pgid.parse(pgidstr.c_str())) {
       ss << "invalid pgid '" << pgidstr << "'";
       err = -EINVAL;
@@ -11262,9 +11776,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
              prefix == "osd rm-pg-upmap" ||
              prefix == "osd pg-upmap-items" ||
              prefix == "osd rm-pg-upmap-items") {
-    if (osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
+    if (osdmap.require_min_compat_client < ceph_release_t::luminous) {
       ss << "min_compat_client "
-        << ceph_release_name(osdmap.require_min_compat_client)
+        << osdmap.require_min_compat_client
         << " < luminous, which is required for pg-upmap. "
          << "Try 'ceph osd set-require-min-compat-client luminous' "
          << "before using the new interface";
@@ -11277,7 +11791,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     if (err < 0)
       goto reply;
     string pgidstr;
-    if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) {
+    if (!cmd_getval(cmdmap, "pgid", pgidstr)) {
       ss << "unable to parse 'pgid' value '"
          << cmd_vartype_stringify(cmdmap.at("pgid")) << "'";
       err = -EINVAL;
@@ -11352,7 +11866,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     case OP_PG_UPMAP:
       {
         vector<int64_t> id_vec;
-        if (!cmd_getval(cct, cmdmap, "id", id_vec)) {
+        if (!cmd_getval(cmdmap, "id", id_vec)) {
           ss << "unable to parse 'id' value(s) '"
              << cmd_vartype_stringify(cmdmap.at("id")) << "'";
           err = -EINVAL;
@@ -11412,7 +11926,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     case OP_PG_UPMAP_ITEMS:
       {
         vector<int64_t> id_vec;
-        if (!cmd_getval(cct, cmdmap, "id", id_vec)) {
+        if (!cmd_getval(cmdmap, "id", id_vec)) {
           ss << "unable to parse 'id' value(s) '"
              << cmd_vartype_stringify(cmdmap.at("id")) << "'";
           err = -EINVAL;
@@ -11494,14 +12008,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     goto update;
   } else if (prefix == "osd primary-affinity") {
     int64_t id;
-    if (!cmd_getval(cct, cmdmap, "id", id)) {
+    if (!cmd_getval(cmdmap, "id", id)) {
       ss << "invalid osd id value '"
          << cmd_vartype_stringify(cmdmap.at("id")) << "'";
       err = -EINVAL;
       goto reply;
     }
     double w;
-    if (!cmd_getval(cct, cmdmap, "weight", w)) {
+    if (!cmd_getval(cmdmap, "weight", w)) {
       ss << "unable to parse 'weight' value '"
         << cmd_vartype_stringify(cmdmap.at("weight")) << "'";
       err = -EINVAL;
@@ -11513,10 +12027,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       err = -EINVAL;
       goto reply;
     }
-    if (osdmap.require_min_compat_client > 0 &&
-       osdmap.require_min_compat_client < CEPH_RELEASE_FIREFLY) {
+    if (osdmap.require_min_compat_client != ceph_release_t::unknown &&
+       osdmap.require_min_compat_client < ceph_release_t::firefly) {
       ss << "require_min_compat_client "
-        << ceph_release_name(osdmap.require_min_compat_client)
+        << osdmap.require_min_compat_client
         << " < firefly, which is required for primary-affinity";
       err = -EPERM;
       goto reply;
@@ -11535,14 +12049,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
   } else if (prefix == "osd reweight") {
     int64_t id;
-    if (!cmd_getval(cct, cmdmap, "id", id)) {
+    if (!cmd_getval(cmdmap, "id", id)) {
       ss << "unable to parse osd id value '"
          << cmd_vartype_stringify(cmdmap.at("id")) << "'";
       err = -EINVAL;
       goto reply;
     }
     double w;
-    if (!cmd_getval(cct, cmdmap, "weight", w)) {
+    if (!cmd_getval(cmdmap, "weight", w)) {
       ss << "unable to parse weight value '"
          << cmd_vartype_stringify(cmdmap.at("weight")) << "'";
       err = -EINVAL;
@@ -11581,14 +12095,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return true;
   } else if (prefix == "osd lost") {
     int64_t id;
-    if (!cmd_getval(cct, cmdmap, "id", id)) {
+    if (!cmd_getval(cmdmap, "id", id)) {
       ss << "unable to parse osd id value '"
          << cmd_vartype_stringify(cmdmap.at("id")) << "'";
       err = -EINVAL;
       goto reply;
     }
     bool sure = false;
-    cmd_getval(g_ceph_context, cmdmap, "yes_i_really_mean_it", sure);
+    cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
     if (!sure) {
       ss << "are you SURE?  this might mean real, permanent data loss.  pass "
            "--yes-i-really-mean-it if you really do.";
@@ -11636,7 +12150,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     int64_t id;
-    if (!cmd_getval(cct, cmdmap, "id", id)) {
+    if (!cmd_getval(cmdmap, "id", id)) {
       auto p = cmdmap.find("id");
       if (p == cmdmap.end()) {
        ss << "no osd id specified";
@@ -11655,7 +12169,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     bool sure = false;
-    cmd_getval(g_ceph_context, cmdmap, "yes_i_really_mean_it", sure);
+    cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
     if (!sure) {
       ss << "Are you SURE?  Did you verify with 'ceph osd safe-to-destroy'?  "
         << "This will mean real, permanent data loss, as well "
@@ -11769,7 +12283,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     // optional id provided?
     int64_t id = -1, cmd_id = -1;
-    if (cmd_getval(cct, cmdmap, "id", cmd_id)) {
+    if (cmd_getval(cmdmap, "id", cmd_id)) {
       if (cmd_id < 0) {
        ss << "invalid osd id value '" << cmd_id << "'";
        err = -EINVAL;
@@ -11780,7 +12294,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     uuid_d uuid;
     string uuidstr;
-    if (cmd_getval(cct, cmdmap, "uuid", uuidstr)) {
+    if (cmd_getval(cmdmap, "uuid", uuidstr)) {
       if (!uuid.parse(uuidstr.c_str())) {
         ss << "invalid uuid value '" << uuidstr << "'";
         err = -EINVAL;
@@ -11847,7 +12361,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return true;
   } else if (prefix == "osd blacklist") {
     string addrstr;
-    cmd_getval(cct, cmdmap, "addr", addrstr);
+    cmd_getval(cmdmap, "addr", addrstr);
     entity_addr_t addr;
     if (!addr.parse(addrstr.c_str(), 0)) {
       ss << "unable to parse address " << addrstr;
@@ -11855,7 +12369,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     else {
-      if (osdmap.require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+      if (osdmap.require_osd_release >= ceph_release_t::nautilus) {
        // always blacklist type ANY
        addr.set_type(entity_addr_t::TYPE_ANY);
       } else {
@@ -11863,12 +12377,12 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
 
       string blacklistop;
-      cmd_getval(cct, cmdmap, "blacklistop", blacklistop);
+      cmd_getval(cmdmap, "blacklistop", blacklistop);
       if (blacklistop == "add") {
        utime_t expires = ceph_clock_now();
        double d;
        // default one hour
-       cmd_getval(cct, cmdmap, "expire", d,
+       cmd_getval(cmdmap, "expire", d,
           g_conf()->mon_osd_blacklist_default_expire);
        expires += d;
 
@@ -11908,7 +12422,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
   } else if (prefix == "osd pool mksnap") {
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool = osdmap.lookup_pg_pool_name(poolstr.c_str());
     if (pool < 0) {
       ss << "unrecognized pool '" << poolstr << "'";
@@ -11916,7 +12430,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     string snapname;
-    cmd_getval(cct, cmdmap, "snap", snapname);
+    cmd_getval(cmdmap, "snap", snapname);
     const pg_pool_t *p = osdmap.get_pg_pool(pool);
     if (p->is_unmanaged_snaps_mode()) {
       ss << "pool " << poolstr << " is in unmanaged snaps mode";
@@ -11951,7 +12465,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return true;
   } else if (prefix == "osd pool rmsnap") {
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool = osdmap.lookup_pg_pool_name(poolstr.c_str());
     if (pool < 0) {
       ss << "unrecognized pool '" << poolstr << "'";
@@ -11959,7 +12473,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     string snapname;
-    cmd_getval(cct, cmdmap, "snap", snapname);
+    cmd_getval(cmdmap, "snap", snapname);
     const pg_pool_t *p = osdmap.get_pg_pool(pool);
     if (p->is_unmanaged_snaps_mode()) {
       ss << "pool " << poolstr << " is in unmanaged snaps mode";
@@ -11992,17 +12506,17 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "osd pool create") {
     int64_t pg_num, pg_num_min;
     int64_t pgp_num;
-    cmd_getval(cct, cmdmap, "pg_num", pg_num, int64_t(0));
-    cmd_getval(cct, cmdmap, "pgp_num", pgp_num, pg_num);
-    cmd_getval(cct, cmdmap, "pg_num_min", pg_num_min, int64_t(0));
+    cmd_getval(cmdmap, "pg_num", pg_num, int64_t(0));
+    cmd_getval(cmdmap, "pgp_num", pgp_num, pg_num);
+    cmd_getval(cmdmap, "pg_num_min", pg_num_min, int64_t(0));
 
     string pool_type_str;
-    cmd_getval(cct, cmdmap, "pool_type", pool_type_str);
+    cmd_getval(cmdmap, "pool_type", pool_type_str);
     if (pool_type_str.empty())
       pool_type_str = g_conf().get_val<string>("osd_pool_default_type");
 
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool_id = osdmap.lookup_pg_pool_name(poolstr);
     if (pool_id >= 0) {
       const pg_pool_t *p = osdmap.get_pg_pool(pool_id);
@@ -12030,9 +12544,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     bool implicit_rule_creation = false;
     int64_t expected_num_objects = 0;
     string rule_name;
-    cmd_getval(cct, cmdmap, "rule", rule_name);
+    cmd_getval(cmdmap, "rule", rule_name);
     string erasure_code_profile;
-    cmd_getval(cct, cmdmap, "erasure_code_profile", erasure_code_profile);
+    cmd_getval(cmdmap, "erasure_code_profile", erasure_code_profile);
 
     if (pool_type == pg_pool_t::TYPE_ERASURE) {
       if (erasure_code_profile == "")
@@ -12066,7 +12580,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
          rule_name = poolstr;
        }
       }
-      cmd_getval(g_ceph_context, cmdmap, "expected_num_objects",
+      cmd_getval(cmdmap, "expected_num_objects",
                  expected_num_objects, int64_t(0));
     } else {
       //NOTE:for replicated pool,cmd_map will put rule_name to erasure_code_profile field
@@ -12083,7 +12597,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
         }
         rule_name = erasure_code_profile;
       } else { // cmd is well-formed
-        cmd_getval(g_ceph_context, cmdmap, "expected_num_objects",
+        cmd_getval(cmdmap, "expected_num_objects",
                    expected_num_objects, int64_t(0));
       }
     }
@@ -12105,27 +12619,43 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    if (expected_num_objects > 0 &&
-       cct->_conf->osd_objectstore == "filestore" &&
-       cct->_conf->filestore_merge_threshold > 0) {
+    set<int32_t> osds;
+    osdmap.get_all_osds(osds);
+    bool has_filestore_osd = std::any_of(osds.begin(), osds.end(), [this](int osd) {
+      string type;
+      if (!get_osd_objectstore_type(osd, &type)) {
+        return type == "filestore";
+      } else {
+        return false;
+      }
+    });
+
+    if (has_filestore_osd &&
+        expected_num_objects > 0 &&
+        cct->_conf->filestore_merge_threshold > 0) {
       ss << "'expected_num_objects' requires 'filestore_merge_threshold < 0'";
       err = -EINVAL;
       goto reply;
     }
 
-    if (expected_num_objects == 0 &&
-       cct->_conf->osd_objectstore == "filestore" &&
-       cct->_conf->filestore_merge_threshold < 0) {
+    if (has_filestore_osd &&
+        expected_num_objects == 0 &&
+        cct->_conf->filestore_merge_threshold < 0) {
       int osds = osdmap.get_num_osds();
-      if (osds && (pg_num >= 1024 || pg_num / osds >= 100)) {
+      bool sure = false;
+      cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
+      if (!sure && osds && (pg_num >= 1024 || pg_num / osds >= 100)) {
         ss << "For better initial performance on pools expected to store a "
-          << "large number of objects, consider supplying the "
-          << "expected_num_objects parameter when creating the pool.\n";
+           << "large number of objects, consider supplying the "
+           << "expected_num_objects parameter when creating the pool."
+           << " Pass --yes-i-really-mean-it to ignore it";
+        err = -EPERM;
+        goto reply;
       }
     }
 
     int64_t fast_read_param;
-    cmd_getval(cct, cmdmap, "fast_read", fast_read_param, int64_t(-1));
+    cmd_getval(cmdmap, "fast_read", fast_read_param, int64_t(-1));
     FastReadType fast_read = FAST_READ_DEFAULT;
     if (fast_read_param == 0)
       fast_read = FAST_READ_OFF;
@@ -12133,11 +12663,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       fast_read = FAST_READ_ON;
 
     int64_t repl_size = 0;
-    cmd_getval(cct, cmdmap, "size", repl_size);
+    cmd_getval(cmdmap, "size", repl_size);
     int64_t target_size_bytes = 0;
     double target_size_ratio = 0.0;
-    cmd_getval(cct, cmdmap, "target_size_bytes", target_size_bytes);
-    cmd_getval(cct, cmdmap, "target_size_ratio", target_size_ratio);
+    cmd_getval(cmdmap, "target_size_bytes", target_size_bytes);
+    cmd_getval(cmdmap, "target_size_ratio", target_size_ratio);
+
+    string pg_autoscale_mode;
+    cmd_getval(cmdmap, "autoscale_mode", pg_autoscale_mode);
 
     err = prepare_new_pool(poolstr,
                           -1, // default crush rule
@@ -12147,6 +12680,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
                           erasure_code_profile, pool_type,
                            (uint64_t)expected_num_objects,
                            fast_read,
+                          pg_autoscale_mode,
                           &ss);
     if (err < 0) {
       switch(err) {
@@ -12174,8 +12708,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
              prefix == "osd pool rm") {
     // osd pool delete/rm <poolname> <poolname again> --yes-i-really-really-mean-it
     string poolstr, poolstr2, sure;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
-    cmd_getval(cct, cmdmap, "pool2", poolstr2);
+    cmd_getval(cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool2", poolstr2);
     int64_t pool = osdmap.lookup_pg_pool_name(poolstr.c_str());
     if (pool < 0) {
       ss << "pool '" << poolstr << "' does not exist";
@@ -12184,9 +12718,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     bool force_no_fake = false;
-    cmd_getval(cct, cmdmap, "yes_i_really_really_mean_it", force_no_fake);
+    cmd_getval(cmdmap, "yes_i_really_really_mean_it", force_no_fake);
     bool force = false;
-    cmd_getval(cct, cmdmap, "yes_i_really_really_mean_it_not_faking", force);
+    cmd_getval(cmdmap, "yes_i_really_really_mean_it_not_faking", force);
     if (poolstr2 != poolstr ||
        (!force && !force_no_fake)) {
       ss << "WARNING: this will *PERMANENTLY DESTROY* all data stored in pool " << poolstr
@@ -12205,8 +12739,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     goto update;
   } else if (prefix == "osd pool rename") {
     string srcpoolstr, destpoolstr;
-    cmd_getval(cct, cmdmap, "srcpool", srcpoolstr);
-    cmd_getval(cct, cmdmap, "destpool", destpoolstr);
+    cmd_getval(cmdmap, "srcpool", srcpoolstr);
+    cmd_getval(cmdmap, "destpool", destpoolstr);
     int64_t pool_src = osdmap.lookup_pg_pool_name(srcpoolstr.c_str());
     int64_t pool_dst = osdmap.lookup_pg_pool_name(destpoolstr.c_str());
 
@@ -12263,7 +12797,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     if (err)
       goto reply;
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool_id = osdmap.lookup_pg_pool_name(poolstr);
     if (pool_id < 0) {
       ss << "unrecognized pool '" << poolstr << "'";
@@ -12271,7 +12805,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     string tierpoolstr;
-    cmd_getval(cct, cmdmap, "tierpool", tierpoolstr);
+    cmd_getval(cmdmap, "tierpool", tierpoolstr);
     int64_t tierpool_id = osdmap.lookup_pg_pool_name(tierpoolstr);
     if (tierpool_id < 0) {
       ss << "unrecognized pool '" << tierpoolstr << "'";
@@ -12289,7 +12823,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     // make sure new tier is empty
     string force_nonempty;
-    cmd_getval(cct, cmdmap, "force_nonempty", force_nonempty);
+    cmd_getval(cmdmap, "force_nonempty", force_nonempty);
     const pool_stat_t *pstats = mon->mgrstatmon()->get_pool_stat(tierpool_id);
     if (pstats && pstats->stats.sum.num_objects != 0 &&
        force_nonempty != "--force-nonempty") {
@@ -12327,7 +12861,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "osd tier remove" ||
              prefix == "osd tier rm") {
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool_id = osdmap.lookup_pg_pool_name(poolstr);
     if (pool_id < 0) {
       ss << "unrecognized pool '" << poolstr << "'";
@@ -12335,7 +12869,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     string tierpoolstr;
-    cmd_getval(cct, cmdmap, "tierpool", tierpoolstr);
+    cmd_getval(cmdmap, "tierpool", tierpoolstr);
     int64_t tierpool_id = osdmap.lookup_pg_pool_name(tierpoolstr);
     if (tierpool_id < 0) {
       ss << "unrecognized pool '" << tierpoolstr << "'";
@@ -12391,7 +12925,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     if (err)
       goto reply;
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool_id = osdmap.lookup_pg_pool_name(poolstr);
     if (pool_id < 0) {
       ss << "unrecognized pool '" << poolstr << "'";
@@ -12399,7 +12933,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     string overlaypoolstr;
-    cmd_getval(cct, cmdmap, "overlaypool", overlaypoolstr);
+    cmd_getval(cmdmap, "overlaypool", overlaypoolstr);
     int64_t overlaypool_id = osdmap.lookup_pg_pool_name(overlaypoolstr);
     if (overlaypool_id < 0) {
       ss << "unrecognized pool '" << overlaypoolstr << "'";
@@ -12444,7 +12978,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "osd tier remove-overlay" ||
              prefix == "osd tier rm-overlay") {
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool_id = osdmap.lookup_pg_pool_name(poolstr);
     if (pool_id < 0) {
       ss << "unrecognized pool '" << poolstr << "'";
@@ -12489,7 +13023,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     if (err)
       goto reply;
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool_id = osdmap.lookup_pg_pool_name(poolstr);
     if (pool_id < 0) {
       ss << "unrecognized pool '" << poolstr << "'";
@@ -12504,17 +13038,23 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     string modestr;
-    cmd_getval(cct, cmdmap, "mode", modestr);
+    cmd_getval(cmdmap, "mode", modestr);
     pg_pool_t::cache_mode_t mode = pg_pool_t::get_cache_mode_from_str(modestr);
-    if (mode < 0) {
+    if (int(mode) < 0) {
       ss << "'" << modestr << "' is not a valid cache mode";
       err = -EINVAL;
       goto reply;
     }
 
     bool sure = false;
-    cmd_getval(cct, cmdmap, "yes_i_really_mean_it", sure);
+    cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
 
+    if (mode == pg_pool_t::CACHEMODE_FORWARD ||
+       mode == pg_pool_t::CACHEMODE_READFORWARD) {
+      ss << "'" << modestr << "' is no longer a supported cache mode";
+      err = -EPERM;
+      goto reply;
+    }
     if ((mode != pg_pool_t::CACHEMODE_WRITEBACK &&
         mode != pg_pool_t::CACHEMODE_NONE &&
         mode != pg_pool_t::CACHEMODE_PROXY &&
@@ -12539,10 +13079,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     /* Mode description:
      *
      *  none:       No cache-mode defined
-     *  forward:    Forward all reads and writes to base pool
+     *  forward:    Forward all reads and writes to base pool [removed]
      *  writeback:  Cache writes, promote reads from base pool
      *  readonly:   Forward writes to base pool
-     *  readforward: Writes are in writeback mode, Reads are in forward mode
+     *  readforward: Writes are in writeback mode, Reads are in forward mode [removed]
      *  proxy:       Proxy all reads and writes to base pool
      *  readproxy:   Writes are in writeback mode, Reads are in proxy mode
      *
@@ -12550,10 +13090,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
      *
      *  none -> any
      *  forward -> proxy || readforward || readproxy || writeback || any IF num_objects_dirty == 0
-     *  proxy -> forward || readforward || readproxy || writeback || any IF num_objects_dirty == 0
+     *  proxy -> readproxy || writeback || any IF num_objects_dirty == 0
      *  readforward -> forward || proxy || readproxy || writeback || any IF num_objects_dirty == 0
-     *  readproxy -> forward || proxy || readforward || writeback || any IF num_objects_dirty == 0
-     *  writeback -> readforward || readproxy || forward || proxy
+     *  readproxy -> proxy || writeback || any IF num_objects_dirty == 0
+     *  writeback -> readproxy || proxy
      *  readonly -> any
      */
 
@@ -12562,19 +13102,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     // whatever mode is on the pending state.
 
     if (p->cache_mode == pg_pool_t::CACHEMODE_WRITEBACK &&
-        (mode != pg_pool_t::CACHEMODE_FORWARD &&
-         mode != pg_pool_t::CACHEMODE_PROXY &&
-         mode != pg_pool_t::CACHEMODE_READFORWARD &&
+        (mode != pg_pool_t::CACHEMODE_PROXY &&
          mode != pg_pool_t::CACHEMODE_READPROXY)) {
       ss << "unable to set cache-mode '" << pg_pool_t::get_cache_mode_name(mode)
          << "' on a '" << pg_pool_t::get_cache_mode_name(p->cache_mode)
          << "' pool; only '"
-         << pg_pool_t::get_cache_mode_name(pg_pool_t::CACHEMODE_FORWARD)
-        << "','"
-         << pg_pool_t::get_cache_mode_name(pg_pool_t::CACHEMODE_PROXY)
-        << "','"
-         << pg_pool_t::get_cache_mode_name(pg_pool_t::CACHEMODE_READFORWARD)
-        << "','"
          << pg_pool_t::get_cache_mode_name(pg_pool_t::CACHEMODE_READPROXY)
         << "' allowed.";
       err = -EINVAL;
@@ -12582,25 +13114,19 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
     if ((p->cache_mode == pg_pool_t::CACHEMODE_READFORWARD &&
         (mode != pg_pool_t::CACHEMODE_WRITEBACK &&
-         mode != pg_pool_t::CACHEMODE_FORWARD &&
          mode != pg_pool_t::CACHEMODE_PROXY &&
          mode != pg_pool_t::CACHEMODE_READPROXY)) ||
 
         (p->cache_mode == pg_pool_t::CACHEMODE_READPROXY &&
         (mode != pg_pool_t::CACHEMODE_WRITEBACK &&
-         mode != pg_pool_t::CACHEMODE_FORWARD &&
-         mode != pg_pool_t::CACHEMODE_READFORWARD &&
          mode != pg_pool_t::CACHEMODE_PROXY)) ||
 
         (p->cache_mode == pg_pool_t::CACHEMODE_PROXY &&
         (mode != pg_pool_t::CACHEMODE_WRITEBACK &&
-         mode != pg_pool_t::CACHEMODE_FORWARD &&
-         mode != pg_pool_t::CACHEMODE_READFORWARD &&
          mode != pg_pool_t::CACHEMODE_READPROXY)) ||
 
         (p->cache_mode == pg_pool_t::CACHEMODE_FORWARD &&
         (mode != pg_pool_t::CACHEMODE_WRITEBACK &&
-         mode != pg_pool_t::CACHEMODE_READFORWARD &&
          mode != pg_pool_t::CACHEMODE_PROXY &&
          mode != pg_pool_t::CACHEMODE_READPROXY))) {
 
@@ -12640,7 +13166,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     if (err)
       goto reply;
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool_id = osdmap.lookup_pg_pool_name(poolstr);
     if (pool_id < 0) {
       ss << "unrecognized pool '" << poolstr << "'";
@@ -12648,7 +13174,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     string tierpoolstr;
-    cmd_getval(cct, cmdmap, "tierpool", tierpoolstr);
+    cmd_getval(cmdmap, "tierpool", tierpoolstr);
     int64_t tierpool_id = osdmap.lookup_pg_pool_name(tierpoolstr);
     if (tierpool_id < 0) {
       ss << "unrecognized pool '" << tierpoolstr << "'";
@@ -12665,7 +13191,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     int64_t size = 0;
-    if (!cmd_getval(cct, cmdmap, "size", size)) {
+    if (!cmd_getval(cmdmap, "size", size)) {
       ss << "unable to parse 'size' value '"
          << cmd_vartype_stringify(cmdmap.at("size")) << "'";
       err = -EINVAL;
@@ -12681,7 +13207,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
     auto& modestr = g_conf().get_val<string>("osd_tier_default_cache_mode");
     pg_pool_t::cache_mode_t mode = pg_pool_t::get_cache_mode_from_str(modestr);
-    if (mode < 0) {
+    if (int(mode) < 0) {
       ss << "osd tier cache default mode '" << modestr << "' is not a valid cache mode";
       err = -EINVAL;
       goto reply;
@@ -12731,7 +13257,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return true;
   } else if (prefix == "osd pool set-quota") {
     string poolstr;
-    cmd_getval(cct, cmdmap, "pool", poolstr);
+    cmd_getval(cmdmap, "pool", poolstr);
     int64_t pool_id = osdmap.lookup_pg_pool_name(poolstr);
     if (pool_id < 0) {
       ss << "unrecognized pool '" << poolstr << "'";
@@ -12740,7 +13266,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     string field;
-    cmd_getval(cct, cmdmap, "field", field);
+    cmd_getval(cmdmap, "field", field);
     if (field != "max_objects" && field != "max_bytes") {
       ss << "unrecognized field '" << field << "'; should be 'max_bytes' or 'max_objects'";
       err = -EINVAL;
@@ -12749,7 +13275,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     // val could contain unit designations, so we treat as a string
     string val;
-    cmd_getval(cct, cmdmap, "val", val);
+    cmd_getval(cmdmap, "val", val);
     string tss;
     int64_t value;
     if (field == "max_objects") {
@@ -12793,7 +13319,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "osd force-create-pg") {
     pg_t pgid;
     string pgidstr;
-    cmd_getval(cct, cmdmap, "pgid", pgidstr);
+    cmd_getval(cmdmap, "pgid", pgidstr);
     if (!pgid.parse(pgidstr.c_str())) {
       ss << "invalid pgid '" << pgidstr << "'";
       err = -EINVAL;
@@ -12805,7 +13331,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     bool sure = false;
-    cmd_getval(cct, cmdmap, "yes_i_really_mean_it", sure);
+    cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
     if (!sure) {
       ss << "This command will recreate a lost (as in data lost) PG with data in it, such "
         << "that the cluster will give up ever trying to recover the lost data.  Do this "
@@ -12818,9 +13344,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     bool creating_now;
     {
       std::lock_guard<std::mutex> l(creating_pgs_lock);
-      auto emplaced = creating_pgs.pgs.emplace(pgid,
-                                              make_pair(osdmap.get_epoch(),
-                                                        ceph_clock_now()));
+      auto emplaced = creating_pgs.pgs.emplace(
+       pgid,
+       creating_pgs_t::pg_create_info(osdmap.get_epoch(),
+                                      ceph_clock_now()));
       creating_now = emplaced.second;
     }
     if (creating_now) {
@@ -12865,7 +13392,7 @@ bool OSDMonitor::enforce_pool_op_caps(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
 
-  MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
+  auto m = op->get_req<MPoolOp>();
   MonSession *session = op->get_session();
   if (!session) {
     _pool_op_reply(op, -EPERM, osdmap.get_epoch());
@@ -12911,7 +13438,7 @@ bool OSDMonitor::enforce_pool_op_caps(MonOpRequestRef op)
 bool OSDMonitor::preprocess_pool_op(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
+  auto m = op->get_req<MPoolOp>();
 
   if (enforce_pool_op_caps(op)) {
     return true;
@@ -12975,7 +13502,7 @@ bool OSDMonitor::preprocess_pool_op(MonOpRequestRef op)
       _pool_op_reply(op, -EINVAL, osdmap.get_epoch());
       return true;
     }
-    if (p->is_removed_snap(m->snapid)) {
+    if (_is_removed_snap(m->pool, m->snapid)) {
       _pool_op_reply(op, 0, osdmap.get_epoch());
       return true;
     }
@@ -12996,10 +13523,47 @@ bool OSDMonitor::preprocess_pool_op(MonOpRequestRef op)
   return false;
 }
 
+bool OSDMonitor::_is_removed_snap(int64_t pool, snapid_t snap)
+{
+  if (!osdmap.have_pg_pool(pool)) {
+    dout(10) << __func__ << " pool " << pool << " snap " << snap
+            << " - pool dne" << dendl;
+    return true;
+  }
+  if (osdmap.in_removed_snaps_queue(pool, snap)) {
+    dout(10) << __func__ << " pool " << pool << " snap " << snap
+            << " - in osdmap removed_snaps_queue" << dendl;
+    return true;
+  }
+  snapid_t begin, end;
+  int r = lookup_purged_snap(pool, snap, &begin, &end);
+  if (r == 0) {
+    dout(10) << __func__ << " pool " << pool << " snap " << snap
+            << " - purged, [" << begin << "," << end << ")" << dendl;
+    return true;
+  }
+  return false;
+}
+
+bool OSDMonitor::_is_pending_removed_snap(int64_t pool, snapid_t snap)
+{
+  if (pending_inc.old_pools.count(pool)) {
+    dout(10) << __func__ << " pool " << pool << " snap " << snap
+            << " - pool pending deletion" << dendl;
+    return true;
+  }
+  if (pending_inc.in_new_removed_snaps(pool, snap)) {
+    dout(10) << __func__ << " pool " << pool << " snap " << snap
+            << " - in pending new_removed_snaps" << dendl;
+    return true;
+  }
+  return false;
+}
+
 bool OSDMonitor::preprocess_pool_op_create(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
+  auto m = op->get_req<MPoolOp>();
   int64_t pool = osdmap.lookup_pg_pool_name(m->name.c_str());
   if (pool >= 0) {
     _pool_op_reply(op, 0, osdmap.get_epoch());
@@ -13012,7 +13576,7 @@ bool OSDMonitor::preprocess_pool_op_create(MonOpRequestRef op)
 bool OSDMonitor::prepare_pool_op(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
+  auto m = op->get_req<MPoolOp>();
   dout(10) << "prepare_pool_op " << *m << dendl;
   if (m->op == POOL_OP_CREATE) {
     return prepare_pool_op_create(op);
@@ -13119,21 +13683,28 @@ bool OSDMonitor::prepare_pool_op(MonOpRequestRef op)
 
   case POOL_OP_CREATE_UNMANAGED_SNAP:
     {
-      uint64_t snapid;
-      pp.add_unmanaged_snap(snapid);
+      uint64_t snapid = pp.add_unmanaged_snap(
+       osdmap.require_osd_release < ceph_release_t::octopus);
       encode(snapid, reply_data);
       changed = true;
     }
     break;
 
   case POOL_OP_DELETE_UNMANAGED_SNAP:
-    if (!pp.is_removed_snap(m->snapid)) {
+    if (!_is_removed_snap(m->pool, m->snapid) &&
+       !_is_pending_removed_snap(m->pool, m->snapid)) {
       if (m->snapid > pp.get_snap_seq()) {
         _pool_op_reply(op, -ENOENT, osdmap.get_epoch());
         return false;
       }
-      pp.remove_unmanaged_snap(m->snapid);
+      pp.remove_unmanaged_snap(
+       m->snapid,
+       osdmap.require_osd_release < ceph_release_t::octopus);
       pending_inc.new_removed_snaps[m->pool].insert(m->snapid);
+      // also record the new seq as purged: this avoids a discontinuity
+      // after all of the snaps have been purged, since the seq assigned
+      // during removal lives in the same namespace as the actual snaps.
+      pending_pseudo_purged_snaps[m->pool].insert(pp.get_snap_seq());
       changed = true;
     }
     break;
@@ -13444,7 +14015,7 @@ int OSDMonitor::_prepare_rename_pool(int64_t pool, string newname)
 bool OSDMonitor::prepare_pool_op_delete(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
-  MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
+  auto m = op->get_req<MPoolOp>();
   ostringstream ss;
   int ret = _prepare_remove_pool(m->pool, &ss, false);
   if (ret == -EAGAIN) {
@@ -13462,7 +14033,7 @@ void OSDMonitor::_pool_op_reply(MonOpRequestRef op,
                                 int ret, epoch_t epoch, bufferlist *blp)
 {
   op->mark_osdmon_event(__func__);
-  MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
+  auto m = op->get_req<MPoolOp>();
   dout(20) << "_pool_op_reply " << ret << dendl;
   MPoolOpReply *reply = new MPoolOpReply(m->fsid, m->get_tid(),
                                         ret, epoch, get_last_committed(), blp);
@@ -13478,7 +14049,7 @@ void OSDMonitor::convert_pool_priorities(void)
     const auto &pool = i.second;
 
     if (pool.opts.is_set(key)) {
-      int64_t prio;
+      int64_t prio = 0;
       pool.opts.get(key, &prio);
       if (prio > max_prio)
        max_prio = prio;