]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/OSDMonitor.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / mon / OSDMonitor.cc
index 927b8b8af92c2832e0aca57c674d2a31bc5584aa..b01dc4b72ebb7ec48466e9b6ab3d6a0fc349dff5 100644 (file)
@@ -214,6 +214,16 @@ struct OSDMemCache : public PriorityCache::PriCache {
   virtual void set_cache_ratio(double ratio) {
     cache_ratio = ratio;
   }
+  virtual void shift_bins() {
+  }
+  virtual void import_bins(const std::vector<uint64_t> &bins) {
+  }
+  virtual void set_bins(PriorityCache::Priority pri, uint64_t end_bin) {
+  }
+  virtual uint64_t get_bins(PriorityCache::Priority pri) const {
+    return 0;
+  }
+
   virtual string get_cache_name() const = 0;
 };
 
@@ -619,7 +629,7 @@ CrushWrapper &OSDMonitor::_get_stable_crush()
   return *osdmap.crush;
 }
 
-void OSDMonitor::_get_pending_crush(CrushWrapper& newcrush)
+CrushWrapper OSDMonitor::_get_pending_crush()
 {
   bufferlist bl;
   if (pending_inc.crush.length())
@@ -628,7 +638,9 @@ void OSDMonitor::_get_pending_crush(CrushWrapper& newcrush)
     osdmap.crush->encode(bl, CEPH_FEATURES_SUPPORTED_DEFAULT);
 
   auto p = bl.cbegin();
-  newcrush.decode(p);
+  CrushWrapper crush;
+  crush.decode(p);
+  return crush;
 }
 
 void OSDMonitor::create_initial()
@@ -664,26 +676,23 @@ void OSDMonitor::create_initial()
   if (newmap.nearfull_ratio > 1.0) newmap.nearfull_ratio /= 100;
 
   // new cluster should require latest by default
-  if (g_conf().get_val<bool>("mon_debug_no_require_pacific")) {
-    if (g_conf().get_val<bool>("mon_debug_no_require_octopus")) {
-      derr << __func__ << " mon_debug_no_require_pacific and octopus=true" << dendl;
+  if (g_conf().get_val<bool>("mon_debug_no_require_quincy")) {
+    if (g_conf().get_val<bool>("mon_debug_no_require_pacific")) {
+      derr << __func__ << " mon_debug_no_require_quincy and pacific=true" << dendl;
       newmap.require_osd_release = ceph_release_t::nautilus;
     } else {
-      derr << __func__ << " mon_debug_no_require_pacific=true" << dendl;
-      newmap.require_osd_release = ceph_release_t::octopus;
+      derr << __func__ << " mon_debug_no_require_quincy=true" << dendl;
+      newmap.require_osd_release = ceph_release_t::pacific;
     }
   } else {
-    newmap.require_osd_release = ceph_release_t::pacific;
+    newmap.require_osd_release = ceph_release_t::quincy;
   }
 
-  if (newmap.require_osd_release >= ceph_release_t::octopus) {
-    ceph_release_t r = ceph_release_from_name(
-      g_conf()->mon_osd_initial_require_min_compat_client);
-    if (!r) {
-      ceph_abort_msg("mon_osd_initial_require_min_compat_client is not valid");
-    }
-    newmap.require_min_compat_client = r;
+  ceph_release_t r = ceph_release_from_name(g_conf()->mon_osd_initial_require_min_compat_client);
+  if (!r) {
+    ceph_abort_msg("mon_osd_initial_require_min_compat_client is not valid");
   }
+  newmap.require_min_compat_client = r;
 
   // encode into pending incremental
   uint64_t features = newmap.get_encoding_features();
@@ -895,10 +904,6 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap)
        // could be marked up *or* down, but we're too lazy to check which
        last_osd_report.erase(osd);
       }
-      if (state & CEPH_OSD_OUT) {
-        // could be marked in *or* out, but we can safely drop it
-        osd_epochs.erase(osd);
-      }
     }
     for (const auto [osd, weight] : inc.new_weight) {
       if (weight == CEPH_OSD_OUT) {
@@ -1181,40 +1186,6 @@ void OSDMonitor::create_pending()
              << pending_inc.new_nearfull_ratio << dendl;
     }
   }
-
-  // Rewrite CRUSH rule IDs if they are using legacy "ruleset"
-  // structure.
-  if (osdmap.crush->has_legacy_rule_ids()) {
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
-
-    // First, for all pools, work out which rule they really used
-    // by resolving ruleset to rule.
-    for (const auto &i : osdmap.get_pools()) {
-      const auto pool_id = i.first;
-      const auto &pool = i.second;
-      int new_rule_id = newcrush.find_rule(pool.crush_rule,
-                                          pool.type, pool.size);
-
-      dout(1) << __func__ << " rewriting pool "
-             << osdmap.get_pool_name(pool_id) << " crush ruleset "
-             << pool.crush_rule << " -> rule id " << new_rule_id << dendl;
-      if (pending_inc.new_pools.count(pool_id) == 0) {
-       pending_inc.new_pools[pool_id] = pool;
-      }
-      pending_inc.new_pools[pool_id].crush_rule = new_rule_id;
-    }
-
-    // Now, go ahead and renumber all the rules so that their
-    // rule_id field corresponds to their position in the array
-    auto old_to_new = newcrush.renumber_rules();
-    dout(1) << __func__ << " Rewrote " << old_to_new << " crush IDs:" << dendl;
-    for (const auto &i : old_to_new) {
-      dout(1) << __func__ << " " << i.first << " -> " << i.second << dendl;
-    }
-    pending_inc.crush.clear();
-    newcrush.encode(pending_inc.crush, mon.get_quorum_con_features());
-  }
 }
 
 creating_pgs_t
@@ -2059,12 +2030,26 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   // metadata, too!
   for (map<int,bufferlist>::iterator p = pending_metadata.begin();
        p != pending_metadata.end();
-       ++p)
+       ++p) {
+    Metadata m;
+    auto mp = p->second.cbegin();
+    decode(m, mp);
+    auto it = m.find("osd_objectstore");
+    if (it != m.end()) {
+      if (it->second == "filestore") {
+        filestore_osds.insert(p->first);
+      } else {
+        filestore_osds.erase(p->first);
+      }
+    }
     t->put(OSD_METADATA_PREFIX, stringify(p->first), p->second);
+  }
   for (set<int>::iterator p = pending_metadata_rm.begin();
        p != pending_metadata_rm.end();
-       ++p)
+       ++p) {
+    filestore_osds.erase(*p);
     t->erase(OSD_METADATA_PREFIX, stringify(*p));
+  }
   pending_metadata.clear();
   pending_metadata_rm.clear();
 
@@ -2097,6 +2082,8 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   // health
   health_check_map_t next;
   tmp.check_health(cct, &next);
+  // OSD_FILESTORE
+  check_for_filestore_osds(&next);
   encode_health(next, t);
 }
 
@@ -2172,6 +2159,39 @@ int OSDMonitor::get_osd_objectstore_type(int osd, string *type)
   return 0;
 }
 
+void OSDMonitor::get_filestore_osd_list()
+{
+  for (unsigned osd = 0; osd < osdmap.get_num_osds(); ++osd) {
+    string objectstore_type;
+    int r = get_osd_objectstore_type(osd, &objectstore_type);
+    if (r == 0 && objectstore_type == "filestore") {
+      filestore_osds.insert(osd);
+    }
+  }
+}
+
+void OSDMonitor::check_for_filestore_osds(health_check_map_t *checks)
+{
+  if (g_conf()->mon_warn_on_filestore_osds &&
+      filestore_osds.size() > 0) {
+    ostringstream ss, deprecated_tip;
+    list<string> detail;
+    ss << filestore_osds.size()
+       << " osd(s) "
+       << (filestore_osds.size() == 1 ? "is" : "are")
+       << " running Filestore";
+    deprecated_tip << ss.str();
+    ss << " [Deprecated]";
+    auto& d = checks->add("OSD_FILESTORE", HEALTH_WARN, ss.str(),
+                          filestore_osds.size());
+    deprecated_tip << ", which has been deprecated and"
+                   << " not been optimized for QoS"
+                   << " (Filestore OSDs will use 'osd_op_queue = wpq' strictly)";
+    detail.push_back(deprecated_tip.str());
+    d.detail.swap(detail);
+  }
+}
+
 bool OSDMonitor::is_pool_currently_all_bluestore(int64_t pool_id,
                                                 const pg_pool_t &pool,
                                                 ostream *err)
@@ -3498,43 +3518,15 @@ bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
 
   ceph_assert(m->get_orig_source_inst().name.is_osd());
 
-  // force all osds to have gone through luminous prior to upgrade to nautilus
-  {
-    vector<string> missing;
-    if (!HAVE_FEATURE(m->osd_features, SERVER_LUMINOUS)) {
-      missing.push_back("CEPH_FEATURE_SERVER_LUMINOUS");
-    }
-    if (!HAVE_FEATURE(m->osd_features, SERVER_JEWEL)) {
-      missing.push_back("CEPH_FEATURE_SERVER_JEWEL");
-    }
-    if (!HAVE_FEATURE(m->osd_features, SERVER_KRAKEN)) {
-      missing.push_back("CEPH_FEATURE_SERVER_KRAKEN");
-    }
-    if (!HAVE_FEATURE(m->osd_features, OSD_RECOVERY_DELETES)) {
-      missing.push_back("CEPH_FEATURE_OSD_RECOVERY_DELETES");
-    }
-
-    if (!missing.empty()) {
-      using std::experimental::make_ostream_joiner;
-
-      stringstream ss;
-      copy(begin(missing), end(missing), make_ostream_joiner(ss, ";"));
-
-      mon.clog->info() << "disallowing boot of OSD "
-                       << m->get_orig_source_inst()
-                       << " because the osd lacks " << ss.str();
-      goto ignore;
-    }
+  // lower bound of N-2
+  if (!HAVE_FEATURE(m->osd_features, SERVER_OCTOPUS)) {
+    mon.clog->info() << "disallowing boot of OSD "
+                    << m->get_orig_source_inst()
+                    << " because the osd lacks CEPH_FEATURE_SERVER_OCTOPUS";
+    goto ignore;
   }
 
   // make sure osd versions do not span more than 3 releases
-  if (HAVE_FEATURE(m->osd_features, SERVER_OCTOPUS) &&
-      osdmap.require_osd_release < ceph_release_t::mimic) {
-    mon.clog->info() << "disallowing boot of octopus+ OSD "
-                     << m->get_orig_source_inst()
-                     << " because require_osd_release < mimic";
-    goto ignore;
-  }
   if (HAVE_FEATURE(m->osd_features, SERVER_PACIFIC) &&
       osdmap.require_osd_release < ceph_release_t::nautilus) {
     mon.clog->info() << "disallowing boot of pacific+ OSD "
@@ -3542,15 +3534,11 @@ bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
                      << " because require_osd_release < nautilus";
     goto ignore;
   }
-
-  // The release check here is required because for OSD_PGLOG_HARDLIMIT,
-  // we are reusing a jewel feature bit that was retired in luminous.
-  if (osdmap.require_osd_release >= ceph_release_t::luminous &&
-      osdmap.test_flag(CEPH_OSDMAP_PGLOG_HARDLIMIT) &&
-      !(m->osd_features & CEPH_FEATURE_OSD_PGLOG_HARDLIMIT)) {
-    mon.clog->info() << "disallowing boot of OSD "
+  if (HAVE_FEATURE(m->osd_features, SERVER_QUINCY) &&
+      osdmap.require_osd_release < ceph_release_t::octopus) {
+    mon.clog->info() << "disallowing boot of quincy+ OSD "
                      << m->get_orig_source_inst()
-                     << " because 'pglog_hardlimit' osdmap flag is set and OSD lacks the OSD_PGLOG_HARDLIMIT feature";
+                     << " because require_osd_release < octopus";
     goto ignore;
   }
 
@@ -4957,8 +4945,7 @@ unsigned OSDMonitor::scan_for_creating_pgs(
       continue;
     }
     const pg_pool_t& pool = p.second;
-    int ruleno = osdmap.crush->find_rule(pool.get_crush_rule(),
-                                        pool.get_type(), pool.get_size());
+    int ruleno = pool.get_crush_rule();
     if (ruleno < 0 || !osdmap.crush->rule_exists(ruleno))
       continue;
 
@@ -5425,7 +5412,7 @@ namespace {
     CSUM_TYPE, CSUM_MAX_BLOCK, CSUM_MIN_BLOCK, FINGERPRINT_ALGORITHM,
     PG_AUTOSCALE_MODE, PG_NUM_MIN, TARGET_SIZE_BYTES, TARGET_SIZE_RATIO,
     PG_AUTOSCALE_BIAS, DEDUP_TIER, DEDUP_CHUNK_ALGORITHM, 
-    DEDUP_CDC_CHUNK_SIZE };
+    DEDUP_CDC_CHUNK_SIZE, POOL_EIO, BULK, PG_NUM_MAX };
 
   std::set<osd_pool_get_choices>
     subtract_second_from_first(const std::set<osd_pool_get_choices>& first,
@@ -5465,8 +5452,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
   string prefix;
   cmd_getval(cmdmap, "prefix", prefix);
 
-  string format;
-  cmd_getval(cmdmap, "format", format, string("plain"));
+  string format = cmd_getval_or<string>(cmdmap, "format", "plain");
   boost::scoped_ptr<Formatter> f(Formatter::create(format));
 
   if (prefix == "osd stat") {
@@ -5489,11 +5475,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
           prefix == "osd ls-tree" ||
           prefix == "osd info") {
 
-    epoch_t epoch = 0;
-    int64_t epochnum;
-    cmd_getval(cmdmap, "epoch", epochnum, (int64_t)osdmap.get_epoch());
-    epoch = epochnum;
-    
+    epoch_t epoch = cmd_getval_or<int64_t>(cmdmap, "epoch", osdmap.get_epoch());
     bufferlist osdmap_bl;
     int err = get_version_full(epoch, osdmap_bl);
     if (err == -ENOENT) {
@@ -6119,7 +6101,9 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       {"size", SIZE},
       {"min_size", MIN_SIZE},
       {"pg_num", PG_NUM}, {"pgp_num", PGP_NUM},
-      {"crush_rule", CRUSH_RULE}, {"hashpspool", HASHPSPOOL},
+      {"crush_rule", CRUSH_RULE},
+      {"hashpspool", HASHPSPOOL},
+      {"eio", POOL_EIO},
       {"allow_ec_overwrites", EC_OVERWRITES}, {"nodelete", NODELETE},
       {"nopgchange", NOPGCHANGE}, {"nosizechange", NOSIZECHANGE},
       {"noscrub", NOSCRUB}, {"nodeep-scrub", NODEEP_SCRUB},
@@ -6157,12 +6141,14 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       {"fingerprint_algorithm", FINGERPRINT_ALGORITHM},
       {"pg_autoscale_mode", PG_AUTOSCALE_MODE},
       {"pg_num_min", PG_NUM_MIN},
+      {"pg_num_max", PG_NUM_MAX},
       {"target_size_bytes", TARGET_SIZE_BYTES},
       {"target_size_ratio", TARGET_SIZE_RATIO},
       {"pg_autoscale_bias", PG_AUTOSCALE_BIAS},
       {"dedup_tier", DEDUP_TIER},
       {"dedup_chunk_algorithm", DEDUP_CHUNK_ALGORITHM},
       {"dedup_cdc_chunk_size", DEDUP_CDC_CHUNK_SIZE},
+      {"bulk", BULK}
     };
 
     typedef std::set<osd_pool_get_choices> choices_set_t;
@@ -6278,7 +6264,9 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
                             p->pg_autoscale_mode));
            break;
          case HASHPSPOOL:
+         case POOL_EIO:
          case NODELETE:
+         case BULK:
          case NOPGCHANGE:
          case NOSIZECHANGE:
          case WRITE_FADVISE_DONTNEED:
@@ -6383,6 +6371,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
          case CSUM_MIN_BLOCK:
          case FINGERPRINT_ALGORITHM:
          case PG_NUM_MIN:
+         case PG_NUM_MAX:
          case TARGET_SIZE_BYTES:
          case TARGET_SIZE_RATIO:
          case PG_AUTOSCALE_BIAS:
@@ -6505,7 +6494,9 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
              "\n";
            break;
          case HASHPSPOOL:
+         case POOL_EIO:
          case NODELETE:
+         case BULK:
          case NOPGCHANGE:
          case NOSIZECHANGE:
          case WRITE_FADVISE_DONTNEED:
@@ -6543,6 +6534,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
          case CSUM_MIN_BLOCK:
          case FINGERPRINT_ALGORITHM:
          case PG_NUM_MIN:
+         case PG_NUM_MAX:
          case TARGET_SIZE_BYTES:
          case TARGET_SIZE_RATIO:
          case PG_AUTOSCALE_BIAS:
@@ -6586,6 +6578,11 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     }
     const pg_pool_t *p = osdmap.get_pg_pool(poolid);
     const pool_stat_t* pstat = mon.mgrstatmon()->get_pool_stat(poolid);
+    if (!pstat) {
+      ss << "no stats for pool '" << pool_name << "'";
+      r = -ENOENT;
+      goto reply;
+    }
     const object_stat_sum_t& sum = pstat->stats.sum;
     if (f) {
       f->open_object_section("pool_quotas");
@@ -6705,9 +6702,14 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     rs << "\n";
     rdata.append(rs.str());
   } else if (prefix == "osd crush tree") {
-    string shadow;
-    cmd_getval(cmdmap, "shadow", shadow);
-    bool show_shadow = shadow == "--show-shadow";
+    bool show_shadow = false;
+    if (!cmd_getval_compat_cephbool(cmdmap, "show_shadow", show_shadow)) {
+      std::string shadow;
+      if (cmd_getval(cmdmap, "shadow", shadow) &&
+         shadow == "--show-shadow") {
+       show_shadow = true;
+      }
+    }
     boost::scoped_ptr<Formatter> f(Formatter::create(format));
     if (f) {
       f->open_object_section("crush_tree");
@@ -7323,11 +7325,12 @@ int OSDMonitor::prepare_new_pool(MonOpRequestRef op)
   string erasure_code_profile;
   stringstream ss;
   string rule_name;
+  bool bulk = false;
   int ret = 0;
   ret = prepare_new_pool(m->name, m->crush_rule, rule_name,
-                        0, 0, 0, 0, 0, 0.0,
+                        0, 0, 0, 0, 0, 0, 0.0,
                         erasure_code_profile,
-                        pg_pool_t::TYPE_REPLICATED, 0, FAST_READ_OFF, {},
+                        pg_pool_t::TYPE_REPLICATED, 0, FAST_READ_OFF, {}, bulk,
                         &ss);
 
   if (ret < 0) {
@@ -7353,8 +7356,7 @@ int OSDMonitor::crush_rename_bucket(const string& srcname,
       return ret;
   }
 
-  CrushWrapper newcrush;
-  _get_pending_crush(newcrush);
+  CrushWrapper newcrush = _get_pending_crush();
 
   ret = newcrush.rename_bucket(srcname,
                               dstname,
@@ -7415,7 +7417,7 @@ int OSDMonitor::normalize_profile(const string& profilename,
   auto it = profile.find("stripe_unit");
   if (it != profile.end()) {
     string err_str;
-    uint32_t stripe_unit = strict_iecstrtoll(it->second.c_str(), &err_str);
+    uint32_t stripe_unit = strict_iecstrtoll(it->second, &err_str);
     if (!err_str.empty()) {
       *ss << "could not parse stripe_unit '" << it->second
          << "': " << err_str << std::endl;
@@ -7445,16 +7447,15 @@ int OSDMonitor::crush_rule_create_erasure(const string &name,
 {
   int ruleid = osdmap.crush->get_rule_id(name);
   if (ruleid != -ENOENT) {
-    *rule = osdmap.crush->get_rule_mask_ruleset(ruleid);
+    *rule = ruleid;
     return -EEXIST;
   }
 
-  CrushWrapper newcrush;
-  _get_pending_crush(newcrush);
+  CrushWrapper newcrush = _get_pending_crush();
 
   ruleid = newcrush.get_rule_id(name);
   if (ruleid != -ENOENT) {
-    *rule = newcrush.get_rule_mask_ruleset(ruleid);
+    *rule = ruleid;
     return -EALREADY;
   } else {
     ErasureCodeInterfaceRef erasure_code;
@@ -7714,7 +7715,7 @@ int OSDMonitor::prepare_pool_stripe_width(const unsigned pool_type,
       auto it = profile.find("stripe_unit");
       if (it != profile.end()) {
        string err_str;
-       stripe_unit = strict_iecstrtoll(it->second.c_str(), &err_str);
+       stripe_unit = strict_iecstrtoll(it->second, &err_str);
        ceph_assert(err_str.empty());
       }
       *stripe_width = data_chunks *
@@ -7783,7 +7784,7 @@ int OSDMonitor::prepare_pool_crush_rule(const unsigned pool_type,
            *crush_rule = get_replicated_stretch_crush_rule();
          } else {
            // Use default rule
-           *crush_rule = osdmap.crush->get_osd_pool_default_crush_replicated_ruleset(cct);
+           *crush_rule = osdmap.crush->get_osd_pool_default_crush_replicated_rule(cct);
          }
          if (*crush_rule < 0) {
            // Errors may happen e.g. if no valid rule is available
@@ -7823,7 +7824,7 @@ int OSDMonitor::prepare_pool_crush_rule(const unsigned pool_type,
       return -EINVAL;
     }
   } else {
-    if (!osdmap.crush->ruleset_exists(*crush_rule)) {
+    if (!osdmap.crush->rule_exists(*crush_rule)) {
       *ss << "CRUSH rule " << *crush_rule << " not found";
       return -ENOENT;
     }
@@ -7842,8 +7843,7 @@ int OSDMonitor::get_crush_rule(const string &rule_name,
     // found it, use it
     *crush_rule = ret;
   } else {
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     ret = newcrush.get_rule_id(rule_name);
     if (ret != -ENOENT) {
@@ -7860,22 +7860,56 @@ int OSDMonitor::get_crush_rule(const string &rule_name,
   return 0;
 }
 
-int OSDMonitor::check_pg_num(int64_t pool, int pg_num, int size, ostream *ss)
+int OSDMonitor::check_pg_num(int64_t pool, int pg_num, int size, int crush_rule, ostream *ss)
 {
   auto max_pgs_per_osd = g_conf().get_val<uint64_t>("mon_max_pg_per_osd");
-  auto num_osds = std::max(osdmap.get_num_in_osds(), 3u);   // assume min cluster size 3
-  auto max_pgs = max_pgs_per_osd * num_osds;
   uint64_t projected = 0;
+  unsigned osd_num = 0;
+  // assume min cluster size 3
+  auto num_osds = std::max(osdmap.get_num_in_osds(), 3u);
   if (pool < 0) {
+    // a new pool
     projected += pg_num * size;
   }
-  for (const auto& i : osdmap.get_pools()) {
-    if (i.first == pool) {
+  if (mapping.get_epoch() >= osdmap.get_epoch()) {
+    set<int> roots;
+    CrushWrapper newcrush = _get_pending_crush();
+    newcrush.find_takes_by_rule(crush_rule, &roots);
+    int max_osd = osdmap.get_max_osd();
+    for (auto root : roots) {
+      const char *rootname = newcrush.get_item_name(root);
+      set<int> osd_ids;
+      newcrush.get_leaves(rootname, &osd_ids);
+      unsigned out_osd = 0;
+      for (auto id : osd_ids) {
+       if (id > max_osd) { 
+         out_osd++;
+         continue;
+       }
+       projected += mapping.get_osd_acting_pgs(id).size();
+      }
+      osd_num += osd_ids.size() - out_osd;
+    }
+    if (pool >= 0) {   
+      // update an existing pool's pg num
+      const auto& pg_info = osdmap.get_pools().at(pool);    
+      // already counted the pgs of this `pool` by iterating crush map, so 
+      // remove them using adding the specified pg num
       projected += pg_num * size;
-    } else {
-      projected += i.second.get_pg_num_target() * i.second.get_size();
+      projected -= pg_info.get_pg_num_target() * pg_info.get_size();
+    }
+    num_osds = std::max(osd_num, 3u);  // assume min cluster size 3
+  } else {
+    // use pg_num target for evaluating the projected pg num
+    for (const auto& [pool_id, pool_info] : osdmap.get_pools()) {
+      if (pool_id == pool) {
+       projected += pg_num * size;
+      } else {
+       projected += pool_info.get_pg_num_target() * pool_info.get_size();
+      }
     }
   }
+  auto max_pgs = max_pgs_per_osd * num_osds;
   if (projected > max_pgs) {
     if (pool >= 0) {
       *ss << "pool id " << pool;
@@ -7896,6 +7930,8 @@ int OSDMonitor::check_pg_num(int64_t pool, int pg_num, int size, ostream *ss)
  * @param crush_rule_name The crush rule to use, if crush_rulset <0
  * @param pg_num The pg_num to use. If set to 0, will use the system default
  * @param pgp_num The pgp_num to use. If set to 0, will use the system default
+ * @param pg_num_min min pg_num
+ * @param pg_num_max max pg_num
  * @param repl_size Replication factor, or 0 for default
  * @param erasure_code_profile The profile name in OSDMap to be used for erasure code
  * @param pool_type TYPE_ERASURE, or TYPE_REP
@@ -7910,6 +7946,7 @@ int OSDMonitor::prepare_new_pool(string& name,
                                 const string &crush_rule_name,
                                  unsigned pg_num, unsigned pgp_num,
                                 unsigned pg_num_min,
+                                unsigned pg_num_max,
                                  const uint64_t repl_size,
                                 const uint64_t target_size_bytes,
                                 const float target_size_ratio,
@@ -7918,12 +7955,22 @@ int OSDMonitor::prepare_new_pool(string& name,
                                  const uint64_t expected_num_objects,
                                  FastReadType fast_read,
                                 const string& pg_autoscale_mode,
+                                bool bulk,
                                 ostream *ss)
 {
   if (name.length() == 0)
     return -EINVAL;
-  if (pg_num == 0)
-    pg_num = g_conf().get_val<uint64_t>("osd_pool_default_pg_num");
+  if (pg_num == 0) {
+    auto pg_num_from_mode =
+      [pg_num=g_conf().get_val<uint64_t>("osd_pool_default_pg_num")]
+      (const string& mode) {
+      return mode == "on" ? 1 : pg_num;
+    };
+    pg_num = pg_num_from_mode(
+      pg_autoscale_mode.empty() ?
+      g_conf().get_val<string>("osd_pool_default_pg_autoscale_mode") :
+      pg_autoscale_mode);
+  }
   if (pgp_num == 0)
     pgp_num = g_conf().get_val<uint64_t>("osd_pool_default_pgp_num");
   if (!pgp_num)
@@ -7950,14 +7997,21 @@ int OSDMonitor::prepare_new_pool(string& name,
     dout(10) << "prepare_pool_crush_rule returns " << r << dendl;
     return r;
   }
+  unsigned size, min_size;
+  r = prepare_pool_size(pool_type, erasure_code_profile, repl_size,
+                        &size, &min_size, ss);
+  if (r) {
+    dout(10) << "prepare_pool_size returns " << r << dendl;
+    return r;
+  }
   if (g_conf()->mon_osd_crush_smoke_test) {
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
     ostringstream err;
     CrushTester tester(newcrush, err);
     tester.set_min_x(0);
     tester.set_max_x(50);
     tester.set_rule(crush_rule);
+    tester.set_num_rep(size);
     auto start = ceph::coarse_mono_clock::now();
     r = tester.test_with_fork(g_conf()->mon_lease);
     auto duration = ceph::coarse_mono_clock::now() - start;
@@ -7970,20 +8024,14 @@ int OSDMonitor::prepare_new_pool(string& name,
     dout(10) << __func__ << " crush smoke test duration: "
              << duration << dendl;
   }
-  unsigned size, min_size;
-  r = prepare_pool_size(pool_type, erasure_code_profile, repl_size,
-                        &size, &min_size, ss);
-  if (r) {
-    dout(10) << "prepare_pool_size returns " << r << dendl;
-    return r;
-  }
-  r = check_pg_num(-1, pg_num, size, ss);
+  r = check_pg_num(-1, pg_num, size, crush_rule, ss);
   if (r) {
     dout(10) << "check_pg_num returns " << r << dendl;
     return r;
   }
 
-  if (!osdmap.crush->check_crush_rule(crush_rule, pool_type, size, *ss)) {
+  if (osdmap.crush->get_rule_type(crush_rule) != (int)pool_type) {
+    *ss << "crush rule " << crush_rule << " type does not match pool";
     return -EINVAL;
   }
 
@@ -8028,6 +8076,11 @@ int OSDMonitor::prepare_new_pool(string& name,
   pi->type = pool_type;
   pi->fast_read = fread; 
   pi->flags = g_conf()->osd_pool_default_flags;
+  if (bulk) {
+    pi->set_flag(pg_pool_t::FLAG_BULK);
+  } else if (g_conf()->osd_pool_default_flag_bulk) {
+      pi->set_flag(pg_pool_t::FLAG_BULK);
+  }
   if (g_conf()->osd_pool_default_flag_hashpspool)
     pi->set_flag(pg_pool_t::FLAG_HASHPSPOOL);
   if (g_conf()->osd_pool_default_flag_nodelete)
@@ -8082,6 +8135,10 @@ int OSDMonitor::prepare_new_pool(string& name,
       pg_num_min) {
     pi->opts.set(pool_opts_t::PG_NUM_MIN, static_cast<int64_t>(pg_num_min));
   }
+  if (osdmap.require_osd_release >= ceph_release_t::quincy &&
+      pg_num_max) {
+    pi->opts.set(pool_opts_t::PG_NUM_MAX, static_cast<int64_t>(pg_num_max));
+  }
   if (auto m = pg_pool_t::get_pg_autoscale_mode_by_name(
        pg_autoscale_mode); m != pg_pool_t::pg_autoscale_mode_t::UNKNOWN) {
     pi->pg_autoscale_mode = m;
@@ -8189,9 +8246,9 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
     "csum_min_block",
   };
   if (count(begin(si_options), end(si_options), var)) {
-    n = strict_si_cast<int64_t>(val.c_str(), &interr);
+    n = strict_si_cast<int64_t>(val, &interr);
   } else if (count(begin(iec_options), end(iec_options), var)) {
-    n = strict_iec_cast<int64_t>(val.c_str(), &interr);
+    n = strict_iec_cast<int64_t>(val, &interr);
   } else {
     // parse string as both int and float; different fields use different types.
     n = strict_strtoll(val.c_str(), 10, &interr);
@@ -8241,10 +8298,11 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
        return -EPERM;
       }
     }
-    if (!osdmap.crush->check_crush_rule(p.get_crush_rule(), p.type, n, ss)) {
+    if (osdmap.crush->get_rule_type(p.get_crush_rule()) != (int)p.type) {
+      ss << "crush rule " << p.get_crush_rule() << " type does not match pool";
       return -EINVAL;
     }
-    int r = check_pg_num(pool, p.get_pg_num(), n, &ss);
+    int r = check_pg_num(pool, p.get_pg_num(), n, p.get_crush_rule(), &ss);
     if (r < 0) {
       return r;
     }
@@ -8350,7 +8408,7 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
       return -ERANGE;
     }
     if (n > (int)p.get_pg_num_target()) {
-      int r = check_pg_num(pool, n, p.get_size(), &ss);
+      int r = check_pg_num(pool, n, p.get_size(), p.get_crush_rule(), &ss);
       if (r) {
        return r;
       }
@@ -8366,6 +8424,19 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
        return -EPERM;
       }
     }
+    int64_t pg_min = 0, pg_max = 0;
+    p.opts.get(pool_opts_t::PG_NUM_MIN, &pg_min);
+    p.opts.get(pool_opts_t::PG_NUM_MAX, &pg_max);
+    if (pg_min && n < pg_min) {
+      ss << "specified pg_num " << n
+        << " < pg_num_min " << pg_min;
+      return -EINVAL;
+    }
+    if (pg_max && n > pg_max) {
+      ss << "specified pg_num " << n
+        << " < pg_num_max " << pg_max;
+      return -EINVAL;
+    }
     if (osdmap.require_osd_release < ceph_release_t::nautilus) {
       // pre-nautilus osdmap format; increase pg_num directly
       assert(n > (int)p.get_pg_num());
@@ -8453,14 +8524,27 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
       ss << cpp_strerror(id);
       return -ENOENT;
     }
-    if (!osdmap.crush->check_crush_rule(id, p.get_type(), p.get_size(), ss)) {
+    if (osdmap.crush->get_rule_type(id) != (int)p.get_type()) {
+      ss << "crush rule " << id << " type does not match pool";
       return -EINVAL;
     }
     p.crush_rule = id;
   } else if (var == "nodelete" || var == "nopgchange" ||
             var == "nosizechange" || var == "write_fadvise_dontneed" ||
-            var == "noscrub" || var == "nodeep-scrub") {
+            var == "noscrub" || var == "nodeep-scrub" || var == "bulk") {
     uint64_t flag = pg_pool_t::get_flag_by_name(var);
+    // make sure we only compare against 'n' if we didn't receive a string
+    if (val == "true" || (interr.empty() && n == 1)) {
+      p.set_flag(flag);
+    } else if (val == "false" || (interr.empty() && n == 0)) {
+      p.unset_flag(flag);
+    } else {
+      ss << "expecting value 'true', 'false', '0', or '1'";
+      return -EINVAL;
+    }
+  } else if (var == "eio") {
+    uint64_t flag = pg_pool_t::get_flag_by_name(var);
+
     // make sure we only compare against 'n' if we didn't receive a string
     if (val == "true" || (interr.empty() && n == 1)) {
       p.set_flag(flag);
@@ -8740,6 +8824,16 @@ int OSDMonitor::prepare_command_pool_set(const cmdmap_t& cmdmap,
           << " > pg_num " << p.get_pg_num_target();
        return -EINVAL;
       }
+    } else if (var == "pg_num_max") {
+      if (interr.length()) {
+        ss << "error parsing int value '" << val << "': " << interr;
+        return -EINVAL;
+      }
+      if (n && n < (int)p.get_pg_num_target()) {
+       ss << "specified pg_num_max " << n
+          << " < pg_num " << p.get_pg_num_target();
+       return -EINVAL;
+      }
     } else if (var == "recovery_priority") {
       if (interr.length()) {
         ss << "error parsing int value '" << val << "': " << interr;
@@ -9173,8 +9267,7 @@ void OSDMonitor::do_osd_create(
 
 out:
   if (device_class.size()) {
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
     if (newcrush.get_max_devices() < *new_id + 1) {
       newcrush.set_max_devices(*new_id + 1);
     }
@@ -9724,8 +9817,7 @@ int OSDMonitor::prepare_command_osd_purge(
    *     the crush update we delayed from before.
    */
 
-  CrushWrapper newcrush;
-  _get_pending_crush(newcrush);
+  CrushWrapper newcrush = _get_pending_crush();
 
   bool may_be_idempotent = false;
 
@@ -9778,8 +9870,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   bufferlist rdata;
   int err = 0;
 
-  string format;
-  cmd_getval(cmdmap, "format", format, string("plain"));
+  string format = cmd_getval_or<string>(cmdmap, "format", "plain");
   boost::scoped_ptr<Formatter> f(Formatter::create(format));
 
   string prefix;
@@ -9875,11 +9966,6 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
     }
 
-    if (crush.has_legacy_rule_ids()) {
-      err = -EINVAL;
-      ss << "crush maps with ruleset != ruleid are no longer allowed";
-      goto reply;
-    }
     if (!validate_crush_against_features(&crush, ss)) {
       err = -EINVAL;
       goto reply;
@@ -9898,6 +9984,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       CrushTester tester(crush, ess);
       tester.set_min_x(0);
       tester.set_max_x(50);
+      tester.set_num_rep(3);  // arbitrary
       auto start = ceph::coarse_mono_clock::now();
       int r = tester.test_with_fork(g_conf()->mon_lease);
       auto duration = ceph::coarse_mono_clock::now() - start;
@@ -9917,8 +10004,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     goto update;
 
   } else if (prefix == "osd crush set-all-straw-buckets-to-straw2") {
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
     for (int b = 0; b < newcrush.get_max_buckets(); ++b) {
       int bid = -1 - b;
       if (newcrush.bucket_exists(bid) &&
@@ -9946,8 +10032,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     bool stop = false;
     vector<string> idvec;
     cmd_getval(cmdmap, "ids", idvec);
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
     set<int> updated;
     for (unsigned j = 0; j < idvec.size() && !stop; j++) {
       set<int> osds;
@@ -10020,8 +10105,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     bool stop = false;
     vector<string> idvec;
     cmd_getval(cmdmap, "ids", idvec);
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
     set<int> updated;
 
     for (unsigned j = 0; j < idvec.size() && !stop; j++) {
@@ -10093,8 +10177,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       ss << "class '" << device_class << "' already exists";
       goto reply;
     }
-     CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+     CrushWrapper newcrush = _get_pending_crush();
      if (newcrush.class_exists(device_class)) {
       ss << "class '" << device_class << "' already exists";
       goto update;
@@ -10123,8 +10206,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        goto reply;
      }
 
-     CrushWrapper newcrush;
-     _get_pending_crush(newcrush);
+     CrushWrapper newcrush = _get_pending_crush();
      if (!newcrush.class_exists(device_class)) {
        err = 0; // make command idempotent
        goto wait;
@@ -10198,8 +10280,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
     if (!newcrush.class_exists(srcname) && newcrush.class_exists(dstname)) {
       // suppose this is a replay and return success
       // so command is idempotent
@@ -10239,8 +10320,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     if (newcrush.name_exists(name)) {
       ss << "bucket '" << name << "' already exists";
@@ -10309,8 +10389,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto update;
   } else if (prefix == "osd crush weight-set create" ||
             prefix == "osd crush weight-set create-compat") {
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    if (_have_pending_crush()) {
+      dout(10) << " first waiting for pending crush changes to commit" << dendl;
+      goto wait;
+    }
+    CrushWrapper newcrush = _get_pending_crush();
     int64_t pool;
     int positions;
     if (newcrush.has_non_straw2_buckets()) {
@@ -10363,8 +10446,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   } else if (prefix == "osd crush weight-set rm" ||
             prefix == "osd crush weight-set rm-compat") {
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
     int64_t pool;
     if (prefix == "osd crush weight-set rm") {
       string poolname;
@@ -10390,8 +10472,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     cmd_getval(cmdmap, "pool", poolname);
     cmd_getval(cmdmap, "item", item);
     cmd_getval(cmdmap, "weight", weight);
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
     int64_t pool;
     if (prefix == "osd crush weight-set reweight") {
       pool = osdmap.lookup_pg_pool_name(poolname.c_str());
@@ -10477,8 +10558,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     dout(5) << "adding/updating crush item id " << osdid << " name '"
       << osd_name << "' weight " << weight << " at location "
       << loc << dendl;
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     string action;
     if (prefix == "osd crush set" ||
@@ -10538,8 +10618,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
              << "' initial_weight " << weight << " at location " << loc
              << dendl;
 
-      CrushWrapper newcrush;
-      _get_pending_crush(newcrush);
+      CrushWrapper newcrush = _get_pending_crush();
 
       err = newcrush.create_or_move_item(cct, osdid, weight, osd_name, loc,
                                         g_conf()->osd_crush_update_weight_set);
@@ -10573,8 +10652,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       CrushWrapper::parse_loc_map(argvec, &loc);
 
       dout(0) << "moving crush item name '" << name << "' to location " << loc << dendl;
-      CrushWrapper newcrush;
-      _get_pending_crush(newcrush);
+      CrushWrapper newcrush = _get_pending_crush();
 
       if (!newcrush.name_exists(name)) {
        err = -ENOENT;
@@ -10613,8 +10691,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     bool force = false;
     cmd_getval(cmdmap, "yes_i_really_mean_it", force);
 
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
     if (!newcrush.name_exists(source)) {
       ss << "source item " << source << " does not exist";
       err = -ENOENT;
@@ -10681,8 +10758,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     dout(5) << "linking crush item name '" << name << "' at location " << loc << dendl;
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     if (!newcrush.name_exists(name)) {
       err = -ENOENT;
@@ -10716,8 +10792,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
             prefix == "osd crush unlink") {
     do {
       // osd crush rm <id> [ancestor]
-      CrushWrapper newcrush;
-      _get_pending_crush(newcrush);
+      CrushWrapper newcrush = _get_pending_crush();
 
       string name;
       cmd_getval(cmdmap, "name", name);
@@ -10772,8 +10847,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     } while (false);
 
   } else if (prefix == "osd crush reweight-all") {
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     newcrush.reweight(cct);
     pending_inc.crush.clear();
@@ -10785,8 +10859,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return true;
   } else if (prefix == "osd crush reweight") {
     // osd crush reweight <name> <weight>
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     string name;
     cmd_getval(cmdmap, "name", name);
@@ -10824,8 +10897,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return true;
   } else if (prefix == "osd crush reweight-subtree") {
     // osd crush reweight <name> <weight>
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     string name;
     cmd_getval(cmdmap, "name", name);
@@ -10862,8 +10934,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
                                              get_last_committed() + 1));
     return true;
   } else if (prefix == "osd crush tunables") {
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     err = 0;
     string profile;
@@ -10901,8 +10972,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
                                              get_last_committed() + 1));
     return true;
   } else if (prefix == "osd crush set-tunable") {
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     err = 0;
     string tunable;
@@ -10959,8 +11029,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     if (newcrush.rule_exists(name)) {
       // The name is uniquely associated to a ruleid and the rule it contains
@@ -10998,8 +11067,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     if (newcrush.rule_exists(name)) {
       // The name is uniquely associated to a ruleid and the rule it contains
@@ -11198,8 +11266,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
 
     if (!newcrush.rule_exists(name)) {
       ss << "rule " << name << " does not exist";
@@ -11211,9 +11278,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       // make sure it is not in use.
       // FIXME: this is ok in some situations, but let's not bother with that
       // complexity now.
-      int ruleset = newcrush.get_rule_mask_ruleset(ruleno);
-      if (osdmap.crush_rule_in_use(ruleset)) {
-       ss << "crush ruleset " << name << " " << ruleset << " is in use";
+      if (osdmap.crush_rule_in_use(ruleno)) {
+       ss << "crush rule " << name << " (" << ruleno << ") is in use";
        err = -EBUSY;
        goto reply;
       }
@@ -11247,8 +11313,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
+    CrushWrapper newcrush = _get_pending_crush();
     if (!newcrush.rule_exists(srcname) && newcrush.rule_exists(dstname)) {
       // srcname does not exist and dstname already exists
       // suppose this is a replay and return success
@@ -11503,40 +11568,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       err = 0;
       goto reply;
     }
-    ceph_assert(osdmap.require_osd_release >= ceph_release_t::luminous);
+    ceph_assert(osdmap.require_osd_release >= ceph_release_t::octopus);
     if (!osdmap.get_num_up_osds() && !sure) {
       ss << "Not advisable to continue since no OSDs are up. Pass "
         << "--yes-i-really-mean-it if you really wish to continue.";
       err = -EPERM;
       goto reply;
     }
-    if (rel == ceph_release_t::mimic) {
-      if (!mon.monmap->get_required_features().contains_all(
-           ceph::features::mon::FEATURE_MIMIC)) {
-       ss << "not all mons are mimic";
-       err = -EPERM;
-       goto reply;
-      }
-      if ((!HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_MIMIC))
-           && !sure) {
-       ss << "not all up OSDs have CEPH_FEATURE_SERVER_MIMIC feature";
-       err = -EPERM;
-       goto reply;
-      }
-    } else if (rel == ceph_release_t::nautilus) {
-      if (!mon.monmap->get_required_features().contains_all(
-           ceph::features::mon::FEATURE_NAUTILUS)) {
-       ss << "not all mons are nautilus";
-       err = -EPERM;
-       goto reply;
-      }
-      if ((!HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_NAUTILUS))
-           && !sure) {
-       ss << "not all up OSDs have CEPH_FEATURE_SERVER_NAUTILUS feature";
-       err = -EPERM;
-       goto reply;
-      }
-    } else if (rel == ceph_release_t::octopus) {
+    if (rel == ceph_release_t::octopus) {
       if (!mon.monmap->get_required_features().contains_all(
            ceph::features::mon::FEATURE_OCTOPUS)) {
        ss << "not all mons are octopus";
@@ -11562,8 +11601,21 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        err = -EPERM;
        goto reply;
       }
+    } else if (rel == ceph_release_t::quincy) {
+      if (!mon.monmap->get_required_features().contains_all(
+           ceph::features::mon::FEATURE_QUINCY)) {
+       ss << "not all mons are quincy";
+       err = -EPERM;
+       goto reply;
+      }
+      if ((!HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_QUINCY))
+           && !sure) {
+       ss << "not all up OSDs have CEPH_FEATURE_SERVER_QUINCY feature";
+       err = -EPERM;
+       goto reply;
+      }
     } else {
-      ss << "not supported for this release yet";
+      ss << "not supported for this release";
       err = -EPERM;
       goto reply;
     }
@@ -12625,7 +12677,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     string addrstr;
     cmd_getval(cmdmap, "addr", addrstr);
     entity_addr_t addr;
-    if (!addr.parse(addrstr.c_str(), 0)) {
+    if (!addr.parse(addrstr)) {
       ss << "unable to parse address " << addrstr;
       err = -EINVAL;
       goto reply;
@@ -12644,9 +12696,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
       if (blocklistop == "add") {
        utime_t expires = ceph_clock_now();
-       double d;
        // default one hour
-       cmd_getval(cmdmap, "expire", d,
+       double d = cmd_getval_or<double>(cmdmap, "expire",
           g_conf()->mon_osd_blocklist_default_expire);
        expires += d;
 
@@ -12768,12 +12819,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
                                              get_last_committed() + 1));
     return true;
   } else if (prefix == "osd pool create") {
-    int64_t pg_num, pg_num_min;
-    int64_t pgp_num;
-    cmd_getval(cmdmap, "pg_num", pg_num, int64_t(0));
-    cmd_getval(cmdmap, "pgp_num", pgp_num, pg_num);
-    cmd_getval(cmdmap, "pg_num_min", pg_num_min, int64_t(0));
-
+    int64_t pg_num = cmd_getval_or<int64_t>(cmdmap, "pg_num", 0);
+    int64_t pg_num_min = cmd_getval_or<int64_t>(cmdmap, "pg_num_min", 0);
+    int64_t pg_num_max = cmd_getval_or<int64_t>(cmdmap, "pg_num_max", 0);
+    int64_t pgp_num = cmd_getval_or<int64_t>(cmdmap, "pgp_num", pg_num);
     string pool_type_str;
     cmd_getval(cmdmap, "pool_type", pool_type_str);
     if (pool_type_str.empty())
@@ -12844,8 +12893,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
          rule_name = poolstr;
        }
       }
-      cmd_getval(cmdmap, "expected_num_objects",
-                 expected_num_objects, int64_t(0));
+      expected_num_objects =
+       cmd_getval_or<int64_t>(cmdmap, "expected_num_objects", 0);
     } else {
       //NOTE:for replicated pool,cmd_map will put rule_name to erasure_code_profile field
       //     and put expected_num_objects to rule field
@@ -12861,8 +12910,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
         }
         rule_name = erasure_code_profile;
       } else { // cmd is well-formed
-        cmd_getval(cmdmap, "expected_num_objects",
-                   expected_num_objects, int64_t(0));
+        expected_num_objects =
+         cmd_getval_or<int64_t>(cmdmap, "expected_num_objects", 0);
       }
     }
 
@@ -12918,8 +12967,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
     }
 
-    int64_t fast_read_param;
-    cmd_getval(cmdmap, "fast_read", fast_read_param, int64_t(-1));
+    int64_t fast_read_param = cmd_getval_or<int64_t>(cmdmap, "fast_read", -1);
     FastReadType fast_read = FAST_READ_DEFAULT;
     if (fast_read_param == 0)
       fast_read = FAST_READ_OFF;
@@ -12936,15 +12984,17 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     string pg_autoscale_mode;
     cmd_getval(cmdmap, "autoscale_mode", pg_autoscale_mode);
 
+    bool bulk = cmd_getval_or<bool>(cmdmap, "bulk", 0);
     err = prepare_new_pool(poolstr,
                           -1, // default crush rule
                           rule_name,
-                          pg_num, pgp_num, pg_num_min,
+                          pg_num, pgp_num, pg_num_min, pg_num_max,
                            repl_size, target_size_bytes, target_size_ratio,
                           erasure_code_profile, pool_type,
                            (uint64_t)expected_num_objects,
                            fast_read,
                           pg_autoscale_mode,
+                          bulk,
                           &ss);
     if (err < 0) {
       switch(err) {
@@ -13086,11 +13136,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     }
 
     // make sure new tier is empty
-    string force_nonempty;
-    cmd_getval(cmdmap, "force_nonempty", force_nonempty);
+    bool force_nonempty = false;
+    cmd_getval_compat_cephbool(cmdmap, "force_nonempty", force_nonempty);
     const pool_stat_t *pstats = mon.mgrstatmon()->get_pool_stat(tierpool_id);
     if (pstats && pstats->stats.sum.num_objects != 0 &&
-       force_nonempty != "--force-nonempty") {
+       !force_nonempty) {
       ss << "tier pool '" << tierpoolstr << "' is not empty; --force-nonempty to force";
       err = -ENOTEMPTY;
       goto reply;
@@ -13102,8 +13152,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     if ((!tp->removed_snaps.empty() || !tp->snaps.empty()) &&
-       ((force_nonempty != "--force-nonempty") ||
-        (!g_conf()->mon_debug_unsafe_allow_tier_with_nonempty_snaps))) {
+       (!force_nonempty ||
+        !g_conf()->mon_debug_unsafe_allow_tier_with_nonempty_snaps)) {
       ss << "tier pool '" << tierpoolstr << "' has snapshot state; it cannot be added as a tier without breaking the pool";
       err = -ENOTEMPTY;
       goto reply;
@@ -13543,9 +13593,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     string tss;
     int64_t value;
     if (field == "max_objects") {
-      value = strict_sistrtoll(val.c_str(), &tss);
+      value = strict_si_cast<uint64_t>(val, &tss);
     } else if (field == "max_bytes") {
-      value = strict_iecstrtoll(val.c_str(), &tss);
+      value = strict_iecstrtoll(val, &tss);
     } else {
       ceph_abort_msg("unrecognized option");
     }
@@ -14275,8 +14325,7 @@ int OSDMonitor::_prepare_remove_pool(
   }
 
   // remove any choose_args for this pool
-  CrushWrapper newcrush;
-  _get_pending_crush(newcrush);
+  CrushWrapper newcrush = _get_pending_crush();
   if (newcrush.have_choose_args(pool)) {
     dout(10) << __func__ << " removing choose_args for pool " << pool << dendl;
     newcrush.rm_choose_args(pool);
@@ -14435,15 +14484,16 @@ void OSDMonitor::try_enable_stretch_mode(stringstream& ss, bool *okay,
 {
   dout(20) << __func__ << dendl;
   *okay = false;
-  CrushWrapper crush;
-  _get_pending_crush(crush);
-  int dividing_id;
-  int retval = crush.get_validated_type_id(dividing_bucket, &dividing_id);
-  if (retval == -1) {
+  CrushWrapper crush = _get_pending_crush();
+  int dividing_id = -1;
+  if (auto type_id = crush.get_validated_type_id(dividing_bucket);
+      !type_id.has_value()) {
     ss << dividing_bucket << " is not a valid crush bucket type";
     *errcode = -ENOENT;
-    ceph_assert(!commit || retval != -1);
+    ceph_assert(!commit);
     return;
+  } else {
+    dividing_id = *type_id;
   }
   vector<int> subtrees;
   crush.get_subtree_of_type(dividing_id, &subtrees);