git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSDMap.cc
update sources to 12.2.8
[ceph.git] / ceph / src / osd / OSDMap.cc
index 5043ff417151a6407111cd06e1091af8cf1f888f..2bb8beb94e72b32d225ff5dabf57ea4c74b23ab7 100644 (file)
  *
  */
 
+#include <boost/algorithm/string.hpp>
+
 #include "OSDMap.h"
 #include <algorithm>
 #include "common/config.h"
+#include "common/errno.h"
 #include "common/Formatter.h"
 #include "common/TextTable.h"
 #include "include/ceph_features.h"
 #include "include/str_map.h"
 
 #include "common/code_environment.h"
+#include "mon/health_check.h"
 
 #include "crush/CrushTreeDumper.h"
 #include "common/Clock.h"
+#include "mon/PGStatService.h"
  
 #define dout_subsys ceph_subsys_osd
 
@@ -231,6 +236,8 @@ int OSDMap::Incremental::propagate_snaps_to_tiers(CephContext *cct,
   return 0;
 }
 
+// ----------------------------------
+// OSDMap
 
 bool OSDMap::subtree_is_down(int id, set<int> *down_cache) const
 {
@@ -293,6 +300,48 @@ bool OSDMap::containing_subtree_is_down(CephContext *cct, int id, int subtree_ty
   }
 }
 
+bool OSDMap::subtree_type_is_down(  // recursive: true when crush item `id` is a down osd, or a bucket whose children are all down
+  CephContext *cct,  // only forwarded to the recursive calls
+  int id,  // crush item id; >= 0 is handled as an osd, anything else as a bucket
+  int subtree_type,  // key under which a fully-down bucket is remembered in *subtree_type_down
+  set<int> *down_in_osds,  // out: visited osds that are down and not out
+  set<int> *up_in_osds,  // out: visited osds that are not down and not out
+  set<int> *subtree_up,  // out: buckets found to have at least one child that is not down
+  unordered_map<int, set<int> > *subtree_type_down) const  // optional (may be null) cache: type -> buckets already known to be down
+{
+  if (id >= 0) {  // osd case: answer directly from the osd state
+    bool is_down_ret = is_down(id);
+    if (!is_out(id)) {  // osds that are out are not recorded in either set
+      if (is_down_ret) {
+        down_in_osds->insert(id);
+      } else {
+        up_in_osds->insert(id);
+      }
+    }
+    return is_down_ret;
+  }
+
+  if (subtree_type_down &&
+      (*subtree_type_down)[subtree_type].count(id)) {  // cache hit; note operator[] creates the per-type set if missing
+    return true;
+  }
+
+  list<int> children;
+  crush->get_children(id, &children);
+  for (const auto &child : children) {
+    if (!subtree_type_is_down(
+         cct, child, crush->get_bucket_type(child),
+         down_in_osds, up_in_osds, subtree_up, subtree_type_down)) {
+      subtree_up->insert(id);
+      return false;  // stops at the first non-down child; later children are not visited or recorded
+    }
+  }
+  if (subtree_type_down) {
+    (*subtree_type_down)[subtree_type].insert(id);  // remember this bucket as down for later calls
+  }
+  return true;  // also reached when the bucket has no children at all
+}
+
 void OSDMap::Incremental::encode_client_old(bufferlist& bl) const
 {
   __u16 v = 5;
@@ -332,7 +381,15 @@ void OSDMap::Incremental::encode_client_old(bufferlist& bl) const
     ::encode(n, bl);
   }
   ::encode(new_up_client, bl, 0);
-  ::encode(new_state, bl);
+  {
+    // legacy is map<int32_t,uint8_t>
+    uint32_t n = new_state.size();
+    ::encode(n, bl);
+    for (auto p : new_state) {
+      ::encode(p.first, bl);
+      ::encode((uint8_t)p.second, bl);
+    }
+  }
   ::encode(new_weight, bl);
   // for ::encode(new_pg_temp, bl);
   n = new_pg_temp.size();
@@ -368,7 +425,14 @@ void OSDMap::Incremental::encode_classic(bufferlist& bl, uint64_t features) cons
   ::encode(new_pool_names, bl);
   ::encode(old_pools, bl);
   ::encode(new_up_client, bl, features);
-  ::encode(new_state, bl);
+  {
+    uint32_t n = new_state.size();
+    ::encode(n, bl);
+    for (auto p : new_state) {
+      ::encode(p.first, bl);
+      ::encode((uint8_t)p.second, bl);
+    }
+  }
   ::encode(new_weight, bl);
   ::encode(new_pg_temp, bl);
 
@@ -410,7 +474,7 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
   ENCODE_START(8, 7, bl);
 
   {
-    uint8_t v = 4;
+    uint8_t v = 5;
     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
       v = 3;
     }
@@ -428,7 +492,16 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
     ::encode(new_pool_names, bl);
     ::encode(old_pools, bl);
     ::encode(new_up_client, bl, features);
-    ::encode(new_state, bl);
+    if (v >= 5) {
+      ::encode(new_state, bl);
+    } else {
+      uint32_t n = new_state.size();
+      ::encode(n, bl);
+      for (auto p : new_state) {
+       ::encode(p.first, bl);
+       ::encode((uint8_t)p.second, bl);
+      }
+    }
     ::encode(new_weight, bl);
     ::encode(new_pg_temp, bl);
     ::encode(new_primary_temp, bl);
@@ -445,7 +518,7 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
   }
 
   {
-    uint8_t target_v = 5;
+    uint8_t target_v = 6;
     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
       target_v = 2;
     }
@@ -466,7 +539,11 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
       ::encode(new_nearfull_ratio, bl);
       ::encode(new_full_ratio, bl);
       ::encode(new_backfillfull_ratio, bl);
+    }
+    // 5 was string-based new_require_min_compat_client
+    if (target_v >= 6) {
       ::encode(new_require_min_compat_client, bl);
+      ::encode(new_require_osd_release, bl);
     }
     ENCODE_FINISH(bl); // osd-only data
   }
@@ -543,7 +620,13 @@ void OSDMap::Incremental::decode_classic(bufferlist::iterator &p)
     ::decode(old_pools, p);
   }
   ::decode(new_up_client, p);
-  ::decode(new_state, p);
+  {
+    map<int32_t,uint8_t> ns;
+    ::decode(ns, p);
+    for (auto q : ns) {
+      new_state[q.first] = q.second;
+    }
+  }
   ::decode(new_weight, p);
 
   if (v < 6) {
@@ -612,7 +695,7 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl)
     return;
   }
   {
-    DECODE_START(4, bl); // client-usable data
+    DECODE_START(5, bl); // client-usable data
     ::decode(fsid, bl);
     ::decode(epoch, bl);
     ::decode(modified, bl);
@@ -626,7 +709,15 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl)
     ::decode(new_pool_names, bl);
     ::decode(old_pools, bl);
     ::decode(new_up_client, bl);
-    ::decode(new_state, bl);
+    if (struct_v >= 5) {
+      ::decode(new_state, bl);
+    } else {
+      map<int32_t,uint8_t> ns;
+      ::decode(ns, bl);
+      for (auto q : ns) {
+       new_state[q.first] = q.second;
+      }
+    }
     ::decode(new_weight, bl);
     ::decode(new_pg_temp, bl);
     ::decode(new_primary_temp, bl);
@@ -651,7 +742,7 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl)
   }
 
   {
-    DECODE_START(5, bl); // extended, osd-only data
+    DECODE_START(6, bl); // extended, osd-only data
     ::decode(new_hb_back_up, bl);
     ::decode(new_up_thru, bl);
     ::decode(new_last_clean_interval, bl);
@@ -679,8 +770,29 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl)
     } else {
       new_backfillfull_ratio = -1;
     }
-    if (struct_v >= 5)
+    if (struct_v == 5) {
+      string r;
+      ::decode(r, bl);
+      if (r.length()) {
+       new_require_min_compat_client = ceph_release_from_name(r.c_str());
+      }
+    }
+    if (struct_v >= 6) {
       ::decode(new_require_min_compat_client, bl);
+      ::decode(new_require_osd_release, bl);
+    } else {
+      if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+       // only for compat with post-kraken pre-luminous test clusters
+       new_require_osd_release = CEPH_RELEASE_LUMINOUS;
+       new_flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
+      } else if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       new_require_osd_release = CEPH_RELEASE_KRAKEN;
+      } else if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_JEWEL)) {
+       new_require_osd_release = CEPH_RELEASE_JEWEL;
+      } else {
+       new_require_osd_release = -1;
+      }
+    }
     DECODE_FINISH(bl); // osd-only data
   }
 
@@ -725,7 +837,8 @@ void OSDMap::Incremental::dump(Formatter *f) const
   f->dump_float("new_full_ratio", new_full_ratio);
   f->dump_float("new_nearfull_ratio", new_nearfull_ratio);
   f->dump_float("new_backfillfull_ratio", new_backfillfull_ratio);
-  f->dump_string("new_require_min_compat_client", new_require_min_compat_client);
+  f->dump_int("new_require_min_compat_client", new_require_min_compat_client);
+  f->dump_int("new_require_osd_release", new_require_osd_release);
 
   if (fullmap.length()) {
     f->open_object_section("full_map");
@@ -807,6 +920,7 @@ void OSDMap::Incremental::dump(Formatter *f) const
     for (auto &state : st)
       f->dump_string("state", state);
     f->close_section();
+    f->close_section();
   }
   f->close_section();
 
@@ -985,6 +1099,13 @@ void OSDMap::get_blacklist(list<pair<entity_addr_t,utime_t> > *bl) const
    std::copy(blacklist.begin(), blacklist.end(), std::back_inserter(*bl));
 }
 
+void OSDMap::get_blacklist(std::set<entity_addr_t> *bl) const  // overload that adds only the key (i.first) of each blacklist entry to *bl
+{
+  for (const auto &i : blacklist) {
+    bl->insert(i.first);  // *bl is not cleared first, so entries already present are kept
+  }
+}
+
 void OSDMap::set_max_osd(int m)
 {
   int o = max_osd;
@@ -1027,36 +1148,59 @@ int OSDMap::calc_num_osds()
   return num_osd;
 }
 
-void OSDMap::count_full_nearfull_osds(int *full, int *backfill, int *nearfull) const
+void OSDMap::get_full_pools(CephContext *cct,  // fills three pool-id sets from the osds flagged FULL / BACKFILLFULL / NEARFULL
+                            set<int64_t> *full,
+                            set<int64_t> *backfillfull,
+                            set<int64_t> *nearfull) const
 {
-  *full = 0;
-  *backfill = 0;
-  *nearfull = 0;
+  assert(full);  // all three output sets are mandatory
+  assert(backfillfull);
+  assert(nearfull);
+  full->clear();  // outputs are reset, so results never accumulate across calls
+  backfillfull->clear();
+  nearfull->clear();
+
+  vector<int> full_osds;  // each qualifying osd lands in at most one of these three vectors
+  vector<int> backfillfull_osds;
+  vector<int> nearfull_osds;
   for (int i = 0; i < max_osd; ++i) {
     if (exists(i) && is_up(i) && is_in(i)) {
       if (osd_state[i] & CEPH_OSD_FULL)
-       ++(*full);
+        full_osds.push_back(i);  // flags are tested in the order FULL, BACKFILLFULL, NEARFULL; first match wins
       else if (osd_state[i] & CEPH_OSD_BACKFILLFULL)
-       ++(*backfill);
+       backfillfull_osds.push_back(i);
       else if (osd_state[i] & CEPH_OSD_NEARFULL)
-       ++(*nearfull);
+       nearfull_osds.push_back(i);
     }
   }
+
+  for (auto i: full_osds) {
+    get_pool_ids_by_osd(cct, i, full);  // helper defined outside this view; presumably adds the pools that use osd i - confirm
+  }
+  for (auto i: backfillfull_osds) {
+    get_pool_ids_by_osd(cct, i, backfillfull);
+  }
+  for (auto i: nearfull_osds) {
+    get_pool_ids_by_osd(cct, i, nearfull);  // NOTE(review): one pool may end up in several output sets if its osds differ in state
+  }
 }
 
-static bool get_osd_utilization(const ceph::unordered_map<int32_t,osd_stat_t> &osd_stat,
-   int id, int64_t* kb, int64_t* kb_used, int64_t* kb_avail) {
-    auto p = osd_stat.find(id);
-    if (p == osd_stat.end())
-      return false;
-    *kb = p->second.kb;
-    *kb_used = p->second.kb_used;
-    *kb_avail = p->second.kb_avail;
-    return *kb > 0;
+static bool get_osd_utilization(  // copies kb / kb_used / kb_avail of osd `id` out of osd_stat; file-local helper
+  const mempool::pgmap::unordered_map<int32_t,osd_stat_t> &osd_stat,
+  int id, int64_t* kb, int64_t* kb_used, int64_t* kb_avail)  // the three outputs are written without null checks
+{
+  auto p = osd_stat.find(id);
+  if (p == osd_stat.end())
+    return false;  // no entry for this osd; the outputs are left untouched
+  *kb = p->second.kb;
+  *kb_used = p->second.kb_used;
+  *kb_avail = p->second.kb_avail;
+  return *kb > 0;  // false when the recorded kb is not positive, even though the outputs were written
 }
 
-void OSDMap::get_full_osd_util(const ceph::unordered_map<int32_t,osd_stat_t> &osd_stat,
-     map<int, float> *full, map<int, float> *backfill, map<int, float> *nearfull) const
+void OSDMap::get_full_osd_util(
+  const mempool::pgmap::unordered_map<int32_t,osd_stat_t> &osd_stat,
+  map<int, float> *full, map<int, float> *backfill, map<int, float> *nearfull) const
 {
   full->clear();
   backfill->clear();
@@ -1078,6 +1222,24 @@ void OSDMap::get_full_osd_util(const ceph::unordered_map<int32_t,osd_stat_t> &os
   }
 }
 
+void OSDMap::get_full_osd_counts(set<int> *full, set<int> *backfill,  // despite the name, fills sets of osd ids rather than counts
+                                set<int> *nearfull) const
+{
+  full->clear();  // outputs are reset before being filled; pointers are not null-checked
+  backfill->clear();
+  nearfull->clear();
+  for (int i = 0; i < max_osd; ++i) {
+    if (exists(i) && is_up(i) && is_in(i)) {  // only existing osds that are both up and in are classified
+      if (osd_state[i] & CEPH_OSD_FULL)  // else-if chain: an osd is placed in at most one set, FULL tested first
+       full->emplace(i);
+      else if (osd_state[i] & CEPH_OSD_BACKFILLFULL)
+       backfill->emplace(i);
+      else if (osd_state[i] & CEPH_OSD_NEARFULL)
+       nearfull->emplace(i);
+    }
+  }
+}
+
 void OSDMap::get_all_osds(set<int32_t>& ls) const
 {
   for (int i=0; i<max_osd; i++)
@@ -1093,6 +1255,14 @@ void OSDMap::get_up_osds(set<int32_t>& ls) const
   }
 }
 
+void OSDMap::get_out_osds(set<int32_t>& ls) const  // adds every id in [0, max_osd) for which is_out() holds; ls is not cleared first
+{
+  for (int i = 0; i < max_osd; i++) {
+    if (is_out(i))  // NOTE(review): no explicit exists(i) filter here; whether nonexistent ids count as out depends on is_out() - confirm
+      ls.insert(i);
+  }
+}
+
 void OSDMap::calc_state_set(int state, set<string>& st)
 {
   unsigned t = state;
@@ -1166,8 +1336,9 @@ uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
     features |= CEPH_FEATURE_CRUSH_V4;
   if (crush->has_nondefault_tunables5())
     features |= CEPH_FEATURE_CRUSH_TUNABLES5;
-  if (crush->has_incompat_chooseargs())
-    features |= CEPH_FEATURE_CRUSH_CHOOSEARGS;
+  if (crush->has_incompat_choose_args()) {
+    features |= CEPH_FEATUREMASK_CRUSH_CHOOSE_ARGS;
+  }
   mask |= CEPH_FEATURES_CRUSH;
 
   if (!pg_upmap.empty() || !pg_upmap_items.empty())
@@ -1186,7 +1357,7 @@ uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
        pool.second.is_tier()) {
       features |= CEPH_FEATURE_OSD_CACHEPOOL;
     }
-    int ruleid = crush->find_rule(pool.second.get_crush_ruleset(),
+    int ruleid = crush->find_rule(pool.second.get_crush_rule(),
                                  pool.second.get_type(),
                                  pool.second.get_size());
     if (ruleid >= 0) {
@@ -1226,14 +1397,14 @@ uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
 
   if (entity_type == CEPH_ENTITY_TYPE_OSD) {
     const uint64_t jewel_features = CEPH_FEATURE_SERVER_JEWEL;
-    if (test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+    if (require_osd_release >= CEPH_RELEASE_JEWEL) {
       features |= jewel_features;
     }
     mask |= jewel_features;
 
     const uint64_t kraken_features = CEPH_FEATUREMASK_SERVER_KRAKEN
       | CEPH_FEATURE_MSG_ADDR2;
-    if (test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+    if (require_osd_release >= CEPH_RELEASE_KRAKEN) {
       features |= kraken_features;
     }
     mask |= kraken_features;
@@ -1244,34 +1415,34 @@ uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
   return features;
 }
 
-pair<string,string> OSDMap::get_min_compat_client() const
+uint8_t OSDMap::get_min_compat_client() const  // maps the client feature bits from get_features() to a CEPH_RELEASE_* value, newest first
 {
   uint64_t f = get_features(CEPH_ENTITY_TYPE_CLIENT, nullptr);
 
   if (HAVE_FEATURE(f, OSDMAP_PG_UPMAP) ||      // v12.0.0-1733-g27d6f43
-      HAVE_FEATURE(f, CRUSH_CHOOSEARGS)) {     // v12.0.1-2172-gef1ef28
-    return make_pair("luminous", "12.2.0");
+      HAVE_FEATURE(f, CRUSH_CHOOSE_ARGS)) {    // v12.0.1-2172-gef1ef28
+    return CEPH_RELEASE_LUMINOUS;  // v12.2.0
   }
   if (HAVE_FEATURE(f, CRUSH_TUNABLES5)) {      // v10.0.0-612-g043a737
-    return make_pair("jewel", "10.2.0");
+    return CEPH_RELEASE_JEWEL;     // v10.2.0
   }
   if (HAVE_FEATURE(f, CRUSH_V4)) {             // v0.91-678-g325fc56
-    return make_pair("hammer", "0.94");
+    return CEPH_RELEASE_HAMMER;    // v0.94.0
   }
   if (HAVE_FEATURE(f, OSD_PRIMARY_AFFINITY) || // v0.76-553-gf825624
       HAVE_FEATURE(f, CRUSH_TUNABLES3) ||      // v0.76-395-ge20a55d
       HAVE_FEATURE(f, OSD_ERASURE_CODES) ||    // v0.73-498-gbfc86a8
       HAVE_FEATURE(f, OSD_CACHEPOOL)) {        // v0.67-401-gb91c1c5
-    return make_pair("firefly", "0.80");
+    return CEPH_RELEASE_FIREFLY;   // v0.80.0
   }
   if (HAVE_FEATURE(f, CRUSH_TUNABLES2) ||      // v0.54-684-g0cc47ff
       HAVE_FEATURE(f, OSDHASHPSPOOL)) {        // v0.57-398-g8cc2b0f
-    return make_pair("dumpling", "0.67");
+    return CEPH_RELEASE_DUMPLING;  // v0.67.0
   }
   if (HAVE_FEATURE(f, CRUSH_TUNABLES)) {       // v0.48argonaut-206-g6f381af
-    return make_pair("argonaut", "0.48argonaut-207");
+    return CEPH_RELEASE_ARGONAUT;  // v0.48argonaut-206-g6f381af
   }
-  return make_pair("argonaut", "0.48");
+  return CEPH_RELEASE_ARGONAUT;    // fallback: none of the features above is set (was "0.48"); same value as the CRUSH_TUNABLES case
 }
 
 void OSDMap::_calc_up_osd_features()
@@ -1282,6 +1453,8 @@ void OSDMap::_calc_up_osd_features()
     if (!is_up(osd))
       continue;
     const osd_xinfo_t &xi = get_xinfo(osd);
+    if (xi.features == 0)
+      continue;  // bogus xinfo, maybe #20751 or similar, skipping
     if (first) {
       cached_up_osd_features = xi.features;
       first = false;
@@ -1342,10 +1515,8 @@ void OSDMap::dedup(const OSDMap *o, OSDMap *n)
   }
 
   // does pg_temp match?
-  if (o->pg_temp->size() == n->pg_temp->size()) {
-    if (*o->pg_temp == *n->pg_temp)
-      n->pg_temp = o->pg_temp;
-  }
+  if (*o->pg_temp == *n->pg_temp)
+    n->pg_temp = o->pg_temp;
 
   // does primary_temp match?
   if (o->primary_temp->size() == n->primary_temp->size()) {
@@ -1431,6 +1602,136 @@ void OSDMap::clean_temps(CephContext *cct,
   }
 }
 
+void OSDMap::maybe_remove_pg_upmaps(CephContext *cct,  // queues removal in pending_inc of pg_upmap / pg_upmap_items entries that fail the checks below
+                                    const OSDMap& osdmap,  // the current map; never modified here
+                                    Incremental *pending_inc)  // in/out: pending new_* entries may be erased and old_* entries added
+{
+  ldout(cct, 10) << __func__ << dendl;
+  OSDMap tmpmap;  // scratch map = osdmap with pending_inc applied; all checks run against it
+  tmpmap.deepish_copy_from(osdmap);
+  tmpmap.apply_incremental(*pending_inc);
+  set<pg_t> to_check;  // every pg that has an upmap entry in tmpmap or in pending_inc
+  set<pg_t> to_cancel;  // pgs whose upmap entries will be dropped
+  map<int, map<int, float>> rule_weight_map;  // cache: crush rule -> weight map from get_rule_weight_osd_map()
+
+  for (auto& p : tmpmap.pg_upmap) {
+    to_check.insert(p.first);
+  }
+  for (auto& p : tmpmap.pg_upmap_items) {
+    to_check.insert(p.first);
+  }
+  for (auto& p : pending_inc->new_pg_upmap) {
+    to_check.insert(p.first);
+  }
+  for (auto& p : pending_inc->new_pg_upmap_items) {
+    to_check.insert(p.first);
+  }
+  for (auto& pg : to_check) {
+    auto crush_rule = tmpmap.get_pg_pool_crush_rule(pg);
+    if (crush_rule < 0) {
+      lderr(cct) << __func__ << " unable to load crush-rule of pg "
+                 << pg << dendl;
+      continue;  // lookup errors skip the pg; its upmap entries are left alone
+    }
+    map<int, float> weight_map;  // NOTE(review): per-pg copy of the cached map; a reference into rule_weight_map might avoid it - confirm
+    auto it = rule_weight_map.find(crush_rule);
+    if (it == rule_weight_map.end()) {
+      auto r = tmpmap.crush->get_rule_weight_osd_map(crush_rule, &weight_map);
+      if (r < 0) {
+        lderr(cct) << __func__ << " unable to get crush weight_map for "
+                   << "crush_rule " << crush_rule << dendl;
+        continue;
+      }
+      rule_weight_map[crush_rule] = weight_map;  // only successful lookups are cached
+    } else {
+      weight_map = it->second;
+    }
+    auto type = tmpmap.crush->get_rule_failure_domain(crush_rule);
+    if (type < 0) {
+      lderr(cct) << __func__ << " unable to load failure-domain-type of pg "
+                 << pg << dendl;
+      continue;
+    }
+    ldout(cct, 10) << __func__ << " pg " << pg
+                   << " crush-rule-id " << crush_rule
+                   << " weight_map " << weight_map
+                   << " failure-domain-type " << type
+                   << dendl;
+    vector<int> raw;
+    int primary;  // filled by pg_to_raw_up() but not used afterwards
+    tmpmap.pg_to_raw_up(pg, &raw, &primary);
+    set<int> parents;  // parents already seen for this pg's osds
+    for (auto osd : raw) {
+      if (type > 0) {  // the same-parent check is skipped when type is 0
+        auto parent = tmpmap.crush->get_parent_of_type(osd, type, crush_rule);
+        if (parent >= 0) {  // NOTE(review): a non-negative result is treated as failure, presumably because bucket ids are negative - confirm
+          lderr(cct) << __func__ << " unable to get parent of raw osd."
+                     << osd << " of pg " << pg
+                     << dendl;
+          break;  // gives up on this pg without cancelling its upmap
+        }
+        auto r = parents.insert(parent);
+        if (!r.second) {
+          // two up-set osds come from same parent
+          to_cancel.insert(pg);
+          break;
+        }
+      }
+      // the above check validates collision only
+      // below we continue to check against crush-topology changing..
+      auto it = weight_map.find(osd);  // shadows the outer `it` declared for rule_weight_map
+      if (it == weight_map.end()) {
+        // osd is gone or has been moved out of the specific crush-tree
+        to_cancel.insert(pg);
+        break;
+      }
+      auto adjusted_weight = tmpmap.get_weightf(it->first) * it->second;  // product of the two weights; zero if either factor is zero
+      if (adjusted_weight == 0) {
+        // osd is out/crush-out
+        to_cancel.insert(pg);
+        break;
+      }
+    }
+  }
+  for (auto &pg: to_cancel) {  // a pending new entry is erased; an entry already in osdmap is queued in old_*
+    { // pg_upmap
+      auto it = pending_inc->new_pg_upmap.find(pg);
+      if (it != pending_inc->new_pg_upmap.end()) {
+        ldout(cct, 10) << __func__ << " cancel invalid pending "
+                       << "pg_upmap entry "
+                       << it->first << "->" << it->second
+                       << dendl;
+        pending_inc->new_pg_upmap.erase(it);
+      }
+      if (osdmap.pg_upmap.count(pg)) {  // checked against the original osdmap, not tmpmap
+        ldout(cct, 10) << __func__ << " cancel invalid pg_upmap entry "
+                       << osdmap.pg_upmap.find(pg)->first << "->"
+                       << osdmap.pg_upmap.find(pg)->second
+                       << dendl;
+        pending_inc->old_pg_upmap.insert(pg);
+      }
+    }
+    { // pg_upmap_items
+      auto it = pending_inc->new_pg_upmap_items.find(pg);
+      if (it != pending_inc->new_pg_upmap_items.end()) {
+        ldout(cct, 10) << __func__ << " cancel invalid pending "
+                       << "pg_upmap_items entry "
+                       << it->first << "->" << it->second
+                       << dendl;
+        pending_inc->new_pg_upmap_items.erase(it);
+      }
+      if (osdmap.pg_upmap_items.count(pg)) {
+        ldout(cct, 10) << __func__ << " cancel invalid "
+                       << "pg_upmap_items entry "
+                       << osdmap.pg_upmap_items.find(pg)->first << "->"
+                       << osdmap.pg_upmap_items.find(pg)->second
+                       << dendl;
+        pending_inc->old_pg_upmap_items.insert(pg);
+      }
+    }
+  }
+}
+
 int OSDMap::apply_incremental(const Incremental &inc)
 {
   new_blacklist_entries = false;
@@ -1452,8 +1753,22 @@ int OSDMap::apply_incremental(const Incremental &inc)
   }
 
   // nope, incremental.
-  if (inc.new_flags >= 0)
+  if (inc.new_flags >= 0) {
     flags = inc.new_flags;
+    // the below is just to cover a newly-upgraded luminous mon
+    // cluster that has to set require_jewel_osds or
+    // require_kraken_osds before the osds can be upgraded to
+    // luminous.
+    if (flags & CEPH_OSDMAP_REQUIRE_KRAKEN) {
+      if (require_osd_release < CEPH_RELEASE_KRAKEN) {
+       require_osd_release = CEPH_RELEASE_KRAKEN;
+      }
+    } else if (flags & CEPH_OSDMAP_REQUIRE_JEWEL) {
+      if (require_osd_release < CEPH_RELEASE_JEWEL) {
+       require_osd_release = CEPH_RELEASE_JEWEL;
+      }
+    }
+  }
 
   if (inc.new_max_osd >= 0)
     set_max_osd(inc.new_max_osd);
@@ -1577,7 +1892,11 @@ int OSDMap::apply_incremental(const Incremental &inc)
     if (pg.second.empty())
       pg_temp->erase(pg.first);
     else
-      (*pg_temp)[pg.first] = pg.second;
+      pg_temp->set(pg.first, pg.second);
+  }
+  if (!inc.new_pg_temp.empty()) {
+    // make sure pg_temp is efficiently stored
+    pg_temp->rebuild();
   }
 
   for (const auto &pg : inc.new_primary_temp) {
@@ -1626,9 +1945,16 @@ int OSDMap::apply_incremental(const Incremental &inc)
   if (inc.new_full_ratio >= 0) {
     full_ratio = inc.new_full_ratio;
   }
-  if (inc.new_require_min_compat_client.length()) {
+  if (inc.new_require_min_compat_client > 0) {
     require_min_compat_client = inc.new_require_min_compat_client;
   }
+  if (inc.new_require_osd_release >= 0) {
+    require_osd_release = inc.new_require_osd_release;
+    if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+      flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
+      flags |= CEPH_OSDMAP_RECOVERY_DELETES;
+    }
+  }
 
   // do new crush map last (after up/down stuff)
   if (inc.crush.length()) {
@@ -1636,6 +1962,13 @@ int OSDMap::apply_incremental(const Incremental &inc)
     auto blp = bl.begin();
     crush.reset(new CrushWrapper);
     crush->decode(blp);
+    if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+      // only increment if this is a luminous-encoded osdmap, lest
+      // the mon's crush_version diverge from what the osds or others
+      // are decoding and applying on their end.  if we won't encode
+      // it in the canonical version, don't change it.
+      ++crush_version;
+    }
   }
 
   calc_num_osds();
@@ -1713,7 +2046,7 @@ void OSDMap::_remove_nonexistent_osds(const pg_pool_t& pool,
   }
 }
 
-int OSDMap::_pg_to_raw_osds(
+void OSDMap::_pg_to_raw_osds(
   const pg_pool_t& pool, pg_t pg,
   vector<int> *osds,
   ps_t *ppps) const
@@ -1723,7 +2056,7 @@ int OSDMap::_pg_to_raw_osds(
   unsigned size = pool.get_size();
 
   // what crush rule?
-  int ruleno = crush->find_rule(pool.get_crush_ruleset(), pool.get_type(), size);
+  int ruleno = crush->find_rule(pool.get_crush_rule(), pool.get_type(), size);
   if (ruleno >= 0)
     crush->do_rule(ruleno, pps, *osds, size, osd_weight, pg.pool());
 
@@ -1731,8 +2064,6 @@ int OSDMap::_pg_to_raw_osds(
 
   if (ppps)
     *ppps = pps;
-
-  return osds->size();
 }
 
 int OSDMap::_pick_primary(const vector<int>& osds) const
@@ -1745,7 +2076,7 @@ int OSDMap::_pick_primary(const vector<int>& osds) const
   return -1;
 }
 
-void OSDMap::_apply_remap(const pg_pool_t& pi, pg_t raw_pg, vector<int> *raw) const
+void OSDMap::_apply_upmap(const pg_pool_t& pi, pg_t raw_pg, vector<int> *raw) const
 {
   pg_t pg = pi.raw_pg_to_pg(raw_pg);
   auto p = pg_upmap.find(pg);
@@ -1758,7 +2089,7 @@ void OSDMap::_apply_remap(const pg_pool_t& pi, pg_t raw_pg, vector<int> *raw) co
       }
     }
     *raw = vector<int>(p->second.begin(), p->second.end());
-    return;
+    // continue to check and apply pg_upmap_items if any
   }
 
   auto q = pg_upmap_items.find(pg);
@@ -1785,7 +2116,6 @@ void OSDMap::_apply_remap(const pg_pool_t& pi, pg_t raw_pg, vector<int> *raw) co
       }
       if (!exists && pos >= 0) {
        (*raw)[pos] = r.second;
-       return;
       }
     }
   }
@@ -1905,17 +2235,16 @@ void OSDMap::_get_temp_osds(const pg_pool_t& pool, pg_t pg,
   }
 }
 
-int OSDMap::pg_to_raw_osds(pg_t pg, vector<int> *raw, int *primary) const
+void OSDMap::pg_to_raw_osds(pg_t pg, vector<int> *raw, int *primary) const  // NOTE(review): *primary is written unconditionally at the top, so the later if (primary) check cannot protect a null pointer
 {
   *primary = -1;
   raw->clear();
   const pg_pool_t *pool = get_pg_pool(pg.pool());
   if (!pool)
-    return 0;
-  int r = _pg_to_raw_osds(*pool, pg, raw, NULL);
+    return;  // unknown pool: *raw stays empty and *primary stays -1
+  _pg_to_raw_osds(*pool, pg, raw, NULL);  // NULL: the optional ppps output is not requested
   if (primary)
     *primary = _pick_primary(*raw);
-  return r;
 }
 
 void OSDMap::pg_to_raw_up(pg_t pg, vector<int> *up, int *primary) const
@@ -1931,12 +2260,12 @@ void OSDMap::pg_to_raw_up(pg_t pg, vector<int> *up, int *primary) const
   vector<int> raw;
   ps_t pps;
   _pg_to_raw_osds(*pool, pg, &raw, &pps);
-  _apply_remap(*pool, pg, &raw);
+  _apply_upmap(*pool, pg, &raw);
   _raw_to_up_osds(*pool, raw, up);
   *primary = _pick_primary(raw);
   _apply_primary_affinity(pps, *pool, up, primary);
 }
-  
+
 void OSDMap::_pg_to_up_acting_osds(
   const pg_t& pg, vector<int> *up, int *up_primary,
   vector<int> *acting, int *acting_primary,
@@ -1964,7 +2293,7 @@ void OSDMap::_pg_to_up_acting_osds(
   _get_temp_osds(*pool, pg, &_acting, &_acting_primary);
   if (_acting.empty() || up || up_primary) {
     _pg_to_raw_osds(*pool, pg, &raw, &pps);
-    _apply_remap(*pool, pg, &raw);
+    _apply_upmap(*pool, pg, &raw);
     _raw_to_up_osds(*pool, raw, &_up);
     _up_primary = _pick_primary(_up);
     _apply_primary_affinity(pps, *pool, &_up, &_up_primary);
@@ -2020,6 +2349,24 @@ bool OSDMap::primary_changed(
   return false;      // same primary (tho replicas may have changed)
 }
 
+uint64_t OSDMap::get_encoding_features() const  // SIGNIFICANT_FEATURES with the bits of releases newer than require_osd_release cleared
+{
+  uint64_t f = SIGNIFICANT_FEATURES;
+  if (require_osd_release < CEPH_RELEASE_LUMINOUS) {  // the three checks are independent, so an older release clears several groups
+    f &= ~(CEPH_FEATURE_SERVER_LUMINOUS |
+          CEPH_FEATURE_CRUSH_CHOOSE_ARGS);
+  }
+  if (require_osd_release < CEPH_RELEASE_KRAKEN) {
+    f &= ~(CEPH_FEATURE_SERVER_KRAKEN |
+          CEPH_FEATURE_MSG_ADDR2);
+  }
+  if (require_osd_release < CEPH_RELEASE_JEWEL) {
+    f &= ~(CEPH_FEATURE_SERVER_JEWEL |
+          CEPH_FEATURE_NEW_OSDOP_ENCODING |
+          CEPH_FEATURE_CRUSH_TUNABLES5);
+  }
+  return f;  // feature mask only; nothing is encoded here
+}
 
 // serialize, unserialize
 void OSDMap::encode_client_old(bufferlist& bl) const
@@ -2057,7 +2404,13 @@ void OSDMap::encode_client_old(bufferlist& bl) const
   ::encode(flags, bl);
 
   ::encode(max_osd, bl);
-  ::encode(osd_state, bl);
+  {
+    uint32_t n = osd_state.size();
+    ::encode(n, bl);
+    for (auto s : osd_state) {
+      ::encode((uint8_t)s, bl);
+    }
+  }
   ::encode(osd_weight, bl);
   ::encode(osd_addrs->client_addr, bl, 0);
 
@@ -2099,7 +2452,13 @@ void OSDMap::encode_classic(bufferlist& bl, uint64_t features) const
   ::encode(flags, bl);
 
   ::encode(max_osd, bl);
-  ::encode(osd_state, bl);
+  {
+    uint32_t n = osd_state.size();
+    ::encode(n, bl);
+    for (auto s : osd_state) {
+      ::encode((uint8_t)s, bl);
+    }
+  }
   ::encode(osd_weight, bl);
   ::encode(osd_addrs->client_addr, bl, features);
 
@@ -2146,8 +2505,10 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
   ENCODE_START(8, 7, bl);
 
   {
-    uint8_t v = 4;
-    if (!HAVE_FEATURE(features, OSDMAP_PG_UPMAP)) {
+    // NOTE: any new encoding dependencies must be reflected by
+    // SIGNIFICANT_FEATURES
+    uint8_t v = 6;
+    if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
       v = 3;
     }
     ENCODE_START(v, 1, bl); // client-usable data
@@ -2161,10 +2522,29 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
     ::encode(pool_name, bl);
     ::encode(pool_max, bl);
 
-    ::encode(flags, bl);
+    if (v < 4) {
+      decltype(flags) f = flags;
+      if (require_osd_release >= CEPH_RELEASE_LUMINOUS)
+       f |= CEPH_OSDMAP_REQUIRE_LUMINOUS | CEPH_OSDMAP_RECOVERY_DELETES;
+      else if (require_osd_release == CEPH_RELEASE_KRAKEN)
+       f |= CEPH_OSDMAP_REQUIRE_KRAKEN;
+      else if (require_osd_release == CEPH_RELEASE_JEWEL)
+       f |= CEPH_OSDMAP_REQUIRE_JEWEL;
+      ::encode(f, bl);
+    } else {
+      ::encode(flags, bl);
+    }
 
     ::encode(max_osd, bl);
-    ::encode(osd_state, bl);
+    if (v >= 5) {
+      ::encode(osd_state, bl);
+    } else {
+      uint32_t n = osd_state.size();
+      ::encode(n, bl);
+      for (auto s : osd_state) {
+       ::encode((uint8_t)s, bl);
+      }
+    }
     ::encode(osd_weight, bl);
     ::encode(osd_addrs->client_addr, bl, features);
 
@@ -2190,11 +2570,16 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
       assert(pg_upmap.empty());
       assert(pg_upmap_items.empty());
     }
+    if (v >= 6) {
+      ::encode(crush_version, bl);
+    }
     ENCODE_FINISH(bl); // client-usable data
   }
 
   {
-    uint8_t target_v = 4;
+    // NOTE: any new encoding dependencies must be reflected by
+    // SIGNIFICANT_FEATURES
+    uint8_t target_v = 5;
     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
       target_v = 1;
     }
@@ -2219,7 +2604,11 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
       ::encode(nearfull_ratio, bl);
       ::encode(full_ratio, bl);
       ::encode(backfillfull_ratio, bl);
+    }
+    // 4 was string-based new_require_min_compat_client
+    if (target_v >= 5) {
       ::encode(require_min_compat_client, bl);
+      ::encode(require_osd_release, bl);
     }
     ENCODE_FINISH(bl); // osd-only data
   }
@@ -2302,7 +2691,14 @@ void OSDMap::decode_classic(bufferlist::iterator& p)
   ::decode(flags, p);
 
   ::decode(max_osd, p);
-  ::decode(osd_state, p);
+  {
+    vector<uint8_t> os;
+    ::decode(os, p);
+    osd_state.resize(os.size());
+    for (unsigned i = 0; i < os.size(); ++i) {
+      osd_state[i] = os[i];
+    }
+  }
   ::decode(osd_weight, p);
   ::decode(osd_addrs->client_addr, p);
   if (v <= 5) {
@@ -2311,7 +2707,9 @@ void OSDMap::decode_classic(bufferlist::iterator& p)
     while (n--) {
       old_pg_t opg;
       ::decode_raw(opg, p);
-      ::decode((*pg_temp)[pg_t(opg)], p);
+      mempool::osdmap::vector<int32_t> v;
+      ::decode(v, p);
+      pg_temp->set(pg_t(opg), v);
     }
   } else {
     ::decode(*pg_temp, p);
@@ -2387,7 +2785,7 @@ void OSDMap::decode(bufferlist::iterator& bl)
    * Since we made it past that hurdle, we can use our normal paths.
    */
   {
-    DECODE_START(4, bl); // client-usable data
+    DECODE_START(6, bl); // client-usable data
     // base
     ::decode(fsid, bl);
     ::decode(epoch, bl);
@@ -2401,7 +2799,16 @@ void OSDMap::decode(bufferlist::iterator& bl)
     ::decode(flags, bl);
 
     ::decode(max_osd, bl);
-    ::decode(osd_state, bl);
+    if (struct_v >= 5) {
+      ::decode(osd_state, bl);
+    } else {
+      vector<uint8_t> os;
+      ::decode(os, bl);
+      osd_state.resize(os.size());
+      for (unsigned i = 0; i < os.size(); ++i) {
+       osd_state[i] = os[i];
+      }
+    }
     ::decode(osd_weight, bl);
     ::decode(osd_addrs->client_addr, bl);
 
@@ -2433,11 +2840,14 @@ void OSDMap::decode(bufferlist::iterator& bl)
       pg_upmap.clear();
       pg_upmap_items.clear();
     }
+    if (struct_v >= 6) {
+      ::decode(crush_version, bl);
+    }
     DECODE_FINISH(bl); // client-usable data
   }
 
   {
-    DECODE_START(4, bl); // extended, osd-only data
+    DECODE_START(5, bl); // extended, osd-only data
     ::decode(osd_addrs->hb_back_addr, bl);
     ::decode(osd_info, bl);
     ::decode(blacklist, bl);
@@ -2459,8 +2869,33 @@ void OSDMap::decode(bufferlist::iterator& bl)
     } else {
       backfillfull_ratio = 0;
     }
-    if (struct_v >= 4)
+    if (struct_v == 4) {
+      string r;
+      ::decode(r, bl);
+      if (r.length())
+       require_min_compat_client = ceph_release_from_name(r.c_str());
+    }
+    if (struct_v >= 5) {
       ::decode(require_min_compat_client, bl);
+      ::decode(require_osd_release, bl);
+      if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+       flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
+       flags |= CEPH_OSDMAP_RECOVERY_DELETES;
+      }
+    } else {
+      if (flags & CEPH_OSDMAP_REQUIRE_LUMINOUS) {
+       // only for compat with post-kraken pre-luminous test clusters
+       require_osd_release = CEPH_RELEASE_LUMINOUS;
+       flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
+       flags |= CEPH_OSDMAP_RECOVERY_DELETES;
+      } else if (flags & CEPH_OSDMAP_REQUIRE_KRAKEN) {
+       require_osd_release = CEPH_RELEASE_KRAKEN;
+      } else if (flags & CEPH_OSDMAP_REQUIRE_JEWEL) {
+       require_osd_release = CEPH_RELEASE_JEWEL;
+      } else {
+       require_osd_release = 0;
+      }
+    }
     DECODE_FINISH(bl); // osd-only data
   }
 
@@ -2529,16 +2964,19 @@ void OSDMap::dump(Formatter *f) const
   f->dump_stream("created") << get_created();
   f->dump_stream("modified") << get_modified();
   f->dump_string("flags", get_flag_string());
+  f->dump_unsigned("crush_version", get_crush_version());
   f->dump_float("full_ratio", full_ratio);
   f->dump_float("backfillfull_ratio", backfillfull_ratio);
   f->dump_float("nearfull_ratio", nearfull_ratio);
   f->dump_string("cluster_snapshot", get_cluster_snapshot());
   f->dump_int("pool_max", get_pool_max());
   f->dump_int("max_osd", get_max_osd());
-  f->dump_string("require_min_compat_client", require_min_compat_client);
-  auto mv = get_min_compat_client();
-  f->dump_string("min_compat_client", mv.first);
-  f->dump_string("min_compat_client_version", mv.second);
+  f->dump_string("require_min_compat_client",
+                ceph_release_name(require_min_compat_client));
+  f->dump_string("min_compat_client",
+                ceph_release_name(get_min_compat_client()));
+  f->dump_string("require_osd_release",
+                ceph_release_name(require_osd_release));
 
   f->open_array_section("pools");
   for (const auto &pool : pools) {
@@ -2620,15 +3058,7 @@ void OSDMap::dump(Formatter *f) const
   }
   f->close_section();
   f->open_array_section("pg_temp");
-  for (const auto &pg : *pg_temp) {
-    f->open_object_section("osds");
-    f->dump_stream("pgid") << pg.first;
-    f->open_array_section("osds");
-    for (const auto osd : pg.second)
-      f->dump_int("osd", osd);
-    f->close_section();
-    f->close_section();
-  }
+  pg_temp->dump(f);
   f->close_section();
 
   f->open_array_section("primary_temp");
@@ -2656,7 +3086,7 @@ void OSDMap::generate_test_instances(list<OSDMap*>& o)
   CephContext *cct = new CephContext(CODE_ENVIRONMENT_UTILITY);
   o.push_back(new OSDMap);
   uuid_d fsid;
-  o.back()->build_simple(cct, 1, fsid, 16, 7, 8);
+  o.back()->build_simple(cct, 1, fsid, 16);
   o.back()->created = o.back()->modified = utime_t(1, 2);  // fix timestamp
   o.back()->blacklist[entity_addr_t()] = utime_t(5, 6);
   cct->put();
@@ -2703,6 +3133,10 @@ string OSDMap::get_flag_string(unsigned f)
     s += ",require_kraken_osds";
   if (f & CEPH_OSDMAP_REQUIRE_LUMINOUS)
     s += ",require_luminous_osds";
+  if (f & CEPH_OSDMAP_RECOVERY_DELETES)
+    s += ",recovery_deletes";
+  if (f & CEPH_OSDMAP_PURGED_SNAPDIRS)
+    s += ",purged_snapdirs";
   if (s.length())
     s.erase(0, 1);
   return s;
@@ -2713,14 +3147,6 @@ string OSDMap::get_flag_string() const
   return get_flag_string(flags);
 }
 
-struct qi {
-  int item;
-  int depth;
-  float weight;
-  qi() : item(0), depth(0), weight(0) {}
-  qi(int i, int d, float w) : item(i), depth(d), weight(w) {}
-};
-
 void OSDMap::print_pools(ostream& out) const
 {
   for (const auto &pool : pools) {
@@ -2749,14 +3175,20 @@ void OSDMap::print(ostream& out) const
       << "modified " << get_modified() << "\n";
 
   out << "flags " << get_flag_string() << "\n";
+  out << "crush_version " << get_crush_version() << "\n";
   out << "full_ratio " << full_ratio << "\n";
   out << "backfillfull_ratio " << backfillfull_ratio << "\n";
   out << "nearfull_ratio " << nearfull_ratio << "\n";
-  if (require_min_compat_client.length()) {
-    out << "require_min_compat_client " << require_min_compat_client << "\n";
+  if (require_min_compat_client > 0) {
+    out << "require_min_compat_client "
+       << ceph_release_name(require_min_compat_client) << "\n";
+  }
+  out << "min_compat_client " << ceph_release_name(get_min_compat_client())
+      << "\n";
+  if (require_osd_release > 0) {
+    out << "require_osd_release " << ceph_release_name(require_osd_release)
+       << "\n";
   }
-  auto mv = get_min_compat_client();
-  out << "min_compat_client " << mv.first << " " << mv.second << "\n";
   if (get_cluster_snapshot().length())
     out << "cluster_snapshot " << get_cluster_snapshot() << "\n";
   out << "\n";
@@ -2808,29 +3240,54 @@ void OSDMap::print(ostream& out) const
 class OSDTreePlainDumper : public CrushTreeDumper::Dumper<TextTable> {
 public:
   typedef CrushTreeDumper::Dumper<TextTable> Parent;
-  OSDTreePlainDumper(const CrushWrapper *crush, const OSDMap *osdmap_)
-    : Parent(crush), osdmap(osdmap_) {}
+
+  OSDTreePlainDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
+                    unsigned f)
+    : Parent(crush, osdmap_->get_pool_names()), osdmap(osdmap_), filter(f) { }
+
+  bool should_dump_leaf(int i) const override {
+    if (!filter) {
+      return true; // normal case
+    }
+    if (((filter & OSDMap::DUMP_UP) && osdmap->is_up(i)) ||
+       ((filter & OSDMap::DUMP_DOWN) && osdmap->is_down(i)) ||
+       ((filter & OSDMap::DUMP_IN) && osdmap->is_in(i)) ||
+       ((filter & OSDMap::DUMP_OUT) && osdmap->is_out(i)) ||
+        ((filter & OSDMap::DUMP_DESTROYED) && osdmap->is_destroyed(i))) {
+      return true;
+    }
+    return false;
+  }
+
+  bool should_dump_empty_bucket() const override {
+    return !filter;
+  }
 
   void dump(TextTable *tbl) {
     tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
+    tbl->define_column("CLASS", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("WEIGHT", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("TYPE NAME", TextTable::LEFT, TextTable::LEFT);
-    tbl->define_column("UP/DOWN", TextTable::LEFT, TextTable::RIGHT);
+    tbl->define_column("STATUS", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("REWEIGHT", TextTable::LEFT, TextTable::RIGHT);
-    tbl->define_column("PRIMARY-AFFINITY", TextTable::LEFT, TextTable::RIGHT);
+    tbl->define_column("PRI-AFF", TextTable::LEFT, TextTable::RIGHT);
 
     Parent::dump(tbl);
 
     for (int i = 0; i < osdmap->get_max_osd(); i++) {
-      if (osdmap->exists(i) && !is_touched(i))
-       dump_item(CrushTreeDumper::Item(i, 0, 0), tbl);
+      if (osdmap->exists(i) && !is_touched(i) && should_dump_leaf(i)) {
+       dump_item(CrushTreeDumper::Item(i, 0, 0, 0), tbl);
+      }
     }
   }
 
 protected:
   void dump_item(const CrushTreeDumper::Item &qi, TextTable *tbl) override {
-
+    const char *c = crush->get_item_class(qi.id);
+    if (!c)
+      c = "";
     *tbl << qi.id
+        << c
         << weightf_t(qi.weight);
 
     ostringstream name;
@@ -2849,7 +3306,15 @@ protected:
        *tbl << "DNE"
             << 0;
       } else {
-       *tbl << (osdmap->is_up(qi.id) ? "up" : "down")
+        string s;
+        if (osdmap->is_up(qi.id)) {
+          s = "up";
+        } else if (osdmap->is_destroyed(qi.id)) {
+          s = "destroyed";
+        } else {
+          s = "down";
+        }
+       *tbl << s
             << weightf_t(osdmap->get_weightf(qi.id))
             << weightf_t(osdmap->get_primary_affinityf(qi.id));
       }
@@ -2859,14 +3324,34 @@ protected:
 
 private:
   const OSDMap *osdmap;
+  const unsigned filter;
 };
 
 class OSDTreeFormattingDumper : public CrushTreeDumper::FormattingDumper {
 public:
   typedef CrushTreeDumper::FormattingDumper Parent;
 
-  OSDTreeFormattingDumper(const CrushWrapper *crush, const OSDMap *osdmap_)
-    : Parent(crush), osdmap(osdmap_) {}
+  OSDTreeFormattingDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
+                         unsigned f)
+    : Parent(crush, osdmap_->get_pool_names()), osdmap(osdmap_), filter(f) { }
+
+  bool should_dump_leaf(int i) const override {
+    if (!filter) {
+      return true; // normal case
+    }
+    if (((filter & OSDMap::DUMP_UP) && osdmap->is_up(i)) ||
+        ((filter & OSDMap::DUMP_DOWN) && osdmap->is_down(i)) ||
+        ((filter & OSDMap::DUMP_IN) && osdmap->is_in(i)) ||
+        ((filter & OSDMap::DUMP_OUT) && osdmap->is_out(i)) ||
+        ((filter & OSDMap::DUMP_DESTROYED) && osdmap->is_destroyed(i))) {
+      return true;
+    }
+    return false;
+  }
+
+  bool should_dump_empty_bucket() const override {
+    return !filter;
+  }
 
   void dump(Formatter *f) {
     f->open_array_section("nodes");
@@ -2874,8 +3359,8 @@ public:
     f->close_section();
     f->open_array_section("stray");
     for (int i = 0; i < osdmap->get_max_osd(); i++) {
-      if (osdmap->exists(i) && !is_touched(i))
-       dump_item(CrushTreeDumper::Item(i, 0, 0), f);
+      if (osdmap->exists(i) && !is_touched(i) && should_dump_leaf(i))
+       dump_item(CrushTreeDumper::Item(i, 0, 0, 0), f);
     }
     f->close_section();
   }
@@ -2885,8 +3370,16 @@ protected:
     Parent::dump_item_fields(qi, f);
     if (!qi.is_bucket())
     {
+      string s;
+      if (osdmap->is_up(qi.id)) {
+        s = "up";
+      } else if (osdmap->is_destroyed(qi.id)) {
+        s = "destroyed";
+      } else {
+        s = "down";
+      }
       f->dump_unsigned("exists", (int)osdmap->exists(qi.id));
-      f->dump_string("status", osdmap->is_up(qi.id) ? "up" : "down");
+      f->dump_string("status", s);
       f->dump_float("reweight", osdmap->get_weightf(qi.id));
       f->dump_float("primary_affinity", osdmap->get_primary_affinityf(qi.id));
     }
@@ -2894,21 +3387,23 @@ protected:
 
 private:
   const OSDMap *osdmap;
+  const unsigned filter;
 };
 
-void OSDMap::print_tree(Formatter *f, ostream *out) const
+void OSDMap::print_tree(Formatter *f, ostream *out, unsigned filter) const
 {
-  if (f)
-    OSDTreeFormattingDumper(crush.get(), this).dump(f);
-  else {
+  if (f) {  // a Formatter was supplied: emit the tree through it
+    OSDTreeFormattingDumper(crush.get(), this, filter).dump(f);
+  } else {  // no Formatter: render a plain-text table into *out
     assert(out);
     TextTable tbl;
-    OSDTreePlainDumper(crush.get(), this).dump(&tbl);
+    OSDTreePlainDumper(crush.get(), this, filter).dump(&tbl);
     *out << tbl;
   }
 }
 
-void OSDMap::print_summary(Formatter *f, ostream& out) const
+void OSDMap::print_summary(Formatter *f, ostream& out,
+                          const string& prefix) const
 {
   if (f) {
     f->open_object_section("osdmap");
@@ -2921,8 +3416,7 @@ void OSDMap::print_summary(Formatter *f, ostream& out) const
     f->dump_unsigned("num_remapped_pgs", get_num_pg_temp());
     f->close_section();
   } else {
-    out << "     osdmap e" << get_epoch() << ": "
-       << get_num_osds() << " osds: "
+    out << get_num_osds() << " osds: "
        << get_num_up_osds() << " up, "
        << get_num_in_osds() << " in";
     if (get_num_pg_temp())
@@ -2930,14 +3424,14 @@ void OSDMap::print_summary(Formatter *f, ostream& out) const
     out << "\n";
     uint64_t important_flags = flags & ~CEPH_OSDMAP_SEMIHIDDEN_FLAGS;
     if (important_flags)
-      out << "            flags " << get_flag_string(important_flags) << "\n";
+      out << prefix << "flags " << get_flag_string(important_flags) << "\n";
   }
 }
 
 void OSDMap::print_oneline_summary(ostream& out) const
 {
   out << "e" << get_epoch() << ": "
-      << get_num_osds() << " osds: "
+      << get_num_osds() << " total, "
       << get_num_up_osds() << " up, "
       << get_num_in_osds() << " in";
   if (test_flag(CEPH_OSDMAP_FULL))
@@ -2946,21 +3440,52 @@ void OSDMap::print_oneline_summary(ostream& out) const
     out << " nearfull";
 }
 
-bool OSDMap::crush_ruleset_in_use(int ruleset) const
+bool OSDMap::crush_rule_in_use(int rule_id) const
 {
   for (const auto &pool : pools) {
-    if (pool.second.crush_ruleset == ruleset)
+    if (pool.second.crush_rule == rule_id)
       return true;
   }
   return false;
 }
 
-int OSDMap::build_simple(CephContext *cct, epoch_t e, uuid_d &fsid,
-                         int nosd, int pg_bits, int pgp_bits)
+int OSDMap::validate_crush_rules(CrushWrapper *newcrush,
+                                ostream *ss) const
+{
+  for (auto& i : pools) {
+    auto& pool = i.second;
+    int ruleno = pool.get_crush_rule();
+    if (!newcrush->rule_exists(ruleno)) {
+      *ss << "pool " << i.first << " references crush_rule " << ruleno
+         << " but it is not present";
+      return -EINVAL;
+    }
+    if (newcrush->get_rule_mask_ruleset(ruleno) != ruleno) {
+      *ss << "rule " << ruleno << " mask ruleset does not match rule id";
+      return -EINVAL;
+    }
+    if (newcrush->get_rule_mask_type(ruleno) != (int)pool.get_type()) {
+      *ss << "pool " << i.first << " type does not match rule " << ruleno;
+      return -EINVAL;
+    }
+    if (pool.get_size() < (int)newcrush->get_rule_mask_min_size(ruleno) ||
+       pool.get_size() > (int)newcrush->get_rule_mask_max_size(ruleno)) {
+      *ss << "pool " << i.first << " size " << pool.get_size() << " does not"
+         << " fall within rule " << ruleno
+         << " min_size " << newcrush->get_rule_mask_min_size(ruleno)
+         << " and max_size " << newcrush->get_rule_mask_max_size(ruleno);
+      return -EINVAL;
+    }
+  }
+  return 0;
+}
+
+int OSDMap::build_simple_optioned(CephContext *cct, epoch_t e, uuid_d &fsid,
+                                 int nosd, int pg_bits, int pgp_bits,
+                                 bool default_pool)
 {
-  ldout(cct, 10) << "build_simple on " << num_osd
-                << " osds with " << pg_bits << " pg bits per osd, "
-                << dendl;
+  ldout(cct, 10) << "build_simple on " << nosd
+                << " osds" << dendl;
   epoch = e;
   set_fsid(fsid);
   created = modified = ceph_clock_now();
@@ -2996,12 +3521,6 @@ int OSDMap::build_simple(CephContext *cct, epoch_t e, uuid_d &fsid,
     set_max_osd(maxosd + 1);
   }
 
-  // pgp_num <= pg_num
-  if (pgp_bits > pg_bits)
-    pgp_bits = pg_bits;
-
-  vector<string> pool_names;
-  pool_names.push_back("rbd");
 
   stringstream ss;
   int r;
@@ -3013,30 +3532,40 @@ int OSDMap::build_simple(CephContext *cct, epoch_t e, uuid_d &fsid,
 
   int poolbase = get_max_osd() ? get_max_osd() : 1;
 
-  int const default_replicated_ruleset = crush->get_osd_pool_default_crush_replicated_ruleset(cct);
-  assert(default_replicated_ruleset >= 0);
-
-  for (auto &plname : pool_names) {
-    int64_t pool = ++pool_max;
-    pools[pool].type = pg_pool_t::TYPE_REPLICATED;
-    pools[pool].flags = cct->_conf->osd_pool_default_flags;
-    if (cct->_conf->osd_pool_default_flag_hashpspool)
-      pools[pool].set_flag(pg_pool_t::FLAG_HASHPSPOOL);
-    if (cct->_conf->osd_pool_default_flag_nodelete)
-      pools[pool].set_flag(pg_pool_t::FLAG_NODELETE);
-    if (cct->_conf->osd_pool_default_flag_nopgchange)
-      pools[pool].set_flag(pg_pool_t::FLAG_NOPGCHANGE);
-    if (cct->_conf->osd_pool_default_flag_nosizechange)
-      pools[pool].set_flag(pg_pool_t::FLAG_NOSIZECHANGE);
-    pools[pool].size = cct->_conf->osd_pool_default_size;
-    pools[pool].min_size = cct->_conf->get_osd_pool_default_min_size();
-    pools[pool].crush_ruleset = default_replicated_ruleset;
-    pools[pool].object_hash = CEPH_STR_HASH_RJENKINS;
-    pools[pool].set_pg_num(poolbase << pg_bits);
-    pools[pool].set_pgp_num(poolbase << pgp_bits);
-    pools[pool].last_change = epoch;
-    pool_name[pool] = plname;
-    name_pool[plname] = pool;
+  const int default_replicated_rule = crush->get_osd_pool_default_crush_replicated_ruleset(cct);
+  assert(default_replicated_rule >= 0);
+
+  if (default_pool) {
+    // pgp_num <= pg_num
+    if (pgp_bits > pg_bits)
+      pgp_bits = pg_bits;
+
+    vector<string> pool_names;
+    pool_names.push_back("rbd");
+    for (auto &plname : pool_names) {
+      int64_t pool = ++pool_max;
+      pools[pool].type = pg_pool_t::TYPE_REPLICATED;
+      pools[pool].flags = cct->_conf->osd_pool_default_flags;
+      if (cct->_conf->osd_pool_default_flag_hashpspool)
+       pools[pool].set_flag(pg_pool_t::FLAG_HASHPSPOOL);
+      if (cct->_conf->osd_pool_default_flag_nodelete)
+       pools[pool].set_flag(pg_pool_t::FLAG_NODELETE);
+      if (cct->_conf->osd_pool_default_flag_nopgchange)
+       pools[pool].set_flag(pg_pool_t::FLAG_NOPGCHANGE);
+      if (cct->_conf->osd_pool_default_flag_nosizechange)
+       pools[pool].set_flag(pg_pool_t::FLAG_NOSIZECHANGE);
+      pools[pool].size = cct->_conf->osd_pool_default_size;
+      pools[pool].min_size = cct->_conf->get_osd_pool_default_min_size();
+      pools[pool].crush_rule = default_replicated_rule;
+      pools[pool].object_hash = CEPH_STR_HASH_RJENKINS;
+      pools[pool].set_pg_num(poolbase << pg_bits);
+      pools[pool].set_pgp_num(poolbase << pgp_bits);
+      pools[pool].last_change = epoch;
+      pools[pool].application_metadata.insert(
+        {pg_pool_t::APPLICATION_NAME_RBD, {}});
+      pool_name[pool] = plname;
+      name_pool[plname] = pool;
+    }
   }
 
   for (int i=0; i<get_max_osd(); i++) {
@@ -3104,7 +3633,7 @@ int OSDMap::build_simple_crush_map(CephContext *cct, CrushWrapper& crush,
     crush.insert_item(cct, o, 1.0, name, loc);
   }
 
-  build_simple_crush_rulesets(cct, crush, "default", ss);
+  build_simple_crush_rules(cct, crush, "default", ss);
 
   crush.finalize();
 
@@ -3173,7 +3702,7 @@ int OSDMap::build_simple_crush_map_from_conf(CephContext *cct,
     crush.insert_item(cct, o, 1.0, section, loc);
   }
 
-  build_simple_crush_rulesets(cct, crush, "default", ss);
+  build_simple_crush_rules(cct, crush, "default", ss);
 
   crush.finalize();
 
@@ -3181,23 +3710,21 @@ int OSDMap::build_simple_crush_map_from_conf(CephContext *cct,
 }
 
 
-int OSDMap::build_simple_crush_rulesets(CephContext *cct,
-                                       CrushWrapper& crush,
-                                       const string& root,
-                                       ostream *ss)
+int OSDMap::build_simple_crush_rules(
+  CephContext *cct,
+  CrushWrapper& crush,
+  const string& root,
+  ostream *ss)
 {
-  int crush_ruleset =
-      crush._get_osd_pool_default_crush_replicated_ruleset(cct, true);
+  int crush_rule = crush.get_osd_pool_default_crush_replicated_ruleset(cct);
   string failure_domain =
     crush.get_type_name(cct->_conf->osd_crush_chooseleaf_type);
 
-  if (crush_ruleset == CEPH_DEFAULT_CRUSH_REPLICATED_RULESET)
-    crush_ruleset = -1; // create ruleset 0 by default
-
   int r;
-  r = crush.add_simple_ruleset_at("replicated_ruleset", root, failure_domain,
-                                  "firstn", pg_pool_t::TYPE_REPLICATED,
-                                  crush_ruleset, ss);
+  r = crush.add_simple_rule_at(
+    "replicated_rule", root, failure_domain, "",
+    "firstn", pg_pool_t::TYPE_REPLICATED,
+    crush_rule, ss);
   if (r < 0)
     return r;
   // do not add an erasure rule by default or else we will implicitly
@@ -3225,20 +3752,18 @@ int OSDMap::summarize_mapping_stats(
   vector<unsigned> new_by_osd(get_max_osd(), 0);
   for (int64_t pool_id : ls) {
     const pg_pool_t *pi = get_pg_pool(pool_id);
-    vector<int> up, up2, acting;
-    int up_primary, acting_primary;
+    vector<int> up, up2;
+    int up_primary;
     for (unsigned ps = 0; ps < pi->get_pg_num(); ++ps) {
       pg_t pgid(ps, pool_id, -1);
       total_pg += pi->get_size();
-      pg_to_up_acting_osds(pgid, &up, &up_primary,
-                          &acting, &acting_primary);
+      pg_to_up_acting_osds(pgid, &up, &up_primary, nullptr, nullptr);
       for (int osd : up) {
        if (osd >= 0 && osd < get_max_osd())
          ++base_by_osd[osd];
       }
       if (newmap) {
-       newmap->pg_to_up_acting_osds(pgid, &up2, &up_primary,
-                                    &acting, &acting_primary);
+       newmap->pg_to_up_acting_osds(pgid, &up2, &up_primary, nullptr, nullptr);
        for (int osd : up2) {
          if (osd >= 0 && osd < get_max_osd())
            ++new_by_osd[osd];
@@ -3420,7 +3945,7 @@ bool OSDMap::try_pg_upmap(
   const pg_pool_t *pool = get_pg_pool(pg.pool());
   if (!pool)
     return false;
-  int rule = crush->find_rule(pool->get_crush_ruleset(), pool->get_type(),
+  int rule = crush->find_rule(pool->get_crush_rule(), pool->get_type(),
                              pool->get_size());
   if (rule < 0)
     return false;
@@ -3456,17 +3981,29 @@ bool OSDMap::try_pg_upmap(
 
 int OSDMap::calc_pg_upmaps(
   CephContext *cct,
-  float max_deviation,
+  float max_deviation_ratio,
   int max,
-  const set<int64_t>& only_pools,
+  const set<int64_t>& only_pools_orig,
   OSDMap::Incremental *pending_inc)
 {
+  set<int64_t> only_pools;
+  if (only_pools_orig.empty()) {
+    for (auto& i : pools) {
+      only_pools.insert(i.first);
+    }
+  } else {
+    only_pools = only_pools_orig;
+  }
   OSDMap tmp;
   tmp.deepish_copy_from(*this);
+  float start_deviation = 0;
+  float end_deviation = 0;
   int num_changed = 0;
   while (true) {
     map<int,set<pg_t>> pgs_by_osd;
     int total_pgs = 0;
+    float osd_weight_total = 0;
+    map<int,float> osd_weight;
     for (auto& i : pools) {
       if (!only_pools.empty() && !only_pools.count(i.first))
        continue;
@@ -3480,23 +4017,43 @@ int OSDMap::calc_pg_upmaps(
        }
       }
       total_pgs += i.second.get_size() * i.second.get_pg_num();
+
+      map<int,float> pmap;
+      int ruleno = tmp.crush->find_rule(i.second.get_crush_rule(),
+                                       i.second.get_type(),
+                                       i.second.get_size());
+      tmp.crush->get_rule_weight_osd_map(ruleno, &pmap);
+      ldout(cct,30) << __func__ << " pool " << i.first << " ruleno " << ruleno << dendl;
+      for (auto p : pmap) {
+       auto adjusted_weight = tmp.get_weightf(p.first) * p.second;
+        if (adjusted_weight == 0) {
+          continue;
+        }
+       osd_weight[p.first] += adjusted_weight;
+       osd_weight_total += adjusted_weight;
+      }
     }
-    float osd_weight_total = 0;
-    map<int,float> osd_weight;
-    for (auto& i : pgs_by_osd) {
-      float w = crush->get_item_weightf(i.first);
-      osd_weight[i.first] = w;
-      osd_weight_total += w;
-      ldout(cct, 20) << " osd." << i.first << " weight " << w
-                    << " pgs " << i.second.size() << dendl;
+    for (auto& i : osd_weight) {
+      int pgs = 0;
+      auto p = pgs_by_osd.find(i.first);
+      if (p != pgs_by_osd.end())
+       pgs = p->second.size();
+      else
+       pgs_by_osd.emplace(i.first, set<pg_t>());
+      ldout(cct, 20) << " osd." << i.first << " weight " << i.second
+                    << " pgs " << pgs << dendl;
     }
 
-    // NOTE: we assume we touch all osds with CRUSH!
+    if (osd_weight_total == 0) {
+      lderr(cct) << __func__ << " abort due to osd_weight_total == 0" << dendl;
+      break;
+    }
     float pgs_per_weight = total_pgs / osd_weight_total;
     ldout(cct, 10) << " osd_weight_total " << osd_weight_total << dendl;
     ldout(cct, 10) << " pgs_per_weight " << pgs_per_weight << dendl;
 
     // osd deviation
+    float total_deviation = 0;
     map<int,float> osd_deviation;       // osd, deviation(pgs)
     multimap<float,int> deviation_osd;  // deviation(pgs), osd
     set<int> overfull;
@@ -3510,9 +4067,14 @@ int OSDMap::calc_pg_upmaps(
                     << dendl;
       osd_deviation[i.first] = deviation;
       deviation_osd.insert(make_pair(deviation, i.first));
-      if (deviation > 0)
+      if (deviation >= 1.0)
        overfull.insert(i.first);
+      total_deviation += abs(deviation);
     }
+    if (num_changed == 0) {
+      start_deviation = total_deviation;
+    }
+    end_deviation = total_deviation;
 
     // build underfull, sorted from least-full to most-average
     vector<int> underfull;
@@ -3523,7 +4085,8 @@ int OSDMap::calc_pg_upmaps(
        break;
       underfull.push_back(i->second);
     }
-    ldout(cct, 10) << " overfull " << overfull
+    ldout(cct, 10) << " total_deviation " << total_deviation
+                  << " overfull " << overfull
                   << " underfull " << underfull << dendl;
     if (overfull.empty() || underfull.empty())
       break;
@@ -3532,14 +4095,17 @@ int OSDMap::calc_pg_upmaps(
     bool restart = false;
     for (auto p = deviation_osd.rbegin(); p != deviation_osd.rend(); ++p) {
       int osd = p->second;
+      float deviation = p->first;
+      // make sure osd is still there (belongs to this crush-tree)
+      assert(osd_weight.count(osd));
       float target = osd_weight[osd] * pgs_per_weight;
-      float deviation = deviation_osd.rbegin()->first;
-      if (deviation/target < max_deviation) {
+      assert(target > 0);
+      if (deviation/target < max_deviation_ratio) {
        ldout(cct, 10) << " osd." << osd
                       << " target " << target
                       << " deviation " << deviation
-                      << " -> " << deviation/target
-                      << " < max " << max_deviation << dendl;
+                      << " -> ratio " << deviation/target
+                      << " < max ratio " << max_deviation_ratio << dendl;
        break;
       }
       int num_to_move = deviation;
@@ -3611,5 +4177,766 @@ int OSDMap::calc_pg_upmaps(
       break;
     }
   }
+  ldout(cct, 10) << " start deviation " << start_deviation << dendl;
+  ldout(cct, 10) << " end deviation " << end_deviation << dendl;
   return num_changed;
 }
+
+// Look up OSDs by crush bucket name.
+//
+// Pure forwarder: passes `name` and `osds` to crush->get_leaves() and
+// returns that call's result unchanged.
+// NOTE(review): how `osds` is filled and what the return value means are
+// defined by CrushWrapper::get_leaves(), which is not visible here --
+// presumably it collects the leaf (osd) ids under the named bucket and
+// returns a negative errno on failure; confirm against CrushWrapper.
+int OSDMap::get_osds_by_bucket_name(const string &name, set<int> *osds) const
+{
+  return crush->get_leaves(name, osds);
+}
+
+// Gather the ids of pools whose crush rules might reference the given osd.
+//
+// The candidate rules come from crush->get_rules_by_osd().  Rules for which
+// crush_rule_in_use() is false are ignored; for every remaining rule,
+// get_pool_ids_by_rule() adds that rule's pools to *pool_ids.
+//
+// pool_ids must not be null (asserted).  A negative return from
+// get_rules_by_osd() is logged through lderr and then trips an assert.
+void OSDMap::get_pool_ids_by_osd(CephContext *cct,
+                                 int osd,
+                                 set<int64_t> *pool_ids) const
+{
+  assert(pool_ids);
+
+  set<int> candidate_rules;
+  const int ret = crush->get_rules_by_osd(osd, &candidate_rules);
+  if (ret < 0) {
+    lderr(cct) << __func__ << " get_rules_by_osd failed: " << cpp_strerror(ret)
+               << dendl;
+    assert(ret >= 0);
+  }
+
+  for (const auto &rule : candidate_rules) {
+    // exclude any dead rule
+    if (!crush_rule_in_use(rule))
+      continue;
+    get_pool_ids_by_rule(rule, pool_ids);
+  }
+}
+
+// Shared logic for the OSD utilization dumpers.
+//
+// For every crush item handed to dump_item(qi, f) this class works out the
+// item's size/used/available KB, its utilization percentage and its variance
+// relative to the cluster-wide average, then forwards the numbers to the
+// format-specific dump_item() overload that subclasses implement.  While
+// doing so it accumulates min/max variance and a reweight-weighted standard
+// deviation over the OSDs it has seen.
+// NOTE(review): the tree traversal that calls dump_item(qi, f) lives in
+// CrushTreeDumper::Dumper<F>, which is not visible here.
+template <typename F>
+class OSDUtilizationDumper : public CrushTreeDumper::Dumper<F> {
+public:
+  typedef CrushTreeDumper::Dumper<F> Parent;
+
+  // average_util is computed once, here, from the per-OSD stats; min_var and
+  // max_var start at -1, which dump_item() treats as "not set yet".
+  OSDUtilizationDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
+                       const PGStatService *pgs_, bool tree_)
+    : Parent(crush, osdmap_->get_pool_names()),
+      osdmap(osdmap_),
+      pgs(pgs_),
+      tree(tree_),
+      average_util(average_utilization()),
+      min_var(-1),
+      max_var(-1),
+      stddev(0),
+      sum(0) {
+  }
+
+protected:
+  // Emit a row for every OSD that exists in the osdmap but was not touched
+  // (per is_touched()) by the dumper so far.
+  void dump_stray(F *f) {
+    for (int osd = 0; osd < osdmap->get_max_osd(); ++osd) {
+      if (!osdmap->exists(osd) || this->is_touched(osd))
+        continue;
+      dump_item(CrushTreeDumper::Item(osd, 0, 0, 0), f);
+    }
+  }
+
+  // Compute the figures for one crush item, hand them to the subclass, and
+  // fold OSD rows with a positive reweight into the running statistics.
+  // Bucket rows are skipped entirely unless tree mode is on.
+  void dump_item(const CrushTreeDumper::Item &qi, F *f) override {
+    const bool bucket = qi.is_bucket();
+    if (bucket && !tree)
+      return;
+
+    // buckets have no reweight of their own; -1 marks that
+    float reweight = -1;
+    if (!bucket)
+      reweight = osdmap->get_weightf(qi.id);
+
+    int64_t kb = 0, kb_used = 0, kb_avail = 0;
+    double util = 0;
+    const bool have_stats =
+      get_bucket_utilization(qi.id, &kb, &kb_used, &kb_avail);
+    if (have_stats && kb_used && kb)
+      util = 100.0 * (double)kb_used / (double)kb;
+
+    // with no average to compare against, variance is reported as 1.0
+    double var = average_util ? util / average_util : 1.0;
+
+    size_t num_pgs = 0;
+    if (!bucket)
+      num_pgs = pgs->get_num_pg_by_osd(qi.id);
+
+    dump_item(qi, reweight, kb, kb_used, kb_avail, util, var, num_pgs, f);
+
+    // only OSDs with a positive reweight contribute to the statistics
+    if (bucket || !(reweight > 0))
+      return;
+
+    if (min_var < 0 || var < min_var)
+      min_var = var;
+    if (max_var < 0 || var > max_var)
+      max_var = var;
+
+    const double delta = util - average_util;
+    stddev += reweight * (delta * delta);
+    sum += reweight;
+  }
+
+  // Format-specific row output, implemented by subclasses.
+  virtual void dump_item(const CrushTreeDumper::Item &qi,
+                         float &reweight,
+                         int64_t kb,
+                         int64_t kb_used,
+                         int64_t kb_avail,
+                         double& util,
+                         double& var,
+                         const size_t num_pgs,
+                         F *f) = 0;
+
+  // Weighted standard deviation of utilization; 0 when nothing was counted.
+  double dev() {
+    if (sum > 0)
+      return sqrt(stddev / sum);
+    return 0;
+  }
+
+  // Used/total percentage over all existing OSDs with a non-zero weight
+  // for which utilization stats are available; 0 if there is no capacity.
+  double average_utilization() {
+    int64_t total_kb = 0, total_kb_used = 0;
+    for (int osd = 0; osd < osdmap->get_max_osd(); ++osd) {
+      if (osdmap->exists(osd) && osdmap->get_weight(osd) != 0) {
+        int64_t osd_kb, osd_kb_used, osd_kb_avail;
+        if (get_osd_utilization(osd, &osd_kb, &osd_kb_used, &osd_kb_avail)) {
+          total_kb += osd_kb;
+          total_kb_used += osd_kb_used;
+        }
+      }
+    }
+    if (total_kb > 0)
+      return 100.0 * (double)total_kb_used / (double)total_kb;
+    return 0;
+  }
+
+  // Copy one OSD's kb figures out of its osd_stat_t.  Returns false (leaving
+  // the outputs untouched) when there is no stat record, and also returns
+  // false when the reported size is not positive.
+  bool get_osd_utilization(int id, int64_t* kb, int64_t* kb_used,
+                           int64_t* kb_avail) const {
+    const osd_stat_t *stat = pgs->get_osd_stat(id);
+    if (stat == nullptr)
+      return false;
+    *kb = stat->kb;
+    *kb_used = stat->kb_used;
+    *kb_avail = stat->kb_avail;
+    return *kb > 0;
+  }
+
+  // Utilization of an OSD (id >= 0) or, recursively, of a bucket (id < 0).
+  // An OSD that is out counts as zero capacity and succeeds.  For a bucket
+  // the children are summed from the last item to the first; the first child
+  // that fails aborts the walk and returns false with the partial sums left
+  // in the outputs.
+  bool get_bucket_utilization(int id, int64_t* kb, int64_t* kb_used,
+                              int64_t* kb_avail) const {
+    auto zero_outputs = [&]() {
+      *kb = 0;
+      *kb_used = 0;
+      *kb_avail = 0;
+    };
+
+    if (id >= 0) {
+      if (!osdmap->is_out(id))
+        return get_osd_utilization(id, kb, kb_used, kb_avail);
+      zero_outputs();
+      return true;
+    }
+
+    zero_outputs();
+
+    for (int k = osdmap->crush->get_bucket_size(id); k-- > 0; ) {
+      const int child = osdmap->crush->get_bucket_item(id, k);
+      int64_t child_kb = 0, child_kb_used = 0, child_kb_avail = 0;
+      if (!get_bucket_utilization(child, &child_kb, &child_kb_used,
+                                  &child_kb_avail))
+        return false;
+      *kb += child_kb;
+      *kb_used += child_kb_used;
+      *kb_avail += child_kb_avail;
+    }
+    return *kb > 0;
+  }
+
+protected:
+  const OSDMap *osdmap;
+  const PGStatService *pgs;
+  bool tree;             // emit bucket rows (tree view) as well as OSD rows
+  double average_util;   // cluster-wide utilization percentage
+  double min_var;        // smallest variance seen, -1 until set
+  double max_var;        // largest variance seen, -1 until set
+  double stddev;         // running sum of reweight * (util - average)^2
+  double sum;            // running sum of reweights
+};
+
+
+// Plain-text (TextTable) flavour of the OSD utilization dump.
+class OSDUtilizationPlainDumper : public OSDUtilizationDumper<TextTable> {
+public:
+  typedef OSDUtilizationDumper<TextTable> Parent;
+
+  OSDUtilizationPlainDumper(const CrushWrapper *crush, const OSDMap *osdmap,
+                            const PGStatService *pgs, bool tree) :
+    Parent(crush, osdmap, pgs, tree) {}
+
+  // Define the table columns, emit one row per item (via Parent::dump and
+  // dump_stray) and finish with a TOTAL row built from the summed osd stats.
+  void dump(TextTable *tbl) {
+    const char *const right_aligned_headings[] = {
+      "ID", "CLASS", "WEIGHT", "REWEIGHT", "SIZE",
+      "USE", "AVAIL", "%USE", "VAR", "PGS",
+    };
+    for (const char *heading : right_aligned_headings)
+      tbl->define_column(heading, TextTable::LEFT, TextTable::RIGHT);
+    // the name column only exists in tree mode and is left-aligned
+    if (tree)
+      tbl->define_column("TYPE NAME", TextTable::LEFT, TextTable::LEFT);
+
+    Parent::dump(tbl);
+
+    dump_stray(tbl);
+
+    const auto &totals = pgs->get_osd_sum();
+    *tbl << ""
+         << ""
+         << "" << "TOTAL"
+         << byte_u_t(totals.kb << 10)
+         << byte_u_t(totals.kb_used << 10)
+         << byte_u_t(totals.kb_avail << 10)
+         << lowprecision_t(average_util)
+         << ""
+         << TextTable::endrow;
+  }
+
+protected:
+  // Wrapper selecting the compact number formatting implemented by the
+  // operator<< declared as a friend below.
+  struct lowprecision_t {
+    float v;
+    explicit lowprecision_t(float _v) : v(_v) {}
+  };
+  friend std::ostream &operator<<(ostream& out, const lowprecision_t& v);
+
+  using OSDUtilizationDumper<TextTable>::dump_item;
+  // Append one table row for the given crush item.
+  void dump_item(const CrushTreeDumper::Item &qi,
+                 float &reweight,
+                 int64_t kb,
+                 int64_t kb_used,
+                 int64_t kb_avail,
+                 double& util,
+                 double& var,
+                 const size_t num_pgs,
+                 TextTable *tbl) override {
+    // items without a class get an empty CLASS cell
+    const char *device_class = crush->get_item_class(qi.id);
+    *tbl << qi.id
+         << (device_class ? device_class : "")
+         << weightf_t(qi.weight)
+         << weightf_t(reweight)
+         << byte_u_t(kb << 10)
+         << byte_u_t(kb_used << 10)
+         << byte_u_t(kb_avail << 10)
+         << lowprecision_t(util)
+         << lowprecision_t(var);
+
+    // buckets have no PG count of their own
+    if (!qi.is_bucket()) {
+      *tbl << num_pgs;
+    } else {
+      *tbl << "-";
+    }
+
+    if (tree) {
+      // indent the name by four spaces per tree level
+      ostringstream label;
+      for (int level = qi.depth; level > 0; --level)
+        label << "    ";
+      if (!qi.is_bucket()) {
+        label << "osd." << qi.id;
+      } else {
+        const int bucket_type = crush->get_bucket_type(qi.id);
+        label << crush->get_type_name(bucket_type) << " "
+              << crush->get_item_name(qi.id);
+      }
+      *tbl << label.str();
+    }
+
+    *tbl << TextTable::endrow;
+  }
+
+public:
+  // One-line footer with the variance range and the standard deviation.
+  string summary() {
+    ostringstream text;
+    text << "MIN/MAX VAR: " << lowprecision_t(min_var);
+    text << "/" << lowprecision_t(max_var);
+    text << "  " << "STDDEV: " << lowprecision_t(dev());
+    return text.str();
+  }
+};
+
+// Print a utilization figure compactly:
+//   values below -0.01        -> "-"   (used for "not applicable")
+//   values below  0.001       -> "0"
+//   everything else           -> fixed notation with two decimals
+//
+// The stream's format flags and precision are restored before returning, so
+// neither std::fixed nor the precision of 2 leaks into later output on the
+// caller's stream.
+ostream& operator<<(ostream& out,
+                    const OSDUtilizationPlainDumper::lowprecision_t& v)
+{
+  if (v.v < -0.01) {
+    return out << "-";
+  } else if (v.v < 0.001) {
+    return out << "0";
+  } else {
+    const std::ios_base::fmtflags saved_flags = out.flags();
+    const std::streamsize saved_precision = out.precision();
+    out << std::fixed << std::setprecision(2) << v.v;
+    out.precision(saved_precision);
+    out.flags(saved_flags);
+    return out;
+  }
+}
+
+// Structured (Formatter) flavour of the OSD utilization dump.  The order of
+// the Formatter calls below defines the layout of the emitted document.
+class OSDUtilizationFormatDumper : public OSDUtilizationDumper<Formatter> {
+public:
+  typedef OSDUtilizationDumper<Formatter> Parent;
+
+  OSDUtilizationFormatDumper(const CrushWrapper *crush, const OSDMap *osdmap,
+                            const PGStatService *pgs, bool tree) :
+    Parent(crush, osdmap, pgs, tree) {}
+
+  // Emit two arrays: "nodes" with whatever Parent::dump() produces, and
+  // "stray" with the items produced by dump_stray().
+  void dump(Formatter *f) {
+    f->open_array_section("nodes");
+    Parent::dump(f);
+    f->close_section();
+
+    f->open_array_section("stray");
+    dump_stray(f);
+    f->close_section();
+  }
+
+protected:
+  using OSDUtilizationDumper<Formatter>::dump_item;
+  // Emit one "item" object: the generic crush item fields first, then the
+  // utilization figures computed by the base class, then the bucket's
+  // children.
+  void dump_item(const CrushTreeDumper::Item &qi,
+                        float &reweight,
+                        int64_t kb,
+                        int64_t kb_used,
+                        int64_t kb_avail,
+                        double& util,
+                        double& var,
+                        const size_t num_pgs,
+                        Formatter *f) override {
+    f->open_object_section("item");
+    CrushTreeDumper::dump_item_fields(crush, weight_set_names, qi, f);
+    f->dump_float("reweight", reweight);
+    f->dump_int("kb", kb);
+    f->dump_int("kb_used", kb_used);
+    f->dump_int("kb_avail", kb_avail);
+    f->dump_float("utilization", util);
+    f->dump_float("var", var);
+    f->dump_unsigned("pgs", num_pgs);
+    CrushTreeDumper::dump_bucket_children(crush, qi, f);
+    f->close_section();
+  }
+
+public:
+  // Emit the "summary" object: totals taken from pgs->get_osd_sum() plus the
+  // average utilization, variance range and deviation tracked by the base
+  // class.
+  void summary(Formatter *f) {
+    f->open_object_section("summary");
+    f->dump_int("total_kb", pgs->get_osd_sum().kb);
+    f->dump_int("total_kb_used", pgs->get_osd_sum().kb_used);
+    f->dump_int("total_kb_avail", pgs->get_osd_sum().kb_avail);
+    f->dump_float("average_utilization", average_util);
+    f->dump_float("min_var", min_var);
+    f->dump_float("max_var", max_var);
+    f->dump_float("dev", dev());
+    f->close_section();
+  }
+};
+
+// Write the OSD utilization report to `out`.
+//
+// With a Formatter, a "df" object holding the dumper's nodes/stray arrays
+// and its summary is emitted and flushed to `out`.  Without one (f == null),
+// a text table followed by the one-line summary and a newline is written.
+// `tree` selects whether bucket rows are included.
+void print_osd_utilization(const OSDMap& osdmap,
+                           const PGStatService *pgstat,
+                           ostream& out,
+                           Formatter *f,
+                           bool tree)
+{
+  const CrushWrapper *crush = osdmap.crush.get();
+
+  if (!f) {
+    // plain-text path
+    OSDUtilizationPlainDumper plain(crush, &osdmap, pgstat, tree);
+    TextTable table;
+    plain.dump(&table);
+    out << table << plain.summary() << "\n";
+    return;
+  }
+
+  // structured path
+  f->open_object_section("df");
+  OSDUtilizationFormatDumper structured(crush, &osdmap, pgstat, tree);
+  structured.dump(f);
+  structured.summary(f);
+  f->close_section();
+  f->flush(out);
+}
+
+// Populate *checks with the health checks that can be derived from this
+// OSDMap (plus a few g_conf options).  Every check is registered through
+// checks->add(code, severity, summary) and its per-item lines go into the
+// returned entry's `detail` list.
+//
+// Checks raised here:
+//   OSD_<TYPE>_DOWN, OSD_DOWN, OSD_ORPHAN            (HEALTH_WARN)
+//   OSD_OUT_OF_ORDER_FULL, OSD_FULL                  (HEALTH_ERR)
+//   OSD_BACKFILLFULL, OSD_NEARFULL                   (HEALTH_WARN)
+//   OSDMAP_FLAGS, OSD_FLAGS                          (HEALTH_WARN)
+//   OLD_CRUSH_TUNABLES, OLD_CRUSH_STRAW_CALC_VERSION (HEALTH_WARN)
+//   CACHE_POOL_NO_HIT_SET, OSD_NO_SORTBITWISE        (HEALTH_WARN)
+//   POOL_FULL, POOL_BACKFILLFULL, POOL_NEARFULL      (HEALTH_WARN)
+void OSDMap::check_health(health_check_map_t *checks) const
+{
+  int num_osds = get_num_osds();
+
+  // OSD_DOWN
+  // OSD_$subtree_DOWN
+  // OSD_ORPHAN
+  if (num_osds >= 0) {
+    int num_in_osds = 0;
+    int num_down_in_osds = 0;
+    set<int> osds;            // ids present in crush but not in the osdmap
+    set<int> down_in_osds;
+    set<int> up_in_osds;
+    set<int> subtree_up;      // buckets already known to have an up child
+    unordered_map<int, set<int> > subtree_type_down;  // bucket type -> down buckets
+    unordered_map<int, int> num_osds_subtree;         // bucket id -> osd count
+    int max_type = crush->get_max_type_id();
+
+    for (int i = 0; i < get_max_osd(); i++) {
+      if (!exists(i)) {
+        if (crush->item_exists(i)) {
+          osds.insert(i);
+        }
+        continue;
+      }
+      if (is_out(i))
+        continue;
+      ++num_in_osds;
+      // already classified while examining an earlier osd's subtree
+      if (down_in_osds.count(i) || up_in_osds.count(i))
+        continue;
+      if (!is_up(i)) {
+        down_in_osds.insert(i);
+        // climb from this osd towards the root, one parent at a time, for as
+        // long as each enclosing subtree is completely down.  Note that
+        // `type` is overwritten with the parent's bucket type, so the loop
+        // counter follows the hierarchy rather than counting 0..max_type.
+        int parent_id = 0;
+        int current = i;
+        for (int type = 0; type <= max_type; type++) {
+          if (!crush->get_type_name(type))
+            continue;
+          int r = crush->get_immediate_parent_id(current, &parent_id);
+          if (r == -ENOENT)
+            break;
+          // break early if this parent is already marked as up
+          if (subtree_up.count(parent_id))
+            break;
+          type = crush->get_bucket_type(parent_id);
+          if (!subtree_type_is_down(
+                g_ceph_context, parent_id, type,
+                &down_in_osds, &up_in_osds, &subtree_up, &subtree_type_down))
+            break;
+          current = parent_id;
+        }
+      }
+    }
+
+    // calculate the number of down osds in each down subtree and
+    // store it in num_osds_subtree
+    for (int type = 1; type <= max_type; type++) {
+      if (!crush->get_type_name(type))
+        continue;
+      for (auto j = subtree_type_down[type].begin();
+           j != subtree_type_down[type].end();
+           ++j) {
+        list<int> children;
+        int num = 0;
+        int num_children = crush->get_children(*j, &children);
+        if (num_children == 0)
+          continue;
+        for (auto l = children.begin(); l != children.end(); ++l) {
+          if (*l >= 0) {
+            // direct osd child
+            ++num;
+          } else if (num_osds_subtree[*l] > 0) {
+            // child bucket whose count was recorded on an earlier pass
+            num = num + num_osds_subtree[*l];
+          }
+        }
+        num_osds_subtree[*j] = num;
+      }
+    }
+    num_down_in_osds = down_in_osds.size();
+    assert(num_down_in_osds <= num_in_osds);
+    if (num_down_in_osds > 0) {
+      // summary of down subtree types and osds
+      for (int type = max_type; type > 0; type--) {
+        if (!crush->get_type_name(type))
+          continue;
+        if (subtree_type_down[type].size() > 0) {
+          ostringstream ss;
+          ss << subtree_type_down[type].size() << " "
+             << crush->get_type_name(type);
+          if (subtree_type_down[type].size() > 1) {
+            ss << "s";
+          }
+          int sum_down_osds = 0;
+          for (auto j = subtree_type_down[type].begin();
+               j != subtree_type_down[type].end();
+               ++j) {
+            sum_down_osds = sum_down_osds + num_osds_subtree[*j];
+          }
+          ss << " (" << sum_down_osds << " osds) down";
+          // the check code is OSD_<TYPE>_DOWN with the crush type name
+          // upper-cased
+          string err = string("OSD_") +
+            string(crush->get_type_name(type)) + "_DOWN";
+          boost::to_upper(err);
+          auto& d = checks->add(err, HEALTH_WARN, ss.str());
+          for (auto j = subtree_type_down[type].rbegin();
+               j != subtree_type_down[type].rend();
+               ++j) {
+            ostringstream ss;
+            ss << crush->get_type_name(type);
+            ss << " ";
+            ss << crush->get_item_name(*j);
+            // at the top level, do not print location
+            if (type != max_type) {
+              ss << " (";
+              ss << crush->get_full_location_ordered_string(*j);
+              ss << ")";
+            }
+            int num = num_osds_subtree[*j];
+            ss << " (" << num << " osds)";
+            ss << " is down";
+            d.detail.push_back(ss.str());
+          }
+        }
+      }
+      ostringstream ss;
+      ss << down_in_osds.size() << " osds down";
+      auto& d = checks->add("OSD_DOWN", HEALTH_WARN, ss.str());
+      for (auto it = down_in_osds.begin(); it != down_in_osds.end(); ++it) {
+        ostringstream ss;
+        ss << "osd." << *it << " (";
+        ss << crush->get_full_location_ordered_string(*it);
+        ss << ") is down";
+        d.detail.push_back(ss.str());
+      }
+    }
+
+    if (!osds.empty()) {
+      ostringstream ss;
+      ss << osds.size() << " osds exist in the crush map but not in the osdmap";
+      auto& d = checks->add("OSD_ORPHAN", HEALTH_WARN, ss.str());
+      for (auto osd : osds) {
+        ostringstream ss;
+        ss << "osd." << osd << " exists in crush map but not in osdmap";
+        d.detail.push_back(ss.str());
+      }
+    }
+  }
+
+  // OSD_OUT_OF_ORDER_FULL
+  {
+    // An osd could configure failsafe ratio, to something different
+    // but for now assume it is the same here.
+    float fsr = g_conf->osd_failsafe_full_ratio;
+    // a value above 1.0 is taken to be a percentage
+    if (fsr > 1.0) fsr /= 100;
+    float fr = get_full_ratio();
+    float br = get_backfillfull_ratio();
+    float nr = get_nearfull_ratio();
+
+    list<string> detail;
+    // These checks correspond to how OSDService::check_full_status() in an OSD
+    // handles the improper setting of these values.
+    // Each out-of-order ratio is raised locally so that the next comparison
+    // is made against the adjusted value.
+    if (br < nr) {
+      ostringstream ss;
+      ss << "backfillfull_ratio (" << br
+         << ") < nearfull_ratio (" << nr << "), increased";
+      detail.push_back(ss.str());
+      br = nr;
+    }
+    if (fr < br) {
+      ostringstream ss;
+      ss << "full_ratio (" << fr << ") < backfillfull_ratio (" << br
+         << "), increased";
+      detail.push_back(ss.str());
+      fr = br;
+    }
+    if (fsr < fr) {
+      // fsr is the last ratio in the chain, so it is only reported here
+      ostringstream ss;
+      ss << "osd_failsafe_full_ratio (" << fsr << ") < full_ratio (" << fr
+         << "), increased";
+      detail.push_back(ss.str());
+    }
+    if (!detail.empty()) {
+      auto& d = checks->add("OSD_OUT_OF_ORDER_FULL", HEALTH_ERR,
+                            "full ratio(s) out of order");
+      d.detail.swap(detail);
+    }
+  }
+
+  // OSD_FULL
+  // OSD_NEARFULL
+  // OSD_BACKFILLFULL
+  // OSD_FAILSAFE_FULL
+  // (no OSD_FAILSAFE_FULL check is added by the code below)
+  {
+    set<int> full, backfillfull, nearfull;
+    get_full_osd_counts(&full, &backfillfull, &nearfull);
+    if (full.size()) {
+      ostringstream ss;
+      ss << full.size() << " full osd(s)";
+      auto& d = checks->add("OSD_FULL", HEALTH_ERR, ss.str());
+      for (auto& i: full) {
+        ostringstream ss;
+        ss << "osd." << i << " is full";
+        d.detail.push_back(ss.str());
+      }
+    }
+    if (backfillfull.size()) {
+      ostringstream ss;
+      ss << backfillfull.size() << " backfillfull osd(s)";
+      auto& d = checks->add("OSD_BACKFILLFULL", HEALTH_WARN, ss.str());
+      for (auto& i: backfillfull) {
+        ostringstream ss;
+        ss << "osd." << i << " is backfill full";
+        d.detail.push_back(ss.str());
+      }
+    }
+    if (nearfull.size()) {
+      ostringstream ss;
+      ss << nearfull.size() << " nearfull osd(s)";
+      auto& d = checks->add("OSD_NEARFULL", HEALTH_WARN, ss.str());
+      for (auto& i: nearfull) {
+        ostringstream ss;
+        ss << "osd." << i << " is near full";
+        d.detail.push_back(ss.str());
+      }
+    }
+  }
+
+  // OSDMAP_FLAGS
+  {
+    // warn about flags
+    uint64_t warn_flags =
+      CEPH_OSDMAP_NEARFULL |
+      CEPH_OSDMAP_FULL |
+      CEPH_OSDMAP_PAUSERD |
+      CEPH_OSDMAP_PAUSEWR |
+      CEPH_OSDMAP_PAUSEREC |
+      CEPH_OSDMAP_NOUP |
+      CEPH_OSDMAP_NODOWN |
+      CEPH_OSDMAP_NOIN |
+      CEPH_OSDMAP_NOOUT |
+      CEPH_OSDMAP_NOBACKFILL |
+      CEPH_OSDMAP_NORECOVER |
+      CEPH_OSDMAP_NOSCRUB |
+      CEPH_OSDMAP_NODEEP_SCRUB |
+      CEPH_OSDMAP_NOTIERAGENT |
+      CEPH_OSDMAP_NOREBALANCE;
+    if (test_flag(warn_flags)) {
+      ostringstream ss;
+      // only the flags from warn_flags that are actually set are named
+      ss << get_flag_string(get_flags() & warn_flags)
+         << " flag(s) set";
+      checks->add("OSDMAP_FLAGS", HEALTH_WARN, ss.str());
+    }
+  }
+
+  // OSD_FLAGS
+  {
+    list<string> detail;
+    const unsigned flags =
+      CEPH_OSD_NOUP |
+      CEPH_OSD_NOIN |
+      CEPH_OSD_NODOWN |
+      CEPH_OSD_NOOUT;
+    for (int i = 0; i < max_osd; ++i) {
+      if (osd_state[i] & flags) {
+        ostringstream ss;
+        set<string> states;
+        OSDMap::calc_state_set(osd_state[i] & flags, states);
+        ss << "osd." << i << " has flags " << states;
+        detail.push_back(ss.str());
+      }
+    }
+    if (!detail.empty()) {
+      ostringstream ss;
+      ss << detail.size() << " osd(s) have {NOUP,NODOWN,NOIN,NOOUT} flags set";
+      auto& d = checks->add("OSD_FLAGS", HEALTH_WARN, ss.str());
+      d.detail.swap(detail);
+    }
+  }
+
+  // OLD_CRUSH_TUNABLES
+  if (g_conf->mon_warn_on_legacy_crush_tunables) {
+    string min = crush->get_min_required_version();
+    if (min < g_conf->mon_crush_min_required_version) {
+      ostringstream ss;
+      ss << "crush map has legacy tunables (require " << min
+         << ", min is " << g_conf->mon_crush_min_required_version << ")";
+      auto& d = checks->add("OLD_CRUSH_TUNABLES", HEALTH_WARN, ss.str());
+      d.detail.push_back("see http://docs.ceph.com/docs/master/rados/operations/crush-map/#tunables");
+    }
+  }
+
+  // OLD_CRUSH_STRAW_CALC_VERSION
+  if (g_conf->mon_warn_on_crush_straw_calc_version_zero) {
+    if (crush->get_straw_calc_version() == 0) {
+      ostringstream ss;
+      ss << "crush map has straw_calc_version=0";
+      auto& d = checks->add("OLD_CRUSH_STRAW_CALC_VERSION", HEALTH_WARN, ss.str());
+      d.detail.push_back(
+        "see http://docs.ceph.com/docs/master/rados/operations/crush-map/#tunables");
+    }
+  }
+
+  // CACHE_POOL_NO_HIT_SET
+  if (g_conf->mon_warn_on_cache_pools_without_hit_sets) {
+    list<string> detail;
+    for (map<int64_t, pg_pool_t>::const_iterator p = pools.begin();
+         p != pools.end();
+         ++p) {
+      const pg_pool_t& info = p->second;
+      if (info.cache_mode_requires_hit_set() &&
+          info.hit_set_params.get_type() == HitSet::TYPE_NONE) {
+        ostringstream ss;
+        ss << "pool '" << get_pool_name(p->first)
+           << "' with cache_mode " << info.get_cache_mode_name()
+           << " needs hit_set_type to be set but it is not";
+        detail.push_back(ss.str());
+      }
+    }
+    if (!detail.empty()) {
+      ostringstream ss;
+      ss << detail.size() << " cache pools are missing hit_sets";
+      auto& d = checks->add("CACHE_POOL_NO_HIT_SET", HEALTH_WARN, ss.str());
+      d.detail.swap(detail);
+    }
+  }
+
+  // OSD_NO_SORTBITWISE
+  if (!test_flag(CEPH_OSDMAP_SORTBITWISE) &&
+      (get_up_osd_features() &
+       CEPH_FEATURE_OSD_BITWISE_HOBJ_SORT)) {
+    ostringstream ss;
+    ss << "no legacy OSD present but 'sortbitwise' flag is not set";
+    checks->add("OSD_NO_SORTBITWISE", HEALTH_WARN, ss.str());
+  }
+
+  // OSD_UPGRADE_FINISHED
+  // none of these (yet) since we don't run until luminous upgrade is done.
+
+  // POOL_NEARFULL/BACKFILLFULL/FULL
+  {
+    list<string> full_detail, backfillfull_detail, nearfull_detail;
+    // iterate by const reference: taking each entry by value would copy the
+    // whole pg_pool_t for every pool on every health evaluation
+    for (const auto &it : get_pools()) {
+      const pg_pool_t &pool = it.second;
+      const string& pool_name = get_pool_name(it.first);
+      // a pool is reported in at most one category; full takes precedence
+      // over backfillfull, which takes precedence over nearfull
+      if (pool.has_flag(pg_pool_t::FLAG_FULL)) {
+        stringstream ss;
+        if (pool.has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+          // may run out of space too,
+          // but we want EQUOTA taking precedence
+          ss << "pool '" << pool_name << "' is full (no quota)";
+        } else {
+          ss << "pool '" << pool_name << "' is full (no space)";
+        }
+        full_detail.push_back(ss.str());
+      } else if (pool.has_flag(pg_pool_t::FLAG_BACKFILLFULL)) {
+        stringstream ss;
+        ss << "pool '" << pool_name << "' is backfillfull";
+        backfillfull_detail.push_back(ss.str());
+      } else if (pool.has_flag(pg_pool_t::FLAG_NEARFULL)) {
+        stringstream ss;
+        ss << "pool '" << pool_name << "' is nearfull";
+        nearfull_detail.push_back(ss.str());
+      }
+    }
+    if (!full_detail.empty()) {
+      ostringstream ss;
+      ss << full_detail.size() << " pool(s) full";
+      auto& d = checks->add("POOL_FULL", HEALTH_WARN, ss.str());
+      d.detail.swap(full_detail);
+    }
+    if (!backfillfull_detail.empty()) {
+      ostringstream ss;
+      ss << backfillfull_detail.size() << " pool(s) backfillfull";
+      auto& d = checks->add("POOL_BACKFILLFULL", HEALTH_WARN, ss.str());
+      d.detail.swap(backfillfull_detail);
+    }
+    if (!nearfull_detail.empty()) {
+      ostringstream ss;
+      ss << nearfull_detail.size() << " pool(s) nearfull";
+      auto& d = checks->add("POOL_NEARFULL", HEALTH_WARN, ss.str());
+      d.detail.swap(nearfull_detail);
+    }
+  }
+}
+
+// Convert a list of osd id strings into a set of osd ids.
+//
+// *out is cleared first.  If the first entry is "any", "all" or "*", *out is
+// filled by get_all_osds() and the remaining entries are ignored.  Otherwise
+// each entry goes through parse_osd_id(); on the first one that yields a
+// negative value an "invalid osd id" message is appended to *ss and -EINVAL
+// is returned (ids parsed before it stay in *out).  Returns 0 on success,
+// including for an empty list.
+int OSDMap::parse_osd_id_list(const vector<string>& ls, set<int> *out,
+                              ostream *ss) const
+{
+  out->clear();
+  if (ls.empty())
+    return 0;
+
+  // the wildcard keywords are only recognised in first position
+  const string &first = ls.front();
+  if (first == "any" || first == "all" || first == "*") {
+    get_all_osds(*out);
+    return 0;
+  }
+
+  for (const auto &token : ls) {
+    const long osd = parse_osd_id(token.c_str(), ss);
+    if (osd < 0) {
+      *ss << "invalid osd id '" << token << "'";
+      return -EINVAL;
+    }
+    out->insert(osd);
+  }
+  return 0;
+}