]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/OSDMonitor.cc
update sources to v12.2.3
[ceph.git] / ceph / src / mon / OSDMonitor.cc
index 46f702f4023e47810a1d12ba7cbf123a14c92720..180b6967e65fa3e314839ca12d36886c2515d6c6 100644 (file)
@@ -81,7 +81,8 @@
 #include <boost/algorithm/string/predicate.hpp>
 
 #define dout_subsys ceph_subsys_mon
-#define OSD_PG_CREATING_PREFIX "osd_pg_creating"
+static const string OSD_PG_CREATING_PREFIX("osd_pg_creating");
+static const string OSD_METADATA_PREFIX("osd_metadata");
 
 namespace {
 
@@ -268,6 +269,7 @@ void OSDMonitor::get_store_prefixes(std::set<string>& s)
 {
   s.insert(service_name);
   s.insert(OSD_PG_CREATING_PREFIX);
+  s.insert(OSD_METADATA_PREFIX);
 }
 
 void OSDMonitor::update_from_paxos(bool *need_bootstrap)
@@ -566,23 +568,6 @@ void OSDMonitor::on_active()
 void OSDMonitor::on_restart()
 {
   last_osd_report.clear();
-
-  if (mon->is_leader()) {
-    // fix ruleset != ruleid
-    if (osdmap.crush->has_legacy_rulesets() &&
-       !osdmap.crush->has_multirule_rulesets()) {
-      CrushWrapper newcrush;
-      _get_pending_crush(newcrush);
-      int r = newcrush.renumber_rules_by_ruleset();
-      if (r >= 0) {
-       dout(1) << __func__ << " crush map has ruleset != rule id; fixing" << dendl;
-       pending_inc.crush.clear();
-       newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
-      } else {
-       dout(10) << __func__ << " unable to renumber rules by ruleset" << dendl;
-      }
-    }
-  }
 }
 
 void OSDMonitor::on_shutdown()
@@ -662,6 +647,40 @@ void OSDMonitor::create_pending()
              << pending_inc.new_nearfull_ratio << dendl;
     }
   }
+
+  // Rewrite CRUSH rule IDs if they are using legacy "ruleset"
+  // structure.
+  if (osdmap.crush->has_legacy_rule_ids()) {
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+
+    // First, for all pools, work out which rule they really used
+    // by resolving ruleset to rule.
+    for (const auto &i : osdmap.get_pools()) {
+      const auto pool_id = i.first;
+      const auto &pool = i.second;
+      int new_rule_id = newcrush.find_rule(pool.crush_rule,
+                                          pool.type, pool.size);
+
+      dout(1) << __func__ << " rewriting pool "
+             << osdmap.get_pool_name(pool_id) << " crush ruleset "
+             << pool.crush_rule << " -> rule id " << new_rule_id << dendl;
+      if (pending_inc.new_pools.count(pool_id) == 0) {
+       pending_inc.new_pools[pool_id] = pool;
+      }
+      pending_inc.new_pools[pool_id].crush_rule = new_rule_id;
+    }
+
+    // Now, go ahead and renumber all the rules so that their
+    // rule_id field corresponds to their position in the array
+    auto old_to_new = newcrush.renumber_rules();
+    dout(1) << __func__ << " Rewrote " << old_to_new << " crush IDs:" << dendl;
+    for (const auto &i : old_to_new) {
+      dout(1) << __func__ << " " << i.first << " -> " << i.second << dendl;
+    }
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+  }
 }
 
 creating_pgs_t
@@ -972,31 +991,190 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     tmp.apply_incremental(pending_inc);
 
     if (tmp.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-      // set or clear full/nearfull?
-      int full, backfill, nearfull;
-      tmp.count_full_nearfull_osds(&full, &backfill, &nearfull);
-      if (full > 0) {
-       if (!tmp.test_flag(CEPH_OSDMAP_FULL)) {
-         dout(10) << __func__ << " setting full flag" << dendl;
-         add_flag(CEPH_OSDMAP_FULL);
-         remove_flag(CEPH_OSDMAP_NEARFULL);
-       }
-      } else {
-       if (tmp.test_flag(CEPH_OSDMAP_FULL)) {
-         dout(10) << __func__ << " clearing full flag" << dendl;
-         remove_flag(CEPH_OSDMAP_FULL);
-       }
-       if (nearfull > 0) {
-         if (!tmp.test_flag(CEPH_OSDMAP_NEARFULL)) {
-           dout(10) << __func__ << " setting nearfull flag" << dendl;
-           add_flag(CEPH_OSDMAP_NEARFULL);
-         }
-       } else {
-         if (tmp.test_flag(CEPH_OSDMAP_NEARFULL)) {
-           dout(10) << __func__ << " clearing nearfull flag" << dendl;
-           remove_flag(CEPH_OSDMAP_NEARFULL);
-         }
-       }
+      // remove any legacy osdmap nearfull/full flags
+      {
+        if (tmp.test_flag(CEPH_OSDMAP_FULL | CEPH_OSDMAP_NEARFULL)) {
+          dout(10) << __func__ << " clearing legacy osdmap nearfull/full flag"
+                   << dendl;
+          remove_flag(CEPH_OSDMAP_NEARFULL);
+          remove_flag(CEPH_OSDMAP_FULL);
+        }
+      }
+      // collect which pools are currently affected by
+      // the near/backfill/full osd(s),
+      // and set per-pool near/backfill/full flag instead
+      set<int64_t> full_pool_ids;
+      set<int64_t> backfillfull_pool_ids;
+      set<int64_t> nearfull_pool_ids;
+      tmp.get_full_pools(g_ceph_context,
+                         &full_pool_ids,
+                         &backfillfull_pool_ids,
+                         &nearfull_pool_ids);
+      if (full_pool_ids.empty() ||
+          backfillfull_pool_ids.empty() ||
+          nearfull_pool_ids.empty()) {
+        // normal case - no nearfull, backfillfull or full osds
+        // try cancel any improper nearfull/backfillfull/full pool
+        // flags first
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_NEARFULL) &&
+              nearfull_pool_ids.empty()) {
+            dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                     << "'s nearfull flag" << dendl;
+            if (pending_inc.new_pools.count(p) == 0) {
+              // load original pool info first!
+              pending_inc.new_pools[p] = pool.second;
+            }
+            pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_BACKFILLFULL) &&
+              backfillfull_pool_ids.empty()) {
+            dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                     << "'s backfillfull flag" << dendl;
+            if (pending_inc.new_pools.count(p) == 0) {
+              pending_inc.new_pools[p] = pool.second;
+            }
+            pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_BACKFILLFULL;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL) &&
+              full_pool_ids.empty()) {
+            if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+              // set by EQUOTA, skipping
+              continue;
+            }
+            dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                     << "'s full flag" << dendl;
+            if (pending_inc.new_pools.count(p) == 0) {
+              pending_inc.new_pools[p] = pool.second;
+            }
+            pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_FULL;
+          }
+        }
+      }
+      if (!full_pool_ids.empty()) {
+        dout(10) << __func__ << " marking pool(s) " << full_pool_ids
+                 << " as full" << dendl;
+        for (auto &p: full_pool_ids) {
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL)) {
+            continue;
+          }
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = tmp.pools[p];
+          }
+          pending_inc.new_pools[p].flags |= pg_pool_t::FLAG_FULL;
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_BACKFILLFULL;
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+        }
+        // cancel FLAG_FULL for pools which are no longer full too
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (full_pool_ids.count(p)) {
+            // skip pools we have just marked as full above
+            continue;
+          }
+          if (!tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL) ||
+               tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+            // don't touch if currently is not full
+            // or is running out of quota (and hence considered as full)
+            continue;
+          }
+          dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                   << "'s full flag" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = pool.second;
+          }
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_FULL;
+        }
+      }
+      if (!backfillfull_pool_ids.empty()) {
+        for (auto &p: backfillfull_pool_ids) {
+          if (full_pool_ids.count(p)) {
+            // skip pools we have already considered as full above
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+            // make sure FLAG_FULL is truly set, so we are safe not
+            // to set a extra (redundant) FLAG_BACKFILLFULL flag
+            assert(tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL));
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_BACKFILLFULL)) {
+            // don't bother if pool is already marked as backfillfull
+            continue;
+          }
+          dout(10) << __func__ << " marking pool '" << tmp.pool_name[p]
+                   << "'s as backfillfull" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = tmp.pools[p];
+          }
+          pending_inc.new_pools[p].flags |= pg_pool_t::FLAG_BACKFILLFULL;
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+        }
+        // cancel FLAG_BACKFILLFULL for pools
+        // which are no longer backfillfull too
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (full_pool_ids.count(p) || backfillfull_pool_ids.count(p)) {
+            // skip pools we have just marked as backfillfull/full above
+            continue;
+          }
+          if (!tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_BACKFILLFULL)) {
+            // and don't touch if currently is not backfillfull
+            continue;
+          }
+          dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                   << "'s backfillfull flag" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = pool.second;
+          }
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_BACKFILLFULL;
+        }
+      }
+      if (!nearfull_pool_ids.empty()) {
+        for (auto &p: nearfull_pool_ids) {
+          if (full_pool_ids.count(p) || backfillfull_pool_ids.count(p)) {
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+            // make sure FLAG_FULL is truly set, so we are safe not
+            // to set a extra (redundant) FLAG_NEARFULL flag
+            assert(tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL));
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_NEARFULL)) {
+            // don't bother if pool is already marked as nearfull
+            continue;
+          }
+          dout(10) << __func__ << " marking pool '" << tmp.pool_name[p]
+                   << "'s as nearfull" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = tmp.pools[p];
+          }
+          pending_inc.new_pools[p].flags |= pg_pool_t::FLAG_NEARFULL;
+        }
+        // cancel FLAG_NEARFULL for pools
+        // which are no longer nearfull too
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (full_pool_ids.count(p) ||
+              backfillfull_pool_ids.count(p) ||
+              nearfull_pool_ids.count(p)) {
+            // skip pools we have just marked as
+            // nearfull/backfillfull/full above
+            continue;
+          }
+          if (!tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_NEARFULL)) {
+            // and don't touch if currently is not nearfull
+            continue;
+          }
+          dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                   << "'s nearfull flag" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = pool.second;
+          }
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+        }
       }
 
       // min_compat_client?
@@ -2668,6 +2846,10 @@ bool OSDMonitor::preprocess_pgtemp(MonOpRequestRef op)
     goto ignore;
   }
 
+  if (m->forced) {
+    return false;
+  }
+
   for (auto p = m->pg_temp.begin(); p != m->pg_temp.end(); ++p) {
     dout(20) << " " << p->first
             << (osdmap.pg_temp->count(p->first) ? osdmap.pg_temp->get(p->first) : empty)
@@ -3749,16 +3931,6 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
       }
     }
 
-    if (osdmap.crush->has_multirule_rulesets()) {
-      ostringstream ss;
-      ss << "CRUSH map contains multirule rulesets";
-      summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-      if (detail) {
-       ss << "; please manually fix the map";
-       detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-      }
-    }
-
     // Not using 'sortbitwise' and should be?
     if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE) &&
         (osdmap.get_up_osd_features() &
@@ -5264,10 +5436,20 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
   return true;
 }
 
-void OSDMonitor::update_pool_flags(int64_t pool_id, uint64_t flags)
+void OSDMonitor::set_pool_flags(int64_t pool_id, uint64_t flags)
+{
+  pg_pool_t *pool = pending_inc.get_new_pool(pool_id,
+    osdmap.get_pg_pool(pool_id));
+  assert(pool);
+  pool->set_flag(flags);
+}
+
+void OSDMonitor::clear_pool_flags(int64_t pool_id, uint64_t flags)
 {
-  const pg_pool_t *pool = osdmap.get_pg_pool(pool_id);
-  pending_inc.get_new_pool(pool_id, pool)->flags = flags;
+  pg_pool_t *pool = pending_inc.get_new_pool(pool_id,
+    osdmap.get_pg_pool(pool_id));
+  assert(pool);
+  pool->unset_flag(flags);
 }
 
 bool OSDMonitor::update_pools_status()
@@ -5290,14 +5472,16 @@ bool OSDMonitor::update_pools_status()
       (pool.quota_max_bytes > 0 && (uint64_t)sum.num_bytes >= pool.quota_max_bytes) ||
       (pool.quota_max_objects > 0 && (uint64_t)sum.num_objects >= pool.quota_max_objects);
 
-    if (pool.has_flag(pg_pool_t::FLAG_FULL)) {
+    if (pool.has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
       if (pool_is_full)
         continue;
 
       mon->clog->info() << "pool '" << pool_name
-                       << "' no longer full; removing FULL flag";
-
-      update_pool_flags(it->first, pool.get_flags() & ~pg_pool_t::FLAG_FULL);
+                       << "' no longer out of quota; removing NO_QUOTA flag";
+      // below we cancel FLAG_FULL too, we'll set it again in
+      // OSDMonitor::encode_pending if it still fails the osd-full checking.
+      clear_pool_flags(it->first,
+                       pg_pool_t::FLAG_FULL_NO_QUOTA | pg_pool_t::FLAG_FULL);
       ret = true;
     } else {
       if (!pool_is_full)
@@ -5315,7 +5499,14 @@ bool OSDMonitor::update_pools_status()
                          << " (reached quota's max_objects: "
                          << pool.quota_max_objects << ")";
       }
-      update_pool_flags(it->first, pool.get_flags() | pg_pool_t::FLAG_FULL);
+      // set both FLAG_FULL_NO_QUOTA and FLAG_FULL
+      // note that below we try to cancel FLAG_BACKFILLFULL/NEARFULL too
+      // since FLAG_FULL should always take precedence
+      set_pool_flags(it->first,
+                     pg_pool_t::FLAG_FULL_NO_QUOTA | pg_pool_t::FLAG_FULL);
+      clear_pool_flags(it->first,
+                       pg_pool_t::FLAG_NEARFULL |
+                       pg_pool_t::FLAG_BACKFILLFULL);
       ret = true;
     }
   }
@@ -5625,9 +5816,22 @@ int OSDMonitor::parse_erasure_code_profile(const vector<string> &erasure_code_pr
       user_map[*i] = string();
       (*erasure_code_profile_map)[*i] = string();
     } else {
-      const string key = i->substr(0, equal);
+      string key = i->substr(0, equal);
       equal++;
       const string value = i->substr(equal);
+      if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS &&
+         key.find("ruleset-") == 0) {
+       if (g_conf->get_val<bool>("mon_fixup_legacy_erasure_code_profiles")) {
+         mon->clog->warn() << "erasure code profile property '" << key
+                           << "' is no longer supported; try "
+                           << "'crush-" << key.substr(8) << "' instead";
+         key = string("crush-") + key.substr(8);
+       } else {
+         *ss << "property '" << key << "' is no longer supported; try "
+             << "'crush-" << key.substr(8) << "' instead";
+         return -EINVAL;
+       }
+      }
       user_map[key] = value;
       (*erasure_code_profile_map)[key] = value;
     }
@@ -5797,6 +6001,36 @@ int OSDMonitor::get_crush_rule(const string &rule_name,
   return 0;
 }
 
+int OSDMonitor::check_pg_num(int64_t pool, int pg_num, int size, ostream *ss)
+{
+  auto max_pgs_per_osd = g_conf->get_val<uint64_t>("mon_max_pg_per_osd");
+  auto num_osds = std::max(osdmap.get_num_in_osds(), 3u);   // assume min cluster size 3
+  auto max_pgs = max_pgs_per_osd * num_osds;
+  uint64_t projected = 0;
+  if (pool < 0) {
+    projected += pg_num * size;
+  }
+  for (const auto& i : osdmap.get_pools()) {
+    if (i.first == pool) {
+      projected += pg_num * size;
+    } else {
+      projected += i.second.get_pg_num() * i.second.get_size();
+    }
+  }
+  if (projected > max_pgs) {
+    if (pool >= 0) {
+      *ss << "pool id " << pool;
+    }
+    *ss << " pg_num " << pg_num << " size " << size
+       << " would mean " << projected
+       << " total pgs, which exceeds max " << max_pgs
+       << " (mon_max_pg_per_osd " << max_pgs_per_osd
+       << " * num_in_osds " << num_osds << ")";
+    return -ERANGE;
+  }
+  return 0;
+}
+
 /**
  * @param name The name of the new pool
  * @param auid The auid of the pool owner. Can be -1
@@ -5876,6 +6110,11 @@ int OSDMonitor::prepare_new_pool(string& name, uint64_t auid,
     dout(10) << " prepare_pool_size returns " << r << dendl;
     return r;
   }
+  r = check_pg_num(-1, pg_num, size, ss);
+  if (r) {
+    dout(10) << " prepare_pool_size returns " << r << dendl;
+    return r;
+  }
 
   if (!osdmap.crush->check_crush_rule(crush_rule, pool_type, size, *ss)) {
     return -EINVAL;
@@ -6052,6 +6291,10 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
       ss << "pool size must be between 1 and 10";
       return -EINVAL;
     }
+    int r = check_pg_num(pool, p.get_pg_num(), n, &ss);
+    if (r < 0) {
+      return r;
+    }
     p.size = n;
     if (n < p.min_size)
       p.min_size = n;
@@ -6078,7 +6321,7 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
        if (err == 0) {
         k = erasure_code->get_data_chunk_count();
        } else {
-        ss << __func__ << " get_erasure_code failed: " << tmp.rdbuf();
+        ss << __func__ << " get_erasure_code failed: " << tmp.str();
         return err;
        }
 
@@ -6121,6 +6364,10 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
          << " (you may adjust 'mon max pool pg num' for higher values)";
       return -ERANGE;
     }
+    int r = check_pg_num(pool, n, p.get_size(), &ss);
+    if (r) {
+      return r;
+    }
     string force;
     cmd_getval(g_ceph_context,cmdmap, "force", force);
     if (p.cache_mode != pg_pool_t::CACHEMODE_NONE &&
@@ -6250,7 +6497,15 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
     bloomp->set_fpp(f);
   } else if (var == "use_gmt_hitset") {
     if (val == "true" || (interr.empty() && n == 1)) {
-      if (!(osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_HITSET_GMT)) {
+      string force;
+      cmd_getval(g_ceph_context, cmdmap, "force", force);
+      if (!osdmap.get_num_up_osds() && force != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        return -EPERM;
+      }
+      if (!(osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_HITSET_GMT)
+          && force != "--yes-i-really-mean-it") {
        ss << "not all OSDs support GMT hit set.";
        return -EINVAL;
       }
@@ -7329,7 +7584,12 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
   int64_t osdid;
   string name;
-  bool osdid_present = cmd_getval(g_ceph_context, cmdmap, "id", osdid);
+  bool osdid_present = false;
+  if (prefix != "osd pg-temp" &&
+      prefix != "osd pg-upmap" &&
+      prefix != "osd pg-upmap-items") {  // avoid commands with non-int id arg
+    osdid_present = cmd_getval(g_ceph_context, cmdmap, "id", osdid);
+  }
   if (osdid_present) {
     ostringstream oss;
     oss << "osd." << osdid;
@@ -7412,7 +7672,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
     }
 
-    if (crush.has_legacy_rulesets()) {
+    if (crush.has_legacy_rule_ids()) {
       err = -EINVAL;
       ss << "crush maps with ruleset != ruleid are no longer allowed";
       goto reply;
@@ -7422,16 +7682,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    const auto& osdmap_pools = osdmap.get_pools();
-    for (auto pit = osdmap_pools.begin(); pit != osdmap_pools.end(); ++pit) {
-      const int64_t pool_id = pit->first;
-      const pg_pool_t &pool = pit->second;
-      int ruleno = pool.get_crush_rule();
-      if (!crush.rule_exists(ruleno)) {
-       ss << " the crush rule no "<< ruleno << " for pool id " << pool_id << " is in use";
-       err = -EINVAL;
-       goto reply;
-      }
+    err = osdmap.validate_crush_rules(&crush, &ss);
+    if (err < 0) {
+      goto reply;
     }
 
     if (g_conf->mon_osd_crush_smoke_test) {
@@ -7460,6 +7713,26 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     ss << osdmap.get_crush_version() + 1;
     goto update;
 
+  } else if (prefix == "osd crush set-all-straw-buckets-to-straw2") {
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    for (int b = 0; b < newcrush.get_max_buckets(); ++b) {
+      int bid = -1 - b;
+      if (newcrush.bucket_exists(bid) &&
+         newcrush.get_bucket_alg(bid)) {
+       dout(20) << " bucket " << bid << " is straw, can convert" << dendl;
+       newcrush.bucket_set_alg(bid, CRUSH_BUCKET_STRAW2);
+      }
+    }
+    if (!validate_crush_against_features(&newcrush, ss)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
+                                             get_last_committed() + 1));
+    return true;
   } else if (prefix == "osd crush set-device-class") {
     if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
       ss << "you must complete the upgrade and 'ceph osd require-osd-release "
@@ -8596,7 +8869,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       // FIXME: this is ok in some situations, but let's not bother with that
       // complexity now.
       int ruleset = newcrush.get_rule_mask_ruleset(ruleno);
-      if (osdmap.crush_ruleset_in_use(ruleset)) {
+      if (osdmap.crush_rule_in_use(ruleset)) {
        ss << "crush ruleset " << name << " " << ruleset << " is in use";
        err = -EBUSY;
        goto reply;
@@ -8801,6 +9074,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return prepare_unset_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
 
   } else if (prefix == "osd set") {
+    string sure;
+    cmd_getval(g_ceph_context, cmdmap, "sure", sure);
     string key;
     cmd_getval(g_ceph_context, cmdmap, "key", key);
     if (key == "full")
@@ -8828,7 +9103,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     else if (key == "notieragent")
       return prepare_set_flag(op, CEPH_OSDMAP_NOTIERAGENT);
     else if (key == "sortbitwise") {
-      if (osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_BITWISE_HOBJ_SORT) {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
+      if ((osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_BITWISE_HOBJ_SORT)
+          || sure == "--yes-i-really-mean-it") {
        return prepare_set_flag(op, CEPH_OSDMAP_SORTBITWISE);
       } else {
        ss << "not all up OSDs have OSD_BITWISE_HOBJ_SORT feature";
@@ -8836,7 +9118,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        goto reply;
       }
     } else if (key == "recovery_deletes") {
-      if (HAVE_FEATURE(osdmap.get_up_osd_features(), OSD_RECOVERY_DELETES)) {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
+      if (HAVE_FEATURE(osdmap.get_up_osd_features(), OSD_RECOVERY_DELETES)
+          || sure == "--yes-i-really-mean-it") {
        return prepare_set_flag(op, CEPH_OSDMAP_RECOVERY_DELETES);
       } else {
        ss << "not all up OSDs have OSD_RECOVERY_DELETES feature";
@@ -8844,6 +9133,12 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        goto reply;
       }
     } else if (key == "require_jewel_osds") {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
       if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
        ss << "the sortbitwise flag must be set before require_jewel_osds";
        err = -EPERM;
@@ -8852,13 +9147,20 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        ss << "require_osd_release is already >= jewel";
        err = 0;
        goto reply;
-      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_JEWEL)) {
+      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_JEWEL)
+                 || sure == "--yes-i-really-mean-it") {
        return prepare_set_flag(op, CEPH_OSDMAP_REQUIRE_JEWEL);
       } else {
        ss << "not all up OSDs have CEPH_FEATURE_SERVER_JEWEL feature";
        err = -EPERM;
       }
     } else if (key == "require_kraken_osds") {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
       if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
        ss << "the sortbitwise flag must be set before require_kraken_osds";
        err = -EPERM;
@@ -8867,7 +9169,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        ss << "require_osd_release is already >= kraken";
        err = 0;
        goto reply;
-      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_KRAKEN)) {
+      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_KRAKEN)
+                 || sure == "--yes-i-really-mean-it") {
        bool r = prepare_set_flag(op, CEPH_OSDMAP_REQUIRE_KRAKEN);
        // ensure JEWEL is also set
        pending_inc.new_flags |= CEPH_OSDMAP_REQUIRE_JEWEL;
@@ -8916,6 +9219,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "osd require-osd-release") {
     string release;
     cmd_getval(g_ceph_context, cmdmap, "release", release);
+    string sure;
+    cmd_getval(g_ceph_context, cmdmap, "sure", sure);
     if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
       ss << "the sortbitwise flag must be set first";
       err = -EPERM;
@@ -9032,7 +9337,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
             if (osdmap.is_up(osd)) {
               msg << ", while it was still marked up";
             } else {
-              msg << ", after it was down for " << int(down_pending_out[osd].sec())
+              auto period = ceph_clock_now() - down_pending_out[osd];
+              msg << ", after it was down for " << int(period.sec())
                   << " seconds";
             }