]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/os/bluestore/BlueStore.cc
update sources to 12.2.7
[ceph.git] / ceph / src / os / bluestore / BlueStore.cc
index 789c05f0ae97643c876f3034503ac26585e7e49a..6fdd11b170b2403af505c2cf68717d686314893f 100644 (file)
@@ -17,6 +17,8 @@
 #include <sys/stat.h>
 #include <fcntl.h>
 
+#include "include/cpp-btree/btree_set.h"
+
 #include "BlueStore.h"
 #include "os/kv.h"
 #include "include/compat.h"
 #define dout_context cct
 #define dout_subsys ceph_subsys_bluestore
 
-// bluestore_meta_onode
+using bid_t = decltype(BlueStore::Blob::id);
+
+// bluestore_cache_onode
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Onode, bluestore_onode,
-                             bluestore_meta_onode);
+                             bluestore_cache_onode);
 
-// bluestore_meta_other
+// bluestore_cache_other
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Buffer, bluestore_buffer,
-                             bluestore_meta_other);
+                             bluestore_cache_other);
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Extent, bluestore_extent,
-                             bluestore_meta_other);
+                             bluestore_cache_other);
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Blob, bluestore_blob,
-                             bluestore_meta_other);
+                             bluestore_cache_other);
 MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::SharedBlob, bluestore_shared_blob,
-                             bluestore_meta_other);
+                             bluestore_cache_other);
+
+// bluestore_txc
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::TransContext, bluestore_transcontext,
+                             bluestore_txc);
+
 
 // kv store prefixes
 const string PREFIX_SUPER = "S";   // field -> value
@@ -125,36 +134,68 @@ const string PREFIX_SHARED_BLOB = "X"; // u64 offset -> shared_blob_t
 template<typename S>
 static void append_escaped(const string &in, S *out)
 {
-  char hexbyte[8];
+  char hexbyte[in.length() * 3 + 1];
+  char* ptr = &hexbyte[0];
   for (string::const_iterator i = in.begin(); i != in.end(); ++i) {
     if (*i <= '#') {
-      snprintf(hexbyte, sizeof(hexbyte), "#%02x", (uint8_t)*i);
-      out->append(hexbyte);
+      *ptr++ = '#';
+      *ptr++ = "0123456789abcdef"[(*i >> 4) & 0x0f];
+      *ptr++ = "0123456789abcdef"[*i & 0x0f];
     } else if (*i >= '~') {
-      snprintf(hexbyte, sizeof(hexbyte), "~%02x", (uint8_t)*i);
-      out->append(hexbyte);
+      *ptr++ = '~';
+      *ptr++ = "0123456789abcdef"[(*i >> 4) & 0x0f];
+      *ptr++ = "0123456789abcdef"[*i & 0x0f];
     } else {
-      out->push_back(*i);
+      *ptr++  = *i;
     }
   }
-  out->push_back('!');
+  *ptr++ = '!';
+  out->append(hexbyte, ptr - &hexbyte[0]);
+}
+
+inline unsigned h2i(char c)
+{
+  if ((c >= '0') && (c <= '9')) {
+    return c - 0x30;
+  } else if ((c >= 'a') && (c <= 'f')) {
+    return c - 'a' + 10;
+  } else if ((c >= 'A') && (c <= 'F')) {
+    return c - 'A' + 10;
+  } else {
+    return 256; // make it always larger than 255
+  }
 }
 
 static int decode_escaped(const char *p, string *out)
 {
+  char buff[256];
+  char* ptr = &buff[0];
+  char* max = &buff[252];
   const char *orig_p = p;
   while (*p && *p != '!') {
     if (*p == '#' || *p == '~') {
-      unsigned hex;
-      int r = sscanf(++p, "%2x", &hex);
-      if (r < 1)
-       return -EINVAL;
-      out->push_back((char)hex);
-      p += 2;
+      unsigned hex = 0;
+      p++;
+      hex = h2i(*p++) << 4;
+      if (hex > 255) {
+        return -EINVAL;
+      }
+      hex |= h2i(*p++);
+      if (hex > 255) {
+        return -EINVAL;
+      }
+      *ptr++ = hex;
     } else {
-      out->push_back(*p++);
+      *ptr++ = *p++;
+    }
+    if (ptr > max) {
+       out->append(buff, ptr-buff);
+       ptr = &buff[0];
     }
   }
+  if (ptr != buff) {
+     out->append(buff, ptr-buff);
+  }
   return p - orig_p;
 }
 
@@ -276,7 +317,7 @@ static int get_key_shared_blob(const string& key, uint64_t *sbid)
   const char *p = key.c_str();
   if (key.length() < sizeof(uint64_t))
     return -1;
-  p = _key_decode_u64(p, sbid);
+  _key_decode_u64(p, sbid);
   return 0;
 }
 
@@ -443,7 +484,7 @@ int get_key_extent_shard(const string& key, string *onode_key, uint32_t *offset)
   int okey_len = key.size() - sizeof(uint32_t) - 1;
   *onode_key = key.substr(0, okey_len);
   const char *p = key.data() + okey_len;
-  p = _key_decode_u32(p, offset);
+  _key_decode_u32(p, offset);
   return 0;
 }
 
@@ -654,7 +695,7 @@ void BlueStore::GarbageCollector::process_protrusive_extents(
             }
             bExit = it == bi.last_lextent;
             ++it;
-          } while(!bExit);
+          } while (!bExit);
         }
         expected_for_release += blob_expected_for_release;
         expected_allocations += bi.expected_allocations;
@@ -740,13 +781,12 @@ void BlueStore::Cache::trim_all()
 {
   std::lock_guard<std::recursive_mutex> l(lock);
   _trim(0, 0);
-  assert(_get_num_onodes() == 0);
-  assert(_get_buffer_bytes() == 0);
 }
 
 void BlueStore::Cache::trim(
   uint64_t target_bytes,
   float target_meta_ratio,
+  float target_data_ratio,
   float bytes_per_onode)
 {
   std::lock_guard<std::recursive_mutex> l(lock);
@@ -754,23 +794,18 @@ void BlueStore::Cache::trim(
   uint64_t current_buffer = _get_buffer_bytes();
   uint64_t current = current_meta + current_buffer;
 
-  uint64_t target_meta = target_bytes * (double)target_meta_ratio; //need to cast to double
-                                                                   //since float(1) might produce inaccurate value
-                                                                   // for target_meta (a bit greater than target_bytes)
-                                                                   // that causes overflow in target_buffer below.
-                                                                   //Consider the following code:
-                                                                   //uint64_t i =(uint64_t)227*1024*1024*1024 + 1;
-                                                                   //float f = 1;
-                                                                   //uint64_t i2 = i*f;
-                                                                   //assert(i == i2);
+  uint64_t target_meta = target_bytes * target_meta_ratio;
+  uint64_t target_buffer = target_bytes * target_data_ratio;
 
-  target_meta = min(target_bytes, target_meta); //and just in case that ratio is > 1
-  uint64_t target_buffer = target_bytes - target_meta;
+  // correct for overflow or float imprecision
+  target_meta = min(target_bytes, target_meta);
+  target_buffer = min(target_bytes - target_meta, target_buffer);
 
   if (current <= target_bytes) {
     dout(10) << __func__
             << " shard target " << pretty_si_t(target_bytes)
-            << " ratio " << target_meta_ratio << " ("
+            << " meta/data ratios " << target_meta_ratio
+            << " + " << target_data_ratio << " ("
             << pretty_si_t(target_meta) << " + "
             << pretty_si_t(target_buffer) << "), "
             << " current " << pretty_si_t(current) << " ("
@@ -1270,14 +1305,18 @@ int BlueStore::BufferSpace::_discard(Cache* cache, uint32_t offset, uint32_t len
        if (b->data.length()) {
          bufferlist bl;
          bl.substr_of(b->data, b->length - tail, tail);
-         _add_buffer(cache, new Buffer(this, b->state, b->seq, end, bl), 0, b);
+         Buffer *nb = new Buffer(this, b->state, b->seq, end, bl);
+         nb->maybe_rebuild();
+         _add_buffer(cache, nb, 0, b);
        } else {
-         _add_buffer(cache, new Buffer(this, b->state, b->seq, end, tail), 0, b);
+         _add_buffer(cache, new Buffer(this, b->state, b->seq, end, tail),
+                     0, b);
        }
        if (!b->is_writing()) {
          cache->_adjust_buffer_size(b, front - (int64_t)b->length);
        }
        b->truncate(front);
+       b->maybe_rebuild();
        cache->_audit("discard end 1");
        break;
       } else {
@@ -1286,6 +1325,7 @@ int BlueStore::BufferSpace::_discard(Cache* cache, uint32_t offset, uint32_t len
          cache->_adjust_buffer_size(b, front - (int64_t)b->length);
        }
        b->truncate(front);
+       b->maybe_rebuild();
        ++i;
        continue;
       }
@@ -1300,7 +1340,9 @@ int BlueStore::BufferSpace::_discard(Cache* cache, uint32_t offset, uint32_t len
     if (b->data.length()) {
       bufferlist bl;
       bl.substr_of(b->data, b->length - keep, keep);
-      _add_buffer(cache, new Buffer(this, b->state, b->seq, end, bl), 0, b);
+      Buffer *nb = new Buffer(this, b->state, b->seq, end, bl);
+      nb->maybe_rebuild();
+      _add_buffer(cache, nb, 0, b);
     } else {
       _add_buffer(cache, new Buffer(this, b->state, b->seq, end, keep), 0, b);
     }
@@ -1313,55 +1355,59 @@ int BlueStore::BufferSpace::_discard(Cache* cache, uint32_t offset, uint32_t len
 
 void BlueStore::BufferSpace::read(
   Cache* cache, 
-  uint32_t offset, uint32_t length,
+  uint32_t offset,
+  uint32_t length,
   BlueStore::ready_regions_t& res,
   interval_set<uint32_t>& res_intervals)
 {
-  std::lock_guard<std::recursive_mutex> l(cache->lock);
   res.clear();
   res_intervals.clear();
   uint32_t want_bytes = length;
   uint32_t end = offset + length;
-  for (auto i = _data_lower_bound(offset);
-       i != buffer_map.end() && offset < end && i->first < end;
-       ++i) {
-    Buffer *b = i->second.get();
-    assert(b->end() > offset);
-    if (b->is_writing() || b->is_clean()) {
-      if (b->offset < offset) {
-       uint32_t skip = offset - b->offset;
-       uint32_t l = MIN(length, b->length - skip);
-       res[offset].substr_of(b->data, skip, l);
-       res_intervals.insert(offset, l);
-       offset += l;
-       length -= l;
-       if (!b->is_writing()) {
+
+  {
+    std::lock_guard<std::recursive_mutex> l(cache->lock);
+    for (auto i = _data_lower_bound(offset);
+         i != buffer_map.end() && offset < end && i->first < end;
+         ++i) {
+      Buffer *b = i->second.get();
+      assert(b->end() > offset);
+      if (b->is_writing() || b->is_clean()) {
+        if (b->offset < offset) {
+         uint32_t skip = offset - b->offset;
+         uint32_t l = MIN(length, b->length - skip);
+         res[offset].substr_of(b->data, skip, l);
+         res_intervals.insert(offset, l);
+         offset += l;
+         length -= l;
+         if (!b->is_writing()) {
+           cache->_touch_buffer(b);
+         }
+         continue;
+        }
+        if (b->offset > offset) {
+         uint32_t gap = b->offset - offset;
+         if (length <= gap) {
+           break;
+         }
+         offset += gap;
+         length -= gap;
+        }
+        if (!b->is_writing()) {
          cache->_touch_buffer(b);
-       }
-       continue;
-      }
-      if (b->offset > offset) {
-       uint32_t gap = b->offset - offset;
-       if (length <= gap) {
-         break;
-       }
-       offset += gap;
-       length -= gap;
-      }
-      if (!b->is_writing()) {
-       cache->_touch_buffer(b);
-      }
-      if (b->length > length) {
-       res[offset].substr_of(b->data, 0, length);
-       res_intervals.insert(offset, length);
-        break;
-      } else {
-       res[offset].append(b->data);
-       res_intervals.insert(offset, b->length);
-        if (b->length == length)
+        }
+        if (b->length > length) {
+         res[offset].substr_of(b->data, 0, length);
+         res_intervals.insert(offset, length);
           break;
-       offset += b->length;
-       length -= b->length;
+        } else {
+         res[offset].append(b->data);
+         res_intervals.insert(offset, b->length);
+          if (b->length == length)
+            break;
+         offset += b->length;
+         length -= b->length;
+        }
       }
     }
   }
@@ -1397,6 +1443,8 @@ void BlueStore::BufferSpace::finish_write(Cache* cache, uint64_t seq)
     } else {
       b->state = Buffer::STATE_CLEAN;
       writing.erase(i++);
+      b->maybe_rebuild();
+      b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
       cache->_add_buffer(b, 1, nullptr);
       ldout(cache->cct, 20) << __func__ << " added " << *b << dendl;
     }
@@ -1478,19 +1526,30 @@ BlueStore::OnodeRef BlueStore::OnodeSpace::add(const ghobject_t& oid, OnodeRef o
 
 BlueStore::OnodeRef BlueStore::OnodeSpace::lookup(const ghobject_t& oid)
 {
-  std::lock_guard<std::recursive_mutex> l(cache->lock);
   ldout(cache->cct, 30) << __func__ << dendl;
-  ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
-  if (p == onode_map.end()) {
-    ldout(cache->cct, 30) << __func__ << " " << oid << " miss" << dendl;
+  OnodeRef o;
+  bool hit = false;
+
+  {
+    std::lock_guard<std::recursive_mutex> l(cache->lock);
+    ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
+    if (p == onode_map.end()) {
+      ldout(cache->cct, 30) << __func__ << " " << oid << " miss" << dendl;
+    } else {
+      ldout(cache->cct, 30) << __func__ << " " << oid << " hit " << p->second
+                           << dendl;
+      cache->_touch_onode(p->second);
+      hit = true;
+      o = p->second;
+    }
+  }
+
+  if (hit) {
+    cache->logger->inc(l_bluestore_onode_hits);
+  } else {
     cache->logger->inc(l_bluestore_onode_misses);
-    return OnodeRef();
   }
-  ldout(cache->cct, 30) << __func__ << " " << oid << " hit " << p->second
-                       << dendl;
-  cache->_touch_onode(p->second);
-  cache->logger->inc(l_bluestore_onode_hits);
-  return p->second;
+  return o;
 }
 
 void BlueStore::OnodeSpace::clear()
@@ -1513,7 +1572,7 @@ void BlueStore::OnodeSpace::rename(
   OnodeRef& oldo,
   const ghobject_t& old_oid,
   const ghobject_t& new_oid,
-  const mempool::bluestore_meta_other::string& new_okey)
+  const mempool::bluestore_cache_other::string& new_okey)
 {
   std::lock_guard<std::recursive_mutex> l(cache->lock);
   ldout(cache->cct, 30) << __func__ << " " << old_oid << " -> " << new_oid
@@ -1556,6 +1615,12 @@ bool BlueStore::OnodeSpace::map_any(std::function<bool(OnodeRef)> f)
   return false;
 }
 
+void BlueStore::OnodeSpace::dump(CephContext *cct, int lvl)
+{
+  for (auto& i : onode_map) {
+    ldout(cct, lvl) << i.first << " : " << i.second << dendl;
+  }
+}
 
 // SharedBlob
 
@@ -1602,16 +1667,9 @@ void BlueStore::SharedBlob::put()
                             << " removing self from set " << get_parent()
                             << dendl;
     if (get_parent()) {
-      if (get_parent()->remove(this)) {
-       delete this;
-      } else {
-       ldout(coll->store->cct, 20)
-         << __func__ << " " << this << " lost race to remove myself from set"
-         << dendl;
-      }
-    } else {
-      delete this;
+      get_parent()->remove(this);
     }
+    delete this;
   }
 }
 
@@ -1622,10 +1680,28 @@ void BlueStore::SharedBlob::get_ref(uint64_t offset, uint32_t length)
 }
 
 void BlueStore::SharedBlob::put_ref(uint64_t offset, uint32_t length,
-  PExtentVector *r)
+                                   PExtentVector *r,
+                                   set<SharedBlob*> *maybe_unshared)
 {
   assert(persistent);
-  persistent->ref_map.put(offset, length, r);
+  bool maybe = false;
+  persistent->ref_map.put(offset, length, r, maybe_unshared ? &maybe : nullptr);
+  if (maybe_unshared && maybe) {
+    maybe_unshared->insert(this);
+  }
+}
+
+// SharedBlobSet
+
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore.sharedblobset(" << this << ") "
+
+void BlueStore::SharedBlobSet::dump(CephContext *cct, int lvl)
+{
+  std::lock_guard<std::mutex> l(lock);
+  for (auto& i : sb_map) {
+    ldout(cct, lvl) << i.first << " : " << *i.second << dendl;
+  }
 }
 
 // Blob
@@ -1639,21 +1715,25 @@ ostream& operator<<(ostream& out, const BlueStore::Blob& b)
   if (b.is_spanning()) {
     out << " spanning " << b.id;
   }
-  out << " " << b.get_blob() << " " << b.get_blob_use_tracker()
-      << " " << *b.shared_blob
-      << ")";
+  out << " " << b.get_blob() << " " << b.get_blob_use_tracker();
+  if (b.shared_blob) {
+    out << " " << *b.shared_blob;
+  } else {
+    out << " (shared_blob=NULL)";
+  }
+  out << ")";
   return out;
 }
 
 void BlueStore::Blob::discard_unallocated(Collection *coll)
 {
-  if (blob.is_shared()) {
+  if (get_blob().is_shared()) {
     return;
   }
-  if (blob.is_compressed()) {
+  if (get_blob().is_compressed()) {
     bool discard = false;
     bool all_invalid = true;
-    for (auto e : blob.get_extents()) {
+    for (auto e : get_blob().get_extents()) {
       if (!e.is_valid()) {
         discard = true;
       } else {
@@ -1663,11 +1743,12 @@ void BlueStore::Blob::discard_unallocated(Collection *coll)
     assert(discard == all_invalid); // in case of compressed blob all
                                    // or none pextents are invalid.
     if (discard) {
-      shared_blob->bc.discard(shared_blob->get_cache(), 0, blob.get_logical_length());
+      shared_blob->bc.discard(shared_blob->get_cache(), 0,
+                              get_blob().get_logical_length());
     }
   } else {
     size_t pos = 0;
-    for (auto e : blob.get_extents()) {
+    for (auto e : get_blob().get_extents()) {
       if (!e.is_valid()) {
        ldout(coll->store->cct, 20) << __func__ << " 0x" << std::hex << pos
                                    << "~" << e.length
@@ -1676,12 +1757,11 @@ void BlueStore::Blob::discard_unallocated(Collection *coll)
       }
       pos += e.length;
     }
-    if (blob.can_prune_tail()) {
-      dirty_blob();
-      blob.prune_tail();
-      used_in_blob.prune_tail(blob.get_ondisk_length());
+    if (get_blob().can_prune_tail()) {
+      dirty_blob().prune_tail();
+      used_in_blob.prune_tail(get_blob().get_ondisk_length());
       auto cct = coll->store->cct; //used by dout
-      dout(20) << __func__ << " pruned tail, now " << blob << dendl;
+      dout(20) << __func__ << " pruned tail, now " << get_blob() << dendl;
     }
   }
 }
@@ -1702,10 +1782,10 @@ void BlueStore::Blob::get_ref(
 
   if (used_in_blob.is_empty()) {
     uint32_t min_release_size =
-      blob.get_release_size(coll->store->min_alloc_size);
-    uint64_t l = blob.get_logical_length();
-    dout(20) << __func__ << " init 0x" << std::hex << l << ", " << min_release_size
-             << std::dec << dendl;
+      get_blob().get_release_size(coll->store->min_alloc_size);
+    uint64_t l = get_blob().get_logical_length();
+    dout(20) << __func__ << " init 0x" << std::hex << l << ", "
+             << min_release_size << std::dec << dendl;
     used_in_blob.init(l, min_release_size);
   }
   used_in_blob.get(
@@ -1739,7 +1819,7 @@ bool BlueStore::Blob::put_ref(
   return b.release_extents(empty, logical, r);
 }
 
-bool BlueStore::Blob::try_reuse_blob(uint32_t min_alloc_size,
+bool BlueStore::Blob::can_reuse_blob(uint32_t min_alloc_size,
                                     uint32_t target_blob_size,
                                     uint32_t b_offset,
                                     uint32_t *length0) {
@@ -1767,17 +1847,25 @@ bool BlueStore::Blob::try_reuse_blob(uint32_t min_alloc_size,
   target_blob_size = MAX(blen, target_blob_size);
 
   if (b_offset >= blen) {
-    //new data totally stands out of the existing blob
-    new_blen = b_offset + length;
+    // new data totally stands out of the existing blob
+    new_blen = end;
   } else {
-    //new data overlaps with the existing blob
-    new_blen = MAX(blen, length + b_offset);
-    if (!get_blob().is_unallocated(
-      b_offset,
-      new_blen > blen ? blen - b_offset : length)) {
-          return false;
+    // new data overlaps with the existing blob
+    new_blen = MAX(blen, end);
+
+    uint32_t overlap = 0;
+    if (new_blen > blen) {
+      overlap = blen - b_offset;
+    } else {
+      overlap = length;
+    }
+
+    if (!get_blob().is_unallocated(b_offset, overlap)) {
+      // abort if any piece of the overlap has already been allocated
+      return false;
     }
   }
+
   if (new_blen > blen) {
     int64_t overflow = int64_t(new_blen) - target_blob_size;
     // Unable to decrease the provided length to fit into max_blob_size
@@ -1795,10 +1883,11 @@ bool BlueStore::Blob::try_reuse_blob(uint32_t min_alloc_size,
       length -= overflow;
       *length0 = length;
     }
+
     if (new_blen > blen) {
       dirty_blob().add_tail(new_blen);
       used_in_blob.add_tail(new_blen,
-                           blob.get_release_size(min_alloc_size));
+                            get_blob().get_release_size(min_alloc_size));
     }
   }
   return true;
@@ -1899,6 +1988,7 @@ void BlueStore::ExtentMap::update(KeyValueDB::Transaction t,
       unsigned n;
       // we need to encode inline_bl to measure encoded length
       bool never_happen = encode_some(0, OBJECT_MAX_SIZE, inline_bl, &n);
+      inline_bl.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
       assert(!never_happen);
       size_t len = inline_bl.length();
       dout(20) << __func__ << "  inline shard " << len << " bytes from " << n
@@ -1924,6 +2014,7 @@ void BlueStore::ExtentMap::update(KeyValueDB::Transaction t,
     auto p = shards.begin();
     auto prev_p = p;
     while (p != shards.end()) {
+      assert(p->shard_info->offset >= prev_p->shard_info->offset);
       auto n = p;
       ++n;
       if (p->dirty) {
@@ -1957,20 +2048,13 @@ void BlueStore::ExtentMap::update(KeyValueDB::Transaction t,
          // avoid resharding the trailing shard, even if it is small
          else if (n != shards.end() &&
                   len < g_conf->bluestore_extent_map_shard_min_size) {
-           // we are small; combine with a neighbor
-           if (p == shards.begin() && endoff == OBJECT_MAX_SIZE) {
-             // we are an only shard
-             request_reshard(0, OBJECT_MAX_SIZE);
-             return;
-           } else if (p == shards.begin()) {
-             // combine with next shard
+            assert(endoff != OBJECT_MAX_SIZE);
+           if (p == shards.begin()) {
+             // we are the first shard, combine with next shard
              request_reshard(p->shard_info->offset, endoff + 1);
-           } else if (endoff == OBJECT_MAX_SIZE) {
-             // combine with previous shard
-             request_reshard(prev_p->shard_info->offset, endoff);
-             return;
            } else {
-             // combine with the smaller of the two
+             // combine either with the previous shard or the next,
+             // whichever is smaller
              if (prev_p->shard_info->bytes > n->shard_info->bytes) {
                request_reshard(p->shard_info->offset, endoff + 1);
              } else {
@@ -2004,6 +2088,28 @@ void BlueStore::ExtentMap::update(KeyValueDB::Transaction t,
   }
 }
 
+bid_t BlueStore::ExtentMap::allocate_spanning_blob_id()
+{
+  if (spanning_blob_map.empty())
+    return 0;
+  bid_t bid = spanning_blob_map.rbegin()->first + 1;
+  // bid is valid and available.
+  if (bid >= 0)
+    return bid;
+  // Find next unused bid;
+  bid = rand() % (numeric_limits<bid_t>::max() + 1);
+  const auto begin_bid = bid;
+  do {
+    if (!spanning_blob_map.count(bid))
+      return bid;
+    else {
+      bid++;
+      if (bid < 0) bid = 0;
+    }
+  } while (bid != begin_bid);
+  assert(0 == "no available blob id");
+}
+
 void BlueStore::ExtentMap::reshard(
   KeyValueDB *db,
   KeyValueDB::Transaction t)
@@ -2040,7 +2146,7 @@ void BlueStore::ExtentMap::reshard(
             << needs_reshard_end << ")" << std::dec << dendl;
   }
 
-  fault_range(db, needs_reshard_begin, needs_reshard_end);
+  fault_range(db, needs_reshard_begin, (needs_reshard_end - needs_reshard_begin));
 
   // we may need to fault in a larger interval later must have all
   // referring extents for spanning blobs loaded in order to have
@@ -2080,7 +2186,7 @@ void BlueStore::ExtentMap::reshard(
 
   // reshard
   unsigned estimate = 0;
-  unsigned offset = 0;
+  unsigned offset = needs_reshard_begin;
   vector<bluestore_onode_t::shard_info> new_shard_info;
   unsigned max_blob_end = 0;
   Extent dummy(needs_reshard_begin);
@@ -2097,11 +2203,11 @@ void BlueStore::ExtentMap::reshard(
     if (estimate &&
        estimate + extent_avg > target + (would_span ? slop : 0)) {
       // new shard
-      if (offset == 0) {
+      if (offset == needs_reshard_begin) {
        new_shard_info.emplace_back(bluestore_onode_t::shard_info());
        new_shard_info.back().offset = offset;
        dout(20) << __func__ << "  new shard 0x" << std::hex << offset
-                << std::dec << dendl;
+                 << std::dec << dendl;
       }
       offset = e->logical_offset;
       new_shard_info.emplace_back(bluestore_onode_t::shard_info());
@@ -2111,9 +2217,9 @@ void BlueStore::ExtentMap::reshard(
       estimate = 0;
     }
     estimate += extent_avg;
-    unsigned bb = e->blob_start();
-    if (bb < spanning_scan_begin) {
-      spanning_scan_begin = bb;
+    unsigned bs = e->blob_start();
+    if (bs < spanning_scan_begin) {
+      spanning_scan_begin = bs;
     }
     uint32_t be = e->blob_end();
     if (be > max_blob_end) {
@@ -2149,16 +2255,21 @@ void BlueStore::ExtentMap::reshard(
       new_shard_info.begin(),
       new_shard_info.end());
     shards.insert(shards.begin() + si_begin, new_shard_info.size(), Shard());
-    unsigned n = sv.size();
     si_end = si_begin + new_shard_info.size();
-    for (unsigned i = si_begin; i < si_end; ++i) {
+
+    assert(sv.size() == shards.size());
+
+    // note that we need to update every shard_info of shards here,
+    // as sv might have been totally re-allocated above
+    for (unsigned i = 0; i < shards.size(); i++) {
       shards[i].shard_info = &sv[i];
+    }
+
+    // mark newly added shards as dirty
+    for (unsigned i = si_begin; i < si_end; ++i) {
       shards[i].loaded = true;
       shards[i].dirty = true;
     }
-    for (unsigned i = si_end; i < n; ++i) {
-      shards[i].shard_info = &sv[i];
-    }
   }
   dout(20) << __func__ << "  fin " << sv << dendl;
   inline_bl.clear();
@@ -2181,7 +2292,7 @@ void BlueStore::ExtentMap::reshard(
     }
     if (spanning_scan_end > needs_reshard_end) {
       fault_range(db, needs_reshard_end,
-                 spanning_scan_end - needs_reshard_begin);
+                 spanning_scan_end - needs_reshard_end);
     }
     auto sp = sv.begin() + si_begin;
     auto esp = sv.end();
@@ -2193,12 +2304,6 @@ void BlueStore::ExtentMap::reshard(
     } else {
       shard_end = sp->offset;
     }
-    int bid;
-    if (spanning_blob_map.empty()) {
-      bid = 0;
-    } else {
-      bid = spanning_blob_map.rbegin()->first + 1;
-    }
     Extent dummy(needs_reshard_begin);
     for (auto e = extent_map.lower_bound(dummy); e != extent_map.end(); ++e) {
       if (e->logical_offset >= needs_reshard_end) {
@@ -2252,7 +2357,8 @@ void BlueStore::ExtentMap::reshard(
            must_span = true;
          }
          if (must_span) {
-           b->id = bid++;
+            auto bid = allocate_spanning_blob_id();
+            b->id = bid;
            spanning_blob_map[b->id] = b;
            dout(20) << __func__ << "    adding spanning " << *b << dendl;
          }
@@ -2287,8 +2393,6 @@ bool BlueStore::ExtentMap::encode_some(
 
   unsigned n = 0;
   size_t bound = 0;
-  denc(struct_v, bound);
-  denc_varint(0, bound);
   bool must_reshard = false;
   for (auto p = start;
        p != extent_map.end() && p->logical_offset < end;
@@ -2301,21 +2405,26 @@ bool BlueStore::ExtentMap::encode_some(
       request_reshard(p->blob_start(), p->blob_end());
       must_reshard = true;
     }
-    denc_varint(0, bound); // blobid
-    denc_varint(0, bound); // logical_offset
-    denc_varint(0, bound); // len
-    denc_varint(0, bound); // blob_offset
+    if (!must_reshard) {
+      denc_varint(0, bound); // blobid
+      denc_varint(0, bound); // logical_offset
+      denc_varint(0, bound); // len
+      denc_varint(0, bound); // blob_offset
 
-    p->blob->bound_encode(
-      bound,
-      struct_v,
-      p->blob->shared_blob->get_sbid(),
-      false);
+      p->blob->bound_encode(
+        bound,
+        struct_v,
+        p->blob->shared_blob->get_sbid(),
+        false);
+    }
   }
   if (must_reshard) {
     return true;
   }
 
+  denc(struct_v, bound);
+  denc_varint(0, bound); // number of extents
+
   {
     auto app = bl.get_contiguous_appender(bound);
     denc(struct_v, app);
@@ -2572,7 +2681,6 @@ void BlueStore::ExtentMap::fault_range(
 }
 
 void BlueStore::ExtentMap::dirty_range(
-  KeyValueDB::Transaction t,
   uint32_t offset,
   uint32_t length)
 {
@@ -2614,15 +2722,6 @@ BlueStore::extent_map_t::iterator BlueStore::ExtentMap::find(
   return extent_map.find(dummy);
 }
 
-BlueStore::extent_map_t::iterator BlueStore::ExtentMap::find_lextent(
-  uint64_t offset)
-{
-  auto fp = seek_lextent(offset);
-  if (fp != extent_map.end() && fp->logical_offset > offset)
-    return extent_map.end();  // extent is past offset
-  return fp;
-}
-
 BlueStore::extent_map_t::iterator BlueStore::ExtentMap::seek_lextent(
   uint64_t offset)
 {
@@ -2879,9 +2978,9 @@ bool BlueStore::WriteContext::has_conflict(
   for (auto w : writes) {
     if (b == w.b) {
       auto loffs2 = P2ALIGN(w.logical_offset, min_alloc_size);
-      auto loffs2_end = ROUND_UP_TO( w.logical_offset + w.length0, min_alloc_size);
+      auto loffs2_end = P2ROUNDUP(w.logical_offset + w.length0, min_alloc_size);
       if ((loffs <= loffs2 && loffs_end > loffs2) ||
-          (loffs >= loffs2 &&  loffs < loffs2_end)) {
+          (loffs >= loffs2 && loffs < loffs2_end)) {
         return true;
       }
     }
@@ -2905,6 +3004,8 @@ void BlueStore::DeferredBatch::prepare_write(
   assert(i.second);  // this should be a new insertion
   i.first->second.seq = seq;
   blp.copy(length, i.first->second.bl);
+  i.first->second.bl.reassign_to_mempool(
+    mempool::mempool_bluestore_writing_deferred);
   dout(20) << __func__ << " seq " << seq
           << " 0x" << std::hex << offset << "~" << length
           << " crc " << i.first->second.bl.crc32c(-1)
@@ -2931,6 +3032,7 @@ void BlueStore::DeferredBatch::_discard(
               << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
               << " -> 0x" << head.length() << std::dec << dendl;
       auto i = seq_bytes.find(p->second.seq);
+      assert(i != seq_bytes.end());
       if (end > offset + length) {
        bufferlist tail;
        tail.substr_of(p->second.bl, offset + length - p->first,
@@ -2945,6 +3047,7 @@ void BlueStore::DeferredBatch::_discard(
       } else {
        i->second -= end - offset;
       }
+      assert(i->second >= 0);
       p->second.bl.swap(head);
     }
     ++p;
@@ -2954,6 +3057,7 @@ void BlueStore::DeferredBatch::_discard(
       break;
     }
     auto i = seq_bytes.find(p->second.seq);
+    assert(i != seq_bytes.end());
     auto end = p->first + p->second.bl.length();
     if (end > offset + length) {
       unsigned drop_front = offset + length - p->first;
@@ -2973,6 +3077,7 @@ void BlueStore::DeferredBatch::_discard(
               << std::dec << dendl;
       i->second -= p->second.bl.length();
     }
+    assert(i->second >= 0);
     p = iomap.erase(p);
   }
 }
@@ -3057,14 +3162,12 @@ void BlueStore::Collection::load_shared_blob(SharedBlobRef sb)
 
 void BlueStore::Collection::make_blob_shared(uint64_t sbid, BlobRef b)
 {
-  assert(!b->shared_blob->is_loaded());
-
   ldout(store->cct, 10) << __func__ << " " << *b << dendl;
-  bluestore_blob_t& blob = b->dirty_blob();
+  assert(!b->shared_blob->is_loaded());
 
   // update blob
+  bluestore_blob_t& blob = b->dirty_blob();
   blob.set_flag(bluestore_blob_t::FLAG_SHARED);
-  blob.clear_flag(bluestore_blob_t::FLAG_MUTABLE);
 
   // update shared blob
   b->shared_blob->loaded = true;
@@ -3080,6 +3183,20 @@ void BlueStore::Collection::make_blob_shared(uint64_t sbid, BlobRef b)
   ldout(store->cct, 20) << __func__ << " now " << *b << dendl;
 }
 
+uint64_t BlueStore::Collection::make_blob_unshared(SharedBlob *sb)
+{
+  ldout(store->cct, 10) << __func__ << " " << *sb << dendl;
+  assert(sb->is_loaded());
+
+  uint64_t sbid = sb->get_sbid();
+  shared_blob_set.remove(sb);
+  sb->loaded = false;
+  delete sb->persistent;
+  sb->sbid_unloaded = 0;
+  ldout(store->cct, 20) << __func__ << " now " << *sb << dendl;
+  return sbid;
+}
+
 BlueStore::OnodeRef BlueStore::Collection::get_onode(
   const ghobject_t& oid,
   bool create)
@@ -3099,7 +3216,7 @@ BlueStore::OnodeRef BlueStore::Collection::get_onode(
   if (o)
     return o;
 
-  mempool::bluestore_meta_other::string key;
+  mempool::bluestore_cache_other::string key;
   get_object_key(store->cct, oid, &key);
 
   ldout(store->cct, 20) << __func__ << " oid " << oid << " key "
@@ -3122,14 +3239,19 @@ BlueStore::OnodeRef BlueStore::Collection::get_onode(
     assert(r >= 0);
     on = new Onode(this, oid, key);
     on->exists = true;
-    bufferptr::iterator p = v.front().begin();
+    bufferptr::iterator p = v.front().begin_deep();
     on->onode.decode(p);
+    for (auto& i : on->onode.attrs) {
+      i.second.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+    }
 
     // initialize extent_map
     on->extent_map.decode_spanning_blobs(p);
     if (on->onode.extent_map_shards.empty()) {
       denc(on->extent_map.inline_bl, p);
       on->extent_map.decode_some(on->extent_map.inline_bl);
+      on->extent_map.inline_bl.reassign_to_mempool(
+       mempool::mempool_bluestore_cache_other);
     } else {
       on->extent_map.init_shards(false, false);
     }
@@ -3188,13 +3310,14 @@ void BlueStore::Collection::split_cache(
          continue;
        }
        ldout(store->cct, 20) << __func__ << "  moving " << *sb << dendl;
+       if (sb->get_sbid()) {
+         ldout(store->cct, 20) << __func__
+                               << "   moving registration " << *sb << dendl;
+         shared_blob_set.remove(sb);
+         dest->shared_blob_set.add(dest, sb);
+       }
        sb->coll = dest;
        if (dest->cache != cache) {
-         if (sb->get_sbid()) {
-           ldout(store->cct, 20) << __func__ << "   moving registration " << *sb << dendl;
-           shared_blob_set.remove(sb);
-           dest->shared_blob_set.add(dest, sb);
-         }
          for (auto& i : sb->bc.buffer_map) {
            if (!i.second->is_writing()) {
              ldout(store->cct, 20) << __func__ << "   moving " << *i.second
@@ -3204,54 +3327,41 @@ void BlueStore::Collection::split_cache(
          }
        }
       }
-
-
     }
   }
 }
 
-void BlueStore::Collection::trim_cache()
-{
-  // see if mempool stats have updated
-  uint64_t total_bytes;
-  uint64_t total_onodes;
-  size_t seq;
-  store->get_mempool_stats(&seq, &total_bytes, &total_onodes);
-  if (seq == cache->last_trim_seq) {
-    ldout(store->cct, 30) << __func__ << " no new mempool stats; nothing to do"
-                         << dendl;
-    return;
-  }
-  cache->last_trim_seq = seq;
-
-  // trim
-  if (total_onodes < 2) {
-    total_onodes = 2;
-  }
-  float bytes_per_onode = (float)total_bytes / (float)total_onodes;
-  size_t num_shards = store->cache_shards.size();
-  uint64_t shard_target = store->cct->_conf->bluestore_cache_size / num_shards;
-  ldout(store->cct, 30) << __func__
-                       << " total meta bytes " << total_bytes
-                       << ", total onodes " << total_onodes
-                       << ", bytes_per_onode " << bytes_per_onode
-          << dendl;
-  cache->trim(shard_target, store->cct->_conf->bluestore_cache_meta_ratio,
-             bytes_per_onode);
-
-  store->_update_cache_logger();
-}
-
 // =======================================================
 
 void *BlueStore::MempoolThread::entry()
 {
   Mutex::Locker l(lock);
   while (!stop) {
-    store->mempool_bytes = mempool::bluestore_meta_other::allocated_bytes() +
-      mempool::bluestore_meta_onode::allocated_bytes();
-    store->mempool_onodes = mempool::bluestore_meta_onode::allocated_items();
-    ++store->mempool_seq;
+    uint64_t meta_bytes =
+      mempool::bluestore_cache_other::allocated_bytes() +
+      mempool::bluestore_cache_onode::allocated_bytes();
+    uint64_t onode_num =
+      mempool::bluestore_cache_onode::allocated_items();
+
+    if (onode_num < 2) {
+      onode_num = 2;
+    }
+
+    float bytes_per_onode = (float)meta_bytes / (float)onode_num;
+    size_t num_shards = store->cache_shards.size();
+    float target_ratio = store->cache_meta_ratio + store->cache_data_ratio;
+    // A little sloppy but should be close enough
+    uint64_t shard_target = target_ratio * (store->cache_size / num_shards);
+
+    for (auto i : store->cache_shards) {
+      i->trim(shard_target,
+             store->cache_meta_ratio,
+             store->cache_data_ratio,
+             bytes_per_onode);
+    }
+
+    store->_update_cache_logger();
+
     utime_t wait;
     wait += store->cct->_conf->bluestore_cache_trim_interval;
     cond.WaitInterval(lock, wait);
@@ -3262,6 +3372,108 @@ void *BlueStore::MempoolThread::entry()
 
 // =======================================================
 
+// OmapIteratorImpl
+
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore.OmapIteratorImpl(" << this << ") "
+
+BlueStore::OmapIteratorImpl::OmapIteratorImpl(
+  CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
+  : c(c), o(o), it(it)
+{
+  RWLock::RLocker l(c->lock);
+  if (o->onode.has_omap()) {
+    get_omap_key(o->onode.nid, string(), &head);
+    get_omap_tail(o->onode.nid, &tail);
+    it->lower_bound(head);
+  }
+}
+
+int BlueStore::OmapIteratorImpl::seek_to_first()
+{
+  RWLock::RLocker l(c->lock);
+  if (o->onode.has_omap()) {
+    it->lower_bound(head);
+  } else {
+    it = KeyValueDB::Iterator();
+  }
+  return 0;
+}
+
+int BlueStore::OmapIteratorImpl::upper_bound(const string& after)
+{
+  RWLock::RLocker l(c->lock);
+  if (o->onode.has_omap()) {
+    string key;
+    get_omap_key(o->onode.nid, after, &key);
+    ldout(c->store->cct,20) << __func__ << " after " << after << " key "
+                           << pretty_binary_string(key) << dendl;
+    it->upper_bound(key);
+  } else {
+    it = KeyValueDB::Iterator();
+  }
+  return 0;
+}
+
+int BlueStore::OmapIteratorImpl::lower_bound(const string& to)
+{
+  RWLock::RLocker l(c->lock);
+  if (o->onode.has_omap()) {
+    string key;
+    get_omap_key(o->onode.nid, to, &key);
+    ldout(c->store->cct,20) << __func__ << " to " << to << " key "
+                           << pretty_binary_string(key) << dendl;
+    it->lower_bound(key);
+  } else {
+    it = KeyValueDB::Iterator();
+  }
+  return 0;
+}
+
+bool BlueStore::OmapIteratorImpl::valid()
+{
+  RWLock::RLocker l(c->lock);
+  bool r = o->onode.has_omap() && it && it->valid() &&
+    it->raw_key().second <= tail;
+  if (it && it->valid()) {
+    ldout(c->store->cct,20) << __func__ << " is at "
+                           << pretty_binary_string(it->raw_key().second)
+                           << dendl;
+  }
+  return r;
+}
+
+int BlueStore::OmapIteratorImpl::next(bool validate)
+{
+  RWLock::RLocker l(c->lock);
+  if (o->onode.has_omap()) {
+    it->next();
+    return 0;
+  } else {
+    return -1;
+  }
+}
+
+string BlueStore::OmapIteratorImpl::key()
+{
+  RWLock::RLocker l(c->lock);
+  assert(it->valid());
+  string db_key = it->raw_key().second;
+  string user_key;
+  decode_omap_key(db_key, &user_key);
+  return user_key;
+}
+
+bufferlist BlueStore::OmapIteratorImpl::value()
+{
+  RWLock::RLocker l(c->lock);
+  assert(it->valid());
+  return it->value();
+}
+
+
+// =====================================
+
 #undef dout_prefix
 #define dout_prefix *_dout << "bluestore(" << path << ") "
 
@@ -3280,23 +3492,14 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
     throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
                       cct->_conf->bluestore_throttle_bytes +
                       cct->_conf->bluestore_throttle_deferred_bytes),
+    deferred_finisher(cct, "defered_finisher", "dfin"),
     kv_sync_thread(this),
+    kv_finalize_thread(this),
     mempool_thread(this)
 {
   _init_logger();
   cct->_conf->add_observer(this);
   set_cache_shards(1);
-
-  if (cct->_conf->bluestore_shard_finishers) {
-    m_finisher_num = cct->_conf->osd_op_num_shards;
-  }
-
-  for (int i = 0; i < m_finisher_num; ++i) {
-    ostringstream oss;
-    oss << "finisher-" << i;
-    Finisher *f = new Finisher(cct, oss.str(), "finisher");
-    finishers.push_back(f);
-  }
 }
 
 BlueStore::BlueStore(CephContext *cct,
@@ -3308,7 +3511,9 @@ BlueStore::BlueStore(CephContext *cct,
     throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
                       cct->_conf->bluestore_throttle_bytes +
                       cct->_conf->bluestore_throttle_deferred_bytes),
+    deferred_finisher(cct, "defered_finisher", "dfin"),
     kv_sync_thread(this),
+    kv_finalize_thread(this),
     min_alloc_size(_min_alloc_size),
     min_alloc_size_order(ctz(_min_alloc_size)),
     mempool_thread(this)
@@ -3316,17 +3521,6 @@ BlueStore::BlueStore(CephContext *cct,
   _init_logger();
   cct->_conf->add_observer(this);
   set_cache_shards(1);
-
-  if (cct->_conf->bluestore_shard_finishers) {
-    m_finisher_num = cct->_conf->osd_op_num_shards;
-  }
-
-  for (int i = 0; i < m_finisher_num; ++i) {
-    ostringstream oss;
-    oss << "finisher-" << i;
-    Finisher *f = new Finisher(cct, oss.str(), "finisher");
-    finishers.push_back(f);
-  }
 }
 
 BlueStore::~BlueStore()
@@ -3361,11 +3555,14 @@ const char **BlueStore::get_tracked_conf_keys() const
     "bluestore_compression_max_blob_size",
     "bluestore_compression_max_blob_size_ssd",
     "bluestore_compression_max_blob_size_hdd",
+    "bluestore_compression_required_ratio",
     "bluestore_max_alloc_size",
     "bluestore_prefer_deferred_size",
-    "bleustore_deferred_batch_ops",
-    "bleustore_deferred_batch_ops_hdd",
-    "bleustore_deferred_batch_ops_ssd",
+    "bluestore_prefer_deferred_size_hdd",
+    "bluestore_prefer_deferred_size_ssd",
+    "bluestore_deferred_batch_ops",
+    "bluestore_deferred_batch_ops_hdd",
+    "bluestore_deferred_batch_ops_ssd",
     "bluestore_throttle_bytes",
     "bluestore_throttle_deferred_bytes",
     "bluestore_throttle_cost_per_io_hdd",
@@ -3402,6 +3599,8 @@ void BlueStore::handle_conf_change(const struct md_config_t *conf,
     }
   }
   if (changed.count("bluestore_prefer_deferred_size") ||
+      changed.count("bluestore_prefer_deferred_size_hdd") ||
+      changed.count("bluestore_prefer_deferred_size_ssd") ||
       changed.count("bluestore_max_alloc_size") ||
       changed.count("bluestore_deferred_batch_ops") ||
       changed.count("bluestore_deferred_batch_ops_hdd") ||
@@ -3431,16 +3630,35 @@ void BlueStore::handle_conf_change(const struct md_config_t *conf,
 
 void BlueStore::_set_compression()
 {
-  if (cct->_conf->bluestore_compression_max_blob_size) {
-    comp_min_blob_size = cct->_conf->bluestore_compression_max_blob_size;
+  auto m = Compressor::get_comp_mode_type(cct->_conf->bluestore_compression_mode);
+  if (m) {
+    comp_mode = *m;
   } else {
-    assert(bdev);
-    if (bdev->is_rotational()) {
-      comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size_hdd;
-    } else {
-      comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size_ssd;
-    }
-  }
+    derr << __func__ << " unrecognized value '"
+         << cct->_conf->bluestore_compression_mode
+         << "' for bluestore_compression_mode, reverting to 'none'"
+         << dendl;
+    comp_mode = Compressor::COMP_NONE;
+  }
+
+  compressor = nullptr;
+
+  if (comp_mode == Compressor::COMP_NONE) {
+    dout(10) << __func__ << " compression mode set to 'none', "
+             << "ignore other compression setttings" << dendl;
+    return;
+  }
+
+  if (cct->_conf->bluestore_compression_min_blob_size) {
+    comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size;
+  } else {
+    assert(bdev);
+    if (bdev->is_rotational()) {
+      comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size_hdd;
+    } else {
+      comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size_ssd;
+    }
+  }
 
   if (cct->_conf->bluestore_compression_max_blob_size) {
     comp_max_blob_size = cct->_conf->bluestore_compression_max_blob_size;
@@ -3453,19 +3671,6 @@ void BlueStore::_set_compression()
     }
   }
 
-  auto m = Compressor::get_comp_mode_type(cct->_conf->bluestore_compression_mode);
-  if (m) {
-    comp_mode = *m;
-  } else {
-    derr << __func__ << " unrecognized value '"
-         << cct->_conf->bluestore_compression_mode
-         << "' for bluestore_compression_mode, reverting to 'none'"
-         << dendl;
-    comp_mode = Compressor::COMP_NONE;
-  }
-
-  compressor = nullptr;
-
   auto& alg_name = cct->_conf->bluestore_compression_algorithm;
   if (!alg_name.empty()) {
     compressor = Compressor::create(cct, alg_name);
@@ -3524,6 +3729,99 @@ void BlueStore::_set_blob_size()
            << std::dec << dendl;
 }
 
+int BlueStore::_set_cache_sizes()
+{
+  assert(bdev);
+  if (cct->_conf->bluestore_cache_size) {
+    cache_size = cct->_conf->bluestore_cache_size;
+  } else {
+    // choose global cache size based on backend type
+    if (bdev->is_rotational()) {
+      cache_size = cct->_conf->bluestore_cache_size_hdd;
+    } else {
+      cache_size = cct->_conf->bluestore_cache_size_ssd;
+    }
+  }
+  cache_meta_ratio = cct->_conf->bluestore_cache_meta_ratio;
+  cache_kv_ratio = cct->_conf->bluestore_cache_kv_ratio;
+
+  double cache_kv_max = cct->_conf->bluestore_cache_kv_max;
+  double cache_kv_max_ratio = 0;
+
+  // if cache_kv_max is negative, disable it
+  if (cache_size > 0 && cache_kv_max >= 0) {
+    cache_kv_max_ratio = (double) cache_kv_max / (double) cache_size;
+    if (cache_kv_max_ratio < 1.0 && cache_kv_max_ratio < cache_kv_ratio) {
+      dout(1) << __func__ << " max " << cache_kv_max_ratio
+            << " < ratio " << cache_kv_ratio
+            << dendl;
+      cache_meta_ratio = cache_meta_ratio + cache_kv_ratio - cache_kv_max_ratio;
+      cache_kv_ratio = cache_kv_max_ratio;
+    }
+  }  
+
+  cache_data_ratio =
+    (double)1.0 - (double)cache_meta_ratio - (double)cache_kv_ratio;
+
+  if (cache_meta_ratio < 0 || cache_meta_ratio > 1.0) {
+    derr << __func__ << " bluestore_cache_meta_ratio (" << cache_meta_ratio
+        << ") must be in range [0,1.0]" << dendl;
+    return -EINVAL;
+  }
+  if (cache_kv_ratio < 0 || cache_kv_ratio > 1.0) {
+    derr << __func__ << " bluestore_cache_kv_ratio (" << cache_kv_ratio
+        << ") must be in range [0,1.0]" << dendl;
+    return -EINVAL;
+  }
+  if (cache_meta_ratio + cache_kv_ratio > 1.0) {
+    derr << __func__ << " bluestore_cache_meta_ratio (" << cache_meta_ratio
+        << ") + bluestore_cache_kv_ratio (" << cache_kv_ratio
+        << ") = " << cache_meta_ratio + cache_kv_ratio << "; must be <= 1.0"
+        << dendl;
+    return -EINVAL;
+  }
+  if (cache_data_ratio < 0) {
+    // deal with floating point imprecision
+    cache_data_ratio = 0;
+  }
+  dout(1) << __func__ << " cache_size " << cache_size
+          << " meta " << cache_meta_ratio
+         << " kv " << cache_kv_ratio
+         << " data " << cache_data_ratio
+         << dendl;
+  return 0;
+}
+
+int BlueStore::write_meta(const std::string& key, const std::string& value)
+{
+  bluestore_bdev_label_t label;
+  string p = path + "/block";
+  int r = _read_bdev_label(cct, p, &label);
+  if (r < 0) {
+    return ObjectStore::write_meta(key, value);
+  }
+  label.meta[key] = value;
+  r = _write_bdev_label(cct, p, label);
+  assert(r == 0);
+  return ObjectStore::write_meta(key, value);
+}
+
+int BlueStore::read_meta(const std::string& key, std::string *value)
+{
+  bluestore_bdev_label_t label;
+  string p = path + "/block";
+  int r = _read_bdev_label(cct, p, &label);
+  if (r < 0) {
+    return ObjectStore::read_meta(key, value);
+  }
+  auto i = label.meta.find(key);
+  if (i == label.meta.end()) {
+    return ObjectStore::read_meta(key, value);
+  }
+  *value = i->second;
+  return 0;
+}
+
 void BlueStore::_init_logger()
 {
   PerfCountersBuilder b(cct, "bluestore",
@@ -3661,6 +3959,8 @@ void BlueStore::_init_logger()
   b.add_u64_counter(l_bluestore_gc_merged, "bluestore_gc_merged",
                    "Sum for extents that have been merged due to garbage "
                    "collection");
+  b.add_u64_counter(l_bluestore_read_eio, "bluestore_read_eio",
+                    "Read EIO errors propagated to high level callers");
   logger = b.create_perf_counters();
   cct->get_perfcounters_collection()->add(logger);
 }
@@ -3699,8 +3999,14 @@ int BlueStore::get_block_device_fsid(CephContext* cct, const string& path,
 
 int BlueStore::_open_path()
 {
+  // sanity check(s)
+  if (cct->_conf->get_val<uint64_t>("osd_max_object_size") >=
+      4*1024*1024*1024ull) {
+    derr << __func__ << " osd_max_object_size >= 4GB; BlueStore has hard limit of 4GB." << dendl;
+    return -EINVAL;
+  }
   assert(path_fd < 0);
-  path_fd = ::open(path.c_str(), O_DIRECTORY);
+  path_fd = TEMP_FAILURE_RETRY(::open(path.c_str(), O_DIRECTORY));
   if (path_fd < 0) {
     int r = -errno;
     derr << __func__ << " unable to open " << path << ": " << cpp_strerror(r)
@@ -3716,7 +4022,8 @@ void BlueStore::_close_path()
   path_fd = -1;
 }
 
-int BlueStore::_write_bdev_label(string path, bluestore_bdev_label_t label)
+int BlueStore::_write_bdev_label(CephContext *cct,
+                                string path, bluestore_bdev_label_t label)
 {
   dout(10) << __func__ << " path " << path << " label " << label << dendl;
   bufferlist bl;
@@ -3728,7 +4035,7 @@ int BlueStore::_write_bdev_label(string path, bluestore_bdev_label_t label)
   z.zero();
   bl.append(std::move(z));
 
-  int fd = ::open(path.c_str(), O_WRONLY);
+  int fd = TEMP_FAILURE_RETRY(::open(path.c_str(), O_WRONLY));
   if (fd < 0) {
     fd = -errno;
     derr << __func__ << " failed to open " << path << ": " << cpp_strerror(fd)
@@ -3740,6 +4047,11 @@ int BlueStore::_write_bdev_label(string path, bluestore_bdev_label_t label)
     derr << __func__ << " failed to write to " << path
         << ": " << cpp_strerror(r) << dendl;
   }
+  r = ::fsync(fd);
+  if (r < 0) {
+    derr << __func__ << " failed to fsync " << path
+        << ": " << cpp_strerror(r) << dendl;
+  }
   VOID_TEMP_FAILURE_RETRY(::close(fd));
   return r;
 }
@@ -3748,7 +4060,7 @@ int BlueStore::_read_bdev_label(CephContext* cct, string path,
                                bluestore_bdev_label_t *label)
 {
   dout(10) << __func__ << dendl;
-  int fd = ::open(path.c_str(), O_RDONLY);
+  int fd = TEMP_FAILURE_RETRY(::open(path.c_str(), O_RDONLY));
   if (fd < 0) {
     fd = -errno;
     derr << __func__ << " failed to open " << path << ": " << cpp_strerror(fd)
@@ -3774,10 +4086,10 @@ int BlueStore::_read_bdev_label(CephContext* cct, string path,
     ::decode(expected_crc, p);
   }
   catch (buffer::error& e) {
-    derr << __func__ << " unable to decode label at offset " << p.get_off()
+    dout(2) << __func__ << " unable to decode label at offset " << p.get_off()
         << ": " << e.what()
         << dendl;
-    return -EINVAL;
+    return -ENOENT;
   }
   if (crc != expected_crc) {
     derr << __func__ << " bad crc on label, expected " << expected_crc
@@ -3797,14 +4109,18 @@ int BlueStore::_check_or_set_bdev_label(
     label.size = size;
     label.btime = ceph_clock_now();
     label.description = desc;
-    int r = _write_bdev_label(path, label);
+    int r = _write_bdev_label(cct, path, label);
     if (r < 0)
       return r;
   } else {
     int r = _read_bdev_label(cct, path, &label);
     if (r < 0)
       return r;
-    if (label.osd_uuid != fsid) {
+    if (cct->_conf->bluestore_debug_permit_any_bdev_label) {
+      dout(20) << __func__ << " bdev " << path << " fsid " << label.osd_uuid
+          << " and fsid " << fsid << " check bypassed" << dendl;
+    }
+    else if (label.osd_uuid != fsid) {
       derr << __func__ << " bdev " << path << " fsid " << label.osd_uuid
           << " does not match our fsid " << fsid << dendl;
       return -EIO;
@@ -3815,9 +4131,6 @@ int BlueStore::_check_or_set_bdev_label(
 
 void BlueStore::_set_alloc_sizes(void)
 {
-  min_alloc_size_order = ctz(min_alloc_size);
-  assert(min_alloc_size == 1u << min_alloc_size_order);
-
   max_alloc_size = cct->_conf->bluestore_max_alloc_size;
 
   if (cct->_conf->bluestore_prefer_deferred_size) {
@@ -3871,6 +4184,11 @@ int BlueStore::_open_bdev(bool create)
   block_mask = ~(block_size - 1);
   block_size_order = ctz(block_size);
   assert(block_size == 1u << block_size_order);
+  // and set cache_size based on device type
+  r = _set_cache_sizes();
+  if (r < 0) {
+    goto fail_close;
+  }
   return 0;
 
  fail_close:
@@ -3903,18 +4221,22 @@ int BlueStore::_open_fm(bool create)
       bl.append(freelist_type);
       t->set(PREFIX_SUPER, "freelist_type", bl);
     }
-    fm->create(bdev->get_size(), t);
+    // being able to allocate in units less than bdev block size 
+    // seems to be a bad idea.
+    assert( cct->_conf->bdev_block_size <= (int64_t)min_alloc_size);
+    fm->create(bdev->get_size(), (int64_t)min_alloc_size, t);
 
     // allocate superblock reserved space.  note that we do not mark
     // bluefs space as allocated in the freelist; we instead rely on
     // bluefs_extents.
-    fm->allocate(0, SUPER_RESERVED, t);
+    uint64_t reserved = ROUND_UP_TO(MAX(SUPER_RESERVED, min_alloc_size),
+                                   min_alloc_size);
+    fm->allocate(0, reserved, t);
 
-    uint64_t reserved = 0;
     if (cct->_conf->bluestore_bluefs) {
       assert(bluefs_extents.num_intervals() == 1);
       interval_set<uint64_t>::iterator p = bluefs_extents.begin();
-      reserved = p.get_start() + p.get_len();
+      reserved = ROUND_UP_TO(p.get_start() + p.get_len(), min_alloc_size);
       dout(20) << __func__ << " reserved 0x" << std::hex << reserved << std::dec
               << " for bluefs" << dendl;
       bufferlist bl;
@@ -3922,8 +4244,6 @@ int BlueStore::_open_fm(bool create)
       t->set(PREFIX_SUPER, "bluefs_extents", bl);
       dout(20) << __func__ << " bluefs_extents 0x" << std::hex << bluefs_extents
               << std::dec << dendl;
-    } else {
-      reserved = SUPER_RESERVED;
     }
 
     if (cct->_conf->bluestore_debug_prefill > 0) {
@@ -3970,7 +4290,7 @@ int BlueStore::_open_fm(bool create)
     db->submit_transaction_sync(t);
   }
 
-  int r = fm->init();
+  int r = fm->init(bdev->get_size());
   if (r < 0) {
     derr << __func__ << " freelist init failed: " << cpp_strerror(r) << dendl;
     delete fm;
@@ -4014,6 +4334,7 @@ int BlueStore::_open_alloc()
     ++num;
     bytes += length;
   }
+  fm->enumerate_reset();
   dout(1) << __func__ << " loaded " << pretty_si_t(bytes)
          << " in " << num << " extents"
          << dendl;
@@ -4117,6 +4438,49 @@ int BlueStore::_lock_fsid()
   return 0;
 }
 
+bool BlueStore::is_rotational()
+{
+  if (bdev) {
+    return bdev->is_rotational();
+  }
+
+  bool rotational = true;
+  int r = _open_path();
+  if (r < 0)
+    goto out;
+  r = _open_fsid(false);
+  if (r < 0)
+    goto out_path;
+  r = _read_fsid(&fsid);
+  if (r < 0)
+    goto out_fsid;
+  r = _lock_fsid();
+  if (r < 0)
+    goto out_fsid;
+  r = _open_bdev(false);
+  if (r < 0)
+    goto out_fsid;
+  rotational = bdev->is_rotational();
+  _close_bdev();
+ out_fsid:
+  _close_fsid();
+ out_path:
+  _close_path();
+  out:
+  return rotational;
+}
+
+bool BlueStore::is_journal_rotational()
+{
+  if (!bluefs) {
+    dout(5) << __func__ << " bluefs disabled, default to store media type"
+            << dendl;
+    return is_rotational();
+  }
+  dout(10) << __func__ << " " << (int)bluefs->wal_is_rotational() << dendl;
+  return bluefs->wal_is_rotational();
+}
+
 bool BlueStore::test_mount_in_use()
 {
   // most error conditions mean the mount is not in use (e.g., because
@@ -4223,7 +4587,15 @@ int BlueStore::_open_db(bool create)
       bluefs_shared_bdev = BlueFS::BDEV_SLOW;
       bluefs_single_shared_device = false;
     } else {
-      bluefs_shared_bdev = BlueFS::BDEV_DB;
+      r = -errno;
+      if (::lstat(bfn.c_str(), &st) == -1) {
+       r = 0;
+       bluefs_shared_bdev = BlueFS::BDEV_DB;
+      } else {
+       derr << __func__ << " " << bfn << " symlink exists but target unusable: "
+            << cpp_strerror(r) << dendl;
+       goto free_bluefs;
+      }
     }
 
     // shared device
@@ -4240,11 +4612,20 @@ int BlueStore::_open_db(bool create)
        bdev->get_size() * (cct->_conf->bluestore_bluefs_min_ratio +
                            cct->_conf->bluestore_bluefs_gift_ratio);
       initial = MAX(initial, cct->_conf->bluestore_bluefs_min);
+      if (cct->_conf->bluefs_alloc_size % min_alloc_size) {
+       derr << __func__ << " bluefs_alloc_size 0x" << std::hex
+            << cct->_conf->bluefs_alloc_size << " is not a multiple of "
+            << "min_alloc_size 0x" << min_alloc_size << std::dec << dendl;
+       r = -EINVAL;
+       goto free_bluefs;
+      }
       // align to bluefs's alloc_size
       initial = P2ROUNDUP(initial, cct->_conf->bluefs_alloc_size);
-      initial += cct->_conf->bluefs_alloc_size - SUPER_RESERVED;
-      bluefs->add_block_extent(bluefs_shared_bdev, SUPER_RESERVED, initial);
-      bluefs_extents.insert(SUPER_RESERVED, initial);
+      // put bluefs in the middle of the device in case it is an HDD
+      uint64_t start = P2ALIGN((bdev->get_size() - initial) / 2,
+                              cct->_conf->bluefs_alloc_size);
+      bluefs->add_block_extent(bluefs_shared_bdev, start, initial);
+      bluefs_extents.insert(start, initial);
     }
 
     bfn = path + "/block.wal";
@@ -4277,7 +4658,15 @@ int BlueStore::_open_db(bool create)
       cct->_conf->set_val("rocksdb_separate_wal_dir", "true");
       bluefs_single_shared_device = false;
     } else {
-      cct->_conf->set_val("rocksdb_separate_wal_dir", "false");
+      r = -errno;
+      if (::lstat(bfn.c_str(), &st) == -1) {
+       r = 0;
+       cct->_conf->set_val("rocksdb_separate_wal_dir", "false");
+      } else {
+       derr << __func__ << " " << bfn << " symlink exists but target unusable: "
+            << cpp_strerror(r) << dendl;
+       goto free_bluefs;
+      }
     }
 
     if (create) {
@@ -4374,6 +4763,8 @@ int BlueStore::_open_db(bool create)
   FreelistManager::setup_merge_operators(db);
   db->set_merge_operator(PREFIX_STAT, merge_op);
 
+  db->set_cache_size(cache_size * cache_kv_ratio);
+
   if (kv_backend == "rocksdb")
     options = cct->_conf->bluestore_rocksdb_options;
   db->init(options);
@@ -4503,9 +4894,11 @@ int BlueStore::_balance_bluefs_freespace(PExtentVector *extents)
             << " > max_ratio " << cct->_conf->bluestore_bluefs_max_ratio
             << ", should reclaim " << pretty_si_t(reclaim) << dendl;
   }
+
+  // don't take over too much of the freespace
+  uint64_t free_cap = cct->_conf->bluestore_bluefs_max_ratio * total_free;
   if (bluefs_total < cct->_conf->bluestore_bluefs_min &&
-    cct->_conf->bluestore_bluefs_min <
-      (uint64_t)(cct->_conf->bluestore_bluefs_max_ratio * total_free)) {
+      cct->_conf->bluestore_bluefs_min < free_cap) {
     uint64_t g = cct->_conf->bluestore_bluefs_min - bluefs_total;
     dout(10) << __func__ << " bluefs_total " << bluefs_total
             << " < min " << cct->_conf->bluestore_bluefs_min
@@ -4514,6 +4907,17 @@ int BlueStore::_balance_bluefs_freespace(PExtentVector *extents)
       gift = g;
     reclaim = 0;
   }
+  uint64_t min_free = cct->_conf->get_val<uint64_t>("bluestore_bluefs_min_free");
+  if (bluefs_free < min_free &&
+      min_free < free_cap) {
+    uint64_t g = min_free - bluefs_free;
+    dout(10) << __func__ << " bluefs_free " << bluefs_total
+            << " < min " << min_free
+            << ", should gift " << pretty_si_t(g) << dendl;
+    if (g > gift)
+      gift = g;
+    reclaim = 0;
+  }
 
   if (gift) {
     // round up to alloc size
@@ -4532,12 +4936,19 @@ int BlueStore::_balance_bluefs_freespace(PExtentVector *extents)
     int64_t alloc_len = alloc->allocate(gift, cct->_conf->bluefs_alloc_size,
                                        0, 0, &exts);
 
-    if (alloc_len < (int64_t)gift) {
-      derr << __func__ << " allocate failed on 0x" << std::hex << gift
-           << " min_alloc_size 0x" << min_alloc_size << std::dec << dendl;
+    if (alloc_len <= 0) {
+      dout(1) << __func__ << " no allocate on 0x" << std::hex << gift
+              << " min_alloc_size 0x" << min_alloc_size << std::dec << dendl;
+      alloc->unreserve(gift);
+      alloc->dump();
+      return 0;
+    } else if (alloc_len < (int64_t)gift) {
+      dout(1) << __func__ << " insufficient allocate on 0x" << std::hex << gift
+              << " min_alloc_size 0x" << min_alloc_size 
+             << " allocated 0x" << alloc_len
+             << std::dec << dendl;
+      alloc->unreserve(gift - alloc_len);
       alloc->dump();
-      assert(0 == "allocate failed, wtf");
-      return -ENOSPC;
     }
     for (auto& p : exts) {
       bluestore_pextent_t e = bluestore_pextent_t(p);
@@ -4593,6 +5004,7 @@ void BlueStore::_commit_bluefs_freespace(
 
 int BlueStore::_open_collections(int *errors)
 {
+  dout(10) << __func__ << dendl;
   assert(coll_map.empty());
   KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
   for (it->upper_bound(string());
@@ -4614,7 +5026,8 @@ int BlueStore::_open_collections(int *errors)
              << pretty_binary_string(it->key()) << dendl;
         return -EIO;
       }   
-      dout(20) << __func__ << " opened " << cid << " " << c << dendl;
+      dout(20) << __func__ << " opened " << cid << " " << c
+              << " " << c->cnode << dendl;
       coll_map[cid] = c;
     } else {
       derr << __func__ << " unrecognized collection " << it->key() << dendl;
@@ -4625,6 +5038,23 @@ int BlueStore::_open_collections(int *errors)
   return 0;
 }
 
+void BlueStore::_open_statfs()
+{
+  bufferlist bl;
+  int r = db->get(PREFIX_STAT, "bluestore_statfs", &bl);
+  if (r >= 0) {
+    if (size_t(bl.length()) >= sizeof(vstatfs.values)) {
+      auto it = bl.begin();
+      vstatfs.decode(it);
+    } else {
+      dout(10) << __func__ << " store_statfs is corrupt, using empty" << dendl;
+    }
+  }
+  else {
+    dout(10) << __func__ << " store_statfs missed, using empty" << dendl;
+  }
+}
+
 int BlueStore::_setup_block_symlink_or_file(
   string name,
   string epath,
@@ -4681,30 +5111,13 @@ int BlueStore::_setup_block_symlink_or_file(
        }
 
        if (cct->_conf->bluestore_block_preallocate_file) {
-#ifdef HAVE_POSIX_FALLOCATE
-         r = ::posix_fallocate(fd, 0, size);
-         if (r) {
+          r = ::ceph_posix_fallocate(fd, 0, size);
+          if (r > 0) {
            derr << __func__ << " failed to prefallocate " << name << " file to "
              << size << ": " << cpp_strerror(r) << dendl;
            VOID_TEMP_FAILURE_RETRY(::close(fd));
            return -r;
          }
-#else
-         char data[1024*128];
-         for (uint64_t off = 0; off < size; off += sizeof(data)) {
-           if (off + sizeof(data) > size)
-             r = ::write(fd, data, size - off);
-           else
-             r = ::write(fd, data, sizeof(data));
-           if (r < 0) {
-             r = -errno;
-             derr << __func__ << " failed to prefallocate w/ write " << name << " file to "
-               << size << ": " << cpp_strerror(r) << dendl;
-             VOID_TEMP_FAILURE_RETRY(::close(fd));
-             return r;
-           }
-         }
-#endif
        }
        dout(1) << __func__ << " resized " << name << " file to "
                << pretty_si_t(size) << "B" << dendl;
@@ -4819,6 +5232,28 @@ int BlueStore::mkfs()
   if (r < 0)
     goto out_close_fsid;
 
+  // choose min_alloc_size
+  if (cct->_conf->bluestore_min_alloc_size) {
+    min_alloc_size = cct->_conf->bluestore_min_alloc_size;
+  } else {
+    assert(bdev);
+    if (bdev->is_rotational()) {
+      min_alloc_size = cct->_conf->bluestore_min_alloc_size_hdd;
+    } else {
+      min_alloc_size = cct->_conf->bluestore_min_alloc_size_ssd;
+    }
+  }
+
+  // make sure min_alloc_size is power of 2 aligned.
+  if (!ISP2(min_alloc_size)) {
+    derr << __func__ << " min_alloc_size 0x"
+        << std::hex << min_alloc_size << std::dec
+        << " is not power of 2 aligned!"
+        << dendl;
+    r = -EINVAL;
+    goto out_close_bdev;
+  }
+
   r = _open_db(true);
   if (r < 0)
     goto out_close_bdev;
@@ -4836,18 +5271,6 @@ int BlueStore::mkfs()
       t->set(PREFIX_SUPER, "blobid_max", bl);
     }
 
-    // choose min_alloc_size
-    if (cct->_conf->bluestore_min_alloc_size) {
-      min_alloc_size = cct->_conf->bluestore_min_alloc_size;
-    } else {
-      assert(bdev);
-      if (bdev->is_rotational()) {
-       min_alloc_size = cct->_conf->bluestore_min_alloc_size_hdd;
-      } else {
-       min_alloc_size = cct->_conf->bluestore_min_alloc_size_ssd;
-      }
-    }
-    _set_alloc_sizes();
     {
       bufferlist bl;
       ::encode((uint64_t)min_alloc_size, bl);
@@ -4859,33 +5282,23 @@ int BlueStore::mkfs()
     db->submit_transaction_sync(t);
   }
 
-  r = _open_alloc();
-  if (r < 0)
-    goto out_close_fm;
 
   r = write_meta("kv_backend", cct->_conf->bluestore_kvbackend);
   if (r < 0)
-    goto out_close_alloc;
-  r = write_meta("bluefs", stringify((int)cct->_conf->bluestore_bluefs));
+    goto out_close_fm;
+
+  r = write_meta("bluefs", stringify(bluefs ? 1 : 0));
   if (r < 0)
-    goto out_close_alloc;
+    goto out_close_fm;
 
   if (fsid != old_fsid) {
     r = _write_fsid();
     if (r < 0) {
       derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl;
-      goto out_close_alloc;
+      goto out_close_fm;
     }
   }
 
-  // indicate success by writing the 'mkfs_done' file
-  r = write_meta("mkfs_done", "yes");
-  if (r < 0)
-    goto out_close_alloc;
-  dout(10) << __func__ << " success" << dendl;
-
- out_close_alloc:
-  _close_alloc();
  out_close_fm:
   _close_fm();
  out_close_db:
@@ -4907,8 +5320,16 @@ int BlueStore::mkfs()
       r = -EIO;
     }
   }
+
+  if (r == 0) {
+    // indicate success by writing the 'mkfs_done' file
+    r = write_meta("mkfs_done", "yes");
+  }
+
   if (r < 0) {
     derr << __func__ << " failed, " << cpp_strerror(r) << dendl;
+  } else {
+    dout(0) << __func__ << " success" << dendl;
   }
   return r;
 }
@@ -4929,6 +5350,8 @@ int BlueStore::_mount(bool kv_only)
 {
   dout(1) << __func__ << " path " << path << dendl;
 
+  _kv_only = kv_only;
+
   {
     string type;
     int r = read_meta("type", &type);
@@ -5006,10 +5429,7 @@ int BlueStore::_mount(bool kv_only)
       goto out_coll;
   }
 
-  for (auto f : finishers) {
-    f->start();
-  }
-  kv_sync_thread.create("bstore_kv_sync");
+  _kv_start();
 
   r = _deferred_replay();
   if (r < 0)
@@ -5017,18 +5437,13 @@ int BlueStore::_mount(bool kv_only)
 
   mempool_thread.init();
 
-
   mounted = true;
   return 0;
 
  out_stop:
   _kv_stop();
-  for (auto f : finishers) {
-    f->wait_for_empty();
-    f->stop();
-  }
  out_coll:
-  flush_cache();
+  _flush_cache();
  out_alloc:
   _close_alloc();
  out_fm:
@@ -5046,29 +5461,23 @@ int BlueStore::_mount(bool kv_only)
 
 int BlueStore::umount()
 {
-  assert(mounted);
+  assert(_kv_only || mounted);
   dout(1) << __func__ << dendl;
 
   _osr_drain_all();
   _osr_unregister_all();
 
-  mempool_thread.shutdown();
+  mounted = false;
+  if (!_kv_only) {
+    mempool_thread.shutdown();
+    dout(20) << __func__ << " stopping kv thread" << dendl;
+    _kv_stop();
+    _flush_cache();
+    dout(20) << __func__ << " closing" << dendl;
 
-  dout(20) << __func__ << " stopping kv thread" << dendl;
-  _kv_stop();
-  for (auto f : finishers) {
-    dout(20) << __func__ << " draining finisher" << dendl;
-    f->wait_for_empty();
-    dout(20) << __func__ << " stopping finisher" << dendl;
-    f->stop();
+    _close_alloc();
+    _close_fm();
   }
-  _reap_collections();
-  flush_cache();
-  dout(20) << __func__ << " closing" << dendl;
-
-  mounted = false;
-  _close_alloc();
-  _close_fm();
   _close_db();
   _close_bdev();
   _close_fsid();
@@ -5090,7 +5499,6 @@ static void apply(uint64_t off,
                   uint64_t len,
                   uint64_t granularity,
                   BlueStore::mempool_dynamic_bitset &bitset,
-                 const char *what,
                   std::function<void(uint64_t,
                                     BlueStore::mempool_dynamic_bitset &)> f) {
   auto end = ROUND_UP_TO(off + len, granularity);
@@ -5106,6 +5514,7 @@ int BlueStore::_fsck_check_extents(
   const PExtentVector& extents,
   bool compressed,
   mempool_dynamic_bitset &used_blocks,
+  uint64_t granularity,
   store_statfs_t& expected_statfs)
 {
   dout(30) << __func__ << " oid " << oid << " extents " << extents << dendl;
@@ -5119,8 +5528,9 @@ int BlueStore::_fsck_check_extents(
     }
     bool already = false;
     apply(
-      e.offset, e.length, block_size, used_blocks, __func__,
+      e.offset, e.length, granularity, used_blocks,
       [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+       assert(pos < bs.size());
        if (bs.test(pos))
          already = true;
        else
@@ -5140,14 +5550,22 @@ int BlueStore::_fsck_check_extents(
   return errors;
 }
 
-int BlueStore::fsck(bool deep)
+int BlueStore::_fsck(bool deep, bool repair)
 {
-  dout(1) << __func__ << (deep ? " (deep)" : " (shallow)") << " start" << dendl;
+  dout(1) << __func__
+         << (repair ? " fsck" : " repair")
+         << (deep ? " (deep)" : " (shallow)") << " start" << dendl;
   int errors = 0;
-  mempool::bluestore_fsck::set<uint64_t> used_nids;
-  mempool::bluestore_fsck::set<uint64_t> used_omap_head;
+  int repaired = 0;
+
+  typedef btree::btree_set<
+    uint64_t,std::less<uint64_t>,
+    mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
+  uint64_t_btree_t used_nids;
+  uint64_t_btree_t used_omap_head;
+  uint64_t_btree_t used_sbids;
+
   mempool_dynamic_bitset used_blocks;
-  mempool::bluestore_fsck::set<uint64_t> used_sbids;
   KeyValueDB::Iterator it;
   store_statfs_t expected_statfs, actual_statfs;
   struct sb_info_t {
@@ -5209,14 +5627,18 @@ int BlueStore::fsck(bool deep)
 
   mempool_thread.init();
 
+  // we need finishers and kv_{sync,finalize}_thread *just* for replay
+  _kv_start();
   r = _deferred_replay();
+  _kv_stop();
   if (r < 0)
     goto out_scan;
 
-  used_blocks.resize(bdev->get_size() / block_size);
+  used_blocks.resize(fm->get_alloc_units());
   apply(
-    0, SUPER_RESERVED, block_size, used_blocks, "0~SUPER_RESERVED",
+    0, MAX(min_alloc_size, SUPER_RESERVED), fm->get_alloc_size(), used_blocks,
     [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+       assert(pos < bs.size());
       bs.set(pos);
     }
   );
@@ -5224,8 +5646,9 @@ int BlueStore::fsck(bool deep)
   if (bluefs) {
     for (auto e = bluefs_extents.begin(); e != bluefs_extents.end(); ++e) {
       apply(
-        e.get_start(), e.get_len(), block_size, used_blocks, "bluefs",
+        e.get_start(), e.get_len(), fm->get_alloc_size(), used_blocks,
         [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+       assert(pos < bs.size());
           bs.set(pos);
         }
        );
@@ -5252,11 +5675,14 @@ int BlueStore::fsck(bool deep)
     spg_t pgid;
     mempool::bluestore_fsck::list<string> expecting_shards;
     for (it->lower_bound(string()); it->valid(); it->next()) {
+      if (g_conf->bluestore_debug_fsck_abort) {
+       goto out_scan;
+      }
       dout(30) << " key " << pretty_binary_string(it->key()) << dendl;
       if (is_extent_shard_key(it->key())) {
        while (!expecting_shards.empty() &&
               expecting_shards.front() < it->key()) {
-         derr << __func__ << " error: missing shard key "
+         derr << "fsck error: missing shard key "
               << pretty_binary_string(expecting_shards.front())
               << dendl;
          ++errors;
@@ -5272,18 +5698,18 @@ int BlueStore::fsck(bool deep)
         uint32_t offset;
         string okey;
         get_key_extent_shard(it->key(), &okey, &offset);
-        derr << __func__ << " error: stray shard 0x" << std::hex << offset
+        derr << "fsck error: stray shard 0x" << std::hex << offset
             << std::dec << dendl;
         if (expecting_shards.empty()) {
-          derr << __func__ << " error: " << pretty_binary_string(it->key())
+          derr << "fsck error: " << pretty_binary_string(it->key())
                << " is unexpected" << dendl;
           ++errors;
           continue;
         }
        while (expecting_shards.front() > it->key()) {
-         derr << __func__ << " error:   saw " << pretty_binary_string(it->key())
+         derr << "fsck error:   saw " << pretty_binary_string(it->key())
               << dendl;
-         derr << __func__ << " error:   exp "
+         derr << "fsck error:   exp "
               << pretty_binary_string(expecting_shards.front()) << dendl;
          ++errors;
          expecting_shards.pop_front();
@@ -5297,7 +5723,7 @@ int BlueStore::fsck(bool deep)
       ghobject_t oid;
       int r = get_key_object(it->key(), &oid);
       if (r < 0) {
-        derr << __func__ << " error: bad object key "
+        derr << "fsck error: bad object key "
              << pretty_binary_string(it->key()) << dendl;
        ++errors;
        continue;
@@ -5317,18 +5743,19 @@ int BlueStore::fsck(bool deep)
          }
        }
        if (!c) {
-          derr << __func__ << " error: stray object " << oid
+          derr << "fsck error: stray object " << oid
                << " not owned by any collection" << dendl;
          ++errors;
          continue;
        }
        c->cid.is_pg(&pgid);
-       dout(20) << __func__ << "  collection " << c->cid << dendl;
+       dout(20) << __func__ << "  collection " << c->cid << " " << c->cnode
+                << dendl;
       }
 
       if (!expecting_shards.empty()) {
        for (auto &k : expecting_shards) {
-         derr << __func__ << " error: missing shard key "
+         derr << "fsck error: missing shard key "
               << pretty_binary_string(k) << dendl;
        }
        ++errors;
@@ -5340,12 +5767,12 @@ int BlueStore::fsck(bool deep)
       OnodeRef o = c->get_onode(oid, false);
       if (o->onode.nid) {
        if (o->onode.nid > nid_max) {
-         derr << __func__ << " error: " << oid << " nid " << o->onode.nid
+         derr << "fsck error: " << oid << " nid " << o->onode.nid
               << " > nid_max " << nid_max << dendl;
          ++errors;
        }
        if (used_nids.count(o->onode.nid)) {
-         derr << __func__ << " error: " << oid << " nid " << o->onode.nid
+         derr << "fsck error: " << oid << " nid " << o->onode.nid
               << " already in use" << dendl;
          ++errors;
          continue; // go for next object
@@ -5367,7 +5794,7 @@ int BlueStore::fsck(bool deep)
        get_extent_shard_key(o->key, s.shard_info->offset,
                             &expecting_shards.back());
        if (s.shard_info->offset >= o->onode.size) {
-         derr << __func__ << " error: " << oid << " shard 0x" << std::hex
+         derr << "fsck error: " << oid << " shard 0x" << std::hex
               << s.shard_info->offset << " past EOF at 0x" << o->onode.size
               << std::dec << dendl;
          ++errors;
@@ -5381,14 +5808,14 @@ int BlueStore::fsck(bool deep)
       for (auto& l : o->extent_map.extent_map) {
        dout(20) << __func__ << "    " << l << dendl;
        if (l.logical_offset < pos) {
-         derr << __func__ << " error: " << oid << " lextent at 0x"
+         derr << "fsck error: " << oid << " lextent at 0x"
               << std::hex << l.logical_offset
               << " overlaps with the previous, which ends at 0x" << pos
               << std::dec << dendl;
          ++errors;
        }
        if (o->extent_map.spans_shard(l.logical_offset, l.length)) {
-         derr << __func__ << " error: " << oid << " lextent at 0x"
+         derr << "fsck error: " << oid << " lextent at 0x"
               << std::hex << l.logical_offset << "~" << l.length
               << " spans a shard boundary"
               << std::dec << dendl;
@@ -5434,7 +5861,7 @@ int BlueStore::fsck(bool deep)
                 << std::dec << " for " << *i.first << dendl;
        const bluestore_blob_t& blob = i.first->get_blob();
        if (i.second & blob.unused) {
-         derr << __func__ << " error: " << oid << " blob claims unused 0x"
+         derr << "fsck error: " << oid << " blob claims unused 0x"
               << std::hex << blob.unused
               << " but extents reference 0x" << i.second
               << " on blob " << *i.first << dendl;
@@ -5456,7 +5883,7 @@ int BlueStore::fsck(bool deep)
            if ((blob.unused & mask) == mask) {
              // this csum chunk region is marked unused
              if (blob.get_csum_item(p) != 0) {
-               derr << __func__ << " error: " << oid
+               derr << "fsck error: " << oid
                     << " blob claims csum chunk 0x" << std::hex << pos
                     << "~" << csum_chunk_size
                     << " is unused (mask 0x" << mask << " of unused 0x"
@@ -5474,7 +5901,7 @@ int BlueStore::fsck(bool deep)
        const bluestore_blob_t& blob = i.first->get_blob();
        bool equal = i.first->get_blob_use_tracker().equal(i.second);
        if (!equal) {
-         derr << __func__ << " error: " << oid << " blob " << *i.first
+         derr << "fsck error: " << oid << " blob " << *i.first
               << " doesn't match expected ref_map " << i.second << dendl;
          ++errors;
        }
@@ -5485,12 +5912,12 @@ int BlueStore::fsck(bool deep)
        }
        if (blob.is_shared()) {
          if (i.first->shared_blob->get_sbid() > blobid_max) {
-           derr << __func__ << " error: " << oid << " blob " << blob
+           derr << "fsck error: " << oid << " blob " << blob
                 << " sbid " << i.first->shared_blob->get_sbid() << " > blobid_max "
                 << blobid_max << dendl;
            ++errors;
          } else if (i.first->shared_blob->get_sbid() == 0) {
-            derr << __func__ << " error: " << oid << " blob " << blob
+            derr << "fsck error: " << oid << " blob " << blob
                  << " marked as shared but has uninitialized sbid"
                  << dendl;
             ++errors;
@@ -5508,6 +5935,7 @@ int BlueStore::fsck(bool deep)
          errors += _fsck_check_extents(oid, blob.get_extents(),
                                        blob.is_compressed(),
                                        used_blocks,
+                                       fm->get_alloc_size(),
                                        expected_statfs);
         }
       }
@@ -5516,21 +5944,20 @@ int BlueStore::fsck(bool deep)
        int r = _do_read(c.get(), o, 0, o->onode.size, bl, 0);
        if (r < 0) {
          ++errors;
-         derr << __func__ << " error: " << oid << " error during read: "
+         derr << "fsck error: " << oid << " error during read: "
               << cpp_strerror(r) << dendl;
        }
       }
       // omap
       if (o->onode.has_omap()) {
        if (used_omap_head.count(o->onode.nid)) {
-         derr << __func__ << " error: " << oid << " omap_head " << o->onode.nid
+         derr << "fsck error: " << oid << " omap_head " << o->onode.nid
               << " already in use" << dendl;
          ++errors;
        } else {
          used_omap_head.insert(o->onode.nid);
        }
       }
-      c->trim_cache();
     }
   }
   dout(1) << __func__ << " checking shared_blobs" << dendl;
@@ -5540,14 +5967,14 @@ int BlueStore::fsck(bool deep)
       string key = it->key();
       uint64_t sbid;
       if (get_key_shared_blob(key, &sbid)) {
-       derr << __func__ << " error: bad key '" << key
+       derr << "fsck error: bad key '" << key
             << "' in shared blob namespace" << dendl;
        ++errors;
        continue;
       }
       auto p = sb_info.find(sbid);
       if (p == sb_info.end()) {
-       derr << __func__ << " error: found stray shared blob data for sbid 0x"
+       derr << "fsck error: found stray shared blob data for sbid 0x"
             << std::hex << sbid << std::dec << dendl;
        ++errors;
       } else {
@@ -5559,7 +5986,7 @@ int BlueStore::fsck(bool deep)
        ::decode(shared_blob, blp);
        dout(20) << __func__ << "  " << *sbi.sb << " " << shared_blob << dendl;
        if (shared_blob.ref_map != sbi.ref_map) {
-         derr << __func__ << " error: shared blob 0x" << std::hex << sbid
+         derr << "fsck error: shared blob 0x" << std::hex << sbid
               << std::dec << " ref_map " << shared_blob.ref_map
               << " != expected " << sbi.ref_map << dendl;
          ++errors;
@@ -5571,18 +5998,20 @@ int BlueStore::fsck(bool deep)
        errors += _fsck_check_extents(p->second.oids.front(),
                                      extents,
                                      p->second.compressed,
-                                     used_blocks, expected_statfs);
+                                     used_blocks,
+                                     fm->get_alloc_size(),
+                                     expected_statfs);
        sb_info.erase(p);
       }
     }
   }
   for (auto &p : sb_info) {
-    derr << __func__ << " error: shared_blob 0x" << p.first
+    derr << "fsck error: shared_blob 0x" << p.first
         << " key is missing (" << *p.second.sb << ")" << dendl;
     ++errors;
   }
   if (!(actual_statfs == expected_statfs)) {
-    derr << __func__ << " error: actual " << actual_statfs
+    derr << "fsck error: actual " << actual_statfs
         << " != expected " << expected_statfs << dendl;
     ++errors;
   }
@@ -5594,7 +6023,7 @@ int BlueStore::fsck(bool deep)
       uint64_t omap_head;
       _key_decode_u64(it->key().c_str(), &omap_head);
       if (used_omap_head.count(omap_head) == 0) {
-       derr << __func__ << " error: found stray omap data on omap_head "
+       derr << "fsck error: found stray omap data on omap_head "
             << omap_head << dendl;
        ++errors;
       }
@@ -5611,7 +6040,7 @@ int BlueStore::fsck(bool deep)
       try {
        ::decode(wt, p);
       } catch (buffer::error& e) {
-       derr << __func__ << " error: failed to decode deferred txn "
+       derr << "fsck error: failed to decode deferred txn "
             << pretty_binary_string(it->key()) << dendl;
        r = -EIO;
         goto out_scan;
@@ -5621,8 +6050,9 @@ int BlueStore::fsck(bool deep)
               << " released 0x" << std::hex << wt.released << std::dec << dendl;
       for (auto e = wt.released.begin(); e != wt.released.end(); ++e) {
         apply(
-          e.get_start(), e.get_len(), block_size, used_blocks, "deferred",
+          e.get_start(), e.get_len(), fm->get_alloc_size(), used_blocks,
           [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+       assert(pos < bs.size());
             bs.set(pos);
           }
         );
@@ -5636,8 +6066,9 @@ int BlueStore::fsck(bool deep)
     // know they are allocated.
     for (auto e = bluefs_extents.begin(); e != bluefs_extents.end(); ++e) {
       apply(
-        e.get_start(), e.get_len(), block_size, used_blocks, "bluefs_extents",
+        e.get_start(), e.get_len(), fm->get_alloc_size(), used_blocks,
         [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+       assert(pos < bs.size());
          bs.reset(pos);
         }
       );
@@ -5647,8 +6078,9 @@ int BlueStore::fsck(bool deep)
     while (fm->enumerate_next(&offset, &length)) {
       bool intersects = false;
       apply(
-        offset, length, block_size, used_blocks, "free",
+        offset, length, fm->get_alloc_size(), used_blocks,
         [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+         assert(pos < bs.size());
           if (bs.test(pos)) {
             intersects = true;
           } else {
@@ -5657,25 +6089,53 @@ int BlueStore::fsck(bool deep)
         }
       );
       if (intersects) {
-       derr << __func__ << " error: free extent 0x" << std::hex << offset
-            << "~" << length << std::dec
-            << " intersects allocated blocks" << dendl;
-       ++errors;
+       if (offset == SUPER_RESERVED &&
+           length == min_alloc_size - SUPER_RESERVED) {
+         // this is due to the change just after luminous to min_alloc_size
+         // granularity allocations, and our baked in assumption at the top
+         // of _fsck that 0~ROUND_UP_TO(SUPER_RESERVED,min_alloc_size) is used
+         // (vs luminous's ROUND_UP_TO(SUPER_RESERVED,block_size)).  harmless,
+         // since we will never allocate this region below min_alloc_size.
+         dout(10) << __func__ << " ignoring free extent between SUPER_RESERVED"
+                  << " and min_alloc_size, 0x" << std::hex << offset << "~"
+                  << length << dendl;
+       } else {
+         derr << "fsck error: free extent 0x" << std::hex << offset
+              << "~" << length << std::dec
+              << " intersects allocated blocks" << dendl;
+         ++errors;
+       }
       }
     }
+    fm->enumerate_reset();
     size_t count = used_blocks.count();
     if (used_blocks.size() != count) {
       assert(used_blocks.size() > count);
-      derr << __func__ << " error: leaked some space;"
-          << (used_blocks.size() - count) * min_alloc_size
-          << " bytes leaked" << dendl;
       ++errors;
+      used_blocks.flip();
+      size_t start = used_blocks.find_first();
+      while (start != decltype(used_blocks)::npos) {
+       size_t cur = start;
+       while (true) {
+         size_t next = used_blocks.find_next(cur);
+         if (next != cur + 1) {
+           derr << "fsck error: leaked extent 0x" << std::hex
+                << ((uint64_t)start * fm->get_alloc_size()) << "~"
+                << ((cur + 1 - start) * fm->get_alloc_size()) << std::dec
+                << dendl;
+           start = next;
+           break;
+         }
+         cur = next;
+       }
+      }
+      used_blocks.flip();
     }
   }
 
  out_scan:
   mempool_thread.shutdown();
-  flush_cache();
+  _flush_cache();
  out_alloc:
   _close_alloc();
  out_fm:
@@ -5704,9 +6164,10 @@ int BlueStore::fsck(bool deep)
          << dendl;
 
   utime_t duration = ceph_clock_now() - start;
-  dout(1) << __func__ << " finish with " << errors << " errors in "
+  dout(1) << __func__ << " finish with " << errors << " errors, " << repaired
+         << " repaired, " << (errors - repaired) << " remaining in "
          << duration << " seconds" << dendl;
-  return errors;
+  return errors - repaired;
 }
 
 void BlueStore::collect_metadata(map<string,string> *pm)
@@ -5729,38 +6190,26 @@ int BlueStore::statfs(struct store_statfs_t *buf)
   buf->available = alloc->get_free();
 
   if (bluefs) {
-    // part of our shared device is "free" according to BlueFS
-    // Don't include bluestore_bluefs_min because that space can't
-    // be used for any other purpose.
-    buf->available += bluefs->get_free(bluefs_shared_bdev) - cct->_conf->bluestore_bluefs_min;
-
-    // include dedicated db, too, if that isn't the shared device.
-    if (bluefs_shared_bdev != BlueFS::BDEV_DB) {
-      buf->total += bluefs->get_total(BlueFS::BDEV_DB);
+    // part of our shared device is "free" according to BlueFS, but we
+    // can't touch bluestore_bluefs_min of it.
+    int64_t shared_available = std::min(
+      bluefs->get_free(bluefs_shared_bdev),
+      bluefs->get_total(bluefs_shared_bdev) - cct->_conf->bluestore_bluefs_min);
+    if (shared_available > 0) {
+      buf->available += shared_available;
     }
   }
 
-  bufferlist bl;
-  int r = db->get(PREFIX_STAT, "bluestore_statfs", &bl);
-  if (r >= 0) {
-       TransContext::volatile_statfs vstatfs;
-     if (size_t(bl.length()) >= sizeof(vstatfs.values)) {
-       auto it = bl.begin();
-       vstatfs.decode(it);
-
-       buf->allocated = vstatfs.allocated();
-       buf->stored = vstatfs.stored();
-       buf->compressed = vstatfs.compressed();
-       buf->compressed_original = vstatfs.compressed_original();
-       buf->compressed_allocated = vstatfs.compressed_allocated();
-     } else {
-       dout(10) << __func__ << " store_statfs is corrupt, using empty" << dendl;
-     }
-  } else {
-    dout(10) << __func__ << " store_statfs missed, using empty" << dendl;
+  {
+    std::lock_guard<std::mutex> l(vstatfs_lock);
+    
+    buf->allocated = vstatfs.allocated();
+    buf->stored = vstatfs.stored();
+    buf->compressed = vstatfs.compressed();
+    buf->compressed_original = vstatfs.compressed_original();
+    buf->compressed_allocated = vstatfs.compressed_allocated();
   }
 
-
   dout(20) << __func__ << *buf << dendl;
   return 0;
 }
@@ -5780,23 +6229,26 @@ BlueStore::CollectionRef BlueStore::_get_collection(const coll_t& cid)
 void BlueStore::_queue_reap_collection(CollectionRef& c)
 {
   dout(10) << __func__ << " " << c << " " << c->cid << dendl;
-  std::lock_guard<std::mutex> l(reap_lock);
+  // _reap_collections and this in the same thread,
+  // so no need a lock.
   removed_collections.push_back(c);
 }
 
 void BlueStore::_reap_collections()
 {
+
   list<CollectionRef> removed_colls;
   {
-    std::lock_guard<std::mutex> l(reap_lock);
-    removed_colls.swap(removed_collections);
+    // _queue_reap_collection and this in the same thread.
+    // So no need a lock.
+    if (!removed_collections.empty())
+      removed_colls.swap(removed_collections);
+    else
+      return;
   }
 
-  bool all_reaped = true;
-
-  for (list<CollectionRef>::iterator p = removed_colls.begin();
-       p != removed_colls.end();
-       ++p) {
+  list<CollectionRef>::iterator p = removed_colls.begin();
+  while (p != removed_colls.end()) {
     CollectionRef c = *p;
     dout(10) << __func__ << " " << c << " " << c->cid << dendl;
     if (c->onode_map.map_any([&](OnodeRef o) {
@@ -5804,19 +6256,21 @@ void BlueStore::_reap_collections()
          if (o->flushing_count.load()) {
            dout(10) << __func__ << " " << c << " " << c->cid << " " << o->oid
                     << " flush_txns " << o->flushing_count << dendl;
-           return false;
+           return true;
          }
-         return true;
+         return false;
        })) {
-      all_reaped = false;
+      ++p;
       continue;
     }
     c->onode_map.clear();
+    p = removed_colls.erase(p);
     dout(10) << __func__ << " " << c << " " << c->cid << " done" << dendl;
   }
-
-  if (all_reaped) {
+  if (removed_colls.empty()) {
     dout(10) << __func__ << " all reaped" << dendl;
+  } else {
+    removed_collections.splice(removed_collections.begin(), removed_colls);
   }
 }
 
@@ -5870,7 +6324,6 @@ bool BlueStore::exists(CollectionHandle &c_, const ghobject_t& oid)
       r = false;
   }
 
-  c->trim_cache();
   return r;
 }
 
@@ -5908,7 +6361,6 @@ int BlueStore::stat(
     st->st_nlink = 1;
   }
 
-  c->trim_cache();
   int r = 0;
   if (_debug_mdata_eio(oid)) {
     r = -EIO;
@@ -5938,13 +6390,12 @@ int BlueStore::read(
   uint64_t offset,
   size_t length,
   bufferlist& bl,
-  uint32_t op_flags,
-  bool allow_eio)
+  uint32_t op_flags)
 {
   CollectionHandle c = _get_collection(cid);
   if (!c)
     return -ENOENT;
-  return read(c, oid, offset, length, bl, op_flags, allow_eio);
+  return read(c, oid, offset, length, bl, op_flags);
 }
 
 int BlueStore::read(
@@ -5953,8 +6404,7 @@ int BlueStore::read(
   uint64_t offset,
   size_t length,
   bufferlist& bl,
-  uint32_t op_flags,
-  bool allow_eio)
+  uint32_t op_flags)
 {
   utime_t start = ceph_clock_now();
   Collection *c = static_cast<Collection *>(c_.get());
@@ -5981,14 +6431,19 @@ int BlueStore::read(
       length = o->onode.size;
 
     r = _do_read(c, o, offset, length, bl, op_flags);
+    if (r == -EIO) {
+      logger->inc(l_bluestore_read_eio);
+    }
   }
 
  out:
-  assert(allow_eio || r != -EIO);
-  c->trim_cache();
-  if (r == 0 && _debug_data_eio(oid)) {
+  if (r >= 0 && _debug_data_eio(oid)) {
     r = -EIO;
     derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
+  } else if (cct->_conf->bluestore_debug_random_read_err &&
+    (rand() % (int)(cct->_conf->bluestore_debug_random_read_err * 100.0)) == 0) {
+    dout(0) << __func__ << ": inject random EIO" << dendl;
+    r = -EIO;
   }
   dout(10) << __func__ << " " << cid << " " << oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
@@ -6036,7 +6491,6 @@ int BlueStore::_do_read(
   uint32_t op_flags)
 {
   FUNCTRACE();
-  boost::intrusive::set<Extent>::iterator ep, eend;
   int r = 0;
 
   dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
@@ -6089,7 +6543,7 @@ int BlueStore::_do_read(
       pos += hole;
       left -= hole;
     }
-    BlobRef bptr = lp->blob;
+    BlobRef& bptr = lp->blob;
     unsigned l_off = pos - lp->logical_offset;
     unsigned b_off = l_off + lp->blob_offset;
     unsigned b_len = std::min(left, lp->length - l_off);
@@ -6137,9 +6591,9 @@ int BlueStore::_do_read(
                                     // measure the whole block below.
                                     // The error isn't that much...
   vector<bufferlist> compressed_blob_bls;
-  IOContext ioc(cct, NULL);
+  IOContext ioc(cct, NULL, true); // allow EIO
   for (auto& p : blobs2read) {
-    BlobRef bptr = p.first;
+    const BlobRef& bptr = p.first;
     dout(20) << __func__ << "  blob " << *bptr << std::hex
             << " need " << p.second << std::dec << dendl;
     if (bptr->get_blob().is_compressed()) {
@@ -6164,7 +6618,14 @@ int BlueStore::_do_read(
             return r;
           return 0;
        });
+      if (r < 0) {
+        derr << __func__ << " bdev-read failed: " << cpp_strerror(r) << dendl;
+        if (r == -EIO) {
+          // propagate EIO to caller
+          return r;
+        }
         assert(r == 0);
+      }
     } else {
       // read the pieces
       for (auto& reg : p.second) {
@@ -6202,7 +6663,15 @@ int BlueStore::_do_read(
               return r;
             return 0;
          });
-       assert(r == 0);
+        if (r < 0) {
+          derr << __func__ << " bdev-read failed: " << cpp_strerror(r)
+               << dendl;
+          if (r == -EIO) {
+            // propagate EIO to caller
+            return r;
+          }
+          assert(r == 0);
+        }
        assert(reg.bl.length() == r_len);
       }
     }
@@ -6211,6 +6680,11 @@ int BlueStore::_do_read(
     bdev->aio_submit(&ioc);
     dout(20) << __func__ << " waiting for aio" << dendl;
     ioc.aio_wait();
+    r = ioc.get_return_value();
+    if (r < 0) {
+      assert(r == -EIO); // no other errors allowed
+      return -EIO;
+    }
   }
   logger->tinc(l_bluestore_read_wait_aio_lat, ceph_clock_now() - start);
 
@@ -6218,7 +6692,7 @@ int BlueStore::_do_read(
   auto p = compressed_blob_bls.begin();
   blobs2read_t::iterator b2r_it = blobs2read.begin();
   while (b2r_it != blobs2read.end()) {
-    BlobRef bptr = b2r_it->first;
+    const BlobRef& bptr = b2r_it->first;
     dout(20) << __func__ << "  blob " << *bptr << std::hex
             << " need 0x" << b2r_it->second << std::dec << dendl;
     if (bptr->get_blob().is_compressed()) {
@@ -6425,7 +6899,6 @@ int BlueStore::_fiemap(
   }
 
  out:
-  c->trim_cache();
   dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
           << " size = 0x(" << destset << ")" << std::dec << dendl;
   return 0;
@@ -6513,7 +6986,7 @@ int BlueStore::getattr(
   int r;
   {
     RWLock::RLocker l(c->lock);
-    mempool::bluestore_meta_other::string k(name);
+    mempool::bluestore_cache_other::string k(name);
 
     OnodeRef o = c->get_onode(oid, false);
     if (!o || !o->exists) {
@@ -6529,7 +7002,6 @@ int BlueStore::getattr(
     r = 0;
   }
  out:
-  c->trim_cache();
   if (r == 0 && _debug_mdata_eio(oid)) {
     r = -EIO;
     derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
@@ -6577,7 +7049,6 @@ int BlueStore::getattrs(
   }
 
  out:
-  c->trim_cache();
   if (r == 0 && _debug_mdata_eio(oid)) {
     r = -EIO;
     derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
@@ -6654,7 +7125,6 @@ int BlueStore::collection_list(
     r = _collection_list(c, start, end, max, ls, pnext);
   }
 
-  c->trim_cache();
   dout(10) << __func__ << " " << c->cid
     << " start " << start << " end " << end << " max " << max
     << " = " << r << ", ls.size() = " << ls->size()
@@ -6774,91 +7244,6 @@ out:
   return r;
 }
 
-// omap reads
-
-BlueStore::OmapIteratorImpl::OmapIteratorImpl(
-  CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
-  : c(c), o(o), it(it)
-{
-  RWLock::RLocker l(c->lock);
-  if (o->onode.has_omap()) {
-    get_omap_key(o->onode.nid, string(), &head);
-    get_omap_tail(o->onode.nid, &tail);
-    it->lower_bound(head);
-  }
-}
-
-int BlueStore::OmapIteratorImpl::seek_to_first()
-{
-  RWLock::RLocker l(c->lock);
-  if (o->onode.has_omap()) {
-    it->lower_bound(head);
-  } else {
-    it = KeyValueDB::Iterator();
-  }
-  return 0;
-}
-
-int BlueStore::OmapIteratorImpl::upper_bound(const string& after)
-{
-  RWLock::RLocker l(c->lock);
-  if (o->onode.has_omap()) {
-    string key;
-    get_omap_key(o->onode.nid, after, &key);
-    it->upper_bound(key);
-  } else {
-    it = KeyValueDB::Iterator();
-  }
-  return 0;
-}
-
-int BlueStore::OmapIteratorImpl::lower_bound(const string& to)
-{
-  RWLock::RLocker l(c->lock);
-  if (o->onode.has_omap()) {
-    string key;
-    get_omap_key(o->onode.nid, to, &key);
-    it->lower_bound(key);
-  } else {
-    it = KeyValueDB::Iterator();
-  }
-  return 0;
-}
-
-bool BlueStore::OmapIteratorImpl::valid()
-{
-  RWLock::RLocker l(c->lock);
-  return o->onode.has_omap() && it && it->valid() && it->raw_key().second <= tail;
-}
-
-int BlueStore::OmapIteratorImpl::next(bool validate)
-{
-  RWLock::RLocker l(c->lock);
-  if (o->onode.has_omap()) {
-    it->next();
-    return 0;
-  } else {
-    return -1;
-  }
-}
-
-string BlueStore::OmapIteratorImpl::key()
-{
-  RWLock::RLocker l(c->lock);
-  assert(it->valid());
-  string db_key = it->raw_key().second;
-  string user_key;
-  decode_omap_key(db_key, &user_key);
-  return user_key;
-}
-
-bufferlist BlueStore::OmapIteratorImpl::value()
-{
-  RWLock::RLocker l(c->lock);
-  assert(it->valid());
-  return it->value();
-}
-
 int BlueStore::omap_get(
   const coll_t& cid,                ///< [in] Collection containing oid
   const ghobject_t &oid,   ///< [in] Object containing omap
@@ -7318,6 +7703,8 @@ int BlueStore::_open_super_meta()
       uint64_t val;
       ::decode(val, p);
       min_alloc_size = val;
+      min_alloc_size_order = ctz(val);
+      assert(min_alloc_size == 1u << min_alloc_size_order);
     } catch (buffer::error& e) {
       derr << __func__ << " unable to read min_alloc_size" << dendl;
       return -EIO;
@@ -7325,6 +7712,7 @@ int BlueStore::_open_super_meta()
     dout(10) << __func__ << " min_alloc_size 0x" << std::hex << min_alloc_size
             << std::dec << dendl;
   }
+  _open_statfs();
   _set_alloc_sizes();
   _set_throttle_params();
 
@@ -7378,12 +7766,15 @@ int BlueStore::_upgrade_super()
 
 void BlueStore::_assign_nid(TransContext *txc, OnodeRef o)
 {
-  if (o->onode.nid)
+  if (o->onode.nid) {
+    assert(o->exists);
     return;
+  }
   uint64_t nid = ++nid_last;
   dout(20) << __func__ << " " << nid << dendl;
   o->onode.nid = nid;
   txc->last_nid = nid;
+  o->exists = true;
 }
 
 uint64_t BlueStore::_assign_blobid(TransContext *txc)
@@ -7438,6 +7829,11 @@ void BlueStore::_txc_update_store_statfs(TransContext *txc)
   logger->inc(l_bluestore_compressed_allocated, txc->statfs_delta.compressed_allocated());
   logger->inc(l_bluestore_compressed_original, txc->statfs_delta.compressed_original());
 
+  {
+    std::lock_guard<std::mutex> l(vstatfs_lock);
+    vstatfs += txc->statfs_delta;
+  }
+
   bufferlist bl;
   txc->statfs_delta.encode(bl);
 
@@ -7496,7 +7892,7 @@ void BlueStore::_txc_state_proc(TransContext *txc)
                   << dendl;
        } else {
          txc->state = TransContext::STATE_KV_SUBMITTED;
-         int r = db->submit_transaction(txc->t);
+         int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
          assert(r == 0);
          _txc_applied_kv(txc);
        }
@@ -7509,6 +7905,9 @@ void BlueStore::_txc_state_proc(TransContext *txc)
          kv_queue_unsubmitted.push_back(txc);
          ++txc->osr->kv_committing_serially;
        }
+       if (txc->had_ios)
+         kv_ios++;
+       kv_throttle_costs += txc->cost;
       }
       return;
     case TransContext::STATE_KV_SUBMITTED:
@@ -7559,6 +7958,9 @@ void BlueStore::_txc_finish_io(TransContext *txc)
   std::lock_guard<std::mutex> l(osr->qlock);
   txc->state = TransContext::STATE_IO_DONE;
 
+  // release aio contexts (including pinned buffers).
+  txc->ioc.running_aios.clear();
+
   OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc);
   while (p != osr->q.begin()) {
     --p;
@@ -7662,7 +8064,7 @@ void BlueStore::_txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t)
       bufferlist bl;
       ::encode(*(sb->persistent), bl);
       dout(20) << "  shared_blob 0x" << std::hex << sbid << std::dec
-              << " is " << bl.length() << dendl;
+              << " is " << bl.length() << " " << *sb << dendl;
       t->set(PREFIX_SHARED_BLOB, key, bl);
     }
   }
@@ -7782,8 +8184,8 @@ void BlueStore::_txc_finish(TransContext *txc)
   }
 
   OpSequencerRef osr = txc->osr;
-  CollectionRef c;
   bool empty = false;
+  bool submit_deferred = false;
   OpSequencer::q_list_t releasing_txc;
   {
     std::lock_guard<std::mutex> l(osr->qlock);
@@ -7799,12 +8201,13 @@ void BlueStore::_txc_finish(TransContext *txc)
          // for _osr_drain_preceding()
           notify = true;
        }
+       if (txc->state == TransContext::STATE_DEFERRED_QUEUED &&
+           osr->q.size() > g_conf->bluestore_max_deferred_txc) {
+         submit_deferred = true;
+       }
         break;
       }
 
-      if (!c && txc->first_collection) {
-        c = txc->first_collection;
-      }
       osr->q.pop_front();
       releasing_txc.push_back(*txc);
       notify = true;
@@ -7828,11 +8231,12 @@ void BlueStore::_txc_finish(TransContext *txc)
     delete txc;
   }
 
-  if (c) {
-    c->trim_cache();
+  if (submit_deferred) {
+    // we're pinning memory; flush!  we could be more fine-grained here but
+    // i'm not sure it's worth the bother.
+    deferred_try_submit();
   }
 
-
   if (empty && osr->zombie) {
     dout(10) << __func__ << " reaping empty zombie osr " << osr << dendl;
     osr->_unregister();
@@ -7843,7 +8247,8 @@ void BlueStore::_txc_release_alloc(TransContext *txc)
 {
   // update allocator with full released set
   if (!cct->_conf->bluestore_debug_no_reuse_blocks) {
-    dout(10) << __func__ << " " << txc << " " << txc->released << dendl;
+    dout(10) << __func__ << " " << txc << " " << std::hex
+             << txc->released << std::dec << dendl;
     for (interval_set<uint64_t>::iterator p = txc->released.begin();
         p != txc->released.end();
         ++p) {
@@ -7862,9 +8267,11 @@ void BlueStore::_osr_drain_preceding(TransContext *txc)
   ++deferred_aggressive; // FIXME: maybe osr-local aggressive flag?
   {
     // submit anything pending
-    std::lock_guard<std::mutex> l(deferred_lock);
+    deferred_lock.lock();
     if (osr->deferred_pending) {
-      _deferred_submit(osr);
+      _deferred_submit_unlock(osr);
+    } else {
+      deferred_lock.unlock();
     }
   }
   {
@@ -7891,14 +8298,17 @@ void BlueStore::_osr_drain_all()
   ++deferred_aggressive;
   {
     // submit anything pending
-    std::lock_guard<std::mutex> l(deferred_lock);
-    _deferred_try_submit();
+    deferred_try_submit();
   }
   {
     // wake up any previously finished deferred events
     std::lock_guard<std::mutex> l(kv_lock);
     kv_cond.notify_one();
   }
+  {
+    std::lock_guard<std::mutex> l(kv_finalize_lock);
+    kv_finalize_cond.notify_one();
+  }
   for (auto osr : s) {
     dout(20) << __func__ << " drain " << osr << dendl;
     osr->drain();
@@ -7934,10 +8344,87 @@ void BlueStore::_osr_unregister_all()
   }
 }
 
+void BlueStore::_kv_start()
+{
+  dout(10) << __func__ << dendl;
+
+  if (cct->_conf->bluestore_shard_finishers) {
+    if (cct->_conf->osd_op_num_shards) {
+      m_finisher_num = cct->_conf->osd_op_num_shards;
+    } else {
+      assert(bdev);
+      if (bdev->is_rotational()) {
+        m_finisher_num = cct->_conf->osd_op_num_shards_hdd;
+      } else {
+        m_finisher_num = cct->_conf->osd_op_num_shards_ssd;
+      }
+    }
+  }
+
+  assert(m_finisher_num != 0);
+
+  for (int i = 0; i < m_finisher_num; ++i) {
+    ostringstream oss;
+    oss << "finisher-" << i;
+    Finisher *f = new Finisher(cct, oss.str(), "finisher");
+    finishers.push_back(f);
+  }
+
+  deferred_finisher.start();
+  for (auto f : finishers) {
+    f->start();
+  }
+  kv_sync_thread.create("bstore_kv_sync");
+  kv_finalize_thread.create("bstore_kv_final");
+}
+
+void BlueStore::_kv_stop()
+{
+  dout(10) << __func__ << dendl;
+  {
+    std::unique_lock<std::mutex> l(kv_lock);
+    while (!kv_sync_started) {
+      kv_cond.wait(l);
+    }
+    kv_stop = true;
+    kv_cond.notify_all();
+  }
+  {
+    std::unique_lock<std::mutex> l(kv_finalize_lock);
+    while (!kv_finalize_started) {
+      kv_finalize_cond.wait(l);
+    }
+    kv_finalize_stop = true;
+    kv_finalize_cond.notify_all();
+  }
+  kv_sync_thread.join();
+  kv_finalize_thread.join();
+  assert(removed_collections.empty());
+  {
+    std::lock_guard<std::mutex> l(kv_lock);
+    kv_stop = false;
+  }
+  {
+    std::lock_guard<std::mutex> l(kv_finalize_lock);
+    kv_finalize_stop = false;
+  }
+  dout(10) << __func__ << " stopping finishers" << dendl;
+  deferred_finisher.wait_for_empty();
+  deferred_finisher.stop();
+  for (auto f : finishers) {
+    f->wait_for_empty();
+    f->stop();
+  }
+  dout(10) << __func__ << " stopped" << dendl;
+}
+
 void BlueStore::_kv_sync_thread()
 {
   dout(10) << __func__ << " start" << dendl;
   std::unique_lock<std::mutex> l(kv_lock);
+  assert(!kv_sync_started);
+  kv_sync_started = true;
+  kv_cond.notify_all();
   while (true) {
     assert(kv_committing.empty());
     if (kv_queue.empty() &&
@@ -7951,6 +8438,8 @@ void BlueStore::_kv_sync_thread()
     } else {
       deque<TransContext*> kv_submitting;
       deque<DeferredBatch*> deferred_done, deferred_stable;
+      uint64_t aios = 0, costs = 0;
+
       dout(20) << __func__ << " committing " << kv_queue.size()
               << " submitting " << kv_queue_unsubmitted.size()
               << " deferred done " << deferred_done_queue.size()
@@ -7960,6 +8449,10 @@ void BlueStore::_kv_sync_thread()
       kv_submitting.swap(kv_queue_unsubmitted);
       deferred_done.swap(deferred_done_queue);
       deferred_stable.swap(deferred_stable_queue);
+      aios = kv_ios;
+      costs = kv_throttle_costs;
+      kv_ios = 0;
+      kv_throttle_costs = 0;
       utime_t start = ceph_clock_now();
       l.unlock();
 
@@ -7968,20 +8461,13 @@ void BlueStore::_kv_sync_thread()
       dout(30) << __func__ << " deferred_done " << deferred_done << dendl;
       dout(30) << __func__ << " deferred_stable " << deferred_stable << dendl;
 
-      int num_aios = 0;
-      for (auto txc : kv_committing) {
-       if (txc->had_ios) {
-         ++num_aios;
-       }
-      }
-
       bool force_flush = false;
       // if bluefs is sharing the same device as data (only), then we
       // can rely on the bluefs commit to flush the device and make
       // deferred aios stable.  that means that if we do have done deferred
       // txcs AND we are not on a single device, we need to force a flush.
       if (bluefs_single_shared_device && bluefs) {
-       if (num_aios) {
+       if (aios) {
          force_flush = true;
        } else if (kv_committing.empty() && kv_submitting.empty() &&
                   deferred_stable.empty()) {
@@ -7993,7 +8479,7 @@ void BlueStore::_kv_sync_thread()
        force_flush = true;
 
       if (force_flush) {
-       dout(20) << __func__ << " num_aios=" << num_aios
+       dout(20) << __func__ << " num_aios=" << aios
                 << " force_flush=" << (int)force_flush
                 << ", flushing, deferred done->stable" << dendl;
        // flush/barrier on block device
@@ -8032,35 +8518,39 @@ void BlueStore::_kv_sync_thread()
        t->set(PREFIX_SUPER, "blobid_max", bl);
        dout(10) << __func__ << " new_blobid_max " << new_blobid_max << dendl;
       }
-      for (auto txc : kv_submitting) {
-       assert(txc->state == TransContext::STATE_KV_QUEUED);
-       txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
-       int r = db->submit_transaction(txc->t);
-       assert(r == 0);
-       _txc_applied_kv(txc);
-       --txc->osr->kv_committing_serially;
-       txc->state = TransContext::STATE_KV_SUBMITTED;
-       if (txc->osr->kv_submitted_waiters) {
-         std::lock_guard<std::mutex> l(txc->osr->qlock);
-         if (txc->osr->_is_all_kv_submitted()) {
-           txc->osr->qcond.notify_all();
+
+      for (auto txc : kv_committing) {
+       if (txc->state == TransContext::STATE_KV_QUEUED) {
+         txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
+         int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
+         assert(r == 0);
+         _txc_applied_kv(txc);
+         --txc->osr->kv_committing_serially;
+         txc->state = TransContext::STATE_KV_SUBMITTED;
+         if (txc->osr->kv_submitted_waiters) {
+           std::lock_guard<std::mutex> l(txc->osr->qlock);
+           if (txc->osr->_is_all_kv_submitted()) {
+             txc->osr->qcond.notify_all();
+           }
          }
+
+       } else {
+         assert(txc->state == TransContext::STATE_KV_SUBMITTED);
+         txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
        }
-      }
-      for (auto txc : kv_committing) {
        if (txc->had_ios) {
          --txc->osr->txc_with_unstable_io;
        }
-       txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
-       // release throttle *before* we commit.  this allows new ops
-       // to be prepared and enter pipeline while we are waiting on
-       // the kv commit sync/flush.  then hopefully on the next
-       // iteration there will already be ops awake.  otherwise, we
-       // end up going to sleep, and then wake up when the very first
-       // transaction is ready for commit.
-       throttle_bytes.put(txc->cost);
       }
 
+      // release throttle *before* we commit.  this allows new ops
+      // to be prepared and enter pipeline while we are waiting on
+      // the kv commit sync/flush.  then hopefully on the next
+      // iteration there will already be ops awake.  otherwise, we
+      // end up going to sleep, and then wake up when the very first
+      // transaction is ready for commit.
+      throttle_bytes.put(costs);
+
       PExtentVector bluefs_gift_extents;
       if (bluefs &&
          after_flush - bluefs_last_balance >
@@ -8100,7 +8590,7 @@ void BlueStore::_kv_sync_thread()
       }
 
       // submit synct synchronously (block and wait for it to commit)
-      int r = db->submit_transaction_sync(synct);
+      int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction_sync(synct);
       assert(r == 0);
 
       if (new_nid_max) {
@@ -8112,26 +8602,102 @@ void BlueStore::_kv_sync_thread()
        dout(10) << __func__ << " blobid_max now " << blobid_max << dendl;
       }
 
-      utime_t finish = ceph_clock_now();
-      utime_t dur_flush = after_flush - start;
-      utime_t dur_kv = finish - after_flush;
-      utime_t dur = finish - start;
-      dout(20) << __func__ << " committed " << kv_committing.size()
-              << " cleaned " << deferred_stable.size()
-              << " in " << dur
-              << " (" << dur_flush << " flush + " << dur_kv << " kv commit)"
-              << dendl;
-      if (logger) {
+      {
+       utime_t finish = ceph_clock_now();
+       utime_t dur_flush = after_flush - start;
+       utime_t dur_kv = finish - after_flush;
+       utime_t dur = finish - start;
+       dout(20) << __func__ << " committed " << kv_committing.size()
+         << " cleaned " << deferred_stable.size()
+         << " in " << dur
+         << " (" << dur_flush << " flush + " << dur_kv << " kv commit)"
+         << dendl;
        logger->tinc(l_bluestore_kv_flush_lat, dur_flush);
        logger->tinc(l_bluestore_kv_commit_lat, dur_kv);
        logger->tinc(l_bluestore_kv_lat, dur);
       }
-      while (!kv_committing.empty()) {
-       TransContext *txc = kv_committing.front();
+
+      if (bluefs) {
+       if (!bluefs_gift_extents.empty()) {
+         _commit_bluefs_freespace(bluefs_gift_extents);
+       }
+       for (auto p = bluefs_extents_reclaiming.begin();
+            p != bluefs_extents_reclaiming.end();
+            ++p) {
+         dout(20) << __func__ << " releasing old bluefs 0x" << std::hex
+                  << p.get_start() << "~" << p.get_len() << std::dec
+                  << dendl;
+         alloc->release(p.get_start(), p.get_len());
+       }
+       bluefs_extents_reclaiming.clear();
+      }
+
+      {
+       std::unique_lock<std::mutex> m(kv_finalize_lock);
+       if (kv_committing_to_finalize.empty()) {
+         kv_committing_to_finalize.swap(kv_committing);
+       } else {
+         kv_committing_to_finalize.insert(
+           kv_committing_to_finalize.end(),
+           kv_committing.begin(),
+           kv_committing.end());
+         kv_committing.clear();
+       }
+       if (deferred_stable_to_finalize.empty()) {
+         deferred_stable_to_finalize.swap(deferred_stable);
+       } else {
+         deferred_stable_to_finalize.insert(
+           deferred_stable_to_finalize.end(),
+           deferred_stable.begin(),
+           deferred_stable.end());
+          deferred_stable.clear();
+       }
+       kv_finalize_cond.notify_one();
+      }
+
+      l.lock();
+      // previously deferred "done" are now "stable" by virtue of this
+      // commit cycle.
+      deferred_stable_queue.swap(deferred_done);
+    }
+  }
+  dout(10) << __func__ << " finish" << dendl;
+  kv_sync_started = false;
+}
+
+void BlueStore::_kv_finalize_thread()
+{
+  deque<TransContext*> kv_committed;
+  deque<DeferredBatch*> deferred_stable;
+  dout(10) << __func__ << " start" << dendl;
+  std::unique_lock<std::mutex> l(kv_finalize_lock);
+  assert(!kv_finalize_started);
+  kv_finalize_started = true;
+  kv_finalize_cond.notify_all();
+  while (true) {
+    assert(kv_committed.empty());
+    assert(deferred_stable.empty());
+    if (kv_committing_to_finalize.empty() &&
+       deferred_stable_to_finalize.empty()) {
+      if (kv_finalize_stop)
+       break;
+      dout(20) << __func__ << " sleep" << dendl;
+      kv_finalize_cond.wait(l);
+      dout(20) << __func__ << " wake" << dendl;
+    } else {
+      kv_committed.swap(kv_committing_to_finalize);
+      deferred_stable.swap(deferred_stable_to_finalize);
+      l.unlock();
+      dout(20) << __func__ << " kv_committed " << kv_committed << dendl;
+      dout(20) << __func__ << " deferred_stable " << deferred_stable << dendl;
+
+      while (!kv_committed.empty()) {
+       TransContext *txc = kv_committed.front();
        assert(txc->state == TransContext::STATE_KV_SUBMITTED);
        _txc_state_proc(txc);
-       kv_committing.pop_front();
+       kv_committed.pop_front();
       }
+
       for (auto b : deferred_stable) {
        auto p = b->txcs.begin();
        while (p != b->txcs.end()) {
@@ -8141,40 +8707,23 @@ void BlueStore::_kv_sync_thread()
        }
        delete b;
       }
+      deferred_stable.clear();
 
       if (!deferred_aggressive) {
-       std::lock_guard<std::mutex> l(deferred_lock);
-       if (deferred_queue_size >= deferred_batch_ops ||
+       if (deferred_queue_size >= deferred_batch_ops.load() ||
            throttle_deferred_bytes.past_midpoint()) {
-         _deferred_try_submit();
+         deferred_try_submit();
        }
       }
 
       // this is as good a place as any ...
       _reap_collections();
 
-      if (bluefs) {
-       if (!bluefs_gift_extents.empty()) {
-         _commit_bluefs_freespace(bluefs_gift_extents);
-       }
-       for (auto p = bluefs_extents_reclaiming.begin();
-            p != bluefs_extents_reclaiming.end();
-            ++p) {
-         dout(20) << __func__ << " releasing old bluefs 0x" << std::hex
-                  << p.get_start() << "~" << p.get_len() << std::dec
-                  << dendl;
-         alloc->release(p.get_start(), p.get_len());
-       }
-       bluefs_extents_reclaiming.clear();
-      }
-
       l.lock();
-      // previously deferred "done" are now "stable" by virtue of this
-      // commit cycle.
-      deferred_stable_queue.swap(deferred_done);
     }
   }
   dout(10) << __func__ << " finish" << dendl;
+  kv_finalize_started = false;
 }
 
 bluestore_deferred_op_t *BlueStore::_get_deferred_op(
@@ -8190,7 +8739,7 @@ bluestore_deferred_op_t *BlueStore::_get_deferred_op(
 void BlueStore::_deferred_queue(TransContext *txc)
 {
   dout(20) << __func__ << " txc " << txc << " osr " << txc->osr << dendl;
-  std::lock_guard<std::mutex> l(deferred_lock);
+  deferred_lock.lock();
   if (!txc->osr->deferred_pending &&
       !txc->osr->deferred_running) {
     deferred_queue.push_back(*txc->osr);
@@ -8212,22 +8761,38 @@ void BlueStore::_deferred_queue(TransContext *txc)
   }
   if (deferred_aggressive &&
       !txc->osr->deferred_running) {
-    _deferred_submit(txc->osr.get());
+    _deferred_submit_unlock(txc->osr.get());
+  } else {
+    deferred_lock.unlock();
   }
 }
 
-void BlueStore::_deferred_try_submit()
+void BlueStore::deferred_try_submit()
 {
   dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
           << deferred_queue_size << " txcs" << dendl;
+  std::lock_guard<std::mutex> l(deferred_lock);
+  vector<OpSequencerRef> osrs;
+  osrs.reserve(deferred_queue.size());
   for (auto& osr : deferred_queue) {
-    if (!osr.deferred_running) {
-      _deferred_submit(&osr);
+    osrs.push_back(&osr);
+  }
+  for (auto& osr : osrs) {
+    if (osr->deferred_pending) {
+      if (!osr->deferred_running) {
+       _deferred_submit_unlock(osr.get());
+       deferred_lock.lock();
+      } else {
+       dout(20) << __func__ << "  osr " << osr << " already has running"
+                << dendl;
+      }
+    } else {
+      dout(20) << __func__ << "  osr " << osr << " has no pending" << dendl;
     }
   }
 }
 
-void BlueStore::_deferred_submit(OpSequencer *osr)
+void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
 {
   dout(10) << __func__ << " osr " << osr
           << " " << osr->deferred_pending->iomap.size() << " ios pending "
@@ -8275,9 +8840,19 @@ void BlueStore::_deferred_submit(OpSequencer *osr)
     bl.claim_append(i->second.bl);
     ++i;
   }
+
+  deferred_lock.unlock();
   bdev->aio_submit(&b->ioc);
 }
 
+struct C_DeferredTrySubmit : public Context {
+  BlueStore *store;
+  C_DeferredTrySubmit(BlueStore *s) : store(s) {}
+  void finish(int r) {
+    store->deferred_try_submit();
+  }
+};
+
 void BlueStore::_deferred_aio_finish(OpSequencer *osr)
 {
   dout(10) << __func__ << " osr " << osr << dendl;
@@ -8289,21 +8864,27 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
     assert(osr->deferred_running == b);
     osr->deferred_running = nullptr;
     if (!osr->deferred_pending) {
+      dout(20) << __func__ << " dequeueing" << dendl;
       auto q = deferred_queue.iterator_to(*osr);
       deferred_queue.erase(q);
     } else if (deferred_aggressive) {
-      _deferred_submit(osr);
+      dout(20) << __func__ << " queuing async deferred_try_submit" << dendl;
+      deferred_finisher.queue(new C_DeferredTrySubmit(this));
+    } else {
+      dout(20) << __func__ << " leaving queued, more pending" << dendl;
     }
   }
 
   {
+    uint64_t costs = 0;
     std::lock_guard<std::mutex> l2(osr->qlock);
     for (auto& i : b->txcs) {
       TransContext *txc = &i;
       txc->state = TransContext::STATE_DEFERRED_CLEANUP;
-      txc->osr->qcond.notify_all();
-      throttle_deferred_bytes.put(txc->cost);
+      costs += txc->cost;
     }
+    osr->qcond.notify_all();
+    throttle_deferred_bytes.put(costs);
     std::lock_guard<std::mutex> l(kv_lock);
     deferred_done_queue.emplace_back(b);
   }
@@ -8424,9 +9005,18 @@ int BlueStore::queue_transactions(
   if (txc->deferred_txn) {
     // ensure we do not block here because of deferred writes
     if (!throttle_deferred_bytes.get_or_fail(txc->cost)) {
+      dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive"
+              << dendl;
+      ++deferred_aggressive;
       deferred_try_submit();
+      {
+       // wake up any previously finished deferred events
+       std::lock_guard<std::mutex> l(kv_lock);
+       kv_cond.notify_one();
+      }
       throttle_deferred_bytes.get(txc->cost);
-    }
+      --deferred_aggressive;
+   }
   }
   utime_t tend = ceph_clock_now();
 
@@ -8460,10 +9050,6 @@ void BlueStore::_txc_add_transaction(TransContext *txc, Transaction *t)
   for (vector<coll_t>::iterator p = i.colls.begin(); p != i.colls.end();
        ++p, ++j) {
     cvec[j] = _get_collection(*p);
-
-    // note first collection we reference
-    if (!txc->first_collection)
-      txc->first_collection = cvec[j];
   }
   vector<OnodeRef> ovec(i.objects.size());
 
@@ -8608,7 +9194,7 @@ void BlueStore::_txc_add_transaction(TransContext *txc, Transaction *t)
     case Transaction::OP_TRUNCATE:
       {
         uint64_t off = op->off;
-       _truncate(txc, c, o, off);
+       r = _truncate(txc, c, o, off);
       }
       break;
 
@@ -8811,14 +9397,13 @@ int BlueStore::_touch(TransContext *txc,
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
   int r = 0;
-  o->exists = true;
   _assign_nid(txc, o);
   txc->write_onode(o);
   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
   return r;
 }
 
-void BlueStore::_dump_onode(OnodeRef o, int log_level)
+void BlueStore::_dump_onode(const OnodeRef& o, int log_level)
 {
   if (!cct->_conf->subsys.should_gather(ceph_subsys_bluestore, log_level))
     return;
@@ -8900,12 +9485,12 @@ void BlueStore::_pad_zeros(
   if (front_pad) {
     size_t front_copy = MIN(chunk_size - front_pad, length);
     bufferptr z = buffer::create_page_aligned(chunk_size);
-    memset(z.c_str(), 0, front_pad);
+    z.zero(0, front_pad, false);
     pad_count += front_pad;
-    memcpy(z.c_str() + front_pad, bl->get_contiguous(0, front_copy), front_copy);
+    bl->copy(0, front_copy, z.c_str() + front_pad);
     if (front_copy + front_pad < chunk_size) {
       back_pad = chunk_size - (length + front_pad);
-      memset(z.c_str() + front_pad + length, 0, back_pad);
+      z.zero(front_pad + length, back_pad, false);
       pad_count += back_pad;
     }
     bufferlist old, t;
@@ -8914,7 +9499,7 @@ void BlueStore::_pad_zeros(
     bl->append(z);
     bl->claim_append(t);
     *offset -= front_pad;
-    length += front_pad + back_pad;
+    length += pad_count;
   }
 
   // back
@@ -8925,9 +9510,8 @@ void BlueStore::_pad_zeros(
     back_pad = chunk_size - back_copy;
     assert(back_copy <= length);
     bufferptr tail(chunk_size);
-    memcpy(tail.c_str(), bl->get_contiguous(length - back_copy, back_copy),
-          back_copy);
-    memset(tail.c_str() + back_copy, 0, back_pad);
+    bl->copy(length - back_copy, back_copy, tail.c_str());
+    tail.zero(back_copy, back_pad, false);
     bufferlist old;
     old.swap(*bl);
     bl->substr_of(old, 0, length - back_copy);
@@ -8991,7 +9575,7 @@ void BlueStore::_do_write_small(
 
   // search suitable extent in both forward and reverse direction in
   // [offset - target_max_blob_size, offset + target_max_blob_size] range
-  // then check if blob can be reused via try_reuse_blob func or apply
+  // then check if blob can be reused via can_reuse_blob func or apply
   // direct/deferred write (the latter for extents including or higher
   // than 'offset' only).
   do {
@@ -9035,14 +9619,13 @@ void BlueStore::_do_write_small(
            b->get_blob().get_ondisk_length() >= b_off + b_len &&
            b->get_blob().is_unused(b_off, b_len) &&
            b->get_blob().is_allocated(b_off, b_len)) {
-         bufferlist padded;
-         _apply_padding(head_pad, tail_pad, bl, padded);
+         _apply_padding(head_pad, tail_pad, bl);
 
          dout(20) << __func__ << "  write to unused 0x" << std::hex
                   << b_off << "~" << b_len
                   << " pad 0x" << head_pad << " + 0x" << tail_pad
                   << std::dec << " of mutable " << *b << dendl;
-         _buffer_cache_write(txc, b, b_off, padded,
+         _buffer_cache_write(txc, b, b_off, bl,
                              wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
 
          if (!g_conf->bluestore_debug_omit_block_device_write) {
@@ -9057,17 +9640,17 @@ void BlueStore::_do_write_small(
                  op->extents.emplace_back(bluestore_pextent_t(offset, length));
                  return 0;
                });
-             op->data = padded;
+             op->data = bl;
            } else {
              b->get_blob().map_bl(
-               b_off, padded,
+               b_off, bl,
                [&](uint64_t offset, bufferlist& t) {
                  bdev->aio_write(offset, t,
                                  &txc->ioc, wctx->buffered);
                });
            }
          }
-         b->dirty_blob().calc_csum(b_off, padded);
+         b->dirty_blob().calc_csum(b_off, bl);
          dout(20) << __func__ << "  lex old " << *ep << dendl;
          Extent *le = o->extent_map.set_lextent(c, offset, b_off + head_pad, length,
                                                 b,
@@ -9097,8 +9680,7 @@ void BlueStore::_do_write_small(
            b_len % chunk_size == 0 &&
            b->get_blob().is_allocated(b_off, b_len)) {
 
-         bufferlist padded;
-         _apply_padding(head_pad, tail_pad, bl, padded);
+         _apply_padding(head_pad, tail_pad, bl);
 
          dout(20) << __func__ << "  reading head 0x" << std::hex << head_read
                   << " and tail 0x" << tail_read << std::dec << dendl;
@@ -9112,8 +9694,7 @@ void BlueStore::_do_write_small(
              head_bl.append_zero(zlen);
              logger->inc(l_bluestore_write_pad_bytes, zlen);
            }
-           head_bl.claim_append(padded);
-           padded.swap(head_bl);
+           bl.claim_prepend(head_bl);
            logger->inc(l_bluestore_write_penalty_read_ops);
          }
          if (tail_read) {
@@ -9126,14 +9707,14 @@ void BlueStore::_do_write_small(
              tail_bl.append_zero(zlen);
              logger->inc(l_bluestore_write_pad_bytes, zlen);
            }
-           padded.claim_append(tail_bl);
+           bl.claim_append(tail_bl);
            logger->inc(l_bluestore_write_penalty_read_ops);
          }
          logger->inc(l_bluestore_write_small_pre_read);
 
          bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
          op->op = bluestore_deferred_op_t::OP_WRITE;
-         _buffer_cache_write(txc, b, b_off, padded,
+         _buffer_cache_write(txc, b, b_off, bl,
                              wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
 
          int r = b->get_blob().map(
@@ -9144,9 +9725,9 @@ void BlueStore::_do_write_small(
            });
          assert(r == 0);
          if (b->get_blob().csum_type) {
-           b->dirty_blob().calc_csum(b_off, padded);
+           b->dirty_blob().calc_csum(b_off, bl);
          }
-         op->data.claim(padded);
+         op->data.claim(bl);
          dout(20) << __func__ << "  deferred write 0x" << std::hex << b_off << "~"
                   << b_len << std::dec << " of mutable " << *b
                   << " at " << op->extents << dendl;
@@ -9158,8 +9739,8 @@ void BlueStore::_do_write_small(
          logger->inc(l_bluestore_write_small_deferred);
          return;
        }
-       //try to reuse blob
-       if (b->try_reuse_blob(min_alloc_size,
+       // try to reuse blob if we can
+       if (b->can_reuse_blob(min_alloc_size,
                              max_bsize,
                              offset0 - bstart,
                              &alloc_len)) {
@@ -9183,8 +9764,8 @@ void BlueStore::_do_write_small(
            _pad_zeros(&bl, &b_off0, chunk_size);
 
            dout(20) << __func__ << " reuse blob " << *b << std::hex
-                    << " (" << b_off0 << "~" << bl.length() << ")"
-                    << " (" << b_off << "~" << length << ")"
+                    << " (0x" << b_off0 << "~" << bl.length() << ")"
+                    << " (0x" << b_off << "~" << length << ")"
                     << std::dec << dendl;
 
            o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
@@ -9205,7 +9786,7 @@ void BlueStore::_do_write_small(
       auto bstart = prev_ep->blob_start();
       dout(20) << __func__ << " considering " << *b
               << " bstart 0x" << std::hex << bstart << std::dec << dendl;
-      if (b->try_reuse_blob(min_alloc_size,
+      if (b->can_reuse_blob(min_alloc_size,
                            max_bsize,
                             offset0 - bstart,
                             &alloc_len)) {
@@ -9228,8 +9809,8 @@ void BlueStore::_do_write_small(
          _pad_zeros(&bl, &b_off0, chunk_size);
 
          dout(20) << __func__ << " reuse blob " << *b << std::hex
-                   << " (" << b_off0 << "~" << bl.length() << ")"
-                   << " (" << b_off << "~" << length << ")"
+                   << " (0x" << b_off0 << "~" << bl.length() << ")"
+                   << " (0x" << b_off << "~" << length << ")"
                    << std::dec << dendl;
 
          o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
@@ -9298,20 +9879,20 @@ void BlueStore::_do_write_big(
       auto min_off = offset >= max_bsize ? offset - max_bsize : 0;
       // search suitable extent in both forward and reverse direction in
       // [offset - target_max_blob_size, offset + target_max_blob_size] range
-      // then check if blob can be reused via try_reuse_blob func.
+      // then check if blob can be reused via can_reuse_blob func.
       bool any_change;
       do {
        any_change = false;
        if (ep != end && ep->logical_offset < offset + max_bsize) {
          if (offset >= ep->blob_start() &&
-              ep->blob->try_reuse_blob(min_alloc_size, max_bsize,
+              ep->blob->can_reuse_blob(min_alloc_size, max_bsize,
                                       offset - ep->blob_start(),
                                       &l)) {
            b = ep->blob;
            b_off = offset - ep->blob_start();
             prev_ep = end; // to avoid check below
            dout(20) << __func__ << " reuse blob " << *b << std::hex
-                    << " (" << b_off << "~" << l << ")" << std::dec << dendl;
+                    << " (0x" << b_off << "~" << l << ")" << std::dec << dendl;
          } else {
            ++ep;
            any_change = true;
@@ -9319,13 +9900,13 @@ void BlueStore::_do_write_big(
        }
 
        if (prev_ep != end && prev_ep->logical_offset >= min_off) {
-         if (prev_ep->blob->try_reuse_blob(min_alloc_size, max_bsize,
+         if (prev_ep->blob->can_reuse_blob(min_alloc_size, max_bsize,
                                            offset - prev_ep->blob_start(),
                                            &l)) {
            b = prev_ep->blob;
            b_off = offset - prev_ep->blob_start();
            dout(20) << __func__ << " reuse blob " << *b << std::hex
-                    << " (" << b_off << "~" << l << ")" << std::dec << dendl;
+                    << " (0x" << b_off << "~" << l << ")" << std::dec << dendl;
          } else if (prev_ep != begin) {
            --prev_ep;
            any_change = true;
@@ -9359,20 +9940,10 @@ int BlueStore::_do_alloc_write(
   dout(20) << __func__ << " txc " << txc
           << " " << wctx->writes.size() << " blobs"
           << dendl;
-
-  uint64_t need = 0;
-  auto max_bsize = MAX(wctx->target_blob_size, min_alloc_size);
-  for (auto &wi : wctx->writes) {
-    need += wi.blob_length;
-  }
-  int r = alloc->reserve(need);
-  if (r < 0) {
-    derr << __func__ << " failed to reserve 0x" << std::hex << need << std::dec
-        << dendl;
-    return r;
+  if (wctx->writes.empty()) {
+    return 0;
   }
 
-  uint64_t hint = 0;
   CompressorRef c;
   double crr = 0;
   if (wctx->compress) {
@@ -9397,7 +9968,7 @@ int BlueStore::_do_alloc_write(
       cct->_conf->bluestore_compression_required_ratio,
       [&]() {
         double val;
-        if(coll->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) {
+        if (coll->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) {
           return boost::optional<double>(val);
         }
         return boost::optional<double>();
@@ -9412,87 +9983,109 @@ int BlueStore::_do_alloc_write(
     csum,
     [&]() {
       int val;
-      if(coll->pool_opts.get(pool_opts_t::CSUM_TYPE, &val)) {
+      if (coll->pool_opts.get(pool_opts_t::CSUM_TYPE, &val)) {
         return  boost::optional<int>(val);
       }
       return boost::optional<int>();
     }
   );
 
+  // compress (as needed) and calc needed space
+  uint64_t need = 0;
+  auto max_bsize = MAX(wctx->target_blob_size, min_alloc_size);
   for (auto& wi : wctx->writes) {
-    BlobRef b = wi.b;
-    bluestore_blob_t& dblob = b->dirty_blob();
-    uint64_t b_off = wi.b_off;
-    bufferlist *l = &wi.bl;
-    uint64_t final_length = wi.blob_length;
-    uint64_t csum_length = wi.blob_length;
-    unsigned csum_order = block_size_order;
-    bufferlist compressed_bl;
-    bool compressed = false;
-    if(c && wi.blob_length > min_alloc_size) {
-
+    if (c && wi.blob_length > min_alloc_size) {
       utime_t start = ceph_clock_now();
 
       // compress
-      assert(b_off == 0);
-      assert(wi.blob_length == l->length());
-      bluestore_compression_header_t chdr;
-      chdr.type = c->get_type();
+      assert(wi.b_off == 0);
+      assert(wi.blob_length == wi.bl.length());
+
       // FIXME: memory alignment here is bad
       bufferlist t;
-
-      r = c->compress(*l, t);
+      int r = c->compress(wi.bl, t);
       assert(r == 0);
 
+      bluestore_compression_header_t chdr;
+      chdr.type = c->get_type();
       chdr.length = t.length();
-      ::encode(chdr, compressed_bl);
-      compressed_bl.claim_append(t);
-      uint64_t rawlen = compressed_bl.length();
-      uint64_t newlen = P2ROUNDUP(rawlen, min_alloc_size);
-      uint64_t want_len_raw = final_length * crr;
+      ::encode(chdr, wi.compressed_bl);
+      wi.compressed_bl.claim_append(t);
+
+      wi.compressed_len = wi.compressed_bl.length();
+      uint64_t newlen = P2ROUNDUP(wi.compressed_len, min_alloc_size);
+      uint64_t want_len_raw = wi.blob_length * crr;
       uint64_t want_len = P2ROUNDUP(want_len_raw, min_alloc_size);
-      if (newlen <= want_len && newlen < final_length) {
-        // Cool. We compressed at least as much as we were hoping to.
-        // pad out to min_alloc_size
-       compressed_bl.append_zero(newlen - rawlen);
-       logger->inc(l_bluestore_write_pad_bytes, newlen - rawlen);
+      if (newlen <= want_len && newlen < wi.blob_length) {
+       // Cool. We compressed at least as much as we were hoping to.
+       // pad out to min_alloc_size
+       wi.compressed_bl.append_zero(newlen - wi.compressed_len);
+       logger->inc(l_bluestore_write_pad_bytes, newlen - wi.compressed_len);
        dout(20) << __func__ << std::hex << "  compressed 0x" << wi.blob_length
-                << " -> 0x" << rawlen << " => 0x" << newlen
+                << " -> 0x" << wi.compressed_len << " => 0x" << newlen
                 << " with " << c->get_type()
                 << std::dec << dendl;
-       txc->statfs_delta.compressed() += rawlen;
-       txc->statfs_delta.compressed_original() += l->length();
+       txc->statfs_delta.compressed() += wi.compressed_len;
+       txc->statfs_delta.compressed_original() += wi.blob_length;
        txc->statfs_delta.compressed_allocated() += newlen;
-       l = &compressed_bl;
-       final_length = newlen;
-       csum_length = newlen;
-       csum_order = ctz(newlen);
-       dblob.set_compressed(wi.blob_length, rawlen);
-       compressed = true;
-        logger->inc(l_bluestore_compress_success_count);
+       logger->inc(l_bluestore_compress_success_count);
+       wi.compressed = true;
+       need += newlen;
       } else {
-       dout(20) << __func__ << std::hex << "  0x" << l->length()
-                << " compressed to 0x" << rawlen << " -> 0x" << newlen
-                 << " with " << c->get_type()
-                 << ", which is more than required 0x" << want_len_raw
+       dout(20) << __func__ << std::hex << "  0x" << wi.blob_length
+                << " compressed to 0x" << wi.compressed_len << " -> 0x" << newlen
+                << " with " << c->get_type()
+                << ", which is more than required 0x" << want_len_raw
                 << " -> 0x" << want_len
-                 << ", leaving uncompressed"
-                 << std::dec << dendl;
-        logger->inc(l_bluestore_compress_rejected_count);
+                << ", leaving uncompressed"
+                << std::dec << dendl;
+       logger->inc(l_bluestore_compress_rejected_count);
+       need += wi.blob_length;
       }
       logger->tinc(l_bluestore_compress_lat,
                   ceph_clock_now() - start);
+    } else {
+      need += wi.blob_length;
     }
-    if (!compressed && wi.new_blob) {
-      // initialize newly created blob only
-      assert(!dblob.has_flag(bluestore_blob_t::FLAG_MUTABLE));
-      dblob.set_flag(bluestore_blob_t::FLAG_MUTABLE);
+  }
+  int r = alloc->reserve(need);
+  if (r < 0) {
+    derr << __func__ << " failed to reserve 0x" << std::hex << need << std::dec
+        << dendl;
+    return r;
+  }
+  AllocExtentVector prealloc;
+  prealloc.reserve(2 * wctx->writes.size());;
+  int prealloc_left = 0;
+  prealloc_left = alloc->allocate(
+    need, min_alloc_size, need,
+    0, &prealloc);
+  assert(prealloc_left == (int64_t)need);
+  dout(20) << __func__ << " prealloc " << prealloc << dendl;
+  auto prealloc_pos = prealloc.begin();
 
+  for (auto& wi : wctx->writes) {
+    BlobRef b = wi.b;
+    bluestore_blob_t& dblob = b->dirty_blob();
+    uint64_t b_off = wi.b_off;
+    bufferlist *l = &wi.bl;
+    uint64_t final_length = wi.blob_length;
+    uint64_t csum_length = wi.blob_length;
+    unsigned csum_order = block_size_order;
+    if (wi.compressed) {
+      final_length = wi.compressed_bl.length();
+      csum_length = final_length;
+      csum_order = ctz(csum_length);
+      l = &wi.compressed_bl;
+      dblob.set_compressed(wi.blob_length, wi.compressed_len);
+    } else if (wi.new_blob) {
+      // initialize newly created blob only
+      assert(dblob.is_mutable());
       if (l->length() != wi.blob_length) {
         // hrm, maybe we could do better here, but let's not bother.
         dout(20) << __func__ << " forcing csum_order to block_size_order "
                 << block_size_order << dendl;
-       csum_order = block_size_order;
+       csum_order = block_size_order;
       } else {
         csum_order = std::min(wctx->csum_order, ctz(l->length()));
       }
@@ -9503,41 +10096,52 @@ int BlueStore::_do_alloc_write(
       if ((suggested_boff % (1 << csum_order)) == 0 &&
            suggested_boff + final_length <= max_bsize &&
            suggested_boff > b_off) {
-        dout(20) << __func__ << " forcing blob_offset to "
+        dout(20) << __func__ << " forcing blob_offset to 0x"
                  << std::hex << suggested_boff << std::dec << dendl;
         assert(suggested_boff >= b_off);
         csum_length += suggested_boff - b_off;
         b_off = suggested_boff;
       }
+      if (csum != Checksummer::CSUM_NONE) {
+        dout(20) << __func__ << " initialize csum setting for new blob " << *b
+                 << " csum_type " << Checksummer::get_csum_type_string(csum)
+                 << " csum_order " << csum_order
+                 << " csum_length 0x" << std::hex << csum_length << std::dec
+                 << dendl;
+        dblob.init_csum(csum, csum_order, csum_length);
+      }
     }
 
     AllocExtentVector extents;
-    extents.reserve(4);  // 4 should be (more than) enough for most allocations
-    int64_t got = alloc->allocate(final_length, min_alloc_size, 
-                                 max_alloc_size.load(),
-                                 hint, &extents);
-    assert(got == (int64_t)final_length);
-    need -= got;
-    txc->statfs_delta.allocated() += got;
+    int64_t left = final_length;
+    while (left > 0) {
+      assert(prealloc_left > 0);
+      if (prealloc_pos->length <= left) {
+       prealloc_left -= prealloc_pos->length;
+       left -= prealloc_pos->length;
+       txc->statfs_delta.allocated() += prealloc_pos->length;
+       extents.push_back(*prealloc_pos);
+       ++prealloc_pos;
+      } else {
+       extents.emplace_back(prealloc_pos->offset, left);
+       prealloc_pos->offset += left;
+       prealloc_pos->length -= left;
+       prealloc_left -= left;
+       txc->statfs_delta.allocated() += left;
+       left = 0;
+       break;
+      }
+    }
     for (auto& p : extents) {
-      bluestore_pextent_t e = bluestore_pextent_t(p);
-      txc->allocated.insert(e.offset, e.length);
-      hint = p.end();
+      txc->allocated.insert(p.offset, p.length);
     }
     dblob.allocated(P2ALIGN(b_off, min_alloc_size), final_length, extents);
 
-    dout(20) << __func__ << " blob " << *b
-            << " csum_type " << Checksummer::get_csum_type_string(csum)
-            << " csum_order " << csum_order
-            << " csum_length 0x" << std::hex << csum_length << std::dec
-            << dendl;
-
-    if (csum != Checksummer::CSUM_NONE) {
-      if (!dblob.has_csum()) {
-       dblob.init_csum(csum, csum_order, csum_length);
-      }
+    dout(20) << __func__ << " blob " << *b << dendl;
+    if (dblob.has_csum()) {
       dblob.calc_csum(b_off, *l);
     }
+
     if (wi.mark_unused) {
       auto b_end = b_off + wi.bl.length();
       if (b_off) {
@@ -9583,9 +10187,8 @@ int BlueStore::_do_alloc_write(
       }
     }
   }
-  if (need > 0) {
-    alloc->unreserve(need);
-  }
+  assert(prealloc_pos == prealloc.end());
+  assert(prealloc_left == 0);
   return 0;
 }
 
@@ -9593,7 +10196,8 @@ void BlueStore::_wctx_finish(
   TransContext *txc,
   CollectionRef& c,
   OnodeRef o,
-  WriteContext *wctx)
+  WriteContext *wctx,
+  set<SharedBlob*> *maybe_unshared_blobs)
 {
   auto oep = wctx->old_extents.begin();
   while (oep != wctx->old_extents.end()) {
@@ -9616,7 +10220,9 @@ void BlueStore::_wctx_finish(
        PExtentVector final;
         c->load_shared_blob(b->shared_blob);
        for (auto e : r) {
-         b->shared_blob->put_ref(e.offset, e.length, &final);
+         b->shared_blob->put_ref(
+           e.offset, e.length, &final,
+           b->is_referenced() ? nullptr : maybe_unshared_blobs);
        }
        dout(20) << __func__ << "  shared_blob release " << final
                 << " from " << *b->shared_blob << dendl;
@@ -9693,68 +10299,41 @@ void BlueStore::_do_write_data(
   }
 }
 
-int BlueStore::_do_write(
-  TransContext *txc,
-  CollectionRef& c,
-  OnodeRef o,
-  uint64_t offset,
-  uint64_t length,
-  bufferlist& bl,
-  uint32_t fadvise_flags)
+void BlueStore::_choose_write_options(
+   CollectionRef& c,
+   OnodeRef o,
+   uint32_t fadvise_flags,
+   WriteContext *wctx)
 {
-  int r = 0;
-
-  dout(20) << __func__
-          << " " << o->oid
-          << " 0x" << std::hex << offset << "~" << length
-          << " - have 0x" << o->onode.size
-          << " (" << std::dec << o->onode.size << ")"
-          << " bytes"
-          << " fadvise_flags 0x" << std::hex << fadvise_flags << std::dec
-          << dendl;
-  _dump_onode(o);
-
-  if (length == 0) {
-    return 0;
-  }
-
-  uint64_t end = offset + length;
-  bool was_gc = false;
-  GarbageCollector gc(c->store->cct);
-  int64_t benefit;
-  auto dirty_start = offset;
-  auto dirty_end = offset + length;
-
-  WriteContext wctx, wctx_gc;
   if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) {
     dout(20) << __func__ << " will do buffered write" << dendl;
-    wctx.buffered = true;
+    wctx->buffered = true;
   } else if (cct->_conf->bluestore_default_buffered_write &&
             (fadvise_flags & (CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
                               CEPH_OSD_OP_FLAG_FADVISE_NOCACHE)) == 0) {
     dout(20) << __func__ << " defaulting to buffered write" << dendl;
-    wctx.buffered = true;
+    wctx->buffered = true;
   }
 
-  // FIXME: Using the MAX of the block_size_order and preferred_csum_order
-  // results in poor small random read performance when data was initially 
-  // written out in large chunks.  Reverting to previous behavior for now.
-  wctx.csum_order = block_size_order;
+  // apply basic csum block size
+  wctx->csum_order = block_size_order;
 
   // compression parameters
   unsigned alloc_hints = o->onode.alloc_hint_flags;
   auto cm = select_option(
     "compression_mode",
-    comp_mode.load(), 
+    comp_mode.load(),
     [&]() {
       string val;
       if(c->pool_opts.get(pool_opts_t::COMPRESSION_MODE, &val)) {
-       return  boost::optional<Compressor::CompressionMode>(Compressor::get_comp_mode_type(val));
+       return boost::optional<Compressor::CompressionMode>(
+         Compressor::get_comp_mode_type(val));
       }
       return boost::optional<Compressor::CompressionMode>();
     }
   );
-  wctx.compress = (cm != Compressor::COMP_NONE) &&
+
+  wctx->compress = (cm != Compressor::COMP_NONE) &&
     ((cm == Compressor::COMP_FORCE) ||
      (cm == Compressor::COMP_AGGRESSIVE &&
       (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE) == 0) ||
@@ -9763,22 +10342,23 @@ int BlueStore::_do_write(
 
   if ((alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_READ) &&
       (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_READ) == 0 &&
-      (alloc_hints & (CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE|
-                       CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY)) &&
+      (alloc_hints & (CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE |
+                      CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY)) &&
       (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_WRITE) == 0) {
+
     dout(20) << __func__ << " will prefer large blob and csum sizes" << dendl;
-    auto order = min_alloc_size_order.load();
+
     if (o->onode.expected_write_size) {
-      wctx.csum_order = std::max(order,
-                                (uint8_t)ctz(o->onode.expected_write_size));
+      wctx->csum_order = std::max(min_alloc_size_order,
+                                 (uint8_t)ctz(o->onode.expected_write_size));
     } else {
-      wctx.csum_order = order;
+      wctx->csum_order = min_alloc_size_order;
     }
 
-    if (wctx.compress) {
-      wctx.target_blob_size = select_option(
+    if (wctx->compress) {
+      wctx->target_blob_size = select_option(
         "compression_max_blob_size",
-        comp_max_blob_size.load(), 
+        comp_max_blob_size.load(),
         [&]() {
           int val;
           if(c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val)) {
@@ -9789,10 +10369,10 @@ int BlueStore::_do_write(
       );
     }
   } else {
-    if (wctx.compress) {
-      wctx.target_blob_size = select_option(
+    if (wctx->compress) {
+      wctx->target_blob_size = select_option(
         "compression_min_blob_size",
-        comp_min_blob_size.load(), 
+        comp_min_blob_size.load(),
         [&]() {
           int val;
           if(c->pool_opts.get(pool_opts_t::COMPRESSION_MIN_BLOB_SIZE, &val)) {
@@ -9803,25 +10383,107 @@ int BlueStore::_do_write(
       );
     }
   }
+
   uint64_t max_bsize = max_blob_size.load();
-  if (wctx.target_blob_size == 0 || wctx.target_blob_size > max_bsize) {
-    wctx.target_blob_size = max_bsize;
+  if (wctx->target_blob_size == 0 || wctx->target_blob_size > max_bsize) {
+    wctx->target_blob_size = max_bsize;
   }
+
   // set the min blob size floor at 2x the min_alloc_size, or else we
   // won't be able to allocate a smaller extent for the compressed
   // data.
-  if (wctx.compress &&
-      wctx.target_blob_size < min_alloc_size * 2) {
-    wctx.target_blob_size = min_alloc_size * 2;
+  if (wctx->compress &&
+      wctx->target_blob_size < min_alloc_size * 2) {
+    wctx->target_blob_size = min_alloc_size * 2;
   }
+
+  dout(20) << __func__ << " prefer csum_order " << wctx->csum_order
+           << " target_blob_size 0x" << std::hex << wctx->target_blob_size
+           << std::dec << dendl;
+}
+
+int BlueStore::_do_gc(
+  TransContext *txc,
+  CollectionRef& c,
+  OnodeRef o,
+  const GarbageCollector& gc,
+  const WriteContext& wctx,
+  uint64_t *dirty_start,
+  uint64_t *dirty_end)
+{
+  auto& extents_to_collect = gc.get_extents_to_collect();
+
+  WriteContext wctx_gc;
   wctx_gc.fork(wctx); // make a clone for garbage collection
-  dout(20) << __func__ << " prefer csum_order " << wctx.csum_order
-          << " target_blob_size 0x" << std::hex << wctx.target_blob_size
-          << std::dec << dendl;
 
+  for (auto it = extents_to_collect.begin();
+       it != extents_to_collect.end();
+       ++it) {
+    bufferlist bl;
+    int r = _do_read(c.get(), o, it->offset, it->length, bl, 0);
+    assert(r == (int)it->length);
+
+    o->extent_map.fault_range(db, it->offset, it->length);
+    _do_write_data(txc, c, o, it->offset, it->length, bl, &wctx_gc);
+    logger->inc(l_bluestore_gc_merged, it->length);
+
+    if (*dirty_start > it->offset) {
+      *dirty_start = it->offset;
+    }
+
+    if (*dirty_end < it->offset + it->length) {
+      *dirty_end = it->offset + it->length;
+    }
+  }
+
+  dout(30) << __func__ << " alloc write" << dendl;
+  int r = _do_alloc_write(txc, c, o, &wctx_gc);
+  if (r < 0) {
+    derr << __func__ << " _do_alloc_write failed with " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  _wctx_finish(txc, c, o, &wctx_gc);
+  return 0;
+}
+
+int BlueStore::_do_write(
+  TransContext *txc,
+  CollectionRef& c,
+  OnodeRef o,
+  uint64_t offset,
+  uint64_t length,
+  bufferlist& bl,
+  uint32_t fadvise_flags)
+{
+  int r = 0;
+
+  dout(20) << __func__
+          << " " << o->oid
+          << " 0x" << std::hex << offset << "~" << length
+          << " - have 0x" << o->onode.size
+          << " (" << std::dec << o->onode.size << ")"
+          << " bytes"
+          << " fadvise_flags 0x" << std::hex << fadvise_flags << std::dec
+          << dendl;
+  _dump_onode(o);
+
+  if (length == 0) {
+    return 0;
+  }
+
+  uint64_t end = offset + length;
+
+  GarbageCollector gc(c->store->cct);
+  int64_t benefit;
+  auto dirty_start = offset;
+  auto dirty_end = end;
+
+  WriteContext wctx;
+  _choose_write_options(c, o, fadvise_flags, &wctx);
   o->extent_map.fault_range(db, offset, length);
   _do_write_data(txc, c, o, offset, length, bl, &wctx);
-
   r = _do_alloc_write(txc, c, o, &wctx);
   if (r < 0) {
     derr << __func__ << " _do_alloc_write failed with " << cpp_strerror(r)
@@ -9829,54 +10491,37 @@ int BlueStore::_do_write(
     goto out;
   }
 
+  // NB: _wctx_finish() will empty old_extents
+  // so we must do gc estimation before that
   benefit = gc.estimate(offset,
-                                length,
-                               o->extent_map,
-                               wctx.old_extents,
-                               min_alloc_size);
+                        length,
+                       o->extent_map,
+                       wctx.old_extents,
+                       min_alloc_size);
 
   _wctx_finish(txc, c, o, &wctx);
   if (end > o->onode.size) {
     dout(20) << __func__ << " extending size to 0x" << std::hex << end
-            << std::dec << dendl;
+             << std::dec << dendl;
     o->onode.size = end;
   }
 
   if (benefit >= g_conf->bluestore_gc_enable_total_threshold) {
-    dout(20)  << __func__ << " perform garbage collection, expected benefit = "
-              << benefit << " AUs" << dendl;
-    auto& extents_to_collect = gc.get_extents_to_collect();
-    for (auto it = extents_to_collect.begin();
-         it != extents_to_collect.end();
-         ++it) {
-      bufferlist bl;
-      int r = _do_read(c.get(), o, it->offset, it->length, bl, 0);
-      assert(r == (int)it->length);
-      o->extent_map.fault_range(db, it->offset, it->length);
-      _do_write_data(txc, c, o, it->offset, it->length, bl, &wctx_gc);
-      logger->inc(l_bluestore_gc_merged, it->length);
-      was_gc = true;
-      if (dirty_start > it->offset) {
-        dirty_start = it->offset;
-      }
-      if (dirty_end < it->offset + it->length) {
-        dirty_end = it->offset + it->length;
+    if (!gc.get_extents_to_collect().empty()) {
+      dout(20) << __func__ << " perform garbage collection, "
+               << "expected benefit = " << benefit << " AUs" << dendl;
+      r = _do_gc(txc, c, o, gc, wctx, &dirty_start, &dirty_end);
+      if (r < 0) {
+        derr << __func__ << " _do_gc failed with " << cpp_strerror(r)
+             << dendl;
+        goto out;
       }
     }
   }
-  if (was_gc) {
-    dout(30)  << __func__ << " alloc write for GC" << dendl;
-    r = _do_alloc_write(txc, c, o, &wctx_gc);
-    if (r < 0) {
-      derr << __func__ << " _do_alloc_write(gc) failed with " << cpp_strerror(r)
-          << dendl;
-      goto out;
-    }
-    _wctx_finish(txc, c, o, &wctx_gc);
-  }
 
   o->extent_map.compress_extent_map(dirty_start, dirty_end - dirty_start);
-  o->extent_map.dirty_range(txc->t, dirty_start, dirty_end - dirty_start);
+  o->extent_map.dirty_range(dirty_start, dirty_end - dirty_start);
+
   r = 0;
 
  out:
@@ -9886,18 +10531,21 @@ int BlueStore::_do_write(
 int BlueStore::_write(TransContext *txc,
                      CollectionRef& c,
                      OnodeRef& o,
-                    uint64_t offset, size_t length,
-                    bufferlist& bl,
-                    uint32_t fadvise_flags)
+                     uint64_t offset, size_t length,
+                     bufferlist& bl,
+                     uint32_t fadvise_flags)
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << dendl;
-  o->exists = true;
-  _assign_nid(txc, o);
-  int r = _do_write(txc, c, o, offset, length, bl, fadvise_flags);
-  txc->write_onode(o);
-
+  int r = 0;
+  if (offset + length >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+  } else {
+    _assign_nid(txc, o);
+    r = _do_write(txc, c, o, offset, length, bl, fadvise_flags);
+    txc->write_onode(o);
+  }
   dout(10) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << " = " << r << dendl;
@@ -9912,9 +10560,13 @@ int BlueStore::_zero(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << dendl;
-  o->exists = true;
-  _assign_nid(txc, o);
-  int r = _do_zero(txc, c, o, offset, length);
+  int r = 0;
+  if (offset + length >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+  } else {
+    _assign_nid(txc, o);
+    r = _do_zero(txc, c, o, offset, length);
+  }
   dout(10) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << " = " << r << dendl;
@@ -9936,10 +10588,10 @@ int BlueStore::_do_zero(TransContext *txc,
   WriteContext wctx;
   o->extent_map.fault_range(db, offset, length);
   o->extent_map.punch_hole(c, offset, length, &wctx.old_extents);
-  o->extent_map.dirty_range(txc->t, offset, length);
+  o->extent_map.dirty_range(offset, length);
   _wctx_finish(txc, c, o, &wctx);
 
-  if (offset + length > o->onode.size) {
+  if (length > 0 && offset + length > o->onode.size) {
     o->onode.size = offset + length;
     dout(20) << __func__ << " extending size to " << offset + length
             << dendl;
@@ -9953,7 +10605,8 @@ int BlueStore::_do_zero(TransContext *txc,
 }
 
 void BlueStore::_do_truncate(
-  TransContext *txc, CollectionRef& c, OnodeRef o, uint64_t offset)
+  TransContext *txc, CollectionRef& c, OnodeRef o, uint64_t offset,
+  set<SharedBlob*> *maybe_unshared_blobs)
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << std::dec << dendl;
@@ -9961,15 +10614,15 @@ void BlueStore::_do_truncate(
   _dump_onode(o, 30);
 
   if (offset == o->onode.size)
-    return ;
+    return;
 
   if (offset < o->onode.size) {
     WriteContext wctx;
     uint64_t length = o->onode.size - offset;
     o->extent_map.fault_range(db, offset, length);
     o->extent_map.punch_hole(c, offset, length, &wctx.old_extents);
-    o->extent_map.dirty_range(txc->t, offset, length);
-    _wctx_finish(txc, c, o, &wctx);
+    o->extent_map.dirty_range(offset, length);
+    _wctx_finish(txc, c, o, &wctx, maybe_unshared_blobs);
 
     // if we have shards past EOF, ask for a reshard
     if (!o->onode.extent_map_shards.empty() &&
@@ -9988,7 +10641,7 @@ void BlueStore::_do_truncate(
   txc->write_onode(o);
 }
 
-void BlueStore::_truncate(TransContext *txc,
+int BlueStore::_truncate(TransContext *txc,
                         CollectionRef& c,
                         OnodeRef& o,
                         uint64_t offset)
@@ -9996,7 +10649,16 @@ void BlueStore::_truncate(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << std::dec
           << dendl;
-  _do_truncate(txc, c, o, offset);
+  int r = 0;
+  if (offset >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+  } else {
+    _do_truncate(txc, c, o, offset);
+  }
+  dout(10) << __func__ << " " << c->cid << " " << o->oid
+          << " 0x" << std::hex << offset << std::dec
+          << " = " << r << dendl;
+  return r;
 }
 
 int BlueStore::_do_remove(
@@ -10004,7 +10666,9 @@ int BlueStore::_do_remove(
   CollectionRef& c,
   OnodeRef o)
 {
-  _do_truncate(txc, c, o, 0);
+  set<SharedBlob*> maybe_unshared_blobs;
+  bool is_gen = !o->oid.is_no_gen();
+  _do_truncate(txc, c, o, 0, is_gen ? &maybe_unshared_blobs : nullptr);
   if (o->onode.has_omap()) {
     o->flush();
     _do_omap_clear(txc, o->onode.nid);
@@ -10025,6 +10689,76 @@ int BlueStore::_do_remove(
   o->extent_map.clear();
   o->onode = bluestore_onode_t();
   _debug_obj_on_delete(o->oid);
+
+  if (!is_gen || maybe_unshared_blobs.empty()) {
+    return 0;
+  }
+
+  // see if we can unshare blobs still referenced by the head
+  dout(10) << __func__ << " gen and maybe_unshared_blobs "
+          << maybe_unshared_blobs << dendl;
+  ghobject_t nogen = o->oid;
+  nogen.generation = ghobject_t::NO_GEN;
+  OnodeRef h = c->onode_map.lookup(nogen);
+
+  if (!h || !h->exists) {
+    return 0;
+  }
+
+  dout(20) << __func__ << " checking for unshareable blobs on " << h
+          << " " << h->oid << dendl;
+  map<SharedBlob*,bluestore_extent_ref_map_t> expect;
+  for (auto& e : h->extent_map.extent_map) {
+    const bluestore_blob_t& b = e.blob->get_blob();
+    SharedBlob *sb = e.blob->shared_blob.get();
+    if (b.is_shared() &&
+       sb->loaded &&
+       maybe_unshared_blobs.count(sb)) {
+      if (b.is_compressed()) {
+       expect[sb].get(0, b.get_ondisk_length());
+      } else {
+       b.map(e.blob_offset, e.length, [&](uint64_t off, uint64_t len) {
+           expect[sb].get(off, len);
+           return 0;
+         });
+      }
+    }
+  }
+
+  vector<SharedBlob*> unshared_blobs;
+  unshared_blobs.reserve(maybe_unshared_blobs.size());
+  for (auto& p : expect) {
+    dout(20) << " ? " << *p.first << " vs " << p.second << dendl;
+    if (p.first->persistent->ref_map == p.second) {
+      SharedBlob *sb = p.first;
+      dout(20) << __func__ << "  unsharing " << *sb << dendl;
+      unshared_blobs.push_back(sb);
+      txc->unshare_blob(sb);
+      uint64_t sbid = c->make_blob_unshared(sb);
+      string key;
+      get_shared_blob_key(sbid, &key);
+      txc->t->rmkey(PREFIX_SHARED_BLOB, key);
+    }
+  }
+
+  if (unshared_blobs.empty()) {
+    return 0;
+  }
+
+  for (auto& e : h->extent_map.extent_map) {
+    const bluestore_blob_t& b = e.blob->get_blob();
+    SharedBlob *sb = e.blob->shared_blob.get();
+    if (b.is_shared() &&
+        std::find(unshared_blobs.begin(), unshared_blobs.end(),
+                  sb) != unshared_blobs.end()) {
+      dout(20) << __func__ << "  unsharing " << e << dendl;
+      bluestore_blob_t& blob = e.blob->dirty_blob();
+      blob.clear_flag(bluestore_blob_t::FLAG_SHARED);
+      h->extent_map.dirty_range(e.logical_offset, 1);
+    }
+  }
+  txc->write_onode(h);
+
   return 0;
 }
 
@@ -10048,10 +10782,14 @@ int BlueStore::_setattr(TransContext *txc,
           << " " << name << " (" << val.length() << " bytes)"
           << dendl;
   int r = 0;
-  if (val.is_partial())
-    o->onode.attrs[name.c_str()] = bufferptr(val.c_str(), val.length());
-  else
-    o->onode.attrs[name.c_str()] = val;
+  if (val.is_partial()) {
+    auto& b = o->onode.attrs[name.c_str()] = bufferptr(val.c_str(),
+                                                      val.length());
+    b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+  } else {
+    auto& b = o->onode.attrs[name.c_str()] = val;
+    b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+  }
   txc->write_onode(o);
   dout(10) << __func__ << " " << c->cid << " " << o->oid
           << " " << name << " (" << val.length() << " bytes)"
@@ -10070,11 +10808,14 @@ int BlueStore::_setattrs(TransContext *txc,
   int r = 0;
   for (map<string,bufferptr>::const_iterator p = aset.begin();
        p != aset.end(); ++p) {
-    if (p->second.is_partial())
-      o->onode.attrs[p->first.c_str()] =
+    if (p->second.is_partial()) {
+      auto& b = o->onode.attrs[p->first.c_str()] =
        bufferptr(p->second.c_str(), p->second.length());
-    else
-      o->onode.attrs[p->first.c_str()] = p->second;
+      b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+    } else {
+      auto& b = o->onode.attrs[p->first.c_str()] = p->second;
+      b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+    }
   }
   txc->write_onode(o);
   dout(10) << __func__ << " " << c->cid << " " << o->oid
@@ -10321,7 +11062,6 @@ int BlueStore::_clone(TransContext *txc,
     return -EINVAL;
   }
 
-  newo->exists = true;
   _assign_nid(txc, newo);
 
   // clone data
@@ -10389,7 +11129,9 @@ int BlueStore::_do_clone_range(
   CollectionRef& c,
   OnodeRef& oldo,
   OnodeRef& newo,
-  uint64_t srcoff, uint64_t length, uint64_t dstoff)
+  uint64_t srcoff,
+  uint64_t length,
+  uint64_t dstoff)
 {
   dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
           << newo->oid
@@ -10406,8 +11148,10 @@ int BlueStore::_do_clone_range(
     e.blob->last_encoded_id = -1;
   }
   int n = 0;
-  bool dirtied_oldo = false;
   uint64_t end = srcoff + length;
+  uint32_t dirty_range_begin = 0;
+  uint32_t dirty_range_end = 0;
+  bool src_dirty = false;
   for (auto ep = oldo->extent_map.seek_lextent(srcoff);
        ep != oldo->extent_map.extent_map.end();
        ++ep) {
@@ -10428,7 +11172,13 @@ int BlueStore::_do_clone_range(
       // make sure it is shared
       if (!blob.is_shared()) {
        c->make_blob_shared(_assign_blobid(txc), e.blob);
-       dirtied_oldo = true;  // fixme: overkill
+       if (!src_dirty) {
+         src_dirty = true;
+          dirty_range_begin = e.logical_offset;
+        }
+        assert(e.logical_end() > 0);
+        // -1 to exclude next potential shard
+        dirty_range_end = e.logical_end() - 1;
       } else {
        c->load_shared_blob(e.blob->shared_blob);
       }
@@ -10475,8 +11225,9 @@ int BlueStore::_do_clone_range(
     dout(20) << __func__ << "  dst " << *ne << dendl;
     ++n;
   }
-  if (dirtied_oldo) {
-    oldo->extent_map.dirty_range(txc->t, srcoff, length); // overkill
+  if (src_dirty) {
+    oldo->extent_map.dirty_range(dirty_range_begin,
+      dirty_range_end - dirty_range_begin);
     txc->write_onode(oldo);
   }
   txc->write_onode(newo);
@@ -10484,7 +11235,7 @@ int BlueStore::_do_clone_range(
   if (dstoff + length > newo->onode.size) {
     newo->onode.size = dstoff + length;
   }
-  newo->extent_map.dirty_range(txc->t, dstoff, length);
+  newo->extent_map.dirty_range(dstoff, length);
   _dump_onode(oldo);
   _dump_onode(newo);
   return 0;
@@ -10501,12 +11252,16 @@ int BlueStore::_clone_range(TransContext *txc,
           << " to offset 0x" << dstoff << std::dec << dendl;
   int r = 0;
 
+  if (srcoff + length >= OBJECT_MAX_SIZE ||
+      dstoff + length >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+    goto out;
+  }
   if (srcoff + length > oldo->onode.size) {
     r = -EINVAL;
     goto out;
   }
 
-  newo->exists = true;
   _assign_nid(txc, newo);
 
   if (length > 0) {
@@ -10545,7 +11300,7 @@ int BlueStore::_rename(TransContext *txc,
           << new_oid << dendl;
   int r;
   ghobject_t old_oid = oldo->oid;
-  mempool::bluestore_meta_other::string new_okey;
+  mempool::bluestore_cache_other::string new_okey;
 
   if (newo) {
     if (newo->exists) {
@@ -10906,30 +11661,46 @@ void BlueStore::generate_db_histogram(Formatter *f)
 
 }
 
-void BlueStore::flush_cache()
+void BlueStore::_flush_cache()
 {
   dout(10) << __func__ << dendl;
   for (auto i : cache_shards) {
     i->trim_all();
+    assert(i->empty());
   }
   for (auto& p : coll_map) {
+    if (!p.second->onode_map.empty()) {
+      derr << __func__ << "stray onodes on " << p.first << dendl;
+      p.second->onode_map.dump(cct, 0);
+    }
+    if (!p.second->shared_blob_set.empty()) {
+      derr << __func__ << " stray shared blobs on " << p.first << dendl;
+      p.second->shared_blob_set.dump(cct, 0);
+    }
     assert(p.second->onode_map.empty());
     assert(p.second->shared_blob_set.empty());
   }
   coll_map.clear();
 }
 
+// For external caller.
+// We use a best-effort policy instead, e.g.,
+// we don't care if there are still some pinned onodes/data in the cache
+// after this command is completed.
+void BlueStore::flush_cache()
+{
+  dout(10) << __func__ << dendl;
+  for (auto i : cache_shards) {
+    i->trim_all();
+  }
+}
+
 void BlueStore::_apply_padding(uint64_t head_pad,
                               uint64_t tail_pad,
-                              bufferlist& bl,
                               bufferlist& padded)
 {
-  padded = bl;
   if (head_pad) {
-    bufferlist z;
-    z.append_zero(head_pad);
-    z.claim_append(padded);
-    padded.claim(z);
+    padded.prepend_zero(head_pad);
   }
   if (tail_pad) {
     padded.append_zero(tail_pad);