]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/os/bluestore/BlueStore.cc
update sources to v12.2.0
[ceph.git] / ceph / src / os / bluestore / BlueStore.cc
index 38d1e272ebc5a30fac5409213e83c922aa1b6088..b87d0302b306a9fc5526e001391e0f67cf9f2b4d 100644 (file)
@@ -134,36 +134,68 @@ const string PREFIX_SHARED_BLOB = "X"; // u64 offset -> shared_blob_t
 template<typename S>
 static void append_escaped(const string &in, S *out)
 {
-  char hexbyte[8];
+  char hexbyte[in.length() * 3 + 1];
+  char* ptr = &hexbyte[0];
   for (string::const_iterator i = in.begin(); i != in.end(); ++i) {
     if (*i <= '#') {
-      snprintf(hexbyte, sizeof(hexbyte), "#%02x", (uint8_t)*i);
-      out->append(hexbyte);
+      *ptr++ = '#';
+      *ptr++ = "0123456789abcdef"[(*i >> 4) & 0x0f];
+      *ptr++ = "0123456789abcdef"[*i & 0x0f];
     } else if (*i >= '~') {
-      snprintf(hexbyte, sizeof(hexbyte), "~%02x", (uint8_t)*i);
-      out->append(hexbyte);
+      *ptr++ = '~';
+      *ptr++ = "0123456789abcdef"[(*i >> 4) & 0x0f];
+      *ptr++ = "0123456789abcdef"[*i & 0x0f];
     } else {
-      out->push_back(*i);
+      *ptr++  = *i;
     }
   }
-  out->push_back('!');
+  *ptr++ = '!';
+  out->append(hexbyte, ptr - &hexbyte[0]);
+}
+
+inline unsigned h2i(char c)
+{
+  if ((c >= '0') && (c <= '9')) {
+    return c - 0x30;
+  } else if ((c >= 'a') && (c <= 'f')) {
+    return c - 'a' + 10;
+  } else if ((c >= 'A') && (c <= 'F')) {
+    return c - 'A' + 10;
+  } else {
+    return 256; // make it always larger than 255
+  }
 }
 
 static int decode_escaped(const char *p, string *out)
 {
+  char buff[256];
+  char* ptr = &buff[0];
+  char* max = &buff[252];
   const char *orig_p = p;
   while (*p && *p != '!') {
     if (*p == '#' || *p == '~') {
-      unsigned hex;
-      int r = sscanf(++p, "%2x", &hex);
-      if (r < 1)
-       return -EINVAL;
-      out->push_back((char)hex);
-      p += 2;
+      unsigned hex = 0;
+      p++;
+      hex = h2i(*p++) << 4;
+      if (hex > 255) {
+        return -EINVAL;
+      }
+      hex |= h2i(*p++);
+      if (hex > 255) {
+        return -EINVAL;
+      }
+      *ptr++ = hex;
     } else {
-      out->push_back(*p++);
+      *ptr++ = *p++;
+    }
+    if (ptr > max) {
+       out->append(buff, ptr-buff);
+       ptr = &buff[0];
     }
   }
+  if (ptr != buff) {
+     out->append(buff, ptr-buff);
+  }
   return p - orig_p;
 }
 
@@ -285,7 +317,7 @@ static int get_key_shared_blob(const string& key, uint64_t *sbid)
   const char *p = key.c_str();
   if (key.length() < sizeof(uint64_t))
     return -1;
-  p = _key_decode_u64(p, sbid);
+  _key_decode_u64(p, sbid);
   return 0;
 }
 
@@ -452,7 +484,7 @@ int get_key_extent_shard(const string& key, string *onode_key, uint32_t *offset)
   int okey_len = key.size() - sizeof(uint32_t) - 1;
   *onode_key = key.substr(0, okey_len);
   const char *p = key.data() + okey_len;
-  p = _key_decode_u32(p, offset);
+  _key_decode_u32(p, offset);
   return 0;
 }
 
@@ -1323,55 +1355,59 @@ int BlueStore::BufferSpace::_discard(Cache* cache, uint32_t offset, uint32_t len
 
 void BlueStore::BufferSpace::read(
   Cache* cache, 
-  uint32_t offset, uint32_t length,
+  uint32_t offset,
+  uint32_t length,
   BlueStore::ready_regions_t& res,
   interval_set<uint32_t>& res_intervals)
 {
-  std::lock_guard<std::recursive_mutex> l(cache->lock);
   res.clear();
   res_intervals.clear();
   uint32_t want_bytes = length;
   uint32_t end = offset + length;
-  for (auto i = _data_lower_bound(offset);
-       i != buffer_map.end() && offset < end && i->first < end;
-       ++i) {
-    Buffer *b = i->second.get();
-    assert(b->end() > offset);
-    if (b->is_writing() || b->is_clean()) {
-      if (b->offset < offset) {
-       uint32_t skip = offset - b->offset;
-       uint32_t l = MIN(length, b->length - skip);
-       res[offset].substr_of(b->data, skip, l);
-       res_intervals.insert(offset, l);
-       offset += l;
-       length -= l;
-       if (!b->is_writing()) {
+
+  {
+    std::lock_guard<std::recursive_mutex> l(cache->lock);
+    for (auto i = _data_lower_bound(offset);
+         i != buffer_map.end() && offset < end && i->first < end;
+         ++i) {
+      Buffer *b = i->second.get();
+      assert(b->end() > offset);
+      if (b->is_writing() || b->is_clean()) {
+        if (b->offset < offset) {
+         uint32_t skip = offset - b->offset;
+         uint32_t l = MIN(length, b->length - skip);
+         res[offset].substr_of(b->data, skip, l);
+         res_intervals.insert(offset, l);
+         offset += l;
+         length -= l;
+         if (!b->is_writing()) {
+           cache->_touch_buffer(b);
+         }
+         continue;
+        }
+        if (b->offset > offset) {
+         uint32_t gap = b->offset - offset;
+         if (length <= gap) {
+           break;
+         }
+         offset += gap;
+         length -= gap;
+        }
+        if (!b->is_writing()) {
          cache->_touch_buffer(b);
-       }
-       continue;
-      }
-      if (b->offset > offset) {
-       uint32_t gap = b->offset - offset;
-       if (length <= gap) {
-         break;
-       }
-       offset += gap;
-       length -= gap;
-      }
-      if (!b->is_writing()) {
-       cache->_touch_buffer(b);
-      }
-      if (b->length > length) {
-       res[offset].substr_of(b->data, 0, length);
-       res_intervals.insert(offset, length);
-        break;
-      } else {
-       res[offset].append(b->data);
-       res_intervals.insert(offset, b->length);
-        if (b->length == length)
+        }
+        if (b->length > length) {
+         res[offset].substr_of(b->data, 0, length);
+         res_intervals.insert(offset, length);
           break;
-       offset += b->length;
-       length -= b->length;
+        } else {
+         res[offset].append(b->data);
+         res_intervals.insert(offset, b->length);
+          if (b->length == length)
+            break;
+         offset += b->length;
+         length -= b->length;
+        }
       }
     }
   }
@@ -1490,19 +1526,30 @@ BlueStore::OnodeRef BlueStore::OnodeSpace::add(const ghobject_t& oid, OnodeRef o
 
 BlueStore::OnodeRef BlueStore::OnodeSpace::lookup(const ghobject_t& oid)
 {
-  std::lock_guard<std::recursive_mutex> l(cache->lock);
   ldout(cache->cct, 30) << __func__ << dendl;
-  ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
-  if (p == onode_map.end()) {
-    ldout(cache->cct, 30) << __func__ << " " << oid << " miss" << dendl;
+  OnodeRef o;
+  bool hit = false;
+
+  {
+    std::lock_guard<std::recursive_mutex> l(cache->lock);
+    ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
+    if (p == onode_map.end()) {
+      ldout(cache->cct, 30) << __func__ << " " << oid << " miss" << dendl;
+    } else {
+      ldout(cache->cct, 30) << __func__ << " " << oid << " hit " << p->second
+                           << dendl;
+      cache->_touch_onode(p->second);
+      hit = true;
+      o = p->second;
+    }
+  }
+
+  if (hit) {
+    cache->logger->inc(l_bluestore_onode_hits);
+  } else {
     cache->logger->inc(l_bluestore_onode_misses);
-    return OnodeRef();
   }
-  ldout(cache->cct, 30) << __func__ << " " << oid << " hit " << p->second
-                       << dendl;
-  cache->_touch_onode(p->second);
-  cache->logger->inc(l_bluestore_onode_hits);
-  return p->second;
+  return o;
 }
 
 void BlueStore::OnodeSpace::clear()
@@ -1656,21 +1703,25 @@ ostream& operator<<(ostream& out, const BlueStore::Blob& b)
   if (b.is_spanning()) {
     out << " spanning " << b.id;
   }
-  out << " " << b.get_blob() << " " << b.get_blob_use_tracker()
-      << " " << *b.shared_blob
-      << ")";
+  out << " " << b.get_blob() << " " << b.get_blob_use_tracker();
+  if (b.shared_blob) {
+    out << " " << *b.shared_blob;
+  } else {
+    out << " (shared_blob=NULL)";
+  }
+  out << ")";
   return out;
 }
 
 void BlueStore::Blob::discard_unallocated(Collection *coll)
 {
-  if (blob.is_shared()) {
+  if (get_blob().is_shared()) {
     return;
   }
-  if (blob.is_compressed()) {
+  if (get_blob().is_compressed()) {
     bool discard = false;
     bool all_invalid = true;
-    for (auto e : blob.get_extents()) {
+    for (auto e : get_blob().get_extents()) {
       if (!e.is_valid()) {
         discard = true;
       } else {
@@ -1680,11 +1731,12 @@ void BlueStore::Blob::discard_unallocated(Collection *coll)
     assert(discard == all_invalid); // in case of compressed blob all
                                    // or none pextents are invalid.
     if (discard) {
-      shared_blob->bc.discard(shared_blob->get_cache(), 0, blob.get_logical_length());
+      shared_blob->bc.discard(shared_blob->get_cache(), 0,
+                              get_blob().get_logical_length());
     }
   } else {
     size_t pos = 0;
-    for (auto e : blob.get_extents()) {
+    for (auto e : get_blob().get_extents()) {
       if (!e.is_valid()) {
        ldout(coll->store->cct, 20) << __func__ << " 0x" << std::hex << pos
                                    << "~" << e.length
@@ -1693,12 +1745,11 @@ void BlueStore::Blob::discard_unallocated(Collection *coll)
       }
       pos += e.length;
     }
-    if (blob.can_prune_tail()) {
-      dirty_blob();
-      blob.prune_tail();
-      used_in_blob.prune_tail(blob.get_ondisk_length());
+    if (get_blob().can_prune_tail()) {
+      dirty_blob().prune_tail();
+      used_in_blob.prune_tail(get_blob().get_ondisk_length());
       auto cct = coll->store->cct; //used by dout
-      dout(20) << __func__ << " pruned tail, now " << blob << dendl;
+      dout(20) << __func__ << " pruned tail, now " << get_blob() << dendl;
     }
   }
 }
@@ -1719,10 +1770,10 @@ void BlueStore::Blob::get_ref(
 
   if (used_in_blob.is_empty()) {
     uint32_t min_release_size =
-      blob.get_release_size(coll->store->min_alloc_size);
-    uint64_t l = blob.get_logical_length();
-    dout(20) << __func__ << " init 0x" << std::hex << l << ", " << min_release_size
-             << std::dec << dendl;
+      get_blob().get_release_size(coll->store->min_alloc_size);
+    uint64_t l = get_blob().get_logical_length();
+    dout(20) << __func__ << " init 0x" << std::hex << l << ", "
+             << min_release_size << std::dec << dendl;
     used_in_blob.init(l, min_release_size);
   }
   used_in_blob.get(
@@ -1756,7 +1807,7 @@ bool BlueStore::Blob::put_ref(
   return b.release_extents(empty, logical, r);
 }
 
-bool BlueStore::Blob::try_reuse_blob(uint32_t min_alloc_size,
+bool BlueStore::Blob::can_reuse_blob(uint32_t min_alloc_size,
                                     uint32_t target_blob_size,
                                     uint32_t b_offset,
                                     uint32_t *length0) {
@@ -1784,17 +1835,25 @@ bool BlueStore::Blob::try_reuse_blob(uint32_t min_alloc_size,
   target_blob_size = MAX(blen, target_blob_size);
 
   if (b_offset >= blen) {
-    //new data totally stands out of the existing blob
-    new_blen = b_offset + length;
+    // new data totally stands out of the existing blob
+    new_blen = end;
   } else {
-    //new data overlaps with the existing blob
-    new_blen = MAX(blen, length + b_offset);
-    if (!get_blob().is_unallocated(
-      b_offset,
-      new_blen > blen ? blen - b_offset : length)) {
-          return false;
+    // new data overlaps with the existing blob
+    new_blen = MAX(blen, end);
+
+    uint32_t overlap = 0;
+    if (new_blen > blen) {
+      overlap = blen - b_offset;
+    } else {
+      overlap = length;
+    }
+
+    if (!get_blob().is_unallocated(b_offset, overlap)) {
+      // abort if any piece of the overlap has already been allocated
+      return false;
     }
   }
+
   if (new_blen > blen) {
     int64_t overflow = int64_t(new_blen) - target_blob_size;
     // Unable to decrease the provided length to fit into max_blob_size
@@ -1812,10 +1871,11 @@ bool BlueStore::Blob::try_reuse_blob(uint32_t min_alloc_size,
       length -= overflow;
       *length0 = length;
     }
+
     if (new_blen > blen) {
       dirty_blob().add_tail(new_blen);
       used_in_blob.add_tail(new_blen,
-                           blob.get_release_size(min_alloc_size));
+                            get_blob().get_release_size(min_alloc_size));
     }
   }
   return true;
@@ -2905,9 +2965,9 @@ bool BlueStore::WriteContext::has_conflict(
   for (auto w : writes) {
     if (b == w.b) {
       auto loffs2 = P2ALIGN(w.logical_offset, min_alloc_size);
-      auto loffs2_end = ROUND_UP_TO( w.logical_offset + w.length0, min_alloc_size);
+      auto loffs2_end = P2ROUNDUP(w.logical_offset + w.length0, min_alloc_size);
       if ((loffs <= loffs2 && loffs_end > loffs2) ||
-          (loffs >= loffs2 &&  loffs < loffs2_end)) {
+          (loffs >= loffs2 && loffs < loffs2_end)) {
         return true;
       }
     }
@@ -2959,6 +3019,7 @@ void BlueStore::DeferredBatch::_discard(
               << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
               << " -> 0x" << head.length() << std::dec << dendl;
       auto i = seq_bytes.find(p->second.seq);
+      assert(i != seq_bytes.end());
       if (end > offset + length) {
        bufferlist tail;
        tail.substr_of(p->second.bl, offset + length - p->first,
@@ -2973,6 +3034,7 @@ void BlueStore::DeferredBatch::_discard(
       } else {
        i->second -= end - offset;
       }
+      assert(i->second >= 0);
       p->second.bl.swap(head);
     }
     ++p;
@@ -2982,6 +3044,7 @@ void BlueStore::DeferredBatch::_discard(
       break;
     }
     auto i = seq_bytes.find(p->second.seq);
+    assert(i != seq_bytes.end());
     auto end = p->first + p->second.bl.length();
     if (end > offset + length) {
       unsigned drop_front = offset + length - p->first;
@@ -3001,6 +3064,7 @@ void BlueStore::DeferredBatch::_discard(
               << std::dec << dendl;
       i->second -= p->second.bl.length();
     }
+    assert(i->second >= 0);
     p = iomap.erase(p);
   }
 }
@@ -3269,7 +3333,7 @@ void *BlueStore::MempoolThread::entry()
     size_t num_shards = store->cache_shards.size();
     float target_ratio = store->cache_meta_ratio + store->cache_data_ratio;
     // A little sloppy but should be close enough
-    uint64_t shard_target = target_ratio * (store->cct->_conf->bluestore_cache_size / num_shards);
+    uint64_t shard_target = target_ratio * (store->cache_size / num_shards);
 
     for (auto i : store->cache_shards) {
       i->trim(shard_target,
@@ -3437,17 +3501,6 @@ BlueStore::BlueStore(CephContext *cct,
   _init_logger();
   cct->_conf->add_observer(this);
   set_cache_shards(1);
-
-  if (cct->_conf->bluestore_shard_finishers) {
-    m_finisher_num = cct->_conf->osd_op_num_shards;
-  }
-
-  for (int i = 0; i < m_finisher_num; ++i) {
-    ostringstream oss;
-    oss << "finisher-" << i;
-    Finisher *f = new Finisher(cct, oss.str(), "finisher");
-    finishers.push_back(f);
-  }
 }
 
 BlueStore::~BlueStore()
@@ -3482,6 +3535,7 @@ const char **BlueStore::get_tracked_conf_keys() const
     "bluestore_compression_max_blob_size",
     "bluestore_compression_max_blob_size_ssd",
     "bluestore_compression_max_blob_size_hdd",
+    "bluestore_compression_required_ratio",
     "bluestore_max_alloc_size",
     "bluestore_prefer_deferred_size",
     "bluestore_deferred_batch_ops",
@@ -3552,6 +3606,25 @@ void BlueStore::handle_conf_change(const struct md_config_t *conf,
 
 void BlueStore::_set_compression()
 {
+  auto m = Compressor::get_comp_mode_type(cct->_conf->bluestore_compression_mode);
+  if (m) {
+    comp_mode = *m;
+  } else {
+    derr << __func__ << " unrecognized value '"
+         << cct->_conf->bluestore_compression_mode
+         << "' for bluestore_compression_mode, reverting to 'none'"
+         << dendl;
+    comp_mode = Compressor::COMP_NONE;
+  }
+
+  compressor = nullptr;
+
+  if (comp_mode == Compressor::COMP_NONE) {
+    dout(10) << __func__ << " compression mode set to 'none', "
+             << "ignore other compression setttings" << dendl;
+    return;
+  }
+
   if (cct->_conf->bluestore_compression_max_blob_size) {
     comp_min_blob_size = cct->_conf->bluestore_compression_max_blob_size;
   } else {
@@ -3574,19 +3647,6 @@ void BlueStore::_set_compression()
     }
   }
 
-  auto m = Compressor::get_comp_mode_type(cct->_conf->bluestore_compression_mode);
-  if (m) {
-    comp_mode = *m;
-  } else {
-    derr << __func__ << " unrecognized value '"
-         << cct->_conf->bluestore_compression_mode
-         << "' for bluestore_compression_mode, reverting to 'none'"
-         << dendl;
-    comp_mode = Compressor::COMP_NONE;
-  }
-
-  compressor = nullptr;
-
   auto& alg_name = cct->_conf->bluestore_compression_algorithm;
   if (!alg_name.empty()) {
     compressor = Compressor::create(cct, alg_name);
@@ -3647,23 +3707,50 @@ void BlueStore::_set_blob_size()
 
 int BlueStore::_set_cache_sizes()
 {
+  assert(bdev);
+  if (cct->_conf->bluestore_cache_size) {
+    cache_size = cct->_conf->bluestore_cache_size;
+  } else {
+    // choose global cache size based on backend type
+    if (bdev->is_rotational()) {
+      cache_size = cct->_conf->bluestore_cache_size_hdd;
+    } else {
+      cache_size = cct->_conf->bluestore_cache_size_ssd;
+    }
+  }
   cache_meta_ratio = cct->_conf->bluestore_cache_meta_ratio;
   cache_kv_ratio = cct->_conf->bluestore_cache_kv_ratio;
+
+  double cache_kv_max = cct->_conf->bluestore_cache_kv_max;
+  double cache_kv_max_ratio = 0;
+
+  // if cache_kv_max is negative, disable it
+  if (cache_size > 0 && cache_kv_max >= 0) {
+    cache_kv_max_ratio = (double) cache_kv_max / (double) cache_size;
+    if (cache_kv_max_ratio < 1.0 && cache_kv_max_ratio < cache_kv_ratio) {
+      dout(1) << __func__ << " max " << cache_kv_max_ratio
+            << " < ratio " << cache_kv_ratio
+            << dendl;
+      cache_meta_ratio = cache_meta_ratio + cache_kv_ratio - cache_kv_max_ratio;
+      cache_kv_ratio = cache_kv_max_ratio;
+    }
+  }  
+
   cache_data_ratio =
     (double)1.0 - (double)cache_meta_ratio - (double)cache_kv_ratio;
 
-  if (cache_meta_ratio <= 0 || cache_meta_ratio > 1.0) {
-    derr << __func__ << "bluestore_cache_meta_ratio (" << cache_meta_ratio
-        << ") must be in range (0,1.0]" << dendl;
+  if (cache_meta_ratio < 0 || cache_meta_ratio > 1.0) {
+    derr << __func__ << " bluestore_cache_meta_ratio (" << cache_meta_ratio
+        << ") must be in range [0,1.0]" << dendl;
     return -EINVAL;
   }
-  if (cache_kv_ratio <= 0 || cache_kv_ratio > 1.0) {
-    derr << __func__ << "bluestore_cache_kv_ratio (" << cache_kv_ratio
-        << ") must be in range (0,1.0]" << dendl;
+  if (cache_kv_ratio < 0 || cache_kv_ratio > 1.0) {
+    derr << __func__ << " bluestore_cache_kv_ratio (" << cache_kv_ratio
+        << ") must be in range [0,1.0]" << dendl;
     return -EINVAL;
   }
   if (cache_meta_ratio + cache_kv_ratio > 1.0) {
-    derr << __func__ << "bluestore_cache_meta_ratio (" << cache_meta_ratio
+    derr << __func__ << " bluestore_cache_meta_ratio (" << cache_meta_ratio
         << ") + bluestore_cache_kv_ratio (" << cache_kv_ratio
         << ") = " << cache_meta_ratio + cache_kv_ratio << "; must be <= 1.0"
         << dendl;
@@ -3673,7 +3760,8 @@ int BlueStore::_set_cache_sizes()
     // deal with floating point imprecision
     cache_data_ratio = 0;
   }
-  dout(1) << __func__ << " meta " << cache_meta_ratio
+  dout(1) << __func__ << " cache_size " << cache_size
+          << " meta " << cache_meta_ratio
          << " kv " << cache_kv_ratio
          << " data " << cache_data_ratio
          << dendl;
@@ -3855,14 +3943,8 @@ int BlueStore::get_block_device_fsid(CephContext* cct, const string& path,
 
 int BlueStore::_open_path()
 {
-  // initial sanity check
-  int r = _set_cache_sizes();
-  if (r < 0) {
-    return r;
-  }
-
   assert(path_fd < 0);
-  path_fd = ::open(path.c_str(), O_DIRECTORY);
+  path_fd = TEMP_FAILURE_RETRY(::open(path.c_str(), O_DIRECTORY));
   if (path_fd < 0) {
     int r = -errno;
     derr << __func__ << " unable to open " << path << ": " << cpp_strerror(r)
@@ -3890,7 +3972,7 @@ int BlueStore::_write_bdev_label(string path, bluestore_bdev_label_t label)
   z.zero();
   bl.append(std::move(z));
 
-  int fd = ::open(path.c_str(), O_WRONLY);
+  int fd = TEMP_FAILURE_RETRY(::open(path.c_str(), O_WRONLY));
   if (fd < 0) {
     fd = -errno;
     derr << __func__ << " failed to open " << path << ": " << cpp_strerror(fd)
@@ -3910,7 +3992,7 @@ int BlueStore::_read_bdev_label(CephContext* cct, string path,
                                bluestore_bdev_label_t *label)
 {
   dout(10) << __func__ << dendl;
-  int fd = ::open(path.c_str(), O_RDONLY);
+  int fd = TEMP_FAILURE_RETRY(::open(path.c_str(), O_RDONLY));
   if (fd < 0) {
     fd = -errno;
     derr << __func__ << " failed to open " << path << ": " << cpp_strerror(fd)
@@ -3981,9 +4063,6 @@ int BlueStore::_check_or_set_bdev_label(
 
 void BlueStore::_set_alloc_sizes(void)
 {
-  min_alloc_size_order = ctz(min_alloc_size);
-  assert(min_alloc_size == 1u << min_alloc_size_order);
-
   max_alloc_size = cct->_conf->bluestore_max_alloc_size;
 
   if (cct->_conf->bluestore_prefer_deferred_size) {
@@ -4037,6 +4116,11 @@ int BlueStore::_open_bdev(bool create)
   block_mask = ~(block_size - 1);
   block_size_order = ctz(block_size);
   assert(block_size == 1u << block_size_order);
+  // and set cache_size based on device type
+  r = _set_cache_sizes();
+  if (r < 0) {
+    goto fail_close;
+  }
   return 0;
 
  fail_close:
@@ -4180,6 +4264,7 @@ int BlueStore::_open_alloc()
     ++num;
     bytes += length;
   }
+  fm->enumerate_reset();
   dout(1) << __func__ << " loaded " << pretty_si_t(bytes)
          << " in " << num << " extents"
          << dendl;
@@ -4315,6 +4400,17 @@ bool BlueStore::is_rotational()
   return rotational;
 }
 
+bool BlueStore::is_journal_rotational()
+{
+  if (!bluefs) {
+    dout(5) << __func__ << " bluefs disabled, default to store media type"
+            << dendl;
+    return is_rotational();
+  }
+  dout(10) << __func__ << " " << (int)bluefs->wal_is_rotational() << dendl;
+  return bluefs->wal_is_rotational();
+}
+
 bool BlueStore::test_mount_in_use()
 {
   // most error conditions mean the mount is not in use (e.g., because
@@ -4584,7 +4680,7 @@ int BlueStore::_open_db(bool create)
   FreelistManager::setup_merge_operators(db);
   db->set_merge_operator(PREFIX_STAT, merge_op);
 
-  db->set_cache_size(cct->_conf->bluestore_cache_size * cache_kv_ratio);
+  db->set_cache_size(cache_size * cache_kv_ratio);
 
   if (kv_backend == "rocksdb")
     options = cct->_conf->bluestore_rocksdb_options;
@@ -4837,7 +4933,7 @@ int BlueStore::_open_collections(int *errors)
   return 0;
 }
 
-void BlueStore::open_statfs()
+void BlueStore::_open_statfs()
 {
   bufferlist bl;
   int r = db->get(PREFIX_STAT, "bluestore_statfs", &bl);
@@ -4845,8 +4941,7 @@ void BlueStore::open_statfs()
     if (size_t(bl.length()) >= sizeof(vstatfs.values)) {
       auto it = bl.begin();
       vstatfs.decode(it);
-    }
-    else {
+    } else {
       dout(10) << __func__ << " store_statfs is corrupt, using empty" << dendl;
     }
   }
@@ -5077,7 +5172,17 @@ int BlueStore::mkfs()
        min_alloc_size = cct->_conf->bluestore_min_alloc_size_ssd;
       }
     }
-    _set_alloc_sizes();
+
+    // make sure min_alloc_size is power of 2 aligned.
+    if (!ISP2(min_alloc_size)) {
+      derr << __func__ << " min_alloc_size 0x"
+           << std::hex << min_alloc_size << std::dec
+           << " is not power of 2 aligned!"
+           << dendl;
+      r = -EINVAL;
+      goto out_close_fm;
+    }
+
     {
       bufferlist bl;
       ::encode((uint64_t)min_alloc_size, bl);
@@ -5089,27 +5194,23 @@ int BlueStore::mkfs()
     db->submit_transaction_sync(t);
   }
 
-  r = _open_alloc();
-  if (r < 0)
-    goto out_close_fm;
 
   r = write_meta("kv_backend", cct->_conf->bluestore_kvbackend);
   if (r < 0)
-    goto out_close_alloc;
+    goto out_close_fm;
+
   r = write_meta("bluefs", stringify((int)cct->_conf->bluestore_bluefs));
   if (r < 0)
-    goto out_close_alloc;
+    goto out_close_fm;
 
   if (fsid != old_fsid) {
     r = _write_fsid();
     if (r < 0) {
       derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl;
-      goto out_close_alloc;
+      goto out_close_fm;
     }
   }
 
- out_close_alloc:
-  _close_alloc();
  out_close_fm:
   _close_fm();
  out_close_db:
@@ -5892,13 +5993,94 @@ int BlueStore::fsck(bool deep)
        ++errors;
       }
     }
+    fm->enumerate_reset();
     size_t count = used_blocks.count();
+    if (used_blocks.size() == count + 1) {
+      // this due to http://tracker.ceph.com/issues/21089
+      bufferlist fm_bpb_bl, fm_blocks_bl, fm_bpk_bl;
+      db->get(PREFIX_ALLOC, "bytes_per_block", &fm_bpb_bl);
+      db->get(PREFIX_ALLOC, "blocks", &fm_blocks_bl);
+      db->get(PREFIX_ALLOC, "blocks_per_key", &fm_bpk_bl);
+      uint64_t fm_blocks = 0;
+      uint64_t fm_bsize = 1;
+      uint64_t fm_blocks_per_key = 1;
+      try {
+       auto p = fm_blocks_bl.begin();
+       ::decode(fm_blocks, p);
+       auto q = fm_bpb_bl.begin();
+       ::decode(fm_bsize, q);
+       auto r = fm_bpk_bl.begin();
+       ::decode(fm_blocks_per_key, r);
+      } catch (buffer::error& e) {
+      }
+      uint64_t dev_bsize = bdev->get_block_size();
+      uint64_t bad_size = bdev->get_size() & ~fm_bsize;
+      if (used_blocks.test(bad_size / dev_bsize) == 0) {
+       // this is the last block of the device that we previously
+       // (incorrectly) truncated off of the effective device size.  this
+       // prevented BitmapFreelistManager from marking it as used along with
+       // the other "past-eof" blocks in the last key slot.  mark it used
+       // now.
+       derr << __func__ << " warning: fixing leaked block 0x" << std::hex
+            << bad_size << "~" << fm_bsize << std::dec << " due to old bug"
+            << dendl;
+       KeyValueDB::Transaction t = db->get_transaction();
+       // fix freelistmanager metadata (the internal 'blocks' count is
+       // rounded up to include the trailing key, past eof)
+       uint64_t new_blocks = bdev->get_size() / fm_bsize;
+       if (new_blocks / fm_blocks_per_key * fm_blocks_per_key != new_blocks) {
+         new_blocks = (new_blocks / fm_blocks_per_key + 1) *
+           fm_blocks_per_key;
+       }
+       if (new_blocks != fm_blocks) {
+         // the fm block count increased
+         derr << __func__ << "  freelist block and key count changed, fixing 0x"
+              << std::hex << bdev->get_size() << "~"
+              << ((new_blocks * fm_bsize) - bdev->get_size()) << std::dec
+              << dendl;
+         bufferlist bl;
+         ::encode(new_blocks, bl);
+         t->set(PREFIX_ALLOC, "blocks", bl);
+         fm->allocate(bdev->get_size(),
+                      (new_blocks * fm_bsize) - bdev->get_size(),
+                      t);
+       } else {
+         // block count is the same, but size changed; fix just the size
+         derr << __func__ << "  fixing just the stray block at 0x"
+              << std::hex << bad_size << "~" << fm_bsize << std::dec << dendl;
+         fm->allocate(bad_size, fm_bsize, t);
+       }
+       bufferlist sizebl;
+       ::encode(bdev->get_size(), sizebl);
+       t->set(PREFIX_ALLOC, "size", sizebl);
+       int r = db->submit_transaction_sync(t);
+       assert(r == 0);
+
+       used_blocks.set(bad_size / dev_bsize);
+       ++count;
+      }
+    }
     if (used_blocks.size() != count) {
       assert(used_blocks.size() > count);
-      derr << __func__ << " error: leaked some space;"
-          << (used_blocks.size() - count) * min_alloc_size
-          << " bytes leaked" << dendl;
       ++errors;
+      used_blocks.flip();
+      size_t start = used_blocks.find_first();
+      while (start != decltype(used_blocks)::npos) {
+       size_t cur = start;
+       while (true) {
+         size_t next = used_blocks.find_next(cur);
+         if (next != cur + 1) {
+           derr << __func__ << " error: leaked extent 0x" << std::hex
+                << ((uint64_t)start * block_size) << "~"
+                << ((cur + 1 - start) * block_size) << std::dec
+                << dendl;
+           start = next;
+           break;
+         }
+         cur = next;
+       }
+      }
+      used_blocks.flip();
     }
   }
 
@@ -6154,13 +6336,12 @@ int BlueStore::read(
   uint64_t offset,
   size_t length,
   bufferlist& bl,
-  uint32_t op_flags,
-  bool allow_eio)
+  uint32_t op_flags)
 {
   CollectionHandle c = _get_collection(cid);
   if (!c)
     return -ENOENT;
-  return read(c, oid, offset, length, bl, op_flags, allow_eio);
+  return read(c, oid, offset, length, bl, op_flags);
 }
 
 int BlueStore::read(
@@ -6169,8 +6350,7 @@ int BlueStore::read(
   uint64_t offset,
   size_t length,
   bufferlist& bl,
-  uint32_t op_flags,
-  bool allow_eio)
+  uint32_t op_flags)
 {
   utime_t start = ceph_clock_now();
   Collection *c = static_cast<Collection *>(c_.get());
@@ -6200,10 +6380,13 @@ int BlueStore::read(
   }
 
  out:
-  assert(allow_eio || r != -EIO);
   if (r == 0 && _debug_data_eio(oid)) {
     r = -EIO;
     derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
+  } else if (cct->_conf->bluestore_debug_random_read_err &&
+    (rand() % (int)(cct->_conf->bluestore_debug_random_read_err * 100.0)) == 0) {
+    dout(0) << __func__ << ": inject random EIO" << dendl;
+    r = -EIO;
   }
   dout(10) << __func__ << " " << cid << " " << oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
@@ -7443,6 +7626,8 @@ int BlueStore::_open_super_meta()
       uint64_t val;
       ::decode(val, p);
       min_alloc_size = val;
+      min_alloc_size_order = ctz(val);
+      assert(min_alloc_size == 1u << min_alloc_size_order);
     } catch (buffer::error& e) {
       derr << __func__ << " unable to read min_alloc_size" << dendl;
       return -EIO;
@@ -7450,7 +7635,7 @@ int BlueStore::_open_super_meta()
     dout(10) << __func__ << " min_alloc_size 0x" << std::hex << min_alloc_size
             << std::dec << dendl;
   }
-  open_statfs();
+  _open_statfs();
   _set_alloc_sizes();
   _set_throttle_params();
 
@@ -7504,12 +7689,15 @@ int BlueStore::_upgrade_super()
 
 void BlueStore::_assign_nid(TransContext *txc, OnodeRef o)
 {
-  if (o->onode.nid)
+  if (o->onode.nid) {
+    assert(o->exists);
     return;
+  }
   uint64_t nid = ++nid_last;
   dout(20) << __func__ << " " << nid << dendl;
   o->onode.nid = nid;
   txc->last_nid = nid;
+  o->exists = true;
 }
 
 uint64_t BlueStore::_assign_blobid(TransContext *txc)
@@ -7919,7 +8107,6 @@ void BlueStore::_txc_finish(TransContext *txc)
   }
 
   OpSequencerRef osr = txc->osr;
-  CollectionRef c;
   bool empty = false;
   bool submit_deferred = false;
   OpSequencer::q_list_t releasing_txc;
@@ -7944,9 +8131,6 @@ void BlueStore::_txc_finish(TransContext *txc)
         break;
       }
 
-      if (!c && txc->first_collection) {
-        c = txc->first_collection;
-      }
       osr->q.pop_front();
       releasing_txc.push_back(*txc);
       notify = true;
@@ -8005,9 +8189,11 @@ void BlueStore::_osr_drain_preceding(TransContext *txc)
   ++deferred_aggressive; // FIXME: maybe osr-local aggressive flag?
   {
     // submit anything pending
-    std::lock_guard<std::mutex> l(deferred_lock);
+    deferred_lock.lock();
     if (osr->deferred_pending) {
-      _deferred_submit(osr);
+      _deferred_submit_unlock(osr);
+    } else {
+      deferred_lock.unlock();
     }
   }
   {
@@ -8034,8 +8220,7 @@ void BlueStore::_osr_drain_all()
   ++deferred_aggressive;
   {
     // submit anything pending
-    std::lock_guard<std::mutex> l(deferred_lock);
-    _deferred_try_submit();
+    deferred_try_submit();
   }
   {
     // wake up any previously finished deferred events
@@ -8251,26 +8436,29 @@ void BlueStore::_kv_sync_thread()
        t->set(PREFIX_SUPER, "blobid_max", bl);
        dout(10) << __func__ << " new_blobid_max " << new_blobid_max << dendl;
       }
-      for (auto txc : kv_submitting) {
-       assert(txc->state == TransContext::STATE_KV_QUEUED);
-       txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
-       int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
-       assert(r == 0);
-       _txc_applied_kv(txc);
-       --txc->osr->kv_committing_serially;
-       txc->state = TransContext::STATE_KV_SUBMITTED;
-       if (txc->osr->kv_submitted_waiters) {
-         std::lock_guard<std::mutex> l(txc->osr->qlock);
-         if (txc->osr->_is_all_kv_submitted()) {
-           txc->osr->qcond.notify_all();
+
+      for (auto txc : kv_committing) {
+       if (txc->state == TransContext::STATE_KV_QUEUED) {
+         txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
+         int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
+         assert(r == 0);
+         _txc_applied_kv(txc);
+         --txc->osr->kv_committing_serially;
+         txc->state = TransContext::STATE_KV_SUBMITTED;
+         if (txc->osr->kv_submitted_waiters) {
+           std::lock_guard<std::mutex> l(txc->osr->qlock);
+           if (txc->osr->_is_all_kv_submitted()) {
+             txc->osr->qcond.notify_all();
+           }
          }
+
+       } else {
+         assert(txc->state == TransContext::STATE_KV_SUBMITTED);
+         txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
        }
-      }
-      for (auto txc : kv_committing) {
        if (txc->had_ios) {
          --txc->osr->txc_with_unstable_io;
        }
-       txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
       }
 
       // release throttle *before* we commit.  this allows new ops
@@ -8332,16 +8520,16 @@ void BlueStore::_kv_sync_thread()
        dout(10) << __func__ << " blobid_max now " << blobid_max << dendl;
       }
 
-      utime_t finish = ceph_clock_now();
-      utime_t dur_flush = after_flush - start;
-      utime_t dur_kv = finish - after_flush;
-      utime_t dur = finish - start;
-      dout(20) << __func__ << " committed " << kv_committing.size()
-              << " cleaned " << deferred_stable.size()
-              << " in " << dur
-              << " (" << dur_flush << " flush + " << dur_kv << " kv commit)"
-              << dendl;
-      if (logger) {
+      {
+       utime_t finish = ceph_clock_now();
+       utime_t dur_flush = after_flush - start;
+       utime_t dur_kv = finish - after_flush;
+       utime_t dur = finish - start;
+       dout(20) << __func__ << " committed " << kv_committing.size()
+         << " cleaned " << deferred_stable.size()
+         << " in " << dur
+         << " (" << dur_flush << " flush + " << dur_kv << " kv commit)"
+         << dendl;
        logger->tinc(l_bluestore_kv_flush_lat, dur_flush);
        logger->tinc(l_bluestore_kv_commit_lat, dur_kv);
        logger->tinc(l_bluestore_kv_lat, dur);
@@ -8440,10 +8628,9 @@ void BlueStore::_kv_finalize_thread()
       deferred_stable.clear();
 
       if (!deferred_aggressive) {
-       std::lock_guard<std::mutex> l(deferred_lock);
        if (deferred_queue_size >= deferred_batch_ops.load() ||
            throttle_deferred_bytes.past_midpoint()) {
-         _deferred_try_submit();
+         deferred_try_submit();
        }
       }
 
@@ -8470,7 +8657,7 @@ bluestore_deferred_op_t *BlueStore::_get_deferred_op(
 void BlueStore::_deferred_queue(TransContext *txc)
 {
   dout(20) << __func__ << " txc " << txc << " osr " << txc->osr << dendl;
-  std::lock_guard<std::mutex> l(deferred_lock);
+  deferred_lock.lock();
   if (!txc->osr->deferred_pending &&
       !txc->osr->deferred_running) {
     deferred_queue.push_back(*txc->osr);
@@ -8492,22 +8679,31 @@ void BlueStore::_deferred_queue(TransContext *txc)
   }
   if (deferred_aggressive &&
       !txc->osr->deferred_running) {
-    _deferred_submit(txc->osr.get());
+    _deferred_submit_unlock(txc->osr.get());
+  } else {
+    deferred_lock.unlock();
   }
 }
 
-void BlueStore::_deferred_try_submit()
+void BlueStore::deferred_try_submit()
 {
   dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
           << deferred_queue_size << " txcs" << dendl;
+  std::lock_guard<std::mutex> l(deferred_lock);
+  vector<OpSequencerRef> osrs;
+  osrs.reserve(deferred_queue.size());
   for (auto& osr : deferred_queue) {
-    if (!osr.deferred_running) {
-      _deferred_submit(&osr);
+    osrs.push_back(&osr);
+  }
+  for (auto& osr : osrs) {
+    if (osr->deferred_pending && !osr->deferred_running) {
+      _deferred_submit_unlock(osr.get());
+      deferred_lock.lock();
     }
   }
 }
 
-void BlueStore::_deferred_submit(OpSequencer *osr)
+void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
 {
   dout(10) << __func__ << " osr " << osr
           << " " << osr->deferred_pending->iomap.size() << " ios pending "
@@ -8555,6 +8751,10 @@ void BlueStore::_deferred_submit(OpSequencer *osr)
     bl.claim_append(i->second.bl);
     ++i;
   }
+
+  // demote to deferred_submit_lock, then drop that too
+  std::lock_guard<std::mutex> l(deferred_submit_lock);
+  deferred_lock.unlock();
   bdev->aio_submit(&b->ioc);
 }
 
@@ -8572,7 +8772,10 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
       auto q = deferred_queue.iterator_to(*osr);
       deferred_queue.erase(q);
     } else if (deferred_aggressive) {
-      _deferred_submit(osr);
+      dout(20) << __func__ << " queuing async deferred_try_submit" << dendl;
+      finishers[0]->queue(new FunctionContext([&](int) {
+           deferred_try_submit();
+         }));
     }
   }
 
@@ -8706,9 +8909,13 @@ int BlueStore::queue_transactions(
   if (txc->deferred_txn) {
     // ensure we do not block here because of deferred writes
     if (!throttle_deferred_bytes.get_or_fail(txc->cost)) {
+      dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive"
+              << dendl;
+      ++deferred_aggressive;
       deferred_try_submit();
       throttle_deferred_bytes.get(txc->cost);
-    }
+      --deferred_aggressive;
+   }
   }
   utime_t tend = ceph_clock_now();
 
@@ -8742,10 +8949,6 @@ void BlueStore::_txc_add_transaction(TransContext *txc, Transaction *t)
   for (vector<coll_t>::iterator p = i.colls.begin(); p != i.colls.end();
        ++p, ++j) {
     cvec[j] = _get_collection(*p);
-
-    // note first collection we reference
-    if (!txc->first_collection)
-      txc->first_collection = cvec[j];
   }
   vector<OnodeRef> ovec(i.objects.size());
 
@@ -8890,7 +9093,7 @@ void BlueStore::_txc_add_transaction(TransContext *txc, Transaction *t)
     case Transaction::OP_TRUNCATE:
       {
         uint64_t off = op->off;
-       _truncate(txc, c, o, off);
+       r = _truncate(txc, c, o, off);
       }
       break;
 
@@ -9093,7 +9296,6 @@ int BlueStore::_touch(TransContext *txc,
 {
   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
   int r = 0;
-  o->exists = true;
   _assign_nid(txc, o);
   txc->write_onode(o);
   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
@@ -9182,12 +9384,12 @@ void BlueStore::_pad_zeros(
   if (front_pad) {
     size_t front_copy = MIN(chunk_size - front_pad, length);
     bufferptr z = buffer::create_page_aligned(chunk_size);
-    memset(z.c_str(), 0, front_pad);
+    z.zero(0, front_pad, false);
     pad_count += front_pad;
-    memcpy(z.c_str() + front_pad, bl->get_contiguous(0, front_copy), front_copy);
+    bl->copy(0, front_copy, z.c_str() + front_pad);
     if (front_copy + front_pad < chunk_size) {
       back_pad = chunk_size - (length + front_pad);
-      memset(z.c_str() + front_pad + length, 0, back_pad);
+      z.zero(front_pad + length, back_pad, false);
       pad_count += back_pad;
     }
     bufferlist old, t;
@@ -9196,7 +9398,7 @@ void BlueStore::_pad_zeros(
     bl->append(z);
     bl->claim_append(t);
     *offset -= front_pad;
-    length += front_pad + back_pad;
+    length += pad_count;
   }
 
   // back
@@ -9207,9 +9409,8 @@ void BlueStore::_pad_zeros(
     back_pad = chunk_size - back_copy;
     assert(back_copy <= length);
     bufferptr tail(chunk_size);
-    memcpy(tail.c_str(), bl->get_contiguous(length - back_copy, back_copy),
-          back_copy);
-    memset(tail.c_str() + back_copy, 0, back_pad);
+    bl->copy(length - back_copy, back_copy, tail.c_str());
+    tail.zero(back_copy, back_pad, false);
     bufferlist old;
     old.swap(*bl);
     bl->substr_of(old, 0, length - back_copy);
@@ -9273,7 +9474,7 @@ void BlueStore::_do_write_small(
 
   // search suitable extent in both forward and reverse direction in
   // [offset - target_max_blob_size, offset + target_max_blob_size] range
-  // then check if blob can be reused via try_reuse_blob func or apply
+  // then check if blob can be reused via can_reuse_blob func or apply
   // direct/deferred write (the latter for extents including or higher
   // than 'offset' only).
   do {
@@ -9317,14 +9518,13 @@ void BlueStore::_do_write_small(
            b->get_blob().get_ondisk_length() >= b_off + b_len &&
            b->get_blob().is_unused(b_off, b_len) &&
            b->get_blob().is_allocated(b_off, b_len)) {
-         bufferlist padded;
-         _apply_padding(head_pad, tail_pad, bl, padded);
+         _apply_padding(head_pad, tail_pad, bl);
 
          dout(20) << __func__ << "  write to unused 0x" << std::hex
                   << b_off << "~" << b_len
                   << " pad 0x" << head_pad << " + 0x" << tail_pad
                   << std::dec << " of mutable " << *b << dendl;
-         _buffer_cache_write(txc, b, b_off, padded,
+         _buffer_cache_write(txc, b, b_off, bl,
                              wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
 
          if (!g_conf->bluestore_debug_omit_block_device_write) {
@@ -9339,17 +9539,17 @@ void BlueStore::_do_write_small(
                  op->extents.emplace_back(bluestore_pextent_t(offset, length));
                  return 0;
                });
-             op->data = padded;
+             op->data = bl;
            } else {
              b->get_blob().map_bl(
-               b_off, padded,
+               b_off, bl,
                [&](uint64_t offset, bufferlist& t) {
                  bdev->aio_write(offset, t,
                                  &txc->ioc, wctx->buffered);
                });
            }
          }
-         b->dirty_blob().calc_csum(b_off, padded);
+         b->dirty_blob().calc_csum(b_off, bl);
          dout(20) << __func__ << "  lex old " << *ep << dendl;
          Extent *le = o->extent_map.set_lextent(c, offset, b_off + head_pad, length,
                                                 b,
@@ -9379,8 +9579,7 @@ void BlueStore::_do_write_small(
            b_len % chunk_size == 0 &&
            b->get_blob().is_allocated(b_off, b_len)) {
 
-         bufferlist padded;
-         _apply_padding(head_pad, tail_pad, bl, padded);
+         _apply_padding(head_pad, tail_pad, bl);
 
          dout(20) << __func__ << "  reading head 0x" << std::hex << head_read
                   << " and tail 0x" << tail_read << std::dec << dendl;
@@ -9394,8 +9593,7 @@ void BlueStore::_do_write_small(
              head_bl.append_zero(zlen);
              logger->inc(l_bluestore_write_pad_bytes, zlen);
            }
-           head_bl.claim_append(padded);
-           padded.swap(head_bl);
+           bl.claim_prepend(head_bl);
            logger->inc(l_bluestore_write_penalty_read_ops);
          }
          if (tail_read) {
@@ -9408,14 +9606,14 @@ void BlueStore::_do_write_small(
              tail_bl.append_zero(zlen);
              logger->inc(l_bluestore_write_pad_bytes, zlen);
            }
-           padded.claim_append(tail_bl);
+           bl.claim_append(tail_bl);
            logger->inc(l_bluestore_write_penalty_read_ops);
          }
          logger->inc(l_bluestore_write_small_pre_read);
 
          bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
          op->op = bluestore_deferred_op_t::OP_WRITE;
-         _buffer_cache_write(txc, b, b_off, padded,
+         _buffer_cache_write(txc, b, b_off, bl,
                              wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
 
          int r = b->get_blob().map(
@@ -9426,9 +9624,9 @@ void BlueStore::_do_write_small(
            });
          assert(r == 0);
          if (b->get_blob().csum_type) {
-           b->dirty_blob().calc_csum(b_off, padded);
+           b->dirty_blob().calc_csum(b_off, bl);
          }
-         op->data.claim(padded);
+         op->data.claim(bl);
          dout(20) << __func__ << "  deferred write 0x" << std::hex << b_off << "~"
                   << b_len << std::dec << " of mutable " << *b
                   << " at " << op->extents << dendl;
@@ -9440,8 +9638,8 @@ void BlueStore::_do_write_small(
          logger->inc(l_bluestore_write_small_deferred);
          return;
        }
-       //try to reuse blob
-       if (b->try_reuse_blob(min_alloc_size,
+       // try to reuse blob if we can
+       if (b->can_reuse_blob(min_alloc_size,
                              max_bsize,
                              offset0 - bstart,
                              &alloc_len)) {
@@ -9465,8 +9663,8 @@ void BlueStore::_do_write_small(
            _pad_zeros(&bl, &b_off0, chunk_size);
 
            dout(20) << __func__ << " reuse blob " << *b << std::hex
-                    << " (" << b_off0 << "~" << bl.length() << ")"
-                    << " (" << b_off << "~" << length << ")"
+                    << " (0x" << b_off0 << "~" << bl.length() << ")"
+                    << " (0x" << b_off << "~" << length << ")"
                     << std::dec << dendl;
 
            o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
@@ -9487,7 +9685,7 @@ void BlueStore::_do_write_small(
       auto bstart = prev_ep->blob_start();
       dout(20) << __func__ << " considering " << *b
               << " bstart 0x" << std::hex << bstart << std::dec << dendl;
-      if (b->try_reuse_blob(min_alloc_size,
+      if (b->can_reuse_blob(min_alloc_size,
                            max_bsize,
                             offset0 - bstart,
                             &alloc_len)) {
@@ -9510,8 +9708,8 @@ void BlueStore::_do_write_small(
          _pad_zeros(&bl, &b_off0, chunk_size);
 
          dout(20) << __func__ << " reuse blob " << *b << std::hex
-                   << " (" << b_off0 << "~" << bl.length() << ")"
-                   << " (" << b_off << "~" << length << ")"
+                   << " (0x" << b_off0 << "~" << bl.length() << ")"
+                   << " (0x" << b_off << "~" << length << ")"
                    << std::dec << dendl;
 
          o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
@@ -9580,20 +9778,20 @@ void BlueStore::_do_write_big(
       auto min_off = offset >= max_bsize ? offset - max_bsize : 0;
       // search suitable extent in both forward and reverse direction in
       // [offset - target_max_blob_size, offset + target_max_blob_size] range
-      // then check if blob can be reused via try_reuse_blob func.
+      // then check if blob can be reused via can_reuse_blob func.
       bool any_change;
       do {
        any_change = false;
        if (ep != end && ep->logical_offset < offset + max_bsize) {
          if (offset >= ep->blob_start() &&
-              ep->blob->try_reuse_blob(min_alloc_size, max_bsize,
+              ep->blob->can_reuse_blob(min_alloc_size, max_bsize,
                                       offset - ep->blob_start(),
                                       &l)) {
            b = ep->blob;
            b_off = offset - ep->blob_start();
             prev_ep = end; // to avoid check below
            dout(20) << __func__ << " reuse blob " << *b << std::hex
-                    << " (" << b_off << "~" << l << ")" << std::dec << dendl;
+                    << " (0x" << b_off << "~" << l << ")" << std::dec << dendl;
          } else {
            ++ep;
            any_change = true;
@@ -9601,13 +9799,13 @@ void BlueStore::_do_write_big(
        }
 
        if (prev_ep != end && prev_ep->logical_offset >= min_off) {
-         if (prev_ep->blob->try_reuse_blob(min_alloc_size, max_bsize,
+         if (prev_ep->blob->can_reuse_blob(min_alloc_size, max_bsize,
                                            offset - prev_ep->blob_start(),
                                            &l)) {
            b = prev_ep->blob;
            b_off = offset - prev_ep->blob_start();
            dout(20) << __func__ << " reuse blob " << *b << std::hex
-                    << " (" << b_off << "~" << l << ")" << std::dec << dendl;
+                    << " (0x" << b_off << "~" << l << ")" << std::dec << dendl;
          } else if (prev_ep != begin) {
            --prev_ep;
            any_change = true;
@@ -10025,12 +10223,11 @@ void BlueStore::_choose_write_options(
 
     dout(20) << __func__ << " will prefer large blob and csum sizes" << dendl;
 
-    auto order = min_alloc_size_order.load();
     if (o->onode.expected_write_size) {
-      wctx->csum_order = std::max(order,
+      wctx->csum_order = std::max(min_alloc_size_order,
                                  (uint8_t)ctz(o->onode.expected_write_size));
     } else {
-      wctx->csum_order = order;
+      wctx->csum_order = min_alloc_size_order;
     }
 
     if (wctx->compress) {
@@ -10216,11 +10413,14 @@ int BlueStore::_write(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << dendl;
-  o->exists = true;
-  _assign_nid(txc, o);
-  int r = _do_write(txc, c, o, offset, length, bl, fadvise_flags);
-  txc->write_onode(o);
-
+  int r = 0;
+  if (offset + length >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+  } else {
+    _assign_nid(txc, o);
+    r = _do_write(txc, c, o, offset, length, bl, fadvise_flags);
+    txc->write_onode(o);
+  }
   dout(10) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << " = " << r << dendl;
@@ -10235,9 +10435,13 @@ int BlueStore::_zero(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << dendl;
-  o->exists = true;
-  _assign_nid(txc, o);
-  int r = _do_zero(txc, c, o, offset, length);
+  int r = 0;
+  if (offset + length >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+  } else {
+    _assign_nid(txc, o);
+    r = _do_zero(txc, c, o, offset, length);
+  }
   dout(10) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << " = " << r << dendl;
@@ -10312,7 +10516,7 @@ void BlueStore::_do_truncate(
   txc->write_onode(o);
 }
 
-void BlueStore::_truncate(TransContext *txc,
+int BlueStore::_truncate(TransContext *txc,
                         CollectionRef& c,
                         OnodeRef& o,
                         uint64_t offset)
@@ -10320,7 +10524,16 @@ void BlueStore::_truncate(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << std::dec
           << dendl;
-  _do_truncate(txc, c, o, offset);
+  int r = 0;
+  if (offset >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+  } else {
+    _do_truncate(txc, c, o, offset);
+  }
+  dout(10) << __func__ << " " << c->cid << " " << o->oid
+          << " 0x" << std::hex << offset << std::dec
+          << " = " << r << dendl;
+  return r;
 }
 
 int BlueStore::_do_remove(
@@ -10329,7 +10542,8 @@ int BlueStore::_do_remove(
   OnodeRef o)
 {
   set<SharedBlob*> maybe_unshared_blobs;
-  _do_truncate(txc, c, o, 0, &maybe_unshared_blobs);
+  bool is_gen = !o->oid.is_no_gen();
+  _do_truncate(txc, c, o, 0, is_gen ? &maybe_unshared_blobs : nullptr);
   if (o->onode.has_omap()) {
     o->flush();
     _do_omap_clear(txc, o->onode.nid);
@@ -10351,72 +10565,71 @@ int BlueStore::_do_remove(
   o->onode = bluestore_onode_t();
   _debug_obj_on_delete(o->oid);
 
-  if (!o->oid.is_no_gen() &&
-      !maybe_unshared_blobs.empty()) {
-    // see if we can unshare blobs still referenced by the head
-    dout(10) << __func__ << " gen and maybe_unshared_blobs "
-            << maybe_unshared_blobs << dendl;
-    ghobject_t nogen = o->oid;
-    nogen.generation = ghobject_t::NO_GEN;
-    OnodeRef h = c->onode_map.lookup(nogen);
-    if (h && h->exists) {
-      dout(20) << __func__ << " checking for unshareable blobs on " << h
-              << " " << h->oid << dendl;
-      map<SharedBlob*,bluestore_extent_ref_map_t> expect;
-      for (auto& e : h->extent_map.extent_map) {
-       const bluestore_blob_t& b = e.blob->get_blob();
-       SharedBlob *sb = e.blob->shared_blob.get();
-       if (b.is_shared() &&
-           sb->loaded &&
-           maybe_unshared_blobs.count(sb)) {
-         b.map(e.blob_offset, e.length, [&](uint64_t off, uint64_t len) {
-             expect[sb].get(off, len);
-             return 0;
-           });
-       }
-      }
-      vector<SharedBlob*> unshared_blobs;
-      unshared_blobs.reserve(maybe_unshared_blobs.size());
-      for (auto& p : expect) {
-       dout(20) << " ? " << *p.first << " vs " << p.second << dendl;
-       if (p.first->persistent->ref_map == p.second) {
-         SharedBlob *sb = p.first;
-         dout(20) << __func__ << "  unsharing " << *sb << dendl;
-         unshared_blobs.push_back(sb);
-         txc->unshare_blob(sb);
-         uint64_t sbid = c->make_blob_unshared(sb);
-         string key;
-         get_shared_blob_key(sbid, &key);
-         txc->t->rmkey(PREFIX_SHARED_BLOB, key);
-       }
-      }
+  if (!is_gen || maybe_unshared_blobs.empty()) {
+    return 0;
+  }
 
-      if (!unshared_blobs.empty()) {
-        uint32_t b_start = OBJECT_MAX_SIZE;
-        uint32_t b_end = 0;
-        for (auto& e : h->extent_map.extent_map) {
-          const bluestore_blob_t& b = e.blob->get_blob();
-          SharedBlob *sb = e.blob->shared_blob.get();
-          if (b.is_shared() &&
-              std::find(unshared_blobs.begin(), unshared_blobs.end(),
-                        sb) != unshared_blobs.end()) {
-            dout(20) << __func__ << "  unsharing " << e << dendl;
-            bluestore_blob_t& blob = e.blob->dirty_blob();
-            blob.clear_flag(bluestore_blob_t::FLAG_SHARED);
-            if (e.logical_offset < b_start) {
-              b_start = e.logical_offset;
-            }
-            if (e.logical_end() > b_end) {
-              b_end = e.logical_end();
-            }
-          }
-        }
+  // see if we can unshare blobs still referenced by the head
+  dout(10) << __func__ << " gen and maybe_unshared_blobs "
+          << maybe_unshared_blobs << dendl;
+  ghobject_t nogen = o->oid;
+  nogen.generation = ghobject_t::NO_GEN;
+  OnodeRef h = c->onode_map.lookup(nogen);
 
-       h->extent_map.dirty_range(b_start, b_end - b_start);
-       txc->write_onode(h);
-      }
+  if (!h || !h->exists) {
+    return 0;
+  }
+
+  dout(20) << __func__ << " checking for unshareable blobs on " << h
+          << " " << h->oid << dendl;
+  map<SharedBlob*,bluestore_extent_ref_map_t> expect;
+  for (auto& e : h->extent_map.extent_map) {
+    const bluestore_blob_t& b = e.blob->get_blob();
+    SharedBlob *sb = e.blob->shared_blob.get();
+    if (b.is_shared() &&
+       sb->loaded &&
+       maybe_unshared_blobs.count(sb)) {
+      b.map(e.blob_offset, e.length, [&](uint64_t off, uint64_t len) {
+        expect[sb].get(off, len);
+       return 0;
+      });
+    }
+  }
+
+  vector<SharedBlob*> unshared_blobs;
+  unshared_blobs.reserve(maybe_unshared_blobs.size());
+  for (auto& p : expect) {
+    dout(20) << " ? " << *p.first << " vs " << p.second << dendl;
+    if (p.first->persistent->ref_map == p.second) {
+      SharedBlob *sb = p.first;
+      dout(20) << __func__ << "  unsharing " << *sb << dendl;
+      unshared_blobs.push_back(sb);
+      txc->unshare_blob(sb);
+      uint64_t sbid = c->make_blob_unshared(sb);
+      string key;
+      get_shared_blob_key(sbid, &key);
+      txc->t->rmkey(PREFIX_SHARED_BLOB, key);
     }
   }
+
+  if (unshared_blobs.empty()) {
+    return 0;
+  }
+
+  for (auto& e : h->extent_map.extent_map) {
+    const bluestore_blob_t& b = e.blob->get_blob();
+    SharedBlob *sb = e.blob->shared_blob.get();
+    if (b.is_shared() &&
+        std::find(unshared_blobs.begin(), unshared_blobs.end(),
+                  sb) != unshared_blobs.end()) {
+      dout(20) << __func__ << "  unsharing " << e << dendl;
+      bluestore_blob_t& blob = e.blob->dirty_blob();
+      blob.clear_flag(bluestore_blob_t::FLAG_SHARED);
+      h->extent_map.dirty_range(e.logical_offset, 1);
+    }
+  }
+  txc->write_onode(h);
+
   return 0;
 }
 
@@ -10713,7 +10926,6 @@ int BlueStore::_clone(TransContext *txc,
     return -EINVAL;
   }
 
-  newo->exists = true;
   _assign_nid(txc, newo);
 
   // clone data
@@ -10781,7 +10993,9 @@ int BlueStore::_do_clone_range(
   CollectionRef& c,
   OnodeRef& oldo,
   OnodeRef& newo,
-  uint64_t srcoff, uint64_t length, uint64_t dstoff)
+  uint64_t srcoff,
+  uint64_t length,
+  uint64_t dstoff)
 {
   dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
           << newo->oid
@@ -10798,8 +11012,10 @@ int BlueStore::_do_clone_range(
     e.blob->last_encoded_id = -1;
   }
   int n = 0;
-  bool dirtied_oldo = false;
   uint64_t end = srcoff + length;
+  uint32_t dirty_range_begin = 0;
+  uint32_t dirty_range_end = 0;
+  bool src_dirty = false;
   for (auto ep = oldo->extent_map.seek_lextent(srcoff);
        ep != oldo->extent_map.extent_map.end();
        ++ep) {
@@ -10820,7 +11036,13 @@ int BlueStore::_do_clone_range(
       // make sure it is shared
       if (!blob.is_shared()) {
        c->make_blob_shared(_assign_blobid(txc), e.blob);
-       dirtied_oldo = true;  // fixme: overkill
+       if (!src_dirty) {
+         src_dirty = true;
+          dirty_range_begin = e.logical_offset;
+        }
+        assert(e.logical_end() > 0);
+        // -1 to exclude next potential shard
+        dirty_range_end = e.logical_end() - 1;
       } else {
        c->load_shared_blob(e.blob->shared_blob);
       }
@@ -10867,8 +11089,9 @@ int BlueStore::_do_clone_range(
     dout(20) << __func__ << "  dst " << *ne << dendl;
     ++n;
   }
-  if (dirtied_oldo) {
-    oldo->extent_map.dirty_range(srcoff, length); // overkill
+  if (src_dirty) {
+    oldo->extent_map.dirty_range(dirty_range_begin,
+      dirty_range_end - dirty_range_begin);
     txc->write_onode(oldo);
   }
   txc->write_onode(newo);
@@ -10893,12 +11116,16 @@ int BlueStore::_clone_range(TransContext *txc,
           << " to offset 0x" << dstoff << std::dec << dendl;
   int r = 0;
 
+  if (srcoff + length >= OBJECT_MAX_SIZE ||
+      dstoff + length >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+    goto out;
+  }
   if (srcoff + length > oldo->onode.size) {
     r = -EINVAL;
     goto out;
   }
 
-  newo->exists = true;
   _assign_nid(txc, newo);
 
   if (length > 0) {
@@ -11326,15 +11553,10 @@ void BlueStore::flush_cache()
 
 void BlueStore::_apply_padding(uint64_t head_pad,
                               uint64_t tail_pad,
-                              bufferlist& bl,
                               bufferlist& padded)
 {
-  padded = bl;
   if (head_pad) {
-    bufferlist z;
-    z.append_zero(head_pad);
-    z.claim_append(padded);
-    padded.claim(z);
+    padded.prepend_zero(head_pad);
   }
   if (tail_pad) {
     padded.append_zero(tail_pad);