]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/objectstore/store_test.cc
import ceph 15.2.14
[ceph.git] / ceph / src / test / objectstore / store_test.cc
index 7e1d442f4ebb64e82964583ecf7ffc501223cfd0..13ece64799b80d133b38dad94ae87df2fa47fd91 100644 (file)
 #include "os/filestore/FileStore.h"
 #if defined(WITH_BLUESTORE)
 #include "os/bluestore/BlueStore.h"
+#include "os/bluestore/BlueFS.h"
 #endif
 #include "include/Context.h"
 #include "common/ceph_argparse.h"
+#include "common/admin_socket.h"
 #include "global/global_init.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/Cond.h"
 #include "common/errno.h"
 #include "include/stringify.h"
@@ -48,8 +50,6 @@ typedef boost::mt11213b gen_type;
 const uint64_t DEF_STORE_TEST_BLOCKDEV_SIZE = 10240000000;
 #define dout_context g_ceph_context
 
-#if GTEST_HAS_PARAM_TEST
-
 static bool bl_eq(bufferlist& expected, bufferlist& actual)
 {
   if (expected.contents_equal(actual))
@@ -101,6 +101,17 @@ int queue_transaction(
   }
 }
 
+template <typename T>
+int collection_list(T &store, ObjectStore::CollectionHandle &c,
+                    const ghobject_t& start, const ghobject_t& end, int max,
+                    vector<ghobject_t> *ls, ghobject_t *pnext,
+                    bool disable_legacy = false) {
+  if (disable_legacy || rand() % 2) {
+    return store->collection_list(c, start, end, max, ls, pnext);
+  } else {
+    return store->collection_list_legacy(c, start, end, max, ls, pnext);
+  }
+}
 
 bool sorted(const vector<ghobject_t> &in) {
   ghobject_t start;
@@ -1315,11 +1326,105 @@ TEST_P(StoreTest, SimpleObjectTest) {
 
 #if defined(WITH_BLUESTORE)
 
+TEST_P(StoreTestSpecificAUSize, ReproBug41901Test) {
+  if(string(GetParam()) != "bluestore")
+    return;
+  SetVal(g_conf(), "bluestore_debug_enforce_settings", "hdd");
+  g_conf().apply_changes(nullptr);
+  StartDeferred(65536);
+
+  int r;
+  coll_t cid;
+  ghobject_t hoid(hobject_t(sobject_t("Object 1", CEPH_NOSNAP)));
+  const PerfCounters* logger = store->get_perf_counters();
+  auto ch = store->create_new_collection(cid);
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    cerr << "Creating collection " << cid << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  {
+    bool exists = store->exists(ch, hoid);
+    ASSERT_TRUE(!exists);
+
+    ObjectStore::Transaction t;
+    t.touch(cid, hoid);
+    cerr << "Creating object " << hoid << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    exists = store->exists(ch, hoid);
+    ASSERT_EQ(true, exists);
+  }
+  {
+    ObjectStore::Transaction t;
+    bufferlist bl, orig;
+    string s(4096, 'a');
+    bl.append(s);
+    t.write(cid, hoid, 0x11000, bl.length(), bl);
+    cerr << "write1" << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  {
+    ObjectStore::Transaction t;
+    bufferlist bl, orig;
+    string s(4096 * 3, 'a');
+    bl.append(s);
+    t.write(cid, hoid, 0x15000, bl.length(), bl);
+    cerr << "write2" << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  ASSERT_EQ(logger->get(l_bluestore_write_small), 2u);
+  ASSERT_EQ(logger->get(l_bluestore_write_small_unused), 1u);
+
+  {
+    ObjectStore::Transaction t;
+    bufferlist bl, orig;
+    string s(4096 * 2, 'a');
+    bl.append(s);
+    t.write(cid, hoid, 0xe000, bl.length(), bl);
+    cerr << "write3" << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  ASSERT_EQ(logger->get(l_bluestore_write_small), 3u);
+  ASSERT_EQ(logger->get(l_bluestore_write_small_unused), 2u);
+
+
+  {
+    ObjectStore::Transaction t;
+    bufferlist bl, orig;
+    string s(4096, 'a');
+    bl.append(s);
+    t.write(cid, hoid, 0xf000, bl.length(), bl);
+    t.write(cid, hoid, 0x10000, bl.length(), bl);
+    cerr << "write3" << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  ASSERT_EQ(logger->get(l_bluestore_write_small), 5u);
+  ASSERT_EQ(logger->get(l_bluestore_write_small_unused), 2u);
+  {
+    ObjectStore::Transaction t;
+    t.remove(cid, hoid);
+    t.remove_collection(cid);
+    cerr << "Cleaning" << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+}
+
+
 TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
   if(string(GetParam()) != "bluestore")
     return;
   StartDeferred(65536);
   SetVal(g_conf(), "bluestore_compression_mode", "force");
+  SetVal(g_conf(), "bluestore_max_blob_size", "524288");
   // just a big number to disble gc
   SetVal(g_conf(), "bluestore_gc_enable_total_threshold", "100000");
   SetVal(g_conf(), "bluestore_fsck_on_umount", "true");
@@ -1370,7 +1475,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_TRUE(statfs.available > 0u && statfs.available < g_conf()->bluestore_block_size);
 
     struct store_statfs_t statfs_pool;
-    r = store->pool_statfs(poolid, &statfs_pool);
+    bool per_pool_omap;
+    r = store->pool_statfs(poolid, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_EQ( 0u, statfs_pool.allocated);
     ASSERT_EQ( 0u, statfs_pool.data_stored);
@@ -1401,7 +1507,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ(0, statfs.data_compressed_allocated);
 
     struct store_statfs_t statfs_pool;
-    r = store->pool_statfs(poolid, &statfs_pool);
+    bool per_pool_omap;
+    r = store->pool_statfs(poolid, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_EQ(5, statfs_pool.data_stored);
     ASSERT_EQ(0x10000, statfs_pool.allocated);
@@ -1410,7 +1517,7 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ(0, statfs_pool.data_compressed_allocated);
 
     // accessing unknown pool
-    r = store->pool_statfs(poolid + 1, &statfs_pool);
+    r = store->pool_statfs(poolid + 1, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_EQ(0, statfs_pool.data_stored);
     ASSERT_EQ(0, statfs_pool.allocated);
@@ -1445,7 +1552,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ(statfs.data_compressed_allocated, 0x10000);
 
     struct store_statfs_t statfs_pool;
-    r = store->pool_statfs(poolid, &statfs_pool);
+    bool per_pool_omap;
+    r = store->pool_statfs(poolid, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_EQ(0x30005, statfs_pool.data_stored);
     ASSERT_EQ(0x30000, statfs_pool.allocated);
@@ -1477,7 +1585,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ(statfs.data_compressed_allocated, 0x10000);
 
     struct store_statfs_t statfs_pool;
-    r = store->pool_statfs(poolid, &statfs_pool);
+    bool per_pool_omap;
+    r = store->pool_statfs(poolid, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_EQ(0x30005 - 3 - 9, statfs_pool.data_stored);
     ASSERT_EQ(0x30000, statfs_pool.allocated);
@@ -1512,7 +1621,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ(statfs.data_compressed_allocated, 0x10000);
 
     struct store_statfs_t statfs_pool;
-    r = store->pool_statfs(poolid, &statfs_pool);
+    bool per_pool_omap;
+    r = store->pool_statfs(poolid, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_EQ(0x30001 - 9 + 0x1000, statfs_pool.data_stored);
     ASSERT_EQ(0x40000, statfs_pool.allocated);
@@ -1548,7 +1658,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ(0, statfs.data_compressed_allocated);
 
     struct store_statfs_t statfs_pool;
-    r = store->pool_statfs(poolid, &statfs_pool);
+    bool per_pool_omap;
+    r = store->pool_statfs(poolid, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_EQ(0x30000 + 0x1001, statfs_pool.data_stored);
     ASSERT_EQ(0x40000, statfs_pool.allocated);
@@ -1578,7 +1689,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ(0u, statfs.data_compressed_allocated);
 
     struct store_statfs_t statfs_pool;
-    r = store->pool_statfs(poolid, &statfs_pool);
+    bool per_pool_omap;
+    r = store->pool_statfs(poolid, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_EQ(0u, statfs_pool.allocated);
     ASSERT_EQ(0u, statfs_pool.data_stored);
@@ -1614,7 +1726,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ(0x10000, statfs.data_compressed_allocated);
 
     struct store_statfs_t statfs_pool;
-    r = store->pool_statfs(poolid, &statfs_pool);
+    bool per_pool_omap;
+    r = store->pool_statfs(poolid, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_EQ(0x40000 - 2, statfs_pool.data_stored);
     ASSERT_EQ(0x30000, statfs_pool.allocated);
@@ -1634,7 +1747,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ(r, 0);
 
     struct store_statfs_t statfs_pool;
-    r = store->pool_statfs(poolid, &statfs_pool);
+    bool per_pool_omap;
+    r = store->pool_statfs(poolid, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
 
     ObjectStore::Transaction t;
@@ -1652,7 +1766,7 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ(statfs2.data_compressed_allocated, statfs.data_compressed_allocated);
 
     struct store_statfs_t statfs2_pool;
-    r = store->pool_statfs(poolid, &statfs2_pool);
+    r = store->pool_statfs(poolid, &statfs2_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_GT(statfs2_pool.data_stored, statfs_pool.data_stored);
     ASSERT_EQ(statfs2_pool.allocated, statfs_pool.allocated);
@@ -1677,7 +1791,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     {
 
       struct store_statfs_t statfs1_pool;
-      int r = store->pool_statfs(poolid, &statfs1_pool);
+      bool per_pool_omap;
+      int r = store->pool_statfs(poolid, &statfs1_pool, &per_pool_omap);
       ASSERT_EQ(r, 0);
 
       cerr << "Creating second collection " << cid2 << std::endl;
@@ -1694,7 +1809,7 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
       ASSERT_EQ(r, 0);
 
       struct store_statfs_t statfs2_pool;
-      r = store->pool_statfs(poolid2, &statfs2_pool);
+      r = store->pool_statfs(poolid2, &statfs2_pool, &per_pool_omap);
       ASSERT_EQ(r, 0);
       ASSERT_EQ(5, statfs2_pool.data_stored);
       ASSERT_EQ(0x10000, statfs2_pool.allocated);
@@ -1703,7 +1818,7 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
       ASSERT_EQ(0, statfs2_pool.data_compressed_allocated);
 
       struct store_statfs_t statfs1_pool_again;
-      r = store->pool_statfs(poolid, &statfs1_pool_again);
+      r = store->pool_statfs(poolid, &statfs1_pool_again, &per_pool_omap);
       ASSERT_EQ(r, 0);
       // adjust 'available' since it has changed
       statfs1_pool_again.available = statfs1_pool.available;
@@ -1733,7 +1848,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     auto ch3 = store->create_new_collection(cid3);
     {
       struct store_statfs_t statfs1_pool;
-      int r = store->pool_statfs(poolid, &statfs1_pool);
+      bool per_pool_omap;
+      int r = store->pool_statfs(poolid, &statfs1_pool, &per_pool_omap);
       ASSERT_EQ(r, 0);
 
       cerr << "Creating third collection " << cid3 << std::endl;
@@ -1750,7 +1866,7 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
       ASSERT_EQ(r, 0);
 
       struct store_statfs_t statfs3_pool;
-      r = store->pool_statfs(poolid3, &statfs3_pool);
+      r = store->pool_statfs(poolid3, &statfs3_pool, &per_pool_omap);
       ASSERT_EQ(r, 0);
       ASSERT_EQ(5, statfs3_pool.data_stored);
       ASSERT_EQ(0x10000, statfs3_pool.allocated);
@@ -1759,7 +1875,7 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
       ASSERT_EQ(0, statfs3_pool.data_compressed_allocated);
 
       struct store_statfs_t statfs1_pool_again;
-      r = store->pool_statfs(poolid, &statfs1_pool_again);
+      r = store->pool_statfs(poolid, &statfs1_pool_again, &per_pool_omap);
       ASSERT_EQ(r, 0);
       // adjust 'available' since it has changed
       statfs1_pool_again.available = statfs1_pool.available;
@@ -1781,7 +1897,7 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
       ASSERT_EQ(r, 0);
 
       struct store_statfs_t statfs3_pool_again;
-      r = store->pool_statfs(poolid3, &statfs3_pool_again);
+      r = store->pool_statfs(poolid3, &statfs3_pool_again, &per_pool_omap);
       ASSERT_EQ(r, 0);
       ASSERT_EQ(statfs3_pool_again, statfs3_pool);
 
@@ -1821,7 +1937,8 @@ TEST_P(StoreTestSpecificAUSize, BluestoreStatFSTest) {
     ASSERT_EQ( 0u, statfs.data_compressed_allocated);
 
     struct store_statfs_t statfs_pool;
-    r = store->pool_statfs(poolid, &statfs_pool);
+    bool per_pool_omap;
+    r = store->pool_statfs(poolid, &statfs_pool, &per_pool_omap);
     ASSERT_EQ(r, 0);
     ASSERT_EQ( 0u, statfs_pool.allocated);
     ASSERT_EQ( 0u, statfs_pool.data_stored);
@@ -2730,9 +2847,8 @@ TEST_P(StoreTest, SimpleListTest) {
     vector<ghobject_t> objects;
     ghobject_t next, current;
     while (!next.is_max()) {
-      int r = store->collection_list(ch, current, ghobject_t::get_max(),
-                                    50,
-                                    &objects, &next);
+      int r = collection_list(store, ch, current, ghobject_t::get_max(), 50,
+                              &objects, &next);
       ASSERT_EQ(r, 0);
       ASSERT_TRUE(sorted(objects));
       cout << " got " << objects.size() << " next " << next << std::endl;
@@ -2795,8 +2911,7 @@ TEST_P(StoreTest, ListEndTest) {
     end.hobj.pool = 1;
     vector<ghobject_t> objects;
     ghobject_t next;
-    int r = store->collection_list(ch, ghobject_t(), end, 500,
-                                  &objects, &next);
+    int r = collection_list(store, ch, ghobject_t(), end, 500, &objects, &next);
     ASSERT_EQ(r, 0);
     for (auto &p : objects) {
       ASSERT_NE(p, end);
@@ -2875,8 +2990,8 @@ TEST_P(StoreTest, MultipoolListTest) {
     vector<ghobject_t> objects;
     ghobject_t next, current;
     while (!next.is_max()) {
-      int r = store->collection_list(ch, current, ghobject_t::get_max(), 50,
-                                    &objects, &next);
+      int r = collection_list(store, ch, current, ghobject_t::get_max(), 50,
+                              &objects, &next);
       ASSERT_EQ(r, 0);
       cout << " got " << objects.size() << " next " << next << std::endl;
       for (vector<ghobject_t>::iterator p = objects.begin(); p != objects.end();
@@ -2902,6 +3017,9 @@ TEST_P(StoreTest, MultipoolListTest) {
 TEST_P(StoreTest, SimpleCloneTest) {
   int r;
   coll_t cid;
+
+  SetDeathTestStyle("threadsafe");
+
   auto ch = store->create_new_collection(cid);
   {
     ObjectStore::Transaction t;
@@ -3174,13 +3292,7 @@ TEST_P(StoreTest, SimpleCloneTest) {
     ASSERT_TRUE(bl_eq(rl, final));
   }
 
-  //Unfortunately we need a workaround for filestore since EXPECT_DEATH
-  // macro has potential issues when using /in multithread environments. 
-  //It works well for all stores but filestore for now. 
-  //A fix setting gtest_death_test_style = "threadsafe" doesn't help as well - 
-  //  test app clone asserts on store folder presence.
-  //
-  if (string(GetParam()) != "filestore") { 
+  {
     //verify if non-empty collection is properly handled after store reload
     ch.reset();
     r = store->umount();
@@ -3201,8 +3313,7 @@ TEST_P(StoreTest, SimpleCloneTest) {
     r = queue_transaction(store, ch, std::move(t));
     ASSERT_EQ(r, 0);
   }
-  //See comment above for "filestore" check explanation.
-  if (string(GetParam()) != "filestore") {
+  {
     ObjectStore::Transaction t;
     //verify if non-empty collection is properly handled when there are some pending removes and live records in db
     cerr << "Invalid rm coll again" << std::endl;
@@ -3542,7 +3653,8 @@ TEST_P(StoreTest, ManyObjectTest) {
 
   set<ghobject_t> listed, listed2;
   vector<ghobject_t> objects;
-  r = store->collection_list(ch, ghobject_t(), ghobject_t::get_max(), INT_MAX, &objects, 0);
+  r = collection_list(store, ch, ghobject_t(), ghobject_t::get_max(), INT_MAX,
+                      &objects, 0);
   ASSERT_EQ(r, 0);
 
   cerr << "objects.size() is " << objects.size() << std::endl;
@@ -3556,7 +3668,8 @@ TEST_P(StoreTest, ManyObjectTest) {
 
   ghobject_t start, next;
   objects.clear();
-  r = store->collection_list(
+  r = collection_list(
+    store,
     ch,
     ghobject_t::get_max(),
     ghobject_t::get_max(),
@@ -3571,10 +3684,8 @@ TEST_P(StoreTest, ManyObjectTest) {
   listed.clear();
   ghobject_t start2, next2;
   while (1) {
-    r = store->collection_list(ch, start, ghobject_t::get_max(),
-                              50,
-                              &objects,
-                              &next);
+    r = collection_list(store, ch, start, ghobject_t::get_max(), 50, &objects,
+                        &next);
     ASSERT_TRUE(sorted(objects));
     ASSERT_EQ(r, 0);
     listed.insert(objects.begin(), objects.end());
@@ -3662,14 +3773,15 @@ public:
   unsigned in_flight;
   map<ghobject_t, Object> contents;
   set<ghobject_t> available_objects;
+  set<ghobject_t>::iterator next_available_object;
   set<ghobject_t> in_flight_objects;
   ObjectGenerator *object_gen;
   gen_type *rng;
   ObjectStore *store;
   ObjectStore::CollectionHandle ch;
 
-  Mutex lock;
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("State lock");
+  ceph::condition_variable cond;
 
   struct EnterExit {
     const char *msg;
@@ -3689,7 +3801,7 @@ public:
       : state(state), hoid(hoid) {}
 
     void finish(int r) override {
-      Mutex::Locker locker(state->lock);
+      std::lock_guard locker{state->lock};
       EnterExit ee("onreadable finish");
       ASSERT_TRUE(state->in_flight_objects.count(hoid));
       ASSERT_EQ(r, 0);
@@ -3697,12 +3809,12 @@ public:
       if (state->contents.count(hoid))
         state->available_objects.insert(hoid);
       --(state->in_flight);
-      state->cond.Signal();
+      state->cond.notify_all();
 
       bufferlist r2;
       r = state->store->read(state->ch, hoid, 0, state->contents[hoid].data.length(), r2);
       ceph_assert(bl_eq(state->contents[hoid].data, r2));
-      state->cond.Signal();
+      state->cond.notify_all();
     }
   };
 
@@ -3716,7 +3828,7 @@ public:
       : state(state), oid(oid), noid(noid) {}
 
     void finish(int r) override {
-      Mutex::Locker locker(state->lock);
+      std::lock_guard locker{state->lock};
       EnterExit ee("stash finish");
       ASSERT_TRUE(state->in_flight_objects.count(oid));
       ASSERT_EQ(r, 0);
@@ -3729,7 +3841,7 @@ public:
        state->ch, noid, 0,
        state->contents[noid].data.length(), r2);
       ceph_assert(bl_eq(state->contents[noid].data, r2));
-      state->cond.Signal();
+      state->cond.notify_all();
     }
   };
 
@@ -3743,7 +3855,7 @@ public:
       : state(state), oid(oid), noid(noid) {}
 
     void finish(int r) override {
-      Mutex::Locker locker(state->lock);
+      std::lock_guard locker{state->lock};
       EnterExit ee("clone finish");
       ASSERT_TRUE(state->in_flight_objects.count(oid));
       ASSERT_EQ(r, 0);
@@ -3756,7 +3868,7 @@ public:
       bufferlist r2;
       r = state->store->read(state->ch, noid, 0, state->contents[noid].data.length(), r2);
       ceph_assert(bl_eq(state->contents[noid].data, r2));
-      state->cond.Signal();
+      state->cond.notify_all();
     }
   };
 
@@ -3786,9 +3898,9 @@ public:
                         unsigned max_write,
                         unsigned alignment)
     : cid(cid), write_alignment(alignment), max_object_len(max_size),
-      max_write_len(max_write), in_flight(0), object_gen(gen),
-      rng(rng), store(store),
-      lock("State lock") {}
+      max_write_len(max_write), in_flight(0),
+      next_available_object(available_objects.end()),
+      object_gen(gen), rng(rng), store(store) {}
 
   int init() {
     ObjectStore::Transaction t;
@@ -3797,17 +3909,19 @@ public:
     return queue_transaction(store, ch, std::move(t));
   }
   void shutdown() {
+    ghobject_t next;
     while (1) {
       vector<ghobject_t> objects;
-      int r = store->collection_list(ch, ghobject_t(), ghobject_t::get_max(),
-                                    10, &objects, 0);
+      int r = collection_list(store, ch, next, ghobject_t::get_max(), 10,
+                              &objects, &next);
       ceph_assert(r >= 0);
-      if (objects.empty())
-       break;
+      if (objects.size() == 0)
+        break;
       ObjectStore::Transaction t;
+      std::map<std::string, ceph::buffer::list> attrset;
       for (vector<ghobject_t>::iterator p = objects.begin();
-          p != objects.end(); ++p) {
-       t.remove(cid, *p);
+           p != objects.end(); ++p) {
+        t.remove(cid, *p);
       }
       queue_transaction(store, ch, std::move(t));
     }
@@ -3819,9 +3933,10 @@ public:
     store->statfs(&stat);
   }
 
-  ghobject_t get_uniform_random_object() {
-    while (in_flight >= max_in_flight || available_objects.empty())
-      cond.Wait(lock);
+  ghobject_t get_uniform_random_object(std::unique_lock<ceph::mutex>& locker) {
+    cond.wait(locker, [this] {
+      return in_flight < max_in_flight && !available_objects.empty();
+    });
     boost::uniform_int<> choose(0, available_objects.size() - 1);
     int index = choose(*rng);
     set<ghobject_t>::iterator i = available_objects.begin();
@@ -3830,15 +3945,27 @@ public:
     return ret;
   }
 
-  void wait_for_ready() {
-    while (in_flight >= max_in_flight)
-      cond.Wait(lock);
+  ghobject_t get_next_object(std::unique_lock<ceph::mutex>& locker) {
+    cond.wait(locker, [this] {
+      return in_flight < max_in_flight && !available_objects.empty();
+      });
+
+    if (next_available_object == available_objects.end()) {
+      next_available_object = available_objects.begin();
+    }
+
+    ghobject_t ret = *next_available_object;
+    ++next_available_object;
+    return ret;
+  }
+
+  void wait_for_ready(std::unique_lock<ceph::mutex>& locker) {
+    cond.wait(locker, [this] { return in_flight < max_in_flight; });
   }
 
   void wait_for_done() {
-    Mutex::Locker locker(lock);
-    while (in_flight)
-      cond.Wait(lock);
+    std::unique_lock locker{lock};
+    cond.wait(locker, [this] { return in_flight == 0; });
   }
 
   bool can_create() {
@@ -3904,11 +4031,11 @@ public:
   }
 
   int touch() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("touch");
     if (!can_create())
       return -ENOSPC;
-    wait_for_ready();
+    wait_for_ready(locker);
     ghobject_t new_obj = object_gen->create_object(rng);
     available_objects.erase(new_obj);
     ObjectStore::Transaction t;
@@ -3929,18 +4056,18 @@ public:
   }
 
   int stash() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("stash");
     if (!can_unlink())
       return -ENOENT;
     if (!can_create())
       return -ENOSPC;
-    wait_for_ready();
+    wait_for_ready(locker);
 
     ghobject_t old_obj;
     int max = 20;
     do {
-      old_obj = get_uniform_random_object();
+      old_obj = get_uniform_random_object(locker);
     } while (--max && !contents[old_obj].data.length());
     available_objects.erase(old_obj);
     ghobject_t new_obj = old_obj;
@@ -3961,18 +4088,18 @@ public:
   }
 
   int clone() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("clone");
     if (!can_unlink())
       return -ENOENT;
     if (!can_create())
       return -ENOSPC;
-    wait_for_ready();
+    wait_for_ready(locker);
 
     ghobject_t old_obj;
     int max = 20;
     do {
-      old_obj = get_uniform_random_object();
+      old_obj = get_uniform_random_object(locker);
     } while (--max && !contents[old_obj].data.length());
     available_objects.erase(old_obj);
     ghobject_t new_obj = object_gen->create_object(rng);
@@ -3994,25 +4121,25 @@ public:
   }
 
   int clone_range() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("clone_range");
     if (!can_unlink())
       return -ENOENT;
     if (!can_create())
       return -ENOSPC;
-    wait_for_ready();
+    wait_for_ready(locker);
 
     ghobject_t old_obj;
     int max = 20;
     do {
-      old_obj = get_uniform_random_object();
+      old_obj = get_uniform_random_object(locker);
     } while (--max && !contents[old_obj].data.length());
     bufferlist &srcdata = contents[old_obj].data;
     if (srcdata.length() == 0) {
       return 0;
     }
     available_objects.erase(old_obj);
-    ghobject_t new_obj = get_uniform_random_object();
+    ghobject_t new_obj = get_uniform_random_object(locker);
     available_objects.erase(new_obj);
 
     boost::uniform_int<> u1(0, max_object_len - max_write_len);
@@ -4061,11 +4188,11 @@ public:
     } else {
       bufferlist value;
       ceph_assert(dstdata.length() > dstoff);
-      dstdata.copy(0, dstoff, value);
+      dstdata.cbegin().copy(dstoff, value);
       value.append(bl);
       if (value.length() < dstdata.length())
-        dstdata.copy(value.length(),
-                    dstdata.length() - value.length(), value);
+        dstdata.cbegin(value.length()).copy(
+          dstdata.length() - value.length(), value);
       value.swap(dstdata);
     }
 
@@ -4076,13 +4203,13 @@ public:
 
 
   int write() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("write");
     if (!can_unlink())
       return -ENOENT;
-    wait_for_ready();
+    wait_for_ready(locker);
 
-    ghobject_t new_obj = get_uniform_random_object();
+    ghobject_t new_obj = get_uniform_random_object(locker);
     available_objects.erase(new_obj);
     ObjectStore::Transaction t;
 
@@ -4107,11 +4234,11 @@ public:
     } else {
       bufferlist value;
       ceph_assert(data.length() > offset);
-      data.copy(0, offset, value);
+      data.cbegin().copy(offset, value);
       value.append(bl);
       if (value.length() < data.length())
-        data.copy(value.length(),
-                 data.length()-value.length(), value);
+        data.cbegin(value.length()).copy(
+          data.length()-value.length(), value);
       value.swap(data);
     }
 
@@ -4124,13 +4251,13 @@ public:
   }
 
   int truncate() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("truncate");
     if (!can_unlink())
       return -ENOENT;
-    wait_for_ready();
+    wait_for_ready(locker);
 
-    ghobject_t obj = get_uniform_random_object();
+    ghobject_t obj = get_uniform_random_object(locker);
     available_objects.erase(obj);
     ObjectStore::Transaction t;
 
@@ -4148,7 +4275,7 @@ public:
       data.append_zero(len - data.length());
     } else {
       bufferlist bl;
-      data.copy(0, len, bl);
+      data.cbegin().copy(len, bl);
       bl.swap(data);
     }
 
@@ -4158,13 +4285,13 @@ public:
   }
 
   int zero() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("zero");
     if (!can_unlink())
       return -ENOENT;
-    wait_for_ready();
+    wait_for_ready(locker);
 
-    ghobject_t new_obj = get_uniform_random_object();
+    ghobject_t new_obj = get_uniform_random_object(locker);
     available_objects.erase(new_obj);
     ObjectStore::Transaction t;
 
@@ -4186,7 +4313,7 @@ public:
       n.substr_of(data, 0, offset);
       n.append_zero(len);
       if (data.length() > offset + len)
-       data.copy(offset + len, data.length() - offset - len, n);
+       data.cbegin(offset + len).copy(data.length() - offset - len, n);
       data.swap(n);
     }
 
@@ -4211,13 +4338,13 @@ public:
     bufferlist expected;
     int r;
     {
-      Mutex::Locker locker(lock);
+      std::unique_lock locker{lock};
       EnterExit ee("read locked");
       if (!can_unlink())
         return ;
-      wait_for_ready();
+      wait_for_ready(locker);
 
-      obj = get_uniform_random_object();
+      obj = get_uniform_random_object(locker);
       expected = contents[obj].data;
     }
     bufferlist bl, result;
@@ -4234,20 +4361,20 @@ public:
         len = max_len;
       ceph_assert(len == result.length());
       ASSERT_EQ(len, result.length());
-      expected.copy(offset, len, bl);
+      expected.cbegin(offset).copy(len, bl);
       ASSERT_EQ(r, (int)len);
       ASSERT_TRUE(bl_eq(bl, result));
     }
   }
 
   int setattrs() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("setattrs");
     if (!can_unlink())
       return -ENOENT;
-    wait_for_ready();
+    wait_for_ready(locker);
 
-    ghobject_t obj = get_uniform_random_object();
+    ghobject_t obj = get_uniform_random_object(locker);
     available_objects.erase(obj);
     ObjectStore::Transaction t;
 
@@ -4288,20 +4415,49 @@ public:
     return status;
   }
 
+  int set_fixed_attrs(size_t entries, size_t key_size, size_t val_size) {
+    std::unique_lock locker{ lock };
+    EnterExit ee("setattrs");
+    if (!can_unlink())
+      return -ENOENT;
+    wait_for_ready(locker);
+
+    ghobject_t obj = get_next_object(locker);
+    available_objects.erase(obj);
+    ObjectStore::Transaction t;
+
+    map<string, bufferlist> attrs;
+    set<string> keys;
+
+    while (entries--) {
+      bufferlist name, value;
+      filled_byte_array(value, val_size);
+      filled_byte_array(name, key_size);
+      attrs[name.c_str()] = value;
+      contents[obj].attrs[name.c_str()] = value;
+    }
+    t.setattrs(cid, obj, attrs);
+    ++in_flight;
+    in_flight_objects.insert(obj);
+    t.register_on_applied(new C_SyntheticOnReadable(this, obj));
+    int status = store->queue_transaction(ch, std::move(t));
+    return status;
+  }
+
   void getattrs() {
     EnterExit ee("getattrs");
     ghobject_t obj;
     map<string, bufferlist> expected;
     {
-      Mutex::Locker locker(lock);
+      std::unique_lock locker{lock};
       EnterExit ee("getattrs locked");
       if (!can_unlink())
         return ;
-      wait_for_ready();
+      wait_for_ready(locker);
 
       int retry = 10;
       do {
-        obj = get_uniform_random_object();
+        obj = get_uniform_random_object(locker);
         if (!--retry)
           return ;
       } while (contents[obj].attrs.empty());
@@ -4324,15 +4480,15 @@ public:
     int retry;
     map<string, bufferlist> expected;
     {
-      Mutex::Locker locker(lock);
+      std::unique_lock locker{lock};
       EnterExit ee("getattr locked");
       if (!can_unlink())
         return ;
-      wait_for_ready();
+      wait_for_ready(locker);
 
       retry = 10;
       do {
-        obj = get_uniform_random_object();
+        obj = get_uniform_random_object(locker);
         if (!--retry)
           return ;
       } while (contents[obj].attrs.empty());
@@ -4353,16 +4509,16 @@ public:
   }
 
   int rmattr() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("rmattr");
     if (!can_unlink())
       return -ENOENT;
-    wait_for_ready();
+    wait_for_ready(locker);
 
     ghobject_t obj;
     int retry = 10;
     do {
-      obj = get_uniform_random_object();
+      obj = get_uniform_random_object(locker);
       if (!--retry)
         return 0;
     } while (contents[obj].attrs.empty());
@@ -4388,10 +4544,9 @@ public:
   }
 
   void fsck(bool deep) {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("fsck");
-    while (in_flight)
-      cond.Wait(lock);
+    cond.wait(locker, [this] { return in_flight == 0; });
     ch.reset();
     store->umount();
     int r = store->fsck(deep);
@@ -4401,17 +4556,16 @@ public:
   }
 
   void scan() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("scan");
-    while (in_flight)
-      cond.Wait(lock);
+    cond.wait(locker, [this] { return in_flight == 0; });
     vector<ghobject_t> objects;
     set<ghobject_t> objects_set, objects_set2;
     ghobject_t next, current;
     while (1) {
       //cerr << "scanning..." << std::endl;
-      int r = store->collection_list(ch, current, ghobject_t::get_max(), 100,
-                                    &objects, &next);
+      int r = collection_list(store, ch, current, ghobject_t::get_max(), 100,
+                              &objects, &next);
       ASSERT_EQ(r, 0);
       ASSERT_TRUE(sorted(objects));
       objects_set.insert(objects.begin(), objects.end());
@@ -4444,8 +4598,8 @@ public:
       ASSERT_GT(available_objects.count(*i), (unsigned)0);
     }
 
-    int r = store->collection_list(ch, ghobject_t(), ghobject_t::get_max(),
-                                  INT_MAX, &objects, 0);
+    int r = collection_list(store, ch, ghobject_t(), ghobject_t::get_max(),
+                            INT_MAX, &objects, 0);
     ASSERT_EQ(r, 0);
     objects_set2.insert(objects.begin(), objects.end());
     ASSERT_EQ(objects_set2.size(), available_objects.size());
@@ -4464,11 +4618,11 @@ public:
     ghobject_t hoid;
     uint64_t expected;
     {
-      Mutex::Locker locker(lock);
+      std::unique_lock locker{lock};
       EnterExit ee("stat lock1");
       if (!can_unlink())
         return ;
-      hoid = get_uniform_random_object();
+      hoid = get_uniform_random_object(locker);
       in_flight_objects.insert(hoid);
       available_objects.erase(hoid);
       ++in_flight;
@@ -4480,21 +4634,21 @@ public:
     ceph_assert((uint64_t)buf.st_size == expected);
     ASSERT_TRUE((uint64_t)buf.st_size == expected);
     {
-      Mutex::Locker locker(lock);
+      std::lock_guard locker{lock};
       EnterExit ee("stat lock2");
       --in_flight;
-      cond.Signal();
+      cond.notify_all();
       in_flight_objects.erase(hoid);
       available_objects.insert(hoid);
     }
   }
 
   int unlink() {
-    Mutex::Locker locker(lock);
+    std::unique_lock locker{lock};
     EnterExit ee("unlink");
     if (!can_unlink())
       return -ENOENT;
-    ghobject_t to_remove = get_uniform_random_object();
+    ghobject_t to_remove = get_uniform_random_object(locker);
     ObjectStore::Transaction t;
     t.remove(cid, to_remove);
     ++in_flight;
@@ -4507,7 +4661,7 @@ public:
   }
 
   void print_internal_state() {
-    Mutex::Locker locker(lock);
+    std::lock_guard locker{lock};
     cerr << "available_objects: " << available_objects.size()
         << " in_flight_objects: " << in_flight_objects.size()
         << " total objects: " << in_flight_objects.size() + available_objects.size()
@@ -4576,6 +4730,7 @@ TEST_P(StoreTest, Synthetic) {
   doSyntheticTest(10000, 400*1024, 40*1024, 0);
 }
 
+#if defined(WITH_BLUESTORE)
 TEST_P(StoreTestSpecificAUSize, BlueFSExtenderTest) {
   if(string(GetParam()) != "bluestore")
     return;
@@ -4611,7 +4766,6 @@ TEST_P(StoreTestSpecificAUSize, BlueFSExtenderTest) {
   bstore->mount();
 }
 
-#if defined(WITH_BLUESTORE)
 TEST_P(StoreTestSpecificAUSize, SyntheticMatrixSharding) {
   if (string(GetParam()) != "bluestore")
     return;
@@ -4859,7 +5013,8 @@ TEST_P(StoreTest, HashCollisionTest) {
   }
   }
   vector<ghobject_t> objects;
-  r = store->collection_list(ch, ghobject_t(), ghobject_t::get_max(), INT_MAX, &objects, 0);
+  r = collection_list(store, ch, ghobject_t(), ghobject_t::get_max(), INT_MAX,
+                      &objects, 0);
   ASSERT_EQ(r, 0);
   set<ghobject_t> listed(objects.begin(), objects.end());
   cerr << "listed.size() is " << listed.size() << " and created.size() is " << created.size() << std::endl;
@@ -4868,8 +5023,8 @@ TEST_P(StoreTest, HashCollisionTest) {
   listed.clear();
   ghobject_t current, next;
   while (1) {
-    r = store->collection_list(ch, current, ghobject_t::get_max(), 60,
-                              &objects, &next);
+    r = collection_list(store, ch, current, ghobject_t::get_max(), 60, &objects,
+                        &next);
     ASSERT_EQ(r, 0);
     ASSERT_TRUE(sorted(objects));
     for (vector<ghobject_t>::iterator i = objects.begin();
@@ -4908,6 +5063,118 @@ TEST_P(StoreTest, HashCollisionTest) {
   ASSERT_EQ(r, 0);
 }
 
+TEST_P(StoreTest, HashCollisionSorting) {
+  bool disable_legacy = (string(GetParam()) == "bluestore");
+
+  char buf121664318_1[] = {18, -119, -121, -111, 0};
+  char buf121664318_2[] = {19, 127, -121, 32, 0};
+  char buf121664318_3[] = {19, -118, 15, 19, 0};
+  char buf121664318_4[] = {28, 27, -116, -113, 0};
+  char buf121664318_5[] = {28, 27, -115, -124, 0};
+
+  char buf121666222_1[] = {18, -119, -120, -111, 0};
+  char buf121666222_2[] = {19, 127, -120, 32, 0};
+  char buf121666222_3[] = {19, -118, 15, 30, 0};
+  char buf121666222_4[] = {29, 17, -126, -113, 0};
+  char buf121666222_5[] = {29, 17, -125, -124, 0};
+
+  std::map<uint32_t, std::vector<std::string>> object_names = {
+    {121664318, {{buf121664318_1},
+                 {buf121664318_2},
+                 {buf121664318_3},
+                 {buf121664318_4},
+                 {buf121664318_5}}},
+    {121666222, {{buf121666222_1},
+                 {buf121666222_2},
+                 {buf121666222_3},
+                 {buf121666222_4},
+                 {buf121666222_5}}}};
+
+  int64_t poolid = 111;
+  coll_t cid = coll_t(spg_t(pg_t(0, poolid), shard_id_t::NO_SHARD));
+  auto ch = store->create_new_collection(cid);
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    int r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+
+  std::set<ghobject_t> created;
+  for (auto &[hash, names] : object_names) {
+    for (auto &name : names) {
+      ghobject_t hoid(hobject_t(sobject_t(name, CEPH_NOSNAP),
+                                string(),
+                                hash,
+                                poolid,
+                                string()));
+      ASSERT_EQ(hash, hoid.hobj.get_hash());
+      ObjectStore::Transaction t;
+      t.touch(cid, hoid);
+      int r = queue_transaction(store, ch, std::move(t));
+      ASSERT_EQ(r, 0);
+      created.insert(hoid);
+    }
+  }
+
+  vector<ghobject_t> objects;
+  int r = collection_list(store, ch, ghobject_t(), ghobject_t::get_max(),
+                          INT_MAX, &objects, 0, disable_legacy);
+  ASSERT_EQ(r, 0);
+  ASSERT_EQ(created.size(), objects.size());
+  auto it = objects.begin();
+  for (auto &hoid : created) {
+    ASSERT_EQ(hoid, *it);
+    it++;
+  }
+
+  for (auto i = created.begin(); i != created.end(); i++) {
+    auto j = i;
+    for (j++; j != created.end(); j++) {
+      std::set<ghobject_t> created_sub(i, j);
+      objects.clear();
+      ghobject_t next;
+      r = collection_list(store, ch, *i, ghobject_t::get_max(),
+                          created_sub.size(), &objects, &next, disable_legacy);
+      ASSERT_EQ(r, 0);
+      ASSERT_EQ(created_sub.size(), objects.size());
+      it = objects.begin();
+      for (auto &hoid : created_sub) {
+        ASSERT_EQ(hoid, *it);
+        it++;
+      }
+      if (j == created.end()) {
+        ASSERT_TRUE(next.is_max());
+      } else {
+        ASSERT_EQ(*j, next);
+      }
+    }
+  }
+
+  for (auto i = created.begin(); i != created.end(); i++) {
+    auto j = i;
+    for (j++; j != created.end(); j++) {
+      std::set<ghobject_t> created_sub(i, j);
+      objects.clear();
+      ghobject_t next;
+      r = collection_list(store, ch, *i, *j, INT_MAX, &objects, &next,
+                          disable_legacy);
+      ASSERT_EQ(r, 0);
+      ASSERT_EQ(created_sub.size(), objects.size());
+      it = objects.begin();
+      for (auto &hoid : created_sub) {
+        ASSERT_EQ(hoid, *it);
+        it++;
+      }
+      if (j == created.end()) {
+        ASSERT_TRUE(next.is_max());
+      } else {
+        ASSERT_EQ(*j, next);
+      }
+    }
+  }
+}
+
 TEST_P(StoreTest, ScrubTest) {
   int64_t poolid = 111;
   coll_t cid(spg_t(pg_t(0, poolid),shard_id_t(1)));
@@ -4957,8 +5224,8 @@ TEST_P(StoreTest, ScrubTest) {
   }
 
   vector<ghobject_t> objects;
-  r = store->collection_list(ch, ghobject_t(), ghobject_t::get_max(),
-                            INT_MAX, &objects, 0);
+  r = collection_list(store, ch, ghobject_t(), ghobject_t::get_max(), INT_MAX,
+                      &objects, 0);
   ASSERT_EQ(r, 0);
   set<ghobject_t> listed(objects.begin(), objects.end());
   cerr << "listed.size() is " << listed.size() << " and created.size() is " << created.size() << std::endl;
@@ -4967,8 +5234,8 @@ TEST_P(StoreTest, ScrubTest) {
   listed.clear();
   ghobject_t current, next;
   while (1) {
-    r = store->collection_list(ch, current, ghobject_t::get_max(), 60,
-                              &objects, &next);
+    r = collection_list(store, ch, current, ghobject_t::get_max(), 60, &objects,
+                        &next);
     ASSERT_EQ(r, 0);
     ASSERT_TRUE(sorted(objects));
     for (vector<ghobject_t>::iterator i = objects.begin();
@@ -5090,9 +5357,7 @@ TEST_P(StoreTest, OMapTest) {
     }
 
     string to_remove = attrs.begin()->first;
-    set<string> keys_to_remove;
-    keys_to_remove.insert(to_remove);
-    t.omap_rmkeys(cid, hoid, keys_to_remove);
+    t.omap_rmkey(cid, hoid, to_remove);
     r = queue_transaction(store, ch, std::move(t));
     ASSERT_EQ(r, 0);
 
@@ -5434,8 +5699,8 @@ void colsplittest(
 
   // check
   vector<ghobject_t> objects;
-  r = store->collection_list(ch, ghobject_t(), ghobject_t::get_max(),
-                            INT_MAX, &objects, 0);
+  r = collection_list(store, ch, ghobject_t(), ghobject_t::get_max(), INT_MAX,
+                      &objects, 0);
   ASSERT_EQ(r, 0);
   ASSERT_EQ(objects.size(), num_objects);
   for (vector<ghobject_t>::iterator i = objects.begin();
@@ -5445,8 +5710,8 @@ void colsplittest(
   }
 
   objects.clear();
-  r = store->collection_list(tch, ghobject_t(), ghobject_t::get_max(),
-                            INT_MAX, &objects, 0);
+  r = collection_list(store, tch, ghobject_t(), ghobject_t::get_max(), INT_MAX,
+                      &objects, 0);
   ASSERT_EQ(r, 0);
   ASSERT_EQ(objects.size(), num_objects);
   for (vector<ghobject_t>::iterator i = objects.begin();
@@ -5467,8 +5732,8 @@ void colsplittest(
   ObjectStore::Transaction t;
   {
     vector<ghobject_t> objects;
-    r = store->collection_list(ch, ghobject_t(), ghobject_t::get_max(),
-                              INT_MAX, &objects, 0);
+    r = collection_list(store, ch, ghobject_t(), ghobject_t::get_max(), INT_MAX,
+                        &objects, 0);
     ASSERT_EQ(r, 0);
     ASSERT_EQ(objects.size(), num_objects * 2); // both halves
     unsigned size = 0;
@@ -5605,8 +5870,8 @@ void test_merge_skewed(ObjectStore *store,
   // verify
   {
     vector<ghobject_t> got;
-    store->collection_list(cha, ghobject_t(), ghobject_t::get_max(), INT_MAX,
-                          &got, 0);
+    collection_list(store, cha, ghobject_t(), ghobject_t::get_max(), INT_MAX,
+                    &got, 0);
     set<ghobject_t> gotset;
     for (auto& o : got) {
       ASSERT_TRUE(aobjects.count(o) || bobjects.count(o));
@@ -5914,8 +6179,8 @@ TEST_P(StoreTest, BigRGWObjectName) {
 
   {
     vector<ghobject_t> objects;
-    r = store->collection_list(ch, ghobject_t(), ghobject_t::get_max(),
-                              INT_MAX, &objects, 0);
+    r = collection_list(store, ch, ghobject_t(), ghobject_t::get_max(), INT_MAX,
+                        &objects, 0);
     ASSERT_EQ(r, 0);
     ASSERT_EQ(objects.size(), 1u);
     ASSERT_EQ(objects[0], oid2);
@@ -6188,7 +6453,7 @@ TEST_P(StoreTest, BluestoreOnOffCSumTest) {
 }
 #endif
 
-INSTANTIATE_TEST_CASE_P(
+INSTANTIATE_TEST_SUITE_P(
   ObjectStore,
   StoreTest,
   ::testing::Values(
@@ -6200,7 +6465,7 @@ INSTANTIATE_TEST_CASE_P(
     "kstore"));
 
 // Note: instantiate all stores to preserve store numbering order only
-INSTANTIATE_TEST_CASE_P(
+INSTANTIATE_TEST_SUITE_P(
   ObjectStore,
   StoreTestSpecificAUSize,
   ::testing::Values(
@@ -6211,18 +6476,6 @@ INSTANTIATE_TEST_CASE_P(
 #endif
     "kstore"));
 
-#else
-
-// Google Test may not support value-parameterized tests with some
-// compilers. If we use conditional compilation to compile out all
-// code referring to the gtest_main library, MSVC linker will not link
-// that library at all and consequently complain about missing entry
-// point defined in that library (fatal error LNK1561: entry point
-// must be defined). This dummy test keeps gtest_main linked in.
-TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
-
-#endif
-
 void doMany4KWritesTest(boost::scoped_ptr<ObjectStore>& store,
                         unsigned max_objects,
                         unsigned max_ops,
@@ -6301,15 +6554,18 @@ TEST_P(StoreTestSpecificAUSize, TooManyBlobsTest) {
 #if defined(WITH_BLUESTORE)
 void get_mempool_stats(uint64_t* total_bytes, uint64_t* total_items)
 {
+  uint64_t meta_allocated = mempool::bluestore_cache_meta::allocated_bytes();
   uint64_t onode_allocated = mempool::bluestore_cache_onode::allocated_bytes();
   uint64_t other_allocated = mempool::bluestore_cache_other::allocated_bytes();
 
+  uint64_t meta_items = mempool::bluestore_cache_meta::allocated_items();
   uint64_t onode_items = mempool::bluestore_cache_onode::allocated_items();
   uint64_t other_items = mempool::bluestore_cache_other::allocated_items();
-  cout << "onode(" << onode_allocated << "/" << onode_items
+  cout << "meta(" << meta_allocated << "/" << meta_items
+       << ") onode(" << onode_allocated << "/" << onode_items
        << ") other(" << other_allocated << "/" << other_items
        << ")" << std::endl;
-  *total_bytes = onode_allocated + other_allocated;
+  *total_bytes = meta_allocated + onode_allocated + other_allocated;
   *total_items = onode_items;
 }
 
@@ -6930,106 +7186,6 @@ TEST_P(StoreTestSpecificAUSize, SmallWriteOnShardedExtents) {
   }
 }
 
-TEST_P(StoreTestSpecificAUSize, ExcessiveFragmentation) {
-  if (string(GetParam()) != "bluestore")
-    return;
-
-  SetVal(g_conf(), "bluestore_block_size",
-    stringify((uint64_t)2048 * 1024 * 1024).c_str());
-
-  ASSERT_EQ(g_conf().get_val<Option::size_t>("bluefs_alloc_size"),
-           1024 * 1024U);
-
-  size_t block_size = 0x10000;
-  StartDeferred(block_size);
-
-  int r;
-  coll_t cid;
-  ghobject_t hoid1(hobject_t(sobject_t("Object 1", CEPH_NOSNAP)));
-  ghobject_t hoid2(hobject_t(sobject_t("Object 2", CEPH_NOSNAP)));
-  auto ch = store->create_new_collection(cid);
-
-  {
-    ObjectStore::Transaction t;
-    t.create_collection(cid, 0);
-    r = queue_transaction(store, ch, std::move(t));
-    ASSERT_EQ(r, 0);
-  }
-  {
-    // create 2x400MB objects in a way that their pextents are interleaved
-    ObjectStore::Transaction t;
-    bufferlist bl;
-
-    bl.append(std::string(block_size * 4, 'a')); // 256KB
-    uint64_t offs = 0;
-    while(offs < (uint64_t)400 * 1024 * 1024) {
-      t.write(cid, hoid1, offs, bl.length(), bl, 0);
-      t.write(cid, hoid2, offs, bl.length(), bl, 0);
-      r = queue_transaction(store, ch, std::move(t));
-      ASSERT_EQ(r, 0);
-      offs += bl.length();
-      if( (offs % (100 * 1024 * 1024)) == 0) {
-       std::cout<<"written " << offs << std::endl;
-      }
-    }
-  }
-  std::cout<<"written 800MB"<<std::endl;
-  {
-    // Partially overwrite objects with 100MB each leaving space
-    // fragmented and occuping still unfragmented space at the end
-    // So we'll have enough free space but it'll lack long enough (e.g. 1MB)
-    // contiguous pextents.
-    ObjectStore::Transaction t;
-    bufferlist bl;
-
-    bl.append(std::string(block_size * 4, 'a'));
-    uint64_t offs = 0;
-    while(offs < 112 * 1024 * 1024) {
-      t.write(cid, hoid1, offs, bl.length(), bl, 0);
-      t.write(cid, hoid2, offs, bl.length(), bl, 0);
-      r = queue_transaction(store, ch, std::move(t));
-      ASSERT_EQ(r, 0);
-      // this will produce high fragmentation if original allocations
-      // were contiguous
-      offs += bl.length();
-      if( (offs % (10 * 1024 * 1024)) == 0) {
-       std::cout<<"written " << offs << std::endl;
-      }
-    }
-  }
-  {
-    // remove one of the object producing much free space
-    // and hence triggering bluefs rebalance.
-    // Which should fail as there is no long enough pextents.
-    ObjectStore::Transaction t;
-    t.remove(cid, hoid2);
-    r = queue_transaction(store, ch, std::move(t));
-    ASSERT_EQ(r, 0);
-  }
-
-  auto to_sleep = 5 *
-    (int)g_conf().get_val<double>("bluestore_bluefs_balance_interval");
-  std::cout<<"sleeping... " << std::endl;
-  sleep(to_sleep);
-
-  {
-    // touch another object to triggerrebalance
-    ObjectStore::Transaction t;
-    t.touch(cid, hoid1);
-    r = queue_transaction(store, ch, std::move(t));
-    ASSERT_EQ(r, 0);
-  }
-  {
-    ObjectStore::Transaction t;
-    t.remove(cid, hoid1);
-    t.remove(cid, hoid2);
-    t.remove_collection(cid);
-    cerr << "Cleaning" << std::endl;
-    r = queue_transaction(store, ch, std::move(t));
-    ASSERT_EQ(r, 0);
-  }
-}
-
 #endif //#if defined(WITH_BLUESTORE)
 
 TEST_P(StoreTest, KVDBHistogramTest) {
@@ -7322,7 +7478,7 @@ TEST_P(StoreTestSpecificAUSize, BluestoreRepairTest) {
   SetVal(g_conf(), "bluestore_max_blob_size", 
     stringify(2 * offs_base).c_str());
   SetVal(g_conf(), "bluestore_extent_map_shard_max_size", "12000");
-  SetVal(g_conf(), "bluestore_no_per_pool_stats_tolerance", "enforce");
+  SetVal(g_conf(), "bluestore_fsck_error_on_no_per_pool_stats", "false");
 
   StartDeferred(0x10000);
 
@@ -7396,7 +7552,7 @@ TEST_P(StoreTestSpecificAUSize, BluestoreRepairTest) {
   bstore->inject_statfs("bluestore_statfs", statfs);
   bstore->umount();
 
-  ASSERT_EQ(bstore->fsck(false), 1);
+  ASSERT_EQ(bstore->fsck(false), 2);
   ASSERT_EQ(bstore->repair(false), 0);
   ASSERT_EQ(bstore->fsck(false), 0);
   ASSERT_EQ(bstore->mount(), 0);
@@ -7462,34 +7618,251 @@ TEST_P(StoreTestSpecificAUSize, BluestoreRepairTest) {
     ASSERT_EQ(bstore->fsck(false), 0);
   }
 
-  // enable per-pool stats collection hence causing fsck to fail
-  cerr << "per-pool statfs" << std::endl;
-  SetVal(g_conf(), "bluestore_no_per_pool_stats_tolerance", "until_fsck");
-  g_ceph_context->_conf.apply_changes(nullptr);
+  cerr << "Zombie spanning blob" << std::endl;
+  {
+    bstore->mount();
+    ghobject_t hoid4 = make_object("Object 4", pool);
+    auto ch = store->open_collection(cid);
+    {
+      bufferlist bl;
+      string s(0x1000, 'a');
+      bl.append(s);
+      ObjectStore::Transaction t;
+      for(size_t i = 0; i < 0x10; i++) {
+              t.write(cid, hoid4, i * bl.length(), bl.length(), bl);
+      }
+      r = queue_transaction(store, ch, std::move(t));
+      ASSERT_EQ(r, 0);
+    }
+    sleep(5);
+    {
+      bstore->inject_zombie_spanning_blob(cid, hoid4, 12345);
+      bstore->inject_zombie_spanning_blob(cid, hoid4, 23456);
+      bstore->inject_zombie_spanning_blob(cid, hoid4, 23457);
+    }
 
-  ASSERT_EQ(bstore->fsck(false), 2);
-  ASSERT_EQ(bstore->repair(false), 0);
-  ASSERT_EQ(bstore->fsck(false), 0);
+    bstore->umount();
+    ASSERT_EQ(bstore->fsck(false), 1);
+    ASSERT_LE(bstore->repair(false), 0);
+    ASSERT_EQ(bstore->fsck(false), 0);
+  }
 
   cerr << "Completing" << std::endl;
   bstore->mount();
 
 }
 
-TEST_P(StoreTest, BluestoreStatistics) {
+TEST_P(StoreTestSpecificAUSize, BluestoreBrokenZombieRepairTest) {
   if (string(GetParam()) != "bluestore")
     return;
 
-  SetVal(g_conf(), "rocksdb_perf", "true");
-  SetVal(g_conf(), "rocksdb_collect_compaction_stats", "true");
-  SetVal(g_conf(), "rocksdb_collect_extended_stats","true");
-  SetVal(g_conf(), "rocksdb_collect_memory_stats","true");
+  SetVal(g_conf(), "bluestore_fsck_on_mount", "false");
+  SetVal(g_conf(), "bluestore_fsck_on_umount", "false");
 
-  // disable cache
-  SetVal(g_conf(), "bluestore_cache_size_ssd", "0");
-  SetVal(g_conf(), "bluestore_cache_size_hdd", "0");
-  SetVal(g_conf(), "bluestore_cache_size", "0");
-  g_ceph_context->_conf.apply_changes(nullptr);
+  StartDeferred(0x10000);
+
+  BlueStore* bstore = dynamic_cast<BlueStore*> (store.get());
+
+  int r;
+
+  cerr << "initializing" << std::endl;
+  {
+    const size_t col_count = 16;
+    const size_t obj_count = 1024;
+    ObjectStore::CollectionHandle ch[col_count];
+    ghobject_t hoid[col_count][obj_count];
+
+    unique_ptr<coll_t> cid[col_count];
+
+    for (size_t i = 0; i < col_count; i++) {
+      cid[i].reset(new coll_t(spg_t(pg_t(0, i), shard_id_t::NO_SHARD)));
+      ch[i] = store->create_new_collection(*cid[i]);
+      for (size_t j = 0; j < obj_count; j++) {
+       hoid[i][j] = make_object(stringify(j).c_str(), i);
+      }
+    }
+
+    for (size_t i = 0; i < col_count; i++) {
+      ObjectStore::Transaction t;
+      t.create_collection(*cid[i], 0);
+      r = queue_transaction(store, ch[i], std::move(t));
+      ASSERT_EQ(r, 0);
+    }
+    cerr << "onode preparing" << std::endl;
+    bufferlist bl;
+    string s(0x1000, 'a');
+    bl.append(s);
+
+    for (size_t i = 0; i < col_count; i++) {
+      for (size_t j = 0; j < obj_count; j++) {
+       ObjectStore::Transaction t;
+       t.write(*cid[i], hoid[i][j], bl.length(), bl.length(), bl);
+       r = queue_transaction(store, ch[i], std::move(t));
+       ASSERT_EQ(r, 0);
+      }
+    }
+    cerr << "Zombie spanning blob injection" << std::endl;
+
+    sleep(5);
+
+    for (size_t i = 0; i < col_count; i++) {
+      for (size_t j = 0; j < obj_count; j++) {
+       bstore->inject_zombie_spanning_blob(*cid[i], hoid[i][j], 12345);
+      }
+    }
+
+    cerr << "fscking/fixing" << std::endl;
+    bstore->umount();
+    ASSERT_EQ(bstore->fsck(false), col_count * obj_count);
+    ASSERT_LE(bstore->quick_fix(), 0);
+    ASSERT_EQ(bstore->fsck(false), 0);
+  }
+
+  cerr << "Completing" << std::endl;
+  bstore->mount();
+}
+
+TEST_P(StoreTest, BluestoreRepairGlobalStats) {
+  if (string(GetParam()) != "bluestore")
+    return;
+  const size_t offs_base = 65536 / 2;
+
+  BlueStore* bstore = dynamic_cast<BlueStore*> (store.get());
+
+  // start with global stats
+  bstore->inject_global_statfs({});
+  bstore->umount();
+  SetVal(g_conf(), "bluestore_fsck_quick_fix_on_mount", "false");
+  bstore->mount();
+
+  // fill the store with some data
+  const uint64_t pool = 555;
+  coll_t cid(spg_t(pg_t(0, pool), shard_id_t::NO_SHARD));
+  auto ch = store->create_new_collection(cid);
+
+  ghobject_t hoid = make_object("Object 1", pool);
+  ghobject_t hoid_dup = make_object("Object 1(dup)", pool);
+  ghobject_t hoid2 = make_object("Object 2", pool);
+  ghobject_t hoid_cloned = hoid2;
+  hoid_cloned.hobj.snap = 1;
+  ghobject_t hoid3 = make_object("Object 3", pool);
+  ghobject_t hoid3_cloned = hoid3;
+  hoid3_cloned.hobj.snap = 1;
+  bufferlist bl;
+  bl.append("1234512345");
+  int r;
+  const size_t repeats = 16;
+  {
+    auto ch = store->create_new_collection(cid);
+    cerr << "create collection + write" << std::endl;
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    for( auto i = 0ul; i < repeats; ++i ) {
+      t.write(cid, hoid, i * offs_base, bl.length(), bl);
+      t.write(cid, hoid_dup, i * offs_base, bl.length(), bl);
+    }
+    for( auto i = 0ul; i < repeats; ++i ) {
+      t.write(cid, hoid2, i * offs_base, bl.length(), bl);
+    }
+    t.clone(cid, hoid2, hoid_cloned);
+
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+
+  bstore->umount();
+
+  // enable per-pool stats collection hence causing fsck to fail
+  cerr << "per-pool statfs" << std::endl;
+  SetVal(g_conf(), "bluestore_fsck_error_on_no_per_pool_stats", "true");
+  g_ceph_context->_conf.apply_changes(nullptr);
+
+  ASSERT_EQ(bstore->fsck(false), 1);
+  ASSERT_EQ(bstore->repair(false), 0);
+  ASSERT_EQ(bstore->fsck(false), 0);
+
+  bstore->mount();
+}
+
+TEST_P(StoreTest, BluestoreRepairGlobalStatsFixOnMount) {
+  if (string(GetParam()) != "bluestore")
+    return;
+  const size_t offs_base = 65536 / 2;
+
+  BlueStore* bstore = dynamic_cast<BlueStore*> (store.get());
+
+  // start with global stats
+  bstore->inject_global_statfs({});
+  bstore->umount();
+  SetVal(g_conf(), "bluestore_fsck_quick_fix_on_mount", "false");
+  bstore->mount();
+
+  // fill the store with some data
+  const uint64_t pool = 555;
+  coll_t cid(spg_t(pg_t(0, pool), shard_id_t::NO_SHARD));
+  auto ch = store->create_new_collection(cid);
+
+  ghobject_t hoid = make_object("Object 1", pool);
+  ghobject_t hoid_dup = make_object("Object 1(dup)", pool);
+  ghobject_t hoid2 = make_object("Object 2", pool);
+  ghobject_t hoid_cloned = hoid2;
+  hoid_cloned.hobj.snap = 1;
+  ghobject_t hoid3 = make_object("Object 3", pool);
+  ghobject_t hoid3_cloned = hoid3;
+  hoid3_cloned.hobj.snap = 1;
+  bufferlist bl;
+  bl.append("1234512345");
+  int r;
+  const size_t repeats = 16;
+  {
+    auto ch = store->create_new_collection(cid);
+    cerr << "create collection + write" << std::endl;
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    for( auto i = 0ul; i < repeats; ++i ) {
+      t.write(cid, hoid, i * offs_base, bl.length(), bl);
+      t.write(cid, hoid_dup, i * offs_base, bl.length(), bl);
+    }
+    for( auto i = 0ul; i < repeats; ++i ) {
+      t.write(cid, hoid2, i * offs_base, bl.length(), bl);
+    }
+    t.clone(cid, hoid2, hoid_cloned);
+
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+
+  bstore->umount();
+
+  // enable per-pool stats collection hence causing fsck to fail
+  cerr << "per-pool statfs" << std::endl;
+  SetVal(g_conf(), "bluestore_fsck_error_on_no_per_pool_stats", "true");
+  g_ceph_context->_conf.apply_changes(nullptr);
+
+  ASSERT_EQ(bstore->fsck(false), 1);
+
+  SetVal(g_conf(), "bluestore_fsck_quick_fix_on_mount", "true");
+  bstore->mount();
+  bstore->umount();
+  ASSERT_EQ(bstore->fsck(false), 0);
+
+  bstore->mount();
+}
+
+TEST_P(StoreTest, BluestoreStatistics) {
+  if (string(GetParam()) != "bluestore")
+    return;
+
+  SetVal(g_conf(), "rocksdb_perf", "true");
+  SetVal(g_conf(), "rocksdb_collect_compaction_stats", "true");
+  SetVal(g_conf(), "rocksdb_collect_extended_stats","true");
+  SetVal(g_conf(), "rocksdb_collect_memory_stats","true");
+
+  // disable cache
+  SetVal(g_conf(), "bluestore_cache_size_ssd", "0");
+  SetVal(g_conf(), "bluestore_cache_size_hdd", "0");
+  SetVal(g_conf(), "bluestore_cache_size", "0");
+  g_ceph_context->_conf.apply_changes(nullptr);
 
   int r = store->umount();
   ASSERT_EQ(r, 0);
@@ -7525,6 +7898,93 @@ TEST_P(StoreTest, BluestoreStatistics) {
   cout << std::endl;
 }
 
+TEST_P(StoreTest, BluestorePerPoolOmapFixOnMount)
+{
+  if (string(GetParam()) != "bluestore")
+    return;
+
+  BlueStore* bstore = dynamic_cast<BlueStore*> (store.get());
+  const uint64_t pool = 555;
+  coll_t cid(spg_t(pg_t(0, pool), shard_id_t::NO_SHARD));
+  ghobject_t oid = make_object("Object 1", pool);
+  ghobject_t oid2 = make_object("Object 2", pool);
+  // fill the store with some data
+  auto ch = store->create_new_collection(cid);
+  map<string, bufferlist> omap;
+  bufferlist h;
+  h.append("header");
+  {
+    omap["omap_key"].append("omap value");
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    t.touch(cid, oid);
+    t.omap_setheader(cid, oid, h);
+    t.touch(cid, oid2);
+    t.omap_setheader(cid, oid2, h);
+    int r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+
+  // inject legacy omaps
+  bstore->inject_legacy_omap();
+  bstore->inject_legacy_omap(cid, oid);
+  bstore->inject_legacy_omap(cid, oid2);
+
+  bstore->umount();
+
+  // check we injected an issue
+  SetVal(g_conf(), "bluestore_fsck_quick_fix_on_mount", "false");
+  SetVal(g_conf(), "bluestore_fsck_error_on_no_per_pool_omap", "true");
+  g_ceph_context->_conf.apply_changes(nullptr);
+  ASSERT_EQ(bstore->fsck(false), 3);
+
+  // set autofix and mount
+  SetVal(g_conf(), "bluestore_fsck_quick_fix_on_mount", "true");
+  g_ceph_context->_conf.apply_changes(nullptr);
+  bstore->mount();
+  bstore->umount();
+
+  // check we fixed it..
+  ASSERT_EQ(bstore->fsck(false), 0);
+  bstore->mount();
+
+  //
+  // Now repro https://tracker.ceph.com/issues/43824
+  //
+  // inject legacy omaps again
+  bstore->inject_legacy_omap();
+  bstore->inject_legacy_omap(cid, oid);
+  bstore->inject_legacy_omap(cid, oid2);
+  bstore->umount();
+
+  // check we injected an issue
+  SetVal(g_conf(), "bluestore_fsck_quick_fix_on_mount", "true");
+  SetVal(g_conf(), "bluestore_fsck_error_on_no_per_pool_omap", "true");
+  g_ceph_context->_conf.apply_changes(nullptr);
+  bstore->mount();
+  ch = store->open_collection(cid);
+
+  {
+    // write to onode which will partiall revert per-pool
+    // omap repair done on mount due to #43824.
+    // And object removal will leave stray per-pool omap recs
+    //
+    ObjectStore::Transaction t;
+    bufferlist bl;
+    bl.append("data");
+    //this triggers onode rec update and hence legacy omap
+    t.write(cid, oid, 0, bl.length(), bl);
+    t.remove(cid, oid2); // this will trigger stray per-pool omap
+    int r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  bstore->umount();
+  // check omap's been fixed.
+  ASSERT_EQ(bstore->fsck(false), 0); // this will fail without fix for #43824
+
+  bstore->mount();
+}
+
 TEST_P(StoreTestSpecificAUSize, BluestoreTinyDevFailure) {
   if (string(GetParam()) != "bluestore")
     return;
@@ -7591,6 +8051,7 @@ TEST_P(StoreTest, SpuriousReadErrorTest) {
     EXPECT_EQ(store->umount(), 0);
     EXPECT_EQ(store->mount(), 0);
   }
+  ch = store->open_collection(cid);
 
   cerr << "Injecting CRC error with no retry, expecting EIO" << std::endl;
   SetVal(g_conf(), "bluestore_retry_disk_reads", "0");
@@ -7731,6 +8192,272 @@ TEST_P(StoreTest, mergeRegionTest) {
     ASSERT_EQ(final_len, static_cast<uint64_t>(r));
   }
 }
+
+TEST_P(StoreTestSpecificAUSize, BluestoreEnforceHWSettingsHdd) {
+  if (string(GetParam()) != "bluestore")
+    return;
+
+  SetVal(g_conf(), "bluestore_debug_enforce_settings", "hdd");
+  StartDeferred(0x1000);
+
+  int r;
+  coll_t cid;
+  ghobject_t hoid(hobject_t(sobject_t("Object", CEPH_NOSNAP)));
+  auto ch = store->create_new_collection(cid);
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    cerr << "Creating collection " << cid << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  {
+    ObjectStore::Transaction t;
+    bufferlist bl, orig;
+    string s(g_ceph_context->_conf->bluestore_max_blob_size_hdd, '0');
+    bl.append(s);
+    t.write(cid, hoid, 0, bl.length(), bl);
+    cerr << "write" << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    const PerfCounters* logger = store->get_perf_counters();
+    ASSERT_EQ(logger->get(l_bluestore_write_big_blobs), 1u);
+  }
+}
+  
+TEST_P(StoreTestSpecificAUSize, BluestoreEnforceHWSettingsSsd) {
+  if (string(GetParam()) != "bluestore")
+    return;
+
+  SetVal(g_conf(), "bluestore_debug_enforce_settings", "ssd");
+  StartDeferred(0x1000);
+
+  int r;
+  coll_t cid;
+  ghobject_t hoid(hobject_t(sobject_t("Object", CEPH_NOSNAP)));
+  auto ch = store->create_new_collection(cid);
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    cerr << "Creating collection " << cid << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  {
+    ObjectStore::Transaction t;
+    bufferlist bl, orig;
+    string s(g_ceph_context->_conf->bluestore_max_blob_size_ssd * 8, '0');
+    bl.append(s);
+    t.write(cid, hoid, 0, bl.length(), bl);
+    cerr << "write" << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    const PerfCounters* logger = store->get_perf_counters();
+    ASSERT_EQ(logger->get(l_bluestore_write_big_blobs), 8u);
+  }
+}
+  
+TEST_P(StoreTestSpecificAUSize, ReproNoBlobMultiTest) {
+
+  if(string(GetParam()) != "bluestore")
+    return;
+
+  SetVal(g_conf(), "bluestore_block_db_create", "true");
+  SetVal(g_conf(), "bluestore_block_db_size", "4294967296");
+  SetVal(g_conf(), "bluestore_block_size", "12884901888");
+  SetVal(g_conf(), "bluestore_max_blob_size", "524288");
+
+  g_conf().apply_changes(nullptr);
+
+  StartDeferred(65536);
+
+  int r;
+  coll_t cid;
+  ghobject_t hoid(hobject_t(sobject_t("Object 1", CEPH_NOSNAP)));
+  ghobject_t hoid2 = hoid;
+  hoid2.hobj.snap = 1;
+
+  auto ch = store->create_new_collection(cid);
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    cerr << "Creating collection " << cid << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  {
+    bool exists = store->exists(ch, hoid);
+    ASSERT_TRUE(!exists);
+
+    ObjectStore::Transaction t;
+    t.touch(cid, hoid);
+    cerr << "Creating object " << hoid << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    exists = store->exists(ch, hoid);
+    ASSERT_EQ(true, exists);
+  }
+  {
+    uint64_t offs = 0;
+    bufferlist bl;
+    const int size = 0x100;
+    bufferptr ap(size);
+    memset(ap.c_str(), 'a', size);
+    bl.append(ap);
+    int i = 0;
+    uint64_t  blob_size = 524288;
+    uint64_t total = 0;
+    for (i = 0; i <= 512; i++) {
+      offs = 0 + i * size;
+      ObjectStore::Transaction t;
+      ghobject_t hoid2 = hoid;
+      hoid2.hobj.snap = i + 1;
+      while (offs < 128 * 1024 * 1024) {
+
+        t.write(cid, hoid, offs, ap.length(), bl);
+       offs += blob_size;
+       total += ap.length();
+      }
+      t.clone(cid, hoid, hoid2);
+      r = queue_transaction(store, ch, std::move(t));
+      ASSERT_EQ(r, 0);
+    }
+    cerr << "Total written = " << total << std::endl;
+  }
+  {
+    cerr << "Finalizing" << std::endl;
+    const PerfCounters* logger = store->get_perf_counters();
+    ASSERT_GE(logger->get(l_bluestore_gc_merged), 1024*1024*1024);
+  }
+}
+
+void doManySetAttr(ObjectStore* store,
+  std::function<void(ObjectStore*)> do_check_fn)
+{
+  MixedGenerator gen(447);
+  gen_type rng(time(NULL));
+  coll_t cid(spg_t(pg_t(0, 447), shard_id_t::NO_SHARD));
+
+  SyntheticWorkloadState test_obj(store, &gen, &rng, cid, 0, 0, 0);
+  test_obj.init();
+  size_t object_count = 256;
+  for (size_t i = 0; i < object_count; ++i) {
+    if (!(i % 10)) cerr << "seeding object " << i << std::endl;
+    test_obj.touch();
+  }
+  for (size_t i = 0; i < object_count; ++i) {
+    if (!(i % 100)) {
+      cerr << "Op " << i << std::endl;
+      test_obj.print_internal_state();
+    }
+    test_obj.set_fixed_attrs(1024, 64, 4096); // 1024 attributes, 64 bytes name and 4K value
+  }
+  test_obj.wait_for_done();
+
+  std::cout << "done" << std::endl;
+  do_check_fn(store);
+  AdminSocket* admin_socket = g_ceph_context->get_admin_socket();
+  ceph_assert(admin_socket);
+
+  ceph::bufferlist in, out;
+  ostringstream err;
+
+  auto r = admin_socket->execute_command(
+    { "{\"prefix\": \"bluefs stats\"}" },
+    in, err, &out);
+  if (r != 0) {
+    cerr << "failure querying: " << cpp_strerror(r) << std::endl;
+  } else {
+    std::cout << std::string(out.c_str(), out.length()) << std::endl;
+  }
+  do_check_fn(store);
+  test_obj.shutdown();
+}
+
+TEST_P(StoreTestSpecificAUSize, SpilloverTest) {
+  if (string(GetParam()) != "bluestore")
+    return;
+
+  SetVal(g_conf(), "bluestore_block_db_create", "true");
+  SetVal(g_conf(), "bluestore_block_db_size", "3221225472");
+  SetVal(g_conf(), "bluestore_volume_selection_policy", "rocksdb_original");
+
+  g_conf().apply_changes(nullptr);
+
+  StartDeferred(65536);
+  doManySetAttr(store.get(),
+    [&](ObjectStore* _store) {
+
+      BlueStore* bstore = dynamic_cast<BlueStore*> (_store);
+      ceph_assert(bstore);
+      bstore->compact();
+      const PerfCounters* logger = bstore->get_bluefs_perf_counters();
+      //experimentally it was discovered that this case results in 400+MB spillover
+      //using lower 300MB threshold just to be safe enough
+      std::cout << "db_used:" << logger->get(l_bluefs_db_used_bytes) << std::endl;
+      std::cout << "slow_used:" << logger->get(l_bluefs_slow_used_bytes) << std::endl;
+
+      // Disabling any validation/assertion for now as it looks like
+      // we're unable to 100% force RocksDB to spillover.
+      // Leaving test case hoping to fix that one day though.
+      //ASSERT_GE(logger->get(l_bluefs_slow_used_bytes), 16 * 1024 * 1024);
+    }
+  );
+}
+
+TEST_P(StoreTestSpecificAUSize, SpilloverFixedTest) {
+  if (string(GetParam()) != "bluestore")
+    return;
+
+  SetVal(g_conf(), "bluestore_block_db_create", "true");
+  SetVal(g_conf(), "bluestore_block_db_size", "3221225472");
+  SetVal(g_conf(), "bluestore_volume_selection_policy", "use_some_extra");
+  SetVal(g_conf(), "bluestore_volume_selection_reserved", "1"); // just use non-zero to enable
+
+  g_conf().apply_changes(nullptr);
+
+  StartDeferred(65536);
+  doManySetAttr(store.get(),
+    [&](ObjectStore* _store) {
+
+      BlueStore* bstore = dynamic_cast<BlueStore*> (_store);
+      ceph_assert(bstore);
+      bstore->compact();
+      const PerfCounters* logger = bstore->get_bluefs_perf_counters();
+      ASSERT_EQ(0, logger->get(l_bluefs_slow_used_bytes));
+    }
+  );
+}
+
+TEST_P(StoreTestSpecificAUSize, SpilloverFixed2Test) {
+  if (string(GetParam()) != "bluestore")
+    return;
+
+  SetVal(g_conf(), "bluestore_block_db_create", "true");
+  SetVal(g_conf(), "bluestore_block_db_size", "3221225472");
+  SetVal(g_conf(), "bluestore_volume_selection_policy", "use_some_extra");
+  //default 2.0 factor results in too high threshold, using less value
+  // that results in less but still present spillover.
+  SetVal(g_conf(), "bluestore_volume_selection_reserved_factor", "0.5");
+
+  g_conf().apply_changes(nullptr);
+
+  StartDeferred(65536);
+  doManySetAttr(store.get(),
+    [&](ObjectStore* _store) {
+
+      BlueStore* bstore = dynamic_cast<BlueStore*> (_store);
+      ceph_assert(bstore);
+      bstore->compact();
+      const PerfCounters* logger = bstore->get_bluefs_perf_counters();
+      ASSERT_LE(logger->get(l_bluefs_slow_used_bytes), 300 * 1024 * 1024); // see SpilloverTest for 300MB choice rationale
+    }
+  );
+}
+
 #endif  // WITH_BLUESTORE
 
 int main(int argc, char **argv) {