]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSDMap.cc
import ceph 15.2.13
[ceph.git] / ceph / src / osd / OSDMap.cc
index 0c10811a88f58d2aaf58c16990a4988d872fce50..02a2e3d53cdde30f90f7f96a3d212eb99d3cdcc2 100644 (file)
@@ -26,8 +26,8 @@
 #include "common/errno.h"
 #include "common/Formatter.h"
 #include "common/TextTable.h"
-#include "global/global_context.h"
 #include "include/ceph_features.h"
+#include "include/common_fwd.h"
 #include "include/str_map.h"
 
 #include "common/code_environment.h"
 #include "common/Clock.h"
 #include "mon/PGMap.h"
 
+using std::list;
+using std::make_pair;
+using std::map;
+using std::multimap;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::unordered_map;
+using std::vector;
+
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+
 #define dout_subsys ceph_subsys_osd
 
 MEMPOOL_DEFINE_OBJECT_FACTORY(OSDMap, osdmap, osdmap);
@@ -56,7 +73,7 @@ void osd_info_t::dump(Formatter *f) const
   f->dump_int("lost_at", lost_at);
 }
 
-void osd_info_t::encode(bufferlist& bl) const
+void osd_info_t::encode(ceph::buffer::list& bl) const
 {
   using ceph::encode;
   __u8 struct_v = 1;
@@ -69,7 +86,7 @@ void osd_info_t::encode(bufferlist& bl) const
   encode(lost_at, bl);
 }
 
-void osd_info_t::decode(bufferlist::const_iterator& bl)
+void osd_info_t::decode(ceph::buffer::list::const_iterator& bl)
 {
   using ceph::decode;
   __u8 struct_v;
@@ -115,23 +132,33 @@ void osd_xinfo_t::dump(Formatter *f) const
   f->dump_int("laggy_interval", laggy_interval);
   f->dump_int("features", features);
   f->dump_unsigned("old_weight", old_weight);
+  f->dump_stream("last_purged_snaps_scrub") << last_purged_snaps_scrub;
+  f->dump_int("dead_epoch", dead_epoch);
 }
 
-void osd_xinfo_t::encode(bufferlist& bl) const
+void osd_xinfo_t::encode(ceph::buffer::list& bl, uint64_t enc_features) const
 {
-  ENCODE_START(3, 1, bl);
+  uint8_t v = 4;
+  if (!HAVE_FEATURE(enc_features, SERVER_OCTOPUS)) {
+    v = 3;
+  }
+  ENCODE_START(v, 1, bl);
   encode(down_stamp, bl);
   __u32 lp = laggy_probability * 0xfffffffful;
   encode(lp, bl);
   encode(laggy_interval, bl);
   encode(features, bl);
   encode(old_weight, bl);
+  if (v >= 4) {
+    encode(last_purged_snaps_scrub, bl);
+    encode(dead_epoch, bl);
+  }
   ENCODE_FINISH(bl);
 }
 
-void osd_xinfo_t::decode(bufferlist::const_iterator& bl)
+void osd_xinfo_t::decode(ceph::buffer::list::const_iterator& bl)
 {
-  DECODE_START(3, bl);
+  DECODE_START(4, bl);
   decode(down_stamp, bl);
   __u32 lp;
   decode(lp, bl);
@@ -145,6 +172,12 @@ void osd_xinfo_t::decode(bufferlist::const_iterator& bl)
     decode(old_weight, bl);
   else
     old_weight = 0;
+  if (struct_v >= 4) {
+    decode(last_purged_snaps_scrub, bl);
+    decode(dead_epoch, bl);
+  } else {
+    dead_epoch = 0;
+  }
   DECODE_FINISH(bl);
 }
 
@@ -163,7 +196,9 @@ ostream& operator<<(ostream& out, const osd_xinfo_t& xi)
   return out << "down_stamp " << xi.down_stamp
             << " laggy_probability " << xi.laggy_probability
             << " laggy_interval " << xi.laggy_interval
-            << " old_weight " << xi.old_weight;
+            << " old_weight " << xi.old_weight
+            << " last_purged_snaps_scrub " << xi.last_purged_snaps_scrub
+            << " dead_epoch " << xi.dead_epoch;
 }
 
 // ----------------------------------
@@ -203,8 +238,8 @@ int OSDMap::Incremental::identify_osd(uuid_d u) const
   return -1;
 }
 
-int OSDMap::Incremental::propagate_snaps_to_tiers(CephContext *cct,
-                                                 const OSDMap& osdmap)
+int OSDMap::Incremental::propagate_base_properties_to_tiers(CephContext *cct,
+                                                           const OSDMap& osdmap)
 {
   ceph_assert(epoch == osdmap.get_epoch() + 1);
 
@@ -244,6 +279,8 @@ int OSDMap::Incremental::propagate_snaps_to_tiers(CephContext *cct,
        if (new_rem_it != new_removed_snaps.end()) {
          new_removed_snaps[tier_pool] = new_rem_it->second;
        }
+
+       tier->application_metadata = base.application_metadata;
       }
     }
   }
@@ -356,7 +393,7 @@ bool OSDMap::subtree_type_is_down(
   return true;
 }
 
-void OSDMap::Incremental::encode_client_old(bufferlist& bl) const
+void OSDMap::Incremental::encode_client_old(ceph::buffer::list& bl) const
 {
   using ceph::encode;
   __u16 v = 5;
@@ -398,11 +435,21 @@ void OSDMap::Incremental::encode_client_old(bufferlist& bl) const
   encode(new_up_client, bl, 0);
   {
     // legacy is map<int32_t,uint8_t>
-    uint32_t n = new_state.size();
-    encode(n, bl);
+    map<int32_t, uint8_t> os;
     for (auto p : new_state) {
+      // new_state may only inculde some new flags(e.g., CEPH_OSD_NOOUT)
+      // that an old client could not understand.
+      // skip those!
+      uint8_t s = p.second;
+      if (p.second != 0 && s == 0)
+        continue;
+      os[p.first] = s;
+    }
+    uint32_t n = os.size();
+    encode(n, bl);
+    for (auto p : os) {
       encode(p.first, bl);
-      encode((uint8_t)p.second, bl);
+      encode(p.second, bl);
     }
   }
   encode(new_weight, bl);
@@ -417,7 +464,7 @@ void OSDMap::Incremental::encode_client_old(bufferlist& bl) const
   }
 }
 
-void OSDMap::Incremental::encode_classic(bufferlist& bl, uint64_t features) const
+void OSDMap::Incremental::encode_classic(ceph::buffer::list& bl, uint64_t features) const
 {
   using ceph::encode;
   if ((features & CEPH_FEATURE_PGID64) == 0) {
@@ -442,11 +489,21 @@ void OSDMap::Incremental::encode_classic(bufferlist& bl, uint64_t features) cons
   encode(old_pools, bl);
   encode(new_up_client, bl, features);
   {
-    uint32_t n = new_state.size();
-    encode(n, bl);
+    map<int32_t, uint8_t> os;
     for (auto p : new_state) {
+      // new_state may only inculde some new flags(e.g., CEPH_OSD_NOOUT)
+      // that an old client could not understand.
+      // skip those!
+      uint8_t s = p.second;
+      if (p.second != 0 && s == 0)
+        continue;
+      os[p.first] = s;
+    }
+    uint32_t n = os.size();
+    encode(n, bl);
+    for (auto p : os) {
       encode(p.first, bl);
-      encode((uint8_t)p.second, bl);
+      encode(p.second, bl);
     }
   }
   encode(new_weight, bl);
@@ -464,12 +521,12 @@ void OSDMap::Incremental::encode_classic(bufferlist& bl, uint64_t features) cons
   encode(new_up_cluster, bl, features);
   encode(cluster_snapshot, bl);
   encode(new_uuid, bl);
-  encode(new_xinfo, bl);
+  encode(new_xinfo, bl, features);
   encode(new_hb_front_up, bl, features);
 }
 
 template<class T>
-static void encode_addrvec_map_as_addr(const T& m, bufferlist& bl, uint64_t f)
+static void encode_addrvec_map_as_addr(const T& m, ceph::buffer::list& bl, uint64_t f)
 {
   uint32_t n = m.size();
   encode(n, bl);
@@ -480,7 +537,7 @@ static void encode_addrvec_map_as_addr(const T& m, bufferlist& bl, uint64_t f)
 }
 
 template<class T>
-static void encode_addrvec_pvec_as_addr(const T& m, bufferlist& bl, uint64_t f)
+static void encode_addrvec_pvec_as_addr(const T& m, ceph::buffer::list& bl, uint64_t f)
 {
   uint32_t n = m.size();
   encode(n, bl);
@@ -497,7 +554,7 @@ static void encode_addrvec_pvec_as_addr(const T& m, bufferlist& bl, uint64_t f)
  * introduced, please refer to
  *    doc/dev/osd_internals/osdmap_versions.txt
  */
-void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
+void OSDMap::Incremental::encode(ceph::buffer::list& bl, uint64_t features) const
 {
   using ceph::encode;
   if ((features & CEPH_FEATURE_OSDMAP_ENC) == 0) {
@@ -515,7 +572,7 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
   size_t start_offset = bl.length();
   size_t tail_offset;
   size_t crc_offset;
-  std::optional<buffer::list::contiguous_filler> crc_filler;
+  std::optional<ceph::buffer::list::contiguous_filler> crc_filler;
 
   // meta-encoding: how we include client-used and osd-specific data
   ENCODE_START(8, 7, bl);
@@ -550,11 +607,21 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
     if (v >= 5) {
       encode(new_state, bl);
     } else {
-      uint32_t n = new_state.size();
-      encode(n, bl);
+      map<int32_t, uint8_t> os;
       for (auto p : new_state) {
-       encode(p.first, bl);
-       encode((uint8_t)p.second, bl);
+        // new_state may only inculde some new flags(e.g., CEPH_OSD_NOOUT)
+        // that an old client could not understand.
+        // skip those!
+        uint8_t s = p.second;
+        if (p.second != 0 && s == 0)
+          continue;
+        os[p.first] = s;
+      }
+      uint32_t n = os.size();
+      encode(n, bl);
+      for (auto p : os) {
+        encode(p.first, bl);
+        encode(p.second, bl);
       }
     }
     encode(new_weight, bl);
@@ -581,7 +648,7 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
   }
 
   {
-    uint8_t target_v = 7;
+    uint8_t target_v = 9;
     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
       target_v = 2;
     } else if (!HAVE_FEATURE(features, SERVER_NAUTILUS)) {
@@ -605,7 +672,7 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
     }
     encode(cluster_snapshot, bl);
     encode(new_uuid, bl);
-    encode(new_xinfo, bl);
+    encode(new_xinfo, bl, features);
     if (target_v < 7) {
       encode_addrvec_map_as_addr(new_hb_front_up, bl, features);
     } else {
@@ -622,6 +689,12 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
       encode(new_require_min_compat_client, bl);
       encode(new_require_osd_release, bl);
     }
+    if (target_v >= 8) {
+      encode(new_crush_node_flags, bl);
+    }
+    if (target_v >= 9) {
+      encode(new_device_class_flags, bl);
+    }
     ENCODE_FINISH(bl); // osd-only data
   }
 
@@ -634,10 +707,10 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
   ENCODE_FINISH(bl); // meta-encoding wrapper
 
   // fill in crc
-  bufferlist front;
+  ceph::buffer::list front;
   front.substr_of(bl, start_offset, crc_offset - start_offset);
   inc_crc = front.crc32c(-1);
-  bufferlist tail;
+  ceph::buffer::list tail;
   tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
   inc_crc = tail.crc32c(inc_crc);
   ceph_le32 crc_le;
@@ -646,7 +719,7 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
   have_crc = true;
 }
 
-void OSDMap::Incremental::decode_classic(bufferlist::const_iterator &p)
+void OSDMap::Incremental::decode_classic(ceph::buffer::list::const_iterator &p)
 {
   using ceph::decode;
   __u32 n, t;
@@ -711,7 +784,7 @@ void OSDMap::Incremental::decode_classic(bufferlist::const_iterator &p)
     decode(n, p);
     while (n--) {
       old_pg_t opg;
-      ::decode_raw(opg, p);
+      ceph::decode_raw(opg, p);
       decode(new_pg_temp[pg_t(opg)], p);
     }
   } else {
@@ -750,7 +823,7 @@ void OSDMap::Incremental::decode_classic(bufferlist::const_iterator &p)
  * introduced, please refer to
  *    doc/dev/osd_internals/osdmap_versions.txt
  */
-void OSDMap::Incremental::decode(bufferlist::const_iterator& bl)
+void OSDMap::Incremental::decode(ceph::buffer::list::const_iterator& bl)
 {
   using ceph::decode;
   /**
@@ -762,7 +835,7 @@ void OSDMap::Incremental::decode(bufferlist::const_iterator& bl)
    */
   size_t start_offset = bl.get_off();
   size_t tail_offset = 0;
-  bufferlist crc_front, crc_tail;
+  ceph::buffer::list crc_front, crc_tail;
 
   DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
   if (struct_v < 7) {
@@ -831,7 +904,7 @@ void OSDMap::Incremental::decode(bufferlist::const_iterator& bl)
   }
 
   {
-    DECODE_START(7, bl); // extended, osd-only data
+    DECODE_START(9, bl); // extended, osd-only data
     decode(new_hb_back_up, bl);
     decode(new_up_thru, bl);
     decode(new_last_clean_interval, bl);
@@ -863,7 +936,7 @@ void OSDMap::Incremental::decode(bufferlist::const_iterator& bl)
       string r;
       decode(r, bl);
       if (r.length()) {
-       new_require_min_compat_client = ceph_release_from_name(r.c_str());
+       new_require_min_compat_client = ceph_release_from_name(r);
       }
     }
     if (struct_v >= 6) {
@@ -872,16 +945,22 @@ void OSDMap::Incremental::decode(bufferlist::const_iterator& bl)
     } else {
       if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
        // only for compat with post-kraken pre-luminous test clusters
-       new_require_osd_release = CEPH_RELEASE_LUMINOUS;
+       new_require_osd_release = ceph_release_t::luminous;
        new_flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
       } else if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_KRAKEN)) {
-       new_require_osd_release = CEPH_RELEASE_KRAKEN;
+       new_require_osd_release = ceph_release_t::kraken;
       } else if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_JEWEL)) {
-       new_require_osd_release = CEPH_RELEASE_JEWEL;
+       new_require_osd_release = ceph_release_t::jewel;
       } else {
-       new_require_osd_release = -1;
+       new_require_osd_release = ceph_release_t::unknown;
       }
     }
+    if (struct_v >= 8) {
+      decode(new_crush_node_flags, bl);
+    }
+    if (struct_v >= 9) {
+      decode(new_device_class_flags, bl);
+    }
     DECODE_FINISH(bl); // osd-only data
   }
 
@@ -903,7 +982,7 @@ void OSDMap::Incremental::decode(bufferlist::const_iterator& bl)
     // verify crc
     uint32_t actual = crc_front.crc32c(-1);
     if (tail_offset < bl.get_off()) {
-      bufferlist tail;
+      ceph::buffer::list tail;
       tail.substr_of(bl.get_bl(), tail_offset, bl.get_off() - tail_offset);
       actual = tail.crc32c(actual);
     }
@@ -911,7 +990,7 @@ void OSDMap::Incremental::decode(bufferlist::const_iterator& bl)
       ostringstream ss;
       ss << "bad crc, actual " << actual << " != expected " << inc_crc;
       string s = ss.str();
-      throw buffer::malformed_input(s.c_str());
+      throw ceph::buffer::malformed_input(s.c_str());
     }
   }
 }
@@ -928,13 +1007,13 @@ void OSDMap::Incremental::dump(Formatter *f) const
   f->dump_float("new_full_ratio", new_full_ratio);
   f->dump_float("new_nearfull_ratio", new_nearfull_ratio);
   f->dump_float("new_backfillfull_ratio", new_backfillfull_ratio);
-  f->dump_int("new_require_min_compat_client", new_require_min_compat_client);
-  f->dump_int("new_require_osd_release", new_require_osd_release);
+  f->dump_int("new_require_min_compat_client", ceph::to_integer<int>(new_require_min_compat_client));
+  f->dump_int("new_require_osd_release", ceph::to_integer<int>(new_require_osd_release));
 
   if (fullmap.length()) {
     f->open_object_section("full_map");
     OSDMap full;
-    bufferlist fbl = fullmap;  // kludge around constness.
+    ceph::buffer::list fbl = fullmap;  // kludge around constness.
     auto p = fbl.cbegin();
     full.decode(p);
     full.dump(f);
@@ -943,7 +1022,7 @@ void OSDMap::Incremental::dump(Formatter *f) const
   if (crush.length()) {
     f->open_object_section("crush");
     CrushWrapper c;
-    bufferlist tbl = crush;  // kludge around constness.
+    ceph::buffer::list tbl = crush;  // kludge around constness.
     auto p = tbl.cbegin();
     c.decode(p);
     c.dump(f);
@@ -1152,7 +1231,7 @@ void OSDMap::Incremental::dump(Formatter *f) const
   OSDMap::dump_erasure_code_profiles(new_erasure_code_profiles, f);
   f->open_array_section("old_erasure_code_profiles");
   for (const auto &erasure_code_profile : old_erasure_code_profiles) {
-    f->dump_string("old", erasure_code_profile.c_str());
+    f->dump_string("old", erasure_code_profile);
   }
   f->close_section();
 
@@ -1185,6 +1264,30 @@ void OSDMap::Incremental::dump(Formatter *f) const
     f->close_section();
     f->close_section();
   }
+  f->open_array_section("new_crush_node_flags");
+  for (auto& i : new_crush_node_flags) {
+    f->open_object_section("node");
+    f->dump_int("id", i.first);
+    set<string> st;
+    calc_state_set(i.second, st);
+    for (auto& j : st) {
+      f->dump_string("flag", j);
+    }
+    f->close_section();
+  }
+  f->close_section();
+  f->open_array_section("new_device_class_flags");
+  for (auto& i : new_device_class_flags) {
+    f->open_object_section("device_class");
+    f->dump_int("id", i.first);
+    set<string> st;
+    calc_state_set(i.second, st);
+    for (auto& j : st) {
+      f->dump_string("flag", j);
+    }
+    f->close_section();
+  }
+  f->close_section();
   f->close_section();
 }
 
@@ -1212,7 +1315,7 @@ bool OSDMap::is_blacklisted(const entity_addr_t& orig) const
   // all blacklist entries are type ANY for nautilus+
   // FIXME: avoid this copy!
   entity_addr_t a = orig;
-  if (require_osd_release < CEPH_RELEASE_NAUTILUS) {
+  if (require_osd_release < ceph_release_t::nautilus) {
     a.set_type(entity_addr_t::TYPE_LEGACY);
   } else {
     a.set_type(entity_addr_t::TYPE_ANY);
@@ -1373,10 +1476,10 @@ void OSDMap::get_up_osds(set<int32_t>& ls) const
   }
 }
 
-void OSDMap::get_out_osds(set<int32_t>& ls) const
+void OSDMap::get_out_existing_osds(set<int32_t>& ls) const
 {
   for (int i = 0; i < max_osd; i++) {
-    if (is_out(i))
+    if (exists(i) && get_weight(i) == CEPH_OSD_OUT)
       ls.insert(i);
   }
 }
@@ -1510,24 +1613,24 @@ uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
 
   if (entity_type == CEPH_ENTITY_TYPE_OSD) {
     const uint64_t jewel_features = CEPH_FEATURE_SERVER_JEWEL;
-    if (require_osd_release >= CEPH_RELEASE_JEWEL) {
+    if (require_osd_release >= ceph_release_t::jewel) {
       features |= jewel_features;
     }
     mask |= jewel_features;
 
     const uint64_t kraken_features = CEPH_FEATUREMASK_SERVER_KRAKEN
       | CEPH_FEATURE_MSG_ADDR2;
-    if (require_osd_release >= CEPH_RELEASE_KRAKEN) {
+    if (require_osd_release >= ceph_release_t::kraken) {
       features |= kraken_features;
     }
     mask |= kraken_features;
   }
 
-  if (require_min_compat_client >= CEPH_RELEASE_NAUTILUS) {
+  if (require_min_compat_client >= ceph_release_t::nautilus) {
     // if min_compat_client is >= nautilus, require v2 cephx signatures
     // from everyone
     features |= CEPH_FEATUREMASK_CEPHX_V2;
-  } else if (require_osd_release >= CEPH_RELEASE_NAUTILUS &&
+  } else if (require_osd_release >= ceph_release_t::nautilus &&
             entity_type == CEPH_ENTITY_TYPE_OSD) {
     // if osds are >= nautilus, at least require the signatures from them
     features |= CEPH_FEATUREMASK_CEPHX_V2;
@@ -1539,36 +1642,36 @@ uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
   return features;
 }
 
-uint8_t OSDMap::get_min_compat_client() const
+ceph_release_t OSDMap::get_min_compat_client() const
 {
   uint64_t f = get_features(CEPH_ENTITY_TYPE_CLIENT, nullptr);
 
   if (HAVE_FEATURE(f, OSDMAP_PG_UPMAP) ||      // v12.0.0-1733-g27d6f43
       HAVE_FEATURE(f, CRUSH_CHOOSE_ARGS)) {    // v12.0.1-2172-gef1ef28
-    return CEPH_RELEASE_LUMINOUS;  // v12.2.0
+    return ceph_release_t::luminous;  // v12.2.0
   }
   if (HAVE_FEATURE(f, CRUSH_TUNABLES5)) {      // v10.0.0-612-g043a737
-    return CEPH_RELEASE_JEWEL;     // v10.2.0
+    return ceph_release_t::jewel;     // v10.2.0
   }
   if (HAVE_FEATURE(f, CRUSH_V4)) {             // v0.91-678-g325fc56
-    return CEPH_RELEASE_HAMMER;    // v0.94.0
+    return ceph_release_t::hammer;    // v0.94.0
   }
   if (HAVE_FEATURE(f, OSD_PRIMARY_AFFINITY) || // v0.76-553-gf825624
       HAVE_FEATURE(f, CRUSH_TUNABLES3) ||      // v0.76-395-ge20a55d
       HAVE_FEATURE(f, OSD_CACHEPOOL)) {        // v0.67-401-gb91c1c5
-    return CEPH_RELEASE_FIREFLY;   // v0.80.0
+    return ceph_release_t::firefly;   // v0.80.0
   }
   if (HAVE_FEATURE(f, CRUSH_TUNABLES2) ||      // v0.54-684-g0cc47ff
       HAVE_FEATURE(f, OSDHASHPSPOOL)) {        // v0.57-398-g8cc2b0f
-    return CEPH_RELEASE_DUMPLING;  // v0.67.0
+    return ceph_release_t::dumpling;  // v0.67.0
   }
   if (HAVE_FEATURE(f, CRUSH_TUNABLES)) {       // v0.48argonaut-206-g6f381af
-    return CEPH_RELEASE_ARGONAUT;  // v0.48argonaut-206-g6f381af
+    return ceph_release_t::argonaut;  // v0.48argonaut-206-g6f381af
   }
-  return CEPH_RELEASE_ARGONAUT;    // v0.48argonaut-206-g6f381af
+  return ceph_release_t::argonaut;    // v0.48argonaut-206-g6f381af
 }
 
-uint8_t OSDMap::get_require_min_compat_client() const
+ceph_release_t OSDMap::get_require_min_compat_client() const
 {
   return require_min_compat_client;
 }
@@ -1636,7 +1739,7 @@ void OSDMap::dedup(const OSDMap *o, OSDMap *n)
   }
 
   // does crush match?
-  bufferlist oc, nc;
+  ceph::buffer::list oc, nc;
   encode(*o->crush, oc, CEPH_FEATURES_SUPPORTED_DEFAULT);
   encode(*n->crush, nc, CEPH_FEATURES_SUPPORTED_DEFAULT);
   if (oc.contents_equal(nc)) {
@@ -1740,73 +1843,60 @@ void OSDMap::clean_temps(CephContext *cct,
   }
 }
 
-void OSDMap::maybe_remove_pg_upmaps(CephContext *cct,
-                                    const OSDMap& oldmap,
-                                   const OSDMap& nextmap,
-                                    Incremental *pending_inc)
+void OSDMap::get_upmap_pgs(vector<pg_t> *upmap_pgs) const
 {
-  ldout(cct, 10) << __func__ << dendl;
-  set<pg_t> to_check;
-  set<pg_t> to_cancel;
-  map<int, map<int, float>> rule_weight_map;
+  upmap_pgs->reserve(pg_upmap.size() + pg_upmap_items.size());
+  for (auto& p : pg_upmap)
+    upmap_pgs->push_back(p.first);
+  for (auto& p : pg_upmap_items)
+    upmap_pgs->push_back(p.first);
+}
 
-  for (auto& p : nextmap.pg_upmap) {
-    to_check.insert(p.first);
-  }
-  for (auto& p : nextmap.pg_upmap_items) {
-    to_check.insert(p.first);
-  }
-  for (auto& p : pending_inc->new_pg_upmap) {
-    to_check.insert(p.first);
-  }
-  for (auto& p : pending_inc->new_pg_upmap_items) {
-    to_check.insert(p.first);
-  }
+bool OSDMap::check_pg_upmaps(
+  CephContext *cct,
+  const vector<pg_t>& to_check,
+  vector<pg_t> *to_cancel,
+  map<pg_t, mempool::osdmap::vector<pair<int,int>>> *to_remap) const
+{
+  bool any_change = false;
+  map<int, map<int, float>> rule_weight_map;
   for (auto& pg : to_check) {
-    const pg_pool_t *pi = nextmap.get_pg_pool(pg.pool());
+    const pg_pool_t *pi = get_pg_pool(pg.pool());
     if (!pi || pg.ps() >= pi->get_pg_num_pending()) {
       ldout(cct, 0) << __func__ << " pg " << pg << " is gone or merge source"
                    << dendl;
-      to_cancel.insert(pg);
+      to_cancel->push_back(pg);
       continue;
     }
     if (pi->is_pending_merge(pg, nullptr)) {
       ldout(cct, 0) << __func__ << " pg " << pg << " is pending merge"
                    << dendl;
-      to_cancel.insert(pg);
+      to_cancel->push_back(pg);
       continue;
     }
-    vector<int> raw_up;
-    int primary;
-    nextmap.pg_to_raw_up(pg, &raw_up, &primary);
-    vector<int> up;
-    up.reserve(raw_up.size());
-    for (auto osd : raw_up) {
-      // skip non-existent/down osd for erasure-coded PGs
-      if (osd == CRUSH_ITEM_NONE)
-        continue;
-      up.push_back(osd);
-    }
-    auto crush_rule = nextmap.get_pg_pool_crush_rule(pg);
-    auto r = nextmap.crush->verify_upmap(cct,
-                                         crush_rule,
-                                         nextmap.get_pg_pool_size(pg),
-                                         up);
+    vector<int> raw, up;
+    pg_to_raw_upmap(pg, &raw, &up);
+    auto crush_rule = get_pg_pool_crush_rule(pg);
+    auto r = crush->verify_upmap(cct,
+                                 crush_rule,
+                                 get_pg_pool_size(pg),
+                                 up);
     if (r < 0) {
       ldout(cct, 0) << __func__ << " verify_upmap of pg " << pg
                     << " returning " << r
                     << dendl;
-      to_cancel.insert(pg);
+      to_cancel->push_back(pg);
       continue;
     }
     // below we check against crush-topology changing..
     map<int, float> weight_map;
     auto it = rule_weight_map.find(crush_rule);
     if (it == rule_weight_map.end()) {
-      auto r = nextmap.crush->get_rule_weight_osd_map(crush_rule, &weight_map);
+      auto r = crush->get_rule_weight_osd_map(crush_rule, &weight_map);
       if (r < 0) {
         lderr(cct) << __func__ << " unable to get crush weight_map for "
-                   << "crush_rule " << crush_rule << dendl;
+                   << "crush_rule " << crush_rule
+                   << dendl;
         continue;
       }
       rule_weight_map[crush_rule] = weight_map;
@@ -1819,56 +1909,123 @@ void OSDMap::maybe_remove_pg_upmaps(CephContext *cct,
     for (auto osd : up) {
       auto it = weight_map.find(osd);
       if (it == weight_map.end()) {
-        // osd is gone or has been moved out of the specific crush-tree
-        to_cancel.insert(pg);
+        ldout(cct, 10) << __func__ << " pg " << pg << ": osd " << osd << " is gone or has "
+                     << "been moved out of the specific crush-tree"
+                     << dendl;
+        to_cancel->push_back(pg);
         break;
       }
-      auto adjusted_weight = nextmap.get_weightf(it->first) * it->second;
+      auto adjusted_weight = get_weightf(it->first) * it->second;
       if (adjusted_weight == 0) {
-        // osd is out/crush-out
-        to_cancel.insert(pg);
+        ldout(cct, 10) << __func__ << " pg " << pg << ": osd " << osd
+                     << " is out/crush-out"
+                   << dendl;
+        to_cancel->push_back(pg);
         break;
       }
     }
-  }
-  for (auto &pg: to_cancel) {
-    { // pg_upmap
-      auto it = pending_inc->new_pg_upmap.find(pg);
-      if (it != pending_inc->new_pg_upmap.end()) {
-        ldout(cct, 10) << __func__ << " cancel invalid pending "
-                       << "pg_upmap entry "
-                       << it->first << "->" << it->second
-                       << dendl;
-        pending_inc->new_pg_upmap.erase(it);
-      }
-      if (oldmap.pg_upmap.count(pg)) {
-        ldout(cct, 10) << __func__ << " cancel invalid pg_upmap entry "
-                       << oldmap.pg_upmap.find(pg)->first << "->"
-                       << oldmap.pg_upmap.find(pg)->second
-                       << dendl;
-        pending_inc->old_pg_upmap.insert(pg);
-      }
+    if (!to_cancel->empty() && to_cancel->back() == pg)
+      continue;
+    // okay, upmap is valid
+    // continue to check if it is still necessary
+    auto i = pg_upmap.find(pg);
+    if (i != pg_upmap.end() && raw == i->second) {
+      ldout(cct, 10) << " removing redundant pg_upmap "
+                     << i->first << " " << i->second
+                     << dendl;
+      to_cancel->push_back(pg);
+      continue;
     }
-    { // pg_upmap_items
-      auto it = pending_inc->new_pg_upmap_items.find(pg);
-      if (it != pending_inc->new_pg_upmap_items.end()) {
-        ldout(cct, 10) << __func__ << " cancel invalid pending "
-                       << "pg_upmap_items entry "
-                       << it->first << "->" << it->second
-                       << dendl;
-        pending_inc->new_pg_upmap_items.erase(it);
+    auto j = pg_upmap_items.find(pg);
+    if (j != pg_upmap_items.end()) {
+      mempool::osdmap::vector<pair<int,int>> newmap;
+      for (auto& p : j->second) {
+        if (std::find(raw.begin(), raw.end(), p.first) == raw.end()) {
+          // cancel mapping if source osd does not exist anymore
+          continue;
+        }
+        if (p.second != CRUSH_ITEM_NONE && p.second < max_osd &&
+            p.second >= 0 && osd_weight[p.second] == 0) {
+          // cancel mapping if target osd is out
+          continue;
+        }
+        newmap.push_back(p);
       }
-      if (oldmap.pg_upmap_items.count(pg)) {
-        ldout(cct, 10) << __func__ << " cancel invalid "
-                       << "pg_upmap_items entry "
-                       << oldmap.pg_upmap_items.find(pg)->first << "->"
-                       << oldmap.pg_upmap_items.find(pg)->second
+      if (newmap.empty()) {
+        ldout(cct, 10) << " removing no-op pg_upmap_items "
+                       << j->first << " " << j->second
+                       << dendl;
+        to_cancel->push_back(pg);
+      } else if (newmap != j->second) {
+        ldout(cct, 10) << " simplifying partially no-op pg_upmap_items "
+                       << j->first << " " << j->second
+                       << " -> " << newmap
                        << dendl;
-        pending_inc->old_pg_upmap_items.insert(pg);
+        to_remap->insert({pg, newmap});
+        any_change = true;
       }
     }
   }
-  nextmap.clean_pg_upmaps(cct, pending_inc);
+  any_change = any_change || !to_cancel->empty();
+  return any_change;
+}
+
+void OSDMap::clean_pg_upmaps(
+  CephContext *cct,
+  Incremental *pending_inc,
+  const vector<pg_t>& to_cancel,
+  const map<pg_t, mempool::osdmap::vector<pair<int,int>>>& to_remap) const
+{
+  for (auto &pg: to_cancel) {
+    auto i = pending_inc->new_pg_upmap.find(pg);
+    if (i != pending_inc->new_pg_upmap.end()) {
+      ldout(cct, 10) << __func__ << " cancel invalid pending "
+                     << "pg_upmap entry "
+                     << i->first << "->" << i->second
+                     << dendl;
+      pending_inc->new_pg_upmap.erase(i);
+    }
+    auto j = pg_upmap.find(pg);
+    if (j != pg_upmap.end()) {
+      ldout(cct, 10) << __func__ << " cancel invalid pg_upmap entry "
+                     << j->first << "->" << j->second
+                     << dendl;
+      pending_inc->old_pg_upmap.insert(pg);
+    }
+    auto p = pending_inc->new_pg_upmap_items.find(pg);
+    if (p != pending_inc->new_pg_upmap_items.end()) {
+      ldout(cct, 10) << __func__ << " cancel invalid pending "
+                     << "pg_upmap_items entry "
+                     << p->first << "->" << p->second
+                     << dendl;
+      pending_inc->new_pg_upmap_items.erase(p);
+    }
+    auto q = pg_upmap_items.find(pg);
+    if (q != pg_upmap_items.end()) {
+      ldout(cct, 10) << __func__ << " cancel invalid "
+                     << "pg_upmap_items entry "
+                     << q->first << "->" << q->second
+                     << dendl;
+      pending_inc->old_pg_upmap_items.insert(pg);
+    }
+  }
+  for (auto& i : to_remap)
+    pending_inc->new_pg_upmap_items[i.first] = i.second;
+}
+
+bool OSDMap::clean_pg_upmaps(
+  CephContext *cct,
+  Incremental *pending_inc) const
+{
+  ldout(cct, 10) << __func__ << dendl;
+  vector<pg_t> to_check;
+  vector<pg_t> to_cancel;
+  map<pg_t, mempool::osdmap::vector<pair<int,int>>> to_remap;
+
+  get_upmap_pgs(&to_check);
+  auto any_change = check_pg_upmaps(cct, to_check, &to_cancel, &to_remap);
+  clean_pg_upmaps(cct, pending_inc, to_cancel, to_remap);
+  return any_change;
 }
 
 int OSDMap::apply_incremental(const Incremental &inc)
@@ -1886,7 +2043,7 @@ int OSDMap::apply_incremental(const Incremental &inc)
 
   // full map?
   if (inc.fullmap.length()) {
-    bufferlist bl(inc.fullmap);
+    ceph::buffer::list bl(inc.fullmap);
     decode(bl);
     return 0;
   }
@@ -1899,12 +2056,12 @@ int OSDMap::apply_incremental(const Incremental &inc)
     // require_kraken_osds before the osds can be upgraded to
     // luminous.
     if (flags & CEPH_OSDMAP_REQUIRE_KRAKEN) {
-      if (require_osd_release < CEPH_RELEASE_KRAKEN) {
-       require_osd_release = CEPH_RELEASE_KRAKEN;
+      if (require_osd_release < ceph_release_t::kraken) {
+       require_osd_release = ceph_release_t::kraken;
       }
     } else if (flags & CEPH_OSDMAP_REQUIRE_JEWEL) {
-      if (require_osd_release < CEPH_RELEASE_JEWEL) {
-       require_osd_release = CEPH_RELEASE_JEWEL;
+      if (require_osd_release < ceph_release_t::jewel) {
+       require_osd_release = ceph_release_t::jewel;
       }
     }
   }
@@ -2013,6 +2170,7 @@ int OSDMap::apply_incremental(const Incremental &inc)
 
   for (const auto &client : inc.new_up_client) {
     osd_state[client.first] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
+    osd_state[client.first] &= ~CEPH_OSD_STOP; // if any
     osd_addrs->client_addrs[client.first].reset(
       new entity_addrvec_t(client.second));
     osd_addrs->hb_back_addrs[client.first].reset(
@@ -2087,6 +2245,22 @@ int OSDMap::apply_incremental(const Incremental &inc)
   for (const auto &addr : inc.old_blacklist)
     blacklist.erase(addr);
 
+  for (auto& i : inc.new_crush_node_flags) {
+    if (i.second) {
+      crush_node_flags[i.first] = i.second;
+    } else {
+      crush_node_flags.erase(i.first);
+    }
+  }
+
+  for (auto& i : inc.new_device_class_flags) {
+    if (i.second) {
+      device_class_flags[i.first] = i.second;
+    } else {
+      device_class_flags.erase(i.first);
+    }
+  }
+
   // cluster snapshot?
   if (inc.cluster_snapshot.length()) {
     cluster_snapshot = inc.cluster_snapshot;
@@ -2105,36 +2279,44 @@ int OSDMap::apply_incremental(const Incremental &inc)
   if (inc.new_full_ratio >= 0) {
     full_ratio = inc.new_full_ratio;
   }
-  if (inc.new_require_min_compat_client > 0) {
+  if (inc.new_require_min_compat_client > ceph_release_t::unknown) {
     require_min_compat_client = inc.new_require_min_compat_client;
   }
-  if (inc.new_require_osd_release >= 0) {
+  if (inc.new_require_osd_release >= ceph_release_t::unknown) {
     require_osd_release = inc.new_require_osd_release;
-    if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+    if (require_osd_release >= ceph_release_t::luminous) {
       flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
       flags |= CEPH_OSDMAP_RECOVERY_DELETES;
     }
   }
 
-  if (inc.new_require_osd_release >= 0) {
+  if (inc.new_require_osd_release >= ceph_release_t::unknown) {
     require_osd_release = inc.new_require_osd_release;
-    if (require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+    if (require_osd_release >= ceph_release_t::nautilus) {
       flags |= CEPH_OSDMAP_PGLOG_HARDLIMIT;
     }
   }
   // do new crush map last (after up/down stuff)
   if (inc.crush.length()) {
-    bufferlist bl(inc.crush);
+    ceph::buffer::list bl(inc.crush);
     auto blp = bl.cbegin();
     crush.reset(new CrushWrapper);
     crush->decode(blp);
-    if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+    if (require_osd_release >= ceph_release_t::luminous) {
       // only increment if this is a luminous-encoded osdmap, lest
       // the mon's crush_version diverge from what the osds or others
       // are decoding and applying on their end.  if we won't encode
       // it in the canonical version, don't change it.
       ++crush_version;
     }
+    for (auto it = device_class_flags.begin();
+         it != device_class_flags.end();) {
+      const char* class_name = crush->get_class_name(it->first);
+      if (!class_name) // device class is gone
+        it = device_class_flags.erase(it);
+      else
+        it++;
+    }
   }
 
   calc_num_osds();
@@ -2414,14 +2596,16 @@ void OSDMap::pg_to_raw_osds(pg_t pg, vector<int> *raw, int *primary) const
   *primary = _pick_primary(*raw);
 }
 
-void OSDMap::pg_to_raw_upmap(pg_t pg, vector<int> *raw_upmap) const
+void OSDMap::pg_to_raw_upmap(pg_t pg, vector<int>*raw,
+                             vector<int> *raw_upmap) const
 {
   auto pool = get_pg_pool(pg.pool());
   if (!pool) {
     raw_upmap->clear();
     return;
   }
-  _pg_to_raw_osds(*pool, pg, raw_upmap, NULL);
+  _pg_to_raw_osds(*pool, pg, raw, NULL);
+  *raw_upmap = *raw;
   _apply_upmap(*pool, pg, raw_upmap);
 }
 
@@ -2492,8 +2676,11 @@ void OSDMap::_pg_to_up_acting_osds(
     *acting_primary = _acting_primary;
 }
 
-int OSDMap::calc_pg_rank(int osd, const vector<int>& acting, int nrep)
+int OSDMap::calc_pg_role_broken(int osd, const vector<int>& acting, int nrep)
 {
+  // This implementation is broken for EC PGs since the osd may appear
+  // multiple times in the acting set.  See
+  // https://tracker.ceph.com/issues/43213
   if (!nrep)
     nrep = acting.size();
   for (int i=0; i<nrep; i++) 
@@ -2502,12 +2689,24 @@ int OSDMap::calc_pg_rank(int osd, const vector<int>& acting, int nrep)
   return -1;
 }
 
-int OSDMap::calc_pg_role(int osd, const vector<int>& acting, int nrep)
+int OSDMap::calc_pg_role(pg_shard_t who, const vector<int>& acting)
 {
-  return calc_pg_rank(osd, acting, nrep);
+  int nrep = acting.size();
+  if (who.shard == shard_id_t::NO_SHARD) {
+    for (int i=0; i<nrep; i++) {
+      if (acting[i] == who.osd) {
+       return i;
+      }
+    }
+  } else {
+    if (who.shard < nrep && acting[who.shard] == who.osd) {
+      return who.shard;
+    }
+  }
+  return -1;
 }
 
-bool OSDMap::primary_changed(
+bool OSDMap::primary_changed_broken(
   int oldprimary,
   const vector<int> &oldacting,
   int newprimary,
@@ -2519,8 +2718,8 @@ bool OSDMap::primary_changed(
     return true;     // was empty, now not, or vice versa
   if (oldprimary != newprimary)
     return true;     // primary changed
-  if (calc_pg_rank(oldprimary, oldacting) !=
-      calc_pg_rank(newprimary, newacting))
+  if (calc_pg_role_broken(oldprimary, oldacting) !=
+      calc_pg_role_broken(newprimary, newacting))
     return true;
   return false;      // same primary (tho replicas may have changed)
 }
@@ -2528,21 +2727,24 @@ bool OSDMap::primary_changed(
 uint64_t OSDMap::get_encoding_features() const
 {
   uint64_t f = SIGNIFICANT_FEATURES;
-  if (require_osd_release < CEPH_RELEASE_NAUTILUS) {
+  if (require_osd_release < ceph_release_t::octopus) {
+    f &= ~CEPH_FEATURE_SERVER_OCTOPUS;
+  }
+  if (require_osd_release < ceph_release_t::nautilus) {
     f &= ~CEPH_FEATURE_SERVER_NAUTILUS;
   }
-  if (require_osd_release < CEPH_RELEASE_MIMIC) {
+  if (require_osd_release < ceph_release_t::mimic) {
     f &= ~CEPH_FEATURE_SERVER_MIMIC;
   }
-  if (require_osd_release < CEPH_RELEASE_LUMINOUS) {
+  if (require_osd_release < ceph_release_t::luminous) {
     f &= ~(CEPH_FEATURE_SERVER_LUMINOUS |
           CEPH_FEATURE_CRUSH_CHOOSE_ARGS);
   }
-  if (require_osd_release < CEPH_RELEASE_KRAKEN) {
+  if (require_osd_release < ceph_release_t::kraken) {
     f &= ~(CEPH_FEATURE_SERVER_KRAKEN |
           CEPH_FEATURE_MSG_ADDR2);
   }
-  if (require_osd_release < CEPH_RELEASE_JEWEL) {
+  if (require_osd_release < ceph_release_t::jewel) {
     f &= ~(CEPH_FEATURE_SERVER_JEWEL |
           CEPH_FEATURE_NEW_OSDOP_ENCODING |
           CEPH_FEATURE_CRUSH_TUNABLES5);
@@ -2551,7 +2753,7 @@ uint64_t OSDMap::get_encoding_features() const
 }
 
 // serialize, unserialize
-void OSDMap::encode_client_old(bufferlist& bl) const
+void OSDMap::encode_client_old(ceph::buffer::list& bl) const
 {
   using ceph::encode;
   __u16 v = 5;
@@ -2607,12 +2809,12 @@ void OSDMap::encode_client_old(bufferlist& bl) const
   }
 
   // crush
-  bufferlist cbl;
+  ceph::buffer::list cbl;
   crush->encode(cbl, 0 /* legacy (no) features */);
   encode(cbl, bl);
 }
 
-void OSDMap::encode_classic(bufferlist& bl, uint64_t features) const
+void OSDMap::encode_classic(ceph::buffer::list& bl, uint64_t features) const
 {
   using ceph::encode;
   if ((features & CEPH_FEATURE_PGID64) == 0) {
@@ -2649,7 +2851,7 @@ void OSDMap::encode_classic(bufferlist& bl, uint64_t features) const
   encode(*pg_temp, bl);
 
   // crush
-  bufferlist cbl;
+  ceph::buffer::list cbl;
   crush->encode(cbl, 0 /* legacy (no) features */);
   encode(cbl, bl);
 
@@ -2663,7 +2865,7 @@ void OSDMap::encode_classic(bufferlist& bl, uint64_t features) const
   encode(cluster_snapshot_epoch, bl);
   encode(cluster_snapshot, bl);
   encode(*osd_uuid, bl);
-  encode(osd_xinfo, bl);
+  encode(osd_xinfo, bl, features);
   encode(osd_addrs->hb_front_addrs, bl, features);
 }
 
@@ -2671,7 +2873,7 @@ void OSDMap::encode_classic(bufferlist& bl, uint64_t features) const
  * refer to
  *    doc/dev/osd_internals/osdmap_versions.txt
  */
-void OSDMap::encode(bufferlist& bl, uint64_t features) const
+void OSDMap::encode(ceph::buffer::list& bl, uint64_t features) const
 {
   using ceph::encode;
   if ((features & CEPH_FEATURE_OSDMAP_ENC) == 0) {
@@ -2689,7 +2891,7 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
   size_t start_offset = bl.length();
   size_t tail_offset;
   size_t crc_offset;
-  std::optional<buffer::list::contiguous_filler> crc_filler;
+  std::optional<ceph::buffer::list::contiguous_filler> crc_filler;
 
   // meta-encoding: how we include client-used and osd-specific data
   ENCODE_START(8, 7, bl);
@@ -2718,11 +2920,11 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
 
     if (v < 4) {
       decltype(flags) f = flags;
-      if (require_osd_release >= CEPH_RELEASE_LUMINOUS)
+      if (require_osd_release >= ceph_release_t::luminous)
        f |= CEPH_OSDMAP_REQUIRE_LUMINOUS | CEPH_OSDMAP_RECOVERY_DELETES;
-      else if (require_osd_release == CEPH_RELEASE_KRAKEN)
+      else if (require_osd_release == ceph_release_t::kraken)
        f |= CEPH_OSDMAP_REQUIRE_KRAKEN;
-      else if (require_osd_release == CEPH_RELEASE_JEWEL)
+      else if (require_osd_release == ceph_release_t::jewel)
        f |= CEPH_OSDMAP_REQUIRE_JEWEL;
       encode(f, bl);
     } else {
@@ -2756,7 +2958,7 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
     }
 
     // crush
-    bufferlist cbl;
+    ceph::buffer::list cbl;
     crush->encode(cbl, features);
     encode(cbl, bl);
     encode(erasure_code_profiles, bl);
@@ -2785,7 +2987,7 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
   {
     // NOTE: any new encoding dependencies must be reflected by
     // SIGNIFICANT_FEATURES
-    uint8_t target_v = 7;
+    uint8_t target_v = 9;
     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
       target_v = 1;
     } else if (!HAVE_FEATURE(features, SERVER_MIMIC)) {
@@ -2816,7 +3018,7 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
     encode(cluster_snapshot_epoch, bl);
     encode(cluster_snapshot, bl);
     encode(*osd_uuid, bl);
-    encode(osd_xinfo, bl);
+    encode(osd_xinfo, bl, features);
     if (target_v < 7) {
       encode_addrvec_pvec_as_addr(osd_addrs->hb_front_addrs, bl, features);
     } else {
@@ -2835,6 +3037,12 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
     if (target_v >= 6) {
       encode(removed_snaps_queue, bl);
     }
+    if (target_v >= 8) {
+      encode(crush_node_flags, bl);
+    }
+    if (target_v >= 9) {
+      encode(device_class_flags, bl);
+    }
     ENCODE_FINISH(bl); // osd-only data
   }
 
@@ -2845,11 +3053,11 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
   ENCODE_FINISH(bl); // meta-encoding wrapper
 
   // fill in crc
-  bufferlist front;
+  ceph::buffer::list front;
   front.substr_of(bl, start_offset, crc_offset - start_offset);
   crc = front.crc32c(-1);
   if (tail_offset < bl.length()) {
-    bufferlist tail;
+    ceph::buffer::list tail;
     tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
     crc = tail.crc32c(crc);
   }
@@ -2863,13 +3071,13 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
  * refer to
  *    doc/dev/osd_internals/osdmap_versions.txt
  */
-void OSDMap::decode(bufferlist& bl)
+void OSDMap::decode(ceph::buffer::list& bl)
 {
   auto p = bl.cbegin();
   decode(p);
 }
 
-void OSDMap::decode_classic(bufferlist::const_iterator& p)
+void OSDMap::decode_classic(ceph::buffer::list::const_iterator& p)
 {
   using ceph::decode;
   __u32 n, t;
@@ -2935,7 +3143,7 @@ void OSDMap::decode_classic(bufferlist::const_iterator& p)
     decode(n, p);
     while (n--) {
       old_pg_t opg;
-      ::decode_raw(opg, p);
+      ceph::decode_raw(opg, p);
       mempool::osdmap::vector<int32_t> v;
       decode(v, p);
       pg_temp->set(pg_t(opg), v);
@@ -2945,7 +3153,7 @@ void OSDMap::decode_classic(bufferlist::const_iterator& p)
   }
 
   // crush
-  bufferlist cbl;
+  ceph::buffer::list cbl;
   decode(cbl, p);
   auto cblp = cbl.cbegin();
   crush->decode(cblp);
@@ -2990,7 +3198,7 @@ void OSDMap::decode_classic(bufferlist::const_iterator& p)
   post_decode();
 }
 
-void OSDMap::decode(bufferlist::const_iterator& bl)
+void OSDMap::decode(ceph::buffer::list::const_iterator& bl)
 {
   using ceph::decode;
   /**
@@ -3002,7 +3210,7 @@ void OSDMap::decode(bufferlist::const_iterator& bl)
    */
   size_t start_offset = bl.get_off();
   size_t tail_offset = 0;
-  bufferlist crc_front, crc_tail;
+  ceph::buffer::list crc_front, crc_tail;
 
   DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
   if (struct_v < 7) {
@@ -3055,7 +3263,7 @@ void OSDMap::decode(bufferlist::const_iterator& bl)
     }
 
     // crush
-    bufferlist cbl;
+    ceph::buffer::list cbl;
     decode(cbl, bl);
     auto cblp = cbl.cbegin();
     crush->decode(cblp);
@@ -3095,7 +3303,7 @@ void OSDMap::decode(bufferlist::const_iterator& bl)
   }
 
   {
-    DECODE_START(7, bl); // extended, osd-only data
+    DECODE_START(9, bl); // extended, osd-only data
     decode(osd_addrs->hb_back_addrs, bl);
     decode(osd_info, bl);
     decode(blacklist, bl);
@@ -3127,30 +3335,40 @@ void OSDMap::decode(bufferlist::const_iterator& bl)
     if (struct_v >= 5) {
       decode(require_min_compat_client, bl);
       decode(require_osd_release, bl);
-      if (require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+      if (require_osd_release >= ceph_release_t::nautilus) {
        flags |= CEPH_OSDMAP_PGLOG_HARDLIMIT;
       }
-      if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+      if (require_osd_release >= ceph_release_t::luminous) {
        flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
        flags |= CEPH_OSDMAP_RECOVERY_DELETES;
       }
     } else {
       if (flags & CEPH_OSDMAP_REQUIRE_LUMINOUS) {
        // only for compat with post-kraken pre-luminous test clusters
-       require_osd_release = CEPH_RELEASE_LUMINOUS;
+       require_osd_release = ceph_release_t::luminous;
        flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
        flags |= CEPH_OSDMAP_RECOVERY_DELETES;
       } else if (flags & CEPH_OSDMAP_REQUIRE_KRAKEN) {
-       require_osd_release = CEPH_RELEASE_KRAKEN;
+       require_osd_release = ceph_release_t::kraken;
       } else if (flags & CEPH_OSDMAP_REQUIRE_JEWEL) {
-       require_osd_release = CEPH_RELEASE_JEWEL;
+       require_osd_release = ceph_release_t::jewel;
       } else {
-       require_osd_release = 0;
+       require_osd_release = ceph_release_t::unknown;
       }
     }
     if (struct_v >= 6) {
       decode(removed_snaps_queue, bl);
     }
+    if (struct_v >= 8) {
+      decode(crush_node_flags, bl);
+    } else {
+      crush_node_flags.clear();
+    }
+    if (struct_v >= 9) {
+      decode(device_class_flags, bl);
+    } else {
+      device_class_flags.clear();
+    }
     DECODE_FINISH(bl); // osd-only data
   }
 
@@ -3170,7 +3388,7 @@ void OSDMap::decode(bufferlist::const_iterator& bl)
     // verify crc
     uint32_t actual = crc_front.crc32c(-1);
     if (tail_offset < bl.get_off()) {
-      bufferlist tail;
+      ceph::buffer::list tail;
       tail.substr_of(bl.get_bl(), tail_offset, bl.get_off() - tail_offset);
       actual = tail.crc32c(actual);
     }
@@ -3178,7 +3396,7 @@ void OSDMap::decode(bufferlist::const_iterator& bl)
       ostringstream ss;
       ss << "bad crc, actual " << actual << " != expected " << crc;
       string s = ss.str();
-      throw buffer::malformed_input(s.c_str());
+      throw ceph::buffer::malformed_input(s.c_str());
     }
   }
 
@@ -3205,13 +3423,61 @@ void OSDMap::dump_erasure_code_profiles(
   for (const auto &profile : profiles) {
     f->open_object_section(profile.first.c_str());
     for (const auto &profm : profile.second) {
-      f->dump_string(profm.first.c_str(), profm.second.c_str());
+      f->dump_string(profm.first.c_str(), profm.second);
     }
     f->close_section();
   }
   f->close_section();
 }
 
+void OSDMap::dump_osds(Formatter *f) const
+{
+  f->open_array_section("osds");
+  for (int i=0; i<get_max_osd(); i++) {
+    if (exists(i)) {
+      dump_osd(i, f);
+    }
+  }
+  f->close_section();
+}
+
+void OSDMap::dump_osd(int id, Formatter *f) const
+{
+  ceph_assert(f != nullptr);
+  if (!exists(id)) {
+    return;
+  }
+
+  f->open_object_section("osd_info");
+  f->dump_int("osd", id);
+  f->dump_stream("uuid") << get_uuid(id);
+  f->dump_int("up", is_up(id));
+  f->dump_int("in", is_in(id));
+  f->dump_float("weight", get_weightf(id));
+  f->dump_float("primary_affinity", get_primary_affinityf(id));
+  get_info(id).dump(f);
+  f->dump_object("public_addrs", get_addrs(id));
+  f->dump_object("cluster_addrs", get_cluster_addrs(id));
+  f->dump_object("heartbeat_back_addrs", get_hb_back_addrs(id));
+  f->dump_object("heartbeat_front_addrs", get_hb_front_addrs(id));
+  // compat
+  f->dump_stream("public_addr") << get_addrs(id).get_legacy_str();
+  f->dump_stream("cluster_addr") << get_cluster_addrs(id).get_legacy_str();
+  f->dump_stream("heartbeat_back_addr")
+    << get_hb_back_addrs(id).get_legacy_str();
+  f->dump_stream("heartbeat_front_addr")
+    << get_hb_front_addrs(id).get_legacy_str();
+
+  set<string> st;
+  get_state(id, st);
+  f->open_array_section("state");
+  for (const auto &state : st)
+    f->dump_string("state", state);
+  f->close_section();
+
+  f->close_section();
+}
+
 void OSDMap::dump(Formatter *f) const
 {
   f->dump_int("epoch", get_epoch());
@@ -3237,11 +3503,11 @@ void OSDMap::dump(Formatter *f) const
   f->dump_int("pool_max", get_pool_max());
   f->dump_int("max_osd", get_max_osd());
   f->dump_string("require_min_compat_client",
-                ceph_release_name(require_min_compat_client));
+                ceph::to_string(require_min_compat_client));
   f->dump_string("min_compat_client",
-                ceph_release_name(get_min_compat_client()));
+                ceph::to_string(get_min_compat_client()));
   f->dump_string("require_osd_release",
-                ceph_release_name(require_osd_release));
+                ceph::to_string(require_osd_release));
 
   f->open_array_section("pools");
   for (const auto &pool : pools) {
@@ -3257,39 +3523,7 @@ void OSDMap::dump(Formatter *f) const
   }
   f->close_section();
 
-  f->open_array_section("osds");
-  for (int i=0; i<get_max_osd(); i++)
-    if (exists(i)) {
-      f->open_object_section("osd_info");
-      f->dump_int("osd", i);
-      f->dump_stream("uuid") << get_uuid(i);
-      f->dump_int("up", is_up(i));
-      f->dump_int("in", is_in(i));
-      f->dump_float("weight", get_weightf(i));
-      f->dump_float("primary_affinity", get_primary_affinityf(i));
-      get_info(i).dump(f);
-      f->dump_object("public_addrs", get_addrs(i));
-      f->dump_object("cluster_addrs", get_cluster_addrs(i));
-      f->dump_object("heartbeat_back_addrs", get_hb_back_addrs(i));
-      f->dump_object("heartbeat_front_addrs", get_hb_front_addrs(i));
-      // compat
-      f->dump_stream("public_addr") << get_addrs(i).get_legacy_str();
-      f->dump_stream("cluster_addr") << get_cluster_addrs(i).get_legacy_str();
-      f->dump_stream("heartbeat_back_addr")
-       << get_hb_back_addrs(i).get_legacy_str();
-      f->dump_stream("heartbeat_front_addr")
-       << get_hb_front_addrs(i).get_legacy_str();
-
-      set<string> st;
-      get_state(i, st);
-      f->open_array_section("state");
-      for (const auto &state : st)
-       f->dump_string("state", state);
-      f->close_section();
-
-      f->close_section();
-    }
-  f->close_section();
+  dump_osds(f);
 
   f->open_array_section("osd_xinfo");
   for (int i=0; i<get_max_osd(); i++) {
@@ -3395,6 +3629,32 @@ void OSDMap::dump(Formatter *f) const
     f->close_section();
   }
   f->close_section();
+  f->open_object_section("crush_node_flags");
+  for (auto& i : crush_node_flags) {
+    string s = crush->item_exists(i.first) ? crush->get_item_name(i.first)
+      : stringify(i.first);
+    f->open_array_section(s.c_str());
+    set<string> st;
+    calc_state_set(i.second, st);
+    for (auto& j : st) {
+      f->dump_string("flag", j);
+    }
+    f->close_section();
+  }
+  f->close_section();
+  f->open_object_section("device_class_flags");
+  for (auto& i : device_class_flags) {
+    const char* class_name = crush->get_class_name(i.first);
+    string s = class_name ? class_name : stringify(i.first);
+    f->open_array_section(s.c_str());
+    set<string> st;
+    calc_state_set(i.second, st);
+    for (auto& j : st) {
+      f->dump_string("flag", j);
+    }
+    f->close_section();
+  }
+  f->close_section();
 }
 
 void OSDMap::generate_test_instances(list<OSDMap*>& o)
@@ -3413,10 +3673,6 @@ void OSDMap::generate_test_instances(list<OSDMap*>& o)
 string OSDMap::get_flag_string(unsigned f)
 {
   string s;
-  if ( f& CEPH_OSDMAP_NEARFULL)
-    s += ",nearfull";
-  if (f & CEPH_OSDMAP_FULL)
-    s += ",full";
   if (f & CEPH_OSDMAP_PAUSERD)
     s += ",pauserd";
   if (f & CEPH_OSDMAP_PAUSEWR)
@@ -3493,6 +3749,39 @@ void OSDMap::print_pools(ostream& out) const
   out << std::endl;
 }
 
+void OSDMap::print_osds(ostream& out) const
+{
+  for (int i=0; i<get_max_osd(); i++) {
+    if (exists(i)) {
+      print_osd(i, out);
+    }
+  }
+}
+void OSDMap::print_osd(int id, ostream& out) const
+{
+  if (!exists(id)) {
+    return;
+  }
+
+  out << "osd." << id;
+  out << (is_up(id) ? " up  ":" down");
+  out << (is_in(id) ? " in ":" out");
+  out << " weight " << get_weightf(id);
+  if (get_primary_affinity(id) != CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
+    out << " primary_affinity " << get_primary_affinityf(id);
+  }
+  const osd_info_t& info(get_info(id));
+  out << " " << info;
+  out << " " << get_addrs(id) << " " << get_cluster_addrs(id);
+  set<string> st;
+  get_state(id, st);
+  out << " " << st;
+  if (!get_uuid(id).is_zero()) {
+    out << " " << get_uuid(id);
+  }
+  out << "\n";
+}
+
 void OSDMap::print(ostream& out) const
 {
   out << "epoch " << get_epoch() << "\n"
@@ -3505,14 +3794,14 @@ void OSDMap::print(ostream& out) const
   out << "full_ratio " << full_ratio << "\n";
   out << "backfillfull_ratio " << backfillfull_ratio << "\n";
   out << "nearfull_ratio " << nearfull_ratio << "\n";
-  if (require_min_compat_client > 0) {
+  if (require_min_compat_client != ceph_release_t::unknown) {
     out << "require_min_compat_client "
-       << ceph_release_name(require_min_compat_client) << "\n";
+       << require_min_compat_client << "\n";
   }
-  out << "min_compat_client " << ceph_release_name(get_min_compat_client())
+  out << "min_compat_client " << get_min_compat_client()
       << "\n";
-  if (require_osd_release > 0) {
-    out << "require_osd_release " << ceph_release_name(require_osd_release)
+  if (require_osd_release > ceph_release_t::unknown) {
+    out << "require_osd_release " << require_osd_release
        << "\n";
   }
   if (get_cluster_snapshot().length())
@@ -3522,25 +3811,7 @@ void OSDMap::print(ostream& out) const
   print_pools(out);
 
   out << "max_osd " << get_max_osd() << "\n";
-  for (int i=0; i<get_max_osd(); i++) {
-    if (exists(i)) {
-      out << "osd." << i;
-      out << (is_up(i) ? " up  ":" down");
-      out << (is_in(i) ? " in ":" out");
-      out << " weight " << get_weightf(i);
-      if (get_primary_affinity(i) != CEPH_OSD_DEFAULT_PRIMARY_AFFINITY)
-       out << " primary_affinity " << get_primary_affinityf(i);
-      const osd_info_t& info(get_info(i));
-      out << " " << info;
-      out << " " << get_addrs(i) << " " << get_cluster_addrs(i);
-      set<string> st;
-      get_state(i, st);
-      out << " " << st;
-      if (!get_uuid(i).is_zero())
-       out << " " << get_uuid(i);
-      out << "\n";
-    }
-  }
+  print_osds(out);
   out << std::endl;
 
   for (auto& p : pg_upmap) {
@@ -3743,15 +4014,13 @@ void OSDMap::print_summary(Formatter *f, ostream& out,
                           const string& prefix, bool extra) const
 {
   if (f) {
-    f->open_object_section("osdmap");
     f->dump_int("epoch", get_epoch());
     f->dump_int("num_osds", get_num_osds());
     f->dump_int("num_up_osds", get_num_up_osds());
+    f->dump_int("osd_up_since", last_up_change.to_msec() / 1000);
     f->dump_int("num_in_osds", get_num_in_osds());
-    f->dump_bool("full", test_flag(CEPH_OSDMAP_FULL) ? true : false);
-    f->dump_bool("nearfull", test_flag(CEPH_OSDMAP_NEARFULL) ? true : false);
+    f->dump_int("osd_in_since", last_in_change.to_msec() / 1000);
     f->dump_unsigned("num_remapped_pgs", get_num_pg_temp());
-    f->close_section();
   } else {
     utime_t now = ceph_clock_now();
     out << get_num_osds() << " osds: "
@@ -3780,10 +4049,6 @@ void OSDMap::print_oneline_summary(ostream& out) const
       << get_num_osds() << " total, "
       << get_num_up_osds() << " up, "
       << get_num_in_osds() << " in";
-  if (test_flag(CEPH_OSDMAP_FULL))
-    out << "; full flag set";
-  else if (test_flag(CEPH_OSDMAP_NEARFULL))
-    out << "; nearfull flag set";
 }
 
 bool OSDMap::crush_rule_in_use(int rule_id) const
@@ -3913,9 +4178,13 @@ int OSDMap::build_simple_optioned(CephContext *cct, epoch_t e, uuid_d &fsid,
       pools[pool].last_change = epoch;
       pools[pool].application_metadata.insert(
         {pg_pool_t::APPLICATION_NAME_RBD, {}});
-      auto m = pg_pool_t::get_pg_autoscale_mode_by_name(
-        cct->_conf.get_val<string>("osd_pool_default_pg_autoscale_mode"));
-      pools[pool].pg_autoscale_mode = m >= 0 ? m : 0;
+      if (auto m = pg_pool_t::get_pg_autoscale_mode_by_name(
+            cct->_conf.get_val<string>("osd_pool_default_pg_autoscale_mode"));
+         m != pg_pool_t::pg_autoscale_mode_t::UNKNOWN) {
+       pools[pool].pg_autoscale_mode = m;
+      } else {
+       pools[pool].pg_autoscale_mode = pg_pool_t::pg_autoscale_mode_t::OFF;
+      }
       pool_name[pool] = plname;
       name_pool[plname] = pool;
     }
@@ -4245,61 +4514,12 @@ int OSDMap::summarize_mapping_stats(
   return 0;
 }
 
-
-int OSDMap::clean_pg_upmaps(
-  CephContext *cct,
-  Incremental *pending_inc) const
-{
-  ldout(cct, 10) << __func__ << dendl;
-  int changed = 0;
-  for (auto& p : pg_upmap) {
-    vector<int> raw;
-    int primary;
-    pg_to_raw_osds(p.first, &raw, &primary);
-    if (raw == p.second) {
-      ldout(cct, 10) << " removing redundant pg_upmap " << p.first << " "
-                    << p.second << dendl;
-      pending_inc->old_pg_upmap.insert(p.first);
-      ++changed;
-    }
-  }
-  for (auto& p : pg_upmap_items) {
-    vector<int> raw;
-    int primary;
-    pg_to_raw_osds(p.first, &raw, &primary);
-    mempool::osdmap::vector<pair<int,int>> newmap;
-    for (auto& q : p.second) {
-      if (std::find(raw.begin(), raw.end(), q.first) == raw.end()) {
-        // cancel mapping if source osd does not exist anymore
-        continue;
-      }
-      if (q.second != CRUSH_ITEM_NONE && q.second < max_osd &&
-          q.second >= 0 && osd_weight[q.second] == 0) {
-        // cancel mapping if target osd is out
-        continue;
-      }
-      newmap.push_back(q);
-    }
-    if (newmap.empty()) {
-      ldout(cct, 10) << " removing no-op pg_upmap_items " << p.first << " "
-                    << p.second << dendl;
-      pending_inc->old_pg_upmap_items.insert(p.first);
-      ++changed;
-    } else if (newmap != p.second) {
-      ldout(cct, 10) << " simplifying partially no-op pg_upmap_items "
-                    << p.first << " " << p.second << " -> " << newmap << dendl;
-      pending_inc->new_pg_upmap_items[p.first] = newmap;
-      ++changed;
-    }
-  }
-  return changed;
-}
-
 bool OSDMap::try_pg_upmap(
   CephContext *cct,
   pg_t pg,                       ///< pg to potentially remap
   const set<int>& overfull,      ///< osds we'd want to evacuate
   const vector<int>& underfull,  ///< osds to move to, in order of preference
+  const vector<int>& more_underfull,  ///< more osds only slightly underfull
   vector<int> *orig,
   vector<int> *out)              ///< resulting alternative mapping
 {
@@ -4328,6 +4548,7 @@ bool OSDMap::try_pg_upmap(
     rule,
     pool->get_size(),
     overfull, underfull,
+    more_underfull,
     *orig,
     out);
   if (r < 0)
@@ -4339,13 +4560,16 @@ bool OSDMap::try_pg_upmap(
 
 int OSDMap::calc_pg_upmaps(
   CephContext *cct,
-  float max_deviation_ratio,
+  uint32_t max_deviation,
   int max,
   const set<int64_t>& only_pools,
   OSDMap::Incremental *pending_inc)
 {
   ldout(cct, 10) << __func__ << " pools " << only_pools << dendl;
   OSDMap tmp;
+  // Can't be less than 1 pg
+  if (max_deviation < 1)
+    max_deviation = 1;
   tmp.deepish_copy_from(*this);
   int num_changed = 0;
   map<int,set<pg_t>> pgs_by_osd;
@@ -4407,10 +4631,10 @@ int OSDMap::calc_pg_upmaps(
     lderr(cct) << __func__ << " abort due to max <= 0" << dendl;
     return 0;
   }
-  float decay_factor = 1.0 / float(max);
   float stddev = 0;
   map<int,float> osd_deviation;       // osd, deviation(pgs)
   multimap<float,int> deviation_osd;  // deviation(pgs), osd
+  float cur_max_deviation = 0;
   for (auto& i : pgs_by_osd) {
     // make sure osd is still there (belongs to this crush-tree)
     ceph_assert(osd_weight.count(i.first));
@@ -4424,8 +4648,11 @@ int OSDMap::calc_pg_upmaps(
     osd_deviation[i.first] = deviation;
     deviation_osd.insert(make_pair(deviation, i.first));
     stddev += deviation * deviation;
+    if (fabsf(deviation) > cur_max_deviation)
+      cur_max_deviation = fabsf(deviation);
   }
-  if (stddev <= cct->_conf.get_val<double>("osd_calc_pg_upmaps_max_stddev")) {
+  ldout(cct, 20) << " stdev " << stddev << " max_deviation " << cur_max_deviation << dendl;
+  if (cur_max_deviation <= max_deviation) {
     ldout(cct, 10) << __func__ << " distribution is almost perfect"
                    << dendl;
     return 0;
@@ -4436,55 +4663,45 @@ int OSDMap::calc_pg_upmaps(
   auto local_fallback_retries =
     cct->_conf.get_val<uint64_t>("osd_calc_pg_upmaps_local_fallback_retries");
   while (max--) {
+    ldout(cct, 30) << "Top of loop #" << max+1 << dendl;
     // build overfull and underfull
     set<int> overfull;
+    set<int> more_overfull;
+    bool using_more_overfull = false;
     vector<int> underfull;
-    float decay = 0;
-    int decay_count = 0;
-    while (overfull.empty()) {
-      for (auto i = deviation_osd.rbegin(); i != deviation_osd.rend(); i++) {
-        if (i->first >= (1.0 - decay))
+    vector<int> more_underfull;
+    for (auto i = deviation_osd.rbegin(); i != deviation_osd.rend(); i++) {
+        ldout(cct, 30) << " check " << i->first << " <= " << max_deviation << dendl;
+       if (i->first <= 0)
+         break;
+        if (i->first > max_deviation) {
+         ldout(cct, 30) << " add overfull osd." << i->second << dendl;
           overfull.insert(i->second);
+       } else {
+          more_overfull.insert(i->second);
+       }
       }
-      if (!overfull.empty())
-        break;
-      decay_count++;
-      decay = decay_factor * decay_count;
-      if (decay >= 1.0)
-        break;
-      ldout(cct, 30) << " decay_factor = " << decay_factor
-                     << " decay_count = " << decay_count
-                     << " decay (overfull) = " << decay
-                     << dendl;
-    }
-    if (overfull.empty()) {
-      lderr(cct) << __func__ << " failed to build overfull" << dendl;
-      break;
-    }
 
-    decay = 0;
-    decay_count = 0;
-    while (underfull.empty()) {
-      for (auto i = deviation_osd.begin(); i != deviation_osd.end(); i++) {
-        if (i->first >= (-.999 + decay))
+    for (auto i = deviation_osd.begin(); i != deviation_osd.end(); i++) {
+        ldout(cct, 30) << " check " << i->first << " >= " << -(int)max_deviation << dendl;
+        if (i->first >= 0)
           break;
-        underfull.push_back(i->second);
-      }
-      if (!underfull.empty())
-        break;
-      decay_count++;
-      decay = decay_factor * decay_count;
-      if (decay >= .999)
-        break;
-      ldout(cct, 30) << " decay_factor = " << decay_factor
-                     << " decay_count = " << decay_count
-                     << " decay (underfull) = " << decay
-                     << dendl;
+        if (i->first < -(int)max_deviation) {
+         ldout(cct, 30) << " add underfull osd." << i->second << dendl;
+          underfull.push_back(i->second);
+       } else {
+          more_underfull.push_back(i->second);
+       }
     }
-    if (underfull.empty()) {
-      lderr(cct) << __func__ << " failed to build underfull" << dendl;
+    if (underfull.empty() && overfull.empty()) {
+      ldout(cct, 20) << __func__ << " failed to build overfull and underfull" << dendl;
       break;
     }
+    if (overfull.empty() && !underfull.empty()) {
+      ldout(cct, 20) << __func__ << " Using more_overfull since we still have underfull" << dendl;
+      overfull = more_overfull;
+      using_more_overfull = true;
+    }
 
     ldout(cct, 10) << " overfull " << overfull
                    << " underfull " << underfull
@@ -4499,21 +4716,29 @@ int OSDMap::calc_pg_upmaps(
     auto temp_pgs_by_osd = pgs_by_osd;
     // always start with fullest, break if we find any changes to make
     for (auto p = deviation_osd.rbegin(); p != deviation_osd.rend(); ++p) {
-      if (skip_overfull) {
+      if (skip_overfull && !underfull.empty()) {
         ldout(cct, 10) << " skipping overfull " << dendl;
         break; // fall through to check underfull
       }
       int osd = p->second;
       float deviation = p->first;
+      if (deviation < 0) {
+        ldout(cct, 10) << " hitting underfull osds now"
+                       << " when trying to remap overfull osds"
+                       << dendl;
+        break;
+      }
       float target = osd_weight[osd] * pgs_per_weight;
+      ldout(cct, 10) << " Overfull search osd." << osd
+                       << " target " << target
+                       << " deviation " << deviation
+                      << dendl;
       ceph_assert(target > 0);
-      float deviation_ratio = deviation / target;
-      if (deviation_ratio < max_deviation_ratio) {
+      if (!using_more_overfull && deviation <= max_deviation) {
        ldout(cct, 10) << " osd." << osd
                        << " target " << target
                        << " deviation " << deviation
-                       << " -> ratio " << deviation_ratio
-                       << " < max ratio " << max_deviation_ratio
+                       << " < max deviation " << max_deviation
                        << dendl;
        break;
       }
@@ -4607,9 +4832,9 @@ int OSDMap::calc_pg_upmaps(
           // to see if we can append more remapping pairs
         }
        ldout(cct, 10) << " trying " << pg << dendl;
-       vector<int> orig, out;
-        tmp.pg_to_raw_upmap(pg, &orig); // including existing upmaps too
-       if (!try_pg_upmap(cct, pg, overfull, underfull, &orig, &out)) {
+        vector<int> raw, orig, out;
+        tmp.pg_to_raw_upmap(pg, &raw, &orig); // including existing upmaps too
+       if (!try_pg_upmap(cct, pg, overfull, underfull, more_underfull, &orig, &out)) {
          continue;
        }
        ldout(cct, 10) << " " << pg << " " << orig << " -> " << out << dendl;
@@ -4617,13 +4842,24 @@ int OSDMap::calc_pg_upmaps(
          continue;
        }
        ceph_assert(orig != out);
+       int pos = -1;
+       float max_dev = 0;
        for (unsigned i = 0; i < out.size(); ++i) {
           if (orig[i] == out[i])
             continue; // skip invalid remappings
           if (existing.count(orig[i]) || existing.count(out[i]))
             continue; // we want new remappings only!
+         if (osd_deviation[orig[i]] > max_dev) {
+           max_dev = osd_deviation[orig[i]];
+           pos = i;
+           ldout(cct, 30) << "Max osd." << orig[i] << " pos " << i << " dev " << osd_deviation[orig[i]] << dendl;
+         }
+       }
+       if (pos != -1) {
+         int i = pos;
           ldout(cct, 10) << " will try adding new remapping pair "
                          << orig[i] << " -> " << out[i] << " for " << pg
+                        << (orig[i] != osd ? " NOT selected osd" : "")
                          << dendl;
           existing.insert(orig[i]);
           existing.insert(out[i]);
@@ -4652,14 +4888,13 @@ int OSDMap::calc_pg_upmaps(
       float deviation = p.first;
       float target = osd_weight[osd] * pgs_per_weight;
       ceph_assert(target > 0);
-      float deviation_ratio = abs(deviation / target);
-      if (deviation_ratio < max_deviation_ratio) {
-        // respect max_deviation_ratio too
+      if (fabsf(deviation) < max_deviation) {
+        // respect max_deviation too
         ldout(cct, 10) << " osd." << osd
                        << " target " << target
                        << " deviation " << deviation
-                       << " -> absolute ratio " << deviation_ratio
-                       << " < max ratio " << max_deviation_ratio
+                       << " -> absolute " << fabsf(deviation)
+                       << " < max " << max_deviation
                        << dendl;
         break;
       }
@@ -4745,6 +4980,7 @@ int OSDMap::calc_pg_upmaps(
     float new_stddev = 0;
     map<int,float> temp_osd_deviation;
     multimap<float,int> temp_deviation_osd;
+    float cur_max_deviation = 0;
     for (auto& i : temp_pgs_by_osd) {
       // make sure osd is still there (belongs to this crush-tree)
       ceph_assert(osd_weight.count(i.first));
@@ -4757,7 +4993,9 @@ int OSDMap::calc_pg_upmaps(
                      << dendl;
       temp_osd_deviation[i.first] = deviation;
       temp_deviation_osd.insert(make_pair(deviation, i.first));
-      new_stddev += deviation * deviation;
+       new_stddev += deviation * deviation;
+      if (fabsf(deviation) > cur_max_deviation)
+        cur_max_deviation = fabsf(deviation);
     }
     ldout(cct, 10) << " stddev " << stddev << " -> " << new_stddev << dendl;
     if (new_stddev >= stddev) {
@@ -4809,6 +5047,12 @@ int OSDMap::calc_pg_upmaps(
       pending_inc->new_pg_upmap_items[i.first] = i.second;
       ++num_changed;
     }
+    ldout(cct, 20) << " stdev " << stddev << " max_deviation " << cur_max_deviation << dendl;
+    if (cur_max_deviation <= max_deviation) {
+      ldout(cct, 10) << __func__ << " Optimization plan is almost perfect"
+                     << dendl;
+      break;
+    }
   }
   ldout(cct, 10) << " num_changed = " << num_changed << dendl;
   return num_changed;
@@ -4851,23 +5095,32 @@ public:
 
   OSDUtilizationDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
                        const PGMap& pgmap_, bool tree_,
-                       const string& class_name_,
-                       const string& item_name_) :
+                       const string& filter) :
     Parent(crush, osdmap_->get_pool_names()),
     osdmap(osdmap_),
     pgmap(pgmap_),
     tree(tree_),
-    class_name(class_name_),
-    item_name(item_name_),
     min_var(-1),
     max_var(-1),
     stddev(0),
     sum(0) {
-    if (osdmap->crush->name_exists(item_name)) {
-      // filter out items we are allowed to dump
-      auto item_id = osdmap->crush->get_item_id(item_name);
+    if (osdmap->crush->name_exists(filter)) {
+      // filter by crush node
+      auto item_id = osdmap->crush->get_item_id(filter);
       allowed.insert(item_id);
       osdmap->crush->get_all_children(item_id, &allowed);
+    } else if (osdmap->crush->class_exists(filter)) {
+      // filter by device class
+      class_id = osdmap->crush->get_class_id(filter);
+    } else if (auto pool_id = osdmap->lookup_pg_pool_name(filter);
+               pool_id >= 0) {
+      // filter by pool
+      auto crush_rule = osdmap->get_pool_crush_rule(pool_id);
+      set<int> roots;
+      osdmap->crush->find_takes_by_rule(crush_rule, &roots);
+      allowed = roots;
+      for (auto r : roots)
+        osdmap->crush->get_all_children(r, &allowed);
     }
     average_util = average_utilization();
   }
@@ -4877,18 +5130,17 @@ protected:
   bool should_dump(int id) const {
     if (!allowed.empty() && !allowed.count(id)) // filter by name
       return false;
-    if (id >= 0 && !class_name.empty()) {
-      const char* item_class_name = osdmap->crush->get_item_class(id);
-      if (!item_class_name || // not bound to a class yet
-           item_class_name != class_name) // or already bound to
-                                          // a different class
+    if (id >= 0 && class_id >= 0) {
+      auto item_class_id = osdmap->crush->get_item_class_id(id);
+      if (item_class_id < 0 || // not bound to a class yet
+          item_class_id != class_id) // or already bound to a different class
         return false;
     }
     return true;
   }
 
   set<int> get_dumped_osds() {
-    if (class_name.empty() && item_name.empty()) {
+    if (allowed.empty() && class_id < 0) {
       // old way, all
       return {};
     }
@@ -4903,7 +5155,7 @@ protected:
   }
 
   void dump_item(const CrushTreeDumper::Item &qi, F *f) override {
-    if (!tree && qi.is_bucket())
+    if (!tree && (qi.is_bucket() || dumped_osds.count(qi.id)))
       return;
     if (!should_dump(qi.id))
       return;
@@ -4991,7 +5243,7 @@ protected:
     *kb_used_meta = p->statfs.kb_used_internal_metadata();
     *kb_avail = p->statfs.kb_avail();
     
-    return *kb > 0;
+    return true;
   }
 
   bool get_bucket_utilization(int id, int64_t* kb, int64_t* kb_used,
@@ -5035,20 +5287,19 @@ protected:
       *kb_used_meta += kb_used_meta_i;
       *kb_avail += kb_avail_i;
     }
-    return *kb > 0;
+    return true;
   }
 
 protected:
   const OSDMap *osdmap;
   const PGMap& pgmap;
   bool tree;
-  const string class_name;
-  const string item_name;
   double average_util;
   double min_var;
   double max_var;
   double stddev;
   double sum;
+  int class_id = -1;
   set<int> allowed;
   set<int> dumped_osds;
 };
@@ -5060,9 +5311,8 @@ public:
 
   OSDUtilizationPlainDumper(const CrushWrapper *crush, const OSDMap *osdmap,
                             const PGMap& pgmap, bool tree,
-                            const string& class_name,
-                            const string& item_name) :
-    Parent(crush, osdmap, pgmap, tree, class_name, item_name) {}
+                            const string& filter) :
+    Parent(crush, osdmap, pgmap, tree, filter) {}
 
   void dump(TextTable *tbl) {
     tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
@@ -5197,9 +5447,8 @@ public:
 
   OSDUtilizationFormatDumper(const CrushWrapper *crush, const OSDMap *osdmap,
                              const PGMap& pgmap, bool tree,
-                             const string& class_name,
-                             const string& item_name) :
-    Parent(crush, osdmap, pgmap, tree, class_name, item_name) {}
+                             const string& filter) :
+    Parent(crush, osdmap, pgmap, tree, filter) {}
 
   void dump(Formatter *f) {
     f->open_array_section("nodes");
@@ -5275,28 +5524,26 @@ void print_osd_utilization(const OSDMap& osdmap,
                            ostream& out,
                            Formatter *f,
                            bool tree,
-                           const string& class_name,
-                           const string& item_name)
+                           const string& filter)
 {
   const CrushWrapper *crush = osdmap.crush.get();
   if (f) {
     f->open_object_section("df");
-    OSDUtilizationFormatDumper d(crush, &osdmap, pgmap, tree,
-                                 class_name, item_name);
+    OSDUtilizationFormatDumper d(crush, &osdmap, pgmap, tree, filter);
     d.dump(f);
     d.summary(f);
     f->close_section();
     f->flush(out);
   } else {
-    OSDUtilizationPlainDumper d(crush, &osdmap, pgmap, tree,
-                                class_name, item_name);
+    OSDUtilizationPlainDumper d(crush, &osdmap, pgmap, tree, filter);
     TextTable tbl;
     d.dump(&tbl);
     out << tbl << d.summary() << "\n";
   }
 }
 
-void OSDMap::check_health(health_check_map_t *checks) const
+void OSDMap::check_health(CephContext *cct,
+                         health_check_map_t *checks) const
 {
   int num_osds = get_num_osds();
 
@@ -5341,7 +5588,7 @@ void OSDMap::check_health(health_check_map_t *checks) const
            break;
          type = crush->get_bucket_type(parent_id);
          if (!subtree_type_is_down(
-               g_ceph_context, parent_id, type,
+               cct, parent_id, type,
                &down_in_osds, &up_in_osds, &subtree_up, &subtree_type_down))
            break;
          current = parent_id;
@@ -5396,7 +5643,8 @@ void OSDMap::check_health(health_check_map_t *checks) const
          string err = string("OSD_") +
            string(crush->get_type_name(type)) + "_DOWN";
          boost::to_upper(err);
-         auto& d = checks->add(err, HEALTH_WARN, ss.str());
+         auto& d = checks->add(err, HEALTH_WARN, ss.str(),
+                               subtree_type_down[type].size());
          for (auto j = subtree_type_down[type].rbegin();
               j != subtree_type_down[type].rend();
               ++j) {
@@ -5419,7 +5667,8 @@ void OSDMap::check_health(health_check_map_t *checks) const
       }
       ostringstream ss;
       ss << down_in_osds.size() << " osds down";
-      auto& d = checks->add("OSD_DOWN", HEALTH_WARN, ss.str());
+      auto& d = checks->add("OSD_DOWN", HEALTH_WARN, ss.str(),
+                           down_in_osds.size());
       for (auto it = down_in_osds.begin(); it != down_in_osds.end(); ++it) {
        ostringstream ss;
        ss << "osd." << *it << " (";
@@ -5432,7 +5681,8 @@ void OSDMap::check_health(health_check_map_t *checks) const
     if (!osds.empty()) {
       ostringstream ss;
       ss << osds.size() << " osds exist in the crush map but not in the osdmap";
-      auto& d = checks->add("OSD_ORPHAN", HEALTH_WARN, ss.str());
+      auto& d = checks->add("OSD_ORPHAN", HEALTH_WARN, ss.str(),
+                           osds.size());
       for (auto osd : osds) {
        ostringstream ss;
        ss << "osd." << osd << " exists in crush map but not in osdmap";
@@ -5441,11 +5691,36 @@ void OSDMap::check_health(health_check_map_t *checks) const
     }
   }
 
+  std::list<std::string> scrub_messages;
+  bool noscrub = false, nodeepscrub = false;
+  for (const auto &p : pools) {
+    if (p.second.flags & pg_pool_t::FLAG_NOSCRUB) {
+      ostringstream ss;
+      ss << "Pool " << get_pool_name(p.first) << " has noscrub flag";
+      scrub_messages.push_back(ss.str());
+      noscrub = true;
+    }
+    if (p.second.flags & pg_pool_t::FLAG_NODEEP_SCRUB) {
+      ostringstream ss;
+      ss << "Pool " << get_pool_name(p.first) << " has nodeep-scrub flag";
+      scrub_messages.push_back(ss.str());
+      nodeepscrub = true;
+    }
+  }
+  if (noscrub || nodeepscrub) {
+    string out = "";
+    out += noscrub ? string("noscrub") + (nodeepscrub ? ", " : "") : "";
+    out += nodeepscrub ? "nodeep-scrub" : "";
+    auto& d = checks->add("POOL_SCRUB_FLAGS", HEALTH_OK,
+                         "Some pool(s) have the " + out + " flag(s) set", 0);
+    d.detail.splice(d.detail.end(), scrub_messages);
+  }
+
   // OSD_OUT_OF_ORDER_FULL
   {
     // An osd could configure failsafe ratio, to something different
     // but for now assume it is the same here.
-    float fsr = g_conf()->osd_failsafe_full_ratio;
+    float fsr = cct->_conf->osd_failsafe_full_ratio;
     if (fsr > 1.0) fsr /= 100;
     float fr = get_full_ratio();
     float br = get_backfillfull_ratio();
@@ -5476,7 +5751,7 @@ void OSDMap::check_health(health_check_map_t *checks) const
     }
     if (!detail.empty()) {
       auto& d = checks->add("OSD_OUT_OF_ORDER_FULL", HEALTH_ERR,
-                        "full ratio(s) out of order");
+                           "full ratio(s) out of order", 0);
       d.detail.swap(detail);
     }
   }
@@ -5491,7 +5766,7 @@ void OSDMap::check_health(health_check_map_t *checks) const
     if (full.size()) {
       ostringstream ss;
       ss << full.size() << " full osd(s)";
-      auto& d = checks->add("OSD_FULL", HEALTH_ERR, ss.str());
+      auto& d = checks->add("OSD_FULL", HEALTH_ERR, ss.str(), full.size());
       for (auto& i: full) {
        ostringstream ss;
        ss << "osd." << i << " is full";
@@ -5501,7 +5776,8 @@ void OSDMap::check_health(health_check_map_t *checks) const
     if (backfillfull.size()) {
       ostringstream ss;
       ss << backfillfull.size() << " backfillfull osd(s)";
-      auto& d = checks->add("OSD_BACKFILLFULL", HEALTH_WARN, ss.str());
+      auto& d = checks->add("OSD_BACKFILLFULL", HEALTH_WARN, ss.str(),
+                           backfillfull.size());
       for (auto& i: backfillfull) {
        ostringstream ss;
        ss << "osd." << i << " is backfill full";
@@ -5511,7 +5787,7 @@ void OSDMap::check_health(health_check_map_t *checks) const
     if (nearfull.size()) {
       ostringstream ss;
       ss << nearfull.size() << " nearfull osd(s)";
-      auto& d = checks->add("OSD_NEARFULL", HEALTH_WARN, ss.str());
+      auto& d = checks->add("OSD_NEARFULL", HEALTH_WARN, ss.str(), nearfull.size());
       for (auto& i: nearfull) {
        ostringstream ss;
        ss << "osd." << i << " is near full";
@@ -5524,8 +5800,6 @@ void OSDMap::check_health(health_check_map_t *checks) const
   {
     // warn about flags
     uint64_t warn_flags =
-      CEPH_OSDMAP_NEARFULL |
-      CEPH_OSDMAP_FULL |
       CEPH_OSDMAP_PAUSERD |
       CEPH_OSDMAP_PAUSEWR |
       CEPH_OSDMAP_PAUSEREC |
@@ -5542,9 +5816,10 @@ void OSDMap::check_health(health_check_map_t *checks) const
       CEPH_OSDMAP_NOREBALANCE;
     if (test_flag(warn_flags)) {
       ostringstream ss;
-      ss << get_flag_string(get_flags() & warn_flags)
-        << " flag(s) set";
-      checks->add("OSDMAP_FLAGS", HEALTH_WARN, ss.str());
+      string s = get_flag_string(get_flags() & warn_flags);
+      ss << s << " flag(s) set";
+      checks->add("OSDMAP_FLAGS", HEALTH_WARN, ss.str(),
+                 s.size() /* kludgey but sufficient */);
     }
   }
 
@@ -5565,43 +5840,63 @@ void OSDMap::check_health(health_check_map_t *checks) const
        detail.push_back(ss.str());
       }
     }
+    for (auto& i : crush_node_flags) {
+      if (i.second && crush->item_exists(i.first)) {
+       ostringstream ss;
+       set<string> states;
+       OSDMap::calc_state_set(i.second, states);
+       int t = i.first >= 0 ? 0 : crush->get_bucket_type(i.first);
+       const char *tn = crush->get_type_name(t);
+       ss << (tn ? tn : "node") << " "
+          << crush->get_item_name(i.first) << " has flags " << states;
+       detail.push_back(ss.str());
+      }
+    }
+    for (auto& i : device_class_flags) {
+      const char* class_name = crush->get_class_name(i.first);
+      if (i.second && class_name) {
+        ostringstream ss;
+        set<string> states;
+        OSDMap::calc_state_set(i.second, states);
+        ss << "device class '" << class_name << "' has flags " << states;
+        detail.push_back(ss.str());
+      }
+    }
     if (!detail.empty()) {
       ostringstream ss;
-      ss << detail.size() << " osd(s) have {NOUP,NODOWN,NOIN,NOOUT} flags set";
-      auto& d = checks->add("OSD_FLAGS", HEALTH_WARN, ss.str());
+      ss << detail.size() << " OSDs or CRUSH {nodes, device-classes} have {NOUP,NODOWN,NOIN,NOOUT} flags set";
+      auto& d = checks->add("OSD_FLAGS", HEALTH_WARN, ss.str(), detail.size());
       d.detail.swap(detail);
     }
   }
 
   // OLD_CRUSH_TUNABLES
-  if (g_conf()->mon_warn_on_legacy_crush_tunables) {
+  if (cct->_conf->mon_warn_on_legacy_crush_tunables) {
     string min = crush->get_min_required_version();
-    if (min < g_conf()->mon_crush_min_required_version) {
+    if (min < cct->_conf->mon_crush_min_required_version) {
       ostringstream ss;
       ss << "crush map has legacy tunables (require " << min
-        << ", min is " << g_conf()->mon_crush_min_required_version << ")";
-      auto& d = checks->add("OLD_CRUSH_TUNABLES", HEALTH_WARN, ss.str());
+        << ", min is " << cct->_conf->mon_crush_min_required_version << ")";
+      auto& d = checks->add("OLD_CRUSH_TUNABLES", HEALTH_WARN, ss.str(), 0);
       d.detail.push_back("see http://docs.ceph.com/docs/master/rados/operations/crush-map/#tunables");
     }
   }
 
   // OLD_CRUSH_STRAW_CALC_VERSION
-  if (g_conf()->mon_warn_on_crush_straw_calc_version_zero) {
+  if (cct->_conf->mon_warn_on_crush_straw_calc_version_zero) {
     if (crush->get_straw_calc_version() == 0) {
       ostringstream ss;
       ss << "crush map has straw_calc_version=0";
-      auto& d = checks->add("OLD_CRUSH_STRAW_CALC_VERSION", HEALTH_WARN, ss.str());
+      auto& d = checks->add("OLD_CRUSH_STRAW_CALC_VERSION", HEALTH_WARN, ss.str(), 0);
       d.detail.push_back(
        "see http://docs.ceph.com/docs/master/rados/operations/crush-map/#tunables");
     }
   }
 
   // CACHE_POOL_NO_HIT_SET
-  if (g_conf()->mon_warn_on_cache_pools_without_hit_sets) {
+  if (cct->_conf->mon_warn_on_cache_pools_without_hit_sets) {
     list<string> detail;
-    for (map<int64_t, pg_pool_t>::const_iterator p = pools.begin();
-        p != pools.end();
-        ++p) {
+    for (auto p = pools.cbegin(); p != pools.cend(); ++p) {
       const pg_pool_t& info = p->second;
       if (info.cache_mode_requires_hit_set() &&
          info.hit_set_params.get_type() == HitSet::TYPE_NONE) {
@@ -5615,7 +5910,8 @@ void OSDMap::check_health(health_check_map_t *checks) const
     if (!detail.empty()) {
       ostringstream ss;
       ss << detail.size() << " cache pools are missing hit_sets";
-      auto& d = checks->add("CACHE_POOL_NO_HIT_SET", HEALTH_WARN, ss.str());
+      auto& d = checks->add("CACHE_POOL_NO_HIT_SET", HEALTH_WARN, ss.str(),
+                           detail.size());
       d.detail.swap(detail);
     }
   }
@@ -5624,7 +5920,7 @@ void OSDMap::check_health(health_check_map_t *checks) const
   if (!test_flag(CEPH_OSDMAP_SORTBITWISE)) {
     ostringstream ss;
     ss << "'sortbitwise' flag is not set";
-    checks->add("OSD_NO_SORTBITWISE", HEALTH_WARN, ss.str());
+    checks->add("OSD_NO_SORTBITWISE", HEALTH_WARN, ss.str(), 0);
   }
 
   // OSD_UPGRADE_FINISHED
@@ -5659,22 +5955,66 @@ void OSDMap::check_health(health_check_map_t *checks) const
     if (!full_detail.empty()) {
       ostringstream ss;
       ss << full_detail.size() << " pool(s) full";
-      auto& d = checks->add("POOL_FULL", HEALTH_WARN, ss.str());
+      auto& d = checks->add("POOL_FULL", HEALTH_WARN, ss.str(), full_detail.size());
       d.detail.swap(full_detail);
     }
     if (!backfillfull_detail.empty()) {
       ostringstream ss;
       ss << backfillfull_detail.size() << " pool(s) backfillfull";
-      auto& d = checks->add("POOL_BACKFILLFULL", HEALTH_WARN, ss.str());
+      auto& d = checks->add("POOL_BACKFILLFULL", HEALTH_WARN, ss.str(),
+                           backfillfull_detail.size());
       d.detail.swap(backfillfull_detail);
     }
     if (!nearfull_detail.empty()) {
       ostringstream ss;
       ss << nearfull_detail.size() << " pool(s) nearfull";
-      auto& d = checks->add("POOL_NEARFULL", HEALTH_WARN, ss.str());
+      auto& d = checks->add("POOL_NEARFULL", HEALTH_WARN, ss.str(),
+                           nearfull_detail.size());
       d.detail.swap(nearfull_detail);
     }
   }
+
+  // POOL_PG_NUM_NOT_POWER_OF_TWO
+  if (cct->_conf.get_val<bool>("mon_warn_on_pool_pg_num_not_power_of_two")) {
+    list<string> detail;
+    for (auto it : get_pools()) {
+      if (!isp2(it.second.get_pg_num_target())) {
+       ostringstream ss;
+       ss << "pool '" << get_pool_name(it.first)
+          << "' pg_num " << it.second.get_pg_num_target()
+          << " is not a power of two";
+       detail.push_back(ss.str());
+      }
+    }
+    if (!detail.empty()) {
+      ostringstream ss;
+      ss << detail.size() << " pool(s) have non-power-of-two pg_num";
+      auto& d = checks->add("POOL_PG_NUM_NOT_POWER_OF_TWO", HEALTH_WARN,
+                           ss.str(), detail.size());
+      d.detail.swap(detail);
+    }
+  }
+
+  // POOL_NO_REDUNDANCY
+  if (cct->_conf.get_val<bool>("mon_warn_on_pool_no_redundancy"))
+  {
+    list<string> detail;
+    for (auto it : get_pools()) {
+      if (it.second.get_size() == 1) {
+        ostringstream ss;
+        ss << "pool '" << get_pool_name(it.first)
+           << "' has no replicas configured";
+        detail.push_back(ss.str());
+      }
+    }
+    if (!detail.empty()) {
+      ostringstream ss;
+      ss << detail.size() << " pool(s) have no replicas configured";
+      auto& d = checks->add("POOL_NO_REDUNDANCY", HEALTH_WARN,
+        ss.str(), detail.size());
+      d.detail.swap(detail);
+    }
+  }
 }
 
 int OSDMap::parse_osd_id_list(const vector<string>& ls, set<int> *out,
@@ -5687,7 +6027,7 @@ int OSDMap::parse_osd_id_list(const vector<string>& ls, set<int> *out,
       get_all_osds(*out);
       break;
     }
-    long osd = parse_osd_id(i->c_str(), ss);
+    long osd = TOPNSPC::common::parse_osd_id(i->c_str(), ss);
     if (osd < 0) {
       *ss << "invalid osd id '" << *i << "'";
       return -EINVAL;
@@ -5766,3 +6106,38 @@ float OSDMap::pool_raw_used_rate(int64_t poolid) const
     ceph_abort_msg("unrecognized pool type");
   }
 }
+
+unsigned OSDMap::get_osd_crush_node_flags(int osd) const
+{
+  unsigned flags = 0;
+  if (!crush_node_flags.empty()) {
+    // the map will contain type -> name
+    std::map<std::string,std::string> ploc = crush->get_full_location(osd);
+    for (auto& i : ploc) {
+      int id = crush->get_item_id(i.second);
+      auto p = crush_node_flags.find(id);
+      if (p != crush_node_flags.end()) {
+       flags |= p->second;
+      }
+    }
+  }
+  return flags;
+}
+
+unsigned OSDMap::get_crush_node_flags(int id) const
+{
+  unsigned flags = 0;
+  auto it = crush_node_flags.find(id);
+  if (it != crush_node_flags.end())
+    flags = it->second;
+  return flags;
+}
+
+unsigned OSDMap::get_device_class_flags(int id) const
+{
+  unsigned flags = 0;
+  auto it = device_class_flags.find(id);
+  if (it != device_class_flags.end())
+    flags = it->second;
+  return flags;
+}