]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSDMap.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / osd / OSDMap.cc
index f6cf25fd9b27f1e0b6f48b5f89dfe98fb610ba7e..e32a5bf2caa617c8b05ce79b3baa245866e76755 100644 (file)
  *
  */
 
+#include <algorithm>
+#include <optional>
+#include <random>
+
 #include <boost/algorithm/string.hpp>
 
 #include "OSDMap.h"
-#include <algorithm>
 #include "common/config.h"
 #include "common/errno.h"
 #include "common/Formatter.h"
 #include "common/TextTable.h"
+#include "global/global_context.h"
 #include "include/ceph_features.h"
 #include "include/str_map.h"
 
@@ -31,8 +35,8 @@
 
 #include "crush/CrushTreeDumper.h"
 #include "common/Clock.h"
-#include "mon/PGStatService.h"
+#include "mon/PGMap.h"
+
 #define dout_subsys ceph_subsys_osd
 
 MEMPOOL_DEFINE_OBJECT_FACTORY(OSDMap, osdmap, osdmap);
@@ -54,26 +58,28 @@ void osd_info_t::dump(Formatter *f) const
 
 void osd_info_t::encode(bufferlist& bl) const
 {
+  using ceph::encode;
   __u8 struct_v = 1;
-  ::encode(struct_v, bl);
-  ::encode(last_clean_begin, bl);
-  ::encode(last_clean_end, bl);
-  ::encode(up_from, bl);
-  ::encode(up_thru, bl);
-  ::encode(down_at, bl);
-  ::encode(lost_at, bl);
+  encode(struct_v, bl);
+  encode(last_clean_begin, bl);
+  encode(last_clean_end, bl);
+  encode(up_from, bl);
+  encode(up_thru, bl);
+  encode(down_at, bl);
+  encode(lost_at, bl);
 }
 
-void osd_info_t::decode(bufferlist::iterator& bl)
+void osd_info_t::decode(bufferlist::const_iterator& bl)
 {
+  using ceph::decode;
   __u8 struct_v;
-  ::decode(struct_v, bl);
-  ::decode(last_clean_begin, bl);
-  ::decode(last_clean_end, bl);
-  ::decode(up_from, bl);
-  ::decode(up_thru, bl);
-  ::decode(down_at, bl);
-  ::decode(lost_at, bl);
+  decode(struct_v, bl);
+  decode(last_clean_begin, bl);
+  decode(last_clean_end, bl);
+  decode(up_from, bl);
+  decode(up_thru, bl);
+  decode(down_at, bl);
+  decode(lost_at, bl);
 }
 
 void osd_info_t::generate_test_instances(list<osd_info_t*>& o)
@@ -114,29 +120,29 @@ void osd_xinfo_t::dump(Formatter *f) const
 void osd_xinfo_t::encode(bufferlist& bl) const
 {
   ENCODE_START(3, 1, bl);
-  ::encode(down_stamp, bl);
+  encode(down_stamp, bl);
   __u32 lp = laggy_probability * 0xfffffffful;
-  ::encode(lp, bl);
-  ::encode(laggy_interval, bl);
-  ::encode(features, bl);
-  ::encode(old_weight, bl);
+  encode(lp, bl);
+  encode(laggy_interval, bl);
+  encode(features, bl);
+  encode(old_weight, bl);
   ENCODE_FINISH(bl);
 }
 
-void osd_xinfo_t::decode(bufferlist::iterator& bl)
+void osd_xinfo_t::decode(bufferlist::const_iterator& bl)
 {
   DECODE_START(3, bl);
-  ::decode(down_stamp, bl);
+  decode(down_stamp, bl);
   __u32 lp;
-  ::decode(lp, bl);
+  decode(lp, bl);
   laggy_probability = (float)lp / (float)0xffffffff;
-  ::decode(laggy_interval, bl);
+  decode(laggy_interval, bl);
   if (struct_v >= 2)
-    ::decode(features, bl);
+    decode(features, bl);
   else
     features = 0;
   if (struct_v >= 3)
-    ::decode(old_weight, bl);
+    decode(old_weight, bl);
   else
     old_weight = 0;
   DECODE_FINISH(bl);
@@ -200,12 +206,14 @@ int OSDMap::Incremental::identify_osd(uuid_d u) const
 int OSDMap::Incremental::propagate_snaps_to_tiers(CephContext *cct,
                                                  const OSDMap& osdmap)
 {
-  assert(epoch == osdmap.get_epoch() + 1);
+  ceph_assert(epoch == osdmap.get_epoch() + 1);
 
   for (auto &new_pool : new_pools) {
     if (!new_pool.second.tiers.empty()) {
       pg_pool_t& base = new_pool.second;
 
+      auto new_rem_it = new_removed_snaps.find(new_pool.first);
+
       for (const auto &tier_pool : base.tiers) {
        const auto &r = new_pools.find(tier_pool);
        pg_pool_t *tier = 0;
@@ -230,6 +238,12 @@ int OSDMap::Incremental::propagate_snaps_to_tiers(CephContext *cct,
        tier->snap_epoch = base.snap_epoch;
        tier->snaps = base.snaps;
        tier->removed_snaps = base.removed_snaps;
+       tier->flags |= base.flags & (pg_pool_t::FLAG_SELFMANAGED_SNAPS|
+                                    pg_pool_t::FLAG_POOL_SNAPS);
+
+       if (new_rem_it != new_removed_snaps.end()) {
+         new_removed_snaps[tier_pool] = new_rem_it->second;
+       }
       }
     }
   }
@@ -280,7 +294,7 @@ bool OSDMap::containing_subtree_is_down(CephContext *cct, int id, int subtree_ty
     } else {
       type = crush->get_bucket_type(current);
     }
-    assert(type >= 0);
+    ceph_assert(type >= 0);
 
     if (!subtree_is_down(current, down_cache)) {
       ldout(cct, 30) << "containing_subtree_is_down(" << id << ") = false" << dendl;
@@ -344,66 +358,68 @@ bool OSDMap::subtree_type_is_down(
 
 void OSDMap::Incremental::encode_client_old(bufferlist& bl) const
 {
+  using ceph::encode;
   __u16 v = 5;
-  ::encode(v, bl);
-  ::encode(fsid, bl);
-  ::encode(epoch, bl);
-  ::encode(modified, bl);
+  encode(v, bl);
+  encode(fsid, bl);
+  encode(epoch, bl);
+  encode(modified, bl);
   int32_t new_t = new_pool_max;
-  ::encode(new_t, bl);
-  ::encode(new_flags, bl);
-  ::encode(fullmap, bl);
-  ::encode(crush, bl);
+  encode(new_t, bl);
+  encode(new_flags, bl);
+  encode(fullmap, bl);
+  encode(crush, bl);
 
-  ::encode(new_max_osd, bl);
-  // for ::encode(new_pools, bl);
+  encode(new_max_osd, bl);
+  // for encode(new_pools, bl);
   __u32 n = new_pools.size();
-  ::encode(n, bl);
+  encode(n, bl);
   for (const auto &new_pool : new_pools) {
     n = new_pool.first;
-    ::encode(n, bl);
-    ::encode(new_pool.second, bl, 0);
+    encode(n, bl);
+    encode(new_pool.second, bl, 0);
   }
-  // for ::encode(new_pool_names, bl);
+  // for encode(new_pool_names, bl);
   n = new_pool_names.size();
-  ::encode(n, bl);
+  encode(n, bl);
 
   for (const auto &new_pool_name : new_pool_names) {
     n = new_pool_name.first;
-    ::encode(n, bl);
-    ::encode(new_pool_name.second, bl);
+    encode(n, bl);
+    encode(new_pool_name.second, bl);
   }
-  // for ::encode(old_pools, bl);
+  // for encode(old_pools, bl);
   n = old_pools.size();
-  ::encode(n, bl);
+  encode(n, bl);
   for (auto &old_pool : old_pools) {
     n = old_pool;
-    ::encode(n, bl);
+    encode(n, bl);
   }
-  ::encode(new_up_client, bl, 0);
+  encode(new_up_client, bl, 0);
   {
     // legacy is map<int32_t,uint8_t>
     uint32_t n = new_state.size();
-    ::encode(n, bl);
+    encode(n, bl);
     for (auto p : new_state) {
-      ::encode(p.first, bl);
-      ::encode((uint8_t)p.second, bl);
+      encode(p.first, bl);
+      encode((uint8_t)p.second, bl);
     }
   }
-  ::encode(new_weight, bl);
-  // for ::encode(new_pg_temp, bl);
+  encode(new_weight, bl);
+  // for encode(new_pg_temp, bl);
   n = new_pg_temp.size();
-  ::encode(n, bl);
+  encode(n, bl);
 
   for (const auto &pg_temp : new_pg_temp) {
     old_pg_t opg = pg_temp.first.get_old_pg();
-    ::encode(opg, bl);
-    ::encode(pg_temp.second, bl);
+    encode(opg, bl);
+    encode(pg_temp.second, bl);
   }
 }
 
 void OSDMap::Incremental::encode_classic(bufferlist& bl, uint64_t features) const
 {
+  using ceph::encode;
   if ((features & CEPH_FEATURE_PGID64) == 0) {
     encode_client_old(bl);
     return;
@@ -411,49 +427,79 @@ void OSDMap::Incremental::encode_classic(bufferlist& bl, uint64_t features) cons
 
   // base
   __u16 v = 6;
-  ::encode(v, bl);
-  ::encode(fsid, bl);
-  ::encode(epoch, bl);
-  ::encode(modified, bl);
-  ::encode(new_pool_max, bl);
-  ::encode(new_flags, bl);
-  ::encode(fullmap, bl);
-  ::encode(crush, bl);
-
-  ::encode(new_max_osd, bl);
-  ::encode(new_pools, bl, features);
-  ::encode(new_pool_names, bl);
-  ::encode(old_pools, bl);
-  ::encode(new_up_client, bl, features);
+  encode(v, bl);
+  encode(fsid, bl);
+  encode(epoch, bl);
+  encode(modified, bl);
+  encode(new_pool_max, bl);
+  encode(new_flags, bl);
+  encode(fullmap, bl);
+  encode(crush, bl);
+
+  encode(new_max_osd, bl);
+  encode(new_pools, bl, features);
+  encode(new_pool_names, bl);
+  encode(old_pools, bl);
+  encode(new_up_client, bl, features);
   {
     uint32_t n = new_state.size();
-    ::encode(n, bl);
+    encode(n, bl);
     for (auto p : new_state) {
-      ::encode(p.first, bl);
-      ::encode((uint8_t)p.second, bl);
+      encode(p.first, bl);
+      encode((uint8_t)p.second, bl);
     }
   }
-  ::encode(new_weight, bl);
-  ::encode(new_pg_temp, bl);
+  encode(new_weight, bl);
+  encode(new_pg_temp, bl);
 
   // extended
   __u16 ev = 10;
-  ::encode(ev, bl);
-  ::encode(new_hb_back_up, bl, features);
-  ::encode(new_up_thru, bl);
-  ::encode(new_last_clean_interval, bl);
-  ::encode(new_lost, bl);
-  ::encode(new_blacklist, bl, features);
-  ::encode(old_blacklist, bl, features);
-  ::encode(new_up_cluster, bl, features);
-  ::encode(cluster_snapshot, bl);
-  ::encode(new_uuid, bl);
-  ::encode(new_xinfo, bl);
-  ::encode(new_hb_front_up, bl, features);
+  encode(ev, bl);
+  encode(new_hb_back_up, bl, features);
+  encode(new_up_thru, bl);
+  encode(new_last_clean_interval, bl);
+  encode(new_lost, bl);
+  encode(new_blacklist, bl, features);
+  encode(old_blacklist, bl, features);
+  encode(new_up_cluster, bl, features);
+  encode(cluster_snapshot, bl);
+  encode(new_uuid, bl);
+  encode(new_xinfo, bl);
+  encode(new_hb_front_up, bl, features);
+}
+
+template<class T>
+static void encode_addrvec_map_as_addr(const T& m, bufferlist& bl, uint64_t f)
+{
+  uint32_t n = m.size();
+  encode(n, bl);
+  for (auto& i : m) {
+    encode(i.first, bl);
+    encode(i.second.legacy_addr(), bl, f);
+  }
 }
 
+template<class T>
+static void encode_addrvec_pvec_as_addr(const T& m, bufferlist& bl, uint64_t f)
+{
+  uint32_t n = m.size();
+  encode(n, bl);
+  for (auto& i : m) {
+    if (i) {
+      encode(i->legacy_addr(), bl, f);
+    } else {
+      encode(entity_addr_t(), bl, f);
+    }
+  }
+}
+
+/* for a description of osdmap incremental versions, and when they were
+ * introduced, please refer to
+ *    doc/dev/osd_internals/osdmap_versions.txt
+ */
 void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
 {
+  using ceph::encode;
   if ((features & CEPH_FEATURE_OSDMAP_ENC) == 0) {
     encode_classic(bl, features);
     return;
@@ -463,182 +509,219 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
   // OSDMaps.  others should be passing around the canonical encoded
   // buffers from on high.  select out those callers by passing in an
   // "impossible" feature bit.
-  assert(features & CEPH_FEATURE_RESERVED);
+  ceph_assert(features & CEPH_FEATURE_RESERVED);
   features &= ~CEPH_FEATURE_RESERVED;
 
   size_t start_offset = bl.length();
   size_t tail_offset;
-  buffer::list::iterator crc_it;
+  size_t crc_offset;
+  std::optional<buffer::list::contiguous_filler> crc_filler;
 
   // meta-encoding: how we include client-used and osd-specific data
   ENCODE_START(8, 7, bl);
 
   {
-    uint8_t v = 5;
+    uint8_t v = 8;
     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
       v = 3;
+    } else if (!HAVE_FEATURE(features, SERVER_MIMIC)) {
+      v = 5;
+    } else if (!HAVE_FEATURE(features, SERVER_NAUTILUS)) {
+      v = 6;
     }
     ENCODE_START(v, 1, bl); // client-usable data
-    ::encode(fsid, bl);
-    ::encode(epoch, bl);
-    ::encode(modified, bl);
-    ::encode(new_pool_max, bl);
-    ::encode(new_flags, bl);
-    ::encode(fullmap, bl);
-    ::encode(crush, bl);
-
-    ::encode(new_max_osd, bl);
-    ::encode(new_pools, bl, features);
-    ::encode(new_pool_names, bl);
-    ::encode(old_pools, bl);
-    ::encode(new_up_client, bl, features);
+    encode(fsid, bl);
+    encode(epoch, bl);
+    encode(modified, bl);
+    encode(new_pool_max, bl);
+    encode(new_flags, bl);
+    encode(fullmap, bl);
+    encode(crush, bl);
+
+    encode(new_max_osd, bl);
+    encode(new_pools, bl, features);
+    encode(new_pool_names, bl);
+    encode(old_pools, bl);
+    if (v >= 7) {
+      encode(new_up_client, bl, features);
+    } else {
+      encode_addrvec_map_as_addr(new_up_client, bl, features);
+    }
     if (v >= 5) {
-      ::encode(new_state, bl);
+      encode(new_state, bl);
     } else {
       uint32_t n = new_state.size();
-      ::encode(n, bl);
+      encode(n, bl);
       for (auto p : new_state) {
-       ::encode(p.first, bl);
-       ::encode((uint8_t)p.second, bl);
+       encode(p.first, bl);
+       encode((uint8_t)p.second, bl);
       }
     }
-    ::encode(new_weight, bl);
-    ::encode(new_pg_temp, bl);
-    ::encode(new_primary_temp, bl);
-    ::encode(new_primary_affinity, bl);
-    ::encode(new_erasure_code_profiles, bl);
-    ::encode(old_erasure_code_profiles, bl);
+    encode(new_weight, bl);
+    encode(new_pg_temp, bl);
+    encode(new_primary_temp, bl);
+    encode(new_primary_affinity, bl);
+    encode(new_erasure_code_profiles, bl);
+    encode(old_erasure_code_profiles, bl);
     if (v >= 4) {
-      ::encode(new_pg_upmap, bl);
-      ::encode(old_pg_upmap, bl);
-      ::encode(new_pg_upmap_items, bl);
-      ::encode(old_pg_upmap_items, bl);
+      encode(new_pg_upmap, bl);
+      encode(old_pg_upmap, bl);
+      encode(new_pg_upmap_items, bl);
+      encode(old_pg_upmap_items, bl);
+    }
+    if (v >= 6) {
+      encode(new_removed_snaps, bl);
+      encode(new_purged_snaps, bl);
+    }
+    if (v >= 8) {
+      encode(new_last_up_change, bl);
+      encode(new_last_in_change, bl);
     }
     ENCODE_FINISH(bl); // client-usable data
   }
 
   {
-    uint8_t target_v = 6;
+    uint8_t target_v = 9;
     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
       target_v = 2;
+    } else if (!HAVE_FEATURE(features, SERVER_NAUTILUS)) {
+      target_v = 6;
     }
     ENCODE_START(target_v, 1, bl); // extended, osd-only data
-    ::encode(new_hb_back_up, bl, features);
-    ::encode(new_up_thru, bl);
-    ::encode(new_last_clean_interval, bl);
-    ::encode(new_lost, bl);
-    ::encode(new_blacklist, bl, features);
-    ::encode(old_blacklist, bl, features);
-    ::encode(new_up_cluster, bl, features);
-    ::encode(cluster_snapshot, bl);
-    ::encode(new_uuid, bl);
-    ::encode(new_xinfo, bl);
-    ::encode(new_hb_front_up, bl, features);
-    ::encode(features, bl);         // NOTE: features arg, not the member
+    if (target_v < 7) {
+      encode_addrvec_map_as_addr(new_hb_back_up, bl, features);
+    } else {
+      encode(new_hb_back_up, bl, features);
+    }
+    encode(new_up_thru, bl);
+    encode(new_last_clean_interval, bl);
+    encode(new_lost, bl);
+    encode(new_blacklist, bl, features);
+    encode(old_blacklist, bl, features);
+    if (target_v < 7) {
+      encode_addrvec_map_as_addr(new_up_cluster, bl, features);
+    } else {
+      encode(new_up_cluster, bl, features);
+    }
+    encode(cluster_snapshot, bl);
+    encode(new_uuid, bl);
+    encode(new_xinfo, bl);
+    if (target_v < 7) {
+      encode_addrvec_map_as_addr(new_hb_front_up, bl, features);
+    } else {
+      encode(new_hb_front_up, bl, features);
+    }
+    encode(features, bl);         // NOTE: features arg, not the member
     if (target_v >= 3) {
-      ::encode(new_nearfull_ratio, bl);
-      ::encode(new_full_ratio, bl);
-      ::encode(new_backfillfull_ratio, bl);
+      encode(new_nearfull_ratio, bl);
+      encode(new_full_ratio, bl);
+      encode(new_backfillfull_ratio, bl);
     }
     // 5 was string-based new_require_min_compat_client
     if (target_v >= 6) {
-      ::encode(new_require_min_compat_client, bl);
-      ::encode(new_require_osd_release, bl);
+      encode(new_require_min_compat_client, bl);
+      encode(new_require_osd_release, bl);
+    }
+    if (target_v >= 8) {
+      encode(new_crush_node_flags, bl);
+    }
+    if (target_v >= 9) {
+      encode(new_device_class_flags, bl);
     }
     ENCODE_FINISH(bl); // osd-only data
   }
 
-  ::encode((uint32_t)0, bl); // dummy inc_crc
-  crc_it = bl.end();
-  crc_it.advance(-4);
+  crc_offset = bl.length();
+  crc_filler = bl.append_hole(sizeof(uint32_t));
   tail_offset = bl.length();
 
-  ::encode(full_crc, bl);
+  encode(full_crc, bl);
 
   ENCODE_FINISH(bl); // meta-encoding wrapper
 
   // fill in crc
   bufferlist front;
-  front.substr_of(bl, start_offset, crc_it.get_off() - start_offset);
+  front.substr_of(bl, start_offset, crc_offset - start_offset);
   inc_crc = front.crc32c(-1);
   bufferlist tail;
   tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
   inc_crc = tail.crc32c(inc_crc);
   ceph_le32 crc_le;
   crc_le = inc_crc;
-  crc_it.copy_in(4, (char*)&crc_le);
+  crc_filler->copy_in(4u, (char*)&crc_le);
   have_crc = true;
 }
 
-void OSDMap::Incremental::decode_classic(bufferlist::iterator &p)
+void OSDMap::Incremental::decode_classic(bufferlist::const_iterator &p)
 {
+  using ceph::decode;
   __u32 n, t;
   // base
   __u16 v;
-  ::decode(v, p);
-  ::decode(fsid, p);
-  ::decode(epoch, p);
-  ::decode(modified, p);
+  decode(v, p);
+  decode(fsid, p);
+  decode(epoch, p);
+  decode(modified, p);
   if (v == 4 || v == 5) {
-    ::decode(n, p);
+    decode(n, p);
     new_pool_max = n;
   } else if (v >= 6)
-    ::decode(new_pool_max, p);
-  ::decode(new_flags, p);
-  ::decode(fullmap, p);
-  ::decode(crush, p);
+    decode(new_pool_max, p);
+  decode(new_flags, p);
+  decode(fullmap, p);
+  decode(crush, p);
 
-  ::decode(new_max_osd, p);
+  decode(new_max_osd, p);
   if (v < 6) {
     new_pools.clear();
-    ::decode(n, p);
+    decode(n, p);
     while (n--) {
-      ::decode(t, p);
-      ::decode(new_pools[t], p);
+      decode(t, p);
+      decode(new_pools[t], p);
     }
   } else {
-    ::decode(new_pools, p);
+    decode(new_pools, p);
   }
   if (v == 5) {
     new_pool_names.clear();
-    ::decode(n, p);
+    decode(n, p);
     while (n--) {
-      ::decode(t, p);
-      ::decode(new_pool_names[t], p);
+      decode(t, p);
+      decode(new_pool_names[t], p);
     }
   } else if (v >= 6) {
-    ::decode(new_pool_names, p);
+    decode(new_pool_names, p);
   }
   if (v < 6) {
     old_pools.clear();
-    ::decode(n, p);
+    decode(n, p);
     while (n--) {
-      ::decode(t, p);
+      decode(t, p);
       old_pools.insert(t);
     }
   } else {
-    ::decode(old_pools, p);
+    decode(old_pools, p);
   }
-  ::decode(new_up_client, p);
+  decode(new_up_client, p);
   {
     map<int32_t,uint8_t> ns;
-    ::decode(ns, p);
+    decode(ns, p);
     for (auto q : ns) {
       new_state[q.first] = q.second;
     }
   }
-  ::decode(new_weight, p);
+  decode(new_weight, p);
 
   if (v < 6) {
     new_pg_temp.clear();
-    ::decode(n, p);
+    decode(n, p);
     while (n--) {
       old_pg_t opg;
       ::decode_raw(opg, p);
-      ::decode(new_pg_temp[pg_t(opg)], p);
+      decode(new_pg_temp[pg_t(opg)], p);
     }
   } else {
-    ::decode(new_pg_temp, p);
+    decode(new_pg_temp, p);
   }
 
   // decode short map, too.
@@ -648,29 +731,34 @@ void OSDMap::Incremental::decode_classic(bufferlist::iterator &p)
   // extended
   __u16 ev = 0;
   if (v >= 5)
-    ::decode(ev, p);
-  ::decode(new_hb_back_up, p);
+    decode(ev, p);
+  decode(new_hb_back_up, p);
   if (v < 5)
-    ::decode(new_pool_names, p);
-  ::decode(new_up_thru, p);
-  ::decode(new_last_clean_interval, p);
-  ::decode(new_lost, p);
-  ::decode(new_blacklist, p);
-  ::decode(old_blacklist, p);
+    decode(new_pool_names, p);
+  decode(new_up_thru, p);
+  decode(new_last_clean_interval, p);
+  decode(new_lost, p);
+  decode(new_blacklist, p);
+  decode(old_blacklist, p);
   if (ev >= 6)
-    ::decode(new_up_cluster, p);
+    decode(new_up_cluster, p);
   if (ev >= 7)
-    ::decode(cluster_snapshot, p);
+    decode(cluster_snapshot, p);
   if (ev >= 8)
-    ::decode(new_uuid, p);
+    decode(new_uuid, p);
   if (ev >= 9)
-    ::decode(new_xinfo, p);
+    decode(new_xinfo, p);
   if (ev >= 10)
-    ::decode(new_hb_front_up, p);
+    decode(new_hb_front_up, p);
 }
 
-void OSDMap::Incremental::decode(bufferlist::iterator& bl)
+/* for a description of osdmap incremental versions, and when they were
+ * introduced, please refer to
+ *    doc/dev/osd_internals/osdmap_versions.txt
+ */
+void OSDMap::Incremental::decode(bufferlist::const_iterator& bl)
 {
+  using ceph::decode;
   /**
    * Older encodings of the Incremental had a single struct_v which
    * covered the whole encoding, and was prior to our modern
@@ -684,8 +772,7 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl)
 
   DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
   if (struct_v < 7) {
-    int struct_v_size = sizeof(struct_v);
-    bl.advance(-struct_v_size);
+    bl.seek(start_offset);
     decode_classic(bl);
     encode_features = 0;
     if (struct_v >= 6)
@@ -695,91 +782,99 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl)
     return;
   }
   {
-    DECODE_START(5, bl); // client-usable data
-    ::decode(fsid, bl);
-    ::decode(epoch, bl);
-    ::decode(modified, bl);
-    ::decode(new_pool_max, bl);
-    ::decode(new_flags, bl);
-    ::decode(fullmap, bl);
-    ::decode(crush, bl);
-
-    ::decode(new_max_osd, bl);
-    ::decode(new_pools, bl);
-    ::decode(new_pool_names, bl);
-    ::decode(old_pools, bl);
-    ::decode(new_up_client, bl);
+    DECODE_START(8, bl); // client-usable data
+    decode(fsid, bl);
+    decode(epoch, bl);
+    decode(modified, bl);
+    decode(new_pool_max, bl);
+    decode(new_flags, bl);
+    decode(fullmap, bl);
+    decode(crush, bl);
+
+    decode(new_max_osd, bl);
+    decode(new_pools, bl);
+    decode(new_pool_names, bl);
+    decode(old_pools, bl);
+    decode(new_up_client, bl);
     if (struct_v >= 5) {
-      ::decode(new_state, bl);
+      decode(new_state, bl);
     } else {
       map<int32_t,uint8_t> ns;
-      ::decode(ns, bl);
+      decode(ns, bl);
       for (auto q : ns) {
        new_state[q.first] = q.second;
       }
     }
-    ::decode(new_weight, bl);
-    ::decode(new_pg_temp, bl);
-    ::decode(new_primary_temp, bl);
+    decode(new_weight, bl);
+    decode(new_pg_temp, bl);
+    decode(new_primary_temp, bl);
     if (struct_v >= 2)
-      ::decode(new_primary_affinity, bl);
+      decode(new_primary_affinity, bl);
     else
       new_primary_affinity.clear();
     if (struct_v >= 3) {
-      ::decode(new_erasure_code_profiles, bl);
-      ::decode(old_erasure_code_profiles, bl);
+      decode(new_erasure_code_profiles, bl);
+      decode(old_erasure_code_profiles, bl);
     } else {
       new_erasure_code_profiles.clear();
       old_erasure_code_profiles.clear();
     }
     if (struct_v >= 4) {
-      ::decode(new_pg_upmap, bl);
-      ::decode(old_pg_upmap, bl);
-      ::decode(new_pg_upmap_items, bl);
-      ::decode(old_pg_upmap_items, bl);
+      decode(new_pg_upmap, bl);
+      decode(old_pg_upmap, bl);
+      decode(new_pg_upmap_items, bl);
+      decode(old_pg_upmap_items, bl);
+    }
+    if (struct_v >= 6) {
+      decode(new_removed_snaps, bl);
+      decode(new_purged_snaps, bl);
+    }
+    if (struct_v >= 8) {
+      decode(new_last_up_change, bl);
+      decode(new_last_in_change, bl);
     }
     DECODE_FINISH(bl); // client-usable data
   }
 
   {
-    DECODE_START(6, bl); // extended, osd-only data
-    ::decode(new_hb_back_up, bl);
-    ::decode(new_up_thru, bl);
-    ::decode(new_last_clean_interval, bl);
-    ::decode(new_lost, bl);
-    ::decode(new_blacklist, bl);
-    ::decode(old_blacklist, bl);
-    ::decode(new_up_cluster, bl);
-    ::decode(cluster_snapshot, bl);
-    ::decode(new_uuid, bl);
-    ::decode(new_xinfo, bl);
-    ::decode(new_hb_front_up, bl);
+    DECODE_START(9, bl); // extended, osd-only data
+    decode(new_hb_back_up, bl);
+    decode(new_up_thru, bl);
+    decode(new_last_clean_interval, bl);
+    decode(new_lost, bl);
+    decode(new_blacklist, bl);
+    decode(old_blacklist, bl);
+    decode(new_up_cluster, bl);
+    decode(cluster_snapshot, bl);
+    decode(new_uuid, bl);
+    decode(new_xinfo, bl);
+    decode(new_hb_front_up, bl);
     if (struct_v >= 2)
-      ::decode(encode_features, bl);
+      decode(encode_features, bl);
     else
       encode_features = CEPH_FEATURE_PGID64 | CEPH_FEATURE_OSDMAP_ENC;
     if (struct_v >= 3) {
-      ::decode(new_nearfull_ratio, bl);
-      ::decode(new_full_ratio, bl);
+      decode(new_nearfull_ratio, bl);
+      decode(new_full_ratio, bl);
     } else {
       new_nearfull_ratio = -1;
       new_full_ratio = -1;
     }
     if (struct_v >= 4) {
-      ::decode(new_backfillfull_ratio, bl);
+      decode(new_backfillfull_ratio, bl);
     } else {
       new_backfillfull_ratio = -1;
     }
     if (struct_v == 5) {
       string r;
-      ::decode(r, bl);
+      decode(r, bl);
       if (r.length()) {
        new_require_min_compat_client = ceph_release_from_name(r.c_str());
       }
     }
     if (struct_v >= 6) {
-      ::decode(new_require_min_compat_client, bl);
-      ::decode(new_require_osd_release, bl);
+      decode(new_require_min_compat_client, bl);
+      decode(new_require_osd_release, bl);
     } else {
       if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
        // only for compat with post-kraken pre-luminous test clusters
@@ -793,15 +888,21 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl)
        new_require_osd_release = -1;
       }
     }
+    if (struct_v >= 8) {
+      decode(new_crush_node_flags, bl);
+    }
+    if (struct_v >= 9) {
+      decode(new_device_class_flags, bl);
+    }
     DECODE_FINISH(bl); // osd-only data
   }
 
   if (struct_v >= 8) {
     have_crc = true;
     crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset);
-    ::decode(inc_crc, bl);
+    decode(inc_crc, bl);
     tail_offset = bl.get_off();
-    ::decode(full_crc, bl);
+    decode(full_crc, bl);
   } else {
     have_crc = false;
     full_crc = 0;
@@ -832,6 +933,8 @@ void OSDMap::Incremental::dump(Formatter *f) const
   f->dump_int("epoch", epoch);
   f->dump_stream("fsid") << fsid;
   f->dump_stream("modified") << modified;
+  f->dump_stream("new_last_up_change") << new_last_up_change;
+  f->dump_stream("new_last_in_change") << new_last_in_change;
   f->dump_int("new_pool_max", new_pool_max);
   f->dump_int("new_flags", new_flags);
   f->dump_float("new_full_ratio", new_full_ratio);
@@ -844,7 +947,7 @@ void OSDMap::Incremental::dump(Formatter *f) const
     f->open_object_section("full_map");
     OSDMap full;
     bufferlist fbl = fullmap;  // kludge around constness.
-    auto p = fbl.begin();
+    auto p = fbl.cbegin();
     full.decode(p);
     full.dump(f);
     f->close_section();
@@ -853,7 +956,7 @@ void OSDMap::Incremental::dump(Formatter *f) const
     f->open_object_section("crush");
     CrushWrapper c;
     bufferlist tbl = crush;  // kludge around constness.
-    auto p = tbl.begin();
+    auto p = tbl.cbegin();
     c.decode(p);
     c.dump(f);
     f->close_section();
@@ -890,12 +993,21 @@ void OSDMap::Incremental::dump(Formatter *f) const
   for (const auto &upclient : new_up_client) {
     f->open_object_section("osd");
     f->dump_int("osd", upclient.first);
-    f->dump_stream("public_addr") << upclient.second;
-    f->dump_stream("cluster_addr") << new_up_cluster.find(upclient.first)->second;
-    f->dump_stream("heartbeat_back_addr") << new_hb_back_up.find(upclient.first)->second;
-    map<int32_t, entity_addr_t>::const_iterator q;
-    if ((q = new_hb_front_up.find(upclient.first)) != new_hb_front_up.end())
-      f->dump_stream("heartbeat_front_addr") << q->second;
+    f->dump_stream("public_addr") << upclient.second.legacy_addr();
+    f->dump_object("public_addrs", upclient.second);
+    if (auto p = new_up_cluster.find(upclient.first);
+       p != new_up_cluster.end()) {
+      f->dump_stream("cluster_addr") << p->second.legacy_addr();
+      f->dump_object("cluster_addrs", p->second);
+    }
+    if (auto p = new_hb_back_up.find(upclient.first);
+       p != new_hb_back_up.end()) {
+      f->dump_object("heartbeat_back_addrs", p->second);
+    }
+    if (auto p = new_hb_front_up.find(upclient.first);
+       p != new_hb_front_up.end()) {
+      f->dump_object("heartbeat_front_addrs", p->second);
+    }
     f->close_section();
   }
   f->close_section();
@@ -1055,6 +1167,61 @@ void OSDMap::Incremental::dump(Formatter *f) const
     f->dump_string("old", erasure_code_profile.c_str());
   }
   f->close_section();
+
+  f->open_array_section("new_removed_snaps");
+  for (auto& p : new_removed_snaps) {
+    f->open_object_section("pool");
+    f->dump_int("pool", p.first);
+    f->open_array_section("snaps");
+    for (auto q = p.second.begin(); q != p.second.end(); ++q) {
+      f->open_object_section("interval");
+      f->dump_unsigned("begin", q.get_start());
+      f->dump_unsigned("length", q.get_len());
+      f->close_section();
+    }
+    f->close_section();
+    f->close_section();
+  }
+  f->close_section();
+  f->open_array_section("new_purged_snaps");
+  for (auto& p : new_purged_snaps) {
+    f->open_object_section("pool");
+    f->dump_int("pool", p.first);
+    f->open_array_section("snaps");
+    for (auto q = p.second.begin(); q != p.second.end(); ++q) {
+      f->open_object_section("interval");
+      f->dump_unsigned("begin", q.get_start());
+      f->dump_unsigned("length", q.get_len());
+      f->close_section();
+    }
+    f->close_section();
+    f->close_section();
+  }
+  f->open_array_section("new_crush_node_flags");
+  for (auto& i : new_crush_node_flags) {
+    f->open_object_section("node");
+    f->dump_int("id", i.first);
+    set<string> st;
+    calc_state_set(i.second, st);
+    for (auto& j : st) {
+      f->dump_string("flag", j);
+    }
+    f->close_section();
+  }
+  f->close_section();
+  f->open_array_section("new_device_class_flags");
+  for (auto& i : new_device_class_flags) {
+    f->open_object_section("device_class");
+    f->dump_int("id", i.first);
+    set<string> st;
+    calc_state_set(i.second, st);
+    for (auto& j : st) {
+      f->dump_string("flag", j);
+    }
+    f->close_section();
+  }
+  f->close_section();
+  f->close_section();
 }
 
 void OSDMap::Incremental::generate_test_instances(list<Incremental*>& o)
@@ -1072,21 +1239,45 @@ void OSDMap::set_epoch(epoch_t e)
     pool.second.last_change = e;
 }
 
-bool OSDMap::is_blacklisted(const entity_addr_t& a) const
+bool OSDMap::is_blacklisted(const entity_addr_t& orig) const
 {
-  if (blacklist.empty())
+  if (blacklist.empty()) {
     return false;
+  }
+
+  // all blacklist entries are type ANY for nautilus+
+  // FIXME: avoid this copy!
+  entity_addr_t a = orig;
+  if (require_osd_release < CEPH_RELEASE_NAUTILUS) {
+    a.set_type(entity_addr_t::TYPE_LEGACY);
+  } else {
+    a.set_type(entity_addr_t::TYPE_ANY);
+  }
 
   // this specific instance?
-  if (blacklist.count(a))
+  if (blacklist.count(a)) {
     return true;
+  }
 
   // is entire ip blacklisted?
   if (a.is_ip()) {
-    entity_addr_t b = a;
-    b.set_port(0);
-    b.set_nonce(0);
-    if (blacklist.count(b)) {
+    a.set_port(0);
+    a.set_nonce(0);
+    if (blacklist.count(a)) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+bool OSDMap::is_blacklisted(const entity_addrvec_t& av) const
+{
+  if (blacklist.empty())
+    return false;
+
+  for (auto& a : av.v) {
+    if (is_blacklisted(a)) {
       return true;
     }
   }
@@ -1118,10 +1309,10 @@ void OSDMap::set_max_osd(int m)
   }
   osd_info.resize(m);
   osd_xinfo.resize(m);
-  osd_addrs->client_addr.resize(m);
-  osd_addrs->cluster_addr.resize(m);
-  osd_addrs->hb_back_addr.resize(m);
-  osd_addrs->hb_front_addr.resize(m);
+  osd_addrs->client_addrs.resize(m);
+  osd_addrs->cluster_addrs.resize(m);
+  osd_addrs->hb_back_addrs.resize(m);
+  osd_addrs->hb_front_addrs.resize(m);
   osd_uuid->resize(m);
   if (osd_primary_affinity)
     osd_primary_affinity->resize(m, CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
@@ -1153,9 +1344,9 @@ void OSDMap::get_full_pools(CephContext *cct,
                             set<int64_t> *backfillfull,
                             set<int64_t> *nearfull) const
 {
-  assert(full);
-  assert(backfillfull);
-  assert(nearfull);
+  ceph_assert(full);
+  ceph_assert(backfillfull);
+  ceph_assert(nearfull);
   full->clear();
   backfillfull->clear();
   nearfull->clear();
@@ -1185,43 +1376,6 @@ void OSDMap::get_full_pools(CephContext *cct,
   }
 }
 
-static bool get_osd_utilization(
-  const mempool::pgmap::unordered_map<int32_t,osd_stat_t> &osd_stat,
-  int id, int64_t* kb, int64_t* kb_used, int64_t* kb_avail)
-{
-  auto p = osd_stat.find(id);
-  if (p == osd_stat.end())
-    return false;
-  *kb = p->second.kb;
-  *kb_used = p->second.kb_used;
-  *kb_avail = p->second.kb_avail;
-  return *kb > 0;
-}
-
-void OSDMap::get_full_osd_util(
-  const mempool::pgmap::unordered_map<int32_t,osd_stat_t> &osd_stat,
-  map<int, float> *full, map<int, float> *backfill, map<int, float> *nearfull) const
-{
-  full->clear();
-  backfill->clear();
-  nearfull->clear();
-  for (int i = 0; i < max_osd; ++i) {
-    if (exists(i) && is_up(i) && is_in(i)) {
-      int64_t kb, kb_used, kb_avail;
-      if (osd_state[i] & CEPH_OSD_FULL) {
-        if (get_osd_utilization(osd_stat, i, &kb, &kb_used, &kb_avail))
-         full->emplace(i, (float)kb_used / (float)kb);
-      } else if (osd_state[i] & CEPH_OSD_BACKFILLFULL) {
-        if (get_osd_utilization(osd_stat, i, &kb, &kb_used, &kb_avail))
-         backfill->emplace(i, (float)kb_used / (float)kb);
-      } else if (osd_state[i] & CEPH_OSD_NEARFULL) {
-        if (get_osd_utilization(osd_stat, i, &kb, &kb_used, &kb_avail))
-         nearfull->emplace(i, (float)kb_used / (float)kb);
-      }
-    }
-  }
-}
-
 void OSDMap::get_full_osd_counts(set<int> *full, set<int> *backfill,
                                 set<int> *nearfull) const
 {
@@ -1255,14 +1409,23 @@ void OSDMap::get_up_osds(set<int32_t>& ls) const
   }
 }
 
-void OSDMap::get_out_osds(set<int32_t>& ls) const
+void OSDMap::get_out_existing_osds(set<int32_t>& ls) const
 {
   for (int i = 0; i < max_osd; i++) {
-    if (is_out(i))
+    if (exists(i) && get_weight(i) == CEPH_OSD_OUT)
       ls.insert(i);
   }
 }
 
+void OSDMap::get_flag_set(set<string> *flagset) const
+{
+  for (unsigned i = 0; i < sizeof(flags) * 8; ++i) {
+    if (flags & (1<<i)) {
+      flagset->insert(get_flag_string(flags & (1<<i)));
+    }
+  }
+}
+
 void OSDMap::calc_state_set(int state, set<string>& st)
 {
   unsigned t = state;
@@ -1290,7 +1453,8 @@ void OSDMap::adjust_osd_weights(const map<int,double>& weights, Incremental& inc
 int OSDMap::identify_osd(const entity_addr_t& addr) const
 {
   for (int i=0; i<max_osd; i++)
-    if (exists(i) && (get_addr(i) == addr || get_cluster_addr(i) == addr))
+    if (exists(i) && (get_addrs(i).contains(addr) ||
+                     get_cluster_addrs(i).contains(addr)))
       return i;
   return -1;
 }
@@ -1306,8 +1470,10 @@ int OSDMap::identify_osd(const uuid_d& u) const
 int OSDMap::identify_osd_on_all_channels(const entity_addr_t& addr) const
 {
   for (int i=0; i<max_osd; i++)
-    if (exists(i) && (get_addr(i) == addr || get_cluster_addr(i) == addr ||
-       get_hb_back_addr(i) == addr || get_hb_front_addr(i) == addr))
+    if (exists(i) && (get_addrs(i).contains(addr) ||
+                     get_cluster_addrs(i).contains(addr) ||
+                     get_hb_back_addrs(i).contains(addr) ||
+                     get_hb_front_addrs(i).contains(addr)))
       return i;
   return -1;
 }
@@ -1315,7 +1481,8 @@ int OSDMap::identify_osd_on_all_channels(const entity_addr_t& addr) const
 int OSDMap::find_osd_on_ip(const entity_addr_t& ip) const
 {
   for (int i=0; i<max_osd; i++)
-    if (exists(i) && (get_addr(i).is_same_host(ip) || get_cluster_addr(i).is_same_host(ip)))
+    if (exists(i) && (get_addrs(i).is_same_host(ip) ||
+                     get_cluster_addrs(i).is_same_host(ip)))
       return i;
   return -1;
 }
@@ -1349,10 +1516,6 @@ uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
     if (pool.second.has_flag(pg_pool_t::FLAG_HASHPSPOOL)) {
       features |= CEPH_FEATURE_OSDHASHPSPOOL;
     }
-    if (pool.second.is_erasure() &&
-       entity_type != CEPH_ENTITY_TYPE_CLIENT) { // not for clients
-      features |= CEPH_FEATURE_OSD_ERASURE_CODES;
-    }
     if (!pool.second.tiers.empty() ||
        pool.second.is_tier()) {
       features |= CEPH_FEATURE_OSD_CACHEPOOL;
@@ -1369,21 +1532,7 @@ uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
        features |= CEPH_FEATURE_CRUSH_TUNABLES5;
     }
   }
-  if (entity_type == CEPH_ENTITY_TYPE_OSD) {
-    for (auto &erasure_code_profile : erasure_code_profiles) {
-      auto& profile = erasure_code_profile.second;
-      const auto& plugin = profile.find("plugin");
-      if (plugin != profile.end()) {
-       if (plugin->second == "isa" || plugin->second == "lrc")
-         features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
-       if (plugin->second == "shec")
-         features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
-      }
-    }
-  }
   mask |= CEPH_FEATURE_OSDHASHPSPOOL | CEPH_FEATURE_OSD_CACHEPOOL;
-  if (entity_type != CEPH_ENTITY_TYPE_CLIENT)
-    mask |= CEPH_FEATURE_OSD_ERASURE_CODES;
 
   if (osd_primary_affinity) {
     for (int i = 0; i < max_osd; ++i) {
@@ -1410,6 +1559,17 @@ uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
     mask |= kraken_features;
   }
 
+  if (require_min_compat_client >= CEPH_RELEASE_NAUTILUS) {
+    // if min_compat_client is >= nautilus, require v2 cephx signatures
+    // from everyone
+    features |= CEPH_FEATUREMASK_CEPHX_V2;
+  } else if (require_osd_release >= CEPH_RELEASE_NAUTILUS &&
+            entity_type == CEPH_ENTITY_TYPE_OSD) {
+    // if osds are >= nautilus, at least require the signatures from them
+    features |= CEPH_FEATUREMASK_CEPHX_V2;
+  }
+  mask |= CEPH_FEATUREMASK_CEPHX_V2;
+
   if (pmask)
     *pmask = mask;
   return features;
@@ -1431,7 +1591,6 @@ uint8_t OSDMap::get_min_compat_client() const
   }
   if (HAVE_FEATURE(f, OSD_PRIMARY_AFFINITY) || // v0.76-553-gf825624
       HAVE_FEATURE(f, CRUSH_TUNABLES3) ||      // v0.76-395-ge20a55d
-      HAVE_FEATURE(f, OSD_ERASURE_CODES) ||    // v0.73-498-gbfc86a8
       HAVE_FEATURE(f, OSD_CACHEPOOL)) {        // v0.67-401-gb91c1c5
     return CEPH_RELEASE_FIREFLY;   // v0.80.0
   }
@@ -1445,6 +1604,11 @@ uint8_t OSDMap::get_min_compat_client() const
   return CEPH_RELEASE_ARGONAUT;    // v0.48argonaut-206-g6f381af
 }
 
+uint8_t OSDMap::get_require_min_compat_client() const
+{
+  return require_min_compat_client;
+}
+
 void OSDMap::_calc_up_osd_features()
 {
   bool first = true;
@@ -1471,6 +1635,7 @@ uint64_t OSDMap::get_up_osd_features() const
 
 void OSDMap::dedup(const OSDMap *o, OSDMap *n)
 {
+  using ceph::encode;
   if (o->epoch == n->epoch)
     return;
 
@@ -1480,24 +1645,24 @@ void OSDMap::dedup(const OSDMap *o, OSDMap *n)
   if (o->max_osd != n->max_osd)
     diff++;
   for (int i = 0; i < o->max_osd && i < n->max_osd; i++) {
-    if ( n->osd_addrs->client_addr[i] &&  o->osd_addrs->client_addr[i] &&
-       *n->osd_addrs->client_addr[i] == *o->osd_addrs->client_addr[i])
-      n->osd_addrs->client_addr[i] = o->osd_addrs->client_addr[i];
+    if ( n->osd_addrs->client_addrs[i] &&  o->osd_addrs->client_addrs[i] &&
+       *n->osd_addrs->client_addrs[i] == *o->osd_addrs->client_addrs[i])
+      n->osd_addrs->client_addrs[i] = o->osd_addrs->client_addrs[i];
     else
       diff++;
-    if ( n->osd_addrs->cluster_addr[i] &&  o->osd_addrs->cluster_addr[i] &&
-       *n->osd_addrs->cluster_addr[i] == *o->osd_addrs->cluster_addr[i])
-      n->osd_addrs->cluster_addr[i] = o->osd_addrs->cluster_addr[i];
+    if ( n->osd_addrs->cluster_addrs[i] &&  o->osd_addrs->cluster_addrs[i] &&
+       *n->osd_addrs->cluster_addrs[i] == *o->osd_addrs->cluster_addrs[i])
+      n->osd_addrs->cluster_addrs[i] = o->osd_addrs->cluster_addrs[i];
     else
       diff++;
-    if ( n->osd_addrs->hb_back_addr[i] &&  o->osd_addrs->hb_back_addr[i] &&
-       *n->osd_addrs->hb_back_addr[i] == *o->osd_addrs->hb_back_addr[i])
-      n->osd_addrs->hb_back_addr[i] = o->osd_addrs->hb_back_addr[i];
+    if ( n->osd_addrs->hb_back_addrs[i] &&  o->osd_addrs->hb_back_addrs[i] &&
+       *n->osd_addrs->hb_back_addrs[i] == *o->osd_addrs->hb_back_addrs[i])
+      n->osd_addrs->hb_back_addrs[i] = o->osd_addrs->hb_back_addrs[i];
     else
       diff++;
-    if ( n->osd_addrs->hb_front_addr[i] &&  o->osd_addrs->hb_front_addr[i] &&
-       *n->osd_addrs->hb_front_addr[i] == *o->osd_addrs->hb_front_addr[i])
-      n->osd_addrs->hb_front_addr[i] = o->osd_addrs->hb_front_addr[i];
+    if ( n->osd_addrs->hb_front_addrs[i] &&  o->osd_addrs->hb_front_addrs[i] &&
+       *n->osd_addrs->hb_front_addrs[i] == *o->osd_addrs->hb_front_addrs[i])
+      n->osd_addrs->hb_front_addrs[i] = o->osd_addrs->hb_front_addrs[i];
     else
       diff++;
   }
@@ -1508,8 +1673,8 @@ void OSDMap::dedup(const OSDMap *o, OSDMap *n)
 
   // does crush match?
   bufferlist oc, nc;
-  ::encode(*o->crush, oc, CEPH_FEATURES_SUPPORTED_DEFAULT);
-  ::encode(*n->crush, nc, CEPH_FEATURES_SUPPORTED_DEFAULT);
+  encode(*o->crush, oc, CEPH_FEATURES_SUPPORTED_DEFAULT);
+  encode(*n->crush, nc, CEPH_FEATURES_SUPPORTED_DEFAULT);
   if (oc.contents_equal(nc)) {
     n->crush = o->crush;
   }
@@ -1531,18 +1696,17 @@ void OSDMap::dedup(const OSDMap *o, OSDMap *n)
 }
 
 void OSDMap::clean_temps(CephContext *cct,
-                        const OSDMap& osdmap, Incremental *pending_inc)
+                        const OSDMap& oldmap,
+                        const OSDMap& nextmap,
+                        Incremental *pending_inc)
 {
   ldout(cct, 10) << __func__ << dendl;
-  OSDMap tmpmap;
-  tmpmap.deepish_copy_from(osdmap);
-  tmpmap.apply_incremental(*pending_inc);
 
-  for (auto pg : *tmpmap.pg_temp) {
+  for (auto pg : *nextmap.pg_temp) {
     // if pool does not exist, remove any existing pg_temps associated with
     // it.  we don't care about pg_temps on the pending_inc either; if there
     // are new_pg_temp entries on the pending, clear them out just as well.
-    if (!osdmap.have_pg_pool(pg.first.pool())) {
+    if (!nextmap.have_pg_pool(pg.first.pool())) {
       ldout(cct, 10) << __func__ << " removing pg_temp " << pg.first
                     << " for nonexistent pool " << pg.first.pool() << dendl;
       pending_inc->new_pg_temp[pg.first].clear();
@@ -1551,7 +1715,7 @@ void OSDMap::clean_temps(CephContext *cct,
     // all osds down?
     unsigned num_up = 0;
     for (auto o : pg.second) {
-      if (!tmpmap.is_down(o)) {
+      if (!nextmap.is_down(o)) {
        ++num_up;
        break;
       }
@@ -1565,30 +1729,30 @@ void OSDMap::clean_temps(CephContext *cct,
     // redundant pg_temp?
     vector<int> raw_up;
     int primary;
-    tmpmap.pg_to_raw_up(pg.first, &raw_up, &primary);
+    nextmap.pg_to_raw_up(pg.first, &raw_up, &primary);
     bool remove = false;
-    if (vectors_equal(raw_up, pg.second)) {
+    if (raw_up == pg.second) {
       ldout(cct, 10) << __func__ << "  removing pg_temp " << pg.first << " "
                     << pg.second << " that matches raw_up mapping" << dendl;
       remove = true;
     }
     // oversized pg_temp?
-    if (pg.second.size() > tmpmap.get_pg_pool(pg.first.pool())->get_size()) {
+    if (pg.second.size() > nextmap.get_pg_pool(pg.first.pool())->get_size()) {
       ldout(cct, 10) << __func__ << "  removing pg_temp " << pg.first << " "
                     << pg.second << " exceeds pool size" << dendl;
       remove = true;
     }
     if (remove) {
-      if (osdmap.pg_temp->count(pg.first))
+      if (oldmap.pg_temp->count(pg.first))
        pending_inc->new_pg_temp[pg.first].clear();
       else
        pending_inc->new_pg_temp.erase(pg.first);
     }
   }
   
-  for (auto &pg : *tmpmap.primary_temp) {
+  for (auto &pg : *nextmap.primary_temp) {
     // primary down?
-    if (tmpmap.is_down(pg.second)) {
+    if (nextmap.is_down(pg.second)) {
       ldout(cct, 10) << __func__ << "  removing primary_temp " << pg.first
                     << " to down " << pg.second << dendl;
       pending_inc->new_primary_temp[pg.first] = -1;
@@ -1598,13 +1762,13 @@ void OSDMap::clean_temps(CephContext *cct,
     vector<int> real_up, templess_up;
     int real_primary, templess_primary;
     pg_t pgid = pg.first;
-    tmpmap.pg_to_acting_osds(pgid, &real_up, &real_primary);
-    tmpmap.pg_to_raw_up(pgid, &templess_up, &templess_primary);
+    nextmap.pg_to_acting_osds(pgid, &real_up, &real_primary);
+    nextmap.pg_to_raw_up(pgid, &templess_up, &templess_primary);
     if (real_primary == templess_primary){
       ldout(cct, 10) << __func__ << "  removing primary_temp "
                     << pgid << " -> " << real_primary
                     << " (unnecessary/redundant)" << dendl;
-      if (osdmap.primary_temp->count(pgid))
+      if (oldmap.primary_temp->count(pgid))
        pending_inc->new_primary_temp[pgid] = -1;
       else
        pending_inc->new_primary_temp.erase(pgid);
@@ -1612,134 +1776,183 @@ void OSDMap::clean_temps(CephContext *cct,
   }
 }
 
-void OSDMap::maybe_remove_pg_upmaps(CephContext *cct,
-                                    const OSDMap& osdmap,
-                                    Incremental *pending_inc)
+void OSDMap::get_upmap_pgs(vector<pg_t> *upmap_pgs) const
 {
-  ldout(cct, 10) << __func__ << dendl;
-  OSDMap tmpmap;
-  tmpmap.deepish_copy_from(osdmap);
-  tmpmap.apply_incremental(*pending_inc);
-  set<pg_t> to_check;
-  set<pg_t> to_cancel;
-  map<int, map<int, float>> rule_weight_map;
+  upmap_pgs->reserve(pg_upmap.size() + pg_upmap_items.size());
+  for (auto& p : pg_upmap)
+    upmap_pgs->push_back(p.first);
+  for (auto& p : pg_upmap_items)
+    upmap_pgs->push_back(p.first);
+}
 
-  for (auto& p : tmpmap.pg_upmap) {
-    to_check.insert(p.first);
-  }
-  for (auto& p : tmpmap.pg_upmap_items) {
-    to_check.insert(p.first);
-  }
-  for (auto& p : pending_inc->new_pg_upmap) {
-    to_check.insert(p.first);
-  }
-  for (auto& p : pending_inc->new_pg_upmap_items) {
-    to_check.insert(p.first);
-  }
+bool OSDMap::check_pg_upmaps(
+  CephContext *cct,
+  const vector<pg_t>& to_check,
+  vector<pg_t> *to_cancel,
+  map<pg_t, mempool::osdmap::vector<pair<int,int>>> *to_remap) const
+{
+  bool any_change = false;
+  map<int, map<int, float>> rule_weight_map;
   for (auto& pg : to_check) {
-    auto crush_rule = tmpmap.get_pg_pool_crush_rule(pg);
-    if (crush_rule < 0) {
-      lderr(cct) << __func__ << " unable to load crush-rule of pg "
-                 << pg << dendl;
+    const pg_pool_t *pi = get_pg_pool(pg.pool());
+    if (!pi || pg.ps() >= pi->get_pg_num_pending()) {
+      ldout(cct, 0) << __func__ << " pg " << pg << " is gone or merge source"
+                   << dendl;
+      to_cancel->push_back(pg);
+      continue;
+    }
+    if (pi->is_pending_merge(pg, nullptr)) {
+      ldout(cct, 0) << __func__ << " pg " << pg << " is pending merge"
+                   << dendl;
+      to_cancel->push_back(pg);
       continue;
     }
+    vector<int> raw, up;
+    pg_to_raw_upmap(pg, &raw, &up);
+    auto i = pg_upmap.find(pg);
+    if (i != pg_upmap.end() && raw == i->second) {
+      ldout(cct, 10) << " removing redundant pg_upmap "
+                     << i->first << " " << i->second
+                     << dendl;
+      to_cancel->push_back(pg);
+      continue;
+    }
+    auto j = pg_upmap_items.find(pg);
+    if (j != pg_upmap_items.end()) {
+      mempool::osdmap::vector<pair<int,int>> newmap;
+      for (auto& p : j->second) {
+        if (std::find(raw.begin(), raw.end(), p.first) == raw.end()) {
+          // cancel mapping if source osd does not exist anymore
+          continue;
+        }
+        if (p.second != CRUSH_ITEM_NONE && p.second < max_osd &&
+            p.second >= 0 && osd_weight[p.second] == 0) {
+          // cancel mapping if target osd is out
+          continue;
+        }
+        newmap.push_back(p);
+      }
+      if (newmap.empty()) {
+        ldout(cct, 10) << " removing no-op pg_upmap_items "
+                       << j->first << " " << j->second
+                       << dendl;
+        to_cancel->push_back(pg);
+        continue;
+      } else if (newmap != j->second) {
+        ldout(cct, 10) << " simplifying partially no-op pg_upmap_items "
+                       << j->first << " " << j->second
+                       << " -> " << newmap
+                       << dendl;
+        to_remap->insert({pg, newmap});
+        any_change = true;
+        continue;
+      }
+    }
+    auto crush_rule = get_pg_pool_crush_rule(pg);
+    auto r = crush->verify_upmap(cct,
+                                 crush_rule,
+                                 get_pg_pool_size(pg),
+                                 up);
+    if (r < 0) {
+      ldout(cct, 0) << __func__ << " verify_upmap of pg " << pg
+                    << " returning " << r
+                    << dendl;
+      to_cancel->push_back(pg);
+      continue;
+    }
+    // below we check against crush-topology changing..
     map<int, float> weight_map;
     auto it = rule_weight_map.find(crush_rule);
     if (it == rule_weight_map.end()) {
-      auto r = tmpmap.crush->get_rule_weight_osd_map(crush_rule, &weight_map);
+      auto r = crush->get_rule_weight_osd_map(crush_rule, &weight_map);
       if (r < 0) {
         lderr(cct) << __func__ << " unable to get crush weight_map for "
-                   << "crush_rule " << crush_rule << dendl;
+                   << "crush_rule " << crush_rule
+                   << dendl;
         continue;
       }
       rule_weight_map[crush_rule] = weight_map;
     } else {
       weight_map = it->second;
     }
-    auto type = tmpmap.crush->get_rule_failure_domain(crush_rule);
-    if (type < 0) {
-      lderr(cct) << __func__ << " unable to load failure-domain-type of pg "
-                 << pg << dendl;
-      continue;
-    }
     ldout(cct, 10) << __func__ << " pg " << pg
-                   << " crush-rule-id " << crush_rule
                    << " weight_map " << weight_map
-                   << " failure-domain-type " << type
                    << dendl;
-    vector<int> raw;
-    int primary;
-    tmpmap.pg_to_raw_up(pg, &raw, &primary);
-    set<int> parents;
-    for (auto osd : raw) {
-      if (type > 0) {
-        auto parent = tmpmap.crush->get_parent_of_type(osd, type, crush_rule);
-        if (parent >= 0) {
-          lderr(cct) << __func__ << " unable to get parent of raw osd."
-                     << osd << " of pg " << pg
-                     << dendl;
-          break;
-        }
-        auto r = parents.insert(parent);
-        if (!r.second) {
-          // two up-set osds come from same parent
-          to_cancel.insert(pg);
-          break;
-        }
-      }
-      // the above check validates collision only
-      // below we continue to check against crush-topology changing..
+    for (auto osd : up) {
       auto it = weight_map.find(osd);
       if (it == weight_map.end()) {
         // osd is gone or has been moved out of the specific crush-tree
-        to_cancel.insert(pg);
+        to_cancel->push_back(pg);
         break;
       }
-      auto adjusted_weight = tmpmap.get_weightf(it->first) * it->second;
+      auto adjusted_weight = get_weightf(it->first) * it->second;
       if (adjusted_weight == 0) {
         // osd is out/crush-out
-        to_cancel.insert(pg);
+        to_cancel->push_back(pg);
         break;
       }
     }
   }
+  any_change = any_change || !to_cancel->empty();
+  return any_change;
+}
+
+void OSDMap::clean_pg_upmaps(
+  CephContext *cct,
+  Incremental *pending_inc,
+  const vector<pg_t>& to_cancel,
+  const map<pg_t, mempool::osdmap::vector<pair<int,int>>>& to_remap) const
+{
   for (auto &pg: to_cancel) {
-    { // pg_upmap
-      auto it = pending_inc->new_pg_upmap.find(pg);
-      if (it != pending_inc->new_pg_upmap.end()) {
-        ldout(cct, 10) << __func__ << " cancel invalid pending "
-                       << "pg_upmap entry "
-                       << it->first << "->" << it->second
-                       << dendl;
-        pending_inc->new_pg_upmap.erase(it);
-      }
-      if (osdmap.pg_upmap.count(pg)) {
-        ldout(cct, 10) << __func__ << " cancel invalid pg_upmap entry "
-                       << osdmap.pg_upmap.find(pg)->first << "->"
-                       << osdmap.pg_upmap.find(pg)->second
-                       << dendl;
-        pending_inc->old_pg_upmap.insert(pg);
-      }
+    auto i = pending_inc->new_pg_upmap.find(pg);
+    if (i != pending_inc->new_pg_upmap.end()) {
+      ldout(cct, 10) << __func__ << " cancel invalid pending "
+                     << "pg_upmap entry "
+                     << i->first << "->" << i->second
+                     << dendl;
+      pending_inc->new_pg_upmap.erase(i);
     }
-    { // pg_upmap_items
-      auto it = pending_inc->new_pg_upmap_items.find(pg);
-      if (it != pending_inc->new_pg_upmap_items.end()) {
-        ldout(cct, 10) << __func__ << " cancel invalid pending "
-                       << "pg_upmap_items entry "
-                       << it->first << "->" << it->second
-                       << dendl;
-        pending_inc->new_pg_upmap_items.erase(it);
-      }
-      if (osdmap.pg_upmap_items.count(pg)) {
-        ldout(cct, 10) << __func__ << " cancel invalid "
-                       << "pg_upmap_items entry "
-                       << osdmap.pg_upmap_items.find(pg)->first << "->"
-                       << osdmap.pg_upmap_items.find(pg)->second
-                       << dendl;
-        pending_inc->old_pg_upmap_items.insert(pg);
-      }
+    auto j = pg_upmap.find(pg);
+    if (j != pg_upmap.end()) {
+      ldout(cct, 10) << __func__ << " cancel invalid pg_upmap entry "
+                     << j->first << "->" << j->second
+                     << dendl;
+      pending_inc->old_pg_upmap.insert(pg);
+    }
+    auto p = pending_inc->new_pg_upmap_items.find(pg);
+    if (p != pending_inc->new_pg_upmap_items.end()) {
+      ldout(cct, 10) << __func__ << " cancel invalid pending "
+                     << "pg_upmap_items entry "
+                     << p->first << "->" << p->second
+                     << dendl;
+      pending_inc->new_pg_upmap_items.erase(p);
+    }
+    auto q = pg_upmap_items.find(pg);
+    if (q != pg_upmap_items.end()) {
+      ldout(cct, 10) << __func__ << " cancel invalid "
+                     << "pg_upmap_items entry "
+                     << q->first << "->" << q->second
+                     << dendl;
+      pending_inc->old_pg_upmap_items.insert(pg);
     }
   }
+  for (auto& i : to_remap)
+    pending_inc->new_pg_upmap_items[i.first] = i.second;
+}
+
+bool OSDMap::clean_pg_upmaps(
+  CephContext *cct,
+  Incremental *pending_inc) const
+{
+  ldout(cct, 10) << __func__ << dendl;
+  vector<pg_t> to_check;
+  vector<pg_t> to_cancel;
+  map<pg_t, mempool::osdmap::vector<pair<int,int>>> to_remap;
+
+  get_upmap_pgs(&to_check);
+  auto any_change = check_pg_upmaps(cct, to_check, &to_cancel, &to_remap);
+  clean_pg_upmaps(cct, pending_inc, to_cancel, to_remap);
+  return any_change;
 }
 
 int OSDMap::apply_incremental(const Incremental &inc)
@@ -1750,7 +1963,7 @@ int OSDMap::apply_incremental(const Incremental &inc)
   else if (inc.fsid != fsid)
     return -EINVAL;
   
-  assert(inc.epoch == epoch+1);
+  ceph_assert(inc.epoch == epoch+1);
 
   epoch++;
   modified = inc.modified;
@@ -1791,6 +2004,31 @@ int OSDMap::apply_incremental(const Incremental &inc)
     pools[pool.first].last_change = epoch;
   }
 
+  new_removed_snaps = inc.new_removed_snaps;
+  new_purged_snaps = inc.new_purged_snaps;
+  for (auto p = new_removed_snaps.begin();
+       p != new_removed_snaps.end();
+       ++p) {
+    removed_snaps_queue[p->first].union_of(p->second);
+  }
+  for (auto p = new_purged_snaps.begin();
+       p != new_purged_snaps.end();
+       ++p) {
+    auto q = removed_snaps_queue.find(p->first);
+    ceph_assert(q != removed_snaps_queue.end());
+    q->second.subtract(p->second);
+    if (q->second.empty()) {
+      removed_snaps_queue.erase(q);
+    }
+  }
+
+  if (inc.new_last_up_change != utime_t()) {
+    last_up_change = inc.new_last_up_change;
+  }
+  if (inc.new_last_in_change != utime_t()) {
+    last_in_change = inc.new_last_in_change;
+  }
+
   for (const auto &pname : inc.new_pool_names) {
     auto pool_name_entry = pool_name.find(pname.first);
     if (pool_name_entry != pool_name.end()) {
@@ -1847,10 +2085,10 @@ int OSDMap::apply_incremental(const Incremental &inc)
       osd_info[osd] = osd_info_t();
       osd_xinfo[osd] = osd_xinfo_t();
       set_primary_affinity(osd, CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
-      osd_addrs->client_addr[osd].reset(new entity_addr_t());
-      osd_addrs->cluster_addr[osd].reset(new entity_addr_t());
-      osd_addrs->hb_front_addr[osd].reset(new entity_addr_t());
-      osd_addrs->hb_back_addr[osd].reset(new entity_addr_t());
+      osd_addrs->client_addrs[osd].reset(new entity_addrvec_t());
+      osd_addrs->cluster_addrs[osd].reset(new entity_addrvec_t());
+      osd_addrs->hb_front_addrs[osd].reset(new entity_addrvec_t());
+      osd_addrs->hb_back_addrs[osd].reset(new entity_addrvec_t());
       osd_state[osd] = 0;
     } else {
       osd_state[osd] ^= s;
@@ -1859,23 +2097,19 @@ int OSDMap::apply_incremental(const Incremental &inc)
 
   for (const auto &client : inc.new_up_client) {
     osd_state[client.first] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
-    osd_addrs->client_addr[client.first].reset(new entity_addr_t(client.second));
-    if (inc.new_hb_back_up.empty())
-      osd_addrs->hb_back_addr[client.first].reset(new entity_addr_t(client.second)); //this is a backward-compatibility hack
-    else
-      osd_addrs->hb_back_addr[client.first].reset(
-       new entity_addr_t(inc.new_hb_back_up.find(client.first)->second));
-    const auto j = inc.new_hb_front_up.find(client.first);
-    if (j != inc.new_hb_front_up.end())
-      osd_addrs->hb_front_addr[client.first].reset(new entity_addr_t(j->second));
-    else
-      osd_addrs->hb_front_addr[client.first].reset();
+    osd_addrs->client_addrs[client.first].reset(
+      new entity_addrvec_t(client.second));
+    osd_addrs->hb_back_addrs[client.first].reset(
+      new entity_addrvec_t(inc.new_hb_back_up.find(client.first)->second));
+    osd_addrs->hb_front_addrs[client.first].reset(
+      new entity_addrvec_t(inc.new_hb_front_up.find(client.first)->second));
 
     osd_info[client.first].up_from = epoch;
   }
 
   for (const auto &cluster : inc.new_up_cluster)
-    osd_addrs->cluster_addr[cluster.first].reset(new entity_addr_t(cluster.second));
+    osd_addrs->cluster_addrs[cluster.first].reset(
+      new entity_addrvec_t(cluster.second));
 
   // info
   for (const auto &thru : inc.new_up_thru)
@@ -1937,6 +2171,22 @@ int OSDMap::apply_incremental(const Incremental &inc)
   for (const auto &addr : inc.old_blacklist)
     blacklist.erase(addr);
 
+  for (auto& i : inc.new_crush_node_flags) {
+    if (i.second) {
+      crush_node_flags[i.first] = i.second;
+    } else {
+      crush_node_flags.erase(i.first);
+    }
+  }
+
+  for (auto& i : inc.new_device_class_flags) {
+    if (i.second) {
+      device_class_flags[i.first] = i.second;
+    } else {
+      device_class_flags.erase(i.first);
+    }
+  }
+
   // cluster snapshot?
   if (inc.cluster_snapshot.length()) {
     cluster_snapshot = inc.cluster_snapshot;
@@ -1966,10 +2216,16 @@ int OSDMap::apply_incremental(const Incremental &inc)
     }
   }
 
+  if (inc.new_require_osd_release >= 0) {
+    require_osd_release = inc.new_require_osd_release;
+    if (require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+      flags |= CEPH_OSDMAP_PGLOG_HARDLIMIT;
+    }
+  }
   // do new crush map last (after up/down stuff)
   if (inc.crush.length()) {
     bufferlist bl(inc.crush);
-    auto blp = bl.begin();
+    auto blp = bl.cbegin();
     crush.reset(new CrushWrapper);
     crush->decode(blp);
     if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
@@ -1979,6 +2235,14 @@ int OSDMap::apply_incremental(const Incremental &inc)
       // it in the canonical version, don't change it.
       ++crush_version;
     }
+    for (auto it = device_class_flags.begin();
+         it != device_class_flags.end();) {
+      const char* class_name = crush->get_class_name(it->first);
+      if (!class_name) // device class is gone
+        it = device_class_flags.erase(it);
+      else
+        it++;
+    }
   }
 
   calc_num_osds();
@@ -2248,24 +2512,35 @@ void OSDMap::_get_temp_osds(const pg_pool_t& pool, pg_t pg,
 
 void OSDMap::pg_to_raw_osds(pg_t pg, vector<int> *raw, int *primary) const
 {
-  *primary = -1;
-  raw->clear();
   const pg_pool_t *pool = get_pg_pool(pg.pool());
-  if (!pool)
+  if (!pool) {
+    *primary = -1;
+    raw->clear();
     return;
+  }
   _pg_to_raw_osds(*pool, pg, raw, NULL);
-  if (primary)
-    *primary = _pick_primary(*raw);
+  *primary = _pick_primary(*raw);
+}
+
+void OSDMap::pg_to_raw_upmap(pg_t pg, vector<int>*raw,
+                             vector<int> *raw_upmap) const
+{
+  auto pool = get_pg_pool(pg.pool());
+  if (!pool) {
+    raw_upmap->clear();
+    return;
+  }
+  _pg_to_raw_osds(*pool, pg, raw, NULL);
+  *raw_upmap = *raw;
+  _apply_upmap(*pool, pg, raw_upmap);
 }
 
 void OSDMap::pg_to_raw_up(pg_t pg, vector<int> *up, int *primary) const
 {
   const pg_pool_t *pool = get_pg_pool(pg.pool());
   if (!pool) {
-    if (primary)
-      *primary = -1;
-    if (up)
-      up->clear();
+    *primary = -1;
+    up->clear();
     return;
   }
   vector<int> raw;
@@ -2363,6 +2638,12 @@ bool OSDMap::primary_changed(
 uint64_t OSDMap::get_encoding_features() const
 {
   uint64_t f = SIGNIFICANT_FEATURES;
+  if (require_osd_release < CEPH_RELEASE_NAUTILUS) {
+    f &= ~CEPH_FEATURE_SERVER_NAUTILUS;
+  }
+  if (require_osd_release < CEPH_RELEASE_MIMIC) {
+    f &= ~CEPH_FEATURE_SERVER_MIMIC;
+  }
   if (require_osd_release < CEPH_RELEASE_LUMINOUS) {
     f &= ~(CEPH_FEATURE_SERVER_LUMINOUS |
           CEPH_FEATURE_CRUSH_CHOOSE_ARGS);
@@ -2382,120 +2663,127 @@ uint64_t OSDMap::get_encoding_features() const
 // serialize, unserialize
 void OSDMap::encode_client_old(bufferlist& bl) const
 {
+  using ceph::encode;
   __u16 v = 5;
-  ::encode(v, bl);
+  encode(v, bl);
 
   // base
-  ::encode(fsid, bl);
-  ::encode(epoch, bl);
-  ::encode(created, bl);
-  ::encode(modified, bl);
+  encode(fsid, bl);
+  encode(epoch, bl);
+  encode(created, bl);
+  encode(modified, bl);
 
-  // for ::encode(pools, bl);
+  // for encode(pools, bl);
   __u32 n = pools.size();
-  ::encode(n, bl);
+  encode(n, bl);
 
   for (const auto &pool : pools) {
     n = pool.first;
-    ::encode(n, bl);
-    ::encode(pool.second, bl, 0);
+    encode(n, bl);
+    encode(pool.second, bl, 0);
   }
-  // for ::encode(pool_name, bl);
+  // for encode(pool_name, bl);
   n = pool_name.size();
-  ::encode(n, bl);
+  encode(n, bl);
   for (const auto &pname : pool_name) {
     n = pname.first;
-    ::encode(n, bl);
-    ::encode(pname.second, bl);
+    encode(n, bl);
+    encode(pname.second, bl);
   }
-  // for ::encode(pool_max, bl);
+  // for encode(pool_max, bl);
   n = pool_max;
-  ::encode(n, bl);
+  encode(n, bl);
 
-  ::encode(flags, bl);
+  encode(flags, bl);
 
-  ::encode(max_osd, bl);
+  encode(max_osd, bl);
   {
     uint32_t n = osd_state.size();
-    ::encode(n, bl);
+    encode(n, bl);
     for (auto s : osd_state) {
-      ::encode((uint8_t)s, bl);
+      encode((uint8_t)s, bl);
     }
   }
-  ::encode(osd_weight, bl);
-  ::encode(osd_addrs->client_addr, bl, 0);
+  encode(osd_weight, bl);
+  encode(osd_addrs->client_addrs, bl, 0);
 
-  // for ::encode(pg_temp, bl);
+  // for encode(pg_temp, bl);
   n = pg_temp->size();
-  ::encode(n, bl);
+  encode(n, bl);
   for (const auto pg : *pg_temp) {
     old_pg_t opg = pg.first.get_old_pg();
-    ::encode(opg, bl);
-    ::encode(pg.second, bl);
+    encode(opg, bl);
+    encode(pg.second, bl);
   }
 
   // crush
   bufferlist cbl;
   crush->encode(cbl, 0 /* legacy (no) features */);
-  ::encode(cbl, bl);
+  encode(cbl, bl);
 }
 
 void OSDMap::encode_classic(bufferlist& bl, uint64_t features) const
 {
+  using ceph::encode;
   if ((features & CEPH_FEATURE_PGID64) == 0) {
     encode_client_old(bl);
     return;
   }
 
   __u16 v = 6;
-  ::encode(v, bl);
+  encode(v, bl);
 
   // base
-  ::encode(fsid, bl);
-  ::encode(epoch, bl);
-  ::encode(created, bl);
-  ::encode(modified, bl);
+  encode(fsid, bl);
+  encode(epoch, bl);
+  encode(created, bl);
+  encode(modified, bl);
 
-  ::encode(pools, bl, features);
-  ::encode(pool_name, bl);
-  ::encode(pool_max, bl);
+  encode(pools, bl, features);
+  encode(pool_name, bl);
+  encode(pool_max, bl);
 
-  ::encode(flags, bl);
+  encode(flags, bl);
 
-  ::encode(max_osd, bl);
+  encode(max_osd, bl);
   {
     uint32_t n = osd_state.size();
-    ::encode(n, bl);
+    encode(n, bl);
     for (auto s : osd_state) {
-      ::encode((uint8_t)s, bl);
+      encode((uint8_t)s, bl);
     }
   }
-  ::encode(osd_weight, bl);
-  ::encode(osd_addrs->client_addr, bl, features);
+  encode(osd_weight, bl);
+  encode(osd_addrs->client_addrs, bl, features);
 
-  ::encode(*pg_temp, bl);
+  encode(*pg_temp, bl);
 
   // crush
   bufferlist cbl;
   crush->encode(cbl, 0 /* legacy (no) features */);
-  ::encode(cbl, bl);
+  encode(cbl, bl);
 
   // extended
   __u16 ev = 10;
-  ::encode(ev, bl);
-  ::encode(osd_addrs->hb_back_addr, bl, features);
-  ::encode(osd_info, bl);
-  ::encode(blacklist, bl, features);
-  ::encode(osd_addrs->cluster_addr, bl, features);
-  ::encode(cluster_snapshot_epoch, bl);
-  ::encode(cluster_snapshot, bl);
-  ::encode(*osd_uuid, bl);
-  ::encode(osd_xinfo, bl);
-  ::encode(osd_addrs->hb_front_addr, bl, features);
+  encode(ev, bl);
+  encode(osd_addrs->hb_back_addrs, bl, features);
+  encode(osd_info, bl);
+  encode(blacklist, bl, features);
+  encode(osd_addrs->cluster_addrs, bl, features);
+  encode(cluster_snapshot_epoch, bl);
+  encode(cluster_snapshot, bl);
+  encode(*osd_uuid, bl);
+  encode(osd_xinfo, bl);
+  encode(osd_addrs->hb_front_addrs, bl, features);
 }
 
+/* for a description of osdmap versions, and when they were introduced, please
+ * refer to
+ *    doc/dev/osd_internals/osdmap_versions.txt
+ */
 void OSDMap::encode(bufferlist& bl, uint64_t features) const
 {
+  using ceph::encode;
   if ((features & CEPH_FEATURE_OSDMAP_ENC) == 0) {
     encode_classic(bl, features);
     return;
@@ -2505,12 +2793,13 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
   // OSDMaps.  others should be passing around the canonical encoded
   // buffers from on high.  select out those callers by passing in an
   // "impossible" feature bit.
-  assert(features & CEPH_FEATURE_RESERVED);
+  ceph_assert(features & CEPH_FEATURE_RESERVED);
   features &= ~CEPH_FEATURE_RESERVED;
 
   size_t start_offset = bl.length();
   size_t tail_offset;
-  buffer::list::iterator crc_it;
+  size_t crc_offset;
+  std::optional<buffer::list::contiguous_filler> crc_filler;
 
   // meta-encoding: how we include client-used and osd-specific data
   ENCODE_START(8, 7, bl);
@@ -2518,20 +2807,24 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
   {
     // NOTE: any new encoding dependencies must be reflected by
     // SIGNIFICANT_FEATURES
-    uint8_t v = 6;
+    uint8_t v = 9;
     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
       v = 3;
+    } else if (!HAVE_FEATURE(features, SERVER_MIMIC)) {
+      v = 6;
+    } else if (!HAVE_FEATURE(features, SERVER_NAUTILUS)) {
+      v = 7;
     }
     ENCODE_START(v, 1, bl); // client-usable data
     // base
-    ::encode(fsid, bl);
-    ::encode(epoch, bl);
-    ::encode(created, bl);
-    ::encode(modified, bl);
+    encode(fsid, bl);
+    encode(epoch, bl);
+    encode(created, bl);
+    encode(modified, bl);
 
-    ::encode(pools, bl, features);
-    ::encode(pool_name, bl);
-    ::encode(pool_max, bl);
+    encode(pools, bl, features);
+    encode(pool_name, bl);
+    encode(pool_max, bl);
 
     if (v < 4) {
       decltype(flags) f = flags;
@@ -2541,48 +2834,60 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
        f |= CEPH_OSDMAP_REQUIRE_KRAKEN;
       else if (require_osd_release == CEPH_RELEASE_JEWEL)
        f |= CEPH_OSDMAP_REQUIRE_JEWEL;
-      ::encode(f, bl);
+      encode(f, bl);
     } else {
-      ::encode(flags, bl);
+      encode(flags, bl);
     }
 
-    ::encode(max_osd, bl);
+    encode(max_osd, bl);
     if (v >= 5) {
-      ::encode(osd_state, bl);
+      encode(osd_state, bl);
     } else {
       uint32_t n = osd_state.size();
-      ::encode(n, bl);
+      encode(n, bl);
       for (auto s : osd_state) {
-       ::encode((uint8_t)s, bl);
+       encode((uint8_t)s, bl);
       }
     }
-    ::encode(osd_weight, bl);
-    ::encode(osd_addrs->client_addr, bl, features);
+    encode(osd_weight, bl);
+    if (v >= 8) {
+      encode(osd_addrs->client_addrs, bl, features);
+    } else {
+      encode_addrvec_pvec_as_addr(osd_addrs->client_addrs, bl, features);
+    }
 
-    ::encode(*pg_temp, bl);
-    ::encode(*primary_temp, bl);
+    encode(*pg_temp, bl);
+    encode(*primary_temp, bl);
     if (osd_primary_affinity) {
-      ::encode(*osd_primary_affinity, bl);
+      encode(*osd_primary_affinity, bl);
     } else {
       vector<__u32> v;
-      ::encode(v, bl);
+      encode(v, bl);
     }
 
     // crush
     bufferlist cbl;
     crush->encode(cbl, features);
-    ::encode(cbl, bl);
-    ::encode(erasure_code_profiles, bl);
+    encode(cbl, bl);
+    encode(erasure_code_profiles, bl);
 
     if (v >= 4) {
-      ::encode(pg_upmap, bl);
-      ::encode(pg_upmap_items, bl);
+      encode(pg_upmap, bl);
+      encode(pg_upmap_items, bl);
     } else {
-      assert(pg_upmap.empty());
-      assert(pg_upmap_items.empty());
+      ceph_assert(pg_upmap.empty());
+      ceph_assert(pg_upmap_items.empty());
     }
     if (v >= 6) {
-      ::encode(crush_version, bl);
+      encode(crush_version, bl);
+    }
+    if (v >= 7) {
+      encode(new_removed_snaps, bl);
+      encode(new_purged_snaps, bl);
+    }
+    if (v >= 9) {
+      encode(last_up_change, bl);
+      encode(last_in_change, bl);
     }
     ENCODE_FINISH(bl); // client-usable data
   }
@@ -2590,50 +2895,74 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
   {
     // NOTE: any new encoding dependencies must be reflected by
     // SIGNIFICANT_FEATURES
-    uint8_t target_v = 5;
+    uint8_t target_v = 9;
     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
       target_v = 1;
+    } else if (!HAVE_FEATURE(features, SERVER_MIMIC)) {
+      target_v = 5;
+    } else if (!HAVE_FEATURE(features, SERVER_NAUTILUS)) {
+      target_v = 6;
     }
     ENCODE_START(target_v, 1, bl); // extended, osd-only data
-    ::encode(osd_addrs->hb_back_addr, bl, features);
-    ::encode(osd_info, bl);
+    if (target_v < 7) {
+      encode_addrvec_pvec_as_addr(osd_addrs->hb_back_addrs, bl, features);
+    } else {
+      encode(osd_addrs->hb_back_addrs, bl, features);
+    }
+    encode(osd_info, bl);
     {
       // put this in a sorted, ordered map<> so that we encode in a
       // deterministic order.
       map<entity_addr_t,utime_t> blacklist_map;
       for (const auto &addr : blacklist)
        blacklist_map.insert(make_pair(addr.first, addr.second));
-      ::encode(blacklist_map, bl, features);
-    }
-    ::encode(osd_addrs->cluster_addr, bl, features);
-    ::encode(cluster_snapshot_epoch, bl);
-    ::encode(cluster_snapshot, bl);
-    ::encode(*osd_uuid, bl);
-    ::encode(osd_xinfo, bl);
-    ::encode(osd_addrs->hb_front_addr, bl, features);
+      encode(blacklist_map, bl, features);
+    }
+    if (target_v < 7) {
+      encode_addrvec_pvec_as_addr(osd_addrs->cluster_addrs, bl, features);
+    } else {
+      encode(osd_addrs->cluster_addrs, bl, features);
+    }
+    encode(cluster_snapshot_epoch, bl);
+    encode(cluster_snapshot, bl);
+    encode(*osd_uuid, bl);
+    encode(osd_xinfo, bl);
+    if (target_v < 7) {
+      encode_addrvec_pvec_as_addr(osd_addrs->hb_front_addrs, bl, features);
+    } else {
+      encode(osd_addrs->hb_front_addrs, bl, features);
+    }
     if (target_v >= 2) {
-      ::encode(nearfull_ratio, bl);
-      ::encode(full_ratio, bl);
-      ::encode(backfillfull_ratio, bl);
+      encode(nearfull_ratio, bl);
+      encode(full_ratio, bl);
+      encode(backfillfull_ratio, bl);
     }
     // 4 was string-based new_require_min_compat_client
     if (target_v >= 5) {
-      ::encode(require_min_compat_client, bl);
-      ::encode(require_osd_release, bl);
+      encode(require_min_compat_client, bl);
+      encode(require_osd_release, bl);
+    }
+    if (target_v >= 6) {
+      encode(removed_snaps_queue, bl);
+    }
+    if (target_v >= 8) {
+      encode(crush_node_flags, bl);
+    }
+    if (target_v >= 9) {
+      encode(device_class_flags, bl);
     }
     ENCODE_FINISH(bl); // osd-only data
   }
 
-  ::encode((uint32_t)0, bl); // dummy crc
-  crc_it = bl.end();
-  crc_it.advance(-4);
+  crc_offset = bl.length();
+  crc_filler = bl.append_hole(sizeof(uint32_t));
   tail_offset = bl.length();
 
   ENCODE_FINISH(bl); // meta-encoding wrapper
 
   // fill in crc
   bufferlist front;
-  front.substr_of(bl, start_offset, crc_it.get_off() - start_offset);
+  front.substr_of(bl, start_offset, crc_offset - start_offset);
   crc = front.crc32c(-1);
   if (tail_offset < bl.length()) {
     bufferlist tail;
@@ -2642,138 +2971,144 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
   }
   ceph_le32 crc_le;
   crc_le = crc;
-  crc_it.copy_in(4, (char*)&crc_le);
+  crc_filler->copy_in(4, (char*)&crc_le);
   crc_defined = true;
 }
 
+/* for a description of osdmap versions, and when they were introduced, please
+ * refer to
+ *    doc/dev/osd_internals/osdmap_versions.txt
+ */
 void OSDMap::decode(bufferlist& bl)
 {
-  auto p = bl.begin();
+  auto p = bl.cbegin();
   decode(p);
 }
 
-void OSDMap::decode_classic(bufferlist::iterator& p)
+void OSDMap::decode_classic(bufferlist::const_iterator& p)
 {
+  using ceph::decode;
   __u32 n, t;
   __u16 v;
-  ::decode(v, p);
+  decode(v, p);
 
   // base
-  ::decode(fsid, p);
-  ::decode(epoch, p);
-  ::decode(created, p);
-  ::decode(modified, p);
+  decode(fsid, p);
+  decode(epoch, p);
+  decode(created, p);
+  decode(modified, p);
 
   if (v < 6) {
     if (v < 4) {
       int32_t max_pools = 0;
-      ::decode(max_pools, p);
+      decode(max_pools, p);
       pool_max = max_pools;
     }
     pools.clear();
-    ::decode(n, p);
+    decode(n, p);
     while (n--) {
-      ::decode(t, p);
-      ::decode(pools[t], p);
+      decode(t, p);
+      decode(pools[t], p);
     }
     if (v == 4) {
-      ::decode(n, p);
+      decode(n, p);
       pool_max = n;
     } else if (v == 5) {
       pool_name.clear();
-      ::decode(n, p);
+      decode(n, p);
       while (n--) {
-       ::decode(t, p);
-       ::decode(pool_name[t], p);
+       decode(t, p);
+       decode(pool_name[t], p);
       }
-      ::decode(n, p);
+      decode(n, p);
       pool_max = n;
     }
   } else {
-    ::decode(pools, p);
-    ::decode(pool_name, p);
-    ::decode(pool_max, p);
+    decode(pools, p);
+    decode(pool_name, p);
+    decode(pool_max, p);
   }
   // kludge around some old bug that zeroed out pool_max (#2307)
   if (pools.size() && pool_max < pools.rbegin()->first) {
     pool_max = pools.rbegin()->first;
   }
 
-  ::decode(flags, p);
+  decode(flags, p);
 
-  ::decode(max_osd, p);
+  decode(max_osd, p);
   {
     vector<uint8_t> os;
-    ::decode(os, p);
+    decode(os, p);
     osd_state.resize(os.size());
     for (unsigned i = 0; i < os.size(); ++i) {
       osd_state[i] = os[i];
     }
   }
-  ::decode(osd_weight, p);
-  ::decode(osd_addrs->client_addr, p);
+  decode(osd_weight, p);
+  decode(osd_addrs->client_addrs, p);
   if (v <= 5) {
     pg_temp->clear();
-    ::decode(n, p);
+    decode(n, p);
     while (n--) {
       old_pg_t opg;
       ::decode_raw(opg, p);
       mempool::osdmap::vector<int32_t> v;
-      ::decode(v, p);
+      decode(v, p);
       pg_temp->set(pg_t(opg), v);
     }
   } else {
-    ::decode(*pg_temp, p);
+    decode(*pg_temp, p);
   }
 
   // crush
   bufferlist cbl;
-  ::decode(cbl, p);
-  auto cblp = cbl.begin();
+  decode(cbl, p);
+  auto cblp = cbl.cbegin();
   crush->decode(cblp);
 
   // extended
   __u16 ev = 0;
   if (v >= 5)
-    ::decode(ev, p);
-  ::decode(osd_addrs->hb_back_addr, p);
-  ::decode(osd_info, p);
+    decode(ev, p);
+  decode(osd_addrs->hb_back_addrs, p);
+  decode(osd_info, p);
   if (v < 5)
-    ::decode(pool_name, p);
+    decode(pool_name, p);
 
-  ::decode(blacklist, p);
+  decode(blacklist, p);
   if (ev >= 6)
-    ::decode(osd_addrs->cluster_addr, p);
+    decode(osd_addrs->cluster_addrs, p);
   else
-    osd_addrs->cluster_addr.resize(osd_addrs->client_addr.size());
+    osd_addrs->cluster_addrs.resize(osd_addrs->client_addrs.size());
 
   if (ev >= 7) {
-    ::decode(cluster_snapshot_epoch, p);
-    ::decode(cluster_snapshot, p);
+    decode(cluster_snapshot_epoch, p);
+    decode(cluster_snapshot, p);
   }
 
   if (ev >= 8) {
-    ::decode(*osd_uuid, p);
+    decode(*osd_uuid, p);
   } else {
     osd_uuid->resize(max_osd);
   }
   if (ev >= 9)
-    ::decode(osd_xinfo, p);
+    decode(osd_xinfo, p);
   else
     osd_xinfo.resize(max_osd);
 
   if (ev >= 10)
-    ::decode(osd_addrs->hb_front_addr, p);
+    decode(osd_addrs->hb_front_addrs, p);
   else
-    osd_addrs->hb_front_addr.resize(osd_addrs->hb_back_addr.size());
+    osd_addrs->hb_front_addrs.resize(osd_addrs->hb_back_addrs.size());
 
   osd_primary_affinity.reset();
 
   post_decode();
 }
 
-void OSDMap::decode(bufferlist::iterator& bl)
+void OSDMap::decode(bufferlist::const_iterator& bl)
 {
+  using ceph::decode;
   /**
    * Older encodings of the OSDMap had a single struct_v which
    * covered the whole encoding, and was prior to our modern
@@ -2787,8 +3122,7 @@ void OSDMap::decode(bufferlist::iterator& bl)
 
   DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
   if (struct_v < 7) {
-    int struct_v_size = sizeof(struct_v);
-    bl.advance(-struct_v_size);
+    bl.seek(start_offset);
     decode_classic(bl);
     return;
   }
@@ -2796,38 +3130,40 @@ void OSDMap::decode(bufferlist::iterator& bl)
    * Since we made it past that hurdle, we can use our normal paths.
    */
   {
-    DECODE_START(6, bl); // client-usable data
+    DECODE_START(9, bl); // client-usable data
     // base
-    ::decode(fsid, bl);
-    ::decode(epoch, bl);
-    ::decode(created, bl);
-    ::decode(modified, bl);
+    decode(fsid, bl);
+    decode(epoch, bl);
+    decode(created, bl);
+    decode(modified, bl);
 
-    ::decode(pools, bl);
-    ::decode(pool_name, bl);
-    ::decode(pool_max, bl);
+    decode(pools, bl);
+    decode(pool_name, bl);
+    decode(pool_max, bl);
 
-    ::decode(flags, bl);
+    decode(flags, bl);
 
-    ::decode(max_osd, bl);
+    decode(max_osd, bl);
     if (struct_v >= 5) {
-      ::decode(osd_state, bl);
+      decode(osd_state, bl);
     } else {
       vector<uint8_t> os;
-      ::decode(os, bl);
+      decode(os, bl);
       osd_state.resize(os.size());
       for (unsigned i = 0; i < os.size(); ++i) {
        osd_state[i] = os[i];
       }
     }
-    ::decode(osd_weight, bl);
-    ::decode(osd_addrs->client_addr, bl);
+    decode(osd_weight, bl);
+    decode(osd_addrs->client_addrs, bl);
 
-    ::decode(*pg_temp, bl);
-    ::decode(*primary_temp, bl);
+    decode(*pg_temp, bl);
+    decode(*primary_temp, bl);
+    // dates back to firefly. version increased from 2 to 3 still in firefly.
+    // do we really still need to keep this around? even for old clients?
     if (struct_v >= 2) {
       osd_primary_affinity.reset(new mempool::osdmap::vector<__u32>);
-      ::decode(*osd_primary_affinity, bl);
+      decode(*osd_primary_affinity, bl);
       if (osd_primary_affinity->empty())
        osd_primary_affinity.reset();
     } else {
@@ -2836,59 +3172,80 @@ void OSDMap::decode(bufferlist::iterator& bl)
 
     // crush
     bufferlist cbl;
-    ::decode(cbl, bl);
-    auto cblp = cbl.begin();
+    decode(cbl, bl);
+    auto cblp = cbl.cbegin();
     crush->decode(cblp);
+    // added in firefly; version increased in luminous, so it affects
+    // giant, hammer, infernallis, jewel, and kraken. probably should be left
+    // alone until we require clients to be all luminous?
     if (struct_v >= 3) {
-      ::decode(erasure_code_profiles, bl);
+      decode(erasure_code_profiles, bl);
     } else {
       erasure_code_profiles.clear();
     }
+    // version increased from 3 to 4 still in luminous, so same as above
+    // applies.
     if (struct_v >= 4) {
-      ::decode(pg_upmap, bl);
-      ::decode(pg_upmap_items, bl);
+      decode(pg_upmap, bl);
+      decode(pg_upmap_items, bl);
     } else {
       pg_upmap.clear();
       pg_upmap_items.clear();
     }
+    // again, version increased from 5 to 6 still in luminous, so above
+    // applies.
     if (struct_v >= 6) {
-      ::decode(crush_version, bl);
+      decode(crush_version, bl);
+    }
+    // version increase from 6 to 7 in mimic
+    if (struct_v >= 7) {
+      decode(new_removed_snaps, bl);
+      decode(new_purged_snaps, bl);
+    }
+    // version increase from 7 to 8, 8 to 9, in nautilus.
+    if (struct_v >= 9) {
+      decode(last_up_change, bl);
+      decode(last_in_change, bl);
     }
     DECODE_FINISH(bl); // client-usable data
   }
 
   {
-    DECODE_START(5, bl); // extended, osd-only data
-    ::decode(osd_addrs->hb_back_addr, bl);
-    ::decode(osd_info, bl);
-    ::decode(blacklist, bl);
-    ::decode(osd_addrs->cluster_addr, bl);
-    ::decode(cluster_snapshot_epoch, bl);
-    ::decode(cluster_snapshot, bl);
-    ::decode(*osd_uuid, bl);
-    ::decode(osd_xinfo, bl);
-    ::decode(osd_addrs->hb_front_addr, bl);
+    DECODE_START(9, bl); // extended, osd-only data
+    decode(osd_addrs->hb_back_addrs, bl);
+    decode(osd_info, bl);
+    decode(blacklist, bl);
+    decode(osd_addrs->cluster_addrs, bl);
+    decode(cluster_snapshot_epoch, bl);
+    decode(cluster_snapshot, bl);
+    decode(*osd_uuid, bl);
+    decode(osd_xinfo, bl);
+    decode(osd_addrs->hb_front_addrs, bl);
+    // 
     if (struct_v >= 2) {
-      ::decode(nearfull_ratio, bl);
-      ::decode(full_ratio, bl);
+      decode(nearfull_ratio, bl);
+      decode(full_ratio, bl);
     } else {
       nearfull_ratio = 0;
       full_ratio = 0;
     }
     if (struct_v >= 3) {
-      ::decode(backfillfull_ratio, bl);
+      decode(backfillfull_ratio, bl);
     } else {
       backfillfull_ratio = 0;
     }
     if (struct_v == 4) {
       string r;
-      ::decode(r, bl);
+      decode(r, bl);
       if (r.length())
        require_min_compat_client = ceph_release_from_name(r.c_str());
     }
     if (struct_v >= 5) {
-      ::decode(require_min_compat_client, bl);
-      ::decode(require_osd_release, bl);
+      decode(require_min_compat_client, bl);
+      decode(require_osd_release, bl);
+      if (require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+       flags |= CEPH_OSDMAP_PGLOG_HARDLIMIT;
+      }
       if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
        flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
        flags |= CEPH_OSDMAP_RECOVERY_DELETES;
@@ -2907,12 +3264,25 @@ void OSDMap::decode(bufferlist::iterator& bl)
        require_osd_release = 0;
       }
     }
+    if (struct_v >= 6) {
+      decode(removed_snaps_queue, bl);
+    }
+    if (struct_v >= 8) {
+      decode(crush_node_flags, bl);
+    } else {
+      crush_node_flags.clear();
+    }
+    if (struct_v >= 9) {
+      decode(device_class_flags, bl);
+    } else {
+      device_class_flags.clear();
+    }
     DECODE_FINISH(bl); // osd-only data
   }
 
   if (struct_v >= 8) {
     crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset);
-    ::decode(crc, bl);
+    decode(crc, bl);
     tail_offset = bl.get_off();
     crc_defined = true;
   } else {
@@ -2974,7 +3344,17 @@ void OSDMap::dump(Formatter *f) const
   f->dump_stream("fsid") << get_fsid();
   f->dump_stream("created") << get_created();
   f->dump_stream("modified") << get_modified();
+  f->dump_stream("last_up_change") << last_up_change;
+  f->dump_stream("last_in_change") << last_in_change;
   f->dump_string("flags", get_flag_string());
+  f->dump_unsigned("flags_num", flags);
+  f->open_array_section("flags_set");
+  set<string> flagset;
+  get_flag_set(&flagset);
+  for (auto p : flagset) {
+    f->dump_string("flag", p);
+  }
+  f->close_section();
   f->dump_unsigned("crush_version", get_crush_version());
   f->dump_float("full_ratio", full_ratio);
   f->dump_float("backfillfull_ratio", backfillfull_ratio);
@@ -3014,10 +3394,17 @@ void OSDMap::dump(Formatter *f) const
       f->dump_float("weight", get_weightf(i));
       f->dump_float("primary_affinity", get_primary_affinityf(i));
       get_info(i).dump(f);
-      f->dump_stream("public_addr") << get_addr(i);
-      f->dump_stream("cluster_addr") << get_cluster_addr(i);
-      f->dump_stream("heartbeat_back_addr") << get_hb_back_addr(i);
-      f->dump_stream("heartbeat_front_addr") << get_hb_front_addr(i);
+      f->dump_object("public_addrs", get_addrs(i));
+      f->dump_object("cluster_addrs", get_cluster_addrs(i));
+      f->dump_object("heartbeat_back_addrs", get_hb_back_addrs(i));
+      f->dump_object("heartbeat_front_addrs", get_hb_front_addrs(i));
+      // compat
+      f->dump_stream("public_addr") << get_addrs(i).get_legacy_str();
+      f->dump_stream("cluster_addr") << get_cluster_addrs(i).get_legacy_str();
+      f->dump_stream("heartbeat_back_addr")
+       << get_hb_back_addrs(i).get_legacy_str();
+      f->dump_stream("heartbeat_front_addr")
+       << get_hb_front_addrs(i).get_legacy_str();
 
       set<string> st;
       get_state(i, st);
@@ -3088,6 +3475,78 @@ void OSDMap::dump(Formatter *f) const
   f->close_section();
 
   dump_erasure_code_profiles(erasure_code_profiles, f);
+
+  f->open_array_section("removed_snaps_queue");
+  for (auto& p : removed_snaps_queue) {
+    f->open_object_section("pool");
+    f->dump_int("pool", p.first);
+    f->open_array_section("snaps");
+    for (auto q = p.second.begin(); q != p.second.end(); ++q) {
+      f->open_object_section("interval");
+      f->dump_unsigned("begin", q.get_start());
+      f->dump_unsigned("length", q.get_len());
+      f->close_section();
+    }
+    f->close_section();
+    f->close_section();
+  }
+  f->close_section();
+  f->open_array_section("new_removed_snaps");
+  for (auto& p : new_removed_snaps) {
+    f->open_object_section("pool");
+    f->dump_int("pool", p.first);
+    f->open_array_section("snaps");
+    for (auto q = p.second.begin(); q != p.second.end(); ++q) {
+      f->open_object_section("interval");
+      f->dump_unsigned("begin", q.get_start());
+      f->dump_unsigned("length", q.get_len());
+      f->close_section();
+    }
+    f->close_section();
+    f->close_section();
+  }
+  f->close_section();
+  f->open_array_section("new_purged_snaps");
+  for (auto& p : new_purged_snaps) {
+    f->open_object_section("pool");
+    f->dump_int("pool", p.first);
+    f->open_array_section("snaps");
+    for (auto q = p.second.begin(); q != p.second.end(); ++q) {
+      f->open_object_section("interval");
+      f->dump_unsigned("begin", q.get_start());
+      f->dump_unsigned("length", q.get_len());
+      f->close_section();
+    }
+    f->close_section();
+    f->close_section();
+  }
+  f->close_section();
+  f->open_object_section("crush_node_flags");
+  for (auto& i : crush_node_flags) {
+    string s = crush->item_exists(i.first) ? crush->get_item_name(i.first)
+      : stringify(i.first);
+    f->open_array_section(s.c_str());
+    set<string> st;
+    calc_state_set(i.second, st);
+    for (auto& j : st) {
+      f->dump_string("flag", j);
+    }
+    f->close_section();
+  }
+  f->close_section();
+  f->open_object_section("device_class_flags");
+  for (auto& i : device_class_flags) {
+    const char* class_name = crush->get_class_name(i.first);
+    string s = class_name ? class_name : stringify(i.first);
+    f->open_array_section(s.c_str());
+    set<string> st;
+    calc_state_set(i.second, st);
+    for (auto& j : st) {
+      f->dump_string("flag", j);
+    }
+    f->close_section();
+  }
+  f->close_section();
 }
 
 void OSDMap::generate_test_instances(list<OSDMap*>& o)
@@ -3136,6 +3595,8 @@ string OSDMap::get_flag_string(unsigned f)
     s += ",nodeep-scrub";
   if (f & CEPH_OSDMAP_NOTIERAGENT)
     s += ",notieragent";
+  if (f & CEPH_OSDMAP_NOSNAPTRIM)
+    s += ",nosnaptrim";
   if (f & CEPH_OSDMAP_SORTBITWISE)
     s += ",sortbitwise";
   if (f & CEPH_OSDMAP_REQUIRE_JEWEL)
@@ -3148,6 +3609,8 @@ string OSDMap::get_flag_string(unsigned f)
     s += ",recovery_deletes";
   if (f & CEPH_OSDMAP_PURGED_SNAPDIRS)
     s += ",purged_snapdirs";
+  if (f & CEPH_OSDMAP_PGLOG_HARDLIMIT)
+    s += ",pglog_hardlimit";
   if (s.length())
     s.erase(0, 1);
   return s;
@@ -3174,6 +3637,10 @@ void OSDMap::print_pools(ostream& out) const
 
     if (!pool.second.removed_snaps.empty())
       out << "\tremoved_snaps " << pool.second.removed_snaps << "\n";
+    auto p = removed_snaps_queue.find(pool.first);
+    if (p != removed_snaps_queue.end()) {
+      out << "\tremoved_snaps_queue " << p->second << "\n";
+    }
   }
   out << std::endl;
 }
@@ -3217,8 +3684,7 @@ void OSDMap::print(ostream& out) const
        out << " primary_affinity " << get_primary_affinityf(i);
       const osd_info_t& info(get_info(i));
       out << " " << info;
-      out << " " << get_addr(i) << " " << get_cluster_addr(i) << " " << get_hb_back_addr(i)
-         << " " << get_hb_front_addr(i);
+      out << " " << get_addrs(i) << " " << get_cluster_addrs(i);
       set<string> st;
       get_state(i, st);
       out << " " << st;
@@ -3244,8 +3710,6 @@ void OSDMap::print(ostream& out) const
 
   for (const auto &addr : blacklist)
     out << "blacklist " << addr.first << " expires " << addr.second << "\n";
-
-  // ignore pg_swap_primary
 }
 
 class OSDTreePlainDumper : public CrushTreeDumper::Dumper<TextTable> {
@@ -3274,7 +3738,7 @@ public:
     return !filter;
   }
 
-  void dump(TextTable *tbl) {
+  void init_table(TextTable *tbl) {
     tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("CLASS", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("WEIGHT", TextTable::LEFT, TextTable::RIGHT);
@@ -3282,12 +3746,19 @@ public:
     tbl->define_column("STATUS", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("REWEIGHT", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("PRI-AFF", TextTable::LEFT, TextTable::RIGHT);
+  }
+  void dump(TextTable *tbl, string& bucket) {
+    init_table(tbl);
 
-    Parent::dump(tbl);
-
-    for (int i = 0; i < osdmap->get_max_osd(); i++) {
-      if (osdmap->exists(i) && !is_touched(i) && should_dump_leaf(i)) {
-       dump_item(CrushTreeDumper::Item(i, 0, 0, 0), tbl);
+    if (!bucket.empty()) {
+      set_root(bucket);
+      Parent::dump(tbl);
+    } else {
+      Parent::dump(tbl);
+      for (int i = 0; i < osdmap->get_max_osd(); i++) {
+       if (osdmap->exists(i) && !is_touched(i) && should_dump_leaf(i)) {
+         dump_item(CrushTreeDumper::Item(i, 0, 0, 0), tbl);
+       }
       }
     }
   }
@@ -3364,16 +3835,23 @@ public:
     return !filter;
   }
 
-  void dump(Formatter *f) {
-    f->open_array_section("nodes");
-    Parent::dump(f);
-    f->close_section();
-    f->open_array_section("stray");
-    for (int i = 0; i < osdmap->get_max_osd(); i++) {
-      if (osdmap->exists(i) && !is_touched(i) && should_dump_leaf(i))
-       dump_item(CrushTreeDumper::Item(i, 0, 0, 0), f);
+  void dump(Formatter *f, string& bucket) {
+    if (!bucket.empty()) {
+      set_root(bucket);
+      f->open_array_section("nodes");
+      Parent::dump(f);
+      f->close_section();
+    } else {
+      f->open_array_section("nodes");
+      Parent::dump(f);
+      f->close_section();
+      f->open_array_section("stray");
+      for (int i = 0; i < osdmap->get_max_osd(); i++) {
+       if (osdmap->exists(i) && !is_touched(i) && should_dump_leaf(i))
+         dump_item(CrushTreeDumper::Item(i, 0, 0, 0), f);
+      }
+      f->close_section();
     }
-    f->close_section();
   }
 
 protected:
@@ -3401,20 +3879,20 @@ private:
   const unsigned filter;
 };
 
-void OSDMap::print_tree(Formatter *f, ostream *out, unsigned filter) const
+void OSDMap::print_tree(Formatter *f, ostream *out, unsigned filter, string bucket) const
 {
   if (f) {
-    OSDTreeFormattingDumper(crush.get(), this, filter).dump(f);
+    OSDTreeFormattingDumper(crush.get(), this, filter).dump(f, bucket);
   } else {
-    assert(out);
+    ceph_assert(out);
     TextTable tbl;
-    OSDTreePlainDumper(crush.get(), this, filter).dump(&tbl);
+    OSDTreePlainDumper(crush.get(), this, filter).dump(&tbl, bucket);
     *out << tbl;
   }
 }
 
 void OSDMap::print_summary(Formatter *f, ostream& out,
-                          const string& prefix) const
+                          const string& prefix, bool extra) const
 {
   if (f) {
     f->open_object_section("osdmap");
@@ -3427,9 +3905,18 @@ void OSDMap::print_summary(Formatter *f, ostream& out,
     f->dump_unsigned("num_remapped_pgs", get_num_pg_temp());
     f->close_section();
   } else {
+    utime_t now = ceph_clock_now();
     out << get_num_osds() << " osds: "
-       << get_num_up_osds() << " up, "
-       << get_num_in_osds() << " in";
+       << get_num_up_osds() << " up";
+    if (last_up_change != utime_t()) {
+      out << " (since " << utimespan_str(now - last_up_change) << ")";
+    }
+    out << ", " << get_num_in_osds() << " in";
+    if (last_in_change != utime_t()) {
+      out << " (since " << utimespan_str(now - last_in_change) << ")";
+    }
+    if (extra)
+      out << "; epoch: e" << get_epoch();
     if (get_num_pg_temp())
       out << "; " << get_num_pg_temp() << " remapped pgs";
     out << "\n";
@@ -3446,9 +3933,9 @@ void OSDMap::print_oneline_summary(ostream& out) const
       << get_num_up_osds() << " up, "
       << get_num_in_osds() << " in";
   if (test_flag(CEPH_OSDMAP_FULL))
-    out << " full";
+    out << "; full flag set";
   else if (test_flag(CEPH_OSDMAP_NEARFULL))
-    out << " nearfull";
+    out << "; nearfull flag set";
 }
 
 bool OSDMap::crush_rule_in_use(int rule_id) const
@@ -3479,9 +3966,10 @@ int OSDMap::validate_crush_rules(CrushWrapper *newcrush,
       *ss << "pool " << i.first << " type does not match rule " << ruleno;
       return -EINVAL;
     }
-    if (pool.get_size() < (int)newcrush->get_rule_mask_min_size(ruleno) ||
-       pool.get_size() > (int)newcrush->get_rule_mask_max_size(ruleno)) {
-      *ss << "pool " << i.first << " size " << pool.get_size() << " does not"
+    int poolsize = pool.get_size();
+    if (poolsize < newcrush->get_rule_mask_min_size(ruleno) ||
+       poolsize > newcrush->get_rule_mask_max_size(ruleno)) {
+      *ss << "pool " << i.first << " size " << poolsize << " does not"
          << " fall within rule " << ruleno
          << " min_size " << newcrush->get_rule_mask_min_size(ruleno)
          << " and max_size " << newcrush->get_rule_mask_max_size(ruleno);
@@ -3506,9 +3994,9 @@ int OSDMap::build_simple_optioned(CephContext *cct, epoch_t e, uuid_d &fsid,
   } else {
     // count osds
     int maxosd = 0;
-    const md_config_t *conf = cct->_conf;
+    const auto& conf = cct->_conf;
     vector<string> sections;
-    conf->get_all_sections(sections);
+    conf.get_all_sections(sections);
 
     for (auto &section : sections) {
       if (section.find("osd.") != 0)
@@ -3539,12 +4027,12 @@ int OSDMap::build_simple_optioned(CephContext *cct, epoch_t e, uuid_d &fsid,
     r = build_simple_crush_map(cct, *crush, nosd, &ss);
   else
     r = build_simple_crush_map_from_conf(cct, *crush, &ss);
-  assert(r == 0);
+  ceph_assert(r == 0);
 
   int poolbase = get_max_osd() ? get_max_osd() : 1;
 
   const int default_replicated_rule = crush->get_osd_pool_default_crush_replicated_ruleset(cct);
-  assert(default_replicated_rule >= 0);
+  ceph_assert(default_replicated_rule >= 0);
 
   if (default_pool) {
     // pgp_num <= pg_num
@@ -3565,15 +4053,21 @@ int OSDMap::build_simple_optioned(CephContext *cct, epoch_t e, uuid_d &fsid,
        pools[pool].set_flag(pg_pool_t::FLAG_NOPGCHANGE);
       if (cct->_conf->osd_pool_default_flag_nosizechange)
        pools[pool].set_flag(pg_pool_t::FLAG_NOSIZECHANGE);
-      pools[pool].size = cct->_conf->osd_pool_default_size;
-      pools[pool].min_size = cct->_conf->get_osd_pool_default_min_size();
+      pools[pool].size = cct->_conf.get_val<uint64_t>("osd_pool_default_size");
+      pools[pool].min_size = cct->_conf.get_osd_pool_default_min_size(
+                                 pools[pool].size);
       pools[pool].crush_rule = default_replicated_rule;
       pools[pool].object_hash = CEPH_STR_HASH_RJENKINS;
       pools[pool].set_pg_num(poolbase << pg_bits);
       pools[pool].set_pgp_num(poolbase << pgp_bits);
+      pools[pool].set_pg_num_target(poolbase << pg_bits);
+      pools[pool].set_pgp_num_target(poolbase << pgp_bits);
       pools[pool].last_change = epoch;
       pools[pool].application_metadata.insert(
         {pg_pool_t::APPLICATION_NAME_RBD, {}});
+      auto m = pg_pool_t::get_pg_autoscale_mode_by_name(
+        cct->_conf.get_val<string>("osd_pool_default_pg_autoscale_mode"));
+      pools[pool].pg_autoscale_mode = m >= 0 ? m : 0;
       pool_name[pool] = plname;
       name_pool[plname] = pool;
     }
@@ -3598,7 +4092,7 @@ int OSDMap::get_erasure_code_profile_default(CephContext *cct,
                                             map<string,string> &profile_map,
                                             ostream *ss)
 {
-  int r = get_json_str_map(cct->_conf->osd_pool_default_erasure_code_profile,
+  int r = get_json_str_map(cct->_conf.get_val<string>("osd_pool_default_erasure_code_profile"),
                      *ss,
                      &profile_map);
   return r;
@@ -3615,9 +4109,10 @@ int OSDMap::_build_crush_types(CrushWrapper& crush)
   crush.set_type_name(6, "pod");
   crush.set_type_name(7, "room");
   crush.set_type_name(8, "datacenter");
-  crush.set_type_name(9, "region");
-  crush.set_type_name(10, "root");
-  return 10;
+  crush.set_type_name(9, "zone");
+  crush.set_type_name(10, "region");
+  crush.set_type_name(11, "root");
+  return 11;
 }
 
 int OSDMap::build_simple_crush_map(CephContext *cct, CrushWrapper& crush,
@@ -3630,7 +4125,7 @@ int OSDMap::build_simple_crush_map(CephContext *cct, CrushWrapper& crush,
   int rootid;
   int r = crush.add_bucket(0, 0, CRUSH_HASH_DEFAULT,
                           root_type, 0, NULL, NULL, &rootid);
-  assert(r == 0);
+  ceph_assert(r == 0);
   crush.set_item_name(rootid, "default");
 
   for (int o=0; o<nosd; o++) {
@@ -3655,7 +4150,7 @@ int OSDMap::build_simple_crush_map_from_conf(CephContext *cct,
                                             CrushWrapper& crush,
                                             ostream *ss)
 {
-  const md_config_t *conf = cct->_conf;
+  const auto& conf = cct->_conf;
 
   crush.create();
 
@@ -3665,12 +4160,12 @@ int OSDMap::build_simple_crush_map_from_conf(CephContext *cct,
   int r = crush.add_bucket(0, 0,
                           CRUSH_HASH_DEFAULT,
                           root_type, 0, NULL, NULL, &rootid);
-  assert(r == 0);
+  ceph_assert(r == 0);
   crush.set_item_name(rootid, "default");
 
   // add osds
   vector<string> sections;
-  conf->get_all_sections(sections);
+  conf.get_all_sections(sections);
 
   for (auto &section : sections) {
     if (section.find("osd.") != 0)
@@ -3686,12 +4181,12 @@ int OSDMap::build_simple_crush_map_from_conf(CephContext *cct,
     vector<string> sectiontmp;
     sectiontmp.push_back("osd");
     sectiontmp.push_back(section);
-    conf->get_val_from_conf_file(sectiontmp, "host", host, false);
-    conf->get_val_from_conf_file(sectiontmp, "rack", rack, false);
-    conf->get_val_from_conf_file(sectiontmp, "row", row, false);
-    conf->get_val_from_conf_file(sectiontmp, "room", room, false);
-    conf->get_val_from_conf_file(sectiontmp, "datacenter", dc, false);
-    conf->get_val_from_conf_file(sectiontmp, "root", pool, false);
+    conf.get_val_from_conf_file(sectiontmp, "host", host, false);
+    conf.get_val_from_conf_file(sectiontmp, "rack", rack, false);
+    conf.get_val_from_conf_file(sectiontmp, "row", row, false);
+    conf.get_val_from_conf_file(sectiontmp, "room", room, false);
+    conf.get_val_from_conf_file(sectiontmp, "datacenter", dc, false);
+    conf.get_val_from_conf_file(sectiontmp, "root", pool, false);
 
     if (host.length() == 0)
       host = "unknownhost";
@@ -3766,7 +4261,7 @@ int OSDMap::summarize_mapping_stats(
     vector<int> up, up2;
     int up_primary;
     for (unsigned ps = 0; ps < pi->get_pg_num(); ++ps) {
-      pg_t pgid(ps, pool_id, -1);
+      pg_t pgid(ps, pool_id);
       total_pg += pi->get_size();
       pg_to_up_acting_osds(pgid, &up, &up_primary, nullptr, nullptr);
       for (int osd : up) {
@@ -3792,7 +4287,7 @@ int OSDMap::summarize_mapping_stats(
            }
          }
        } else {
-         assert(0 == "unhandled pool type");
+         ceph_abort_msg("unhandled pool type");
        }
       }
     }
@@ -3902,49 +4397,6 @@ int OSDMap::summarize_mapping_stats(
   return 0;
 }
 
-
-int OSDMap::clean_pg_upmaps(
-  CephContext *cct,
-  Incremental *pending_inc)
-{
-  ldout(cct, 10) << __func__ << dendl;
-  int changed = 0;
-  for (auto& p : pg_upmap) {
-    vector<int> raw;
-    int primary;
-    pg_to_raw_osds(p.first, &raw, &primary);
-    if (vectors_equal(raw, p.second)) {
-      ldout(cct, 10) << " removing redundant pg_upmap " << p.first << " "
-                    << p.second << dendl;
-      pending_inc->old_pg_upmap.insert(p.first);
-      ++changed;
-    }
-  }
-  for (auto& p : pg_upmap_items) {
-    vector<int> raw;
-    int primary;
-    pg_to_raw_osds(p.first, &raw, &primary);
-    mempool::osdmap::vector<pair<int,int>> newmap;
-    for (auto& q : p.second) {
-      if (std::find(raw.begin(), raw.end(), q.first) != raw.end()) {
-       newmap.push_back(q);
-      }
-    }
-    if (newmap.empty()) {
-      ldout(cct, 10) << " removing no-op pg_upmap_items " << p.first << " "
-                    << p.second << dendl;
-      pending_inc->old_pg_upmap_items.insert(p.first);
-      ++changed;
-    } else if (newmap != p.second) {
-      ldout(cct, 10) << " simplifying partially no-op pg_upmap_items "
-                    << p.first << " " << p.second << " -> " << newmap << dendl;
-      pending_inc->new_pg_upmap_items[p.first] = newmap;
-      ++changed;
-    }
-  }
-  return changed;
-}
-
 bool OSDMap::try_pg_upmap(
   CephContext *cct,
   pg_t pg,                       ///< pg to potentially remap
@@ -3961,9 +4413,6 @@ bool OSDMap::try_pg_upmap(
   if (rule < 0)
     return false;
 
-  // get original mapping
-  _pg_to_raw_osds(*pool, pg, orig, NULL);
-
   // make sure there is something there to remap
   bool any = false;
   for (auto osd : *orig) {
@@ -3994,202 +4443,476 @@ int OSDMap::calc_pg_upmaps(
   CephContext *cct,
   float max_deviation_ratio,
   int max,
-  const set<int64_t>& only_pools_orig,
+  const set<int64_t>& only_pools,
   OSDMap::Incremental *pending_inc)
 {
-  set<int64_t> only_pools;
-  if (only_pools_orig.empty()) {
-    for (auto& i : pools) {
-      only_pools.insert(i.first);
-    }
-  } else {
-    only_pools = only_pools_orig;
-  }
+  ldout(cct, 10) << __func__ << " pools " << only_pools << dendl;
   OSDMap tmp;
   tmp.deepish_copy_from(*this);
-  float start_deviation = 0;
-  float end_deviation = 0;
   int num_changed = 0;
-  while (true) {
-    map<int,set<pg_t>> pgs_by_osd;
-    int total_pgs = 0;
-    float osd_weight_total = 0;
-    map<int,float> osd_weight;
-    for (auto& i : pools) {
-      if (!only_pools.empty() && !only_pools.count(i.first))
-       continue;
-      for (unsigned ps = 0; ps < i.second.get_pg_num(); ++ps) {
-       pg_t pg(ps, i.first);
-       vector<int> up;
-       tmp.pg_to_up_acting_osds(pg, &up, nullptr, nullptr, nullptr);
-       for (auto osd : up) {
-         if (osd != CRUSH_ITEM_NONE)
-           pgs_by_osd[osd].insert(pg);
-       }
+  map<int,set<pg_t>> pgs_by_osd;
+  int total_pgs = 0;
+  float osd_weight_total = 0;
+  map<int,float> osd_weight;
+  for (auto& i : pools) {
+    if (!only_pools.empty() && !only_pools.count(i.first))
+      continue;
+    for (unsigned ps = 0; ps < i.second.get_pg_num(); ++ps) {
+      pg_t pg(ps, i.first);
+      vector<int> up;
+      tmp.pg_to_up_acting_osds(pg, &up, nullptr, nullptr, nullptr);
+      ldout(cct, 20) << __func__ << " " << pg << " up " << up << dendl;
+      for (auto osd : up) {
+        if (osd != CRUSH_ITEM_NONE)
+         pgs_by_osd[osd].insert(pg);
       }
-      total_pgs += i.second.get_size() * i.second.get_pg_num();
-
-      map<int,float> pmap;
-      int ruleno = tmp.crush->find_rule(i.second.get_crush_rule(),
-                                       i.second.get_type(),
-                                       i.second.get_size());
-      tmp.crush->get_rule_weight_osd_map(ruleno, &pmap);
-      ldout(cct,30) << __func__ << " pool " << i.first << " ruleno " << ruleno << dendl;
-      for (auto p : pmap) {
-       auto adjusted_weight = tmp.get_weightf(p.first) * p.second;
-        if (adjusted_weight == 0) {
-          continue;
-        }
-       osd_weight[p.first] += adjusted_weight;
-       osd_weight_total += adjusted_weight;
+    }
+    total_pgs += i.second.get_size() * i.second.get_pg_num();
+
+    map<int,float> pmap;
+    int ruleno = tmp.crush->find_rule(i.second.get_crush_rule(),
+                                     i.second.get_type(),
+                                     i.second.get_size());
+    tmp.crush->get_rule_weight_osd_map(ruleno, &pmap);
+    ldout(cct,20) << __func__ << " pool " << i.first
+                  << " ruleno " << ruleno
+                  << " weight-map " << pmap
+                  << dendl;
+    for (auto p : pmap) {
+      auto adjusted_weight = tmp.get_weightf(p.first) * p.second;
+      if (adjusted_weight == 0) {
+        continue;
       }
+      osd_weight[p.first] += adjusted_weight;
+      osd_weight_total += adjusted_weight;
     }
-    for (auto& i : osd_weight) {
-      int pgs = 0;
-      auto p = pgs_by_osd.find(i.first);
-      if (p != pgs_by_osd.end())
+  }
+  for (auto& i : osd_weight) {
+    int pgs = 0;
+    auto p = pgs_by_osd.find(i.first);
+    if (p != pgs_by_osd.end())
        pgs = p->second.size();
-      else
+    else
        pgs_by_osd.emplace(i.first, set<pg_t>());
-      ldout(cct, 20) << " osd." << i.first << " weight " << i.second
+    ldout(cct, 20) << " osd." << i.first << " weight " << i.second
                     << " pgs " << pgs << dendl;
-    }
+  }
+  if (osd_weight_total == 0) {
+    lderr(cct) << __func__ << " abort due to osd_weight_total == 0" << dendl;
+    return 0;
+  }
+  float pgs_per_weight = total_pgs / osd_weight_total;
+  ldout(cct, 10) << " osd_weight_total " << osd_weight_total << dendl;
+  ldout(cct, 10) << " pgs_per_weight " << pgs_per_weight << dendl;
 
-    if (osd_weight_total == 0) {
-      lderr(cct) << __func__ << " abort due to osd_weight_total == 0" << dendl;
+  if (max <= 0) {
+    lderr(cct) << __func__ << " abort due to max <= 0" << dendl;
+    return 0;
+  }
+  float decay_factor = 1.0 / float(max);
+  float stddev = 0;
+  map<int,float> osd_deviation;       // osd, deviation(pgs)
+  multimap<float,int> deviation_osd;  // deviation(pgs), osd
+  for (auto& i : pgs_by_osd) {
+    // make sure osd is still there (belongs to this crush-tree)
+    ceph_assert(osd_weight.count(i.first));
+    float target = osd_weight[i.first] * pgs_per_weight;
+    float deviation = (float)i.second.size() - target;
+    ldout(cct, 20) << " osd." << i.first
+                   << "\tpgs " << i.second.size()
+                   << "\ttarget " << target
+                   << "\tdeviation " << deviation
+                   << dendl;
+    osd_deviation[i.first] = deviation;
+    deviation_osd.insert(make_pair(deviation, i.first));
+    stddev += deviation * deviation;
+  }
+  if (stddev <= cct->_conf.get_val<double>("osd_calc_pg_upmaps_max_stddev")) {
+    ldout(cct, 10) << __func__ << " distribution is almost perfect"
+                   << dendl;
+    return 0;
+  }
+  bool skip_overfull = false;
+  auto aggressive =
+    cct->_conf.get_val<bool>("osd_calc_pg_upmaps_aggressively");
+  auto local_fallback_retries =
+    cct->_conf.get_val<uint64_t>("osd_calc_pg_upmaps_local_fallback_retries");
+  while (max--) {
+    // build overfull and underfull
+    set<int> overfull;
+    vector<int> underfull;
+    float decay = 0;
+    int decay_count = 0;
+    while (overfull.empty()) {
+      for (auto i = deviation_osd.rbegin(); i != deviation_osd.rend(); i++) {
+        if (i->first >= (1.0 - decay))
+          overfull.insert(i->second);
+      }
+      if (!overfull.empty())
+        break;
+      decay_count++;
+      decay = decay_factor * decay_count;
+      if (decay >= 1.0)
+        break;
+      ldout(cct, 30) << " decay_factor = " << decay_factor
+                     << " decay_count = " << decay_count
+                     << " decay (overfull) = " << decay
+                     << dendl;
+    }
+    if (overfull.empty()) {
+      lderr(cct) << __func__ << " failed to build overfull" << dendl;
       break;
     }
-    float pgs_per_weight = total_pgs / osd_weight_total;
-    ldout(cct, 10) << " osd_weight_total " << osd_weight_total << dendl;
-    ldout(cct, 10) << " pgs_per_weight " << pgs_per_weight << dendl;
 
-    // osd deviation
-    float total_deviation = 0;
-    map<int,float> osd_deviation;       // osd, deviation(pgs)
-    multimap<float,int> deviation_osd;  // deviation(pgs), osd
-    set<int> overfull;
-    for (auto& i : pgs_by_osd) {
-      float target = osd_weight[i.first] * pgs_per_weight;
-      float deviation = (float)i.second.size() - target;
-      ldout(cct, 20) << " osd." << i.first
-                    << "\tpgs " << i.second.size()
-                    << "\ttarget " << target
-                    << "\tdeviation " << deviation
-                    << dendl;
-      osd_deviation[i.first] = deviation;
-      deviation_osd.insert(make_pair(deviation, i.first));
-      if (deviation >= 1.0)
-       overfull.insert(i.first);
-      total_deviation += abs(deviation);
-    }
-    if (num_changed == 0) {
-      start_deviation = total_deviation;
-    }
-    end_deviation = total_deviation;
-
-    // build underfull, sorted from least-full to most-average
-    vector<int> underfull;
-    for (auto i = deviation_osd.begin();
-        i != deviation_osd.end();
-        ++i) {
-      if (i->first >= -.999)
-       break;
-      underfull.push_back(i->second);
+    decay = 0;
+    decay_count = 0;
+    while (underfull.empty()) {
+      for (auto i = deviation_osd.begin(); i != deviation_osd.end(); i++) {
+        if (i->first >= (-.999 + decay))
+          break;
+        underfull.push_back(i->second);
+      }
+      if (!underfull.empty())
+        break;
+      decay_count++;
+      decay = decay_factor * decay_count;
+      if (decay >= .999)
+        break;
+      ldout(cct, 30) << " decay_factor = " << decay_factor
+                     << " decay_count = " << decay_count
+                     << " decay (underfull) = " << decay
+                     << dendl;
     }
-    ldout(cct, 10) << " total_deviation " << total_deviation
-                  << " overfull " << overfull
-                  << " underfull " << underfull << dendl;
-    if (overfull.empty() || underfull.empty())
+    if (underfull.empty()) {
+      lderr(cct) << __func__ << " failed to build underfull" << dendl;
       break;
+    }
+
+    ldout(cct, 10) << " overfull " << overfull
+                   << " underfull " << underfull
+                   << dendl;
+    set<pg_t> to_skip;
+    uint64_t local_fallback_retried = 0;
 
-    // pick fullest
-    bool restart = false;
+  retry:
+
+    set<pg_t> to_unmap;
+    map<pg_t, mempool::osdmap::vector<pair<int32_t,int32_t>>> to_upmap;
+    auto temp_pgs_by_osd = pgs_by_osd;
+    // always start with fullest, break if we find any changes to make
     for (auto p = deviation_osd.rbegin(); p != deviation_osd.rend(); ++p) {
+      if (skip_overfull) {
+        ldout(cct, 10) << " skipping overfull " << dendl;
+        break; // fall through to check underfull
+      }
       int osd = p->second;
       float deviation = p->first;
-      // make sure osd is still there (belongs to this crush-tree)
-      assert(osd_weight.count(osd));
       float target = osd_weight[osd] * pgs_per_weight;
-      assert(target > 0);
-      if (deviation/target < max_deviation_ratio) {
+      ceph_assert(target > 0);
+      float deviation_ratio = deviation / target;
+      if (deviation_ratio < max_deviation_ratio) {
        ldout(cct, 10) << " osd." << osd
-                      << " target " << target
-                      << " deviation " << deviation
-                      << " -> ratio " << deviation/target
-                      << " < max ratio " << max_deviation_ratio << dendl;
+                       << " target " << target
+                       << " deviation " << deviation
+                       << " -> ratio " << deviation_ratio
+                       << " < max ratio " << max_deviation_ratio
+                       << dendl;
        break;
       }
-      int num_to_move = deviation;
-      ldout(cct, 10) << " osd." << osd << " move " << num_to_move << dendl;
-      if (num_to_move < 1)
-       break;
-
-      set<pg_t>& pgs = pgs_by_osd[osd];
 
+      vector<pg_t> pgs;
+      pgs.reserve(pgs_by_osd[osd].size());
+      for (auto& pg : pgs_by_osd[osd]) {
+        if (to_skip.count(pg))
+          continue;
+        pgs.push_back(pg);
+      }
+      if (aggressive) {
+        // shuffle PG list so they all get equal (in)attention
+        std::random_device rd;
+        std::default_random_engine rng{rd()};
+        std::shuffle(pgs.begin(), pgs.end(), rng);
+      }
       // look for remaps we can un-remap
       for (auto pg : pgs) {
        auto p = tmp.pg_upmap_items.find(pg);
-       if (p != tmp.pg_upmap_items.end()) {
-         for (auto q : p->second) {
-           if (q.second == osd) {
-             ldout(cct, 10) << "  dropping pg_upmap_items " << pg
-                            << " " << p->second << dendl;
-             tmp.pg_upmap_items.erase(p);
-             pending_inc->old_pg_upmap_items.insert(pg);
-             ++num_changed;
-             restart = true;
-           }
-         }
-       }
-       if (restart)
-         break;
-      } // pg loop
-      if (restart)
-       break;
+        if (p == tmp.pg_upmap_items.end())
+          continue;
+        mempool::osdmap::vector<pair<int32_t,int32_t>> new_upmap_items;
+        for (auto q : p->second) {
+         if (q.second == osd) {
+            ldout(cct, 10) << " will try dropping existing"
+                           << " remapping pair "
+                           << q.first << " -> " << q.second
+                           << " which remapped " << pg
+                           << " into overfull osd." << osd
+                           << dendl;
+            temp_pgs_by_osd[q.second].erase(pg);
+            temp_pgs_by_osd[q.first].insert(pg);
+          } else {
+            new_upmap_items.push_back(q);
+          }
+        }
+        if (new_upmap_items.empty()) {
+          // drop whole item
+          ldout(cct, 10) << " existing pg_upmap_items " << p->second
+                         << " remapped " << pg << " into overfull osd." << osd
+                         << ", will try cancelling it entirely"
+                         << dendl;
+          to_unmap.insert(pg);
+          goto test_change;
+        } else if (new_upmap_items.size() != p->second.size()) {
+          // drop single remapping pair, updating
+          ceph_assert(new_upmap_items.size() < p->second.size());
+          ldout(cct, 10) << " existing pg_upmap_items " << p->second
+                         << " remapped " << pg << " into overfull osd." << osd
+                         << ", new_pg_upmap_items now " << new_upmap_items
+                         << dendl;
+          to_upmap[pg] = new_upmap_items;
+          goto test_change;
+        }
+      }
 
+      // try upmap
       for (auto pg : pgs) {
-       if (tmp.pg_upmap.count(pg) ||
-           tmp.pg_upmap_items.count(pg)) {
-         ldout(cct, 20) << "  already remapped " << pg << dendl;
+        auto temp_it = tmp.pg_upmap.find(pg);
+        if (temp_it != tmp.pg_upmap.end()) {
+          // leave pg_upmap alone
+          // it must be specified by admin since balancer does not
+          // support pg_upmap yet
+         ldout(cct, 10) << " " << pg << " already has pg_upmap "
+                         << temp_it->second << ", skipping"
+                         << dendl;
          continue;
        }
-       ldout(cct, 10) << "  trying " << pg << dendl;
-       vector<int> orig, out;
+        auto pg_pool_size = tmp.get_pg_pool_size(pg);
+        mempool::osdmap::vector<pair<int32_t,int32_t>> new_upmap_items;
+        set<int> existing;
+        auto it = tmp.pg_upmap_items.find(pg);
+        if (it != tmp.pg_upmap_items.end() &&
+            it->second.size() >= (size_t)pg_pool_size) {
+          ldout(cct, 10) << " " << pg << " already has full-size pg_upmap_items "
+                         << it->second << ", skipping"
+                         << dendl;
+          continue;
+        } else if (it != tmp.pg_upmap_items.end()) {
+          ldout(cct, 10) << " " << pg << " already has pg_upmap_items "
+                         << it->second
+                         << dendl;
+          new_upmap_items = it->second;
+          // build existing too (for dedup)
+          for (auto i : it->second) {
+            existing.insert(i.first);
+            existing.insert(i.second);
+          }
+          // fall through
+          // to see if we can append more remapping pairs
+        }
+       ldout(cct, 10) << " trying " << pg << dendl;
+        vector<int> raw, orig, out;
+        tmp.pg_to_raw_upmap(pg, &raw, &orig); // including existing upmaps too
        if (!try_pg_upmap(cct, pg, overfull, underfull, &orig, &out)) {
          continue;
        }
-       ldout(cct, 10) << "  " << pg << " " << orig << " -> " << out << dendl;
+       ldout(cct, 10) << " " << pg << " " << orig << " -> " << out << dendl;
        if (orig.size() != out.size()) {
          continue;
        }
-       assert(orig != out);
-       auto& rmi = tmp.pg_upmap_items[pg];
+       ceph_assert(orig != out);
        for (unsigned i = 0; i < out.size(); ++i) {
-         if (orig[i] != out[i]) {
-           rmi.push_back(make_pair(orig[i], out[i]));
-         }
+          if (orig[i] == out[i])
+            continue; // skip invalid remappings
+          if (existing.count(orig[i]) || existing.count(out[i]))
+            continue; // we want new remappings only!
+          ldout(cct, 10) << " will try adding new remapping pair "
+                         << orig[i] << " -> " << out[i] << " for " << pg
+                         << dendl;
+          existing.insert(orig[i]);
+          existing.insert(out[i]);
+          temp_pgs_by_osd[orig[i]].erase(pg);
+          temp_pgs_by_osd[out[i]].insert(pg);
+          ceph_assert(new_upmap_items.size() < (size_t)pg_pool_size);
+          new_upmap_items.push_back(make_pair(orig[i], out[i]));
+          // append new remapping pairs slowly
+          // This way we can make sure that each tiny change will
+          // definitely make distribution of PGs converging to
+          // the perfect status.
+          to_upmap[pg] = new_upmap_items;
+          goto test_change;
        }
-       pending_inc->new_pg_upmap_items[pg] = rmi;
-       ldout(cct, 10) << "  " << pg << " pg_upmap_items " << rmi << dendl;
-       restart = true;
-       ++num_changed;
-       break;
-      } // pg loop
-      if (restart)
-       break;
-    } // osd loop
+      }
+    }
 
-    if (!restart) {
-      ldout(cct, 10) << " failed to find any changes to make" << dendl;
-      break;
+    ceph_assert(!(to_unmap.size() || to_upmap.size()));
+    ldout(cct, 10) << " failed to find any changes for overfull osds"
+                   << dendl;
+    for (auto& p : deviation_osd) {
+      if (std::find(underfull.begin(), underfull.end(), p.second) ==
+                    underfull.end())
+        break;
+      int osd = p.second;
+      float deviation = p.first;
+      float target = osd_weight[osd] * pgs_per_weight;
+      ceph_assert(target > 0);
+      float deviation_ratio = abs(deviation / target);
+      if (deviation_ratio < max_deviation_ratio) {
+        // respect max_deviation_ratio too
+        ldout(cct, 10) << " osd." << osd
+                       << " target " << target
+                       << " deviation " << deviation
+                       << " -> absolute ratio " << deviation_ratio
+                       << " < max ratio " << max_deviation_ratio
+                       << dendl;
+        break;
+      }
+      // look for remaps we can un-remap
+      vector<pair<pg_t,
+        mempool::osdmap::vector<pair<int32_t,int32_t>>>> candidates;
+      candidates.reserve(tmp.pg_upmap_items.size());
+      for (auto& i : tmp.pg_upmap_items) {
+        if (to_skip.count(i.first))
+          continue;
+        if (!only_pools.empty() && !only_pools.count(i.first.pool()))
+          continue;
+        candidates.push_back(make_pair(i.first, i.second));
+      }
+      if (aggressive) {
+        // shuffle candidates so they all get equal (in)attention
+        std::random_device rd;
+        std::default_random_engine rng{rd()};
+        std::shuffle(candidates.begin(), candidates.end(), rng);
+      }
+      for (auto& i : candidates) {
+        auto pg = i.first;
+        mempool::osdmap::vector<pair<int32_t,int32_t>> new_upmap_items;
+        for (auto& j : i.second) {
+          if (j.first == osd) {
+            ldout(cct, 10) << " will try dropping existing"
+                           << " remapping pair "
+                           << j.first << " -> " << j.second
+                           << " which remapped " << pg
+                           << " out from underfull osd." << osd
+                           << dendl;
+            temp_pgs_by_osd[j.second].erase(pg);
+            temp_pgs_by_osd[j.first].insert(pg);
+          } else {
+            new_upmap_items.push_back(j);
+          }
+        }
+        if (new_upmap_items.empty()) {
+          // drop whole item
+          ldout(cct, 10) << " existing pg_upmap_items " << i.second
+                         << " remapped " << pg
+                         << " out from underfull osd." << osd
+                         << ", will try cancelling it entirely"
+                         << dendl;
+          to_unmap.insert(pg);
+          goto test_change;
+        } else if (new_upmap_items.size() != i.second.size()) {
+          // drop single remapping pair, updating
+          ceph_assert(new_upmap_items.size() < i.second.size());
+          ldout(cct, 10) << " existing pg_upmap_items " << i.second
+                         << " remapped " << pg
+                         << " out from underfull osd." << osd
+                         << ", new_pg_upmap_items now " << new_upmap_items
+                         << dendl;
+          to_upmap[pg] = new_upmap_items;
+          goto test_change;
+        }
+      }
     }
-    if (--max == 0) {
-      ldout(cct, 10) << " hit max iterations, stopping" << dendl;
+
+    ceph_assert(!(to_unmap.size() || to_upmap.size()));
+    ldout(cct, 10) << " failed to find any changes for underfull osds"
+                   << dendl;
+    if (!aggressive) {
+      ldout(cct, 10) << " break due to aggressive mode not enabled" << dendl;
       break;
+    } else if (!skip_overfull) {
+      // safe to quit because below here we know
+      // we've done checking both overfull and underfull osds..
+      ldout(cct, 10) << " break due to not being able to find any"
+                     << " further optimizations"
+                     << dendl;
+      break;
+    }
+    // restart with fullest and do exhaustive searching
+    skip_overfull = false;
+    continue;
+
+  test_change:
+
+    // test change, apply if change is good
+    ceph_assert(to_unmap.size() || to_upmap.size());
+    float new_stddev = 0;
+    map<int,float> temp_osd_deviation;
+    multimap<float,int> temp_deviation_osd;
+    for (auto& i : temp_pgs_by_osd) {
+      // make sure osd is still there (belongs to this crush-tree)
+      ceph_assert(osd_weight.count(i.first));
+      float target = osd_weight[i.first] * pgs_per_weight;
+      float deviation = (float)i.second.size() - target;
+      ldout(cct, 20) << " osd." << i.first
+                     << "\tpgs " << i.second.size()
+                     << "\ttarget " << target
+                     << "\tdeviation " << deviation
+                     << dendl;
+      temp_osd_deviation[i.first] = deviation;
+      temp_deviation_osd.insert(make_pair(deviation, i.first));
+      new_stddev += deviation * deviation;
+    }
+    ldout(cct, 10) << " stddev " << stddev << " -> " << new_stddev << dendl;
+    if (new_stddev >= stddev) {
+      if (!aggressive) {
+        ldout(cct, 10) << " break because stddev is not decreasing"
+                       << " and aggressive mode is not enabled"
+                       << dendl;
+        break;
+      }
+      local_fallback_retried++;
+      if (local_fallback_retried >= local_fallback_retries) {
+        // does not make progress
+        // flip *skip_overfull* so both overfull and underfull
+        // get equal (in)attention
+        skip_overfull = !skip_overfull;
+        ldout(cct, 10) << " hit local_fallback_retries "
+                       << local_fallback_retries
+                       << dendl;
+        continue;
+      }
+      for (auto& i : to_unmap)
+        to_skip.insert(i);
+      for (auto& i : to_upmap)
+        to_skip.insert(i.first);
+      ldout(cct, 20) << " local_fallback_retried " << local_fallback_retried
+                     << " to_skip " << to_skip
+                     << dendl;
+      goto retry;
+    }
+
+    // ready to go
+    ceph_assert(new_stddev < stddev);
+    stddev = new_stddev;
+    pgs_by_osd = temp_pgs_by_osd;
+    osd_deviation = temp_osd_deviation;
+    deviation_osd = temp_deviation_osd;
+    for (auto& i : to_unmap) {
+      ldout(cct, 10) << " unmap pg " << i << dendl;
+      ceph_assert(tmp.pg_upmap_items.count(i));
+      tmp.pg_upmap_items.erase(i);
+      pending_inc->old_pg_upmap_items.insert(i);
+      ++num_changed;
+    }
+    for (auto& i : to_upmap) {
+      ldout(cct, 10) << " upmap pg " << i.first
+                     << " new pg_upmap_items " << i.second
+                     << dendl;
+      tmp.pg_upmap_items[i.first] = i.second;
+      pending_inc->new_pg_upmap_items[i.first] = i.second;
+      ++num_changed;
     }
   }
-  ldout(cct, 10) << " start deviation " << start_deviation << dendl;
-  ldout(cct, 10) << " end deviation " << end_deviation << dendl;
+  ldout(cct, 10) << " num_changed = " << num_changed << dendl;
   return num_changed;
 }
 
@@ -4203,13 +4926,13 @@ void OSDMap::get_pool_ids_by_osd(CephContext *cct,
                                 int osd,
                                 set<int64_t> *pool_ids) const
 {
-  assert(pool_ids);
+  ceph_assert(pool_ids);
   set<int> raw_rules;
   int r = crush->get_rules_by_osd(osd, &raw_rules);
   if (r < 0) {
     lderr(cct) << __func__ << " get_rules_by_osd failed: " << cpp_strerror(r)
                << dendl;
-    assert(r >= 0);
+    ceph_assert(r >= 0);
   }
   set<int> rules;
   for (auto &i: raw_rules) {
@@ -4229,19 +4952,51 @@ public:
   typedef CrushTreeDumper::Dumper<F> Parent;
 
   OSDUtilizationDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
-                      const PGStatService *pgs_, bool tree_) :
+                       const PGMap& pgmap_, bool tree_,
+                       const string& class_name_,
+                       const string& item_name_) :
     Parent(crush, osdmap_->get_pool_names()),
     osdmap(osdmap_),
-    pgs(pgs_),
+    pgmap(pgmap_),
     tree(tree_),
-    average_util(average_utilization()),
+    class_name(class_name_),
+    item_name(item_name_),
     min_var(-1),
     max_var(-1),
     stddev(0),
     sum(0) {
+    if (osdmap->crush->name_exists(item_name)) {
+      // filter out items we are allowed to dump
+      auto item_id = osdmap->crush->get_item_id(item_name);
+      allowed.insert(item_id);
+      osdmap->crush->get_all_children(item_id, &allowed);
+    }
+    average_util = average_utilization();
   }
 
 protected:
+
+  bool should_dump(int id) const {
+    if (!allowed.empty() && !allowed.count(id)) // filter by name
+      return false;
+    if (id >= 0 && !class_name.empty()) {
+      const char* item_class_name = osdmap->crush->get_item_class(id);
+      if (!item_class_name || // not bound to a class yet
+           item_class_name != class_name) // or already bound to
+                                          // a different class
+        return false;
+    }
+    return true;
+  }
+
+  set<int> get_dumped_osds() {
+    if (class_name.empty() && item_name.empty()) {
+      // old way, all
+      return {};
+    }
+    return dumped_osds;
+  }
+
   void dump_stray(F *f) {
     for (int i = 0; i < osdmap->get_max_osd(); i++) {
       if (osdmap->exists(i) && !this->is_touched(i))
@@ -4252,11 +5007,17 @@ protected:
   void dump_item(const CrushTreeDumper::Item &qi, F *f) override {
     if (!tree && qi.is_bucket())
       return;
+    if (!should_dump(qi.id))
+      return;
 
+    if (!qi.is_bucket())
+      dumped_osds.insert(qi.id);
     float reweight = qi.is_bucket() ? -1 : osdmap->get_weightf(qi.id);
-    int64_t kb = 0, kb_used = 0, kb_avail = 0;
+    int64_t kb = 0, kb_used = 0, kb_used_data = 0, kb_used_omap = 0,
+      kb_used_meta = 0, kb_avail = 0;
     double util = 0;
-    if (get_bucket_utilization(qi.id, &kb, &kb_used, &kb_avail))
+    if (get_bucket_utilization(qi.id, &kb, &kb_used, &kb_used_data,
+                              &kb_used_omap, &kb_used_meta, &kb_avail))
       if (kb_used && kb)
         util = 100.0 * (double)kb_used / (double)kb;
 
@@ -4264,9 +5025,11 @@ protected:
     if (average_util)
       var = util / average_util;
 
-    size_t num_pgs = qi.is_bucket() ? 0 : pgs->get_num_pg_by_osd(qi.id);
+    size_t num_pgs = qi.is_bucket() ? 0 : pgmap.get_num_pg_by_osd(qi.id);
 
-    dump_item(qi, reweight, kb, kb_used, kb_avail, util, var, num_pgs, f);
+    dump_item(qi, reweight, kb, kb_used,
+             kb_used_data, kb_used_omap, kb_used_meta,
+             kb_avail, util, var, num_pgs, f);
 
     if (!qi.is_bucket() && reweight > 0) {
       if (min_var < 0 || var < min_var)
@@ -4285,6 +5048,9 @@ protected:
                         float &reweight,
                         int64_t kb,
                         int64_t kb_used,
+                        int64_t kb_used_data,
+                        int64_t kb_used_omap,
+                        int64_t kb_used_meta,
                         int64_t kb_avail,
                         double& util,
                         double& var,
@@ -4298,10 +5064,14 @@ protected:
   double average_utilization() {
     int64_t kb = 0, kb_used = 0;
     for (int i = 0; i < osdmap->get_max_osd(); i++) {
-      if (!osdmap->exists(i) || osdmap->get_weight(i) == 0)
+      if (!osdmap->exists(i) ||
+           osdmap->get_weight(i) == 0 ||
+          !should_dump(i))
        continue;
-      int64_t kb_i, kb_used_i, kb_avail_i;
-      if (get_osd_utilization(i, &kb_i, &kb_used_i, &kb_avail_i)) {
+      int64_t kb_i, kb_used_i, kb_used_data_i, kb_used_omap_i, kb_used_meta_i,
+       kb_avail_i;
+      if (get_osd_utilization(i, &kb_i, &kb_used_i, &kb_used_data_i,
+                             &kb_used_omap_i, &kb_used_meta_i, &kb_avail_i)) {
        kb += kb_i;
        kb_used += kb_used_i;
       }
@@ -4310,38 +5080,61 @@ protected:
   }
 
   bool get_osd_utilization(int id, int64_t* kb, int64_t* kb_used,
+                          int64_t* kb_used_data,
+                          int64_t* kb_used_omap,
+                          int64_t* kb_used_meta,
                           int64_t* kb_avail) const {
-    const osd_stat_t *p = pgs->get_osd_stat(id);
+    const osd_stat_t *p = pgmap.get_osd_stat(id);
     if (!p) return false;
-    *kb = p->kb;
-    *kb_used = p->kb_used;
-    *kb_avail = p->kb_avail;
+    *kb = p->statfs.kb();
+    *kb_used = p->statfs.kb_used_raw();
+    *kb_used_data = p->statfs.kb_used_data();
+    *kb_used_omap = p->statfs.kb_used_omap();
+    *kb_used_meta = p->statfs.kb_used_internal_metadata();
+    *kb_avail = p->statfs.kb_avail();
+    
     return *kb > 0;
   }
 
   bool get_bucket_utilization(int id, int64_t* kb, int64_t* kb_used,
+                             int64_t* kb_used_data,
+                             int64_t* kb_used_omap,
+                             int64_t* kb_used_meta,
                              int64_t* kb_avail) const {
     if (id >= 0) {
-      if (osdmap->is_out(id)) {
+      if (osdmap->is_out(id) || !should_dump(id)) {
         *kb = 0;
         *kb_used = 0;
+       *kb_used_data = 0;
+       *kb_used_omap = 0;
+       *kb_used_meta = 0;
         *kb_avail = 0;
         return true;
       }
-      return get_osd_utilization(id, kb, kb_used, kb_avail);
+      return get_osd_utilization(id, kb, kb_used, kb_used_data,
+                                kb_used_omap, kb_used_meta, kb_avail);
     }
 
     *kb = 0;
     *kb_used = 0;
+    *kb_used_data = 0;
+    *kb_used_omap = 0;
+    *kb_used_meta = 0;
     *kb_avail = 0;
 
     for (int k = osdmap->crush->get_bucket_size(id) - 1; k >= 0; k--) {
       int item = osdmap->crush->get_bucket_item(id, k);
-      int64_t kb_i = 0, kb_used_i = 0, kb_avail_i = 0;
-      if (!get_bucket_utilization(item, &kb_i, &kb_used_i, &kb_avail_i))
+      int64_t kb_i = 0, kb_used_i = 0, kb_used_data_i = 0,
+       kb_used_omap_i = 0, kb_used_meta_i = 0, kb_avail_i = 0;
+      if (!get_bucket_utilization(item, &kb_i, &kb_used_i,
+                                 &kb_used_data_i, &kb_used_omap_i,
+                                 &kb_used_meta_i, &kb_avail_i))
        return false;
       *kb += kb_i;
       *kb_used += kb_used_i;
+      *kb_used_data += kb_used_data_i;
+      *kb_used_omap += kb_used_omap_i;
+      *kb_used_meta += kb_used_meta_i;
       *kb_avail += kb_avail_i;
     }
     return *kb > 0;
@@ -4349,13 +5142,17 @@ protected:
 
 protected:
   const OSDMap *osdmap;
-  const PGStatService *pgs;
+  const PGMap& pgmap;
   bool tree;
+  const string class_name;
+  const string item_name;
   double average_util;
   double min_var;
   double max_var;
   double stddev;
   double sum;
+  set<int> allowed;
+  set<int> dumped_osds;
 };
 
 
@@ -4364,8 +5161,10 @@ public:
   typedef OSDUtilizationDumper<TextTable> Parent;
 
   OSDUtilizationPlainDumper(const CrushWrapper *crush, const OSDMap *osdmap,
-                    const PGStatService *pgs, bool tree) :
-    Parent(crush, osdmap, pgs, tree) {}
+                            const PGMap& pgmap, bool tree,
+                            const string& class_name,
+                            const string& item_name) :
+    Parent(crush, osdmap, pgmap, tree, class_name, item_name) {}
 
   void dump(TextTable *tbl) {
     tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
@@ -4373,11 +5172,15 @@ public:
     tbl->define_column("WEIGHT", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("REWEIGHT", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("SIZE", TextTable::LEFT, TextTable::RIGHT);
-    tbl->define_column("USE", TextTable::LEFT, TextTable::RIGHT);
+    tbl->define_column("RAW USE", TextTable::LEFT, TextTable::RIGHT);
+    tbl->define_column("DATA", TextTable::LEFT, TextTable::RIGHT);
+    tbl->define_column("OMAP", TextTable::LEFT, TextTable::RIGHT);
+    tbl->define_column("META", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("AVAIL", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("%USE", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("VAR", TextTable::LEFT, TextTable::RIGHT);
     tbl->define_column("PGS", TextTable::LEFT, TextTable::RIGHT);
+    tbl->define_column("STATUS", TextTable::LEFT, TextTable::RIGHT);
     if (tree)
       tbl->define_column("TYPE NAME", TextTable::LEFT, TextTable::LEFT);
 
@@ -4385,12 +5188,16 @@ public:
 
     dump_stray(tbl);
 
+    auto sum = pgmap.get_osd_sum(get_dumped_osds());
     *tbl << ""
         << ""
         << "" << "TOTAL"
-        << byte_u_t(pgs->get_osd_sum().kb << 10)
-        << byte_u_t(pgs->get_osd_sum().kb_used << 10)
-        << byte_u_t(pgs->get_osd_sum().kb_avail << 10)
+        << byte_u_t(sum.statfs.total)
+        << byte_u_t(sum.statfs.get_used_raw())
+        << byte_u_t(sum.statfs.allocated)
+        << byte_u_t(sum.statfs.omap_allocated)
+        << byte_u_t(sum.statfs.internal_metadata)
+        << byte_u_t(sum.statfs.available)
         << lowprecision_t(average_util)
         << ""
         << TextTable::endrow;
@@ -4408,6 +5215,9 @@ protected:
                         float &reweight,
                         int64_t kb,
                         int64_t kb_used,
+                        int64_t kb_used_data,
+                        int64_t kb_used_omap,
+                        int64_t kb_used_meta,
                         int64_t kb_avail,
                         double& util,
                         double& var,
@@ -4422,14 +5232,25 @@ protected:
         << weightf_t(reweight)
         << byte_u_t(kb << 10)
         << byte_u_t(kb_used << 10)
+        << byte_u_t(kb_used_data << 10)
+        << byte_u_t(kb_used_omap << 10)
+        << byte_u_t(kb_used_meta << 10)
         << byte_u_t(kb_avail << 10)
         << lowprecision_t(util)
         << lowprecision_t(var);
 
     if (qi.is_bucket()) {
       *tbl << "-";
+      *tbl << "";
     } else {
       *tbl << num_pgs;
+      if (osdmap->is_up(qi.id)) {
+        *tbl << "up";
+      } else if (osdmap->is_destroyed(qi.id)) {
+        *tbl << "destroyed";
+      } else {
+        *tbl << "down";
+      }
     }
 
     if (tree) {
@@ -4477,8 +5298,10 @@ public:
   typedef OSDUtilizationDumper<Formatter> Parent;
 
   OSDUtilizationFormatDumper(const CrushWrapper *crush, const OSDMap *osdmap,
-                            const PGStatService *pgs, bool tree) :
-    Parent(crush, osdmap, pgs, tree) {}
+                             const PGMap& pgmap, bool tree,
+                             const string& class_name,
+                             const string& item_name) :
+    Parent(crush, osdmap, pgmap, tree, class_name, item_name) {}
 
   void dump(Formatter *f) {
     f->open_array_section("nodes");
@@ -4493,23 +5316,38 @@ public:
 protected:
   using OSDUtilizationDumper<Formatter>::dump_item;
   void dump_item(const CrushTreeDumper::Item &qi,
-                        float &reweight,
-                        int64_t kb,
-                        int64_t kb_used,
-                        int64_t kb_avail,
-                        double& util,
-                        double& var,
-                        const size_t num_pgs,
-                        Formatter *f) override {
+                float &reweight,
+                int64_t kb,
+                int64_t kb_used,
+                int64_t kb_used_data,
+                int64_t kb_used_omap,
+                int64_t kb_used_meta,
+                int64_t kb_avail,
+                double& util,
+                double& var,
+                const size_t num_pgs,
+                Formatter *f) override {
     f->open_object_section("item");
     CrushTreeDumper::dump_item_fields(crush, weight_set_names, qi, f);
     f->dump_float("reweight", reweight);
     f->dump_int("kb", kb);
     f->dump_int("kb_used", kb_used);
+    f->dump_int("kb_used_data", kb_used_data);
+    f->dump_int("kb_used_omap", kb_used_omap);
+    f->dump_int("kb_used_meta", kb_used_meta);
     f->dump_int("kb_avail", kb_avail);
     f->dump_float("utilization", util);
     f->dump_float("var", var);
     f->dump_unsigned("pgs", num_pgs);
+    if (!qi.is_bucket()) {
+      if (osdmap->is_up(qi.id)) {
+        f->dump_string("status", "up");
+      } else if (osdmap->is_destroyed(qi.id)) {
+        f->dump_string("status", "destroyed");
+      } else {
+        f->dump_string("status", "down");
+      }
+    }
     CrushTreeDumper::dump_bucket_children(crush, qi, f);
     f->close_section();
   }
@@ -4517,9 +5355,15 @@ protected:
 public:
   void summary(Formatter *f) {
     f->open_object_section("summary");
-    f->dump_int("total_kb", pgs->get_osd_sum().kb);
-    f->dump_int("total_kb_used", pgs->get_osd_sum().kb_used);
-    f->dump_int("total_kb_avail", pgs->get_osd_sum().kb_avail);
+    auto sum = pgmap.get_osd_sum(get_dumped_osds());
+    auto& s = sum.statfs;
+
+    f->dump_int("total_kb", s.kb());
+    f->dump_int("total_kb_used", s.kb_used_raw());
+    f->dump_int("total_kb_used_data", s.kb_used_data());
+    f->dump_int("total_kb_used_omap", s.kb_used_omap());
+    f->dump_int("total_kb_used_meta", s.kb_used_internal_metadata());
+    f->dump_int("total_kb_avail", s.kb_avail());
     f->dump_float("average_utilization", average_util);
     f->dump_float("min_var", min_var);
     f->dump_float("max_var", max_var);
@@ -4529,21 +5373,25 @@ public:
 };
 
 void print_osd_utilization(const OSDMap& osdmap,
-                          const PGStatService *pgstat,
-                          ostream& out,
-                          Formatter *f,
-                          bool tree)
+                           const PGMap& pgmap,
+                           ostream& out,
+                           Formatter *f,
+                           bool tree,
+                           const string& class_name,
+                           const string& item_name)
 {
   const CrushWrapper *crush = osdmap.crush.get();
   if (f) {
     f->open_object_section("df");
-    OSDUtilizationFormatDumper d(crush, &osdmap, pgstat, tree);
+    OSDUtilizationFormatDumper d(crush, &osdmap, pgmap, tree,
+                                 class_name, item_name);
     d.dump(f);
     d.summary(f);
     f->close_section();
     f->flush(out);
   } else {
-    OSDUtilizationPlainDumper d(crush, &osdmap, pgstat, tree);
+    OSDUtilizationPlainDumper d(crush, &osdmap, pgmap, tree,
+                                class_name, item_name);
     TextTable tbl;
     d.dump(&tbl);
     out << tbl << d.summary() << "\n";
@@ -4627,7 +5475,7 @@ void OSDMap::check_health(health_check_map_t *checks) const
       }
     }
     num_down_in_osds = down_in_osds.size();
-    assert(num_down_in_osds <= num_in_osds);
+    ceph_assert(num_down_in_osds <= num_in_osds);
     if (num_down_in_osds > 0) {
       // summary of down subtree types and osds
       for (int type = max_type; type > 0; type--) {
@@ -4699,7 +5547,7 @@ void OSDMap::check_health(health_check_map_t *checks) const
   {
     // An osd could configure failsafe ratio, to something different
     // but for now assume it is the same here.
-    float fsr = g_conf->osd_failsafe_full_ratio;
+    float fsr = g_conf()->osd_failsafe_full_ratio;
     if (fsr > 1.0) fsr /= 100;
     float fr = get_full_ratio();
     float br = get_backfillfull_ratio();
@@ -4792,6 +5640,7 @@ void OSDMap::check_health(health_check_map_t *checks) const
       CEPH_OSDMAP_NOSCRUB |
       CEPH_OSDMAP_NODEEP_SCRUB |
       CEPH_OSDMAP_NOTIERAGENT |
+      CEPH_OSDMAP_NOSNAPTRIM |
       CEPH_OSDMAP_NOREBALANCE;
     if (test_flag(warn_flags)) {
       ostringstream ss;
@@ -4818,28 +5667,50 @@ void OSDMap::check_health(health_check_map_t *checks) const
        detail.push_back(ss.str());
       }
     }
+    for (auto& i : crush_node_flags) {
+      if (i.second && crush->item_exists(i.first)) {
+       ostringstream ss;
+       set<string> states;
+       OSDMap::calc_state_set(i.second, states);
+       int t = i.first >= 0 ? 0 : crush->get_bucket_type(i.first);
+       const char *tn = crush->get_type_name(t);
+       ss << (tn ? tn : "node") << " "
+          << crush->get_item_name(i.first) << " has flags " << states;
+       detail.push_back(ss.str());
+      }
+    }
+    for (auto& i : device_class_flags) {
+      const char* class_name = crush->get_class_name(i.first);
+      if (i.second && class_name) {
+        ostringstream ss;
+        set<string> states;
+        OSDMap::calc_state_set(i.second, states);
+        ss << "device class '" << class_name << "' has flags " << states;
+        detail.push_back(ss.str());
+      }
+    }
     if (!detail.empty()) {
       ostringstream ss;
-      ss << detail.size() << " osd(s) have {NOUP,NODOWN,NOIN,NOOUT} flags set";
+      ss << detail.size() << " OSDs or CRUSH {nodes, device-classes} have {NOUP,NODOWN,NOIN,NOOUT} flags set";
       auto& d = checks->add("OSD_FLAGS", HEALTH_WARN, ss.str());
       d.detail.swap(detail);
     }
   }
 
   // OLD_CRUSH_TUNABLES
-  if (g_conf->mon_warn_on_legacy_crush_tunables) {
+  if (g_conf()->mon_warn_on_legacy_crush_tunables) {
     string min = crush->get_min_required_version();
-    if (min < g_conf->mon_crush_min_required_version) {
+    if (min < g_conf()->mon_crush_min_required_version) {
       ostringstream ss;
       ss << "crush map has legacy tunables (require " << min
-        << ", min is " << g_conf->mon_crush_min_required_version << ")";
+        << ", min is " << g_conf()->mon_crush_min_required_version << ")";
       auto& d = checks->add("OLD_CRUSH_TUNABLES", HEALTH_WARN, ss.str());
       d.detail.push_back("see http://docs.ceph.com/docs/master/rados/operations/crush-map/#tunables");
     }
   }
 
   // OLD_CRUSH_STRAW_CALC_VERSION
-  if (g_conf->mon_warn_on_crush_straw_calc_version_zero) {
+  if (g_conf()->mon_warn_on_crush_straw_calc_version_zero) {
     if (crush->get_straw_calc_version() == 0) {
       ostringstream ss;
       ss << "crush map has straw_calc_version=0";
@@ -4850,7 +5721,7 @@ void OSDMap::check_health(health_check_map_t *checks) const
   }
 
   // CACHE_POOL_NO_HIT_SET
-  if (g_conf->mon_warn_on_cache_pools_without_hit_sets) {
+  if (g_conf()->mon_warn_on_cache_pools_without_hit_sets) {
     list<string> detail;
     for (map<int64_t, pg_pool_t>::const_iterator p = pools.begin();
         p != pools.end();
@@ -4874,11 +5745,9 @@ void OSDMap::check_health(health_check_map_t *checks) const
   }
 
   // OSD_NO_SORTBITWISE
-  if (!test_flag(CEPH_OSDMAP_SORTBITWISE) &&
-      (get_up_osd_features() &
-       CEPH_FEATURE_OSD_BITWISE_HOBJ_SORT)) {
+  if (!test_flag(CEPH_OSDMAP_SORTBITWISE)) {
     ostringstream ss;
-    ss << "no legacy OSD present but 'sortbitwise' flag is not set";
+    ss << "'sortbitwise' flag is not set";
     checks->add("OSD_NO_SORTBITWISE", HEALTH_WARN, ss.str());
   }
 
@@ -4893,10 +5762,10 @@ void OSDMap::check_health(health_check_map_t *checks) const
       const string& pool_name = get_pool_name(it.first);
       if (pool.has_flag(pg_pool_t::FLAG_FULL)) {
        stringstream ss;
-        if (pool.has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
+        if (pool.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) {
           // may run out of space too,
           // but we want EQUOTA taking precedence
-          ss << "pool '" << pool_name << "' is full (no quota)";
+          ss << "pool '" << pool_name << "' is full (running out of quota)";
         } else {
           ss << "pool '" << pool_name << "' is full (no space)";
         }
@@ -4951,3 +5820,108 @@ int OSDMap::parse_osd_id_list(const vector<string>& ls, set<int> *out,
   }
   return 0;
 }
+
+void OSDMap::get_random_up_osds_by_subtree(int n,     // whoami
+                                           string &subtree,
+                                           int limit, // how many
+                                           set<int> skip,
+                                           set<int> *want) const {
+  if (limit <= 0)
+    return;
+  int subtree_type = crush->get_type_id(subtree);
+  if (subtree_type < 1)
+    return;
+  vector<int> subtrees;
+  crush->get_subtree_of_type(subtree_type, &subtrees);
+  std::random_device rd;
+  std::default_random_engine rng{rd()};
+  std::shuffle(subtrees.begin(), subtrees.end(), rng);
+  for (auto s : subtrees) {
+    if (limit <= 0)
+      break;
+    if (crush->subtree_contains(s, n))
+      continue;
+    vector<int> osds;
+    crush->get_children_of_type(s, 0, &osds);
+    if (osds.empty())
+      continue;
+    vector<int> up_osds;
+    for (auto o : osds) {
+      if (is_up(o) && !skip.count(o))
+        up_osds.push_back(o);
+    }
+    if (up_osds.empty())
+      continue;
+    auto it = up_osds.begin();
+    std::advance(it, (n % up_osds.size()));
+    want->insert(*it);
+    --limit;
+  }
+}
+
+float OSDMap::pool_raw_used_rate(int64_t poolid) const
+{
+  const pg_pool_t *pool = get_pg_pool(poolid);
+  assert(pool != nullptr);
+
+  switch (pool->get_type()) {
+  case pg_pool_t::TYPE_REPLICATED:
+    return pool->get_size();
+    break;
+  case pg_pool_t::TYPE_ERASURE:
+  {
+    auto& ecp =
+      get_erasure_code_profile(pool->erasure_code_profile);
+    auto pm = ecp.find("m");
+    auto pk = ecp.find("k");
+    if (pm != ecp.end() && pk != ecp.end()) {
+      int k = atoi(pk->second.c_str());
+      int m = atoi(pm->second.c_str());
+      int mk = m + k;
+      ceph_assert(mk != 0);
+      ceph_assert(k != 0);
+      return (float)mk / k;
+    } else {
+      return 0.0;
+    }
+  }
+  break;
+  default:
+    ceph_abort_msg("unrecognized pool type");
+  }
+}
+
+unsigned OSDMap::get_osd_crush_node_flags(int osd) const
+{
+  unsigned flags = 0;
+  if (!crush_node_flags.empty()) {
+    // the map will contain type -> name
+    std::map<std::string,std::string> ploc = crush->get_full_location(osd);
+    for (auto& i : ploc) {
+      int id = crush->get_item_id(i.second);
+      auto p = crush_node_flags.find(id);
+      if (p != crush_node_flags.end()) {
+       flags |= p->second;
+      }
+    }
+  }
+  return flags;
+}
+
+unsigned OSDMap::get_crush_node_flags(int id) const
+{
+  unsigned flags = 0;
+  auto it = crush_node_flags.find(id);
+  if (it != crush_node_flags.end())
+    flags = it->second;
+  return flags;
+}
+
+unsigned OSDMap::get_device_class_flags(int id) const
+{
+  unsigned flags = 0;
+  auto it = device_class_flags.find(id);
+  if (it != device_class_flags.end())
+    flags = it->second;
+  return flags;
+}