]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/OSDMonitor.cc
update sources to 12.2.8
[ceph.git] / ceph / src / mon / OSDMonitor.cc
index ddfeb2a2933824e94601fc17181314a5edce8a88..53c3769654d57b682945dca71dbec4b514be8427 100644 (file)
@@ -17,6 +17,8 @@
  */
 
 #include <algorithm>
+#include <boost/algorithm/string.hpp>
+#include <locale>
 #include <sstream>
 
 #include "mon/OSDMonitor.h"
 #include "common/cmdparse.h"
 #include "include/str_list.h"
 #include "include/str_map.h"
+#include "include/scope_guard.h"
+
+#include "auth/cephx/CephxKeyServer.h"
+#include "osd/OSDCap.h"
 
 #include "json_spirit/json_spirit_reader.h"
 
+#include <boost/algorithm/string/predicate.hpp>
+
 #define dout_subsys ceph_subsys_mon
-#define OSD_PG_CREATING_PREFIX "osd_pg_creating"
+static const string OSD_PG_CREATING_PREFIX("osd_pg_creating");
+static const string OSD_METADATA_PREFIX("osd_metadata");
+
+namespace {
+
+const uint32_t MAX_POOL_APPLICATIONS = 4;
+const uint32_t MAX_POOL_APPLICATION_KEYS = 64;
+const uint32_t MAX_POOL_APPLICATION_LENGTH = 128;
+
+bool is_osd_writable(const OSDCapGrant& grant, const std::string* pool_name) {
+  // Note: this doesn't include support for the application tag match
+  if ((grant.spec.allow & OSD_CAP_W) != 0) {
+    auto& match = grant.match;
+    if (match.is_match_all()) {
+      return true;
+    } else if (pool_name != nullptr && match.auid < 0 &&
+               !match.pool_namespace.pool_name.empty() &&
+               match.pool_namespace.pool_name == *pool_name) {
+      return true;
+    }
+  }
+  return false;
+}
+
+bool is_unmanaged_snap_op_permitted(CephContext* cct,
+                                    const KeyServer& key_server,
+                                    const EntityName& entity_name,
+                                    const MonCap& mon_caps,
+                                    const std::string* pool_name)
+{
+  typedef std::map<std::string, std::string> CommandArgs;
+
+  if (mon_caps.is_capable(cct, CEPH_ENTITY_TYPE_MON,
+                               entity_name, "osd",
+                               "osd pool op unmanaged-snap",
+                               (pool_name == nullptr ?
+                                  CommandArgs{} /* pool DNE, require unrestricted cap */ :
+                                  CommandArgs{{"poolname", *pool_name}}),
+                                false, true, false)) {
+    return true;
+  }
+
+  AuthCapsInfo caps_info;
+  if (!key_server.get_service_caps(entity_name, CEPH_ENTITY_TYPE_OSD,
+                                   caps_info)) {
+    dout(10) << "unable to locate OSD cap data for " << entity_name
+             << " in auth db" << dendl;
+    return false;
+  }
+
+  string caps_str;
+  if (caps_info.caps.length() > 0) {
+    auto p = caps_info.caps.begin();
+    try {
+      decode(caps_str, p);
+    } catch (const buffer::error &err) {
+      derr << "corrupt OSD cap data for " << entity_name << " in auth db"
+           << dendl;
+      return false;
+    }
+  }
+
+  OSDCap osd_cap;
+  if (!osd_cap.parse(caps_str, nullptr)) {
+    dout(10) << "unable to parse OSD cap data for " << entity_name
+             << " in auth db" << dendl;
+    return false;
+  }
+
+  // if the entity has write permissions in one or all pools, permit
+  // usage of unmanaged-snapshots
+  if (osd_cap.allow_all()) {
+    return true;
+  }
+
+  for (auto& grant : osd_cap.grants) {
+    if (grant.profile.is_valid()) {
+      for (auto& profile_grant : grant.profile_grants) {
+        if (is_osd_writable(profile_grant, pool_name)) {
+          return true;
+        }
+      }
+    } else if (is_osd_writable(grant, pool_name)) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+} // anonymous namespace
 
 void LastEpochClean::Lec::report(ps_t ps, epoch_t last_epoch_clean)
 {
@@ -173,7 +271,6 @@ OSDMonitor::OSDMonitor(
    cct(cct),
    inc_osd_cache(g_conf->mon_osd_cache_size),
    full_osd_cache(g_conf->mon_osd_cache_size),
-   last_attempted_minwait_time(utime_t()),
    mapper(mn->cct, &mn->cpu_tp),
    op_tracker(cct, true, 1)
 {}
@@ -213,8 +310,7 @@ void OSDMonitor::create_initial()
     newmap.decode(bl);
     newmap.set_fsid(mon->monmap->fsid);
   } else {
-    newmap.build_simple(g_ceph_context, 0, mon->monmap->fsid, 0,
-                       g_conf->osd_pg_bits, g_conf->osd_pgp_bits);
+    newmap.build_simple(g_ceph_context, 0, mon->monmap->fsid, 0);
   }
   newmap.set_epoch(1);
   newmap.created = newmap.modified = ceph_clock_now();
@@ -228,6 +324,9 @@ void OSDMonitor::create_initial()
     derr << __func__ << " mon_debug_no_require_luminous=true" << dendl;
   } else {
     newmap.require_osd_release = CEPH_RELEASE_LUMINOUS;
+    newmap.flags |=
+      CEPH_OSDMAP_RECOVERY_DELETES |
+      CEPH_OSDMAP_PURGED_SNAPDIRS;
     newmap.full_ratio = g_conf->mon_osd_full_ratio;
     if (newmap.full_ratio > 1.0) newmap.full_ratio /= 100;
     newmap.backfillfull_ratio = g_conf->mon_osd_backfillfull_ratio;
@@ -243,8 +342,9 @@ void OSDMonitor::create_initial()
   }
 
   // encode into pending incremental
+  uint64_t features = newmap.get_encoding_features();
   newmap.encode(pending_inc.fullmap,
-                mon->get_quorum_con_features() | CEPH_FEATURE_RESERVED);
+                features | CEPH_FEATURE_RESERVED);
   pending_inc.full_crc = newmap.get_crc();
   dout(20) << " full crc " << pending_inc.full_crc << dendl;
 }
@@ -253,6 +353,7 @@ void OSDMonitor::get_store_prefixes(std::set<string>& s)
 {
   s.insert(service_name);
   s.insert(OSD_PG_CREATING_PREFIX);
+  s.insert(OSD_METADATA_PREFIX);
 }
 
 void OSDMonitor::update_from_paxos(bool *need_bootstrap)
@@ -275,6 +376,8 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap)
     mapping_job.reset();
   }
 
+  load_health();
+
   /*
    * We will possibly have a stashed latest that *we* wrote, and we will
    * always be sure to have the oldest full map in the first..last range
@@ -495,12 +598,17 @@ void OSDMonitor::start_mapping()
             << dendl;
     mapping_job->abort();
   }
-  auto fin = new C_UpdateCreatingPGs(this, osdmap.get_epoch());
-  mapping_job = mapping.start_update(osdmap, mapper,
-                                    g_conf->mon_osd_mapping_pgs_per_chunk);
-  dout(10) << __func__ << " started mapping job " << mapping_job.get()
-          << " at " << fin->start << dendl;
-  mapping_job->set_finish_event(fin);
+  if (!osdmap.get_pools().empty()) {
+    auto fin = new C_UpdateCreatingPGs(this, osdmap.get_epoch());
+    mapping_job = mapping.start_update(osdmap, mapper,
+                                      g_conf->mon_osd_mapping_pgs_per_chunk);
+    dout(10) << __func__ << " started mapping job " << mapping_job.get()
+            << " at " << fin->start << dendl;
+    mapping_job->set_finish_event(fin);
+  } else {
+    dout(10) << __func__ << " no pools, no mapping job" << dendl;
+    mapping_job = nullptr;
+  }
 }
 
 void OSDMonitor::update_msgr_features()
@@ -527,7 +635,7 @@ void OSDMonitor::on_active()
   update_logger();
 
   if (mon->is_leader()) {
-    mon->clog->info() << "osdmap " << osdmap;
+    mon->clog->debug() << "osdmap " << osdmap;
   } else {
     list<MonOpRequestRef> ls;
     take_all_failures(ls);
@@ -544,23 +652,6 @@ void OSDMonitor::on_active()
 void OSDMonitor::on_restart()
 {
   last_osd_report.clear();
-
-  if (mon->is_leader()) {
-    // fix ruleset != ruleid
-    if (osdmap.crush->has_legacy_rulesets() &&
-       !osdmap.crush->has_multirule_rulesets()) {
-      CrushWrapper newcrush;
-      _get_pending_crush(newcrush);
-      int r = newcrush.renumber_rules_by_ruleset();
-      if (r >= 0) {
-       dout(1) << __func__ << " crush map has ruleset != rule id; fixing" << dendl;
-       pending_inc.crush.clear();
-       newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
-      } else {
-       dout(10) << __func__ << " unable to renumber rules by ruleset" << dendl;
-      }
-    }
-  }
 }
 
 void OSDMonitor::on_shutdown()
@@ -640,10 +731,45 @@ void OSDMonitor::create_pending()
              << pending_inc.new_nearfull_ratio << dendl;
     }
   }
+
+  // Rewrite CRUSH rule IDs if they are using legacy "ruleset"
+  // structure.
+  if (osdmap.crush->has_legacy_rule_ids()) {
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+
+    // First, for all pools, work out which rule they really used
+    // by resolving ruleset to rule.
+    for (const auto &i : osdmap.get_pools()) {
+      const auto pool_id = i.first;
+      const auto &pool = i.second;
+      int new_rule_id = newcrush.find_rule(pool.crush_rule,
+                                          pool.type, pool.size);
+
+      dout(1) << __func__ << " rewriting pool "
+             << osdmap.get_pool_name(pool_id) << " crush ruleset "
+             << pool.crush_rule << " -> rule id " << new_rule_id << dendl;
+      if (pending_inc.new_pools.count(pool_id) == 0) {
+       pending_inc.new_pools[pool_id] = pool;
+      }
+      pending_inc.new_pools[pool_id].crush_rule = new_rule_id;
+    }
+
+    // Now, go ahead and renumber all the rules so that their
+    // rule_id field corresponds to their position in the array
+    auto old_to_new = newcrush.renumber_rules();
+    dout(1) << __func__ << " Rewrote " << old_to_new << " crush IDs:" << dendl;
+    for (const auto &i : old_to_new) {
+      dout(1) << __func__ << " " << i.first << " -> " << i.second << dendl;
+    }
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+  }
 }
 
 creating_pgs_t
-OSDMonitor::update_pending_pgs(const OSDMap::Incremental& inc)
+OSDMonitor::update_pending_pgs(const OSDMap::Incremental& inc,
+                              const OSDMap& nextmap)
 {
   dout(10) << __func__ << dendl;
   creating_pgs_t pending_creatings;
@@ -694,6 +820,20 @@ OSDMonitor::update_pending_pgs(const OSDMap::Incremental& inc)
     pending_creatings.last_scan_epoch = osdmap.get_epoch();
   }
 
+  // filter out any pgs that shouldn't exist.
+  {
+    auto i = pending_creatings.pgs.begin();
+    while (i != pending_creatings.pgs.end()) {
+      if (!nextmap.pg_exists(i->first)) {
+       dout(10) << __func__ << " removing pg " << i->first
+                << " which should not exist" << dendl;
+       i = pending_creatings.pgs.erase(i);
+      } else {
+       ++i;
+      }
+    }
+  }
+
   // process queue
   unsigned max = MAX(1, g_conf->mon_osd_max_creating_pgs);
   const auto total = pending_creatings.pgs.size();
@@ -730,7 +870,9 @@ OSDMonitor::update_pending_pgs(const OSDMap::Incremental& inc)
   }
   dout(10) << __func__ << " queue remaining: " << pending_creatings.queue.size()
           << " pools" << dendl;
-  dout(10) << __func__ << " " << pending_creatings.pgs.size() - total
+  dout(10) << __func__
+          << " " << (pending_creatings.pgs.size() - total)
+          << "/" << pending_creatings.pgs.size()
           << " pgs added from queued pools" << dendl;
   return pending_creatings;
 }
@@ -795,7 +937,9 @@ void OSDMonitor::maybe_prime_pg_temp()
   next.deepish_copy_from(osdmap);
   next.apply_incremental(pending_inc);
 
-  if (all) {
+  if (next.get_pools().empty()) {
+    dout(10) << __func__ << " no pools, no pg_temp priming" << dendl;
+  } else if (all) {
     PrimeTempJob job(next, this);
     mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk);
     if (job.wait_for(g_conf->mon_osd_prime_pg_temp_max_time)) {
@@ -862,7 +1006,7 @@ void OSDMonitor::prime_pg_temp(
   int next_up_primary, next_acting_primary;
   next.pg_to_up_acting_osds(pgid, &next_up, &next_up_primary,
                            &next_acting, &next_acting_primary);
-  if (acting == next_acting)
+  if (acting == next_acting && next_up != next_acting)
     return;  // no change since last epoch
 
   if (acting.empty())
@@ -871,6 +1015,12 @@ void OSDMonitor::prime_pg_temp(
   if (pool && acting.size() < pool->min_size)
     return;  // can be no worse off than before
 
+  if (next_up == next_acting) {
+    acting.clear();
+    dout(20) << __func__ << "next_up === next_acting now, clear pg_temp"
+             << dendl;
+  }
+
   dout(20) << __func__ << " " << pgid << " " << up << "/" << acting
           << " -> " << next_up << "/" << next_acting
           << ", priming " << acting
@@ -920,6 +1070,18 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   }
   mapping_job.reset();
 
+  // ensure we don't have blank new_state updates.  these are interrpeted as
+  // CEPH_OSD_UP (and almost certainly not what we want!).
+  auto p = pending_inc.new_state.begin();
+  while (p != pending_inc.new_state.end()) {
+    if (p->second == 0) {
+      dout(10) << "new_state for osd." << p->first << " is 0, removing" << dendl;
+      p = pending_inc.new_state.erase(p);
+    } else {
+      ++p;
+    }
+  }
+
   bufferlist bl;
 
   {
@@ -928,31 +1090,190 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     tmp.apply_incremental(pending_inc);
 
     if (tmp.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-      // set or clear full/nearfull?
-      int full, backfill, nearfull;
-      tmp.count_full_nearfull_osds(&full, &backfill, &nearfull);
-      if (full > 0) {
-       if (!tmp.test_flag(CEPH_OSDMAP_FULL)) {
-         dout(10) << __func__ << " setting full flag" << dendl;
-         add_flag(CEPH_OSDMAP_FULL);
-         remove_flag(CEPH_OSDMAP_NEARFULL);
-       }
-      } else {
-       if (tmp.test_flag(CEPH_OSDMAP_FULL)) {
-         dout(10) << __func__ << " clearing full flag" << dendl;
-         remove_flag(CEPH_OSDMAP_FULL);
-       }
-       if (nearfull > 0) {
-         if (!tmp.test_flag(CEPH_OSDMAP_NEARFULL)) {
-           dout(10) << __func__ << " setting nearfull flag" << dendl;
-           add_flag(CEPH_OSDMAP_NEARFULL);
-         }
-       } else {
-         if (tmp.test_flag(CEPH_OSDMAP_NEARFULL)) {
-           dout(10) << __func__ << " clearing nearfull flag" << dendl;
-           remove_flag(CEPH_OSDMAP_NEARFULL);
-         }
-       }
+      // remove any legacy osdmap nearfull/full flags
+      {
+        if (tmp.test_flag(CEPH_OSDMAP_FULL | CEPH_OSDMAP_NEARFULL)) {
+          dout(10) << __func__ << " clearing legacy osdmap nearfull/full flag"
+                   << dendl;
+          remove_flag(CEPH_OSDMAP_NEARFULL);
+          remove_flag(CEPH_OSDMAP_FULL);
+        }
+      }
+      // collect which pools are currently affected by
+      // the near/backfill/full osd(s),
+      // and set per-pool near/backfill/full flag instead
+      set<int64_t> full_pool_ids;
+      set<int64_t> backfillfull_pool_ids;
+      set<int64_t> nearfull_pool_ids;
+      tmp.get_full_pools(g_ceph_context,
+                         &full_pool_ids,
+                         &backfillfull_pool_ids,
+                         &nearfull_pool_ids);
+      if (full_pool_ids.empty() ||
+          backfillfull_pool_ids.empty() ||
+          nearfull_pool_ids.empty()) {
+        // normal case - no nearfull, backfillfull or full osds
+        // try cancel any improper nearfull/backfillfull/full pool
+        // flags first
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_NEARFULL) &&
+              nearfull_pool_ids.empty()) {
+            dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                     << "'s nearfull flag" << dendl;
+            if (pending_inc.new_pools.count(p) == 0) {
+              // load original pool info first!
+              pending_inc.new_pools[p] = pool.second;
+            }
+            pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_BACKFILLFULL) &&
+              backfillfull_pool_ids.empty()) {
+            dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                     << "'s backfillfull flag" << dendl;
+            if (pending_inc.new_pools.count(p) == 0) {
+              pending_inc.new_pools[p] = pool.second;
+            }
+            pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_BACKFILLFULL;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL) &&
+              full_pool_ids.empty()) {
+            if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+              // set by EQUOTA, skipping
+              continue;
+            }
+            dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                     << "'s full flag" << dendl;
+            if (pending_inc.new_pools.count(p) == 0) {
+              pending_inc.new_pools[p] = pool.second;
+            }
+            pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_FULL;
+          }
+        }
+      }
+      if (!full_pool_ids.empty()) {
+        dout(10) << __func__ << " marking pool(s) " << full_pool_ids
+                 << " as full" << dendl;
+        for (auto &p: full_pool_ids) {
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL)) {
+            continue;
+          }
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = tmp.pools[p];
+          }
+          pending_inc.new_pools[p].flags |= pg_pool_t::FLAG_FULL;
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_BACKFILLFULL;
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+        }
+        // cancel FLAG_FULL for pools which are no longer full too
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (full_pool_ids.count(p)) {
+            // skip pools we have just marked as full above
+            continue;
+          }
+          if (!tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL) ||
+               tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+            // don't touch if currently is not full
+            // or is running out of quota (and hence considered as full)
+            continue;
+          }
+          dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                   << "'s full flag" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = pool.second;
+          }
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_FULL;
+        }
+      }
+      if (!backfillfull_pool_ids.empty()) {
+        for (auto &p: backfillfull_pool_ids) {
+          if (full_pool_ids.count(p)) {
+            // skip pools we have already considered as full above
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+            // make sure FLAG_FULL is truly set, so we are safe not
+            // to set a extra (redundant) FLAG_BACKFILLFULL flag
+            assert(tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL));
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_BACKFILLFULL)) {
+            // don't bother if pool is already marked as backfillfull
+            continue;
+          }
+          dout(10) << __func__ << " marking pool '" << tmp.pool_name[p]
+                   << "'s as backfillfull" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = tmp.pools[p];
+          }
+          pending_inc.new_pools[p].flags |= pg_pool_t::FLAG_BACKFILLFULL;
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+        }
+        // cancel FLAG_BACKFILLFULL for pools
+        // which are no longer backfillfull too
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (full_pool_ids.count(p) || backfillfull_pool_ids.count(p)) {
+            // skip pools we have just marked as backfillfull/full above
+            continue;
+          }
+          if (!tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_BACKFILLFULL)) {
+            // and don't touch if currently is not backfillfull
+            continue;
+          }
+          dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                   << "'s backfillfull flag" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = pool.second;
+          }
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_BACKFILLFULL;
+        }
+      }
+      if (!nearfull_pool_ids.empty()) {
+        for (auto &p: nearfull_pool_ids) {
+          if (full_pool_ids.count(p) || backfillfull_pool_ids.count(p)) {
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+            // make sure FLAG_FULL is truly set, so we are safe not
+            // to set a extra (redundant) FLAG_NEARFULL flag
+            assert(tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL));
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_NEARFULL)) {
+            // don't bother if pool is already marked as nearfull
+            continue;
+          }
+          dout(10) << __func__ << " marking pool '" << tmp.pool_name[p]
+                   << "'s as nearfull" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = tmp.pools[p];
+          }
+          pending_inc.new_pools[p].flags |= pg_pool_t::FLAG_NEARFULL;
+        }
+        // cancel FLAG_NEARFULL for pools
+        // which are no longer nearfull too
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (full_pool_ids.count(p) ||
+              backfillfull_pool_ids.count(p) ||
+              nearfull_pool_ids.count(p)) {
+            // skip pools we have just marked as
+            // nearfull/backfillfull/full above
+            continue;
+          }
+          if (!tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_NEARFULL)) {
+            // and don't touch if currently is not nearfull
+            continue;
+          }
+          dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                   << "'s nearfull flag" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = pool.second;
+          }
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+        }
       }
 
       // min_compat_client?
@@ -964,6 +1285,99 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
                          << "required " << ceph_release_name(mv);
        pending_inc.new_require_min_compat_client = mv;
       }
+
+      if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+       // convert ec profile ruleset-* -> crush-*
+       for (auto& p : tmp.erasure_code_profiles) {
+         bool changed = false;
+         map<string,string> newprofile;
+         for (auto& q : p.second) {
+           if (q.first.find("ruleset-") == 0) {
+             string key = "crush-";
+             key += q.first.substr(8);
+             newprofile[key] = q.second;
+             changed = true;
+             dout(20) << " updating ec profile " << p.first
+                      << " key " << q.first << " -> " << key << dendl;
+           } else {
+             newprofile[q.first] = q.second;
+           }
+         }
+         if (changed) {
+           dout(10) << " updated ec profile " << p.first << ": "
+                    << newprofile << dendl;
+           pending_inc.new_erasure_code_profiles[p.first] = newprofile;
+         }
+       }
+
+        // auto-enable pool applications upon upgrade
+        // NOTE: this can be removed post-Luminous assuming upgrades need to
+        // proceed through Luminous
+        for (auto &pool_pair : tmp.pools) {
+          int64_t pool_id = pool_pair.first;
+          pg_pool_t pg_pool = pool_pair.second;
+          if (pg_pool.is_tier()) {
+            continue;
+          }
+
+          std::string pool_name = tmp.get_pool_name(pool_id);
+          uint32_t match_count = 0;
+
+          // CephFS
+          const FSMap &pending_fsmap = mon->mdsmon()->get_pending_fsmap();
+          if (pending_fsmap.pool_in_use(pool_id)) {
+            dout(10) << __func__ << " auto-enabling CephFS on pool '"
+                     << pool_name << "'" << dendl;
+            pg_pool.application_metadata.insert(
+              {pg_pool_t::APPLICATION_NAME_CEPHFS, {}});
+            ++match_count;
+          }
+
+          // RBD heuristics (default OpenStack pool names from docs and
+          // ceph-ansible)
+          if (boost::algorithm::contains(pool_name, "rbd") ||
+              pool_name == "images" || pool_name == "volumes" ||
+              pool_name == "backups" || pool_name == "vms") {
+            dout(10) << __func__ << " auto-enabling RBD on pool '"
+                     << pool_name << "'" << dendl;
+            pg_pool.application_metadata.insert(
+              {pg_pool_t::APPLICATION_NAME_RBD, {}});
+            ++match_count;
+          }
+
+          // RGW heuristics
+          if (boost::algorithm::contains(pool_name, ".rgw") ||
+              boost::algorithm::contains(pool_name, ".log") ||
+              boost::algorithm::contains(pool_name, ".intent-log") ||
+              boost::algorithm::contains(pool_name, ".usage") ||
+              boost::algorithm::contains(pool_name, ".users")) {
+            dout(10) << __func__ << " auto-enabling RGW on pool '"
+                     << pool_name << "'" << dendl;
+            pg_pool.application_metadata.insert(
+              {pg_pool_t::APPLICATION_NAME_RGW, {}});
+            ++match_count;
+          }
+
+          // OpenStack gnocchi (from ceph-ansible)
+          if (pool_name == "metrics" && match_count == 0) {
+            dout(10) << __func__ << " auto-enabling OpenStack Gnocchi on pool '"
+                     << pool_name << "'" << dendl;
+            pg_pool.application_metadata.insert({"openstack_gnocchi", {}});
+            ++match_count;
+          }
+
+          if (match_count == 1) {
+            pg_pool.last_change = pending_inc.epoch;
+            pending_inc.new_pools[pool_id] = pg_pool;
+          } else if (match_count > 1) {
+            auto pstat = mon->pgservice->get_pool_stat(pool_id);
+            if (pstat != nullptr && pstat->stats.sum.num_objects > 0) {
+              mon->clog->info() << "unable to auto-enable application for pool "
+                                << "'" << pool_name << "'";
+            }
+          }
+        }
+      }
     }
   }
 
@@ -995,8 +1409,11 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     }
   }
 
+  // clean inappropriate pg_upmap/pg_upmap_items (if any)
+  osdmap.maybe_remove_pg_upmaps(cct, osdmap, &pending_inc);
+
   // features for osdmap and its incremental
-  uint64_t features = mon->get_quorum_con_features();
+  uint64_t features;
 
   // encode full map and determine its crc
   OSDMap tmp;
@@ -1005,22 +1422,13 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     tmp.apply_incremental(pending_inc);
 
     // determine appropriate features
-    if (tmp.require_osd_release < CEPH_RELEASE_LUMINOUS) {
-      dout(10) << __func__ << " encoding without feature SERVER_LUMINOUS"
-              << dendl;
-      features &= ~CEPH_FEATURE_SERVER_LUMINOUS;
-    }
-    if (tmp.require_osd_release < CEPH_RELEASE_KRAKEN) {
-      dout(10) << __func__ << " encoding without feature SERVER_KRAKEN | "
-              << "MSG_ADDR2" << dendl;
-      features &= ~(CEPH_FEATURE_SERVER_KRAKEN |
-                   CEPH_FEATURE_MSG_ADDR2);
-    }
-    if (tmp.require_osd_release < CEPH_RELEASE_JEWEL) {
-      dout(10) << __func__ << " encoding without feature SERVER_JEWEL" << dendl;
-      features &= ~CEPH_FEATURE_SERVER_JEWEL;
-    }
-    dout(10) << __func__ << " encoding full map with " << features << dendl;
+    features = tmp.get_encoding_features();
+    dout(10) << __func__ << " encoding full map with "
+            << ceph_release_name(tmp.require_osd_release)
+            << " features " << features << dendl;
+
+    // the features should be a subset of the mon quorum's features!
+    assert((features & ~mon->get_quorum_con_features()) == 0);
 
     bufferlist fullbl;
     ::encode(tmp, fullbl, features | CEPH_FEATURE_RESERVED);
@@ -1058,7 +1466,7 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   // and pg creating, also!
   if (mon->monmap->get_required_features().contains_all(
        ceph::features::mon::FEATURE_LUMINOUS)) {
-    auto pending_creatings = update_pending_pgs(pending_inc);
+    auto pending_creatings = update_pending_pgs(pending_inc, tmp);
     if (osdmap.get_epoch() &&
        osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
       dout(7) << __func__ << " in the middle of upgrading, "
@@ -1069,6 +1477,11 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     ::encode(pending_creatings, creatings_bl);
     t->put(OSD_PG_CREATING_PREFIX, "creating", creatings_bl);
   }
+
+  // health
+  health_check_map_t next;
+  tmp.check_health(&next);
+  encode_health(next, t);
 }
 
 void OSDMonitor::trim_creating_pgs(creating_pgs_t* creating_pgs,
@@ -1106,21 +1519,26 @@ int OSDMonitor::load_metadata(int osd, map<string, string>& m, ostream *err)
   return 0;
 }
 
-void OSDMonitor::count_metadata(const string& field, Formatter *f)
+void OSDMonitor::count_metadata(const string& field, map<string,int> *out)
 {
-  map<string,int> by_val;
   for (int osd = 0; osd < osdmap.get_max_osd(); ++osd) {
     if (osdmap.is_up(osd)) {
       map<string,string> meta;
       load_metadata(osd, meta, nullptr);
       auto p = meta.find(field);
       if (p == meta.end()) {
-       by_val["unknown"]++;
+       (*out)["unknown"]++;
       } else {
-       by_val[p->second]++;
+       (*out)[p->second]++;
       }
     }
   }
+}
+
+void OSDMonitor::count_metadata(const string& field, Formatter *f)
+{
+  map<string,int> by_val;
+  count_metadata(field, &by_val);
   f->open_object_section(field.c_str());
   for (auto& p : by_val) {
     f->dump_int(p.first.c_str(), p.second);
@@ -1214,8 +1632,13 @@ void OSDMonitor::share_map_with_random_osd()
   }
 
   dout(10) << "committed, telling random " << s->inst << " all about it" << dendl;
+
+  // get feature of the peer
+  // use quorum_con_features, if it's an anonymous connection.
+  uint64_t features = s->con_features ? s->con_features :
+                                        mon->get_quorum_con_features();
   // whatev, they'll request more if they need it
-  MOSDMap *m = build_incremental(osdmap.get_epoch() - 1, osdmap.get_epoch());
+  MOSDMap *m = build_incremental(osdmap.get_epoch() - 1, osdmap.get_epoch(), features);
   s->con->send_message(m);
   // NOTE: do *not* record osd has up to this epoch (as we do
   // elsewhere) as they may still need to request older values.
@@ -1394,22 +1817,6 @@ bool OSDMonitor::should_propose(double& delay)
     return true;
   }
 
-  // propose as fast as possible if updating up_thru or pg_temp
-  // want to merge OSDMap changes as much as possible
-  if ((pending_inc.new_primary_temp.size() == 1
-      || pending_inc.new_up_thru.size() == 1)
-      && pending_inc.new_state.size() < 2) {
-    dout(15) << " propose as fast as possible for up_thru/pg_temp" << dendl;
-
-    utime_t now = ceph_clock_now();
-    if (now - last_attempted_minwait_time > g_conf->paxos_propose_interval
-       && now - paxos->get_last_commit_time() > g_conf->paxos_min_wait) {
-      delay = g_conf->paxos_min_wait;
-      last_attempted_minwait_time = now;
-      return true;
-    }
-  }
-
   return PaxosService::should_propose(delay);
 }
 
@@ -1422,21 +1829,26 @@ bool OSDMonitor::preprocess_get_osdmap(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
   MMonGetOSDMap *m = static_cast<MMonGetOSDMap*>(op->get_req());
+
+  uint64_t features = mon->get_quorum_con_features();
+  if (m->get_session() && m->get_session()->con_features)
+    features = m->get_session()->con_features;
+
   dout(10) << __func__ << " " << *m << dendl;
-  MOSDMap *reply = new MOSDMap(mon->monmap->fsid);
+  MOSDMap *reply = new MOSDMap(mon->monmap->fsid, features);
   epoch_t first = get_first_committed();
   epoch_t last = osdmap.get_epoch();
   int max = g_conf->osd_map_message_max;
   for (epoch_t e = MAX(first, m->get_full_first());
        e <= MIN(last, m->get_full_last()) && max > 0;
        ++e, --max) {
-    int r = get_version_full(e, reply->maps[e]);
+    int r = get_version_full(e, features, reply->maps[e]);
     assert(r >= 0);
   }
   for (epoch_t e = MAX(first, m->get_inc_first());
        e <= MIN(last, m->get_inc_last()) && max > 0;
        ++e, --max) {
-    int r = get_version(e, reply->incremental_maps[e]);
+    int r = get_version(e, features, reply->incremental_maps[e]);
     assert(r >= 0);
   }
   reply->oldest_map = first;
@@ -1527,6 +1939,7 @@ bool OSDMonitor::preprocess_failure(MonOpRequestRef op)
   return false;
 
  didit:
+  mon->no_reply(op);
   return true;
 }
 
@@ -1811,7 +2224,7 @@ bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
   return false;
 }
 
-void OSDMonitor::force_failure(utime_t now, int target_osd, int by)
+void OSDMonitor::force_failure(int target_osd, int by)
 {
   // already pending failure?
   if (pending_inc.new_state.count(target_osd) &&
@@ -1852,7 +2265,8 @@ bool OSDMonitor::prepare_failure(MonOpRequestRef op)
     if (m->is_immediate()) {
       mon->clog->debug() << m->get_target() << " reported immediately failed by "
             << m->get_orig_source_inst();
-      force_failure(now, target_osd, reporter);
+      force_failure(target_osd, reporter);
+      mon->no_reply(op);
       return true;
     }
     mon->clog->debug() << m->get_target() << " reported failed by "
@@ -1910,6 +2324,7 @@ void OSDMonitor::process_failures()
           o->mark_event(__func__);
           MOSDFailure *m = o->get_req<MOSDFailure>();
           send_latest(o, m->get_epoch());
+         mon->no_reply(o);
         }
        ls.pop_front();
       }
@@ -2027,6 +2442,14 @@ bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
     goto ignore;
   }
 
+  if (osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES) &&
+      !(m->osd_features & CEPH_FEATURE_OSD_RECOVERY_DELETES)) {
+    mon->clog->info() << "disallowing boot of OSD "
+                     << m->get_orig_source_inst()
+                     << " because 'recovery_deletes' osdmap flag is set and OSD lacks the OSD_RECOVERY_DELETES feature";
+    goto ignore;
+  }
+
   if (any_of(osdmap.get_pools().begin(),
             osdmap.get_pools().end(),
             [](const std::pair<int64_t,pg_pool_t>& pool)
@@ -2456,6 +2879,7 @@ bool OSDMonitor::preprocess_pg_created(MonOpRequestRef op)
   auto m = static_cast<MOSDPGCreated*>(op->get_req());
   dout(10) << __func__ << " " << *m << dendl;
   auto session = m->get_session();
+  mon->no_reply(op);
   if (!session) {
     dout(10) << __func__ << ": no monitor session!" << dendl;
     return true;
@@ -2513,6 +2937,10 @@ bool OSDMonitor::preprocess_pgtemp(MonOpRequestRef op)
     goto ignore;
   }
 
+  if (m->forced) {
+    return false;
+  }
+
   for (auto p = m->pg_temp.begin(); p != m->pg_temp.end(); ++p) {
     dout(20) << " " << p->first
             << (osdmap.pg_temp->count(p->first) ? osdmap.pg_temp->get(p->first) : empty)
@@ -2713,6 +3141,7 @@ bool OSDMonitor::preprocess_beacon(MonOpRequestRef op)
   auto beacon = static_cast<MOSDBeacon*>(op->get_req());
   // check caps
   auto session = beacon->get_session();
+  mon->no_reply(op);
   if (!session) {
     dout(10) << __func__ << " no monitor session!" << dendl;
     return true;
@@ -2768,25 +3197,25 @@ void OSDMonitor::send_latest(MonOpRequestRef op, epoch_t start)
 }
 
 
-MOSDMap *OSDMonitor::build_latest_full()
+MOSDMap *OSDMonitor::build_latest_full(uint64_t features)
 {
-  MOSDMap *r = new MOSDMap(mon->monmap->fsid);
-  get_version_full(osdmap.get_epoch(), r->maps[osdmap.get_epoch()]);
+  MOSDMap *r = new MOSDMap(mon->monmap->fsid, features);
+  get_version_full(osdmap.get_epoch(), features, r->maps[osdmap.get_epoch()]);
   r->oldest_map = get_first_committed();
   r->newest_map = osdmap.get_epoch();
   return r;
 }
 
-MOSDMap *OSDMonitor::build_incremental(epoch_t from, epoch_t to)
+MOSDMap *OSDMonitor::build_incremental(epoch_t from, epoch_t to, uint64_t features)
 {
-  dout(10) << "build_incremental [" << from << ".." << to << "]" << dendl;
-  MOSDMap *m = new MOSDMap(mon->monmap->fsid);
+  dout(10) << "build_incremental [" << from << ".." << to << "] with features " << std::hex << features << dendl;
+  MOSDMap *m = new MOSDMap(mon->monmap->fsid, features);
   m->oldest_map = get_first_committed();
   m->newest_map = osdmap.get_epoch();
 
   for (epoch_t e = to; e >= from && e > 0; e--) {
     bufferlist bl;
-    int err = get_version(e, bl);
+    int err = get_version(e, features, bl);
     if (err == 0) {
       assert(bl.length());
       // if (get_version(e, bl) > 0) {
@@ -2796,7 +3225,7 @@ MOSDMap *OSDMonitor::build_incremental(epoch_t from, epoch_t to)
     } else {
       assert(err == -ENOENT);
       assert(!bl.length());
-      get_version_full(e, bl);
+      get_version_full(e, features, bl);
       if (bl.length() > 0) {
       //else if (get_version("full", e, bl) > 0) {
       dout(20) << "build_incremental   full " << e << " "
@@ -2814,7 +3243,7 @@ void OSDMonitor::send_full(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
   dout(5) << "send_full to " << op->get_req()->get_orig_source_inst() << dendl;
-  mon->send_reply(op, build_latest_full());
+  mon->send_reply(op, build_latest_full(op->get_session()->con_features));
 }
 
 void OSDMonitor::send_incremental(MonOpRequestRef op, epoch_t first)
@@ -2847,6 +3276,11 @@ void OSDMonitor::send_incremental(epoch_t first,
   dout(5) << "send_incremental [" << first << ".." << osdmap.get_epoch() << "]"
          << " to " << session->inst << dendl;
 
+  // get feature of the peer
+  // use quorum_con_features, if it's an anonymous connection.
+  uint64_t features = session->con_features ? session->con_features :
+    mon->get_quorum_con_features();
+
   if (first <= session->osd_epoch) {
     dout(10) << __func__ << " " << session->inst << " should already have epoch "
             << session->osd_epoch << dendl;
@@ -2856,14 +3290,14 @@ void OSDMonitor::send_incremental(epoch_t first,
   if (first < get_first_committed()) {
     first = get_first_committed();
     bufferlist bl;
-    int err = get_version_full(first, bl);
+    int err = get_version_full(first, features, bl);
     assert(err == 0);
     assert(bl.length());
 
     dout(20) << "send_incremental starting with base full "
             << first << " " << bl.length() << " bytes" << dendl;
 
-    MOSDMap *m = new MOSDMap(osdmap.get_fsid());
+    MOSDMap *m = new MOSDMap(osdmap.get_fsid(), features);
     m->oldest_map = get_first_committed();
     m->newest_map = osdmap.get_epoch();
     m->maps[first] = bl;
@@ -2880,9 +3314,9 @@ void OSDMonitor::send_incremental(epoch_t first,
   }
 
   while (first <= osdmap.get_epoch()) {
-    epoch_t last = MIN(first + g_conf->osd_map_message_max - 1,
-                      osdmap.get_epoch());
-    MOSDMap *m = build_incremental(first, last);
+    epoch_t last = std::min<epoch_t>(first + g_conf->osd_map_message_max - 1,
+                                    osdmap.get_epoch());
+    MOSDMap *m = build_incremental(first, last, features);
 
     if (req) {
       // send some maps.  it may not be all of them, but it will get them
@@ -2900,26 +3334,98 @@ void OSDMonitor::send_incremental(epoch_t first,
 
 int OSDMonitor::get_version(version_t ver, bufferlist& bl)
 {
-    if (inc_osd_cache.lookup(ver, &bl)) {
-      return 0;
-    }
-    int ret = PaxosService::get_version(ver, bl);
-    if (!ret) {
-      inc_osd_cache.add(ver, bl);
-    }
+  return get_version(ver, mon->get_quorum_con_features(), bl);
+}
+
+void OSDMonitor::reencode_incremental_map(bufferlist& bl, uint64_t features)
+{
+  OSDMap::Incremental inc;
+  bufferlist::iterator q = bl.begin();
+  inc.decode(q);
+  // always encode with subset of osdmap's canonical features
+  uint64_t f = features & inc.encode_features;
+  dout(20) << __func__ << " " << inc.epoch << " with features " << f
+          << dendl;
+  bl.clear();
+  if (inc.fullmap.length()) {
+    // embedded full map?
+    OSDMap m;
+    m.decode(inc.fullmap);
+    inc.fullmap.clear();
+    m.encode(inc.fullmap, f | CEPH_FEATURE_RESERVED);
+  }
+  if (inc.crush.length()) {
+    // embedded crush map
+    CrushWrapper c;
+    auto p = inc.crush.begin();
+    c.decode(p);
+    inc.crush.clear();
+    c.encode(inc.crush, f);
+  }
+  inc.encode(bl, f | CEPH_FEATURE_RESERVED);
+}
+
+void OSDMonitor::reencode_full_map(bufferlist& bl, uint64_t features)
+{
+  OSDMap m;
+  bufferlist::iterator q = bl.begin();
+  m.decode(q);
+  // always encode with subset of osdmap's canonical features
+  uint64_t f = features & m.get_encoding_features();
+  dout(20) << __func__ << " " << m.get_epoch() << " with features " << f
+          << dendl;
+  bl.clear();
+  m.encode(bl, f | CEPH_FEATURE_RESERVED);
+}
+
+int OSDMonitor::get_version(version_t ver, uint64_t features, bufferlist& bl)
+{
+  uint64_t significant_features = OSDMap::get_significant_features(features);
+  if (inc_osd_cache.lookup({ver, significant_features}, &bl)) {
+    return 0;
+  }
+  int ret = PaxosService::get_version(ver, bl);
+  if (ret < 0) {
     return ret;
+  }
+  // NOTE: this check is imprecise; the OSDMap encoding features may
+  // be a subset of the latest mon quorum features, but worst case we
+  // reencode once and then cache the (identical) result under both
+  // feature masks.
+  if (significant_features !=
+      OSDMap::get_significant_features(mon->get_quorum_con_features())) {
+    reencode_incremental_map(bl, features);
+  }
+  inc_osd_cache.add({ver, significant_features}, bl);
+  return 0;
 }
 
 int OSDMonitor::get_version_full(version_t ver, bufferlist& bl)
 {
-    if (full_osd_cache.lookup(ver, &bl)) {
-      return 0;
-    }
-    int ret = PaxosService::get_version_full(ver, bl);
-    if (!ret) {
-      full_osd_cache.add(ver, bl);
-    }
+  return get_version_full(ver, mon->get_quorum_con_features(), bl);
+}
+
+int OSDMonitor::get_version_full(version_t ver, uint64_t features,
+                                bufferlist& bl)
+{
+  uint64_t significant_features = OSDMap::get_significant_features(features);
+  if (full_osd_cache.lookup({ver, significant_features}, &bl)) {
+    return 0;
+  }
+  int ret = PaxosService::get_version_full(ver, bl);
+  if (ret < 0) {
     return ret;
+  }
+  // NOTE: this check is imprecise; the OSDMap encoding features may
+  // be a subset of the latest mon quorum features, but worst case we
+  // reencode once and then cache the (identical) result under both
+  // feature masks.
+  if (significant_features !=
+      OSDMap::get_significant_features(mon->get_quorum_con_features())) {
+    reencode_full_map(bl, features);
+  }
+  full_osd_cache.add({ver, significant_features}, bl);
+  return 0;
 }
 
 epoch_t OSDMonitor::blacklist(const entity_addr_t& a, utime_t until)
@@ -2956,7 +3462,7 @@ void OSDMonitor::check_osdmap_sub(Subscription *sub)
     if (sub->next >= 1)
       send_incremental(sub->next, sub->session, sub->incremental_onetime);
     else
-      sub->session->con->send_message(build_latest_full());
+      sub->session->con->send_message(build_latest_full(sub->session->con_features));
     if (sub->onetime)
       mon->session_map.remove_sub(sub);
     else
@@ -3000,6 +3506,30 @@ void OSDMonitor::check_pg_creates_sub(Subscription *sub)
   }
 }
 
+void OSDMonitor::do_application_enable(int64_t pool_id,
+                                       const std::string &app_name)
+{
+  assert(paxos->is_plugged() && is_writeable());
+
+  dout(20) << __func__ << ": pool_id=" << pool_id << ", app_name=" << app_name
+           << dendl;
+
+  assert(osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS ||
+        pending_inc.new_require_osd_release >= CEPH_RELEASE_LUMINOUS);
+
+  auto pp = osdmap.get_pg_pool(pool_id);
+  assert(pp != nullptr);
+
+  pg_pool_t p = *pp;
+  if (pending_inc.new_pools.count(pool_id)) {
+    p = pending_inc.new_pools[pool_id];
+  }
+
+  p.application_metadata.insert({app_name, {}});
+  p.last_change = pending_inc.epoch;
+  pending_inc.new_pools[pool_id] = p;
+}
+
 unsigned OSDMonitor::scan_for_creating_pgs(
   const mempool::osdmap::map<int64_t,pg_pool_t>& pools,
   const mempool::osdmap::set<int64_t>& removed_pools,
@@ -3043,11 +3573,16 @@ void OSDMonitor::update_creating_pgs()
           << creating_pgs.queue.size() << " pools in queue" << dendl;
   decltype(creating_pgs_by_osd_epoch) new_pgs_by_osd_epoch;
   std::lock_guard<std::mutex> l(creating_pgs_lock);
-  for (auto& pg : creating_pgs.pgs) {
+  for (const auto& pg : creating_pgs.pgs) {
     int acting_primary = -1;
     auto pgid = pg.first;
+    if (!osdmap.pg_exists(pgid)) {
+      dout(20) << __func__ << " ignoring " << pgid << " which should not exist"
+              << dendl;
+      continue;
+    }
     auto mapped = pg.second.first;
-    dout(20) << __func__ << " looking up " << pgid << dendl;
+    dout(20) << __func__ << " looking up " << pgid << "@" << mapped << dendl;
     mapping.get(pgid, nullptr, nullptr, nullptr, &acting_primary);
     // check the previous creating_pgs, look for the target to whom the pg was
     // previously mapped
@@ -3072,18 +3607,24 @@ void OSDMonitor::update_creating_pgs()
       }
     }
     dout(10) << __func__ << " will instruct osd." << acting_primary
-            << " to create " << pgid << dendl;
+            << " to create " << pgid << "@" << mapped << dendl;
     new_pgs_by_osd_epoch[acting_primary][mapped].insert(pgid);
   }
   creating_pgs_by_osd_epoch = std::move(new_pgs_by_osd_epoch);
   creating_pgs_epoch = mapping.get_epoch();
 }
 
-epoch_t OSDMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
+epoch_t OSDMonitor::send_pg_creates(int osd, Connection *con, epoch_t next) const
 {
   dout(30) << __func__ << " osd." << osd << " next=" << next
           << " " << creating_pgs_by_osd_epoch << dendl;
   std::lock_guard<std::mutex> l(creating_pgs_lock);
+  if (creating_pgs_epoch <= creating_pgs.last_scan_epoch) {
+    dout(20) << __func__
+            << " not using stale creating_pgs@" << creating_pgs_epoch << dendl;
+    // the subscribers will be updated when the mapping is completed anyway
+    return next;
+  }
   auto creating_pgs_by_epoch = creating_pgs_by_osd_epoch.find(osd);
   if (creating_pgs_by_epoch == creating_pgs_by_osd_epoch.end())
     return next;
@@ -3103,11 +3644,12 @@ epoch_t OSDMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
        m = new MOSDPGCreate(creating_pgs_epoch);
       // Need the create time from the monitor using its clock to set
       // last_scrub_stamp upon pg creation.
-      const auto& creation = creating_pgs.pgs[pg];
-      m->mkpg.emplace(pg, pg_create_t{creation.first, pg, 0});
-      m->ctimes.emplace(pg, creation.second);
+      auto create = creating_pgs.pgs.find(pg);
+      assert(create != creating_pgs.pgs.end());
+      m->mkpg.emplace(pg, pg_create_t{create->second.first, pg, 0});
+      m->ctimes.emplace(pg, create->second.second);
       dout(20) << __func__ << " will create " << pg
-              << " at " << creation.first << dendl;
+              << " at " << create->second.first << dendl;
     }
   }
   if (!m) {
@@ -3141,6 +3683,15 @@ void OSDMonitor::tick()
       do_propose = true;
     }
   }
+  if (!osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) &&
+      osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS &&
+      mon->mgrstatmon()->is_readable() &&
+      mon->mgrstatmon()->definitely_converted_snapsets()) {
+    dout(1) << __func__ << " all snapsets converted, setting purged_snapdirs"
+           << dendl;
+    add_flag(CEPH_OSDMAP_PURGED_SNAPDIRS);
+    do_propose = true;
+  }
 
   // mark osds down?
   if (check_failures(now))
@@ -3196,8 +3747,15 @@ void OSDMonitor::tick()
          }
        }
 
-       if (g_conf->mon_osd_down_out_interval > 0 &&
-           down.sec() >= grace) {
+        bool down_out = !osdmap.is_destroyed(o) &&
+          g_conf->mon_osd_down_out_interval > 0 && down.sec() >= grace;
+        bool destroyed_out = osdmap.is_destroyed(o) &&
+          g_conf->mon_osd_destroyed_out_interval > 0 &&
+        // this is not precise enough as we did not make a note when this osd
+        // was marked as destroyed, but let's not bother with that
+        // complexity for now.
+          down.sec() >= g_conf->mon_osd_destroyed_out_interval;
+        if (down_out || destroyed_out) {
          dout(10) << "tick marking osd." << o << " OUT after " << down
                   << " sec (target " << grace << " = " << orig_grace << " + " << my_grace << ")" << dendl;
          pending_inc.new_weight[o] = CEPH_OSD_OUT;
@@ -3214,7 +3772,8 @@ void OSDMonitor::tick()
 
          do_propose = true;
 
-         mon->clog->info() << "osd." << o << " out (down for " << down << ")";
+         mon->clog->info() << "Marking osd." << o << " out (has been down for "
+                            << int(down.sec()) << " seconds)";
        } else
          continue;
       }
@@ -3287,6 +3846,10 @@ bool OSDMonitor::handle_osd_timeouts(const utime_t &now,
 
   for (int i=0; i < max_osd; ++i) {
     dout(30) << __func__ << ": checking up on osd " << i << dendl;
+    if (!osdmap.exists(i)) {
+      last_osd_report.erase(i); // if any
+      continue;
+    }
     if (!osdmap.is_up(i))
       continue;
     const std::map<int,utime_t>::const_iterator t = last_osd_report.find(i);
@@ -3333,7 +3896,7 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
           osds.insert(i);
         }
        continue;
-      } 
+      }
       if (osdmap.is_out(i))
         continue;
       ++num_in_osds;
@@ -3462,137 +4025,6 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
       }
     }
 
-    if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-      // An osd could configure failsafe ratio, to something different
-      // but for now assume it is the same here.
-      float fsr = g_conf->osd_failsafe_full_ratio;
-      if (fsr > 1.0) fsr /= 100;
-      float fr = osdmap.get_full_ratio();
-      float br = osdmap.get_backfillfull_ratio();
-      float nr = osdmap.get_nearfull_ratio();
-
-      bool out_of_order = false;
-      // These checks correspond to how OSDService::check_full_status() in an OSD
-      // handles the improper setting of these values.
-      if (br < nr) {
-        out_of_order = true;
-        if (detail) {
-         ostringstream ss;
-         ss << "backfillfull_ratio (" << br << ") < nearfull_ratio (" << nr << "), increased";
-         detail->push_back(make_pair(HEALTH_ERR, ss.str()));
-        }
-        br = nr;
-      }
-      if (fr < br) {
-        out_of_order = true;
-        if (detail) {
-         ostringstream ss;
-         ss << "full_ratio (" << fr << ") < backfillfull_ratio (" << br << "), increased";
-         detail->push_back(make_pair(HEALTH_ERR, ss.str()));
-        }
-        fr = br;
-      }
-      if (fsr < fr) {
-        out_of_order = true;
-        if (detail) {
-         ostringstream ss;
-         ss << "osd_failsafe_full_ratio (" << fsr << ") < full_ratio (" << fr << "), increased";
-         detail->push_back(make_pair(HEALTH_ERR, ss.str()));
-        }
-      }
-      if (out_of_order) {
-       ostringstream ss;
-       ss << "Full ratio(s) out of order";
-       summary.push_back(make_pair(HEALTH_ERR, ss.str()));
-      }
-
-      set<int> full, backfillfull, nearfull;
-      osdmap.get_full_osd_counts(&full, &backfillfull, &nearfull);
-      if (full.size()) {
-       ostringstream ss;
-       ss << full.size() << " full osd(s)";
-       summary.push_back(make_pair(HEALTH_ERR, ss.str()));
-      }
-      if (backfillfull.size()) {
-       ostringstream ss;
-       ss << backfillfull.size() << " backfillfull osd(s)";
-       summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-      }
-      if (nearfull.size()) {
-       ostringstream ss;
-       ss << nearfull.size() << " nearfull osd(s)";
-       summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-      }
-      if (detail) {
-        for (auto& i: full) {
-          ostringstream ss;
-          ss << "osd." << i << " is full";
-         detail->push_back(make_pair(HEALTH_ERR, ss.str()));
-        }
-        for (auto& i: backfillfull) {
-          ostringstream ss;
-          ss << "osd." << i << " is backfill full";
-         detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-        for (auto& i: nearfull) {
-          ostringstream ss;
-          ss << "osd." << i << " is near full";
-         detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-      }
-
-      // warn if there is any noup osds.
-      vector<int> noup_osds;
-      osdmap.get_noup_osds(&noup_osds);
-      if (noup_osds.size()) {
-        ostringstream ss;
-        ss << noup_osds.size() << " noup osd(s)";
-        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-        if (detail) {
-          ss << ": " << noup_osds;
-          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-      }
-
-      // warn if there is any nodown osds.
-      vector<int> nodown_osds;
-      osdmap.get_nodown_osds(&nodown_osds);
-      if (nodown_osds.size()) {
-        ostringstream ss;
-        ss << nodown_osds.size() << " nodown osd(s)";
-        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-        if (detail) {
-          ss << ": " << nodown_osds;
-          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-      }
-
-      // warn if there is any noin osds.
-      vector<int> noin_osds;
-      osdmap.get_noin_osds(&noin_osds);
-      if (noin_osds.size()) {
-        ostringstream ss;
-        ss << noin_osds.size() << " noin osd(s)";
-        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-        if (detail) {
-          ss << ": " << noin_osds;
-          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-      }
-
-      // warn if there is any noout osds.
-      vector<int> noout_osds;
-      osdmap.get_noout_osds(&noout_osds);
-      if (noout_osds.size()) {
-        ostringstream ss;
-        ss << noout_osds.size() << " noout osd(s)";
-        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-        if (detail) {
-          ss << ": " << noout_osds;
-          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-      }
-    }
     // note: we leave it to ceph-mgr to generate details health warnings
     // with actual osd utilizations
 
@@ -3673,16 +4105,6 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
       }
     }
 
-    if (osdmap.crush->has_multirule_rulesets()) {
-      ostringstream ss;
-      ss << "CRUSH map contains multirule rulesets";
-      summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-      if (detail) {
-       ss << "; please manually fix the map";
-       detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-      }
-    }
-
     // Not using 'sortbitwise' and should be?
     if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE) &&
         (osdmap.get_up_osd_features() &
@@ -3744,7 +4166,17 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
       }
     }
 
-    get_pools_health(summary, detail);
+    for (auto it : osdmap.get_pools()) {
+      const pg_pool_t &pool = it.second;
+      if (pool.has_flag(pg_pool_t::FLAG_FULL)) {
+       const string& pool_name = osdmap.get_pool_name(it.first);
+       stringstream ss;
+       ss << "pool '" << pool_name << "' is full";
+       summary.push_back(make_pair(HEALTH_WARN, ss.str()));
+       if (detail)
+         detail->push_back(make_pair(HEALTH_WARN, ss.str()));
+      }
+    }
   }
 }
 
@@ -3776,7 +4208,7 @@ void OSDMonitor::dump_info(Formatter *f)
 namespace {
   enum osd_pool_get_choices {
     SIZE, MIN_SIZE, CRASH_REPLAY_INTERVAL,
-    PG_NUM, PGP_NUM, CRUSH_RULE, HASHPSPOOL,
+    PG_NUM, PGP_NUM, CRUSH_RULE, HASHPSPOOL, EC_OVERWRITES,
     NODELETE, NOPGCHANGE, NOSIZECHANGE,
     WRITE_FADVISE_DONTNEED, NOSCRUB, NODEEP_SCRUB,
     HIT_SET_TYPE, HIT_SET_PERIOD, HIT_SET_COUNT, HIT_SET_FPP,
@@ -3835,7 +4267,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
   boost::scoped_ptr<Formatter> f(Formatter::create(format));
 
   if (prefix == "osd stat") {
-    osdmap.print_summary(f.get(), ds);
+    osdmap.print_summary(f.get(), ds, "");
     if (f)
       f->flush(rdata);
     else
@@ -3877,6 +4309,12 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       p->decode(osdmap_bl);
     }
 
+    auto sg = make_scope_guard([&] {
+      if (p != &osdmap) {
+        delete p;
+      }
+    });
+
     if (prefix == "osd dump") {
       stringstream ds;
       if (f) {
@@ -3925,6 +4363,8 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
          filter |= OSDMap::DUMP_IN;
        } else if (s == "out") {
          filter |= OSDMap::DUMP_OUT;
+       } else if (s == "destroyed") {
+         filter |= OSDMap::DUMP_DESTROYED;
        } else {
          ss << "unrecognized state '" << s << "'";
          r = -EINVAL;
@@ -3932,10 +4372,18 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
        }
       }
       if ((filter & (OSDMap::DUMP_IN|OSDMap::DUMP_OUT)) ==
-         (OSDMap::DUMP_IN|OSDMap::DUMP_OUT) ||
-         (filter & (OSDMap::DUMP_UP|OSDMap::DUMP_DOWN)) ==
-         (OSDMap::DUMP_UP|OSDMap::DUMP_DOWN)) {
-       ss << "cannot specify both up and down or both in and out";
+         (OSDMap::DUMP_IN|OSDMap::DUMP_OUT)) {
+        ss << "cannot specify both 'in' and 'out'";
+        r = -EINVAL;
+        goto reply;
+      }
+      if (((filter & (OSDMap::DUMP_UP|OSDMap::DUMP_DOWN)) ==
+          (OSDMap::DUMP_UP|OSDMap::DUMP_DOWN)) ||
+           ((filter & (OSDMap::DUMP_UP|OSDMap::DUMP_DESTROYED)) ==
+           (OSDMap::DUMP_UP|OSDMap::DUMP_DESTROYED)) ||
+           ((filter & (OSDMap::DUMP_DOWN|OSDMap::DUMP_DESTROYED)) ==
+           (OSDMap::DUMP_DOWN|OSDMap::DUMP_DESTROYED))) {
+       ss << "can specify only one of 'up', 'down' and 'destroyed'";
        r = -EINVAL;
        goto reply;
       }
@@ -3990,8 +4438,6 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
 
       rdata.append(ds);
     }
-    if (p != &osdmap)
-      delete p;
   } else if (prefix == "osd df") {
     string method;
     cmd_getval(g_ceph_context, cmdmap, "output_method", method);
@@ -4201,15 +4647,15 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     }
     goto reply;
 
-  } else if ((prefix == "osd scrub" ||
-             prefix == "osd deep-scrub" ||
-             prefix == "osd repair")) {
+  } else if (prefix == "osd scrub" ||
+            prefix == "osd deep-scrub" ||
+            prefix == "osd repair") {
     string whostr;
     cmd_getval(g_ceph_context, cmdmap, "who", whostr);
     vector<string> pvec;
     get_str_vec(prefix, pvec);
 
-    if (whostr == "*") {
+    if (whostr == "*" || whostr == "all" || whostr == "any") {
       ss << "osds ";
       int c = 0;
       for (int i = 0; i < osdmap.get_max_osd(); i++)
@@ -4363,8 +4809,8 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       {"min_size", MIN_SIZE},
       {"crash_replay_interval", CRASH_REPLAY_INTERVAL},
       {"pg_num", PG_NUM}, {"pgp_num", PGP_NUM},
-      {"crush_rule", CRUSH_RULE},
-      {"hashpspool", HASHPSPOOL}, {"nodelete", NODELETE},
+      {"crush_rule", CRUSH_RULE}, {"hashpspool", HASHPSPOOL},
+      {"allow_ec_overwrites", EC_OVERWRITES}, {"nodelete", NODELETE},
       {"nopgchange", NOPGCHANGE}, {"nosizechange", NOSIZECHANGE},
       {"noscrub", NOSCRUB}, {"nodeep-scrub", NODEEP_SCRUB},
       {"write_fadvise_dontneed", WRITE_FADVISE_DONTNEED},
@@ -4408,10 +4854,11 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       CACHE_TARGET_DIRTY_RATIO, CACHE_TARGET_DIRTY_HIGH_RATIO,
       CACHE_MIN_FLUSH_AGE, CACHE_MIN_EVICT_AGE,
       MIN_READ_RECENCY_FOR_PROMOTE,
+      MIN_WRITE_RECENCY_FOR_PROMOTE,
       HIT_SET_GRADE_DECAY_RATE, HIT_SET_SEARCH_LAST_N
     };
     const choices_set_t ONLY_ERASURE_CHOICES = {
-      ERASURE_CODE_PROFILE
+      EC_OVERWRITES, ERASURE_CODE_PROFILE
     };
 
     choices_set_t selected_choices;
@@ -4451,16 +4898,29 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
        goto reply;
       }
 
+      if (pool_opts_t::is_opt_name(var) &&
+         !p->opts.is_set(pool_opts_t::get_opt_desc(var).key)) {
+       ss << "option '" << var << "' is not set on pool '" << poolstr << "'";
+       r = -ENOENT;
+       goto reply;
+      }
+
       selected_choices.insert(selected);
     }
 
     if (f) {
+      f->open_object_section("pool");
+      f->dump_string("pool", poolstr);
+      f->dump_int("pool_id", pool);
       for(choices_set_t::const_iterator it = selected_choices.begin();
          it != selected_choices.end(); ++it) {
        choices_map_t::const_iterator i;
-       f->open_object_section("pool");
-       f->dump_string("pool", poolstr);
-       f->dump_int("pool_id", pool);
+        for (i = ALL_CHOICES.begin(); i != ALL_CHOICES.end(); ++i) {
+          if (i->second == *it) {
+            break;
+          }
+        }
+        assert(i != ALL_CHOICES.end());
        switch(*it) {
          case PG_NUM:
            f->dump_int("pg_num", p->get_pg_num());
@@ -4489,6 +4949,10 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
              f->dump_string("crush_rule", stringify(p->get_crush_rule()));
            }
            break;
+         case EC_OVERWRITES:
+           f->dump_bool("allow_ec_overwrites",
+                         p->has_flag(pg_pool_t::FLAG_EC_OVERWRITES));
+           break;
          case HASHPSPOOL:
          case NODELETE:
          case NOPGCHANGE:
@@ -4496,14 +4960,8 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
          case WRITE_FADVISE_DONTNEED:
          case NOSCRUB:
          case NODEEP_SCRUB:
-           for (i = ALL_CHOICES.begin(); i != ALL_CHOICES.end(); ++i) {
-             if (i->second == *it)
-               break;
-           }
-           assert(i != ALL_CHOICES.end());
-           f->dump_string(i->first.c_str(),
-                          p->has_flag(pg_pool_t::get_flag_by_name(i->first)) ?
-                          "true" : "false");
+           f->dump_bool(i->first.c_str(),
+                          p->has_flag(pg_pool_t::get_flag_by_name(i->first)));
            break;
          case HIT_SET_PERIOD:
            f->dump_int("hit_set_period", p->hit_set_period);
@@ -4599,25 +5057,21 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
          case CSUM_TYPE:
          case CSUM_MAX_BLOCK:
          case CSUM_MIN_BLOCK:
-           for (i = ALL_CHOICES.begin(); i != ALL_CHOICES.end(); ++i) {
-             if (i->second == *it)
-               break;
+            pool_opts_t::key_t key = pool_opts_t::get_opt_desc(i->first).key;
+            if (p->opts.is_set(key)) {
+              if(*it == CSUM_TYPE) {
+                int val;
+                p->opts.get(pool_opts_t::CSUM_TYPE, &val);
+                f->dump_string(i->first.c_str(), Checksummer::get_csum_type_string(val));
+              } else {
+                p->opts.dump(i->first, f.get());
+              }
            }
-           assert(i != ALL_CHOICES.end());
-            if(*it == CSUM_TYPE) {
-              int val;
-              p->opts.get(pool_opts_t::CSUM_TYPE, &val);
-              f->dump_string(i->first.c_str(), Checksummer::get_csum_type_string(val));
-            }
-           else {
-              p->opts.dump(i->first, f.get());
-            }
             break;
        }
-       f->close_section();
-       f->flush(rdata);
       }
-
+      f->close_section();
+      f->flush(rdata);
     } else /* !f */ {
       for(choices_set_t::const_iterator it = selected_choices.begin();
          it != selected_choices.end(); ++it) {
@@ -4716,6 +5170,11 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
            ss << "hit_set_search_last_n: " <<
              p->hit_set_search_last_n << "\n";
            break;
+         case EC_OVERWRITES:
+           ss << "allow_ec_overwrites: " <<
+             (p->has_flag(pg_pool_t::FLAG_EC_OVERWRITES) ? "true" : "false") <<
+             "\n";
+           break;
          case HASHPSPOOL:
          case NODELETE:
          case NOPGCHANGE:
@@ -4808,29 +5267,57 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       if (p->quota_max_objects == 0)
         rs << "N/A";
       else
-        rs << si_t(p->quota_max_objects) << " objects";
+        rs << si_u_t(p->quota_max_objects) << " objects";
       rs << "\n"
          << "  max bytes  : ";
       if (p->quota_max_bytes == 0)
         rs << "N/A";
       else
-        rs << si_t(p->quota_max_bytes) << "B";
+        rs << byte_u_t(p->quota_max_bytes);
       rdata.append(rs.str());
     }
     rdata.append("\n");
     r = 0;
   } else if (prefix == "osd crush rule list" ||
             prefix == "osd crush rule ls") {
-    string format;
-    cmd_getval(g_ceph_context, cmdmap, "format", format);
-    boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
-    f->open_array_section("rules");
-    osdmap.crush->list_rules(f.get());
-    f->close_section();
-    ostringstream rs;
-    f->flush(rs);
-    rs << "\n";
-    rdata.append(rs.str());
+    if (f) {
+      f->open_array_section("rules");
+      osdmap.crush->list_rules(f.get());
+      f->close_section();
+      f->flush(rdata);
+    } else {
+      ostringstream ss;
+      osdmap.crush->list_rules(&ss);
+      rdata.append(ss.str());
+    }
+  } else if (prefix == "osd crush rule ls-by-class") {
+    string class_name;
+    cmd_getval(g_ceph_context, cmdmap, "class", class_name);
+    if (class_name.empty()) {
+      ss << "no class specified";
+      r = -EINVAL;
+      goto reply;
+    }
+    set<int> rules;
+    r = osdmap.crush->get_rules_by_class(class_name, &rules);
+    if (r < 0) {
+      ss << "failed to get rules by class '" << class_name << "'";
+      goto reply;
+    }
+    if (f) {
+      f->open_array_section("rules");
+      for (auto &rule: rules) {
+        f->dump_string("name", osdmap.crush->get_rule_name(rule));
+      }
+      f->close_section();
+      f->flush(rdata);
+    } else {
+      ostringstream rs;
+      for (auto &rule: rules) {
+        rs << osdmap.crush->get_rule_name(rule) << "\n";
+      }
+      rdata.append(rs.str());
+    }
   } else if (prefix == "osd crush rule dump") {
     string name;
     cmd_getval(g_ceph_context, cmdmap, "name", name);
@@ -4877,11 +5364,61 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     rs << "\n";
     rdata.append(rs.str());
   } else if (prefix == "osd crush tree") {
-    boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
-    f->open_array_section("crush_map_roots");
-    osdmap.crush->dump_tree(f.get());
-    f->close_section();
-    f->flush(rdata);
+    string shadow;
+    cmd_getval(g_ceph_context, cmdmap, "shadow", shadow);
+    bool show_shadow = shadow == "--show-shadow";
+    boost::scoped_ptr<Formatter> f(Formatter::create(format));
+    if (f) {
+      osdmap.crush->dump_tree(nullptr,
+                              f.get(),
+                              osdmap.get_pool_names(),
+                              show_shadow);
+      f->flush(rdata);
+    } else {
+      ostringstream ss;
+      osdmap.crush->dump_tree(&ss,
+                              nullptr,
+                              osdmap.get_pool_names(),
+                              show_shadow);
+      rdata.append(ss.str());
+    }
+  } else if (prefix == "osd crush ls") {
+    string name;
+    if (!cmd_getval(g_ceph_context, cmdmap, "node", name)) {
+      ss << "no node specified";
+      r = -EINVAL;
+      goto reply;
+    }
+    if (!osdmap.crush->name_exists(name)) {
+      ss << "node '" << name << "' does not exist";
+      r = -ENOENT;
+      goto reply;
+    }
+    int id = osdmap.crush->get_item_id(name);
+    list<int> result;
+    if (id >= 0) {
+      result.push_back(id);
+    } else {
+      int num = osdmap.crush->get_bucket_size(id);
+      for (int i = 0; i < num; ++i) {
+       result.push_back(osdmap.crush->get_bucket_item(id, i));
+      }
+    }
+    if (f) {
+      f->open_array_section("items");
+      for (auto i : result) {
+       f->dump_string("item", osdmap.crush->get_item_name(i));
+      }
+      f->close_section();
+      f->flush(rdata);
+    } else {
+      ostringstream ss;
+      for (auto i : result) {
+       ss << osdmap.crush->get_item_name(i) << "\n";
+      }
+      rdata.append(ss.str());
+    }
+    r = 0;
   } else if (prefix == "osd crush class ls") {
     boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
     f->open_array_section("crush_classes");
@@ -4889,6 +5426,27 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       f->dump_string("class", i.second);
     f->close_section();
     f->flush(rdata);
+  } else if (prefix == "osd crush class ls-osd") {
+    string name;
+    cmd_getval(g_ceph_context, cmdmap, "class", name);
+    set<int> osds;
+    osdmap.crush->get_devices_by_class(name, &osds);
+    if (f) {
+      f->open_array_section("osds");
+      for (auto &osd: osds)
+        f->dump_int("osd", osd);
+      f->close_section();
+      f->flush(rdata);
+    } else {
+      bool first = true;
+      for (auto &osd : osds) {
+        if (!first)
+          ds << "\n";
+        first = false;
+        ds << osd;
+      }
+      rdata.append(ds);
+    }
   } else if (prefix == "osd erasure-code-profile ls") {
     const auto &profiles = osdmap.get_erasure_code_profiles();
     if (f)
@@ -4906,6 +5464,37 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       rs << "\n";
       rdata.append(rs.str());
     }
+  } else if (prefix == "osd crush weight-set ls") {
+    boost::scoped_ptr<Formatter> f(Formatter::create(format));
+    if (f) {
+      f->open_array_section("weight_sets");
+      if (osdmap.crush->have_choose_args(CrushWrapper::DEFAULT_CHOOSE_ARGS)) {
+       f->dump_string("pool", "(compat)");
+      }
+      for (auto& i : osdmap.crush->choose_args) {
+       if (i.first >= 0) {
+         f->dump_string("pool", osdmap.get_pool_name(i.first));
+       }
+      }
+      f->close_section();
+      f->flush(rdata);
+    } else {
+      ostringstream rs;
+      if (osdmap.crush->have_choose_args(CrushWrapper::DEFAULT_CHOOSE_ARGS)) {
+       rs << "(compat)\n";
+      }
+      for (auto& i : osdmap.crush->choose_args) {
+       if (i.first >= 0) {
+         rs << osdmap.get_pool_name(i.first) << "\n";
+       }
+      }
+      rdata.append(rs.str());
+    }
+  } else if (prefix == "osd crush weight-set dump") {
+    boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty",
+                                                    "json-pretty"));
+    osdmap.crush->dump_choose_args(f.get());
+    f->flush(rdata);
   } else if (prefix == "osd erasure-code-profile get") {
     string name;
     cmd_getval(g_ceph_context, cmdmap, "name", name);
@@ -4932,6 +5521,87 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       rs << "\n";
       rdata.append(rs.str());
     }
+  } else if (prefix == "osd pool application get") {
+    boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty",
+                                                     "json-pretty"));
+    string pool_name;
+    cmd_getval(g_ceph_context, cmdmap, "pool", pool_name);
+    string app;
+    cmd_getval(g_ceph_context, cmdmap, "app", app);
+    string key;
+    cmd_getval(g_ceph_context, cmdmap, "key", key);
+
+    if (pool_name.empty()) {
+      // all
+      f->open_object_section("pools");
+      for (const auto &pool : osdmap.pools) {
+        std::string name("<unknown>");
+        const auto &pni = osdmap.pool_name.find(pool.first);
+        if (pni != osdmap.pool_name.end())
+          name = pni->second;
+        f->open_object_section(name.c_str());
+        for (auto &app_pair : pool.second.application_metadata) {
+          f->open_object_section(app_pair.first.c_str());
+          for (auto &kv_pair : app_pair.second) {
+            f->dump_string(kv_pair.first.c_str(), kv_pair.second);
+          }
+          f->close_section();
+        }
+        f->close_section(); // name
+      }
+      f->close_section(); // pools
+      f->flush(rdata);
+    } else {
+      int64_t pool = osdmap.lookup_pg_pool_name(pool_name.c_str());
+      if (pool < 0) {
+        ss << "unrecognized pool '" << pool_name << "'";
+        r = -ENOENT;
+        goto reply;
+      }
+      auto p = osdmap.get_pg_pool(pool);
+      // filter by pool
+      if (app.empty()) {
+        f->open_object_section(pool_name.c_str());
+        for (auto &app_pair : p->application_metadata) {
+          f->open_object_section(app_pair.first.c_str());
+          for (auto &kv_pair : app_pair.second) {
+            f->dump_string(kv_pair.first.c_str(), kv_pair.second);
+          }
+          f->close_section(); // application
+        }
+        f->close_section(); // pool_name
+        f->flush(rdata);
+        goto reply;
+      }
+
+      auto app_it = p->application_metadata.find(app);
+      if (app_it == p->application_metadata.end()) {
+        ss << "pool '" << pool_name << "' has no application '" << app << "'";
+        r = -ENOENT;
+        goto reply;
+      }
+      // filter by pool + app
+      if (key.empty()) {
+        f->open_object_section(app_it->first.c_str());
+        for (auto &kv_pair : app_it->second) {
+          f->dump_string(kv_pair.first.c_str(), kv_pair.second);
+        }
+        f->close_section(); // application
+        f->flush(rdata);
+        goto reply;
+      }
+      // filter by pool + app + key
+      auto key_it = app_it->second.find(key);
+      if (key_it == app_it->second.end()) {
+        ss << "application '" << app << "' on pool '" << pool_name
+           << "' does not have key '" << key << "'";
+        r = -ENOENT;
+        goto reply;
+      }
+      ss << key_it->second << "\n";
+      rdata.append(ss.str());
+      ss.str("");
+    }
   } else {
     // try prepare update
     return false;
@@ -4944,10 +5614,20 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
   return true;
 }
 
-void OSDMonitor::update_pool_flags(int64_t pool_id, uint64_t flags)
+void OSDMonitor::set_pool_flags(int64_t pool_id, uint64_t flags)
+{
+  pg_pool_t *pool = pending_inc.get_new_pool(pool_id,
+    osdmap.get_pg_pool(pool_id));
+  assert(pool);
+  pool->set_flag(flags);
+}
+
+void OSDMonitor::clear_pool_flags(int64_t pool_id, uint64_t flags)
 {
-  const pg_pool_t *pool = osdmap.get_pg_pool(pool_id);
-  pending_inc.get_new_pool(pool_id, pool)->flags = flags;
+  pg_pool_t *pool = pending_inc.get_new_pool(pool_id,
+    osdmap.get_pg_pool(pool_id));
+  assert(pool);
+  pool->unset_flag(flags);
 }
 
 bool OSDMonitor::update_pools_status()
@@ -4970,14 +5650,16 @@ bool OSDMonitor::update_pools_status()
       (pool.quota_max_bytes > 0 && (uint64_t)sum.num_bytes >= pool.quota_max_bytes) ||
       (pool.quota_max_objects > 0 && (uint64_t)sum.num_objects >= pool.quota_max_objects);
 
-    if (pool.has_flag(pg_pool_t::FLAG_FULL)) {
+    if (pool.has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
       if (pool_is_full)
         continue;
 
       mon->clog->info() << "pool '" << pool_name
-                       << "' no longer full; removing FULL flag";
-
-      update_pool_flags(it->first, pool.get_flags() & ~pg_pool_t::FLAG_FULL);
+                       << "' no longer out of quota; removing NO_QUOTA flag";
+      // below we cancel FLAG_FULL too, we'll set it again in
+      // OSDMonitor::encode_pending if it still fails the osd-full checking.
+      clear_pool_flags(it->first,
+                       pg_pool_t::FLAG_FULL_NO_QUOTA | pg_pool_t::FLAG_FULL);
       ret = true;
     } else {
       if (!pool_is_full)
@@ -4987,7 +5669,7 @@ bool OSDMonitor::update_pools_status()
           (uint64_t)sum.num_bytes >= pool.quota_max_bytes) {
         mon->clog->warn() << "pool '" << pool_name << "' is full"
                          << " (reached quota's max_bytes: "
-                         << si_t(pool.quota_max_bytes) << ")";
+                         << byte_u_t(pool.quota_max_bytes) << ")";
       }
       if (pool.quota_max_objects > 0 &&
                 (uint64_t)sum.num_objects >= pool.quota_max_objects) {
@@ -4995,97 +5677,20 @@ bool OSDMonitor::update_pools_status()
                          << " (reached quota's max_objects: "
                          << pool.quota_max_objects << ")";
       }
-      update_pool_flags(it->first, pool.get_flags() | pg_pool_t::FLAG_FULL);
+      // set both FLAG_FULL_NO_QUOTA and FLAG_FULL
+      // note that below we try to cancel FLAG_BACKFILLFULL/NEARFULL too
+      // since FLAG_FULL should always take precedence
+      set_pool_flags(it->first,
+                     pg_pool_t::FLAG_FULL_NO_QUOTA | pg_pool_t::FLAG_FULL);
+      clear_pool_flags(it->first,
+                       pg_pool_t::FLAG_NEARFULL |
+                       pg_pool_t::FLAG_BACKFILLFULL);
       ret = true;
     }
   }
   return ret;
 }
 
-void OSDMonitor::get_pools_health(
-    list<pair<health_status_t,string> >& summary,
-    list<pair<health_status_t,string> > *detail) const
-{
-  auto& pools = osdmap.get_pools();
-  for (auto it = pools.begin(); it != pools.end(); ++it) {
-    const pool_stat_t *pstat = mon->pgservice->get_pool_stat(it->first);
-    if (!pstat)
-      continue;
-    const object_stat_sum_t& sum = pstat->stats.sum;
-    const pg_pool_t &pool = it->second;
-    const string& pool_name = osdmap.get_pool_name(it->first);
-
-    if (pool.has_flag(pg_pool_t::FLAG_FULL)) {
-      // uncomment these asserts if/when we update the FULL flag on pg_stat update
-      //assert((pool.quota_max_objects > 0) || (pool.quota_max_bytes > 0));
-
-      stringstream ss;
-      ss << "pool '" << pool_name << "' is full";
-      summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-      if (detail)
-       detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-    }
-
-    float warn_threshold = (float)g_conf->mon_pool_quota_warn_threshold/100;
-    float crit_threshold = (float)g_conf->mon_pool_quota_crit_threshold/100;
-
-    if (pool.quota_max_objects > 0) {
-      stringstream ss;
-      health_status_t status = HEALTH_OK;
-      if ((uint64_t)sum.num_objects >= pool.quota_max_objects) {
-       // uncomment these asserts if/when we update the FULL flag on pg_stat update
-        //assert(pool.has_flag(pg_pool_t::FLAG_FULL));
-      } else if (crit_threshold > 0 &&
-                sum.num_objects >= pool.quota_max_objects*crit_threshold) {
-        ss << "pool '" << pool_name
-           << "' has " << sum.num_objects << " objects"
-           << " (max " << pool.quota_max_objects << ")";
-        status = HEALTH_ERR;
-      } else if (warn_threshold > 0 &&
-                sum.num_objects >= pool.quota_max_objects*warn_threshold) {
-        ss << "pool '" << pool_name
-           << "' has " << sum.num_objects << " objects"
-           << " (max " << pool.quota_max_objects << ")";
-        status = HEALTH_WARN;
-      }
-      if (status != HEALTH_OK) {
-        pair<health_status_t,string> s(status, ss.str());
-        summary.push_back(s);
-        if (detail)
-          detail->push_back(s);
-      }
-    }
-
-    if (pool.quota_max_bytes > 0) {
-      health_status_t status = HEALTH_OK;
-      stringstream ss;
-      if ((uint64_t)sum.num_bytes >= pool.quota_max_bytes) {
-       // uncomment these asserts if/when we update the FULL flag on pg_stat update
-       //assert(pool.has_flag(pg_pool_t::FLAG_FULL));
-      } else if (crit_threshold > 0 &&
-                sum.num_bytes >= pool.quota_max_bytes*crit_threshold) {
-        ss << "pool '" << pool_name
-           << "' has " << si_t(sum.num_bytes) << " bytes"
-           << " (max " << si_t(pool.quota_max_bytes) << ")";
-        status = HEALTH_ERR;
-      } else if (warn_threshold > 0 &&
-                sum.num_bytes >= pool.quota_max_bytes*warn_threshold) {
-        ss << "pool '" << pool_name
-           << "' has " << si_t(sum.num_bytes) << " bytes"
-           << " (max " << si_t(pool.quota_max_bytes) << ")";
-        status = HEALTH_WARN;
-      }
-      if (status != HEALTH_OK) {
-        pair<health_status_t,string> s(status, ss.str());
-        summary.push_back(s);
-        if (detail)
-          detail->push_back(s);
-      }
-    }
-  }
-}
-
-
 int OSDMonitor::prepare_new_pool(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
@@ -5097,16 +5702,22 @@ int OSDMonitor::prepare_new_pool(MonOpRequestRef op)
   string erasure_code_profile;
   stringstream ss;
   string rule_name;
+  int ret = 0;
   if (m->auid)
-    return prepare_new_pool(m->name, m->auid, m->crush_rule, rule_name,
+    ret =  prepare_new_pool(m->name, m->auid, m->crush_rule, rule_name,
                            0, 0,
                             erasure_code_profile,
                            pg_pool_t::TYPE_REPLICATED, 0, FAST_READ_OFF, &ss);
   else
-    return prepare_new_pool(m->name, session->auid, m->crush_rule, rule_name,
+    ret = prepare_new_pool(m->name, session->auid, m->crush_rule, rule_name,
                            0, 0,
                             erasure_code_profile,
                            pg_pool_t::TYPE_REPLICATED, 0, FAST_READ_OFF, &ss);
+
+  if (ret < 0) {
+    dout(10) << __func__ << " got " << ret << " " << ss.str() << dendl;
+  }
+  return ret;
 }
 
 int OSDMonitor::crush_rename_bucket(const string& srcname,
@@ -5188,7 +5799,7 @@ int OSDMonitor::normalize_profile(const string& profilename,
   auto it = profile.find("stripe_unit");
   if (it != profile.end()) {
     string err_str;
-    uint32_t stripe_unit = strict_si_cast<uint32_t>(it->second.c_str(), &err_str);
+    uint32_t stripe_unit = strict_iecstrtoll(it->second.c_str(), &err_str);
     if (!err_str.empty()) {
       *ss << "could not parse stripe_unit '" << it->second
          << "': " << err_str << std::endl;
@@ -5237,7 +5848,7 @@ int OSDMonitor::crush_rule_create_erasure(const string &name,
       return err;
     }
 
-    err = erasure_code->create_ruleset(name, newcrush, ss);
+    err = erasure_code->create_rule(name, newcrush, ss);
     erasure_code.reset();
     if (err < 0)
       return err;
@@ -5389,9 +6000,22 @@ int OSDMonitor::parse_erasure_code_profile(const vector<string> &erasure_code_pr
       user_map[*i] = string();
       (*erasure_code_profile_map)[*i] = string();
     } else {
-      const string key = i->substr(0, equal);
+      string key = i->substr(0, equal);
       equal++;
       const string value = i->substr(equal);
+      if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS &&
+         key.find("ruleset-") == 0) {
+       if (g_conf->get_val<bool>("mon_fixup_legacy_erasure_code_profiles")) {
+         mon->clog->warn() << "erasure code profile property '" << key
+                           << "' is no longer supported; try "
+                           << "'crush-" << key.substr(8) << "' instead";
+         key = string("crush-") + key.substr(8);
+       } else {
+         *ss << "property '" << key << "' is no longer supported; try "
+             << "'crush-" << key.substr(8) << "' instead";
+         return -EINVAL;
+       }
+      }
       user_map[key] = value;
       (*erasure_code_profile_map)[key] = value;
     }
@@ -5455,7 +6079,7 @@ int OSDMonitor::prepare_pool_stripe_width(const unsigned pool_type,
       auto it = profile.find("stripe_unit");
       if (it != profile.end()) {
        string err_str;
-       stripe_unit = strict_si_cast<uint32_t>(it->second.c_str(), &err_str);
+       stripe_unit = strict_iecstrtoll(it->second.c_str(), &err_str);
        assert(err_str.empty());
       }
       *stripe_width = data_chunks *
@@ -5472,10 +6096,10 @@ int OSDMonitor::prepare_pool_stripe_width(const unsigned pool_type,
 }
 
 int OSDMonitor::prepare_pool_crush_rule(const unsigned pool_type,
-                                          const string &erasure_code_profile,
-                                          const string &rule_name,
-                                          int *crush_rule,
-                                          ostream *ss)
+                                       const string &erasure_code_profile,
+                                       const string &rule_name,
+                                       int *crush_rule,
+                                       ostream *ss)
 {
 
   if (*crush_rule < 0) {
@@ -5483,7 +6107,7 @@ int OSDMonitor::prepare_pool_crush_rule(const unsigned pool_type,
     case pg_pool_t::TYPE_REPLICATED:
       {
        if (rule_name == "") {
-         //Use default rule
+         // Use default rule
          *crush_rule = osdmap.crush->get_osd_pool_default_crush_replicated_ruleset(g_ceph_context);
          if (*crush_rule < 0) {
            // Errors may happen e.g. if no valid rule is available
@@ -5534,8 +6158,8 @@ int OSDMonitor::prepare_pool_crush_rule(const unsigned pool_type,
 }
 
 int OSDMonitor::get_crush_rule(const string &rule_name,
-                                 int *crush_rule,
-                                 ostream *ss)
+                              int *crush_rule,
+                              ostream *ss)
 {
   int ret;
   ret = osdmap.crush->get_rule_id(rule_name);
@@ -5553,7 +6177,7 @@ int OSDMonitor::get_crush_rule(const string &rule_name,
               << " try again" << dendl;
       return -EAGAIN;
     } else {
-      //Cannot find it , return error
+      // Cannot find it , return error
       *ss << "specified rule " << rule_name << " doesn't exist";
       return ret;
     }
@@ -5561,6 +6185,36 @@ int OSDMonitor::get_crush_rule(const string &rule_name,
   return 0;
 }
 
+int OSDMonitor::check_pg_num(int64_t pool, int pg_num, int size, ostream *ss)
+{
+  auto max_pgs_per_osd = g_conf->get_val<uint64_t>("mon_max_pg_per_osd");
+  auto num_osds = std::max(osdmap.get_num_in_osds(), 3u);   // assume min cluster size 3
+  auto max_pgs = max_pgs_per_osd * num_osds;
+  uint64_t projected = 0;
+  if (pool < 0) {
+    projected += pg_num * size;
+  }
+  for (const auto& i : osdmap.get_pools()) {
+    if (i.first == pool) {
+      projected += pg_num * size;
+    } else {
+      projected += i.second.get_pg_num() * i.second.get_size();
+    }
+  }
+  if (projected > max_pgs) {
+    if (pool >= 0) {
+      *ss << "pool id " << pool;
+    }
+    *ss << " pg_num " << pg_num << " size " << size
+       << " would mean " << projected
+       << " total pgs, which exceeds max " << max_pgs
+       << " (mon_max_pg_per_osd " << max_pgs_per_osd
+       << " * num_in_osds " << num_osds << ")";
+    return -ERANGE;
+  }
+  return 0;
+}
+
 /**
  * @param name The name of the new pool
  * @param auid The auid of the pool owner. Can be -1
@@ -5611,32 +6265,38 @@ int OSDMonitor::prepare_new_pool(string& name, uint64_t auid,
   r = prepare_pool_crush_rule(pool_type, erasure_code_profile,
                                 crush_rule_name, &crush_rule, ss);
   if (r) {
-    dout(10) << " prepare_pool_crush_rule returns " << r << dendl;
+    dout(10) << "prepare_pool_crush_rule returns " << r << dendl;
     return r;
   }
-  CrushWrapper newcrush;
-  _get_pending_crush(newcrush);
-  ostringstream err;
-  CrushTester tester(newcrush, err);
-  // use the internal crush tester if crushtool config is empty
-  if (g_conf->crushtool.empty()) {
-    r = tester.test();
-  } else {
-    r = tester.test_with_crushtool(g_conf->crushtool.c_str(),
-                                osdmap.get_max_osd(),
-                                g_conf->mon_lease,
-                                crush_rule);
+  if (g_conf->mon_osd_crush_smoke_test) {
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    ostringstream err;
+    CrushTester tester(newcrush, err);
+    tester.set_min_x(0);
+    tester.set_max_x(50);
+    tester.set_rule(crush_rule);
+    auto start = ceph::coarse_mono_clock::now();
+    r = tester.test_with_fork(g_conf->mon_lease);
+    auto duration = ceph::coarse_mono_clock::now() - start;
+    if (r < 0) {
+      dout(10) << "tester.test_with_fork returns " << r
+              << ": " << err.str() << dendl;
+      *ss << "crush test failed with " << r << ": " << err.str();
+      return r;
+    }
+    dout(10) << __func__ << " crush smoke test duration: "
+             << duration << dendl;
   }
+  unsigned size, min_size;
+  r = prepare_pool_size(pool_type, erasure_code_profile, &size, &min_size, ss);
   if (r) {
-    dout(10) << " tester.test_with_crushtool returns " << r
-            << ": " << err.str() << dendl;
-    *ss << "crushtool check failed with " << r << ": " << err.str();
+    dout(10) << "prepare_pool_size returns " << r << dendl;
     return r;
   }
-  unsigned size, min_size;
-  r = prepare_pool_size(pool_type, erasure_code_profile, &size, &min_size, ss);
+  r = check_pg_num(-1, pg_num, size, ss);
   if (r) {
-    dout(10) << " prepare_pool_size returns " << r << dendl;
+    dout(10) << "check_pg_num returns " << r << dendl;
     return r;
   }
 
@@ -5647,7 +6307,7 @@ int OSDMonitor::prepare_new_pool(string& name, uint64_t auid,
   uint32_t stripe_width = 0;
   r = prepare_pool_stripe_width(pool_type, erasure_code_profile, &stripe_width, ss);
   if (r) {
-    dout(10) << " prepare_pool_stripe_width returns " << r << dendl;
+    dout(10) << "prepare_pool_stripe_width returns " << r << dendl;
     return r;
   }
   
@@ -5815,6 +6475,10 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
       ss << "pool size must be between 1 and 10";
       return -EINVAL;
     }
+    int r = check_pg_num(pool, p.get_pg_num(), n, &ss);
+    if (r < 0) {
+      return r;
+    }
     p.size = n;
     if (n < p.min_size)
       p.min_size = n;
@@ -5841,7 +6505,7 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
        if (err == 0) {
         k = erasure_code->get_data_chunk_count();
        } else {
-        ss << __func__ << " get_erasure_code failed: " << tmp.rdbuf();
+        ss << __func__ << " get_erasure_code failed: " << tmp.str();
         return err;
        }
 
@@ -5878,6 +6542,16 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
        return -EEXIST;
       return 0;
     }
+    if (n > (unsigned)g_conf->mon_max_pool_pg_num) {
+      ss << "'pg_num' must be greater than 0 and less than or equal to "
+         << g_conf->mon_max_pool_pg_num
+         << " (you may adjust 'mon max pool pg num' for higher values)";
+      return -ERANGE;
+    }
+    int r = check_pg_num(pool, n, p.get_size(), &ss);
+    if (r) {
+      return r;
+    }
     string force;
     cmd_getval(g_ceph_context,cmdmap, "force", force);
     if (p.cache_mode != pg_pool_t::CACHEMODE_NONE &&
@@ -6007,7 +6681,15 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
     bloomp->set_fpp(f);
   } else if (var == "use_gmt_hitset") {
     if (val == "true" || (interr.empty() && n == 1)) {
-      if (!(osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_HITSET_GMT)) {
+      string force;
+      cmd_getval(g_ceph_context, cmdmap, "force", force);
+      if (!osdmap.get_num_up_osds() && force != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        return -EPERM;
+      }
+      if (!(osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_HITSET_GMT)
+          && force != "--yes-i-really-mean-it") {
        ss << "not all OSDs support GMT hit set.";
        return -EINVAL;
       }
@@ -6021,8 +6703,14 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
       ss << "ec overwrites can only be enabled for an erasure coded pool";
       return -EINVAL;
     }
-    if (val == "true" || (interr.empty() && n == 1)) {
-       p.flags |= pg_pool_t::FLAG_EC_OVERWRITES;
+    stringstream err;
+    if (!g_conf->mon_debug_no_require_bluestore_for_ec_overwrites &&
+       !is_pool_currently_all_bluestore(pool, p, &err)) {
+      ss << "pool must only be stored on bluestore for scrubbing to work: " << err.str();
+      return -EINVAL;
+    }
+    if (val == "true" || (interr.empty() && n == 1)) {
+       p.flags |= pg_pool_t::FLAG_EC_OVERWRITES;
     } else if (val == "false" || (interr.empty() && n == 0)) {
       ss << "ec overwrites cannot be disabled once enabled";
       return -EINVAL;
@@ -6030,12 +6718,6 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
       ss << "expecting value 'true', 'false', '0', or '1'";
       return -EINVAL;
     }
-    stringstream err;
-    if (!g_conf->mon_debug_no_require_bluestore_for_ec_overwrites &&
-       !is_pool_currently_all_bluestore(pool, p, &err)) {
-      ss << "pool must only be stored on bluestore for scrubbing to work: " << err.str();
-      return -EINVAL;
-    }
   } else if (var == "target_max_objects") {
     if (interr.length()) {
       ss << "error parsing int '" << val << "': " << interr;
@@ -6136,32 +6818,37 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
       return -EINVAL;
     }
   } else if (pool_opts_t::is_opt_name(var)) {
+    bool unset = val == "unset";
     if (var == "compression_mode") {
-      auto cmode = Compressor::get_comp_mode_type(val);
-      if (!cmode) {
-       ss << "unrecognized compression mode '" << val << "'";
-       return EINVAL;
+      if (!unset) {
+        auto cmode = Compressor::get_comp_mode_type(val);
+        if (!cmode) {
+         ss << "unrecognized compression mode '" << val << "'";
+         return -EINVAL;
+        }
       }
     } else if (var == "compression_algorithm") {
-      auto alg = Compressor::get_comp_alg_type(val);
-      if (!alg) {
-        ss << "unrecognized compression_algorithm '" << val << "'";
-       return EINVAL;
+      if (!unset) {
+        auto alg = Compressor::get_comp_alg_type(val);
+        if (!alg) {
+          ss << "unrecognized compression_algorithm '" << val << "'";
+         return -EINVAL;
+        }
       }
     } else if (var == "compression_required_ratio") {
       if (floaterr.length()) {
         ss << "error parsing float value '" << val << "': " << floaterr;
         return -EINVAL;
       }
-      if (f < 0 || f>1) {
+      if (f < 0 || f > 1) {
         ss << "compression_required_ratio is out of range (0-1): '" << val << "'";
-       return EINVAL;
+       return -EINVAL;
       }
     } else if (var == "csum_type") {
-      auto t = val != "unset" ? Checksummer::get_csum_string_type(val) : 0;
+      auto t = unset ? 0 : Checksummer::get_csum_string_type(val);
       if (t < 0 ) {
         ss << "unrecognized csum_type '" << val << "'";
-       return EINVAL;
+       return -EINVAL;
       }
       //preserve csum_type numeric value
       n = t;
@@ -6179,7 +6866,7 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
     pool_opts_t::opt_desc_t desc = pool_opts_t::get_opt_desc(var);
     switch (desc.type) {
     case pool_opts_t::STR:
-      if (val.empty()) {
+      if (unset) {
        p.opts.unset(desc.key);
       } else {
        p.opts.set(desc.key, static_cast<std::string>(val));
@@ -6214,7 +6901,163 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
     ss << "unrecognized variable '" << var << "'";
     return -EINVAL;
   }
-  ss << "set pool " << pool << " " << var << " to " << val;
+  if (val != "unset") {
+    ss << "set pool " << pool << " " << var << " to " << val;
+  } else {
+    ss << "unset pool " << pool << " " << var;
+  }
+  p.last_change = pending_inc.epoch;
+  pending_inc.new_pools[pool] = p;
+  return 0;
+}
+
+int OSDMonitor::prepare_command_pool_application(const string &prefix,
+                                                 map<string,cmd_vartype> &cmdmap,
+                                                 stringstream& ss)
+{
+  string pool_name;
+  cmd_getval(g_ceph_context, cmdmap, "pool", pool_name);
+  int64_t pool = osdmap.lookup_pg_pool_name(pool_name.c_str());
+  if (pool < 0) {
+    ss << "unrecognized pool '" << pool_name << "'";
+    return -ENOENT;
+  }
+
+  pg_pool_t p = *osdmap.get_pg_pool(pool);
+  if (pending_inc.new_pools.count(pool)) {
+    p = pending_inc.new_pools[pool];
+  }
+
+  string app;
+  cmd_getval(g_ceph_context, cmdmap, "app", app);
+  bool app_exists = (p.application_metadata.count(app) > 0);
+
+  if (boost::algorithm::ends_with(prefix, "enable")) {
+    if (app.empty()) {
+      ss << "application name must be provided";
+      return -EINVAL;
+    }
+
+    if (p.is_tier()) {
+      ss << "application must be enabled on base tier";
+      return -EINVAL;
+    }
+
+    string force;
+    cmd_getval(g_ceph_context, cmdmap, "force", force);
+
+    if (!app_exists && !p.application_metadata.empty() &&
+        force != "--yes-i-really-mean-it") {
+      ss << "Are you SURE? Pool '" << pool_name << "' already has an enabled "
+         << "application; pass --yes-i-really-mean-it to proceed anyway";
+      return -EPERM;
+    }
+
+    if (!app_exists && p.application_metadata.size() >= MAX_POOL_APPLICATIONS) {
+      ss << "too many enabled applications on pool '" << pool_name << "'; "
+         << "max " << MAX_POOL_APPLICATIONS;
+      return -EINVAL;
+    }
+
+    if (app.length() > MAX_POOL_APPLICATION_LENGTH) {
+      ss << "application name '" << app << "' too long; max length "
+         << MAX_POOL_APPLICATION_LENGTH;
+      return -EINVAL;
+    }
+
+    if (!app_exists) {
+      p.application_metadata[app] = {};
+    }
+    ss << "enabled application '" << app << "' on pool '" << pool_name << "'";
+
+  } else if (boost::algorithm::ends_with(prefix, "disable")) {
+    string force;
+    cmd_getval(g_ceph_context, cmdmap, "force", force);
+
+    if (force != "--yes-i-really-mean-it") {
+      ss << "Are you SURE? Disabling an application within a pool might result "
+         << "in loss of application functionality; pass "
+         << "--yes-i-really-mean-it to proceed anyway";
+      return -EPERM;
+    }
+
+    if (!app_exists) {
+      ss << "application '" << app << "' is not enabled on pool '" << pool_name
+         << "'";
+      return 0; // idempotent
+    }
+
+    p.application_metadata.erase(app);
+    ss << "disable application '" << app << "' on pool '" << pool_name << "'";
+
+  } else if (boost::algorithm::ends_with(prefix, "set")) {
+    if (p.is_tier()) {
+      ss << "application metadata must be set on base tier";
+      return -EINVAL;
+    }
+
+    if (!app_exists) {
+      ss << "application '" << app << "' is not enabled on pool '" << pool_name
+         << "'";
+      return -ENOENT;
+    }
+
+    string key;
+    cmd_getval(g_ceph_context, cmdmap, "key", key);
+
+    if (key.empty()) {
+      ss << "key must be provided";
+      return -EINVAL;
+    }
+
+    auto &app_keys = p.application_metadata[app];
+    if (app_keys.count(key) == 0 &&
+        app_keys.size() >= MAX_POOL_APPLICATION_KEYS) {
+      ss << "too many keys set for application '" << app << "' on pool '"
+         << pool_name << "'; max " << MAX_POOL_APPLICATION_KEYS;
+      return -EINVAL;
+    }
+
+    if (key.length() > MAX_POOL_APPLICATION_LENGTH) {
+      ss << "key '" << app << "' too long; max length "
+         << MAX_POOL_APPLICATION_LENGTH;
+      return -EINVAL;
+    }
+
+    string value;
+    cmd_getval(g_ceph_context, cmdmap, "value", value);
+    if (value.length() > MAX_POOL_APPLICATION_LENGTH) {
+      ss << "value '" << value << "' too long; max length "
+         << MAX_POOL_APPLICATION_LENGTH;
+      return -EINVAL;
+    }
+
+    p.application_metadata[app][key] = value;
+    ss << "set application '" << app << "' key '" << key << "' to '"
+       << value << "' on pool '" << pool_name << "'";
+  } else if (boost::algorithm::ends_with(prefix, "rm")) {
+    if (!app_exists) {
+      ss << "application '" << app << "' is not enabled on pool '" << pool_name
+         << "'";
+      return -ENOENT;
+    }
+
+    string key;
+    cmd_getval(g_ceph_context, cmdmap, "key", key);
+    auto it = p.application_metadata[app].find(key);
+    if (it == p.application_metadata[app].end()) {
+      ss << "application '" << app << "' on pool '" << pool_name
+         << "' does not have key '" << key << "'";
+      return 0; // idempotent
+    }
+
+    p.application_metadata[app].erase(it);
+    ss << "removed application '" << app << "' key '" << key << "' on pool '"
+       << pool_name << "'";
+  } else {
+    assert(false);
+  }
+
   p.last_change = pending_inc.epoch;
   pending_inc.new_pools[pool] = p;
   return 0;
@@ -6302,6 +7145,7 @@ int32_t OSDMonitor::_allocate_osd_id(int32_t* existing_id)
 void OSDMonitor::do_osd_create(
     const int32_t id,
     const uuid_d& uuid,
+    const string& device_class,
     int32_t* new_id)
 {
   dout(10) << __func__ << " uuid " << uuid << dendl;
@@ -6335,7 +7179,6 @@ void OSDMonitor::do_osd_create(
     assert(allocated_id < 0);
     pending_inc.new_weight[existing_id] = CEPH_OSD_OUT;
     *new_id = existing_id;
-
   } else if (allocated_id >= 0) {
     assert(existing_id < 0);
     // raise max_osd
@@ -6351,6 +7194,33 @@ void OSDMonitor::do_osd_create(
   }
 
 out:
+  if (device_class.size()) {
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    if (newcrush.get_max_devices() < *new_id + 1) {
+      newcrush.set_max_devices(*new_id + 1);
+    }
+    string name = string("osd.") + stringify(*new_id);
+    if (!newcrush.item_exists(*new_id)) {
+      newcrush.set_item_name(*new_id, name);
+    }
+    ostringstream ss;
+    int r = newcrush.update_device_class(*new_id, device_class, name, &ss);
+    if (r < 0) {
+      derr << __func__ << " failed to set " << name << " device_class "
+          << device_class << ": " << cpp_strerror(r) << " - " << ss.str()
+          << dendl;
+      // non-fatal... this might be a replay and we want to be idempotent.
+    } else {
+      dout(20) << __func__ << " set " << name << " device_class " << device_class
+              << dendl;
+      pending_inc.crush.clear();
+      newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    }
+  } else {
+    dout(20) << __func__ << " no device_class" << dendl;
+  }
+
   dout(10) << __func__ << " using id " << *new_id << dendl;
   if (osdmap.get_max_osd() <= *new_id && pending_inc.new_max_osd <= *new_id) {
     pending_inc.new_max_osd = *new_id + 1;
@@ -6440,6 +7310,11 @@ int OSDMonitor::prepare_command_osd_create(
 {
   dout(10) << __func__ << " id " << id << " uuid " << uuid << dendl;
   assert(existing_id);
+  if (osdmap.is_destroyed(id)) {
+    ss << "ceph osd create has been deprecated. Please use ceph osd new "
+          "instead.";
+    return -EINVAL;
+  }
 
   if (uuid.is_zero()) {
     dout(10) << __func__ << " no uuid; assuming legacy `osd create`" << dendl;
@@ -6451,7 +7326,7 @@ int OSDMonitor::prepare_command_osd_create(
 int OSDMonitor::prepare_command_osd_new(
     MonOpRequestRef op,
     const map<string,cmd_vartype>& cmdmap,
-    const map<string,string>& secrets,
+    const map<string,string>& params,
     stringstream &ss,
     Formatter *f)
 {
@@ -6560,9 +7435,9 @@ int OSDMonitor::prepare_command_osd_new(
 
   dout(10) << __func__ << " id " << id << " uuid " << uuid << dendl;
 
-  if (may_be_idempotent && secrets.empty()) {
+  if (may_be_idempotent && params.empty()) {
     // nothing to do, really.
-    dout(10) << __func__ << " idempotent and no secrets -- no op." << dendl;
+    dout(10) << __func__ << " idempotent and no params -- no op." << dendl;
     assert(id >= 0);
     if (f) {
       f->open_object_section("created_osd");
@@ -6574,30 +7449,38 @@ int OSDMonitor::prepare_command_osd_new(
     return EEXIST;
   }
 
+  string device_class;
+  auto p = params.find("crush_device_class");
+  if (p != params.end()) {
+    device_class = p->second;
+    dout(20) << __func__ << " device_class will be " << device_class << dendl;
+  }
   string cephx_secret, lockbox_secret, dmcrypt_key;
   bool has_lockbox = false;
-  bool has_secrets = (!secrets.empty());
+  bool has_secrets = params.count("cephx_secret")
+    || params.count("cephx_lockbox_secret")
+    || params.count("dmcrypt_key");
 
   ConfigKeyService *svc = nullptr;
   AuthMonitor::auth_entity_t cephx_entity, lockbox_entity;
 
   if (has_secrets) {
-    if (secrets.count("cephx_secret") == 0) {
+    if (params.count("cephx_secret") == 0) {
       ss << "requires a cephx secret.";
       return -EINVAL;
     }
-    cephx_secret = secrets.at("cephx_secret");
+    cephx_secret = params.at("cephx_secret");
 
-    bool has_lockbox_secret = (secrets.count("cephx_lockbox_secret") > 0);
-    bool has_dmcrypt_key = (secrets.count("dmcrypt_key") > 0);
+    bool has_lockbox_secret = (params.count("cephx_lockbox_secret") > 0);
+    bool has_dmcrypt_key = (params.count("dmcrypt_key") > 0);
 
     dout(10) << __func__ << " has lockbox " << has_lockbox_secret
              << " dmcrypt " << has_dmcrypt_key << dendl;
 
     if (has_lockbox_secret && has_dmcrypt_key) {
       has_lockbox = true;
-      lockbox_secret = secrets.at("cephx_lockbox_secret");
-      dmcrypt_key = secrets.at("dmcrypt_key");
+      lockbox_secret = params.at("cephx_lockbox_secret");
+      dmcrypt_key = params.at("dmcrypt_key");
     } else if (!has_lockbox_secret != !has_dmcrypt_key) {
       ss << "requires both a cephx lockbox secret and a dm-crypt key.";
       return -EINVAL;
@@ -6676,11 +7559,17 @@ int OSDMonitor::prepare_command_osd_new(
     assert(osdmap.is_destroyed(id));
     pending_inc.new_weight[id] = CEPH_OSD_OUT;
     pending_inc.new_state[id] |= CEPH_OSD_DESTROYED | CEPH_OSD_NEW;
+    if (osdmap.get_state(id) & CEPH_OSD_UP) {
+      // due to http://tracker.ceph.com/issues/20751 some clusters may
+      // have UP set for non-existent OSDs; make sure it is cleared
+      // for a newly created osd.
+      pending_inc.new_state[id] |= CEPH_OSD_UP;
+    }
     pending_inc.new_uuid[id] = uuid;
   } else {
     assert(id >= 0);
     int32_t new_id = -1;
-    do_osd_create(id, uuid, &new_id);
+    do_osd_create(id, uuid, device_class, &new_id);
     assert(new_id >= 0);
     assert(id == new_id);
   }
@@ -6792,7 +7681,6 @@ int OSDMonitor::prepare_command_osd_destroy(
   if (err < 0) {
     if (err == -ENOENT) {
       idempotent_auth = true;
-      err = 0;
     } else {
       return err;
     }
@@ -6915,7 +7803,12 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   int64_t osdid;
   string name;
-  bool osdid_present = cmd_getval(g_ceph_context, cmdmap, "id", osdid);
+  bool osdid_present = false;
+  if (prefix != "osd pg-temp" &&
+      prefix != "osd pg-upmap" &&
+      prefix != "osd pg-upmap-items") {  // avoid commands with non-int id arg
+    osdid_present = cmd_getval(g_ceph_context, cmdmap, "id", osdid);
+  }
   if (osdid_present) {
     ostringstream oss;
     oss << "osd." << osdid;
@@ -6998,7 +7891,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
     }
 
-    if (crush.has_legacy_rulesets()) {
+    if (crush.has_legacy_rule_ids()) {
       err = -EINVAL;
       ss << "crush maps with ruleset != ruleid are no longer allowed";
       goto reply;
@@ -7008,46 +7901,62 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    const auto& osdmap_pools = osdmap.get_pools();
-    for (auto pit = osdmap_pools.begin(); pit != osdmap_pools.end(); ++pit) {
-      const int64_t pool_id = pit->first;
-      const pg_pool_t &pool = pit->second;
-      int ruleno = pool.get_crush_rule();
-      if (!crush.rule_exists(ruleno)) {
-       ss << " the crush rule no "<< ruleno << " for pool id " << pool_id << " is in use";
-       err = -EINVAL;
-       goto reply;
-      }
-    }
-
-    // sanity check: test some inputs to make sure this map isn't totally broken
-    dout(10) << " testing map" << dendl;
-    stringstream ess;
-    CrushTester tester(crush, ess);
-    // XXX: Use mon_lease as a timeout value for crushtool.
-    // If the crushtool consistently takes longer than 'mon_lease' seconds,
-    // then we would consistently trigger an election before the command
-    // finishes, having a flapping monitor unable to hold quorum.
-    int r = tester.test_with_crushtool(g_conf->crushtool.c_str(),
-                                      osdmap.get_max_osd(),
-                                      g_conf->mon_lease);
-    if (r < 0) {
-      derr << "error on crush map: " << ess.str() << dendl;
-      ss << "Failed crushmap test: " << ess.str();
-      err = r;
+    err = osdmap.validate_crush_rules(&crush, &ss);
+    if (err < 0) {
       goto reply;
     }
 
-    dout(10) << " result " << ess.str() << dendl;
+    if (g_conf->mon_osd_crush_smoke_test) {
+      // sanity check: test some inputs to make sure this map isn't
+      // totally broken
+      dout(10) << " testing map" << dendl;
+      stringstream ess;
+      CrushTester tester(crush, ess);
+      tester.set_min_x(0);
+      tester.set_max_x(50);
+      auto start = ceph::coarse_mono_clock::now();
+      int r = tester.test_with_fork(g_conf->mon_lease);
+      auto duration = ceph::coarse_mono_clock::now() - start;
+      if (r < 0) {
+       dout(10) << " tester.test_with_fork returns " << r
+                << ": " << ess.str() << dendl;
+       ss << "crush smoke test failed with " << r << ": " << ess.str();
+       err = r;
+       goto reply;
+      }
+      dout(10) << __func__ << " crush somke test duration: "
+               << duration << ", result: " << ess.str() << dendl;
+    }
 
     pending_inc.crush = data;
     ss << osdmap.get_crush_version() + 1;
     goto update;
 
+  } else if (prefix == "osd crush set-all-straw-buckets-to-straw2") {
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    for (int b = 0; b < newcrush.get_max_buckets(); ++b) {
+      int bid = -1 - b;
+      if (newcrush.bucket_exists(bid) &&
+         newcrush.get_bucket_alg(bid)) {
+       dout(20) << " bucket " << bid << " is straw, can convert" << dendl;
+       newcrush.bucket_set_alg(bid, CRUSH_BUCKET_STRAW2);
+      }
+    }
+    if (!validate_crush_against_features(&newcrush, ss)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
+                                             get_last_committed() + 1));
+    return true;
   } else if (prefix == "osd crush set-device-class") {
-    if (!osdmap.exists(osdid)) {
-      err = -ENOENT;
-      ss << name << " does not exist.  create it before updating the crush map";
+    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+      ss << "you must complete the upgrade and 'ceph osd require-osd-release "
+         << "luminous' before using crush device classes";
+      err = -EPERM;
       goto reply;
     }
 
@@ -7057,39 +7966,174 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
+    bool stop = false;
+    vector<string> idvec;
+    cmd_getval(g_ceph_context, cmdmap, "ids", idvec);
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    set<int> updated;
+    for (unsigned j = 0; j < idvec.size() && !stop; j++) {
+      set<int> osds;
+      // wildcard?
+      if (j == 0 &&
+          (idvec[0] == "any" || idvec[0] == "all" || idvec[0] == "*")) {
+        osdmap.get_all_osds(osds);
+        stop = true;
+      } else {
+        // try traditional single osd way
+        long osd = parse_osd_id(idvec[j].c_str(), &ss);
+        if (osd < 0) {
+          // ss has reason for failure
+          ss << ", unable to parse osd id:\"" << idvec[j] << "\". ";
+          err = -EINVAL;
+          continue;
+        }
+        osds.insert(osd);
+      }
+
+      for (auto &osd : osds) {
+        if (!osdmap.exists(osd)) {
+          ss << "osd." << osd << " does not exist. ";
+          continue;
+        }
+
+        ostringstream oss;
+        oss << "osd." << osd;
+        string name = oss.str();
+
+       if (newcrush.get_max_devices() < osd + 1) {
+         newcrush.set_max_devices(osd + 1);
+       }
+        string action;
+        if (newcrush.item_exists(osd)) {
+          action = "updating";
+        } else {
+          action = "creating";
+          newcrush.set_item_name(osd, name);
+        }
+
+        dout(5) << action << " crush item id " << osd << " name '" << name
+                << "' device_class '" << device_class << "'"
+                << dendl;
+        err = newcrush.update_device_class(osd, device_class, name, &ss);
+        if (err < 0) {
+          goto reply;
+        }
+        if (err == 0 && !_have_pending_crush()) {
+          if (!stop) {
+            // for single osd only, wildcard makes too much noise
+            ss << "set-device-class item id " << osd << " name '" << name
+               << "' device_class '" << device_class << "': no change";
+          }
+        } else {
+          updated.insert(osd);
+        }
+      }
+    }
+
+    if (!updated.empty()) {
+      pending_inc.crush.clear();
+      newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+      ss << "set osd(s) " << updated << " to class '" << device_class << "'";
+      getline(ss, rs);
+      wait_for_finished_proposal(op,
+        new Monitor::C_Command(mon,op, 0, rs, get_last_committed() + 1));
+      return true;
+    }
+
+ } else if (prefix == "osd crush rm-device-class") {
+    bool stop = false;
+    vector<string> idvec;
+    cmd_getval(g_ceph_context, cmdmap, "ids", idvec);
     CrushWrapper newcrush;
     _get_pending_crush(newcrush);
+    set<int> updated;
 
-    string action;
-    if (newcrush.item_exists(osdid)) {
-      action = "updating";
-    } else {
-      action = "creating";
-      newcrush.set_item_name(osdid, name);
+    for (unsigned j = 0; j < idvec.size() && !stop; j++) {
+      set<int> osds;
+
+      // wildcard?
+      if (j == 0 &&
+          (idvec[0] == "any" || idvec[0] == "all" || idvec[0] == "*")) {
+        osdmap.get_all_osds(osds);
+        stop = true;
+      } else {
+        // try traditional single osd way
+        long osd = parse_osd_id(idvec[j].c_str(), &ss);
+        if (osd < 0) {
+          // ss has reason for failure
+          ss << ", unable to parse osd id:\"" << idvec[j] << "\". ";
+          err = -EINVAL;
+          goto reply;
+        }
+        osds.insert(osd);
+      }
+
+      for (auto &osd : osds) {
+        if (!osdmap.exists(osd)) {
+          ss << "osd." << osd << " does not exist. ";
+          continue;
+        }
+
+        auto class_name = newcrush.get_item_class(osd);
+        if (!class_name) {
+          ss << "osd." << osd << " belongs to no class, ";
+          continue;
+        }
+        // note that we do not verify if class_is_in_use here
+        // in case the device is misclassified and user wants
+        // to overridely reset...
+
+        err = newcrush.remove_device_class(g_ceph_context, osd, &ss);
+        if (err < 0) {
+          // ss has reason for failure
+          goto reply;
+        }
+        updated.insert(osd);
+      }
     }
 
-    dout(5) << action << " crush item id " << osdid << " name '"
-           << name << "' device_class " << device_class << dendl;
-    err = newcrush.update_device_class(g_ceph_context, osdid, device_class, name);
+    if (!updated.empty()) {
+      pending_inc.crush.clear();
+      newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+      ss << "done removing class of osd(s): " << updated;
+      getline(ss, rs);
+      wait_for_finished_proposal(op,
+        new Monitor::C_Command(mon,op, 0, rs, get_last_committed() + 1));
+      return true;
+    }
+  } else if (prefix == "osd crush class rename") {
+    string srcname, dstname;
+    if (!cmd_getval(g_ceph_context, cmdmap, "srcname", srcname)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    if (!cmd_getval(g_ceph_context, cmdmap, "dstname", dstname)) {
+      err = -EINVAL;
+      goto reply;
+    }
 
-    if (err < 0)
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    if (!newcrush.class_exists(srcname) && newcrush.class_exists(dstname)) {
+      // suppose this is a replay and return success
+      // so command is idempotent
+      ss << "already renamed to '" << dstname << "'";
+      err = 0;
       goto reply;
+    }
 
-    if (err == 0 && !_have_pending_crush()) {
-      ss << "set-device-class item id " << osdid << " name '" << name << "' device_class "
-        << device_class << " : no change";
+    err = newcrush.rename_class(srcname, dstname);
+    if (err < 0) {
+      ss << "fail to rename '" << srcname << "' to '" << dstname << "' : "
+         << cpp_strerror(err);
       goto reply;
     }
 
     pending_inc.crush.clear();
     newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
-    ss << "set-device-class item id " << osdid << " name '" << name << "' device_class "
-       << device_class;
-    getline(ss, rs);
-    wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
-                                                     get_last_committed() + 1));
-    return true;
-
+    ss << "rename class '" << srcname << "' to '" << dstname << "'";
+    goto update;
   } else if (prefix == "osd crush add-bucket") {
     // os crush add-bucket <name> <type>
     string name, typestr;
@@ -7151,72 +8195,129 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     else
       goto update;
-  } else if (prefix == "osd crush class create") {
-    string device_class;
-    if (!cmd_getval(g_ceph_context, cmdmap, "class", device_class)) {
-      err = -EINVAL; // no value!
+  } else if (prefix == "osd crush weight-set create" ||
+            prefix == "osd crush weight-set create-compat") {
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    int64_t pool;
+    int positions;
+    if (newcrush.has_non_straw2_buckets()) {
+      ss << "crush map contains one or more bucket(s) that are not straw2";
+      err = -EPERM;
       goto reply;
     }
-
-    if (!_have_pending_crush() &&
-       _get_stable_crush().class_exists(device_class)) {
-      ss << "class '" << device_class << "' already exists";
-      goto reply;
+    if (prefix == "osd crush weight-set create") {
+      if (osdmap.require_min_compat_client > 0 &&
+         osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
+       ss << "require_min_compat_client "
+          << ceph_release_name(osdmap.require_min_compat_client)
+          << " < luminous, which is required for per-pool weight-sets. "
+           << "Try 'ceph osd set-require-min-compat-client luminous' "
+           << "before using the new interface";
+       err = -EPERM;
+       goto reply;
+      }
+      string poolname, mode;
+      cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
+      pool = osdmap.lookup_pg_pool_name(poolname.c_str());
+      if (pool < 0) {
+       ss << "pool '" << poolname << "' not found";
+       err = -ENOENT;
+       goto reply;
+      }
+      cmd_getval(g_ceph_context, cmdmap, "mode", mode);
+      if (mode != "flat" && mode != "positional") {
+       ss << "unrecognized weight-set mode '" << mode << "'";
+       err = -EINVAL;
+       goto reply;
+      }
+      positions = mode == "flat" ? 1 : osdmap.get_pg_pool(pool)->get_size();
+    } else {
+      pool = CrushWrapper::DEFAULT_CHOOSE_ARGS;
+      positions = 1;
     }
+    newcrush.create_choose_args(pool, positions);
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    goto update;
 
+  } else if (prefix == "osd crush weight-set rm" ||
+            prefix == "osd crush weight-set rm-compat") {
     CrushWrapper newcrush;
     _get_pending_crush(newcrush);
-
-    if (newcrush.class_exists(name)) {
-      ss << "class '" << device_class << "' already exists";
-      goto update;
+    int64_t pool;
+    if (prefix == "osd crush weight-set rm") {
+      string poolname;
+      cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
+      pool = osdmap.lookup_pg_pool_name(poolname.c_str());
+      if (pool < 0) {
+       ss << "pool '" << poolname << "' not found";
+       err = -ENOENT;
+       goto reply;
+      }
+    } else {
+      pool = CrushWrapper::DEFAULT_CHOOSE_ARGS;
     }
-
-    int class_id = newcrush.get_or_create_class_id(device_class);
-
+    newcrush.rm_choose_args(pool);
     pending_inc.crush.clear();
     newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
-    ss << "created class " << device_class << " with id " << class_id
-       << " to crush map";
     goto update;
-    
-  } else if (prefix == "osd crush class rm") {
-    string device_class;
-    if (!cmd_getval(g_ceph_context, cmdmap, "class", device_class)) {
-      err = -EINVAL; // no value!
-      goto reply;
+
+  } else if (prefix == "osd crush weight-set reweight" ||
+            prefix == "osd crush weight-set reweight-compat") {
+    string poolname, item;
+    vector<double> weight;
+    cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
+    cmd_getval(g_ceph_context, cmdmap, "item", item);
+    cmd_getval(g_ceph_context, cmdmap, "weight", weight);
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    int64_t pool;
+    if (prefix == "osd crush weight-set reweight") {
+      pool = osdmap.lookup_pg_pool_name(poolname.c_str());
+      if (pool < 0) {
+       ss << "pool '" << poolname << "' not found";
+       err = -ENOENT;
+       goto reply;
+      }
+      if (!newcrush.have_choose_args(pool)) {
+       ss << "no weight-set for pool '" << poolname << "'";
+       err = -ENOENT;
+       goto reply;
+      }
+      auto arg_map = newcrush.choose_args_get(pool);
+      int positions = newcrush.get_choose_args_positions(arg_map);
+      if (weight.size() != (size_t)positions) {
+         ss << "must specify exact " << positions << " weight values";
+         err = -EINVAL;
+         goto reply;
+      }
+    } else {
+      pool = CrushWrapper::DEFAULT_CHOOSE_ARGS;
+      if (!newcrush.have_choose_args(pool)) {
+       ss << "no backward-compatible weight-set";
+       err = -ENOENT;
+       goto reply;
+      }
     }
-
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
-
-    if (!newcrush.class_exists(device_class)) {
+    if (!newcrush.name_exists(item)) {
+      ss << "item '" << item << "' does not exist";
       err = -ENOENT;
-      ss << "class '" << device_class << "' does not exist";
       goto reply;
     }
-
-    int class_id = newcrush.get_class_id(device_class);
-
-    if (newcrush.class_is_in_use(class_id)) {
-      err = -EBUSY;
-      ss << "class '" << device_class << "' is in use";
-      goto reply;
-    }
-
-    err = newcrush.remove_class_name(device_class);
+    err = newcrush.choose_args_adjust_item_weightf(
+      g_ceph_context,
+      newcrush.choose_args_get(pool),
+      newcrush.get_item_id(item),
+      weight,
+      &ss);
     if (err < 0) {
-      ss << "class '" << device_class << "' cannot be removed '"
-        << cpp_strerror(err) << "'";
       goto reply;
     }
-
+    err = 0;
     pending_inc.crush.clear();
     newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
-    ss << "removed class " << device_class << " with id " << class_id
-       << " from crush map";
     goto update;
-    
   } else if (osdid_present &&
             (prefix == "osd crush set" || prefix == "osd crush add")) {
     // <OsdName> is 'osd.<id>' or '<id>', passed as int64_t id
@@ -7225,7 +8326,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     if (!osdmap.exists(osdid)) {
       err = -ENOENT;
-      ss << name << " does not exist.  create it before updating the crush map";
+      ss << name << " does not exist. Create it before updating the crush map";
       goto reply;
     }
 
@@ -7414,6 +8515,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     int r = newcrush.swap_bucket(g_ceph_context, sid, did);
     if (r < 0) {
       ss << "failed to swap bucket contents: " << cpp_strerror(r);
+      err = r;
       goto reply;
     }
     ss << "swapped bucket of " << source << " to " << dest;
@@ -7539,7 +8641,6 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     } while (false);
 
   } else if (prefix == "osd crush reweight-all") {
-    // osd crush reweight <name> <weight>
     CrushWrapper newcrush;
     _get_pending_crush(newcrush);
 
@@ -7682,7 +8783,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     if (tunable == "straw_calc_version") {
-      if (value < 0 || value > 1) {
+      if (value != 0 && value != 1) {
        ss << "value must be 0 or 1; got " << value;
        err = -EINVAL;
        goto reply;
@@ -7733,7 +8834,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       ss << "rule " << name << " already exists";
       err = 0;
     } else {
-      int ruleno = newcrush.add_simple_rule(name, root, type, mode,
+      int ruleno = newcrush.add_simple_rule(name, root, type, "", mode,
                                               pg_pool_t::TYPE_REPLICATED, &ss);
       if (ruleno < 0) {
        err = ruleno;
@@ -7748,6 +8849,55 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
                                              get_last_committed() + 1));
     return true;
 
+  } else if (prefix == "osd crush rule create-replicated") {
+    string name, root, type, device_class;
+    cmd_getval(g_ceph_context, cmdmap, "name", name);
+    cmd_getval(g_ceph_context, cmdmap, "root", root);
+    cmd_getval(g_ceph_context, cmdmap, "type", type);
+    cmd_getval(g_ceph_context, cmdmap, "class", device_class);
+
+    if (!device_class.empty()) {
+      if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+        ss << "you must complete the upgrade and 'ceph osd require-osd-release "
+           << "luminous' before using crush device classes";
+        err = -EPERM;
+        goto reply;
+      }
+    }
+
+    if (osdmap.crush->rule_exists(name)) {
+      // The name is uniquely associated to a ruleid and the rule it contains
+      // From the user point of view, the rule is more meaningfull.
+      ss << "rule " << name << " already exists";
+      err = 0;
+      goto reply;
+    }
+
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+
+    if (newcrush.rule_exists(name)) {
+      // The name is uniquely associated to a ruleid and the rule it contains
+      // From the user point of view, the rule is more meaningfull.
+      ss << "rule " << name << " already exists";
+      err = 0;
+    } else {
+      int ruleno = newcrush.add_simple_rule(
+       name, root, type, device_class,
+       "firstn", pg_pool_t::TYPE_REPLICATED, &ss);
+      if (ruleno < 0) {
+       err = ruleno;
+       goto reply;
+      }
+
+      pending_inc.crush.clear();
+      newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    }
+    getline(ss, rs);
+    wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
+                                             get_last_committed() + 1));
+    return true;
+
   } else if (prefix == "osd erasure-code-profile rm") {
     string name;
     cmd_getval(g_ceph_context, cmdmap, "name", name);
@@ -7941,7 +9091,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       // FIXME: this is ok in some situations, but let's not bother with that
       // complexity now.
       int ruleset = newcrush.get_rule_mask_ruleset(ruleno);
-      if (osdmap.crush_ruleset_in_use(ruleset)) {
+      if (osdmap.crush_rule_in_use(ruleset)) {
        ss << "crush ruleset " << name << " " << ruleset << " is in use";
        err = -EBUSY;
        goto reply;
@@ -7960,6 +9110,45 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
                                              get_last_committed() + 1));
     return true;
 
+  } else if (prefix == "osd crush rule rename") {
+    string srcname;
+    string dstname;
+    cmd_getval(g_ceph_context, cmdmap, "srcname", srcname);
+    cmd_getval(g_ceph_context, cmdmap, "dstname", dstname);
+    if (srcname.empty() || dstname.empty()) {
+      ss << "must specify both source rule name and destination rule name";
+      err = -EINVAL;
+      goto reply;
+    }
+    if (srcname == dstname) {
+      ss << "destination rule name is equal to source rule name";
+      err = 0;
+      goto reply;
+    }
+
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    if (!newcrush.rule_exists(srcname) && newcrush.rule_exists(dstname)) {
+      // srcname does not exist and dstname already exists
+      // suppose this is a replay and return success
+      // (so this command is idempotent)
+      ss << "already renamed to '" << dstname << "'";
+      err = 0;
+      goto reply;
+    }
+
+    err = newcrush.rename_rule(srcname, dstname, &ss);
+    if (err < 0) {
+      // ss has reason for failure
+      goto reply;
+    }
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    getline(ss, rs);
+    wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
+                               get_last_committed() + 1));
+    return true;
+
   } else if (prefix == "osd setmaxosd") {
     int64_t newmax;
     if (!cmd_getval(g_ceph_context, cmdmap, "newmax", newmax)) {
@@ -8004,15 +9193,15 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
             prefix == "osd set-backfillfull-ratio" ||
              prefix == "osd set-nearfull-ratio") {
     if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
-      ss << "you must complete the upgrade and set require_osd_release ="
-        << "luminous before using the new interface";
+      ss << "you must complete the upgrade and 'ceph osd require-osd-release "
+        << "luminous' before using the new interface";
       err = -EPERM;
       goto reply;
     }
     double n;
     if (!cmd_getval(g_ceph_context, cmdmap, "ratio", n)) {
       ss << "unable to parse 'ratio' value '"
-         << cmd_vartype_stringify(cmdmap["who"]) << "'";
+         << cmd_vartype_stringify(cmdmap["ratio"]) << "'";
       err = -EINVAL;
       goto reply;
     }
@@ -8029,8 +9218,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return true;
   } else if (prefix == "osd set-require-min-compat-client") {
     if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
-      ss << "you must complete the upgrade and set require_osd_release ="
-        << "luminous before using the new interface";
+      ss << "you must complete the upgrade and 'ceph osd require-osd-release "
+        << "luminous' before using the new interface";
       err = -EPERM;
       goto reply;
     }
@@ -8107,6 +9296,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return prepare_unset_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
 
   } else if (prefix == "osd set") {
+    string sure;
+    cmd_getval(g_ceph_context, cmdmap, "sure", sure);
     string key;
     cmd_getval(g_ceph_context, cmdmap, "key", key);
     if (key == "full")
@@ -8134,14 +9325,42 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     else if (key == "notieragent")
       return prepare_set_flag(op, CEPH_OSDMAP_NOTIERAGENT);
     else if (key == "sortbitwise") {
-      if (osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_BITWISE_HOBJ_SORT) {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
+      if ((osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_BITWISE_HOBJ_SORT)
+          || sure == "--yes-i-really-mean-it") {
        return prepare_set_flag(op, CEPH_OSDMAP_SORTBITWISE);
       } else {
        ss << "not all up OSDs have OSD_BITWISE_HOBJ_SORT feature";
        err = -EPERM;
        goto reply;
       }
+    } else if (key == "recovery_deletes") {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
+      if (HAVE_FEATURE(osdmap.get_up_osd_features(), OSD_RECOVERY_DELETES)
+          || sure == "--yes-i-really-mean-it") {
+       return prepare_set_flag(op, CEPH_OSDMAP_RECOVERY_DELETES);
+      } else {
+       ss << "not all up OSDs have OSD_RECOVERY_DELETES feature";
+       err = -EPERM;
+       goto reply;
+      }
     } else if (key == "require_jewel_osds") {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
       if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
        ss << "the sortbitwise flag must be set before require_jewel_osds";
        err = -EPERM;
@@ -8150,13 +9369,20 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        ss << "require_osd_release is already >= jewel";
        err = 0;
        goto reply;
-      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_JEWEL)) {
+      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_JEWEL)
+                 || sure == "--yes-i-really-mean-it") {
        return prepare_set_flag(op, CEPH_OSDMAP_REQUIRE_JEWEL);
       } else {
        ss << "not all up OSDs have CEPH_FEATURE_SERVER_JEWEL feature";
        err = -EPERM;
       }
     } else if (key == "require_kraken_osds") {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
       if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
        ss << "the sortbitwise flag must be set before require_kraken_osds";
        err = -EPERM;
@@ -8165,7 +9391,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        ss << "require_osd_release is already >= kraken";
        err = 0;
        goto reply;
-      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_KRAKEN)) {
+      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_KRAKEN)
+                 || sure == "--yes-i-really-mean-it") {
        bool r = prepare_set_flag(op, CEPH_OSDMAP_REQUIRE_KRAKEN);
        // ensure JEWEL is also set
        pending_inc.new_flags |= CEPH_OSDMAP_REQUIRE_JEWEL;
@@ -8206,10 +9433,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       return prepare_unset_flag(op, CEPH_OSDMAP_NODEEP_SCRUB);
     else if (key == "notieragent")
       return prepare_unset_flag(op, CEPH_OSDMAP_NOTIERAGENT);
-    else if (key == "sortbitwise") {
-      ss << "the sortbitwise flag is required and cannot be unset";
-      err = -EPERM;
-    } else {
+    else {
       ss << "unrecognized flag '" << key << "'";
       err = -EINVAL;
     }
@@ -8217,6 +9441,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "osd require-osd-release") {
     string release;
     cmd_getval(g_ceph_context, cmdmap, "release", release);
+    string sure;
+    cmd_getval(g_ceph_context, cmdmap, "sure", sure);
     if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
       ss << "the sortbitwise flag must be set first";
       err = -EPERM;
@@ -8233,6 +9459,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       err = -EINVAL;
       goto reply;
     }
+    if (rel == osdmap.require_osd_release) {
+      // idempotent
+      err = 0;
+      goto reply;
+    }
     if (rel == CEPH_RELEASE_LUMINOUS) {
       if (!HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_LUMINOUS)) {
        ss << "not all up OSDs have CEPH_FEATURE_SERVER_LUMINOUS feature";
@@ -8250,6 +9481,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     pending_inc.new_require_osd_release = rel;
+    if (rel >= CEPH_RELEASE_LUMINOUS &&
+       !osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
+      return prepare_set_flag(op, CEPH_OSDMAP_RECOVERY_DELETES);
+    }
     goto update;
   } else if (prefix == "osd cluster_snap") {
     // ** DISABLE THIS FOR NOW **
@@ -8318,6 +9553,18 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
              pending_inc.new_xinfo[osd].old_weight = osdmap.osd_weight[osd];
            }
            ss << "marked out osd." << osd << ". ";
+            std::ostringstream msg;
+            msg << "Client " << op->get_session()->entity_name
+                << " marked osd." << osd << " out";
+            if (osdmap.is_up(osd)) {
+              msg << ", while it was still marked up";
+            } else {
+              auto period = ceph_clock_now() - down_pending_out[osd];
+              msg << ", after it was down for " << int(period.sec())
+                  << " seconds";
+            }
+
+            mon->clog->info() << msg.str();
            any = true;
          }
         } else if (prefix == "osd in") {
@@ -8706,184 +9953,27 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       new_pg_temp.push_back(osd);
     }
 
-    pending_inc.new_pg_temp[pgid] = mempool::osdmap::vector<int>(
-      new_pg_temp.begin(), new_pg_temp.end());
-    ss << "set " << pgid << " pg_temp mapping to " << new_pg_temp;
-    goto update;
-  } else if (prefix == "osd primary-temp") {
-    string pgidstr;
-    if (!cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr)) {
-      ss << "unable to parse 'pgid' value '"
-         << cmd_vartype_stringify(cmdmap["pgid"]) << "'";
-      err = -EINVAL;
-      goto reply;
-    }
-    pg_t pgid;
-    if (!pgid.parse(pgidstr.c_str())) {
-      ss << "invalid pgid '" << pgidstr << "'";
-      err = -EINVAL;
-      goto reply;
-    }
-    if (!osdmap.pg_exists(pgid)) {
-      ss << "pg " << pgid << " does not exist";
-      err = -ENOENT;
-      goto reply;
-    }
-
-    int64_t osd;
-    if (!cmd_getval(g_ceph_context, cmdmap, "id", osd)) {
-      ss << "unable to parse 'id' value '"
-         << cmd_vartype_stringify(cmdmap["id"]) << "'";
-      err = -EINVAL;
-      goto reply;
-    }
-    if (osd != -1 && !osdmap.exists(osd)) {
-      ss << "osd." << osd << " does not exist";
-      err = -ENOENT;
-      goto reply;
-    }
-
-    if (osdmap.require_min_compat_client > 0 &&
-       osdmap.require_min_compat_client < CEPH_RELEASE_FIREFLY) {
-      ss << "require_min_compat_client "
-        << ceph_release_name(osdmap.require_min_compat_client)
-        << " < firefly, which is required for primary-temp";
-      err = -EPERM;
-      goto reply;
-    } else if (!g_conf->mon_osd_allow_primary_temp) {
-      ss << "you must enable 'mon osd allow primary temp = true' on the mons before you can set primary_temp mappings.  note that this is for developers only: older clients/OSDs will break and there is no feature bit infrastructure in place.";
-      err = -EPERM;
-      goto reply;
-    }
-
-    pending_inc.new_primary_temp[pgid] = osd;
-    ss << "set " << pgid << " primary_temp mapping to " << osd;
-    goto update;
-  } else if (prefix == "osd pg-upmap") {
-    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
-      ss << "you must complete the upgrade and set require_osd_release ="
-        << "luminous before using the new interface";
-      err = -EPERM;
-      goto reply;
-    }
-    if (osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
-      ss << "min_compat_client "
-        << ceph_release_name(osdmap.require_min_compat_client)
-        << " < luminous, which is required for pg-upmap";
-      err = -EPERM;
-      goto reply;
-    }
-    err = check_cluster_features(CEPH_FEATUREMASK_OSDMAP_PG_UPMAP, ss);
-    if (err == -EAGAIN)
-      goto wait;
-    if (err < 0)
-      goto reply;
-    string pgidstr;
-    if (!cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr)) {
-      ss << "unable to parse 'pgid' value '"
-         << cmd_vartype_stringify(cmdmap["pgid"]) << "'";
-      err = -EINVAL;
-      goto reply;
-    }
-    pg_t pgid;
-    if (!pgid.parse(pgidstr.c_str())) {
-      ss << "invalid pgid '" << pgidstr << "'";
-      err = -EINVAL;
-      goto reply;
-    }
-    if (!osdmap.pg_exists(pgid)) {
-      ss << "pg " << pgid << " does not exist";
-      err = -ENOENT;
-      goto reply;
-    }
-    if (pending_inc.new_pg_upmap.count(pgid) ||
-       pending_inc.old_pg_upmap.count(pgid)) {
-      dout(10) << __func__ << " waiting for pending update on " << pgid << dendl;
-      wait_for_finished_proposal(op, new C_RetryMessage(this, op));
-      return true;
-    }
-    vector<int64_t> id_vec;
-    if (!cmd_getval(g_ceph_context, cmdmap, "id", id_vec)) {
-      ss << "unable to parse 'id' value(s) '"
-         << cmd_vartype_stringify(cmdmap["id"]) << "'";
-      err = -EINVAL;
-      goto reply;
-    }
-    vector<int32_t> new_pg_upmap;
-    for (auto osd : id_vec) {
-      if (osd != CRUSH_ITEM_NONE && !osdmap.exists(osd)) {
-        ss << "osd." << osd << " does not exist";
-        err = -ENOENT;
-        goto reply;
-      }
-      new_pg_upmap.push_back(osd);
-    }
-
-    pending_inc.new_pg_upmap[pgid] = mempool::osdmap::vector<int32_t>(
-      new_pg_upmap.begin(), new_pg_upmap.end());
-    ss << "set " << pgid << " pg_upmap mapping to " << new_pg_upmap;
-    goto update;
-  } else if (prefix == "osd rm-pg-upmap") {
-    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
-      ss << "you must complete the upgrade and set require_osd_release ="
-        << "luminous before using the new interface";
-      err = -EPERM;
-      goto reply;
-    }
-    if (osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
-      ss << "require_min_compat_client "
-        << ceph_release_name(osdmap.require_min_compat_client)
-        << " < luminous, which is required for pg-upmap";
-      err = -EPERM;
-      goto reply;
-    }
-    err = check_cluster_features(CEPH_FEATUREMASK_OSDMAP_PG_UPMAP, ss);
-    if (err == -EAGAIN)
-      goto wait;
-    if (err < 0)
-      goto reply;
-    string pgidstr;
-    if (!cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr)) {
-      ss << "unable to parse 'pgid' value '"
-         << cmd_vartype_stringify(cmdmap["pgid"]) << "'";
-      err = -EINVAL;
-      goto reply;
-    }
-    pg_t pgid;
-    if (!pgid.parse(pgidstr.c_str())) {
-      ss << "invalid pgid '" << pgidstr << "'";
-      err = -EINVAL;
-      goto reply;
-    }
-    if (pending_inc.new_pg_upmap.count(pgid) ||
-       pending_inc.old_pg_upmap.count(pgid)) {
-      dout(10) << __func__ << " waiting for pending update on " << pgid << dendl;
-      wait_for_finished_proposal(op, new C_RetryMessage(this, op));
-      return true;
-    }
-
-    pending_inc.old_pg_upmap.insert(pgid);
-    ss << "clear " << pgid << " pg_upmap mapping";
-    goto update;
-  } else if (prefix == "osd pg-upmap-items") {
-    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
-      ss << "you must complete the upgrade and set require_osd_release ="
-        << "luminous before using the new interface";
-      err = -EPERM;
-      goto reply;
-    }
-    if (osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
-      ss << "require_min_compat_client "
-        << ceph_release_name(osdmap.require_min_compat_client)
-        << " < luminous, which is required for pg-upmap";
-      err = -EPERM;
+    int pool_min_size = osdmap.get_pg_pool_min_size(pgid);
+    if ((int)new_pg_temp.size() < pool_min_size) {
+      ss << "num of osds (" << new_pg_temp.size() <<") < pool min size ("
+         << pool_min_size << ")";
+      err = -EINVAL;
       goto reply;
     }
-    err = check_cluster_features(CEPH_FEATUREMASK_OSDMAP_PG_UPMAP, ss);
-    if (err == -EAGAIN)
-      goto wait;
-    if (err < 0)
+
+    int pool_size = osdmap.get_pg_pool_size(pgid);
+    if ((int)new_pg_temp.size() > pool_size) {
+      ss << "num of osds (" << new_pg_temp.size() <<") > pool size ("
+         << pool_size << ")";
+      err = -EINVAL;
       goto reply;
+    }
+
+    pending_inc.new_pg_temp[pgid] = mempool::osdmap::vector<int>(
+      new_pg_temp.begin(), new_pg_temp.end());
+    ss << "set " << pgid << " pg_temp mapping to " << new_pg_temp;
+    goto update;
+  } else if (prefix == "osd primary-temp") {
     string pgidstr;
     if (!cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr)) {
       ss << "unable to parse 'pgid' value '"
@@ -8902,57 +9992,52 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       err = -ENOENT;
       goto reply;
     }
-    if (pending_inc.new_pg_upmap_items.count(pgid) ||
-       pending_inc.old_pg_upmap_items.count(pgid)) {
-      dout(10) << __func__ << " waiting for pending update on " << pgid << dendl;
-      wait_for_finished_proposal(op, new C_RetryMessage(this, op));
-      return true;
-    }
-    vector<int64_t> id_vec;
-    if (!cmd_getval(g_ceph_context, cmdmap, "id", id_vec)) {
-      ss << "unable to parse 'id' value(s) '"
+
+    int64_t osd;
+    if (!cmd_getval(g_ceph_context, cmdmap, "id", osd)) {
+      ss << "unable to parse 'id' value '"
          << cmd_vartype_stringify(cmdmap["id"]) << "'";
       err = -EINVAL;
       goto reply;
     }
-    if (id_vec.size() % 2) {
-      ss << "you must specify pairs of osd ids to be remapped";
-      err = -EINVAL;
+    if (osd != -1 && !osdmap.exists(osd)) {
+      ss << "osd." << osd << " does not exist";
+      err = -ENOENT;
       goto reply;
     }
-    vector<pair<int32_t,int32_t>> new_pg_upmap_items;
-    for (auto p = id_vec.begin(); p != id_vec.end(); ++p) {
-      int from = *p++;
-      int to = *p;
-      if (!osdmap.exists(from)) {
-       ss << "osd." << from << " does not exist";
-       err = -ENOENT;
-       goto reply;
-      }
-      if (to != CRUSH_ITEM_NONE && !osdmap.exists(to)) {
-       ss << "osd." << to << " does not exist";
-       err = -ENOENT;
-       goto reply;
-      }
-      new_pg_upmap_items.push_back(make_pair(from, to));
+
+    if (osdmap.require_min_compat_client > 0 &&
+       osdmap.require_min_compat_client < CEPH_RELEASE_FIREFLY) {
+      ss << "require_min_compat_client "
+        << ceph_release_name(osdmap.require_min_compat_client)
+        << " < firefly, which is required for primary-temp";
+      err = -EPERM;
+      goto reply;
+    } else if (!g_conf->mon_osd_allow_primary_temp) {
+      ss << "you must enable 'mon osd allow primary temp = true' on the mons before you can set primary_temp mappings.  note that this is for developers only: older clients/OSDs will break and there is no feature bit infrastructure in place.";
+      err = -EPERM;
+      goto reply;
     }
 
-    pending_inc.new_pg_upmap_items[pgid] =
-      mempool::osdmap::vector<pair<int32_t,int32_t>>(
-      new_pg_upmap_items.begin(), new_pg_upmap_items.end());
-    ss << "set " << pgid << " pg_upmap_items mapping to " << new_pg_upmap_items;
+    pending_inc.new_primary_temp[pgid] = osd;
+    ss << "set " << pgid << " primary_temp mapping to " << osd;
     goto update;
-  } else if (prefix == "osd rm-pg-upmap-items") {
+  } else if (prefix == "osd pg-upmap" ||
+             prefix == "osd rm-pg-upmap" ||
+             prefix == "osd pg-upmap-items" ||
+             prefix == "osd rm-pg-upmap-items") {
     if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
-      ss << "you must complete the upgrade and set require_osd_release ="
-        << "luminous before using the new interface";
+      ss << "you must complete the upgrade and 'ceph osd require-osd-release "
+        << "luminous' before using the new interface";
       err = -EPERM;
       goto reply;
     }
     if (osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
-      ss << "require_min_compat_client "
+      ss << "min_compat_client "
         << ceph_release_name(osdmap.require_min_compat_client)
-        << " < luminous, which is required for pg-upmap";
+        << " < luminous, which is required for pg-upmap. "
+         << "Try 'ceph osd set-require-min-compat-client luminous' "
+         << "before using the new interface";
       err = -EPERM;
       goto reply;
     }
@@ -8974,15 +10059,208 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       err = -EINVAL;
       goto reply;
     }
-    if (pending_inc.new_pg_upmap_items.count(pgid) ||
-       pending_inc.old_pg_upmap_items.count(pgid)) {
-      dout(10) << __func__ << " waiting for pending update on " << pgid << dendl;
-      wait_for_finished_proposal(op, new C_RetryMessage(this, op));
+    if (!osdmap.pg_exists(pgid)) {
+      ss << "pg " << pgid << " does not exist";
+      err = -ENOENT;
+      goto reply;
+    }
+    if (pending_inc.old_pools.count(pgid.pool())) {
+      ss << "pool of " << pgid << " is pending removal";
+      err = -ENOENT;
+      getline(ss, rs);
+      wait_for_finished_proposal(op,
+        new Monitor::C_Command(mon, op, err, rs, get_last_committed() + 1));
       return true;
     }
 
-    pending_inc.old_pg_upmap_items.insert(pgid);
-    ss << "clear " << pgid << " pg_upmap_items mapping";
+    enum {
+      OP_PG_UPMAP,
+      OP_RM_PG_UPMAP,
+      OP_PG_UPMAP_ITEMS,
+      OP_RM_PG_UPMAP_ITEMS,
+    } option;
+
+    if (prefix == "osd pg-upmap") {
+      option = OP_PG_UPMAP;
+    } else if (prefix == "osd rm-pg-upmap") {
+      option = OP_RM_PG_UPMAP;
+    } else if (prefix == "osd pg-upmap-items") {
+      option = OP_PG_UPMAP_ITEMS;
+    } else {
+      option = OP_RM_PG_UPMAP_ITEMS;
+    }
+
+    // check pending upmap changes
+    switch (option) {
+    case OP_PG_UPMAP: // fall through
+    case OP_RM_PG_UPMAP:
+      if (pending_inc.new_pg_upmap.count(pgid) ||
+          pending_inc.old_pg_upmap.count(pgid)) {
+        dout(10) << __func__ << " waiting for pending update on "
+                 << pgid << dendl;
+        wait_for_finished_proposal(op, new C_RetryMessage(this, op));
+        return true;
+      }
+      break;
+
+    case OP_PG_UPMAP_ITEMS: // fall through
+    case OP_RM_PG_UPMAP_ITEMS:
+      if (pending_inc.new_pg_upmap_items.count(pgid) ||
+          pending_inc.old_pg_upmap_items.count(pgid)) {
+        dout(10) << __func__ << " waiting for pending update on "
+                 << pgid << dendl;
+        wait_for_finished_proposal(op, new C_RetryMessage(this, op));
+        return true;
+      }
+      break;
+
+    default:
+      assert(0 == "invalid option");
+    }
+
+    switch (option) {
+    case OP_PG_UPMAP:
+      {
+        vector<int64_t> id_vec;
+        if (!cmd_getval(g_ceph_context, cmdmap, "id", id_vec)) {
+          ss << "unable to parse 'id' value(s) '"
+             << cmd_vartype_stringify(cmdmap["id"]) << "'";
+          err = -EINVAL;
+          goto reply;
+        }
+
+        int pool_min_size = osdmap.get_pg_pool_min_size(pgid);
+        if ((int)id_vec.size() < pool_min_size) {
+          ss << "num of osds (" << id_vec.size() <<") < pool min size ("
+             << pool_min_size << ")";
+          err = -EINVAL;
+          goto reply;
+        }
+
+        int pool_size = osdmap.get_pg_pool_size(pgid);
+        if ((int)id_vec.size() > pool_size) {
+          ss << "num of osds (" << id_vec.size() <<") > pool size ("
+             << pool_size << ")";
+          err = -EINVAL;
+          goto reply;
+        }
+
+        vector<int32_t> new_pg_upmap;
+        for (auto osd : id_vec) {
+          if (osd != CRUSH_ITEM_NONE && !osdmap.exists(osd)) {
+            ss << "osd." << osd << " does not exist";
+            err = -ENOENT;
+            goto reply;
+          }
+          auto it = std::find(new_pg_upmap.begin(), new_pg_upmap.end(), osd);
+          if (it != new_pg_upmap.end()) {
+            ss << "osd." << osd << " already exists, ";
+            continue;
+          }
+          new_pg_upmap.push_back(osd);
+        }
+
+        if (new_pg_upmap.empty()) {
+          ss << "no valid upmap items(pairs) is specified";
+          err = -EINVAL;
+          goto reply;
+        }
+
+        pending_inc.new_pg_upmap[pgid] = mempool::osdmap::vector<int32_t>(
+          new_pg_upmap.begin(), new_pg_upmap.end());
+        ss << "set " << pgid << " pg_upmap mapping to " << new_pg_upmap;
+      }
+      break;
+
+    case OP_RM_PG_UPMAP:
+      {
+        pending_inc.old_pg_upmap.insert(pgid);
+        ss << "clear " << pgid << " pg_upmap mapping";
+      }
+      break;
+
+    case OP_PG_UPMAP_ITEMS:
+      {
+        vector<int64_t> id_vec;
+        if (!cmd_getval(g_ceph_context, cmdmap, "id", id_vec)) {
+          ss << "unable to parse 'id' value(s) '"
+             << cmd_vartype_stringify(cmdmap["id"]) << "'";
+          err = -EINVAL;
+          goto reply;
+        }
+
+        if (id_vec.size() % 2) {
+          ss << "you must specify pairs of osd ids to be remapped";
+          err = -EINVAL;
+          goto reply;
+        }
+
+        int pool_size = osdmap.get_pg_pool_size(pgid);
+        if ((int)(id_vec.size() / 2) > pool_size) {
+          ss << "num of osd pairs (" << id_vec.size() / 2 <<") > pool size ("
+             << pool_size << ")";
+          err = -EINVAL;
+          goto reply;
+        }
+
+        vector<pair<int32_t,int32_t>> new_pg_upmap_items;
+        ostringstream items;
+        items << "[";
+        for (auto p = id_vec.begin(); p != id_vec.end(); ++p) {
+          int from = *p++;
+          int to = *p;
+          if (from == to) {
+            ss << "from osd." << from << " == to osd." << to << ", ";
+            continue;
+          }
+          if (!osdmap.exists(from)) {
+            ss << "osd." << from << " does not exist";
+            err = -ENOENT;
+            goto reply;
+          }
+          if (to != CRUSH_ITEM_NONE && !osdmap.exists(to)) {
+            ss << "osd." << to << " does not exist";
+            err = -ENOENT;
+            goto reply;
+          }
+          pair<int32_t,int32_t> entry = make_pair(from, to);
+          auto it = std::find(new_pg_upmap_items.begin(),
+            new_pg_upmap_items.end(), entry);
+          if (it != new_pg_upmap_items.end()) {
+            ss << "osd." << from << " -> osd." << to << " already exists, ";
+            continue;
+          }
+          new_pg_upmap_items.push_back(entry);
+          items << from << "->" << to << ",";
+        }
+        string out(items.str());
+        out.resize(out.size() - 1); // drop last ','
+        out += "]";
+
+        if (new_pg_upmap_items.empty()) {
+          ss << "no valid upmap items(pairs) is specified";
+          err = -EINVAL;
+          goto reply;
+        }
+
+        pending_inc.new_pg_upmap_items[pgid] =
+          mempool::osdmap::vector<pair<int32_t,int32_t>>(
+          new_pg_upmap_items.begin(), new_pg_upmap_items.end());
+        ss << "set " << pgid << " pg_upmap_items mapping to " << out;
+      }
+      break;
+
+    case OP_RM_PG_UPMAP_ITEMS:
+      {
+        pending_inc.old_pg_upmap_items.insert(pgid);
+        ss << "clear " << pgid << " pg_upmap_items mapping";
+      }
+      break;
+
+    default:
+      assert(0 == "invalid option");
+    }
+
     goto update;
   } else if (prefix == "osd primary-affinity") {
     int64_t id;
@@ -9079,7 +10357,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     wait_for_finished_proposal(
        op,
        new Monitor::C_Command(mon, op, 0, rs, rdata, get_last_committed() + 1));
-      return true;
+    return true;
   } else if (prefix == "osd lost") {
     int64_t id;
     if (!cmd_getval(g_ceph_context, cmdmap, "id", id)) {
@@ -9154,9 +10432,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
          << "really do.";
       err = -EPERM;
       goto reply;
-    } else if (is_destroy && !osdmap.exists(id)) {
+    } else if (!osdmap.exists(id)) {
       ss << "osd." << id << " does not exist";
-      err = -ENOENT;
+      err = 0; // idempotent
       goto reply;
     } else if (osdmap.is_up(id)) {
       ss << "osd." << id << " is not `down`.";
@@ -9206,25 +10484,25 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     // make sure authmon is writeable.
     if (!mon->authmon()->is_writeable()) {
       dout(10) << __func__ << " waiting for auth mon to be writeable for "
-               << "osd destroy" << dendl;
+               << "osd new" << dendl;
       mon->authmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
       return false;
     }
 
-    map<string,string> secrets_map;
+    map<string,string> param_map;
 
     bufferlist bl = m->get_data();
-    string secrets_json = bl.to_str();
-    dout(20) << __func__ << " osd new json = " << secrets_json << dendl;
+    string param_json = bl.to_str();
+    dout(20) << __func__ << " osd new json = " << param_json << dendl;
 
-    err = get_json_str_map(secrets_json, ss, &secrets_map);
+    err = get_json_str_map(param_json, ss, &param_map);
     if (err < 0)
       goto reply;
 
-    dout(20) << __func__ << " osd new secrets " << secrets_map << dendl;
+    dout(20) << __func__ << " osd new params " << param_map << dendl;
 
     paxos->plug();
-    err = prepare_command_osd_new(op, cmdmap, secrets_map, ss, f.get());
+    err = prepare_command_osd_new(op, cmdmap, param_map, ss, f.get());
     paxos->unplug();
 
     if (err < 0) {
@@ -9300,7 +10578,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    do_osd_create(id, uuid, &new_id);
+    string empty_device_class;
+    do_osd_create(id, uuid, empty_device_class, &new_id);
 
     if (f) {
       f->open_object_section("created_osd");
@@ -9344,10 +10623,21 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        utime_t expires = ceph_clock_now();
        double d;
        // default one hour
-       cmd_getval(g_ceph_context, cmdmap, "expire", d, double(60*60));
+       cmd_getval(g_ceph_context, cmdmap, "expire", d,
+          g_conf->mon_osd_blacklist_default_expire);
        expires += d;
 
        pending_inc.new_blacklist[addr] = expires;
+
+        {
+          // cancel any pending un-blacklisting request too
+          auto it = std::find(pending_inc.old_blacklist.begin(),
+            pending_inc.old_blacklist.end(), addr);
+          if (it != pending_inc.old_blacklist.end()) {
+            pending_inc.old_blacklist.erase(it);
+          }
+        }
+
        ss << "blacklisting " << addr << " until " << expires << " (" << d << " sec)";
        getline(ss, rs);
        wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
@@ -9463,7 +10753,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     string pool_type_str;
     cmd_getval(g_ceph_context, cmdmap, "pool_type", pool_type_str);
     if (pool_type_str.empty())
-      pool_type_str = pg_pool_t::get_default_type();
+      pool_type_str = g_conf->osd_pool_default_type;
 
     string poolstr;
     cmd_getval(g_ceph_context, cmdmap, "pool", poolstr);
@@ -9499,6 +10789,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     bool implicit_rule_creation = false;
+    int64_t expected_num_objects = 0;
     string rule_name;
     cmd_getval(g_ceph_context, cmdmap, "rule", rule_name);
     string erasure_code_profile;
@@ -9536,9 +10827,26 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
          rule_name = poolstr;
        }
       }
+      cmd_getval(g_ceph_context, cmdmap, "expected_num_objects",
+                 expected_num_objects, int64_t(0));
     } else {
       //NOTE:for replicated pool,cmd_map will put rule_name to erasure_code_profile field
-      rule_name = erasure_code_profile;
+      //     and put expected_num_objects to rule field
+      if (erasure_code_profile != "") { // cmd is from CLI
+        if (rule_name != "") {
+          string interr;
+          expected_num_objects = strict_strtoll(rule_name.c_str(), 10, &interr);
+          if (interr.length()) {
+            ss << "error parsing integer value '" << rule_name << "': " << interr;
+            err = -EINVAL;
+            goto reply;
+          }
+        }
+        rule_name = erasure_code_profile;
+      } else { // cmd is well-formed
+        cmd_getval(g_ceph_context, cmdmap, "expected_num_objects",
+                   expected_num_objects, int64_t(0));
+      }
     }
 
     if (!implicit_rule_creation && rule_name != "") {
@@ -9552,8 +10860,6 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        goto reply;
     }
 
-    int64_t expected_num_objects;
-    cmd_getval(g_ceph_context, cmdmap, "expected_num_objects", expected_num_objects, int64_t(0));
     if (expected_num_objects < 0) {
       ss << "'expected_num_objects' must be non-negative";
       err = -EINVAL;
@@ -10174,11 +11480,18 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     // val could contain unit designations, so we treat as a string
     string val;
     cmd_getval(g_ceph_context, cmdmap, "val", val);
-    stringstream tss;
-    int64_t value = unit_to_bytesize(val, &tss);
-    if (value < 0) {
-      ss << "error parsing value '" << value << "': " << tss.str();
-      err = value;
+    string tss;
+    int64_t value;
+    if (field == "max_objects") {
+      value = strict_sistrtoll(val.c_str(), &tss);
+    } else if (field == "max_bytes") {
+      value = strict_iecstrtoll(val.c_str(), &tss);
+    } else {
+      assert(0 == "unrecognized option");
+    }
+    if (!tss.empty()) {
+      ss << "error parsing value '" << val << "': " << tss;
+      err = -EINVAL;
       goto reply;
     }
 
@@ -10195,7 +11508,20 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
                                              get_last_committed() + 1));
     return true;
+  } else if (prefix == "osd pool application enable" ||
+             prefix == "osd pool application disable" ||
+             prefix == "osd pool application set" ||
+             prefix == "osd pool application rm") {
+    err = prepare_command_pool_application(prefix, cmdmap, ss);
+    if (err == -EAGAIN)
+      goto wait;
+    if (err < 0)
+      goto reply;
 
+    getline(ss, rs);
+    wait_for_finished_proposal(
+      op, new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1));
+    return true;
   } else if (prefix == "osd reweight-by-pg" ||
             prefix == "osd reweight-by-utilization" ||
             prefix == "osd test-reweight-by-pg" ||
@@ -10265,6 +11591,37 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        new Monitor::C_Command(mon, op, 0, rs, rdata, get_last_committed() + 1));
       return true;
     }
+  } else if (prefix == "osd force-create-pg") {
+    pg_t pgid;
+    string pgidstr;
+    cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
+    if (!pgid.parse(pgidstr.c_str())) {
+      ss << "invalid pgid '" << pgidstr << "'";
+      err = -EINVAL;
+      goto reply;
+    }
+    if (!osdmap.pg_exists(pgid)) {
+      ss << "pg " << pgid << " should not exist";
+      err = -ENOENT;
+      goto reply;
+    }
+    bool creating_now;
+    {
+      std::lock_guard<std::mutex> l(creating_pgs_lock);
+      auto emplaced = creating_pgs.pgs.emplace(pgid,
+                                              make_pair(osdmap.get_epoch(),
+                                                        ceph_clock_now()));
+      creating_now = emplaced.second;
+    }
+    if (creating_now) {
+      ss << "pg " << pgidstr << " now creating, ok";
+      err = 0;
+      goto update;
+    } else {
+      ss << "pg " << pgid << " already creating";
+      err = 0;
+      goto reply;
+    }
   } else {
     err = -EINVAL;
   }
@@ -10287,11 +11644,61 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   return true;
 }
 
-bool OSDMonitor::preprocess_pool_op(MonOpRequestRef op) 
+bool OSDMonitor::enforce_pool_op_caps(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
+
   MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
-  
+  MonSession *session = m->get_session();
+  if (!session) {
+    _pool_op_reply(op, -EPERM, osdmap.get_epoch());
+    return true;
+  }
+
+  switch (m->op) {
+  case POOL_OP_CREATE_UNMANAGED_SNAP:
+  case POOL_OP_DELETE_UNMANAGED_SNAP:
+    {
+      const std::string* pool_name = nullptr;
+      const pg_pool_t *pg_pool = osdmap.get_pg_pool(m->pool);
+      if (pg_pool != nullptr) {
+        pool_name = &osdmap.get_pool_name(m->pool);
+      }
+
+      if (!is_unmanaged_snap_op_permitted(cct, mon->key_server,
+                                          session->entity_name, session->caps,
+                                          pool_name)) {
+        dout(0) << "got unmanaged-snap pool op from entity with insufficient "
+                << "privileges. message: " << *m  << std::endl
+                << "caps: " << session->caps << dendl;
+        _pool_op_reply(op, -EPERM, osdmap.get_epoch());
+        return true;
+      }
+    }
+    break;
+  default:
+    if (!session->is_capable("osd", MON_CAP_W)) {
+      dout(0) << "got pool op from entity with insufficient privileges. "
+              << "message: " << *m  << std::endl
+              << "caps: " << session->caps << dendl;
+      _pool_op_reply(op, -EPERM, osdmap.get_epoch());
+      return true;
+    }
+    break;
+  }
+
+  return false;
+}
+
+bool OSDMonitor::preprocess_pool_op(MonOpRequestRef op)
+{
+  op->mark_osdmon_event(__func__);
+  MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
+
+  if (enforce_pool_op_caps(op)) {
+    return true;
+  }
+
   if (m->fsid != mon->monmap->fsid) {
     dout(0) << __func__ << " drop message on fsid " << m->fsid
             << " != " << mon->monmap->fsid << " for " << *m << dendl;
@@ -10371,19 +11778,6 @@ bool OSDMonitor::preprocess_pool_op_create(MonOpRequestRef op)
 {
   op->mark_osdmon_event(__func__);
   MPoolOp *m = static_cast<MPoolOp*>(op->get_req());
-  MonSession *session = m->get_session();
-  if (!session) {
-    _pool_op_reply(op, -EPERM, osdmap.get_epoch());
-    return true;
-  }
-  if (!session->is_capable("osd", MON_CAP_W)) {
-    dout(5) << "attempt to create new pool without sufficient auid privileges!"
-           << "message: " << *m  << std::endl
-           << "caps: " << session->caps << dendl;
-    _pool_op_reply(op, -EPERM, osdmap.get_epoch());
-    return true;
-  }
-
   int64_t pool = osdmap.lookup_pg_pool_name(m->name.c_str());
   if (pool >= 0) {
     _pool_op_reply(op, 0, osdmap.get_epoch());
@@ -10510,6 +11904,10 @@ bool OSDMonitor::prepare_pool_op(MonOpRequestRef op)
 
   case POOL_OP_DELETE_UNMANAGED_SNAP:
     if (!pp.is_removed_snap(m->snapid)) {
+      if (m->snapid > pp.get_snap_seq()) {
+        _pool_op_reply(op, -ENOENT, osdmap.get_epoch());
+        return false;
+      }
       pp.remove_unmanaged_snap(m->snapid);
       changed = true;
     }
@@ -10551,7 +11949,7 @@ int OSDMonitor::_check_remove_pool(int64_t pool_id, const pg_pool_t& pool,
   const string& poolstr = osdmap.get_pool_name(pool_id);
 
   // If the Pool is in use by CephFS, refuse to delete it
-  FSMap const &pending_fsmap = mon->mdsmon()->get_pending();
+  FSMap const &pending_fsmap = mon->mdsmon()->get_pending_fsmap();
   if (pending_fsmap.pool_in_use(pool_id)) {
     *ss << "pool '" << poolstr << "' is in use by CephFS";
     return -EBUSY;
@@ -10600,7 +11998,7 @@ bool OSDMonitor::_check_become_tier(
   const std::string &tier_pool_name = osdmap.get_pool_name(tier_pool_id);
   const std::string &base_pool_name = osdmap.get_pool_name(base_pool_id);
 
-  const FSMap &pending_fsmap = mon->mdsmon()->get_pending();
+  const FSMap &pending_fsmap = mon->mdsmon()->get_pending_fsmap();
   if (pending_fsmap.pool_in_use(tier_pool_id)) {
     *ss << "pool '" << tier_pool_name << "' is in use by CephFS";
     *err = -EBUSY;
@@ -10660,12 +12058,15 @@ bool OSDMonitor::_check_remove_tier(
   const std::string &base_pool_name = osdmap.get_pool_name(base_pool_id);
 
   // Apply CephFS-specific checks
-  const FSMap &pending_fsmap = mon->mdsmon()->get_pending();
+  const FSMap &pending_fsmap = mon->mdsmon()->get_pending_fsmap();
   if (pending_fsmap.pool_in_use(base_pool_id)) {
-    if (base_pool->type != pg_pool_t::TYPE_REPLICATED) {
-      // If the underlying pool is erasure coded, we can't permit the
-      // removal of the replicated tier that CephFS relies on to access it
-      *ss << "pool '" << base_pool_name << "' is in use by CephFS via its tier";
+    if (base_pool->is_erasure() && !base_pool->allows_ecoverwrites()) {
+      // If the underlying pool is erasure coded and does not allow EC
+      // overwrites, we can't permit the removal of the replicated tier that
+      // CephFS relies on to access it
+      *ss << "pool '" << base_pool_name <<
+          "' does not allow EC overwrites and is in use by CephFS"
+          " via its tier";
       *err = -EBUSY;
       return false;
     }
@@ -10686,7 +12087,7 @@ bool OSDMonitor::_check_remove_tier(
 int OSDMonitor::_prepare_remove_pool(
   int64_t pool, ostream *ss, bool no_fake)
 {
-  dout(10) << "_prepare_remove_pool " << pool << dendl;
+  dout(10) << __func__ << " " << pool << dendl;
   const pg_pool_t *p = osdmap.get_pg_pool(pool);
   int r = _check_remove_pool(pool, *p, ss);
   if (r < 0)
@@ -10703,7 +12104,7 @@ int OSDMonitor::_prepare_remove_pool(
   }
 
   if (pending_inc.old_pools.count(pool)) {
-    dout(10) << "_prepare_remove_pool " << pool << " already pending removal"
+    dout(10) << __func__ << " " << pool << " already pending removal"
             << dendl;
     return 0;
   }
@@ -10720,25 +12121,82 @@ int OSDMonitor::_prepare_remove_pool(
   // remove
   pending_inc.old_pools.insert(pool);
 
-  // remove any pg_temp mappings for this pool too
+  // remove any pg_temp mappings for this pool
   for (auto p = osdmap.pg_temp->begin();
        p != osdmap.pg_temp->end();
        ++p) {
     if (p->first.pool() == (uint64_t)pool) {
-      dout(10) << "_prepare_remove_pool " << pool << " removing obsolete pg_temp "
+      dout(10) << __func__ << " " << pool << " removing obsolete pg_temp "
               << p->first << dendl;
       pending_inc.new_pg_temp[p->first].clear();
     }
   }
+  // remove any primary_temp mappings for this pool
   for (auto p = osdmap.primary_temp->begin();
       p != osdmap.primary_temp->end();
       ++p) {
     if (p->first.pool() == (uint64_t)pool) {
-      dout(10) << "_prepare_remove_pool " << pool
+      dout(10) << __func__ << " " << pool
                << " removing obsolete primary_temp" << p->first << dendl;
       pending_inc.new_primary_temp[p->first] = -1;
     }
   }
+  // remove any pg_upmap mappings for this pool
+  for (auto& p : osdmap.pg_upmap) {
+    if (p.first.pool() == (uint64_t)pool) {
+      dout(10) << __func__ << " " << pool
+               << " removing obsolete pg_upmap "
+               << p.first << dendl;
+      pending_inc.old_pg_upmap.insert(p.first);
+    }
+  }
+  // remove any pending pg_upmap mappings for this pool
+  {
+    auto it = pending_inc.new_pg_upmap.begin();
+    while (it != pending_inc.new_pg_upmap.end()) {
+      if (it->first.pool() == (uint64_t)pool) {
+        dout(10) << __func__ << " " << pool
+                 << " removing pending pg_upmap "
+                 << it->first << dendl;
+        it = pending_inc.new_pg_upmap.erase(it);
+      } else {
+        it++;
+      }
+    }
+  }
+  // remove any pg_upmap_items mappings for this pool
+  for (auto& p : osdmap.pg_upmap_items) {
+    if (p.first.pool() == (uint64_t)pool) {
+      dout(10) << __func__ << " " << pool
+               << " removing obsolete pg_upmap_items " << p.first
+               << dendl;
+      pending_inc.old_pg_upmap_items.insert(p.first);
+    }
+  }
+  // remove any pending pg_upmap mappings for this pool
+  {
+    auto it = pending_inc.new_pg_upmap_items.begin();
+    while (it != pending_inc.new_pg_upmap_items.end()) {
+      if (it->first.pool() == (uint64_t)pool) {
+        dout(10) << __func__ << " " << pool
+                 << " removing pending pg_upmap_items "
+                 << it->first << dendl;
+        it = pending_inc.new_pg_upmap_items.erase(it);
+      } else {
+        it++;
+      }
+    }
+  }
+
+  // remove any choose_args for this pool
+  CrushWrapper newcrush;
+  _get_pending_crush(newcrush);
+  if (newcrush.have_choose_args(pool)) {
+    dout(10) << __func__ << " removing choose_args for pool " << pool << dendl;
+    newcrush.rm_choose_args(pool);
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+  }
   return 0;
 }