import 14.2.4 nautilus point release

diff --git a/ceph/src/os/bluestore/BlueStore.h b/ceph/src/os/bluestore/BlueStore.h
index 916054010d9b236667e71549d121bab61d7df42d..84a6e3961c1013d43041646e07d3cdd751089096 100644
--- a/ceph/src/os/bluestore/BlueStore.h
+++ b/ceph/src/os/bluestore/BlueStore.h
 #include <boost/functional/hash.hpp>
 #include <boost/dynamic_bitset.hpp>
 
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 #include "include/unordered_map.h"
-#include "include/memory.h"
 #include "include/mempool.h"
+#include "common/bloom_filter.hpp"
 #include "common/Finisher.h"
+#include "common/Throttle.h"
 #include "common/perf_counters.h"
 #include "common/PriorityCache.h"
 #include "compressor/Compressor.h"
 
 #include "bluestore_types.h"
 #include "BlockDevice.h"
+#include "BlueFS.h"
 #include "common/EventTrace.h"
 
 class Allocator;
 class FreelistManager;
-class BlueFS;
+class BlueStoreRepairer;
 
 //#define DEBUG_CACHE
 //#define DEBUG_DEFERRED
@@ -60,7 +62,8 @@ enum {
   l_bluestore_first = 732430,
   l_bluestore_kv_flush_lat,
   l_bluestore_kv_commit_lat,
-  l_bluestore_kv_lat,
+  l_bluestore_kv_sync_lat,
+  l_bluestore_kv_final_lat,
   l_bluestore_state_prepare_lat,
   l_bluestore_state_aio_wait_lat,
   l_bluestore_state_io_done_lat,
@@ -120,18 +123,29 @@ enum {
   l_bluestore_read_eio,
   l_bluestore_reads_with_retries,
   l_bluestore_fragmentation,
+  l_bluestore_omap_seek_to_first_lat,
+  l_bluestore_omap_upper_bound_lat,
+  l_bluestore_omap_lower_bound_lat,
+  l_bluestore_omap_next_lat,
+  l_bluestore_clist_lat,
   l_bluestore_last
 };
 
+#define META_POOL_ID ((uint64_t)-1ull)
+
 class BlueStore : public ObjectStore,
+                 public BlueFSDeviceExpander,
                  public md_config_obs_t {
   // -----------------------------------------------------
   // types
 public:
   // config observer
   const char** get_tracked_conf_keys() const override;
-  void handle_conf_change(const struct md_config_t *conf,
-                                  const std::set<std::string> &changed) override;
+  void handle_conf_change(const ConfigProxy& conf,
+                         const std::set<std::string> &changed) override;
+
+  // handler for discard events
+  void handle_discard(interval_set<uint64_t>& to_release);
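The observer hooks above now take a ConfigProxy rather than the old md_config_t pointer. As a rough sketch only (the observer class, the tracked key and the reaction are illustrative assumptions, not BlueStore code), an md_config_obs_t written against the new signature looks like this:

    struct ExampleObserver : public md_config_obs_t {
      const char** get_tracked_conf_keys() const override {
        static const char* KEYS[] = { "bluestore_compression_mode", nullptr };
        return KEYS;
      }
      void handle_conf_change(const ConfigProxy& conf,
                              const std::set<std::string>& changed) override {
        if (changed.count("bluestore_compression_mode")) {
          // re-read the new value from conf and re-apply it here
        }
      }
    };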
 
   void _set_csum();
   void _set_compression();
@@ -213,7 +227,7 @@ public:
     }
 
     void truncate(uint32_t newlen) {
-      assert(newlen < length);
+      ceph_assert(newlen < length);
       if (data.length()) {
        bufferlist t;
        t.substr_of(data, 0, newlen);
@@ -262,8 +276,8 @@ public:
     state_list_t writing;   ///< writing buffers, sorted by seq, ascending
 
     ~BufferSpace() {
-      assert(buffer_map.empty());
-      assert(writing.empty());
+      ceph_assert(buffer_map.empty());
+      ceph_assert(writing.empty());
     }
 
     void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
@@ -279,7 +293,7 @@ public:
             ++it;
           }
 
-          assert(it->seq >= b->seq);
+          ceph_assert(it->seq >= b->seq);
           // note that this will insert b before it
           // hence the order is maintained
           writing.insert(it, *b);
@@ -295,7 +309,7 @@ public:
     }
     void _rm_buffer(Cache* cache,
                    map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
-      assert(p != buffer_map.end());
+      ceph_assert(p != buffer_map.end());
       cache->_audit("_rm_buffer start");
       if (p->second->is_writing()) {
         writing.erase(writing.iterator_to(*p->second));
@@ -322,14 +336,14 @@ public:
 
     // return value is the highest cache_private of a trimmed buffer, or 0.
     int discard(Cache* cache, uint32_t offset, uint32_t length) {
-      std::lock_guard<std::recursive_mutex> l(cache->lock);
+      std::lock_guard l(cache->lock);
       return _discard(cache, offset, length);
     }
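Note that the guards above dropped their explicit template argument: with C++17 class template argument deduction the guard type is deduced from the mutex, so the same spelling covers std::mutex, ceph::mutex and ceph::recursive_mutex. A minimal sketch (names are made up for illustration):

    ceph::recursive_mutex example_lock =
      ceph::make_recursive_mutex("example::lock");

    void locked_section() {
      std::lock_guard l(example_lock);  // deduced as std::lock_guard<ceph::recursive_mutex>
      // ... critical section ...
    }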
     int _discard(Cache* cache, uint32_t offset, uint32_t length);
 
     void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
               unsigned flags) {
-      std::lock_guard<std::recursive_mutex> l(cache->lock);
+      std::lock_guard l(cache->lock);
       Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
                             flags);
       b->cache_private = _discard(cache, offset, bl.length());
@@ -337,7 +351,7 @@ public:
     }
     void _finish_write(Cache* cache, uint64_t seq);
     void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
-      std::lock_guard<std::recursive_mutex> l(cache->lock);
+      std::lock_guard l(cache->lock);
       Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
       b->cache_private = _discard(cache, offset, bl.length());
       _add_buffer(cache, b, 1, nullptr);
@@ -355,11 +369,11 @@ public:
     void split(Cache* cache, size_t pos, BufferSpace &r);
 
     void dump(Cache* cache, Formatter *f) const {
-      std::lock_guard<std::recursive_mutex> l(cache->lock);
+      std::lock_guard l(cache->lock);
       f->open_array_section("buffers");
       for (auto& i : buffer_map) {
        f->open_object_section("buffer");
-       assert(i.first == i.second->offset);
+       ceph_assert(i.first == i.second->offset);
        i.second->dump(f);
        f->close_section();
       }
@@ -410,7 +424,7 @@ public:
 
     /// put logical references, and get back any released extents
     void put_ref(uint64_t offset, uint32_t length,
-                PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
+                PExtentVector *r, bool *unshare);
 
     void finish_write(uint64_t seq);
 
@@ -432,14 +446,15 @@ public:
 
   /// a lookup table of SharedBlobs
   struct SharedBlobSet {
-    std::mutex lock;   ///< protect lookup, insertion, removal
+    /// protect lookup, insertion, removal
+    ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");
 
     // we use a bare pointer because we don't want to affect the ref
     // count
     mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
 
     SharedBlobRef lookup(uint64_t sbid) {
-      std::lock_guard<std::mutex> l(lock);
+      std::lock_guard l(lock);
       auto p = sb_map.find(sbid);
       if (p == sb_map.end() ||
          p->second->nref == 0) {
@@ -449,14 +464,14 @@ public:
     }
 
     void add(Collection* coll, SharedBlob *sb) {
-      std::lock_guard<std::mutex> l(lock);
+      std::lock_guard l(lock);
       sb_map[sb->get_sbid()] = sb;
       sb->coll = coll;
     }
 
     bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
-      std::lock_guard<std::mutex> l(lock);
-      assert(sb->get_parent() == this);
+      std::lock_guard l(lock);
+      ceph_assert(sb->get_parent() == this);
       if (verify_nref_is_zero && sb->nref != 0) {
        return false;
       }
@@ -470,11 +485,12 @@ public:
     }
 
     bool empty() {
-      std::lock_guard<std::mutex> l(lock);
+      std::lock_guard l(lock);
       return sb_map.empty();
     }
 
-    void dump(CephContext *cct, int lvl);
+    template <int LogLevelV>
+    void dump(CephContext *cct);
   };
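dump() now takes the debug level as a template parameter instead of a runtime argument, so the level is a compile-time constant at each call site. A hypothetical out-of-line definition could look roughly like the following; the real one lives in BlueStore.cc and may print more per entry:

    template <int LogLevelV>
    void BlueStore::SharedBlobSet::dump(CephContext *cct)
    {
      std::lock_guard l(lock);
      for (auto& p : sb_map) {
        ldout(cct, LogLevelV) << __func__ << " sbid 0x" << std::hex << p.first
                              << std::dec << dendl;
      }
    }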
 
 //#define CACHE_BLOB_BL  // not sure if this is a win yet or not... :/
@@ -518,7 +534,7 @@ public:
     }
 
     bool can_split() const {
-      std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
+      std::lock_guard l(shared_blob->get_cache()->lock);
       // splitting a BufferSpace writing list is too hard; don't try.
       return shared_blob->bc.writing.empty() &&
              used_in_blob.can_split() &&
@@ -577,9 +593,9 @@ public:
 #ifdef CACHE_BLOB_BL
     void _encode() const {
       if (blob_bl.length() == 0 ) {
-       ::encode(blob, blob_bl);
+       encode(blob, blob_bl);
       } else {
-       assert(blob_bl.length());
+       ceph_assert(blob_bl.length());
       }
     }
     void bound_encode(
@@ -602,7 +618,7 @@ public:
     }
     void decode(
       Collection */*coll*/,
-      bufferptr::iterator& p,
+      bufferptr::const_iterator& p,
       bool include_ref_map) {
       const char *start = p.get_pos();
       denc(blob, p);
@@ -642,7 +658,7 @@ public:
     }
     void decode(
       Collection *coll,
-      bufferptr::iterator& p,
+      bufferptr::const_iterator& p,
       uint64_t struct_v,
       uint64_t* sbid,
       bool include_ref_map);
@@ -679,7 +695,7 @@ public:
     }
 
     void assign_blob(const BlobRef& b) {
-      assert(!blob);
+      ceph_assert(!blob);
       blob = b;
       blob->shared_blob->get_cache()->add_extent();
     }
@@ -747,6 +763,7 @@ public:
     Onode *onode;
     extent_map_t extent_map;        ///< map of Extents to Blobs
     blob_map_t spanning_blob_map;   ///< blobs that span shards
+    typedef boost::intrusive_ptr<Onode> OnodeRef;
 
     struct Shard {
       bluestore_onode_t::shard_info *shard_info = nullptr;
@@ -761,6 +778,9 @@ public:
     uint32_t needs_reshard_begin = 0;
     uint32_t needs_reshard_end = 0;
 
+    void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
+      uint64_t&, uint64_t&, uint64_t&);
+
     bool needs_reshard() const {
       return needs_reshard_end > needs_reshard_begin;
     }
@@ -798,11 +818,11 @@ public:
 
     void bound_encode_spanning_blobs(size_t& p);
     void encode_spanning_blobs(bufferlist::contiguous_appender& p);
-    void decode_spanning_blobs(bufferptr::iterator& p);
+    void decode_spanning_blobs(bufferptr::const_iterator& p);
 
     BlobRef get_spanning_blob(int id) {
       auto p = spanning_blob_map.find(id);
-      assert(p != spanning_blob_map.end());
+      ceph_assert(p != spanning_blob_map.end());
       return p->second;
     }
 
@@ -845,7 +865,7 @@ public:
        return false;
       }
       int s = seek_shard(offset);
-      assert(s >= 0);
+      ceph_assert(s >= 0);
       if (s == (int)shards.size() - 1) {
        return false; // last shard
       }
@@ -996,8 +1016,8 @@ public:
     int64_t expected_for_release = 0;      ///< alloc units currently used by
                                            ///< compressed blobs that might
                                            ///< gone after GC
-    uint64_t gc_start_offset;              ///starting offset for GC
-    uint64_t gc_end_offset;                ///ending offset for GC
+    uint64_t gc_start_offset = 0;              ///< starting offset for GC
+    uint64_t gc_end_offset = 0;                ///< ending offset for GC
 
   protected:
     void process_protrusive_extents(const BlueStore::ExtentMap& extent_map, 
@@ -1032,8 +1052,9 @@ public:
     // track txc's that have not been committed to kv store (and whose
     // effects cannot be read via the kvdb read methods)
     std::atomic<int> flushing_count = {0};
-    std::mutex flush_lock;  ///< protect flush_txns
-    std::condition_variable flush_cond;   ///< wait here for uncommitted txns
+    /// protect flush_txns
+    ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
+    ceph::condition_variable flush_cond;   ///< wait here for uncommitted txns
 
     Onode(Collection *c, const ghobject_t& o,
          const mempool::bluestore_cache_other::string& k)
@@ -1061,7 +1082,10 @@ public:
   struct Cache {
     CephContext* cct;
     PerfCounters *logger;
-    std::recursive_mutex lock;          ///< protect lru and other structures
+
+    /// protect lru and other structures
+    ceph::recursive_mutex lock = {
+      ceph::make_recursive_mutex("BlueStore::Cache::lock") };
 
     std::atomic<uint64_t> num_extents = {0};
     std::atomic<uint64_t> num_blobs = {0};
@@ -1110,7 +1134,7 @@ public:
                           uint64_t *bytes) = 0;
 
     bool empty() {
-      std::lock_guard<std::recursive_mutex> l(lock);
+      std::lock_guard l(lock);
       return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
     }
 
@@ -1174,7 +1198,7 @@ public:
       buffer_size += b->length;
     }
     void _rm_buffer(Buffer *b) override {
-      assert(buffer_size >= b->length);
+      ceph_assert(buffer_size >= b->length);
       buffer_size -= b->length;
       auto q = buffer_lru.iterator_to(*b);
       buffer_lru.erase(q);
@@ -1184,7 +1208,7 @@ public:
       _add_buffer(b, 0, nullptr);
     }
     void _adjust_buffer_size(Buffer *b, int64_t delta) override {
-      assert((int64_t)buffer_size + delta >= 0);
+      ceph_assert((int64_t)buffer_size + delta >= 0);
       buffer_size += delta;
     }
     void _touch_buffer(Buffer *b) override {
@@ -1200,7 +1224,7 @@ public:
                   uint64_t *blobs,
                   uint64_t *buffers,
                   uint64_t *bytes) override {
-      std::lock_guard<std::recursive_mutex> l(lock);
+      std::lock_guard l(lock);
       *onodes += onode_lru.size();
       *extents += num_extents;
       *blobs += num_blobs;
@@ -1278,7 +1302,7 @@ public:
        break;
       case BUFFER_WARM_OUT:
        // move from warm_out to hot LRU
-       assert(0 == "this happens via discard hint");
+       ceph_abort_msg("this happens via discard hint");
        break;
       case BUFFER_HOT:
        // move to front of hot LRU
@@ -1295,7 +1319,7 @@ public:
                   uint64_t *blobs,
                   uint64_t *buffers,
                   uint64_t *bytes) override {
-      std::lock_guard<std::recursive_mutex> l(lock);
+      std::lock_guard l(lock);
       *onodes += onode_lru.size();
       *extents += num_extents;
       *blobs += num_blobs;
@@ -1334,16 +1358,20 @@ public:
     void clear();
     bool empty();
 
-    void dump(CephContext *cct, int lvl);
+    template <int LogLevelV>
+    void dump(CephContext *cct);
 
     /// return true if f true for any item
     bool map_any(std::function<bool(OnodeRef)> f);
   };
 
+  class OpSequencer;
+  typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
+
   struct Collection : public CollectionImpl {
     BlueStore *store;
+    OpSequencerRef osr;
     Cache *cache;       ///< our cache shard
-    coll_t cid;
     bluestore_cnode_t cnode;
     RWLock lock;
 
@@ -1357,6 +1385,7 @@ public:
 
     //pool options
     pool_opts_t pool_opts;
+    ContextQueue *commit_queue;
 
     OnodeRef get_onode(const ghobject_t& oid, bool create);
 
@@ -1382,10 +1411,6 @@ public:
       return b;
     }
 
-    const coll_t &get_cid() override {
-      return cid;
-    }
-
     bool contains(const ghobject_t& oid) {
       if (cid.is_meta())
        return oid.hobj.pool == -1;
@@ -1399,6 +1424,10 @@ public:
 
     void split_cache(Collection *dest);
 
+    bool flush_commit(Context *c) override;
+    void flush() override;
+    void flush_all_but_last();
+
     Collection(BlueStore *ns, Cache *ca, coll_t c);
   };
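flush(), flush_commit() and flush_all_but_last() now live on the Collection (they used to be reached through the removed Sequencer). A hedged usage sketch, assuming a CollectionHandle ch and the stock C_SaferCond helper:

    C_SaferCond on_commit;
    if (!ch->flush_commit(&on_commit)) {
      // false means transactions are still in flight; the context fires once
      // the last currently queued transaction commits.
      on_commit.wait();
    }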
 
@@ -1407,13 +1436,16 @@ public:
     OnodeRef o;
     KeyValueDB::Iterator it;
     string head, tail;
+
+    string _stringify() const;
+
   public:
     OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
     int seek_to_first() override;
     int upper_bound(const string &after) override;
     int lower_bound(const string &to) override;
     bool valid() override;
-    int next(bool validate=true) override;
+    int next() override;
     string key() override;
     bufferlist value() override;
     int status() override {
@@ -1421,9 +1453,6 @@ public:
     }
   };
 
-  class OpSequencer;
-  typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
-
   struct volatile_statfs{
     enum {
       STATFS_ALLOCATED = 0,
@@ -1440,6 +1469,14 @@ public:
     void reset() {
       *this = volatile_statfs();
     }
+    void publish(store_statfs_t* buf) const {
+      buf->allocated = allocated();
+      buf->data_stored = stored();
+      buf->data_compressed = compressed();
+      buf->data_compressed_original = compressed_original();
+      buf->data_compressed_allocated = compressed_allocated();
+    }
+
     volatile_statfs& operator+=(const volatile_statfs& other) {
       for (size_t i = 0; i < STATFS_LAST; ++i) {
        values[i] += other.values[i];
@@ -1461,6 +1498,29 @@ public:
     int64_t& compressed_allocated() {
       return values[STATFS_COMPRESSED_ALLOCATED];
     }
+    int64_t allocated() const {
+      return values[STATFS_ALLOCATED];
+    }
+    int64_t stored() const {
+      return values[STATFS_STORED];
+    }
+    int64_t compressed_original() const {
+      return values[STATFS_COMPRESSED_ORIGINAL];
+    }
+    int64_t compressed() const {
+      return values[STATFS_COMPRESSED];
+    }
+    int64_t compressed_allocated() const {
+      return values[STATFS_COMPRESSED_ALLOCATED];
+    }
+    volatile_statfs& operator=(const store_statfs_t& st) {
+      values[STATFS_ALLOCATED] = st.allocated;
+      values[STATFS_STORED] = st.data_stored;
+      values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
+      values[STATFS_COMPRESSED] = st.data_compressed;
+      values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;
+      return *this;
+    }
     bool is_empty() {
       return values[STATFS_ALLOCATED] == 0 &&
        values[STATFS_STORED] == 0 &&
@@ -1468,20 +1528,22 @@ public:
        values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
        values[STATFS_COMPRESSED_ALLOCATED] == 0;
     }
-    void decode(bufferlist::iterator& it) {
+    void decode(bufferlist::const_iterator& it) {
+      using ceph::decode;
       for (size_t i = 0; i < STATFS_LAST; i++) {
-       ::decode(values[i], it);
+       decode(values[i], it);
       }
     }
 
     void encode(bufferlist& bl) {
+      using ceph::encode;
       for (size_t i = 0; i < STATFS_LAST; i++) {
-       ::encode(values[i], bl);
+       encode(values[i], bl);
       }
     }
   };
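The new const accessors, publish() and the assignment from store_statfs_t back the nautilus per-pool statfs work: every TransContext carries a statfs_delta plus an osd_pool_id, and those deltas are folded into both the store-wide and the per-pool counters. A simplified, hypothetical sketch of that folding (the actual logic sits in BlueStore.cc and is guarded by vstatfs_lock):

    void fold_statfs_delta_sketch(
      BlueStore::volatile_statfs& vstatfs,                        // store-wide totals
      std::map<uint64_t, BlueStore::volatile_statfs>& osd_pools,  // per-pool totals
      uint64_t osd_pool_id,
      const BlueStore::volatile_statfs& delta)
    {
      vstatfs += delta;
      osd_pools[osd_pool_id] += delta;
      // reporting side, e.g. for statfs()/pool_statfs():
      //   store_statfs_t buf;
      //   vstatfs.publish(&buf);
    }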
 
-  struct TransContext : public AioContext {
+  struct TransContext final : public AioContext {
     MEMPOOL_CLASS_HELPERS();
 
     typedef enum {
@@ -1535,7 +1597,7 @@ public:
     }
 #endif
 
-    void log_state_latency(PerfCounters *logger, int state) {
+    utime_t log_state_latency(PerfCounters *logger, int state) {
       utime_t lat, now = ceph_clock_now();
       lat = now - last_stamp;
       logger->tinc(state, lat);
@@ -1546,9 +1608,11 @@ public:
       }
 #endif
       last_stamp = now;
+      return lat;
     }
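log_state_latency() now returns the measured interval, so a caller can update the perf counter and still react to a slow transition without a second clock read. A hypothetical caller (the five second threshold is an assumption):

    utime_t lat = txc->log_state_latency(logger, l_bluestore_state_prepare_lat);
    if (lat > utime_t(5, 0)) {
      // e.g. emit a warning about an unusually slow state transition
    }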
 
-    OpSequencerRef osr;
+    CollectionRef ch;
+    OpSequencerRef osr;  // this should be ch->osr
     boost::intrusive::list_member_hook<> sequencer_item;
 
     uint64_t bytes = 0, cost = 0;
@@ -1559,9 +1623,6 @@ public:
     set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
 
     KeyValueDB::Transaction t; ///< then we will commit this
-    Context *oncommit = nullptr;         ///< signal on commit
-    Context *onreadable = nullptr;       ///< signal on readable
-    Context *onreadable_sync = nullptr;  ///< signal on readable
     list<Context*> oncommits;  ///< more commit completions
     list<CollectionRef> removed_collections; ///< colls we removed
 
@@ -1569,8 +1630,9 @@ public:
     bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
 
     interval_set<uint64_t> allocated, released;
-    volatile_statfs statfs_delta;
-
+    volatile_statfs statfs_delta;         ///< overall store statistics delta
+    uint64_t osd_pool_id = META_POOL_ID;    ///< osd pool id we're operating on
+    
     IOContext ioc;
     bool had_ios = false;  ///< true if we submitted IOs before our kv txn
 
@@ -1581,11 +1643,16 @@ public:
     uint64_t last_nid = 0;     ///< if non-zero, highest new nid we allocated
     uint64_t last_blobid = 0;  ///< if non-zero, highest new blobid we allocated
 
-    explicit TransContext(CephContext* cct, OpSequencer *o)
-      : osr(o),
+    explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
+                         list<Context*> *on_commits)
+      : ch(c),
+       osr(o),
        ioc(cct, this),
        start(ceph_clock_now()) {
       last_stamp = start;
+      if (on_commits) {
+       oncommits.swap(*on_commits);
+      }
     }
     ~TransContext() {
       delete deferred_txn;
@@ -1623,7 +1690,7 @@ public:
       boost::intrusive::list_member_hook<>,
       &TransContext::deferred_queue_item> > deferred_queue_t;
 
-  struct DeferredBatch : public AioContext {
+  struct DeferredBatch final : public AioContext {
     OpSequencer *osr;
     struct deferred_io {
       bufferlist bl;    ///< data
@@ -1651,10 +1718,10 @@ public:
     }
   };
 
-  class OpSequencer : public Sequencer_impl {
+  class OpSequencer : public RefCountedObject {
   public:
-    std::mutex qlock;
-    std::condition_variable qcond;
+    ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
+    ceph::condition_variable qcond;
     typedef boost::intrusive::list<
       TransContext,
       boost::intrusive::member_hook<
@@ -1668,8 +1735,8 @@ public:
     DeferredBatch *deferred_running = nullptr;
     DeferredBatch *deferred_pending = nullptr;
 
-    Sequencer *parent;
     BlueStore *store;
+    coll_t cid;
 
     uint64_t last_seq = 0;
 
@@ -1679,66 +1746,37 @@ public:
 
     std::atomic_int kv_submitted_waiters = {0};
 
-    std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
-    std::atomic_bool zombie = {false};    ///< owning Sequencer has gone away
+    std::atomic_bool zombie = {false};    ///< in zombie_osr set (collection going away)
 
-    OpSequencer(CephContext* cct, BlueStore *store)
-      : Sequencer_impl(cct),
-       parent(NULL), store(store) {
-      store->register_osr(this);
+    OpSequencer(BlueStore *store, const coll_t& c)
+      : RefCountedObject(store->cct, 0),
+       store(store), cid(c) {
     }
-    ~OpSequencer() override {
-      assert(q.empty());
-      _unregister();
-    }
-
-    void discard() override {
-      // Note that we may have txc's in flight when the parent Sequencer
-      // goes away.  Reflect this with zombie==registered==true and let
-      // _osr_drain_all clean up later.
-      assert(!zombie);
-      zombie = true;
-      parent = nullptr;
-      bool empty;
-      {
-       std::lock_guard<std::mutex> l(qlock);
-       empty = q.empty();
-      }
-      if (empty) {
-       _unregister();
-      }
-    }
-
-    void _unregister() {
-      if (registered) {
-       store->unregister_osr(this);
-       registered = false;
-      }
+    ~OpSequencer() {
+      ceph_assert(q.empty());
     }
 
     void queue_new(TransContext *txc) {
-      std::lock_guard<std::mutex> l(qlock);
+      std::lock_guard l(qlock);
       txc->seq = ++last_seq;
       q.push_back(*txc);
     }
 
     void drain() {
-      std::unique_lock<std::mutex> l(qlock);
+      std::unique_lock l(qlock);
       while (!q.empty())
        qcond.wait(l);
     }
 
     void drain_preceding(TransContext *txc) {
-      std::unique_lock<std::mutex> l(qlock);
+      std::unique_lock l(qlock);
       while (!q.empty() && &q.front() != txc)
        qcond.wait(l);
     }
 
     bool _is_all_kv_submitted() {
-      // caller must hold qlock
-      if (q.empty()) {
-       return true;
-      }
+      // caller must hold qlock and q must not be empty
+      ceph_assert(!q.empty());
       TransContext *txc = &q.back();
       if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
        return true;
@@ -1746,14 +1784,15 @@ public:
       return false;
     }
 
-    void flush() override {
-      std::unique_lock<std::mutex> l(qlock);
+    void flush() {
+      std::unique_lock l(qlock);
       while (true) {
        // set flag before the check because the condition
        // may become true outside qlock, and we need to make
        // sure those threads see waiters and signal qcond.
        ++kv_submitted_waiters;
-       if (_is_all_kv_submitted()) {
+       if (q.empty() || _is_all_kv_submitted()) {
+         --kv_submitted_waiters;
          return;
        }
        qcond.wait(l);
@@ -1761,8 +1800,31 @@ public:
       }
     }
 
-    bool flush_commit(Context *c) override {
-      std::lock_guard<std::mutex> l(qlock);
+    void flush_all_but_last() {
+      std::unique_lock l(qlock);
+      ceph_assert(q.size() >= 1);
+      while (true) {
+       // set flag before the check because the condition
+       // may become true outside qlock, and we need to make
+       // sure those threads see waiters and signal qcond.
+       ++kv_submitted_waiters;
+       if (q.size() <= 1) {
+         --kv_submitted_waiters;
+         return;
+       } else {
+         auto it = q.rbegin();
+         it++;
+         if (it->state >= TransContext::STATE_KV_SUBMITTED) {
+           --kv_submitted_waiters;
+           return;
+         }
+       }
+       qcond.wait(l);
+       --kv_submitted_waiters;
+      }
+    }
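The comment above explains why kv_submitted_waiters is incremented before the state check. The signalling side presumably mirrors that, only taking qlock and broadcasting when a waiter is registered; roughly (a sketch, not the literal BlueStore.cc code):

    // after a TransContext reaches STATE_KV_SUBMITTED:
    if (osr->kv_submitted_waiters) {
      std::lock_guard l(osr->qlock);
      osr->qcond.notify_all();
    }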
+
+    bool flush_commit(Context *c) {
+      std::lock_guard l(qlock);
       if (q.empty()) {
        return true;
       }
@@ -1828,8 +1890,8 @@ private:
   BlueFS *bluefs = nullptr;
   unsigned bluefs_shared_bdev = 0;  ///< which bluefs bdev we are sharing
   bool bluefs_single_shared_device = true;
-  utime_t bluefs_last_balance;
-  utime_t next_dump_on_bluefs_balance_failure;
+  mono_time bluefs_last_balance;
+  utime_t next_dump_on_bluefs_alloc_failure;
 
   KeyValueDB *db = nullptr;
   BlockDevice *bdev = nullptr;
@@ -1843,11 +1905,13 @@ private:
 
   RWLock coll_lock = {"BlueStore::coll_lock"};  ///< rwlock to protect coll_map
   mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
+  map<coll_t,CollectionRef> new_coll_map;
 
   vector<Cache*> cache_shards;
 
-  std::mutex osr_lock;              ///< protect osd_set
-  std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
+  /// protect zombie_osr_set
+  ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
+  std::map<coll_t,OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections
 
   std::atomic<uint64_t> nid_last = {0};
   std::atomic<uint64_t> nid_max = {0};
@@ -1860,19 +1924,16 @@ private:
   interval_set<uint64_t> bluefs_extents;  ///< block extents owned by bluefs
   interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
 
-  std::mutex deferred_lock;
+  ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
   std::atomic<uint64_t> deferred_seq = {0};
   deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
   int deferred_queue_size = 0;         ///< num txc's queued across all osrs
   atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
-  Finisher deferred_finisher;
-
-  int m_finisher_num = 1;
-  vector<Finisher*> finishers;
+  Finisher deferred_finisher, finisher;
 
   KVSyncThread kv_sync_thread;
-  std::mutex kv_lock;
-  std::condition_variable kv_cond;
+  ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
+  ceph::condition_variable kv_cond;
   bool _kv_only = false;
   bool kv_sync_started = false;
   bool kv_stop = false;
@@ -1882,11 +1943,10 @@ private:
   deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
   deque<TransContext*> kv_committing;        ///< currently syncing
   deque<DeferredBatch*> deferred_done_queue;   ///< deferred ios done
-  deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
 
   KVFinalizeThread kv_finalize_thread;
-  std::mutex kv_finalize_lock;
-  std::condition_variable kv_finalize_cond;
+  ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
+  ceph::condition_variable kv_finalize_cond;
   deque<TransContext*> kv_committing_to_finalize;   ///< pending finalization
   deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
 
@@ -1940,28 +2000,35 @@ private:
   double cache_kv_ratio = 0;     ///< cache ratio dedicated to kv (e.g., rocksdb)
   double cache_data_ratio = 0;   ///< cache ratio dedicated to object data
   bool cache_autotune = false;   ///< cache autotune setting
-  uint64_t cache_autotune_chunk_size = 0; ///< cache autotune chunk size
   double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
   uint64_t osd_memory_target = 0;   ///< OSD memory target when autotuning cache
   uint64_t osd_memory_base = 0;     ///< OSD base memory when autotuning cache
   double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
-  uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cahce
+  uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cache
   double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing 
-  std::mutex vstatfs_lock;
+
+  typedef map<uint64_t, volatile_statfs> osd_pools_map;
+
+  ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
   volatile_statfs vstatfs;
+  osd_pools_map osd_pools; // protected by vstatfs_lock as well
+
+  bool per_pool_stat_collection = true;
 
   struct MempoolThread : public Thread {
   public:
     BlueStore *store;
 
-    Cond cond;
-    Mutex lock;
+    ceph::condition_variable cond;
+    ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
     bool stop = false;
     uint64_t autotune_cache_size = 0;
+    std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
 
     struct MempoolCache : public PriorityCache::PriCache {
       BlueStore *store;
-      int64_t cache_bytes[PriorityCache::Priority::LAST+1];
+      int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
+      int64_t committed_bytes = 0;
       double cache_ratio = 0;
 
       MempoolCache(BlueStore *s) : store(s) {};
@@ -1969,15 +2036,14 @@ private:
       virtual uint64_t _get_used_bytes() const = 0;
 
       virtual int64_t request_cache_bytes(
-          PriorityCache::Priority pri, uint64_t chunk_bytes) const {
+          PriorityCache::Priority pri, uint64_t total_cache) const {
         int64_t assigned = get_cache_bytes(pri);
 
         switch (pri) {
         // All cache items are currently shoved into the LAST priority 
         case PriorityCache::Priority::LAST:
           {
-            uint64_t usage = _get_used_bytes();
-            int64_t request = PriorityCache::get_chunk(usage, chunk_bytes);
+            int64_t request = _get_used_bytes();
             return(request > assigned) ? request - assigned : 0;
           }
         default:
@@ -2004,8 +2070,13 @@ private:
       virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
         cache_bytes[pri] += bytes;
       }
-      virtual int64_t commit_cache_size() {
-        return get_cache_bytes(); 
+      virtual int64_t commit_cache_size(uint64_t total_cache) {
+        committed_bytes = PriorityCache::get_chunk(
+            get_cache_bytes(), total_cache);
+        return committed_bytes;
+      }
+      virtual int64_t get_committed_size() const {
+        return committed_bytes;
       }
       virtual double get_cache_ratio() const {
         return cache_ratio;
@@ -2037,7 +2108,8 @@ private:
       double get_bytes_per_onode() const {
         return (double)_get_used_bytes() / (double)_get_num_onodes();
       }
-    } meta_cache;
+    };
+    std::shared_ptr<MetaCache> meta_cache;
 
     struct DataCache : public MempoolCache {
       DataCache(BlueStore *s) : MempoolCache(s) {};
@@ -2052,25 +2124,25 @@ private:
       virtual string get_cache_name() const {
         return "BlueStore Data Cache";
       }
-    } data_cache;
+    };
+    std::shared_ptr<DataCache> data_cache;
 
   public:
     explicit MempoolThread(BlueStore *s)
       : store(s),
-       lock("BlueStore::MempoolThread::lock"),
-        meta_cache(MetaCache(s)),
-        data_cache(DataCache(s)) {}
+        meta_cache(new MetaCache(s)),
+        data_cache(new DataCache(s)) {}
 
     void *entry() override;
     void init() {
-      assert(stop == false);
+      ceph_assert(stop == false);
       create("bstore_mempool");
     }
     void shutdown() {
-      lock.Lock();
+      lock.lock();
       stop = true;
-      cond.Signal();
-      lock.Unlock();
+      cond.notify_all();
+      lock.unlock();
       join();
     }
 
@@ -2078,10 +2150,12 @@ private:
     void _adjust_cache_settings();
     void _trim_shards(bool interval_stats);
     void _tune_cache_size(bool interval_stats);
-    void _balance_cache(const std::list<PriorityCache::PriCache *>& caches);
-    void _balance_cache_pri(int64_t *mem_avail, 
-                            const std::list<PriorityCache::PriCache *>& caches, 
-                            PriorityCache::Priority pri);
+    void _balance_cache(
+        const std::list<std::shared_ptr<PriorityCache::PriCache>>& caches);
+    void _balance_cache_pri(
+        int64_t *mem_avail, 
+        const std::list<std::shared_ptr<PriorityCache::PriCache>>& caches, 
+        PriorityCache::Priority pri);
   } mempool_thread;
 
   // --------------------------------------------------------
@@ -2103,10 +2177,44 @@ private:
   void _set_finisher_num();
 
   int _open_bdev(bool create);
+  // Verifies whether disk space is sufficient for the reserved area plus the
+  // minimal BlueFS size, and adjusts the latter if needed.
+  // Depends on min_alloc_size, hence must be called after it is initialized
+  // (and outside of _open_bdev).
+  void _validate_bdev();
   void _close_bdev();
-  int _open_db(bool create);
+
+  int _minimal_open_bluefs(bool create);
+  void _minimal_close_bluefs();
+  int _open_bluefs(bool create);
+  void _close_bluefs();
+
+  // Limited (u)mount intended for BlueFS operations only
+  int _mount_for_bluefs();
+  void _umount_for_bluefs();
+
+
+  int _is_bluefs(bool create, bool* ret);
+  /*
+  * opens the DB and the dependent super_meta, FreelistManager and allocator
+  * in the proper order
+  */
+  int _open_db_and_around(bool read_only);
+  void _close_db_and_around();
+
+  // updates legacy bluefs-related records in the DB to a state valid for
+  // downgrades from nautilus.
+  void _sync_bluefs_and_fm();
+
+  /*
+   * @warning to_repair_db means that we open this db to repair it and will
+   * not hold RocksDB's file lock.
+   */
+  int _open_db(bool create,
+              bool to_repair_db=false,
+              bool read_only = false);
   void _close_db();
-  int _open_fm(bool create);
+  int _open_fm(KeyValueDB::Transaction t);
   void _close_fm();
   int _open_alloc();
   void _close_alloc();
@@ -2128,11 +2236,12 @@ private:
   int _open_super_meta();
 
   void _open_statfs();
+  void _get_statfs_overall(struct store_statfs_t *buf);
+
+  void _dump_alloc_on_failure();
 
-  void _dump_alloc_on_rebalance_failure();
-  int _reconcile_bluefs_freespace();
-  int _balance_bluefs_freespace(PExtentVector *extents);
-  void _commit_bluefs_freespace(const PExtentVector& extents);
+  int64_t _get_bluefs_size_delta(uint64_t bluefs_free, uint64_t bluefs_total);
+  int _balance_bluefs_freespace();
 
   CollectionRef _get_collection(const coll_t& cid);
   void _queue_reap_collection(CollectionRef& c);
@@ -2142,11 +2251,15 @@ private:
   void _assign_nid(TransContext *txc, OnodeRef o);
   uint64_t _assign_blobid(TransContext *txc);
 
-  void _dump_onode(const OnodeRef& o, int log_level=30);
-  void _dump_extent_map(ExtentMap& em, int log_level=30);
-  void _dump_transaction(Transaction *t, int log_level = 30);
+  template <int LogLevelV>
+  friend void _dump_onode(CephContext *cct, const Onode& o);
+  template <int LogLevelV>
+  friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
+  template <int LogLevelV>
+  friend void _dump_transaction(CephContext *cct, Transaction *t);
 
-  TransContext *_txc_create(OpSequencer *osr);
+  TransContext *_txc_create(Collection *c, OpSequencer *osr,
+                           list<Context*> *on_commits);
   void _txc_update_store_statfs(TransContext *txc);
   void _txc_add_transaction(TransContext *txc, Transaction *t);
   void _txc_calc_cost(TransContext *txc);
@@ -2165,9 +2278,11 @@ private:
   void _txc_finish(TransContext *txc);
   void _txc_release_alloc(TransContext *txc);
 
+  void _osr_attach(Collection *c);
+  void _osr_register_zombie(OpSequencer *osr);
+  void _osr_drain(OpSequencer *osr);
   void _osr_drain_preceding(TransContext *txc);
   void _osr_drain_all();
-  void _osr_unregister_all();
 
   void _kv_start();
   void _kv_stop();
@@ -2190,13 +2305,23 @@ public:
 
 private:
   int _fsck_check_extents(
+    const coll_t& cid,
     const ghobject_t& oid,
     const PExtentVector& extents,
     bool compressed,
     mempool_dynamic_bitset &used_blocks,
     uint64_t granularity,
+    BlueStoreRepairer* repairer,
     store_statfs_t& expected_statfs);
 
+  using  per_pool_statfs =
+    mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
+  void _fsck_check_pool_statfs(
+    per_pool_statfs& expected_pool_statfs,
+    bool need_per_pool_stats,
+    int& errors,
+    BlueStoreRepairer* repairer);
+
   void _buffer_cache_write(
     TransContext *txc,
     BlobRef b,
@@ -2226,6 +2351,8 @@ private:
                      uint64_t tail_pad,
                      bufferlist& padded);
 
+  void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);
+
   // -- ondisk version ---
 public:
   const int32_t latest_ondisk_format = 2;        ///< our version
@@ -2236,6 +2363,7 @@ private:
   int32_t ondisk_format = 0;  ///< value detected on mount
 
   int _upgrade_super();  ///< upgrade (called during open_super)
+  uint64_t _get_ondisk_reserved() const;
   void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
 
   // --- public interface ---
@@ -2252,6 +2380,8 @@ public:
   bool wants_journal() override { return false; };
   bool allows_journal() override { return false; };
 
+  int get_devices(set<string> *ls) override;
+
   bool is_rotational() override;
   bool is_journal_rotational() override;
 
@@ -2266,21 +2396,26 @@ public:
     return device_class;
   }
 
+  int get_numa_node(
+    int *numa_node,
+    set<int> *nodes,
+    set<string> *failed) override;
+
   static int get_block_device_fsid(CephContext* cct, const string& path,
                                   uuid_d *fsid);
 
   bool test_mount_in_use() override;
 
 private:
-  int _mount(bool kv_only);
+  int _mount(bool kv_only, bool open_db=true);
 public:
   int mount() override {
     return _mount(false);
   }
   int umount() override;
 
-  int start_kv_only(KeyValueDB **pdb) {
-    int r = _mount(true);
+  int start_kv_only(KeyValueDB **pdb, bool open_db=true) {
+    int r = _mount(true, open_db);
     if (r < 0)
       return r;
     *pdb = db;
@@ -2300,6 +2435,24 @@ public:
   int _fsck(bool deep, bool repair);
 
   void set_cache_shards(unsigned num) override;
+  void dump_cache_stats(Formatter *f) override {
+    int onode_count = 0, buffers_bytes = 0;
+    for (auto i: cache_shards) {
+      onode_count += i->_get_num_onodes();
+      buffers_bytes += i->_get_buffer_bytes();
+    }
+    f->dump_int("bluestore_onode", onode_count);
+    f->dump_int("bluestore_buffers", buffers_bytes);
+  }
+  void dump_cache_stats(ostream& ss) override {
+    int onode_count = 0, buffers_bytes = 0;
+    for (auto i: cache_shards) {
+      onode_count += i->_get_num_onodes();
+      buffers_bytes += i->_get_buffer_bytes();
+    }
+    ss << "bluestore_onode: " << onode_count;
+    ss << "bluestore_buffers: " << buffers_bytes;
+  }
 
   int validate_hobject_key(const hobject_t &obj) const override {
     return 0;
@@ -2316,49 +2469,38 @@ public:
   void get_db_statistics(Formatter *f) override;
   void generate_db_histogram(Formatter *f) override;
   void _flush_cache();
-  void flush_cache() override;
+  int flush_cache(ostream *os = NULL) override;
   void dump_perf_counters(Formatter *f) override {
     f->open_object_section("perf_counters");
     logger->dump_formatted(f, false);
     f->close_section();
   }
 
-  void register_osr(OpSequencer *osr) {
-    std::lock_guard<std::mutex> l(osr_lock);
-    osr_set.insert(osr);
-  }
-  void unregister_osr(OpSequencer *osr) {
-    std::lock_guard<std::mutex> l(osr_lock);
-    osr_set.erase(osr);
-  }
+  int add_new_bluefs_device(int id, const string& path);
+  int migrate_to_existing_bluefs_device(const set<int>& devs_source,
+    int id);
+  int migrate_to_new_bluefs_device(const set<int>& devs_source,
+    int id,
+    const string& path);
+  int expand_devices(ostream& out);
+  string get_device_path(unsigned id);
 
 public:
-  int statfs(struct store_statfs_t *buf) override;
+  int statfs(struct store_statfs_t *buf,
+             osd_alert_list_t* alerts = nullptr) override;
+  int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf) override;
 
   void collect_metadata(map<string,string> *pm) override;
 
-  bool exists(const coll_t& cid, const ghobject_t& oid) override;
   bool exists(CollectionHandle &c, const ghobject_t& oid) override;
   int set_collection_opts(
-    const coll_t& cid,
+    CollectionHandle& c,
     const pool_opts_t& opts) override;
-  int stat(
-    const coll_t& cid,
-    const ghobject_t& oid,
-    struct stat *st,
-    bool allow_eio = false) override;
   int stat(
     CollectionHandle &c,
     const ghobject_t& oid,
     struct stat *st,
     bool allow_eio = false) override;
-  int read(
-    const coll_t& cid,
-    const ghobject_t& oid,
-    uint64_t offset,
-    size_t len,
-    bufferlist& bl,
-    uint32_t op_flags = 0) override;
   int read(
     CollectionHandle &c,
     const ghobject_t& oid,
@@ -2379,51 +2521,35 @@ private:
   int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
             uint64_t offset, size_t len, interval_set<uint64_t>& destset);
 public:
-  int fiemap(const coll_t& cid, const ghobject_t& oid,
-            uint64_t offset, size_t len, bufferlist& bl) override;
   int fiemap(CollectionHandle &c, const ghobject_t& oid,
             uint64_t offset, size_t len, bufferlist& bl) override;
-  int fiemap(const coll_t& cid, const ghobject_t& oid,
-            uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
   int fiemap(CollectionHandle &c, const ghobject_t& oid,
             uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
 
 
-  int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
-             bufferptr& value) override;
   int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
              bufferptr& value) override;
 
-  int getattrs(const coll_t& cid, const ghobject_t& oid,
-              map<string,bufferptr>& aset) override;
   int getattrs(CollectionHandle &c, const ghobject_t& oid,
               map<string,bufferptr>& aset) override;
 
   int list_collections(vector<coll_t>& ls) override;
 
   CollectionHandle open_collection(const coll_t &c) override;
+  CollectionHandle create_new_collection(const coll_t& cid) override;
+  void set_collection_commit_queue(const coll_t& cid,
+                                  ContextQueue *commit_queue) override;
 
   bool collection_exists(const coll_t& c) override;
-  int collection_empty(const coll_t& c, bool *empty) override;
-  int collection_bits(const coll_t& c) override;
+  int collection_empty(CollectionHandle& c, bool *empty) override;
+  int collection_bits(CollectionHandle& c) override;
 
-  int collection_list(const coll_t& cid,
-                     const ghobject_t& start,
-                     const ghobject_t& end,
-                     int max,
-                     vector<ghobject_t> *ls, ghobject_t *next) override;
   int collection_list(CollectionHandle &c,
                      const ghobject_t& start,
                      const ghobject_t& end,
                      int max,
                      vector<ghobject_t> *ls, ghobject_t *next) override;
 
-  int omap_get(
-    const coll_t& cid,                ///< [in] Collection containing oid
-    const ghobject_t &oid,   ///< [in] Object containing omap
-    bufferlist *header,      ///< [out] omap header
-    map<string, bufferlist> *out /// < [out] Key to value map
-    ) override;
   int omap_get(
     CollectionHandle &c,     ///< [in] Collection containing oid
     const ghobject_t &oid,   ///< [in] Object containing omap
@@ -2432,12 +2558,6 @@ public:
     ) override;
 
   /// Get omap header
-  int omap_get_header(
-    const coll_t& cid,                ///< [in] Collection containing oid
-    const ghobject_t &oid,   ///< [in] Object containing omap
-    bufferlist *header,      ///< [out] omap header
-    bool allow_eio = false ///< [in] don't assert on eio
-    ) override;
   int omap_get_header(
     CollectionHandle &c,                ///< [in] Collection containing oid
     const ghobject_t &oid,   ///< [in] Object containing omap
@@ -2446,11 +2566,6 @@ public:
     ) override;
 
   /// Get keys defined on oid
-  int omap_get_keys(
-    const coll_t& cid,              ///< [in] Collection containing oid
-    const ghobject_t &oid, ///< [in] Object containing omap
-    set<string> *keys      ///< [out] Keys defined on oid
-    ) override;
   int omap_get_keys(
     CollectionHandle &c,              ///< [in] Collection containing oid
     const ghobject_t &oid, ///< [in] Object containing omap
@@ -2458,12 +2573,6 @@ public:
     ) override;
 
   /// Get key values
-  int omap_get_values(
-    const coll_t& cid,                    ///< [in] Collection containing oid
-    const ghobject_t &oid,       ///< [in] Object containing omap
-    const set<string> &keys,     ///< [in] Keys to get
-    map<string, bufferlist> *out ///< [out] Returned keys and values
-    ) override;
   int omap_get_values(
     CollectionHandle &c,         ///< [in] Collection containing oid
     const ghobject_t &oid,       ///< [in] Object containing omap
@@ -2472,12 +2581,6 @@ public:
     ) override;
 
   /// Filters keys into out which are defined on oid
-  int omap_check_keys(
-    const coll_t& cid,                ///< [in] Collection containing oid
-    const ghobject_t &oid,   ///< [in] Object containing omap
-    const set<string> &keys, ///< [in] Keys to check
-    set<string> *out         ///< [out] Subset of keys defined on oid
-    ) override;
   int omap_check_keys(
     CollectionHandle &c,                ///< [in] Collection containing oid
     const ghobject_t &oid,   ///< [in] Object containing omap
@@ -2485,10 +2588,6 @@ public:
     set<string> *out         ///< [out] Subset of keys defined on oid
     ) override;
 
-  ObjectMap::ObjectMapIterator get_omap_iterator(
-    const coll_t& cid,              ///< [in] collection
-    const ghobject_t &oid  ///< [in] object
-    ) override;
   ObjectMap::ObjectMapIterator get_omap_iterator(
     CollectionHandle &c,   ///< [in] collection
     const ghobject_t &oid  ///< [in] object
@@ -2506,13 +2605,13 @@ public:
   }
 
   struct BSPerfTracker {
-    PerfCounters::avg_tracker<uint64_t> os_commit_latency;
-    PerfCounters::avg_tracker<uint64_t> os_apply_latency;
+    PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
+    PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;
 
     objectstore_perf_stat_t get_cur_stats() const {
       objectstore_perf_stat_t ret;
-      ret.os_commit_latency = os_commit_latency.current_avg();
-      ret.os_apply_latency = os_apply_latency.current_avg();
+      ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
+      ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
       return ret;
     }
 
@@ -2528,7 +2627,7 @@ public:
   }
 
   int queue_transactions(
-    Sequencer *osr,
+    CollectionHandle& ch,
     vector<Transaction>& tls,
     TrackedOpRef op = TrackedOpRef(),
     ThreadPool::TPHandle *handle = NULL) override;
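queue_transactions() is now keyed by a CollectionHandle instead of a Sequencer; ordering comes from the OpSequencer the collection owns. A hedged caller sketch (cid and oid are placeholders):

    ObjectStore::CollectionHandle ch = store->create_new_collection(cid);
    ObjectStore::Transaction t;
    t.create_collection(cid, 0);
    t.touch(cid, oid);
    std::vector<ObjectStore::Transaction> tls;
    tls.push_back(std::move(t));
    store->queue_transactions(ch, tls);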
@@ -2542,14 +2641,47 @@ public:
     RWLock::WLocker l(debug_read_error_lock);
     debug_mdata_error_objects.insert(o);
   }
+
+  /// methods to inject various errors fsck can repair
+  void inject_broken_shared_blob_key(const string& key,
+                        const bufferlist& bl);
+  void inject_leaked(uint64_t len);
+  void inject_false_free(coll_t cid, ghobject_t oid);
+  void inject_statfs(const string& key, const store_statfs_t& new_statfs);
+  void inject_misreference(coll_t cid1, ghobject_t oid1,
+                          coll_t cid2, ghobject_t oid2,
+                          uint64_t offset);
+
   void compact() override {
-    assert(db);
+    ceph_assert(db);
     db->compact();
   }
   bool has_builtin_csum() const override {
     return true;
   }
 
+  /*
+  Allocate space for BlueFS from the slow device.
+  Either automatically applies the allocated extents to the underlying BlueFS
+  (extents == nullptr) or just returns them via the provided non-null extents vector.
+  */
+  int allocate_bluefs_freespace(
+    uint64_t min_size,
+    uint64_t size,
+    PExtentVector* extents);
+
+  inline void log_latency(const char* name,
+    int idx,
+    const ceph::timespan& lat,
+    double lat_threshold,
+    const char* info = "") const;
+
+  inline void log_latency_fn(const char* name,
+    int idx,
+    const ceph::timespan& lat,
+    double lat_threshold,
+    std::function<string (const ceph::timespan& lat)> fn) const;
+
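log_latency()/log_latency_fn() centralize the 'record the latency and complain if it exceeds a threshold' pattern used for the new omap and collection-listing counters. A hypothetical call site inside BlueStore (the threshold value is an assumption):

    auto start = ceph::mono_clock::now();
    // ... perform the omap iterator step ...
    log_latency(__func__, l_bluestore_omap_next_lat,
                ceph::mono_clock::now() - start,
                5.0 /* assumed warn threshold, in seconds */);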
 private:
   bool _debug_data_eio(const ghobject_t& o) {
     if (!cct->_conf->bluestore_debug_inject_read_err) {
@@ -2572,6 +2704,44 @@ private:
       debug_mdata_error_objects.erase(o);
     }
   }
+private:
+  ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
+  string failed_cmode;
+  set<string> failed_compressors;
+  string spillover_alert;
+  string legacy_statfs_alert;
+  string disk_size_mismatch_alert;
+
+  void _log_alerts(osd_alert_list_t& alerts);
+  bool _set_compression_alert(bool cmode, const char* s) {
+    std::lock_guard l(qlock);
+    if (cmode) {
+      bool ret = failed_cmode.empty();
+      failed_cmode = s;
+      return ret;
+    }
+    return failed_compressors.emplace(s).second;
+  }
+  void _clear_compression_alert() {
+    std::lock_guard l(qlock);
+    failed_compressors.clear();
+    failed_cmode.clear();
+  }
+
+  void _set_spillover_alert(const string& s) {
+    std::lock_guard l(qlock);
+    spillover_alert = s;
+  }
+  void _clear_spillover_alert() {
+    std::lock_guard l(qlock);
+    spillover_alert.clear();
+  }
+
+  void _check_legacy_statfs_alert();
+  void _set_disk_size_mismatch_alert(const string& s) {
+    std::lock_guard l(qlock);
+    disk_size_mismatch_alert = s;
+  }
 
 private:
 
@@ -2697,10 +2867,6 @@ private:
     WriteContext *wctx,
     set<SharedBlob*> *maybe_unshared_blobs=0);
 
-  int _do_transaction(Transaction *t,
-                     TransContext *txc,
-                     ThreadPool::TPHandle *handle);
-
   int _write(TransContext *txc,
             CollectionRef& c,
             OnodeRef& o,
@@ -2779,7 +2945,7 @@ private:
   int _rmattrs(TransContext *txc,
               CollectionRef& c,
               OnodeRef& o);
-  void _do_omap_clear(TransContext *txc, uint64_t id);
+  void _do_omap_clear(TransContext *txc, const string& prefix, uint64_t id);
   int _omap_clear(TransContext *txc,
                  CollectionRef& c,
                  OnodeRef& o);
@@ -2829,14 +2995,45 @@ private:
                         unsigned bits, CollectionRef *c);
   int _remove_collection(TransContext *txc, const coll_t &cid,
                          CollectionRef *c);
+  void _do_remove_collection(TransContext *txc, CollectionRef *c);
   int _split_collection(TransContext *txc,
                        CollectionRef& c,
                        CollectionRef& d,
                        unsigned bits, int rem);
+  int _merge_collection(TransContext *txc,
+                       CollectionRef *c,
+                       CollectionRef& d,
+                       unsigned bits);
+
+private:
+  std::atomic<uint64_t> out_of_sync_fm = {0};
+  // --------------------------------------------------------
+  // BlueFSDeviceExpander implementation
+  uint64_t get_recommended_expansion_delta(uint64_t bluefs_free,
+    uint64_t bluefs_total) override {
+    auto delta = _get_bluefs_size_delta(bluefs_free, bluefs_total);
+    return delta > 0 ? delta : 0;
+  }
+  int allocate_freespace(
+    uint64_t min_size,
+    uint64_t size,
+    PExtentVector& extents) override {
+    return allocate_bluefs_freespace(min_size, size, &extents);
+  };
 };
 
-inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
-  return out << *s.parent;
+inline ostream& operator<<(ostream& out, const BlueStore::volatile_statfs& s) {
+  return out 
+    << " allocated:"
+      << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
+    << " stored:"
+      << s.values[BlueStore::volatile_statfs::STATFS_STORED]
+    << " compressed:"
+      << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
+    << " compressed_orig:"
+      << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
+    << " compressed_alloc:"
+      << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
 }
 
 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
@@ -2853,4 +3050,188 @@ static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
   o->put();
 }
 
+class BlueStoreRepairer
+{
+public:
+  // to simplify future potential migration to mempools
+  using fsck_interval = interval_set<uint64_t>;
+
+  // Structure to track what pextents are used for specific cid/oid.
+  // As with a Bloom filter, only positive and false-positive matches are
+  // possible.
+  // Maintains two lists of bloom filters, one for cids and one for oids,
+  //   where each list entry is a BF for a specific disk pextent range.
+  //   The extent length covered per filter is determined on init.
+  // Allows filtering out 'uninteresting' pextents to speed up subsequent
+  //  'is_used' access.
+  struct StoreSpaceTracker {
+    const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
+    const uint64_t BLOOM_FILTER_TABLE_SIZE = 32; // bytes per single filter
+    const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrarily selected
+    static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;
+
+    typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
+    bloom_vector collections_bfs;
+    bloom_vector objects_bfs;
+    
+    bool was_filtered_out = false; 
+    uint64_t granularity = 0; // extent length for a single filter
+
+    StoreSpaceTracker() {
+    }
+    StoreSpaceTracker(const StoreSpaceTracker& from) :
+      collections_bfs(from.collections_bfs),
+      objects_bfs(from.objects_bfs),
+      granularity(from.granularity) {
+    }
+
+    void init(uint64_t total,
+             uint64_t min_alloc_size,
+             uint64_t mem_cap = DEF_MEM_CAP) {
+      ceph_assert(!granularity); // not initialized yet
+      ceph_assert(min_alloc_size && isp2(min_alloc_size));
+      ceph_assert(mem_cap);
+      
+      total = round_up_to(total, min_alloc_size);
+      granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
+
+      if (!granularity) {
+       granularity = min_alloc_size;
+      } else {
+       granularity = round_up_to(granularity, min_alloc_size);
+      }
+
+      uint64_t entries = round_up_to(total, granularity) / granularity;
+      collections_bfs.resize(entries,
+        bloom_filter(BLOOM_FILTER_SALT_COUNT,
+                     BLOOM_FILTER_TABLE_SIZE,
+                     0,
+                     BLOOM_FILTER_EXPECTED_COUNT));
+      objects_bfs.resize(entries, 
+        bloom_filter(BLOOM_FILTER_SALT_COUNT,
+                     BLOOM_FILTER_TABLE_SIZE,
+                     0,
+                     BLOOM_FILTER_EXPECTED_COUNT));
+    }
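To make the sizing concrete: with total = 1 TiB, min_alloc_size = 64 KiB and the default 128 MiB mem_cap, granularity = 2^40 * 32 * 2 / 2^27 bytes = 512 KiB (already a multiple of min_alloc_size), which yields 2^21 filter pairs; at 2 x 32 bytes of filter table per pair that comes to exactly the 128 MiB cap, ignoring per-object overhead.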
+    inline uint32_t get_hash(const coll_t& cid) const {
+      return cid.hash_to_shard(1);
+    }
+    inline void set_used(uint64_t offset, uint64_t len,
+                        const coll_t& cid, const ghobject_t& oid) {
+      ceph_assert(granularity); // initialized
+      
+      // can't call this func after filter_out has been applied
+      ceph_assert(!was_filtered_out);
+      if (!len) {
+       return;
+      }
+      auto pos = offset / granularity;
+      auto end_pos = (offset + len - 1) / granularity;
+      while (pos <= end_pos) {
+        collections_bfs[pos].insert(get_hash(cid));
+        objects_bfs[pos].insert(oid.hobj.get_hash());
+        ++pos;
+      }
+    }
+    // filters out entries unrelated to the specified (broken) extents;
+    // only 'is_used' calls are permitted afterwards
+    size_t filter_out(const fsck_interval& extents);
+
+    // determines whether the collection is present after filtering out
+    inline bool is_used(const coll_t& cid) const {
+      ceph_assert(was_filtered_out);
+      for(auto& bf : collections_bfs) {
+        if (bf.contains(get_hash(cid))) {
+          return true;
+        }
+      }
+      return false;
+    }
+    // determines whether the object is present after filtering out
+    inline bool is_used(const ghobject_t& oid) const {
+      ceph_assert(was_filtered_out);
+      for(auto& bf : objects_bfs) {
+        if (bf.contains(oid.hobj.get_hash())) {
+          return true;
+        }
+      }
+      return false;
+    }
+    // determines whether the collection is present before filtering out
+    inline bool is_used(const coll_t& cid, uint64_t offs) const {
+      ceph_assert(granularity); // initialized
+      ceph_assert(!was_filtered_out);
+      auto &bf = collections_bfs[offs / granularity];
+      if (bf.contains(get_hash(cid))) {
+        return true;
+      }
+      return false;
+    }
+    // determines whether the object is present before filtering out
+    inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
+      ceph_assert(granularity); // initialized
+      ceph_assert(!was_filtered_out);
+      auto &bf = objects_bfs[offs / granularity];
+      if (bf.contains(oid.hobj.get_hash())) {
+        return true;
+      }
+      return false;
+    }
+  };
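A hypothetical usage sketch of the tracker (the wrapper function and its arguments are illustrative; the real fsck wiring is in BlueStore.cc): it is filled while walking onodes, narrowed to the damaged extents, and then queried.

    void sketch_track_and_query(BlueStoreRepairer::StoreSpaceTracker& tracker,
                                uint64_t bdev_size, uint64_t min_alloc_size,
                                uint64_t offset, uint64_t length,
                                const coll_t& cid, const ghobject_t& oid,
                                const interval_set<uint64_t>& broken_extents)
    {
      tracker.init(bdev_size, min_alloc_size);     // granularity decided here
      tracker.set_used(offset, length, cid, oid);  // pass 1: record ownership
      tracker.filter_out(broken_extents);          // keep only filters near the damage
      if (tracker.is_used(cid) && tracker.is_used(oid)) {
        // pass 2: cid/oid may touch the damaged space (false positives possible)
      }
    }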
+public:
+
+  bool remove_key(KeyValueDB *db, const string& prefix, const string& key);
+  bool fix_shared_blob(KeyValueDB *db,
+                        uint64_t sbid,
+                      const bufferlist* bl);
+  bool fix_statfs(KeyValueDB *db, const string& key,
+    const store_statfs_t& new_statfs);
+
+  bool fix_leaked(KeyValueDB *db,
+                 FreelistManager* fm,
+                 uint64_t offset, uint64_t len);
+  bool fix_false_free(KeyValueDB *db,
+                     FreelistManager* fm,
+                     uint64_t offset, uint64_t len);
+  bool fix_bluefs_extents(std::atomic<uint64_t>& out_of_sync_flag);
+
+  void init(uint64_t total_space, uint64_t lres_tracking_unit_size);
+
+  bool preprocess_misreference(KeyValueDB *db);
+
+  unsigned apply(KeyValueDB* db);
+
+  void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
+    misreferenced_extents.union_insert(offs, len);
+    if (inc_error) {
+      ++to_repair_cnt;
+    }
+  }
+
+  StoreSpaceTracker& get_space_usage_tracker() {
+    return space_usage_tracker;
+  }
+  const fsck_interval& get_misreferences() const {
+    return misreferenced_extents;
+  }
+  KeyValueDB::Transaction get_fix_misreferences_txn() {
+    return fix_misreferences_txn;
+  }
+
+private:
+  unsigned to_repair_cnt = 0;
+  KeyValueDB::Transaction fix_fm_leaked_txn;
+  KeyValueDB::Transaction fix_fm_false_free_txn;
+  KeyValueDB::Transaction remove_key_txn;
+  KeyValueDB::Transaction fix_statfs_txn;
+  KeyValueDB::Transaction fix_shared_blob_txn;
+
+  KeyValueDB::Transaction fix_misreferences_txn;
+
+  StoreSpaceTracker space_usage_tracker;
+
+  // non-shared extents with multiple references
+  fsck_interval misreferenced_extents;
+
+};
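For orientation, a much simplified and hypothetical sketch of driving the repairer (the function and its arguments are made up; the real wiring is in BlueStore::_fsck): fixes are queued into per-category transactions during the scan and apply() submits them at the end.

    unsigned repair_sketch(BlueStoreRepairer& repairer, KeyValueDB* db,
                           const string& statfs_key,
                           const store_statfs_t& expected)
    {
      repairer.fix_statfs(db, statfs_key, expected);      // queue a statfs fix
      repairer.note_misreference(0x10000, 0x1000, true);  // remember a damaged extent
      // submit whatever was queued; the return value is presumably the repair count
      return repairer.apply(db);
    }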
 #endif