]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/os/bluestore/BlueStore.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / os / bluestore / BlueStore.cc
index 8b893be79d18f36113ab14fcfe5660f467564b5b..d1a0fe4897cc887fd61d1f14bd51799890998ddb 100644 (file)
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <fcntl.h>
+#include <algorithm>
 
 #include <boost/container/flat_set.hpp>
-#include "boost/algorithm/string.hpp"
+#include <boost/algorithm/string.hpp>
 
 #include "include/cpp-btree/btree_set.h"
 
 #include "BlueStore.h"
 #include "bluestore_common.h"
+#include "simple_bitmap.h"
 #include "os/kv.h"
 #include "include/compat.h"
 #include "include/intarith.h"
@@ -34,7 +36,7 @@
 #include "common/errno.h"
 #include "common/safe_io.h"
 #include "common/PriorityCache.h"
-#include "common/RWLock.h"
+#include "common/url_escape.h"
 #include "Allocator.h"
 #include "FreelistManager.h"
 #include "BlueFS.h"
 #include "common/blkdev.h"
 #include "common/numa.h"
 #include "common/pretty_binary.h"
+#include "kv/KeyValueHistogram.h"
+
+#ifdef HAVE_LIBZBD
+#include "ZonedAllocator.h"
+#include "ZonedFreelistManager.h"
+#endif
 
 #if defined(WITH_LTTNG)
 #define TRACEPOINT_DEFINE
@@ -78,12 +86,15 @@ MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::SharedBlob, bluestore_shared_blob,
 // bluestore_txc
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::TransContext, bluestore_transcontext,
                              bluestore_txc);
+using std::byte;
 using std::deque;
 using std::min;
 using std::make_pair;
 using std::numeric_limits;
 using std::pair;
+using std::less;
 using std::list;
+using std::make_unique;
 using std::map;
 using std::max;
 using std::ostream;
@@ -91,6 +102,7 @@ using std::ostringstream;
 using std::set;
 using std::string;
 using std::stringstream;
+using std::unique_ptr;
 using std::vector;
 
 using ceph::bufferlist;
@@ -117,10 +129,13 @@ const string PREFIX_PERPG_OMAP = "p";   // u64(pool) + u32(hash) + u64(id) + key
 const string PREFIX_DEFERRED = "L";    // id -> deferred_transaction_t
 const string PREFIX_ALLOC = "B";       // u64 offset -> u64 length (freelist)
 const string PREFIX_ALLOC_BITMAP = "b";// (see BitmapFreelistManager)
-const string PREFIX_SHARED_BLOB = "X"; // u64 offset -> shared_blob_t
+const string PREFIX_SHARED_BLOB = "X"; // u64 SB id -> shared_blob_t
+
+#ifdef HAVE_LIBZBD
 const string PREFIX_ZONED_FM_META = "Z";  // (see ZonedFreelistManager)
 const string PREFIX_ZONED_FM_INFO = "z";  // (see ZonedFreelistManager)
 const string PREFIX_ZONED_CL_INFO = "G";  // (per-zone cleaner metadata)
+#endif
 
 const string BLUESTORE_GLOBAL_STATFS_KEY = "bluestore_statfs";
 
@@ -366,20 +381,12 @@ static const char *_key_decode_prefix(const char *p, ghobject_t *oid)
 
 #define ENCODED_KEY_PREFIX_LEN (1 + 8 + 4)
 
-template<typename S>
-static int get_key_object(const S& key, ghobject_t *oid)
+static int _get_key_object(const char *p, ghobject_t *oid)
 {
   int r;
-  const char *p = key.c_str();
-
-  if (key.length() < ENCODED_KEY_PREFIX_LEN)
-    return -1;
 
   p = _key_decode_prefix(p, oid);
 
-  if (key.length() == ENCODED_KEY_PREFIX_LEN)
-    return -2;
-
   r = decode_escaped(p, &oid->hobj.nspace);
   if (r < 0)
     return -2;
@@ -424,10 +431,19 @@ static int get_key_object(const S& key, ghobject_t *oid)
 }
 
 template<typename S>
-static void get_object_key(CephContext *cct, const ghobject_t& oid, S *key)
+static int get_key_object(const S& key, ghobject_t *oid)
 {
-  key->clear();
+  if (key.length() < ENCODED_KEY_PREFIX_LEN)
+    return -1;
+  if (key.length() == ENCODED_KEY_PREFIX_LEN)
+    return -2;
+  const char *p = key.c_str();
+  return _get_key_object(p, oid);
+}
 
+template<typename S>
+static void _get_object_key(const ghobject_t& oid, S *key)
+{
   size_t max_len = ENCODED_KEY_PREFIX_LEN +
                   (oid.hobj.nspace.length() * 3 + 1) +
                   (oid.hobj.get_key().length() * 3 + 1) +
@@ -462,6 +478,13 @@ static void get_object_key(CephContext *cct, const ghobject_t& oid, S *key)
   _key_encode_u64(oid.generation, key);
 
   key->push_back(ONODE_KEY_SUFFIX);
+}
+
+template<typename S>
+static void get_object_key(CephContext *cct, const ghobject_t& oid, S *key)
+{
+  key->clear();
+  _get_object_key(oid, key);
 
   // sanity check
   if (true) {
@@ -550,6 +573,37 @@ static int get_key_pool_stat(const string& key, uint64_t* pool_id)
   return 0;
 }
 
+#ifdef HAVE_LIBZBD
+static void get_zone_offset_object_key(
+  uint32_t zone,
+  uint64_t offset,
+  ghobject_t oid,
+  std::string *key)
+{
+  key->clear();
+  _key_encode_u32(zone, key);
+  _key_encode_u64(offset, key);
+  _get_object_key(oid, key);
+}
+
+static int get_key_zone_offset_object(
+  const string& key,
+  uint32_t *zone,
+  uint64_t *offset,
+  ghobject_t *oid)
+{
+  const char *p = key.c_str();
+  if (key.length() < sizeof(uint64_t) + sizeof(uint32_t) + ENCODED_KEY_PREFIX_LEN + 1)
+    return -1;
+  p = _key_decode_u32(p, zone);
+  p = _key_decode_u64(p, offset);
+  int r = _get_key_object(p, oid);
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
+#endif
 
 template <int LogLevelV>
 void _dump_extent_map(CephContext *cct, const BlueStore::ExtentMap &em)
@@ -598,6 +652,10 @@ void _dump_onode(CephContext *cct, const BlueStore::Onode& o)
                  << ", " << o.extent_map.spanning_blob_map.size()
                  << " spanning blobs"
                  << dendl;
+  for (auto& [zone, offset] : o.onode.zone_offset_refs) {
+    dout(LogLevelV) << __func__ << " zone ref 0x" << std::hex << zone
+                   << " offset 0x" << offset << std::dec << dendl;
+  }
   for (auto p = o.onode.attrs.begin();
        p != o.onode.attrs.end();
        ++p) {
@@ -1052,15 +1110,19 @@ struct LruOnodeCacheShard : public BlueStore::OnodeCacheShard {
   {
     if (o->put_cache()) {
       (level > 0) ? lru.push_front(*o) : lru.push_back(*o);
+      o->cache_age_bin = age_bins.front();
+      *(o->cache_age_bin) += 1;
     } else {
       ++num_pinned;
     }
     ++num; // we count both pinned and unpinned entries
-    dout(20) << __func__ << " " << this << " " << o->oid << " added, num=" << num << dendl;
+    dout(20) << __func__ << " " << this << " " << o->oid << " added, num="
+             << num << dendl;
   }
   void _rm(BlueStore::Onode* o) override
   {
     if (o->pop_cache()) {
+      *(o->cache_age_bin) -= 1;
       lru.erase(lru.iterator_to(*o));
     } else {
       ceph_assert(num_pinned);
@@ -1072,16 +1134,19 @@ struct LruOnodeCacheShard : public BlueStore::OnodeCacheShard {
   }
   void _pin(BlueStore::Onode* o) override
   {
+    *(o->cache_age_bin) -= 1;
     lru.erase(lru.iterator_to(*o));
     ++num_pinned;
-    dout(20) << __func__ << this << " " << " " << " " << o->oid << " pinned" << dendl;
+    dout(20) << __func__ << " " << this << " " << " " << " " << o->oid << " pinned" << dendl;
   }
   void _unpin(BlueStore::Onode* o) override
   {
     lru.push_front(*o);
+    o->cache_age_bin = age_bins.front();
+    *(o->cache_age_bin) += 1;
     ceph_assert(num_pinned);
     --num_pinned;
-    dout(20) << __func__ << this << " " << " " << " " << o->oid << " unpinned" << dendl;
+    dout(20) << __func__ << " " << this << " " << " " << " " << o->oid << " unpinned" << dendl;
   }
   void _unpin_and_rm(BlueStore::Onode* o) override
   {
@@ -1112,6 +1177,7 @@ struct LruOnodeCacheShard : public BlueStore::OnodeCacheShard {
         ceph_assert(n == 0);
         lru.erase(p);
       }
+      *(o->cache_age_bin) -= 1;
       auto pinned = !o->pop_cache();
       ceph_assert(!pinned);
       o->c->onode_map._remove(o->oid);
@@ -1173,11 +1239,15 @@ struct LruBufferCacheShard : public BlueStore::BufferCacheShard {
       lru.push_back(*b);
     }
     buffer_bytes += b->length;
+    b->cache_age_bin = age_bins.front();
+    *(b->cache_age_bin) += b->length;
     num = lru.size();
   }
   void _rm(BlueStore::Buffer *b) override {
     ceph_assert(buffer_bytes >= b->length);
     buffer_bytes -= b->length;
+    assert(*(b->cache_age_bin) >= b->length);
+    *(b->cache_age_bin) -= b->length;
     auto q = lru.iterator_to(*b);
     lru.erase(q);
     num = lru.size();
@@ -1189,11 +1259,16 @@ struct LruBufferCacheShard : public BlueStore::BufferCacheShard {
   void _adjust_size(BlueStore::Buffer *b, int64_t delta) override {
     ceph_assert((int64_t)buffer_bytes + delta >= 0);
     buffer_bytes += delta;
+    assert(*(b->cache_age_bin) + delta >= 0);
+    *(b->cache_age_bin) += delta;
   }
   void _touch(BlueStore::Buffer *b) override {
     auto p = lru.iterator_to(*b);
     lru.erase(p);
     lru.push_front(*b);
+    *(b->cache_age_bin) -= b->length;
+    b->cache_age_bin = age_bins.front();
+    *(b->cache_age_bin) += b->length;
     num = lru.size();
     _audit("_touch_buffer end");
   }
@@ -1210,6 +1285,8 @@ struct LruBufferCacheShard : public BlueStore::BufferCacheShard {
       BlueStore::Buffer *b = &*i;
       ceph_assert(b->is_clean());
       dout(20) << __func__ << " rm " << *b << dendl;
+      assert(*(b->cache_age_bin) >= b->length);
+      *(b->cache_age_bin) -= b->length;
       b->space->_rm_buffer(this, b);
     }
     num = lru.size();
@@ -1321,9 +1398,11 @@ public:
         ceph_abort_msg("bad cache_private");
       }
     }
+    b->cache_age_bin = age_bins.front();
     if (!b->is_empty()) {
       buffer_bytes += b->length;
       list_bytes[b->cache_private] += b->length;
+      *(b->cache_age_bin) += b->length;
     }
     num = hot.size() + warm_in.size();
   }
@@ -1336,6 +1415,8 @@ public:
       buffer_bytes -= b->length;
       ceph_assert(list_bytes[b->cache_private] >= b->length);
       list_bytes[b->cache_private] -= b->length;
+      assert(*(b->cache_age_bin) >= b->length);
+      *(b->cache_age_bin) -= b->length;
     }
     switch (b->cache_private) {
     case BUFFER_WARM_IN:
@@ -1378,6 +1459,7 @@ public:
     if (!b->is_empty()) {
       buffer_bytes += b->length;
       list_bytes[b->cache_private] += b->length;
+      *(b->cache_age_bin) += b->length;
     }
     num = hot.size() + warm_in.size();
   }
@@ -1390,6 +1472,8 @@ public:
       buffer_bytes += delta;
       ceph_assert((int64_t)list_bytes[b->cache_private] + delta >= 0);
       list_bytes[b->cache_private] += delta;
+      assert(*(b->cache_age_bin) + delta >= 0);
+      *(b->cache_age_bin) += delta;
     }
   }
 
@@ -1408,6 +1492,9 @@ public:
       hot.push_front(*b);
       break;
     }
+    *(b->cache_age_bin) -= b->length;
+    b->cache_age_bin = age_bins.front();
+    *(b->cache_age_bin) += b->length;
     num = hot.size() + warm_in.size();
     _audit("_touch_buffer end");
   }
@@ -1455,7 +1542,9 @@ public:
         buffer_bytes -= b->length;
         ceph_assert(list_bytes[BUFFER_WARM_IN] >= b->length);
         list_bytes[BUFFER_WARM_IN] -= b->length;
-        to_evict_bytes -= b->length;
+        assert(*(b->cache_age_bin) >= b->length);
+        *(b->cache_age_bin) -= b->length;
+       to_evict_bytes -= b->length;
         evicted += b->length;
         b->state = BlueStore::Buffer::STATE_EMPTY;
         b->data.clear();
@@ -1860,12 +1949,12 @@ BlueStore::OnodeRef BlueStore::OnodeSpace::lookup(const ghobject_t& oid)
 {
   ldout(cache->cct, 30) << __func__ << dendl;
   OnodeRef o;
-  bool hit = false;
 
   {
     std::lock_guard l(cache->lock);
     ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
     if (p == onode_map.end()) {
+      cache->logger->inc(l_bluestore_onode_misses);
       ldout(cache->cct, 30) << __func__ << " " << oid << " miss" << dendl;
     } else {
       ldout(cache->cct, 30) << __func__ << " " << oid << " hit " << p->second
@@ -1878,15 +1967,10 @@ BlueStore::OnodeRef BlueStore::OnodeSpace::lookup(const ghobject_t& oid)
       o = p->second;
       ceph_assert(!o->cached || o->pinned);
 
-      hit = true;
+      cache->logger->inc(l_bluestore_onode_hits);
     }
   }
 
-  if (hit) {
-    cache->logger->inc(l_bluestore_onode_hits);
-  } else {
-    cache->logger->inc(l_bluestore_onode_misses);
-  }
   return o;
 }
 
@@ -2585,6 +2669,8 @@ void BlueStore::ExtentMap::update(KeyValueDB::Transaction t,
     // schedule DB update for dirty shards
     string key;
     for (auto& it : encoded_shards) {
+      dout(20) << __func__ << "  encoding key for shard 0x" << std::hex
+              << it.shard->shard_info->offset << std::dec << dendl;
       it.shard->dirty = false;
       it.shard->shard_info->bytes = it.bl.length();
       generate_extent_shard_key_and_apply(
@@ -3504,14 +3590,73 @@ BlueStore::BlobRef BlueStore::ExtentMap::split_blob(
 #undef dout_prefix
 #define dout_prefix *_dout << "bluestore.onode(" << this << ")." << __func__ << " "
 
-//
-// A tricky thing about Onode's ref counter is that we do an additional
-// increment when newly pinned instance is detected. And -1 on unpin.
-// This prevents from a conflict with a delete call (when nref == 0).
-// The latter might happen while the thread is in unpin() function
-// (and e.g. waiting for lock acquisition) since nref is already
-// decremented. And another 'putting' thread on the instance will release it.
-//
+const std::string& BlueStore::Onode::calc_omap_prefix(uint8_t flags)
+{
+  if (bluestore_onode_t::is_pgmeta_omap(flags)) {
+    return PREFIX_PGMETA_OMAP;
+  }
+  if (bluestore_onode_t::is_perpg_omap(flags)) {
+    return PREFIX_PERPG_OMAP;
+  }
+  if (bluestore_onode_t::is_perpool_omap(flags)) {
+    return PREFIX_PERPOOL_OMAP;
+  }
+  return PREFIX_OMAP;
+}
+
+// '-' < '.' < '~'
+void BlueStore::Onode::calc_omap_header(
+  uint8_t flags,
+  const Onode* o,
+  std::string* out)
+{
+  if (!bluestore_onode_t::is_pgmeta_omap(flags)) {
+    if (bluestore_onode_t::is_perpg_omap(flags)) {
+      _key_encode_u64(o->c->pool(), out);
+      _key_encode_u32(o->oid.hobj.get_bitwise_key_u32(), out);
+    } else if (bluestore_onode_t::is_perpool_omap(flags)) {
+      _key_encode_u64(o->c->pool(), out);
+    }
+  }
+  _key_encode_u64(o->onode.nid, out);
+  out->push_back('-');
+}
+
+void BlueStore::Onode::calc_omap_key(uint8_t flags,
+                                   const Onode* o,
+                                   const std::string& key,
+                                   std::string* out)
+{
+  if (!bluestore_onode_t::is_pgmeta_omap(flags)) {
+    if (bluestore_onode_t::is_perpg_omap(flags)) {
+      _key_encode_u64(o->c->pool(), out);
+      _key_encode_u32(o->oid.hobj.get_bitwise_key_u32(), out);
+    } else if (bluestore_onode_t::is_perpool_omap(flags)) {
+      _key_encode_u64(o->c->pool(), out);
+    }
+  }
+  _key_encode_u64(o->onode.nid, out);
+  out->push_back('.');
+  out->append(key);
+}
+
+void BlueStore::Onode::calc_omap_tail(
+  uint8_t flags,
+  const Onode* o,
+  std::string* out)
+{
+  if (!bluestore_onode_t::is_pgmeta_omap(flags)) {
+    if (bluestore_onode_t::is_perpg_omap(flags)) {
+      _key_encode_u64(o->c->pool(), out);
+      _key_encode_u32(o->oid.hobj.get_bitwise_key_u32(), out);
+    } else if (bluestore_onode_t::is_perpool_omap(flags)) {
+      _key_encode_u64(o->c->pool(), out);
+    }
+  }
+  _key_encode_u64(o->onode.nid, out);
+  out->push_back('~');
+}
+
 void BlueStore::Onode::get() {
   if (++nref >= 2 && !pinned) {
     OnodeCacheShard* ocs = c->get_onode_cache();
@@ -3524,11 +3669,7 @@ void BlueStore::Onode::get() {
     }
     bool was_pinned = pinned;
     pinned = nref >= 2;
-    // additional increment for newly pinned instance
     bool r = !was_pinned && pinned;
-    if (r) {
-      ++nref;
-    }
     if (cached && r) {
       ocs->_pin(this);
     }
@@ -3536,8 +3677,9 @@ void BlueStore::Onode::get() {
   }
 }
 void BlueStore::Onode::put() {
+  ++put_nref;
   int n = --nref;
-  if (n == 2) {
+  if (n == 1) {
     OnodeCacheShard* ocs = c->get_onode_cache();
     ocs->lock.lock();
     // It is possible that during waiting split_cache moved us to different OnodeCacheShard.
@@ -3547,27 +3689,21 @@ void BlueStore::Onode::put() {
       ocs->lock.lock();
     }
     bool need_unpin = pinned;
-    pinned = pinned && nref > 2; // intentionally use > not >= as we have
-                                 // +1 due to pinned state
+    pinned = pinned && nref >= 2;
     need_unpin = need_unpin && !pinned;
     if (cached && need_unpin) {
       if (exists) {
         ocs->_unpin(this);
       } else {
         ocs->_unpin_and_rm(this);
-        // remove will also decrement nref and delete Onode
+        // remove will also decrement nref
         c->onode_map._remove(oid);
       }
     }
-    // additional decrement for newly unpinned instance
-    // should be the last action since Onode can be released
-    // at any point after this decrement
-    if (need_unpin) {
-      n = --nref;
-    }
     ocs->lock.unlock();
   }
-  if (n == 0) {
+  auto pn = --put_nref;
+  if (nref == 0 && pn == 0) {
     delete this;
   }
 }
@@ -3620,56 +3756,6 @@ void BlueStore::Onode::dump(Formatter* f) const
   extent_map.dump(f);
 }
 
-const std::string& BlueStore::Onode::calc_omap_prefix(uint8_t flags)
-{
-  if (bluestore_onode_t::is_pgmeta_omap(flags)) {
-    return PREFIX_PGMETA_OMAP;
-  }
-  if (bluestore_onode_t::is_perpg_omap(flags)) {
-    return PREFIX_PERPG_OMAP;
-  }
-  if (bluestore_onode_t::is_perpool_omap(flags)) {
-    return PREFIX_PERPOOL_OMAP;
-  }
-  return PREFIX_OMAP;
-}
-
-// '-' < '.' < '~'
-void BlueStore::Onode::calc_omap_header(
-  uint8_t flags,
-  const Onode* o,
-  std::string* out)
-{
-  if (!bluestore_onode_t::is_pgmeta_omap(flags)) {
-    if (bluestore_onode_t::is_perpg_omap(flags)) {
-      _key_encode_u64(o->c->pool(), out);
-      _key_encode_u32(o->oid.hobj.get_bitwise_key_u32(), out);
-    } else if (bluestore_onode_t::is_perpool_omap(flags)) {
-      _key_encode_u64(o->c->pool(), out);
-    }
-  }
-  _key_encode_u64(o->onode.nid, out);
-  out->push_back('-');
-}
-
-void BlueStore::Onode::calc_omap_key(uint8_t flags,
-                                   const Onode* o,
-                                   const std::string& key,
-                                   std::string* out)
-{
-  if (!bluestore_onode_t::is_pgmeta_omap(flags)) {
-    if (bluestore_onode_t::is_perpg_omap(flags)) {
-      _key_encode_u64(o->c->pool(), out);
-      _key_encode_u32(o->oid.hobj.get_bitwise_key_u32(), out);
-    } else if (bluestore_onode_t::is_perpool_omap(flags)) {
-      _key_encode_u64(o->c->pool(), out);
-    }
-  }
-  _key_encode_u64(o->onode.nid, out);
-  out->push_back('.');
-  out->append(key);
-}
-
 void BlueStore::Onode::rewrite_omap_key(const string& old, string *out)
 {
   if (!onode.is_pgmeta_omap()) {
@@ -3684,23 +3770,6 @@ void BlueStore::Onode::rewrite_omap_key(const string& old, string *out)
   out->append(old.c_str() + out->length(), old.size() - out->length());
 }
 
-void BlueStore::Onode::calc_omap_tail(
-  uint8_t flags,
-  const Onode* o,
-  std::string* out)
-{
-  if (!bluestore_onode_t::is_pgmeta_omap(flags)) {
-    if (bluestore_onode_t::is_perpg_omap(flags)) {
-      _key_encode_u64(o->c->pool(), out);
-      _key_encode_u32(o->oid.hobj.get_bitwise_key_u32(), out);
-    } else if (bluestore_onode_t::is_perpool_omap(flags)) {
-      _key_encode_u64(o->c->pool(), out);
-    }
-  }
-  _key_encode_u64(o->onode.nid, out);
-  out->push_back('~');
-}
-
 void BlueStore::Onode::decode_omap_key(const string& key, string *user_key)
 {
   size_t pos = sizeof(uint64_t) + 1;
@@ -4140,6 +4209,7 @@ void *BlueStore::MempoolThread::entry()
 
   utime_t next_balance = ceph_clock_now();
   utime_t next_resize = ceph_clock_now();
+  utime_t next_bin_rotation = ceph_clock_now();
   utime_t next_deferred_force_submit = ceph_clock_now();
   utime_t alloc_stats_dump_clock = ceph_clock_now();
 
@@ -4152,21 +4222,47 @@ void *BlueStore::MempoolThread::entry()
       prev_config_change = cur_config_change;
     }
 
-    // Before we trim, check and see if it's time to rebalance/resize.
+    // define various intervals for background work
+    double age_bin_interval = store->cache_age_bin_interval;
     double autotune_interval = store->cache_autotune_interval;
     double resize_interval = store->osd_memory_cache_resize_interval;
     double max_defer_interval = store->max_defer_interval;
-
     double alloc_stats_dump_interval =
       store->cct->_conf->bluestore_alloc_stats_dump_interval;
 
+    // alloc stats dump
     if (alloc_stats_dump_interval > 0 &&
         alloc_stats_dump_clock + alloc_stats_dump_interval < ceph_clock_now()) {
       store->_record_allocation_stats();
       alloc_stats_dump_clock = ceph_clock_now();
     }
+    // cache age binning
+    if (age_bin_interval > 0 && next_bin_rotation < ceph_clock_now()) {
+      if (binned_kv_cache != nullptr) {
+        binned_kv_cache->import_bins(store->kv_bins);
+      }
+      if (binned_kv_onode_cache != nullptr) {
+        binned_kv_onode_cache->import_bins(store->kv_onode_bins);
+      }
+      meta_cache->import_bins(store->meta_bins);
+      data_cache->import_bins(store->data_bins);
+
+      if (pcm != nullptr) {
+        pcm->shift_bins();
+      }
+      next_bin_rotation = ceph_clock_now();
+      next_bin_rotation += age_bin_interval;
+    }
+    // cache balancing
     if (autotune_interval > 0 && next_balance < ceph_clock_now()) {
-      _adjust_cache_settings();
+      if (binned_kv_cache != nullptr) {
+        binned_kv_cache->set_cache_ratio(store->cache_kv_ratio);
+      }
+      if (binned_kv_onode_cache != nullptr) {
+        binned_kv_onode_cache->set_cache_ratio(store->cache_kv_onode_ratio);
+      }
+      meta_cache->set_cache_ratio(store->cache_meta_ratio);
+      data_cache->set_cache_ratio(store->cache_data_ratio);
 
       // Log events at 5 instead of 20 when balance happens.
       interval_stats_trim = true;
@@ -4178,6 +4274,7 @@ void *BlueStore::MempoolThread::entry()
       next_balance = ceph_clock_now();
       next_balance += autotune_interval;
     }
+    // memory resizing (ie autotuning)
     if (resize_interval > 0 && next_resize < ceph_clock_now()) {
       if (ceph_using_tcmalloc() && pcm != nullptr) {
         pcm->tune_memory();
@@ -4185,7 +4282,7 @@ void *BlueStore::MempoolThread::entry()
       next_resize = ceph_clock_now();
       next_resize += resize_interval;
     }
-
+    // deferred force submit
     if (max_defer_interval > 0 &&
        next_deferred_force_submit < ceph_clock_now()) {
       if (store->get_deferred_last_submitted() + max_defer_interval <
@@ -4212,18 +4309,6 @@ void *BlueStore::MempoolThread::entry()
   return NULL;
 }
 
-void BlueStore::MempoolThread::_adjust_cache_settings()
-{
-  if (binned_kv_cache != nullptr) {
-    binned_kv_cache->set_cache_ratio(store->cache_kv_ratio);
-  }
-  if (binned_kv_onode_cache != nullptr) {
-    binned_kv_onode_cache->set_cache_ratio(store->cache_kv_onode_ratio);
-  }
-  meta_cache->set_cache_ratio(store->cache_meta_ratio);
-  data_cache->set_cache_ratio(store->cache_data_ratio);
-}
-
 void BlueStore::MempoolThread::_resize_shards(bool interval_stats)
 {
   size_t onode_shards = store->onode_cache_shards.size();
@@ -4235,7 +4320,7 @@ void BlueStore::MempoolThread::_resize_shards(bool interval_stats)
 
   uint64_t cache_size = store->cache_size;
   int64_t kv_alloc =
-     static_cast<int64_t>(store->cache_kv_ratio * cache_size); 
+     static_cast<int64_t>(store->cache_kv_ratio * cache_size);
   int64_t kv_onode_alloc =
      static_cast<int64_t>(store->cache_kv_onode_ratio * cache_size);
   int64_t meta_alloc =
@@ -4491,8 +4576,8 @@ static void discard_cb(void *priv, void *priv2)
 void BlueStore::handle_discard(interval_set<uint64_t>& to_release)
 {
   dout(10) << __func__ << dendl;
-  ceph_assert(shared_alloc.a);
-  shared_alloc.a->release(to_release);
+  ceph_assert(alloc);
+  alloc->release(to_release);
 }
 
 BlueStore::BlueStore(CephContext *cct, const string& path)
@@ -4506,7 +4591,9 @@ BlueStore::BlueStore(CephContext *cct,
     finisher(cct, "commit_finisher", "cfin"),
     kv_sync_thread(this),
     kv_finalize_thread(this),
+#ifdef HAVE_LIBZBD
     zoned_cleaner_thread(this),
+#endif
     min_alloc_size(_min_alloc_size),
     min_alloc_size_order(ctz(_min_alloc_size)),
     mempool_thread(this)
@@ -4570,8 +4657,14 @@ const char **BlueStore::get_tracked_conf_keys() const
     "osd_memory_expected_fragmentation",
     "bluestore_cache_autotune",
     "bluestore_cache_autotune_interval",
+    "bluestore_cache_age_bin_interval",
+    "bluestore_cache_kv_age_bins",
+    "bluestore_cache_kv_onode_age_bins",
+    "bluestore_cache_meta_age_bins",
+    "bluestore_cache_data_age_bins",
     "bluestore_warn_on_legacy_statfs",
     "bluestore_warn_on_no_per_pool_omap",
+    "bluestore_warn_on_no_per_pg_omap",
     "bluestore_max_defer_interval",
     NULL
   };
@@ -4768,6 +4861,22 @@ int BlueStore::_set_cache_sizes()
   cache_autotune = cct->_conf.get_val<bool>("bluestore_cache_autotune");
   cache_autotune_interval =
       cct->_conf.get_val<double>("bluestore_cache_autotune_interval");
+  cache_age_bin_interval =
+      cct->_conf.get_val<double>("bluestore_cache_age_bin_interval");
+  auto _set_bin = [&](std::string conf_name, std::vector<uint64_t>* intervals)
+  {
+    std::string intervals_str = cct->_conf.get_val<std::string>(conf_name);
+    std::istringstream interval_stream(intervals_str);
+    std::copy(
+      std::istream_iterator<uint64_t>(interval_stream),
+      std::istream_iterator<uint64_t>(),
+      std::back_inserter(*intervals));
+  };
+  _set_bin("bluestore_cache_age_bins_kv", &kv_bins);
+  _set_bin("bluestore_cache_age_bins_kv_onode", &kv_onode_bins);
+  _set_bin("bluestore_cache_age_bins_meta", &meta_bins);
+  _set_bin("bluestore_cache_age_bins_data", &data_bins);
+
   osd_memory_target = cct->_conf.get_val<Option::size_t>("osd_memory_target");
   osd_memory_base = cct->_conf.get_val<Option::size_t>("osd_memory_base");
   osd_memory_expected_fragmentation =
@@ -4867,174 +4976,322 @@ void BlueStore::_init_logger()
 {
   PerfCountersBuilder b(cct, "bluestore",
                         l_bluestore_first, l_bluestore_last);
-  b.add_time_avg(l_bluestore_kv_flush_lat, "kv_flush_lat",
-                "Average kv_thread flush latency",
-                "fl_l", PerfCountersBuilder::PRIO_INTERESTING);
-  b.add_time_avg(l_bluestore_kv_commit_lat, "kv_commit_lat",
-                "Average kv_thread commit latency");
-  b.add_time_avg(l_bluestore_kv_sync_lat, "kv_sync_lat",
-                "Average kv_sync thread latency",
-                "ks_l", PerfCountersBuilder::PRIO_INTERESTING);
-  b.add_time_avg(l_bluestore_kv_final_lat, "kv_final_lat",
-                "Average kv_finalize thread latency",
-                "kf_l", PerfCountersBuilder::PRIO_INTERESTING);
+
+  // space utilization stats
+  //****************************************
+  b.add_u64(l_bluestore_allocated, "allocated",
+           "Sum for allocated bytes",
+           "al_b",
+           PerfCountersBuilder::PRIO_CRITICAL,
+           unit_t(UNIT_BYTES));
+  b.add_u64(l_bluestore_stored, "stored",
+           "Sum for stored bytes",
+           "st_b",
+           PerfCountersBuilder::PRIO_CRITICAL,
+           unit_t(UNIT_BYTES));
+  b.add_u64(l_bluestore_fragmentation, "fragmentation_micros",
+            "How fragmented bluestore free space is (free extents / max possible number of free extents) * 1000");
+  b.add_u64(l_bluestore_alloc_unit, "alloc_unit",
+           "allocation unit size in bytes",
+           "au_b",
+           PerfCountersBuilder::PRIO_CRITICAL,
+           unit_t(UNIT_BYTES));
+  //****************************************
+
+  // Update op processing state latencies
+  //****************************************
   b.add_time_avg(l_bluestore_state_prepare_lat, "state_prepare_lat",
-    "Average prepare state latency");
+                "Average prepare state latency",
+                "sprl", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_state_aio_wait_lat, "state_aio_wait_lat",
                 "Average aio_wait state latency",
-                "io_l", PerfCountersBuilder::PRIO_INTERESTING);
+                "sawl", PerfCountersBuilder::PRIO_INTERESTING);
   b.add_time_avg(l_bluestore_state_io_done_lat, "state_io_done_lat",
-    "Average io_done state latency");
+                "Average io_done state latency",
+                "sidl", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_state_kv_queued_lat, "state_kv_queued_lat",
-    "Average kv_queued state latency");
+               "Average kv_queued state latency",
+               "skql", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_state_kv_committing_lat, "state_kv_commiting_lat",
-    "Average kv_commiting state latency");
+                "Average kv_commiting state latency",
+                "skcl", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_state_kv_done_lat, "state_kv_done_lat",
-    "Average kv_done state latency");
+                "Average kv_done state latency",
+                "skdl", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_time_avg(l_bluestore_state_finishing_lat, "state_finishing_lat",
+                "Average finishing state latency",
+                "sfnl", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_time_avg(l_bluestore_state_done_lat, "state_done_lat",
+                "Average done state latency",
+                "sdnl", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_state_deferred_queued_lat, "state_deferred_queued_lat",
-    "Average deferred_queued state latency");
+                "Average deferred_queued state latency",
+                "sdql", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_state_deferred_aio_wait_lat, "state_deferred_aio_wait_lat",
-    "Average aio_wait state latency");
+                "Average aio_wait state latency",
+                "sdal", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_state_deferred_cleanup_lat, "state_deferred_cleanup_lat",
-    "Average cleanup state latency");
-  b.add_time_avg(l_bluestore_state_finishing_lat, "state_finishing_lat",
-    "Average finishing state latency");
-  b.add_time_avg(l_bluestore_state_done_lat, "state_done_lat",
-    "Average done state latency");
-  b.add_time_avg(l_bluestore_throttle_lat, "throttle_lat",
+                "Average cleanup state latency",
+                "sdcl", PerfCountersBuilder::PRIO_USEFUL);
+  //****************************************
+
+  // Update Transaction stats
+  //****************************************
+  b.add_time_avg(l_bluestore_throttle_lat, "txc_throttle_lat",
                 "Average submit throttle latency",
                 "th_l", PerfCountersBuilder::PRIO_CRITICAL);
-  b.add_time_avg(l_bluestore_submit_lat, "submit_lat",
+  b.add_time_avg(l_bluestore_submit_lat, "txc_submit_lat",
                 "Average submit latency",
                 "s_l", PerfCountersBuilder::PRIO_CRITICAL);
-  b.add_time_avg(l_bluestore_commit_lat, "commit_lat",
+  b.add_time_avg(l_bluestore_commit_lat, "txc_commit_lat",
                 "Average commit latency",
                 "c_l", PerfCountersBuilder::PRIO_CRITICAL);
-  b.add_time_avg(l_bluestore_read_lat, "read_lat",
-                "Average read latency",
-                "r_l", PerfCountersBuilder::PRIO_CRITICAL);
+  b.add_u64_counter(l_bluestore_txc, "txc_count", "Transactions committed");
+  //****************************************
+
+  // Read op stats
+  //****************************************
   b.add_time_avg(l_bluestore_read_onode_meta_lat, "read_onode_meta_lat",
-    "Average read onode metadata latency");
+                "Average read onode metadata latency",
+                "roml", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_read_wait_aio_lat, "read_wait_aio_lat",
-    "Average read latency");
-  b.add_time_avg(l_bluestore_compress_lat, "compress_lat",
-    "Average compress latency");
-  b.add_time_avg(l_bluestore_decompress_lat, "decompress_lat",
-    "Average decompress latency");
+                "Average read I/O waiting latency",
+                "rwal", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_csum_lat, "csum_lat",
-    "Average checksum latency");
-  b.add_u64_counter(l_bluestore_compress_success_count, "compress_success_count",
-    "Sum for beneficial compress ops");
-  b.add_u64_counter(l_bluestore_compress_rejected_count, "compress_rejected_count",
-    "Sum for compress ops rejected due to low net gain of space");
-  b.add_u64_counter(l_bluestore_write_pad_bytes, "write_pad_bytes",
-                   "Sum for write-op padded bytes", NULL, 0, unit_t(UNIT_BYTES));
-  b.add_u64_counter(l_bluestore_deferred_write_ops, "deferred_write_ops",
-                   "Sum for deferred write op");
-  b.add_u64_counter(l_bluestore_deferred_write_bytes, "deferred_write_bytes",
-                   "Sum for deferred write bytes", "def", 0, unit_t(UNIT_BYTES));
-  b.add_u64_counter(l_bluestore_write_penalty_read_ops, "write_penalty_read_ops",
-                   "Sum for write penalty read ops");
-  b.add_u64(l_bluestore_allocated, "bluestore_allocated",
-    "Sum for allocated bytes");
-  b.add_u64(l_bluestore_stored, "bluestore_stored",
-    "Sum for stored bytes");
-  b.add_u64(l_bluestore_compressed, "bluestore_compressed",
-    "Sum for stored compressed bytes",
-    "c", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
-  b.add_u64(l_bluestore_compressed_allocated, "bluestore_compressed_allocated",
-    "Sum for bytes allocated for compressed data",
-    "c_a", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
-  b.add_u64(l_bluestore_compressed_original, "bluestore_compressed_original",
-    "Sum for original bytes that were compressed",
-    "c_o", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
-  b.add_u64(l_bluestore_onodes, "bluestore_onodes",
-           "Number of onodes in cache");
-  b.add_u64(l_bluestore_pinned_onodes, "bluestore_pinned_onodes",
-            "Number of pinned onodes in cache");
-  b.add_u64_counter(l_bluestore_onode_hits, "bluestore_onode_hits",
-                   "Sum for onode-lookups hit in the cache");
-  b.add_u64_counter(l_bluestore_onode_misses, "bluestore_onode_misses",
-                   "Sum for onode-lookups missed in the cache");
-  b.add_u64_counter(l_bluestore_onode_shard_hits, "bluestore_onode_shard_hits",
-                   "Sum for onode-shard lookups hit in the cache");
-  b.add_u64_counter(l_bluestore_onode_shard_misses,
-                   "bluestore_onode_shard_misses",
-                   "Sum for onode-shard lookups missed in the cache");
-  b.add_u64(l_bluestore_extents, "bluestore_extents",
-           "Number of extents in cache");
-  b.add_u64(l_bluestore_blobs, "bluestore_blobs",
-           "Number of blobs in cache");
-  b.add_u64(l_bluestore_buffers, "bluestore_buffers",
-           "Number of buffers in cache");
-  b.add_u64(l_bluestore_buffer_bytes, "bluestore_buffer_bytes",
-           "Number of buffer bytes in cache", NULL, 0, unit_t(UNIT_BYTES));
-  b.add_u64_counter(l_bluestore_buffer_hit_bytes, "bluestore_buffer_hit_bytes",
-           "Sum for bytes of read hit in the cache", NULL, 0, unit_t(UNIT_BYTES));
-  b.add_u64_counter(l_bluestore_buffer_miss_bytes, "bluestore_buffer_miss_bytes",
-           "Sum for bytes of read missed in the cache", NULL, 0, unit_t(UNIT_BYTES));
-
-  b.add_u64_counter(l_bluestore_write_big, "bluestore_write_big",
+                "Average checksum latency",
+                "csml", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluestore_read_eio, "read_eio",
+                    "Read EIO errors propagated to high level callers");
+  b.add_u64_counter(l_bluestore_reads_with_retries, "reads_with_retries",
+                    "Read operations that required at least one retry due to failed checksum validation",
+                   "rd_r", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_time_avg(l_bluestore_read_lat, "read_lat",
+                "Average read latency",
+                "r_l", PerfCountersBuilder::PRIO_CRITICAL);
+  //****************************************
+
+  // kv_thread latencies
+  //****************************************
+  b.add_time_avg(l_bluestore_kv_flush_lat, "kv_flush_lat",
+                "Average kv_thread flush latency",
+                "kfsl", PerfCountersBuilder::PRIO_INTERESTING);
+  b.add_time_avg(l_bluestore_kv_commit_lat, "kv_commit_lat",
+                "Average kv_thread commit latency",
+                "kcol", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_time_avg(l_bluestore_kv_sync_lat, "kv_sync_lat",
+                "Average kv_sync thread latency",
+                "kscl", PerfCountersBuilder::PRIO_INTERESTING);
+  b.add_time_avg(l_bluestore_kv_final_lat, "kv_final_lat",
+                "Average kv_finalize thread latency",
+                "kfll", PerfCountersBuilder::PRIO_INTERESTING);
+  //****************************************
+
+  // write op stats
+  //****************************************
+  b.add_u64_counter(l_bluestore_write_big, "write_big",
                    "Large aligned writes into fresh blobs");
-  b.add_u64_counter(l_bluestore_write_big_bytes, "bluestore_write_big_bytes",
-                   "Large aligned writes into fresh blobs (bytes)", NULL, 0, unit_t(UNIT_BYTES));
-  b.add_u64_counter(l_bluestore_write_big_blobs, "bluestore_write_big_blobs",
+  b.add_u64_counter(l_bluestore_write_big_bytes, "write_big_bytes",
+                   "Large aligned writes into fresh blobs (bytes)",
+                   NULL,
+                   PerfCountersBuilder::PRIO_DEBUGONLY,
+                   unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluestore_write_big_blobs, "write_big_blobs",
                    "Large aligned writes into fresh blobs (blobs)");
   b.add_u64_counter(l_bluestore_write_big_deferred,
-                   "bluestore_write_big_deferred",
+                   "write_big_deferred",
                    "Big overwrites using deferred");
-  b.add_u64_counter(l_bluestore_write_small, "bluestore_write_small",
+
+  b.add_u64_counter(l_bluestore_write_small, "write_small",
                    "Small writes into existing or sparse small blobs");
-  b.add_u64_counter(l_bluestore_write_small_bytes, "bluestore_write_small_bytes",
-                   "Small writes into existing or sparse small blobs (bytes)", NULL, 0, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluestore_write_small_bytes, "write_small_bytes",
+                   "Small writes into existing or sparse small blobs (bytes)",
+                   NULL,
+                   PerfCountersBuilder::PRIO_DEBUGONLY,
+                   unit_t(UNIT_BYTES));
   b.add_u64_counter(l_bluestore_write_small_unused,
-                   "bluestore_write_small_unused",
+                   "write_small_unused",
                    "Small writes into unused portion of existing blob");
-  b.add_u64_counter(l_bluestore_write_deferred,
-                   "bluestore_write_deferred",
-                   "Total deferred writes submitted");
-  b.add_u64_counter(l_bluestore_write_deferred_bytes,
-                   "bluestore_write_deferred_bytes",
-                   "Total bytes submitted as deferred writes");
   b.add_u64_counter(l_bluestore_write_small_pre_read,
-                   "bluestore_write_small_pre_read",
+                   "write_small_pre_read",
                    "Small writes that required we read some data (possibly "
                    "cached) to fill out the block");
-  b.add_u64_counter(l_bluestore_write_new, "bluestore_write_new",
-                   "Write into new blob");
 
-  b.add_u64_counter(l_bluestore_txc, "bluestore_txc", "Transactions committed");
-  b.add_u64_counter(l_bluestore_onode_reshard, "bluestore_onode_reshard",
-                   "Onode extent map reshard events");
-  b.add_u64_counter(l_bluestore_blob_split, "bluestore_blob_split",
+  b.add_u64_counter(l_bluestore_write_pad_bytes, "write_pad_bytes",
+                   "Sum for write-op padded bytes",
+                   NULL,
+                   PerfCountersBuilder::PRIO_DEBUGONLY,
+                   unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluestore_write_penalty_read_ops, "write_penalty_read_ops",
+                   "Sum for write penalty read ops");
+  b.add_u64_counter(l_bluestore_write_new, "write_new",
+                   "Write into new blob");
+
+  b.add_u64_counter(l_bluestore_issued_deferred_writes,
+                   "issued_deferred_writes",
+                   "Total deferred writes issued");
+  b.add_u64_counter(l_bluestore_issued_deferred_write_bytes,
+                   "issued_deferred_write_bytes",
+                   "Total bytes in issued deferred writes",
+                   NULL,
+                   PerfCountersBuilder::PRIO_DEBUGONLY,
+                   unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluestore_submitted_deferred_writes,
+                   "submitted_deferred_writes",
+                   "Total deferred writes submitted to disk");
+  b.add_u64_counter(l_bluestore_submitted_deferred_write_bytes,
+                   "submitted_deferred_write_bytes",
+                   "Total bytes submitted to disk by deferred writes",
+                   NULL,
+                   PerfCountersBuilder::PRIO_DEBUGONLY,
+                   unit_t(UNIT_BYTES));
+
+  b.add_u64_counter(l_bluestore_write_big_skipped_blobs,
+      "write_big_skipped_blobs",
+      "Large aligned writes into fresh blobs skipped due to zero detection (blobs)");
+  b.add_u64_counter(l_bluestore_write_big_skipped_bytes,
+      "write_big_skipped_bytes",
+      "Large aligned writes into fresh blobs skipped due to zero detection (bytes)");
+  b.add_u64_counter(l_bluestore_write_small_skipped,
+      "write_small_skipped",
+      "Small writes into existing or sparse small blobs skipped due to zero detection");
+  b.add_u64_counter(l_bluestore_write_small_skipped_bytes,
+      "write_small_skipped_bytes",
+      "Small writes into existing or sparse small blobs skipped due to zero detection (bytes)");
+  //****************************************
+
+  // compressions stats
+  //****************************************
+  b.add_u64(l_bluestore_compressed, "compressed",
+           "Sum for stored compressed bytes",
+           "c", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64(l_bluestore_compressed_allocated, "compressed_allocated",
+           "Sum for bytes allocated for compressed data",
+           "c_a", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64(l_bluestore_compressed_original, "compressed_original",
+           "Sum for original bytes that were compressed",
+           "c_o", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_time_avg(l_bluestore_compress_lat, "compress_lat",
+           "Average compress latency",
+           "_cpl", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_time_avg(l_bluestore_decompress_lat, "decompress_lat",
+           "Average decompress latency",
+           "dcpl", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluestore_compress_success_count, "compress_success_count",
+           "Sum for beneficial compress ops");
+  b.add_u64_counter(l_bluestore_compress_rejected_count, "compress_rejected_count",
+           "Sum for compress ops rejected due to low net gain of space");
+  //****************************************
+
+  // onode cache stats
+  //****************************************
+  b.add_u64(l_bluestore_onodes, "onodes",
+           "Number of onodes in cache");
+  b.add_u64(l_bluestore_pinned_onodes, "onodes_pinned",
+            "Number of pinned onodes in cache");
+  b.add_u64_counter(l_bluestore_onode_hits, "onode_hits",
+                   "Count of onode cache lookup hits",
+                   "o_ht", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluestore_onode_misses, "onode_misses",
+                   "Count of onode cache lookup misses",
+                   "o_ms", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluestore_onode_shard_hits, "onode_shard_hits",
+                   "Count of onode shard cache lookups hits");
+  b.add_u64_counter(l_bluestore_onode_shard_misses,
+                   "onode_shard_misses",
+                   "Count of onode shard cache lookups misses");
+  b.add_u64(l_bluestore_extents, "onode_extents",
+           "Number of extents in cache");
+  b.add_u64(l_bluestore_blobs, "onode_blobs",
+           "Number of blobs in cache");
+  //****************************************
+
+  // buffer cache stats
+  //****************************************
+  b.add_u64(l_bluestore_buffers, "buffers",
+           "Number of buffers in cache");
+  b.add_u64(l_bluestore_buffer_bytes, "buffer_bytes",
+           "Number of buffer bytes in cache",
+            NULL,
+            PerfCountersBuilder::PRIO_DEBUGONLY,
+            unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluestore_buffer_hit_bytes, "buffer_hit_bytes",
+           "Sum for bytes of read hit in the cache",
+           NULL,
+           PerfCountersBuilder::PRIO_DEBUGONLY,
+           unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluestore_buffer_miss_bytes, "buffer_miss_bytes",
+           "Sum for bytes of read missed in the cache",
+           NULL,
+           PerfCountersBuilder::PRIO_DEBUGONLY,
+           unit_t(UNIT_BYTES));
+  //****************************************
+
+  // internal stats
+  //****************************************
+  b.add_u64_counter(l_bluestore_onode_reshard, "onode_reshard",
+                   "Onode extent map reshard events");
+  b.add_u64_counter(l_bluestore_blob_split, "blob_split",
                    "Sum for blob splitting due to resharding");
-  b.add_u64_counter(l_bluestore_extent_compress, "bluestore_extent_compress",
+  b.add_u64_counter(l_bluestore_extent_compress, "extent_compress",
                    "Sum for extents that have been removed due to compression");
-  b.add_u64_counter(l_bluestore_gc_merged, "bluestore_gc_merged",
+  b.add_u64_counter(l_bluestore_gc_merged, "gc_merged",
                    "Sum for extents that have been merged due to garbage "
                    "collection");
-  b.add_u64_counter(l_bluestore_read_eio, "bluestore_read_eio",
-                    "Read EIO errors propagated to high level callers");
-  b.add_u64_counter(l_bluestore_reads_with_retries, "bluestore_reads_with_retries",
-                    "Read operations that required at least one retry due to failed checksum validation");
-  b.add_u64(l_bluestore_fragmentation, "bluestore_fragmentation_micros",
-            "How fragmented bluestore free space is (free extents / max possible number of free extents) * 1000");
+  //****************************************
+
+  // other client ops latencies
+  //****************************************
   b.add_time_avg(l_bluestore_omap_seek_to_first_lat, "omap_seek_to_first_lat",
-    "Average omap iterator seek_to_first call latency");
+    "Average omap iterator seek_to_first call latency",
+    "osfl", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_omap_upper_bound_lat, "omap_upper_bound_lat",
-    "Average omap iterator upper_bound call latency");
+    "Average omap iterator upper_bound call latency",
+    "oubl", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_omap_lower_bound_lat, "omap_lower_bound_lat",
-    "Average omap iterator lower_bound call latency");
+    "Average omap iterator lower_bound call latency",
+    "olbl", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_omap_next_lat, "omap_next_lat",
-    "Average omap iterator next call latency");
+    "Average omap iterator next call latency",
+    "onxl", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_omap_get_keys_lat, "omap_get_keys_lat",
-    "Average omap get_keys call latency");
+    "Average omap get_keys call latency",
+    "ogkl", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_omap_get_values_lat, "omap_get_values_lat",
-    "Average omap get_values call latency");
+    "Average omap get_values call latency",
+    "ogvl", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_time_avg(l_bluestore_omap_clear_lat, "omap_clear_lat",
+    "Average omap clear call latency");
   b.add_time_avg(l_bluestore_clist_lat, "clist_lat",
-    "Average collection listing latency");
+    "Average collection listing latency",
+    "cl_l", PerfCountersBuilder::PRIO_USEFUL);
   b.add_time_avg(l_bluestore_remove_lat, "remove_lat",
-    "Average removal latency");
+    "Average removal latency",
+    "rm_l", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_time_avg(l_bluestore_truncate_lat, "truncate_lat",
+    "Average truncate latency",
+    "tr_l", PerfCountersBuilder::PRIO_USEFUL);
+  //****************************************
+
+  // Resulting size axis configuration for op histograms, values are in bytes
+  PerfHistogramCommon::axis_config_d alloc_hist_x_axis_config{
+    "Given size (bytes)",
+    PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale
+    0,                               ///< Start at 0
+    4096,                            ///< Quantization unit
+    13,                               ///< Enough to cover 4+M requests
+  };
+  // Req size axis configuration for op histograms, values are in bytes
+  PerfHistogramCommon::axis_config_d alloc_hist_y_axis_config{
+    "Request size (bytes)",
+    PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale
+    0,                               ///< Start at 0
+    4096,                            ///< Quantization unit
+    13,                               ///< Enough to cover 4+M requests
+  };
+  b.add_u64_counter_histogram(
+    l_bluestore_allocate_hist, "allocate_histogram",
+    alloc_hist_x_axis_config, alloc_hist_y_axis_config,
+    "Histogram of requested block allocations vs. given ones");
 
   logger = b.create_perf_counters();
   cct->get_perfcounters_collection()->add(logger);
@@ -5092,7 +5349,7 @@ void BlueStore::_close_path()
 }
 
 int BlueStore::_write_bdev_label(CephContext *cct,
-                                string path, bluestore_bdev_label_t label)
+                                const string &path, bluestore_bdev_label_t label)
 {
   dout(10) << __func__ << " path " << path << " label " << label << dendl;
   bufferlist bl;
@@ -5127,7 +5384,7 @@ out:
   return r;
 }
 
-int BlueStore::_read_bdev_label(CephContext* cct, string path,
+int BlueStore::_read_bdev_label(CephContext* cct, const string &path,
                                bluestore_bdev_label_t *label)
 {
   dout(10) << __func__ << dendl;
@@ -5203,10 +5460,15 @@ void BlueStore::_set_alloc_sizes(void)
 {
   max_alloc_size = cct->_conf->bluestore_max_alloc_size;
 
+#ifdef HAVE_LIBZBD
+  ceph_assert(bdev);
+  if (bdev->is_smr()) {
+    prefer_deferred_size = 0;
+  } else
+#endif
   if (cct->_conf->bluestore_prefer_deferred_size) {
     prefer_deferred_size = cct->_conf->bluestore_prefer_deferred_size;
   } else {
-    ceph_assert(bdev);
     if (_use_rotational_settings()) {
       prefer_deferred_size = cct->_conf->bluestore_prefer_deferred_size_hdd;
     } else {
@@ -5217,7 +5479,6 @@ void BlueStore::_set_alloc_sizes(void)
   if (cct->_conf->bluestore_deferred_batch_ops) {
     deferred_batch_ops = cct->_conf->bluestore_deferred_batch_ops;
   } else {
-    ceph_assert(bdev);
     if (_use_rotational_settings()) {
       deferred_batch_ops = cct->_conf->bluestore_deferred_batch_ops_hdd;
     } else {
@@ -5264,10 +5525,9 @@ int BlueStore::_open_bdev(bool create)
   if (r < 0) {
     goto fail_close;
   }
+  // get block dev optimal io size
+  optimal_io_size = bdev->get_optimal_io_size();
 
-  if (bdev->is_smr()) {
-    freelist_type = "zoned";
-  }
   return 0;
 
  fail_close:
@@ -5293,13 +5553,30 @@ void BlueStore::_close_bdev()
   bdev = NULL;
 }
 
-int BlueStore::_open_fm(KeyValueDB::Transaction t, bool read_only)
+int BlueStore::_open_fm(KeyValueDB::Transaction t, bool read_only, bool fm_restore)
 {
   int r;
 
+  dout(5) << __func__ << "::NCB::freelist_type=" << freelist_type << dendl;
   ceph_assert(fm == NULL);
+  // fm_restore means we are transitioning from null-fm to bitmap-fm
+  ceph_assert(!fm_restore || (freelist_type != "null"));
+  // fm restore must pass in a valid transaction
+  ceph_assert(!fm_restore || (t != nullptr));
+
+  // When allocation-info is stored in a single file we set freelist_type to "null"
+  bool set_null_freemap = false;
+  if (freelist_type == "null") {
+    // use BitmapFreelistManager with the null option to stop allocations from going to RocksDB
+    // we will store the allocation info in a single file during umount()
+    freelist_type = "bitmap";
+    set_null_freemap = true;
+  }
   fm = FreelistManager::create(cct, freelist_type, PREFIX_ALLOC);
   ceph_assert(fm);
+  if (set_null_freemap) {
+    fm->set_null_manager();
+  }
   if (t) {
     // create mode. initialize freespace
     dout(20) << __func__ << " initializing freespace" << dendl;
@@ -5310,21 +5587,43 @@ int BlueStore::_open_fm(KeyValueDB::Transaction t, bool read_only)
     }
     // being able to allocate in units less than bdev block size 
     // seems to be a bad idea.
-    ceph_assert( cct->_conf->bdev_block_size <= (int64_t)min_alloc_size);
+    ceph_assert(cct->_conf->bdev_block_size <= min_alloc_size);
 
     uint64_t alloc_size = min_alloc_size;
+#ifdef HAVE_LIBZBD
     if (bdev->is_smr()) {
-      alloc_size = _zoned_piggyback_device_parameters_onto(alloc_size);
+      if (freelist_type != "zoned") {
+       derr << "SMR device but freelist_type = " << freelist_type << " (not zoned)"
+            << dendl;
+       return -EINVAL;
+      }
+    } else
+#endif
+    if (freelist_type == "zoned") {
+      derr << "non-SMR device (or SMR support not built-in) but freelist_type = zoned"
+          << dendl;
+      return -EINVAL;
     }
 
-    fm->create(bdev->get_size(), alloc_size, t);
+    fm->create(bdev->get_size(), alloc_size,
+              zone_size, first_sequential_zone,
+              t);
 
     // allocate superblock reserved space.  note that we do not mark
     // bluefs space as allocated in the freelist; we instead rely on
     // bluefs doing that itself.
     auto reserved = _get_ondisk_reserved();
-    fm->allocate(0, reserved, t);
-
+    if (fm_restore) {
+      // we need to allocate the full space in restore case
+      // as later we will add free-space marked in the allocator file
+      fm->allocate(0, bdev->get_size(), t);
+    } else {
+      // allocate superblock reserved space.  note that we do not mark
+      // bluefs space as allocated in the freelist; we instead rely on
+      // bluefs doing that itself.
+      fm->allocate(0, reserved, t);
+    }
+    // debug code - not needed for NULL FM
     if (cct->_conf->bluestore_debug_prefill > 0) {
       uint64_t end = bdev->get_size() - reserved;
       dout(1) << __func__ << " pre-fragmenting freespace, using "
@@ -5422,77 +5721,216 @@ int BlueStore::_write_out_fm_meta(uint64_t target_size)
 
 int BlueStore::_create_alloc()
 {
+  ceph_assert(alloc == NULL);
   ceph_assert(shared_alloc.a == NULL);
   ceph_assert(bdev->get_size());
 
   uint64_t alloc_size = min_alloc_size;
-  if (bdev->is_smr()) {
-    int r = _zoned_check_config_settings();
-    if (r < 0)
-      return r;
-    alloc_size = _zoned_piggyback_device_parameters_onto(alloc_size);
+
+  std::string allocator_type = cct->_conf->bluestore_allocator;
+
+#ifdef HAVE_LIBZBD
+  if (freelist_type == "zoned") {
+    allocator_type = "zoned";
   }
+#endif
 
-  shared_alloc.set(Allocator::create(cct, cct->_conf->bluestore_allocator,
+  alloc = Allocator::create(
+    cct, allocator_type,
     bdev->get_size(),
-    alloc_size, "block"));
-
-  if (!shared_alloc.a) {
-    lderr(cct) << __func__ << "Failed to create allocator:: "
-      << cct->_conf->bluestore_allocator
-      << dendl;
+    alloc_size,
+    zone_size,
+    first_sequential_zone,
+    "block");
+  if (!alloc) {
+    lderr(cct) << __func__ << " failed to create " << allocator_type << " allocator"
+              << dendl;
     return -EINVAL;
   }
+
+#ifdef HAVE_LIBZBD
+  if (freelist_type == "zoned") {
+    Allocator *a = Allocator::create(
+      cct, cct->_conf->bluestore_allocator,
+      bdev->get_conventional_region_size(),
+      alloc_size,
+      0, 0,
+      "zoned_block");
+    if (!a) {
+      lderr(cct) << __func__ << " failed to create " << cct->_conf->bluestore_allocator
+                << " allocator" << dendl;
+      delete alloc;
+      return -EINVAL;
+    }
+    shared_alloc.set(a);
+  } else
+#endif
+  {
+    // BlueFS will share the same allocator
+    shared_alloc.set(alloc);
+  }
+
   return 0;
 }
 
-int BlueStore::_init_alloc()
+int BlueStore::_init_alloc(std::map<uint64_t, uint64_t> *zone_adjustments)
 {
   int r = _create_alloc();
   if (r < 0) {
     return r;
   }
-  ceph_assert(shared_alloc.a != NULL);
+  ceph_assert(alloc != NULL);
 
+#ifdef HAVE_LIBZBD
   if (bdev->is_smr()) {
-    shared_alloc.a->zoned_set_zone_states(fm->get_zone_states(db));
+    auto a = dynamic_cast<ZonedAllocator*>(alloc);
+    ceph_assert(a);
+    auto f = dynamic_cast<ZonedFreelistManager*>(fm);
+    ceph_assert(f);
+    vector<uint64_t> wp = bdev->get_zones();
+    vector<zone_state_t> zones = f->get_zone_states(db);
+    ceph_assert(wp.size() == zones.size());
+
+    // reconcile zone state
+    auto num_zones = bdev->get_size() / zone_size;
+    for (unsigned i = first_sequential_zone; i < num_zones; ++i) {
+      ceph_assert(wp[i] >= i * zone_size);
+      ceph_assert(wp[i] <= (i + 1) * zone_size); // pos might be at start of next zone
+      uint64_t p = wp[i] - i * zone_size;
+      if (zones[i].write_pointer > p) {
+       derr << __func__ << " zone 0x" << std::hex << i
+            << " bluestore write pointer 0x" << zones[i].write_pointer
+            << " > device write pointer 0x" << p
+            << std::dec << " -- VERY SUSPICIOUS!" << dendl;
+      } else if (zones[i].write_pointer < p) {
+       // this is "normal" in that it can happen after any crash (if we have a
+       // write in flight but did not manage to commit the transaction)
+       auto delta = p - zones[i].write_pointer;
+       dout(1) << __func__ << " zone 0x" << std::hex << i
+                << " device write pointer 0x" << p
+                << " > bluestore pointer 0x" << zones[i].write_pointer
+                << ", advancing 0x" << delta << std::dec << dendl;
+       (*zone_adjustments)[zones[i].write_pointer] = delta;
+       zones[i].num_dead_bytes += delta;
+       zones[i].write_pointer = p;
+      }
+    }
+
+    // start with conventional zone "free" (bluefs may adjust this when it starts up)
+    auto reserved = _get_ondisk_reserved();
+    // for now we require a conventional zone
+    ceph_assert(bdev->get_conventional_region_size());
+    ceph_assert(shared_alloc.a != alloc);  // zoned allocator doesn't use conventional region
+    shared_alloc.a->init_add_free(
+      reserved,
+      p2align(bdev->get_conventional_region_size(), min_alloc_size) - reserved);
+
+    // init sequential zone based on the device's write pointers
+    a->init_from_zone_pointers(std::move(zones));
+    dout(1) << __func__
+           << " loaded zone pointers: "
+           << std::hex
+           << ", allocator type " << alloc->get_type()
+           << ", capacity 0x" << alloc->get_capacity()
+           << ", block size 0x" << alloc->get_block_size()
+           << ", free 0x" << alloc->get_free()
+           << ", fragmentation " << alloc->get_fragmentation()
+           << std::dec << dendl;
+
+    return 0;
   }
+#endif
 
   uint64_t num = 0, bytes = 0;
+  utime_t start_time = ceph_clock_now();
+  if (!fm->is_null_manager()) {
+    // This is the original path - loading allocation map from RocksDB and feeding into the allocator
+    dout(5) << __func__ << "::NCB::loading allocation from FM -> alloc" << dendl;
+    // initialize from freelist
+    fm->enumerate_reset();
+    uint64_t offset, length;
+    while (fm->enumerate_next(db, &offset, &length)) {
+      alloc->init_add_free(offset, length);
+      ++num;
+      bytes += length;
+    }
+    fm->enumerate_reset();
+
+    utime_t duration = ceph_clock_now() - start_time;
+    dout(5) << __func__ << "::num_entries=" << num << " free_size=" << bytes << " alloc_size=" <<
+      alloc->get_capacity() - bytes << " time=" << duration << " seconds" << dendl;
+  } else {
+    // This is the new path reading the allocation map from a flat bluefs file and feeding them into the allocator
 
-  dout(1) << __func__ << " opening allocation metadata" << dendl;
-  // initialize from freelist
-  fm->enumerate_reset();
-  uint64_t offset, length;
-  while (fm->enumerate_next(db, &offset, &length)) {
-    shared_alloc.a->init_add_free(offset, length);
-    ++num;
-    bytes += length;
-  }
-  fm->enumerate_reset();
+    if (!cct->_conf->bluestore_allocation_from_file) {
+      derr << __func__ << "::NCB::cct->_conf->bluestore_allocation_from_file is set to FALSE with an active NULL-FM" << dendl;
+      derr << __func__ << "::NCB::Please change the value of bluestore_allocation_from_file to TRUE in your ceph.conf file" << dendl;
+      return -ENOTSUP; // Operation not supported
+    }
 
+    if (restore_allocator(alloc, &num, &bytes) == 0) {
+      dout(5) << __func__ << "::NCB::restore_allocator() completed successfully alloc=" << alloc << dendl;
+    } else {
+      // This must mean that we had an unplanned shutdown and didn't manage to destage the allocator
+      dout(0) << __func__ << "::NCB::restore_allocator() failed! Run Full Recovery from ONodes (might take a while) ..." << dendl;
+      // if failed must recover from on-disk ONode internal state
+      if (read_allocation_from_drive_on_startup() != 0) {
+       derr << __func__ << "::NCB::Failed Recovery" << dendl;
+       derr << __func__ << "::NCB::Ceph-OSD won't start, make sure your drives are connected and readable" << dendl;
+       derr << __func__ << "::NCB::If no HW fault is found, please report failure and consider redeploying OSD" << dendl;
+       return -ENOTRECOVERABLE;
+      }
+    }
+  }
   dout(1) << __func__
           << " loaded " << byte_u_t(bytes) << " in " << num << " extents"
           << std::hex
-          << ", allocator type " << shared_alloc.a->get_type()
-          << ", capacity 0x" << shared_alloc.a->get_capacity()
-          << ", block size 0x" << shared_alloc.a->get_block_size()
-          << ", free 0x" << shared_alloc.a->get_free()
-          << ", fragmentation " << shared_alloc.a->get_fragmentation()
+          << ", allocator type " << alloc->get_type()
+          << ", capacity 0x" << alloc->get_capacity()
+          << ", block size 0x" << alloc->get_block_size()
+          << ", free 0x" << alloc->get_free()
+          << ", fragmentation " << alloc->get_fragmentation()
           << std::dec << dendl;
 
   return 0;
 }
 
+void BlueStore::_post_init_alloc(const std::map<uint64_t, uint64_t>& zone_adjustments)
+{
+#ifdef HAVE_LIBZBD
+  assert(bdev->is_smr());
+  dout(1) << __func__ << " adjusting freelist based on device write pointers" << dendl;
+  auto f = dynamic_cast<ZonedFreelistManager*>(fm);
+  ceph_assert(f);
+  KeyValueDB::Transaction t = db->get_transaction();
+  for (auto& i : zone_adjustments) {
+    // allocate AND release since this gap is now dead space
+    // note that the offset is imprecise, but only need to select the zone
+    f->allocate(i.first, i.second, t);
+    f->release(i.first, i.second, t);
+  }
+  int r = db->submit_transaction_sync(t);
+  ceph_assert(r == 0);
+#endif
+}
+
 void BlueStore::_close_alloc()
 {
   ceph_assert(bdev);
   bdev->discard_drain();
 
+  ceph_assert(alloc);
+  alloc->shutdown();
+  delete alloc;
+
   ceph_assert(shared_alloc.a);
-  shared_alloc.a->shutdown();
-  delete shared_alloc.a;
+  if (alloc != shared_alloc.a) {
+    shared_alloc.a->shutdown();
+    delete shared_alloc.a;
+  }
+
   shared_alloc.reset();
+  alloc = nullptr;
 }
 
 int BlueStore::_open_fsid(bool create)
@@ -5811,9 +6249,9 @@ int BlueStore::_open_bluefs(bool create, bool read_only)
   return r;
 }
 
-void BlueStore::_close_bluefs(bool cold_close)
+void BlueStore::_close_bluefs()
 {
-  bluefs->umount(cold_close);
+  bluefs->umount(db_was_opened_read_only);
   _minimal_close_bluefs();
 }
 
@@ -5853,8 +6291,7 @@ int BlueStore::_is_bluefs(bool create, bool* ret)
 */
 int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
 {
-  dout(0) << __func__ << " read-only:" << read_only
-          << " repair:" << to_repair << dendl;
+  dout(5) << __func__ << "::NCB::read_only=" << read_only << ", to_repair=" << to_repair << dendl;
   {
     string type;
     int r = read_meta("type", &type);
@@ -5870,6 +6307,10 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
     }
   }
 
+  // SMR devices may require a freelist adjustment, but that can only happen after
+  // the db is read-write. we'll stash pending changes here.
+  std::map<uint64_t, uint64_t> zone_adjustments;
+
   int r = _open_path();
   if (r < 0)
     return r;
@@ -5889,6 +6330,9 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
   if (r < 0)
     goto out_fsid;
 
+  // GBH: can probably skip open_db step in REad-Only mode when operating in NULL-FM mode
+  // (might need to open if failed to restore from file)
+
   // open in read-only first to read FM list and init allocator
   // as they might be needed for some BlueFS procedures
   r = _open_db(false, false, true);
@@ -5904,7 +6348,7 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
   if (r < 0)
     goto out_db;
 
-  r = _init_alloc();
+  r = _init_alloc(&zone_adjustments);
   if (r < 0)
     goto out_fm;
 
@@ -5914,12 +6358,43 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
   // load allocated extents from bluefs into allocator.
   // And now it's time to do that
   //
-  _close_db(true);
-
+  _close_db();
   r = _open_db(false, to_repair, read_only);
   if (r < 0) {
     goto out_alloc;
   }
+
+  if (!read_only && !zone_adjustments.empty()) {
+    // for SMR devices that have freelist mismatch with device write pointers
+    _post_init_alloc(zone_adjustments);
+  }
+
+  // when function is called in repair mode (to_repair=true) we skip db->open()/create()
+  // we can't change bluestore allocation so no need to invlidate allocation-file
+  if (fm->is_null_manager() && !read_only && !to_repair) {
+    // Now that we load the allocation map we need to invalidate the file as new allocation won't be reflected
+    // Changes to the allocation map (alloc/release) are not updated inline and will only be stored on umount()
+    // This means that we should not use the existing file on failure case (unplanned shutdown) and must resort
+    //  to recovery from RocksDB::ONodes
+    r = invalidate_allocation_file_on_bluefs();
+    if (r != 0) {
+      derr << __func__ << "::NCB::invalidate_allocation_file_on_bluefs() failed!" << dendl;
+      goto out_alloc;
+    }
+  }
+
+  // when function is called in repair mode (to_repair=true) we skip db->open()/create()
+  if (!read_only && !to_repair && cct->_conf->bluestore_allocation_from_file
+#ifdef HAVE_LIBZBD
+      && !bdev->is_smr()
+#endif
+    ) {
+    dout(5) << __func__ << "::NCB::Commit to Null-Manager" << dendl;
+    commit_to_null_manager();
+    need_to_destage_allocation_file = true;
+    dout(10) << __func__ << "::NCB::need_to_destage_allocation_file was set" << dendl;
+  }
+
   return 0;
 
 out_alloc:
@@ -5927,7 +6402,7 @@ out_alloc:
 out_fm:
   _close_fm();
  out_db:
-  _close_db(read_only);
+  _close_db();
  out_bdev:
   _close_bdev();
  out_fsid:
@@ -5937,9 +6412,14 @@ out_fm:
   return r;
 }
 
-void BlueStore::_close_db_and_around(bool read_only)
+void BlueStore::_close_db_and_around()
 {
-  _close_db(read_only);
+  if (db) {
+    _close_db();
+  }
+  if (bluefs) {
+    _close_bluefs();
+  }
   _close_fm();
   _close_alloc();
   _close_bdev();
@@ -5961,10 +6441,15 @@ int BlueStore::open_db_environment(KeyValueDB **pdb, bool to_repair)
 
 int BlueStore::close_db_environment()
 {
-  _close_db_and_around(false);
+  _close_db_and_around();
   return 0;
 }
 
+/* gets access to bluefs supporting RocksDB */
+BlueFS* BlueStore::get_bluefs() {
+  return bluefs;
+}
+
 int BlueStore::_prepare_db_environment(bool create, bool read_only,
                                       std::string* _fn, std::string* _kv_backend)
 {
@@ -6099,7 +6584,7 @@ int BlueStore::_prepare_db_environment(bool create, bool read_only,
   if (!db) {
     derr << __func__ << " error creating db" << dendl;
     if (bluefs) {
-      _close_bluefs(read_only);
+      _close_bluefs();
     }
     // delete env manually here since we can't depend on db to do this
     // under this case
@@ -6124,11 +6609,16 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
   string kv_dir_fn;
   string kv_backend;
   std::string sharding_def;
+  // prevent write attempts to BlueFS in case we failed before BlueFS was opened
+  db_was_opened_read_only = true;
   r = _prepare_db_environment(create, read_only, &kv_dir_fn, &kv_backend);
   if (r < 0) {
     derr << __func__ << " failed to prepare db environment: " << err.str() << dendl;
     return -EIO;
   }
+  // if reached here then BlueFS is already opened
+  db_was_opened_read_only = read_only;
+  dout(10) << __func__ << "::db_was_opened_read_only was set to " << read_only << dendl;
   if (kv_backend == "rocksdb") {
     options = cct->_conf->bluestore_rocksdb_options;
     options_annex = cct->_conf->bluestore_rocksdb_options_annex;
@@ -6159,7 +6649,7 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
   }
   if (r) {
     derr << __func__ << " erroring opening db: " << err.str() << dendl;
-    _close_db(read_only);
+    _close_db();
     return -EIO;
   }
   dout(1) << __func__ << " opened " << kv_backend
@@ -6167,13 +6657,28 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
   return 0;
 }
 
-void BlueStore::_close_db(bool cold_close)
+void BlueStore::_close_db_leave_bluefs()
 {
   ceph_assert(db);
   delete db;
-  db = NULL;
+  db = nullptr;
+}
+
+void BlueStore::_close_db()
+{
+  dout(10) << __func__ << ":read_only=" << db_was_opened_read_only << " fm=" << fm << " destage_alloc_file=" << need_to_destage_allocation_file << dendl;
+  _close_db_leave_bluefs();
+
+  if (need_to_destage_allocation_file) {
+    ceph_assert(fm && fm->is_null_manager());
+    int ret = store_allocator(alloc);
+    if (ret != 0) {
+      derr << __func__ << "::NCB::store_allocator() failed (continue with bitmapFreelistManager)" << dendl;
+    }
+  }
+
   if (bluefs) {
-    _close_bluefs(cold_close);
+    _close_bluefs();
   }
 }
 
@@ -6191,10 +6696,16 @@ void BlueStore::_dump_alloc_on_failure()
 
 int BlueStore::_open_collections()
 {
+  if (!coll_map.empty()) {
+    // could be opened from another path
+    dout(20) << __func__ << "::NCB::collections are already opened, nothing to do" << dendl;
+    return 0;
+  }
+
   dout(10) << __func__ << dendl;
   collections_had_errors = false;
-  ceph_assert(coll_map.empty());
   KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
+  size_t load_cnt = 0;
   for (it->upper_bound(string());
        it->valid();
        it->next()) {
@@ -6218,12 +6729,14 @@ int BlueStore::_open_collections()
               << " " << c->cnode << dendl;
       _osr_attach(c.get());
       coll_map[cid] = c;
-
+      load_cnt++;
     } else {
       derr << __func__ << " unrecognized collection " << it->key() << dendl;
       collections_had_errors = true;
     }
   }
+  dout(10) << __func__ << " collections loaded: " << load_cnt
+           <<  dendl;
   return 0;
 }
 
@@ -6451,8 +6964,6 @@ int BlueStore::mkfs()
     }
   }
 
-  freelist_type = "bitmap";
-
   r = _open_path();
   if (r < 0)
     return r;
@@ -6506,8 +7017,29 @@ int BlueStore::mkfs()
   if (r < 0)
     goto out_close_fsid;
 
+  // choose freelist manager
+#ifdef HAVE_LIBZBD
+  if (bdev->is_smr()) {
+    freelist_type = "zoned";
+    zone_size = bdev->get_zone_size();
+    first_sequential_zone = bdev->get_conventional_region_size() / zone_size;
+    bdev->reset_all_zones();
+  } else
+#endif
+  {
+    freelist_type = "bitmap";
+  }
+  dout(10) << " freelist_type " << freelist_type << dendl;
+
   // choose min_alloc_size
-  if (cct->_conf->bluestore_min_alloc_size) {
+  dout(5) << __func__ << " optimal_io_size 0x" << std::hex << optimal_io_size
+         << " block_size: 0x" << block_size << std::dec << dendl;
+  if ((cct->_conf->bluestore_use_optimal_io_size_for_min_alloc_size) && (optimal_io_size != 0)) {
+    dout(5) << __func__ << " optimal_io_size 0x" << std::hex << optimal_io_size
+               << " for min_alloc_size 0x" << min_alloc_size << std::dec << dendl;
+    min_alloc_size = optimal_io_size;
+  }
+  else if (cct->_conf->bluestore_min_alloc_size) {
     min_alloc_size = cct->_conf->bluestore_min_alloc_size;
   } else {
     ceph_assert(bdev);
@@ -6529,14 +7061,31 @@ int BlueStore::mkfs()
     goto out_close_bdev;
   }
 
+  // make sure min_alloc_size is >= and aligned with block size
+  if (min_alloc_size % block_size != 0) {
+    derr << __func__ << " min_alloc_size 0x"
+        << std::hex << min_alloc_size
+        << " is less or not aligned with block_size: 0x"
+        << block_size << std::dec <<  dendl;
+    r = -EINVAL;
+    goto out_close_bdev;
+  }
+
   r = _create_alloc();
   if (r < 0) {
     goto out_close_bdev;
   }
 
   reserved = _get_ondisk_reserved();
-  shared_alloc.a->init_add_free(reserved,
+  alloc->init_add_free(reserved,
     p2align(bdev->get_size(), min_alloc_size) - reserved);
+#ifdef HAVE_LIBZBD
+  if (bdev->is_smr() && alloc != shared_alloc.a) {
+    shared_alloc.a->init_add_free(reserved,
+                                 p2align(bdev->get_conventional_region_size(),
+                                         min_alloc_size) - reserved);
+  }
+#endif
 
   r = _open_db(true);
   if (r < 0)
@@ -6568,6 +7117,22 @@ int BlueStore::mkfs()
       }
       t->set(PREFIX_SUPER, "per_pool_omap", bl);
     }
+
+#ifdef HAVE_LIBZBD
+    if (bdev->is_smr()) {
+      {
+       bufferlist bl;
+       encode((uint64_t)zone_size, bl);
+       t->set(PREFIX_SUPER, "zone_size", bl);
+      }
+      {
+       bufferlist bl;
+       encode((uint64_t)first_sequential_zone, bl);
+       t->set(PREFIX_SUPER, "first_sequential_zone", bl);
+      }
+    }
+#endif
+    
     ondisk_format = latest_ondisk_format;
     _prepare_ondisk_format_super(t);
     db->submit_transaction_sync(t);
@@ -6592,7 +7157,7 @@ int BlueStore::mkfs()
  out_close_fm:
   _close_fm();
  out_close_db:
-  _close_db(false);
+  _close_db();
  out_close_alloc:
   _close_alloc();
  out_close_bdev:
@@ -6638,8 +7203,11 @@ int BlueStore::add_new_bluefs_device(int id, const string& dev_path)
     derr << __func__ << " bluefs isn't configured, can't add new device " << dendl;
     return -EIO;
   }
-
+  dout(5) << __func__ << "::NCB::calling open_db_and_around(read-only)" << dendl;
   r = _open_db_and_around(true);
+  if (r < 0) {
+    return r;
+  }
 
   if (id == BlueFS::BDEV_NEWWAL) {
     string p = path + "/block.wal";
@@ -6686,7 +7254,6 @@ int BlueStore::add_new_bluefs_device(int id, const string& dev_path)
     bluefs_layout.shared_bdev = BlueFS::BDEV_SLOW;
     bluefs_layout.dedicated_db = true;
   }
-
   bluefs->umount();
   bluefs->mount();
 
@@ -6699,7 +7266,7 @@ int BlueStore::add_new_bluefs_device(int id, const string& dev_path)
     dout(0) << __func__ << " success" << dendl;
   }
 
-  _close_db_and_around(true);
+  _close_db_and_around();
   return r;
 }
 
@@ -6717,7 +7284,12 @@ int BlueStore::migrate_to_existing_bluefs_device(const set<int>& devs_source,
   }
 
   int r = _open_db_and_around(true);
-
+  if (r < 0) {
+    return r;
+  }
+  auto close_db = make_scope_guard([&] {
+    _close_db_and_around();
+  });
   uint64_t used_space = 0;
   for(auto src_id : devs_source) {
     used_space += bluefs->get_used(src_id);
@@ -6728,8 +7300,7 @@ int BlueStore::migrate_to_existing_bluefs_device(const set<int>& devs_source,
          << " can't migrate, free space at target: " << target_free
         << " is less than required space: " << used_space
         << dendl;
-    r = -ENOSPC;
-    goto shutdown;
+    return -ENOSPC;
   }
   if (devs_source.count(BlueFS::BDEV_DB)) {
     bluefs_layout.shared_bdev = BlueFS::BDEV_DB;
@@ -6741,7 +7312,7 @@ int BlueStore::migrate_to_existing_bluefs_device(const set<int>& devs_source,
   r = bluefs->device_migrate_to_existing(cct, devs_source, id, bluefs_layout);
   if (r < 0) {
     derr << __func__ << " failed during BlueFS migration, " << cpp_strerror(r) << dendl;
-    goto shutdown;
+    return r;
   }
 
   if (devs_source.count(BlueFS::BDEV_DB)) {
@@ -6752,9 +7323,6 @@ int BlueStore::migrate_to_existing_bluefs_device(const set<int>& devs_source,
     r = unlink(string(path + "/block.wal").c_str());
     ceph_assert(r == 0);
   }
-
-shutdown:
-  _close_db_and_around(true);
   return r;
 }
 
@@ -6763,7 +7331,6 @@ int BlueStore::migrate_to_new_bluefs_device(const set<int>& devs_source,
   const string& dev_path)
 {
   dout(10) << __func__ << " path " << dev_path << " id:" << id << dendl;
-  int r;
   ceph_assert(path_fd < 0);
 
   ceph_assert(id == BlueFS::BDEV_NEWWAL || id == BlueFS::BDEV_NEWDB);
@@ -6773,7 +7340,13 @@ int BlueStore::migrate_to_new_bluefs_device(const set<int>& devs_source,
     return -EIO;
   }
 
-  r = _open_db_and_around(true);
+  int r = _open_db_and_around(true);
+  if (r < 0) {
+    return r;
+  }
+  auto close_db = make_scope_guard([&] {
+    _close_db_and_around();
+  });
 
   string link_db;
   string link_wal;
@@ -6788,7 +7361,7 @@ int BlueStore::migrate_to_new_bluefs_device(const set<int>& devs_source,
     bluefs_layout.dedicated_wal = false;
   }
 
-  size_t target_size;
+  size_t target_size = 0;
   string target_name;
   if (id == BlueFS::BDEV_NEWWAL) {
     target_name = "block.wal";
@@ -6836,7 +7409,7 @@ int BlueStore::migrate_to_new_bluefs_device(const set<int>& devs_source,
 
   if (r < 0) {
     derr << __func__ << " failed during BlueFS migration, " << cpp_strerror(r) << dendl;
-    goto shutdown;
+    return r;
   }
 
   if (!link_db.empty()) {
@@ -6855,9 +7428,6 @@ int BlueStore::migrate_to_new_bluefs_device(const set<int>& devs_source,
   ceph_assert(r == 0);
   dout(0) << __func__ << " success" << dendl;
 
-shutdown:
-  _close_db_and_around(true);
-
   return r;
 }
 
@@ -6949,14 +7519,19 @@ int BlueStore::expand_devices(ostream& out)
           << std::endl;
       }
     }
-    _close_db_and_around(true);
+
+    // we grow the allocation range, must reflect it in the allocation file
+    alloc->init_add_free(size0, size - size0);
+    need_to_destage_allocation_file = true;
+
+    _close_db_and_around();
 
     // mount in read/write to sync expansion changes
     r = _mount();
     ceph_assert(r == 0);
     umount();
   } else {
-    _close_db_and_around(true);
+    _close_db_and_around();
   }
   return r;
 }
@@ -6966,7 +7541,7 @@ int BlueStore::dump_bluefs_sizes(ostream& out)
   int r = _open_db_and_around(true);
   ceph_assert(r == 0);
   bluefs->dump_block_extents(out);
-  _close_db_and_around(true);
+  _close_db_and_around();
   return r;
 }
 
@@ -6992,10 +7567,10 @@ void BlueStore::set_cache_shards(unsigned num)
 
 int BlueStore::_mount()
 {
-  dout(1) << __func__ << " path " << path << dendl;
-
+  dout(5) << __func__ << "NCB:: path " << path << dendl;
   _kv_only = false;
   if (cct->_conf->bluestore_fsck_on_mount) {
+    dout(5) << __func__ << "::NCB::calling fsck()" << dendl;
     int rc = fsck(cct->_conf->bluestore_fsck_on_mount_deep);
     if (rc < 0)
       return rc;
@@ -7012,33 +7587,55 @@ int BlueStore::_mount()
     return -EINVAL;
   }
 
+  dout(5) << __func__ << "::NCB::calling open_db_and_around(read/write)" << dendl;
   int r = _open_db_and_around(false);
   if (r < 0) {
     return r;
   }
+  auto close_db = make_scope_guard([&] {
+    if (!mounted) {
+      _close_db_and_around();
+    }
+  });
 
   r = _upgrade_super();
   if (r < 0) {
-    goto out_db;
+    return r;
   }
 
+  // The recovery process for allocation-map needs to open collection early
   r = _open_collections();
-  if (r < 0)
-    goto out_db;
+  if (r < 0) {
+    return r;
+  }
+  auto shutdown_cache = make_scope_guard([&] {
+    if (!mounted) {
+      _shutdown_cache();
+    }
+  });
 
   r = _reload_logger();
-  if (r < 0)
-    goto out_coll;
+  if (r < 0) {
+    return r;
+  }
 
   _kv_start();
+  auto stop_kv = make_scope_guard([&] {
+    if (!mounted) {
+      _kv_stop();
+    }
+  });
+
+  r = _deferred_replay();
+  if (r < 0) {
+    return r;
+  }
 
+#ifdef HAVE_LIBZBD
   if (bdev->is_smr()) {
     _zoned_cleaner_start();
   }
-
-  r = _deferred_replay();
-  if (r < 0)
-    goto out_stop;
+#endif
 
   mempool_thread.init();
 
@@ -7063,41 +7660,32 @@ int BlueStore::_mount()
 
   mounted = true;
   return 0;
-
- out_stop:
-  if (bdev->is_smr()) {
-    _zoned_cleaner_stop();
-  }
-  _kv_stop();
- out_coll:
-  _shutdown_cache();
- out_db:
-  _close_db_and_around(false);
-  return r;
 }
 
 int BlueStore::umount()
 {
   ceph_assert(_kv_only || mounted);
-  dout(1) << __func__ << dendl;
-
   _osr_drain_all();
 
   mounted = false;
+
+  ceph_assert(alloc);
+
   if (!_kv_only) {
     mempool_thread.shutdown();
+#ifdef HAVE_LIBZBD
     if (bdev->is_smr()) {
       dout(20) << __func__ << " stopping zone cleaner thread" << dendl;
       _zoned_cleaner_stop();
     }
+#endif
     dout(20) << __func__ << " stopping kv thread" << dendl;
     _kv_stop();
     _shutdown_cache();
     dout(20) << __func__ << " closing" << dendl;
-
   }
-  _close_db_and_around(false);
 
+  _close_db_and_around();
   if (cct->_conf->bluestore_fsck_on_umount) {
     int rc = fsck(cct->_conf->bluestore_fsck_on_umount_deep);
     if (rc < 0)
@@ -7117,7 +7705,7 @@ int BlueStore::cold_open()
 
 int BlueStore::cold_close()
 {
-  _close_db_and_around(true);
+  _close_db_and_around();
   return 0;
 }
 
@@ -7151,8 +7739,7 @@ int _fsck_sum_extents(
 }
 
 int BlueStore::_fsck_check_extents(
-  const coll_t& cid,
-  const ghobject_t& oid,
+  std::string_view ctx_descr,
   const PExtentVector& extents,
   bool compressed,
   mempool_dynamic_bitset &used_blocks,
@@ -7161,7 +7748,7 @@ int BlueStore::_fsck_check_extents(
   store_statfs_t& expected_statfs,
   FSCKDepth depth)
 {
-  dout(30) << __func__ << " oid " << oid << " extents " << extents << dendl;
+  dout(30) << __func__ << " " << ctx_descr << ", extents " << extents << dendl;
   int errors = 0;
   for (auto e : extents) {
     if (!e.is_valid())
@@ -7181,7 +7768,7 @@ int BlueStore::_fsck_check_extents(
                pos * min_alloc_size, min_alloc_size, !already);
            }
             if (!already) {
-              derr << "fsck error: " << oid << " extent " << e
+              derr << __func__ << "::fsck error: " << ctx_descr << ", extent " << e
                   << " or a subset is already allocated (misreferenced)" << dendl;
              ++errors;
              already = true;
@@ -7190,12 +7777,9 @@ int BlueStore::_fsck_check_extents(
          else
            bs.set(pos);
         });
-        if (repairer) {
-         repairer->set_space_used(e.offset, e.length, cid, oid);
-        }
 
       if (e.end() > bdev->get_size()) {
-        derr << "fsck error:  " << oid << " extent " << e
+        derr << "fsck error:  " << ctx_descr << ", extent " << e
             << " past end of block device" << dendl;
         ++errors;
       }
@@ -7311,6 +7895,170 @@ void BlueStore::_fsck_check_pool_statfs(
   }
 }
 
+void BlueStore::_fsck_repair_shared_blobs(
+  BlueStoreRepairer& repairer,
+  shared_blob_2hash_tracker_t& sb_ref_counts,
+  sb_info_space_efficient_map_t& sb_info)
+{
+  auto sb_ref_mismatches = sb_ref_counts.count_non_zero();
+  dout(1) << __func__ << " repairing shared_blobs, ref mismatch estimate: "
+         << sb_ref_mismatches << dendl;
+  if (!sb_ref_mismatches) // not expected to succeed, just in case
+    return;
+
+
+  auto foreach_shared_blob = [&](std::function<
+    void (coll_t,
+          ghobject_t,
+          uint64_t,
+          const bluestore_blob_t&)> cb) {
+      auto it = db->get_iterator(PREFIX_OBJ, KeyValueDB::ITERATOR_NOCACHE);
+      if (it) {
+        CollectionRef c;
+        spg_t pgid;
+        for (it->lower_bound(string()); it->valid(); it->next()) {
+          dout(30) << __func__ << " key "
+                  << pretty_binary_string(it->key())
+                  << dendl;
+          if (is_extent_shard_key(it->key())) {
+           continue;
+          }
+
+          ghobject_t oid;
+          int r = get_key_object(it->key(), &oid);
+          if (r < 0) {
+           continue;
+          }
+
+          if (!c ||
+           oid.shard_id != pgid.shard ||
+           oid.hobj.get_logical_pool() != (int64_t)pgid.pool() ||
+           !c->contains(oid)) {
+           c = nullptr;
+           for (auto& p : coll_map) {
+             if (p.second->contains(oid)) {
+               c = p.second;
+               break;
+             }
+           }
+           if (!c) {
+             continue;
+           }
+          }
+          dout(20) << __func__
+                   << " inspecting shared blob refs for col:" << c->cid
+                  << " obj:" << oid
+                  << dendl;
+
+          OnodeRef o;
+          o.reset(Onode::decode(c, oid, it->key(), it->value()));
+          o->extent_map.fault_range(db, 0, OBJECT_MAX_SIZE);
+
+          _dump_onode<30>(cct, *o);
+
+          mempool::bluestore_fsck::set<BlobRef> passed_sbs;
+          for (auto& e : o->extent_map.extent_map) {
+           auto& b = e.blob->get_blob();
+           if (b.is_shared() && passed_sbs.count(e.blob) == 0) {
+             auto sbid = e.blob->shared_blob->get_sbid();
+             cb(c->cid, oid, sbid, b);
+             passed_sbs.emplace(e.blob);
+           }
+          } // for ... extent_map
+        } // for ... it->valid
+      } //if (it(PREFIX_OBJ))
+    }; //foreach_shared_blob fn declaration
+
+  mempool::bluestore_fsck::map<uint64_t, bluestore_extent_ref_map_t> refs_map;
+
+  // first iteration over objects to identify all the broken sbids
+  foreach_shared_blob( [&](coll_t cid,
+                           ghobject_t oid,
+                           uint64_t sbid,
+                           const bluestore_blob_t& b) {
+    auto it = refs_map.lower_bound(sbid);
+    if(it != refs_map.end() && it->first == sbid) {
+      return;
+    }
+    for (auto& p : b.get_extents()) {
+      if (p.is_valid() &&
+         !sb_ref_counts.test_all_zero_range(sbid,
+                                            p.offset,
+                                            p.length)) {
+       refs_map.emplace_hint(it, sbid, bluestore_extent_ref_map_t());
+        dout(20) << __func__
+                 << " broken shared blob found for col:" << cid
+                << " obj:" << oid
+                << " sbid 0x " << std::hex << sbid << std::dec
+                << dendl;
+       break;
+      }
+    }
+  });
+
+  // second iteration over objects to build new ref map for the broken sbids
+  foreach_shared_blob( [&](coll_t cid,
+                           ghobject_t oid,
+                           uint64_t sbid,
+                           const bluestore_blob_t& b) {
+    auto it = refs_map.find(sbid);
+    if(it == refs_map.end()) {
+      return;
+    }
+    for (auto& p : b.get_extents()) {
+      if (p.is_valid()) {
+       it->second.get(p.offset, p.length);
+       break;
+      }
+    }
+  });
+
+  // update shared blob records
+  auto ref_it = refs_map.begin();
+  while (ref_it != refs_map.end()) {
+    size_t cnt = 0;
+    const size_t max_transactions = 4096;
+    KeyValueDB::Transaction txn = db->get_transaction();
+    for (cnt = 0;
+      cnt < max_transactions && ref_it != refs_map.end();
+      ref_it++) {
+      auto sbid = ref_it->first;
+      dout(20) << __func__ << " repaired shared_blob 0x"
+       << std::hex << sbid << std::dec
+       << ref_it->second << dendl;
+      repairer.fix_shared_blob(txn, sbid, &ref_it->second, 0);
+      cnt++;
+    }
+    if (cnt) {
+      db->submit_transaction_sync(txn);
+      cnt = 0;
+    }
+  }
+  // remove stray shared blob records
+  size_t cnt = 0;
+  const size_t max_transactions = 4096;
+  KeyValueDB::Transaction txn = db->get_transaction();
+  sb_info.foreach_stray([&](const sb_info_t& sbi) {
+    auto sbid = sbi.get_sbid();
+    dout(20) << __func__ << " removing stray shared_blob 0x"
+      << std::hex << sbid << std::dec
+      << dendl;
+    repairer.fix_shared_blob(txn, sbid, nullptr, 0);
+    cnt++;
+    if (cnt >= max_transactions) {}
+      db->submit_transaction_sync(txn);
+      txn = db->get_transaction();
+      cnt = 0;
+    });
+  if (cnt > 0) {
+    db->submit_transaction_sync(txn);
+  }
+
+  // amount of repairs to report to be equal to previously
+  // determined error estimation, not the actual number of updated shared blobs
+  repairer.inc_repaired(sb_ref_mismatches);
+}
+
 BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
   BlueStore::FSCKDepth depth,
   int64_t pool_id,
@@ -7331,12 +8079,15 @@ BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
   auto used_blocks = ctx.used_blocks;
   auto sb_info_lock = ctx.sb_info_lock;
   auto& sb_info = ctx.sb_info;
+  auto& sb_ref_counts = ctx.sb_ref_counts;
   auto repairer = ctx.repairer;
 
   store_statfs_t* res_statfs = (per_pool_stat_collection || repairer) ?
     &ctx.expected_pool_statfs[pool_id] :
     &ctx.expected_store_statfs;
 
+  map<uint32_t, uint64_t> zone_first_offsets;  // for zoned/smr devices
+
   dout(10) << __func__ << "  " << oid << dendl;
   OnodeRef o;
   o.reset(Onode::decode(c, oid, key, value));
@@ -7392,11 +8143,27 @@ BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
     ceph_assert(l.blob);
     const bluestore_blob_t& blob = l.blob->get_blob();
 
-    auto& ref = ref_map[l.blob];
-    if (ref.is_empty()) {
-      uint32_t min_release_size = blob.get_release_size(min_alloc_size);
-      uint32_t l = blob.get_logical_length();
-      ref.init(l, min_release_size);
+#ifdef HAVE_LIBZBD
+    if (bdev->is_smr() && depth != FSCK_SHALLOW) {
+      for (auto& e : blob.get_extents()) {
+       if (e.is_valid()) {
+         uint32_t zone = e.offset / zone_size;
+         uint64_t offset = e.offset % zone_size;
+         auto p = zone_first_offsets.find(zone);
+         if (p == zone_first_offsets.end() || p->second > offset) {
+           // FIXME: use interator for guided insert?
+           zone_first_offsets[zone] = offset;
+         }
+       }
+      }
+    }
+#endif
+
+    auto& ref = ref_map[l.blob];
+    if (ref.is_empty()) {
+      uint32_t min_release_size = blob.get_release_size(min_alloc_size);
+      uint32_t l = blob.get_logical_length();
+      ref.init(l, min_release_size);
     }
     ref.get(
       l.blob_offset,
@@ -7442,14 +8209,20 @@ BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
       res_statfs->data_compressed_original +=
         i.first->get_referenced_bytes();
     }
+    if (depth != FSCK_SHALLOW && repairer) {
+      for (auto e : blob.get_extents()) {
+       if (!e.is_valid())
+         continue;
+       repairer->set_space_used(e.offset, e.length, c->cid, oid);
+      }
+    }
     if (blob.is_shared()) {
       if (i.first->shared_blob->get_sbid() > blobid_max) {
         derr << "fsck error: " << oid << " blob " << blob
           << " sbid " << i.first->shared_blob->get_sbid() << " > blobid_max "
           << blobid_max << dendl;
         ++errors;
-      }
-      else if (i.first->shared_blob->get_sbid() == 0) {
+      } else if (i.first->shared_blob->get_sbid() == 0) {
         derr << "fsck error: " << oid << " blob " << blob
           << " marked as shared but has uninitialized sbid"
           << dendl;
@@ -7459,18 +8232,22 @@ BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
       if (sb_info_lock) {
         sb_info_lock->lock();
       }
-      sb_info_t& sbi = sb_info[i.first->shared_blob->get_sbid()];
-      ceph_assert(sbi.cid == coll_t() || sbi.cid == c->cid);
-      ceph_assert(sbi.pool_id == INT64_MIN ||
+      auto sbid = i.first->shared_blob->get_sbid();
+      sb_info_t& sbi = sb_info.add_or_adopt(i.first->shared_blob->get_sbid());
+      ceph_assert(sbi.pool_id == sb_info_t::INVALID_POOL_ID ||
         sbi.pool_id == oid.hobj.get_logical_pool());
-      sbi.cid = c->cid;
       sbi.pool_id = oid.hobj.get_logical_pool();
-      sbi.sb = i.first->shared_blob;
-      sbi.oids.push_back(oid);
-      sbi.compressed = blob.is_compressed();
+      bool compressed = blob.is_compressed();
       for (auto e : blob.get_extents()) {
         if (e.is_valid()) {
-          sbi.ref_map.get(e.offset, e.length);
+         if (compressed) {
+           ceph_assert(sbi.allocated_chunks <= 0);
+           sbi.allocated_chunks -= (e.length >> min_alloc_size_order);
+         } else {
+           ceph_assert(sbi.allocated_chunks >= 0);
+           sbi.allocated_chunks += (e.length >> min_alloc_size_order);
+         }
+         sb_ref_counts.inc_range(sbid, e.offset, e.length, 1);
         }
       }
       if (sb_info_lock) {
@@ -7478,11 +8255,13 @@ BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
       }
     } else if (depth != FSCK_SHALLOW) {
       ceph_assert(used_blocks);
-      errors += _fsck_check_extents(c->cid, oid, blob.get_extents(),
+      string ctx_descr = " oid " + stringify(oid);
+      errors += _fsck_check_extents(ctx_descr,
+       blob.get_extents(),
         blob.is_compressed(),
         *used_blocks,
         fm->get_alloc_size(),
-        repairer,
+       repairer,
         *res_statfs,
         depth);
     } else {
@@ -7510,6 +8289,34 @@ BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
         }
       }
     }
+
+#ifdef HAVE_LIBZBD
+    if (bdev->is_smr() && depth != FSCK_SHALLOW) {
+      for (auto& [zone, first_offset] : zone_first_offsets) {
+       auto p = (*ctx.zone_refs)[zone].find(oid);
+       if (p != (*ctx.zone_refs)[zone].end()) {
+         if (first_offset < p->second) {
+           dout(20) << " slightly wonky zone ref 0x" << std::hex << zone
+                << " offset 0x" << p->second
+                << " but first offset is 0x" << first_offset
+                << "; this can happen due to clone_range"
+                << dendl;
+         } else {
+           dout(20) << " good zone ref 0x" << std::hex << zone << " offset 0x" << p->second
+                    << " <= first offset 0x" << first_offset
+                    << std::dec << dendl;
+         }
+         (*ctx.zone_refs)[zone].erase(p);
+       } else {
+         derr << "fsck error: " << oid << " references zone 0x" << std::hex << zone
+              << " but there is no zone ref" << std::dec << dendl;
+         // FIXME: add repair
+         ++errors;
+       }
+      }
+    }
+#endif
+
     if (broken) {
       derr << "fsck error: " << oid << " - " << broken
            << " zombie spanning blob(s) found, the first one: "
@@ -7584,7 +8391,8 @@ public:
     BlueStore* store = nullptr;
 
     ceph::mutex* sb_info_lock = nullptr;
-    BlueStore::sb_info_map_t* sb_info = nullptr;
+    sb_info_space_efficient_map_t* sb_info = nullptr;
+    shared_blob_2hash_tracker_t* sb_ref_counts = nullptr;
     BlueStoreRepairer* repairer = nullptr;
 
     Batch* batches = nullptr;
@@ -7595,13 +8403,15 @@ public:
                   size_t _batchCount,
                   BlueStore* _store,
                   ceph::mutex* _sb_info_lock,
-                  BlueStore::sb_info_map_t& _sb_info,
+                  sb_info_space_efficient_map_t& _sb_info,
+                 shared_blob_2hash_tracker_t& _sb_ref_counts,
                   BlueStoreRepairer* _repairer) :
       WorkQueue_(n, ceph::timespan::zero(), ceph::timespan::zero()),
       batchCount(_batchCount),
       store(_store),
       sb_info_lock(_sb_info_lock),
       sb_info(&_sb_info),
+      sb_ref_counts(&_sb_ref_counts),
       repairer(_repairer)
     {
       batches = new Batch[batchCount];
@@ -7652,8 +8462,10 @@ public:
         batch->num_spanning_blobs,
         nullptr, // used_blocks
         nullptr, //used_omap_head
+       nullptr,
         sb_info_lock,
         *sb_info,
+       *sb_ref_counts,
         batch->expected_store_statfs,
         batch->expected_pool_statfs,
         repairer);
@@ -7672,7 +8484,6 @@ public:
           nullptr, // referenced
           ctx);
       }
-      //std::cout << "processed " << batch << std::endl;
       batch->entry_count = 0;
       batch->running--;
     }
@@ -7921,12 +8732,14 @@ void BlueStore::_fsck_check_object_omap(FSCKDepth depth,
   }
 }
 
-void BlueStore::_fsck_check_objects(FSCKDepth depth,
+void BlueStore::_fsck_check_objects(
+  FSCKDepth depth,
   BlueStore::FSCK_ObjectCtx& ctx)
 {
   auto& errors = ctx.errors;
   auto sb_info_lock = ctx.sb_info_lock;
   auto& sb_info = ctx.sb_info;
+  auto& sb_ref_counts = ctx.sb_ref_counts;
   auto repairer = ctx.repairer;
 
   uint64_t_btree_t used_nids;
@@ -7945,6 +8758,7 @@ void BlueStore::_fsck_check_objects(FSCKDepth depth,
         this,
         sb_info_lock,
         sb_info,
+       sb_ref_counts,
         repairer));
 
     ShallowFSCKThreadPool thread_pool(cct, "ShallowFSCKThreadPool", "ShallowFSCK", thread_count);
@@ -7956,7 +8770,7 @@ void BlueStore::_fsck_check_objects(FSCKDepth depth,
       thread_pool.start();
     }
 
-    //fill global if not overriden below
+    // fill global if not overriden below
     CollectionRef c;
     int64_t pool_id = -1;
     spg_t pgid;
@@ -8061,7 +8875,6 @@ void BlueStore::_fsck_check_objects(FSCKDepth depth,
 
       if (!queued) {
         ++processed_myself;
-
          o = fsck_check_objects_shallow(
           depth,
           pool_id,
@@ -8214,32 +9027,40 @@ Detection stage (in processing order):
 */
 int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
 {
-  dout(1) << __func__
+  dout(5) << __func__
     << (repair ? " repair" : " check")
     << (depth == FSCK_DEEP ? " (deep)" :
       depth == FSCK_SHALLOW ? " (shallow)" : " (regular)")
     << dendl;
 
   // in deep mode we need R/W write access to be able to replay deferred ops
-  bool read_only = !(repair || depth == FSCK_DEEP);
-
+  const bool read_only = !(repair || depth == FSCK_DEEP);
   int r = _open_db_and_around(read_only);
-  if (r < 0)
+  if (r < 0) {
     return r;
+  }
+  auto close_db = make_scope_guard([&] {
+    _close_db_and_around();
+  });
 
   if (!read_only) {
     r = _upgrade_super();
     if (r < 0) {
-      goto out_db;
+      return r;
     }
   }
 
+  // NullFreelistManager needs to open collection early
   r = _open_collections();
-  if (r < 0)
-    goto out_db;
+  if (r < 0) {
+    return r;
+  }
 
   mempool_thread.init();
-
+  auto stop_mempool = make_scope_guard([&] {
+    mempool_thread.shutdown();
+    _shutdown_cache();
+  });
   // we need finisher and kv_{sync,finalize}_thread *just* for replay
   // enable in repair or deep mode modes only
   if (!read_only) {
@@ -8247,28 +9068,27 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
     r = _deferred_replay();
     _kv_stop();
   }
-  if (r < 0)
-    goto out_scan;
-
-  r = _fsck_on_open(depth, repair);
-
-out_scan:
-  mempool_thread.shutdown();
-  _shutdown_cache();
-out_db:
-  _close_db_and_around(false);
 
-  return r;
+  if (r < 0) {
+    return r;
+  }
+  return _fsck_on_open(depth, repair);
 }
 
 int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
 {
+  uint64_t sb_hash_size = uint64_t(
+    cct->_conf.get_val<Option::size_t>("osd_memory_target") *
+    cct->_conf.get_val<double>(
+      "bluestore_fsck_shared_blob_tracker_size"));
+
   dout(1) << __func__
          << " <<<START>>>"
          << (repair ? " repair" : " check")
          << (depth == FSCK_DEEP ? " (deep)" :
                 depth == FSCK_SHALLOW ? " (shallow)" : " (regular)")
-          << " start" << dendl;
+          << " start sb_tracker_hash_size:" << sb_hash_size
+          << dendl;
   int64_t errors = 0;
   int64_t warnings = 0;
   unsigned repaired = 0;
@@ -8281,7 +9101,14 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
   store_statfs_t expected_store_statfs, actual_statfs;
   per_pool_statfs expected_pool_statfs;
 
-  sb_info_map_t sb_info;
+  sb_info_space_efficient_map_t sb_info;
+  shared_blob_2hash_tracker_t sb_ref_counts(
+    sb_hash_size,
+    min_alloc_size);
+  size_t sb_ref_mismatches = 0;
+
+  /// map of oid -> (first_)offset for each zone
+  std::vector<std::unordered_map<ghobject_t, uint64_t>> zone_refs;   // FIXME: this may be a lot of RAM!
 
   uint64_t num_objects = 0;
   uint64_t num_extents = 0;
@@ -8376,6 +9203,107 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
     dout(1) << __func__ << " debug abort" << dendl;
     goto out_scan;
   }
+
+#ifdef HAVE_LIBZBD
+  if (bdev->is_smr()) {
+    auto a = dynamic_cast<ZonedAllocator*>(alloc);
+    ceph_assert(a);
+    auto f = dynamic_cast<ZonedFreelistManager*>(fm);
+    ceph_assert(f);
+    vector<uint64_t> wp = bdev->get_zones();
+    vector<zone_state_t> zones = f->get_zone_states(db);
+    ceph_assert(wp.size() == zones.size());
+    auto num_zones = bdev->get_size() / zone_size;
+    for (unsigned i = first_sequential_zone; i < num_zones; ++i) {
+      uint64_t p = wp[i] == (i + 1) * zone_size ? zone_size : wp[i] % zone_size;
+      if (zones[i].write_pointer > p &&
+         zones[i].num_dead_bytes < zones[i].write_pointer) {
+       derr << "fsck error: zone 0x" << std::hex << i
+            << " bluestore write pointer 0x" << zones[i].write_pointer
+            << " > device write pointer 0x" << p
+            << " (with only 0x" << zones[i].num_dead_bytes << " dead bytes)"
+            << std::dec << dendl;
+       ++errors;
+      }
+    }
+
+    if (depth != FSCK_SHALLOW) {
+      // load zone refs
+      zone_refs.resize(bdev->get_size() / zone_size);
+      it = db->get_iterator(PREFIX_ZONED_CL_INFO, KeyValueDB::ITERATOR_NOCACHE);
+      if (it) {
+       for (it->lower_bound(string());
+            it->valid();
+            it->next()) {
+         uint32_t zone = 0;
+         uint64_t offset = 0;
+         ghobject_t oid;
+         string key = it->key();
+         int r = get_key_zone_offset_object(key, &zone, &offset, &oid);
+         if (r < 0) {
+           derr << "fsck error: invalid zone ref key " << pretty_binary_string(key)
+                << dendl;
+           if (repair) {
+             repairer.remove_key(db, PREFIX_ZONED_CL_INFO, key);
+           }
+           ++errors;
+           continue;
+         }
+         dout(30) << " zone ref 0x" << std::hex << zone << " offset 0x" << offset
+                  << " -> " << std::dec << oid << dendl;
+         if (zone_refs[zone].count(oid)) {
+           derr << "fsck error: second zone ref in zone 0x" << std::hex << zone
+                << " offset 0x" << offset << std::dec << " for " << oid << dendl;
+           if (repair) {
+             repairer.remove_key(db, PREFIX_ZONED_CL_INFO, key);
+           }
+           ++errors;
+           continue;
+         }
+         zone_refs[zone][oid] = offset;
+       }
+      }
+    }
+  }
+#endif
+
+  dout(1) << __func__ << " checking shared_blobs (phase 1)" << dendl;
+  it = db->get_iterator(PREFIX_SHARED_BLOB, KeyValueDB::ITERATOR_NOCACHE);
+  if (it) {
+    for (it->lower_bound(string()); it->valid(); it->next()) {
+      string key = it->key();
+      uint64_t sbid;
+      if (get_key_shared_blob(key, &sbid) < 0) {
+        // Failed to parse the key.
+       // This gonna to be handled at the second stage
+       continue;
+      }
+      bluestore_shared_blob_t shared_blob(sbid);
+      bufferlist bl = it->value();
+      auto blp = bl.cbegin();
+      try {
+       decode(shared_blob, blp);
+      }
+      catch (ceph::buffer::error& e) {
+       // this gonna to be handled at the second stage
+       continue;
+      }
+      dout(20) << __func__ << "  " << shared_blob << dendl;
+      auto& sbi = sb_info.add_maybe_stray(sbid);
+
+      // primarily to silent the 'unused' warning
+      ceph_assert(sbi.pool_id == sb_info_t::INVALID_POOL_ID);
+
+      for (auto& r : shared_blob.ref_map.ref_map) {
+       sb_ref_counts.inc_range(
+         sbid,
+         r.first,
+         r.second.length,
+         -r.second.refs);
+      }
+    }
+  } // if (it) //checking shared_blobs (phase1)
+
   // walk PREFIX_OBJ
   {
     dout(1) << __func__ << " walking object keyspace" << dendl;
@@ -8390,10 +9318,12 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
       num_spanning_blobs,
       &used_blocks,
       &used_omap_head,
+      &zone_refs,
       //no need for the below lock when in non-shallow mode as
       // there is no multithreading in this case
       depth == FSCK_SHALLOW ? &sb_info_lock : nullptr,
       sb_info,
+      sb_ref_counts,
       expected_store_statfs,
       expected_pool_statfs,
       repair ? &repairer : nullptr);
@@ -8401,19 +9331,42 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
     _fsck_check_objects(depth, ctx);
   }
 
-  dout(1) << __func__ << " checking shared_blobs" << dendl;
+#ifdef HAVE_LIBZBD
+  if (bdev->is_smr() && depth != FSCK_SHALLOW) {
+    dout(1) << __func__ << " checking for leaked zone refs" << dendl;
+    for (uint32_t zone = 0; zone < zone_refs.size(); ++zone) {
+      for (auto& [oid, offset] : zone_refs[zone]) {
+       derr << "fsck error: stray zone ref 0x" << std::hex << zone
+            << " offset 0x" << offset << " -> " << std::dec << oid << dendl;
+       // FIXME: add repair
+       ++errors;
+      }
+    }
+  }
+#endif
+
+  sb_ref_mismatches = sb_ref_counts.count_non_zero();
+  if (sb_ref_mismatches != 0) {
+    derr << "fsck error: shared blob references aren't matching, at least "
+      << sb_ref_mismatches << " found" << dendl;
+    errors += sb_ref_mismatches;
+  }
+
+  if (depth != FSCK_SHALLOW && repair) {
+    _fsck_repair_shared_blobs(repairer, sb_ref_counts, sb_info);
+  }
+  dout(1) << __func__ << " checking shared_blobs (phase 2)" << dendl;
   it = db->get_iterator(PREFIX_SHARED_BLOB, KeyValueDB::ITERATOR_NOCACHE);
   if (it) {
     // FIXME minor: perhaps simplify for shallow mode?
     // fill global if not overriden below
     auto expected_statfs = &expected_store_statfs;
-
     for (it->lower_bound(string()); it->valid(); it->next()) {
       string key = it->key();
       uint64_t sbid;
       if (get_key_shared_blob(key, &sbid)) {
        derr << "fsck error: bad key '" << key
-            << "' in shared blob namespace" << dendl;
+         << "' in shared blob namespace" << dendl;
        if (repair) {
          repairer.remove_key(db, PREFIX_SHARED_BLOB, key);
        }
@@ -8422,63 +9375,62 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
       }
       auto p = sb_info.find(sbid);
       if (p == sb_info.end()) {
-       derr << "fsck error: found stray shared blob data for sbid 0x"
-            << std::hex << sbid << std::dec << dendl;
-       if (repair) {
-         repairer.remove_key(db, PREFIX_SHARED_BLOB, key);
+        if (sb_ref_mismatches > 0) {
+         // highly likely this has been already reported before, ignoring...
+         dout(5) << __func__ << " found duplicate(?) stray shared blob data for sbid 0x"
+           << std::hex << sbid << std::dec << dendl;
+       } else {
+         derr<< "fsck error: found stray shared blob data for sbid 0x"
+           << std::hex << sbid << std::dec << dendl;
+         ++errors;
+         if (repair) {
+           repairer.remove_key(db, PREFIX_SHARED_BLOB, key);
+         }
        }
-       ++errors;
       } else {
        ++num_shared_blobs;
-       sb_info_t& sbi = p->second;
+       sb_info_t& sbi = *p;
        bluestore_shared_blob_t shared_blob(sbid);
        bufferlist bl = it->value();
        auto blp = bl.cbegin();
        try {
-         decode(shared_blob, blp);
-       } catch (ceph::buffer::error& e) {
-          ++errors;
-          // Force update and don't report as missing
-          sbi.updated = sbi.passed = true;
-
-          derr << "fsck error: failed to decode Shared Blob"
-              << pretty_binary_string(it->key()) << dendl;
-          if (repair) {
-           dout(20) << __func__ << " undecodable Shared Blob, key:'"
-                    << pretty_binary_string(it->key())
-                    << "', removing" << dendl;
-            repairer.remove_key(db, PREFIX_DEFERRED, it->key());
-          }
-          continue;
-        }      
-       dout(20) << __func__ << "  " << *sbi.sb << " " << shared_blob << dendl;
-       if (shared_blob.ref_map != sbi.ref_map) {
-         derr << "fsck error: shared blob 0x" << std::hex << sbid
-               << std::dec << " ref_map " << shared_blob.ref_map
-               << " != expected " << sbi.ref_map << dendl;
-         sbi.updated = true; // will update later in repair mode only!
+         decode(shared_blob, blp);
+       }
+       catch (ceph::buffer::error& e) {
          ++errors;
+
+         derr << "fsck error: failed to decode Shared Blob"
+           << pretty_binary_string(key) << dendl;
+         if (repair) {
+           dout(20) << __func__ << " undecodable Shared Blob, key:'"
+             << pretty_binary_string(key)
+             << "', removing" << dendl;
+           repairer.remove_key(db, PREFIX_SHARED_BLOB, key);
+         }
+         continue;
        }
+       dout(20) << __func__ << "  " << shared_blob << dendl;
        PExtentVector extents;
-       for (auto &r : shared_blob.ref_map.ref_map) {
+       for (autor : shared_blob.ref_map.ref_map) {
          extents.emplace_back(bluestore_pextent_t(r.first, r.second.length));
        }
-       if (per_pool_stat_collection || repair) {
+       if (sbi.pool_id != sb_info_t::INVALID_POOL_ID &&
+           (per_pool_stat_collection || repair)) {
          expected_statfs = &expected_pool_statfs[sbi.pool_id];
        }
-       errors += _fsck_check_extents(sbi.cid,
-                                     sbi.oids.front(),
-                                     extents,
-                                     sbi.compressed,
-                                     used_blocks,
-                                     fm->get_alloc_size(),
-                                     repair ? &repairer : nullptr,
-                                     *expected_statfs,
-                                      depth);
-       sbi.passed = true;
+       std::stringstream ss;
+       ss << "sbid 0x" << std::hex << sbid << std::dec;
+       errors += _fsck_check_extents(ss.str(),
+         extents,
+         sbi.allocated_chunks < 0,
+         used_blocks,
+         fm->get_alloc_size(),
+         repair ? &repairer : nullptr,
+         *expected_statfs,
+         depth);
       }
     }
-  } // if (it)
+  } // if (it) /* checking shared_blobs (phase 2)*/
 
   if (repair && repairer.preprocess_misreference(db)) {
 
@@ -8582,18 +9534,19 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
              continue;
            }
            PExtentVector exts;
+           dout(5) << __func__ << "::NCB::(F)alloc=" << alloc << ", length=" << e->length << dendl;
            int64_t alloc_len =
-              shared_alloc.a->allocate(e->length, min_alloc_size,
+              alloc->allocate(e->length, min_alloc_size,
                                       0, 0, &exts);
            if (alloc_len < 0 || alloc_len < (int64_t)e->length) {
              derr << __func__
                   << " failed to allocate 0x" << std::hex << e->length
                   << " allocated 0x " << (alloc_len < 0 ? 0 : alloc_len)
                   << " min_alloc_size 0x" << min_alloc_size
-                  << " available 0x " << shared_alloc.a->get_free()
+                  << " available 0x " << alloc->get_free()
                   << std::dec << dendl;
              if (alloc_len > 0) {
-                shared_alloc.a->release(exts);
+                alloc->release(exts);
              }
              bypass_rest = true;
              break;
@@ -8604,7 +9557,7 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
            }
 
            bufferlist bl;
-           IOContext ioc(cct, NULL, true); // allow EIO
+           IOContext ioc(cct, NULL, !cct->_conf->bluestore_fail_eio);
            r = bdev->read(e->offset, e->length, &bl, &ioc, false);
            if (r < 0) {
              derr << __func__ << " failed to read from 0x" << std::hex << e->offset
@@ -8615,8 +9568,7 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
            e = pextents.erase(e);
            e = pextents.insert(e, exts.begin(), exts.end());
            b->get_blob().map_bl(
-             b_off_cur,
-             bl,
+             b_off_cur, bl,
              [&](uint64_t offset, bufferlist& t) {
                int r = bdev->write(offset, t, false);
                ceph_assert(r == 0);
@@ -8630,22 +9582,24 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
          if (b->get_blob().is_shared()) {
             b->dirty_blob().clear_flag(bluestore_blob_t::FLAG_SHARED);
 
-           auto sb_it = sb_info.find(b->shared_blob->get_sbid());
+           auto sbid = b->shared_blob->get_sbid();
+           auto sb_it = sb_info.find(sbid);
            ceph_assert(sb_it != sb_info.end());
-           sb_info_t& sbi = sb_it->second;
-
-           for (auto& r : sbi.ref_map.ref_map) {
-             expected_statfs->allocated -= r.second.length;
-             if (sbi.compressed) {
-               // NB: it's crucial to use compressed flag from sb_info_t
-               // as we originally used that value while accumulating 
-               // expected_statfs
-               expected_statfs->data_compressed_allocated -= r.second.length;
-             }
+           sb_info_t& sbi = *sb_it;
+
+           if (sbi.allocated_chunks < 0) {
+             // NB: it's crucial to use compressed_allocated_chunks from sb_info_t
+             // as we originally used that value while accumulating
+             // expected_statfs
+             expected_statfs->allocated -= uint64_t(-sbi.allocated_chunks) << min_alloc_size_order;
+             expected_statfs->data_compressed_allocated -=
+               uint64_t(-sbi.allocated_chunks) << min_alloc_size_order;
+           } else {
+             expected_statfs->allocated -= uint64_t(sbi.allocated_chunks) << min_alloc_size_order;
            }
-           sbi.updated = sbi.passed = true;
-           sbi.ref_map.clear();
-           
+           sbi.allocated_chunks = 0;
+           repairer.fix_shared_blob(txn, sbid, nullptr, 0);
+
            // relying on blob's pextents to decide what to release.
            for (auto& p : pext_to_release) {
              to_release.union_insert(p.offset, p.length);
@@ -8674,61 +9628,12 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
                 << "~" << it.get_len() << std::dec << dendl;
        fm->release(it.get_start(), it.get_len(), txn);
       }
-      shared_alloc.a->release(to_release);
+      alloc->release(to_release);
       to_release.clear();
     } // if (it) {
   } //if (repair && repairer.preprocess_misreference()) {
-
-  if (depth != FSCK_SHALLOW) {
-    for (auto &p : sb_info) {
-      sb_info_t& sbi = p.second;
-      if (!sbi.passed) {
-        derr << "fsck error: missing " << *sbi.sb << dendl;
-        ++errors;
-      }
-      if (repair && (!sbi.passed || sbi.updated)) {
-        auto sbid = p.first;
-        if (sbi.ref_map.empty()) {
-         ceph_assert(sbi.passed);
-         dout(20) << __func__ << " " << *sbi.sb
-                  << " is empty, removing" << dendl;
-         repairer.fix_shared_blob(db, sbid, nullptr);
-        } else {
-         bufferlist bl;
-         bluestore_shared_blob_t persistent(sbid, std::move(sbi.ref_map));
-         encode(persistent, bl);
-         dout(20) << __func__ << " " << *sbi.sb
-                  << " is " << bl.length() << " bytes, updating"
-                  << dendl;
-
-         repairer.fix_shared_blob(db, sbid, &bl);
-         // we need to account for shared blob pextents at both
-         // stats and used blocks to avoid related errors.
-         PExtentVector extents;
-         for (auto& r : persistent.ref_map.ref_map) {
-           extents.emplace_back(bluestore_pextent_t(r.first, r.second.length));
-         }
-         auto* expected_statfs = &expected_pool_statfs[sbi.pool_id];
-         int errors = _fsck_check_extents(sbi.cid,
-           ghobject_t(), // doesn't matter
-           extents,
-           sbi.compressed,
-           used_blocks,
-           fm->get_alloc_size(),
-           nullptr,
-           *expected_statfs,
-           depth);
-         if (errors) {
-           derr << __func__ << " " << errors
-                << "  unexpected error(s) after missed shared blob repair,"
-                << " perhaps worth one more repair attempt"
-                << dendl;
-         }
-        }
-      }
-    }
-  }
   sb_info.clear();
+  sb_ref_counts.reset();
 
   // check global stats only if fscking (not repairing) w/o per-pool stats
   if (!per_pool_stat_collection &&
@@ -8759,9 +9664,13 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
 
         if (used_omap_head.count(omap_head) == 0 &&
            omap_head != last_omap_head) {
+          pair<string,string> rk = it->raw_key();
           fsck_derr(errors, MAX_FSCK_ERROR_LINES)
             << "fsck error: found stray omap data on omap_head "
-            << omap_head << " " << last_omap_head << " " << used_omap_head.count(omap_head) << fsck_dendl;
+            << omap_head << " " << last_omap_head
+            << " prefix/key: " << url_escape(rk.first)
+            << " " << url_escape(rk.second)
+            << fsck_dendl;
           ++errors;
           last_omap_head = omap_head;
         }
@@ -8775,9 +9684,13 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
         _key_decode_u64(it->key().c_str(), &omap_head);
         if (used_omap_head.count(omap_head) == 0 &&
            omap_head != last_omap_head) {
+          pair<string,string> rk = it->raw_key();
           fsck_derr(errors, MAX_FSCK_ERROR_LINES)
             << "fsck error: found stray (pgmeta) omap data on omap_head "
-            << omap_head << " " << last_omap_head << " " << used_omap_head.count(omap_head) << fsck_dendl;
+            << omap_head << " " << last_omap_head
+            << " prefix/key: " << url_escape(rk.first)
+            << " " << url_escape(rk.second)
+            << fsck_dendl;
           last_omap_head = omap_head;
          ++errors;
         }
@@ -8795,9 +9708,13 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
         c = _key_decode_u64(c, &omap_head);
         if (used_omap_head.count(omap_head) == 0 &&
           omap_head != last_omap_head) {
+          pair<string,string> rk = it->raw_key();
           fsck_derr(errors, MAX_FSCK_ERROR_LINES)
             << "fsck error: found stray (per-pool) omap data on omap_head "
-            << omap_head << " " << last_omap_head << " " << used_omap_head.count(omap_head) << fsck_dendl;
+            << omap_head << " " << last_omap_head
+            << " prefix/key: " << url_escape(rk.first)
+            << " " << url_escape(rk.second)
+            << fsck_dendl;
           ++errors;
           last_omap_head = omap_head;
         }
@@ -8819,6 +9736,7 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
           omap_head != last_omap_head) {
           fsck_derr(errors, MAX_FSCK_ERROR_LINES)
             << "fsck error: found stray (per-pg) omap data on omap_head "
+           << " key " << pretty_binary_string(it->key())
             << omap_head << " " << last_omap_head << " " << used_omap_head.count(omap_head) << fsck_dendl;
           ++errors;
           last_omap_head = omap_head;
@@ -8859,76 +9777,153 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
       }
     }
 
-    dout(1) << __func__ << " checking freelist vs allocated" << dendl;
-    {
-      fm->enumerate_reset();
-      uint64_t offset, length;
-      while (fm->enumerate_next(db, &offset, &length)) {
-        bool intersects = false;
-        apply_for_bitset_range(
-          offset, length, alloc_size, used_blocks,
-          [&](uint64_t pos, mempool_dynamic_bitset &bs) {
-           ceph_assert(pos < bs.size());
-            if (bs.test(pos) && !bluefs_used_blocks.test(pos)) {
-             if (offset == SUPER_RESERVED &&
-                 length == min_alloc_size - SUPER_RESERVED) {
-               // this is due to the change just after luminous to min_alloc_size
-               // granularity allocations, and our baked in assumption at the top
-               // of _fsck that 0~round_up_to(SUPER_RESERVED,min_alloc_size) is used
-               // (vs luminous's round_up_to(SUPER_RESERVED,block_size)).  harmless,
-               // since we will never allocate this region below min_alloc_size.
-               dout(10) << __func__ << " ignoring free extent between SUPER_RESERVED"
-                        << " and min_alloc_size, 0x" << std::hex << offset << "~"
-                        << length << std::dec << dendl;
+    // skip freelist vs allocated compare when we have Null fm
+    if (!fm->is_null_manager()) {
+      dout(1) << __func__ << " checking freelist vs allocated" << dendl;
+#ifdef HAVE_LIBZBD
+      if (freelist_type == "zoned") {
+       // verify per-zone state
+       //  - verify no allocations beyond write pointer
+       //  - verify num_dead_bytes count (neither allocated nor
+       //    free space past the write pointer)
+       auto a = dynamic_cast<ZonedAllocator*>(alloc);
+       auto num_zones = bdev->get_size() / zone_size;
+
+       // mark the free space past the write pointer
+       for (uint32_t zone = first_sequential_zone; zone < num_zones; ++zone) {
+         auto wp = a->get_write_pointer(zone);
+         uint64_t offset = zone_size * zone + wp;
+         uint64_t length = zone_size - wp;
+         if (!length) {
+           continue;
+         }
+         bool intersects = false;
+         dout(10) << "  marking zone 0x" << std::hex << zone
+                  << " region after wp 0x" << offset << "~" << length
+                  << std::dec << dendl;
+         apply_for_bitset_range(
+           offset, length, alloc_size, used_blocks,
+           [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+             if (bs.test(pos)) {
+               derr << "fsck error: zone 0x" << std::hex << zone
+                    << " has used space at 0x" << pos * alloc_size
+                    << " beyond write pointer 0x" << wp
+                    << std::dec << dendl;
+               intersects = true;
              } else {
-                intersects = true;
-               if (repair) {
-                 repairer.fix_false_free(db, fm,
-                                         pos * min_alloc_size,
-                                         min_alloc_size);
-               }
+               bs.set(pos);
              }
-            } else {
-             bs.set(pos);
-            }
-          }
-        );
-        if (intersects) {
-         derr << "fsck error: free extent 0x" << std::hex << offset
-               << "~" << length << std::dec
-               << " intersects allocated blocks" << dendl;
-         ++errors;
-        }
-      }
-      fm->enumerate_reset();
-      size_t count = used_blocks.count();
-      if (used_blocks.size() != count) {
-        ceph_assert(used_blocks.size() > count);
-        used_blocks.flip();
-        size_t start = used_blocks.find_first();
-        while (start != decltype(used_blocks)::npos) {
-         size_t cur = start;
-         while (true) {
-           size_t next = used_blocks.find_next(cur);
-           if (next != cur + 1) {
-             ++errors;
-             derr << "fsck error: leaked extent 0x" << std::hex
-                  << ((uint64_t)start * fm->get_alloc_size()) << "~"
-                  << ((cur + 1 - start) * fm->get_alloc_size()) << std::dec
-                  << dendl;
-             if (repair) {
-               repairer.fix_leaked(db,
-                                   fm,
-                                   start * min_alloc_size,
-                                   (cur + 1 - start) * min_alloc_size);
+           }
+           );
+         if (intersects) {
+           ++errors;
+         }
+       }
+
+       used_blocks.flip();
+
+       // skip conventional zones
+       uint64_t pos = (first_sequential_zone * zone_size) / min_alloc_size - 1;
+       pos = used_blocks.find_next(pos);
+
+       uint64_t zone_dead = 0;
+       for (uint32_t zone = first_sequential_zone;
+            zone < num_zones;
+            ++zone, zone_dead = 0) {
+         while (pos != decltype(used_blocks)::npos &&
+                (pos * min_alloc_size) / zone_size == zone) {
+           dout(40) << " zone 0x" << std::hex << zone
+                    << " dead 0x" << (pos * min_alloc_size) << "~" << min_alloc_size
+                    << std::dec << dendl;
+           zone_dead += min_alloc_size;
+           pos = used_blocks.find_next(pos);
+         }
+         dout(20) << " zone 0x" << std::hex << zone << " dead is 0x" << zone_dead
+                  << std::dec << dendl;
+         // cross-check dead bytes against zone state
+         if (a->get_dead_bytes(zone) != zone_dead) {
+           derr << "fsck error: zone 0x" << std::hex << zone << " has 0x" << zone_dead
+                << " dead bytes but freelist says 0x" << a->get_dead_bytes(zone)
+                << dendl;
+           ++errors;
+           // TODO: repair
+         }
+       }
+       used_blocks.flip();
+      } else
+#endif
+      {
+       fm->enumerate_reset();
+       uint64_t offset, length;
+       while (fm->enumerate_next(db, &offset, &length)) {
+         bool intersects = false;
+         apply_for_bitset_range(
+           offset, length, alloc_size, used_blocks,
+           [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+             ceph_assert(pos < bs.size());
+             if (bs.test(pos) && !bluefs_used_blocks.test(pos)) {
+               if (offset == SUPER_RESERVED &&
+                   length == min_alloc_size - SUPER_RESERVED) {
+                 // this is due to the change just after luminous to min_alloc_size
+                 // granularity allocations, and our baked in assumption at the top
+                 // of _fsck that 0~round_up_to(SUPER_RESERVED,min_alloc_size) is used
+                 // (vs luminous's round_up_to(SUPER_RESERVED,block_size)).  harmless,
+                 // since we will never allocate this region below min_alloc_size.
+                 dout(10) << __func__ << " ignoring free extent between SUPER_RESERVED"
+                          << " and min_alloc_size, 0x" << std::hex << offset << "~"
+                          << length << std::dec << dendl;
+               } else {
+                 intersects = true;
+                 if (repair) {
+                   repairer.fix_false_free(db, fm,
+                                           pos * min_alloc_size,
+                                           min_alloc_size);
+                 }
+               }
+             } else {
+               bs.set(pos);
              }
-             start = next;
-             break;
            }
-           cur = next;
+           );
+         if (intersects) {
+           derr << "fsck error: free extent 0x" << std::hex << offset
+                << "~" << length << std::dec
+                << " intersects allocated blocks" << dendl;
+           ++errors;
          }
-        }
-        used_blocks.flip();
+       }
+       fm->enumerate_reset();
+
+       // check for leaked extents
+       size_t count = used_blocks.count();
+       if (used_blocks.size() != count) {
+         ceph_assert(used_blocks.size() > count);
+         used_blocks.flip();
+         size_t start = used_blocks.find_first();
+         while (start != decltype(used_blocks)::npos) {
+           size_t cur = start;
+           while (true) {
+             size_t next = used_blocks.find_next(cur);
+             if (next != cur + 1) {
+               ++errors;
+               derr << "fsck error: leaked extent 0x" << std::hex
+                    << ((uint64_t)start * fm->get_alloc_size()) << "~"
+                    << ((cur + 1 - start) * fm->get_alloc_size()) << std::dec
+                    << dendl;
+               if (repair) {
+                 repairer.fix_leaked(db,
+                                     fm,
+                                     start * min_alloc_size,
+                                     (cur + 1 - start) * min_alloc_size);
+               }
+               start = next;
+               break;
+             }
+             cur = next;
+           }
+         }
+         used_blocks.flip();
+       }
       }
     }
   }
@@ -8993,15 +9988,41 @@ void BlueStore::inject_no_shared_blob_key()
   db->submit_transaction_sync(txn);
 };
 
-
-void BlueStore::inject_leaked(uint64_t len)
+void BlueStore::inject_stray_shared_blob_key(uint64_t sbid)
 {
   KeyValueDB::Transaction txn;
   txn = db->get_transaction();
 
+  dout(5) << __func__ << " " << sbid << dendl;
+
+  string key;
+  get_shared_blob_key(sbid, &key);
+  bluestore_shared_blob_t persistent(sbid);
+  persistent.ref_map.get(0xdead0000, 0x1000);
+  bufferlist bl;
+  encode(persistent, bl);
+  dout(20) << __func__ << " sbid " << sbid
+    << " takes " << bl.length() << " bytes, updating"
+    << dendl;
+
+  txn->set(PREFIX_SHARED_BLOB, key, bl);
+  db->submit_transaction_sync(txn);
+};
+
+
+void BlueStore::inject_leaked(uint64_t len)
+{
   PExtentVector exts;
-  int64_t alloc_len = shared_alloc.a->allocate(len, min_alloc_size,
+  int64_t alloc_len = alloc->allocate(len, min_alloc_size,
                                           min_alloc_size * 256, 0, &exts);
+
+  if (fm->is_null_manager()) {
+    return;
+  }
+
+  KeyValueDB::Transaction txn;
+  txn = db->get_transaction();
+
   ceph_assert(alloc_len >= (int64_t)len);
   for (auto& p : exts) {
     fm->allocate(p.offset, p.length, txn);
@@ -9011,6 +10032,8 @@ void BlueStore::inject_leaked(uint64_t len)
 
 void BlueStore::inject_false_free(coll_t cid, ghobject_t oid)
 {
+  ceph_assert(!fm->is_null_manager());
+
   KeyValueDB::Transaction txn;
   OnodeRef o;
   CollectionRef c = _get_collection(cid);
@@ -9086,6 +10109,19 @@ void BlueStore::inject_legacy_omap(coll_t cid, ghobject_t oid)
   db->submit_transaction_sync(txn);
 }
 
+void BlueStore::inject_stray_omap(uint64_t head, const string& name)
+{
+  dout(1) << __func__ << dendl;
+  KeyValueDB::Transaction txn = db->get_transaction();
+
+  string key;
+  bufferlist bl;
+  _key_encode_u64(head, &key);
+  key.append(name);
+  txn->set(PREFIX_OMAP, key, bl);
+
+  db->submit_transaction_sync(txn);
+}
 
 void BlueStore::inject_statfs(const string& key, const store_statfs_t& new_statfs)
 {
@@ -9278,40 +10314,41 @@ int BlueStore::get_devices(set<string> *ls)
     }
     return 0;
   }
-  
+
   // grumble, we haven't started up yet.
-  int r = _open_path();
-  if (r < 0)
-    goto out;
-  r = _open_fsid(false);
-  if (r < 0)
-    goto out_path;
-  r = _read_fsid(&fsid);
-  if (r < 0)
-    goto out_fsid;
-  r = _lock_fsid();
-  if (r < 0)
-    goto out_fsid;
-  r = _open_bdev(false);
-  if (r < 0)
-    goto out_fsid;
-  r = _minimal_open_bluefs(false);
-  if (r < 0)
-    goto out_bdev;
+  if (int r = _open_path(); r < 0) {
+    return r;
+  }
+  auto close_path = make_scope_guard([&] {
+    _close_path();
+  });
+  if (int r = _open_fsid(false); r < 0) {
+    return r;
+  }
+  auto close_fsid = make_scope_guard([&] {
+    _close_fsid();
+  });
+  if (int r = _read_fsid(&fsid); r < 0) {
+    return r;
+  }
+  if (int r = _lock_fsid(); r < 0) {
+    return r;
+  }
+  if (int r = _open_bdev(false); r < 0) {
+    return r;
+  }
+  auto close_bdev = make_scope_guard([&] {
+    _close_bdev();
+  });
+  if (int r = _minimal_open_bluefs(false); r < 0) {
+    return r;
+  }
   bdev->get_devices(ls);
   if (bluefs) {
     bluefs->get_devices(ls);
   }
-  r = 0;
   _minimal_close_bluefs();
- out_bdev:
-  _close_bdev();
- out_fsid:
-  _close_fsid();
- out_path:
-  _close_path();
- out:
-  return r;
+  return 0;
 }
 
 void BlueStore::_get_statfs_overall(struct store_statfs_t *buf)
@@ -9326,7 +10363,7 @@ void BlueStore::_get_statfs_overall(struct store_statfs_t *buf)
   buf->omap_allocated =
     db->estimate_prefix_size(prefix, string());
 
-  uint64_t bfree = shared_alloc.a->get_free();
+  uint64_t bfree = alloc->get_free();
 
   if (bluefs) {
     buf->internally_reserved = 0;
@@ -9450,6 +10487,22 @@ BlueStore::CollectionRef BlueStore::_get_collection(const coll_t& cid)
   return cp->second;
 }
 
+BlueStore::CollectionRef BlueStore::_get_collection_by_oid(const ghobject_t& oid)
+{
+  std::shared_lock l(coll_lock);
+
+  // FIXME: we must replace this with something more efficient
+
+  for (auto& i : coll_map) {
+    spg_t spgid;
+    if (i.first.is_pg(&spgid) &&
+       i.second->contains(oid)) {
+      return i.second;
+    }
+  }
+  return CollectionRef();
+}
+
 void BlueStore::_queue_reap_collection(CollectionRef& c)
 {
   dout(10) << __func__ << " " << c << " " << c->cid << dendl;
@@ -9786,8 +10839,8 @@ int BlueStore::_prepare_read_ioc(
   for (auto& p : blobs2read) {
     const BlobRef& bptr = p.first;
     regions2read_t& r2r = p.second;
-    dout(20) << __func__ << "  blob " << *bptr << std::hex
-             << " need " << r2r << std::dec << dendl;
+    dout(20) << __func__ << "  blob " << *bptr << " need "
+             << r2r << dendl;
     if (bptr->get_blob().is_compressed()) {
       // read the whole thing
       if (compressed_blob_bls->empty()) {
@@ -9864,8 +10917,8 @@ int BlueStore::_generate_read_result_bl(
   while (b2r_it != blobs2read.end()) {
     const BlobRef& bptr = b2r_it->first;
     regions2read_t& r2r = b2r_it->second;
-    dout(20) << __func__ << "  blob " << *bptr << std::hex
-             << " need 0x" << r2r << std::dec << dendl;
+    dout(20) << __func__ << "  blob " << *bptr << " need "
+             << r2r << dendl;
     if (bptr->get_blob().is_compressed()) {
       ceph_assert(p != compressed_blob_bls.end());
       bufferlist& compressed_bl = *p++;
@@ -10005,7 +11058,7 @@ int BlueStore::_do_read(
                              // measure the whole block below.
                              // The error isn't that much...
   vector<bufferlist> compressed_blob_bls;
-  IOContext ioc(cct, NULL, true); // allow EIO
+  IOContext ioc(cct, NULL, !cct->_conf->bluestore_fail_eio);
   r = _prepare_read_ioc(blobs2read, &compressed_blob_bls, &ioc);
   // we always issue aio for reading, so errors other than EIO are not allowed
   if (r < 0)
@@ -10033,7 +11086,8 @@ int BlueStore::_do_read(
   bool csum_error = false;
   r = _generate_read_result_bl(o, offset, length, ready_regions,
                               compressed_blob_bls, blobs2read,
-                              buffered, &csum_error, bl);
+                              buffered && !ioc.skip_cache(),
+                              &csum_error, bl);
   if (csum_error) {
     // Handles spurious read errors caused by a kernel bug.
     // We sometimes get all-zero pages as a result of the read under
@@ -10351,7 +11405,7 @@ int BlueStore::_do_readv(
     cct->_conf->bluestore_log_op_age);
   _dump_onode<30>(cct, *o);
 
-  IOContext ioc(cct, NULL, true); // allow EIO
+  IOContext ioc(cct, NULL, !cct->_conf->bluestore_fail_eio);
   vector<std::tuple<ready_regions_t, vector<bufferlist>, blobs2read_t>> raw_results;
   raw_results.reserve(m.num_intervals());
   int i = 0;
@@ -10494,7 +11548,7 @@ int BlueStore::getattr(
 int BlueStore::getattrs(
   CollectionHandle &c_,
   const ghobject_t& oid,
-  map<string,bufferptr>& aset)
+  map<string,bufferptr,less<>>& aset)
 {
   Collection *c = static_cast<Collection *>(c_.get());
   dout(15) << __func__ << " " << c->cid << " " << oid << dendl;
@@ -11138,12 +12192,11 @@ int BlueStore::_open_super_meta()
     db->get(PREFIX_SUPER, "freelist_type", &bl);
     if (bl.length()) {
       freelist_type = std::string(bl.c_str(), bl.length());
-      dout(1) << __func__ << " freelist_type " << freelist_type << dendl;
     } else {
       ceph_abort_msg("Not Support extent freelist manager");
     }
+    dout(5) << __func__ << "::NCB::freelist_type=" << freelist_type << dendl;
   }
-
   // ondisk format
   int32_t compat_ondisk_format = 0;
   {
@@ -11197,6 +12250,8 @@ int BlueStore::_open_super_meta()
       decode(val, p);
       min_alloc_size = val;
       min_alloc_size_order = ctz(val);
+      min_alloc_size_mask  = min_alloc_size - 1;
+
       ceph_assert(min_alloc_size == 1u << min_alloc_size_order);
     } catch (ceph::buffer::error& e) {
       derr << __func__ << " unable to read min_alloc_size" << dendl;
@@ -11204,6 +12259,34 @@ int BlueStore::_open_super_meta()
     }
     dout(1) << __func__ << " min_alloc_size 0x" << std::hex << min_alloc_size
             << std::dec << dendl;
+    logger->set(l_bluestore_alloc_unit, min_alloc_size);
+  }
+
+  // smr fields
+  {
+    bufferlist bl;
+    int r = db->get(PREFIX_SUPER, "zone_size", &bl);
+    if (r >= 0) {
+      auto p = bl.cbegin();
+      decode(zone_size, p);
+      dout(1) << __func__ << " zone_size 0x" << std::hex << zone_size << std::dec << dendl;
+      ceph_assert(bdev->is_smr());
+    } else {
+      ceph_assert(!bdev->is_smr());
+    }
+  }
+  {
+    bufferlist bl;
+    int r = db->get(PREFIX_SUPER, "first_sequential_zone", &bl);
+    if (r >= 0) {
+      auto p = bl.cbegin();
+      decode(first_sequential_zone, p);
+      dout(1) << __func__ << " first_sequential_zone 0x" << std::hex
+             << first_sequential_zone << std::dec << dendl;
+      ceph_assert(bdev->is_smr());
+    } else {
+      ceph_assert(!bdev->is_smr());
+    }
   }
 
   _set_per_pool_omap();
@@ -11582,77 +12665,6 @@ void BlueStore::BSPerfTracker::update_from_perfcounters(
       l_bluestore_commit_lat));
 }
 
-// For every object we maintain <zone_num+oid, offset> tuple in the key-value
-// store.  When a new object written to a zone, we insert the corresponding
-// tuple to the database.  When an object is truncated, we remove the
-// corresponding tuple.  When an object is overwritten, we remove the old tuple
-// and insert a new tuple corresponding to the new location of the object.  The
-// cleaner can now identify live objects within the zone <zone_num> by
-// enumerating all the keys starting with <zone_num> prefix.
-void BlueStore::_zoned_update_cleaning_metadata(TransContext *txc) {
-  for (const auto &[o, offsets] : txc->zoned_onode_to_offset_map) {
-    std::string key;
-    get_object_key(cct, o->oid, &key);
-    for (auto offset : offsets) {
-      if (offset > 0) {
-       bufferlist offset_bl;
-       encode(offset, offset_bl);
-        txc->t->set(_zoned_get_prefix(offset), key, offset_bl);
-      } else {
-        txc->t->rmkey(_zoned_get_prefix(-offset), key);
-      }
-    }
-  }
-}
-
-std::string BlueStore::_zoned_get_prefix(uint64_t offset) {
-  uint64_t zone_num = offset / bdev->get_zone_size();
-  std::string zone_key;
-  _key_encode_u64(zone_num, &zone_key);
-  return PREFIX_ZONED_CL_INFO + zone_key;
-}
-
-// For now, to avoid interface changes we piggyback zone_size (in MiB) and the
-// first sequential zone number onto min_alloc_size and pass it to functions
-// Allocator::create and FreelistManager::create.
-uint64_t BlueStore::_zoned_piggyback_device_parameters_onto(uint64_t min_alloc_size) {
-  uint64_t zone_size = bdev->get_zone_size();
-  uint64_t zone_size_mb = zone_size / (1024 * 1024);
-  uint64_t first_seq_zone = bdev->get_conventional_region_size() / zone_size;
-  min_alloc_size |= (zone_size_mb << 32);
-  min_alloc_size |= (first_seq_zone << 48);
-  return min_alloc_size;
-}
-
-int BlueStore::_zoned_check_config_settings() {
-  if (cct->_conf->bluestore_allocator != "zoned") {
-    dout(1) << __func__ << " The drive is HM-SMR but "
-           << cct->_conf->bluestore_allocator << " allocator is specified. "
-           << "Only zoned allocator can be used with HM-SMR drive." << dendl;
-    return -EINVAL;
-  }
-
-  // At least for now we want to use large min_alloc_size with HM-SMR drives.
-  // Populating used_blocks bitset on a debug build of ceph-osd takes about 5
-  // minutes with a 14 TB HM-SMR drive and 4 KiB min_alloc_size.
-  if (min_alloc_size < 64 * 1024) {
-    dout(1) << __func__ << " The drive is HM-SMR but min_alloc_size is "
-           << min_alloc_size << ". "
-           << "Please set to at least 64 KiB." << dendl;
-    return -EINVAL;
-  }
-
-  // We don't want to defer writes with HM-SMR because it violates sequential
-  // write requirement.
-  if (prefer_deferred_size) {
-    dout(1) << __func__ << " The drive is HM-SMR but prefer_deferred_size is "
-           << prefer_deferred_size << ". "
-           << "Please set to 0." << dendl;
-    return -EINVAL;
-  }
-  return 0;
-}
-
 void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t)
 {
   dout(20) << __func__ << " txc " << txc << std::hex
@@ -11660,47 +12672,69 @@ void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t)
           << " released 0x" << txc->released
           << std::dec << dendl;
 
-  // We have to handle the case where we allocate *and* deallocate the
-  // same region in this transaction.  The freelist doesn't like that.
-  // (Actually, the only thing that cares is the BitmapFreelistManager
-  // debug check. But that's important.)
-  interval_set<uint64_t> tmp_allocated, tmp_released;
-  interval_set<uint64_t> *pallocated = &txc->allocated;
-  interval_set<uint64_t> *preleased = &txc->released;
-  if (!txc->allocated.empty() && !txc->released.empty()) {
-    interval_set<uint64_t> overlap;
-    overlap.intersection_of(txc->allocated, txc->released);
-    if (!overlap.empty()) {
-      tmp_allocated = txc->allocated;
-      tmp_allocated.subtract(overlap);
-      tmp_released = txc->released;
-      tmp_released.subtract(overlap);
-      dout(20) << __func__ << "  overlap 0x" << std::hex << overlap
-              << ", new allocated 0x" << tmp_allocated
-              << " released 0x" << tmp_released << std::dec
-              << dendl;
-      pallocated = &tmp_allocated;
-      preleased = &tmp_released;
+  if (!fm->is_null_manager())
+  {
+    // We have to handle the case where we allocate *and* deallocate the
+    // same region in this transaction.  The freelist doesn't like that.
+    // (Actually, the only thing that cares is the BitmapFreelistManager
+    // debug check. But that's important.)
+    interval_set<uint64_t> tmp_allocated, tmp_released;
+    interval_set<uint64_t> *pallocated = &txc->allocated;
+    interval_set<uint64_t> *preleased = &txc->released;
+    if (!txc->allocated.empty() && !txc->released.empty()) {
+      interval_set<uint64_t> overlap;
+      overlap.intersection_of(txc->allocated, txc->released);
+      if (!overlap.empty()) {
+       tmp_allocated = txc->allocated;
+       tmp_allocated.subtract(overlap);
+       tmp_released = txc->released;
+       tmp_released.subtract(overlap);
+       dout(20) << __func__ << "  overlap 0x" << std::hex << overlap
+                << ", new allocated 0x" << tmp_allocated
+                << " released 0x" << tmp_released << std::dec
+                << dendl;
+       pallocated = &tmp_allocated;
+       preleased = &tmp_released;
+      }
     }
-  }
 
-  // update freelist with non-overlap sets
-  for (interval_set<uint64_t>::iterator p = pallocated->begin();
-       p != pallocated->end();
-       ++p) {
-    fm->allocate(p.get_start(), p.get_len(), t);
-  }
-  for (interval_set<uint64_t>::iterator p = preleased->begin();
-       p != preleased->end();
-       ++p) {
-    dout(20) << __func__ << " release 0x" << std::hex << p.get_start()
-            << "~" << p.get_len() << std::dec << dendl;
-    fm->release(p.get_start(), p.get_len(), t);
+    // update freelist with non-overlap sets
+    for (interval_set<uint64_t>::iterator p = pallocated->begin();
+        p != pallocated->end();
+        ++p) {
+      fm->allocate(p.get_start(), p.get_len(), t);
+    }
+    for (interval_set<uint64_t>::iterator p = preleased->begin();
+        p != preleased->end();
+        ++p) {
+      dout(20) << __func__ << " release 0x" << std::hex << p.get_start()
+              << "~" << p.get_len() << std::dec << dendl;
+      fm->release(p.get_start(), p.get_len(), t);
+    }
   }
 
+#ifdef HAVE_LIBZBD
   if (bdev->is_smr()) {
-    _zoned_update_cleaning_metadata(txc);
+    for (auto& i : txc->old_zone_offset_refs) {
+      dout(20) << __func__ << " rm ref zone 0x" << std::hex << i.first.second
+              << " offset 0x" << i.second << std::dec
+              << " -> " << i.first.first->oid << dendl;
+      string key;
+      get_zone_offset_object_key(i.first.second, i.second, i.first.first->oid, &key);
+      txc->t->rmkey(PREFIX_ZONED_CL_INFO, key);
+    }
+    for (auto& i : txc->new_zone_offset_refs) {
+      // (zone, offset) -> oid
+      dout(20) << __func__ << " add ref zone 0x" << std::hex << i.first.second
+              << " offset 0x" << i.second << std::dec
+              << " -> " << i.first.first->oid << dendl;
+      string key;
+      get_zone_offset_object_key(i.first.second, i.second, i.first.first->oid, &key);
+      bufferlist v;
+      txc->t->set(PREFIX_ZONED_CL_INFO, key, v);
+    }
   }
+#endif
 
   _txc_update_store_statfs(txc);
 }
@@ -11881,7 +12915,7 @@ void BlueStore::_txc_release_alloc(TransContext *txc)
     }
     dout(10) << __func__ << "(sync) " << txc << " " << std::hex
              << txc->released << std::dec << dendl;
-    shared_alloc.a->release(txc->released);
+    alloc->release(txc->released);
   }
 
 out:
@@ -11891,7 +12925,7 @@ out:
 
 void BlueStore::_osr_attach(Collection *c)
 {
-  // note: caller has RWLock on coll_map
+  // note: caller has coll_lock
   auto q = coll_map.find(c->cid);
   if (q != coll_map.end()) {
     c->osr = q->second->osr;
@@ -12410,7 +13444,7 @@ void BlueStore::_kv_finalize_thread()
       _reap_collections();
 
       logger->set(l_bluestore_fragmentation,
-         (uint64_t)(shared_alloc.a->get_fragmentation() * 1000));
+         (uint64_t)(alloc->get_fragmentation() * 1000));
 
       log_latency("kv_final",
        l_bluestore_kv_final_lat,
@@ -12424,13 +13458,15 @@ void BlueStore::_kv_finalize_thread()
   kv_finalize_started = false;
 }
 
-void BlueStore::_zoned_cleaner_start() {
+#ifdef HAVE_LIBZBD
+void BlueStore::_zoned_cleaner_start()
+{
   dout(10) << __func__ << dendl;
-
   zoned_cleaner_thread.create("bstore_zcleaner");
 }
 
-void BlueStore::_zoned_cleaner_stop() {
+void BlueStore::_zoned_cleaner_stop()
+{
   dout(10) << __func__ << dendl;
   {
     std::unique_lock l{zoned_cleaner_lock};
@@ -12448,28 +13484,36 @@ void BlueStore::_zoned_cleaner_stop() {
   dout(10) << __func__ << " done" << dendl;
 }
 
-void BlueStore::_zoned_cleaner_thread() {
+void BlueStore::_zoned_cleaner_thread()
+{
   dout(10) << __func__ << " start" << dendl;
   std::unique_lock l{zoned_cleaner_lock};
   ceph_assert(!zoned_cleaner_started);
   zoned_cleaner_started = true;
   zoned_cleaner_cond.notify_all();
-  std::deque<uint64_t> zones_to_clean;
+  auto a = dynamic_cast<ZonedAllocator*>(alloc);
+  ceph_assert(a);
+  auto f = dynamic_cast<ZonedFreelistManager*>(fm);
+  ceph_assert(f);
   while (true) {
-    if (zoned_cleaner_queue.empty()) {
+    // thresholds to trigger cleaning
+    // FIXME
+    float min_score = .05;                // score: bytes saved / bytes moved
+    uint64_t min_saved = zone_size / 32;  // min bytes saved to consider cleaning
+    auto zone_to_clean = a->pick_zone_to_clean(min_score, min_saved);
+    if (zone_to_clean < 0) {
       if (zoned_cleaner_stop) {
        break;
       }
-      dout(20) << __func__ << " sleep" << dendl;
-      zoned_cleaner_cond.wait(l);
+      auto period = ceph::make_timespan(cct->_conf->bluestore_cleaner_sleep_interval);
+      dout(20) << __func__ << " sleep for " << period << dendl;
+      zoned_cleaner_cond.wait_for(l, period);
       dout(20) << __func__ << " wake" << dendl;
     } else {
-      zones_to_clean.swap(zoned_cleaner_queue);
       l.unlock();
-      while (!zones_to_clean.empty()) {
-       _zoned_clean_zone(zones_to_clean.front());
-       zones_to_clean.pop_front();
-      }
+      a->set_cleaning_zone(zone_to_clean);
+      _zoned_clean_zone(zone_to_clean, a, f);
+      a->clear_cleaning_zone(zone_to_clean);
       l.lock();
     }
   }
@@ -12477,9 +13521,147 @@ void BlueStore::_zoned_cleaner_thread() {
   zoned_cleaner_started = false;
 }
 
-void BlueStore::_zoned_clean_zone(uint64_t zone_num) {
-  dout(10) << __func__ << " cleaning zone " << zone_num << dendl;
+void BlueStore::_zoned_clean_zone(
+  uint64_t zone,
+  ZonedAllocator *a,
+  ZonedFreelistManager *f
+  )
+{
+  dout(10) << __func__ << " cleaning zone 0x" << std::hex << zone << std::dec << dendl;
+
+  KeyValueDB::Iterator it = db->get_iterator(PREFIX_ZONED_CL_INFO);
+  std::string zone_start;
+  get_zone_offset_object_key(zone, 0, ghobject_t(), &zone_start);
+  for (it->lower_bound(zone_start); it->valid(); it->next()) {
+    uint32_t z;
+    uint64_t offset;
+    ghobject_t oid;
+    string k = it->key();
+    int r = get_key_zone_offset_object(k, &z, &offset, &oid);
+    if (r < 0) {
+      derr << __func__ << " failed to decode zone ref " << pretty_binary_string(k)
+          << dendl;
+      continue;
+    }
+    if (zone != z) {
+      dout(10) << __func__ << " reached end of zone refs" << dendl;
+      break;
+    }
+    dout(10) << __func__ << " zone 0x" << std::hex << zone << " offset 0x" << offset
+            << std::dec << " " << oid << dendl;
+    _clean_some(oid, zone);
+  }
+
+  if (a->get_live_bytes(zone) > 0) {
+    derr << "zone 0x" << std::hex << zone << " still has 0x" << a->get_live_bytes(zone)
+        << " live bytes" << std::dec << dendl;
+    // should we do something else here to avoid a live-lock in the event of a problem?
+    return;
+  }
+
+  // make sure transactions flush/drain/commit (and data is all rewritten
+  // safely elsewhere) before we blow away the cleaned zone
+  _osr_drain_all();
+
+  // reset the device zone
+  dout(10) << __func__ << " resetting zone 0x" << std::hex << zone << std::dec << dendl;
+  bdev->reset_zone(zone);
+
+  // record that we can now write there
+  f->mark_zone_to_clean_free(zone, db);
+  bdev->flush();
+
+  // then allow ourselves to start allocating there
+  dout(10) << __func__ << " done cleaning zone 0x" << std::hex << zone << std::dec
+          << dendl;
+  a->reset_zone(zone);
+}
+
+void BlueStore::_clean_some(ghobject_t oid, uint32_t zone)
+{
+  dout(10) << __func__ << " " << oid << " from zone 0x" << std::hex << zone << std::dec
+          << dendl;
+
+  CollectionRef cref = _get_collection_by_oid(oid);
+  if (!cref) {
+    dout(10) << __func__ << " can't find collection for " << oid << dendl;
+    return;
+  }
+  Collection *c = cref.get();
+
+  // serialize io dispatch vs other transactions
+  std::lock_guard l(atomic_alloc_and_submit_lock);
+  std::unique_lock l2(c->lock);
+
+  auto o = c->get_onode(oid, false);
+  if (!o) {
+    dout(10) << __func__ << " can't find " << oid << dendl;
+    return;
+  }
+
+  o->extent_map.fault_range(db, 0, OBJECT_MAX_SIZE);
+  _dump_onode<30>(cct, *o);
+
+  // NOTE: This is a naive rewrite strategy.  If any blobs are
+  // shared, they will be duplicated for each object that references
+  // them.  That means any cloned/snapshotted objects will explode
+  // their utilization.  This won't matter for RGW workloads, but
+  // for RBD and CephFS it is completely unacceptable, and it's
+  // entirely reasonable to have "archival" data workloads on SMR
+  // for CephFS and (possibly/probably) RBD.
+  //
+  // At some point we need to replace this with something more
+  // sophisticated that ensures that a shared blob gets moved once
+  // and all referencing objects get updated to point to the new
+  // location.
+
+  map<uint32_t, uint32_t> to_move;
+  for (auto& e : o->extent_map.extent_map) {
+    bool touches_zone = false;
+    for (auto& be : e.blob->get_blob().get_extents()) {
+      if (be.is_valid()) {
+       uint32_t z = be.offset / zone_size;
+       if (z == zone) {
+         touches_zone = true;
+         break;
+       }
+      }
+    }
+    if (touches_zone) {
+      to_move[e.logical_offset] = e.length;
+    }
+  }
+  if (to_move.empty()) {
+    dout(10) << __func__ << " no references to zone 0x" << std::hex << zone
+            << std::dec << " from " << oid << dendl;
+    return;
+  }
+
+  dout(10) << __func__ << " rewriting object extents 0x" << std::hex << to_move
+          << std::dec << dendl;
+  OpSequencer *osr = c->osr.get();
+  TransContext *txc = _txc_create(c, osr, nullptr);
+
+  spg_t pgid;
+  if (c->cid.is_pg(&pgid)) {
+    txc->osd_pool_id = pgid.pool();
+  }
+
+  for (auto& [offset, length] : to_move) {
+    bufferlist bl;
+    int r = _do_read(c, o, offset, length, bl, 0);
+    ceph_assert(r == (int)length);
+
+    r = _do_write(txc, cref, o, offset, length, bl, 0);
+    ceph_assert(r >= 0);
+  }
+  txc->write_onode(o);
+
+  _txc_write_nodes(txc, txc->t);
+  _txc_finalize_kv(txc, txc->t);
+  _txc_state_proc(txc);
 }
+#endif
 
 bluestore_deferred_op_t *BlueStore::_get_deferred_op(
   TransContext *txc, uint64_t len)
@@ -12488,8 +13670,8 @@ bluestore_deferred_op_t *BlueStore::_get_deferred_op(
     txc->deferred_txn = new bluestore_deferred_transaction_t;
   }
   txc->deferred_txn->ops.push_back(bluestore_deferred_op_t());
-  logger->inc(l_bluestore_write_deferred);
-  logger->inc(l_bluestore_write_deferred_bytes, len);
+  logger->inc(l_bluestore_issued_deferred_writes);
+  logger->inc(l_bluestore_issued_deferred_write_bytes, len);
   return &txc->deferred_txn->ops.back();
 }
 
@@ -12604,8 +13786,8 @@ void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
                 << start << "~" << bl.length()
                 << " crc " << bl.crc32c(-1) << std::dec << dendl;
        if (!g_conf()->bluestore_debug_omit_block_device_write) {
-         logger->inc(l_bluestore_deferred_write_ops);
-         logger->inc(l_bluestore_deferred_write_bytes, bl.length());
+         logger->inc(l_bluestore_submitted_deferred_writes);
+         logger->inc(l_bluestore_submitted_deferred_write_bytes, bl.length());
          int r = bdev->aio_write(start, bl, &b->ioc, false);
          ceph_assert(r == 0);
        }
@@ -12761,10 +13943,6 @@ int BlueStore::queue_transactions(
   OpSequencer *osr = c->osr.get();
   dout(10) << __func__ << " ch " << c << " " << c->cid << dendl;
 
-  // prepare
-  TransContext *txc = _txc_create(static_cast<Collection*>(ch.get()), osr,
-                                 &on_commit, op);
-
   // With HM-SMR drives (and ZNS SSDs) we want the I/O allocation and I/O
   // submission to happen atomically because if I/O submission happens in a
   // different order than I/O allocation, we end up issuing non-sequential
@@ -12774,6 +13952,11 @@ int BlueStore::queue_transactions(
   if (bdev->is_smr()) {
     atomic_alloc_and_submit_lock.lock();
   }
+
+  // prepare
+  TransContext *txc = _txc_create(static_cast<Collection*>(ch.get()), osr,
+                                 &on_commit, op);
+
   for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
     txc->bytes += (*p).get_num_bytes();
     _txc_add_transaction(txc, &(*p));
@@ -13357,6 +14540,7 @@ void BlueStore::_do_write_small(
   // than 'offset' only).
   o->extent_map.fault_range(db, min_off, offset + max_bsize - min_off);
 
+#ifdef HAVE_LIBZBD
   // On zoned devices, the first goal is to support non-overwrite workloads,
   // such as RGW, with large, aligned objects.  Therefore, for user writes
   // _do_write_small should not trigger.  OSDs, however, write and update a tiny
@@ -13364,14 +14548,27 @@ void BlueStore::_do_write_small(
   // temporarily just pad them to min_alloc_size and write them to a new place
   // on every update.
   if (bdev->is_smr()) {
-    BlobRef b = c->new_blob();
     uint64_t b_off = p2phase<uint64_t>(offset, alloc_len);
     uint64_t b_off0 = b_off;
-    _pad_zeros(&bl, &b_off0, min_alloc_size);
     o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
-    wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length, false, true);
+
+    // Zero detection -- small block
+    if (!bl.is_zero()) {
+      BlobRef b = c->new_blob();
+      _pad_zeros(&bl, &b_off0, min_alloc_size);
+      wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length, false, true);
+    } else { // if (bl.is_zero())
+      dout(20) << __func__ << " skip small zero block " << std::hex
+        << " (0x" << b_off0 << "~" << bl.length() << ")"
+        << " (0x" << b_off << "~" << length << ")"
+        << std::dec << dendl;
+      logger->inc(l_bluestore_write_small_skipped);
+      logger->inc(l_bluestore_write_small_skipped_bytes, length);
+    }
+
     return;
   }
+#endif
 
   // Look for an existing mutable blob we can use.
   auto begin = o->extent_map.extent_map.begin();
@@ -13592,17 +14789,29 @@ void BlueStore::_do_write_small(
            // due to existent extents
            uint64_t b_off = offset - bstart;
            uint64_t b_off0 = b_off;
-           _pad_zeros(&bl, &b_off0, chunk_size);
+           o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
 
-           dout(20) << __func__ << " reuse blob " << *b << std::hex
-                    << " (0x" << b_off0 << "~" << bl.length() << ")"
-                    << " (0x" << b_off << "~" << length << ")"
-                    << std::dec << dendl;
+           // Zero detection -- small block
+           if (!bl.is_zero()) {
+             _pad_zeros(&bl, &b_off0, chunk_size);
+
+             dout(20) << __func__ << " reuse blob " << *b << std::hex
+                      << " (0x" << b_off0 << "~" << bl.length() << ")"
+                      << " (0x" << b_off << "~" << length << ")"
+                      << std::dec << dendl;
+
+             wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length,
+                 false, false);
+             logger->inc(l_bluestore_write_small_unused);
+           } else { // if (bl.is_zero())
+             dout(20) << __func__ << " skip small zero block " << std::hex
+                << " (0x" << b_off0 << "~" << bl.length() << ")"
+                << " (0x" << b_off << "~" << length << ")"
+                << std::dec << dendl;
+             logger->inc(l_bluestore_write_small_skipped);
+             logger->inc(l_bluestore_write_small_skipped_bytes, length);
+           }
 
-           o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
-           wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length,
-                       false, false);
-           logger->inc(l_bluestore_write_small_unused);
            return;
          }
        }
@@ -13640,20 +14849,32 @@ void BlueStore::_do_write_small(
                                offset0 + alloc_len, 
                                min_alloc_size)) {
 
-         uint64_t chunk_size = b->get_blob().get_chunk_size(block_size);
          uint64_t b_off = offset - bstart;
          uint64_t b_off0 = b_off;
-         _pad_zeros(&bl, &b_off0, chunk_size);
+         o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
 
-         dout(20) << __func__ << " reuse blob " << *b << std::hex
-                   << " (0x" << b_off0 << "~" << bl.length() << ")"
-                   << " (0x" << b_off << "~" << length << ")"
-                   << std::dec << dendl;
+         // Zero detection -- small block
+         if (!bl.is_zero()) {
+           uint64_t chunk_size = b->get_blob().get_chunk_size(block_size);
+           _pad_zeros(&bl, &b_off0, chunk_size);
+
+           dout(20) << __func__ << " reuse blob " << *b << std::hex
+             << " (0x" << b_off0 << "~" << bl.length() << ")"
+             << " (0x" << b_off << "~" << length << ")"
+             << std::dec << dendl;
+
+           wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length,
+               false, false);
+           logger->inc(l_bluestore_write_small_unused);
+         } else { // if (bl.is_zero())
+           dout(20) << __func__ << " skip small zero block " << std::hex
+             << " (0x" << b_off0 << "~" << bl.length() << ")"
+             << " (0x" << b_off << "~" << length << ")"
+             << std::dec << dendl;
+           logger->inc(l_bluestore_write_small_skipped);
+           logger->inc(l_bluestore_write_small_skipped_bytes, length);
+         }
 
-         o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
-         wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length,
-                     false, false);
-         logger->inc(l_bluestore_write_small_unused);
          return;
        }
       } 
@@ -13684,20 +14905,36 @@ void BlueStore::_do_write_small(
               << std::hex << offset << "~" << length
              << std::dec << dendl;
   }
-  // new blob.
-  BlobRef b = c->new_blob();
   uint64_t b_off = p2phase<uint64_t>(offset, alloc_len);
   uint64_t b_off0 = b_off;
-  _pad_zeros(&bl, &b_off0, block_size);
   o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
-  wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length,
-    min_alloc_size != block_size, // use 'unused' bitmap when alloc granularity
-                                  // doesn't match disk one only
-    true);
+
+  // Zero detection -- small block
+  if (!bl.is_zero()) {
+    // new blob.
+    BlobRef b = c->new_blob();
+    _pad_zeros(&bl, &b_off0, block_size);
+    wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length,
+       min_alloc_size != block_size, // use 'unused' bitmap when alloc granularity
+                                      // doesn't match disk one only
+       true);
+  } else { // if (bl.is_zero())
+    dout(20) << __func__ << " skip small zero block " << std::hex
+      << " (0x" << b_off0 << "~" << bl.length() << ")"
+      << " (0x" << b_off << "~" << length << ")"
+      << std::dec << dendl;
+    logger->inc(l_bluestore_write_small_skipped);
+    logger->inc(l_bluestore_write_small_skipped_bytes, length);
+  }
 
   return;
 }
 
+bool BlueStore::has_null_fm()
+{
+  return fm->is_null_manager();
+}
+
 bool BlueStore::BigDeferredWriteContext::can_defer(
     BlueStore::extent_map_t::iterator ep,
     uint64_t prefer_deferred_size,
@@ -14004,14 +15241,28 @@ void BlueStore::_do_write_big(
     }
     bufferlist t;
     blp.copy(l, t);
-    wctx->write(offset, b, l, b_off, t, b_off, l, false, new_blob);
-    dout(20) << __func__ << " schedule write big: 0x"
+
+    // Zero detection -- big block
+    if (!t.is_zero()) {
+      wctx->write(offset, b, l, b_off, t, b_off, l, false, new_blob);
+
+      dout(20) << __func__ << " schedule write big: 0x"
       << std::hex << offset << "~" << l << std::dec
       << (new_blob ? " new " : " reuse ")
       << *b << dendl;
+
+      logger->inc(l_bluestore_write_big_blobs);
+    } else { // if (!t.is_zero())
+      dout(20) << __func__ << " skip big zero block " << std::hex
+        << " (0x" << b_off << "~" << t.length() << ")"
+        << " (0x" << b_off << "~" << l << ")"
+        << std::dec << dendl;
+      logger->inc(l_bluestore_write_big_skipped_blobs);
+      logger->inc(l_bluestore_write_big_skipped_bytes, l);
+    }
+
     offset += l;
     length -= l;
-    logger->inc(l_bluestore_write_big_blobs);
   }
 }
 
@@ -14166,30 +15417,21 @@ int BlueStore::_do_alloc_write(
   PExtentVector prealloc;
   prealloc.reserve(2 * wctx->writes.size());;
   int64_t prealloc_left = 0;
-  prealloc_left = shared_alloc.a->allocate(
+  prealloc_left = alloc->allocate(
     need, min_alloc_size, need,
     0, &prealloc);
   if (prealloc_left < 0 || prealloc_left < (int64_t)need) {
     derr << __func__ << " failed to allocate 0x" << std::hex << need
          << " allocated 0x " << (prealloc_left < 0 ? 0 : prealloc_left)
          << " min_alloc_size 0x" << min_alloc_size
-         << " available 0x " << shared_alloc.a->get_free()
+         << " available 0x " << alloc->get_free()
          << std::dec << dendl;
     if (prealloc.size()) {
-      shared_alloc.a->release(prealloc);
+      alloc->release(prealloc);
     }
     return -ENOSPC;
   }
-  _collect_allocation_stats(need, min_alloc_size, prealloc.size());
-
-  if (bdev->is_smr()) {
-    std::deque<uint64_t> zones_to_clean;
-    if (shared_alloc.a->zoned_get_zones_to_clean(&zones_to_clean)) {
-      std::lock_guard l{zoned_cleaner_lock};
-      zoned_cleaner_queue.swap(zones_to_clean);
-      zoned_cleaner_cond.notify_one();
-    }
-  }
+  _collect_allocation_stats(need, min_alloc_size, prealloc);
 
   dout(20) << __func__ << " prealloc " << prealloc << dendl;
   auto prealloc_pos = prealloc.begin();
@@ -14351,6 +15593,26 @@ void BlueStore::_wctx_finish(
   WriteContext *wctx,
   set<SharedBlob*> *maybe_unshared_blobs)
 {
+#ifdef HAVE_LIBZBD
+  if (bdev->is_smr()) {
+    for (auto& w : wctx->writes) {
+      for (auto& e : w.b->get_blob().get_extents()) {
+       if (!e.is_valid()) {
+         continue;
+       }
+       uint32_t zone = e.offset / zone_size;
+       if (!o->onode.zone_offset_refs.count(zone)) {
+         uint64_t zoff = e.offset % zone_size;
+         dout(20) << __func__ << " add ref zone 0x" << std::hex << zone
+                  << " offset 0x" << zoff << std::dec << dendl;
+         txc->note_write_zone_offset(o, zone, zoff);
+       }
+      }
+    }
+  }
+  set<uint32_t> zones_with_releases;
+#endif
+
   auto oep = wctx->old_extents.begin();
   while (oep != wctx->old_extents.end()) {
     auto &lo = *oep;
@@ -14378,6 +15640,12 @@ void BlueStore::_wctx_finish(
          b->shared_blob->put_ref(
            e.offset, e.length, &final,
            unshare_ptr);
+#ifdef HAVE_LIBZBD
+         // we also drop zone ref for shared blob extents
+         if (bdev->is_smr() && e.is_valid()) {
+           zones_with_releases.insert(e.offset / zone_size);
+         }
+#endif
        }
        if (unshare) {
          ceph_assert(maybe_unshared_blobs);
@@ -14404,6 +15672,11 @@ void BlueStore::_wctx_finish(
       if (blob.is_compressed()) {
         txc->statfs_delta.compressed_allocated() -= e.length;
       }
+#ifdef HAVE_LIBZBD
+      if (bdev->is_smr() && e.is_valid()) {
+       zones_with_releases.insert(e.offset / zone_size);
+      }
+#endif
     }
 
     if (b->is_spanning() && !b->is_referenced() && lo.blob_empty) {
@@ -14413,6 +15686,29 @@ void BlueStore::_wctx_finish(
     }
     delete &lo;
   }
+
+#ifdef HAVE_LIBZBD
+  if (!zones_with_releases.empty()) {
+    // we need to fault the entire extent range in here to determinte if we've dropped
+    // all refs to a zone.
+    o->extent_map.fault_range(db, 0, OBJECT_MAX_SIZE);
+    for (auto& b : o->extent_map.extent_map) {
+      for (auto& e : b.blob->get_blob().get_extents()) {
+       if (e.is_valid()) {
+         zones_with_releases.erase(e.offset / zone_size);
+       }
+      }
+    }
+    for (auto zone : zones_with_releases) {
+      auto p = o->onode.zone_offset_refs.find(zone);
+      if (p != o->onode.zone_offset_refs.end()) {
+       dout(20) << __func__ << " rm ref zone 0x" << std::hex << zone
+                << " offset 0x" << p->second << std::dec << dendl;
+       txc->note_release_zone_offset(o, zone, p->second);
+      }
+    }
+  }
+#endif
 }
 
 void BlueStore::_do_write_data(
@@ -14674,15 +15970,6 @@ int BlueStore::_do_write(
                          min_alloc_size);
   }
 
-  if (bdev->is_smr()) {
-    if (wctx.old_extents.empty()) {
-      txc->zoned_note_new_object(o);
-    } else {
-      int64_t old_ondisk_offset = wctx.old_extents.begin()->r.begin()->offset;
-      txc->zoned_note_updated_object(o, old_ondisk_offset);
-    }
-  }
-
   // NB: _wctx_finish() will empty old_extents
   // so we must do gc estimation before that
   _wctx_finish(txc, c, o, &wctx);
@@ -14815,6 +16102,7 @@ void BlueStore::_do_truncate(
     o->extent_map.fault_range(db, offset, length);
     o->extent_map.punch_hole(c, offset, length, &wctx.old_extents);
     o->extent_map.dirty_range(offset, length);
+
     _wctx_finish(txc, c, o, &wctx, maybe_unshared_blobs);
 
     // if we have shards past EOF, ask for a reshard
@@ -14831,14 +16119,6 @@ void BlueStore::_do_truncate(
 
   o->onode.size = offset;
 
-  if (bdev->is_smr()) {
-    // On zoned devices, we currently support only removing an object or
-    // truncating it to zero size, both of which fall through this code path.
-    ceph_assert(offset == 0 && !wctx.old_extents.empty());
-    int64_t ondisk_offset = wctx.old_extents.begin()->r.begin()->offset;
-    txc->zoned_note_truncated_object(o, ondisk_offset);
-  }
-
   txc->write_onode(o);
 }
 
@@ -14850,12 +16130,27 @@ int BlueStore::_truncate(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << std::dec
           << dendl;
+
+  auto start_time = mono_clock::now();
   int r = 0;
   if (offset >= OBJECT_MAX_SIZE) {
     r = -E2BIG;
   } else {
     _do_truncate(txc, c, o, offset);
   }
+  log_latency_fn(
+    __func__,
+    l_bluestore_truncate_lat,
+    mono_clock::now() - start_time,
+    cct->_conf->bluestore_log_op_age,
+    [&](const ceph::timespan& lat) {
+      ostringstream ostr;
+      ostr << ", lat = " << timespan_str(lat)
+        << " cid =" << c->cid
+        << " oid =" << o->oid;
+      return ostr.str();
+    }
+  );
   dout(10) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << std::dec
           << " = " << r << dendl;
@@ -14970,9 +16265,9 @@ int BlueStore::_remove(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " onode " << o.get()
           << " txc "<< txc << dendl;
-
-  auto start_time = mono_clock::now();
+ auto start_time = mono_clock::now();
   int r = _do_remove(txc, c, o);
+
   log_latency_fn(
     __func__,
     l_bluestore_remove_lat,
@@ -15091,6 +16386,7 @@ void BlueStore::_do_omap_clear(TransContext *txc, OnodeRef& o)
   o->get_omap_tail(&tail);
   txc->t->rm_range_keys(omap_prefix, prefix, tail);
   txc->t->rmkey(omap_prefix, tail);
+  o->onode.clear_omap_flag();
   dout(20) << __func__ << " remove range start: "
            << pretty_binary_string(prefix) << " end: "
            << pretty_binary_string(tail) << dendl;
@@ -15101,13 +16397,16 @@ int BlueStore::_omap_clear(TransContext *txc,
                           OnodeRef& o)
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
+  auto t0 = mono_clock::now();
+
   int r = 0;
   if (o->onode.has_omap()) {
     o->flush();
     _do_omap_clear(txc, o);
-    o->onode.clear_omap_flag();
     txc->write_onode(o);
   }
+  logger->tinc(l_bluestore_omap_clear_lat, mono_clock::now() - t0);
+
   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
   return r;
 }
@@ -15319,7 +16618,6 @@ int BlueStore::_clone(TransContext *txc,
     dout(20) << __func__ << " clearing old omap data" << dendl;
     newo->flush();
     _do_omap_clear(txc, newo);
-    newo->onode.clear_omap_flag();
   }
   if (oldo->onode.has_omap()) {
     dout(20) << __func__ << " copying omap data" << dendl;
@@ -15328,6 +16626,9 @@ int BlueStore::_clone(TransContext *txc,
     } else {
       newo->onode.set_omap_flags(per_pool_omap == OMAP_BULK);
     }
+    // check if prefix for omap key is exactly the same size for both objects
+    // otherwise rewrite_omap_key will corrupt data
+    ceph_assert(oldo->onode.flags == newo->onode.flags);
     const string& prefix = newo->get_omap_prefix();
     KeyValueDB::Iterator it = db->get_iterator(prefix);
     string head, tail;
@@ -15381,18 +16682,50 @@ int BlueStore::_do_clone_range(
   _dump_onode<30>(cct, *newo);
 
   oldo->extent_map.dup(this, txc, c, oldo, newo, srcoff, length, dstoff);
-  _dump_onode<30>(cct, *oldo);
-  _dump_onode<30>(cct, *newo);
-  return 0;
-}
 
-int BlueStore::_clone_range(TransContext *txc,
-                           CollectionRef& c,
-                           OnodeRef& oldo,
-                           OnodeRef& newo,
-                           uint64_t srcoff, uint64_t length, uint64_t dstoff)
-{
-  dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
+#ifdef HAVE_LIBZBD
+  if (bdev->is_smr()) {
+    // duplicate the refs for the shared region.
+    Extent dummy(dstoff);
+    for (auto e = newo->extent_map.extent_map.lower_bound(dummy);
+        e != newo->extent_map.extent_map.end();
+        ++e) {
+      if (e->logical_offset >= dstoff + length) {
+       break;
+      }
+      for (auto& ex : e->blob->get_blob().get_extents()) {
+       // note that we may introduce a new extent reference that is
+       // earlier than the first zone ref.  we allow this since it is
+       // a lot of work to avoid and has marginal impact on cleaning
+       // performance.
+       if (!ex.is_valid()) {
+         continue;
+       }
+       uint32_t zone = ex.offset / zone_size;
+       if (!newo->onode.zone_offset_refs.count(zone)) {
+         uint64_t zoff = ex.offset % zone_size;
+         dout(20) << __func__ << " add ref zone 0x" << std::hex << zone
+                  << " offset 0x" << zoff << std::dec
+                  << " -> " << newo->oid << dendl;
+         txc->note_write_zone_offset(newo, zone, zoff);
+       }
+      }
+    }
+  }
+#endif
+
+  _dump_onode<30>(cct, *oldo);
+  _dump_onode<30>(cct, *newo);
+  return 0;
+}
+
+int BlueStore::_clone_range(TransContext *txc,
+                           CollectionRef& c,
+                           OnodeRef& oldo,
+                           OnodeRef& newo,
+                           uint64_t srcoff, uint64_t length, uint64_t dstoff)
+{
+  dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
           << newo->oid << " from 0x" << std::hex << srcoff << "~" << length
           << " to offset 0x" << dstoff << std::dec << dendl;
   int r = 0;
@@ -15485,6 +16818,27 @@ int BlueStore::_rename(TransContext *txc,
   // and read newo's metadata via the old name).
   txc->note_modified_object(oldo);
 
+#ifdef HAVE_LIBZBD
+  if (bdev->is_smr()) {
+    // adjust zone refs
+    for (auto& [zone, offset] : newo->onode.zone_offset_refs) {
+      dout(20) << __func__ << " rm ref zone 0x" << std::hex << zone
+              << " offset 0x" << offset << std::dec
+              << " -> " << oldo->oid << dendl;
+      string key;
+      get_zone_offset_object_key(zone, offset, oldo->oid, &key);
+      txc->t->rmkey(PREFIX_ZONED_CL_INFO, key);
+
+      dout(20) << __func__ << " add ref zone 0x" << std::hex << zone
+              << " offset 0x" << offset << std::dec
+              << " -> " << newo->oid << dendl;
+      get_zone_offset_object_key(zone, offset, newo->oid, &key);
+      bufferlist v;
+      txc->t->set(PREFIX_ZONED_CL_INFO, key, v);
+    }
+  }
+#endif
+
  out:
   dout(10) << __func__ << " " << c->cid << " " << old_oid << " -> "
           << new_oid << " = " << r << dendl;
@@ -15905,81 +17259,9 @@ void BlueStore::BlueStoreThrottle::complete(TransContext &txc)
 }
 #endif
 
-// DB key value Histogram
-#define KEY_SLAB 32
-#define VALUE_SLAB 64
-
 const string prefix_onode = "o";
 const string prefix_onode_shard = "x";
 const string prefix_other = "Z";
-
-int BlueStore::DBHistogram::get_key_slab(size_t sz)
-{
-  return (sz/KEY_SLAB);
-}
-
-string BlueStore::DBHistogram::get_key_slab_to_range(int slab)
-{
-  int lower_bound = slab * KEY_SLAB;
-  int upper_bound = (slab + 1) * KEY_SLAB;
-  string ret = "[" + stringify(lower_bound) + "," + stringify(upper_bound) + ")";
-  return ret;
-}
-
-int BlueStore::DBHistogram::get_value_slab(size_t sz)
-{
-  return (sz/VALUE_SLAB);
-}
-
-string BlueStore::DBHistogram::get_value_slab_to_range(int slab)
-{
-  int lower_bound = slab * VALUE_SLAB;
-  int upper_bound = (slab + 1) * VALUE_SLAB;
-  string ret = "[" + stringify(lower_bound) + "," + stringify(upper_bound) + ")";
-  return ret;
-}
-
-void BlueStore::DBHistogram::update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
-                      const string &prefix, size_t key_size, size_t value_size)
-{
-    uint32_t key_slab = get_key_slab(key_size);
-    uint32_t value_slab = get_value_slab(value_size);
-    key_hist[prefix][key_slab].count++;
-    key_hist[prefix][key_slab].max_len =
-      std::max<size_t>(key_size, key_hist[prefix][key_slab].max_len);
-    key_hist[prefix][key_slab].val_map[value_slab].count++;
-    key_hist[prefix][key_slab].val_map[value_slab].max_len =
-      std::max<size_t>(value_size,
-                       key_hist[prefix][key_slab].val_map[value_slab].max_len);
-}
-
-void BlueStore::DBHistogram::dump(Formatter *f)
-{
-  f->open_object_section("rocksdb_value_distribution");
-  for (auto i : value_hist) {
-    f->dump_unsigned(get_value_slab_to_range(i.first).data(), i.second);
-  }
-  f->close_section();
-
-  f->open_object_section("rocksdb_key_value_histogram");
-  for (auto i : key_hist) {
-    f->dump_string("prefix", i.first);
-    f->open_object_section("key_hist");
-    for ( auto k : i.second) {
-      f->dump_unsigned(get_key_slab_to_range(k.first).data(), k.second.count);
-      f->dump_unsigned("max_len", k.second.max_len);
-      f->open_object_section("value_hist");
-      for ( auto j : k.second.val_map) {
-       f->dump_unsigned(get_value_slab_to_range(j.first).data(), j.second.count);
-       f->dump_unsigned("max_len", j.second.max_len);
-      }
-      f->close_section();
-    }
-    f->close_section();
-  }
-  f->close_section();
-}
-
 //Itrerates through the db and collects the stats
 void BlueStore::generate_db_histogram(Formatter *f)
 {
@@ -15998,7 +17280,7 @@ void BlueStore::generate_db_histogram(Formatter *f)
   size_t max_key_size =0, max_value_size = 0;
   uint64_t total_key_size = 0, total_value_size = 0;
   size_t key_size = 0, value_size = 0;
-  DBHistogram hist;
+  KeyValueHistogram hist;
 
   auto start = coarse_mono_clock::now();
 
@@ -16249,11 +17531,15 @@ void BlueStore::_log_alerts(osd_alert_list_t& alerts)
 }
 
 void BlueStore::_collect_allocation_stats(uint64_t need, uint32_t alloc_size,
-                                          size_t extents)
+                                          const PExtentVector& extents)
 {
   alloc_stats_count++;
-  alloc_stats_fragments += extents;
+  alloc_stats_fragments += extents.size();
   alloc_stats_size += need;
+
+  for (auto& e : extents) {
+    logger->hinc(l_bluestore_allocate_hist, e.length, need);
+  }
 }
 
 void BlueStore::_record_allocation_stats()
@@ -16365,29 +17651,22 @@ void BlueStoreRepairer::fix_per_pool_omap(KeyValueDB *db, int val)
 }
 
 bool BlueStoreRepairer::fix_shared_blob(
-  KeyValueDB *db,
+  KeyValueDB::Transaction txn,
   uint64_t sbid,
-  const bufferlist* bl)
+  bluestore_extent_ref_map_t* ref_map,
+  size_t repaired)
 {
-  std::lock_guard l(lock); // possibly redundant
-  KeyValueDB::Transaction txn;
-  if (fix_misreferences_txn) { // reuse this txn
-    txn = fix_misreferences_txn;
-  } else {
-    if (!fix_shared_blob_txn) {
-      fix_shared_blob_txn = db->get_transaction();
-    }
-    txn = fix_shared_blob_txn;
-  }
   string key;
   get_shared_blob_key(sbid, &key);
-
-  ++to_repair_cnt;
-  if (bl) {
-    txn->set(PREFIX_SHARED_BLOB, key, *bl);
+  if (ref_map) {
+    bluestore_shared_blob_t persistent(sbid, std::move(*ref_map));
+    bufferlist bl;
+    encode(persistent, bl);
+    txn->set(PREFIX_SHARED_BLOB, key, bl);
   } else {
     txn->rmkey(PREFIX_SHARED_BLOB, key);
   }
+  to_repair_cnt += repaired;
   return true;
 }
 
@@ -16413,6 +17692,8 @@ bool BlueStoreRepairer::fix_leaked(KeyValueDB *db,
                                   uint64_t offset, uint64_t len)
 {
   std::lock_guard l(lock);
+  ceph_assert(!fm->is_null_manager());
+
   if (!fix_fm_leaked_txn) {
     fix_fm_leaked_txn = db->get_transaction();
   }
@@ -16425,6 +17706,8 @@ bool BlueStoreRepairer::fix_false_free(KeyValueDB *db,
                                       uint64_t offset, uint64_t len)
 {
   std::lock_guard l(lock);
+  ceph_assert(!fm->is_null_manager());
+
   if (!fix_fm_false_free_txn) {
     fix_fm_false_free_txn = db->get_transaction();
   }
@@ -16464,36 +17747,43 @@ unsigned BlueStoreRepairer::apply(KeyValueDB* db)
 {
   //NB: not for use in multithreading mode!!!
   if (fix_per_pool_omap_txn) {
-    db->submit_transaction_sync(fix_per_pool_omap_txn);
+    auto ok = db->submit_transaction_sync(fix_per_pool_omap_txn) == 0;
+    ceph_assert(ok);
     fix_per_pool_omap_txn = nullptr;
   }
   if (fix_fm_leaked_txn) {
-    db->submit_transaction_sync(fix_fm_leaked_txn);
+    auto ok = db->submit_transaction_sync(fix_fm_leaked_txn) == 0;
+    ceph_assert(ok);
     fix_fm_leaked_txn = nullptr;
   }
   if (fix_fm_false_free_txn) {
-    db->submit_transaction_sync(fix_fm_false_free_txn);
+    auto ok = db->submit_transaction_sync(fix_fm_false_free_txn) == 0;
+    ceph_assert(ok);
     fix_fm_false_free_txn = nullptr;
   }
   if (remove_key_txn) {
-    db->submit_transaction_sync(remove_key_txn);
+    auto ok = db->submit_transaction_sync(remove_key_txn) == 0;
+    ceph_assert(ok);
     remove_key_txn = nullptr;
   }
   if (fix_misreferences_txn) {
-    db->submit_transaction_sync(fix_misreferences_txn);
+    auto ok = db->submit_transaction_sync(fix_misreferences_txn) == 0;
+    ceph_assert(ok);
     fix_misreferences_txn = nullptr;
   }
   if (fix_onode_txn) {
-    db->submit_transaction_sync(fix_onode_txn);
+    auto ok = db->submit_transaction_sync(fix_onode_txn) == 0;
+    ceph_assert(ok);
     fix_onode_txn = nullptr;
   }
   if (fix_shared_blob_txn) {
-    db->submit_transaction_sync(fix_shared_blob_txn);
+    auto ok = db->submit_transaction_sync(fix_shared_blob_txn) == 0;
+    ceph_assert(ok);
     fix_shared_blob_txn = nullptr;
   }
-
   if (fix_statfs_txn) {
-    db->submit_transaction_sync(fix_statfs_txn);
+    auto ok = db->submit_transaction_sync(fix_statfs_txn) == 0;
+    ceph_assert(ok);
     fix_statfs_txn = nullptr;
   }
   if (need_compact) {
@@ -16658,4 +17948,1459 @@ void RocksDBBlueFSVolumeSelector::dump(ostream& sout) {
   }
 }
 
+BlueFSVolumeSelector* RocksDBBlueFSVolumeSelector::clone_empty() const {
+  RocksDBBlueFSVolumeSelector* ns =
+    new RocksDBBlueFSVolumeSelector(0, 0, 0,
+                                   0, 0, 0,
+                                   0, 0, false);
+  return ns;
+}
+
+bool RocksDBBlueFSVolumeSelector::compare(BlueFSVolumeSelector* other) {
+  RocksDBBlueFSVolumeSelector* o = dynamic_cast<RocksDBBlueFSVolumeSelector*>(other);
+  ceph_assert(o);
+  bool equal = true;
+  for (size_t x = 0; x < BlueFS::MAX_BDEV + 1; x++) {
+    for (size_t y = 0; y <LEVEL_MAX - LEVEL_FIRST + 1; y++) {
+      equal &= (per_level_per_dev_usage.at(x, y) == o->per_level_per_dev_usage.at(x, y));
+    }
+  }
+  for (size_t t = 0; t < LEVEL_MAX - LEVEL_FIRST + 1; t++) {
+    equal &= (per_level_files[t] == o->per_level_files[t]);
+  }
+  return equal;
+}
+
 // =======================================================
+
+//================================================================================================================
+// BlueStore is committing all allocation information (alloc/release) into RocksDB before the client Write is performed.
+// This cause a delay in write path and add significant load to the CPU/Memory/Disk.
+// The reason for the RocksDB updates is that it allows Ceph to survive any failure without losing the allocation state.
+//
+// We changed the code skiping RocksDB updates on allocation time and instead performing a full desatge of the allocator object
+// with all the OSD allocation state in a single step during umount().
+// This change leads to a 25% increase in IOPS and reduced latency in small random-write workload, but exposes the system
+// to losing allocation info in failure cases where we don't call umount.
+// We add code to perform a full allocation-map rebuild from information stored inside the ONode which is used in failure cases.
+// When we perform a graceful shutdown there is no need for recovery and we simply read the allocation-map from a flat file
+// where we store the allocation-map during umount().
+//================================================================================================================
+
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore::NCB::" << __func__ << "::"
+
+static const std::string allocator_dir    = "ALLOCATOR_NCB_DIR";
+static const std::string allocator_file   = "ALLOCATOR_NCB_FILE";
+static uint32_t    s_format_version = 0x01; // support future changes to allocator-map file
+static uint32_t    s_serial         = 0x01;
+
+#if 1
+#define CEPHTOH_32 le32toh
+#define CEPHTOH_64 le64toh
+#define HTOCEPH_32 htole32
+#define HTOCEPH_64 htole64
+#else
+// help debug the encode/decode by forcing alien format
+#define CEPHTOH_32 be32toh
+#define CEPHTOH_64 be64toh
+#define HTOCEPH_32 htobe32
+#define HTOCEPH_64 htobe64
+#endif
+
+// 48 Bytes header for on-disk alloator image
+const uint64_t ALLOCATOR_IMAGE_VALID_SIGNATURE = 0x1FACE0FF;
+struct allocator_image_header {
+  uint32_t format_version;     // 0x00
+  uint32_t valid_signature;    // 0x04
+  utime_t  timestamp;          // 0x08
+  uint32_t serial;             // 0x10
+  uint32_t pad[0x7];           // 0x14
+
+  allocator_image_header() {
+    memset((char*)this, 0, sizeof(allocator_image_header));
+  }
+
+  // create header in CEPH format
+  allocator_image_header(utime_t timestamp, uint32_t format_version, uint32_t serial) {
+    this->format_version  = format_version;
+    this->timestamp       = timestamp;
+    this->valid_signature = ALLOCATOR_IMAGE_VALID_SIGNATURE;
+    this->serial          = serial;
+    memset(this->pad, 0, sizeof(this->pad));
+  }
+
+  friend std::ostream& operator<<(std::ostream& out, const allocator_image_header& header) {
+    out << "format_version  = " << header.format_version << std::endl;
+    out << "valid_signature = " << header.valid_signature << "/" << ALLOCATOR_IMAGE_VALID_SIGNATURE << std::endl;
+    out << "timestamp       = " << header.timestamp << std::endl;
+    out << "serial          = " << header.serial << std::endl;
+    for (unsigned i = 0; i < sizeof(header.pad)/sizeof(uint32_t); i++) {
+      if (header.pad[i]) {
+       out << "header.pad[" << i << "] = " << header.pad[i] << std::endl;
+      }
+    }
+    return out;
+  }
+
+  DENC(allocator_image_header, v, p) {
+    denc(v.format_version, p);
+    denc(v.valid_signature, p);
+    denc(v.timestamp.tv.tv_sec, p);
+    denc(v.timestamp.tv.tv_nsec, p);
+    denc(v.serial, p);
+    for (auto& pad: v.pad) {
+      denc(pad, p);
+    }
+  }
+
+
+  int verify(CephContext* cct, const std::string &path) {
+    if (valid_signature == ALLOCATOR_IMAGE_VALID_SIGNATURE) {
+      for (unsigned i = 0; i < (sizeof(pad) / sizeof(uint32_t)); i++) {
+       if (this->pad[i]) {
+         derr << "Illegal Header - pad[" << i << "]="<< pad[i] << dendl;
+         return -1;
+       }
+      }
+      return 0;
+    }
+    else {
+      derr << "Illegal Header - signature="<< valid_signature << "(" << ALLOCATOR_IMAGE_VALID_SIGNATURE << ")" << dendl;
+      return -1;
+    }
+  }
+};
+WRITE_CLASS_DENC(allocator_image_header)
+
+// 56 Bytes trailer for on-disk alloator image
+struct allocator_image_trailer {
+  extent_t null_extent;         // 0x00
+
+  uint32_t format_version;     // 0x10
+  uint32_t valid_signature;    // 0x14
+
+  utime_t  timestamp;          // 0x18
+
+  uint32_t serial;             // 0x20
+  uint32_t pad;                // 0x24
+  uint64_t entries_count;      // 0x28
+  uint64_t allocation_size;    // 0x30
+
+  // trailer is created in CEPH format
+  allocator_image_trailer(utime_t timestamp, uint32_t format_version, uint32_t serial, uint64_t entries_count, uint64_t allocation_size) {
+    memset((char*)&(this->null_extent), 0, sizeof(this->null_extent));
+    this->format_version  = format_version;
+    this->valid_signature = ALLOCATOR_IMAGE_VALID_SIGNATURE;
+    this->timestamp       = timestamp;
+    this->serial          = serial;
+    this->pad             = 0;
+    this->entries_count   = entries_count;
+    this->allocation_size = allocation_size;
+  }
+
+  allocator_image_trailer() {
+    memset((char*)this, 0, sizeof(allocator_image_trailer));
+  }
+
+  friend std::ostream& operator<<(std::ostream& out, const allocator_image_trailer& trailer) {
+    if (trailer.null_extent.offset || trailer.null_extent.length) {
+      out << "trailer.null_extent.offset = " << trailer.null_extent.offset << std::endl;
+      out << "trailer.null_extent.length = " << trailer.null_extent.length << std::endl;
+    }
+    out << "format_version  = " << trailer.format_version << std::endl;
+    out << "valid_signature = " << trailer.valid_signature << "/" << ALLOCATOR_IMAGE_VALID_SIGNATURE << std::endl;
+    out << "timestamp       = " << trailer.timestamp << std::endl;
+    out << "serial          = " << trailer.serial << std::endl;
+    if (trailer.pad) {
+      out << "trailer.pad= " << trailer.pad << std::endl;
+    }
+    out << "entries_count   = " << trailer.entries_count   << std::endl;
+    out << "allocation_size = " << trailer.allocation_size << std::endl;
+    return out;
+  }
+
+  int verify(CephContext* cct, const std::string &path, const allocator_image_header *p_header, uint64_t entries_count, uint64_t allocation_size) {
+    if (valid_signature == ALLOCATOR_IMAGE_VALID_SIGNATURE) {
+
+      // trailer must starts with null extents (both fields set to zero) [no need to convert formats for zero)
+      if (null_extent.offset || null_extent.length) {
+       derr << "illegal trailer - null_extent = [" << null_extent.offset << "," << null_extent.length << "]"<< dendl;
+       return -1;
+      }
+
+      if (serial != p_header->serial) {
+       derr << "Illegal trailer: header->serial(" << p_header->serial << ") != trailer->serial(" << serial << ")" << dendl;
+       return -1;
+      }
+
+      if (format_version != p_header->format_version) {
+       derr << "Illegal trailer: header->format_version(" << p_header->format_version
+            << ") != trailer->format_version(" << format_version << ")" << dendl;
+       return -1;
+      }
+
+      if (timestamp != p_header->timestamp) {
+       derr << "Illegal trailer: header->timestamp(" << p_header->timestamp
+            << ") != trailer->timestamp(" << timestamp << ")" << dendl;
+       return -1;
+      }
+
+      if (this->entries_count != entries_count) {
+       derr << "Illegal trailer: entries_count(" << entries_count << ") != trailer->entries_count("
+            << this->entries_count << ")" << dendl;
+       return -1;
+      }
+
+      if (this->allocation_size != allocation_size) {
+       derr << "Illegal trailer: allocation_size(" << allocation_size << ") != trailer->allocation_size("
+            << this->allocation_size << ")" << dendl;
+       return -1;
+      }
+
+      if (pad) {
+       derr << "Illegal Trailer - pad="<< pad << dendl;
+       return -1;
+      }
+
+      // if arrived here -> trailer is valid !!
+      return 0;
+    } else {
+      derr << "Illegal Trailer - signature="<< valid_signature << "(" << ALLOCATOR_IMAGE_VALID_SIGNATURE << ")" << dendl;
+      return -1;
+    }
+  }
+
+  DENC(allocator_image_trailer, v, p) {
+    denc(v.null_extent.offset, p);
+    denc(v.null_extent.length, p);
+    denc(v.format_version, p);
+    denc(v.valid_signature, p);
+    denc(v.timestamp.tv.tv_sec, p);
+    denc(v.timestamp.tv.tv_nsec, p);
+    denc(v.serial, p);
+    denc(v.pad, p);
+    denc(v.entries_count, p);
+    denc(v.allocation_size, p);
+  }
+};
+WRITE_CLASS_DENC(allocator_image_trailer)
+
+
+//-------------------------------------------------------------------------------------
+// invalidate old allocation file if exists so will go directly to recovery after failure
+// we can safely ignore non-existing file
+int BlueStore::invalidate_allocation_file_on_bluefs()
+{
+  // mark that allocation-file was invalidated and we should destage a new copy whne closing db
+  need_to_destage_allocation_file = true;
+  dout(10) << "need_to_destage_allocation_file was set" << dendl;
+
+  BlueFS::FileWriter *p_handle = nullptr;
+  if (!bluefs->dir_exists(allocator_dir)) {
+    dout(5) << "allocator_dir(" << allocator_dir << ") doesn't exist" << dendl;
+    // nothing to do -> return
+    return 0;
+  }
+
+  int ret = bluefs->stat(allocator_dir, allocator_file, nullptr, nullptr);
+  if (ret != 0) {
+    dout(5) << "allocator_file(" << allocator_file << ") doesn't exist" << dendl;
+    // nothing to do -> return
+    return 0;
+  }
+
+
+  ret = bluefs->open_for_write(allocator_dir, allocator_file, &p_handle, true);
+  if (ret != 0) {
+    derr << "Failed open_for_write with error-code " << ret << dendl;
+    return -1;
+  }
+
+  dout(5) << "invalidate using bluefs->truncate(p_handle, 0)" << dendl;
+  ret = bluefs->truncate(p_handle, 0);
+  if (ret != 0) {
+    derr << "Failed truncate with error-code " << ret << dendl;
+    bluefs->close_writer(p_handle);
+    return -1;
+  }
+
+  bluefs->fsync(p_handle);
+  bluefs->close_writer(p_handle);
+
+  return 0;
+}
+
+//-----------------------------------------------------------------------------------
+// load bluefs extents into bluefs_extents_vec
+int load_bluefs_extents(BlueFS                *bluefs,
+                       bluefs_layout_t       *bluefs_layout,
+                       CephContext*           cct,
+                       const std::string     &path,
+                       std::vector<extent_t> &bluefs_extents_vec,
+                       uint64_t               min_alloc_size)
+{
+  if (! bluefs) {
+    dout(5) <<  "No BlueFS device found!!" << dendl;
+    return 0;
+  }
+
+  interval_set<uint64_t> bluefs_extents;
+  int ret = bluefs->get_block_extents(bluefs_layout->shared_bdev, &bluefs_extents);
+  if (ret < 0) {
+    derr  <<  "failed bluefs->get_block_extents()!!" << dendl;
+    return ret;
+  }
+
+  for (auto itr = bluefs_extents.begin(); itr != bluefs_extents.end(); itr++) {
+    extent_t e = { .offset = itr.get_start(), .length = itr.get_len() };
+    bluefs_extents_vec.push_back(e);
+  }
+
+  dout(5) <<  "BlueFS extent_count=" << bluefs_extents_vec.size() << dendl;
+  return 0;
+}
+
+//-----------------------------------------------------------------------------------
+int BlueStore::copy_allocator(Allocator* src_alloc, Allocator* dest_alloc, uint64_t* p_num_entries)
+{
+  *p_num_entries = 0;
+  auto count_entries = [&](uint64_t extent_offset, uint64_t extent_length) {
+    (*p_num_entries)++;
+  };
+  src_alloc->dump(count_entries);
+
+  dout(5) << "count num_entries=" << *p_num_entries << dendl;
+
+  // add 16K extra entries in case new allocation happened
+  (*p_num_entries) += 16*1024;
+  unique_ptr<extent_t[]> arr;
+  try {
+    arr = make_unique<extent_t[]>(*p_num_entries);
+  } catch (std::bad_alloc&) {
+    derr << "****Failed dynamic allocation, num_entries=" << *p_num_entries << dendl;
+    return -1;
+  }
+
+  uint64_t idx         = 0;
+  auto copy_entries = [&](uint64_t extent_offset, uint64_t extent_length) {
+    if (extent_length > 0) {
+      if (idx < *p_num_entries) {
+       arr[idx] = {extent_offset, extent_length};
+      }
+      idx++;
+    }
+    else {
+      derr << "zero length extent!!! offset=" << extent_offset << ", index=" << idx << dendl;
+    }
+  };
+  src_alloc->dump(copy_entries);
+
+  dout(5) << "copy num_entries=" << idx << dendl;
+  if (idx > *p_num_entries) {
+    derr << "****spillover, num_entries=" << *p_num_entries << ", spillover=" << (idx - *p_num_entries) << dendl;
+    ceph_assert(idx <= *p_num_entries);
+  }
+
+  *p_num_entries = idx;
+
+  for (idx = 0; idx < *p_num_entries; idx++) {
+    const extent_t *p_extent = &arr[idx];
+    dest_alloc->init_add_free(p_extent->offset, p_extent->length);
+  }
+
+  return 0;
+}
+
+//-----------------------------------------------------------------------------------
+static uint32_t flush_extent_buffer_with_crc(BlueFS::FileWriter *p_handle, const char* buffer, const char *p_curr, uint32_t crc)
+{
+  std::ptrdiff_t length = p_curr - buffer;
+  p_handle->append(buffer, length);
+
+  crc = ceph_crc32c(crc, (const uint8_t*)buffer, length);
+  uint32_t encoded_crc = HTOCEPH_32(crc);
+  p_handle->append((byte*)&encoded_crc, sizeof(encoded_crc));
+
+  return crc;
+}
+
+const unsigned MAX_EXTENTS_IN_BUFFER = 4 * 1024; // 4K extents = 64KB of data
+// write the allocator to a flat bluefs file - 4K extents at a time
+//-----------------------------------------------------------------------------------
+int BlueStore::store_allocator(Allocator* src_allocator)
+{
+  // when storing allocations to file we must be sure there is no background compactions
+  // the easiest way to achieve it is to make sure db is closed
+  ceph_assert(db == nullptr);
+  utime_t  start_time = ceph_clock_now();
+  int ret = 0;
+
+  // create dir if doesn't exist already
+  if (!bluefs->dir_exists(allocator_dir) ) {
+    ret = bluefs->mkdir(allocator_dir);
+    if (ret != 0) {
+      derr << "Failed mkdir with error-code " << ret << dendl;
+      return -1;
+    }
+  }
+
+  // reuse previous file-allocation if exists
+  ret = bluefs->stat(allocator_dir, allocator_file, nullptr, nullptr);
+  bool overwrite_file = (ret == 0);
+  //derr <<  __func__ << "bluefs->open_for_write(" << overwrite_file << ")" << dendl;
+  BlueFS::FileWriter *p_handle = nullptr;
+  ret = bluefs->open_for_write(allocator_dir, allocator_file, &p_handle, overwrite_file);
+  if (ret != 0) {
+    derr <<  __func__ << "Failed open_for_write with error-code " << ret << dendl;
+    return -1;
+  }
+
+  uint64_t file_size = p_handle->file->fnode.size;
+  uint64_t allocated = p_handle->file->fnode.get_allocated();
+  dout(5) << "file_size=" << file_size << ", allocated=" << allocated << dendl;
+
+  unique_ptr<Allocator> allocator(clone_allocator_without_bluefs(src_allocator));
+  if (!allocator) {
+    bluefs->close_writer(p_handle);
+    return -1;
+  }
+
+  // store all extents (except for the bluefs extents we removed) in a single flat file
+  utime_t                 timestamp = ceph_clock_now();
+  uint32_t                crc       = -1;
+  {
+    allocator_image_header  header(timestamp, s_format_version, s_serial);
+    bufferlist              header_bl;
+    encode(header, header_bl);
+    crc = header_bl.crc32c(crc);
+    encode(crc, header_bl);
+    p_handle->append(header_bl);
+  }
+
+  crc = -1;                                     // reset crc
+  extent_t        buffer[MAX_EXTENTS_IN_BUFFER]; // 64KB
+  extent_t       *p_curr          = buffer;
+  const extent_t *p_end           = buffer + MAX_EXTENTS_IN_BUFFER;
+  uint64_t        extent_count    = 0;
+  uint64_t        allocation_size = 0;
+  auto iterated_allocation = [&](uint64_t extent_offset, uint64_t extent_length) {
+    if (extent_length == 0) {
+      derr <<  __func__ << "" << extent_count << "::[" << extent_offset << "," << extent_length << "]" << dendl;
+      ret = -1;
+      return;
+    }
+    p_curr->offset = HTOCEPH_64(extent_offset);
+    p_curr->length = HTOCEPH_64(extent_length);
+    extent_count++;
+    allocation_size += extent_length;
+    p_curr++;
+
+    if (p_curr == p_end) {
+      crc = flush_extent_buffer_with_crc(p_handle, (const char*)buffer, (const char*)p_curr, crc);
+      p_curr = buffer; // recycle the buffer
+    }
+  };
+  allocator->dump(iterated_allocation);
+  // if got null extent -> fail the operation
+  if (ret != 0) {
+    derr << "Illegal extent, fail store operation" << dendl;
+    derr << "invalidate using bluefs->truncate(p_handle, 0)" << dendl;
+    bluefs->truncate(p_handle, 0);
+    bluefs->close_writer(p_handle);
+    return -1;
+  }
+
+  // if we got any leftovers -> add crc and append to file
+  if (p_curr > buffer) {
+    crc = flush_extent_buffer_with_crc(p_handle, (const char*)buffer, (const char*)p_curr, crc);
+  }
+
+  {
+    allocator_image_trailer trailer(timestamp, s_format_version, s_serial, extent_count, allocation_size);
+    bufferlist trailer_bl;
+    encode(trailer, trailer_bl);
+    uint32_t crc = -1;
+    crc = trailer_bl.crc32c(crc);
+    encode(crc, trailer_bl);
+    p_handle->append(trailer_bl);
+  }
+
+  bluefs->fsync(p_handle);
+  bluefs->truncate(p_handle, p_handle->pos);
+  bluefs->fsync(p_handle);
+
+  utime_t duration = ceph_clock_now() - start_time;
+  dout(5) <<"WRITE-extent_count=" << extent_count << ", file_size=" << p_handle->file->fnode.size << dendl;
+  dout(5) <<"p_handle->pos=" << p_handle->pos << " WRITE-duration=" << duration << " seconds" << dendl;
+
+  bluefs->close_writer(p_handle);
+  need_to_destage_allocation_file = false;
+  dout(10) << "need_to_destage_allocation_file was clear" << dendl;
+  return 0;
+}
+
+//-----------------------------------------------------------------------------------
+Allocator* BlueStore::create_bitmap_allocator(uint64_t bdev_size) {
+  // create allocator
+  uint64_t alloc_size = min_alloc_size;
+  Allocator* alloc = Allocator::create(cct, "bitmap", bdev_size, alloc_size,
+                                      zone_size, first_sequential_zone,
+                                      "recovery");
+  if (alloc) {
+    return alloc;
+  } else {
+    derr << "Failed Allocator Creation" << dendl;
+    return nullptr;
+  }
+}
+
+//-----------------------------------------------------------------------------------
+size_t calc_allocator_image_header_size()
+{
+  utime_t                 timestamp = ceph_clock_now();
+  allocator_image_header  header(timestamp, s_format_version, s_serial);
+  bufferlist              header_bl;
+  encode(header, header_bl);
+  uint32_t crc = -1;
+  crc = header_bl.crc32c(crc);
+  encode(crc, header_bl);
+
+  return header_bl.length();
+}
+
+//-----------------------------------------------------------------------------------
+int calc_allocator_image_trailer_size()
+{
+  utime_t                 timestamp       = ceph_clock_now();
+  uint64_t                extent_count    = -1;
+  uint64_t                allocation_size = -1;
+  uint32_t                crc             = -1;
+  bufferlist              trailer_bl;
+  allocator_image_trailer trailer(timestamp, s_format_version, s_serial, extent_count, allocation_size);
+
+  encode(trailer, trailer_bl);
+  crc = trailer_bl.crc32c(crc);
+  encode(crc, trailer_bl);
+  return trailer_bl.length();
+}
+
+//-----------------------------------------------------------------------------------
+int BlueStore::__restore_allocator(Allocator* allocator, uint64_t *num, uint64_t *bytes)
+{
+  utime_t start_time = ceph_clock_now();
+  BlueFS::FileReader *p_temp_handle = nullptr;
+  int ret = bluefs->open_for_read(allocator_dir, allocator_file, &p_temp_handle, false);
+  if (ret != 0) {
+    derr << "Failed open_for_read with error-code " << ret << dendl;
+    return -1;
+  }
+  unique_ptr<BlueFS::FileReader> p_handle(p_temp_handle);
+  uint64_t read_alloc_size = 0;
+  uint64_t file_size = p_handle->file->fnode.size;
+  dout(5) << "file_size=" << file_size << ",sizeof(extent_t)=" << sizeof(extent_t) << dendl;
+
+  // make sure we were able to store a valid copy
+  if (file_size == 0) {
+    derr << "No Valid allocation info on disk (empty file)" << dendl;
+    return -1;
+  }
+
+  // first read the header
+  size_t                 offset = 0;
+  allocator_image_header header;
+  int                    header_size = calc_allocator_image_header_size();
+  {
+    bufferlist header_bl,temp_bl;
+    int        read_bytes = bluefs->read(p_handle.get(), offset, header_size, &temp_bl, nullptr);
+    if (read_bytes != header_size) {
+      derr << "Failed bluefs->read() for header::read_bytes=" << read_bytes << ", req_bytes=" << header_size << dendl;
+      return -1;
+    }
+
+    offset += read_bytes;
+
+    header_bl.claim_append(temp_bl);
+    auto p = header_bl.cbegin();
+    decode(header, p);
+    if (header.verify(cct, path) != 0 ) {
+      derr << "header = \n" << header << dendl;
+      return -1;
+    }
+
+    uint32_t crc_calc = -1, crc;
+    crc_calc = header_bl.cbegin().crc32c(p.get_off(), crc_calc); //crc from begin to current pos
+    decode(crc, p);
+    if (crc != crc_calc) {
+      derr << "crc mismatch!!! crc=" << crc << ", crc_calc=" << crc_calc << dendl;
+      derr << "header = \n" << header << dendl;
+      return -1;
+    }
+
+    // increment version for next store
+    s_serial = header.serial + 1;
+  }
+
+  // then read the payload (extents list) using a recycled buffer
+  extent_t        buffer[MAX_EXTENTS_IN_BUFFER]; // 64KB
+  uint32_t        crc                = -1;
+  int             trailer_size       = calc_allocator_image_trailer_size();
+  uint64_t        extent_count       = 0;
+  uint64_t        extents_bytes_left = file_size - (header_size + trailer_size + sizeof(crc));
+  while (extents_bytes_left) {
+    int req_bytes  = std::min(extents_bytes_left, sizeof(buffer));
+    int read_bytes = bluefs->read(p_handle.get(), offset, req_bytes, nullptr, (char*)buffer);
+    if (read_bytes != req_bytes) {
+      derr << "Failed bluefs->read()::read_bytes=" << read_bytes << ", req_bytes=" << req_bytes << dendl;
+      return -1;
+    }
+
+    offset             += read_bytes;
+    extents_bytes_left -= read_bytes;
+
+    const unsigned  num_extent_in_buffer = read_bytes/sizeof(extent_t);
+    const extent_t *p_end                = buffer + num_extent_in_buffer;
+    for (const extent_t *p_ext = buffer; p_ext < p_end; p_ext++) {
+      uint64_t offset = CEPHTOH_64(p_ext->offset);
+      uint64_t length = CEPHTOH_64(p_ext->length);
+      read_alloc_size += length;
+
+      if (length > 0) {
+       allocator->init_add_free(offset, length);
+       extent_count ++;
+      } else {
+       derr << "extent with zero length at idx=" << extent_count << dendl;
+       return -1;
+      }
+    }
+
+    uint32_t calc_crc = ceph_crc32c(crc, (const uint8_t*)buffer, read_bytes);
+    read_bytes        = bluefs->read(p_handle.get(), offset, sizeof(crc), nullptr, (char*)&crc);
+    if (read_bytes == sizeof(crc) ) {
+      crc     = CEPHTOH_32(crc);
+      if (crc != calc_crc) {
+       derr << "data crc mismatch!!! crc=" << crc << ", calc_crc=" << calc_crc << dendl;
+       derr << "extents_bytes_left=" << extents_bytes_left << ", offset=" << offset << ", extent_count=" << extent_count << dendl;
+       return -1;
+      }
+
+      offset += read_bytes;
+      if (extents_bytes_left) {
+       extents_bytes_left -= read_bytes;
+      }
+    } else {
+      derr << "Failed bluefs->read() for crc::read_bytes=" << read_bytes << ", req_bytes=" << sizeof(crc) << dendl;
+      return -1;
+    }
+
+  }
+
+  // finally, read teh trailer and verify it is in good shape and that we got all the extents
+  {
+    bufferlist trailer_bl,temp_bl;
+    int        read_bytes = bluefs->read(p_handle.get(), offset, trailer_size, &temp_bl, nullptr);
+    if (read_bytes != trailer_size) {
+      derr << "Failed bluefs->read() for trailer::read_bytes=" << read_bytes << ", req_bytes=" << trailer_size << dendl;
+      return -1;
+    }
+    offset += read_bytes;
+
+    trailer_bl.claim_append(temp_bl);
+    uint32_t crc_calc = -1;
+    uint32_t crc;
+    allocator_image_trailer trailer;
+    auto p = trailer_bl.cbegin();
+    decode(trailer, p);
+    if (trailer.verify(cct, path, &header, extent_count, read_alloc_size) != 0 ) {
+      derr << "trailer=\n" << trailer << dendl;
+      return -1;
+    }
+
+    crc_calc = trailer_bl.cbegin().crc32c(p.get_off(), crc_calc); //crc from begin to current pos
+    decode(crc, p);
+    if (crc != crc_calc) {
+      derr << "trailer crc mismatch!::crc=" << crc << ", crc_calc=" << crc_calc << dendl;
+      derr << "trailer=\n" << trailer << dendl;
+      return -1;
+    }
+  }
+
+  utime_t duration = ceph_clock_now() - start_time;
+  dout(5) << "READ--extent_count=" << extent_count << ", read_alloc_size=  "
+           << read_alloc_size << ", file_size=" << file_size << dendl;
+  dout(5) << "READ duration=" << duration << " seconds, s_serial=" << s_serial << dendl;
+  *num   = extent_count;
+  *bytes = read_alloc_size;
+  return 0;
+}
+
+//-----------------------------------------------------------------------------------
+int BlueStore::restore_allocator(Allocator* dest_allocator, uint64_t *num, uint64_t *bytes)
+{
+  utime_t    start = ceph_clock_now();
+  auto temp_allocator = unique_ptr<Allocator>(create_bitmap_allocator(bdev->get_size()));
+  int ret = __restore_allocator(temp_allocator.get(), num, bytes);
+  if (ret != 0) {
+    return ret;
+  }
+
+  uint64_t num_entries = 0;
+  dout(5) << " calling copy_allocator(bitmap_allocator -> shared_alloc.a)" << dendl;
+  copy_allocator(temp_allocator.get(), dest_allocator, &num_entries);
+  utime_t duration = ceph_clock_now() - start;
+  dout(5) << "restored in " << duration << " seconds, num_entries=" << num_entries << dendl;
+  return ret;
+}
+
+//-------------------------------------------------------------------------
+void BlueStore::ExtentMap::provide_shard_info_to_onode(bufferlist v, uint32_t shard_id)
+{
+  [[maybe_unused]] auto cct  = onode->c->store->cct;
+  auto path = onode->c->store->path;
+  if (shard_id < shards.size()) {
+    auto p = &shards[shard_id];
+    if (!p->loaded) {
+      dout(30) << "opening shard 0x" << std::hex << p->shard_info->offset << std::dec << dendl;
+      p->extents = decode_some(v);
+      p->loaded = true;
+      dout(20) << "open shard 0x" << std::hex << p->shard_info->offset << std::dec << dendl;
+      ceph_assert(p->dirty == false);
+      ceph_assert(v.length() == p->shard_info->bytes);
+    }
+  } else {
+    derr << "illegal shard-id=" << shard_id << " shards.size()=" << shards.size() << dendl;
+    ceph_assert(shard_id < shards.size());
+  }
+}
+
+//-----------------------------------------------------------------------------------
+void BlueStore::set_allocation_in_simple_bmap(SimpleBitmap* sbmap, uint64_t offset, uint64_t length)
+{
+  ceph_assert((offset & min_alloc_size_mask) == 0);
+  ceph_assert((length & min_alloc_size_mask) == 0);
+  sbmap->set(offset >> min_alloc_size_order, length >> min_alloc_size_order);
+}
+
+//---------------------------------------------------------
+// Process all physical extents from a given Onode (including all its shards)
+void BlueStore::read_allocation_from_single_onode(
+  SimpleBitmap*        sbmap,
+  BlueStore::OnodeRef& onode_ref,
+  read_alloc_stats_t&  stats)
+{
+  // create a map holding all physical-extents of this Onode to prevent duplication from being added twice and more
+  std::unordered_map<uint64_t, uint32_t> lcl_extnt_map;
+  unsigned blobs_count = 0;
+  uint64_t pos = 0;
+
+  stats.spanning_blob_count += onode_ref->extent_map.spanning_blob_map.size();
+  // first iterate over all logical-extents
+  for (struct Extent& l_extent : onode_ref->extent_map.extent_map) {
+    ceph_assert(l_extent.logical_offset >= pos);
+
+    pos = l_extent.logical_offset + l_extent.length;
+    ceph_assert(l_extent.blob);
+    const bluestore_blob_t& blob         = l_extent.blob->get_blob();
+    const PExtentVector&    p_extent_vec = blob.get_extents();
+    blobs_count++;
+    if (blob.is_compressed()) {
+      stats.compressed_blob_count++;
+    }
+
+    if (blob.is_shared()) {
+      stats.shared_blobs_count++;
+    }
+
+    // process all physical extent in this blob
+    for (auto p_extent = p_extent_vec.begin(); p_extent != p_extent_vec.end(); p_extent++) {
+      auto offset = p_extent->offset;
+      auto length = p_extent->length;
+
+      // Offset of -1 means that the extent was removed (and it is only a place holder) and can be safely skipped
+      if (offset == (uint64_t)-1) {
+       stats.skipped_illegal_extent++;
+       continue;
+      }
+
+      if (!blob.is_shared()) {
+       // skip repeating extents
+       auto lcl_itr = lcl_extnt_map.find(offset);
+       // extents using shared blobs might have differnt length
+       if (lcl_itr != lcl_extnt_map.end() ) {
+         // repeated extents must have the same length!
+         ceph_assert(lcl_extnt_map[offset] == length);
+         stats.skipped_repeated_extent++;
+       } else {
+         lcl_extnt_map[offset] = length;
+         set_allocation_in_simple_bmap(sbmap, offset, length);
+         stats.extent_count++;
+       }
+      } else {
+       // extents using shared blobs might have differnt length
+       set_allocation_in_simple_bmap(sbmap, offset, length);
+       stats.extent_count++;
+      }
+
+    } // physical-extents loop
+
+  } // logical-extents loop
+
+  if (blobs_count < MAX_BLOBS_IN_ONODE) {
+    stats.blobs_in_onode[blobs_count]++;
+  } else {
+    // store all counts higher than MAX_BLOBS_IN_ONODE in a single bucket at offset zero
+    stats.blobs_in_onode[MAX_BLOBS_IN_ONODE]++;
+  }
+}
+
+//-------------------------------------------------------------------------
+int BlueStore::read_allocation_from_onodes(SimpleBitmap *sbmap, read_alloc_stats_t& stats)
+{
+  // finally add all space take by user data
+  auto it = db->get_iterator(PREFIX_OBJ, KeyValueDB::ITERATOR_NOCACHE);
+  if (!it) {
+    // TBD - find a better error code
+    derr << "failed db->get_iterator(PREFIX_OBJ)" << dendl;
+    return -1;
+  }
+
+  CollectionRef       collection_ref;
+  spg_t               pgid;
+  BlueStore::OnodeRef onode_ref;
+  bool                has_open_onode = false;
+  uint32_t            shard_id       = 0;
+  uint64_t            kv_count       = 0;
+  uint64_t            count_interval = 1'000'000;
+  // iterate over all ONodes stored in RocksDB
+  for (it->lower_bound(string()); it->valid(); it->next(), kv_count++) {
+    // trace an even after every million processed objects (typically every 5-10 seconds)
+    if (kv_count && (kv_count % count_interval == 0) ) {
+      dout(5) << "processed objects count = " << kv_count << dendl;
+    }
+
+    // Shards - Code
+    // add the extents from the shards to the main Obj
+    if (is_extent_shard_key(it->key())) {
+      // shards must follow a valid main object
+      if (has_open_onode) {
+       // shards keys must start with the main object key
+       if (it->key().find(onode_ref->key) == 0) {
+         // shards count can't exceed declared shard-count in the main-object
+         if (shard_id < onode_ref->extent_map.shards.size()) {
+           onode_ref->extent_map.provide_shard_info_to_onode(it->value(), shard_id);
+           stats.shard_count++;
+           shard_id++;
+         } else {
+           derr << "illegal shard_id=" << shard_id << ", shards.size()=" << onode_ref->extent_map.shards.size() << dendl;
+           derr << "shard->key=" << pretty_binary_string(it->key()) << dendl;
+           ceph_assert(shard_id < onode_ref->extent_map.shards.size());
+         }
+       } else {
+         derr << "illegal shard-key::onode->key=" << pretty_binary_string(onode_ref->key) << " shard->key=" << pretty_binary_string(it->key()) << dendl;
+         ceph_assert(it->key().find(onode_ref->key) == 0);
+       }
+      } else {
+       derr << "error::shard without main objects for key=" << pretty_binary_string(it->key()) << dendl;
+       ceph_assert(has_open_onode);
+      }
+
+    } else {
+      // Main Object Code
+
+      if (has_open_onode) {
+       // make sure we got all shards of this object
+       if (shard_id == onode_ref->extent_map.shards.size()) {
+         // We completed an Onode Object -> pass it to be processed
+         read_allocation_from_single_onode(sbmap, onode_ref, stats);
+       } else {
+         derr << "Missing shards! shard_id=" << shard_id << ", shards.size()=" << onode_ref->extent_map.shards.size() << dendl;
+         ceph_assert(shard_id == onode_ref->extent_map.shards.size());
+       }
+      } else {
+       // We opened a new Object
+       has_open_onode =  true;
+      }
+
+      // The main Obj is always first in RocksDB so we can start with shard_id set to zero
+      shard_id = 0;
+      stats.onode_count++;
+      ghobject_t oid;
+      int ret = get_key_object(it->key(), &oid);
+      if (ret < 0) {
+       derr << "bad object key " << pretty_binary_string(it->key()) << dendl;
+       ceph_assert(ret == 0);
+       continue;
+      }
+
+      // fill collection_ref if doesn't exist yet
+      // We process all the obejcts in a given collection and then move to the next collection
+      // This means we only search once for every given collection
+      if (!collection_ref                                     ||
+         oid.shard_id                != pgid.shard           ||
+         oid.hobj.get_logical_pool() != (int64_t)pgid.pool() ||
+         !collection_ref->contains(oid)) {
+       stats.collection_search++;
+       collection_ref = nullptr;
+
+       for (auto& p : coll_map) {
+         if (p.second->contains(oid)) {
+           collection_ref = p.second;
+           break;
+         }
+       }
+
+       if (!collection_ref) {
+         derr << "stray object " << oid << " not owned by any collection" << dendl;
+         ceph_assert(collection_ref);
+         continue;
+       }
+
+       collection_ref->cid.is_pg(&pgid);
+      }
+      onode_ref.reset(BlueStore::Onode::decode(collection_ref, oid, it->key(), it->value()));
+    }
+  }
+
+  // process the last object
+  if (has_open_onode) {
+    // make sure we got all shards of this object
+    if (shard_id == onode_ref->extent_map.shards.size()) {
+      // We completed an Onode Object -> pass it to be processed
+      read_allocation_from_single_onode(sbmap, onode_ref, stats);
+    } else {
+      derr << "Last Object is missing shards! shard_id=" << shard_id << ", shards.size()=" << onode_ref->extent_map.shards.size() << dendl;
+      ceph_assert(shard_id == onode_ref->extent_map.shards.size());
+    }
+  }
+  dout(5) << "onode_count=" << stats.onode_count << " ,shard_count=" << stats.shard_count << dendl;
+
+  return 0;
+}
+
+//---------------------------------------------------------
+int BlueStore::reconstruct_allocations(SimpleBitmap *sbmap, read_alloc_stats_t &stats)
+{
+  // first set space used by superblock
+  auto super_length = std::max<uint64_t>(min_alloc_size, SUPER_RESERVED);
+  set_allocation_in_simple_bmap(sbmap, 0, super_length);
+  stats.extent_count++;
+
+  // then set all space taken by Objects
+  int ret = read_allocation_from_onodes(sbmap, stats);
+  if (ret < 0) {
+    derr << "failed read_allocation_from_onodes()" << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
+//-----------------------------------------------------------------------------------
+static void copy_simple_bitmap_to_allocator(SimpleBitmap* sbmap, Allocator* dest_alloc, uint64_t alloc_size)
+{
+  int alloc_size_shift = ctz(alloc_size);
+  uint64_t offset = 0;
+  extent_t ext    = sbmap->get_next_clr_extent(offset);
+  while (ext.length != 0) {
+    dest_alloc->init_add_free(ext.offset << alloc_size_shift, ext.length << alloc_size_shift);
+    offset = ext.offset + ext.length;
+    ext = sbmap->get_next_clr_extent(offset);
+  }
+}
+
+//---------------------------------------------------------
+int BlueStore::read_allocation_from_drive_on_startup()
+{
+  int ret = 0;
+
+  ret = _open_collections();
+  if (ret < 0) {
+    return ret;
+  }
+  auto shutdown_cache = make_scope_guard([&] {
+    _shutdown_cache();
+  });
+
+  utime_t            start = ceph_clock_now();
+  read_alloc_stats_t stats = {};
+  SimpleBitmap       sbmap(cct, div_round_up(bdev->get_size(), min_alloc_size));
+  ret = reconstruct_allocations(&sbmap, stats);
+  if (ret != 0) {
+    return ret;
+  }
+
+  copy_simple_bitmap_to_allocator(&sbmap, alloc, min_alloc_size);
+
+  utime_t duration = ceph_clock_now() - start;
+  dout(1) << "::Allocation Recovery was completed in " << duration << " seconds, extent_count=" << stats.extent_count << dendl;
+  return ret;
+}
+
+
+
+
+// Only used for debugging purposes - we build a secondary allocator from the Onodes and compare it to the existing one
+// Not meant to be run by customers
+#ifdef CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
+
+#include <stdlib.h>
+#include <algorithm>
+//---------------------------------------------------------
+int cmpfunc (const void * a, const void * b)
+{
+  if ( ((extent_t*)a)->offset > ((extent_t*)b)->offset ) {
+    return 1;
+  }
+  else if( ((extent_t*)a)->offset < ((extent_t*)b)->offset ) {
+    return -1;
+  }
+  else {
+    return 0;
+  }
+}
+
+// compare the allocator built from Onodes with the system allocator (CF-B)
+//---------------------------------------------------------
+int BlueStore::compare_allocators(Allocator* alloc1, Allocator* alloc2, uint64_t req_extent_count, uint64_t memory_target)
+{
+  uint64_t allocation_size = std::min((req_extent_count) * sizeof(extent_t), memory_target / 3);
+  uint64_t extent_count    = allocation_size/sizeof(extent_t);
+  dout(5) << "req_extent_count=" << req_extent_count << ", granted extent_count="<< extent_count << dendl;
+
+  unique_ptr<extent_t[]> arr1;
+  unique_ptr<extent_t[]> arr2;
+  try {
+    arr1 = make_unique<extent_t[]>(extent_count);
+    arr2 = make_unique<extent_t[]>(extent_count);
+  } catch (std::bad_alloc&) {
+    derr << "****Failed dynamic allocation, extent_count=" << extent_count << dendl;
+    return -1;
+  }
+
+  // copy the extents from the allocators into simple array and then compare them
+  uint64_t size1 = 0, size2 = 0;
+  uint64_t idx1  = 0, idx2  = 0;
+  auto iterated_mapper1 = [&](uint64_t offset, uint64_t length) {
+    size1 += length;
+    if (idx1 < extent_count) {
+      arr1[idx1++] = {offset, length};
+    }
+    else if (idx1 == extent_count) {
+      derr << "(2)compare_allocators:: spillover"  << dendl;
+      idx1 ++;
+    }
+
+  };
+
+  auto iterated_mapper2 = [&](uint64_t offset, uint64_t length) {
+    size2 += length;
+    if (idx2 < extent_count) {
+      arr2[idx2++] = {offset, length};
+    }
+    else if (idx2 == extent_count) {
+      derr << "(2)compare_allocators:: spillover"  << dendl;
+      idx2 ++;
+    }
+  };
+
+  alloc1->dump(iterated_mapper1);
+  alloc2->dump(iterated_mapper2);
+
+  qsort(arr1.get(), std::min(idx1, extent_count), sizeof(extent_t), cmpfunc);
+  qsort(arr2.get(), std::min(idx2, extent_count), sizeof(extent_t), cmpfunc);
+
+  if (idx1 == idx2) {
+    idx1 = idx2 = std::min(idx1, extent_count);
+    if (memcmp(arr1.get(), arr2.get(), sizeof(extent_t) * idx2) == 0) {
+      return 0;
+    }
+    derr << "Failed memcmp(arr1, arr2, sizeof(extent_t)*idx2)"  << dendl;
+    for (uint64_t i = 0; i < idx1; i++) {
+      if (memcmp(arr1.get()+i, arr2.get()+i, sizeof(extent_t)) != 0) {
+       derr << "!!!![" << i << "] arr1::<" << arr1[i].offset << "," << arr1[i].length << ">" << dendl;
+       derr << "!!!![" << i << "] arr2::<" << arr2[i].offset << "," << arr2[i].length << ">" << dendl;
+       return -1;
+      }
+    }
+    return 0;
+  } else {
+    derr << "mismatch:: idx1=" << idx1 << " idx2=" << idx2 << dendl;
+    std::cout << "==================================================================="  << std::endl;
+    for (uint64_t i = 0; i < idx1; i++) {
+      std::cout << "arr1[" << i << "]<" << arr1[i].offset << "," << arr1[i].length << "> " << std::endl;
+    }
+
+    std::cout << "==================================================================="  << std::endl;
+    for (uint64_t i = 0; i < idx2; i++) {
+      std::cout << "arr2[" << i << "]<" << arr2[i].offset << "," << arr2[i].length << "> " << std::endl;
+    }
+    return -1;
+  }
+}
+
+//---------------------------------------------------------
+int BlueStore::add_existing_bluefs_allocation(Allocator* allocator, read_alloc_stats_t &stats)
+{
+  // then add space used by bluefs to store rocksdb
+  unsigned extent_count = 0;
+  if (bluefs) {
+    interval_set<uint64_t> bluefs_extents;
+    int ret = bluefs->get_block_extents(bluefs_layout.shared_bdev, &bluefs_extents);
+    if (ret < 0) {
+      return ret;
+    }
+    for (auto itr = bluefs_extents.begin(); itr != bluefs_extents.end(); extent_count++, itr++) {
+      allocator->init_rm_free(itr.get_start(), itr.get_len());
+      stats.extent_count++;
+    }
+  }
+
+  dout(5) << "bluefs extent_count=" << extent_count << dendl;
+  return 0;
+}
+
+//---------------------------------------------------------
+int BlueStore::read_allocation_from_drive_for_bluestore_tool()
+{
+  dout(5) << __func__ << dendl;
+  int ret = 0;
+  uint64_t memory_target = cct->_conf.get_val<Option::size_t>("osd_memory_target");
+  ret = _open_db_and_around(true, false);
+  if (ret < 0) {
+    return ret;
+  }
+
+  ret = _open_collections();
+  if (ret < 0) {
+    _close_db_and_around();
+    return ret;
+  }
+
+  utime_t            duration;
+  read_alloc_stats_t stats = {};
+  utime_t            start = ceph_clock_now();
+
+  auto shutdown_cache = make_scope_guard([&] {
+    std::cout << "Allocation Recovery was completed in " << duration
+             << " seconds; insert_count=" << stats.insert_count
+             << "; extent_count=" << stats.extent_count << std::endl;
+    _shutdown_cache();
+    _close_db_and_around();
+  });
+
+  {
+    auto allocator = unique_ptr<Allocator>(create_bitmap_allocator(bdev->get_size()));
+    //reconstruct allocations into a temp simple-bitmap and copy into allocator
+    {
+      SimpleBitmap sbmap(cct, div_round_up(bdev->get_size(), min_alloc_size));
+      ret = reconstruct_allocations(&sbmap, stats);
+      if (ret != 0) {
+       return ret;
+      }
+      copy_simple_bitmap_to_allocator(&sbmap, allocator.get(), min_alloc_size);
+    }
+
+    // add allocation space used by the bluefs itself
+    ret = add_existing_bluefs_allocation(allocator.get(), stats);
+    if (ret < 0) {
+      return ret;
+    }
+
+    duration = ceph_clock_now() - start;
+    stats.insert_count = 0;
+    auto count_entries = [&](uint64_t extent_offset, uint64_t extent_length) {
+      stats.insert_count++;
+    };
+    allocator->dump(count_entries);
+    ret = compare_allocators(allocator.get(), alloc, stats.insert_count, memory_target);
+    if (ret != 0) {
+      dout(5) << "Allocator drive - file integrity check OK" << dendl;
+    } else {
+      derr << "FAILURE. Allocator from file and allocator from metadata differ::ret=" << ret << dendl;
+    }
+  }
+
+  std::cout << stats << std::endl;
+  return ret;
+}
+
+//---------------------------------------------------------
+Allocator* BlueStore::clone_allocator_without_bluefs(Allocator *src_allocator)
+{
+  uint64_t   bdev_size = bdev->get_size();
+  Allocator* allocator = create_bitmap_allocator(bdev_size);
+  if (allocator) {
+    dout(5) << "bitmap-allocator=" << allocator << dendl;
+  } else {
+    derr << "****failed create_bitmap_allocator()" << dendl;
+    return nullptr;
+  }
+
+  uint64_t num_entries = 0;
+  copy_allocator(src_allocator, allocator, &num_entries);
+
+  // BlueFS stores its internal allocation outside RocksDB (FM) so we should not destage them to the allcoator-file
+  // we are going to hide bluefs allocation during allocator-destage as they are stored elsewhere
+  {
+    std::vector<extent_t> bluefs_extents_vec;
+    // load current bluefs internal allocation into a vector
+    load_bluefs_extents(bluefs, &bluefs_layout, cct, path, bluefs_extents_vec, min_alloc_size);
+    // then remove them from the shared allocator before dumping it to disk (bluefs stored them internally)
+    for (auto itr = bluefs_extents_vec.begin(); itr != bluefs_extents_vec.end(); ++itr) {
+      allocator->init_add_free(itr->offset, itr->length);
+    }
+  }
+
+  return allocator;
+}
+
+//---------------------------------------------------------
+static void clear_allocation_objects_from_rocksdb(KeyValueDB *db, CephContext *cct, const std::string &path)
+{
+  dout(5) << "t->rmkeys_by_prefix(PREFIX_ALLOC_BITMAP)" << dendl;
+  KeyValueDB::Transaction t = db->get_transaction();
+  t->rmkeys_by_prefix(PREFIX_ALLOC_BITMAP);
+  db->submit_transaction_sync(t);
+}
+
+//---------------------------------------------------------
+void BlueStore::copy_allocator_content_to_fm(Allocator *allocator, FreelistManager *real_fm)
+{
+  unsigned max_txn = 1024;
+  dout(5) << "max_transaction_submit=" << max_txn << dendl;
+  uint64_t size = 0, idx = 0;
+  KeyValueDB::Transaction txn = db->get_transaction();
+  auto iterated_insert = [&](uint64_t offset, uint64_t length) {
+    size += length;
+    real_fm->release(offset, length, txn);
+    if ((++idx % max_txn) == 0) {
+      db->submit_transaction_sync(txn);
+      txn = db->get_transaction();
+    }
+  };
+  allocator->dump(iterated_insert);
+  if (idx % max_txn != 0) {
+    db->submit_transaction_sync(txn);
+  }
+  dout(5) << "size=" << size << ", num extents=" << idx  << dendl;
+}
+
+//---------------------------------------------------------
+Allocator* BlueStore::initialize_allocator_from_freelist(FreelistManager *real_fm)
+{
+  dout(5) << "real_fm->enumerate_next" << dendl;
+  Allocator* allocator2 = create_bitmap_allocator(bdev->get_size());
+  if (allocator2) {
+    dout(5) << "bitmap-allocator=" << allocator2 << dendl;
+  } else {
+    return nullptr;
+  }
+
+  uint64_t size2 = 0, idx2 = 0;
+  real_fm->enumerate_reset();
+  uint64_t offset, length;
+  while (real_fm->enumerate_next(db, &offset, &length)) {
+    allocator2->init_add_free(offset, length);
+    ++idx2;
+    size2 += length;
+  }
+  real_fm->enumerate_reset();
+
+  dout(5) << "size2=" << size2 << ", num2=" << idx2 << dendl;
+  return allocator2;
+}
+
+//---------------------------------------------------------
+// close the active fm and open it in a new mode like makefs()
+// but make sure to mark the full device space as allocated
+// later we will mark all exetents from the allocator as free
+int BlueStore::reset_fm_for_restore()
+{
+  dout(5) << "<<==>> fm->clear_null_manager()" << dendl;
+  fm->shutdown();
+  delete fm;
+  fm = nullptr;
+  freelist_type = "bitmap";
+  KeyValueDB::Transaction t = db->get_transaction();
+  // call _open_fm() with fm_restore set to TRUE
+  // this will mark the full device space as allocated (and not just the reserved space)
+  _open_fm(t, true, true);
+  if (fm == nullptr) {
+    derr << "Failed _open_fm()" << dendl;
+    return -1;
+  }
+  db->submit_transaction_sync(t);
+  ceph_assert(!fm->is_null_manager());
+  dout(5) << "fm was reactivated in full mode" << dendl;
+  return 0;
+}
+
+
+//---------------------------------------------------------
+// create a temp allocator filled with allocation state from the fm
+// and compare it to the base allocator passed in
+int BlueStore::verify_rocksdb_allocations(Allocator *allocator)
+{
+  dout(5) << "verify that alloc content is identical to FM" << dendl;
+  // initialize from freelist
+  Allocator* temp_allocator = initialize_allocator_from_freelist(fm);
+  if (temp_allocator == nullptr) {
+    return -1;
+  }
+
+  uint64_t insert_count = 0;
+  auto count_entries = [&](uint64_t extent_offset, uint64_t extent_length) {
+    insert_count++;
+  };
+  temp_allocator->dump(count_entries);
+  uint64_t memory_target = cct->_conf.get_val<Option::size_t>("osd_memory_target");
+  int ret = compare_allocators(allocator, temp_allocator, insert_count, memory_target);
+
+  delete temp_allocator;
+
+  if (ret == 0) {
+    dout(5) << "SUCCESS!!! compare(allocator, temp_allocator)" << dendl;
+    return 0;
+  } else {
+    derr << "**** FAILURE compare(allocator, temp_allocator)::ret=" << ret << dendl;
+    return -1;
+  }
+}
+
+//---------------------------------------------------------
+int BlueStore::db_cleanup(int ret)
+{
+  _shutdown_cache();
+  _close_db_and_around();
+  return ret;
+}
+
+//---------------------------------------------------------
+// convert back the system from null-allocator to using rocksdb to store allocation
+int BlueStore::push_allocation_to_rocksdb()
+{
+  if (cct->_conf->bluestore_allocation_from_file) {
+    derr << "cct->_conf->bluestore_allocation_from_file must be cleared first" << dendl;
+    derr << "please change default to false in ceph.conf file>" << dendl;
+    return -1;
+  }
+
+  dout(5) << "calling open_db_and_around() in read/write mode" << dendl;
+  int ret = _open_db_and_around(false);
+  if (ret < 0) {
+    return ret;
+  }
+
+  if (!fm->is_null_manager()) {
+    derr << "This is not a NULL-MANAGER -> nothing to do..." << dendl;
+    return db_cleanup(0);
+  }
+
+  // start by creating a clone copy of the shared-allocator
+  unique_ptr<Allocator> allocator(clone_allocator_without_bluefs(alloc));
+  if (!allocator) {
+    return db_cleanup(-1);
+  }
+
+  // remove all objects of PREFIX_ALLOC_BITMAP from RocksDB to guarantee a clean start
+  clear_allocation_objects_from_rocksdb(db, cct, path);
+
+  // then open fm in new mode with the full devie marked as alloctaed
+  if (reset_fm_for_restore() != 0) {
+    return db_cleanup(-1);
+  }
+
+  // push the free-space from the allocator (shared-alloc without bfs) to rocksdb
+  copy_allocator_content_to_fm(allocator.get(), fm);
+
+  // compare the allocator info with the info stored in the fm/rocksdb
+  if (verify_rocksdb_allocations(allocator.get()) == 0) {
+    // all is good -> we can commit to rocksdb allocator
+    commit_to_real_manager();
+  } else {
+    return db_cleanup(-1);
+  }
+
+  // can't be too paranoid :-)
+  dout(5) << "Running full scale verification..." << dendl;
+  // close db/fm/allocator and start fresh
+  db_cleanup(0);
+  dout(5) << "calling open_db_and_around() in read-only mode" << dendl;
+  ret = _open_db_and_around(true);
+  if (ret < 0) {
+    return db_cleanup(ret);
+  }
+  ceph_assert(!fm->is_null_manager());
+  ceph_assert(verify_rocksdb_allocations(allocator.get()) == 0);
+
+  return db_cleanup(ret);
+}
+
+#endif // CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
+
+//-------------------------------------------------------------------------------------
+static int commit_freelist_type(KeyValueDB *db, const std::string& freelist_type, CephContext *cct, const std::string &path)
+{
+  // When freelist_type to "bitmap" we will store allocation in RocksDB
+  // When allocation-info is stored in a single file we set freelist_type to "null"
+  // This will direct the startup code to read allocation from file and not RocksDB
+  KeyValueDB::Transaction t = db->get_transaction();
+  if (t == nullptr) {
+    derr << "db->get_transaction() failed!!!" << dendl;
+    return -1;
+  }
+
+  bufferlist bl;
+  bl.append(freelist_type);
+  t->set(PREFIX_SUPER, "freelist_type", bl);
+
+  int ret = db->submit_transaction_sync(t);
+  if (ret != 0) {
+    derr << "Failed db->submit_transaction_sync(t)" << dendl;
+  }
+  return ret;
+}
+
+//-------------------------------------------------------------------------------------
+int BlueStore::commit_to_null_manager()
+{
+  dout(5) << "Set FreelistManager to NULL FM..." << dendl;
+  fm->set_null_manager();
+  freelist_type = "null";
+#if 1
+  return commit_freelist_type(db, freelist_type, cct, path);
+#else
+  // should check how long this step take on a big configuration as deletes are expensive
+  if (commit_freelist_type(db, freelist_type, cct, path) == 0) {
+    // remove all objects of PREFIX_ALLOC_BITMAP from RocksDB to guarantee a clean start
+    clear_allocation_objects_from_rocksdb(db, cct, path);
+  }
+#endif
+}
+
+
+//-------------------------------------------------------------------------------------
+int BlueStore::commit_to_real_manager()
+{
+  dout(5) << "Set FreelistManager to Real FM..." << dendl;
+  ceph_assert(!fm->is_null_manager());
+  freelist_type = "bitmap";
+  int ret = commit_freelist_type(db, freelist_type, cct, path);
+  if (ret == 0) {
+    //remove the allocation_file
+    invalidate_allocation_file_on_bluefs();
+    ret = bluefs->unlink(allocator_dir, allocator_file);
+    bluefs->sync_metadata(false);
+    if (ret == 0) {
+      dout(5) << "Remove Allocation File successfully" << dendl;
+    }
+    else {
+      derr << "Remove Allocation File ret_code=" << ret << dendl;
+    }
+  }
+
+  return ret;
+}
+
+//================================================================================================================
+//================================================================================================================