]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/os/bluestore/BlueStore.cc
update sources to v12.1.0
[ceph.git] / ceph / src / os / bluestore / BlueStore.cc
index 789c05f0ae97643c876f3034503ac26585e7e49a..38d1e272ebc5a30fac5409213e83c922aa1b6088 100644 (file)
@@ -17,6 +17,8 @@
 #include <sys/stat.h>
 #include <fcntl.h>
 
+#include "include/cpp-btree/btree_set.h"
+
 #include "BlueStore.h"
 #include "os/kv.h"
 #include "include/compat.h"
 #define dout_context cct
 #define dout_subsys ceph_subsys_bluestore
 
-// bluestore_meta_onode
+using bid_t = decltype(BlueStore::Blob::id);
+
+// bluestore_cache_onode
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Onode, bluestore_onode,
-                             bluestore_meta_onode);
+                             bluestore_cache_onode);
 
-// bluestore_meta_other
+// bluestore_cache_other
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Buffer, bluestore_buffer,
-                             bluestore_meta_other);
+                             bluestore_cache_other);
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Extent, bluestore_extent,
-                             bluestore_meta_other);
+                             bluestore_cache_other);
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Blob, bluestore_blob,
-                             bluestore_meta_other);
+                             bluestore_cache_other);
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::SharedBlob, bluestore_shared_blob,
-                             bluestore_meta_other);
+                             bluestore_cache_other);
+
+// bluestore_txc
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::TransContext, bluestore_transcontext,
+                             bluestore_txc);
+
 
 // kv store prefixes
 const string PREFIX_SUPER = "S";   // field -> value
@@ -654,7 +663,7 @@ void BlueStore::GarbageCollector::process_protrusive_extents(
             }
             bExit = it == bi.last_lextent;
             ++it;
-          } while(!bExit);
+          } while (!bExit);
         }
         expected_for_release += blob_expected_for_release;
         expected_allocations += bi.expected_allocations;
@@ -740,13 +749,12 @@ void BlueStore::Cache::trim_all()
 {
   std::lock_guard<std::recursive_mutex> l(lock);
   _trim(0, 0);
-  assert(_get_num_onodes() == 0);
-  assert(_get_buffer_bytes() == 0);
 }
 
 void BlueStore::Cache::trim(
   uint64_t target_bytes,
   float target_meta_ratio,
+  float target_data_ratio,
   float bytes_per_onode)
 {
   std::lock_guard<std::recursive_mutex> l(lock);
@@ -754,23 +762,18 @@ void BlueStore::Cache::trim(
   uint64_t current_buffer = _get_buffer_bytes();
   uint64_t current = current_meta + current_buffer;
 
-  uint64_t target_meta = target_bytes * (double)target_meta_ratio; //need to cast to double
-                                                                   //since float(1) might produce inaccurate value
-                                                                   // for target_meta (a bit greater than target_bytes)
-                                                                   // that causes overflow in target_buffer below.
-                                                                   //Consider the following code:
-                                                                   //uint64_t i =(uint64_t)227*1024*1024*1024 + 1;
-                                                                   //float f = 1;
-                                                                   //uint64_t i2 = i*f;
-                                                                   //assert(i == i2);
+  uint64_t target_meta = target_bytes * target_meta_ratio;
+  uint64_t target_buffer = target_bytes * target_data_ratio;
 
-  target_meta = min(target_bytes, target_meta); //and just in case that ratio is > 1
-  uint64_t target_buffer = target_bytes - target_meta;
+  // correct for overflow or float imprecision
+  target_meta = min(target_bytes, target_meta);
+  target_buffer = min(target_bytes - target_meta, target_buffer);
 
   if (current <= target_bytes) {
     dout(10) << __func__
             << " shard target " << pretty_si_t(target_bytes)
-            << " ratio " << target_meta_ratio << " ("
+            << " meta/data ratios " << target_meta_ratio
+            << " + " << target_data_ratio << " ("
             << pretty_si_t(target_meta) << " + "
             << pretty_si_t(target_buffer) << "), "
             << " current " << pretty_si_t(current) << " ("
@@ -1270,14 +1273,18 @@ int BlueStore::BufferSpace::_discard(Cache* cache, uint32_t offset, uint32_t len
        if (b->data.length()) {
          bufferlist bl;
          bl.substr_of(b->data, b->length - tail, tail);
-         _add_buffer(cache, new Buffer(this, b->state, b->seq, end, bl), 0, b);
+         Buffer *nb = new Buffer(this, b->state, b->seq, end, bl);
+         nb->maybe_rebuild();
+         _add_buffer(cache, nb, 0, b);
        } else {
-         _add_buffer(cache, new Buffer(this, b->state, b->seq, end, tail), 0, b);
+         _add_buffer(cache, new Buffer(this, b->state, b->seq, end, tail),
+                     0, b);
        }
        if (!b->is_writing()) {
          cache->_adjust_buffer_size(b, front - (int64_t)b->length);
        }
        b->truncate(front);
+       b->maybe_rebuild();
        cache->_audit("discard end 1");
        break;
       } else {
@@ -1286,6 +1293,7 @@ int BlueStore::BufferSpace::_discard(Cache* cache, uint32_t offset, uint32_t len
          cache->_adjust_buffer_size(b, front - (int64_t)b->length);
        }
        b->truncate(front);
+       b->maybe_rebuild();
        ++i;
        continue;
       }
@@ -1300,7 +1308,9 @@ int BlueStore::BufferSpace::_discard(Cache* cache, uint32_t offset, uint32_t len
     if (b->data.length()) {
       bufferlist bl;
       bl.substr_of(b->data, b->length - keep, keep);
-      _add_buffer(cache, new Buffer(this, b->state, b->seq, end, bl), 0, b);
+      Buffer *nb = new Buffer(this, b->state, b->seq, end, bl);
+      nb->maybe_rebuild();
+      _add_buffer(cache, nb, 0, b);
     } else {
       _add_buffer(cache, new Buffer(this, b->state, b->seq, end, keep), 0, b);
     }
@@ -1397,6 +1407,8 @@ void BlueStore::BufferSpace::finish_write(Cache* cache, uint64_t seq)
     } else {
       b->state = Buffer::STATE_CLEAN;
       writing.erase(i++);
+      b->maybe_rebuild();
+      b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
       cache->_add_buffer(b, 1, nullptr);
       ldout(cache->cct, 20) << __func__ << " added " << *b << dendl;
     }
@@ -1513,7 +1525,7 @@ void BlueStore::OnodeSpace::rename(
   OnodeRef& oldo,
   const ghobject_t& old_oid,
   const ghobject_t& new_oid,
-  const mempool::bluestore_meta_other::string& new_okey)
+  const mempool::bluestore_cache_other::string& new_okey)
 {
   std::lock_guard<std::recursive_mutex> l(cache->lock);
   ldout(cache->cct, 30) << __func__ << " " << old_oid << " -> " << new_oid
@@ -1622,10 +1634,15 @@ void BlueStore::SharedBlob::get_ref(uint64_t offset, uint32_t length)
 }
 
 void BlueStore::SharedBlob::put_ref(uint64_t offset, uint32_t length,
-  PExtentVector *r)
+                                   PExtentVector *r,
+                                   set<SharedBlob*> *maybe_unshared)
 {
   assert(persistent);
-  persistent->ref_map.put(offset, length, r);
+  bool maybe = false;
+  persistent->ref_map.put(offset, length, r, maybe_unshared ? &maybe : nullptr);
+  if (maybe_unshared && maybe) {
+    maybe_unshared->insert(this);
+  }
 }
 
 // Blob
@@ -1924,6 +1941,7 @@ void BlueStore::ExtentMap::update(KeyValueDB::Transaction t,
     auto p = shards.begin();
     auto prev_p = p;
     while (p != shards.end()) {
+      assert(p->shard_info->offset >= prev_p->shard_info->offset);
       auto n = p;
       ++n;
       if (p->dirty) {
@@ -1957,20 +1975,13 @@ void BlueStore::ExtentMap::update(KeyValueDB::Transaction t,
          // avoid resharding the trailing shard, even if it is small
          else if (n != shards.end() &&
                   len < g_conf->bluestore_extent_map_shard_min_size) {
-           // we are small; combine with a neighbor
-           if (p == shards.begin() && endoff == OBJECT_MAX_SIZE) {
-             // we are an only shard
-             request_reshard(0, OBJECT_MAX_SIZE);
-             return;
-           } else if (p == shards.begin()) {
-             // combine with next shard
+            assert(endoff != OBJECT_MAX_SIZE);
+           if (p == shards.begin()) {
+             // we are the first shard, combine with next shard
              request_reshard(p->shard_info->offset, endoff + 1);
-           } else if (endoff == OBJECT_MAX_SIZE) {
-             // combine with previous shard
-             request_reshard(prev_p->shard_info->offset, endoff);
-             return;
            } else {
-             // combine with the smaller of the two
+             // combine either with the previous shard or the next,
+             // whichever is smaller
              if (prev_p->shard_info->bytes > n->shard_info->bytes) {
                request_reshard(p->shard_info->offset, endoff + 1);
              } else {
@@ -2004,6 +2015,28 @@ void BlueStore::ExtentMap::update(KeyValueDB::Transaction t,
   }
 }
 
+bid_t BlueStore::ExtentMap::allocate_spanning_blob_id()
+{
+  if (spanning_blob_map.empty())
+    return 0;
+  bid_t bid = spanning_blob_map.rbegin()->first + 1;
+  // bid is valid and available.
+  if (bid >= 0)
+    return bid;
+  // Find next unused bid;
+  bid = rand() % (numeric_limits<bid_t>::max() + 1);
+  const auto begin_bid = bid;
+  do {
+    if (!spanning_blob_map.count(bid))
+      return bid;
+    else {
+      bid++;
+      if (bid < 0) bid = 0;
+    }
+  } while (bid != begin_bid);
+  assert(0 == "no available blob id");
+}
+
 void BlueStore::ExtentMap::reshard(
   KeyValueDB *db,
   KeyValueDB::Transaction t)
@@ -2080,7 +2113,7 @@ void BlueStore::ExtentMap::reshard(
 
   // reshard
   unsigned estimate = 0;
-  unsigned offset = 0;
+  unsigned offset = needs_reshard_begin;
   vector<bluestore_onode_t::shard_info> new_shard_info;
   unsigned max_blob_end = 0;
   Extent dummy(needs_reshard_begin);
@@ -2097,11 +2130,11 @@ void BlueStore::ExtentMap::reshard(
     if (estimate &&
        estimate + extent_avg > target + (would_span ? slop : 0)) {
       // new shard
-      if (offset == 0) {
+      if (offset == needs_reshard_begin) {
        new_shard_info.emplace_back(bluestore_onode_t::shard_info());
        new_shard_info.back().offset = offset;
        dout(20) << __func__ << "  new shard 0x" << std::hex << offset
-                << std::dec << dendl;
+                 << std::dec << dendl;
       }
       offset = e->logical_offset;
       new_shard_info.emplace_back(bluestore_onode_t::shard_info());
@@ -2111,9 +2144,9 @@ void BlueStore::ExtentMap::reshard(
       estimate = 0;
     }
     estimate += extent_avg;
-    unsigned bb = e->blob_start();
-    if (bb < spanning_scan_begin) {
-      spanning_scan_begin = bb;
+    unsigned bs = e->blob_start();
+    if (bs < spanning_scan_begin) {
+      spanning_scan_begin = bs;
     }
     uint32_t be = e->blob_end();
     if (be > max_blob_end) {
@@ -2149,16 +2182,21 @@ void BlueStore::ExtentMap::reshard(
       new_shard_info.begin(),
       new_shard_info.end());
     shards.insert(shards.begin() + si_begin, new_shard_info.size(), Shard());
-    unsigned n = sv.size();
     si_end = si_begin + new_shard_info.size();
-    for (unsigned i = si_begin; i < si_end; ++i) {
+
+    assert(sv.size() == shards.size());
+
+    // note that we need to update every shard_info of shards here,
+    // as sv might have been totally re-allocated above
+    for (unsigned i = 0; i < shards.size(); i++) {
       shards[i].shard_info = &sv[i];
+    }
+
+    // mark newly added shards as dirty
+    for (unsigned i = si_begin; i < si_end; ++i) {
       shards[i].loaded = true;
       shards[i].dirty = true;
     }
-    for (unsigned i = si_end; i < n; ++i) {
-      shards[i].shard_info = &sv[i];
-    }
   }
   dout(20) << __func__ << "  fin " << sv << dendl;
   inline_bl.clear();
@@ -2181,7 +2219,7 @@ void BlueStore::ExtentMap::reshard(
     }
     if (spanning_scan_end > needs_reshard_end) {
       fault_range(db, needs_reshard_end,
-                 spanning_scan_end - needs_reshard_begin);
+                 spanning_scan_end - needs_reshard_end);
     }
     auto sp = sv.begin() + si_begin;
     auto esp = sv.end();
@@ -2193,12 +2231,6 @@ void BlueStore::ExtentMap::reshard(
     } else {
       shard_end = sp->offset;
     }
-    int bid;
-    if (spanning_blob_map.empty()) {
-      bid = 0;
-    } else {
-      bid = spanning_blob_map.rbegin()->first + 1;
-    }
     Extent dummy(needs_reshard_begin);
     for (auto e = extent_map.lower_bound(dummy); e != extent_map.end(); ++e) {
       if (e->logical_offset >= needs_reshard_end) {
@@ -2252,7 +2284,8 @@ void BlueStore::ExtentMap::reshard(
            must_span = true;
          }
          if (must_span) {
-           b->id = bid++;
+            auto bid = allocate_spanning_blob_id();
+            b->id = bid;
            spanning_blob_map[b->id] = b;
            dout(20) << __func__ << "    adding spanning " << *b << dendl;
          }
@@ -2287,8 +2320,6 @@ bool BlueStore::ExtentMap::encode_some(
 
   unsigned n = 0;
   size_t bound = 0;
-  denc(struct_v, bound);
-  denc_varint(0, bound);
   bool must_reshard = false;
   for (auto p = start;
        p != extent_map.end() && p->logical_offset < end;
@@ -2301,21 +2332,26 @@ bool BlueStore::ExtentMap::encode_some(
       request_reshard(p->blob_start(), p->blob_end());
       must_reshard = true;
     }
-    denc_varint(0, bound); // blobid
-    denc_varint(0, bound); // logical_offset
-    denc_varint(0, bound); // len
-    denc_varint(0, bound); // blob_offset
+    if (!must_reshard) {
+      denc_varint(0, bound); // blobid
+      denc_varint(0, bound); // logical_offset
+      denc_varint(0, bound); // len
+      denc_varint(0, bound); // blob_offset
 
-    p->blob->bound_encode(
-      bound,
-      struct_v,
-      p->blob->shared_blob->get_sbid(),
-      false);
+      p->blob->bound_encode(
+        bound,
+        struct_v,
+        p->blob->shared_blob->get_sbid(),
+        false);
+    }
   }
   if (must_reshard) {
     return true;
   }
 
+  denc(struct_v, bound);
+  denc_varint(0, bound); // number of extents
+
   {
     auto app = bl.get_contiguous_appender(bound);
     denc(struct_v, app);
@@ -2572,7 +2608,6 @@ void BlueStore::ExtentMap::fault_range(
 }
 
 void BlueStore::ExtentMap::dirty_range(
-  KeyValueDB::Transaction t,
   uint32_t offset,
   uint32_t length)
 {
@@ -2614,15 +2649,6 @@ BlueStore::extent_map_t::iterator BlueStore::ExtentMap::find(
   return extent_map.find(dummy);
 }
 
-BlueStore::extent_map_t::iterator BlueStore::ExtentMap::find_lextent(
-  uint64_t offset)
-{
-  auto fp = seek_lextent(offset);
-  if (fp != extent_map.end() && fp->logical_offset > offset)
-    return extent_map.end();  // extent is past offset
-  return fp;
-}
-
 BlueStore::extent_map_t::iterator BlueStore::ExtentMap::seek_lextent(
   uint64_t offset)
 {
@@ -2905,6 +2931,8 @@ void BlueStore::DeferredBatch::prepare_write(
   assert(i.second);  // this should be a new insertion
   i.first->second.seq = seq;
   blp.copy(length, i.first->second.bl);
+  i.first->second.bl.reassign_to_mempool(
+    mempool::mempool_bluestore_writing_deferred);
   dout(20) << __func__ << " seq " << seq
           << " 0x" << std::hex << offset << "~" << length
           << " crc " << i.first->second.bl.crc32c(-1)
@@ -3057,14 +3085,12 @@ void BlueStore::Collection::load_shared_blob(SharedBlobRef sb)
 
 void BlueStore::Collection::make_blob_shared(uint64_t sbid, BlobRef b)
 {
-  assert(!b->shared_blob->is_loaded());
-
   ldout(store->cct, 10) << __func__ << " " << *b << dendl;
-  bluestore_blob_t& blob = b->dirty_blob();
+  assert(!b->shared_blob->is_loaded());
 
   // update blob
+  bluestore_blob_t& blob = b->dirty_blob();
   blob.set_flag(bluestore_blob_t::FLAG_SHARED);
-  blob.clear_flag(bluestore_blob_t::FLAG_MUTABLE);
 
   // update shared blob
   b->shared_blob->loaded = true;
@@ -3080,6 +3106,20 @@ void BlueStore::Collection::make_blob_shared(uint64_t sbid, BlobRef b)
   ldout(store->cct, 20) << __func__ << " now " << *b << dendl;
 }
 
+uint64_t BlueStore::Collection::make_blob_unshared(SharedBlob *sb)
+{
+  ldout(store->cct, 10) << __func__ << " " << *sb << dendl;
+  assert(sb->is_loaded());
+
+  uint64_t sbid = sb->get_sbid();
+  shared_blob_set.remove(sb);
+  sb->loaded = false;
+  delete sb->persistent;
+  sb->sbid_unloaded = 0;
+  ldout(store->cct, 20) << __func__ << " now " << *sb << dendl;
+  return sbid;
+}
+
 BlueStore::OnodeRef BlueStore::Collection::get_onode(
   const ghobject_t& oid,
   bool create)
@@ -3099,7 +3139,7 @@ BlueStore::OnodeRef BlueStore::Collection::get_onode(
   if (o)
     return o;
 
-  mempool::bluestore_meta_other::string key;
+  mempool::bluestore_cache_other::string key;
   get_object_key(store->cct, oid, &key);
 
   ldout(store->cct, 20) << __func__ << " oid " << oid << " key "
@@ -3122,7 +3162,7 @@ BlueStore::OnodeRef BlueStore::Collection::get_onode(
     assert(r >= 0);
     on = new Onode(this, oid, key);
     on->exists = true;
-    bufferptr::iterator p = v.front().begin();
+    bufferptr::iterator p = v.front().begin_deep();
     on->onode.decode(p);
 
     // initialize extent_map
@@ -3189,12 +3229,13 @@ void BlueStore::Collection::split_cache(
        }
        ldout(store->cct, 20) << __func__ << "  moving " << *sb << dendl;
        sb->coll = dest;
+       if (sb->get_sbid()) {
+         ldout(store->cct, 20) << __func__
+                               << "   moving registration " << *sb << dendl;
+         shared_blob_set.remove(sb);
+         dest->shared_blob_set.add(dest, sb);
+       }
        if (dest->cache != cache) {
-         if (sb->get_sbid()) {
-           ldout(store->cct, 20) << __func__ << "   moving registration " << *sb << dendl;
-           shared_blob_set.remove(sb);
-           dest->shared_blob_set.add(dest, sb);
-         }
          for (auto& i : sb->bc.buffer_map) {
            if (!i.second->is_writing()) {
              ldout(store->cct, 20) << __func__ << "   moving " << *i.second
@@ -3204,54 +3245,41 @@ void BlueStore::Collection::split_cache(
          }
        }
       }
-
-
     }
   }
 }
 
-void BlueStore::Collection::trim_cache()
-{
-  // see if mempool stats have updated
-  uint64_t total_bytes;
-  uint64_t total_onodes;
-  size_t seq;
-  store->get_mempool_stats(&seq, &total_bytes, &total_onodes);
-  if (seq == cache->last_trim_seq) {
-    ldout(store->cct, 30) << __func__ << " no new mempool stats; nothing to do"
-                         << dendl;
-    return;
-  }
-  cache->last_trim_seq = seq;
-
-  // trim
-  if (total_onodes < 2) {
-    total_onodes = 2;
-  }
-  float bytes_per_onode = (float)total_bytes / (float)total_onodes;
-  size_t num_shards = store->cache_shards.size();
-  uint64_t shard_target = store->cct->_conf->bluestore_cache_size / num_shards;
-  ldout(store->cct, 30) << __func__
-                       << " total meta bytes " << total_bytes
-                       << ", total onodes " << total_onodes
-                       << ", bytes_per_onode " << bytes_per_onode
-          << dendl;
-  cache->trim(shard_target, store->cct->_conf->bluestore_cache_meta_ratio,
-             bytes_per_onode);
-
-  store->_update_cache_logger();
-}
-
 // =======================================================
 
 void *BlueStore::MempoolThread::entry()
 {
   Mutex::Locker l(lock);
   while (!stop) {
-    store->mempool_bytes = mempool::bluestore_meta_other::allocated_bytes() +
-      mempool::bluestore_meta_onode::allocated_bytes();
-    store->mempool_onodes = mempool::bluestore_meta_onode::allocated_items();
-    ++store->mempool_seq;
+    uint64_t meta_bytes =
+      mempool::bluestore_cache_other::allocated_bytes() +
+      mempool::bluestore_cache_onode::allocated_bytes();
+    uint64_t onode_num =
+      mempool::bluestore_cache_onode::allocated_items();
+
+    if (onode_num < 2) {
+      onode_num = 2;
+    }
+
+    float bytes_per_onode = (float)meta_bytes / (float)onode_num;
+    size_t num_shards = store->cache_shards.size();
+    float target_ratio = store->cache_meta_ratio + store->cache_data_ratio;
+    // A little sloppy but should be close enough
+    uint64_t shard_target = target_ratio * (store->cct->_conf->bluestore_cache_size / num_shards);
+
+    for (auto i : store->cache_shards) {
+      i->trim(shard_target,
+             store->cache_meta_ratio,
+             store->cache_data_ratio,
+             bytes_per_onode);
+    }
+
+    store->_update_cache_logger();
+
     utime_t wait;
     wait += store->cct->_conf->bluestore_cache_trim_interval;
     cond.WaitInterval(lock, wait);
@@ -3262,6 +3290,108 @@ void *BlueStore::MempoolThread::entry()
 
 // =======================================================
 
+// OmapIteratorImpl
+
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore.OmapIteratorImpl(" << this << ") "
+
+BlueStore::OmapIteratorImpl::OmapIteratorImpl(
+  CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
+  : c(c), o(o), it(it)
+{
+  RWLock::RLocker l(c->lock);
+  if (o->onode.has_omap()) {
+    get_omap_key(o->onode.nid, string(), &head);
+    get_omap_tail(o->onode.nid, &tail);
+    it->lower_bound(head);
+  }
+}
+
+int BlueStore::OmapIteratorImpl::seek_to_first()
+{
+  RWLock::RLocker l(c->lock);
+  if (o->onode.has_omap()) {
+    it->lower_bound(head);
+  } else {
+    it = KeyValueDB::Iterator();
+  }
+  return 0;
+}
+
+int BlueStore::OmapIteratorImpl::upper_bound(const string& after)
+{
+  RWLock::RLocker l(c->lock);
+  if (o->onode.has_omap()) {
+    string key;
+    get_omap_key(o->onode.nid, after, &key);
+    ldout(c->store->cct,20) << __func__ << " after " << after << " key "
+                           << pretty_binary_string(key) << dendl;
+    it->upper_bound(key);
+  } else {
+    it = KeyValueDB::Iterator();
+  }
+  return 0;
+}
+
+int BlueStore::OmapIteratorImpl::lower_bound(const string& to)
+{
+  RWLock::RLocker l(c->lock);
+  if (o->onode.has_omap()) {
+    string key;
+    get_omap_key(o->onode.nid, to, &key);
+    ldout(c->store->cct,20) << __func__ << " to " << to << " key "
+                           << pretty_binary_string(key) << dendl;
+    it->lower_bound(key);
+  } else {
+    it = KeyValueDB::Iterator();
+  }
+  return 0;
+}
+
+bool BlueStore::OmapIteratorImpl::valid()
+{
+  RWLock::RLocker l(c->lock);
+  bool r = o->onode.has_omap() && it && it->valid() &&
+    it->raw_key().second <= tail;
+  if (it && it->valid()) {
+    ldout(c->store->cct,20) << __func__ << " is at "
+                           << pretty_binary_string(it->raw_key().second)
+                           << dendl;
+  }
+  return r;
+}
+
+int BlueStore::OmapIteratorImpl::next(bool validate)
+{
+  RWLock::RLocker l(c->lock);
+  if (o->onode.has_omap()) {
+    it->next();
+    return 0;
+  } else {
+    return -1;
+  }
+}
+
+string BlueStore::OmapIteratorImpl::key()
+{
+  RWLock::RLocker l(c->lock);
+  assert(it->valid());
+  string db_key = it->raw_key().second;
+  string user_key;
+  decode_omap_key(db_key, &user_key);
+  return user_key;
+}
+
+bufferlist BlueStore::OmapIteratorImpl::value()
+{
+  RWLock::RLocker l(c->lock);
+  assert(it->valid());
+  return it->value();
+}
+
+
+// =====================================
+
 #undef dout_prefix
 #define dout_prefix *_dout << "bluestore(" << path << ") "
 
@@ -3281,22 +3411,12 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
                       cct->_conf->bluestore_throttle_bytes +
                       cct->_conf->bluestore_throttle_deferred_bytes),
     kv_sync_thread(this),
+    kv_finalize_thread(this),
     mempool_thread(this)
 {
   _init_logger();
   cct->_conf->add_observer(this);
   set_cache_shards(1);
-
-  if (cct->_conf->bluestore_shard_finishers) {
-    m_finisher_num = cct->_conf->osd_op_num_shards;
-  }
-
-  for (int i = 0; i < m_finisher_num; ++i) {
-    ostringstream oss;
-    oss << "finisher-" << i;
-    Finisher *f = new Finisher(cct, oss.str(), "finisher");
-    finishers.push_back(f);
-  }
 }
 
 BlueStore::BlueStore(CephContext *cct,
@@ -3309,6 +3429,7 @@ BlueStore::BlueStore(CephContext *cct,
                       cct->_conf->bluestore_throttle_bytes +
                       cct->_conf->bluestore_throttle_deferred_bytes),
     kv_sync_thread(this),
+    kv_finalize_thread(this),
     min_alloc_size(_min_alloc_size),
     min_alloc_size_order(ctz(_min_alloc_size)),
     mempool_thread(this)
@@ -3363,9 +3484,9 @@ const char **BlueStore::get_tracked_conf_keys() const
     "bluestore_compression_max_blob_size_hdd",
     "bluestore_max_alloc_size",
     "bluestore_prefer_deferred_size",
-    "bleustore_deferred_batch_ops",
-    "bleustore_deferred_batch_ops_hdd",
-    "bleustore_deferred_batch_ops_ssd",
+    "bluestore_deferred_batch_ops",
+    "bluestore_deferred_batch_ops_hdd",
+    "bluestore_deferred_batch_ops_ssd",
     "bluestore_throttle_bytes",
     "bluestore_throttle_deferred_bytes",
     "bluestore_throttle_cost_per_io_hdd",
@@ -3524,6 +3645,41 @@ void BlueStore::_set_blob_size()
            << std::dec << dendl;
 }
 
+int BlueStore::_set_cache_sizes()
+{
+  cache_meta_ratio = cct->_conf->bluestore_cache_meta_ratio;
+  cache_kv_ratio = cct->_conf->bluestore_cache_kv_ratio;
+  cache_data_ratio =
+    (double)1.0 - (double)cache_meta_ratio - (double)cache_kv_ratio;
+
+  if (cache_meta_ratio <= 0 || cache_meta_ratio > 1.0) {
+    derr << __func__ << "bluestore_cache_meta_ratio (" << cache_meta_ratio
+        << ") must be in range (0,1.0]" << dendl;
+    return -EINVAL;
+  }
+  if (cache_kv_ratio <= 0 || cache_kv_ratio > 1.0) {
+    derr << __func__ << "bluestore_cache_kv_ratio (" << cache_kv_ratio
+        << ") must be in range (0,1.0]" << dendl;
+    return -EINVAL;
+  }
+  if (cache_meta_ratio + cache_kv_ratio > 1.0) {
+    derr << __func__ << "bluestore_cache_meta_ratio (" << cache_meta_ratio
+        << ") + bluestore_cache_kv_ratio (" << cache_kv_ratio
+        << ") = " << cache_meta_ratio + cache_kv_ratio << "; must be <= 1.0"
+        << dendl;
+    return -EINVAL;
+  }
+  if (cache_data_ratio < 0) {
+    // deal with floating point imprecision
+    cache_data_ratio = 0;
+  }
+  dout(1) << __func__ << " meta " << cache_meta_ratio
+         << " kv " << cache_kv_ratio
+         << " data " << cache_data_ratio
+         << dendl;
+  return 0;
+}
+
 void BlueStore::_init_logger()
 {
   PerfCountersBuilder b(cct, "bluestore",
@@ -3699,6 +3855,12 @@ int BlueStore::get_block_device_fsid(CephContext* cct, const string& path,
 
 int BlueStore::_open_path()
 {
+  // initial sanity check
+  int r = _set_cache_sizes();
+  if (r < 0) {
+    return r;
+  }
+
   assert(path_fd < 0);
   path_fd = ::open(path.c_str(), O_DIRECTORY);
   if (path_fd < 0) {
@@ -3804,7 +3966,11 @@ int BlueStore::_check_or_set_bdev_label(
     int r = _read_bdev_label(cct, path, &label);
     if (r < 0)
       return r;
-    if (label.osd_uuid != fsid) {
+    if (cct->_conf->bluestore_debug_permit_any_bdev_label) {
+      dout(20) << __func__ << " bdev " << path << " fsid " << label.osd_uuid
+          << " and fsid " << fsid << " check bypassed" << dendl;
+    }
+    else if (label.osd_uuid != fsid) {
       derr << __func__ << " bdev " << path << " fsid " << label.osd_uuid
           << " does not match our fsid " << fsid << dendl;
       return -EIO;
@@ -4117,6 +4283,38 @@ int BlueStore::_lock_fsid()
   return 0;
 }
 
+bool BlueStore::is_rotational()
+{
+  if (bdev) {
+    return bdev->is_rotational();
+  }
+
+  bool rotational = true;
+  int r = _open_path();
+  if (r < 0)
+    goto out;
+  r = _open_fsid(false);
+  if (r < 0)
+    goto out_path;
+  r = _read_fsid(&fsid);
+  if (r < 0)
+    goto out_fsid;
+  r = _lock_fsid();
+  if (r < 0)
+    goto out_fsid;
+  r = _open_bdev(false);
+  if (r < 0)
+    goto out_fsid;
+  rotational = bdev->is_rotational();
+  _close_bdev();
+ out_fsid:
+  _close_fsid();
+ out_path:
+  _close_path();
+  out:
+  return rotational;
+}
+
 bool BlueStore::test_mount_in_use()
 {
   // most error conditions mean the mount is not in use (e.g., because
@@ -4222,8 +4420,13 @@ int BlueStore::_open_db(bool create)
       }
       bluefs_shared_bdev = BlueFS::BDEV_SLOW;
       bluefs_single_shared_device = false;
-    } else {
+    } else if (::lstat(bfn.c_str(), &st) == -1) {
       bluefs_shared_bdev = BlueFS::BDEV_DB;
+    } else {
+      //symlink exist is bug
+      derr << __func__ << " " << bfn << " link target doesn't exist" << dendl;
+      r = -errno;
+      goto free_bluefs;
     }
 
     // shared device
@@ -4242,9 +4445,11 @@ int BlueStore::_open_db(bool create)
       initial = MAX(initial, cct->_conf->bluestore_bluefs_min);
       // align to bluefs's alloc_size
       initial = P2ROUNDUP(initial, cct->_conf->bluefs_alloc_size);
-      initial += cct->_conf->bluefs_alloc_size - SUPER_RESERVED;
-      bluefs->add_block_extent(bluefs_shared_bdev, SUPER_RESERVED, initial);
-      bluefs_extents.insert(SUPER_RESERVED, initial);
+      // put bluefs in the middle of the device in case it is an HDD
+      uint64_t start = P2ALIGN((bdev->get_size() - initial) / 2,
+                              cct->_conf->bluefs_alloc_size);
+      bluefs->add_block_extent(bluefs_shared_bdev, start, initial);
+      bluefs_extents.insert(start, initial);
     }
 
     bfn = path + "/block.wal";
@@ -4276,8 +4481,13 @@ int BlueStore::_open_db(bool create)
       }
       cct->_conf->set_val("rocksdb_separate_wal_dir", "true");
       bluefs_single_shared_device = false;
-    } else {
+    } else if (::lstat(bfn.c_str(), &st) == -1) {
       cct->_conf->set_val("rocksdb_separate_wal_dir", "false");
+    } else {
+      //symlink exist is bug
+      derr << __func__ << " " << bfn << " link target doesn't exist" << dendl;
+      r = -errno;
+      goto free_bluefs;
     }
 
     if (create) {
@@ -4374,6 +4584,8 @@ int BlueStore::_open_db(bool create)
   FreelistManager::setup_merge_operators(db);
   db->set_merge_operator(PREFIX_STAT, merge_op);
 
+  db->set_cache_size(cct->_conf->bluestore_cache_size * cache_kv_ratio);
+
   if (kv_backend == "rocksdb")
     options = cct->_conf->bluestore_rocksdb_options;
   db->init(options);
@@ -4625,6 +4837,24 @@ int BlueStore::_open_collections(int *errors)
   return 0;
 }
 
+void BlueStore::open_statfs()
+{
+  bufferlist bl;
+  int r = db->get(PREFIX_STAT, "bluestore_statfs", &bl);
+  if (r >= 0) {
+    if (size_t(bl.length()) >= sizeof(vstatfs.values)) {
+      auto it = bl.begin();
+      vstatfs.decode(it);
+    }
+    else {
+      dout(10) << __func__ << " store_statfs is corrupt, using empty" << dendl;
+    }
+  }
+  else {
+    dout(10) << __func__ << " store_statfs missed, using empty" << dendl;
+  }
+}
+
 int BlueStore::_setup_block_symlink_or_file(
   string name,
   string epath,
@@ -4878,12 +5108,6 @@ int BlueStore::mkfs()
     }
   }
 
-  // indicate success by writing the 'mkfs_done' file
-  r = write_meta("mkfs_done", "yes");
-  if (r < 0)
-    goto out_close_alloc;
-  dout(10) << __func__ << " success" << dendl;
-
  out_close_alloc:
   _close_alloc();
  out_close_fm:
@@ -4907,8 +5131,16 @@ int BlueStore::mkfs()
       r = -EIO;
     }
   }
+
+  if (r == 0) {
+    // indicate success by writing the 'mkfs_done' file
+    r = write_meta("mkfs_done", "yes");
+  }
+
   if (r < 0) {
     derr << __func__ << " failed, " << cpp_strerror(r) << dendl;
+  } else {
+    dout(0) << __func__ << " success" << dendl;
   }
   return r;
 }
@@ -5006,10 +5238,7 @@ int BlueStore::_mount(bool kv_only)
       goto out_coll;
   }
 
-  for (auto f : finishers) {
-    f->start();
-  }
-  kv_sync_thread.create("bstore_kv_sync");
+  _kv_start();
 
   r = _deferred_replay();
   if (r < 0)
@@ -5023,12 +5252,8 @@ int BlueStore::_mount(bool kv_only)
 
  out_stop:
   _kv_stop();
-  for (auto f : finishers) {
-    f->wait_for_empty();
-    f->stop();
-  }
  out_coll:
-  flush_cache();
+  _flush_cache();
  out_alloc:
   _close_alloc();
  out_fm:
@@ -5056,14 +5281,8 @@ int BlueStore::umount()
 
   dout(20) << __func__ << " stopping kv thread" << dendl;
   _kv_stop();
-  for (auto f : finishers) {
-    dout(20) << __func__ << " draining finisher" << dendl;
-    f->wait_for_empty();
-    dout(20) << __func__ << " stopping finisher" << dendl;
-    f->stop();
-  }
   _reap_collections();
-  flush_cache();
+  _flush_cache();
   dout(20) << __func__ << " closing" << dendl;
 
   mounted = false;
@@ -5144,10 +5363,15 @@ int BlueStore::fsck(bool deep)
 {
   dout(1) << __func__ << (deep ? " (deep)" : " (shallow)") << " start" << dendl;
   int errors = 0;
-  mempool::bluestore_fsck::set<uint64_t> used_nids;
-  mempool::bluestore_fsck::set<uint64_t> used_omap_head;
+
+  typedef btree::btree_set<
+    uint64_t,std::less<uint64_t>,
+    mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
+  uint64_t_btree_t used_nids;
+  uint64_t_btree_t used_omap_head;
+  uint64_t_btree_t used_sbids;
+
   mempool_dynamic_bitset used_blocks;
-  mempool::bluestore_fsck::set<uint64_t> used_sbids;
   KeyValueDB::Iterator it;
   store_statfs_t expected_statfs, actual_statfs;
   struct sb_info_t {
@@ -5209,7 +5433,10 @@ int BlueStore::fsck(bool deep)
 
   mempool_thread.init();
 
+  // we need finishers and kv_{sync,finalize}_thread *just* for replay
+  _kv_start();
   r = _deferred_replay();
+  _kv_stop();
   if (r < 0)
     goto out_scan;
 
@@ -5252,6 +5479,9 @@ int BlueStore::fsck(bool deep)
     spg_t pgid;
     mempool::bluestore_fsck::list<string> expecting_shards;
     for (it->lower_bound(string()); it->valid(); it->next()) {
+      if (g_conf->bluestore_debug_fsck_abort) {
+       goto out_scan;
+      }
       dout(30) << " key " << pretty_binary_string(it->key()) << dendl;
       if (is_extent_shard_key(it->key())) {
        while (!expecting_shards.empty() &&
@@ -5530,7 +5760,6 @@ int BlueStore::fsck(bool deep)
          used_omap_head.insert(o->onode.nid);
        }
       }
-      c->trim_cache();
     }
   }
   dout(1) << __func__ << " checking shared_blobs" << dendl;
@@ -5675,7 +5904,7 @@ int BlueStore::fsck(bool deep)
 
  out_scan:
   mempool_thread.shutdown();
-  flush_cache();
+  _flush_cache();
  out_alloc:
   _close_alloc();
  out_fm:
@@ -5740,27 +5969,16 @@ int BlueStore::statfs(struct store_statfs_t *buf)
     }
   }
 
-  bufferlist bl;
-  int r = db->get(PREFIX_STAT, "bluestore_statfs", &bl);
-  if (r >= 0) {
-       TransContext::volatile_statfs vstatfs;
-     if (size_t(bl.length()) >= sizeof(vstatfs.values)) {
-       auto it = bl.begin();
-       vstatfs.decode(it);
-
-       buf->allocated = vstatfs.allocated();
-       buf->stored = vstatfs.stored();
-       buf->compressed = vstatfs.compressed();
-       buf->compressed_original = vstatfs.compressed_original();
-       buf->compressed_allocated = vstatfs.compressed_allocated();
-     } else {
-       dout(10) << __func__ << " store_statfs is corrupt, using empty" << dendl;
-     }
-  } else {
-    dout(10) << __func__ << " store_statfs missed, using empty" << dendl;
+  {
+    std::lock_guard<std::mutex> l(vstatfs_lock);
+    
+    buf->allocated = vstatfs.allocated();
+    buf->stored = vstatfs.stored();
+    buf->compressed = vstatfs.compressed();
+    buf->compressed_original = vstatfs.compressed_original();
+    buf->compressed_allocated = vstatfs.compressed_allocated();
   }
 
-
   dout(20) << __func__ << *buf << dendl;
   return 0;
 }
@@ -5870,7 +6088,6 @@ bool BlueStore::exists(CollectionHandle &c_, const ghobject_t& oid)
       r = false;
   }
 
-  c->trim_cache();
   return r;
 }
 
@@ -5908,7 +6125,6 @@ int BlueStore::stat(
     st->st_nlink = 1;
   }
 
-  c->trim_cache();
   int r = 0;
   if (_debug_mdata_eio(oid)) {
     r = -EIO;
@@ -5985,7 +6201,6 @@ int BlueStore::read(
 
  out:
   assert(allow_eio || r != -EIO);
-  c->trim_cache();
   if (r == 0 && _debug_data_eio(oid)) {
     r = -EIO;
     derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
@@ -6036,7 +6251,6 @@ int BlueStore::_do_read(
   uint32_t op_flags)
 {
   FUNCTRACE();
-  boost::intrusive::set<Extent>::iterator ep, eend;
   int r = 0;
 
   dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
@@ -6425,7 +6639,6 @@ int BlueStore::_fiemap(
   }
 
  out:
-  c->trim_cache();
   dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
           << " size = 0x(" << destset << ")" << std::dec << dendl;
   return 0;
@@ -6513,7 +6726,7 @@ int BlueStore::getattr(
   int r;
   {
     RWLock::RLocker l(c->lock);
-    mempool::bluestore_meta_other::string k(name);
+    mempool::bluestore_cache_other::string k(name);
 
     OnodeRef o = c->get_onode(oid, false);
     if (!o || !o->exists) {
@@ -6529,7 +6742,6 @@ int BlueStore::getattr(
     r = 0;
   }
  out:
-  c->trim_cache();
   if (r == 0 && _debug_mdata_eio(oid)) {
     r = -EIO;
     derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
@@ -6577,7 +6789,6 @@ int BlueStore::getattrs(
   }
 
  out:
-  c->trim_cache();
   if (r == 0 && _debug_mdata_eio(oid)) {
     r = -EIO;
     derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
@@ -6654,7 +6865,6 @@ int BlueStore::collection_list(
     r = _collection_list(c, start, end, max, ls, pnext);
   }
 
-  c->trim_cache();
   dout(10) << __func__ << " " << c->cid
     << " start " << start << " end " << end << " max " << max
     << " = " << r << ", ls.size() = " << ls->size()
@@ -6774,91 +6984,6 @@ out:
   return r;
 }
 
-// omap reads
-
-BlueStore::OmapIteratorImpl::OmapIteratorImpl(
-  CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
-  : c(c), o(o), it(it)
-{
-  RWLock::RLocker l(c->lock);
-  if (o->onode.has_omap()) {
-    get_omap_key(o->onode.nid, string(), &head);
-    get_omap_tail(o->onode.nid, &tail);
-    it->lower_bound(head);
-  }
-}
-
-int BlueStore::OmapIteratorImpl::seek_to_first()
-{
-  RWLock::RLocker l(c->lock);
-  if (o->onode.has_omap()) {
-    it->lower_bound(head);
-  } else {
-    it = KeyValueDB::Iterator();
-  }
-  return 0;
-}
-
-int BlueStore::OmapIteratorImpl::upper_bound(const string& after)
-{
-  RWLock::RLocker l(c->lock);
-  if (o->onode.has_omap()) {
-    string key;
-    get_omap_key(o->onode.nid, after, &key);
-    it->upper_bound(key);
-  } else {
-    it = KeyValueDB::Iterator();
-  }
-  return 0;
-}
-
-int BlueStore::OmapIteratorImpl::lower_bound(const string& to)
-{
-  RWLock::RLocker l(c->lock);
-  if (o->onode.has_omap()) {
-    string key;
-    get_omap_key(o->onode.nid, to, &key);
-    it->lower_bound(key);
-  } else {
-    it = KeyValueDB::Iterator();
-  }
-  return 0;
-}
-
-bool BlueStore::OmapIteratorImpl::valid()
-{
-  RWLock::RLocker l(c->lock);
-  return o->onode.has_omap() && it && it->valid() && it->raw_key().second <= tail;
-}
-
-int BlueStore::OmapIteratorImpl::next(bool validate)
-{
-  RWLock::RLocker l(c->lock);
-  if (o->onode.has_omap()) {
-    it->next();
-    return 0;
-  } else {
-    return -1;
-  }
-}
-
-string BlueStore::OmapIteratorImpl::key()
-{
-  RWLock::RLocker l(c->lock);
-  assert(it->valid());
-  string db_key = it->raw_key().second;
-  string user_key;
-  decode_omap_key(db_key, &user_key);
-  return user_key;
-}
-
-bufferlist BlueStore::OmapIteratorImpl::value()
-{
-  RWLock::RLocker l(c->lock);
-  assert(it->valid());
-  return it->value();
-}
-
 int BlueStore::omap_get(
   const coll_t& cid,                ///< [in] Collection containing oid
   const ghobject_t &oid,   ///< [in] Object containing omap
@@ -7325,6 +7450,7 @@ int BlueStore::_open_super_meta()
     dout(10) << __func__ << " min_alloc_size 0x" << std::hex << min_alloc_size
             << std::dec << dendl;
   }
+  open_statfs();
   _set_alloc_sizes();
   _set_throttle_params();
 
@@ -7438,6 +7564,11 @@ void BlueStore::_txc_update_store_statfs(TransContext *txc)
   logger->inc(l_bluestore_compressed_allocated, txc->statfs_delta.compressed_allocated());
   logger->inc(l_bluestore_compressed_original, txc->statfs_delta.compressed_original());
 
+  {
+    std::lock_guard<std::mutex> l(vstatfs_lock);
+    vstatfs += txc->statfs_delta;
+  }
+
   bufferlist bl;
   txc->statfs_delta.encode(bl);
 
@@ -7496,7 +7627,7 @@ void BlueStore::_txc_state_proc(TransContext *txc)
                   << dendl;
        } else {
          txc->state = TransContext::STATE_KV_SUBMITTED;
-         int r = db->submit_transaction(txc->t);
+         int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
          assert(r == 0);
          _txc_applied_kv(txc);
        }
@@ -7509,6 +7640,9 @@ void BlueStore::_txc_state_proc(TransContext *txc)
          kv_queue_unsubmitted.push_back(txc);
          ++txc->osr->kv_committing_serially;
        }
+       if (txc->had_ios)
+         kv_ios++;
+       kv_throttle_costs += txc->cost;
       }
       return;
     case TransContext::STATE_KV_SUBMITTED:
@@ -7559,6 +7693,9 @@ void BlueStore::_txc_finish_io(TransContext *txc)
   std::lock_guard<std::mutex> l(osr->qlock);
   txc->state = TransContext::STATE_IO_DONE;
 
+  // release aio contexts (including pinned buffers).
+  txc->ioc.running_aios.clear();
+
   OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc);
   while (p != osr->q.begin()) {
     --p;
@@ -7662,7 +7799,7 @@ void BlueStore::_txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t)
       bufferlist bl;
       ::encode(*(sb->persistent), bl);
       dout(20) << "  shared_blob 0x" << std::hex << sbid << std::dec
-              << " is " << bl.length() << dendl;
+              << " is " << bl.length() << " " << *sb << dendl;
       t->set(PREFIX_SHARED_BLOB, key, bl);
     }
   }
@@ -7784,6 +7921,7 @@ void BlueStore::_txc_finish(TransContext *txc)
   OpSequencerRef osr = txc->osr;
   CollectionRef c;
   bool empty = false;
+  bool submit_deferred = false;
   OpSequencer::q_list_t releasing_txc;
   {
     std::lock_guard<std::mutex> l(osr->qlock);
@@ -7799,6 +7937,10 @@ void BlueStore::_txc_finish(TransContext *txc)
          // for _osr_drain_preceding()
           notify = true;
        }
+       if (txc->state == TransContext::STATE_DEFERRED_QUEUED &&
+           osr->q.size() > g_conf->bluestore_max_deferred_txc) {
+         submit_deferred = true;
+       }
         break;
       }
 
@@ -7828,11 +7970,12 @@ void BlueStore::_txc_finish(TransContext *txc)
     delete txc;
   }
 
-  if (c) {
-    c->trim_cache();
+  if (submit_deferred) {
+    // we're pinning memory; flush!  we could be more fine-grained here but
+    // i'm not sure it's worth the bother.
+    deferred_try_submit();
   }
 
-
   if (empty && osr->zombie) {
     dout(10) << __func__ << " reaping empty zombie osr " << osr << dendl;
     osr->_unregister();
@@ -7899,6 +8042,10 @@ void BlueStore::_osr_drain_all()
     std::lock_guard<std::mutex> l(kv_lock);
     kv_cond.notify_one();
   }
+  {
+    std::lock_guard<std::mutex> l(kv_finalize_lock);
+    kv_finalize_cond.notify_one();
+  }
   for (auto osr : s) {
     dout(20) << __func__ << " drain " << osr << dendl;
     osr->drain();
@@ -7934,10 +8081,83 @@ void BlueStore::_osr_unregister_all()
   }
 }
 
+void BlueStore::_kv_start()
+{
+  dout(10) << __func__ << dendl;
+
+  if (cct->_conf->bluestore_shard_finishers) {
+    if (cct->_conf->osd_op_num_shards) {
+      m_finisher_num = cct->_conf->osd_op_num_shards;
+    } else {
+      assert(bdev);
+      if (bdev->is_rotational()) {
+        m_finisher_num = cct->_conf->osd_op_num_shards_hdd;
+      } else {
+        m_finisher_num = cct->_conf->osd_op_num_shards_ssd;
+      }
+    }
+  }
+
+  assert(m_finisher_num != 0);
+
+  for (int i = 0; i < m_finisher_num; ++i) {
+    ostringstream oss;
+    oss << "finisher-" << i;
+    Finisher *f = new Finisher(cct, oss.str(), "finisher");
+    finishers.push_back(f);
+  }
+
+  for (auto f : finishers) {
+    f->start();
+  }
+  kv_sync_thread.create("bstore_kv_sync");
+  kv_finalize_thread.create("bstore_kv_final");
+}
+
+void BlueStore::_kv_stop()
+{
+  dout(10) << __func__ << dendl;
+  {
+    std::unique_lock<std::mutex> l(kv_lock);
+    while (!kv_sync_started) {
+      kv_cond.wait(l);
+    }
+    kv_stop = true;
+    kv_cond.notify_all();
+  }
+  {
+    std::unique_lock<std::mutex> l(kv_finalize_lock);
+    while (!kv_finalize_started) {
+      kv_finalize_cond.wait(l);
+    }
+    kv_finalize_stop = true;
+    kv_finalize_cond.notify_all();
+  }
+  kv_sync_thread.join();
+  kv_finalize_thread.join();
+  {
+    std::lock_guard<std::mutex> l(kv_lock);
+    kv_stop = false;
+  }
+  {
+    std::lock_guard<std::mutex> l(kv_finalize_lock);
+    kv_finalize_stop = false;
+  }
+  dout(10) << __func__ << " stopping finishers" << dendl;
+  for (auto f : finishers) {
+    f->wait_for_empty();
+    f->stop();
+  }
+  dout(10) << __func__ << " stopped" << dendl;
+}
+
 void BlueStore::_kv_sync_thread()
 {
   dout(10) << __func__ << " start" << dendl;
   std::unique_lock<std::mutex> l(kv_lock);
+  assert(!kv_sync_started);
+  kv_sync_started = true;
+  kv_cond.notify_all();
   while (true) {
     assert(kv_committing.empty());
     if (kv_queue.empty() &&
@@ -7951,6 +8171,8 @@ void BlueStore::_kv_sync_thread()
     } else {
       deque<TransContext*> kv_submitting;
       deque<DeferredBatch*> deferred_done, deferred_stable;
+      uint64_t aios = 0, costs = 0;
+
       dout(20) << __func__ << " committing " << kv_queue.size()
               << " submitting " << kv_queue_unsubmitted.size()
               << " deferred done " << deferred_done_queue.size()
@@ -7960,6 +8182,10 @@ void BlueStore::_kv_sync_thread()
       kv_submitting.swap(kv_queue_unsubmitted);
       deferred_done.swap(deferred_done_queue);
       deferred_stable.swap(deferred_stable_queue);
+      aios = kv_ios;
+      costs = kv_throttle_costs;
+      kv_ios = 0;
+      kv_throttle_costs = 0;
       utime_t start = ceph_clock_now();
       l.unlock();
 
@@ -7968,20 +8194,13 @@ void BlueStore::_kv_sync_thread()
       dout(30) << __func__ << " deferred_done " << deferred_done << dendl;
       dout(30) << __func__ << " deferred_stable " << deferred_stable << dendl;
 
-      int num_aios = 0;
-      for (auto txc : kv_committing) {
-       if (txc->had_ios) {
-         ++num_aios;
-       }
-      }
-
       bool force_flush = false;
       // if bluefs is sharing the same device as data (only), then we
       // can rely on the bluefs commit to flush the device and make
       // deferred aios stable.  that means that if we do have done deferred
       // txcs AND we are not on a single device, we need to force a flush.
       if (bluefs_single_shared_device && bluefs) {
-       if (num_aios) {
+       if (aios) {
          force_flush = true;
        } else if (kv_committing.empty() && kv_submitting.empty() &&
                   deferred_stable.empty()) {
@@ -7993,7 +8212,7 @@ void BlueStore::_kv_sync_thread()
        force_flush = true;
 
       if (force_flush) {
-       dout(20) << __func__ << " num_aios=" << num_aios
+       dout(20) << __func__ << " num_aios=" << aios
                 << " force_flush=" << (int)force_flush
                 << ", flushing, deferred done->stable" << dendl;
        // flush/barrier on block device
@@ -8035,7 +8254,7 @@ void BlueStore::_kv_sync_thread()
       for (auto txc : kv_submitting) {
        assert(txc->state == TransContext::STATE_KV_QUEUED);
        txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
-       int r = db->submit_transaction(txc->t);
+       int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
        assert(r == 0);
        _txc_applied_kv(txc);
        --txc->osr->kv_committing_serially;
@@ -8052,15 +8271,16 @@ void BlueStore::_kv_sync_thread()
          --txc->osr->txc_with_unstable_io;
        }
        txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
-       // release throttle *before* we commit.  this allows new ops
-       // to be prepared and enter pipeline while we are waiting on
-       // the kv commit sync/flush.  then hopefully on the next
-       // iteration there will already be ops awake.  otherwise, we
-       // end up going to sleep, and then wake up when the very first
-       // transaction is ready for commit.
-       throttle_bytes.put(txc->cost);
       }
 
+      // release throttle *before* we commit.  this allows new ops
+      // to be prepared and enter pipeline while we are waiting on
+      // the kv commit sync/flush.  then hopefully on the next
+      // iteration there will already be ops awake.  otherwise, we
+      // end up going to sleep, and then wake up when the very first
+      // transaction is ready for commit.
+      throttle_bytes.put(costs);
+
       PExtentVector bluefs_gift_extents;
       if (bluefs &&
          after_flush - bluefs_last_balance >
@@ -8100,7 +8320,7 @@ void BlueStore::_kv_sync_thread()
       }
 
       // submit synct synchronously (block and wait for it to commit)
-      int r = db->submit_transaction_sync(synct);
+      int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction_sync(synct);
       assert(r == 0);
 
       if (new_nid_max) {
@@ -8126,12 +8346,88 @@ void BlueStore::_kv_sync_thread()
        logger->tinc(l_bluestore_kv_commit_lat, dur_kv);
        logger->tinc(l_bluestore_kv_lat, dur);
       }
-      while (!kv_committing.empty()) {
-       TransContext *txc = kv_committing.front();
+
+      if (bluefs) {
+       if (!bluefs_gift_extents.empty()) {
+         _commit_bluefs_freespace(bluefs_gift_extents);
+       }
+       for (auto p = bluefs_extents_reclaiming.begin();
+            p != bluefs_extents_reclaiming.end();
+            ++p) {
+         dout(20) << __func__ << " releasing old bluefs 0x" << std::hex
+                  << p.get_start() << "~" << p.get_len() << std::dec
+                  << dendl;
+         alloc->release(p.get_start(), p.get_len());
+       }
+       bluefs_extents_reclaiming.clear();
+      }
+
+      {
+       std::unique_lock<std::mutex> m(kv_finalize_lock);
+       if (kv_committing_to_finalize.empty()) {
+         kv_committing_to_finalize.swap(kv_committing);
+       } else {
+         kv_committing_to_finalize.insert(
+           kv_committing_to_finalize.end(),
+           kv_committing.begin(),
+           kv_committing.end());
+         kv_committing.clear();
+       }
+       if (deferred_stable_to_finalize.empty()) {
+         deferred_stable_to_finalize.swap(deferred_stable);
+       } else {
+         deferred_stable_to_finalize.insert(
+           deferred_stable_to_finalize.end(),
+           deferred_stable.begin(),
+           deferred_stable.end());
+          deferred_stable.clear();
+       }
+       kv_finalize_cond.notify_one();
+      }
+
+      l.lock();
+      // previously deferred "done" are now "stable" by virtue of this
+      // commit cycle.
+      deferred_stable_queue.swap(deferred_done);
+    }
+  }
+  dout(10) << __func__ << " finish" << dendl;
+  kv_sync_started = false;
+}
+
+void BlueStore::_kv_finalize_thread()
+{
+  deque<TransContext*> kv_committed;
+  deque<DeferredBatch*> deferred_stable;
+  dout(10) << __func__ << " start" << dendl;
+  std::unique_lock<std::mutex> l(kv_finalize_lock);
+  assert(!kv_finalize_started);
+  kv_finalize_started = true;
+  kv_finalize_cond.notify_all();
+  while (true) {
+    assert(kv_committed.empty());
+    assert(deferred_stable.empty());
+    if (kv_committing_to_finalize.empty() &&
+       deferred_stable_to_finalize.empty()) {
+      if (kv_finalize_stop)
+       break;
+      dout(20) << __func__ << " sleep" << dendl;
+      kv_finalize_cond.wait(l);
+      dout(20) << __func__ << " wake" << dendl;
+    } else {
+      kv_committed.swap(kv_committing_to_finalize);
+      deferred_stable.swap(deferred_stable_to_finalize);
+      l.unlock();
+      dout(20) << __func__ << " kv_committed " << kv_committed << dendl;
+      dout(20) << __func__ << " deferred_stable " << deferred_stable << dendl;
+
+      while (!kv_committed.empty()) {
+       TransContext *txc = kv_committed.front();
        assert(txc->state == TransContext::STATE_KV_SUBMITTED);
        _txc_state_proc(txc);
-       kv_committing.pop_front();
+       kv_committed.pop_front();
       }
+
       for (auto b : deferred_stable) {
        auto p = b->txcs.begin();
        while (p != b->txcs.end()) {
@@ -8141,10 +8437,11 @@ void BlueStore::_kv_sync_thread()
        }
        delete b;
       }
+      deferred_stable.clear();
 
       if (!deferred_aggressive) {
        std::lock_guard<std::mutex> l(deferred_lock);
-       if (deferred_queue_size >= deferred_batch_ops ||
+       if (deferred_queue_size >= deferred_batch_ops.load() ||
            throttle_deferred_bytes.past_midpoint()) {
          _deferred_try_submit();
        }
@@ -8153,28 +8450,11 @@ void BlueStore::_kv_sync_thread()
       // this is as good a place as any ...
       _reap_collections();
 
-      if (bluefs) {
-       if (!bluefs_gift_extents.empty()) {
-         _commit_bluefs_freespace(bluefs_gift_extents);
-       }
-       for (auto p = bluefs_extents_reclaiming.begin();
-            p != bluefs_extents_reclaiming.end();
-            ++p) {
-         dout(20) << __func__ << " releasing old bluefs 0x" << std::hex
-                  << p.get_start() << "~" << p.get_len() << std::dec
-                  << dendl;
-         alloc->release(p.get_start(), p.get_len());
-       }
-       bluefs_extents_reclaiming.clear();
-      }
-
       l.lock();
-      // previously deferred "done" are now "stable" by virtue of this
-      // commit cycle.
-      deferred_stable_queue.swap(deferred_done);
     }
   }
   dout(10) << __func__ << " finish" << dendl;
+  kv_finalize_started = false;
 }
 
 bluestore_deferred_op_t *BlueStore::_get_deferred_op(
@@ -8297,13 +8577,15 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
   }
 
   {
+    uint64_t costs = 0;
     std::lock_guard<std::mutex> l2(osr->qlock);
     for (auto& i : b->txcs) {
       TransContext *txc = &i;
       txc->state = TransContext::STATE_DEFERRED_CLEANUP;
-      txc->osr->qcond.notify_all();
-      throttle_deferred_bytes.put(txc->cost);
+      costs += txc->cost;
     }
+    osr->qcond.notify_all();
+    throttle_deferred_bytes.put(costs);
     std::lock_guard<std::mutex> l(kv_lock);
     deferred_done_queue.emplace_back(b);
   }
@@ -9485,14 +9767,12 @@ int BlueStore::_do_alloc_write(
     }
     if (!compressed && wi.new_blob) {
       // initialize newly created blob only
-      assert(!dblob.has_flag(bluestore_blob_t::FLAG_MUTABLE));
-      dblob.set_flag(bluestore_blob_t::FLAG_MUTABLE);
-
+      assert(dblob.is_mutable());
       if (l->length() != wi.blob_length) {
         // hrm, maybe we could do better here, but let's not bother.
         dout(20) << __func__ << " forcing csum_order to block_size_order "
                 << block_size_order << dendl;
-       csum_order = block_size_order;
+       csum_order = block_size_order;
       } else {
         csum_order = std::min(wctx->csum_order, ctz(l->length()));
       }
@@ -9593,7 +9873,8 @@ void BlueStore::_wctx_finish(
   TransContext *txc,
   CollectionRef& c,
   OnodeRef o,
-  WriteContext *wctx)
+  WriteContext *wctx,
+  set<SharedBlob*> *maybe_unshared_blobs)
 {
   auto oep = wctx->old_extents.begin();
   while (oep != wctx->old_extents.end()) {
@@ -9616,7 +9897,9 @@ void BlueStore::_wctx_finish(
        PExtentVector final;
         c->load_shared_blob(b->shared_blob);
        for (auto e : r) {
-         b->shared_blob->put_ref(e.offset, e.length, &final);
+         b->shared_blob->put_ref(
+           e.offset, e.length, &final,
+           b->is_referenced() ? nullptr : maybe_unshared_blobs);
        }
        dout(20) << __func__ << "  shared_blob release " << final
                 << " from " << *b->shared_blob << dendl;
@@ -9693,68 +9976,41 @@ void BlueStore::_do_write_data(
   }
 }
 
-int BlueStore::_do_write(
-  TransContext *txc,
-  CollectionRef& c,
-  OnodeRef o,
-  uint64_t offset,
-  uint64_t length,
-  bufferlist& bl,
-  uint32_t fadvise_flags)
+void BlueStore::_choose_write_options(
+   CollectionRef& c,
+   OnodeRef o,
+   uint32_t fadvise_flags,
+   WriteContext *wctx)
 {
-  int r = 0;
-
-  dout(20) << __func__
-          << " " << o->oid
-          << " 0x" << std::hex << offset << "~" << length
-          << " - have 0x" << o->onode.size
-          << " (" << std::dec << o->onode.size << ")"
-          << " bytes"
-          << " fadvise_flags 0x" << std::hex << fadvise_flags << std::dec
-          << dendl;
-  _dump_onode(o);
-
-  if (length == 0) {
-    return 0;
-  }
-
-  uint64_t end = offset + length;
-  bool was_gc = false;
-  GarbageCollector gc(c->store->cct);
-  int64_t benefit;
-  auto dirty_start = offset;
-  auto dirty_end = offset + length;
-
-  WriteContext wctx, wctx_gc;
   if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) {
     dout(20) << __func__ << " will do buffered write" << dendl;
-    wctx.buffered = true;
+    wctx->buffered = true;
   } else if (cct->_conf->bluestore_default_buffered_write &&
             (fadvise_flags & (CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
                               CEPH_OSD_OP_FLAG_FADVISE_NOCACHE)) == 0) {
     dout(20) << __func__ << " defaulting to buffered write" << dendl;
-    wctx.buffered = true;
+    wctx->buffered = true;
   }
 
-  // FIXME: Using the MAX of the block_size_order and preferred_csum_order
-  // results in poor small random read performance when data was initially 
-  // written out in large chunks.  Reverting to previous behavior for now.
-  wctx.csum_order = block_size_order;
+  // apply basic csum block size
+  wctx->csum_order = block_size_order;
 
   // compression parameters
   unsigned alloc_hints = o->onode.alloc_hint_flags;
   auto cm = select_option(
     "compression_mode",
-    comp_mode.load(), 
+    comp_mode.load(),
     [&]() {
       string val;
       if(c->pool_opts.get(pool_opts_t::COMPRESSION_MODE, &val)) {
-       return  boost::optional<Compressor::CompressionMode>(Compressor::get_comp_mode_type(val));
+       return boost::optional<Compressor::CompressionMode>(
+         Compressor::get_comp_mode_type(val));
       }
       return boost::optional<Compressor::CompressionMode>();
     }
   );
-  wctx.compress = (cm != Compressor::COMP_NONE) &&
+
+  wctx->compress = (cm != Compressor::COMP_NONE) &&
     ((cm == Compressor::COMP_FORCE) ||
      (cm == Compressor::COMP_AGGRESSIVE &&
       (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE) == 0) ||
@@ -9763,22 +10019,24 @@ int BlueStore::_do_write(
 
   if ((alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_READ) &&
       (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_READ) == 0 &&
-      (alloc_hints & (CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE|
-                       CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY)) &&
+      (alloc_hints & (CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE |
+                      CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY)) &&
       (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_WRITE) == 0) {
+
     dout(20) << __func__ << " will prefer large blob and csum sizes" << dendl;
+
     auto order = min_alloc_size_order.load();
     if (o->onode.expected_write_size) {
-      wctx.csum_order = std::max(order,
-                                (uint8_t)ctz(o->onode.expected_write_size));
+      wctx->csum_order = std::max(order,
+                                 (uint8_t)ctz(o->onode.expected_write_size));
     } else {
-      wctx.csum_order = order;
+      wctx->csum_order = order;
     }
 
-    if (wctx.compress) {
-      wctx.target_blob_size = select_option(
+    if (wctx->compress) {
+      wctx->target_blob_size = select_option(
         "compression_max_blob_size",
-        comp_max_blob_size.load(), 
+        comp_max_blob_size.load(),
         [&]() {
           int val;
           if(c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val)) {
@@ -9789,10 +10047,10 @@ int BlueStore::_do_write(
       );
     }
   } else {
-    if (wctx.compress) {
-      wctx.target_blob_size = select_option(
+    if (wctx->compress) {
+      wctx->target_blob_size = select_option(
         "compression_min_blob_size",
-        comp_min_blob_size.load(), 
+        comp_min_blob_size.load(),
         [&]() {
           int val;
           if(c->pool_opts.get(pool_opts_t::COMPRESSION_MIN_BLOB_SIZE, &val)) {
@@ -9803,25 +10061,107 @@ int BlueStore::_do_write(
       );
     }
   }
+
   uint64_t max_bsize = max_blob_size.load();
-  if (wctx.target_blob_size == 0 || wctx.target_blob_size > max_bsize) {
-    wctx.target_blob_size = max_bsize;
+  if (wctx->target_blob_size == 0 || wctx->target_blob_size > max_bsize) {
+    wctx->target_blob_size = max_bsize;
   }
+
   // set the min blob size floor at 2x the min_alloc_size, or else we
   // won't be able to allocate a smaller extent for the compressed
   // data.
-  if (wctx.compress &&
-      wctx.target_blob_size < min_alloc_size * 2) {
-    wctx.target_blob_size = min_alloc_size * 2;
+  if (wctx->compress &&
+      wctx->target_blob_size < min_alloc_size * 2) {
+    wctx->target_blob_size = min_alloc_size * 2;
   }
+
+  dout(20) << __func__ << " prefer csum_order " << wctx->csum_order
+           << " target_blob_size 0x" << std::hex << wctx->target_blob_size
+           << std::dec << dendl;
+}
+
+int BlueStore::_do_gc(
+  TransContext *txc,
+  CollectionRef& c,
+  OnodeRef o,
+  const GarbageCollector& gc,
+  const WriteContext& wctx,
+  uint64_t *dirty_start,
+  uint64_t *dirty_end)
+{
+  auto& extents_to_collect = gc.get_extents_to_collect();
+
+  WriteContext wctx_gc;
   wctx_gc.fork(wctx); // make a clone for garbage collection
-  dout(20) << __func__ << " prefer csum_order " << wctx.csum_order
-          << " target_blob_size 0x" << std::hex << wctx.target_blob_size
-          << std::dec << dendl;
 
+  for (auto it = extents_to_collect.begin();
+       it != extents_to_collect.end();
+       ++it) {
+    bufferlist bl;
+    int r = _do_read(c.get(), o, it->offset, it->length, bl, 0);
+    assert(r == (int)it->length);
+
+    o->extent_map.fault_range(db, it->offset, it->length);
+    _do_write_data(txc, c, o, it->offset, it->length, bl, &wctx_gc);
+    logger->inc(l_bluestore_gc_merged, it->length);
+
+    if (*dirty_start > it->offset) {
+      *dirty_start = it->offset;
+    }
+
+    if (*dirty_end < it->offset + it->length) {
+      *dirty_end = it->offset + it->length;
+    }
+  }
+
+  dout(30) << __func__ << " alloc write" << dendl;
+  int r = _do_alloc_write(txc, c, o, &wctx_gc);
+  if (r < 0) {
+    derr << __func__ << " _do_alloc_write failed with " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  _wctx_finish(txc, c, o, &wctx_gc);
+  return 0;
+}
+
+int BlueStore::_do_write(
+  TransContext *txc,
+  CollectionRef& c,
+  OnodeRef o,
+  uint64_t offset,
+  uint64_t length,
+  bufferlist& bl,
+  uint32_t fadvise_flags)
+{
+  int r = 0;
+
+  dout(20) << __func__
+          << " " << o->oid
+          << " 0x" << std::hex << offset << "~" << length
+          << " - have 0x" << o->onode.size
+          << " (" << std::dec << o->onode.size << ")"
+          << " bytes"
+          << " fadvise_flags 0x" << std::hex << fadvise_flags << std::dec
+          << dendl;
+  _dump_onode(o);
+
+  if (length == 0) {
+    return 0;
+  }
+
+  uint64_t end = offset + length;
+
+  GarbageCollector gc(c->store->cct);
+  int64_t benefit;
+  auto dirty_start = offset;
+  auto dirty_end = end;
+
+  WriteContext wctx;
+  _choose_write_options(c, o, fadvise_flags, &wctx);
   o->extent_map.fault_range(db, offset, length);
   _do_write_data(txc, c, o, offset, length, bl, &wctx);
-
   r = _do_alloc_write(txc, c, o, &wctx);
   if (r < 0) {
     derr << __func__ << " _do_alloc_write failed with " << cpp_strerror(r)
@@ -9829,54 +10169,37 @@ int BlueStore::_do_write(
     goto out;
   }
 
+  // NB: _wctx_finish() will empty old_extents
+  // so we must do gc estimation before that
   benefit = gc.estimate(offset,
-                                length,
-                               o->extent_map,
-                               wctx.old_extents,
-                               min_alloc_size);
+                        length,
+                       o->extent_map,
+                       wctx.old_extents,
+                       min_alloc_size);
 
   _wctx_finish(txc, c, o, &wctx);
   if (end > o->onode.size) {
     dout(20) << __func__ << " extending size to 0x" << std::hex << end
-            << std::dec << dendl;
+             << std::dec << dendl;
     o->onode.size = end;
   }
 
   if (benefit >= g_conf->bluestore_gc_enable_total_threshold) {
-    dout(20)  << __func__ << " perform garbage collection, expected benefit = "
-              << benefit << " AUs" << dendl;
-    auto& extents_to_collect = gc.get_extents_to_collect();
-    for (auto it = extents_to_collect.begin();
-         it != extents_to_collect.end();
-         ++it) {
-      bufferlist bl;
-      int r = _do_read(c.get(), o, it->offset, it->length, bl, 0);
-      assert(r == (int)it->length);
-      o->extent_map.fault_range(db, it->offset, it->length);
-      _do_write_data(txc, c, o, it->offset, it->length, bl, &wctx_gc);
-      logger->inc(l_bluestore_gc_merged, it->length);
-      was_gc = true;
-      if (dirty_start > it->offset) {
-        dirty_start = it->offset;
-      }
-      if (dirty_end < it->offset + it->length) {
-        dirty_end = it->offset + it->length;
+    if (!gc.get_extents_to_collect().empty()) {
+      dout(20) << __func__ << " perform garbage collection, "
+               << "expected benefit = " << benefit << " AUs" << dendl;
+      r = _do_gc(txc, c, o, gc, wctx, &dirty_start, &dirty_end);
+      if (r < 0) {
+        derr << __func__ << " _do_gc failed with " << cpp_strerror(r)
+             << dendl;
+        goto out;
       }
     }
   }
-  if (was_gc) {
-    dout(30)  << __func__ << " alloc write for GC" << dendl;
-    r = _do_alloc_write(txc, c, o, &wctx_gc);
-    if (r < 0) {
-      derr << __func__ << " _do_alloc_write(gc) failed with " << cpp_strerror(r)
-          << dendl;
-      goto out;
-    }
-    _wctx_finish(txc, c, o, &wctx_gc);
-  }
 
   o->extent_map.compress_extent_map(dirty_start, dirty_end - dirty_start);
-  o->extent_map.dirty_range(txc->t, dirty_start, dirty_end - dirty_start);
+  o->extent_map.dirty_range(dirty_start, dirty_end - dirty_start);
+
   r = 0;
 
  out:
@@ -9886,9 +10209,9 @@ int BlueStore::_do_write(
 int BlueStore::_write(TransContext *txc,
                      CollectionRef& c,
                      OnodeRef& o,
-                    uint64_t offset, size_t length,
-                    bufferlist& bl,
-                    uint32_t fadvise_flags)
+                     uint64_t offset, size_t length,
+                     bufferlist& bl,
+                     uint32_t fadvise_flags)
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
@@ -9936,7 +10259,7 @@ int BlueStore::_do_zero(TransContext *txc,
   WriteContext wctx;
   o->extent_map.fault_range(db, offset, length);
   o->extent_map.punch_hole(c, offset, length, &wctx.old_extents);
-  o->extent_map.dirty_range(txc->t, offset, length);
+  o->extent_map.dirty_range(offset, length);
   _wctx_finish(txc, c, o, &wctx);
 
   if (offset + length > o->onode.size) {
@@ -9953,7 +10276,8 @@ int BlueStore::_do_zero(TransContext *txc,
 }
 
 void BlueStore::_do_truncate(
-  TransContext *txc, CollectionRef& c, OnodeRef o, uint64_t offset)
+  TransContext *txc, CollectionRef& c, OnodeRef o, uint64_t offset,
+  set<SharedBlob*> *maybe_unshared_blobs)
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << std::dec << dendl;
@@ -9961,15 +10285,15 @@ void BlueStore::_do_truncate(
   _dump_onode(o, 30);
 
   if (offset == o->onode.size)
-    return ;
+    return;
 
   if (offset < o->onode.size) {
     WriteContext wctx;
     uint64_t length = o->onode.size - offset;
     o->extent_map.fault_range(db, offset, length);
     o->extent_map.punch_hole(c, offset, length, &wctx.old_extents);
-    o->extent_map.dirty_range(txc->t, offset, length);
-    _wctx_finish(txc, c, o, &wctx);
+    o->extent_map.dirty_range(offset, length);
+    _wctx_finish(txc, c, o, &wctx, maybe_unshared_blobs);
 
     // if we have shards past EOF, ask for a reshard
     if (!o->onode.extent_map_shards.empty() &&
@@ -10004,7 +10328,8 @@ int BlueStore::_do_remove(
   CollectionRef& c,
   OnodeRef o)
 {
-  _do_truncate(txc, c, o, 0);
+  set<SharedBlob*> maybe_unshared_blobs;
+  _do_truncate(txc, c, o, 0, &maybe_unshared_blobs);
   if (o->onode.has_omap()) {
     o->flush();
     _do_omap_clear(txc, o->onode.nid);
@@ -10025,6 +10350,73 @@ int BlueStore::_do_remove(
   o->extent_map.clear();
   o->onode = bluestore_onode_t();
   _debug_obj_on_delete(o->oid);
+
+  if (!o->oid.is_no_gen() &&
+      !maybe_unshared_blobs.empty()) {
+    // see if we can unshare blobs still referenced by the head
+    dout(10) << __func__ << " gen and maybe_unshared_blobs "
+            << maybe_unshared_blobs << dendl;
+    ghobject_t nogen = o->oid;
+    nogen.generation = ghobject_t::NO_GEN;
+    OnodeRef h = c->onode_map.lookup(nogen);
+    if (h && h->exists) {
+      dout(20) << __func__ << " checking for unshareable blobs on " << h
+              << " " << h->oid << dendl;
+      map<SharedBlob*,bluestore_extent_ref_map_t> expect;
+      for (auto& e : h->extent_map.extent_map) {
+       const bluestore_blob_t& b = e.blob->get_blob();
+       SharedBlob *sb = e.blob->shared_blob.get();
+       if (b.is_shared() &&
+           sb->loaded &&
+           maybe_unshared_blobs.count(sb)) {
+         b.map(e.blob_offset, e.length, [&](uint64_t off, uint64_t len) {
+             expect[sb].get(off, len);
+             return 0;
+           });
+       }
+      }
+      vector<SharedBlob*> unshared_blobs;
+      unshared_blobs.reserve(maybe_unshared_blobs.size());
+      for (auto& p : expect) {
+       dout(20) << " ? " << *p.first << " vs " << p.second << dendl;
+       if (p.first->persistent->ref_map == p.second) {
+         SharedBlob *sb = p.first;
+         dout(20) << __func__ << "  unsharing " << *sb << dendl;
+         unshared_blobs.push_back(sb);
+         txc->unshare_blob(sb);
+         uint64_t sbid = c->make_blob_unshared(sb);
+         string key;
+         get_shared_blob_key(sbid, &key);
+         txc->t->rmkey(PREFIX_SHARED_BLOB, key);
+       }
+      }
+
+      if (!unshared_blobs.empty()) {
+        uint32_t b_start = OBJECT_MAX_SIZE;
+        uint32_t b_end = 0;
+        for (auto& e : h->extent_map.extent_map) {
+          const bluestore_blob_t& b = e.blob->get_blob();
+          SharedBlob *sb = e.blob->shared_blob.get();
+          if (b.is_shared() &&
+              std::find(unshared_blobs.begin(), unshared_blobs.end(),
+                        sb) != unshared_blobs.end()) {
+            dout(20) << __func__ << "  unsharing " << e << dendl;
+            bluestore_blob_t& blob = e.blob->dirty_blob();
+            blob.clear_flag(bluestore_blob_t::FLAG_SHARED);
+            if (e.logical_offset < b_start) {
+              b_start = e.logical_offset;
+            }
+            if (e.logical_end() > b_end) {
+              b_end = e.logical_end();
+            }
+          }
+        }
+
+       h->extent_map.dirty_range(b_start, b_end - b_start);
+       txc->write_onode(h);
+      }
+    }
+  }
   return 0;
 }
 
@@ -10476,7 +10868,7 @@ int BlueStore::_do_clone_range(
     ++n;
   }
   if (dirtied_oldo) {
-    oldo->extent_map.dirty_range(txc->t, srcoff, length); // overkill
+    oldo->extent_map.dirty_range(srcoff, length); // overkill
     txc->write_onode(oldo);
   }
   txc->write_onode(newo);
@@ -10484,7 +10876,7 @@ int BlueStore::_do_clone_range(
   if (dstoff + length > newo->onode.size) {
     newo->onode.size = dstoff + length;
   }
-  newo->extent_map.dirty_range(txc->t, dstoff, length);
+  newo->extent_map.dirty_range(dstoff, length);
   _dump_onode(oldo);
   _dump_onode(newo);
   return 0;
@@ -10545,7 +10937,7 @@ int BlueStore::_rename(TransContext *txc,
           << new_oid << dendl;
   int r;
   ghobject_t old_oid = oldo->oid;
-  mempool::bluestore_meta_other::string new_okey;
+  mempool::bluestore_cache_other::string new_okey;
 
   if (newo) {
     if (newo->exists) {
@@ -10906,11 +11298,12 @@ void BlueStore::generate_db_histogram(Formatter *f)
 
 }
 
-void BlueStore::flush_cache()
+void BlueStore::_flush_cache()
 {
   dout(10) << __func__ << dendl;
   for (auto i : cache_shards) {
     i->trim_all();
+    assert(i->empty());
   }
   for (auto& p : coll_map) {
     assert(p.second->onode_map.empty());
@@ -10919,6 +11312,18 @@ void BlueStore::flush_cache()
   coll_map.clear();
 }
 
+// For external caller.
+// We use a best-effort policy instead, e.g.,
+// we don't care if there are still some pinned onodes/data in the cache
+// after this command is completed.
+void BlueStore::flush_cache()
+{
+  dout(10) << __func__ << dendl;
+  for (auto i : cache_shards) {
+    i->trim_all();
+  }
+}
+
 void BlueStore::_apply_padding(uint64_t head_pad,
                               uint64_t tail_pad,
                               bufferlist& bl,