]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/os/bluestore/BlueStore.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / os / bluestore / BlueStore.cc
index 3239a825846d8bf65e7598d78d7f8fbb9c56749a..bfc6c71f4e607960ceb949aae7e9a6d9fe563e62 100644 (file)
@@ -21,6 +21,8 @@
 
 #include <boost/container/flat_set.hpp>
 #include <boost/algorithm/string.hpp>
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_real.hpp>
 
 #include "include/cpp-btree/btree_set.h"
 
@@ -379,6 +381,7 @@ static const char *_key_decode_prefix(const char *p, ghobject_t *oid)
   return p;
 }
 
+
 #define ENCODED_KEY_PREFIX_LEN (1 + 8 + 4)
 
 static int _get_key_object(const char *p, ghobject_t *oid)
@@ -1108,12 +1111,11 @@ struct LruOnodeCacheShard : public BlueStore::OnodeCacheShard {
 
   void _add(BlueStore::Onode* o, int level) override
   {
-    if (o->put_cache()) {
+    o->set_cached();
+    if (o->pin_nref == 1) {
       (level > 0) ? lru.push_front(*o) : lru.push_back(*o);
       o->cache_age_bin = age_bins.front();
       *(o->cache_age_bin) += 1;
-    } else {
-      ++num_pinned;
     }
     ++num; // we count both pinned and unpinned entries
     dout(20) << __func__ << " " << this << " " << o->oid << " added, num="
@@ -1121,86 +1123,100 @@ struct LruOnodeCacheShard : public BlueStore::OnodeCacheShard {
   }
   void _rm(BlueStore::Onode* o) override
   {
-    if (o->pop_cache()) {
+    o->clear_cached();
+    if (o->lru_item.is_linked()) {
       *(o->cache_age_bin) -= 1;
       lru.erase(lru.iterator_to(*o));
-    } else {
-      ceph_assert(num_pinned);
-      --num_pinned;
     }
     ceph_assert(num);
     --num;
     dout(20) << __func__ << " " << this << " " << " " << o->oid << " removed, num=" << num << dendl;
   }
-  void _pin(BlueStore::Onode* o) override
-  {
-    *(o->cache_age_bin) -= 1;
-    lru.erase(lru.iterator_to(*o));
-    ++num_pinned;
-    dout(20) << __func__ << " " << this << " " << " " << " " << o->oid << " pinned" << dendl;
-  }
-  void _unpin(BlueStore::Onode* o) override
-  {
-    lru.push_front(*o);
-    o->cache_age_bin = age_bins.front();
-    *(o->cache_age_bin) += 1;
-    ceph_assert(num_pinned);
-    --num_pinned;
-    dout(20) << __func__ << " " << this << " " << " " << " " << o->oid << " unpinned" << dendl;
-  }
-  void _unpin_and_rm(BlueStore::Onode* o) override
+
+  void maybe_unpin(BlueStore::Onode* o) override
   {
-    o->pop_cache();
-    ceph_assert(num_pinned);
-    --num_pinned;
-    ceph_assert(num);
-    --num;
+    OnodeCacheShard* ocs = this;
+    ocs->lock.lock();
+    // It is possible that during waiting split_cache moved us to different OnodeCacheShard.
+    while (ocs != o->c->get_onode_cache()) {
+      ocs->lock.unlock();
+      ocs = o->c->get_onode_cache();
+      ocs->lock.lock();
+    }
+    if (o->is_cached() && o->pin_nref == 1) {
+      if(!o->lru_item.is_linked()) {
+        if (o->exists) {
+         lru.push_front(*o);
+         o->cache_age_bin = age_bins.front();
+         *(o->cache_age_bin) += 1;
+         dout(20) << __func__ << " " << this << " " << o->oid << " unpinned"
+                   << dendl;
+        } else {
+         ceph_assert(num);
+         --num;
+         o->clear_cached();
+         dout(20) << __func__ << " " << this << " " << o->oid << " removed"
+                   << dendl;
+          // remove will also decrement nref
+          o->c->onode_space._remove(o->oid);
+        }
+      } else if (o->exists) {
+        // move onode within LRU
+        lru.erase(lru.iterator_to(*o));
+        lru.push_front(*o);
+        if (o->cache_age_bin != age_bins.front()) {
+          *(o->cache_age_bin) -= 1;
+          o->cache_age_bin = age_bins.front();
+          *(o->cache_age_bin) += 1;
+        }
+        dout(20) << __func__ << " " << this << " " << o->oid << " touched"
+                 << dendl;
+      }
+    }
+    ocs->lock.unlock();
   }
+
   void _trim_to(uint64_t new_size) override
   {
     if (new_size >= lru.size()) {
       return; // don't even try
     } 
-    uint64_t n = lru.size() - new_size;
-    auto p = lru.end();
-    ceph_assert(p != lru.begin());
-    --p;
-    ceph_assert(num >= n);
-    num -= n;
-    while (n-- > 0) {
-      BlueStore::Onode *o = &*p;
+    uint64_t n = num - new_size; // note: we might get empty LRU
+                                 // before n == 0 due to pinned
+                                 // entries. And hence being unable
+                                 // to reach new_size target.
+    while (n-- > 0 && lru.size() > 0) {
+      BlueStore::Onode *o = &lru.back();
+      lru.pop_back();
+
       dout(20) << __func__ << "  rm " << o->oid << " "
-               << o->nref << " " << o->cached << " " << o->pinned << dendl;
-      if (p != lru.begin()) {
-        lru.erase(p--);
+               << o->nref << " " << o->cached << dendl;
+
+      *(o->cache_age_bin) -= 1;
+      if (o->pin_nref > 1) {
+        dout(20) << __func__ << " " << this << " " << " " << " " << o->oid << dendl;
       } else {
-        ceph_assert(n == 0);
-        lru.erase(p);
+       ceph_assert(num);
+        --num;
+        o->clear_cached();
+        o->c->onode_space._remove(o->oid);
       }
-      *(o->cache_age_bin) -= 1;
-      auto pinned = !o->pop_cache();
-      ceph_assert(!pinned);
-      o->c->onode_map._remove(o->oid);
     }
   }
-  void move_pinned(OnodeCacheShard *to, BlueStore::Onode *o) override
+  void _move_pinned(OnodeCacheShard *to, BlueStore::Onode *o) override
   {
     if (to == this) {
       return;
     }
-    ceph_assert(o->cached);
-    ceph_assert(o->pinned);
-    ceph_assert(num);
-    ceph_assert(num_pinned);
-    --num_pinned;
-    --num;
-    ++to->num_pinned;
-    ++to->num;
+    _rm(o);
+    ceph_assert(o->nref > 1);
+    to->_add(o, 0);
   }
   void add_stats(uint64_t *onodes, uint64_t *pinned_onodes) override
   {
+    std::lock_guard l(lock);
     *onodes += num;
-    *pinned_onodes += num_pinned;
+    *pinned_onodes += num - lru.size();
   }
 };
 
@@ -1921,19 +1937,19 @@ void BlueStore::BufferSpace::split(BufferCacheShard* cache, size_t pos, BlueStor
 #undef dout_prefix
 #define dout_prefix *_dout << "bluestore.OnodeSpace(" << this << " in " << cache << ") "
 
-BlueStore::OnodeRef BlueStore::OnodeSpace::add(const ghobject_t& oid,
+BlueStore::OnodeRef BlueStore::OnodeSpace::add_onode(const ghobject_t& oid,
   OnodeRef& o)
 {
   std::lock_guard l(cache->lock);
-  auto p = onode_map.find(oid);
-  if (p != onode_map.end()) {
+  // add entry or return existing one
+  auto p = onode_map.emplace(oid, o);
+  if (!p.second) {
     ldout(cache->cct, 30) << __func__ << " " << oid << " " << o
-                         << " raced, returning existing " << p->second
+                         << " raced, returning existing " << p.first->second
                          << dendl;
-    return p->second;
+    return p.first->second;
   }
   ldout(cache->cct, 20) << __func__ << " " << oid << " " << o << dendl;
-  onode_map[oid] = o;
   cache->_add(o.get(), 1);
   cache->_trim();
   return o;
@@ -1954,18 +1970,16 @@ BlueStore::OnodeRef BlueStore::OnodeSpace::lookup(const ghobject_t& oid)
     std::lock_guard l(cache->lock);
     ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
     if (p == onode_map.end()) {
-      cache->logger->inc(l_bluestore_onode_misses);
       ldout(cache->cct, 30) << __func__ << " " << oid << " miss" << dendl;
+      cache->logger->inc(l_bluestore_onode_misses);
     } else {
       ldout(cache->cct, 30) << __func__ << " " << oid << " hit " << p->second
                             << " " << p->second->nref
                             << " " << p->second->cached
-                            << " " << p->second->pinned
                            << dendl;
       // This will pin onode and implicitly touch the cache when Onode
       // eventually will become unpinned
       o = p->second;
-      ceph_assert(!o->cached || o->pinned);
 
       cache->logger->inc(l_bluestore_onode_hits);
     }
@@ -2021,7 +2035,6 @@ void BlueStore::OnodeSpace::rename(
   // This will pin 'o' and implicitly touch cache
   // when it will eventually become unpinned
   onode_map.insert(make_pair(new_oid, o));
-  ceph_assert(o->pinned);
 
   o->oid = new_oid;
   o->key = new_okey;
@@ -2047,7 +2060,6 @@ void BlueStore::OnodeSpace::dump(CephContext *cct)
     ldout(cct, LogLevelV) << i.first << " : " << i.second
       << " " << i.second->nref
       << " " << i.second->cached
-      << " " << i.second->pinned
       << dendl;
   }
 }
@@ -2388,11 +2400,11 @@ void BlueStore::Blob::split(Collection *coll, uint32_t blob_offset, Blob *r)
 
 #ifndef CACHE_BLOB_BL
 void BlueStore::Blob::decode(
-  Collection *coll,
   bufferptr::const_iterator& p,
   uint64_t struct_v,
   uint64_t* sbid,
-  bool include_ref_map)
+  bool include_ref_map,
+  Collection *coll)
 {
   denc(blob, p, struct_v);
   if (blob.is_shared()) {
@@ -2405,11 +2417,13 @@ void BlueStore::Blob::decode(
       used_in_blob.clear();
       bluestore_extent_ref_map_t legacy_ref_map;
       legacy_ref_map.decode(p);
-      for (auto r : legacy_ref_map.ref_map) {
-        get_ref(
-          coll,
-          r.first,
-          r.second.refs * r.second.length);
+      if (coll) {
+        for (auto r : legacy_ref_map.ref_map) {
+          get_ref(
+            coll,
+            r.first,
+            r.second.refs * r.second.length);
+        }
       }
     }
   }
@@ -2452,10 +2466,9 @@ BlueStore::OldExtent* BlueStore::OldExtent::create(CollectionRef c,
 #undef dout_context
 #define dout_context onode->c->store->cct
 
-BlueStore::ExtentMap::ExtentMap(Onode *o)
+BlueStore::ExtentMap::ExtentMap(Onode *o, size_t inline_shard_prealloc_size)
   : onode(o),
-    inline_bl(
-      o->c->store->cct->_conf->bluestore_extent_map_inline_shard_prealloc_size) {
+    inline_bl(inline_shard_prealloc_size) {
 }
 
 void BlueStore::ExtentMap::dump(Formatter* f) const
@@ -2908,9 +2921,9 @@ void BlueStore::ExtentMap::reshard(
     bool was_too_many_blobs_check = false;
     auto too_many_blobs_threshold =
       g_conf()->bluestore_debug_too_many_blobs_threshold;
-    auto& dumped_onodes = onode->c->onode_map.cache->dumped_onodes;
-    decltype(onode->c->onode_map.cache->dumped_onodes)::value_type* oid_slot = nullptr;
-    decltype(onode->c->onode_map.cache->dumped_onodes)::value_type* oldest_slot = nullptr;
+    auto& dumped_onodes = onode->c->onode_space.cache->dumped_onodes;
+    decltype(onode->c->onode_space.cache->dumped_onodes)::value_type* oid_slot = nullptr;
+    decltype(onode->c->onode_space.cache->dumped_onodes)::value_type* oldest_slot = nullptr;
 
     for (auto e = extent_map.lower_bound(dummy); e != extent_map.end(); ++e) {
       if (e->logical_offset >= needs_reshard_end) {
@@ -3126,80 +3139,144 @@ bool BlueStore::ExtentMap::encode_some(
   return false;
 }
 
-unsigned BlueStore::ExtentMap::decode_some(bufferlist& bl)
+/////////////////// BlueStore::ExtentMap::DecoderExtent ///////////
+void BlueStore::ExtentMap::ExtentDecoder::decode_extent(
+  Extent* le,
+  __u8 struct_v,
+  bptr_c_it_t& p,
+  Collection* c)
 {
-  /*
-  derr << __func__ << ":";
-  bl.hexdump(*_dout);
-  *_dout << dendl;
-  */
+  uint64_t blobid;
+  denc_varint(blobid, p);
+  if ((blobid & BLOBID_FLAG_CONTIGUOUS) == 0) {
+    uint64_t gap;
+    denc_varint_lowz(gap, p);
+    pos += gap;
+  }
+  le->logical_offset = pos;
+  if ((blobid & BLOBID_FLAG_ZEROOFFSET) == 0) {
+    denc_varint_lowz(le->blob_offset, p);
+  } else {
+    le->blob_offset = 0;
+  }
+  if ((blobid & BLOBID_FLAG_SAMELENGTH) == 0) {
+    denc_varint_lowz(prev_len, p);
+  }
+  le->length = prev_len;
+  if (blobid & BLOBID_FLAG_SPANNING) {
+    consume_blobid(le, true, blobid >> BLOBID_SHIFT_BITS);
+  } else {
+    blobid >>= BLOBID_SHIFT_BITS;
+    if (blobid) {
+      consume_blobid(le, false, blobid - 1);
+    } else {
+      Blob *b = new Blob();
+      uint64_t sbid = 0;
+      b->decode(p, struct_v, &sbid, false, c);
+      consume_blob(le, extent_pos, sbid, b);
+    }
+  }
+  pos += prev_len;
+  ++extent_pos;
+}
+
+unsigned BlueStore::ExtentMap::ExtentDecoder::decode_some(
+  const bufferlist& bl, Collection* c)
+{
+  __u8 struct_v;
+  uint32_t num;
 
   ceph_assert(bl.get_num_buffers() <= 1);
   auto p = bl.front().begin_deep();
-  __u8 struct_v;
   denc(struct_v, p);
   // Version 2 differs from v1 in blob's ref_map
   // serialization only. Hence there is no specific
   // handling at ExtentMap level below.
   ceph_assert(struct_v == 1 || struct_v == 2);
-
-  uint32_t num;
   denc_varint(num, p);
-  vector<BlobRef> blobs(num);
-  uint64_t pos = 0;
-  uint64_t prev_len = 0;
-  unsigned n = 0;
 
+  extent_pos = 0;
   while (!p.end()) {
-    Extent *le = new Extent();
-    uint64_t blobid;
-    denc_varint(blobid, p);
-    if ((blobid & BLOBID_FLAG_CONTIGUOUS) == 0) {
-      uint64_t gap;
-      denc_varint_lowz(gap, p);
-      pos += gap;
-    }
-    le->logical_offset = pos;
-    if ((blobid & BLOBID_FLAG_ZEROOFFSET) == 0) {
-      denc_varint_lowz(le->blob_offset, p);
-    } else {
-      le->blob_offset = 0;
-    }
-    if ((blobid & BLOBID_FLAG_SAMELENGTH) == 0) {
-      denc_varint_lowz(prev_len, p);
-    }
-    le->length = prev_len;
+    Extent* le = get_next_extent();
+    decode_extent(le, struct_v, p, c);
+    add_extent(le);
+  }
+  ceph_assert(extent_pos == num);
+  return num;
+}
 
-    if (blobid & BLOBID_FLAG_SPANNING) {
-      dout(30) << __func__ << "  getting spanning blob "
-              << (blobid >> BLOBID_SHIFT_BITS) << dendl;
-      le->assign_blob(get_spanning_blob(blobid >> BLOBID_SHIFT_BITS));
-    } else {
-      blobid >>= BLOBID_SHIFT_BITS;
-      if (blobid) {
-       le->assign_blob(blobs[blobid - 1]);
-       ceph_assert(le->blob);
-      } else {
-       Blob *b = new Blob();
-        uint64_t sbid = 0;
-        b->decode(onode->c, p, struct_v, &sbid, false);
-       blobs[n] = b;
-       onode->c->open_shared_blob(sbid, b);
-       le->assign_blob(b);
-      }
-      // we build ref_map dynamically for non-spanning blobs
-      le->blob->get_ref(
-       onode->c,
-       le->blob_offset,
-       le->length);
-    }
-    pos += prev_len;
-    ++n;
-    extent_map.insert(*le);
+void BlueStore::ExtentMap::ExtentDecoder::decode_spanning_blobs(
+  bptr_c_it_t& p, Collection* c)
+{
+  __u8 struct_v;
+  denc(struct_v, p);
+  // Version 2 differs from v1 in blob's ref_map
+  // serialization only. Hence there is no specific
+  // handling at ExtentMap level.
+  ceph_assert(struct_v == 1 || struct_v == 2);
+
+  unsigned n;
+  denc_varint(n, p);
+  while (n--) {
+    BlueStore::BlobRef b(new Blob());
+    denc_varint(b->id, p);
+    uint64_t sbid = 0;
+    b->decode(p, struct_v, &sbid, true, c);
+    consume_spanning_blob(sbid, b);
   }
+}
 
-  ceph_assert(n == num);
-  return num;
+/////////////////// BlueStore::ExtentMap::DecoderExtentFull ///////////
+void BlueStore::ExtentMap::ExtentDecoderFull::consume_blobid(
+  BlueStore::Extent* le, bool spanning, uint64_t blobid) {
+  ceph_assert(le);
+  if (spanning) {
+    le->assign_blob(extent_map.get_spanning_blob(blobid));
+  } else {
+    ceph_assert(blobid < blobs.size());
+    le->assign_blob(blobs[blobid]);
+    // we build ref_map dynamically for non-spanning blobs
+    le->blob->get_ref(
+      extent_map.onode->c,
+      le->blob_offset,
+      le->length);
+  }
+}
+
+void BlueStore::ExtentMap::ExtentDecoderFull::consume_blob(
+  BlueStore::Extent* le, uint64_t extent_no, uint64_t sbid, BlobRef b) {
+  ceph_assert(le);
+  blobs.resize(extent_no + 1);
+  blobs[extent_no] = b;
+  extent_map.onode->c->open_shared_blob(sbid, b);
+  le->assign_blob(b);
+  le->blob->get_ref(
+    extent_map.onode->c,
+    le->blob_offset,
+    le->length);
+}
+
+void BlueStore::ExtentMap::ExtentDecoderFull::consume_spanning_blob(
+  uint64_t sbid, BlueStore::BlobRef b) {
+  extent_map.spanning_blob_map[b->id] = b;
+  extent_map.onode->c->open_shared_blob(sbid, b);
+}
+
+BlueStore::Extent* BlueStore::ExtentMap::ExtentDecoderFull::get_next_extent()
+{
+  return new Extent();
+}
+
+void BlueStore::ExtentMap::ExtentDecoderFull::add_extent(BlueStore::Extent* le)
+{
+  extent_map.extent_map.insert(*le);
+}
+
+unsigned BlueStore::ExtentMap::decode_some(bufferlist& bl)
+{
+  ExtentDecoderFull edecoder(*this);
+  unsigned n = edecoder.decode_some(bl, onode->c);
+  return n;
 }
 
 void BlueStore::ExtentMap::bound_encode_spanning_blobs(size_t& p)
@@ -3235,28 +3312,6 @@ void BlueStore::ExtentMap::encode_spanning_blobs(
   }
 }
 
-void BlueStore::ExtentMap::decode_spanning_blobs(
-  bufferptr::const_iterator& p)
-{
-  __u8 struct_v;
-  denc(struct_v, p);
-  // Version 2 differs from v1 in blob's ref_map
-  // serialization only. Hence there is no specific
-  // handling at ExtentMap level.
-  ceph_assert(struct_v == 1 || struct_v == 2);
-
-  unsigned n;
-  denc_varint(n, p);
-  while (n--) {
-    BlobRef b(new Blob());
-    denc_varint(b->id, p);
-    spanning_blob_map[b->id] = b;
-    uint64_t sbid = 0;
-    b->decode(onode->c, p, struct_v, &sbid, true);
-    onode->c->open_shared_blob(sbid, b);
-  }
-}
-
 void BlueStore::ExtentMap::init_shards(bool loaded, bool dirty)
 {
   shards.resize(onode->onode.extent_map_shards.size());
@@ -3657,81 +3712,63 @@ void BlueStore::Onode::calc_omap_tail(
   out->push_back('~');
 }
 
-void BlueStore::Onode::get() {
-  if (++nref >= 2 && !pinned) {
-    OnodeCacheShard* ocs = c->get_onode_cache();
-    ocs->lock.lock();
-    // It is possible that during waiting split_cache moved us to different OnodeCacheShard.
-    while (ocs != c->get_onode_cache()) {
-      ocs->lock.unlock();
-      ocs = c->get_onode_cache();
-      ocs->lock.lock();
-    }
-    bool was_pinned = pinned;
-    pinned = nref >= 2;
-    bool r = !was_pinned && pinned;
-    if (cached && r) {
-      ocs->_pin(this);
-    }
-    ocs->lock.unlock();
-  }
+void BlueStore::Onode::get()
+{
+  ++nref;
+  ++pin_nref;
 }
-void BlueStore::Onode::put() {
-  ++put_nref;
-  int n = --nref;
-  if (n == 1) {
-    OnodeCacheShard* ocs = c->get_onode_cache();
-    ocs->lock.lock();
-    // It is possible that during waiting split_cache moved us to different OnodeCacheShard.
-    while (ocs != c->get_onode_cache()) {
-      ocs->lock.unlock();
-      ocs = c->get_onode_cache();
-      ocs->lock.lock();
-    }
-    bool need_unpin = pinned;
-    pinned = pinned && nref >= 2;
-    need_unpin = need_unpin && !pinned;
-    if (cached && need_unpin) {
-      if (exists) {
-        ocs->_unpin(this);
-      } else {
-        ocs->_unpin_and_rm(this);
-        // remove will also decrement nref
-        c->onode_map._remove(oid);
-      }
-    }
-    ocs->lock.unlock();
+void BlueStore::Onode::put()
+{
+  if (--pin_nref == 1) {
+    c->get_onode_cache()->maybe_unpin(this);
   }
-  auto pn = --put_nref;
-  if (nref == 0 && pn == 0) {
+  if (--nref == 0) {
     delete this;
   }
 }
 
-BlueStore::Onode* BlueStore::Onode::decode(
-  CollectionRef c,
-  const ghobject_t& oid,
-  const string& key,
-  const bufferlist& v)
+void BlueStore::Onode::decode_raw(
+  BlueStore::Onode* on,
+  const bufferlist& v,
+  BlueStore::ExtentMap::ExtentDecoder& edecoder)
 {
-  Onode* on = new Onode(c.get(), oid, key);
   on->exists = true;
   auto p = v.front().begin_deep();
   on->onode.decode(p);
-  for (auto& i : on->onode.attrs) {
-    i.second.reassign_to_mempool(mempool::mempool_bluestore_cache_meta);
-  }
 
   // initialize extent_map
-  on->extent_map.decode_spanning_blobs(p);
+  edecoder.decode_spanning_blobs(p, on->c);
   if (on->onode.extent_map_shards.empty()) {
     denc(on->extent_map.inline_bl, p);
-    on->extent_map.decode_some(on->extent_map.inline_bl);
-    on->extent_map.inline_bl.reassign_to_mempool(
-      mempool::mempool_bluestore_cache_data);
+    edecoder.decode_some(on->extent_map.inline_bl, on->c);
   }
-  else {
-    on->extent_map.init_shards(false, false);
+}
+
+BlueStore::Onode* BlueStore::Onode::create_decode(
+  CollectionRef c,
+  const ghobject_t& oid,
+  const string& key,
+  const bufferlist& v,
+  bool allow_empty)
+{
+  ceph_assert(v.length() || allow_empty);
+  Onode* on = new Onode(c.get(), oid, key);
+
+  if (v.length()) {
+    ExtentMap::ExtentDecoderFull edecoder(on->extent_map);
+    decode_raw(on, v, edecoder);
+
+    for (auto& i : on->onode.attrs) {
+      i.second.reassign_to_mempool(mempool::mempool_bluestore_cache_meta);
+    }
+
+    // initialize extent_map
+    if (on->onode.extent_map_shards.empty()) {
+      on->extent_map.inline_bl.reassign_to_mempool(
+        mempool::mempool_bluestore_cache_data);
+    } else {
+      on->extent_map.init_shards(false, false);
+    }
   }
   return on;
 }
@@ -3930,7 +3967,7 @@ BlueStore::Collection::Collection(BlueStore *store_, OnodeCacheShard *oc, Buffer
     store(store_),
     cache(bc),
     exists(true),
-    onode_map(oc),
+    onode_space(oc),
     commit_queue(nullptr)
 {
 }
@@ -4050,7 +4087,7 @@ BlueStore::OnodeRef BlueStore::Collection::get_onode(
     }
   }
 
-  OnodeRef o = onode_map.lookup(oid);
+  OnodeRef o = onode_space.lookup(oid);
   if (o)
     return o;
 
@@ -4071,16 +4108,14 @@ BlueStore::OnodeRef BlueStore::Collection::get_onode(
     ceph_assert(r == -ENOENT);
     if (!create)
       return OnodeRef();
-
-    // new object, new onode
-    on = new Onode(this, oid, key);
   } else {
-    // loaded
     ceph_assert(r >= 0);
-    on = Onode::decode(this, oid, key, v);
   }
+
+  // new object, load onode if available
+  on = Onode::create_decode(this, oid, key, v, true);
   o.reset(on);
-  return onode_map.add(oid, o);
+  return onode_space.add_onode(oid, o);
 }
 
 void BlueStore::Collection::split_cache(
@@ -4103,8 +4138,8 @@ void BlueStore::Collection::split_cache(
   bool is_pg = dest->cid.is_pg(&destpg);
   ceph_assert(is_pg);
 
-  auto p = onode_map.onode_map.begin();
-  while (p != onode_map.onode_map.end()) {
+  auto p = onode_space.onode_map.begin();
+  while (p != onode_space.onode_map.end()) {
     OnodeRef o = p->second;
     if (!p->second->oid.match(destbits, destpg.pgid.ps())) {
       // onode does not belong to this child
@@ -4115,15 +4150,13 @@ void BlueStore::Collection::split_cache(
       ldout(store->cct, 20) << __func__ << " moving " << o << " " << o->oid
                            << dendl;
 
-      // ensuring that nref is always >= 2 and hence onode is pinned and 
-      // physically out of cache during the transition
+      // ensuring that nref is always >= 2 and hence onode is pinned
       OnodeRef o_pin = o;
-      ceph_assert(o->pinned);
 
-      p = onode_map.onode_map.erase(p);
-      dest->onode_map.onode_map[o->oid] = o;
+      p = onode_space.onode_map.erase(p);
+      dest->onode_space.onode_map[o->oid] = o;
       if (o->cached) {
-        get_onode_cache()->move_pinned(dest->get_onode_cache(), o.get());
+        get_onode_cache()->_move_pinned(dest->get_onode_cache(), o.get());
       }
       o->c = dest;
 
@@ -4412,7 +4445,7 @@ void BlueStore::MempoolThread::_update_cache_settings()
 #define dout_prefix *_dout << "bluestore.OmapIteratorImpl(" << this << ") "
 
 BlueStore::OmapIteratorImpl::OmapIteratorImpl(
-  CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
+  CollectionRef c, OnodeRef& o, KeyValueDB::Iterator it)
   : c(c), o(o), it(it)
 {
   std::shared_lock l(c->lock);
@@ -5553,7 +5586,10 @@ void BlueStore::_close_bdev()
   bdev = NULL;
 }
 
-int BlueStore::_open_fm(KeyValueDB::Transaction t, bool read_only, bool fm_restore)
+int BlueStore::_open_fm(KeyValueDB::Transaction t,
+                        bool read_only,
+                        bool db_avail,
+                        bool fm_restore)
 {
   int r;
 
@@ -5564,19 +5600,20 @@ int BlueStore::_open_fm(KeyValueDB::Transaction t, bool read_only, bool fm_resto
   // fm restore must pass in a valid transaction
   ceph_assert(!fm_restore || (t != nullptr));
 
+  // when function is called in repair mode (to_repair=true) we skip db->open()/create()
+  bool can_have_null_fm = !is_db_rotational() &&
+                          !read_only &&
+                          db_avail &&
+                          cct->_conf->bluestore_allocation_from_file &&
+                          !bdev->is_smr();
+
   // When allocation-info is stored in a single file we set freelist_type to "null"
-  bool set_null_freemap = false;
-  if (freelist_type == "null") {
-    // use BitmapFreelistManager with the null option to stop allocations from going to RocksDB
-    // we will store the allocation info in a single file during umount()
-    freelist_type = "bitmap";
-    set_null_freemap = true;
+  if (can_have_null_fm) {
+    freelist_type = "null";
+    need_to_destage_allocation_file = true;
   }
   fm = FreelistManager::create(cct, freelist_type, PREFIX_ALLOC);
   ceph_assert(fm);
-  if (set_null_freemap) {
-    fm->set_null_manager();
-  }
   if (t) {
     // create mode. initialize freespace
     dout(20) << __func__ << " initializing freespace" << dendl;
@@ -5590,16 +5627,12 @@ int BlueStore::_open_fm(KeyValueDB::Transaction t, bool read_only, bool fm_resto
     ceph_assert(cct->_conf->bdev_block_size <= min_alloc_size);
 
     uint64_t alloc_size = min_alloc_size;
-#ifdef HAVE_LIBZBD
-    if (bdev->is_smr()) {
-      if (freelist_type != "zoned") {
-       derr << "SMR device but freelist_type = " << freelist_type << " (not zoned)"
-            << dendl;
-       return -EINVAL;
-      }
-    } else
-#endif
-    if (freelist_type == "zoned") {
+    if (bdev->is_smr() && freelist_type != "zoned") {
+      derr << "SMR device but freelist_type = " << freelist_type << " (not zoned)"
+           << dendl;
+      return -EINVAL;
+    }
+    if (!bdev->is_smr() && freelist_type == "zoned") {
       derr << "non-SMR device (or SMR support not built-in) but freelist_type = zoned"
           << dendl;
       return -EINVAL;
@@ -5668,12 +5701,15 @@ int BlueStore::_open_fm(KeyValueDB::Transaction t, bool read_only, bool fm_resto
     r = _write_out_fm_meta(0);
     ceph_assert(r == 0);
   } else {
+    if (can_have_null_fm) {
+      commit_to_null_manager();
+    }
     r = fm->init(db, read_only,
       [&](const std::string& key, std::string* result) {
         return read_meta(key, result);
     });
     if (r < 0) {
-      derr << __func__ << " freelist init failed: " << cpp_strerror(r) << dendl;
+      derr << __func__ << " failed: " << cpp_strerror(r) << dendl;
       delete fm;
       fm = NULL;
       return r;
@@ -5762,12 +5798,12 @@ int BlueStore::_create_alloc()
       delete alloc;
       return -EINVAL;
     }
-    shared_alloc.set(a);
+    shared_alloc.set(a, alloc_size);
   } else
 #endif
   {
     // BlueFS will share the same allocator
-    shared_alloc.set(alloc);
+    shared_alloc.set(alloc, alloc_size);
   }
 
   return 0;
@@ -5867,7 +5903,6 @@ int BlueStore::_init_alloc(std::map<uint64_t, uint64_t> *zone_adjustments)
       derr << __func__ << "::NCB::Please change the value of bluestore_allocation_from_file to TRUE in your ceph.conf file" << dendl;
       return -ENOTSUP; // Operation not supported
     }
-
     if (restore_allocator(alloc, &num, &bytes) == 0) {
       dout(5) << __func__ << "::NCB::restore_allocator() completed successfully alloc=" << alloc << dendl;
     } else {
@@ -5897,21 +5932,33 @@ int BlueStore::_init_alloc(std::map<uint64_t, uint64_t> *zone_adjustments)
 
 void BlueStore::_post_init_alloc(const std::map<uint64_t, uint64_t>& zone_adjustments)
 {
+  int r = 0;
 #ifdef HAVE_LIBZBD
-  assert(bdev->is_smr());
-  dout(1) << __func__ << " adjusting freelist based on device write pointers" << dendl;
-  auto f = dynamic_cast<ZonedFreelistManager*>(fm);
-  ceph_assert(f);
-  KeyValueDB::Transaction t = db->get_transaction();
-  for (auto& i : zone_adjustments) {
-    // allocate AND release since this gap is now dead space
-    // note that the offset is imprecise, but only need to select the zone
-    f->allocate(i.first, i.second, t);
-    f->release(i.first, i.second, t);
-  }
-  int r = db->submit_transaction_sync(t);
-  ceph_assert(r == 0);
+  if (bdev->is_smr()) {
+    if (zone_adjustments.empty()) {
+      return;
+    }
+    dout(1) << __func__ << " adjusting freelist based on device write pointers" << dendl;
+    auto f = dynamic_cast<ZonedFreelistManager*>(fm);
+    ceph_assert(f);
+    KeyValueDB::Transaction t = db->get_transaction();
+    for (auto& i : zone_adjustments) {
+      // allocate AND release since this gap is now dead space
+      // note that the offset is imprecise, but only need to select the zone
+      f->allocate(i.first, i.second, t);
+      f->release(i.first, i.second, t);
+    }
+    r = db->submit_transaction_sync(t);
+  } else
 #endif
+  if (fm->is_null_manager()) {
+    // Now that we load the allocation map we need to invalidate the file as new allocation won't be reflected
+    // Changes to the allocation map (alloc/release) are not updated inline and will only be stored on umount()
+    // This means that we should not use the existing file on failure case (unplanned shutdown) and must resort
+    //  to recovery from RocksDB::ONodes
+    r = invalidate_allocation_file_on_bluefs();
+  }
+  ceph_assert(r >= 0);
 }
 
 void BlueStore::_close_alloc()
@@ -6079,6 +6126,12 @@ bool BlueStore::_use_rotational_settings()
   return bdev->is_rotational();
 }
 
+bool BlueStore::is_statfs_recoverable() const
+{
+  // abuse fm for now
+  return has_null_manager();
+}
+
 bool BlueStore::test_mount_in_use()
 {
   // most error conditions mean the mount is not in use (e.g., because
@@ -6355,7 +6408,7 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
     goto out_db;
   }
 
-  r = _open_fm(nullptr, true);
+  r = _open_fm(nullptr, true, false);
   if (r < 0)
     goto out_db;
 
@@ -6375,8 +6428,7 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
     goto out_alloc;
   }
 
-  if (!read_only && !zone_adjustments.empty()) {
-    // for SMR devices that have freelist mismatch with device write pointers
+  if (!read_only) {
     _post_init_alloc(zone_adjustments);
   }
 
@@ -6428,6 +6480,11 @@ void BlueStore::_close_db_and_around()
   if (db) {
     _close_db();
   }
+  _close_around_db();
+}
+
+void BlueStore::_close_around_db()
+{
   if (bluefs) {
     _close_bluefs();
   }
@@ -6452,7 +6509,11 @@ int BlueStore::open_db_environment(KeyValueDB **pdb, bool to_repair)
 
 int BlueStore::close_db_environment()
 {
-  _close_db_and_around();
+  if (db) {
+    delete db;
+    db = nullptr;
+  }
+  _close_around_db();
   return 0;
 }
 
@@ -6668,20 +6729,64 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
   return 0;
 }
 
-void BlueStore::_close_db_leave_bluefs()
+void BlueStore::_close_db()
 {
+  dout(10) << __func__ << ":read_only=" << db_was_opened_read_only
+           << " fm=" << fm
+           << " destage_alloc_file=" << need_to_destage_allocation_file
+           << " per_pool=" << per_pool_stat_collection
+           << " pool stats=" << osd_pools.size()
+           << dendl;
+  bool do_destage = !db_was_opened_read_only && need_to_destage_allocation_file;
+  if (do_destage && is_statfs_recoverable()) {
+    auto t = db->get_transaction();
+    store_statfs_t s;
+    if (per_pool_stat_collection) {
+      KeyValueDB::Iterator it = db->get_iterator(PREFIX_STAT, KeyValueDB::ITERATOR_NOCACHE);
+      uint64_t pool_id;
+      for (it->upper_bound(string()); it->valid(); it->next()) {
+        int r = get_key_pool_stat(it->key(), &pool_id);
+        if (r >= 0) {
+          dout(10) << __func__ << " wiping statfs for: " << pool_id << dendl;
+        } else {
+          derr << __func__ << " wiping invalid statfs key: " << it->key() << dendl;
+        }
+        t->rmkey(PREFIX_STAT, it->key());
+      }
+
+      std::lock_guard l(vstatfs_lock);
+      for(auto &p : osd_pools) {
+        string key;
+        get_pool_stat_key(p.first, &key);
+        bufferlist bl;
+        if (!p.second.is_empty()) {
+          p.second.encode(bl);
+          p.second.publish(&s);
+          t->set(PREFIX_STAT, key, bl);
+          dout(10) << __func__ << " persisting: "
+                   << p.first << "->"  << s
+                   << dendl;
+        }
+      }
+    } else {
+      bufferlist bl;
+      {
+        std::lock_guard l(vstatfs_lock);
+        vstatfs.encode(bl);
+        vstatfs.publish(&s);
+      }
+      t->set(PREFIX_STAT, BLUESTORE_GLOBAL_STATFS_KEY, bl);
+      dout(10) << __func__ << "persisting: " << s << dendl;
+    }
+    int r = db->submit_transaction_sync(t);
+    dout(10) << __func__ << " statfs persisted." << dendl;
+    ceph_assert(r >= 0);
+  }
   ceph_assert(db);
   delete db;
   db = nullptr;
-}
 
-void BlueStore::_close_db()
-{
-  dout(10) << __func__ << ":read_only=" << db_was_opened_read_only << " fm=" << fm << " destage_alloc_file=" << need_to_destage_allocation_file << dendl;
-  _close_db_leave_bluefs();
-
-  if (need_to_destage_allocation_file) {
-    ceph_assert(fm && fm->is_null_manager());
+  if (do_destage && fm && fm->is_null_manager()) {
     int ret = store_allocator(alloc);
     if (ret != 0) {
       derr << __func__ << "::NCB::store_allocator() failed (continue with bitmapFreelistManager)" << dendl;
@@ -6828,15 +6933,17 @@ void BlueStore::_open_statfs()
         st.decode(p);
         vstatfs += st;
 
-        dout(30) << __func__ << " pool " << pool_id
-                << " statfs " << st << dendl;
+        dout(10) << __func__ << " pool " << std::hex << pool_id
+                << " statfs(hex) " << st
+                << std::dec << dendl;
       } catch (ceph::buffer::error& e) {
         derr << __func__ << " failed to decode pool stats, key:"
              << pretty_binary_string(it->key()) << dendl;
       }   
     }
   }
-  dout(30) << __func__ << " statfs " << vstatfs << dendl;
+  dout(10) << __func__ << " statfs " << std::hex
+           << vstatfs  << std::dec << dendl;
 
 }
 
@@ -7104,7 +7211,7 @@ int BlueStore::mkfs()
 
   {
     KeyValueDB::Transaction t = db->get_transaction();
-    r = _open_fm(t, true);
+    r = _open_fm(t, false, true);
     if (r < 0)
       goto out_close_db;
     {
@@ -7578,7 +7685,7 @@ void BlueStore::set_cache_shards(unsigned num)
 }
 
 //---------------------------------------------
-bool BlueStore::has_null_manager()
+bool BlueStore::has_null_manager() const
 {
   return (fm && fm->is_null_manager());
 }
@@ -7666,11 +7773,6 @@ int BlueStore::_mount()
     dout(1) << __func__ << " quick-fix on mount" << dendl;
     _fsck_on_open(FSCK_SHALLOW, true);
 
-    //reread statfs
-    //FIXME minor: replace with actual open/close?
-    _open_statfs();
-    _check_legacy_statfs_alert();
-
     //set again as hopefully it has been fixed
     if (was_per_pool_omap != OMAP_PER_PG) {
       _set_per_pool_omap();
@@ -7810,110 +7912,95 @@ int BlueStore::_fsck_check_extents(
   return errors;
 }
 
-void BlueStore::_fsck_check_pool_statfs(
-  BlueStore::per_pool_statfs& expected_pool_statfs,
+void BlueStore::_fsck_check_statfs(
+  const store_statfs_t& expected_statfs,
+  const per_pool_statfs& expected_pool_statfs,
   int64_t& errors,
   int64_t& warnings,
   BlueStoreRepairer* repairer)
 {
-  auto it = db->get_iterator(PREFIX_STAT, KeyValueDB::ITERATOR_NOCACHE);
-  if (it) {
-    for (it->lower_bound(string()); it->valid(); it->next()) {
-      string key = it->key();
-      if (key == BLUESTORE_GLOBAL_STATFS_KEY) {
-        if (repairer) {
-         ++errors;
-         repairer->remove_key(db, PREFIX_STAT, BLUESTORE_GLOBAL_STATFS_KEY);
-         derr << "fsck error: " << "legacy statfs record found, removing"
-              << dendl;
-       }
-       continue;
-      }
-      uint64_t pool_id;
-      if (get_key_pool_stat(key, &pool_id) < 0) {
-       derr << "fsck error: bad key " << key
-            << "in statfs namespece" << dendl;
-       if (repairer) {
-         repairer->remove_key(db, PREFIX_STAT, key);
-       }
-       ++errors;
-       continue;
-      }
-
-      volatile_statfs vstatfs;
-      bufferlist bl = it->value();
-      auto blp = bl.cbegin();
-      try {
-       vstatfs.decode(blp);
-      } catch (ceph::buffer::error& e) {
-        derr << "fsck error: failed to decode Pool StatFS record"
-            << pretty_binary_string(key) << dendl;
+  string key;
+  store_statfs_t actual_statfs;
+  store_statfs_t s;
+  {
+    // make a copy
+    per_pool_statfs my_expected_pool_statfs(expected_pool_statfs);
+    auto op = osd_pools.begin();
+    while (op != osd_pools.end()) {
+      get_pool_stat_key(op->first, &key);
+      op->second.publish(&s);
+      auto it_expected = my_expected_pool_statfs.find(op->first);
+      if (it_expected == my_expected_pool_statfs.end()) {
+        auto op0 = op++;
+        if (op0->second.is_empty()) {
+          // It's OK to lack relevant empty statfs record
+          continue;
+        }
+        derr << __func__ << "::fsck error: " << std::hex
+             << "pool " << op0->first << " has got no statfs to match against: "
+             << s
+             << std::dec << dendl;
+        ++errors;
         if (repairer) {
-         dout(20) << __func__ << " undecodable Pool StatFS record, key:'"
-                  << pretty_binary_string(key)
-                  << "', removing" << dendl;
+          osd_pools.erase(op0);
           repairer->remove_key(db, PREFIX_STAT, key);
         }
-        ++errors;
-       vstatfs.reset();
-      }
-      auto stat_it = expected_pool_statfs.find(pool_id);
-      if (stat_it == expected_pool_statfs.end()) {
-        if (vstatfs.is_empty()) {
-          // we don't consider that as an error since empty pool statfs
-          // are left in DB for now
-         dout(20) << "fsck inf: found empty stray Pool StatFS record for pool id 0x"
-                   << std::hex << pool_id << std::dec << dendl;
+      } else {
+        if (!(s == it_expected->second)) {
+          derr << "fsck error: actual " << s
+              << " != expected " << it_expected->second
+              << " for pool "
+              << std::hex << op->first << std::dec << dendl;
+         ++errors;
          if (repairer) {
-           // but we need to increment error count in case of repair
-           // to have proper counters at the end
-           // (as repairer increments recovery counter anyway).
-           ++errors;
+           // repair in-memory in a hope this would be flushed properly on shutdown
+           s = it_expected->second;
+           op->second = it_expected->second;
+           repairer->fix_statfs(db, key, it_expected->second);
          }
-        } else {
-         derr << "fsck error: found stray Pool StatFS record for pool id 0x"
-              << std::hex << pool_id << std::dec << dendl;
-         ++errors;
        }
-       if (repairer) {
-         repairer->remove_key(db, PREFIX_STAT, key);
-       }
-       continue;
-      }
-      store_statfs_t statfs;
-      vstatfs.publish(&statfs);
-      if (!(stat_it->second == statfs)) {
-        derr << "fsck error: actual " << statfs
-            << " != expected " << stat_it->second
-            << " for pool "
-            << std::hex << pool_id << std::dec << dendl;
-       if (repairer) {
-         repairer->fix_statfs(db, key, stat_it->second);
-       }
-        ++errors;
+        actual_statfs.add(s);
+        my_expected_pool_statfs.erase(it_expected);
+        ++op;
       }
-      expected_pool_statfs.erase(stat_it);
-    }
-  } // if (it)
-  for (auto& s : expected_pool_statfs) {
-    if (s.second.is_zero()) {
-      // we might lack empty statfs recs in DB
-      continue;
     }
-    derr << "fsck error: missing Pool StatFS record for pool "
-        << std::hex << s.first << std::dec << dendl;
-    if (repairer) {
-      string key;
-      get_pool_stat_key(s.first, &key);
-      repairer->fix_statfs(db, key, s.second);
+    // check stats that lack matching entities in osd_pools
+    for (auto &p : my_expected_pool_statfs) {
+      if (p.second.is_zero()) {
+        // It's OK to lack relevant empty statfs record
+        continue;
+      }
+      get_pool_stat_key(p.first, &key);
+      derr << __func__ << "::fsck error: " << std::hex
+           << "pool " << p.first << " has got no actual statfs: "
+           << std::dec << p.second
+           << dendl;
+      ++errors;
+      if (repairer) {
+       osd_pools[p.first] = p.second;
+        repairer->fix_statfs(db, key, p.second);
+        actual_statfs.add(p.second);
+      }
+    }
+  }
+  // process global statfs
+  if (repairer) {
+    if (!per_pool_stat_collection) {
+      // by virtue of running this method, we correct the top-level
+      // error of having global stats
+      repairer->remove_key(db, PREFIX_STAT, BLUESTORE_GLOBAL_STATFS_KEY);
+      per_pool_stat_collection = true;
+    }
+    vstatfs = actual_statfs;
+    dout(20) << __func__ << " setting vstatfs to " << actual_statfs << dendl;
+  } else if (!per_pool_stat_collection) {
+    // check global stats only if fscking (not repairing) w/o per-pool stats
+    vstatfs.publish(&s);
+    if (!(s == expected_statfs)) {
+      derr << "fsck error: actual " << s
+           << " != expected " << expected_statfs << dendl;
+      ++errors;
     }
-    ++errors;
-  }
-  if (!per_pool_stat_collection &&
-      repairer) {
-    // by virtue of running this method, we correct the top-level
-    // error of having global stats
-    repairer->inc_repaired();
   }
 }
 
@@ -7973,7 +8060,7 @@ void BlueStore::_fsck_repair_shared_blobs(
                   << dendl;
 
           OnodeRef o;
-          o.reset(Onode::decode(c, oid, it->key(), it->value()));
+          o.reset(Onode::create_decode(c, oid, it->key(), it->value()));
           o->extent_map.fault_range(db, 0, OBJECT_MAX_SIZE);
 
           _dump_onode<30>(cct, *o);
@@ -8112,7 +8199,7 @@ BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
 
   dout(10) << __func__ << "  " << oid << dendl;
   OnodeRef o;
-  o.reset(Onode::decode(c, oid, key, value));
+  o.reset(Onode::create_decode(c, oid, key, value));
   ++num_objects;
 
   num_spanning_blobs += o->extent_map.spanning_blob_map.size();
@@ -8304,6 +8391,8 @@ BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
         if (!broken) {
           first_broken = it1->second;
           ++errors;
+          derr << "fsck error:" << " stray spanning blob found:" << it1->first
+               << dendl;
         }
         broken++;
         if (repairer) {
@@ -9120,7 +9209,7 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
 
   mempool_dynamic_bitset used_blocks, bluefs_used_blocks;
   KeyValueDB::Iterator it;
-  store_statfs_t expected_store_statfs, actual_statfs;
+  store_statfs_t expected_store_statfs;
   per_pool_statfs expected_pool_statfs;
 
   sb_info_space_efficient_map_t sb_info;
@@ -9212,15 +9301,6 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
         << dendl;
   }
 
-  // get expected statfs; reset unaffected fields to be able to compare
-  // structs
-  statfs(&actual_statfs);
-  actual_statfs.total = 0;
-  actual_statfs.internally_reserved = 0;
-  actual_statfs.available = 0;
-  actual_statfs.internal_metadata = 0;
-  actual_statfs.omap_allocated = 0;
-
   if (g_conf()->bluestore_debug_fsck_abort) {
     dout(1) << __func__ << " debug abort" << dendl;
     goto out_scan;
@@ -9369,8 +9449,9 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
 
   sb_ref_mismatches = sb_ref_counts.count_non_zero();
   if (sb_ref_mismatches != 0) {
-    derr << "fsck error: shared blob references aren't matching, at least "
-      << sb_ref_mismatches << " found" << dendl;
+    derr << "fsck error:" << "*" << sb_ref_mismatches
+         << " shared blob references aren't matching, at least "
+         << sb_ref_mismatches << " found" << dendl;
     errors += sb_ref_mismatches;
   }
 
@@ -9509,7 +9590,7 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
                  << " obj:" << oid << dendl;
 
         OnodeRef o;
-        o.reset(Onode::decode(c, oid, it->key(), it->value()));
+        o.reset(Onode::create_decode(c, oid, it->key(), it->value()));
        o->extent_map.fault_range(db, 0, OBJECT_MAX_SIZE);
        mempool::bluestore_fsck::set<BlobRef> blobs;
 
@@ -9657,23 +9738,9 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
   sb_info.clear();
   sb_ref_counts.reset();
 
-  // check global stats only if fscking (not repairing) w/o per-pool stats
-  if (!per_pool_stat_collection &&
-      !repair &&
-      !(actual_statfs == expected_store_statfs)) {
-    derr << "fsck error: actual " << actual_statfs
-        << " != expected " << expected_store_statfs << dendl;
-    if (repair) {
-      repairer.fix_statfs(db, BLUESTORE_GLOBAL_STATFS_KEY,
-                         expected_store_statfs);
-    }
-    ++errors;
-  }
-
   dout(1) << __func__ << " checking pool_statfs" << dendl;
-  _fsck_check_pool_statfs(expected_pool_statfs,
-                         errors, warnings, repair ? &repairer : nullptr);
-
+  _fsck_check_statfs(expected_store_statfs, expected_pool_statfs,
+    errors, warnings, repair ? &repairer : nullptr);
   if (depth != FSCK_SHALLOW) {
     dout(1) << __func__ << " checking for stray omap data " << dendl;
     it = db->get_iterator(PREFIX_OMAP, KeyValueDB::ITERATOR_NOCACHE);
@@ -10020,7 +10087,7 @@ void BlueStore::inject_stray_shared_blob_key(uint64_t sbid)
   string key;
   get_shared_blob_key(sbid, &key);
   bluestore_shared_blob_t persistent(sbid);
-  persistent.ref_map.get(0xdead0000, 0x1000);
+  persistent.ref_map.get(0xdead0000, min_alloc_size);
   bufferlist bl;
   encode(persistent, bl);
   dout(20) << __func__ << " sbid " << sbid
@@ -10556,7 +10623,7 @@ void BlueStore::_reap_collections()
   while (p != removed_colls.end()) {
     CollectionRef c = *p;
     dout(10) << __func__ << " " << c << " " << c->cid << dendl;
-    if (c->onode_map.map_any([&](Onode* o) {
+    if (c->onode_space.map_any([&](Onode* o) {
          ceph_assert(!o->exists);
          if (o->flushing_count.load()) {
            dout(10) << __func__ << " " << c << " " << c->cid << " " << o->oid
@@ -10568,7 +10635,7 @@ void BlueStore::_reap_collections()
       ++p;
       continue;
     }
-    c->onode_map.clear();
+    c->onode_space.clear();
     p = removed_colls.erase(p);
     dout(10) << __func__ << " " << c << " " << c->cid << " done" << dendl;
   }
@@ -10763,7 +10830,7 @@ int BlueStore::read(
 }
 
 void BlueStore::_read_cache(
-  OnodeRef o,
+  OnodeRef& o,
   uint64_t offset,
   size_t length,
   int read_cache_policy,
@@ -10929,7 +10996,7 @@ int BlueStore::_prepare_read_ioc(
 }
 
 int BlueStore::_generate_read_result_bl(
-  OnodeRef o,
+  OnodeRef& o,
   uint64_t offset,
   size_t length,
   ready_regions_t& ready_regions,
@@ -11023,7 +11090,7 @@ int BlueStore::_generate_read_result_bl(
 
 int BlueStore::_do_read(
   Collection *c,
-  OnodeRef o,
+  OnodeRef& o,
   uint64_t offset,
   size_t length,
   bufferlist& bl,
@@ -11392,7 +11459,7 @@ int BlueStore::readv(
 
 int BlueStore::_do_readv(
   Collection *c,
-  OnodeRef o,
+  OnodeRef& o,
   const interval_set<uint64_t>& m,
   bufferlist& bl,
   uint32_t op_flags,
@@ -12398,7 +12465,7 @@ int BlueStore::_upgrade_super()
   return 0;
 }
 
-void BlueStore::_assign_nid(TransContext *txc, OnodeRef o)
+void BlueStore::_assign_nid(TransContext *txc, OnodeRef& o)
 {
   if (o->onode.nid) {
     ceph_assert(o->exists);
@@ -12470,12 +12537,14 @@ void BlueStore::_txc_update_store_statfs(TransContext *txc)
   logger->inc(l_bluestore_compressed_allocated, txc->statfs_delta.compressed_allocated());
   logger->inc(l_bluestore_compressed_original, txc->statfs_delta.compressed_original());
 
-  bufferlist bl;
-  txc->statfs_delta.encode(bl);
   if (per_pool_stat_collection) {
-    string key;
-    get_pool_stat_key(txc->osd_pool_id, &key);
-    txc->t->merge(PREFIX_STAT, key, bl);
+    if (!is_statfs_recoverable()) {
+      bufferlist bl;
+      txc->statfs_delta.encode(bl);
+      string key;
+      get_pool_stat_key(txc->osd_pool_id, &key);
+      txc->t->merge(PREFIX_STAT, key, bl);
+    }
 
     std::lock_guard l(vstatfs_lock);
     auto& stats = osd_pools[txc->osd_pool_id];
@@ -12484,7 +12553,11 @@ void BlueStore::_txc_update_store_statfs(TransContext *txc)
     vstatfs += txc->statfs_delta; //non-persistent in this mode
 
   } else {
-    txc->t->merge(PREFIX_STAT, BLUESTORE_GLOBAL_STATFS_KEY, bl);
+    if (!is_statfs_recoverable()) {
+      bufferlist bl;
+      txc->statfs_delta.encode(bl);
+      txc->t->merge(PREFIX_STAT, BLUESTORE_GLOBAL_STATFS_KEY, bl);
+    }
 
     std::lock_guard l(vstatfs_lock);
     vstatfs += txc->statfs_delta;
@@ -14549,7 +14622,7 @@ void BlueStore::_txc_add_transaction(TransContext *txc, Transaction *t)
 
 int BlueStore::_touch(TransContext *txc,
                      CollectionRef& c,
-                     OnodeRef &o)
+                     OnodeRefo)
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
   int r = 0;
@@ -14624,7 +14697,7 @@ void BlueStore::_pad_zeros(
 void BlueStore::_do_write_small(
     TransContext *txc,
     CollectionRef &c,
-    OnodeRef o,
+    OnodeRef& o,
     uint64_t offset, uint64_t length,
     bufferlist::iterator& blp,
     WriteContext *wctx)
@@ -15045,11 +15118,6 @@ void BlueStore::_do_write_small(
   return;
 }
 
-bool BlueStore::has_null_fm()
-{
-  return fm->is_null_manager();
-}
-
 bool BlueStore::BigDeferredWriteContext::can_defer(
     BlueStore::extent_map_t::iterator ep,
     uint64_t prefer_deferred_size,
@@ -15108,7 +15176,7 @@ bool BlueStore::BigDeferredWriteContext::apply_defer()
 void BlueStore::_do_write_big_apply_deferred(
     TransContext* txc,
     CollectionRef& c,
-    OnodeRef o,
+    OnodeRef& o,
     BlueStore::BigDeferredWriteContext& dctx,
     bufferlist::iterator& blp,
     WriteContext* wctx)
@@ -15171,7 +15239,7 @@ void BlueStore::_do_write_big_apply_deferred(
 void BlueStore::_do_write_big(
     TransContext *txc,
     CollectionRef &c,
-    OnodeRef o,
+    OnodeRef& o,
     uint64_t offset, uint64_t length,
     bufferlist::iterator& blp,
     WriteContext *wctx)
@@ -15384,7 +15452,7 @@ void BlueStore::_do_write_big(
 int BlueStore::_do_alloc_write(
   TransContext *txc,
   CollectionRef coll,
-  OnodeRef o,
+  OnodeRef& o,
   WriteContext *wctx)
 {
   dout(20) << __func__ << " txc " << txc
@@ -15448,6 +15516,18 @@ int BlueStore::_do_alloc_write(
 
   // compress (as needed) and calc needed space
   uint64_t need = 0;
+  uint64_t data_size = 0;
+  // 'need' is amount of space that must be provided by allocator.
+  // 'data_size' is a size of data that will be transferred to disk.
+  // Note that data_size is always <= need. This comes from:
+  // - write to blob was unaligned, and there is free space
+  // - data has been compressed
+  //
+  // We make one decision and apply it to all blobs.
+  // All blobs will be deferred or none will.
+  // We assume that allocator does its best to provide contiguous space,
+  // and the condition is : (data_size < deferred).
+
   auto max_bsize = std::max(wctx->target_blob_size, min_alloc_size);
   for (auto& wi : wctx->writes) {
     if (c && wi.blob_length > min_alloc_size) {
@@ -15494,6 +15574,7 @@ int BlueStore::_do_alloc_write(
          txc->statfs_delta.compressed_allocated() += result_len;
          logger->inc(l_bluestore_compress_success_count);
          need += result_len;
+         data_size += result_len;
        } else {
          rejected = true;
        }
@@ -15506,6 +15587,7 @@ int BlueStore::_do_alloc_write(
                 << dendl;
        logger->inc(l_bluestore_compress_rejected_count);
        need += wi.blob_length;
+       data_size += wi.bl.length();
       } else {
        rejected = true;
       }
@@ -15520,6 +15602,7 @@ int BlueStore::_do_alloc_write(
                 << std::dec << dendl;
        logger->inc(l_bluestore_compress_rejected_count);
        need += wi.blob_length;
+       data_size += wi.bl.length();
       }
       log_latency("compress@_do_alloc_write",
        l_bluestore_compress_lat,
@@ -15527,10 +15610,11 @@ int BlueStore::_do_alloc_write(
        cct->_conf->bluestore_log_op_age );
     } else {
       need += wi.blob_length;
+      data_size += wi.bl.length();
     }
   }
   PExtentVector prealloc;
-  prealloc.reserve(2 * wctx->writes.size());;
+  prealloc.reserve(2 * wctx->writes.size());
   int64_t prealloc_left = 0;
   prealloc_left = alloc->allocate(
     need, min_alloc_size, need,
@@ -15548,10 +15632,10 @@ int BlueStore::_do_alloc_write(
   }
   _collect_allocation_stats(need, min_alloc_size, prealloc);
 
-  dout(20) << __func__ << " prealloc " << prealloc << dendl;
+  dout(20) << __func__ << std::hex << " need=0x" << need << " data=0x" << data_size
+          << " prealloc " << prealloc << dendl;
   auto prealloc_pos = prealloc.begin();
   ceph_assert(prealloc_pos != prealloc.end());
-  uint64_t prealloc_pos_length = prealloc_pos->length;
 
   for (auto& wi : wctx->writes) {
     bluestore_blob_t& dblob = wi.b->dirty_blob();
@@ -15614,20 +15698,15 @@ int BlueStore::_do_alloc_write(
 
     PExtentVector extents;
     int64_t left = final_length;
-    bool has_chunk2defer = false;
     auto prefer_deferred_size_snapshot = prefer_deferred_size.load();
     while (left > 0) {
       ceph_assert(prealloc_left > 0);
-      has_chunk2defer |= (prealloc_pos_length < prefer_deferred_size_snapshot);
       if (prealloc_pos->length <= left) {
        prealloc_left -= prealloc_pos->length;
        left -= prealloc_pos->length;
        txc->statfs_delta.allocated() += prealloc_pos->length;
        extents.push_back(*prealloc_pos);
        ++prealloc_pos;
-       if (prealloc_pos != prealloc.end()) {
-         prealloc_pos_length = prealloc_pos->length;
-       }
       } else {
        extents.emplace_back(prealloc_pos->offset, left);
        prealloc_pos->offset += left;
@@ -15673,7 +15752,7 @@ int BlueStore::_do_alloc_write(
 
     // queue io
     if (!g_conf()->bluestore_debug_omit_block_device_write) {
-      if (has_chunk2defer && l->length() < prefer_deferred_size_snapshot) {
+      if (data_size < prefer_deferred_size_snapshot) {
        dout(20) << __func__ << " deferring 0x" << std::hex
                 << l->length() << std::dec << " write via deferred" << dendl;
        bluestore_deferred_op_t *op = _get_deferred_op(txc, l->length());
@@ -15704,7 +15783,7 @@ int BlueStore::_do_alloc_write(
 void BlueStore::_wctx_finish(
   TransContext *txc,
   CollectionRef& c,
-  OnodeRef o,
+  OnodeRef& o,
   WriteContext *wctx,
   set<SharedBlob*> *maybe_unshared_blobs)
 {
@@ -15829,7 +15908,7 @@ void BlueStore::_wctx_finish(
 void BlueStore::_do_write_data(
   TransContext *txc,
   CollectionRef& c,
-  OnodeRef o,
+  OnodeRef& o,
   uint64_t offset,
   uint64_t length,
   bufferlist& bl,
@@ -15870,7 +15949,7 @@ void BlueStore::_do_write_data(
 
 void BlueStore::_choose_write_options(
    CollectionRef& c,
-   OnodeRef o,
+   OnodeRef& o,
    uint32_t fadvise_flags,
    WriteContext *wctx)
 {
@@ -15976,7 +16055,7 @@ void BlueStore::_choose_write_options(
 int BlueStore::_do_gc(
   TransContext *txc,
   CollectionRef& c,
-  OnodeRef o,
+  OnodeRef& o,
   const WriteContext& wctx,
   uint64_t *dirty_start,
   uint64_t *dirty_end)
@@ -16031,7 +16110,7 @@ int BlueStore::_do_gc(
 int BlueStore::_do_write(
   TransContext *txc,
   CollectionRef& c,
-  OnodeRef o,
+  OnodeRef& o,
   uint64_t offset,
   uint64_t length,
   bufferlist& bl,
@@ -16200,7 +16279,7 @@ int BlueStore::_do_zero(TransContext *txc,
 }
 
 void BlueStore::_do_truncate(
-  TransContext *txc, CollectionRef& c, OnodeRef o, uint64_t offset,
+  TransContext *txc, CollectionRef& c, OnodeRef& o, uint64_t offset,
   set<SharedBlob*> *maybe_unshared_blobs)
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid
@@ -16275,7 +16354,7 @@ int BlueStore::_truncate(TransContext *txc,
 int BlueStore::_do_remove(
   TransContext *txc,
   CollectionRef& c,
-  OnodeRef o)
+  OnodeRef& o)
 {
   set<SharedBlob*> maybe_unshared_blobs;
   bool is_gen = !o->oid.is_no_gen();
@@ -16375,7 +16454,7 @@ int BlueStore::_do_remove(
 
 int BlueStore::_remove(TransContext *txc,
                       CollectionRef& c,
-                      OnodeRef &o)
+                      OnodeRefo)
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " onode " << o.get()
@@ -16574,7 +16653,7 @@ int BlueStore::_omap_setkeys(TransContext *txc,
 
 int BlueStore::_omap_setheader(TransContext *txc,
                               CollectionRef& c,
-                              OnodeRef &o,
+                              OnodeRefo,
                               bufferlist& bl)
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
@@ -16925,7 +17004,7 @@ int BlueStore::_rename(TransContext *txc,
 
   // this adjusts oldo->{oid,key}, and reset oldo to a fresh empty
   // Onode in the old slot
-  c->onode_map.rename(oldo, old_oid, new_oid, new_okey);
+  c->onode_space.rename(oldo, old_oid, new_oid, new_okey);
   r = 0;
 
   // hold a ref to new Onode in old name position, to ensure we don't drop
@@ -17009,7 +17088,7 @@ int BlueStore::_remove_collection(TransContext *txc, const coll_t &cid,
     }
     size_t nonexistent_count = 0;
     ceph_assert((*c)->exists);
-    if ((*c)->onode_map.map_any([&](Onode* o) {
+    if ((*c)->onode_space.map_any([&](Onode* o) {
       if (o->exists) {
         dout(1) << __func__ << " " << o->oid << " " << o
                << " exists in onode_map" << dendl;
@@ -17034,7 +17113,7 @@ int BlueStore::_remove_collection(TransContext *txc, const coll_t &cid,
       bool exists = (!next.is_max());
       for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
         dout(10) << __func__ << " oid " << *it << dendl;
-        auto onode = (*c)->onode_map.lookup(*it);
+        auto onode = (*c)->onode_space.lookup(*it);
         exists = !onode || onode->exists;
         if (exists) {
           dout(1) << __func__ << " " << *it
@@ -17100,7 +17179,7 @@ int BlueStore::_split_collection(TransContext *txc,
   ceph_assert(is_pg);
 
   // the destination should initially be empty.
-  ceph_assert(d->onode_map.empty());
+  ceph_assert(d->onode_space.empty());
   ceph_assert(d->shared_blob_set.empty());
   ceph_assert(d->cnode.bits == bits);
 
@@ -17491,12 +17570,12 @@ void BlueStore::_shutdown_cache()
     ceph_assert(i->empty());
   }
   for (auto& p : coll_map) {
-    p.second->onode_map.clear();
+    p.second->onode_space.clear();
     if (!p.second->shared_blob_set.empty()) {
       derr << __func__ << " stray shared blobs on " << p.first << dendl;
       p.second->shared_blob_set.dump<0>(cct);
     }
-    ceph_assert(p.second->onode_map.empty());
+    ceph_assert(p.second->onode_space.empty());
     ceph_assert(p.second->shared_blob_set.empty());
   }
   coll_map.clear();
@@ -17539,7 +17618,7 @@ void BlueStore::_apply_padding(uint64_t head_pad,
   }
 }
 
-void BlueStore::_record_onode(OnodeRef &o, KeyValueDB::Transaction &txn)
+void BlueStore::_record_onode(OnodeRefo, KeyValueDB::Transaction &txn)
 {
   // finalize extent_map shards
   o->extent_map.update(txn, false);
@@ -18309,7 +18388,7 @@ int BlueStore::invalidate_allocation_file_on_bluefs()
 {
   // mark that allocation-file was invalidated and we should destage a new copy whne closing db
   need_to_destage_allocation_file = true;
-  dout(10) << "need_to_destage_allocation_file was set" << dendl;
+  dout(10) << __func__ << " need_to_destage_allocation_file was set" << dendl;
 
   BlueFS::FileWriter *p_handle = nullptr;
   if (!bluefs->dir_exists(allocator_dir)) {
@@ -18320,7 +18399,7 @@ int BlueStore::invalidate_allocation_file_on_bluefs()
 
   int ret = bluefs->stat(allocator_dir, allocator_file, nullptr, nullptr);
   if (ret != 0) {
-    dout(5) << "allocator_file(" << allocator_file << ") doesn't exist" << dendl;
+    dout(5) << __func__ << " allocator_file(" << allocator_file << ") doesn't exist" << dendl;
     // nothing to do -> return
     return 0;
   }
@@ -18328,14 +18407,16 @@ int BlueStore::invalidate_allocation_file_on_bluefs()
 
   ret = bluefs->open_for_write(allocator_dir, allocator_file, &p_handle, true);
   if (ret != 0) {
-    derr << "Failed open_for_write with error-code " << ret << dendl;
+    derr << __func__ << "::NCB:: Failed open_for_write with error-code "
+         << ret << dendl;
     return -1;
   }
 
   dout(5) << "invalidate using bluefs->truncate(p_handle, 0)" << dendl;
   ret = bluefs->truncate(p_handle, 0);
   if (ret != 0) {
-    derr << "Failed truncate with error-code " << ret << dendl;
+    derr << __func__ << "::NCB:: Failed truncaste with error-code "
+         << ret << dendl;
     bluefs->close_writer(p_handle);
     return -1;
   }
@@ -18602,6 +18683,14 @@ int calc_allocator_image_trailer_size()
 //-----------------------------------------------------------------------------------
 int BlueStore::__restore_allocator(Allocator* allocator, uint64_t *num, uint64_t *bytes)
 {
+  if (cct->_conf->bluestore_debug_inject_allocation_from_file_failure > 0) {
+     boost::mt11213b rng(time(NULL));
+    boost::uniform_real<> ur(0, 1);
+    if (ur(rng) < cct->_conf->bluestore_debug_inject_allocation_from_file_failure) {
+      derr << __func__ << " failure injected." << dendl;
+      return -1;
+    }
+  }
   utime_t start_time = ceph_clock_now();
   BlueFS::FileReader *p_temp_handle = nullptr;
   int ret = bluefs->open_for_read(allocator_dir, allocator_file, &p_temp_handle, false);
@@ -18766,228 +18855,248 @@ int BlueStore::restore_allocator(Allocator* dest_allocator, uint64_t *num, uint6
   return ret;
 }
 
-//-------------------------------------------------------------------------
-void BlueStore::ExtentMap::provide_shard_info_to_onode(bufferlist v, uint32_t shard_id)
-{
-  [[maybe_unused]] auto cct  = onode->c->store->cct;
-  auto path = onode->c->store->path;
-  if (shard_id < shards.size()) {
-    auto p = &shards[shard_id];
-    if (!p->loaded) {
-      dout(30) << "opening shard 0x" << std::hex << p->shard_info->offset << std::dec << dendl;
-      p->extents = decode_some(v);
-      p->loaded = true;
-      dout(20) << "open shard 0x" << std::hex << p->shard_info->offset << std::dec << dendl;
-      ceph_assert(p->dirty == false);
-      ceph_assert(v.length() == p->shard_info->bytes);
-    }
-  } else {
-    derr << "illegal shard-id=" << shard_id << " shards.size()=" << shards.size() << dendl;
-    ceph_assert(shard_id < shards.size());
-  }
-}
-
 //-----------------------------------------------------------------------------------
 void BlueStore::set_allocation_in_simple_bmap(SimpleBitmap* sbmap, uint64_t offset, uint64_t length)
 {
+  dout(30) << __func__ << " 0x" << std::hex
+           << offset << "~" << length
+           << " " << min_alloc_size_mask
+           << dendl;
   ceph_assert((offset & min_alloc_size_mask) == 0);
   ceph_assert((length & min_alloc_size_mask) == 0);
   sbmap->set(offset >> min_alloc_size_order, length >> min_alloc_size_order);
 }
 
-//---------------------------------------------------------
-// Process all physical extents from a given Onode (including all its shards)
-void BlueStore::read_allocation_from_single_onode(
-  SimpleBitmap*        sbmap,
-  BlueStore::OnodeRef& onode_ref,
-  read_alloc_stats_t&  stats)
-{
-  // create a map holding all physical-extents of this Onode to prevent duplication from being added twice and more
-  std::unordered_map<uint64_t, uint32_t> lcl_extnt_map;
-  unsigned blobs_count = 0;
-  uint64_t pos = 0;
+void BlueStore::ExtentDecoderPartial::_consume_new_blob(bool spanning,
+                                                        uint64_t extent_no,
+                                                        uint64_t sbid,
+                                                        BlobRef b)
+{
+  [[maybe_unused]] auto cct = store.cct;
+  ceph_assert(per_pool_statfs);
+  ceph_assert(oid != ghobject_t());
 
-  stats.spanning_blob_count += onode_ref->extent_map.spanning_blob_map.size();
-  // first iterate over all logical-extents
-  for (struct Extent& l_extent : onode_ref->extent_map.extent_map) {
-    ceph_assert(l_extent.logical_offset >= pos);
+  auto &blob = b->get_blob();
+  if(spanning) {
+    dout(20) << __func__ << " " << spanning << " " << b->id << dendl;
+    ceph_assert(b->id >= 0);
+    spanning_blobs[b->id] = b;
+    ++stats.spanning_blob_count;
+  } else {
+    dout(20) << __func__ << " " << spanning << " " << extent_no << dendl;
+    blobs[extent_no] = b;
+  }
+  bool compressed = blob.is_compressed();
+  if (!blob.is_shared()) {
+    for (auto& pe : blob.get_extents()) {
+      if (pe.offset == bluestore_pextent_t::INVALID_OFFSET) {
+        ++stats.skipped_illegal_extent;
+        continue;
+      }
+      store.set_allocation_in_simple_bmap(&sbmap, pe.offset, pe.length);
 
-    pos = l_extent.logical_offset + l_extent.length;
-    ceph_assert(l_extent.blob);
-    const bluestore_blob_t& blob         = l_extent.blob->get_blob();
-    const PExtentVector&    p_extent_vec = blob.get_extents();
-    blobs_count++;
-    if (blob.is_compressed()) {
-      stats.compressed_blob_count++;
+      per_pool_statfs->allocated() += pe.length;
+      if (compressed) {
+        per_pool_statfs->compressed_allocated() += pe.length;
+      }
     }
-
-    if (blob.is_shared()) {
-      stats.shared_blobs_count++;
+    if (compressed) {
+      per_pool_statfs->compressed() +=
+        blob.get_compressed_payload_length();
+      ++stats.compressed_blob_count;
     }
-
-    // process all physical extent in this blob
-    for (auto p_extent = p_extent_vec.begin(); p_extent != p_extent_vec.end(); p_extent++) {
-      auto offset = p_extent->offset;
-      auto length = p_extent->length;
-
-      // Offset of -1 means that the extent was removed (and it is only a place holder) and can be safely skipped
-      if (offset == (uint64_t)-1) {
-       stats.skipped_illegal_extent++;
-       continue;
+  } else {
+    auto it = sb_info.find(sbid);
+    if (it == sb_info.end()) {
+      derr << __func__ << " shared blob not found:" << sbid
+           << dendl;
+    }
+    auto &sbi = *it;
+    auto pool_id = oid.hobj.get_logical_pool();
+    if (sbi.pool_id == sb_info_t::INVALID_POOL_ID) {
+      sbi.pool_id = pool_id;
+      size_t alloc_delta = sbi.allocated_chunks << min_alloc_size_order;
+      per_pool_statfs->allocated() += alloc_delta;
+      if (compressed) {
+        per_pool_statfs->compressed_allocated() += alloc_delta;
+        ++stats.compressed_blob_count;
       }
+    }
+    if (compressed) {
+      per_pool_statfs->compressed() +=
+        blob.get_compressed_payload_length();
+    }
+  }
+}
 
-      if (!blob.is_shared()) {
-       // skip repeating extents
-       auto lcl_itr = lcl_extnt_map.find(offset);
-       // extents using shared blobs might have differnt length
-       if (lcl_itr != lcl_extnt_map.end() ) {
-         // repeated extents must have the same length!
-         ceph_assert(lcl_extnt_map[offset] == length);
-         stats.skipped_repeated_extent++;
-       } else {
-         lcl_extnt_map[offset] = length;
-         set_allocation_in_simple_bmap(sbmap, offset, length);
-         stats.extent_count++;
-       }
-      } else {
-       // extents using shared blobs might have differnt length
-       set_allocation_in_simple_bmap(sbmap, offset, length);
-       stats.extent_count++;
-      }
+void BlueStore::ExtentDecoderPartial::consume_blobid(Extent* le,
+                                                     bool spanning,
+                                                     uint64_t blobid)
+{
+  [[maybe_unused]] auto cct = store.cct;
+  dout(20) << __func__ << " " << spanning << " " << blobid << dendl;
+  auto &map = spanning ? spanning_blobs : blobs;
+  auto it = map.find(blobid);
+  ceph_assert(it != map.end());
+  per_pool_statfs->stored() += le->length;
+  if (it->second->get_blob().is_compressed()) {
+    per_pool_statfs->compressed_original() += le->length;
+  }
+}
 
-    } // physical-extents loop
+void BlueStore::ExtentDecoderPartial::consume_blob(Extent* le,
+                                                   uint64_t extent_no,
+                                                   uint64_t sbid,
+                                                   BlobRef b)
+{
+  _consume_new_blob(false, extent_no, sbid, b);
+  per_pool_statfs->stored() += le->length;
+  if (b->get_blob().is_compressed()) {
+    per_pool_statfs->compressed_original() += le->length;
+  }
+}
 
-  } // logical-extents loop
+void BlueStore::ExtentDecoderPartial::consume_spanning_blob(uint64_t sbid,
+                                                            BlobRef b)
+{
+  _consume_new_blob(true, 0/*doesn't matter*/, sbid, b);
+}
 
-  if (blobs_count < MAX_BLOBS_IN_ONODE) {
-    stats.blobs_in_onode[blobs_count]++;
-  } else {
-    // store all counts higher than MAX_BLOBS_IN_ONODE in a single bucket at offset zero
-    stats.blobs_in_onode[MAX_BLOBS_IN_ONODE]++;
-  }
+void BlueStore::ExtentDecoderPartial::reset(const ghobject_t _oid,
+                                            volatile_statfs* _per_pool_statfs)
+{
+  oid = _oid;
+  per_pool_statfs = _per_pool_statfs;
+  blob_map_t empty;
+  blob_map_t empty2;
+  std::swap(blobs, empty);
+  std::swap(spanning_blobs, empty2);
 }
 
-//-------------------------------------------------------------------------
 int BlueStore::read_allocation_from_onodes(SimpleBitmap *sbmap, read_alloc_stats_t& stats)
 {
-  // finally add all space take by user data
-  auto it = db->get_iterator(PREFIX_OBJ, KeyValueDB::ITERATOR_NOCACHE);
+  sb_info_space_efficient_map_t sb_info;
+  // iterate over all shared blobs
+  auto it = db->get_iterator(PREFIX_SHARED_BLOB, KeyValueDB::ITERATOR_NOCACHE);
   if (!it) {
-    // TBD - find a better error code
-    derr << "failed db->get_iterator(PREFIX_OBJ)" << dendl;
-    return -1;
+    derr << "failed getting shared blob's iterator" << dendl;
+    return -ENOENT;
+  }
+  if (it) {
+    for (it->lower_bound(string()); it->valid(); it->next()) {
+      const auto& key = it->key();
+      dout(20) << __func__ << " decode sb " << pretty_binary_string(key) << dendl;
+      uint64_t sbid = 0;
+      if (get_key_shared_blob(key, &sbid) != 0) {
+       derr << __func__ << " bad shared blob key '" << pretty_binary_string(key)
+            << "'" << dendl;
+      }
+      bluestore_shared_blob_t shared_blob(sbid);
+      bufferlist bl = it->value();
+      auto blp = bl.cbegin();
+      try {
+        decode(shared_blob, blp);
+      }
+      catch (ceph::buffer::error& e) {
+       derr << __func__ << " failed to decode Shared Blob"
+            << pretty_binary_string(key) << dendl;
+       continue;
+      }
+      dout(20) << __func__ << "  " << shared_blob << dendl;
+      uint64_t allocated = 0;
+      for (auto& r : shared_blob.ref_map.ref_map) {
+        ceph_assert(r.first != bluestore_pextent_t::INVALID_OFFSET);
+        set_allocation_in_simple_bmap(sbmap, r.first, r.second.length);
+        allocated += r.second.length;
+      }
+      auto &sbi = sb_info.add_or_adopt(sbid);
+      ceph_assert(p2phase(allocated, min_alloc_size) == 0);
+      sbi.allocated_chunks += (allocated >> min_alloc_size_order);
+      ++stats.shared_blob_count;
+    }
+  }
+
+  it = db->get_iterator(PREFIX_OBJ, KeyValueDB::ITERATOR_NOCACHE);
+  if (!it) {
+    derr << "failed getting onode's iterator" << dendl;
+    return -ENOENT;
   }
 
-  CollectionRef       collection_ref;
-  spg_t               pgid;
-  BlueStore::OnodeRef onode_ref;
-  bool                has_open_onode = false;
-  uint32_t            shard_id       = 0;
   uint64_t            kv_count       = 0;
   uint64_t            count_interval = 1'000'000;
+  ExtentDecoderPartial edecoder(*this,
+                                stats,
+                                *sbmap,
+                                sb_info,
+                                min_alloc_size_order);
+
   // iterate over all ONodes stored in RocksDB
   for (it->lower_bound(string()); it->valid(); it->next(), kv_count++) {
     // trace an even after every million processed objects (typically every 5-10 seconds)
     if (kv_count && (kv_count % count_interval == 0) ) {
-      dout(5) << "processed objects count = " << kv_count << dendl;
-    }
-
-    // Shards - Code
-    // add the extents from the shards to the main Obj
-    if (is_extent_shard_key(it->key())) {
-      // shards must follow a valid main object
-      if (has_open_onode) {
-       // shards keys must start with the main object key
-       if (it->key().find(onode_ref->key) == 0) {
-         // shards count can't exceed declared shard-count in the main-object
-         if (shard_id < onode_ref->extent_map.shards.size()) {
-           onode_ref->extent_map.provide_shard_info_to_onode(it->value(), shard_id);
-           stats.shard_count++;
-           shard_id++;
-         } else {
-           derr << "illegal shard_id=" << shard_id << ", shards.size()=" << onode_ref->extent_map.shards.size() << dendl;
-           derr << "shard->key=" << pretty_binary_string(it->key()) << dendl;
-           ceph_assert(shard_id < onode_ref->extent_map.shards.size());
-         }
-       } else {
-         derr << "illegal shard-key::onode->key=" << pretty_binary_string(onode_ref->key) << " shard->key=" << pretty_binary_string(it->key()) << dendl;
-         ceph_assert(it->key().find(onode_ref->key) == 0);
-       }
-      } else {
-       derr << "error::shard without main objects for key=" << pretty_binary_string(it->key()) << dendl;
-       ceph_assert(has_open_onode);
-      }
-
-    } else {
-      // Main Object Code
+      dout(5) << __func__ << " processed objects count = " << kv_count << dendl;
+    }
 
-      if (has_open_onode) {
-       // make sure we got all shards of this object
-       if (shard_id == onode_ref->extent_map.shards.size()) {
-         // We completed an Onode Object -> pass it to be processed
-         read_allocation_from_single_onode(sbmap, onode_ref, stats);
-       } else {
-         derr << "Missing shards! shard_id=" << shard_id << ", shards.size()=" << onode_ref->extent_map.shards.size() << dendl;
-         ceph_assert(shard_id == onode_ref->extent_map.shards.size());
-       }
-      } else {
-       // We opened a new Object
-       has_open_onode =  true;
+    auto key = it->key();
+    auto okey = key;
+    dout(20) << __func__ << " decode onode " << pretty_binary_string(key) << dendl;
+    ghobject_t oid;
+    if (!is_extent_shard_key(it->key())) {
+      int r = get_key_object(okey, &oid);
+      if (r != 0) {
+        derr << __func__ << " failed to decode onode key = "
+             << pretty_binary_string(okey) << dendl;
+        return -EIO;
       }
-
-      // The main Obj is always first in RocksDB so we can start with shard_id set to zero
-      shard_id = 0;
-      stats.onode_count++;
-      ghobject_t oid;
-      int ret = get_key_object(it->key(), &oid);
-      if (ret < 0) {
-       derr << "bad object key " << pretty_binary_string(it->key()) << dendl;
-       ceph_assert(ret == 0);
-       continue;
+      edecoder.reset(oid,
+        &stats.actual_pool_vstatfs[oid.hobj.get_logical_pool()]);
+      Onode dummy_on(cct);
+      Onode::decode_raw(&dummy_on,
+        it->value(),
+        edecoder);
+      ++stats.onode_count;
+    } else {
+      uint32_t offset;
+      int r = get_key_extent_shard(key, &okey, &offset);
+      if (r != 0) {
+        derr << __func__ << " failed to decode onode extent key = "
+             << pretty_binary_string(key) << dendl;
+        return -EIO;
       }
-
-      // fill collection_ref if doesn't exist yet
-      // We process all the obejcts in a given collection and then move to the next collection
-      // This means we only search once for every given collection
-      if (!collection_ref                                     ||
-         oid.shard_id                != pgid.shard           ||
-         oid.hobj.get_logical_pool() != (int64_t)pgid.pool() ||
-         !collection_ref->contains(oid)) {
-       stats.collection_search++;
-       collection_ref = nullptr;
-
-       for (auto& p : coll_map) {
-         if (p.second->contains(oid)) {
-           collection_ref = p.second;
-           break;
-         }
-       }
-
-       if (!collection_ref) {
-         derr << "stray object " << oid << " not owned by any collection" << dendl;
-         ceph_assert(collection_ref);
-         continue;
-       }
-
-       collection_ref->cid.is_pg(&pgid);
+      r = get_key_object(okey, &oid);
+      if (r != 0) {
+        derr << __func__
+             << " failed to decode onode key= " << pretty_binary_string(okey)
+             << " from extent key= " << pretty_binary_string(key)
+             << dendl;
+        return -EIO;
       }
-      onode_ref.reset(BlueStore::Onode::decode(collection_ref, oid, it->key(), it->value()));
+      ceph_assert(oid == edecoder.get_oid());
+      edecoder.decode_some(it->value(), nullptr);
+      ++stats.shard_count;
     }
   }
 
-  // process the last object
-  if (has_open_onode) {
-    // make sure we got all shards of this object
-    if (shard_id == onode_ref->extent_map.shards.size()) {
-      // We completed an Onode Object -> pass it to be processed
-      read_allocation_from_single_onode(sbmap, onode_ref, stats);
-    } else {
-      derr << "Last Object is missing shards! shard_id=" << shard_id << ", shards.size()=" << onode_ref->extent_map.shards.size() << dendl;
-      ceph_assert(shard_id == onode_ref->extent_map.shards.size());
-    }
+  std::lock_guard l(vstatfs_lock);
+  store_statfs_t s;
+  osd_pools.clear();
+  for (auto& p : stats.actual_pool_vstatfs) {
+    if (per_pool_stat_collection) {
+      osd_pools[p.first] = p.second;
+    }
+    stats.actual_store_vstatfs += p.second;
+    p.second.publish(&s);
+    dout(5) << __func__ << " recovered pool "
+            << std::hex
+            << p.first << "->" << s
+            << std::dec
+            << " per-pool:" << per_pool_stat_collection
+            << dendl;
   }
-  dout(5) << "onode_count=" << stats.onode_count << " ,shard_count=" << stats.shard_count << dendl;
-
+  vstatfs = stats.actual_store_vstatfs;
+  vstatfs.publish(&s);
+  dout(5) << __func__ << " recovered " << s
+          << dendl;
   return 0;
 }
 
@@ -19329,7 +19438,7 @@ int BlueStore::reset_fm_for_restore()
   KeyValueDB::Transaction t = db->get_transaction();
   // call _open_fm() with fm_restore set to TRUE
   // this will mark the full device space as allocated (and not just the reserved space)
-  _open_fm(t, true, true);
+  _open_fm(t, true, true, true);
   if (fm == nullptr) {
     derr << "Failed _open_fm()" << dendl;
     return -1;
@@ -19444,7 +19553,7 @@ int BlueStore::push_allocation_to_rocksdb()
 #endif // CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
 
 //-------------------------------------------------------------------------------------
-static int commit_freelist_type(KeyValueDB *db, const std::string& freelist_type, CephContext *cct, const std::string &path)
+int BlueStore::commit_freelist_type()
 {
   // When freelist_type to "bitmap" we will store allocation in RocksDB
   // When allocation-info is stored in a single file we set freelist_type to "null"
@@ -19469,14 +19578,14 @@ static int commit_freelist_type(KeyValueDB *db, const std::string& freelist_type
 //-------------------------------------------------------------------------------------
 int BlueStore::commit_to_null_manager()
 {
-  dout(5) << "Set FreelistManager to NULL FM..." << dendl;
+  dout(5) << __func__ << " Set FreelistManager to NULL FM..." << dendl;
   fm->set_null_manager();
   freelist_type = "null";
 #if 1
-  return commit_freelist_type(db, freelist_type, cct, path);
+  return commit_freelist_type();
 #else
   // should check how long this step take on a big configuration as deletes are expensive
-  if (commit_freelist_type(db, freelist_type, cct, path) == 0) {
+  if (commit_freelist_type() == 0) {
     // remove all objects of PREFIX_ALLOC_BITMAP from RocksDB to guarantee a clean start
     clear_allocation_objects_from_rocksdb(db, cct, path);
   }
@@ -19490,7 +19599,7 @@ int BlueStore::commit_to_real_manager()
   dout(5) << "Set FreelistManager to Real FM..." << dendl;
   ceph_assert(!fm->is_null_manager());
   freelist_type = "bitmap";
-  int ret = commit_freelist_type(db, freelist_type, cct, path);
+  int ret = commit_freelist_type();
   if (ret == 0) {
     //remove the allocation_file
     invalidate_allocation_file_on_bluefs();