]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/OSDMonitor.cc
update sources to v12.1.2
[ceph.git] / ceph / src / mon / OSDMonitor.cc
index 4f05449bc11b7ab6d0b23c6ac30460f2fbb06e85..b2f7b3e9882b3120ee0e72e89680fb92932083e6 100644 (file)
 
 #include "json_spirit/json_spirit_reader.h"
 
+#include <boost/algorithm/string/predicate.hpp>
+
 #define dout_subsys ceph_subsys_mon
 #define OSD_PG_CREATING_PREFIX "osd_pg_creating"
 
+namespace {
+
+const uint32_t MAX_POOL_APPLICATIONS = 4;
+const uint32_t MAX_POOL_APPLICATION_KEYS = 64;
+const uint32_t MAX_POOL_APPLICATION_LENGTH = 128;
+
+} // anonymous namespace
+
 void LastEpochClean::Lec::report(ps_t ps, epoch_t last_epoch_clean)
 {
   if (epoch_by_pg.size() <= ps) {
@@ -230,6 +240,7 @@ void OSDMonitor::create_initial()
     derr << __func__ << " mon_debug_no_require_luminous=true" << dendl;
   } else {
     newmap.require_osd_release = CEPH_RELEASE_LUMINOUS;
+    newmap.flags |= CEPH_OSDMAP_RECOVERY_DELETES;
     newmap.full_ratio = g_conf->mon_osd_full_ratio;
     if (newmap.full_ratio > 1.0) newmap.full_ratio /= 100;
     newmap.backfillfull_ratio = g_conf->mon_osd_backfillfull_ratio;
@@ -739,7 +750,9 @@ OSDMonitor::update_pending_pgs(const OSDMap::Incremental& inc)
   }
   dout(10) << __func__ << " queue remaining: " << pending_creatings.queue.size()
           << " pools" << dendl;
-  dout(10) << __func__ << " " << pending_creatings.pgs.size() - total
+  dout(10) << __func__
+          << " " << (pending_creatings.pgs.size() - total)
+          << "/" << pending_creatings.pgs.size()
           << " pgs added from queued pools" << dendl;
   return pending_creatings;
 }
@@ -873,7 +886,7 @@ void OSDMonitor::prime_pg_temp(
   int next_up_primary, next_acting_primary;
   next.pg_to_up_acting_osds(pgid, &next_up, &next_up_primary,
                            &next_acting, &next_acting_primary);
-  if (acting == next_acting)
+  if (acting == next_acting && next_up != next_acting)
     return;  // no change since last epoch
 
   if (acting.empty())
@@ -882,6 +895,12 @@ void OSDMonitor::prime_pg_temp(
   if (pool && acting.size() < pool->min_size)
     return;  // can be no worse off than before
 
+  if (next_up == next_acting) {
+    acting.clear();
+    dout(20) << __func__ << "next_up === next_acting now, clear pg_temp"
+             << dendl;
+  }
+
   dout(20) << __func__ << " " << pgid << " " << up << "/" << acting
           << " -> " << next_up << "/" << next_acting
           << ", priming " << acting
@@ -931,6 +950,18 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   }
   mapping_job.reset();
 
+  // ensure we don't have blank new_state updates.  these are interrpeted as
+  // CEPH_OSD_UP (and almost certainly not what we want!).
+  auto p = pending_inc.new_state.begin();
+  while (p != pending_inc.new_state.end()) {
+    if (p->second == 0) {
+      dout(10) << "new_state for osd." << p->first << " is 0, removing" << dendl;
+      p = pending_inc.new_state.erase(p);
+    } else {
+      ++p;
+    }
+  }
+
   bufferlist bl;
 
   {
@@ -999,6 +1030,74 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
            pending_inc.new_erasure_code_profiles[p.first] = newprofile;
          }
        }
+
+        // auto-enable pool applications upon upgrade
+        // NOTE: this can be removed post-Luminous assuming upgrades need to
+        // proceed through Luminous
+        for (auto &pool_pair : tmp.pools) {
+          int64_t pool_id = pool_pair.first;
+          pg_pool_t pg_pool = pool_pair.second;
+          if (pg_pool.is_tier()) {
+            continue;
+          }
+
+          std::string pool_name = tmp.get_pool_name(pool_id);
+          uint32_t match_count = 0;
+
+          // CephFS
+          FSMap const &pending_fsmap = mon->mdsmon()->get_pending();
+          if (pending_fsmap.pool_in_use(pool_id)) {
+            dout(10) << __func__ << " auto-enabling CephFS on pool '"
+                     << pool_name << "'" << dendl;
+            pg_pool.application_metadata.insert(
+              {pg_pool_t::APPLICATION_NAME_CEPHFS, {}});
+            ++match_count;
+          }
+
+          // RBD heuristics (default OpenStack pool names from docs and
+          // ceph-ansible)
+          if (boost::algorithm::contains(pool_name, "rbd") ||
+              pool_name == "images" || pool_name == "volumes" ||
+              pool_name == "backups" || pool_name == "vms") {
+            dout(10) << __func__ << " auto-enabling RBD on pool '"
+                     << pool_name << "'" << dendl;
+            pg_pool.application_metadata.insert(
+              {pg_pool_t::APPLICATION_NAME_RBD, {}});
+            ++match_count;
+          }
+
+          // RGW heuristics
+          if (boost::algorithm::contains(pool_name, ".rgw") ||
+              boost::algorithm::contains(pool_name, ".log") ||
+              boost::algorithm::contains(pool_name, ".intent-log") ||
+              boost::algorithm::contains(pool_name, ".usage") ||
+              boost::algorithm::contains(pool_name, ".users")) {
+            dout(10) << __func__ << " auto-enabling RGW on pool '"
+                     << pool_name << "'" << dendl;
+            pg_pool.application_metadata.insert(
+              {pg_pool_t::APPLICATION_NAME_RGW, {}});
+            ++match_count;
+          }
+
+          // OpenStack gnocchi (from ceph-ansible)
+          if (pool_name == "metrics" && match_count == 0) {
+            dout(10) << __func__ << " auto-enabling OpenStack Gnocchi on pool '"
+                     << pool_name << "'" << dendl;
+            pg_pool.application_metadata.insert({"openstack_gnocchi", {}});
+            ++match_count;
+          }
+
+          if (match_count == 1) {
+            pg_pool.last_change = pending_inc.epoch;
+            pending_inc.new_pools[pool_id] = pg_pool;
+          } else if (match_count > 1) {
+            auto pstat = mon->pgservice->get_pool_stat(pool_id);
+            if (pstat != nullptr && pstat->stats.sum.num_objects > 0) {
+              mon->clog->info() << "unable to auto-enable application for pool "
+                                << "'" << pool_name << "'";
+            }
+          }
+        }
       }
     }
   }
@@ -1147,21 +1246,26 @@ int OSDMonitor::load_metadata(int osd, map<string, string>& m, ostream *err)
   return 0;
 }
 
-void OSDMonitor::count_metadata(const string& field, Formatter *f)
+void OSDMonitor::count_metadata(const string& field, map<string,int> *out)
 {
-  map<string,int> by_val;
   for (int osd = 0; osd < osdmap.get_max_osd(); ++osd) {
     if (osdmap.is_up(osd)) {
       map<string,string> meta;
       load_metadata(osd, meta, nullptr);
       auto p = meta.find(field);
       if (p == meta.end()) {
-       by_val["unknown"]++;
+       (*out)["unknown"]++;
       } else {
-       by_val[p->second]++;
+       (*out)[p->second]++;
       }
     }
   }
+}
+
+void OSDMonitor::count_metadata(const string& field, Formatter *f)
+{
+  map<string,int> by_val;
+  count_metadata(field, &by_val);
   f->open_object_section(field.c_str());
   for (auto& p : by_val) {
     f->dump_int(p.first.c_str(), p.second);
@@ -2068,6 +2172,14 @@ bool OSDMonitor::preprocess_boot(MonOpRequestRef op)
     goto ignore;
   }
 
+  if (osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES) &&
+      !(m->osd_features & CEPH_FEATURE_OSD_RECOVERY_DELETES)) {
+    mon->clog->info() << "disallowing boot of OSD "
+                     << m->get_orig_source_inst()
+                     << " because 'recovery_deletes' osdmap flag is set and OSD lacks the OSD_RECOVERY_DELETES feature";
+    goto ignore;
+  }
+
   if (any_of(osdmap.get_pools().begin(),
             osdmap.get_pools().end(),
             [](const std::pair<int64_t,pg_pool_t>& pool)
@@ -3041,6 +3153,27 @@ void OSDMonitor::check_pg_creates_sub(Subscription *sub)
   }
 }
 
+void OSDMonitor::do_application_enable(int64_t pool_id,
+                                       const std::string &app_name)
+{
+  assert(paxos->is_plugged());
+
+  dout(20) << __func__ << ": pool_id=" << pool_id << ", app_name=" << app_name
+           << dendl;
+
+  auto pp = osdmap.get_pg_pool(pool_id);
+  assert(pp != nullptr);
+
+  pg_pool_t p = *pp;
+  if (pending_inc.new_pools.count(pool_id)) {
+    p = pending_inc.new_pools[pool_id];
+  }
+
+  p.application_metadata.insert({app_name, {}});
+  p.last_change = pending_inc.epoch;
+  pending_inc.new_pools[pool_id] = p;
+}
+
 unsigned OSDMonitor::scan_for_creating_pgs(
   const mempool::osdmap::map<int64_t,pg_pool_t>& pools,
   const mempool::osdmap::set<int64_t>& removed_pools,
@@ -3084,11 +3217,11 @@ void OSDMonitor::update_creating_pgs()
           << creating_pgs.queue.size() << " pools in queue" << dendl;
   decltype(creating_pgs_by_osd_epoch) new_pgs_by_osd_epoch;
   std::lock_guard<std::mutex> l(creating_pgs_lock);
-  for (auto& pg : creating_pgs.pgs) {
+  for (const auto& pg : creating_pgs.pgs) {
     int acting_primary = -1;
     auto pgid = pg.first;
     auto mapped = pg.second.first;
-    dout(20) << __func__ << " looking up " << pgid << dendl;
+    dout(20) << __func__ << " looking up " << pgid << "@" << mapped << dendl;
     mapping.get(pgid, nullptr, nullptr, nullptr, &acting_primary);
     // check the previous creating_pgs, look for the target to whom the pg was
     // previously mapped
@@ -3113,14 +3246,14 @@ void OSDMonitor::update_creating_pgs()
       }
     }
     dout(10) << __func__ << " will instruct osd." << acting_primary
-            << " to create " << pgid << dendl;
+            << " to create " << pgid << "@" << mapped << dendl;
     new_pgs_by_osd_epoch[acting_primary][mapped].insert(pgid);
   }
   creating_pgs_by_osd_epoch = std::move(new_pgs_by_osd_epoch);
   creating_pgs_epoch = mapping.get_epoch();
 }
 
-epoch_t OSDMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
+epoch_t OSDMonitor::send_pg_creates(int osd, Connection *con, epoch_t next) const
 {
   dout(30) << __func__ << " osd." << osd << " next=" << next
           << " " << creating_pgs_by_osd_epoch << dendl;
@@ -3144,11 +3277,12 @@ epoch_t OSDMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
        m = new MOSDPGCreate(creating_pgs_epoch);
       // Need the create time from the monitor using its clock to set
       // last_scrub_stamp upon pg creation.
-      const auto& creation = creating_pgs.pgs[pg];
-      m->mkpg.emplace(pg, pg_create_t{creation.first, pg, 0});
-      m->ctimes.emplace(pg, creation.second);
+      auto create = creating_pgs.pgs.find(pg);
+      assert(create != creating_pgs.pgs.end());
+      m->mkpg.emplace(pg, pg_create_t{create->second.first, pg, 0});
+      m->ctimes.emplace(pg, create->second.second);
       dout(20) << __func__ << " will create " << pg
-              << " at " << creation.first << dendl;
+              << " at " << create->second.first << dendl;
     }
   }
   if (!m) {
@@ -3237,8 +3371,15 @@ void OSDMonitor::tick()
          }
        }
 
-       if (g_conf->mon_osd_down_out_interval > 0 &&
-           down.sec() >= grace) {
+        bool down_out = !osdmap.is_destroyed(o) &&
+          g_conf->mon_osd_down_out_interval > 0 && down.sec() >= grace;
+        bool destroyed_out = osdmap.is_destroyed(o) &&
+          g_conf->mon_osd_destroyed_out_interval > 0 &&
+        // this is not precise enough as we did not make a note when this osd
+        // was marked as destroyed, but let's not bother with that
+        // complexity for now.
+          down.sec() >= g_conf->mon_osd_destroyed_out_interval;
+        if (down_out || destroyed_out) {
          dout(10) << "tick marking osd." << o << " OUT after " << down
                   << " sec (target " << grace << " = " << orig_grace << " + " << my_grace << ")" << dendl;
          pending_inc.new_weight[o] = CEPH_OSD_OUT;
@@ -3329,6 +3470,10 @@ bool OSDMonitor::handle_osd_timeouts(const utime_t &now,
 
   for (int i=0; i < max_osd; ++i) {
     dout(30) << __func__ << ": checking up on osd " << i << dendl;
+    if (!osdmap.exists(i)) {
+      last_osd_report.erase(i); // if any
+      continue;
+    }
     if (!osdmap.is_up(i))
       continue;
     const std::map<int,utime_t>::const_iterator t = last_osd_report.find(i);
@@ -3504,137 +3649,6 @@ void OSDMonitor::get_health(list<pair<health_status_t,string> >& summary,
       }
     }
 
-    if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-      // An osd could configure failsafe ratio, to something different
-      // but for now assume it is the same here.
-      float fsr = g_conf->osd_failsafe_full_ratio;
-      if (fsr > 1.0) fsr /= 100;
-      float fr = osdmap.get_full_ratio();
-      float br = osdmap.get_backfillfull_ratio();
-      float nr = osdmap.get_nearfull_ratio();
-
-      bool out_of_order = false;
-      // These checks correspond to how OSDService::check_full_status() in an OSD
-      // handles the improper setting of these values.
-      if (br < nr) {
-        out_of_order = true;
-        if (detail) {
-         ostringstream ss;
-         ss << "backfillfull_ratio (" << br << ") < nearfull_ratio (" << nr << "), increased";
-         detail->push_back(make_pair(HEALTH_ERR, ss.str()));
-        }
-        br = nr;
-      }
-      if (fr < br) {
-        out_of_order = true;
-        if (detail) {
-         ostringstream ss;
-         ss << "full_ratio (" << fr << ") < backfillfull_ratio (" << br << "), increased";
-         detail->push_back(make_pair(HEALTH_ERR, ss.str()));
-        }
-        fr = br;
-      }
-      if (fsr < fr) {
-        out_of_order = true;
-        if (detail) {
-         ostringstream ss;
-         ss << "osd_failsafe_full_ratio (" << fsr << ") < full_ratio (" << fr << "), increased";
-         detail->push_back(make_pair(HEALTH_ERR, ss.str()));
-        }
-      }
-      if (out_of_order) {
-       ostringstream ss;
-       ss << "Full ratio(s) out of order";
-       summary.push_back(make_pair(HEALTH_ERR, ss.str()));
-      }
-
-      set<int> full, backfillfull, nearfull;
-      osdmap.get_full_osd_counts(&full, &backfillfull, &nearfull);
-      if (full.size()) {
-       ostringstream ss;
-       ss << full.size() << " full osd(s)";
-       summary.push_back(make_pair(HEALTH_ERR, ss.str()));
-      }
-      if (backfillfull.size()) {
-       ostringstream ss;
-       ss << backfillfull.size() << " backfillfull osd(s)";
-       summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-      }
-      if (nearfull.size()) {
-       ostringstream ss;
-       ss << nearfull.size() << " nearfull osd(s)";
-       summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-      }
-      if (detail) {
-        for (auto& i: full) {
-          ostringstream ss;
-          ss << "osd." << i << " is full";
-         detail->push_back(make_pair(HEALTH_ERR, ss.str()));
-        }
-        for (auto& i: backfillfull) {
-          ostringstream ss;
-          ss << "osd." << i << " is backfill full";
-         detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-        for (auto& i: nearfull) {
-          ostringstream ss;
-          ss << "osd." << i << " is near full";
-         detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-      }
-
-      // warn if there is any noup osds.
-      vector<int> noup_osds;
-      osdmap.get_noup_osds(&noup_osds);
-      if (noup_osds.size()) {
-        ostringstream ss;
-        ss << noup_osds.size() << " noup osd(s)";
-        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-        if (detail) {
-          ss << ": " << noup_osds;
-          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-      }
-
-      // warn if there is any nodown osds.
-      vector<int> nodown_osds;
-      osdmap.get_nodown_osds(&nodown_osds);
-      if (nodown_osds.size()) {
-        ostringstream ss;
-        ss << nodown_osds.size() << " nodown osd(s)";
-        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-        if (detail) {
-          ss << ": " << nodown_osds;
-          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-      }
-
-      // warn if there is any noin osds.
-      vector<int> noin_osds;
-      osdmap.get_noin_osds(&noin_osds);
-      if (noin_osds.size()) {
-        ostringstream ss;
-        ss << noin_osds.size() << " noin osd(s)";
-        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-        if (detail) {
-          ss << ": " << noin_osds;
-          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-      }
-
-      // warn if there is any noout osds.
-      vector<int> noout_osds;
-      osdmap.get_noout_osds(&noout_osds);
-      if (noout_osds.size()) {
-        ostringstream ss;
-        ss << noout_osds.size() << " noout osd(s)";
-        summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-        if (detail) {
-          ss << ": " << noout_osds;
-          detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-        }
-      }
-    }
     // note: we leave it to ceph-mgr to generate details health warnings
     // with actual osd utilizations
 
@@ -3983,6 +3997,8 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
          filter |= OSDMap::DUMP_IN;
        } else if (s == "out") {
          filter |= OSDMap::DUMP_OUT;
+       } else if (s == "destroyed") {
+         filter |= OSDMap::DUMP_DESTROYED;
        } else {
          ss << "unrecognized state '" << s << "'";
          r = -EINVAL;
@@ -3990,10 +4006,18 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
        }
       }
       if ((filter & (OSDMap::DUMP_IN|OSDMap::DUMP_OUT)) ==
-         (OSDMap::DUMP_IN|OSDMap::DUMP_OUT) ||
-         (filter & (OSDMap::DUMP_UP|OSDMap::DUMP_DOWN)) ==
-         (OSDMap::DUMP_UP|OSDMap::DUMP_DOWN)) {
-       ss << "cannot specify both up and down or both in and out";
+         (OSDMap::DUMP_IN|OSDMap::DUMP_OUT)) {
+        ss << "cannot specify both 'in' and 'out'";
+        r = -EINVAL;
+        goto reply;
+      }
+      if (((filter & (OSDMap::DUMP_UP|OSDMap::DUMP_DOWN)) ==
+          (OSDMap::DUMP_UP|OSDMap::DUMP_DOWN)) ||
+           ((filter & (OSDMap::DUMP_UP|OSDMap::DUMP_DESTROYED)) ==
+           (OSDMap::DUMP_UP|OSDMap::DUMP_DESTROYED)) ||
+           ((filter & (OSDMap::DUMP_DOWN|OSDMap::DUMP_DESTROYED)) ==
+           (OSDMap::DUMP_DOWN|OSDMap::DUMP_DESTROYED))) {
+       ss << "can specify only one of 'up', 'down' and 'destroyed'";
        r = -EINVAL;
        goto reply;
       }
@@ -4464,6 +4488,7 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       CACHE_TARGET_DIRTY_RATIO, CACHE_TARGET_DIRTY_HIGH_RATIO,
       CACHE_MIN_FLUSH_AGE, CACHE_MIN_EVICT_AGE,
       MIN_READ_RECENCY_FOR_PROMOTE,
+      MIN_WRITE_RECENCY_FOR_PROMOTE,
       HIT_SET_GRADE_DECAY_RATE, HIT_SET_SEARCH_LAST_N
     };
     const choices_set_t ONLY_ERASURE_CHOICES = {
@@ -4514,9 +4539,18 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       for(choices_set_t::const_iterator it = selected_choices.begin();
          it != selected_choices.end(); ++it) {
        choices_map_t::const_iterator i;
-       f->open_object_section("pool");
-       f->dump_string("pool", poolstr);
-       f->dump_int("pool_id", pool);
+        for (i = ALL_CHOICES.begin(); i != ALL_CHOICES.end(); ++i) {
+          if (i->second == *it) {
+            break;
+          }
+        }
+        assert(i != ALL_CHOICES.end());
+        bool pool_opt = pool_opts_t::is_opt_name(i->first);
+        if (!pool_opt) {
+          f->open_object_section("pool");
+          f->dump_string("pool", poolstr);
+          f->dump_int("pool_id", pool);
+        }
        switch(*it) {
          case PG_NUM:
            f->dump_int("pg_num", p->get_pg_num());
@@ -4552,11 +4586,6 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
          case WRITE_FADVISE_DONTNEED:
          case NOSCRUB:
          case NODEEP_SCRUB:
-           for (i = ALL_CHOICES.begin(); i != ALL_CHOICES.end(); ++i) {
-             if (i->second == *it)
-               break;
-           }
-           assert(i != ALL_CHOICES.end());
            f->dump_string(i->first.c_str(),
                           p->has_flag(pg_pool_t::get_flag_by_name(i->first)) ?
                           "true" : "false");
@@ -4655,23 +4684,27 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
          case CSUM_TYPE:
          case CSUM_MAX_BLOCK:
          case CSUM_MIN_BLOCK:
-           for (i = ALL_CHOICES.begin(); i != ALL_CHOICES.end(); ++i) {
-             if (i->second == *it)
-               break;
-           }
-           assert(i != ALL_CHOICES.end());
-            if(*it == CSUM_TYPE) {
-              int val;
-              p->opts.get(pool_opts_t::CSUM_TYPE, &val);
-              f->dump_string(i->first.c_str(), Checksummer::get_csum_type_string(val));
-            }
-           else {
-              p->opts.dump(i->first, f.get());
+            pool_opts_t::key_t key = pool_opts_t::get_opt_desc(i->first).key;
+            if (p->opts.is_set(key)) {
+              f->open_object_section("pool");
+              f->dump_string("pool", poolstr);
+              f->dump_int("pool_id", pool);
+              if(*it == CSUM_TYPE) {
+                int val;
+                p->opts.get(pool_opts_t::CSUM_TYPE, &val);
+                f->dump_string(i->first.c_str(), Checksummer::get_csum_type_string(val));
+              } else {
+                p->opts.dump(i->first, f.get());
+              }
+              f->close_section();
+              f->flush(rdata);
             }
             break;
        }
-       f->close_section();
-       f->flush(rdata);
+        if (!pool_opt) {
+         f->close_section();
+         f->flush(rdata);
+        }
       }
 
     } else /* !f */ {
@@ -4877,16 +4910,16 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     r = 0;
   } else if (prefix == "osd crush rule list" ||
             prefix == "osd crush rule ls") {
-    string format;
-    cmd_getval(g_ceph_context, cmdmap, "format", format);
-    boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
-    f->open_array_section("rules");
-    osdmap.crush->list_rules(f.get());
-    f->close_section();
-    ostringstream rs;
-    f->flush(rs);
-    rs << "\n";
-    rdata.append(rs.str());
+    if (f) {
+      f->open_array_section("rules");
+      osdmap.crush->list_rules(f.get());
+      f->close_section();
+      f->flush(rdata);
+    } else {
+      ostringstream ss;
+      osdmap.crush->list_rules(&ss);
+      rdata.append(ss.str());
+    }
   } else if (prefix == "osd crush rule dump") {
     string name;
     cmd_getval(g_ceph_context, cmdmap, "name", name);
@@ -4933,11 +4966,24 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
     rs << "\n";
     rdata.append(rs.str());
   } else if (prefix == "osd crush tree") {
-    boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
-    f->open_array_section("crush_map_roots");
-    osdmap.crush->dump_tree(f.get());
-    f->close_section();
-    f->flush(rdata);
+    string shadow;
+    cmd_getval(g_ceph_context, cmdmap, "shadow", shadow);
+    bool show_shadow = shadow == "--show-shadow";
+    boost::scoped_ptr<Formatter> f(Formatter::create(format));
+    if (f) {
+      osdmap.crush->dump_tree(nullptr,
+                              f.get(),
+                              osdmap.get_pool_names(),
+                              show_shadow);
+      f->flush(rdata);
+    } else {
+      ostringstream ss;
+      osdmap.crush->dump_tree(&ss,
+                              nullptr,
+                              osdmap.get_pool_names(),
+                              show_shadow);
+      rdata.append(ss.str());
+    }
   } else if (prefix == "osd crush class ls") {
     boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json-pretty"));
     f->open_array_section("crush_classes");
@@ -4973,6 +5019,37 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
       rs << "\n";
       rdata.append(rs.str());
     }
+  } else if (prefix == "osd crush weight-set ls") {
+    boost::scoped_ptr<Formatter> f(Formatter::create(format));
+    if (f) {
+      f->open_array_section("weight_sets");
+      if (osdmap.crush->have_choose_args(CrushWrapper::DEFAULT_CHOOSE_ARGS)) {
+       f->dump_string("pool", "(compat)");
+      }
+      for (auto& i : osdmap.crush->choose_args) {
+       if (i.first >= 0) {
+         f->dump_string("pool", osdmap.get_pool_name(i.first));
+       }
+      }
+      f->close_section();
+      f->flush(rdata);
+    } else {
+      ostringstream rs;
+      if (osdmap.crush->have_choose_args(CrushWrapper::DEFAULT_CHOOSE_ARGS)) {
+       rs << "(compat)\n";
+      }
+      for (auto& i : osdmap.crush->choose_args) {
+       if (i.first >= 0) {
+         rs << osdmap.get_pool_name(i.first) << "\n";
+       }
+      }
+      rdata.append(rs.str());
+    }
+  } else if (prefix == "osd crush weight-set dump") {
+    boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty",
+                                                    "json-pretty"));
+    osdmap.crush->dump_choose_args(f.get());
+    f->flush(rdata);
   } else if (prefix == "osd erasure-code-profile get") {
     string name;
     cmd_getval(g_ceph_context, cmdmap, "name", name);
@@ -5857,6 +5934,12 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
        return -EEXIST;
       return 0;
     }
+    if (n > (unsigned)g_conf->mon_max_pool_pg_num) {
+      ss << "'pg_num' must be greater than 0 and less than or equal to "
+         << g_conf->mon_max_pool_pg_num
+         << " (you may adjust 'mon max pool pg num' for higher values)";
+      return -ERANGE;
+    }
     string force;
     cmd_getval(g_ceph_context,cmdmap, "force", force);
     if (p.cache_mode != pg_pool_t::CACHEMODE_NONE &&
@@ -6208,6 +6291,158 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
   return 0;
 }
 
+int OSDMonitor::prepare_command_pool_application(const string &prefix,
+                                                 map<string,cmd_vartype> &cmdmap,
+                                                 stringstream& ss)
+{
+  string pool_name;
+  cmd_getval(g_ceph_context, cmdmap, "pool", pool_name);
+  int64_t pool = osdmap.lookup_pg_pool_name(pool_name.c_str());
+  if (pool < 0) {
+    ss << "unrecognized pool '" << pool_name << "'";
+    return -ENOENT;
+  }
+
+  pg_pool_t p = *osdmap.get_pg_pool(pool);
+  if (pending_inc.new_pools.count(pool)) {
+    p = pending_inc.new_pools[pool];
+  }
+
+  string app;
+  cmd_getval(g_ceph_context, cmdmap, "app", app);
+  bool app_exists = (p.application_metadata.count(app) > 0);
+
+  if (boost::algorithm::ends_with(prefix, "enable")) {
+    if (app.empty()) {
+      ss << "application name must be provided";
+      return -EINVAL;
+    }
+
+    if (p.is_tier()) {
+      ss << "application must be enabled on base tier";
+      return -EINVAL;
+    }
+
+    string force;
+    cmd_getval(g_ceph_context, cmdmap, "force", force);
+
+    if (!app_exists && !p.application_metadata.empty() &&
+        force != "--yes-i-really-mean-it") {
+      ss << "Are you SURE? Pool '" << pool_name << "' already has an enabled "
+         << "application; pass --yes-i-really-mean-it to proceed anyway";
+      return -EPERM;
+    }
+
+    if (!app_exists && p.application_metadata.size() >= MAX_POOL_APPLICATIONS) {
+      ss << "too many enabled applications on pool '" << pool_name << "'; "
+         << "max " << MAX_POOL_APPLICATIONS;
+      return -EINVAL;
+    }
+
+    if (app.length() > MAX_POOL_APPLICATION_LENGTH) {
+      ss << "application name '" << app << "' too long; max length "
+         << MAX_POOL_APPLICATION_LENGTH;
+      return -EINVAL;
+    }
+
+    if (!app_exists) {
+      p.application_metadata[app] = {};
+    }
+    ss << "enabled application '" << app << "' on pool '" << pool_name << "'";
+
+  } else if (boost::algorithm::ends_with(prefix, "disable")) {
+    string force;
+    cmd_getval(g_ceph_context, cmdmap, "force", force);
+
+    if (force != "--yes-i-really-mean-it") {
+      ss << "Are you SURE? Disabling an application within a pool might result "
+         << "in loss of application functionality; pass "
+         << "--yes-i-really-mean-it to proceed anyway";
+      return -EPERM;
+    }
+
+    if (!app_exists) {
+      ss << "application '" << app << "' is not enabled on pool '" << pool_name
+         << "'";
+      return 0; // idempotent
+    }
+
+    p.application_metadata.erase(app);
+    ss << "disable application '" << app << "' on pool '" << pool_name << "'";
+
+  } else if (boost::algorithm::ends_with(prefix, "set")) {
+    if (p.is_tier()) {
+      ss << "application metadata must be set on base tier";
+      return -EINVAL;
+    }
+
+    if (!app_exists) {
+      ss << "application '" << app << "' is not enabled on pool '" << pool_name
+         << "'";
+      return -ENOENT;
+    }
+
+    string key;
+    cmd_getval(g_ceph_context, cmdmap, "key", key);
+
+    if (key.empty()) {
+      ss << "key must be provided";
+      return -EINVAL;
+    }
+
+    auto &app_keys = p.application_metadata[app];
+    if (app_keys.count(key) == 0 &&
+        app_keys.size() >= MAX_POOL_APPLICATION_KEYS) {
+      ss << "too many keys set for application '" << app << "' on pool '"
+         << pool_name << "'; max " << MAX_POOL_APPLICATION_KEYS;
+      return -EINVAL;
+    }
+
+    if (key.length() > MAX_POOL_APPLICATION_LENGTH) {
+      ss << "key '" << app << "' too long; max length "
+         << MAX_POOL_APPLICATION_LENGTH;
+      return -EINVAL;
+    }
+
+    string value;
+    cmd_getval(g_ceph_context, cmdmap, "value", value);
+    if (value.length() > MAX_POOL_APPLICATION_LENGTH) {
+      ss << "value '" << value << "' too long; max length "
+         << MAX_POOL_APPLICATION_LENGTH;
+      return -EINVAL;
+    }
+
+    p.application_metadata[app][key] = value;
+    ss << "set application '" << app << "' key '" << key << "' to '"
+       << value << "' on pool '" << pool_name << "'";
+  } else if (boost::algorithm::ends_with(prefix, "rm")) {
+    if (!app_exists) {
+      ss << "application '" << app << "' is not enabled on pool '" << pool_name
+         << "'";
+      return -ENOENT;
+    }
+
+    string key;
+    cmd_getval(g_ceph_context, cmdmap, "key", key);
+    auto it = p.application_metadata[app].find(key);
+    if (it == p.application_metadata[app].end()) {
+      ss << "application '" << app << "' on pool '" << pool_name
+         << "' does not have key '" << key << "'";
+      return 0; // idempotent
+    }
+
+    p.application_metadata[app].erase(it);
+    ss << "removed application '" << app << "' key '" << key << "' on pool '"
+       << pool_name << "'";
+  } else {
+    assert(false);
+  }
+
+  p.last_change = pending_inc.epoch;
+  pending_inc.new_pools[pool] = p;
+  return 0;
+}
+
 int OSDMonitor::_prepare_command_osd_crush_remove(
     CrushWrapper &newcrush,
     int32_t id,
@@ -6664,6 +6899,12 @@ int OSDMonitor::prepare_command_osd_new(
     assert(osdmap.is_destroyed(id));
     pending_inc.new_weight[id] = CEPH_OSD_OUT;
     pending_inc.new_state[id] |= CEPH_OSD_DESTROYED | CEPH_OSD_NEW;
+    if (osdmap.get_state(id) & CEPH_OSD_UP) {
+      // due to http://tracker.ceph.com/issues/20751 some clusters may
+      // have UP set for non-existent OSDs; make sure it is cleared
+      // for a newly created osd.
+      pending_inc.new_state[id] |= CEPH_OSD_UP;
+    }
     pending_inc.new_uuid[id] = uuid;
   } else {
     assert(id >= 0);
@@ -7115,6 +7356,69 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       return true;
     }
 
+ } else if (prefix == "osd crush rm-device-class") {
+    bool stop = false;
+    vector<string> idvec;
+    cmd_getval(g_ceph_context, cmdmap, "ids", idvec);
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    set<int> updated;
+
+    for (unsigned j = 0; j < idvec.size() && !stop; j++) {
+      set<int> osds;
+
+      // wildcard?
+      if (j == 0 &&
+          (idvec[0] == "any" || idvec[0] == "all" || idvec[0] == "*")) {
+        osdmap.get_all_osds(osds);
+        stop = true;
+      } else {
+        // try traditional single osd way
+        long osd = parse_osd_id(idvec[j].c_str(), &ss);
+        if (osd < 0) {
+          // ss has reason for failure
+          ss << ", unable to parse osd id:\"" << idvec[j] << "\". ";
+          err = -EINVAL;
+          goto reply;
+        }
+        osds.insert(osd);
+      }
+
+      for (auto &osd : osds) {
+        if (!osdmap.exists(osd)) {
+          ss << "osd." << osd << " does not exist. ";
+          continue;
+        }
+
+        auto class_name = newcrush.get_item_class(osd);
+        stringstream ts;
+        if (!class_name) {
+          ss << "osd." << osd << " belongs to no class, ";
+          continue;
+        }
+        // note that we do not verify if class_is_in_use here
+        // in case the device is misclassified and user wants
+        // to overridely reset...
+
+        err = newcrush.remove_device_class(g_ceph_context, osd, &ss);
+        if (err < 0) {
+          // ss has reason for failure
+          goto reply;
+        }
+        updated.insert(osd);
+      }
+    }
+
+    if (!updated.empty()) {
+      pending_inc.crush.clear();
+      newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+      ss << "done removing class of osd(s): " << updated;
+      getline(ss, rs);
+      wait_for_finished_proposal(op,
+        new Monitor::C_Command(mon,op, 0, rs, get_last_committed() + 1));
+      return true;
+    }
+
   } else if (prefix == "osd crush add-bucket") {
     // os crush add-bucket <name> <type>
     string name, typestr;
@@ -7176,40 +7480,6 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     else
       goto update;
-  } else if (prefix == "osd crush class create") {
-    string device_class;
-    if (!cmd_getval(g_ceph_context, cmdmap, "class", device_class)) {
-      err = -EINVAL; // no value!
-      goto reply;
-    }
-    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
-      ss << "you must complete the upgrade and 'ceph osd require-osd-release "
-        << "luminous' before using crush device classes";
-      err = -EPERM;
-      goto reply;
-    }
-    if (!_have_pending_crush() &&
-       _get_stable_crush().class_exists(device_class)) {
-      ss << "class '" << device_class << "' already exists";
-      goto reply;
-    }
-
-    CrushWrapper newcrush;
-    _get_pending_crush(newcrush);
-
-    if (newcrush.class_exists(name)) {
-      ss << "class '" << device_class << "' already exists";
-      goto update;
-    }
-
-    int class_id = newcrush.get_or_create_class_id(device_class);
-
-    pending_inc.crush.clear();
-    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
-    ss << "created class " << device_class << " with id " << class_id
-       << " to crush map";
-    goto update;
-    
   } else if (prefix == "osd crush class rm") {
     string device_class;
     if (!cmd_getval(g_ceph_context, cmdmap, "class", device_class)) {
@@ -7234,17 +7504,31 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     int class_id = newcrush.get_class_id(device_class);
 
-    if (newcrush.class_is_in_use(class_id)) {
+    stringstream ts;
+    if (newcrush.class_is_in_use(class_id, &ts)) {
       err = -EBUSY;
-      ss << "class '" << device_class << "' is in use";
+      ss << "class '" << device_class << "' " << ts.str();
       goto reply;
     }
 
-    err = newcrush.remove_class_name(device_class);
-    if (err < 0) {
-      ss << "class '" << device_class << "' cannot be removed '"
-        << cpp_strerror(err) << "'";
-      goto reply;
+    set<int> osds;
+    newcrush.get_devices_by_class(device_class, &osds);
+    for (auto& p: osds) {
+      err = newcrush.remove_device_class(g_ceph_context, p, &ss);
+      if (err < 0) {
+        // ss has reason for failure
+        goto reply;
+      }
+    }
+
+    if (osds.empty()) {
+      // empty class, remove directly
+      err = newcrush.remove_class_name(device_class);
+      if (err < 0) {
+        ss << "class '" << device_class << "' cannot be removed '"
+           << cpp_strerror(err) << "'";
+        goto reply;
+      }
     }
 
     pending_inc.crush.clear();
@@ -7253,59 +7537,129 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        << " from crush map";
     goto update;
 
-  } else if (prefix == "osd crush class rename") {
-    string srcname, dstname;
-    if (!cmd_getval(g_ceph_context, cmdmap, "srcname", srcname)) {
-      err = -EINVAL;
-      goto reply;
-    }
-    if (osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
-      ss << "you must complete the upgrade and 'ceph osd require-osd-release "
-        << "luminous' before using crush device classes";
+  } else if (prefix == "osd crush weight-set create" ||
+            prefix == "osd crush weight-set create-compat") {
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    int64_t pool;
+    int positions;
+    if (newcrush.has_non_straw2_buckets()) {
+      ss << "crush map contains one or more bucket(s) that are not straw2";
       err = -EPERM;
       goto reply;
     }
-
-    if (!cmd_getval(g_ceph_context, cmdmap, "dstname", dstname)) {
-      err = -EINVAL;
-      goto reply;
+    if (prefix == "osd crush weight-set create") {
+      if (osdmap.require_min_compat_client > 0 &&
+         osdmap.require_min_compat_client < CEPH_RELEASE_LUMINOUS) {
+       ss << "require_min_compat_client "
+          << ceph_release_name(osdmap.require_min_compat_client)
+          << " < luminous, which is required for per-pool weight-sets. "
+           << "Try 'ceph osd set-require-min-compat-client luminous' "
+           << "before using the new interface";
+       err = -EPERM;
+       goto reply;
+      }
+      string poolname, mode;
+      cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
+      pool = osdmap.lookup_pg_pool_name(poolname.c_str());
+      if (pool < 0) {
+       ss << "pool '" << poolname << "' not found";
+       err = -ENOENT;
+       goto reply;
+      }
+      cmd_getval(g_ceph_context, cmdmap, "mode", mode);
+      if (mode != "flat" && mode != "positional") {
+       ss << "unrecognized weight-set mode '" << mode << "'";
+       err = -EINVAL;
+       goto reply;
+      }
+      positions = mode == "flat" ? 1 : osdmap.get_pg_pool(pool)->get_size();
+    } else {
+      pool = CrushWrapper::DEFAULT_CHOOSE_ARGS;
+      positions = 1;
     }
+    newcrush.create_choose_args(pool, positions);
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    goto update;
 
+  } else if (prefix == "osd crush weight-set rm" ||
+            prefix == "osd crush weight-set rm-compat") {
     CrushWrapper newcrush;
     _get_pending_crush(newcrush);
-
-    if (!newcrush.class_exists(srcname)) {
-      err = -ENOENT;
-      ss << "class '" << srcname << "' does not exist";
-      goto reply;
+    int64_t pool;
+    if (prefix == "osd crush weight-set rm") {
+      string poolname;
+      cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
+      pool = osdmap.lookup_pg_pool_name(poolname.c_str());
+      if (pool < 0) {
+       ss << "pool '" << poolname << "' not found";
+       err = -ENOENT;
+       goto reply;
+      }
+    } else {
+      pool = CrushWrapper::DEFAULT_CHOOSE_ARGS;
     }
+    newcrush.rm_choose_args(pool);
+    pending_inc.crush.clear();
+    newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
+    goto update;
 
-    if (newcrush.class_exists(dstname)) {
-      err = -EEXIST;
-      ss << "class '" << dstname << "' already exists";
-      goto reply;
+  } else if (prefix == "osd crush weight-set reweight" ||
+            prefix == "osd crush weight-set reweight-compat") {
+    string poolname, item;
+    vector<double> weight;
+    cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
+    cmd_getval(g_ceph_context, cmdmap, "item", item);
+    cmd_getval(g_ceph_context, cmdmap, "weight", weight);
+    CrushWrapper newcrush;
+    _get_pending_crush(newcrush);
+    int64_t pool;
+    if (prefix == "osd crush weight-set reweight") {
+      pool = osdmap.lookup_pg_pool_name(poolname.c_str());
+      if (pool < 0) {
+       ss << "pool '" << poolname << "' not found";
+       err = -ENOENT;
+       goto reply;
+      }
+      if (!newcrush.have_choose_args(pool)) {
+       ss << "no weight-set for pool '" << poolname << "'";
+       err = -ENOENT;
+       goto reply;
+      }
+      auto arg_map = newcrush.choose_args_get(pool);
+      int positions = newcrush.get_choose_args_positions(arg_map);
+      if (weight.size() != (size_t)positions) {
+         ss << "must specify exact " << positions << " weight values";
+         err = -EINVAL;
+         goto reply;
+      }
+    } else {
+      pool = CrushWrapper::DEFAULT_CHOOSE_ARGS;
+      if (!newcrush.have_choose_args(pool)) {
+       ss << "no backward-compatible weight-set";
+       err = -ENOENT;
+       goto reply;
+      }
     }
-
-    int class_id = newcrush.get_class_id(srcname);
-
-    if (newcrush.class_is_in_use(class_id)) {
-      err = -EBUSY;
-      ss << "class '" << srcname << "' is in use";
+    if (!newcrush.name_exists(item)) {
+      ss << "item '" << item << "' does not exist";
+      err = -ENOENT;
       goto reply;
     }
-
-    err = newcrush.rename_class(srcname, dstname);
+    err = newcrush.choose_args_adjust_item_weightf(
+      g_ceph_context,
+      newcrush.choose_args_get(pool),
+      newcrush.get_item_id(item),
+      weight,
+      &ss);
     if (err < 0) {
-      ss << "fail to rename '" << srcname << "' to '" << dstname << "':"
-         << cpp_strerror(err);
       goto reply;
     }
-
+    err = 0;
     pending_inc.crush.clear();
     newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
-    ss << "rename class '" << srcname << "' to '" << dstname << "'";
     goto update;
-    
   } else if (osdid_present &&
             (prefix == "osd crush set" || prefix == "osd crush add")) {
     // <OsdName> is 'osd.<id>' or '<id>', passed as int64_t id
@@ -7314,7 +7668,7 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
 
     if (!osdmap.exists(osdid)) {
       err = -ENOENT;
-      ss << name << " does not exist.  create it before updating the crush map";
+      ss << name << " does not exist. Create it before updating the crush map";
       goto reply;
     }
 
@@ -8279,6 +8633,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        err = -EPERM;
        goto reply;
       }
+    } else if (key == "recovery_deletes") {
+      if (HAVE_FEATURE(osdmap.get_up_osd_features(), OSD_RECOVERY_DELETES)) {
+       return prepare_set_flag(op, CEPH_OSDMAP_RECOVERY_DELETES);
+      } else {
+       ss << "not all up OSDs have OSD_RECOVERY_DELETES feature";
+       err = -EPERM;
+       goto reply;
+      }
     } else if (key == "require_jewel_osds") {
       if (!osdmap.test_flag(CEPH_OSDMAP_SORTBITWISE)) {
        ss << "the sortbitwise flag must be set before require_jewel_osds";
@@ -8385,6 +8747,10 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
       goto reply;
     }
     pending_inc.new_require_osd_release = rel;
+    if (rel >= CEPH_RELEASE_LUMINOUS &&
+       !osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
+      return prepare_set_flag(op, CEPH_OSDMAP_RECOVERY_DELETES);
+    }
     goto update;
   } else if (prefix == "osd cluster_snap") {
     // ** DISABLE THIS FOR NOW **
@@ -9114,7 +9480,14 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
             err = -ENOENT;
             goto reply;
           }
-          new_pg_upmap_items.push_back(make_pair(from, to));
+          pair<int32_t,int32_t> entry = make_pair(from, to);
+          auto it = std::find(new_pg_upmap_items.begin(),
+            new_pg_upmap_items.end(), entry);
+          if (it != new_pg_upmap_items.end()) {
+            ss << "osd." << from << " -> osd." << to << " already exists, ";
+            continue;
+          }
+          new_pg_upmap_items.push_back(entry);
           items << from << "->" << to << ",";
         }
         string out(items.str());
@@ -10368,7 +10741,20 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
     wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
                                              get_last_committed() + 1));
     return true;
+  } else if (prefix == "osd pool application enable" ||
+             prefix == "osd pool application disable" ||
+             prefix == "osd pool application set" ||
+             prefix == "osd pool application rm") {
+    err = prepare_command_pool_application(prefix, cmdmap, ss);
+    if (err == -EAGAIN)
+      goto wait;
+    if (err < 0)
+      goto reply;
 
+    getline(ss, rs);
+    wait_for_finished_proposal(
+      op, new Monitor::C_Command(mon, op, 0, rs, get_last_committed() + 1));
+    return true;
   } else if (prefix == "osd reweight-by-pg" ||
             prefix == "osd reweight-by-utilization" ||
             prefix == "osd test-reweight-by-pg" ||
@@ -10438,6 +10824,32 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
        new Monitor::C_Command(mon, op, 0, rs, rdata, get_last_committed() + 1));
       return true;
     }
+  } else if (prefix == "osd force-create-pg") {
+    pg_t pgid;
+    string pgidstr;
+    cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
+    if (!pgid.parse(pgidstr.c_str())) {
+      ss << "invalid pgid '" << pgidstr << "'";
+      err = -EINVAL;
+      goto reply;
+    }
+    bool creating_now;
+    {
+      std::lock_guard<std::mutex> l(creating_pgs_lock);
+      auto emplaced = creating_pgs.pgs.emplace(pgid,
+                                              make_pair(osdmap.get_epoch(),
+                                                        ceph_clock_now()));
+      creating_now = emplaced.second;
+    }
+    if (creating_now) {
+      ss << "pg " << pgidstr << " now creating, ok";
+      err = 0;
+      goto update;
+    } else {
+      ss << "pg " << pgid << " already creating";
+      err = 0;
+      goto reply;
+    }
   } else {
     err = -EINVAL;
   }