]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/OSDMonitor.cc
update sources to 12.2.2
[ceph.git] / ceph / src / mon / OSDMonitor.cc
index 7dcf3397c2ad85fab6a54df3c769bef9df673266..c9ff10b345f72ecb7c7c3353f45fd4a3718f23e8 100644 (file)
@@ -81,7 +81,8 @@
 #include <boost/algorithm/string/predicate.hpp>
 
 #define dout_subsys ceph_subsys_mon
-#define OSD_PG_CREATING_PREFIX "osd_pg_creating"
+static const string OSD_PG_CREATING_PREFIX("osd_pg_creating");
+static const string OSD_METADATA_PREFIX("osd_metadata");
 
 namespace {
 
@@ -240,7 +241,9 @@ void OSDMonitor::create_initial()
     derr << __func__ << " mon_debug_no_require_luminous=true" << dendl;
   } else {
     newmap.require_osd_release = CEPH_RELEASE_LUMINOUS;
-    newmap.flags |= CEPH_OSDMAP_RECOVERY_DELETES;
+    newmap.flags |=
+      CEPH_OSDMAP_RECOVERY_DELETES |
+      CEPH_OSDMAP_PURGED_SNAPDIRS;
     newmap.full_ratio = g_conf->mon_osd_full_ratio;
     if (newmap.full_ratio > 1.0) newmap.full_ratio /= 100;
     newmap.backfillfull_ratio = g_conf->mon_osd_backfillfull_ratio;
@@ -266,6 +269,7 @@ void OSDMonitor::get_store_prefixes(std::set<string>& s)
 {
   s.insert(service_name);
   s.insert(OSD_PG_CREATING_PREFIX);
+  s.insert(OSD_METADATA_PREFIX);
 }
 
 void OSDMonitor::update_from_paxos(bool *need_bootstrap)
@@ -564,23 +568,6 @@ void OSDMonitor::on_active()
 void OSDMonitor::on_restart()
 {
   last_osd_report.clear();
-
-  if (mon->is_leader()) {
-    // fix ruleset != ruleid
-    if (osdmap.crush->has_legacy_rulesets() &&
-       !osdmap.crush->has_multirule_rulesets()) {
-      CrushWrapper newcrush;
-      _get_pending_crush(newcrush);
-      int r = newcrush.renumber_rules_by_ruleset();
-      if (r >= 0) {
-       dout(1) << __func__ << " crush map has ruleset != rule id; fixing" << dendl;
-       pending_inc.crush.clear();
-       newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
-      } else {
-       dout(10) << __func__ << " unable to renumber rules by ruleset" << dendl;
-      }
-    }
-  }
 }
 
 void OSDMonitor::on_shutdown()
@@ -660,6 +647,40 @@ void OSDMonitor::create_pending()
              << pending_inc.new_nearfull_ratio << dendl;
     }
   }
+
+  // Rewrite CRUSH rule IDs if they are using legacy "ruleset"
+  // structure.
+  if (osdmap.crush->has_legacy_rule_ids()) {
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+
+    // First, for all pools, work out which rule they really used
+    // by resolving ruleset to rule.
+    for (const auto &i : osdmap.get_pools()) {
+      const auto pool_id = i.first;
+      const auto &pool = i.second;
+      int new_rule_id = newcrush.find_rule(pool.crush_rule,
+                                          pool.type, pool.size);
+
+      dout(1) << __func__ << " rewriting pool "
+             << osdmap.get_pool_name(pool_id) << " crush ruleset "
+             << pool.crush_rule << " -> rule id " << new_rule_id << dendl;
+      if (pending_inc.new_pools.count(pool_id) == 0) {
+       pending_inc.new_pools[pool_id] = pool;
+      }
+      pending_inc.new_pools[pool_id].crush_rule = new_rule_id;
+    }
+
+    // Now, go ahead and renumber all the rules so that their
+    // rule_id field corresponds to their position in the array
+    auto old_to_new = newcrush.renumber_rules();
+    dout(1) << __func__ << " Rewrote " << old_to_new << " crush IDs:" << dendl;
+    for (const auto &i : old_to_new) {
+      dout(1) << __func__ << " " << i.first << " -> " << i.second << dendl;
+    }
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+  }
 }
 
 creating_pgs_t
@@ -970,31 +991,190 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     tmp.apply_incremental(pending_inc);
 
     if (tmp.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-      // set or clear full/nearfull?
-      int full, backfill, nearfull;
-      tmp.count_full_nearfull_osds(&full, &backfill, &nearfull);
-      if (full > 0) {
-       if (!tmp.test_flag(CEPH_OSDMAP_FULL)) {
-         dout(10) << __func__ << " setting full flag" << dendl;
-         add_flag(CEPH_OSDMAP_FULL);
-         remove_flag(CEPH_OSDMAP_NEARFULL);
-       }
-      } else {
-       if (tmp.test_flag(CEPH_OSDMAP_FULL)) {
-         dout(10) << __func__ << " clearing full flag" << dendl;
-         remove_flag(CEPH_OSDMAP_FULL);
-       }
-       if (nearfull > 0) {
-         if (!tmp.test_flag(CEPH_OSDMAP_NEARFULL)) {
-           dout(10) << __func__ << " setting nearfull flag" << dendl;
-           add_flag(CEPH_OSDMAP_NEARFULL);
-         }
-       } else {
-         if (tmp.test_flag(CEPH_OSDMAP_NEARFULL)) {
-           dout(10) << __func__ << " clearing nearfull flag" << dendl;
-           remove_flag(CEPH_OSDMAP_NEARFULL);
-         }
-       }
+      // remove any legacy osdmap nearfull/full flags
+      {
+        if (tmp.test_flag(CEPH_OSDMAP_FULL | CEPH_OSDMAP_NEARFULL)) {
+          dout(10) << __func__ << " clearing legacy osdmap nearfull/full flag"
+                   << dendl;
+          remove_flag(CEPH_OSDMAP_NEARFULL);
+          remove_flag(CEPH_OSDMAP_FULL);
+        }
+      }
+      // collect which pools are currently affected by
+      // the near/backfill/full osd(s),
+      // and set per-pool near/backfill/full flag instead
+      set<int64_t> full_pool_ids;
+      set<int64_t> backfillfull_pool_ids;
+      set<int64_t> nearfull_pool_ids;
+      tmp.get_full_pools(g_ceph_context,
+                         &full_pool_ids,
+                         &backfillfull_pool_ids,
+                         &nearfull_pool_ids);
+      if (full_pool_ids.empty() ||
+          backfillfull_pool_ids.empty() ||
+          nearfull_pool_ids.empty()) {
+        // normal case - no nearfull, backfillfull or full osds
+        // try cancel any improper nearfull/backfillfull/full pool
+        // flags first
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_NEARFULL) &&
+              nearfull_pool_ids.empty()) {
+            dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                     << "'s nearfull flag" << dendl;
+            if (pending_inc.new_pools.count(p) == 0) {
+              // load original pool info first!
+              pending_inc.new_pools[p] = pool.second;
+            }
+            pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_BACKFILLFULL) &&
+              backfillfull_pool_ids.empty()) {
+            dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                     << "'s backfillfull flag" << dendl;
+            if (pending_inc.new_pools.count(p) == 0) {
+              pending_inc.new_pools[p] = pool.second;
+            }
+            pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_BACKFILLFULL;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL) &&
+              full_pool_ids.empty()) {
+            if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+              // set by EQUOTA, skipping
+              continue;
+            }
+            dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                     << "'s full flag" << dendl;
+            if (pending_inc.new_pools.count(p) == 0) {
+              pending_inc.new_pools[p] = pool.second;
+            }
+            pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_FULL;
+          }
+        }
+      }
+      if (!full_pool_ids.empty()) {
+        dout(10) << __func__ << " marking pool(s) " << full_pool_ids
+                 << " as full" << dendl;
+        for (auto &p: full_pool_ids) {
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL)) {
+            continue;
+          }
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = tmp.pools[p];
+          }
+          pending_inc.new_pools[p].flags |= pg_pool_t::FLAG_FULL;
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_BACKFILLFULL;
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+        }
+        // cancel FLAG_FULL for pools which are no longer full too
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (full_pool_ids.count(p)) {
+            // skip pools we have just marked as full above
+            continue;
+          }
+          if (!tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL) ||
+               tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+            // don't touch if currently is not full
+            // or is running out of quota (and hence considered as full)
+            continue;
+          }
+          dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                   << "'s full flag" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = pool.second;
+          }
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_FULL;
+        }
+      }
+      if (!backfillfull_pool_ids.empty()) {
+        for (auto &p: backfillfull_pool_ids) {
+          if (full_pool_ids.count(p)) {
+            // skip pools we have already considered as full above
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+            // make sure FLAG_FULL is truly set, so we are safe not
+            // to set a extra (redundant) FLAG_BACKFILLFULL flag
+            assert(tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL));
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_BACKFILLFULL)) {
+            // don't bother if pool is already marked as backfillfull
+            continue;
+          }
+          dout(10) << __func__ << " marking pool '" << tmp.pool_name[p]
+                   << "'s as backfillfull" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = tmp.pools[p];
+          }
+          pending_inc.new_pools[p].flags |= pg_pool_t::FLAG_BACKFILLFULL;
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+        }
+        // cancel FLAG_BACKFILLFULL for pools
+        // which are no longer backfillfull too
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (full_pool_ids.count(p) || backfillfull_pool_ids.count(p)) {
+            // skip pools we have just marked as backfillfull/full above
+            continue;
+          }
+          if (!tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_BACKFILLFULL)) {
+            // and don't touch if currently is not backfillfull
+            continue;
+          }
+          dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                   << "'s backfillfull flag" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = pool.second;
+          }
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_BACKFILLFULL;
+        }
+      }
+      if (!nearfull_pool_ids.empty()) {
+        for (auto &p: nearfull_pool_ids) {
+          if (full_pool_ids.count(p) || backfillfull_pool_ids.count(p)) {
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+            // make sure FLAG_FULL is truly set, so we are safe not
+            // to set a extra (redundant) FLAG_NEARFULL flag
+            assert(tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_FULL));
+            continue;
+          }
+          if (tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_NEARFULL)) {
+            // don't bother if pool is already marked as nearfull
+            continue;
+          }
+          dout(10) << __func__ << " marking pool '" << tmp.pool_name[p]
+                   << "'s as nearfull" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = tmp.pools[p];
+          }
+          pending_inc.new_pools[p].flags |= pg_pool_t::FLAG_NEARFULL;
+        }
+        // cancel FLAG_NEARFULL for pools
+        // which are no longer nearfull too
+        for (auto &pool: tmp.get_pools()) {
+          auto p = pool.first;
+          if (full_pool_ids.count(p) ||
+              backfillfull_pool_ids.count(p) ||
+              nearfull_pool_ids.count(p)) {
+            // skip pools we have just marked as
+            // nearfull/backfillfull/full above
+            continue;
+          }
+          if (!tmp.get_pg_pool(p)->has_flag(pg_pool_t::FLAG_NEARFULL)) {
+            // and don't touch if currently is not nearfull
+            continue;
+          }
+          dout(10) << __func__ << " clearing pool '" << tmp.pool_name[p]
+                   << "'s nearfull flag" << dendl;
+          if (pending_inc.new_pools.count(p) == 0) {
+            pending_inc.new_pools[p] = pool.second;
+          }
+          pending_inc.new_pools[p].flags &= ~pg_pool_t::FLAG_NEARFULL;
+        }
       }
 
       // min_compat_client?
@@ -2666,6 +2846,10 @@ bool OSDMonitor::preprocess_pgtemp(MonOpRequestRef op)
     goto ignore;
   }
 
+  if (m->forced) {
+    return false;
+  }
+
   for (auto p = m->pg_temp.begin(); p != m->pg_temp.end(); ++p) {
     dout(20) << " " << p->first
             << (osdmap.pg_temp->count(p->first) ? osdmap.pg_temp->get(p->first) : empty)
@@ -3261,6 +3445,12 @@ epoch_t OSDMonitor::send_pg_creates(int osd, Connection *con, epoch_t next) cons
   dout(30) << __func__ << " osd." << osd << " next=" << next
           << " " << creating_pgs_by_osd_epoch << dendl;
   std::lock_guard<std::mutex> l(creating_pgs_lock);
+  if (creating_pgs_epoch <= creating_pgs.last_scan_epoch) {
+    dout(20) << __func__
+            << " not using stale creating_pgs@" << creating_pgs_epoch << dendl;
+    // the subscribers will be updated when the mapping is completed anyway
+    return next;
+  }
   auto creating_pgs_by_epoch = creating_pgs_by_osd_epoch.find(osd);
   if (creating_pgs_by_epoch == creating_pgs_by_osd_epoch.end())
     return next;
@@ -3319,6 +3509,15 @@ void OSDMonitor::tick()
       do_propose = true;
     }
   }
+  if (!osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) &&
+      osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS &&
+      mon->mgrstatmon()->is_readable() &&
+      mon->mgrstatmon()->definitely_converted_snapsets()) {
+    dout(1) << __func__ << " all snapsets converted, setting purged_snapdirs"
+           << dendl;
+    add_flag(CEPH_OSDMAP_PURGED_SNAPDIRS);
+    do_propose = true;
+  }
 
   // mark osds down?
   if (check_failures(now))
@@ -3732,16 +3931,6 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
       }
     }
 
-    if (osdmap.crush->has_multirule_rulesets()) {
-      ostringstream ss;
-      ss << "CRUSH map contains multirule rulesets";
-      summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-      if (detail) {
-       ss << "; please manually fix the map";
-       detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-      }
-    }
-
     // Not using 'sortbitwise' and should be?
     if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE) &&
         (osdmap.get_up_osd_features() &
@@ -4923,6 +5112,34 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       osdmap.crush->list_rules(&ss);
       rdata.append(ss.str());
     }
+  } else if (prefix == "osd crush rule ls-by-class") {
+    string class_name;
+    cmd_getval(g_ceph_context, cmdmap, "class", class_name);
+    if (class_name.empty()) {
+      ss << "no class specified";
+      r = -EINVAL;
+      goto reply;
+    }
+    set<int> rules;
+    r = osdmap.crush->get_rules_by_class(class_name, &rules);
+    if (r < 0) {
+      ss << "failed to get rules by class '" << class_name << "'";
+      goto reply;
+    }
+    if (f) {
+      f->open_array_section("rules");
+      for (auto &rule: rules) {
+        f->dump_string("name", osdmap.crush->get_rule_name(rule));
+      }
+      f->close_section();
+      f->flush(rdata);
+    } else {
+      ostringstream rs;
+      for (auto &rule: rules) {
+        rs << osdmap.crush->get_rule_name(rule) << "\n";
+      }
+      rdata.append(rs.str());
+    }
   } else if (prefix == "osd crush rule dump") {
     string name;
     cmd_getval(g_ceph_context, cmdmap, "name", name);
@@ -5034,14 +5251,24 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
   } else if (prefix == "osd crush class ls-osd") {
     string name;
     cmd_getval(g_ceph_context, cmdmap, "class", name);
-    boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
     set<int> osds;
     osdmap.crush->get_devices_by_class(name, &osds);
-    f->open_array_section("osds");
-    for (auto& osd : osds)
-      f->dump_int("osd", osd);
-    f->close_section();
-    f->flush(rdata);
+    if (f) {
+      f->open_array_section("osds");
+      for (auto &osd: osds)
+        f->dump_int("osd", osd);
+      f->close_section();
+      f->flush(rdata);
+    } else {
+      bool first = true;
+      for (auto &osd : osds) {
+        if (!first)
+          ds << "\n";
+        first = false;
+        ds << osd;
+      }
+      rdata.append(ds);
+    }
   } else if (prefix == "osd erasure-code-profile ls") {
     const auto &profiles = osdmap.get_erasure_code_profiles();
     if (f)
@@ -5116,6 +5343,87 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       rs << "\n";
       rdata.append(rs.str());
     }
+  } else if (prefix == "osd pool application get") {
+    boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty",
+                                                     "json-pretty"));
+    string pool_name;
+    cmd_getval(g_ceph_context, cmdmap, "pool", pool_name);
+    string app;
+    cmd_getval(g_ceph_context, cmdmap, "app", app);
+    string key;
+    cmd_getval(g_ceph_context, cmdmap, "key", key);
+
+    if (pool_name.empty()) {
+      // all
+      f->open_object_section("pools");
+      for (const auto &pool : osdmap.pools) {
+        std::string name("<unknown>");
+        const auto &pni = osdmap.pool_name.find(pool.first);
+        if (pni != osdmap.pool_name.end())
+          name = pni->second;
+        f->open_object_section(name.c_str());
+        for (auto &app_pair : pool.second.application_metadata) {
+          f->open_object_section(app_pair.first.c_str());
+          for (auto &kv_pair : app_pair.second) {
+            f->dump_string(kv_pair.first.c_str(), kv_pair.second);
+          }
+          f->close_section();
+        }
+        f->close_section(); // name
+      }
+      f->close_section(); // pools
+      f->flush(rdata);
+    } else {
+      int64_t pool = osdmap.lookup_pg_pool_name(pool_name.c_str());
+      if (pool < 0) {
+        ss << "unrecognized pool '" << pool_name << "'";
+        r = -ENOENT;
+        goto reply;
+      }
+      auto p = osdmap.get_pg_pool(pool);
+      // filter by pool
+      if (app.empty()) {
+        f->open_object_section(pool_name.c_str());
+        for (auto &app_pair : p->application_metadata) {
+          f->open_object_section(app_pair.first.c_str());
+          for (auto &kv_pair : app_pair.second) {
+            f->dump_string(kv_pair.first.c_str(), kv_pair.second);
+          }
+          f->close_section(); // application
+        }
+        f->close_section(); // pool_name
+        f->flush(rdata);
+        goto reply;
+      }
+
+      auto app_it = p->application_metadata.find(app);
+      if (app_it == p->application_metadata.end()) {
+        ss << "pool '" << pool_name << "' has no application '" << app << "'";
+        r = -ENOENT;
+        goto reply;
+      }
+      // filter by pool + app
+      if (key.empty()) {
+        f->open_object_section(app_it->first.c_str());
+        for (auto &kv_pair : app_it->second) {
+          f->dump_string(kv_pair.first.c_str(), kv_pair.second);
+        }
+        f->close_section(); // application
+        f->flush(rdata);
+        goto reply;
+      }
+      // filter by pool + app + key
+      auto key_it = app_it->second.find(key);
+      if (key_it == app_it->second.end()) {
+        ss << "application '" << app << "' on pool '" << pool_name
+           << "' does not have key '" << key << "'";
+        r = -ENOENT;
+        goto reply;
+      }
+      ss << key_it->second << "\n";
+      rdata.append(ss.str());
+      ss.str("");
+    }
   } else {
     // try prepare update
     return false;
@@ -5128,10 +5436,20 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
   return true;
 }
 
-void OSDMonitor::update_pool_flags(int64_t pool_id, uint64_t flags)
+void OSDMonitor::set_pool_flags(int64_t pool_id, uint64_t flags)
 {
-  const pg_pool_t *pool = osdmap.get_pg_pool(pool_id);
-  pending_inc.get_new_pool(pool_id, pool)->flags = flags;
+  pg_pool_t *pool = pending_inc.get_new_pool(pool_id,
+    osdmap.get_pg_pool(pool_id));
+  assert(pool);
+  pool->set_flag(flags);
+}
+
+void OSDMonitor::clear_pool_flags(int64_t pool_id, uint64_t flags)
+{
+  pg_pool_t *pool = pending_inc.get_new_pool(pool_id,
+    osdmap.get_pg_pool(pool_id));
+  assert(pool);
+  pool->unset_flag(flags);
 }
 
 bool OSDMonitor::update_pools_status()
@@ -5154,14 +5472,16 @@ bool OSDMonitor::update_pools_status()
       (pool.quota_max_bytes > 0 && (uint64_t)sum.num_bytes >= pool.quota_max_bytes) ||
       (pool.quota_max_objects > 0 && (uint64_t)sum.num_objects >= pool.quota_max_objects);
 
-    if (pool.has_flag(pg_pool_t::FLAG_FULL)) {
+    if (pool.has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
       if (pool_is_full)
         continue;
 
       mon->clog->info() << "pool '" << pool_name
-                       << "' no longer full; removing FULL flag";
-
-      update_pool_flags(it->first, pool.get_flags() & ~pg_pool_t::FLAG_FULL);
+                       << "' no longer out of quota; removing NO_QUOTA flag";
+      // below we cancel FLAG_FULL too, we'll set it again in
+      // OSDMonitor::encode_pending if it still fails the osd-full checking.
+      clear_pool_flags(it->first,
+                       pg_pool_t::FLAG_FULL_NO_QUOTA | pg_pool_t::FLAG_FULL);
       ret = true;
     } else {
       if (!pool_is_full)
@@ -5179,7 +5499,14 @@ bool OSDMonitor::update_pools_status()
                          << " (reached quota's max_objects: "
                          << pool.quota_max_objects << ")";
       }
-      update_pool_flags(it->first, pool.get_flags() | pg_pool_t::FLAG_FULL);
+      // set both FLAG_FULL_NO_QUOTA and FLAG_FULL
+      // note that below we try to cancel FLAG_BACKFILLFULL/NEARFULL too
+      // since FLAG_FULL should always take precedence
+      set_pool_flags(it->first,
+                     pg_pool_t::FLAG_FULL_NO_QUOTA | pg_pool_t::FLAG_FULL);
+      clear_pool_flags(it->first,
+                       pg_pool_t::FLAG_NEARFULL |
+                       pg_pool_t::FLAG_BACKFILLFULL);
       ret = true;
     }
   }
@@ -5489,9 +5816,22 @@ int OSDMonitor::parse_erasure_code_profile(const vector<string> &erasure_code_pr
       user_map[*i] = string();
       (*erasure_code_profile_map)[*i] = string();
     } else {
-      const string key = i->substr(0, equal);
+      string key = i->substr(0, equal);
       equal++;
       const string value = i->substr(equal);
+      if (key.find("ruleset-") == 0) {
+       if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS &&
+           g_conf->get_val<bool>("mon_fixup_legacy_erasure_code_profiles")) {
+         mon->clog->warn() << "erasure code profile property '" << key
+                           << "' is no longer supported; try "
+                           << "'crush-" << key.substr(8) << "' instead";
+         key = string("crush-") + key.substr(8);
+       } else {
+         *ss << "property '" << key << "' is no longer supported; try "
+             << "'crush-" << key.substr(8) << "' instead";
+         return -EINVAL;
+       }
+      }
       user_map[key] = value;
       (*erasure_code_profile_map)[key] = value;
     }
@@ -5661,6 +6001,36 @@ int OSDMonitor::get_crush_rule(const string &rule_name,
   return 0;
 }
 
+int OSDMonitor::check_pg_num(int64_t pool, int pg_num, int size, ostream *ss)
+{
+  auto max_pgs_per_osd = g_conf->get_val<uint64_t>("mon_max_pg_per_osd");
+  auto num_osds = std::max(osdmap.get_num_in_osds(), 3u);   // assume min cluster size 3
+  auto max_pgs = max_pgs_per_osd * num_osds;
+  uint64_t projected = 0;
+  if (pool < 0) {
+    projected += pg_num * size;
+  }
+  for (const auto& i : osdmap.get_pools()) {
+    if (i.first == pool) {
+      projected += pg_num * size;
+    } else {
+      projected += i.second.get_pg_num() * i.second.get_size();
+    }
+  }
+  if (projected > max_pgs) {
+    if (pool >= 0) {
+      *ss << "pool id " << pool;
+    }
+    *ss << " pg_num " << pg_num << " size " << size
+       << " would mean " << projected
+       << " total pgs, which exceeds max " << max_pgs
+       << " (mon_max_pg_per_osd " << max_pgs_per_osd
+       << " * num_in_osds " << num_osds << ")";
+    return -ERANGE;
+  }
+  return 0;
+}
+
 /**
  * @param name The name of the new pool
  * @param auid The auid of the pool owner. Can be -1
@@ -5719,15 +6089,20 @@ int OSDMonitor::prepare_new_pool(string& name, uint64_t auid,
     _get_pending_crush(newcrush);
     ostringstream err;
     CrushTester tester(newcrush, err);
+    tester.set_min_x(0);
     tester.set_max_x(50);
     tester.set_rule(crush_rule);
+    auto start = ceph::coarse_mono_clock::now();
     r = tester.test_with_fork(g_conf->mon_lease);
+    auto duration = ceph::coarse_mono_clock::now() - start;
     if (r < 0) {
       dout(10) << " tester.test_with_fork returns " << r
               << ": " << err.str() << dendl;
       *ss << "crush test failed with " << r << ": " << err.str();
       return r;
     }
+    dout(10) << __func__ << " crush smoke test duration: "
+             << duration << dendl;
   }
   unsigned size, min_size;
   r = prepare_pool_size(pool_type, erasure_code_profile, &size, &min_size, ss);
@@ -5735,6 +6110,11 @@ int OSDMonitor::prepare_new_pool(string& name, uint64_t auid,
     dout(10) << " prepare_pool_size returns " << r << dendl;
     return r;
   }
+  r = check_pg_num(-1, pg_num, size, ss);
+  if (r) {
+    dout(10) << " prepare_pool_size returns " << r << dendl;
+    return r;
+  }
 
   if (!osdmap.crush->check_crush_rule(crush_rule, pool_type, size, *ss)) {
     return -EINVAL;
@@ -5911,6 +6291,10 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
       ss << "pool size must be between 1 and 10";
       return -EINVAL;
     }
+    int r = check_pg_num(pool, p.get_pg_num(), n, &ss);
+    if (r < 0) {
+      return r;
+    }
     p.size = n;
     if (n < p.min_size)
       p.min_size = n;
@@ -5980,6 +6364,10 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
          << " (you may adjust 'mon max pool pg num' for higher values)";
       return -ERANGE;
     }
+    int r = check_pg_num(pool, n, p.get_size(), &ss);
+    if (r) {
+      return r;
+    }
     string force;
     cmd_getval(g_ceph_context,cmdmap, "force", force);
     if (p.cache_mode != pg_pool_t::CACHEMODE_NONE &&
@@ -6109,7 +6497,15 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
     bloomp->set_fpp(f);
   } else if (var == "use_gmt_hitset") {
     if (val == "true" || (interr.empty() && n == 1)) {
-      if (!(osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_HITSET_GMT)) {
+      string force;
+      cmd_getval(g_ceph_context, cmdmap, "force", force);
+      if (!osdmap.get_num_up_osds() && force != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        return -EPERM;
+      }
+      if (!(osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_HITSET_GMT)
+          && force != "--yes-i-really-mean-it") {
        ss << "not all OSDs support GMT hit set.";
        return -EINVAL;
       }
@@ -6703,6 +7099,11 @@ int OSDMonitor::prepare_command_osd_create(
 {
   dout(10) << __func__ << " id " << id << " uuid " << uuid << dendl;
   assert(existing_id);
+  if (osdmap.is_destroyed(id)) {
+    ss << "ceph osd create has been deprecated. Please use ceph osd new "
+          "instead.";
+    return -EINVAL;
+  }
 
   if (uuid.is_zero()) {
     dout(10) << __func__ << " no uuid; assuming legacy `osd create`" << dendl;
@@ -7266,7 +7667,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       }
     }
 
-    if (crush.has_legacy_rulesets()) {
+    if (crush.has_legacy_rule_ids()) {
       err = -EINVAL;
       ss << "crush maps with ruleset != ruleid are no longer allowed";
       goto reply;
@@ -7276,16 +7677,9 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
 
-    const auto& osdmap_pools = osdmap.get_pools();
-    for (auto pit = osdmap_pools.begin(); pit != osdmap_pools.end(); ++pit) {
-      const int64_t pool_id = pit->first;
-      const pg_pool_t &pool = pit->second;
-      int ruleno = pool.get_crush_rule();
-      if (!crush.rule_exists(ruleno)) {
-       ss << " the crush rule no "<< ruleno << " for pool id " << pool_id << " is in use";
-       err = -EINVAL;
-       goto reply;
-      }
+    err = osdmap.validate_crush_rules(&crush, &ss);
+    if (err < 0) {
+      goto reply;
     }
 
     if (g_conf->mon_osd_crush_smoke_test) {
@@ -7294,8 +7688,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       dout(10) << " testing map" << dendl;
       stringstream ess;
       CrushTester tester(crush, ess);
+      tester.set_min_x(0);
       tester.set_max_x(50);
+      auto start = ceph::coarse_mono_clock::now();
       int r = tester.test_with_fork(g_conf->mon_lease);
+      auto duration = ceph::coarse_mono_clock::now() - start;
       if (r < 0) {
        dout(10) << " tester.test_with_fork returns " << r
                 << ": " << ess.str() << dendl;
@@ -7303,13 +7700,34 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        err = r;
        goto reply;
       }
-      dout(10) << " crush test result " << ess.str() << dendl;
+      dout(10) << __func__ << " crush somke test duration: "
+               << duration << ", result: " << ess.str() << dendl;
     }
 
     pending_inc.crush = data;
     ss << osdmap.get_crush_version() + 1;
     goto update;
 
+  } else if (prefix == "osd crush set-all-straw-buckets-to-straw2") {
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    for (int b = 0; b < newcrush.get_max_buckets(); ++b) {
+      int bid = -1 - b;
+      if (newcrush.bucket_exists(bid) &&
+         newcrush.get_bucket_alg(bid)) {
+       dout(20) << " bucket " << bid << " is straw, can convert" << dendl;
+       newcrush.bucket_set_alg(bid, CRUSH_BUCKET_STRAW2);
+      }
+    }
+    if (!validate_crush_against_features(&newcrush, ss)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
+                                             get_last_committed() + 1));
+    return true;
   } else if (prefix == "osd crush set-device-class") {
     if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
       ss << "you must complete the upgrade and 'ceph osd require-osd-release "
@@ -7470,16 +7888,11 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     CrushWrapper newcrush;
     _get_pending_crush(newcrush);
-
-    if (!newcrush.class_exists(srcname)) {
-      err = -ENOENT;
-      ss << "class '" << srcname << "' does not exist";
-      goto reply;
-    }
-
-    if (newcrush.class_exists(dstname)) {
-      err = -EEXIST;
-      ss << "class '" << dstname << "' already exists";
+    if (!newcrush.class_exists(srcname) && newcrush.class_exists(dstname)) {
+      // suppose this is a replay and return success
+      // so command is idempotent
+      ss << "already renamed to '" << dstname << "'";
+      err = 0;
       goto reply;
     }
 
@@ -8451,7 +8864,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       // FIXME: this is ok in some situations, but let's not bother with that
       // complexity now.
       int ruleset = newcrush.get_rule_mask_ruleset(ruleno);
-      if (osdmap.crush_ruleset_in_use(ruleset)) {
+      if (osdmap.crush_rule_in_use(ruleset)) {
        ss << "crush ruleset " << name << " " << ruleset << " is in use";
        err = -EBUSY;
        goto reply;
@@ -8470,6 +8883,45 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
                                              get_last_committed() + 1));
     return true;
 
+  } else if (prefix == "osd crush rule rename") {
+    string srcname;
+    string dstname;
+    cmd_getval(g_ceph_context, cmdmap, "srcname", srcname);
+    cmd_getval(g_ceph_context, cmdmap, "dstname", dstname);
+    if (srcname.empty() || dstname.empty()) {
+      ss << "must specify both source rule name and destination rule name";
+      err = -EINVAL;
+      goto reply;
+    }
+    if (srcname == dstname) {
+      ss << "destination rule name is equal to source rule name";
+      err = 0;
+      goto reply;
+    }
+
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    if (!newcrush.rule_exists(srcname) && newcrush.rule_exists(dstname)) {
+      // srcname does not exist and dstname already exists
+      // suppose this is a replay and return success
+      // (so this command is idempotent)
+      ss << "already renamed to '" << dstname << "'";
+      err = 0;
+      goto reply;
+    }
+
+    err = newcrush.rename_rule(srcname, dstname, &ss);
+    if (err < 0) {
+      // ss has reason for failure
+      goto reply;
+    }
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    getline(ss, rs);
+    wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
+                               get_last_committed() + 1));
+    return true;
+
   } else if (prefix == "osd setmaxosd") {
     int64_t newmax;
     if (!cmd_getval(g_ceph_context, cmdmap, "newmax", newmax)) {
@@ -8617,6 +9069,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     return prepare_unset_flag(op, CEPH_OSDMAP_PAUSERD | CEPH_OSDMAP_PAUSEWR);
 
   } else if (prefix == "osd set") {
+    string sure;
+    cmd_getval(g_ceph_context, cmdmap, "sure", sure);
     string key;
     cmd_getval(g_ceph_context, cmdmap, "key", key);
     if (key == "full")
@@ -8644,7 +9098,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     else if (key == "notieragent")
       return prepare_set_flag(op, CEPH_OSDMAP_NOTIERAGENT);
     else if (key == "sortbitwise") {
-      if (osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_BITWISE_HOBJ_SORT) {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
+      if ((osdmap.get_up_osd_features() & CEPH_FEATURE_OSD_BITWISE_HOBJ_SORT)
+          || sure == "--yes-i-really-mean-it") {
        return prepare_set_flag(op, CEPH_OSDMAP_SORTBITWISE);
       } else {
        ss << "not all up OSDs have OSD_BITWISE_HOBJ_SORT feature";
@@ -8652,7 +9113,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        goto reply;
       }
     } else if (key == "recovery_deletes") {
-      if (HAVE_FEATURE(osdmap.get_up_osd_features(), OSD_RECOVERY_DELETES)) {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
+      if (HAVE_FEATURE(osdmap.get_up_osd_features(), OSD_RECOVERY_DELETES)
+          || sure == "--yes-i-really-mean-it") {
        return prepare_set_flag(op, CEPH_OSDMAP_RECOVERY_DELETES);
       } else {
        ss << "not all up OSDs have OSD_RECOVERY_DELETES feature";
@@ -8660,6 +9128,12 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        goto reply;
       }
     } else if (key == "require_jewel_osds") {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
       if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
        ss << "the sortbitwise flag must be set before require_jewel_osds";
        err = -EPERM;
@@ -8668,13 +9142,20 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        ss << "require_osd_release is already >= jewel";
        err = 0;
        goto reply;
-      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_JEWEL)) {
+      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_JEWEL)
+                 || sure == "--yes-i-really-mean-it") {
        return prepare_set_flag(op, CEPH_OSDMAP_REQUIRE_JEWEL);
       } else {
        ss << "not all up OSDs have CEPH_FEATURE_SERVER_JEWEL feature";
        err = -EPERM;
       }
     } else if (key == "require_kraken_osds") {
+      if (!osdmap.get_num_up_osds() && sure != "--yes-i-really-mean-it") {
+        ss << "Not advisable to continue since no OSDs are up. Pass "
+           << "--yes-i-really-mean-it if you really wish to continue.";
+        err = -EPERM;
+        goto reply;
+      }
       if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
        ss << "the sortbitwise flag must be set before require_kraken_osds";
        err = -EPERM;
@@ -8683,7 +9164,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        ss << "require_osd_release is already >= kraken";
        err = 0;
        goto reply;
-      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_KRAKEN)) {
+      } else if (HAVE_FEATURE(osdmap.get_up_osd_features(), SERVER_KRAKEN)
+                 || sure == "--yes-i-really-mean-it") {
        bool r = prepare_set_flag(op, CEPH_OSDMAP_REQUIRE_KRAKEN);
        // ensure JEWEL is also set
        pending_inc.new_flags |= CEPH_OSDMAP_REQUIRE_JEWEL;
@@ -8732,6 +9214,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
   } else if (prefix == "osd require-osd-release") {
     string release;
     cmd_getval(g_ceph_context, cmdmap, "release", release);
+    string sure;
+    cmd_getval(g_ceph_context, cmdmap, "sure", sure);
     if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
       ss << "the sortbitwise flag must be set first";
       err = -EPERM;
@@ -8848,7 +9332,8 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
             if (osdmap.is_up(osd)) {
               msg << ", while it was still marked up";
             } else {
-              msg << ", after it was down for " << int(down_pending_out[osd].sec())
+              auto period = ceph_clock_now() - down_pending_out[osd];
+              msg << ", after it was down for " << int(period.sec())
                   << " seconds";
             }