]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/os/bluestore/BlueStore.cc
update sources to 12.2.7
[ceph.git] / ceph / src / os / bluestore / BlueStore.cc
index 1ad6c81e6c389762b29f91beebad1742c294631f..6fdd11b170b2403af505c2cf68717d686314893f 100644 (file)
@@ -1615,6 +1615,12 @@ bool BlueStore::OnodeSpace::map_any(std::function<bool(OnodeRef)> f)
   return false;
 }
 
+void BlueStore::OnodeSpace::dump(CephContext *cct, int lvl)
+{
+  for (auto& i : onode_map) {
+    ldout(cct, lvl) << i.first << " : " << i.second << dendl;
+  }
+}
 
 // SharedBlob
 
@@ -1661,16 +1667,9 @@ void BlueStore::SharedBlob::put()
                             << " removing self from set " << get_parent()
                             << dendl;
     if (get_parent()) {
-      if (get_parent()->remove(this)) {
-       delete this;
-      } else {
-       ldout(coll->store->cct, 20)
-         << __func__ << " " << this << " lost race to remove myself from set"
-         << dendl;
-      }
-    } else {
-      delete this;
+      get_parent()->remove(this);
     }
+    delete this;
   }
 }
 
@@ -1692,6 +1691,19 @@ void BlueStore::SharedBlob::put_ref(uint64_t offset, uint32_t length,
   }
 }
 
+// SharedBlobSet
+
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore.sharedblobset(" << this << ") "
+
+void BlueStore::SharedBlobSet::dump(CephContext *cct, int lvl)
+{
+  std::lock_guard<std::mutex> l(lock);
+  for (auto& i : sb_map) {
+    ldout(cct, lvl) << i.first << " : " << *i.second << dendl;
+  }
+}
+
 // Blob
 
 #undef dout_prefix
@@ -1703,9 +1715,13 @@ ostream& operator<<(ostream& out, const BlueStore::Blob& b)
   if (b.is_spanning()) {
     out << " spanning " << b.id;
   }
-  out << " " << b.get_blob() << " " << b.get_blob_use_tracker()
-      << " " << *b.shared_blob
-      << ")";
+  out << " " << b.get_blob() << " " << b.get_blob_use_tracker();
+  if (b.shared_blob) {
+    out << " " << *b.shared_blob;
+  } else {
+    out << " (shared_blob=NULL)";
+  }
+  out << ")";
   return out;
 }
 
@@ -1972,6 +1988,7 @@ void BlueStore::ExtentMap::update(KeyValueDB::Transaction t,
       unsigned n;
       // we need to encode inline_bl to measure encoded length
       bool never_happen = encode_some(0, OBJECT_MAX_SIZE, inline_bl, &n);
+      inline_bl.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
       assert(!never_happen);
       size_t len = inline_bl.length();
       dout(20) << __func__ << "  inline shard " << len << " bytes from " << n
@@ -2129,7 +2146,7 @@ void BlueStore::ExtentMap::reshard(
             << needs_reshard_end << ")" << std::dec << dendl;
   }
 
-  fault_range(db, needs_reshard_begin, needs_reshard_end);
+  fault_range(db, needs_reshard_begin, (needs_reshard_end - needs_reshard_begin));
 
   // we may need to fault in a larger interval later must have all
   // referring extents for spanning blobs loaded in order to have
@@ -3224,12 +3241,17 @@ BlueStore::OnodeRef BlueStore::Collection::get_onode(
     on->exists = true;
     bufferptr::iterator p = v.front().begin_deep();
     on->onode.decode(p);
+    for (auto& i : on->onode.attrs) {
+      i.second.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+    }
 
     // initialize extent_map
     on->extent_map.decode_spanning_blobs(p);
     if (on->onode.extent_map_shards.empty()) {
       denc(on->extent_map.inline_bl, p);
       on->extent_map.decode_some(on->extent_map.inline_bl);
+      on->extent_map.inline_bl.reassign_to_mempool(
+       mempool::mempool_bluestore_cache_other);
     } else {
       on->extent_map.init_shards(false, false);
     }
@@ -3288,13 +3310,13 @@ void BlueStore::Collection::split_cache(
          continue;
        }
        ldout(store->cct, 20) << __func__ << "  moving " << *sb << dendl;
-       sb->coll = dest;
        if (sb->get_sbid()) {
          ldout(store->cct, 20) << __func__
                                << "   moving registration " << *sb << dendl;
          shared_blob_set.remove(sb);
          dest->shared_blob_set.add(dest, sb);
        }
+       sb->coll = dest;
        if (dest->cache != cache) {
          for (auto& i : sb->bc.buffer_map) {
            if (!i.second->is_writing()) {
@@ -3470,6 +3492,7 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
     throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
                       cct->_conf->bluestore_throttle_bytes +
                       cct->_conf->bluestore_throttle_deferred_bytes),
+    deferred_finisher(cct, "defered_finisher", "dfin"),
     kv_sync_thread(this),
     kv_finalize_thread(this),
     mempool_thread(this)
@@ -3488,6 +3511,7 @@ BlueStore::BlueStore(CephContext *cct,
     throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
                       cct->_conf->bluestore_throttle_bytes +
                       cct->_conf->bluestore_throttle_deferred_bytes),
+    deferred_finisher(cct, "defered_finisher", "dfin"),
     kv_sync_thread(this),
     kv_finalize_thread(this),
     min_alloc_size(_min_alloc_size),
@@ -3534,6 +3558,8 @@ const char **BlueStore::get_tracked_conf_keys() const
     "bluestore_compression_required_ratio",
     "bluestore_max_alloc_size",
     "bluestore_prefer_deferred_size",
+    "bluestore_prefer_deferred_size_hdd",
+    "bluestore_prefer_deferred_size_ssd",
     "bluestore_deferred_batch_ops",
     "bluestore_deferred_batch_ops_hdd",
     "bluestore_deferred_batch_ops_ssd",
@@ -3573,6 +3599,8 @@ void BlueStore::handle_conf_change(const struct md_config_t *conf,
     }
   }
   if (changed.count("bluestore_prefer_deferred_size") ||
+      changed.count("bluestore_prefer_deferred_size_hdd") ||
+      changed.count("bluestore_prefer_deferred_size_ssd") ||
       changed.count("bluestore_max_alloc_size") ||
       changed.count("bluestore_deferred_batch_ops") ||
       changed.count("bluestore_deferred_batch_ops_hdd") ||
@@ -3621,8 +3649,8 @@ void BlueStore::_set_compression()
     return;
   }
 
-  if (cct->_conf->bluestore_compression_max_blob_size) {
-    comp_min_blob_size = cct->_conf->bluestore_compression_max_blob_size;
+  if (cct->_conf->bluestore_compression_min_blob_size) {
+    comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size;
   } else {
     assert(bdev);
     if (bdev->is_rotational()) {
@@ -3736,17 +3764,17 @@ int BlueStore::_set_cache_sizes()
     (double)1.0 - (double)cache_meta_ratio - (double)cache_kv_ratio;
 
   if (cache_meta_ratio < 0 || cache_meta_ratio > 1.0) {
-    derr << __func__ << "bluestore_cache_meta_ratio (" << cache_meta_ratio
+    derr << __func__ << " bluestore_cache_meta_ratio (" << cache_meta_ratio
         << ") must be in range [0,1.0]" << dendl;
     return -EINVAL;
   }
   if (cache_kv_ratio < 0 || cache_kv_ratio > 1.0) {
-    derr << __func__ << "bluestore_cache_kv_ratio (" << cache_kv_ratio
+    derr << __func__ << " bluestore_cache_kv_ratio (" << cache_kv_ratio
         << ") must be in range [0,1.0]" << dendl;
     return -EINVAL;
   }
   if (cache_meta_ratio + cache_kv_ratio > 1.0) {
-    derr << __func__ << "bluestore_cache_meta_ratio (" << cache_meta_ratio
+    derr << __func__ << " bluestore_cache_meta_ratio (" << cache_meta_ratio
         << ") + bluestore_cache_kv_ratio (" << cache_kv_ratio
         << ") = " << cache_meta_ratio + cache_kv_ratio << "; must be <= 1.0"
         << dendl;
@@ -3764,6 +3792,36 @@ int BlueStore::_set_cache_sizes()
   return 0;
 }
 
+int BlueStore::write_meta(const std::string& key, const std::string& value)
+{
+  bluestore_bdev_label_t label;
+  string p = path + "/block";
+  int r = _read_bdev_label(cct, p, &label);
+  if (r < 0) {
+    return ObjectStore::write_meta(key, value);
+  }
+  label.meta[key] = value;
+  r = _write_bdev_label(cct, p, label);
+  assert(r == 0);
+  return ObjectStore::write_meta(key, value);
+}
+
+int BlueStore::read_meta(const std::string& key, std::string *value)
+{
+  bluestore_bdev_label_t label;
+  string p = path + "/block";
+  int r = _read_bdev_label(cct, p, &label);
+  if (r < 0) {
+    return ObjectStore::read_meta(key, value);
+  }
+  auto i = label.meta.find(key);
+  if (i == label.meta.end()) {
+    return ObjectStore::read_meta(key, value);
+  }
+  *value = i->second;
+  return 0;
+}
+
 void BlueStore::_init_logger()
 {
   PerfCountersBuilder b(cct, "bluestore",
@@ -3901,6 +3959,8 @@ void BlueStore::_init_logger()
   b.add_u64_counter(l_bluestore_gc_merged, "bluestore_gc_merged",
                    "Sum for extents that have been merged due to garbage "
                    "collection");
+  b.add_u64_counter(l_bluestore_read_eio, "bluestore_read_eio",
+                    "Read EIO errors propagated to high level callers");
   logger = b.create_perf_counters();
   cct->get_perfcounters_collection()->add(logger);
 }
@@ -3939,6 +3999,12 @@ int BlueStore::get_block_device_fsid(CephContext* cct, const string& path,
 
 int BlueStore::_open_path()
 {
+  // sanity check(s)
+  if (cct->_conf->get_val<uint64_t>("osd_max_object_size") >=
+      4*1024*1024*1024ull) {
+    derr << __func__ << " osd_max_object_size >= 4GB; BlueStore has hard limit of 4GB." << dendl;
+    return -EINVAL;
+  }
   assert(path_fd < 0);
   path_fd = TEMP_FAILURE_RETRY(::open(path.c_str(), O_DIRECTORY));
   if (path_fd < 0) {
@@ -3956,7 +4022,8 @@ void BlueStore::_close_path()
   path_fd = -1;
 }
 
-int BlueStore::_write_bdev_label(string path, bluestore_bdev_label_t label)
+int BlueStore::_write_bdev_label(CephContext *cct,
+                                string path, bluestore_bdev_label_t label)
 {
   dout(10) << __func__ << " path " << path << " label " << label << dendl;
   bufferlist bl;
@@ -3980,6 +4047,11 @@ int BlueStore::_write_bdev_label(string path, bluestore_bdev_label_t label)
     derr << __func__ << " failed to write to " << path
         << ": " << cpp_strerror(r) << dendl;
   }
+  r = ::fsync(fd);
+  if (r < 0) {
+    derr << __func__ << " failed to fsync " << path
+        << ": " << cpp_strerror(r) << dendl;
+  }
   VOID_TEMP_FAILURE_RETRY(::close(fd));
   return r;
 }
@@ -4014,10 +4086,10 @@ int BlueStore::_read_bdev_label(CephContext* cct, string path,
     ::decode(expected_crc, p);
   }
   catch (buffer::error& e) {
-    derr << __func__ << " unable to decode label at offset " << p.get_off()
+    dout(2) << __func__ << " unable to decode label at offset " << p.get_off()
         << ": " << e.what()
         << dendl;
-    return -EINVAL;
+    return -ENOENT;
   }
   if (crc != expected_crc) {
     derr << __func__ << " bad crc on label, expected " << expected_crc
@@ -4037,7 +4109,7 @@ int BlueStore::_check_or_set_bdev_label(
     label.size = size;
     label.btime = ceph_clock_now();
     label.description = desc;
-    int r = _write_bdev_label(path, label);
+    int r = _write_bdev_label(cct, path, label);
     if (r < 0)
       return r;
   } else {
@@ -4149,18 +4221,22 @@ int BlueStore::_open_fm(bool create)
       bl.append(freelist_type);
       t->set(PREFIX_SUPER, "freelist_type", bl);
     }
-    fm->create(bdev->get_size(), t);
+    // being able to allocate in units less than bdev block size 
+    // seems to be a bad idea.
+    assert( cct->_conf->bdev_block_size <= (int64_t)min_alloc_size);
+    fm->create(bdev->get_size(), (int64_t)min_alloc_size, t);
 
     // allocate superblock reserved space.  note that we do not mark
     // bluefs space as allocated in the freelist; we instead rely on
     // bluefs_extents.
-    fm->allocate(0, SUPER_RESERVED, t);
+    uint64_t reserved = ROUND_UP_TO(MAX(SUPER_RESERVED, min_alloc_size),
+                                   min_alloc_size);
+    fm->allocate(0, reserved, t);
 
-    uint64_t reserved = 0;
     if (cct->_conf->bluestore_bluefs) {
       assert(bluefs_extents.num_intervals() == 1);
       interval_set<uint64_t>::iterator p = bluefs_extents.begin();
-      reserved = p.get_start() + p.get_len();
+      reserved = ROUND_UP_TO(p.get_start() + p.get_len(), min_alloc_size);
       dout(20) << __func__ << " reserved 0x" << std::hex << reserved << std::dec
               << " for bluefs" << dendl;
       bufferlist bl;
@@ -4168,8 +4244,6 @@ int BlueStore::_open_fm(bool create)
       t->set(PREFIX_SUPER, "bluefs_extents", bl);
       dout(20) << __func__ << " bluefs_extents 0x" << std::hex << bluefs_extents
               << std::dec << dendl;
-    } else {
-      reserved = SUPER_RESERVED;
     }
 
     if (cct->_conf->bluestore_debug_prefill > 0) {
@@ -4216,7 +4290,7 @@ int BlueStore::_open_fm(bool create)
     db->submit_transaction_sync(t);
   }
 
-  int r = fm->init();
+  int r = fm->init(bdev->get_size());
   if (r < 0) {
     derr << __func__ << " freelist init failed: " << cpp_strerror(r) << dendl;
     delete fm;
@@ -4396,6 +4470,17 @@ bool BlueStore::is_rotational()
   return rotational;
 }
 
+bool BlueStore::is_journal_rotational()
+{
+  if (!bluefs) {
+    dout(5) << __func__ << " bluefs disabled, default to store media type"
+            << dendl;
+    return is_rotational();
+  }
+  dout(10) << __func__ << " " << (int)bluefs->wal_is_rotational() << dendl;
+  return bluefs->wal_is_rotational();
+}
+
 bool BlueStore::test_mount_in_use()
 {
   // most error conditions mean the mount is not in use (e.g., because
@@ -4501,13 +4586,16 @@ int BlueStore::_open_db(bool create)
       }
       bluefs_shared_bdev = BlueFS::BDEV_SLOW;
       bluefs_single_shared_device = false;
-    } else if (::lstat(bfn.c_str(), &st) == -1) {
-      bluefs_shared_bdev = BlueFS::BDEV_DB;
     } else {
-      //symlink exist is bug
-      derr << __func__ << " " << bfn << " link target doesn't exist" << dendl;
       r = -errno;
-      goto free_bluefs;
+      if (::lstat(bfn.c_str(), &st) == -1) {
+       r = 0;
+       bluefs_shared_bdev = BlueFS::BDEV_DB;
+      } else {
+       derr << __func__ << " " << bfn << " symlink exists but target unusable: "
+            << cpp_strerror(r) << dendl;
+       goto free_bluefs;
+      }
     }
 
     // shared device
@@ -4524,6 +4612,13 @@ int BlueStore::_open_db(bool create)
        bdev->get_size() * (cct->_conf->bluestore_bluefs_min_ratio +
                            cct->_conf->bluestore_bluefs_gift_ratio);
       initial = MAX(initial, cct->_conf->bluestore_bluefs_min);
+      if (cct->_conf->bluefs_alloc_size % min_alloc_size) {
+       derr << __func__ << " bluefs_alloc_size 0x" << std::hex
+            << cct->_conf->bluefs_alloc_size << " is not a multiple of "
+            << "min_alloc_size 0x" << min_alloc_size << std::dec << dendl;
+       r = -EINVAL;
+       goto free_bluefs;
+      }
       // align to bluefs's alloc_size
       initial = P2ROUNDUP(initial, cct->_conf->bluefs_alloc_size);
       // put bluefs in the middle of the device in case it is an HDD
@@ -4562,13 +4657,16 @@ int BlueStore::_open_db(bool create)
       }
       cct->_conf->set_val("rocksdb_separate_wal_dir", "true");
       bluefs_single_shared_device = false;
-    } else if (::lstat(bfn.c_str(), &st) == -1) {
-      cct->_conf->set_val("rocksdb_separate_wal_dir", "false");
     } else {
-      //symlink exist is bug
-      derr << __func__ << " " << bfn << " link target doesn't exist" << dendl;
       r = -errno;
-      goto free_bluefs;
+      if (::lstat(bfn.c_str(), &st) == -1) {
+       r = 0;
+       cct->_conf->set_val("rocksdb_separate_wal_dir", "false");
+      } else {
+       derr << __func__ << " " << bfn << " symlink exists but target unusable: "
+            << cpp_strerror(r) << dendl;
+       goto free_bluefs;
+      }
     }
 
     if (create) {
@@ -4796,9 +4894,11 @@ int BlueStore::_balance_bluefs_freespace(PExtentVector *extents)
             << " > max_ratio " << cct->_conf->bluestore_bluefs_max_ratio
             << ", should reclaim " << pretty_si_t(reclaim) << dendl;
   }
+
+  // don't take over too much of the freespace
+  uint64_t free_cap = cct->_conf->bluestore_bluefs_max_ratio * total_free;
   if (bluefs_total < cct->_conf->bluestore_bluefs_min &&
-    cct->_conf->bluestore_bluefs_min <
-      (uint64_t)(cct->_conf->bluestore_bluefs_max_ratio * total_free)) {
+      cct->_conf->bluestore_bluefs_min < free_cap) {
     uint64_t g = cct->_conf->bluestore_bluefs_min - bluefs_total;
     dout(10) << __func__ << " bluefs_total " << bluefs_total
             << " < min " << cct->_conf->bluestore_bluefs_min
@@ -4807,6 +4907,17 @@ int BlueStore::_balance_bluefs_freespace(PExtentVector *extents)
       gift = g;
     reclaim = 0;
   }
+  uint64_t min_free = cct->_conf->get_val<uint64_t>("bluestore_bluefs_min_free");
+  if (bluefs_free < min_free &&
+      min_free < free_cap) {
+    uint64_t g = min_free - bluefs_free;
+    dout(10) << __func__ << " bluefs_free " << bluefs_total
+            << " < min " << min_free
+            << ", should gift " << pretty_si_t(g) << dendl;
+    if (g > gift)
+      gift = g;
+    reclaim = 0;
+  }
 
   if (gift) {
     // round up to alloc size
@@ -4825,12 +4936,19 @@ int BlueStore::_balance_bluefs_freespace(PExtentVector *extents)
     int64_t alloc_len = alloc->allocate(gift, cct->_conf->bluefs_alloc_size,
                                        0, 0, &exts);
 
-    if (alloc_len < (int64_t)gift) {
-      derr << __func__ << " allocate failed on 0x" << std::hex << gift
-           << " min_alloc_size 0x" << min_alloc_size << std::dec << dendl;
+    if (alloc_len <= 0) {
+      dout(1) << __func__ << " no allocate on 0x" << std::hex << gift
+              << " min_alloc_size 0x" << min_alloc_size << std::dec << dendl;
+      alloc->unreserve(gift);
+      alloc->dump();
+      return 0;
+    } else if (alloc_len < (int64_t)gift) {
+      dout(1) << __func__ << " insufficient allocate on 0x" << std::hex << gift
+              << " min_alloc_size 0x" << min_alloc_size 
+             << " allocated 0x" << alloc_len
+             << std::dec << dendl;
+      alloc->unreserve(gift - alloc_len);
       alloc->dump();
-      assert(0 == "allocate failed, wtf");
-      return -ENOSPC;
     }
     for (auto& p : exts) {
       bluestore_pextent_t e = bluestore_pextent_t(p);
@@ -4886,6 +5004,7 @@ void BlueStore::_commit_bluefs_freespace(
 
 int BlueStore::_open_collections(int *errors)
 {
+  dout(10) << __func__ << dendl;
   assert(coll_map.empty());
   KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
   for (it->upper_bound(string());
@@ -4907,7 +5026,8 @@ int BlueStore::_open_collections(int *errors)
              << pretty_binary_string(it->key()) << dendl;
         return -EIO;
       }   
-      dout(20) << __func__ << " opened " << cid << " " << c << dendl;
+      dout(20) << __func__ << " opened " << cid << " " << c
+              << " " << c->cnode << dendl;
       coll_map[cid] = c;
     } else {
       derr << __func__ << " unrecognized collection " << it->key() << dendl;
@@ -4991,30 +5111,13 @@ int BlueStore::_setup_block_symlink_or_file(
        }
 
        if (cct->_conf->bluestore_block_preallocate_file) {
-#ifdef HAVE_POSIX_FALLOCATE
-         r = ::posix_fallocate(fd, 0, size);
-         if (r) {
+          r = ::ceph_posix_fallocate(fd, 0, size);
+          if (r > 0) {
            derr << __func__ << " failed to prefallocate " << name << " file to "
              << size << ": " << cpp_strerror(r) << dendl;
            VOID_TEMP_FAILURE_RETRY(::close(fd));
            return -r;
          }
-#else
-         char data[1024*128];
-         for (uint64_t off = 0; off < size; off += sizeof(data)) {
-           if (off + sizeof(data) > size)
-             r = ::write(fd, data, size - off);
-           else
-             r = ::write(fd, data, sizeof(data));
-           if (r < 0) {
-             r = -errno;
-             derr << __func__ << " failed to prefallocate w/ write " << name << " file to "
-               << size << ": " << cpp_strerror(r) << dendl;
-             VOID_TEMP_FAILURE_RETRY(::close(fd));
-             return r;
-           }
-         }
-#endif
        }
        dout(1) << __func__ << " resized " << name << " file to "
                << pretty_si_t(size) << "B" << dendl;
@@ -5129,6 +5232,28 @@ int BlueStore::mkfs()
   if (r < 0)
     goto out_close_fsid;
 
+  // choose min_alloc_size
+  if (cct->_conf->bluestore_min_alloc_size) {
+    min_alloc_size = cct->_conf->bluestore_min_alloc_size;
+  } else {
+    assert(bdev);
+    if (bdev->is_rotational()) {
+      min_alloc_size = cct->_conf->bluestore_min_alloc_size_hdd;
+    } else {
+      min_alloc_size = cct->_conf->bluestore_min_alloc_size_ssd;
+    }
+  }
+
+  // make sure min_alloc_size is power of 2 aligned.
+  if (!ISP2(min_alloc_size)) {
+    derr << __func__ << " min_alloc_size 0x"
+        << std::hex << min_alloc_size << std::dec
+        << " is not power of 2 aligned!"
+        << dendl;
+    r = -EINVAL;
+    goto out_close_bdev;
+  }
+
   r = _open_db(true);
   if (r < 0)
     goto out_close_bdev;
@@ -5146,28 +5271,6 @@ int BlueStore::mkfs()
       t->set(PREFIX_SUPER, "blobid_max", bl);
     }
 
-    // choose min_alloc_size
-    if (cct->_conf->bluestore_min_alloc_size) {
-      min_alloc_size = cct->_conf->bluestore_min_alloc_size;
-    } else {
-      assert(bdev);
-      if (bdev->is_rotational()) {
-       min_alloc_size = cct->_conf->bluestore_min_alloc_size_hdd;
-      } else {
-       min_alloc_size = cct->_conf->bluestore_min_alloc_size_ssd;
-      }
-    }
-
-    // make sure min_alloc_size is power of 2 aligned.
-    if (!ISP2(min_alloc_size)) {
-      derr << __func__ << " min_alloc_size 0x"
-           << std::hex << min_alloc_size << std::dec
-           << " is not power of 2 aligned!"
-           << dendl;
-      r = -EINVAL;
-      goto out_close_fm;
-    }
-
     {
       bufferlist bl;
       ::encode((uint64_t)min_alloc_size, bl);
@@ -5184,7 +5287,7 @@ int BlueStore::mkfs()
   if (r < 0)
     goto out_close_fm;
 
-  r = write_meta("bluefs", stringify((int)cct->_conf->bluestore_bluefs));
+  r = write_meta("bluefs", stringify(bluefs ? 1 : 0));
   if (r < 0)
     goto out_close_fm;
 
@@ -5247,6 +5350,8 @@ int BlueStore::_mount(bool kv_only)
 {
   dout(1) << __func__ << " path " << path << dendl;
 
+  _kv_only = kv_only;
+
   {
     string type;
     int r = read_meta("type", &type);
@@ -5332,7 +5437,6 @@ int BlueStore::_mount(bool kv_only)
 
   mempool_thread.init();
 
-
   mounted = true;
   return 0;
 
@@ -5357,23 +5461,23 @@ int BlueStore::_mount(bool kv_only)
 
 int BlueStore::umount()
 {
-  assert(mounted);
+  assert(_kv_only || mounted);
   dout(1) << __func__ << dendl;
 
   _osr_drain_all();
   _osr_unregister_all();
 
-  mempool_thread.shutdown();
-
-  dout(20) << __func__ << " stopping kv thread" << dendl;
-  _kv_stop();
-  _reap_collections();
-  _flush_cache();
-  dout(20) << __func__ << " closing" << dendl;
-
   mounted = false;
-  _close_alloc();
-  _close_fm();
+  if (!_kv_only) {
+    mempool_thread.shutdown();
+    dout(20) << __func__ << " stopping kv thread" << dendl;
+    _kv_stop();
+    _flush_cache();
+    dout(20) << __func__ << " closing" << dendl;
+
+    _close_alloc();
+    _close_fm();
+  }
   _close_db();
   _close_bdev();
   _close_fsid();
@@ -5395,7 +5499,6 @@ static void apply(uint64_t off,
                   uint64_t len,
                   uint64_t granularity,
                   BlueStore::mempool_dynamic_bitset &bitset,
-                 const char *what,
                   std::function<void(uint64_t,
                                     BlueStore::mempool_dynamic_bitset &)> f) {
   auto end = ROUND_UP_TO(off + len, granularity);
@@ -5411,6 +5514,7 @@ int BlueStore::_fsck_check_extents(
   const PExtentVector& extents,
   bool compressed,
   mempool_dynamic_bitset &used_blocks,
+  uint64_t granularity,
   store_statfs_t& expected_statfs)
 {
   dout(30) << __func__ << " oid " << oid << " extents " << extents << dendl;
@@ -5424,8 +5528,9 @@ int BlueStore::_fsck_check_extents(
     }
     bool already = false;
     apply(
-      e.offset, e.length, block_size, used_blocks, __func__,
+      e.offset, e.length, granularity, used_blocks,
       [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+       assert(pos < bs.size());
        if (bs.test(pos))
          already = true;
        else
@@ -5445,10 +5550,13 @@ int BlueStore::_fsck_check_extents(
   return errors;
 }
 
-int BlueStore::fsck(bool deep)
+int BlueStore::_fsck(bool deep, bool repair)
 {
-  dout(1) << __func__ << (deep ? " (deep)" : " (shallow)") << " start" << dendl;
+  dout(1) << __func__
+         << (repair ? " fsck" : " repair")
+         << (deep ? " (deep)" : " (shallow)") << " start" << dendl;
   int errors = 0;
+  int repaired = 0;
 
   typedef btree::btree_set<
     uint64_t,std::less<uint64_t>,
@@ -5526,10 +5634,11 @@ int BlueStore::fsck(bool deep)
   if (r < 0)
     goto out_scan;
 
-  used_blocks.resize(bdev->get_size() / block_size);
+  used_blocks.resize(fm->get_alloc_units());
   apply(
-    0, SUPER_RESERVED, block_size, used_blocks, "0~SUPER_RESERVED",
+    0, MAX(min_alloc_size, SUPER_RESERVED), fm->get_alloc_size(), used_blocks,
     [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+       assert(pos < bs.size());
       bs.set(pos);
     }
   );
@@ -5537,8 +5646,9 @@ int BlueStore::fsck(bool deep)
   if (bluefs) {
     for (auto e = bluefs_extents.begin(); e != bluefs_extents.end(); ++e) {
       apply(
-        e.get_start(), e.get_len(), block_size, used_blocks, "bluefs",
+        e.get_start(), e.get_len(), fm->get_alloc_size(), used_blocks,
         [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+       assert(pos < bs.size());
           bs.set(pos);
         }
        );
@@ -5572,7 +5682,7 @@ int BlueStore::fsck(bool deep)
       if (is_extent_shard_key(it->key())) {
        while (!expecting_shards.empty() &&
               expecting_shards.front() < it->key()) {
-         derr << __func__ << " error: missing shard key "
+         derr << "fsck error: missing shard key "
               << pretty_binary_string(expecting_shards.front())
               << dendl;
          ++errors;
@@ -5588,18 +5698,18 @@ int BlueStore::fsck(bool deep)
         uint32_t offset;
         string okey;
         get_key_extent_shard(it->key(), &okey, &offset);
-        derr << __func__ << " error: stray shard 0x" << std::hex << offset
+        derr << "fsck error: stray shard 0x" << std::hex << offset
             << std::dec << dendl;
         if (expecting_shards.empty()) {
-          derr << __func__ << " error: " << pretty_binary_string(it->key())
+          derr << "fsck error: " << pretty_binary_string(it->key())
                << " is unexpected" << dendl;
           ++errors;
           continue;
         }
        while (expecting_shards.front() > it->key()) {
-         derr << __func__ << " error:   saw " << pretty_binary_string(it->key())
+         derr << "fsck error:   saw " << pretty_binary_string(it->key())
               << dendl;
-         derr << __func__ << " error:   exp "
+         derr << "fsck error:   exp "
               << pretty_binary_string(expecting_shards.front()) << dendl;
          ++errors;
          expecting_shards.pop_front();
@@ -5613,7 +5723,7 @@ int BlueStore::fsck(bool deep)
       ghobject_t oid;
       int r = get_key_object(it->key(), &oid);
       if (r < 0) {
-        derr << __func__ << " error: bad object key "
+        derr << "fsck error: bad object key "
              << pretty_binary_string(it->key()) << dendl;
        ++errors;
        continue;
@@ -5633,18 +5743,19 @@ int BlueStore::fsck(bool deep)
          }
        }
        if (!c) {
-          derr << __func__ << " error: stray object " << oid
+          derr << "fsck error: stray object " << oid
                << " not owned by any collection" << dendl;
          ++errors;
          continue;
        }
        c->cid.is_pg(&pgid);
-       dout(20) << __func__ << "  collection " << c->cid << dendl;
+       dout(20) << __func__ << "  collection " << c->cid << " " << c->cnode
+                << dendl;
       }
 
       if (!expecting_shards.empty()) {
        for (auto &k : expecting_shards) {
-         derr << __func__ << " error: missing shard key "
+         derr << "fsck error: missing shard key "
               << pretty_binary_string(k) << dendl;
        }
        ++errors;
@@ -5656,12 +5767,12 @@ int BlueStore::fsck(bool deep)
       OnodeRef o = c->get_onode(oid, false);
       if (o->onode.nid) {
        if (o->onode.nid > nid_max) {
-         derr << __func__ << " error: " << oid << " nid " << o->onode.nid
+         derr << "fsck error: " << oid << " nid " << o->onode.nid
               << " > nid_max " << nid_max << dendl;
          ++errors;
        }
        if (used_nids.count(o->onode.nid)) {
-         derr << __func__ << " error: " << oid << " nid " << o->onode.nid
+         derr << "fsck error: " << oid << " nid " << o->onode.nid
               << " already in use" << dendl;
          ++errors;
          continue; // go for next object
@@ -5683,7 +5794,7 @@ int BlueStore::fsck(bool deep)
        get_extent_shard_key(o->key, s.shard_info->offset,
                             &expecting_shards.back());
        if (s.shard_info->offset >= o->onode.size) {
-         derr << __func__ << " error: " << oid << " shard 0x" << std::hex
+         derr << "fsck error: " << oid << " shard 0x" << std::hex
               << s.shard_info->offset << " past EOF at 0x" << o->onode.size
               << std::dec << dendl;
          ++errors;
@@ -5697,14 +5808,14 @@ int BlueStore::fsck(bool deep)
       for (auto& l : o->extent_map.extent_map) {
        dout(20) << __func__ << "    " << l << dendl;
        if (l.logical_offset < pos) {
-         derr << __func__ << " error: " << oid << " lextent at 0x"
+         derr << "fsck error: " << oid << " lextent at 0x"
               << std::hex << l.logical_offset
               << " overlaps with the previous, which ends at 0x" << pos
               << std::dec << dendl;
          ++errors;
        }
        if (o->extent_map.spans_shard(l.logical_offset, l.length)) {
-         derr << __func__ << " error: " << oid << " lextent at 0x"
+         derr << "fsck error: " << oid << " lextent at 0x"
               << std::hex << l.logical_offset << "~" << l.length
               << " spans a shard boundary"
               << std::dec << dendl;
@@ -5750,7 +5861,7 @@ int BlueStore::fsck(bool deep)
                 << std::dec << " for " << *i.first << dendl;
        const bluestore_blob_t& blob = i.first->get_blob();
        if (i.second & blob.unused) {
-         derr << __func__ << " error: " << oid << " blob claims unused 0x"
+         derr << "fsck error: " << oid << " blob claims unused 0x"
               << std::hex << blob.unused
               << " but extents reference 0x" << i.second
               << " on blob " << *i.first << dendl;
@@ -5772,7 +5883,7 @@ int BlueStore::fsck(bool deep)
            if ((blob.unused & mask) == mask) {
              // this csum chunk region is marked unused
              if (blob.get_csum_item(p) != 0) {
-               derr << __func__ << " error: " << oid
+               derr << "fsck error: " << oid
                     << " blob claims csum chunk 0x" << std::hex << pos
                     << "~" << csum_chunk_size
                     << " is unused (mask 0x" << mask << " of unused 0x"
@@ -5790,7 +5901,7 @@ int BlueStore::fsck(bool deep)
        const bluestore_blob_t& blob = i.first->get_blob();
        bool equal = i.first->get_blob_use_tracker().equal(i.second);
        if (!equal) {
-         derr << __func__ << " error: " << oid << " blob " << *i.first
+         derr << "fsck error: " << oid << " blob " << *i.first
               << " doesn't match expected ref_map " << i.second << dendl;
          ++errors;
        }
@@ -5801,12 +5912,12 @@ int BlueStore::fsck(bool deep)
        }
        if (blob.is_shared()) {
          if (i.first->shared_blob->get_sbid() > blobid_max) {
-           derr << __func__ << " error: " << oid << " blob " << blob
+           derr << "fsck error: " << oid << " blob " << blob
                 << " sbid " << i.first->shared_blob->get_sbid() << " > blobid_max "
                 << blobid_max << dendl;
            ++errors;
          } else if (i.first->shared_blob->get_sbid() == 0) {
-            derr << __func__ << " error: " << oid << " blob " << blob
+            derr << "fsck error: " << oid << " blob " << blob
                  << " marked as shared but has uninitialized sbid"
                  << dendl;
             ++errors;
@@ -5824,6 +5935,7 @@ int BlueStore::fsck(bool deep)
          errors += _fsck_check_extents(oid, blob.get_extents(),
                                        blob.is_compressed(),
                                        used_blocks,
+                                       fm->get_alloc_size(),
                                        expected_statfs);
         }
       }
@@ -5832,14 +5944,14 @@ int BlueStore::fsck(bool deep)
        int r = _do_read(c.get(), o, 0, o->onode.size, bl, 0);
        if (r < 0) {
          ++errors;
-         derr << __func__ << " error: " << oid << " error during read: "
+         derr << "fsck error: " << oid << " error during read: "
               << cpp_strerror(r) << dendl;
        }
       }
       // omap
       if (o->onode.has_omap()) {
        if (used_omap_head.count(o->onode.nid)) {
-         derr << __func__ << " error: " << oid << " omap_head " << o->onode.nid
+         derr << "fsck error: " << oid << " omap_head " << o->onode.nid
               << " already in use" << dendl;
          ++errors;
        } else {
@@ -5855,14 +5967,14 @@ int BlueStore::fsck(bool deep)
       string key = it->key();
       uint64_t sbid;
       if (get_key_shared_blob(key, &sbid)) {
-       derr << __func__ << " error: bad key '" << key
+       derr << "fsck error: bad key '" << key
             << "' in shared blob namespace" << dendl;
        ++errors;
        continue;
       }
       auto p = sb_info.find(sbid);
       if (p == sb_info.end()) {
-       derr << __func__ << " error: found stray shared blob data for sbid 0x"
+       derr << "fsck error: found stray shared blob data for sbid 0x"
             << std::hex << sbid << std::dec << dendl;
        ++errors;
       } else {
@@ -5874,7 +5986,7 @@ int BlueStore::fsck(bool deep)
        ::decode(shared_blob, blp);
        dout(20) << __func__ << "  " << *sbi.sb << " " << shared_blob << dendl;
        if (shared_blob.ref_map != sbi.ref_map) {
-         derr << __func__ << " error: shared blob 0x" << std::hex << sbid
+         derr << "fsck error: shared blob 0x" << std::hex << sbid
               << std::dec << " ref_map " << shared_blob.ref_map
               << " != expected " << sbi.ref_map << dendl;
          ++errors;
@@ -5886,18 +5998,20 @@ int BlueStore::fsck(bool deep)
        errors += _fsck_check_extents(p->second.oids.front(),
                                      extents,
                                      p->second.compressed,
-                                     used_blocks, expected_statfs);
+                                     used_blocks,
+                                     fm->get_alloc_size(),
+                                     expected_statfs);
        sb_info.erase(p);
       }
     }
   }
   for (auto &p : sb_info) {
-    derr << __func__ << " error: shared_blob 0x" << p.first
+    derr << "fsck error: shared_blob 0x" << p.first
         << " key is missing (" << *p.second.sb << ")" << dendl;
     ++errors;
   }
   if (!(actual_statfs == expected_statfs)) {
-    derr << __func__ << " error: actual " << actual_statfs
+    derr << "fsck error: actual " << actual_statfs
         << " != expected " << expected_statfs << dendl;
     ++errors;
   }
@@ -5909,7 +6023,7 @@ int BlueStore::fsck(bool deep)
       uint64_t omap_head;
       _key_decode_u64(it->key().c_str(), &omap_head);
       if (used_omap_head.count(omap_head) == 0) {
-       derr << __func__ << " error: found stray omap data on omap_head "
+       derr << "fsck error: found stray omap data on omap_head "
             << omap_head << dendl;
        ++errors;
       }
@@ -5926,7 +6040,7 @@ int BlueStore::fsck(bool deep)
       try {
        ::decode(wt, p);
       } catch (buffer::error& e) {
-       derr << __func__ << " error: failed to decode deferred txn "
+       derr << "fsck error: failed to decode deferred txn "
             << pretty_binary_string(it->key()) << dendl;
        r = -EIO;
         goto out_scan;
@@ -5936,8 +6050,9 @@ int BlueStore::fsck(bool deep)
               << " released 0x" << std::hex << wt.released << std::dec << dendl;
       for (auto e = wt.released.begin(); e != wt.released.end(); ++e) {
         apply(
-          e.get_start(), e.get_len(), block_size, used_blocks, "deferred",
+          e.get_start(), e.get_len(), fm->get_alloc_size(), used_blocks,
           [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+       assert(pos < bs.size());
             bs.set(pos);
           }
         );
@@ -5951,8 +6066,9 @@ int BlueStore::fsck(bool deep)
     // know they are allocated.
     for (auto e = bluefs_extents.begin(); e != bluefs_extents.end(); ++e) {
       apply(
-        e.get_start(), e.get_len(), block_size, used_blocks, "bluefs_extents",
+        e.get_start(), e.get_len(), fm->get_alloc_size(), used_blocks,
         [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+       assert(pos < bs.size());
          bs.reset(pos);
         }
       );
@@ -5962,8 +6078,9 @@ int BlueStore::fsck(bool deep)
     while (fm->enumerate_next(&offset, &length)) {
       bool intersects = false;
       apply(
-        offset, length, block_size, used_blocks, "free",
+        offset, length, fm->get_alloc_size(), used_blocks,
         [&](uint64_t pos, mempool_dynamic_bitset &bs) {
+         assert(pos < bs.size());
           if (bs.test(pos)) {
             intersects = true;
           } else {
@@ -5972,20 +6089,47 @@ int BlueStore::fsck(bool deep)
         }
       );
       if (intersects) {
-       derr << __func__ << " error: free extent 0x" << std::hex << offset
-            << "~" << length << std::dec
-            << " intersects allocated blocks" << dendl;
-       ++errors;
+       if (offset == SUPER_RESERVED &&
+           length == min_alloc_size - SUPER_RESERVED) {
+         // this is due to the change just after luminous to min_alloc_size
+         // granularity allocations, and our baked in assumption at the top
+         // of _fsck that 0~ROUND_UP_TO(SUPER_RESERVED,min_alloc_size) is used
+         // (vs luminous's ROUND_UP_TO(SUPER_RESERVED,block_size)).  harmless,
+         // since we will never allocate this region below min_alloc_size.
+         dout(10) << __func__ << " ignoring free extent between SUPER_RESERVED"
+                  << " and min_alloc_size, 0x" << std::hex << offset << "~"
+                  << length << dendl;
+       } else {
+         derr << "fsck error: free extent 0x" << std::hex << offset
+              << "~" << length << std::dec
+              << " intersects allocated blocks" << dendl;
+         ++errors;
+       }
       }
     }
     fm->enumerate_reset();
     size_t count = used_blocks.count();
     if (used_blocks.size() != count) {
       assert(used_blocks.size() > count);
-      derr << __func__ << " error: leaked some space;"
-          << (used_blocks.size() - count) * min_alloc_size
-          << " bytes leaked" << dendl;
       ++errors;
+      used_blocks.flip();
+      size_t start = used_blocks.find_first();
+      while (start != decltype(used_blocks)::npos) {
+       size_t cur = start;
+       while (true) {
+         size_t next = used_blocks.find_next(cur);
+         if (next != cur + 1) {
+           derr << "fsck error: leaked extent 0x" << std::hex
+                << ((uint64_t)start * fm->get_alloc_size()) << "~"
+                << ((cur + 1 - start) * fm->get_alloc_size()) << std::dec
+                << dendl;
+           start = next;
+           break;
+         }
+         cur = next;
+       }
+      }
+      used_blocks.flip();
     }
   }
 
@@ -6020,9 +6164,10 @@ int BlueStore::fsck(bool deep)
          << dendl;
 
   utime_t duration = ceph_clock_now() - start;
-  dout(1) << __func__ << " finish with " << errors << " errors in "
+  dout(1) << __func__ << " finish with " << errors << " errors, " << repaired
+         << " repaired, " << (errors - repaired) << " remaining in "
          << duration << " seconds" << dendl;
-  return errors;
+  return errors - repaired;
 }
 
 void BlueStore::collect_metadata(map<string,string> *pm)
@@ -6045,14 +6190,13 @@ int BlueStore::statfs(struct store_statfs_t *buf)
   buf->available = alloc->get_free();
 
   if (bluefs) {
-    // part of our shared device is "free" according to BlueFS
-    // Don't include bluestore_bluefs_min because that space can't
-    // be used for any other purpose.
-    buf->available += bluefs->get_free(bluefs_shared_bdev) - cct->_conf->bluestore_bluefs_min;
-
-    // include dedicated db, too, if that isn't the shared device.
-    if (bluefs_shared_bdev != BlueFS::BDEV_DB) {
-      buf->total += bluefs->get_total(BlueFS::BDEV_DB);
+    // part of our shared device is "free" according to BlueFS, but we
+    // can't touch bluestore_bluefs_min of it.
+    int64_t shared_available = std::min(
+      bluefs->get_free(bluefs_shared_bdev),
+      bluefs->get_total(bluefs_shared_bdev) - cct->_conf->bluestore_bluefs_min);
+    if (shared_available > 0) {
+      buf->available += shared_available;
     }
   }
 
@@ -6085,23 +6229,26 @@ BlueStore::CollectionRef BlueStore::_get_collection(const coll_t& cid)
 void BlueStore::_queue_reap_collection(CollectionRef& c)
 {
   dout(10) << __func__ << " " << c << " " << c->cid << dendl;
-  std::lock_guard<std::mutex> l(reap_lock);
+  // _reap_collections and this in the same thread,
+  // so no need a lock.
   removed_collections.push_back(c);
 }
 
 void BlueStore::_reap_collections()
 {
+
   list<CollectionRef> removed_colls;
   {
-    std::lock_guard<std::mutex> l(reap_lock);
-    removed_colls.swap(removed_collections);
+    // _queue_reap_collection and this in the same thread.
+    // So no need a lock.
+    if (!removed_collections.empty())
+      removed_colls.swap(removed_collections);
+    else
+      return;
   }
 
-  bool all_reaped = true;
-
-  for (list<CollectionRef>::iterator p = removed_colls.begin();
-       p != removed_colls.end();
-       ++p) {
+  list<CollectionRef>::iterator p = removed_colls.begin();
+  while (p != removed_colls.end()) {
     CollectionRef c = *p;
     dout(10) << __func__ << " " << c << " " << c->cid << dendl;
     if (c->onode_map.map_any([&](OnodeRef o) {
@@ -6109,19 +6256,21 @@ void BlueStore::_reap_collections()
          if (o->flushing_count.load()) {
            dout(10) << __func__ << " " << c << " " << c->cid << " " << o->oid
                     << " flush_txns " << o->flushing_count << dendl;
-           return false;
+           return true;
          }
-         return true;
+         return false;
        })) {
-      all_reaped = false;
+      ++p;
       continue;
     }
     c->onode_map.clear();
+    p = removed_colls.erase(p);
     dout(10) << __func__ << " " << c << " " << c->cid << " done" << dendl;
   }
-
-  if (all_reaped) {
+  if (removed_colls.empty()) {
     dout(10) << __func__ << " all reaped" << dendl;
+  } else {
+    removed_collections.splice(removed_collections.begin(), removed_colls);
   }
 }
 
@@ -6282,10 +6431,13 @@ int BlueStore::read(
       length = o->onode.size;
 
     r = _do_read(c, o, offset, length, bl, op_flags);
+    if (r == -EIO) {
+      logger->inc(l_bluestore_read_eio);
+    }
   }
 
  out:
-  if (r == 0 && _debug_data_eio(oid)) {
+  if (r >= 0 && _debug_data_eio(oid)) {
     r = -EIO;
     derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
   } else if (cct->_conf->bluestore_debug_random_read_err &&
@@ -6391,7 +6543,7 @@ int BlueStore::_do_read(
       pos += hole;
       left -= hole;
     }
-    BlobRef bptr = lp->blob;
+    BlobRef& bptr = lp->blob;
     unsigned l_off = pos - lp->logical_offset;
     unsigned b_off = l_off + lp->blob_offset;
     unsigned b_len = std::min(left, lp->length - l_off);
@@ -6439,9 +6591,9 @@ int BlueStore::_do_read(
                                     // measure the whole block below.
                                     // The error isn't that much...
   vector<bufferlist> compressed_blob_bls;
-  IOContext ioc(cct, NULL);
+  IOContext ioc(cct, NULL, true); // allow EIO
   for (auto& p : blobs2read) {
-    BlobRef bptr = p.first;
+    const BlobRef& bptr = p.first;
     dout(20) << __func__ << "  blob " << *bptr << std::hex
             << " need " << p.second << std::dec << dendl;
     if (bptr->get_blob().is_compressed()) {
@@ -6466,7 +6618,14 @@ int BlueStore::_do_read(
             return r;
           return 0;
        });
+      if (r < 0) {
+        derr << __func__ << " bdev-read failed: " << cpp_strerror(r) << dendl;
+        if (r == -EIO) {
+          // propagate EIO to caller
+          return r;
+        }
         assert(r == 0);
+      }
     } else {
       // read the pieces
       for (auto& reg : p.second) {
@@ -6504,7 +6663,15 @@ int BlueStore::_do_read(
               return r;
             return 0;
          });
-       assert(r == 0);
+        if (r < 0) {
+          derr << __func__ << " bdev-read failed: " << cpp_strerror(r)
+               << dendl;
+          if (r == -EIO) {
+            // propagate EIO to caller
+            return r;
+          }
+          assert(r == 0);
+        }
        assert(reg.bl.length() == r_len);
       }
     }
@@ -6513,6 +6680,11 @@ int BlueStore::_do_read(
     bdev->aio_submit(&ioc);
     dout(20) << __func__ << " waiting for aio" << dendl;
     ioc.aio_wait();
+    r = ioc.get_return_value();
+    if (r < 0) {
+      assert(r == -EIO); // no other errors allowed
+      return -EIO;
+    }
   }
   logger->tinc(l_bluestore_read_wait_aio_lat, ceph_clock_now() - start);
 
@@ -6520,7 +6692,7 @@ int BlueStore::_do_read(
   auto p = compressed_blob_bls.begin();
   blobs2read_t::iterator b2r_it = blobs2read.begin();
   while (b2r_it != blobs2read.end()) {
-    BlobRef bptr = b2r_it->first;
+    const BlobRef& bptr = b2r_it->first;
     dout(20) << __func__ << "  blob " << *bptr << std::hex
             << " need 0x" << b2r_it->second << std::dec << dendl;
     if (bptr->get_blob().is_compressed()) {
@@ -8075,7 +8247,8 @@ void BlueStore::_txc_release_alloc(TransContext *txc)
 {
   // update allocator with full released set
   if (!cct->_conf->bluestore_debug_no_reuse_blocks) {
-    dout(10) << __func__ << " " << txc << " " << txc->released << dendl;
+    dout(10) << __func__ << " " << txc << " " << std::hex
+             << txc->released << std::dec << dendl;
     for (interval_set<uint64_t>::iterator p = txc->released.begin();
         p != txc->released.end();
         ++p) {
@@ -8197,6 +8370,7 @@ void BlueStore::_kv_start()
     finishers.push_back(f);
   }
 
+  deferred_finisher.start();
   for (auto f : finishers) {
     f->start();
   }
@@ -8225,6 +8399,7 @@ void BlueStore::_kv_stop()
   }
   kv_sync_thread.join();
   kv_finalize_thread.join();
+  assert(removed_collections.empty());
   {
     std::lock_guard<std::mutex> l(kv_lock);
     kv_stop = false;
@@ -8234,6 +8409,8 @@ void BlueStore::_kv_stop()
     kv_finalize_stop = false;
   }
   dout(10) << __func__ << " stopping finishers" << dendl;
+  deferred_finisher.wait_for_empty();
+  deferred_finisher.stop();
   for (auto f : finishers) {
     f->wait_for_empty();
     f->stop();
@@ -8601,9 +8778,16 @@ void BlueStore::deferred_try_submit()
     osrs.push_back(&osr);
   }
   for (auto& osr : osrs) {
-    if (osr->deferred_pending && !osr->deferred_running) {
-      _deferred_submit_unlock(osr.get());
-      deferred_lock.lock();
+    if (osr->deferred_pending) {
+      if (!osr->deferred_running) {
+       _deferred_submit_unlock(osr.get());
+       deferred_lock.lock();
+      } else {
+       dout(20) << __func__ << "  osr " << osr << " already has running"
+                << dendl;
+      }
+    } else {
+      dout(20) << __func__ << "  osr " << osr << " has no pending" << dendl;
     }
   }
 }
@@ -8657,12 +8841,18 @@ void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
     ++i;
   }
 
-  // demote to deferred_submit_lock, then drop that too
-  std::lock_guard<std::mutex> l(deferred_submit_lock);
   deferred_lock.unlock();
   bdev->aio_submit(&b->ioc);
 }
 
+struct C_DeferredTrySubmit : public Context {
+  BlueStore *store;
+  C_DeferredTrySubmit(BlueStore *s) : store(s) {}
+  void finish(int r) {
+    store->deferred_try_submit();
+  }
+};
+
 void BlueStore::_deferred_aio_finish(OpSequencer *osr)
 {
   dout(10) << __func__ << " osr " << osr << dendl;
@@ -8674,13 +8864,14 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
     assert(osr->deferred_running == b);
     osr->deferred_running = nullptr;
     if (!osr->deferred_pending) {
+      dout(20) << __func__ << " dequeueing" << dendl;
       auto q = deferred_queue.iterator_to(*osr);
       deferred_queue.erase(q);
     } else if (deferred_aggressive) {
       dout(20) << __func__ << " queuing async deferred_try_submit" << dendl;
-      finishers[0]->queue(new FunctionContext([&](int) {
-           deferred_try_submit();
-         }));
+      deferred_finisher.queue(new C_DeferredTrySubmit(this));
+    } else {
+      dout(20) << __func__ << " leaving queued, more pending" << dendl;
     }
   }
 
@@ -8814,9 +9005,18 @@ int BlueStore::queue_transactions(
   if (txc->deferred_txn) {
     // ensure we do not block here because of deferred writes
     if (!throttle_deferred_bytes.get_or_fail(txc->cost)) {
+      dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive"
+              << dendl;
+      ++deferred_aggressive;
       deferred_try_submit();
+      {
+       // wake up any previously finished deferred events
+       std::lock_guard<std::mutex> l(kv_lock);
+       kv_cond.notify_one();
+      }
       throttle_deferred_bytes.get(txc->cost);
-    }
+      --deferred_aggressive;
+   }
   }
   utime_t tend = ceph_clock_now();
 
@@ -8994,7 +9194,7 @@ void BlueStore::_txc_add_transaction(TransContext *txc, Transaction *t)
     case Transaction::OP_TRUNCATE:
       {
         uint64_t off = op->off;
-       _truncate(txc, c, o, off);
+       r = _truncate(txc, c, o, off);
       }
       break;
 
@@ -9203,7 +9403,7 @@ int BlueStore::_touch(TransContext *txc,
   return r;
 }
 
-void BlueStore::_dump_onode(OnodeRef o, int log_level)
+void BlueStore::_dump_onode(const OnodeRef& o, int log_level)
 {
   if (!cct->_conf->subsys.should_gather(ceph_subsys_bluestore, log_level))
     return;
@@ -9740,20 +9940,10 @@ int BlueStore::_do_alloc_write(
   dout(20) << __func__ << " txc " << txc
           << " " << wctx->writes.size() << " blobs"
           << dendl;
-
-  uint64_t need = 0;
-  auto max_bsize = MAX(wctx->target_blob_size, min_alloc_size);
-  for (auto &wi : wctx->writes) {
-    need += wi.blob_length;
-  }
-  int r = alloc->reserve(need);
-  if (r < 0) {
-    derr << __func__ << " failed to reserve 0x" << std::hex << need << std::dec
-        << dendl;
-    return r;
+  if (wctx->writes.empty()) {
+    return 0;
   }
 
-  uint64_t hint = 0;
   CompressorRef c;
   double crr = 0;
   if (wctx->compress) {
@@ -9778,7 +9968,7 @@ int BlueStore::_do_alloc_write(
       cct->_conf->bluestore_compression_required_ratio,
       [&]() {
         double val;
-        if(coll->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) {
+        if (coll->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) {
           return boost::optional<double>(val);
         }
         return boost::optional<double>();
@@ -9793,78 +9983,102 @@ int BlueStore::_do_alloc_write(
     csum,
     [&]() {
       int val;
-      if(coll->pool_opts.get(pool_opts_t::CSUM_TYPE, &val)) {
+      if (coll->pool_opts.get(pool_opts_t::CSUM_TYPE, &val)) {
         return  boost::optional<int>(val);
       }
       return boost::optional<int>();
     }
   );
 
+  // compress (as needed) and calc needed space
+  uint64_t need = 0;
+  auto max_bsize = MAX(wctx->target_blob_size, min_alloc_size);
   for (auto& wi : wctx->writes) {
-    BlobRef b = wi.b;
-    bluestore_blob_t& dblob = b->dirty_blob();
-    uint64_t b_off = wi.b_off;
-    bufferlist *l = &wi.bl;
-    uint64_t final_length = wi.blob_length;
-    uint64_t csum_length = wi.blob_length;
-    unsigned csum_order = block_size_order;
-    bufferlist compressed_bl;
-    bool compressed = false;
-    if(c && wi.blob_length > min_alloc_size) {
-
+    if (c && wi.blob_length > min_alloc_size) {
       utime_t start = ceph_clock_now();
 
       // compress
-      assert(b_off == 0);
-      assert(wi.blob_length == l->length());
-      bluestore_compression_header_t chdr;
-      chdr.type = c->get_type();
+      assert(wi.b_off == 0);
+      assert(wi.blob_length == wi.bl.length());
+
       // FIXME: memory alignment here is bad
       bufferlist t;
-
-      r = c->compress(*l, t);
+      int r = c->compress(wi.bl, t);
       assert(r == 0);
 
+      bluestore_compression_header_t chdr;
+      chdr.type = c->get_type();
       chdr.length = t.length();
-      ::encode(chdr, compressed_bl);
-      compressed_bl.claim_append(t);
-      uint64_t rawlen = compressed_bl.length();
-      uint64_t newlen = P2ROUNDUP(rawlen, min_alloc_size);
-      uint64_t want_len_raw = final_length * crr;
+      ::encode(chdr, wi.compressed_bl);
+      wi.compressed_bl.claim_append(t);
+
+      wi.compressed_len = wi.compressed_bl.length();
+      uint64_t newlen = P2ROUNDUP(wi.compressed_len, min_alloc_size);
+      uint64_t want_len_raw = wi.blob_length * crr;
       uint64_t want_len = P2ROUNDUP(want_len_raw, min_alloc_size);
-      if (newlen <= want_len && newlen < final_length) {
-        // Cool. We compressed at least as much as we were hoping to.
-        // pad out to min_alloc_size
-       compressed_bl.append_zero(newlen - rawlen);
-       logger->inc(l_bluestore_write_pad_bytes, newlen - rawlen);
+      if (newlen <= want_len && newlen < wi.blob_length) {
+       // Cool. We compressed at least as much as we were hoping to.
+       // pad out to min_alloc_size
+       wi.compressed_bl.append_zero(newlen - wi.compressed_len);
+       logger->inc(l_bluestore_write_pad_bytes, newlen - wi.compressed_len);
        dout(20) << __func__ << std::hex << "  compressed 0x" << wi.blob_length
-                << " -> 0x" << rawlen << " => 0x" << newlen
+                << " -> 0x" << wi.compressed_len << " => 0x" << newlen
                 << " with " << c->get_type()
                 << std::dec << dendl;
-       txc->statfs_delta.compressed() += rawlen;
-       txc->statfs_delta.compressed_original() += l->length();
+       txc->statfs_delta.compressed() += wi.compressed_len;
+       txc->statfs_delta.compressed_original() += wi.blob_length;
        txc->statfs_delta.compressed_allocated() += newlen;
-       l = &compressed_bl;
-       final_length = newlen;
-       csum_length = newlen;
-       csum_order = ctz(newlen);
-       dblob.set_compressed(wi.blob_length, rawlen);
-       compressed = true;
-        logger->inc(l_bluestore_compress_success_count);
+       logger->inc(l_bluestore_compress_success_count);
+       wi.compressed = true;
+       need += newlen;
       } else {
-       dout(20) << __func__ << std::hex << "  0x" << l->length()
-                << " compressed to 0x" << rawlen << " -> 0x" << newlen
-                 << " with " << c->get_type()
-                 << ", which is more than required 0x" << want_len_raw
+       dout(20) << __func__ << std::hex << "  0x" << wi.blob_length
+                << " compressed to 0x" << wi.compressed_len << " -> 0x" << newlen
+                << " with " << c->get_type()
+                << ", which is more than required 0x" << want_len_raw
                 << " -> 0x" << want_len
-                 << ", leaving uncompressed"
-                 << std::dec << dendl;
-        logger->inc(l_bluestore_compress_rejected_count);
+                << ", leaving uncompressed"
+                << std::dec << dendl;
+       logger->inc(l_bluestore_compress_rejected_count);
+       need += wi.blob_length;
       }
       logger->tinc(l_bluestore_compress_lat,
                   ceph_clock_now() - start);
+    } else {
+      need += wi.blob_length;
     }
-    if (!compressed && wi.new_blob) {
+  }
+  int r = alloc->reserve(need);
+  if (r < 0) {
+    derr << __func__ << " failed to reserve 0x" << std::hex << need << std::dec
+        << dendl;
+    return r;
+  }
+  AllocExtentVector prealloc;
+  prealloc.reserve(2 * wctx->writes.size());;
+  int prealloc_left = 0;
+  prealloc_left = alloc->allocate(
+    need, min_alloc_size, need,
+    0, &prealloc);
+  assert(prealloc_left == (int64_t)need);
+  dout(20) << __func__ << " prealloc " << prealloc << dendl;
+  auto prealloc_pos = prealloc.begin();
+
+  for (auto& wi : wctx->writes) {
+    BlobRef b = wi.b;
+    bluestore_blob_t& dblob = b->dirty_blob();
+    uint64_t b_off = wi.b_off;
+    bufferlist *l = &wi.bl;
+    uint64_t final_length = wi.blob_length;
+    uint64_t csum_length = wi.blob_length;
+    unsigned csum_order = block_size_order;
+    if (wi.compressed) {
+      final_length = wi.compressed_bl.length();
+      csum_length = final_length;
+      csum_order = ctz(csum_length);
+      l = &wi.compressed_bl;
+      dblob.set_compressed(wi.blob_length, wi.compressed_len);
+    } else if (wi.new_blob) {
       // initialize newly created blob only
       assert(dblob.is_mutable());
       if (l->length() != wi.blob_length) {
@@ -9882,41 +10096,52 @@ int BlueStore::_do_alloc_write(
       if ((suggested_boff % (1 << csum_order)) == 0 &&
            suggested_boff + final_length <= max_bsize &&
            suggested_boff > b_off) {
-        dout(20) << __func__ << " forcing blob_offset to "
+        dout(20) << __func__ << " forcing blob_offset to 0x"
                  << std::hex << suggested_boff << std::dec << dendl;
         assert(suggested_boff >= b_off);
         csum_length += suggested_boff - b_off;
         b_off = suggested_boff;
       }
+      if (csum != Checksummer::CSUM_NONE) {
+        dout(20) << __func__ << " initialize csum setting for new blob " << *b
+                 << " csum_type " << Checksummer::get_csum_type_string(csum)
+                 << " csum_order " << csum_order
+                 << " csum_length 0x" << std::hex << csum_length << std::dec
+                 << dendl;
+        dblob.init_csum(csum, csum_order, csum_length);
+      }
     }
 
     AllocExtentVector extents;
-    extents.reserve(4);  // 4 should be (more than) enough for most allocations
-    int64_t got = alloc->allocate(final_length, min_alloc_size, 
-                                 max_alloc_size.load(),
-                                 hint, &extents);
-    assert(got == (int64_t)final_length);
-    need -= got;
-    txc->statfs_delta.allocated() += got;
+    int64_t left = final_length;
+    while (left > 0) {
+      assert(prealloc_left > 0);
+      if (prealloc_pos->length <= left) {
+       prealloc_left -= prealloc_pos->length;
+       left -= prealloc_pos->length;
+       txc->statfs_delta.allocated() += prealloc_pos->length;
+       extents.push_back(*prealloc_pos);
+       ++prealloc_pos;
+      } else {
+       extents.emplace_back(prealloc_pos->offset, left);
+       prealloc_pos->offset += left;
+       prealloc_pos->length -= left;
+       prealloc_left -= left;
+       txc->statfs_delta.allocated() += left;
+       left = 0;
+       break;
+      }
+    }
     for (auto& p : extents) {
-      bluestore_pextent_t e = bluestore_pextent_t(p);
-      txc->allocated.insert(e.offset, e.length);
-      hint = p.end();
+      txc->allocated.insert(p.offset, p.length);
     }
     dblob.allocated(P2ALIGN(b_off, min_alloc_size), final_length, extents);
 
-    dout(20) << __func__ << " blob " << *b
-            << " csum_type " << Checksummer::get_csum_type_string(csum)
-            << " csum_order " << csum_order
-            << " csum_length 0x" << std::hex << csum_length << std::dec
-            << dendl;
-
-    if (csum != Checksummer::CSUM_NONE) {
-      if (!dblob.has_csum()) {
-       dblob.init_csum(csum, csum_order, csum_length);
-      }
+    dout(20) << __func__ << " blob " << *b << dendl;
+    if (dblob.has_csum()) {
       dblob.calc_csum(b_off, *l);
     }
+
     if (wi.mark_unused) {
       auto b_end = b_off + wi.bl.length();
       if (b_off) {
@@ -9962,9 +10187,8 @@ int BlueStore::_do_alloc_write(
       }
     }
   }
-  if (need > 0) {
-    alloc->unreserve(need);
-  }
+  assert(prealloc_pos == prealloc.end());
+  assert(prealloc_left == 0);
   return 0;
 }
 
@@ -10314,10 +10538,14 @@ int BlueStore::_write(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << dendl;
-  _assign_nid(txc, o);
-  int r = _do_write(txc, c, o, offset, length, bl, fadvise_flags);
-  txc->write_onode(o);
-
+  int r = 0;
+  if (offset + length >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+  } else {
+    _assign_nid(txc, o);
+    r = _do_write(txc, c, o, offset, length, bl, fadvise_flags);
+    txc->write_onode(o);
+  }
   dout(10) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << " = " << r << dendl;
@@ -10332,8 +10560,13 @@ int BlueStore::_zero(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << dendl;
-  _assign_nid(txc, o);
-  int r = _do_zero(txc, c, o, offset, length);
+  int r = 0;
+  if (offset + length >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+  } else {
+    _assign_nid(txc, o);
+    r = _do_zero(txc, c, o, offset, length);
+  }
   dout(10) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << "~" << length << std::dec
           << " = " << r << dendl;
@@ -10358,7 +10591,7 @@ int BlueStore::_do_zero(TransContext *txc,
   o->extent_map.dirty_range(offset, length);
   _wctx_finish(txc, c, o, &wctx);
 
-  if (offset + length > o->onode.size) {
+  if (length > 0 && offset + length > o->onode.size) {
     o->onode.size = offset + length;
     dout(20) << __func__ << " extending size to " << offset + length
             << dendl;
@@ -10408,7 +10641,7 @@ void BlueStore::_do_truncate(
   txc->write_onode(o);
 }
 
-void BlueStore::_truncate(TransContext *txc,
+int BlueStore::_truncate(TransContext *txc,
                         CollectionRef& c,
                         OnodeRef& o,
                         uint64_t offset)
@@ -10416,7 +10649,16 @@ void BlueStore::_truncate(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << o->oid
           << " 0x" << std::hex << offset << std::dec
           << dendl;
-  _do_truncate(txc, c, o, offset);
+  int r = 0;
+  if (offset >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+  } else {
+    _do_truncate(txc, c, o, offset);
+  }
+  dout(10) << __func__ << " " << c->cid << " " << o->oid
+          << " 0x" << std::hex << offset << std::dec
+          << " = " << r << dendl;
+  return r;
 }
 
 int BlueStore::_do_remove(
@@ -10472,10 +10714,14 @@ int BlueStore::_do_remove(
     if (b.is_shared() &&
        sb->loaded &&
        maybe_unshared_blobs.count(sb)) {
-      b.map(e.blob_offset, e.length, [&](uint64_t off, uint64_t len) {
-        expect[sb].get(off, len);
-       return 0;
-      });
+      if (b.is_compressed()) {
+       expect[sb].get(0, b.get_ondisk_length());
+      } else {
+       b.map(e.blob_offset, e.length, [&](uint64_t off, uint64_t len) {
+           expect[sb].get(off, len);
+           return 0;
+         });
+      }
     }
   }
 
@@ -10499,8 +10745,6 @@ int BlueStore::_do_remove(
     return 0;
   }
 
-  uint32_t b_start = OBJECT_MAX_SIZE;
-  uint32_t b_end = 0;
   for (auto& e : h->extent_map.extent_map) {
     const bluestore_blob_t& b = e.blob->get_blob();
     SharedBlob *sb = e.blob->shared_blob.get();
@@ -10510,17 +10754,9 @@ int BlueStore::_do_remove(
       dout(20) << __func__ << "  unsharing " << e << dendl;
       bluestore_blob_t& blob = e.blob->dirty_blob();
       blob.clear_flag(bluestore_blob_t::FLAG_SHARED);
-      if (e.logical_offset < b_start) {
-        b_start = e.logical_offset;
-      }
-      if (e.logical_end() > b_end) {
-        b_end = e.logical_end();
-      }
+      h->extent_map.dirty_range(e.logical_offset, 1);
     }
   }
-
-  assert(b_end > b_start);
-  h->extent_map.dirty_range(b_start, b_end - b_start);
   txc->write_onode(h);
 
   return 0;
@@ -10546,10 +10782,14 @@ int BlueStore::_setattr(TransContext *txc,
           << " " << name << " (" << val.length() << " bytes)"
           << dendl;
   int r = 0;
-  if (val.is_partial())
-    o->onode.attrs[name.c_str()] = bufferptr(val.c_str(), val.length());
-  else
-    o->onode.attrs[name.c_str()] = val;
+  if (val.is_partial()) {
+    auto& b = o->onode.attrs[name.c_str()] = bufferptr(val.c_str(),
+                                                      val.length());
+    b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+  } else {
+    auto& b = o->onode.attrs[name.c_str()] = val;
+    b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+  }
   txc->write_onode(o);
   dout(10) << __func__ << " " << c->cid << " " << o->oid
           << " " << name << " (" << val.length() << " bytes)"
@@ -10568,11 +10808,14 @@ int BlueStore::_setattrs(TransContext *txc,
   int r = 0;
   for (map<string,bufferptr>::const_iterator p = aset.begin();
        p != aset.end(); ++p) {
-    if (p->second.is_partial())
-      o->onode.attrs[p->first.c_str()] =
+    if (p->second.is_partial()) {
+      auto& b = o->onode.attrs[p->first.c_str()] =
        bufferptr(p->second.c_str(), p->second.length());
-    else
-      o->onode.attrs[p->first.c_str()] = p->second;
+      b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+    } else {
+      auto& b = o->onode.attrs[p->first.c_str()] = p->second;
+      b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+    }
   }
   txc->write_onode(o);
   dout(10) << __func__ << " " << c->cid << " " << o->oid
@@ -10908,6 +11151,7 @@ int BlueStore::_do_clone_range(
   uint64_t end = srcoff + length;
   uint32_t dirty_range_begin = 0;
   uint32_t dirty_range_end = 0;
+  bool src_dirty = false;
   for (auto ep = oldo->extent_map.seek_lextent(srcoff);
        ep != oldo->extent_map.extent_map.end();
        ++ep) {
@@ -10928,7 +11172,8 @@ int BlueStore::_do_clone_range(
       // make sure it is shared
       if (!blob.is_shared()) {
        c->make_blob_shared(_assign_blobid(txc), e.blob);
-       if (dirty_range_begin == 0) {
+       if (!src_dirty) {
+         src_dirty = true;
           dirty_range_begin = e.logical_offset;
         }
         assert(e.logical_end() > 0);
@@ -10980,7 +11225,7 @@ int BlueStore::_do_clone_range(
     dout(20) << __func__ << "  dst " << *ne << dendl;
     ++n;
   }
-  if (dirty_range_end > dirty_range_begin) {
+  if (src_dirty) {
     oldo->extent_map.dirty_range(dirty_range_begin,
       dirty_range_end - dirty_range_begin);
     txc->write_onode(oldo);
@@ -11007,6 +11252,11 @@ int BlueStore::_clone_range(TransContext *txc,
           << " to offset 0x" << dstoff << std::dec << dendl;
   int r = 0;
 
+  if (srcoff + length >= OBJECT_MAX_SIZE ||
+      dstoff + length >= OBJECT_MAX_SIZE) {
+    r = -E2BIG;
+    goto out;
+  }
   if (srcoff + length > oldo->onode.size) {
     r = -EINVAL;
     goto out;
@@ -11419,6 +11669,14 @@ void BlueStore::_flush_cache()
     assert(i->empty());
   }
   for (auto& p : coll_map) {
+    if (!p.second->onode_map.empty()) {
+      derr << __func__ << "stray onodes on " << p.first << dendl;
+      p.second->onode_map.dump(cct, 0);
+    }
+    if (!p.second->shared_blob_set.empty()) {
+      derr << __func__ << " stray shared blobs on " << p.first << dendl;
+      p.second->shared_blob_set.dump(cct, 0);
+    }
     assert(p.second->onode_map.empty());
     assert(p.second->shared_blob_set.empty());
   }