]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/os/bluestore/BlueFS.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / os / bluestore / BlueFS.cc
index 70fa142b717743a6eda67f907ae69279feae6b40..64fbe1e88039f91155dc55b2d195293b1870f841 100644 (file)
@@ -1,6 +1,6 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
-
+#include <chrono>
 #include "boost/algorithm/string.hpp" 
 #include "bluestore_common.h"
 #include "BlueFS.h"
@@ -28,6 +28,9 @@ using std::set;
 using std::string;
 using std::to_string;
 using std::vector;
+using std::chrono::duration;
+using std::chrono::duration_cast;
+using std::chrono::seconds;
 
 using ceph::bufferlist;
 using ceph::decode;
@@ -107,6 +110,7 @@ private:
   SocketHook(BlueFS* bluefs) :
     bluefs(bluefs) {}
   int call(std::string_view command, const cmdmap_t& cmdmap,
+          const bufferlist&,
           Formatter *f,
           std::ostream& errss,
           bufferlist& out) override {
@@ -368,6 +372,24 @@ void BlueFS::_init_logger()
                    "Bytes requested in prefetch read mode",
                     NULL,
                    PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_time_avg     (l_bluefs_compaction_lat, "compact_lat",
+                    "Average bluefs log compaction latency",
+                    "c__t",
+                    PerfCountersBuilder::PRIO_INTERESTING);
+ b.add_time_avg     (l_bluefs_compaction_lock_lat, "compact_lock_lat",
+                    "Average lock duration while compacting bluefs log",
+                    "c_lt",
+                    PerfCountersBuilder::PRIO_INTERESTING);
+  b.add_u64_counter(l_bluefs_alloc_shared_dev_fallbacks, "alloc_slow_fallback",
+                   "Amount of allocations that required fallback to "
+                    " slow/shared device",
+                    "asdf",
+                   PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluefs_alloc_shared_size_fallbacks, "alloc_slow_size_fallback",
+                   "Amount of allocations that required fallback to shared device's "
+                    "regular unit size",
+                    "assf",
+                   PerfCountersBuilder::PRIO_USEFUL);
   b.add_u64(l_bluefs_read_zeros_candidate, "read_zeros_candidate",
            "How many times bluefs read found page with all 0s");
   b.add_u64(l_bluefs_read_zeros_errors, "read_zeros_errors",
@@ -563,7 +585,7 @@ int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
   _init_logger();
   _init_alloc();
 
-  super.version = 1;
+  super.version = 0;
   super.block_size = bdev[BDEV_DB]->get_block_size();
   super.osd_uuid = osd_uuid;
   super.uuid.generate_random();
@@ -576,6 +598,7 @@ int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
   int r = _allocate(
     vselector->select_prefer_bdev(log_file->vselector_hint),
     cct->_conf->bluefs_max_log_runway,
+    0,
     &log_file->fnode);
   vselector->add_usage(log_file->vselector_hint, log_file->fnode);
   ceph_assert(r == 0);
@@ -986,7 +1009,6 @@ int BlueFS::prepare_new_device(int id, const bluefs_layout_t& layout)
       new_log_dev_next,
       RENAME_DB2SLOW,
       layout);
-    //}
   } else if(id == BDEV_NEWWAL) {
     _rewrite_log_and_layout_sync_LNF_LD(false,
       BDEV_DB,
@@ -1026,6 +1048,7 @@ int BlueFS::fsck()
 
 int BlueFS::_write_super(int dev)
 {
+  ++super.version;
   // build superblock
   bufferlist bl;
   encode(super, bl);
@@ -1088,12 +1111,17 @@ int BlueFS::_check_allocations(const bluefs_fnode_t& fnode,
     auto id = e.bdev;
     bool fail = false;
     ceph_assert(id < MAX_BDEV);
+    ceph_assert(bdev[id]);
+    // let's use minimal allocation unit we can have
+    auto alloc_unit = bdev[id]->get_block_size();
+
     if (int r = _verify_alloc_granularity(id, e.offset, e.length,
+                                          alloc_unit,
                                          op_name); r < 0) {
       return r;
     }
 
-    apply_for_bitset_range(e.offset, e.length, alloc_size[id], used_blocks[id],
+    apply_for_bitset_range(e.offset, e.length, alloc_unit, used_blocks[id],
       [&](uint64_t pos, boost::dynamic_bitset<uint64_t> &bs) {
        if (is_alloc == bs.test(pos)) {
          fail = true;
@@ -1115,31 +1143,14 @@ int BlueFS::_check_allocations(const bluefs_fnode_t& fnode,
 }
 
 int BlueFS::_verify_alloc_granularity(
-  __u8 id, uint64_t offset, uint64_t length, const char *op)
+  __u8 id, uint64_t offset, uint64_t length, uint64_t alloc_unit, const char *op)
 {
-  if ((offset & (alloc_size[id] - 1)) ||
-      (length & (alloc_size[id] - 1))) {
+  if ((offset & (alloc_unit - 1)) ||
+      (length & (alloc_unit - 1))) {
     derr << __func__ << " " << op << " of " << (int)id
         << ":0x" << std::hex << offset << "~" << length << std::dec
         << " does not align to alloc_size 0x"
-        << std::hex << alloc_size[id] << std::dec << dendl;
-    // be helpful
-    auto need = alloc_size[id];
-    while (need && ((offset & (need - 1)) ||
-                   (length & (need - 1)))) {
-      need >>= 1;
-    }
-    if (need) {
-      const char *which;
-      if (id == BDEV_SLOW ||
-         (id == BDEV_DB && !bdev[BDEV_SLOW])) {
-       which = "bluefs_shared_alloc_size";
-      } else {
-       which = "bluefs_alloc_size";
-      }
-      derr << "work-around by setting " << which << " = " << need
-          << " for this OSD" << dendl;
-    }
+        << std::hex << alloc_unit << std::dec << dendl;
     return -EFAULT;
   }
   return 0;
@@ -1158,11 +1169,6 @@ int BlueFS::_replay(bool noop, bool to_stdout)
   if (!noop) {
     log_file->vselector_hint =
       vselector->get_hint_for_log();
-  } else {
-    // do not use fnode from superblock in 'noop' mode - log_file's one should
-    // be fine and up-to-date
-    ceph_assert(log_file->fnode.ino == 1);
-    ceph_assert(log_file->fnode.extents.size() != 0);
   }
   dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;
   if (unlikely(to_stdout)) {
@@ -1181,8 +1187,11 @@ int BlueFS::_replay(bool noop, bool to_stdout)
   if (!noop) {
     if (cct->_conf->bluefs_log_replay_check_allocations) {
       for (size_t i = 0; i < MAX_BDEV; ++i) {
-       if (alloc_size[i] != 0 && bdev[i] != nullptr) {
-         used_blocks[i].resize(round_up_to(bdev[i]->get_size(), alloc_size[i]) / alloc_size[i]);
+       if (bdev[i] != nullptr) {
+          // let's use minimal allocation unit we can have
+          auto au = bdev[i]->get_block_size();
+          //hmm... on 32TB/4K drive this would take 1GB RAM!!!
+         used_blocks[i].resize(round_up_to(bdev[i]->get_size(), au) / au);
        }
       }
       // check initial log layout
@@ -1305,7 +1314,9 @@ int BlueFS::_replay(bool noop, bool to_stdout)
     }
 
     auto p = t.op_bl.cbegin();
+    auto pos0 = pos;
     while (!p.end()) {
+      pos = pos0 + p.get_off();
       __u8 op;
       decode(op, p);
       switch (op) {
@@ -1723,26 +1734,27 @@ int BlueFS::device_migrate_to_existing(
 
   for (auto& [ino, file_ref] : nodes.file_map) {
     //do not copy log
-    if (file_ref->fnode.ino == 1) {
+    if (ino == 1) {
       continue;
     }
     dout(10) << __func__ << " " << ino << " " << file_ref->fnode << dendl;
 
-    auto& fnode_extents = file_ref->fnode.extents;
     vselector->sub_usage(file_ref->vselector_hint, file_ref->fnode);
 
     bool rewrite = std::any_of(
-      fnode_extents.begin(),
-      fnode_extents.end(),
+      file_ref->fnode.extents.begin(),
+      file_ref->fnode.extents.end(),
       [=](auto& ext) {
        return ext.bdev != dev_target && devs_source.count(ext.bdev);
       });
     if (rewrite) {
       dout(10) << __func__ << "  migrating" << dendl;
-
+      bluefs_fnode_t old_fnode;
+      old_fnode.swap_extents(file_ref->fnode);
+      auto& old_fnode_extents = old_fnode.extents;
       // read entire file
       bufferlist bl;
-      for (auto old_ext : fnode_extents) {
+      for (const auto &old_ext : old_fnode_extents) {
        buf.resize(old_ext.length);
        int r = _bdev_read_random(old_ext.bdev,
          old_ext.offset,
@@ -1759,8 +1771,8 @@ int BlueFS::device_migrate_to_existing(
       }
 
       // write entire file
-      PExtentVector extents;
-      auto l = _allocate_without_fallback(dev_target, bl.length(), &extents);
+      auto l = _allocate(dev_target, bl.length(), 0,
+        &file_ref->fnode, 0, false);
       if (l < 0) {
        derr << __func__ << " unable to allocate len 0x" << std::hex
             << bl.length() << std::dec << " from " << (int)dev_target
@@ -1769,7 +1781,7 @@ int BlueFS::device_migrate_to_existing(
       }
 
       uint64_t off = 0;
-      for (auto& i : extents) {
+      for (auto& i : file_ref->fnode.extents) {
        bufferlist cur;
        uint64_t cur_len = std::min<uint64_t>(i.length, bl.length() - off);
        ceph_assert(cur_len > 0);
@@ -1780,7 +1792,7 @@ int BlueFS::device_migrate_to_existing(
       }
 
       // release old extents
-      for (auto old_ext : fnode_extents) {
+      for (const auto &old_ext : old_fnode_extents) {
        PExtentVector to_release;
        to_release.emplace_back(old_ext.offset, old_ext.length);
        alloc[old_ext.bdev]->release(to_release);
@@ -1790,12 +1802,11 @@ int BlueFS::device_migrate_to_existing(
       }
 
       // update fnode
-      fnode_extents.clear();
-      for (auto& i : extents) {
-       fnode_extents.emplace_back(dev_target_new, i.offset, i.length);
+      for (auto& i : file_ref->fnode.extents) {
+       i.bdev = dev_target_new;
       }
     } else {
-      for (auto& ext : fnode_extents) {
+      for (auto& ext : file_ref->fnode.extents) {
        if (dev_target != dev_target_new && ext.bdev == dev_target) {
          dout(20) << __func__ << "  " << " ... adjusting extent 0x"
                   << std::hex << ext.offset << std::dec
@@ -1861,30 +1872,29 @@ int BlueFS::device_migrate_to_new(
   flags |= devs_source.count(BDEV_WAL) ? REMOVE_WAL : 0;
   int dev_target_new = dev_target; //FIXME: remove, makes no sense
 
-  for (auto& p : nodes.file_map) {
+  for (auto& [ino, file_ref] : nodes.file_map) {
     //do not copy log
-    if (p.second->fnode.ino == 1) {
+    if (ino == 1) {
       continue;
     }
-    dout(10) << __func__ << " " << p.first << " " << p.second->fnode << dendl;
+    dout(10) << __func__ << " " << ino << " " << file_ref->fnode << dendl;
 
-    auto& fnode_extents = p.second->fnode.extents;
+    vselector->sub_usage(file_ref->vselector_hint, file_ref->fnode);
 
-    bool rewrite = false;
-    for (auto ext_it = fnode_extents.begin();
-        ext_it != p.second->fnode.extents.end();
-        ++ext_it) {
-      if (ext_it->bdev != dev_target && devs_source.count(ext_it->bdev)) {
-       rewrite = true;
-       break;
-      }
-    }
+    bool rewrite = std::any_of(
+      file_ref->fnode.extents.begin(),
+      file_ref->fnode.extents.end(),
+      [=](auto& ext) {
+       return ext.bdev != dev_target && devs_source.count(ext.bdev);
+      });
     if (rewrite) {
       dout(10) << __func__ << "  migrating" << dendl;
-
+      bluefs_fnode_t old_fnode;
+      old_fnode.swap_extents(file_ref->fnode);
+      auto& old_fnode_extents = old_fnode.extents;
       // read entire file
       bufferlist bl;
-      for (auto old_ext : fnode_extents) {
+      for (const auto &old_ext : old_fnode_extents) {
        buf.resize(old_ext.length);
        int r = _bdev_read_random(old_ext.bdev,
          old_ext.offset,
@@ -1901,8 +1911,8 @@ int BlueFS::device_migrate_to_new(
       }
 
       // write entire file
-      PExtentVector extents;
-      auto l = _allocate_without_fallback(dev_target, bl.length(), &extents);
+      auto l = _allocate(dev_target, bl.length(), 0,
+        &file_ref->fnode, 0, false);
       if (l < 0) {
        derr << __func__ << " unable to allocate len 0x" << std::hex
             << bl.length() << std::dec << " from " << (int)dev_target
@@ -1911,7 +1921,7 @@ int BlueFS::device_migrate_to_new(
       }
 
       uint64_t off = 0;
-      for (auto& i : extents) {
+      for (auto& i : file_ref->fnode.extents) {
        bufferlist cur;
        uint64_t cur_len = std::min<uint64_t>(i.length, bl.length() - off);
        ceph_assert(cur_len > 0);
@@ -1922,7 +1932,7 @@ int BlueFS::device_migrate_to_new(
       }
 
       // release old extents
-      for (auto old_ext : fnode_extents) {
+      for (const auto &old_ext : old_fnode_extents) {
        PExtentVector to_release;
        to_release.emplace_back(old_ext.offset, old_ext.length);
        alloc[old_ext.bdev]->release(to_release);
@@ -1932,9 +1942,8 @@ int BlueFS::device_migrate_to_new(
       }
 
       // update fnode
-      fnode_extents.clear();
-      for (auto& i : extents) {
-       fnode_extents.emplace_back(dev_target_new, i.offset, i.length);
+      for (auto& i : file_ref->fnode.extents) {
+       i.bdev = dev_target_new;
       }
     }
   }
@@ -2129,7 +2138,9 @@ int64_t BlueFS::_read_random(
       buf->pos += r;
     }
   }
-  dout(20) << __func__ << " got " << ret << dendl;
+  dout(20) << __func__ << std::hex
+           << " got 0x" << ret
+           << std::dec  << dendl;
   --h->file->num_reading;
   return ret;
 }
@@ -2206,13 +2217,16 @@ int64_t BlueFS::_read(
                  << std::hex << x_off << "~" << l << std::dec
                  << " of " << *p << dendl;
        int r;
+       // when reading BlueFS log (only happens on startup) use non-buffered io
+       // it makes it in sync with logic in _flush_range()
+       bool use_buffered_io = h->file->fnode.ino == 1 ? false : cct->_conf->bluefs_buffered_io;
        if (!cct->_conf->bluefs_check_for_zeros) {
          r = _bdev_read(p->bdev, p->offset + x_off, l, &buf->bl, ioc[p->bdev],
-                        cct->_conf->bluefs_buffered_io);
+                        use_buffered_io);
        } else {
          r = _read_and_check(
            p->bdev, p->offset + x_off, l, &buf->bl, ioc[p->bdev],
-           cct->_conf->bluefs_buffered_io);
+           use_buffered_io);
        }
        logger->inc(l_bluefs_read_disk_count, 1);
        logger->inc(l_bluefs_read_disk_bytes, l);
@@ -2254,7 +2268,9 @@ int64_t BlueFS::_read(
     buf->pos += r;
   }
 
-  dout(20) << __func__ << " got " << ret << dendl;
+  dout(20) << __func__ << std::hex
+           << " got 0x" << ret
+           << std::dec  << dendl;
   ceph_assert(!outbl || (int)outbl->length() == ret);
   --h->file->num_reading;
   return ret;
@@ -2282,6 +2298,44 @@ void BlueFS::invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
   }
 }
 
+
+uint64_t BlueFS::_estimate_transaction_size(bluefs_transaction_t* t)
+{
+  uint64_t max_alloc_size = std::max(alloc_size[BDEV_WAL],
+                                    std::max(alloc_size[BDEV_DB],
+                                             alloc_size[BDEV_SLOW]));
+
+  // conservative estimate for final encoded size
+  return round_up_to(t->op_bl.length() + super.block_size * 2, max_alloc_size);
+}
+
+uint64_t BlueFS::_make_initial_transaction(uint64_t start_seq,
+                                           bluefs_fnode_t& fnode,
+                                           uint64_t expected_final_size,
+                                           bufferlist* out)
+{
+  bluefs_transaction_t t0;
+  t0.seq = start_seq;
+  t0.uuid = super.uuid;
+  t0.op_init();
+  t0.op_file_update_inc(fnode);
+  t0.op_jump(start_seq, expected_final_size); // this is a fixed size op,
+                                              // hence it's valid with fake
+                                              // params for overall txc size
+                                              // estimation
+  if (!out) {
+    return _estimate_transaction_size(&t0);
+  }
+
+  ceph_assert(expected_final_size > 0);
+  out->reserve(expected_final_size);
+  encode(t0, *out);
+  // make sure we're not wrong aboth the size
+  ceph_assert(out->length() <= expected_final_size);
+  _pad_bl(*out, expected_final_size);
+  return expected_final_size;
+}
+
 uint64_t BlueFS::_estimate_log_size_N()
 {
   std::lock_guard nl(nodes.lock);
@@ -2329,77 +2383,51 @@ bool BlueFS::_should_start_compact_log_L_N()
   return true;
 }
 
-void BlueFS::_compact_log_dump_metadata_NF(bluefs_transaction_t *t,
-                                       int flags)
+void BlueFS::_compact_log_dump_metadata_NF(uint64_t start_seq,
+                                        bluefs_transaction_t *t,
+                                       int bdev_update_flags,
+                                        uint64_t capture_before_seq)
 {
-  std::lock_guard nl(nodes.lock);
-
-  t->seq = 1;
+  dout(20) << __func__ << dendl;
+  t->seq = start_seq;
   t->uuid = super.uuid;
-  dout(20) << __func__ << " op_init" << dendl;
 
-  t->op_init();
-  for (auto& [ino, file_ref] : nodes.file_map) {
-    if (ino == 1)
-      continue;
-    ceph_assert(ino > 1);
-    std::lock_guard fl(file_ref->lock);
-    for(auto& e : file_ref->fnode.extents) {
-      auto bdev = e.bdev;
-      auto bdev_new = bdev;
-      ceph_assert(!((flags & REMOVE_WAL) && bdev == BDEV_WAL));
-      if ((flags & RENAME_SLOW2DB) && bdev == BDEV_SLOW) {
-       bdev_new = BDEV_DB;
-      }
-      if ((flags & RENAME_DB2SLOW) && bdev == BDEV_DB) {
-       bdev_new = BDEV_SLOW;
-      }
-      if (bdev == BDEV_NEWDB) {
-       // REMOVE_DB xor RENAME_DB
-       ceph_assert(!(flags & REMOVE_DB) != !(flags & RENAME_DB2SLOW));
-       ceph_assert(!(flags & RENAME_SLOW2DB));
-       bdev_new = BDEV_DB;
-      }
-      if (bdev == BDEV_NEWWAL) {
-       ceph_assert(flags & REMOVE_WAL);
-       bdev_new = BDEV_WAL;
-      }
-      e.bdev = bdev_new;
-    }
-    dout(20) << __func__ << " op_file_update " << file_ref->fnode << dendl;
-    t->op_file_update(file_ref->fnode);
-  }
-  for (auto& [path, dir_ref] : nodes.dir_map) {
-    dout(20) << __func__ << " op_dir_create " << path << dendl;
-    t->op_dir_create(path);
-    for (auto& [fname, file_ref] : dir_ref->file_map) {
-      dout(20) << __func__ << " op_dir_link " << path << "/" << fname
-              << " to " << file_ref->fnode.ino << dendl;
-      t->op_dir_link(path, fname, file_ref->fnode.ino);
-    }
-  }
-}
-/* Streams to t files modified before *capture_before_seq* and all dirs */
-void BlueFS::_compact_log_async_dump_metadata_NF(bluefs_transaction_t *t,
-                                                uint64_t capture_before_seq)
-{
   std::lock_guard nl(nodes.lock);
 
-  t->seq = 1;
-  t->uuid = super.uuid;
-  dout(20) << __func__ << " op_init" << dendl;
-
-  t->op_init();
   for (auto& [ino, file_ref] : nodes.file_map) {
     if (ino == 1)
       continue;
     ceph_assert(ino > 1);
     std::lock_guard fl(file_ref->lock);
-    if (file_ref->dirty_seq < capture_before_seq) {
+    if (bdev_update_flags) {
+      for(auto& e : file_ref->fnode.extents) {
+        auto bdev = e.bdev;
+        auto bdev_new = bdev;
+        ceph_assert(!((bdev_update_flags & REMOVE_WAL) && bdev == BDEV_WAL));
+        if ((bdev_update_flags & RENAME_SLOW2DB) && bdev == BDEV_SLOW) {
+         bdev_new = BDEV_DB;
+        }
+        if ((bdev_update_flags & RENAME_DB2SLOW) && bdev == BDEV_DB) {
+         bdev_new = BDEV_SLOW;
+        }
+        if (bdev == BDEV_NEWDB) {
+         // REMOVE_DB xor RENAME_DB
+         ceph_assert(!(bdev_update_flags & REMOVE_DB) != !(bdev_update_flags & RENAME_DB2SLOW));
+         ceph_assert(!(bdev_update_flags & RENAME_SLOW2DB));
+         bdev_new = BDEV_DB;
+        }
+        if (bdev == BDEV_NEWWAL) {
+         ceph_assert(bdev_update_flags & REMOVE_WAL);
+         bdev_new = BDEV_WAL;
+        }
+        e.bdev = bdev_new;
+      }
+    }
+    if (capture_before_seq == 0 || file_ref->dirty_seq < capture_before_seq) {
       dout(20) << __func__ << " op_file_update " << file_ref->fnode << dendl;
     } else {
       dout(20) << __func__ << " op_file_update just modified, dirty_seq="
-              << file_ref->dirty_seq << " " << file_ref->fnode << dendl;
+               << file_ref->dirty_seq << " " << file_ref->fnode << dendl;
     }
     t->op_file_update(file_ref->fnode);
   }
@@ -2432,17 +2460,49 @@ void BlueFS::_compact_log_sync_LNF_LD()
   logger->inc(l_bluefs_log_compactions);
 }
 
-void BlueFS::_rewrite_log_and_layout_sync_LNF_LD(bool allocate_with_fallback,
+/*
+ * SYNC LOG COMPACTION
+ *
+ * 0. Lock the log completely through the whole procedure
+ *
+ * 1. Build new log. It will include log's starter and compacted metadata
+ *    body. Jump op appended to the starter will link the pieces together.
+ *
+ * 2. Write out new log's content
+ *
+ * 3. Write out new superblock. This includes relevant device layout update.
+ *
+ * 4. Finalization. Old space release.
+ */
+
+void BlueFS::_rewrite_log_and_layout_sync_LNF_LD(bool permit_dev_fallback,
                                         int super_dev,
                                         int log_dev,
                                         int log_dev_new,
                                         int flags,
                                         std::optional<bluefs_layout_t> layout)
 {
+  // we substitute log_dev with log_dev_new for new allocations below
+  // and permitting fallback allocations prevents such a substitution
+  ceph_assert((permit_dev_fallback && log_dev == log_dev_new) ||
+              !permit_dev_fallback);
+
+  dout(10) << __func__ << " super_dev:" << super_dev
+                       << " log_dev:" << log_dev
+                       << " log_dev_new:" << log_dev_new
+                      << " flags:" << flags
+                      << " seq:" << log.seq_live
+                      << dendl;
+  utime_t mtime = ceph_clock_now();
+  uint64_t starter_seq = 1;
+
+  // Part 0.
+  // Lock the log totally till the end of the procedure
   std::lock_guard ll(log.lock);
+  auto t0 = mono_clock::now();
 
   File *log_file = log.writer->file.get();
-
+  bluefs_fnode_t fnode_tail;
   // log.t.seq is always set to current live seq
   ceph_assert(log.t.seq == log.seq_live);
   // Capturing entire state. Dump anything that has been stored there.
@@ -2451,53 +2511,145 @@ void BlueFS::_rewrite_log_and_layout_sync_LNF_LD(bool allocate_with_fallback,
   // From now on, no changes to log.t are permitted until we finish rewriting log.
   // Can allow dirty to remain dirty - log.seq_live will not change.
 
-  dout(20) << __func__ << " super_dev:" << super_dev
-                       << " log_dev:" << log_dev
-                       << " log_dev_new:" << log_dev_new
-                      << " flags:" << flags
-                      << dendl;
-  bluefs_transaction_t t;
-  _compact_log_dump_metadata_NF(&t, flags);
+  //
+  // Part 1.
+  // Build new log starter and compacted metadata body
+  // 1.1. Build full compacted meta transaction.
+  //      Encode a bluefs transaction that dumps all of the in-memory fnodes
+  //      and names.
+  //      This might be pretty large and its allocation map can exceed
+  //      superblock size. Hence instead we'll need log starter part which
+  //      goes to superblock and refers that new meta through op_update_inc.
+  // 1.2.  Allocate space for the above transaction
+  //       using its size estimation.
+  // 1.3.  Allocate the space required for the starter part of the new log.
+  //       It should be small enough to fit into superblock.
+  // 1.4   Building new log persistent fnode representation which will
+  //       finally land to disk.
+  //       Depending on input parameters we might need to perform device ids
+  //       rename - runtime and persistent replicas should be different when we
+  //       are in the device migration process.
+  // 1.5   Store starter fnode to run-time superblock, to be written out later.
+  //       It doesn't contain compacted meta to fit relevant alocation map into
+  //       superblock.
+  // 1.6   Proceed building new log persistent fnode representation.
+  //       Will add log tail with compacted meta extents from 1.1.
+  //       Device rename applied as well
+  //
+  // 1.7.  Encode new log fnode starter,
+  //       It will include op_init, new log's op_update_inc
+  //       and jump to the compacted meta transaction beginning.
+  //       Superblock will reference this starter part
+  //
+  // 1.8.  Encode compacted meta transaction,
+  //       extend the transaction with a jump to proper sequence no
+  //
+
+
+  // 1.1 Build full compacted meta transaction
+  bluefs_transaction_t compacted_meta_t;
+  _compact_log_dump_metadata_NF(starter_seq + 1, &compacted_meta_t, flags, 0);
+
+  // 1.2 Allocate the space required for the compacted meta transaction
+  uint64_t compacted_meta_need =
+    _estimate_transaction_size(&compacted_meta_t) +
+      cct->_conf->bluefs_max_log_runway;
+
+  dout(20) << __func__ << " compacted_meta_need " << compacted_meta_need << dendl;
+
+  int r = _allocate(log_dev, compacted_meta_need, 0, &fnode_tail, 0,
+    permit_dev_fallback);
+  ceph_assert(r == 0);
 
-  dout(20) << __func__ << " op_jump_seq " << log.seq_live << dendl;
-  t.op_jump_seq(log.seq_live);
 
-  bufferlist bl;
-  encode(t, bl);
-  _pad_bl(bl);
+  // 1.3 Allocate the space required for the starter part of the new log.
+  // estimate new log fnode size to be referenced from superblock
+  // hence use dummy fnode and jump parameters
+  uint64_t starter_need = _make_initial_transaction(starter_seq, fnode_tail, 0, nullptr);
 
-  uint64_t need = bl.length() + cct->_conf->bluefs_max_log_runway;
-  dout(20) << __func__ << " need " << need << dendl;
+  bluefs_fnode_t fnode_starter(log_file->fnode.ino, 0, mtime);
+  r = _allocate(log_dev, starter_need, 0, &fnode_starter, 0,
+    permit_dev_fallback);
+  ceph_assert(r == 0);
 
-  bluefs_fnode_t old_fnode;
-  int r;
-  vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
-  log_file->fnode.swap_extents(old_fnode);
-  if (allocate_with_fallback) {
-    r = _allocate(log_dev, need, &log_file->fnode);
-    ceph_assert(r == 0);
-  } else {
-    PExtentVector extents;
-    r = _allocate_without_fallback(log_dev,
-                              need,
-                              &extents);
-    ceph_assert(r == 0);
-    for (auto& p : extents) {
-      log_file->fnode.append_extent(
-       bluefs_extent_t(log_dev, p.offset, p.length));
+  // 1.4 Building starter fnode
+  bluefs_fnode_t fnode_persistent(fnode_starter.ino, 0, mtime);
+  for (auto p : fnode_starter.extents) {
+    // rename device if needed - this is possible when fallback allocations
+    // are prohibited only. Which means every extent is targeted to the same
+    // device and we can unconditionally update them.
+    if (log_dev != log_dev_new) {
+      dout(10) << __func__ << " renaming log extents to "
+               << log_dev_new << dendl;
+      p.bdev = log_dev_new;
     }
+    fnode_persistent.append_extent(p);
+  }
+
+  // 1.5 Store starter fnode to run-time superblock, to be written out later
+  super.log_fnode = fnode_persistent;
+
+  // 1.6 Proceed building new log persistent fnode representation
+  // we'll build incremental update starting from this point
+  fnode_persistent.reset_delta();
+  for (auto p : fnode_tail.extents) {
+    // rename device if needed - this is possible when fallback allocations
+    // are prohibited only. Which means every extent is targeted to the same
+    // device and we can unconditionally update them.
+    if (log_dev != log_dev_new) {
+      dout(10) << __func__ << " renaming log extents to "
+               << log_dev_new << dendl;
+      p.bdev = log_dev_new;
+    }
+    fnode_persistent.append_extent(p);
   }
 
-  _close_writer(log.writer);
-
-  // we will write it to super
-  log_file->fnode.reset_delta();
-  log_file->fnode.size = bl.length();
+  // 1.7 Encode new log fnode
+  // This will flush incremental part of fnode_persistent only.
+  bufferlist starter_bl;
+  _make_initial_transaction(starter_seq, fnode_persistent, starter_need, &starter_bl);
 
+  // 1.8 Encode compacted meta transaction
+  dout(20) << __func__ << " op_jump_seq " << log.seq_live << dendl;
+  // hopefully "compact_meta_need" estimation provides enough extra space
+  // for this op, assert below if not
+  compacted_meta_t.op_jump_seq(log.seq_live);
+
+  bufferlist compacted_meta_bl;
+  encode(compacted_meta_t, compacted_meta_bl);
+  _pad_bl(compacted_meta_bl);
+  ceph_assert(compacted_meta_bl.length() <= compacted_meta_need);
+
+  //
+  // Part 2
+  // Write out new log's content
+  // 2.1. Build the full runtime new log's fnode
+  //
+  // 2.2. Write out new log's
+  //
+  // 2.3. Do flush and wait for completion through flush_bdev()
+  //
+  // 2.4. Finalize log update
+  //      Update all sequence numbers
+  //
+
+  // 2.1 Build the full runtime new log's fnode
+  bluefs_fnode_t old_log_fnode;
+  old_log_fnode.swap(fnode_starter);
+  old_log_fnode.clone_extents(fnode_tail);
+  old_log_fnode.reset_delta();
+  log_file->fnode.swap(old_log_fnode);
+
+  // 2.2 Write out new log's content
+  // Get rid off old writer
+  _close_writer(log.writer);
+  // Make new log writer and stage new log's content writing
   log.writer = _create_writer(log_file);
-  log.writer->append(bl);
+  log.writer->append(starter_bl);
+  log.writer->append(compacted_meta_bl);
+
+  // 2.3 Do flush and wait for completion through flush_bdev()
   _flush_special(log.writer);
-  vselector->add_usage(log_file->vselector_hint, log_file->fnode);
 #ifdef HAVE_LIBAIO
   if (!cct->_conf->bluefs_sync_write) {
     list<aio_t> completed_ios;
@@ -2507,74 +2659,86 @@ void BlueFS::_rewrite_log_and_layout_sync_LNF_LD(bool allocate_with_fallback,
   }
 #endif
   _flush_bdev();
+
+  // 2.4 Finalize log update
   ++log.seq_live;
   dirty.seq_live = log.seq_live;
   log.t.seq = log.seq_live;
+  vselector->sub_usage(log_file->vselector_hint, old_log_fnode);
+  vselector->add_usage(log_file->vselector_hint, log_file->fnode);
 
-  super.memorized_layout = layout;
-  super.log_fnode = log_file->fnode;
-  // rename device if needed
-  if (log_dev != log_dev_new) {
-    dout(10) << __func__ << " renaming log extents to " << log_dev_new << dendl;
-    for (auto& p : super.log_fnode.extents) {
-      p.bdev = log_dev_new;
-    }
-  }
-  dout(10) << __func__ << " writing super, log fnode: " << super.log_fnode << dendl;
+  // Part 3.
+  // Write out new superblock to reflect all the changes.
+  //
 
-  ++super.version;
+  super.memorized_layout = layout;
   _write_super(super_dev);
   _flush_bdev();
 
-  dout(10) << __func__ << " release old log extents " << old_fnode.extents << dendl;
-  std::lock_guard dl(dirty.lock);
-  for (auto& r : old_fnode.extents) {
-    dirty.pending_release[r.bdev].insert(r.offset, r.length);
+  // we're mostly done
+  dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
+  logger->inc(l_bluefs_log_compactions);
+
+  // Part 4
+  // Finalization. Release old space.
+  //
+  {
+    dout(10) << __func__
+             << " release old log extents " << old_log_fnode.extents
+             << dendl;
+    std::lock_guard dl(dirty.lock);
+    for (auto& r : old_log_fnode.extents) {
+      dirty.pending_release[r.bdev].insert(r.offset, r.length);
+    }
   }
+  logger->tinc(l_bluefs_compaction_lock_lat, mono_clock::now() - t0);
 }
 
 /*
- * 1. Allocate a new extent to continue the log, and then log an event
- * that jumps the log write position to the new extent.  At this point, the
- * old extent(s) won't be written to, and reflect everything to compact.
- * New events will be written to the new region that we'll keep.
+ * ASYNC LOG COMPACTION
  *
- * 2. While still holding the lock, encode a bufferlist that dumps all of the
- * in-memory fnodes and names.  This will become the new beginning of the
- * log.  The last event will jump to the log continuation extent from #1.
- *
- * 3. Queue a write to a new extent for the new beginnging of the log.
+ * 0. Lock the log and forbid its extension. The former covers just
+ *    a part of the below procedure while the latter spans over it
+ *    completely.
+ * 1. Allocate a new extent to continue the log, and then log an event
+ *    that jumps the log write position to the new extent.  At this point, the
+ *    old extent(s) won't be written to, and reflect everything to compact.
+ *    New events will be written to the new region that we'll keep.
+ *    The latter will finally become new log tail on compaction completion.
  *
- * 4. Drop lock and wait
+ * 2. Build new log. It will include log's starter, compacted metadata
+ *    body and the above tail. Jump ops appended to the starter and meta body
+ *    will link the pieces togather. Log's lock is releases in the mid of the
+ *    process to permit parallel access to it.
  *
- * 5. Retake the lock.
+ * 3. Write out new log's content.
  *
- * 6. Update the log_fnode to splice in the new beginning.
+ * 4. Write out new superblock to reflect all the changes.
  *
- * 7. Write the new superblock.
+ * 5. Apply new log fnode, log is locked for a while.
  *
- * 8. Release the old log space.  Clean up.
+ * 6. Finalization. Clean up, old space release and total unlocking.
  */
 
 void BlueFS::_compact_log_async_LD_LNF_D() //also locks FW for new_writer
 {
   dout(10) << __func__ << dendl;
+  utime_t mtime = ceph_clock_now();
+  uint64_t starter_seq = 1;
+  uint64_t old_log_jump_to = 0;
+
+  // Part 0.
+  // Lock the log and forbid its expansion and other compactions
+
   // only one compaction allowed at one time
   bool old_is_comp = std::atomic_exchange(&log_is_compacting, true);
   if (old_is_comp) {
     dout(10) << __func__ << " ongoing" <<dendl;
     return;
   }
-
+  // lock log's run-time structures for a while
   log.lock.lock();
-  File *log_file = log.writer->file.get();
-  FileWriter *new_log_writer = nullptr;
-  FileRef new_log = nullptr;
-  uint64_t new_log_jump_to = 0;
-  uint64_t old_log_jump_to = 0;
-
-  new_log = ceph::make_ref<File>();
-  new_log->fnode.ino = 0;   // we use _flush_special to avoid log of the fnode
+  auto t0 = mono_clock::now();
 
   // Part 1.
   // Prepare current log for jumping into it.
@@ -2588,25 +2752,42 @@ void BlueFS::_compact_log_async_LD_LNF_D() //also locks FW for new_writer
   bool old_forbidden = atomic_exchange(&log_forbidden_to_expand, true);
   ceph_assert(old_forbidden == false);
 
-  vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
+  //
+  // Part 1.
+  // Prepare current log for jumping into it.
+  // 1.1. Allocate extent
+  // 1.2. Save log's fnode extents and add new extents
+  // 1.3. Update op to log
+  // 1.4. Jump op to log
+  // During that, no one else can write to log, otherwise we risk jumping backwards.
+  // We need to sync log, because we are injecting discontinuity, and writer is not prepared for that.
 
-  // 1.1 allocate new log space and jump to it.
+  // 1.1 allocate new log extents and store them at fnode_tail
+  File *log_file = log.writer->file.get();
   old_log_jump_to = log_file->fnode.get_allocated();
+  bluefs_fnode_t fnode_tail;
   uint64_t runway = log_file->fnode.get_allocated() - log.writer->get_effective_write_pos();
   dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
-           << " need 0x" << (old_log_jump_to + cct->_conf->bluefs_max_log_runway) << std::dec << dendl;
+           << " need 0x" << cct->_conf->bluefs_max_log_runway << std::dec << dendl;
   int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint),
                    cct->_conf->bluefs_max_log_runway,
-                    &log_file->fnode);
+                    0,
+                    &fnode_tail);
   ceph_assert(r == 0);
+
+  // 1.2 save log's fnode extents and add new extents
+  bluefs_fnode_t old_log_fnode(log_file->fnode);
+  log_file->fnode.clone_extents(fnode_tail);
   //adjust usage as flush below will need it
+  vselector->sub_usage(log_file->vselector_hint, old_log_fnode);
   vselector->add_usage(log_file->vselector_hint, log_file->fnode);
   dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
 
-  // update the log file change and log a jump to the offset where we want to
+  // 1.3 update the log file change and log a jump to the offset where we want to
   // write the new entries
-  log.t.op_file_update(log_file->fnode);
-  // jump to new position should mean next seq
+  log.t.op_file_update_inc(log_file->fnode);
+
+  // 1.4 jump to new position should mean next seq
   log.t.op_jump(log.seq_live + 1, old_log_jump_to);
   uint64_t seq_now = log.seq_live;
   // we need to flush all bdev because we will be streaming all dirty files to log
@@ -2615,143 +2796,203 @@ void BlueFS::_compact_log_async_LD_LNF_D() //also locks FW for new_writer
   _flush_bdev();
   _flush_and_sync_log_jump_D(old_log_jump_to, runway);
 
-  // out of jump section
-
-  // 2. prepare compacted log
-  bluefs_transaction_t t;
-  _compact_log_async_dump_metadata_NF(&t, seq_now);
-
-  // now state is captured to bufferlist
-  // log can be used to write to, ops in log will be continuation of captured state
+  //
+  // Part 2.
+  // Build new log starter and compacted metadata body
+  // 2.1.  Build full compacted meta transaction.
+  //       While still holding the lock, encode a bluefs transaction
+  //       that dumps all of the in-memory fnodes and names.
+  //       This might be pretty large and its allocation map can exceed
+  //       superblock size. Hence instead we'll need log starter part which
+  //       goes to superblock and refers that new meta through op_update_inc.
+  // 2.2.  After releasing the lock allocate space for the above transaction
+  //       using its size estimation.
+  //       Then build tailing list of extents which consists of these
+  //       newly allocated extents followed by ones from Part 1.
+  // 2.3.  Allocate the space required for the starter part of the new log.
+  //       It should be small enough to fit into superblock.
+  //       Effectively we start building new log fnode here.
+  // 2.4.  Store starter fnode to run-time superblock, to be written out later
+  // 2.5.  Finalize new log's fnode building
+  //       This will include log's starter and tailing extents built at 2.2
+  // 2.6.  Encode new log fnode starter,
+  //       It will include op_init, new log's op_update_inc
+  //       and jump to the compacted meta transaction beginning.
+  //       Superblock will reference this starter part
+  // 2.7.  Encode compacted meta transaction,
+  //       extend the transaction with a jump to the log tail from 1.1 before
+  //       encoding.
+  //
+
+  // 2.1 Build full compacted meta transaction
+  bluefs_transaction_t compacted_meta_t;
+  _compact_log_dump_metadata_NF(starter_seq + 1, &compacted_meta_t, 0, seq_now);
+
+  // now state is captured to compacted_meta_t,
+  // current log can be used to write to,
+  //ops in log will be continuation of captured state
+  logger->tinc(l_bluefs_compaction_lock_lat, mono_clock::now() - t0);
   log.lock.unlock();
 
-  uint64_t max_alloc_size = std::max(alloc_size[BDEV_WAL],
-                                    std::max(alloc_size[BDEV_DB],
-                                             alloc_size[BDEV_SLOW]));
+  // 2.2 Allocate the space required for the compacted meta transaction
+  uint64_t compacted_meta_need = _estimate_transaction_size(&compacted_meta_t);
+  dout(20) << __func__ << " compacted_meta_need " << compacted_meta_need
+           << dendl;
+  {
+    bluefs_fnode_t fnode_pre_tail;
+    // do allocate
+    r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint),
+                  compacted_meta_need,
+                  0,
+                  &fnode_pre_tail);
+    ceph_assert(r == 0);
+    // build trailing list of extents in fnode_tail,
+    // this will include newly allocated extents for compacted meta
+    // and aux extents allocated at step 1.1
+    fnode_pre_tail.claim_extents(fnode_tail.extents);
+    fnode_tail.swap_extents(fnode_pre_tail);
+  }
 
-  // conservative estimate for final encoded size
-  new_log_jump_to = round_up_to(t.op_bl.length() + super.block_size * 2,
-                                max_alloc_size);
-  //newly constructed log head will jump to what we had before
-  t.op_jump(seq_now, new_log_jump_to);
-
-  // allocate
-  //FIXME: check if we want DB here?
-  r = _allocate(BlueFS::BDEV_DB, new_log_jump_to,
-                    &new_log->fnode);
+  // 2.3 Allocate the space required for the starter part of the new log.
+  // Start building New log fnode
+  FileRef new_log = nullptr;
+  new_log = ceph::make_ref<File>();
+  new_log->fnode.ino = log_file->fnode.ino;
+  new_log->fnode.mtime = mtime;
+  // Estimate the required space
+  uint64_t starter_need =
+    _make_initial_transaction(starter_seq, fnode_tail, 0, nullptr);
+  // and now allocate and store at new_log_fnode
+  r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint),
+                starter_need,
+                0,
+                &new_log->fnode);
   ceph_assert(r == 0);
 
-  bufferlist bl;
-  encode(t, bl);
-  _pad_bl(bl);
+  // 2.4 Store starter fnode to run-time superblock, to be written out later
+  super.log_fnode = new_log->fnode;
 
-  dout(10) << __func__ << " new_log_jump_to 0x" << std::hex << new_log_jump_to
-          << std::dec << dendl;
+  // 2.5 Finalize new log's fnode building
+  // start collecting new log fnode updates (to make op_update_inc later)
+  // since this point. This will include compacted meta from 2.2 and aux
+  // extents from 1.1.
+  new_log->fnode.reset_delta();
+  new_log->fnode.claim_extents(fnode_tail.extents);
 
-  new_log_writer = _create_writer(new_log);
+  // 2.6 Encode new log fnode
+  bufferlist starter_bl;
+  _make_initial_transaction(starter_seq, new_log->fnode, starter_need,
+    &starter_bl);
 
-  new_log_writer->append(bl);
-  // 3. flush
+  // 2.7 Encode compacted meta transaction,
+  dout(20) << __func__
+           << " new_log jump seq " << seq_now
+           << std::hex << " offset 0x" << starter_need + compacted_meta_need
+          << std::dec << dendl;
+  // Extent compacted_meta transaction with a just to new log tail.
+  // Hopefully "compact_meta_need" estimation provides enough extra space
+  // for this new jump, assert below if not
+  compacted_meta_t.op_jump(seq_now, starter_need + compacted_meta_need);
+  // Now do encodeing and padding
+  bufferlist compacted_meta_bl;
+  compacted_meta_bl.reserve(compacted_meta_need);
+  encode(compacted_meta_t, compacted_meta_bl);
+  ceph_assert(compacted_meta_bl.length() <= compacted_meta_need);
+  _pad_bl(compacted_meta_bl, compacted_meta_need);
+
+  //
+  // Part 3.
+  // Write out new log's content
+  // 3.1 Stage new log's content writing
+  // 3.2 Do flush and wait for completion through flush_bdev()
+  //
+
+  // 3.1 Stage new log's content writing
+  // Make new log writer and append bufferlists to write out.
+  FileWriter *new_log_writer = _create_writer(new_log);
+  // And append all new log's bufferlists to write out.
+  new_log_writer->append(starter_bl);
+  new_log_writer->append(compacted_meta_bl);
+
+  // 3.2. flush and wait
   _flush_special(new_log_writer);
+  _flush_bdev(new_log_writer, false); // do not check log.lock is locked
 
-  // 4. wait
-  _flush_bdev(new_log_writer);
-  // 5. update our log fnode
-  // we need to append to new_log the extents that were allocated in step 1.1
-  // we do it by inverse logic - we drop 'old_log_jump_to' bytes and keep rest
-  // todo - maybe improve _allocate so we will give clear set of new allocations
-  uint64_t processed = 0;
-  mempool::bluefs::vector<bluefs_extent_t> old_extents;
-  for (auto& e : log_file->fnode.extents) {
-    if (processed + e.length <= old_log_jump_to) {
-      // drop whole extent
-      dout(10) << __func__ << " remove old log extent " << e << dendl;
-      old_extents.push_back(e);
-    } else {
-      // keep, but how much?
-      if (processed < old_log_jump_to) {
-       ceph_assert(processed + e.length > old_log_jump_to);
-       ceph_assert(old_log_jump_to - processed <= std::numeric_limits<uint32_t>::max());
-       uint32_t cut_at = uint32_t(old_log_jump_to - processed);
-       // need to cut, first half gets dropped
-       bluefs_extent_t retire(e.bdev, e.offset, cut_at);
-       old_extents.push_back(retire);
-       // second half goes to new log
-       bluefs_extent_t keep(e.bdev, e.offset + cut_at, e.length - cut_at);
-       new_log->fnode.append_extent(keep);
-       dout(10) << __func__ << " kept " << keep << " removed " << retire << dendl;
-      } else {
-       // take entire extent
-       ceph_assert(processed >= old_log_jump_to);
-       new_log->fnode.append_extent(e);
-       dout(10) << __func__ << " kept " << e << dendl;
-      }
-    }
-    processed += e.length;
-  }
-  // we will write it to super
-  new_log->fnode.reset_delta();
+  // Part 4.
+  // Write out new superblock to reflect all the changes.
+  //
 
-  // 6. write the super block to reflect the changes
-  dout(10) << __func__ << " writing super" << dendl;
-  new_log->fnode.ino = log_file->fnode.ino;
-  new_log->fnode.size = 0;
-  new_log->fnode.mtime = ceph_clock_now();
-  super.log_fnode = new_log->fnode;
-  ++super.version;
   _write_super(BDEV_DB);
   _flush_bdev();
 
+  // Part 5.
+  // Apply new log fnode
+  //
+
+  // we need to acquire log's lock back at this point
   log.lock.lock();
-  // swapping log_file and new_log
+  // Reconstruct actual log object from the new one.
   vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
-
-  // clear the extents from old log file, they are added to new log
-  log_file->fnode.clear_extents();
-  // swap the log files. New log file is the log file now.
-  new_log->fnode.swap_extents(log_file->fnode);
-
-  log.writer->pos = log.writer->file->fnode.size =
-    log.writer->pos - old_log_jump_to + new_log_jump_to;
-
+  log_file->fnode.size =
+    log.writer->pos - old_log_jump_to + starter_need + compacted_meta_need;
+  log_file->fnode.mtime = std::max(mtime, log_file->fnode.mtime);
+  log_file->fnode.swap_extents(new_log->fnode);
+  // update log's writer
+  log.writer->pos = log.writer->file->fnode.size;
   vselector->add_usage(log_file->vselector_hint, log_file->fnode);
-
+  // and unlock
   log.lock.unlock();
 
+  // we're mostly done
+  dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
+  logger->inc(l_bluefs_log_compactions);
+
+  //Part 6.
+  // Finalization
+  // 6.1 Permit log's extension, forbidden at step 0.
+  //
+  // 6.2 Release the new log writer
+  //
+  // 6.3 Release old space
+  //
+  // 6.4. Enable other compactions
+  //
+
+  // 6.1 Permit log's extension, forbidden at step 0.
   old_forbidden = atomic_exchange(&log_forbidden_to_expand, false);
   ceph_assert(old_forbidden == true);
   //to wake up if someone was in need of expanding log
   log_cond.notify_all();
 
-  // 7. release old space
-  dout(10) << __func__ << " release old log extents " << old_extents << dendl;
+  // 6.2 Release the new log writer
+  _close_writer(new_log_writer);
+  new_log_writer = nullptr;
+  new_log = nullptr;
+
+  // 6.3 Release old space
   {
+    dout(10) << __func__
+             << " release old log extents " << old_log_fnode.extents
+             << dendl;
     std::lock_guard dl(dirty.lock);
-    for (auto& r : old_extents) {
+    for (auto& r : old_log_fnode.extents) {
       dirty.pending_release[r.bdev].insert(r.offset, r.length);
     }
   }
 
-  // delete the new log, remove from the dirty files list
-  _close_writer(new_log_writer);
-  new_log_writer = nullptr;
-  new_log = nullptr;
-  log_cond.notify_all();
-
-  dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
-  logger->inc(l_bluefs_log_compactions);
-
+  // 6.4. Enable other compactions
   old_is_comp = atomic_exchange(&log_is_compacting, false);
   ceph_assert(old_is_comp);
 }
 
-void BlueFS::_pad_bl(bufferlist& bl)
+void BlueFS::_pad_bl(bufferlist& bl, uint64_t pad_size)
 {
-  uint64_t partial = bl.length() % super.block_size;
+  pad_size = std::max(pad_size, uint64_t(super.block_size));
+  uint64_t partial = bl.length() % pad_size;
   if (partial) {
     dout(10) << __func__ << " padding with 0x" << std::hex
-            << super.block_size - partial << " zeros" << std::dec << dendl;
-    bl.append_zero(super.block_size - partial);
+            << pad_size - partial << " zeros" << std::dec << dendl;
+    bl.append_zero(pad_size - partial);
   }
 }
 
@@ -2832,6 +3073,7 @@ int64_t BlueFS::_maybe_extend_log()
     int r = _allocate(
       vselector->select_prefer_bdev(log.writer->file->vselector_hint),
       cct->_conf->bluefs_max_log_runway,
+      0,
       &log.writer->file->fnode);
     ceph_assert(r == 0);
     vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
@@ -3128,6 +3370,7 @@ int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length)
     // in _flush_and_sync_log.
     int r = _allocate(vselector->select_prefer_bdev(h->file->vselector_hint),
                      offset + length - allocated,
+                      0,
                      &h->file->fnode);
     if (r < 0) {
       derr << __func__ << " allocated: 0x" << std::hex << allocated
@@ -3434,12 +3677,14 @@ int BlueFS::fsync(FileWriter *h)/*_WF_WD_WLD_WLNF_WNF*/
 }
 
 // be careful - either h->file->lock or log.lock must be taken
-void BlueFS::_flush_bdev(FileWriter *h)
+void BlueFS::_flush_bdev(FileWriter *h, bool check_mutext_locked)
 {
-  if (h->file->fnode.ino > 1) {
-    ceph_assert(ceph_mutex_is_locked(h->lock));
-  } else if (h->file->fnode.ino == 1) {
-    ceph_assert(ceph_mutex_is_locked(log.lock));
+  if (check_mutext_locked) {
+    if (h->file->fnode.ino > 1) {
+      ceph_assert(ceph_mutex_is_locked(h->lock));
+    } else if (h->file->fnode.ino == 1) {
+      ceph_assert(ceph_mutex_is_locked(log.lock));
+    }
   }
   std::array<bool, MAX_BDEV> flush_devs = h->dirty_devs;
   h->dirty_devs.fill(false);
@@ -3484,86 +3729,107 @@ const char* BlueFS::get_device_name(unsigned id)
   return names[id];
 }
 
-int BlueFS::_allocate_without_fallback(uint8_t id, uint64_t len,
-                     PExtentVector* extents)
-{
-  dout(10) << __func__ << " len 0x" << std::hex << len << std::dec
-           << " from " << (int)id << dendl;
-  assert(id < alloc.size());
-  if (!alloc[id]) {
-    return -ENOENT;
-  }
-  extents->reserve(4);  // 4 should be (more than) enough for most allocations
-  int64_t need = round_up_to(len, alloc_size[id]);
-  int64_t alloc_len = alloc[id]->allocate(need, alloc_size[id], 0, extents);
-  if (alloc_len < 0 || alloc_len < need) {
-    if (alloc_len > 0) {
-      alloc[id]->release(*extents);
-    }
-    derr << __func__ << " unable to allocate 0x" << std::hex << need
-        << " on bdev " << (int)id
-         << ", allocator name " << alloc[id]->get_name()
-         << ", allocator type " << alloc[id]->get_type()
-         << ", capacity 0x" << alloc[id]->get_capacity()
-         << ", block size 0x" << alloc[id]->get_block_size()
-         << ", alloc size 0x" << alloc_size[id]
-         << ", free 0x" << alloc[id]->get_free()
-         << ", fragmentation " << alloc[id]->get_fragmentation()
-         << ", allocated 0x" << (alloc_len > 0 ? alloc_len : 0)
-        << std::dec << dendl;
-    alloc[id]->dump();
-    return -ENOSPC;
-  }
-  if (is_shared_alloc(id)) {
-    shared_alloc->bluefs_used += alloc_len;
-  }
-
-  return 0;
-}
-
 int BlueFS::_allocate(uint8_t id, uint64_t len,
-                     bluefs_fnode_t* node)
+                     uint64_t alloc_unit,
+                     bluefs_fnode_t* node,
+                      size_t alloc_attempts,
+                      bool permit_dev_fallback)
 {
-  dout(10) << __func__ << " len 0x" << std::hex << len << std::dec
-           << " from " << (int)id << dendl;
+  dout(10) << __func__ << " len 0x" << std::hex << len
+           << " au 0x" << alloc_unit
+           << std::dec << " from " << (int)id
+           << " cooldown " << cooldown_deadline
+           << dendl;
   ceph_assert(id < alloc.size());
   int64_t alloc_len = 0;
   PExtentVector extents;
   uint64_t hint = 0;
   int64_t need = len;
+  bool shared = is_shared_alloc(id);
+  auto shared_unit = shared_alloc ? shared_alloc->alloc_unit : 0;
+  bool was_cooldown = false;
   if (alloc[id]) {
-    need = round_up_to(len, alloc_size[id]);
+    if (!alloc_unit) {
+      alloc_unit = alloc_size[id];
+    }
+    // do not attempt shared_allocator with bluefs alloc unit
+    // when cooling down, fallback to slow dev alloc unit.
+    if (shared && alloc_unit != shared_unit) {
+       if (duration_cast<seconds>(real_clock::now().time_since_epoch()).count() <
+           cooldown_deadline) {
+         logger->inc(l_bluefs_alloc_shared_size_fallbacks);
+         alloc_unit = shared_unit;
+         was_cooldown = true;
+       } else if (cooldown_deadline.fetch_and(0)) {
+         // we might get false cooldown_deadline reset at this point
+         // but that's mostly harmless.
+         dout(1) << __func__ << " shared allocation cooldown period elapsed"
+                 << dendl;
+       }
+    }
+    need = round_up_to(len, alloc_unit);
     if (!node->extents.empty() && node->extents.back().bdev == id) {
       hint = node->extents.back().end();
     }   
+    ++alloc_attempts;
     extents.reserve(4);  // 4 should be (more than) enough for most allocations
-    alloc_len = alloc[id]->allocate(need, alloc_size[id], hint, &extents);
+    alloc_len = alloc[id]->allocate(need, alloc_unit, hint, &extents);
   }
   if (alloc_len < 0 || alloc_len < need) {
     if (alloc[id]) {
       if (alloc_len > 0) {
         alloc[id]->release(extents);
       }
+      if (!was_cooldown && shared) {
+        auto delay_s = cct->_conf->bluefs_failed_shared_alloc_cooldown;
+        cooldown_deadline = delay_s +
+          duration_cast<seconds>(real_clock::now().time_since_epoch()).count();
+        dout(1) << __func__ << " shared allocation cooldown set for "
+                << delay_s << "s"
+                << dendl;
+      }
       dout(1) << __func__ << " unable to allocate 0x" << std::hex << need
              << " on bdev " << (int)id
               << ", allocator name " << alloc[id]->get_name()
               << ", allocator type " << alloc[id]->get_type()
               << ", capacity 0x" << alloc[id]->get_capacity()
               << ", block size 0x" << alloc[id]->get_block_size()
-              << ", alloc size 0x" << alloc_size[id]
+              << ", alloc unit 0x" << alloc_unit
               << ", free 0x" << alloc[id]->get_free()
               << ", fragmentation " << alloc[id]->get_fragmentation()
               << ", allocated 0x" << (alloc_len > 0 ? alloc_len : 0)
              << std::dec << dendl;
     } else {
-      dout(20) << __func__ << " alloc-id not set on index="<< (int)id << " unable to allocate 0x" << std::hex << need
+      dout(20) << __func__ << " alloc-id not set on index="<< (int)id
+               << " unable to allocate 0x" << std::hex << need
               << " on bdev " << (int)id << std::dec << dendl;
     }
-    if (id != BDEV_SLOW) {
+    if (alloc[id] && shared && alloc_unit != shared_unit) {
+      alloc_unit = shared_unit;
+      dout(20) << __func__ << " fallback to bdev "
+              << (int)id
+               << " with alloc unit 0x" << std::hex << alloc_unit
+               << std::dec << dendl;
+      logger->inc(l_bluefs_alloc_shared_size_fallbacks);
+      return _allocate(id,
+                       len,
+                       alloc_unit,
+                       node,
+                       alloc_attempts,
+                       permit_dev_fallback);
+    } else if (permit_dev_fallback && id != BDEV_SLOW && alloc[id + 1]) {
       dout(20) << __func__ << " fallback to bdev "
               << (int)id + 1
               << dendl;
-      return _allocate(id + 1, len, node);
+      if (alloc_attempts > 0 && is_shared_alloc(id + 1)) {
+        logger->inc(l_bluefs_alloc_shared_dev_fallbacks);
+      }
+      return _allocate(id + 1,
+                       len,
+                       0, // back to default alloc unit
+                       node,
+                       alloc_attempts,
+                       permit_dev_fallback);
     } else {
       derr << __func__ << " allocation failed, needed 0x" << std::hex << need
            << dendl;
@@ -3575,7 +3841,7 @@ int BlueFS::_allocate(uint8_t id, uint64_t len,
       logger->set(max_bytes_pcounters[id], used);
       max_bytes[id] = used;
     }
-    if (is_shared_alloc(id)) {
+    if (shared) {
       shared_alloc->bluefs_used += alloc_len;
     }
   }
@@ -3605,6 +3871,7 @@ int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len)/*_LF*/
     vselector->sub_usage(f->vselector_hint, f->fnode);
     int r = _allocate(vselector->select_prefer_bdev(f->vselector_hint),
       want,
+      0,
       &f->fnode);
     vselector->add_usage(f->vselector_hint, f->fnode);
     if (r < 0)
@@ -3644,11 +3911,13 @@ void BlueFS::_maybe_compact_log_LNF_NF_LD_D()
 {
   if (!cct->_conf->bluefs_replay_recovery_disable_compact &&
       _should_start_compact_log_L_N()) {
+    auto t0 = mono_clock::now();
     if (cct->_conf->bluefs_compact_log_sync) {
       _compact_log_sync_LNF_LD();
     } else {
       _compact_log_async_LD_LNF_D();
     }
+    logger->tinc(l_bluefs_compaction_lat, mono_clock::now() - t0);
   }
 }