]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_compaction_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / db_compaction_test.cc
index 3d24945eb3d4dfdf0bcaa7cf1460f944dbe316ac..ba9c50b9aae8c5f77c759fd20383ad966e8df540 100644 (file)
@@ -7,7 +7,11 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
+#include <tuple>
+
+#include "db/blob/blob_index.h"
 #include "db/db_test_util.h"
+#include "env/mock_env.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
 #include "rocksdb/concurrent_task_limiter.h"
 #include "rocksdb/sst_file_writer.h"
 #include "rocksdb/utilities/convenience.h"
 #include "test_util/sync_point.h"
+#include "test_util/testutil.h"
 #include "util/concurrent_task_limiter_impl.h"
 #include "util/random.h"
 #include "utilities/fault_injection_env.h"
+#include "utilities/fault_injection_fs.h"
 
 namespace ROCKSDB_NAMESPACE {
 
 // SYNC_POINT is not supported in released Windows mode.
 #if !defined(ROCKSDB_LITE)
 
+class CompactionStatsCollector : public EventListener {
+ public:
+  CompactionStatsCollector()
+      : compaction_completed_(
+            static_cast<int>(CompactionReason::kNumOfReasons)) {
+    for (auto& v : compaction_completed_) {
+      v.store(0);
+    }
+  }
+
+  ~CompactionStatsCollector() override {}
+
+  void OnCompactionCompleted(DB* /* db */,
+                             const CompactionJobInfo& info) override {
+    int k = static_cast<int>(info.compaction_reason);
+    int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
+    assert(k >= 0 && k < num_of_reasons);
+    compaction_completed_[k]++;
+  }
+
+  void OnExternalFileIngested(
+      DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
+    int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
+    compaction_completed_[k]++;
+  }
+
+  void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
+    int k = static_cast<int>(CompactionReason::kFlush);
+    compaction_completed_[k]++;
+  }
+
+  int NumberOfCompactions(CompactionReason reason) const {
+    int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
+    int k = static_cast<int>(reason);
+    assert(k >= 0 && k < num_of_reasons);
+    return compaction_completed_.at(k).load();
+  }
+
+ private:
+  std::vector<std::atomic<int>> compaction_completed_;
+};
+
 class DBCompactionTest : public DBTestBase {
  public:
   DBCompactionTest()
-      : DBTestBase("/db_compaction_test", /*env_do_fsync=*/true) {}
+      : DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {}
+
+ protected:
+  /*
+   * Verifies compaction stats of cfd are valid.
+   *
+   * For each level of cfd, its compaction stats are valid if
+   * 1) sum(stat.counts) == stat.count, and
+   * 2) stat.counts[i] == collector.NumberOfCompactions(i)
+   */
+  void VerifyCompactionStats(ColumnFamilyData& cfd,
+                             const CompactionStatsCollector& collector) {
+#ifndef NDEBUG
+    InternalStats* internal_stats_ptr = cfd.internal_stats();
+    ASSERT_NE(internal_stats_ptr, nullptr);
+    const std::vector<InternalStats::CompactionStats>& comp_stats =
+        internal_stats_ptr->TEST_GetCompactionStats();
+    const int num_of_reasons =
+        static_cast<int>(CompactionReason::kNumOfReasons);
+    std::vector<int> counts(num_of_reasons, 0);
+    // Count the number of compactions caused by each CompactionReason across
+    // all levels.
+    for (const auto& stat : comp_stats) {
+      int sum = 0;
+      for (int i = 0; i < num_of_reasons; i++) {
+        counts[i] += stat.counts[i];
+        sum += stat.counts[i];
+      }
+      ASSERT_EQ(sum, stat.count);
+    }
+    // Verify InternalStats bookkeeping matches that of
+    // CompactionStatsCollector, assuming that all compactions complete.
+    for (int i = 0; i < num_of_reasons; i++) {
+      ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)),
+                counts[i]);
+    }
+#endif /* NDEBUG */
+  }
 };
 
 class DBCompactionTestWithParam
@@ -35,7 +120,7 @@ class DBCompactionTestWithParam
       public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
  public:
   DBCompactionTestWithParam()
-      : DBTestBase("/db_compaction_test", /*env_do_fsync=*/true) {
+      : DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {
     max_subcompactions_ = std::get<0>(GetParam());
     exclusive_manual_compaction_ = std::get<1>(GetParam());
   }
@@ -53,7 +138,7 @@ class DBCompactionTestWithBottommostParam
       public testing::WithParamInterface<BottommostLevelCompaction> {
  public:
   DBCompactionTestWithBottommostParam()
-      : DBTestBase("/db_compaction_test", /*env_do_fsync=*/true) {
+      : DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {
     bottommost_level_compaction_ = GetParam();
   }
 
@@ -76,8 +161,32 @@ class ChangeLevelConflictsWithAuto
   ChangeLevelConflictsWithAuto() : DBCompactionTest() {}
 };
 
-namespace {
+// Param = true: grab the compaction pressure token (enable
+// parallel compactions)
+// Param = false: Not grab the token (no parallel compactions)
+class RoundRobinSubcompactionsAgainstPressureToken
+    : public DBCompactionTest,
+      public ::testing::WithParamInterface<bool> {
+ public:
+  RoundRobinSubcompactionsAgainstPressureToken() {
+    grab_pressure_token_ = GetParam();
+  }
+  bool grab_pressure_token_;
+};
+
+class RoundRobinSubcompactionsAgainstResources
+    : public DBCompactionTest,
+      public ::testing::WithParamInterface<std::tuple<int, int>> {
+ public:
+  RoundRobinSubcompactionsAgainstResources() {
+    total_low_pri_threads_ = std::get<0>(GetParam());
+    max_compaction_limits_ = std::get<1>(GetParam());
+  }
+  int total_low_pri_threads_;
+  int max_compaction_limits_;
+};
 
+namespace {
 class FlushedFileCollector : public EventListener {
  public:
   FlushedFileCollector() {}
@@ -104,47 +213,6 @@ class FlushedFileCollector : public EventListener {
   std::mutex mutex_;
 };
 
-class CompactionStatsCollector : public EventListener {
-public:
-  CompactionStatsCollector()
-      : compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) {
-    for (auto& v : compaction_completed_) {
-      v.store(0);
-    }
-  }
-
-  ~CompactionStatsCollector() override {}
-
-  void OnCompactionCompleted(DB* /* db */,
-                             const CompactionJobInfo& info) override {
-    int k = static_cast<int>(info.compaction_reason);
-    int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
-    assert(k >= 0 && k < num_of_reasons);
-    compaction_completed_[k]++;
-  }
-
-  void OnExternalFileIngested(
-      DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
-    int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
-    compaction_completed_[k]++;
-  }
-
-  void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
-    int k = static_cast<int>(CompactionReason::kFlush);
-    compaction_completed_[k]++;
-  }
-
-  int NumberOfCompactions(CompactionReason reason) const {
-    int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
-    int k = static_cast<int>(reason);
-    assert(k >= 0 && k < num_of_reasons);
-    return compaction_completed_.at(k).load();
-  }
-
-private:
-  std::vector<std::atomic<int>> compaction_completed_;
-};
-
 class SstStatsCollector : public EventListener {
  public:
   SstStatsCollector() : num_ssts_creation_started_(0) {}
@@ -176,12 +244,12 @@ Options DeletionTriggerOptions(Options options) {
       options.target_file_size_base * options.target_file_size_multiplier;
   options.max_bytes_for_level_multiplier = 2;
   options.disable_auto_compactions = false;
+  options.compaction_options_universal.max_size_amplification_percent = 100;
   return options;
 }
 
-bool HaveOverlappingKeyRanges(
-    const Comparator* c,
-    const SstFileMetaData& a, const SstFileMetaData& b) {
+bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a,
+                              const SstFileMetaData& b) {
   if (c->CompareWithoutTimestamp(a.smallestkey, b.smallestkey) >= 0) {
     if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) {
       // b.smallestkey <= a.smallestkey <= b.largestkey
@@ -206,18 +274,15 @@ bool HaveOverlappingKeyRanges(
 // Identifies all files between level "min_level" and "max_level"
 // which has overlapping key range with "input_file_meta".
 void GetOverlappingFileNumbersForLevelCompaction(
-    const ColumnFamilyMetaData& cf_meta,
-    const Comparator* comparator,
-    int min_level, int max_level,
-    const SstFileMetaData* input_file_meta,
+    const ColumnFamilyMetaData& cf_meta, const Comparator* comparator,
+    int min_level, int max_level, const SstFileMetaData* input_file_meta,
     std::set<std::string>* overlapping_file_names) {
   std::set<const SstFileMetaData*> overlapping_files;
   overlapping_files.insert(input_file_meta);
   for (int m = min_level; m <= max_level; ++m) {
     for (auto& file : cf_meta.levels[m].files) {
       for (auto* included_file : overlapping_files) {
-        if (HaveOverlappingKeyRanges(
-                comparator, *included_file, file)) {
+        if (HaveOverlappingKeyRanges(comparator, *included_file, file)) {
           overlapping_files.insert(&file);
           overlapping_file_names->insert(file.name);
           break;
@@ -240,46 +305,9 @@ void VerifyCompactionResult(
 #endif
 }
 
-/*
- * Verifies compaction stats of cfd are valid.
- *
- * For each level of cfd, its compaction stats are valid if
- * 1) sum(stat.counts) == stat.count, and
- * 2) stat.counts[i] == collector.NumberOfCompactions(i)
- */
-void VerifyCompactionStats(ColumnFamilyData& cfd,
-    const CompactionStatsCollector& collector) {
-#ifndef NDEBUG
-  InternalStats* internal_stats_ptr = cfd.internal_stats();
-  ASSERT_TRUE(internal_stats_ptr != nullptr);
-  const std::vector<InternalStats::CompactionStats>& comp_stats =
-      internal_stats_ptr->TEST_GetCompactionStats();
-  const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
-  std::vector<int> counts(num_of_reasons, 0);
-  // Count the number of compactions caused by each CompactionReason across
-  // all levels.
-  for (const auto& stat : comp_stats) {
-    int sum = 0;
-    for (int i = 0; i < num_of_reasons; i++) {
-      counts[i] += stat.counts[i];
-      sum += stat.counts[i];
-    }
-    ASSERT_EQ(sum, stat.count);
-  }
-  // Verify InternalStats bookkeeping matches that of CompactionStatsCollector,
-  // assuming that all compactions complete.
-  for (int i = 0; i < num_of_reasons; i++) {
-    ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]);
-  }
-#endif /* NDEBUG */
-}
-
-const SstFileMetaData* PickFileRandomly(
-    const ColumnFamilyMetaData& cf_meta,
-    Random* rand,
-    int* level = nullptr) {
-  auto file_id = rand->Uniform(static_cast<int>(
-      cf_meta.file_count)) + 1;
+const SstFileMetaData* PickFileRandomly(const ColumnFamilyMetaData& cf_meta,
+                                        Random* rand, int* level = nullptr) {
+  auto file_id = rand->Uniform(static_cast<int>(cf_meta.file_count)) + 1;
   for (auto& level_meta : cf_meta.levels) {
     if (file_id <= level_meta.files.size()) {
       if (level != nullptr) {
@@ -295,7 +323,7 @@ const SstFileMetaData* PickFileRandomly(
 }
 }  // anonymous namespace
 
-#ifndef ROCKSDB_VALGRIND_RUN
+#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
 // All the TEST_P tests run once with sub_compactions disabled (i.e.
 // options.max_subcompactions = 1) and once with it enabled
 TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
@@ -323,101 +351,44 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
       values.push_back(rnd.RandomString(kCDTValueSize));
       ASSERT_OK(Put(Key(k), values[k]));
     }
-    dbfull()->TEST_WaitForFlushMemTable();
-    dbfull()->TEST_WaitForCompact();
-    db_size[0] = Size(Key(0), Key(kTestSize - 1));
-
-    for (int k = 0; k < kTestSize; ++k) {
-      ASSERT_OK(Delete(Key(k)));
-    }
-    dbfull()->TEST_WaitForFlushMemTable();
-    dbfull()->TEST_WaitForCompact();
-    db_size[1] = Size(Key(0), Key(kTestSize - 1));
-
-    // must have much smaller db size.
-    ASSERT_GT(db_size[0] / 3, db_size[1]);
-  }
-}
-#endif  // ROCKSDB_VALGRIND_RUN
-
-TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) {
-  //  For each options type we test following
-  //  - Enable preserve_deletes
-  //  - write bunch of keys and deletes
-  //  - Set start_seqnum to the beginning; compact; check that keys are present
-  //  - rewind start_seqnum way forward; compact; check that keys are gone
-
-  for (int tid = 0; tid < 3; ++tid) {
-    Options options = DeletionTriggerOptions(CurrentOptions());
-    options.max_subcompactions = max_subcompactions_;
-    options.preserve_deletes=true;
-    options.num_levels = 2;
-
-    if (tid == 1) {
-      options.skip_stats_update_on_db_open = true;
-    } else if (tid == 2) {
-      // third pass with universal compaction
-      options.compaction_style = kCompactionStyleUniversal;
-    }
-
-    DestroyAndReopen(options);
-    Random rnd(301);
-    // highlight the default; all deletes should be preserved
-    SetPreserveDeletesSequenceNumber(0);
-
-    const int kTestSize = kCDTKeysPerBuffer;
-    std::vector<std::string> values;
-    for (int k = 0; k < kTestSize; ++k) {
-      values.push_back(rnd.RandomString(kCDTValueSize));
-      ASSERT_OK(Put(Key(k), values[k]));
-    }
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[0]));
 
     for (int k = 0; k < kTestSize; ++k) {
       ASSERT_OK(Delete(Key(k)));
     }
-    // to ensure we tackle all tombstones
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 2;
-    cro.bottommost_level_compaction =
-        BottommostLevelCompaction::kForceOptimized;
-
-    dbfull()->TEST_WaitForFlushMemTable();
-    dbfull()->CompactRange(cro, nullptr, nullptr);
-
-    // check that normal user iterator doesn't see anything
-    Iterator* db_iter = dbfull()->NewIterator(ReadOptions());
-    int i = 0;
-    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
-      i++;
-    }
-    ASSERT_EQ(i, 0);
-    delete db_iter;
-
-    // check that iterator that sees internal keys sees tombstones
-    ReadOptions ro;
-    ro.iter_start_seqnum=1;
-    db_iter = dbfull()->NewIterator(ro);
-    i = 0;
-    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
-      i++;
-    }
-    ASSERT_EQ(i, 4);
-    delete db_iter;
-
-    // now all deletes should be gone
-    SetPreserveDeletesSequenceNumber(100000000);
-    dbfull()->CompactRange(cro, nullptr, nullptr);
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[1]));
 
-    db_iter = dbfull()->NewIterator(ro);
-    i = 0;
-    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
-      i++;
+    if (options.compaction_style == kCompactionStyleUniversal) {
+      // Claim: in universal compaction none of the original data will remain
+      // once compactions settle.
+      //
+      // Proof: The compensated size of the file containing the most tombstones
+      // is enough on its own to trigger size amp compaction. Size amp
+      // compaction is a full compaction, so all tombstones meet the obsolete
+      // keys they cover.
+      ASSERT_EQ(0, db_size[1]);
+    } else {
+      // Claim: in level compaction at most `db_size[0] / 2` of the original
+      // data will remain once compactions settle.
+      //
+      // Proof: Assume the original data is all in the bottom level. If it were
+      // not, it would meet its tombstone sooner. The original data size is
+      // large enough to require fanout to bottom level to be greater than
+      // `max_bytes_for_level_multiplier == 2`. In the level just above,
+      // tombstones must cover less than `db_size[0] / 4` bytes since fanout >=
+      // 2 and file size is compensated by doubling the size of values we expect
+      // are covered (`kDeletionWeightOnCompaction == 2`). The tombstones in
+      // levels above must cover less than `db_size[0] / 8` bytes of original
+      // data, `db_size[0] / 16`, and so on.
+      ASSERT_GT(db_size[0] / 2, db_size[1]);
     }
-    ASSERT_EQ(i, 0);
-    delete db_iter;
   }
 }
+#endif  // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
 
 TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
   // This test verify UpdateAccumulatedStats is not on
@@ -468,9 +439,12 @@ TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
 TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
   Options options = CurrentOptions();
   options.env = env_;
-  options.new_table_reader_for_compaction_inputs = true;
   options.max_open_files = 20;
   options.level0_file_num_compaction_trigger = 3;
+  // Avoid many shards with small max_open_files, where as little as
+  // two table insertions could lead to an LRU eviction, depending on
+  // hash values.
+  options.table_cache_numshardbits = 2;
   DestroyAndReopen(options);
   Random rnd(301);
 
@@ -495,8 +469,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
     ASSERT_OK(Put(Key(10 - k), "bar"));
     if (k < options.level0_file_num_compaction_trigger - 1) {
       num_table_cache_lookup = 0;
-      Flush();
-      dbfull()->TEST_WaitForCompact();
+      ASSERT_OK(Flush());
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
       // preloading iterator issues one table cache lookup and create
       // a new table reader, if not preloaded.
       int old_num_table_cache_lookup = num_table_cache_lookup;
@@ -514,8 +488,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
 
   num_table_cache_lookup = 0;
   num_new_table_reader = 0;
-  Flush();
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   // Preloading iterator issues one table cache lookup and creates
   // a new table reader. One file is created for flush and one for compaction.
   // Compaction inputs make no table cache look-up for data/range deletion
@@ -542,7 +516,7 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
   cro.change_level = true;
   cro.target_level = 2;
   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
-  db_->CompactRange(cro, nullptr, nullptr);
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
   // Only verifying compaction outputs issues one table cache lookup
   // for both data block and range deletion block).
   // May preload table cache too.
@@ -583,9 +557,9 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) {
       values.push_back(rnd.RandomString(kCDTValueSize));
       ASSERT_OK(Put(Key(k), values[k]));
     }
-    dbfull()->TEST_WaitForFlushMemTable();
-    dbfull()->TEST_WaitForCompact();
-    db_size[0] = Size(Key(0), Key(kTestSize - 1));
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[0]));
     Close();
 
     // round 2 --- disable auto-compactions and issue deletions.
@@ -596,11 +570,10 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) {
     for (int k = 0; k < kTestSize; ++k) {
       ASSERT_OK(Delete(Key(k)));
     }
-    db_size[1] = Size(Key(0), Key(kTestSize - 1));
+    ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[1]));
     Close();
-    // as auto_compaction is off, we shouldn't see too much reduce
-    // in db size.
-    ASSERT_LT(db_size[0] / 3, db_size[1]);
+    // as auto_compaction is off, we shouldn't see any reduction in db size.
+    ASSERT_LE(db_size[0], db_size[1]);
 
     // round 3 --- reopen db with auto_compaction on and see if
     // deletion compensation still work.
@@ -610,11 +583,17 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) {
     for (int k = 0; k < kTestSize / 10; ++k) {
       ASSERT_OK(Put(Key(k), values[k]));
     }
-    dbfull()->TEST_WaitForFlushMemTable();
-    dbfull()->TEST_WaitForCompact();
-    db_size[2] = Size(Key(0), Key(kTestSize - 1));
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[2]));
     // this time we're expecting significant drop in size.
-    ASSERT_GT(db_size[0] / 3, db_size[2]);
+    //
+    // See "CompactionDeletionTrigger" test for proof that at most
+    // `db_size[0] / 2` of the original data remains. In addition to that, this
+    // test inserts `db_size[0] / 10` to push the tombstones into SST files and
+    // then through automatic compactions. So in total `3 * db_size[0] / 5` of
+    // the original data may remain.
+    ASSERT_GT(3 * db_size[0] / 5, db_size[2]);
   }
 }
 
@@ -630,7 +609,7 @@ TEST_F(DBCompactionTest, CompactRangeBottomPri) {
     CompactRangeOptions cro;
     cro.change_level = true;
     cro.target_level = 2;
-    dbfull()->CompactRange(cro, nullptr, nullptr);
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
   }
   ASSERT_EQ("0,0,3", FilesPerLevel(0));
 
@@ -663,7 +642,7 @@ TEST_F(DBCompactionTest, CompactRangeBottomPri) {
       });
   SyncPoint::GetInstance()->EnableProcessing();
   env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
-  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   ASSERT_EQ(1, low_pri_count);
   ASSERT_EQ(1, bottom_pri_count);
   ASSERT_EQ("0,0,2", FilesPerLevel(0));
@@ -671,12 +650,12 @@ TEST_F(DBCompactionTest, CompactRangeBottomPri) {
   // Recompact bottom most level uses bottom pool
   CompactRangeOptions cro;
   cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
-  dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
   ASSERT_EQ(1, low_pri_count);
   ASSERT_EQ(2, bottom_pri_count);
 
   env_->SetBackgroundThreads(0, Env::Priority::BOTTOM);
-  dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
   // Low pri pool is used if bottom pool has size 0.
   ASSERT_EQ(2, low_pri_count);
   ASSERT_EQ(2, bottom_pri_count);
@@ -701,9 +680,16 @@ TEST_F(DBCompactionTest, DisableStatsUpdateReopen) {
       values.push_back(rnd.RandomString(kCDTValueSize));
       ASSERT_OK(Put(Key(k), values[k]));
     }
-    dbfull()->TEST_WaitForFlushMemTable();
-    dbfull()->TEST_WaitForCompact();
-    db_size[0] = Size(Key(0), Key(kTestSize - 1));
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    // L1 and L2 can fit deletions iff size compensation does not take effect,
+    // i.e., when `skip_stats_update_on_db_open == true`. Move any remaining
+    // files at or above L2 down to L3 to ensure obsolete data does not
+    // accidentally meet its tombstone above L3. This makes the final size more
+    // deterministic and easy to see whether size compensation for deletions
+    // took effect.
+    MoveFilesToLevel(3 /* level */);
+    ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[0]));
     Close();
 
     // round 2 --- disable auto-compactions and issue deletions.
@@ -716,32 +702,37 @@ TEST_F(DBCompactionTest, DisableStatsUpdateReopen) {
     for (int k = 0; k < kTestSize; ++k) {
       ASSERT_OK(Delete(Key(k)));
     }
-    db_size[1] = Size(Key(0), Key(kTestSize - 1));
+    ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[1]));
     Close();
-    // as auto_compaction is off, we shouldn't see too much reduce
-    // in db size.
-    ASSERT_LT(db_size[0] / 3, db_size[1]);
+    // as auto_compaction is off, we shouldn't see any reduction in db size.
+    ASSERT_LE(db_size[0], db_size[1]);
 
     // round 3 --- reopen db with auto_compaction on and see if
     // deletion compensation still work.
     options.disable_auto_compactions = false;
     Reopen(options);
-    dbfull()->TEST_WaitForFlushMemTable();
-    dbfull()->TEST_WaitForCompact();
-    db_size[2] = Size(Key(0), Key(kTestSize - 1));
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[2]));
 
     if (options.skip_stats_update_on_db_open) {
       // If update stats on DB::Open is disable, we don't expect
       // deletion entries taking effect.
-      ASSERT_LT(db_size[0] / 3, db_size[2]);
+      //
+      // The deletions are small enough to fit in L1 and L2, and obsolete keys
+      // were moved to L3+, so none of the original data should have been
+      // dropped.
+      ASSERT_LE(db_size[0], db_size[2]);
     } else {
       // Otherwise, we should see a significant drop in db size.
-      ASSERT_GT(db_size[0] / 3, db_size[2]);
+      //
+      // See "CompactionDeletionTrigger" test for proof that at most
+      // `db_size[0] / 2` of the original data remains.
+      ASSERT_GT(db_size[0] / 2, db_size[2]);
     }
   }
 }
 
-
 TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
   const int kNumKeysPerFile = 100;
 
@@ -751,7 +742,8 @@ TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
   options.num_levels = 3;
   options.level0_file_num_compaction_trigger = 3;
   options.max_subcompactions = max_subcompactions_;
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
   CreateAndReopenWithCF({"pikachu"}, options);
 
   Random rnd(301);
@@ -766,7 +758,7 @@ TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
     }
     // put extra key to trigger flush
     ASSERT_OK(Put(1, "", ""));
-    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
     ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1);
   }
 
@@ -778,7 +770,7 @@ TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
   }
   // put extra key to trigger flush
   ASSERT_OK(Put(1, "", ""));
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
   ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
@@ -798,7 +790,8 @@ TEST_F(DBCompactionTest, BGCompactionsAllowed) {
   options.level0_slowdown_writes_trigger = 20;
   options.soft_pending_compaction_bytes_limit = 1 << 30;  // Infinitely large
   options.max_background_compactions = 3;
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
 
   // Block all threads in thread pool.
   const size_t kTotalTasks = 4;
@@ -820,7 +813,7 @@ TEST_F(DBCompactionTest, BGCompactionsAllowed) {
       }
       // put extra key to trigger flush
       ASSERT_OK(Put(cf, "", ""));
-      dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
       ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
     }
   }
@@ -837,7 +830,7 @@ TEST_F(DBCompactionTest, BGCompactionsAllowed) {
     }
     // put extra key to trigger flush
     ASSERT_OK(Put(2, "", ""));
-    dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
     ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
               NumTableFilesAtLevel(0, 2));
   }
@@ -848,7 +841,7 @@ TEST_F(DBCompactionTest, BGCompactionsAllowed) {
     sleeping_tasks[i].WakeUp();
     sleeping_tasks[i].WaitUntilDone();
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   // Verify number of compactions allowed will come back to 1.
 
@@ -865,7 +858,7 @@ TEST_F(DBCompactionTest, BGCompactionsAllowed) {
       }
       // put extra key to trigger flush
       ASSERT_OK(Put(cf, "", ""));
-      dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
       ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
     }
   }
@@ -882,7 +875,7 @@ TEST_F(DBCompactionTest, BGCompactionsAllowed) {
 
 TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
   Options options = CurrentOptions();
-  options.write_buffer_size = 100000000;        // Large write buffer
+  options.write_buffer_size = 100000000;  // Large write buffer
   options.max_subcompactions = max_subcompactions_;
   CreateAndReopenWithCF({"pikachu"}, options);
 
@@ -898,8 +891,8 @@ TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
 
   // Reopening moves updates to level-0
   ReopenWithColumnFamilies({"default", "pikachu"}, options);
-  dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
-                              true /* disallow trivial move */);
+  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
+                                        true /* disallow trivial move */));
 
   ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
   ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);
@@ -943,27 +936,27 @@ TEST_F(DBCompactionTest, UserKeyCrossFile1) {
   DestroyAndReopen(options);
 
   // create first file and flush to l0
-  Put("4", "A");
-  Put("3", "A");
-  Flush();
-  dbfull()->TEST_WaitForFlushMemTable();
-
-  Put("2", "A");
-  Delete("3");
-  Flush();
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(Put("4", "A"));
+  ASSERT_OK(Put("3", "A"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+
+  ASSERT_OK(Put("2", "A"));
+  ASSERT_OK(Delete("3"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   ASSERT_EQ("NOT_FOUND", Get("3"));
 
   // move both files down to l1
-  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   ASSERT_EQ("NOT_FOUND", Get("3"));
 
   for (int i = 0; i < 3; i++) {
-    Put("2", "B");
-    Flush();
-    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_OK(Put("2", "B"));
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ASSERT_EQ("NOT_FOUND", Get("3"));
 }
@@ -976,27 +969,27 @@ TEST_F(DBCompactionTest, UserKeyCrossFile2) {
   DestroyAndReopen(options);
 
   // create first file and flush to l0
-  Put("4", "A");
-  Put("3", "A");
-  Flush();
-  dbfull()->TEST_WaitForFlushMemTable();
-
-  Put("2", "A");
-  SingleDelete("3");
-  Flush();
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(Put("4", "A"));
+  ASSERT_OK(Put("3", "A"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+
+  ASSERT_OK(Put("2", "A"));
+  ASSERT_OK(SingleDelete("3"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   ASSERT_EQ("NOT_FOUND", Get("3"));
 
   // move both files down to l1
-  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   ASSERT_EQ("NOT_FOUND", Get("3"));
 
   for (int i = 0; i < 3; i++) {
-    Put("2", "B");
-    Flush();
-    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_OK(Put("2", "B"));
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ASSERT_EQ("NOT_FOUND", Get("3"));
 }
@@ -1012,17 +1005,17 @@ TEST_F(DBCompactionTest, CompactionSstPartitioner) {
   DestroyAndReopen(options);
 
   // create first file and flush to l0
-  Put("aaaa1", "A");
-  Put("bbbb1", "B");
-  Flush();
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(Put("aaaa1", "A"));
+  ASSERT_OK(Put("bbbb1", "B"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
 
-  Put("aaaa1", "A2");
-  Flush();
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(Put("aaaa1", "A2"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
 
   // move both files down to l1
-  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 
   std::vector<LiveFileMetaData> files;
   dbfull()->GetLiveFilesMetaData(&files);
@@ -1042,11 +1035,11 @@ TEST_F(DBCompactionTest, CompactionSstPartitionerNonTrivial) {
   DestroyAndReopen(options);
 
   // create first file and flush to l0
-  Put("aaaa1", "A");
-  Put("bbbb1", "B");
-  Flush();
-  dbfull()->TEST_WaitForFlushMemTable();
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_OK(Put("aaaa1", "A"));
+  ASSERT_OK(Put("bbbb1", "B"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 
   std::vector<LiveFileMetaData> files;
   dbfull()->GetLiveFilesMetaData(&files);
@@ -1068,7 +1061,7 @@ TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
   compact_opt.compression = kNoCompression;
   compact_opt.output_file_size_limit = 4096;
   const size_t key_len =
-    static_cast<size_t>(compact_opt.output_file_size_limit) / 5;
+      static_cast<size_t>(compact_opt.output_file_size_limit) / 5;
 
   DestroyAndReopen(options);
 
@@ -1076,22 +1069,23 @@ TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
 
   // create first file and flush to l0
   for (auto& key : {"1", "2", "3", "3", "3", "3"}) {
-    Put(key, std::string(key_len, 'A'));
+    ASSERT_OK(Put(key, std::string(key_len, 'A')));
     snaps.push_back(dbfull()->GetSnapshot());
   }
-  Flush();
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
 
   // create second file and flush to l0
   for (auto& key : {"3", "4", "5", "6", "7", "8"}) {
-    Put(key, std::string(key_len, 'A'));
+    ASSERT_OK(Put(key, std::string(key_len, 'A')));
     snaps.push_back(dbfull()->GetSnapshot());
   }
-  Flush();
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
 
   // move both files down to l1
-  dbfull()->CompactFiles(compact_opt, collector->GetFlushedFiles(), 1);
+  ASSERT_OK(
+      dbfull()->CompactFiles(compact_opt, collector->GetFlushedFiles(), 1));
 
   // release snap so that first instance of key(3) can have seqId=0
   for (auto snap : snaps) {
@@ -1100,12 +1094,12 @@ TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
 
   // create 3 files in l0 so to trigger compaction
   for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
-    Put("2", std::string(1, 'A'));
-    Flush();
-    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_OK(Put("2", std::string(1, 'A')));
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   }
 
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_OK(Put("", ""));
 }
 
@@ -1119,16 +1113,20 @@ TEST_F(DBCompactionTest, ManualCompactionUnknownOutputSize) {
   // create two files in l1 that we can compact
   for (int i = 0; i < 2; ++i) {
     for (int j = 0; j < options.level0_file_num_compaction_trigger; j++) {
-      // make l0 files' ranges overlap to avoid trivial move
-      Put(std::to_string(2 * i), std::string(1, 'A'));
-      Put(std::to_string(2 * i + 1), std::string(1, 'A'));
-      Flush();
-      dbfull()->TEST_WaitForFlushMemTable();
+      ASSERT_OK(Put(std::to_string(2 * i), std::string(1, 'A')));
+      ASSERT_OK(Put(std::to_string(2 * i + 1), std::string(1, 'A')));
+      ASSERT_OK(Flush());
+      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
     }
-    dbfull()->TEST_WaitForCompact();
-    ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
-    ASSERT_EQ(NumTableFilesAtLevel(1, 0), i + 1);
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
   }
+  ASSERT_OK(
+      dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"}}));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
+  ASSERT_OK(
+      dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "3"}}));
 
   ColumnFamilyMetaData cf_meta;
   dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
@@ -1141,7 +1139,7 @@ TEST_F(DBCompactionTest, ManualCompactionUnknownOutputSize) {
   // note CompactionOptions::output_file_size_limit is unset.
   CompactionOptions compact_opt;
   compact_opt.compression = kNoCompression;
-  dbfull()->CompactFiles(compact_opt, input_filenames, 1);
+  ASSERT_OK(dbfull()->CompactFiles(compact_opt, input_filenames, 1));
 }
 
 // Check that writes done during a memtable compaction are recovered
@@ -1202,7 +1200,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) {
   cro.exclusive_manual_compaction = exclusive_manual_compaction_;
 
   // Compaction will initiate a trivial move from L0 to L1
-  dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
 
   // File moved From L0 to L1
   ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);  // 0 files in L0
@@ -1241,14 +1239,8 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
   DestroyAndReopen(options);
   // non overlapping ranges
   std::vector<std::pair<int32_t, int32_t>> ranges = {
-    {100, 199},
-    {300, 399},
-    {0, 99},
-    {200, 299},
-    {600, 699},
-    {400, 499},
-    {500, 550},
-    {551, 599},
+      {100, 199}, {300, 399}, {0, 99},    {200, 299},
+      {600, 699}, {400, 499}, {500, 550}, {551, 599},
   };
   int32_t value_size = 10 * 1024;  // 10 KB
 
@@ -1271,7 +1263,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
 
   // Since data is non-overlapping we expect compaction to initiate
   // a trivial move
-  db_->CompactRange(cro, nullptr, nullptr);
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
   // We expect that all the files were trivially moved from L0 to L1
   ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
   ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);
@@ -1291,14 +1283,15 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
   DestroyAndReopen(options);
   // Same ranges as above but overlapping
   ranges = {
-    {100, 199},
-    {300, 399},
-    {0, 99},
-    {200, 299},
-    {600, 699},
-    {400, 499},
-    {500, 560},  // this range overlap with the next one
-    {551, 599},
+      {100, 199},
+      {300, 399},
+      {0, 99},
+      {200, 299},
+      {600, 699},
+      {400, 499},
+      {500, 560},  // this range overlap with the next
+                   // one
+      {551, 599},
   };
   for (size_t i = 0; i < ranges.size(); i++) {
     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
@@ -1308,7 +1301,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
     ASSERT_OK(Flush());
   }
 
-  db_->CompactRange(cro, nullptr, nullptr);
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
 
   for (size_t i = 0; i < ranges.size(); i++) {
     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
@@ -1380,6 +1373,74 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
   }
 }
 
+TEST_P(DBCompactionTestWithParam, PartialOverlappingL0) {
+  class SubCompactionEventListener : public EventListener {
+   public:
+    void OnSubcompactionCompleted(const SubcompactionJobInfo&) override {
+      sub_compaction_finished_++;
+    }
+    std::atomic<int> sub_compaction_finished_{0};
+  };
+
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.write_buffer_size = 10 * 1024 * 1024;
+  options.max_subcompactions = max_subcompactions_;
+  SubCompactionEventListener* listener = new SubCompactionEventListener();
+  options.listeners.emplace_back(listener);
+
+  DestroyAndReopen(options);
+
+  // For subcompactino to trigger, output level needs to be non-empty.
+  ASSERT_OK(Put("key", ""));
+  ASSERT_OK(Put("kez", ""));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("key", ""));
+  ASSERT_OK(Put("kez", ""));
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  // Ranges that are only briefly overlapping so that they won't be trivially
+  // moved but subcompaction ranges would only contain a subset of files.
+  std::vector<std::pair<int32_t, int32_t>> ranges = {
+      {100, 199}, {198, 399}, {397, 600}, {598, 800}, {799, 900}, {895, 999},
+  };
+  int32_t value_size = 10 * 1024;  // 10 KB
+
+  Random rnd(301);
+  std::map<int32_t, std::string> values;
+  for (size_t i = 0; i < ranges.size(); i++) {
+    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
+      values[j] = rnd.RandomString(value_size);
+      ASSERT_OK(Put(Key(j), values[j]));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  int32_t level0_files = NumTableFilesAtLevel(0, 0);
+  ASSERT_EQ(level0_files, ranges.size());    // Multiple files in L0
+  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1);  // One file in L1
+
+  listener->sub_compaction_finished_ = 0;
+  ASSERT_OK(db_->EnableAutoCompaction({db_->DefaultColumnFamily()}));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  if (max_subcompactions_ > 3) {
+    // RocksDB might not generate the exact number of sub compactions.
+    // Here we validate that at least subcompaction happened.
+    ASSERT_GT(listener->sub_compaction_finished_.load(), 2);
+  }
+
+  // We expect that all the files were compacted to L1
+  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
+  ASSERT_GT(NumTableFilesAtLevel(1, 0), 1);
+
+  for (size_t i = 0; i < ranges.size(); i++) {
+    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
+      ASSERT_EQ(Get(Key(j)), values[j]);
+    }
+  }
+}
+
 TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
   int32_t trivial_move = 0;
   int32_t non_trivial_move = 0;
@@ -1512,9 +1573,9 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
   ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0));
   TEST_SYNC_POINT("DBCompaction::ManualPartial:5");
 
-  dbfull()->TEST_WaitForFlushMemTable();
-  dbfull()->TEST_WaitForCompact();
-  // After two non-trivial compactions are installed, there is 1 file in L6, and
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  // After two non-trivial compactions are installed, there is 1 file in L6, and
   // 1 file in L1
   ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0));
   threads.join();
@@ -1626,7 +1687,7 @@ TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
     for (int32_t j = 300; j < 4300; j++) {
       if (j == 2300) {
         ASSERT_OK(Flush());
-        dbfull()->TEST_WaitForFlushMemTable();
+        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
       }
       values[j] = rnd.RandomString(value_size);
       ASSERT_OK(Put(Key(j), values[j]));
@@ -1642,8 +1703,8 @@ TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
   }
 
   TEST_SYNC_POINT("DBCompaction::PartialFill:2");
-  dbfull()->TEST_WaitForFlushMemTable();
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   threads.join();
 
   for (int32_t i = 0; i < 4300; i++) {
@@ -1661,12 +1722,12 @@ TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) {
   Options options = CurrentOptions();
   options.unordered_write = true;
   DestroyAndReopen(options);
-  Put("foo", "v1");
+  ASSERT_OK(Put("foo", "v1"));
   ASSERT_OK(Flush());
 
-  Put("bar", "v1");
+  ASSERT_OK(Put("bar", "v1"));
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-  port::Thread writer([&]() { Put("foo", "v2"); });
+  port::Thread writer([&]() { ASSERT_OK(Put("foo", "v2")); });
 
   TEST_SYNC_POINT(
       "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL");
@@ -1732,15 +1793,15 @@ TEST_F(DBCompactionTest, DeleteFileRange) {
     for (int32_t j = 300; j < 4300; j++) {
       if (j == 2300) {
         ASSERT_OK(Flush());
-        dbfull()->TEST_WaitForFlushMemTable();
+        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
       }
       values[j] = rnd.RandomString(value_size);
       ASSERT_OK(Put(Key(j), values[j]));
     }
   }
   ASSERT_OK(Flush());
-  dbfull()->TEST_WaitForFlushMemTable();
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   // Verify level sizes
   uint64_t target_size = 4 * options.max_bytes_for_level_base;
@@ -1750,7 +1811,7 @@ TEST_F(DBCompactionTest, DeleteFileRange) {
                                         options.max_bytes_for_level_multiplier);
   }
 
-  size_t old_num_files = CountFiles();
+  const size_t old_num_files = CountFiles();
   std::string begin_string = Key(1000);
   std::string end_string = Key(2000);
   Slice begin(begin_string);
@@ -1785,7 +1846,7 @@ TEST_F(DBCompactionTest, DeleteFileRange) {
   compact_options.change_level = true;
   compact_options.target_level = 1;
   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ASSERT_OK(
       DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr));
@@ -1794,12 +1855,11 @@ TEST_F(DBCompactionTest, DeleteFileRange) {
   for (int32_t i = 0; i < 4300; i++) {
     ReadOptions roptions;
     std::string result;
-    Status s = db_->Get(roptions, Key(i), &result);
-    ASSERT_TRUE(s.IsNotFound());
+    ASSERT_TRUE(db_->Get(roptions, Key(i), &result).IsNotFound());
     deleted_count2++;
   }
   ASSERT_GT(deleted_count2, deleted_count);
-  size_t new_num_files = CountFiles();
+  const size_t new_num_files = CountFiles();
   ASSERT_GT(old_num_files, new_num_files);
 }
 
@@ -1834,7 +1894,7 @@ TEST_F(DBCompactionTest, DeleteFilesInRanges) {
   ASSERT_EQ("0,0,10", FilesPerLevel(0));
 
   // file [0 => 100), [200 => 300), ... [800, 900)
-  for (auto i = 0; i < 10; i+=2) {
+  for (auto i = 0; i < 10; i += 2) {
     for (auto j = 0; j < 100; j++) {
       auto k = i * 100 + j;
       ASSERT_OK(Put(Key(k), values[k]));
@@ -1954,14 +2014,14 @@ TEST_F(DBCompactionTest, DeleteFileRangeFileEndpointsOverlapBug) {
   std::string vals[kNumL0Files];
   for (int i = 0; i < kNumL0Files; ++i) {
     vals[i] = rnd.RandomString(kValSize);
-    Put(Key(i), vals[i]);
-    Put(Key(i + 1), vals[i]);
-    Flush();
+    ASSERT_OK(Put(Key(i), vals[i]));
+    ASSERT_OK(Put(Key(i + 1), vals[i]));
+    ASSERT_OK(Flush());
     if (i == 0) {
       snapshot = db_->GetSnapshot();
     }
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   // Verify `DeleteFilesInRange` can't drop only file 0 which would cause
   // "1 -> vals[0]" to reappear.
@@ -2040,7 +2100,7 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionThirdPath) {
   options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
   options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
   options.memtable_factory.reset(
-      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+      test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
   options.compaction_style = kCompactionStyleLevel;
   options.write_buffer_size = 110 << 10;  // 110KB
   options.arena_block_size = 4 << 10;
@@ -2048,16 +2108,8 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionThirdPath) {
   options.num_levels = 4;
   options.max_bytes_for_level_base = 400 * 1024;
   options.max_subcompactions = max_subcompactions_;
-  //  options = CurrentOptions(options);
 
-  std::vector<std::string> filenames;
-  env_->GetChildren(options.db_paths[1].path, &filenames);
-  // Delete archival files.
-  for (size_t i = 0; i < filenames.size(); ++i) {
-    env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
-  }
-  env_->DeleteDir(options.db_paths[1].path);
-  Reopen(options);
+  DestroyAndReopen(options);
 
   Random rnd(301);
   int key_idx = 0;
@@ -2157,7 +2209,7 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) {
   options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
   options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
   options.memtable_factory.reset(
-      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+      test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
   options.compaction_style = kCompactionStyleLevel;
   options.write_buffer_size = 110 << 10;  // 110KB
   options.arena_block_size = 4 << 10;
@@ -2165,16 +2217,8 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) {
   options.num_levels = 4;
   options.max_bytes_for_level_base = 400 * 1024;
   options.max_subcompactions = max_subcompactions_;
-  //  options = CurrentOptions(options);
 
-  std::vector<std::string> filenames;
-  env_->GetChildren(options.db_paths[1].path, &filenames);
-  // Delete archival files.
-  for (size_t i = 0; i < filenames.size(); ++i) {
-    env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
-  }
-  env_->DeleteDir(options.db_paths[1].path);
-  Reopen(options);
+  DestroyAndReopen(options);
 
   Random rnd(301);
   int key_idx = 0;
@@ -2275,7 +2319,7 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
   options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
   options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
   options.memtable_factory.reset(
-    new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+      test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
   options.compaction_style = kCompactionStyleLevel;
   options.write_buffer_size = 110 << 10;  // 110KB
   options.arena_block_size = 4 << 10;
@@ -2292,14 +2336,14 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024);
   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024);
   option_vector.emplace_back(DBOptions(options), cf_opt1);
-  CreateColumnFamilies({"one"},option_vector[1]);
+  CreateColumnFamilies({"one"}, option_vector[1]);
 
-  // Configura CF2 specific paths.
+  // Configure CF2 specific paths.
   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024);
   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024);
   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024);
   option_vector.emplace_back(DBOptions(options), cf_opt2);
-  CreateColumnFamilies({"two"},option_vector[2]);
+  CreateColumnFamilies({"two"}, option_vector[2]);
 
   ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
 
@@ -2349,13 +2393,16 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
   // Check that default column family uses db_paths.
   // And Column family "one" uses cf_paths.
 
-  // First three 110KB files are not going to second path.
-  // After that, (100K, 200K)
+  // The compaction in level0 outputs the sst files in level1.
+  // The first path cannot hold level1's data(400KB+400KB > 500KB),
+  // so every compaction move a sst file to second path. Please
+  // refer to LevelCompactionBuilder::GetPathId.
   for (int num = 0; num < 3; num++) {
     generate_file();
   }
+  check_sstfilecount(0, 1);
+  check_sstfilecount(1, 2);
 
-  // Another 110KB triggers a compaction to 400K file to fill up first path
   generate_file();
   check_sstfilecount(1, 3);
 
@@ -2381,6 +2428,30 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
 
   check_getvalues();
 
+  {  // Also verify GetLiveFilesStorageInfo with db_paths / cf_paths
+    std::vector<LiveFileStorageInfo> new_infos;
+    LiveFilesStorageInfoOptions lfsio;
+    lfsio.wal_size_for_flush = UINT64_MAX;  // no flush
+    ASSERT_OK(db_->GetLiveFilesStorageInfo(lfsio, &new_infos));
+    std::unordered_map<std::string, int> live_sst_by_dir;
+    for (auto& info : new_infos) {
+      if (info.file_type == kTableFile) {
+        live_sst_by_dir[info.directory]++;
+        // Verify file on disk (no directory confusion)
+        uint64_t size;
+        ASSERT_OK(env_->GetFileSize(
+            info.directory + "/" + info.relative_filename, &size));
+        ASSERT_EQ(info.size, size);
+      }
+    }
+    ASSERT_EQ(3U * 3U, live_sst_by_dir.size());
+    for (auto& paths : {options.db_paths, cf_opt1.cf_paths, cf_opt2.cf_paths}) {
+      ASSERT_EQ(1, live_sst_by_dir[paths[0].path]);
+      ASSERT_EQ(4, live_sst_by_dir[paths[1].path]);
+      ASSERT_EQ(2, live_sst_by_dir[paths[2].path]);
+    }
+  }
+
   ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
 
   check_getvalues();
@@ -2411,7 +2482,7 @@ TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
     ASSERT_OK(Put(1, Key(i), rnd.RandomString(10000)));
   }
   ASSERT_OK(Flush(1));
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ASSERT_GT(TotalTableFiles(1, 4), 1);
   int non_level0_num_files = 0;
@@ -2447,7 +2518,8 @@ TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
   compact_options.bottommost_level_compaction =
       BottommostLevelCompaction::kForce;
   compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
-  dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
+  ASSERT_OK(
+      dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr));
 
   // Only 1 file in L0
   ASSERT_EQ("1", FilesPerLevel(1));
@@ -2468,9 +2540,9 @@ TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
   for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) {
     ASSERT_OK(Put(1, Key(i), rnd.RandomString(10000)));
   }
-  dbfull()->Flush(FlushOptions());
+  ASSERT_OK(dbfull()->Flush(FlushOptions()));
   ASSERT_OK(Flush(1));
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   for (int i = 1; i < options.num_levels; i++) {
     ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
@@ -2480,6 +2552,7 @@ TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
   // compaction style
   std::string keys_in_db;
   Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
+  ASSERT_OK(iter->status());
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     keys_in_db.append(iter->key().ToString());
     keys_in_db.push_back(',');
@@ -2517,24 +2590,24 @@ TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_a) {
 TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) {
   do {
     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
-    Put(1, "", "");
+    ASSERT_OK(Put(1, "", ""));
     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
-    Delete(1, "e");
-    Put(1, "", "");
+    ASSERT_OK(Delete(1, "e"));
+    ASSERT_OK(Put(1, "", ""));
     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
-    Put(1, "c", "cv");
+    ASSERT_OK(Put(1, "c", "cv"));
     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
-    Put(1, "", "");
+    ASSERT_OK(Put(1, "", ""));
     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
-    Put(1, "", "");
+    ASSERT_OK(Put(1, "", ""));
     env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
-    Put(1, "d", "dv");
+    ASSERT_OK(Put(1, "d", "dv"));
     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
-    Put(1, "", "");
+    ASSERT_OK(Put(1, "", ""));
     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
-    Delete(1, "d");
-    Delete(1, "b");
+    ASSERT_OK(Delete(1, "d"));
+    ASSERT_OK(Delete(1, "b"));
     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
     ASSERT_EQ("(->)(c->cv)", Contents(1));
     env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
@@ -2551,34 +2624,36 @@ TEST_F(DBCompactionTest, ManualAutoRace) {
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-  Put(1, "foo", "");
-  Put(1, "bar", "");
-  Flush(1);
-  Put(1, "foo", "");
-  Put(1, "bar", "");
+  ASSERT_OK(Put(1, "foo", ""));
+  ASSERT_OK(Put(1, "bar", ""));
+  ASSERT_OK(Flush(1));
+  ASSERT_OK(Put(1, "foo", ""));
+  ASSERT_OK(Put(1, "bar", ""));
   // Generate four files in CF 0, which should trigger an auto compaction
-  Put("foo", "");
-  Put("bar", "");
-  Flush();
-  Put("foo", "");
-  Put("bar", "");
-  Flush();
-  Put("foo", "");
-  Put("bar", "");
-  Flush();
-  Put("foo", "");
-  Put("bar", "");
-  Flush();
+  ASSERT_OK(Put("foo", ""));
+  ASSERT_OK(Put("bar", ""));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo", ""));
+  ASSERT_OK(Put("bar", ""));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo", ""));
+  ASSERT_OK(Put("bar", ""));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo", ""));
+  ASSERT_OK(Put("bar", ""));
+  ASSERT_OK(Flush());
 
   // The auto compaction is scheduled but waited until here
   TEST_SYNC_POINT("DBCompactionTest::ManualAutoRace:1");
   // The auto compaction will wait until the manual compaction is registerd
   // before processing so that it will be cancelled.
-  dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
+  CompactRangeOptions cro;
+  cro.exclusive_manual_compaction = true;
+  ASSERT_OK(dbfull()->CompactRange(cro, handles_[1], nullptr, nullptr));
   ASSERT_EQ("0,1", FilesPerLevel(1));
 
   // Eventually the cancelled compaction will be rescheduled and executed.
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ("0,1", FilesPerLevel(0));
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
@@ -2623,7 +2698,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) {
         options.statistics->getTickerCount(BLOCK_CACHE_ADD);
     CompactRangeOptions cro;
     cro.exclusive_manual_compaction = exclusive_manual_compaction_;
-    db_->CompactRange(cro, handles_[1], nullptr, nullptr);
+    ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));
     // Verify manual compaction doesn't fill block cache
     ASSERT_EQ(prev_block_cache_add,
               options.statistics->getTickerCount(BLOCK_CACHE_ADD));
@@ -2641,7 +2716,6 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) {
   }
 }
 
-
 TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
   Options options = CurrentOptions();
   options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
@@ -2704,7 +2778,8 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
     CompactRangeOptions compact_options;
     compact_options.target_path_id = 1;
     compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
-    db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
+    ASSERT_OK(
+        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
     ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
     ASSERT_EQ("0,1", FilesPerLevel(1));
@@ -2761,10 +2836,10 @@ TEST_P(DBCompactionTestWithParam, DISABLED_CompactFilesOnLevelCompaction) {
 
   Random rnd(301);
   for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
-    ASSERT_OK(Put(1, ToString(key), rnd.RandomString(kTestValueSize)));
+    ASSERT_OK(Put(1, std::to_string(key), rnd.RandomString(kTestValueSize)));
   }
-  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ColumnFamilyMetaData cf_meta;
   dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
@@ -2777,14 +2852,13 @@ TEST_P(DBCompactionTestWithParam, DISABLED_CompactFilesOnLevelCompaction) {
       auto file_meta = PickFileRandomly(cf_meta, &rnd, &level);
       compaction_input_file_names.push_back(file_meta->name);
       GetOverlappingFileNumbersForLevelCompaction(
-          cf_meta, options.comparator, level, output_level,
-          file_meta, &overlapping_file_names);
+          cf_meta, options.comparator, level, output_level, file_meta,
+          &overlapping_file_names);
     }
 
-    ASSERT_OK(dbfull()->CompactFiles(
-        CompactionOptions(), handles_[1],
-        compaction_input_file_names,
-        output_level));
+    ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), handles_[1],
+                                     compaction_input_file_names,
+                                     output_level));
 
     // Make sure all overlapping files do not exist after compaction
     dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
@@ -2793,7 +2867,7 @@ TEST_P(DBCompactionTestWithParam, DISABLED_CompactFilesOnLevelCompaction) {
 
   // make sure all key-values are still there.
   for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
-    ASSERT_NE(Get(1, ToString(key)), "NOT_FOUND");
+    ASSERT_NE(Get(1, std::to_string(key)), "NOT_FOUND");
   }
 }
 
@@ -2807,8 +2881,7 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
   options.write_buffer_size = kKeysPerBuffer * kKvSize;
   options.max_write_buffer_number = 2;
   options.target_file_size_base =
-      options.write_buffer_size *
-      (options.max_write_buffer_number - 1);
+      options.write_buffer_size * (options.max_write_buffer_number - 1);
   options.level0_file_num_compaction_trigger = kNumL1Files;
   options.max_bytes_for_level_base =
       options.level0_file_num_compaction_trigger *
@@ -2828,10 +2901,9 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
 
   DestroyAndReopen(options);
 
-  const int kNumInsertedKeys =
-      options.level0_file_num_compaction_trigger *
-      (options.max_write_buffer_number - 1) *
-      kKeysPerBuffer;
+  const int kNumInsertedKeys = options.level0_file_num_compaction_trigger *
+                               (options.max_write_buffer_number - 1) *
+                               kKeysPerBuffer;
 
   Random rnd(301);
   std::vector<std::string> keys;
@@ -2840,10 +2912,10 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
     keys.emplace_back(rnd.RandomString(kKeySize));
     values.emplace_back(rnd.RandomString(kKvSize - kKeySize));
     ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
-    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   }
 
-  dbfull()->TEST_FlushMemTable(true);
+  ASSERT_OK(dbfull()->TEST_FlushMemTable(true));
   // Make sure the number of L0 files can trigger compaction.
   ASSERT_GE(NumTableFilesAtLevel(0),
             options.level0_file_num_compaction_trigger);
@@ -2909,7 +2981,7 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
       ASSERT_OK(Flush());
     }
     // this should execute L0->L1
-    dbfull()->TEST_WaitForCompact();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
     ASSERT_EQ("0,1", FilesPerLevel(0));
 
     // block compactions
@@ -2926,7 +2998,7 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
     sleeping_task.WaitUntilDone();
 
     // this should execute L1->L2 (move)
-    dbfull()->TEST_WaitForCompact();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
     ASSERT_EQ("0,0,1", FilesPerLevel(0));
 
@@ -2944,7 +3016,7 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
       ASSERT_OK(Flush());
     }
     // this should execute both L0->L1 and L1->L2 (merge with previous file)
-    dbfull()->TEST_WaitForCompact();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
     ASSERT_EQ("0,0,2", FilesPerLevel(0));
 
@@ -2952,6 +3024,7 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
     ASSERT_OK(env_->FileExists(dbname_ + moved_file_name));
 
     listener->SetExpectedFileName(dbname_ + moved_file_name);
+    ASSERT_OK(iterator->status());
     iterator.reset();
 
     // this file should have been compacted away
@@ -2966,7 +3039,7 @@ TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
   }
   Options options = CurrentOptions();
   options.memtable_factory.reset(
-      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+      test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
   options.compaction_style = kCompactionStyleLevel;
   options.write_buffer_size = 110 << 10;  // 110KB
   options.arena_block_size = 4 << 10;
@@ -3114,7 +3187,7 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
   for (int num = 0; num < 10; num++) {
     GenerateNewRandomFile(&rnd);
   }
-  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
       {{"CompactionJob::Run():Start",
@@ -3135,7 +3208,7 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
       "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1");
 
   GenerateNewRandomFile(&rnd, /* nowait */ true);
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
   for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
        num++) {
@@ -3145,7 +3218,7 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
 
   TEST_SYNC_POINT(
       "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 }
 
 static std::string ShortKey(int i) {
@@ -3309,7 +3382,7 @@ TEST_P(DBCompactionTestWithParam, IntraL0Compaction) {
     }
     ASSERT_OK(Flush());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 
   std::vector<std::vector<FileMetaData>> level_to_files;
@@ -3387,7 +3460,7 @@ TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) {
     ASSERT_OK(Put(Key(i + 1), value));
     ASSERT_OK(Flush());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 
   std::vector<std::vector<FileMetaData>> level_to_files;
@@ -3435,7 +3508,7 @@ TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) {
       int key_idx = 0;
       GenerateNewFile(&rnd, &key_idx);
     }
-    dbfull()->TEST_WaitForCompact();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
     ASSERT_EQ(1, num_bottom_pri_compactions);
 
@@ -3459,8 +3532,8 @@ TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
   // So key 0, 2, and 4+ fall outside these levels' key-ranges.
   for (int level = 2; level >= 1; --level) {
     for (int i = 0; i < 2; ++i) {
-      Put(Key(2 * i + 1), "val");
-      Flush();
+      ASSERT_OK(Put(Key(2 * i + 1), "val"));
+      ASSERT_OK(Flush());
     }
     MoveFilesToLevel(level);
     ASSERT_EQ(2, NumTableFilesAtLevel(level));
@@ -3470,11 +3543,11 @@ TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
   // - Tombstones for keys 2 and 4 can be dropped early.
   // - Tombstones for keys 1 and 3 must be kept due to L2 files' key-ranges.
   for (int i = 0; i < kNumL0Files; ++i) {
-    Put(Key(0), "val");  // sentinel to prevent trivial move
-    Delete(Key(i + 1));
-    Flush();
+    ASSERT_OK(Put(Key(0), "val"));  // sentinel to prevent trivial move
+    ASSERT_OK(Delete(Key(i + 1)));
+    ASSERT_OK(Flush());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   for (int i = 0; i < kNumL0Files; ++i) {
     std::string value;
@@ -3528,9 +3601,8 @@ TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) {
   ASSERT_EQ(kNumL0Files, cf_meta.levels[0].files.size());
   std::vector<std::string> input_filenames;
   input_filenames.push_back(cf_meta.levels[0].files.front().name);
-  ASSERT_OK(dbfull()
-                  ->CompactFiles(CompactionOptions(), input_filenames,
-                                 0 /* output_level */));
+  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_filenames,
+                                   0 /* output_level */));
   TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted");
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
@@ -3538,10 +3610,10 @@ TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) {
 TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
   // Regression test for bug of not pulling in L0 files that overlap the user-
   // specified input files in time- and key-ranges.
-  Put(Key(0), "old_val");
-  Flush();
-  Put(Key(0), "new_val");
-  Flush();
+  ASSERT_OK(Put(Key(0), "old_val"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put(Key(0), "new_val"));
+  ASSERT_OK(Flush());
 
   ColumnFamilyMetaData cf_meta;
   dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
@@ -3557,6 +3629,41 @@ TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
   ASSERT_EQ("new_val", Get(Key(0)));
 }
 
+TEST_F(DBCompactionTest, DeleteFilesInRangeConflictWithCompaction) {
+  Options options = CurrentOptions();
+  DestroyAndReopen(options);
+  const Snapshot* snapshot = nullptr;
+  const int kMaxKey = 10;
+
+  for (int i = 0; i < kMaxKey; i++) {
+    ASSERT_OK(Put(Key(i), Key(i)));
+    ASSERT_OK(Delete(Key(i)));
+    if (!snapshot) {
+      snapshot = db_->GetSnapshot();
+    }
+  }
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+  ASSERT_OK(Put(Key(kMaxKey), Key(kMaxKey)));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  // test DeleteFilesInRange() deletes the files already picked for compaction
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"VersionSet::LogAndApply:WriteManifestStart",
+        "BackgroundCallCompaction:0"},
+       {"DBImpl::BackgroundCompaction:Finish",
+        "VersionSet::LogAndApply:WriteManifestDone"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // release snapshot which mark bottommost file for compaction
+  db_->ReleaseSnapshot(snapshot);
+  std::string begin_string = Key(0);
+  std::string end_string = Key(kMaxKey + 1);
+  Slice begin(begin_string);
+  Slice end(end_string);
+  ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
   // bottom-level files may contain deletions due to snapshots protecting the
   // deleted keys. Once the snapshot is released, we should see files with many
@@ -3587,12 +3694,12 @@ TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
         ASSERT_OK(Delete(Key(j)));
       }
     }
-    Flush();
+    ASSERT_OK(Flush());
     if (i < kNumLevelFiles - 1) {
       ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
     }
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));
 
   std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
@@ -3613,7 +3720,7 @@ TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
                     CompactionReason::kBottommostFiles);
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   db_->GetLiveFilesMetaData(&post_release_metadata);
   ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size());
 
@@ -3662,12 +3769,12 @@ TEST_F(DBCompactionTest, NoCompactBottomLevelFilesWithDeletions) {
         ASSERT_OK(Delete(Key(j)));
       }
     }
-    Flush();
+    ASSERT_OK(Flush());
     if (i < kNumLevelFiles - 1) {
       ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
     }
   }
-  dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr);
+  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr));
   ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));
 
   std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
@@ -3683,7 +3790,7 @@ TEST_F(DBCompactionTest, NoCompactBottomLevelFilesWithDeletions) {
       [&](void* /*arg*/) { num_compactions.fetch_add(1); });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
   db_->ReleaseSnapshot(snapshot);
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ(0, num_compactions);
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 
@@ -3699,6 +3806,242 @@ TEST_F(DBCompactionTest, NoCompactBottomLevelFilesWithDeletions) {
   }
 }
 
+TEST_F(DBCompactionTest, RoundRobinTtlCompactionNormal) {
+  Options options = CurrentOptions();
+  options.compression = kNoCompression;
+  options.level0_file_num_compaction_trigger = 20;
+  options.ttl = 24 * 60 * 60;  // 24 hours
+  options.compaction_pri = kRoundRobin;
+  env_->now_cpu_count_.store(0);
+  env_->SetMockSleep();
+  options.env = env_;
+
+  // add a small second for each wait time, to make sure the file is expired
+  int small_seconds = 1;
+
+  std::atomic_int ttl_compactions{0};
+  std::atomic_int round_robin_ttl_compactions{0};
+  std::atomic_int other_compactions{0};
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        auto compaction_reason = compaction->compaction_reason();
+        if (compaction_reason == CompactionReason::kTtl) {
+          ttl_compactions++;
+        } else if (compaction_reason == CompactionReason::kRoundRobinTtl) {
+          round_robin_ttl_compactions++;
+        } else {
+          other_compactions++;
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  DestroyAndReopen(options);
+
+  // Setup the files from lower level to up level, each file is 1 hour's older
+  // than the next one.
+  // create 10 files on the last level (L6)
+  for (int i = 0; i < 10; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 100 + j), "value" + std::to_string(i * 100 + j)));
+    }
+    ASSERT_OK(Flush());
+    env_->MockSleepForSeconds(60 * 60);  // generate 1 file per hour
+  }
+  MoveFilesToLevel(6);
+
+  // create 5 files on L5
+  for (int i = 0; i < 5; i++) {
+    for (int j = 0; j < 200; j++) {
+      ASSERT_OK(Put(Key(i * 200 + j), "value" + std::to_string(i * 200 + j)));
+    }
+    ASSERT_OK(Flush());
+    env_->MockSleepForSeconds(60 * 60);
+  }
+  MoveFilesToLevel(5);
+
+  // create 3 files on L4
+  for (int i = 0; i < 3; i++) {
+    for (int j = 0; j < 300; j++) {
+      ASSERT_OK(Put(Key(i * 300 + j), "value" + std::to_string(i * 300 + j)));
+    }
+    ASSERT_OK(Flush());
+    env_->MockSleepForSeconds(60 * 60);
+  }
+  MoveFilesToLevel(4);
+
+  // The LSM tree should be like:
+  // L4: [0,            299], [300,      599], [600,     899]
+  // L5: [0,  199]      [200,         399]...............[800, 999]
+  // L6: [0,99][100,199][200,299][300,399]...............[800,899][900,999]
+  ASSERT_EQ("0,0,0,0,3,5,10", FilesPerLevel());
+
+  // make sure the first L5 file is expired
+  env_->MockSleepForSeconds(16 * 60 * 60 + small_seconds++);
+
+  // trigger TTL compaction
+  ASSERT_OK(Put(Key(4), "value" + std::to_string(1)));
+  ASSERT_OK(Put(Key(5), "value" + std::to_string(1)));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // verify there's a RoundRobin TTL compaction
+  ASSERT_EQ(1, round_robin_ttl_compactions);
+  round_robin_ttl_compactions = 0;
+
+  // expire 2 more files
+  env_->MockSleepForSeconds(2 * 60 * 60 + small_seconds++);
+  // trigger TTL compaction
+  ASSERT_OK(Put(Key(4), "value" + std::to_string(2)));
+  ASSERT_OK(Put(Key(5), "value" + std::to_string(2)));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_EQ(2, round_robin_ttl_compactions);
+  round_robin_ttl_compactions = 0;
+
+  // expire 4 more files, 2 out of 3 files on L4 are expired
+  env_->MockSleepForSeconds(4 * 60 * 60 + small_seconds++);
+  // trigger TTL compaction
+  ASSERT_OK(Put(Key(6), "value" + std::to_string(3)));
+  ASSERT_OK(Put(Key(7), "value" + std::to_string(3)));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_EQ(1, NumTableFilesAtLevel(4));
+  ASSERT_EQ(0, NumTableFilesAtLevel(5));
+
+  ASSERT_GT(round_robin_ttl_compactions, 0);
+  round_robin_ttl_compactions = 0;
+
+  // make the first L0 file expired, which triggers a normal TTL compaction
+  // instead of roundrobin TTL compaction, it will also include an extra file
+  // from L0 because of overlap
+  ASSERT_EQ(0, ttl_compactions);
+  env_->MockSleepForSeconds(19 * 60 * 60 + small_seconds++);
+
+  // trigger TTL compaction
+  ASSERT_OK(Put(Key(6), "value" + std::to_string(4)));
+  ASSERT_OK(Put(Key(7), "value" + std::to_string(4)));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // L0 -> L1 compaction is normal TTL compaction, L1 -> next levels compactions
+  // are RoundRobin TTL compaction.
+  ASSERT_GT(ttl_compactions, 0);
+  ttl_compactions = 0;
+  ASSERT_GT(round_robin_ttl_compactions, 0);
+  round_robin_ttl_compactions = 0;
+
+  // All files are expired, so only the last level has data
+  env_->MockSleepForSeconds(24 * 60 * 60);
+  // trigger TTL compaction
+  ASSERT_OK(Put(Key(6), "value" + std::to_string(4)));
+  ASSERT_OK(Put(Key(7), "value" + std::to_string(4)));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
+
+  ASSERT_GT(ttl_compactions, 0);
+  ttl_compactions = 0;
+  ASSERT_GT(round_robin_ttl_compactions, 0);
+  round_robin_ttl_compactions = 0;
+
+  ASSERT_EQ(0, other_compactions);
+}
+
+TEST_F(DBCompactionTest, RoundRobinTtlCompactionUnsortedTime) {
+  // This is to test the case that the RoundRobin compaction cursor not pointing
+  // to the oldest file, RoundRobin compaction should still compact the file
+  // after cursor until all expired files are compacted.
+  Options options = CurrentOptions();
+  options.compression = kNoCompression;
+  options.level0_file_num_compaction_trigger = 20;
+  options.ttl = 24 * 60 * 60;  // 24 hours
+  options.compaction_pri = kRoundRobin;
+  env_->now_cpu_count_.store(0);
+  env_->SetMockSleep();
+  options.env = env_;
+
+  std::atomic_int ttl_compactions{0};
+  std::atomic_int round_robin_ttl_compactions{0};
+  std::atomic_int other_compactions{0};
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        auto compaction_reason = compaction->compaction_reason();
+        if (compaction_reason == CompactionReason::kTtl) {
+          ttl_compactions++;
+        } else if (compaction_reason == CompactionReason::kRoundRobinTtl) {
+          round_robin_ttl_compactions++;
+        } else {
+          other_compactions++;
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  DestroyAndReopen(options);
+
+  // create 10 files on the last level (L6)
+  for (int i = 0; i < 10; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 100 + j), "value" + std::to_string(i * 100 + j)));
+    }
+    ASSERT_OK(Flush());
+    env_->MockSleepForSeconds(60 * 60);  // generate 1 file per hour
+  }
+  MoveFilesToLevel(6);
+
+  // create 5 files on L5
+  for (int i = 0; i < 5; i++) {
+    for (int j = 0; j < 200; j++) {
+      ASSERT_OK(Put(Key(i * 200 + j), "value" + std::to_string(i * 200 + j)));
+    }
+    ASSERT_OK(Flush());
+    env_->MockSleepForSeconds(60 * 60);  // 1 hour
+  }
+  MoveFilesToLevel(5);
+
+  // The LSM tree should be like:
+  // L5: [0,  199]      [200,         399] [400,599] [600,799] [800, 999]
+  // L6: [0,99][100,199][200,299][300,399]....................[800,899][900,999]
+  ASSERT_EQ("0,0,0,0,0,5,10", FilesPerLevel());
+
+  // point the compaction cursor to the 4th file on L5
+  VersionSet* const versions = dbfull()->GetVersionSet();
+  assert(versions);
+  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+  ASSERT_NE(cfd, nullptr);
+  Version* const current = cfd->current();
+  ASSERT_NE(current, nullptr);
+  VersionStorageInfo* storage_info = current->storage_info();
+  ASSERT_NE(storage_info, nullptr);
+  const InternalKey split_cursor = InternalKey(Key(600), 100000, kTypeValue);
+  storage_info->AddCursorForOneLevel(5, split_cursor);
+
+  // make the first file on L5 expired, there should be 3 TTL compactions:
+  // 4th one, 5th one, then 1st one.
+  env_->MockSleepForSeconds(19 * 60 * 60 + 1);
+  // trigger TTL compaction
+  ASSERT_OK(Put(Key(6), "value" + std::to_string(4)));
+  ASSERT_OK(Put(Key(7), "value" + std::to_string(4)));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ(2, NumTableFilesAtLevel(5));
+
+  ASSERT_EQ(3, round_robin_ttl_compactions);
+  ASSERT_EQ(0, ttl_compactions);
+  ASSERT_EQ(0, other_compactions);
+}
+
 TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
   const int kNumKeysPerFile = 32;
   const int kNumLevelFiles = 2;
@@ -3721,9 +4064,9 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
       ASSERT_OK(
           Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
     }
-    Flush();
+    ASSERT_OK(Flush());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   MoveFilesToLevel(3);
   ASSERT_EQ("0,0,0,2", FilesPerLevel());
 
@@ -3732,9 +4075,9 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
     for (int j = 0; j < kNumKeysPerFile; ++j) {
       ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
     }
-    Flush();
+    ASSERT_OK(Flush());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ("2,0,0,2", FilesPerLevel());
   MoveFilesToLevel(1);
   ASSERT_EQ("0,2,0,2", FilesPerLevel());
@@ -3745,14 +4088,14 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
   // Just do a simple write + flush so that the Ttl expired files get
   // compacted.
   ASSERT_OK(Put("a", "1"));
-  Flush();
+  ASSERT_OK(Flush());
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
         ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   // All non-L0 files are deleted, as they contained only deleted data.
   ASSERT_EQ("1", FilesPerLevel());
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
@@ -3768,9 +4111,9 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
       ASSERT_OK(
           Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
     }
-    Flush();
+    ASSERT_OK(Flush());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   MoveFilesToLevel(3);
   ASSERT_EQ("0,0,0,2", FilesPerLevel());
 
@@ -3779,9 +4122,9 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
     for (int j = 0; j < kNumKeysPerFile; ++j) {
       ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
     }
-    Flush();
+    ASSERT_OK(Flush());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ("2,0,0,2", FilesPerLevel());
   MoveFilesToLevel(1);
   ASSERT_EQ("0,2,0,2", FilesPerLevel());
@@ -3790,8 +4133,8 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
   // trigger as ttl is set to 24 hours.
   env_->MockSleepForSeconds(12 * 60 * 60);
   ASSERT_OK(Put("a", "1"));
-  Flush();
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ("1,2,0,2", FilesPerLevel());
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
@@ -3804,7 +4147,7 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
   // Dynamically change ttl to 10 hours.
   // This should trigger a ttl compaction, as 12 hours have already passed.
   ASSERT_OK(dbfull()->SetOptions({{"ttl", "36000"}}));
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   // All non-L0 files are deleted, as they contained only deleted data.
   ASSERT_EQ("1", FilesPerLevel());
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
@@ -3864,7 +4207,7 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
       for (int i = 1; i <= 100; ++i) {
         ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
       }
-      Flush();
+      ASSERT_OK(Flush());
       // Get the first file's creation time. This will be the oldest file in the
       // DB. Compactions inolving this file's descendents should keep getting
       // this time.
@@ -3877,7 +4220,7 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
       for (int i = 101; i <= 200; ++i) {
         ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
       }
-      Flush();
+      ASSERT_OK(Flush());
       MoveFilesToLevel(6);
       ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
 
@@ -3886,12 +4229,12 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
       for (int i = 1; i <= 50; ++i) {
         ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
       }
-      Flush();
+      ASSERT_OK(Flush());
       env_->MockSleepForSeconds(1 * 60 * 60);
       for (int i = 51; i <= 150; ++i) {
         ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
       }
-      Flush();
+      ASSERT_OK(Flush());
       MoveFilesToLevel(4);
       ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());
 
@@ -3900,8 +4243,8 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
       for (int i = 26; i <= 75; ++i) {
         ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
       }
-      Flush();
-      dbfull()->TEST_WaitForCompact();
+      ASSERT_OK(Flush());
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
       MoveFilesToLevel(1);
       ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());
 
@@ -3931,9 +4274,9 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
       if (if_restart) {
         Reopen(options);
       } else {
-        Flush();
+        ASSERT_OK(Flush());
       }
-      dbfull()->TEST_WaitForCompact();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
       ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
       ASSERT_EQ(5, ttl_compactions);
 
@@ -3946,9 +4289,9 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
       if (if_restart) {
         Reopen(options);
       } else {
-        Flush();
+        ASSERT_OK(Flush());
       }
-      dbfull()->TEST_WaitForCompact();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
       ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
       ASSERT_GE(ttl_compactions, 6);
 
@@ -4013,9 +4356,9 @@ TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
           ASSERT_OK(
               Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
         }
-        Flush();
+        ASSERT_OK(Flush());
       }
-      dbfull()->TEST_WaitForCompact();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
       ASSERT_EQ("2", FilesPerLevel());
       ASSERT_EQ(0, periodic_compactions);
@@ -4023,8 +4366,8 @@ TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
       // Add 50 hours and do a write
       env_->MockSleepForSeconds(50 * 60 * 60);
       ASSERT_OK(Put("a", "1"));
-      Flush();
-      dbfull()->TEST_WaitForCompact();
+      ASSERT_OK(Flush());
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
       // Assert that the files stay in the same level
       ASSERT_EQ("3", FilesPerLevel());
       // The two old files go through the periodic compaction process
@@ -4039,9 +4382,9 @@ TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
       if (if_restart) {
         Reopen(options);
       } else {
-        Flush();
+        ASSERT_OK(Flush());
       }
-      dbfull()->TEST_WaitForCompact();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
       ASSERT_EQ("1,3", FilesPerLevel());
       // The three old files now go through the periodic compaction process. 2
       // + 3.
@@ -4050,8 +4393,8 @@ TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
       // Add another 50 hours and do another write
       env_->MockSleepForSeconds(50 * 60 * 60);
       ASSERT_OK(Put("c", "3"));
-      Flush();
-      dbfull()->TEST_WaitForCompact();
+      ASSERT_OK(Flush());
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
       ASSERT_EQ("2,3", FilesPerLevel());
       // The four old files now go through the periodic compaction process. 5
       // + 4.
@@ -4108,7 +4451,7 @@ TEST_F(DBCompactionTest, LevelPeriodicCompactionWithOldDB) {
       ASSERT_OK(
           Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
     }
-    Flush();
+    ASSERT_OK(Flush());
     // Move the first two files to L2.
     if (i == 1) {
       MoveFilesToLevel(2);
@@ -4142,9 +4485,9 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
   const int kValueSize = 100;
 
   Options options = CurrentOptions();
-  options.ttl = 10 * 60 * 60;  // 10 hours
+  options.ttl = 10 * 60 * 60;                          // 10 hours
   options.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
-  options.max_open_files = -1;   // needed for both periodic and ttl compactions
+  options.max_open_files = -1;  // needed for both periodic and ttl compactions
   env_->SetMockSleep();
   options.env = env_;
 
@@ -4172,9 +4515,9 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
       ASSERT_OK(
           Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
     }
-    Flush();
+    ASSERT_OK(Flush());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   MoveFilesToLevel(3);
 
@@ -4185,8 +4528,8 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
   // Add some time greater than periodic_compaction_time.
   env_->MockSleepForSeconds(50 * 60 * 60);
   ASSERT_OK(Put("a", "1"));
-  Flush();
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   // Files in the bottom level go through periodic compactions.
   ASSERT_EQ("1,0,0,2", FilesPerLevel());
   ASSERT_EQ(2, periodic_compactions);
@@ -4195,8 +4538,8 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
   // Add a little more time than ttl
   env_->MockSleepForSeconds(11 * 60 * 60);
   ASSERT_OK(Put("b", "1"));
-  Flush();
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   // Notice that the previous file in level 1 falls down to the bottom level
   // due to ttl compactions, one level at a time.
   // And bottom level files don't get picked up for ttl compactions.
@@ -4207,8 +4550,8 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
   // Add some time greater than periodic_compaction_time.
   env_->MockSleepForSeconds(50 * 60 * 60);
   ASSERT_OK(Put("c", "1"));
-  Flush();
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   // Previous L0 file falls one level at a time to bottom level due to ttl.
   // And all 4 bottom files go through periodic compactions.
   ASSERT_EQ("1,0,0,4", FilesPerLevel());
@@ -4218,48 +4561,116 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
-TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
-  class TestCompactionFilter : public CompactionFilter {
-    const char* Name() const override { return "TestCompactionFilter"; }
-  };
-  class TestCompactionFilterFactory : public CompactionFilterFactory {
-    const char* Name() const override { return "TestCompactionFilterFactory"; }
-    std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-        const CompactionFilter::Context& /*context*/) override {
-      return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
-    }
-  };
-
+TEST_F(DBCompactionTest, LevelTtlBooster) {
   const int kNumKeysPerFile = 32;
-  const int kNumLevelFiles = 2;
-  const int kValueSize = 100;
-
-  Random rnd(301);
+  const int kNumLevelFiles = 3;
+  const int kValueSize = 1000;
 
   Options options = CurrentOptions();
-  TestCompactionFilter test_compaction_filter;
+  options.ttl = 10 * 60 * 60;                           // 10 hours
+  options.periodic_compaction_seconds = 480 * 60 * 60;  // very long
+  options.level0_file_num_compaction_trigger = 2;
+  options.max_bytes_for_level_base = 5 * uint64_t{kNumKeysPerFile * kValueSize};
+  options.max_open_files = -1;  // needed for both periodic and ttl compactions
+  options.compaction_pri = CompactionPri::kMinOverlappingRatio;
   env_->SetMockSleep();
   options.env = env_;
 
   // NOTE: Presumed unnecessary and removed: resetting mock time in env
 
-  enum CompactionFilterType {
-    kUseCompactionFilter,
-    kUseCompactionFilterFactory
-  };
-
-  for (CompactionFilterType comp_filter_type :
-       {kUseCompactionFilter, kUseCompactionFilterFactory}) {
-    // Assert that periodic compactions are not enabled.
-    ASSERT_EQ(port::kMaxUint64 - 1, options.periodic_compaction_seconds);
+  DestroyAndReopen(options);
 
-    if (comp_filter_type == kUseCompactionFilter) {
-      options.compaction_filter = &test_compaction_filter;
-      options.compaction_filter_factory.reset();
-    } else if (comp_filter_type == kUseCompactionFilterFactory) {
-      options.compaction_filter = nullptr;
-      options.compaction_filter_factory.reset(
-          new TestCompactionFilterFactory());
+  Random rnd(301);
+  for (int i = 0; i < kNumLevelFiles; ++i) {
+    for (int j = 0; j < kNumKeysPerFile; ++j) {
+      ASSERT_OK(
+          Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  MoveFilesToLevel(2);
+
+  ASSERT_EQ("0,0,3", FilesPerLevel());
+
+  // Create some files for L1
+  for (int i = 0; i < 2; i++) {
+    for (int j = 0; j < kNumKeysPerFile; ++j) {
+      ASSERT_OK(Put(Key(2 * j + i), rnd.RandomString(kValueSize)));
+    }
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  }
+
+  ASSERT_EQ("0,1,3", FilesPerLevel());
+
+  // Make the new L0 files qualify TTL boosting and generate one more to trigger
+  // L1 -> L2 compaction. Old files will be picked even if their priority is
+  // lower without boosting.
+  env_->MockSleepForSeconds(8 * 60 * 60);
+  for (int i = 0; i < 2; i++) {
+    for (int j = 0; j < kNumKeysPerFile; ++j) {
+      ASSERT_OK(Put(Key(kNumKeysPerFile * 2 + 2 * j + i),
+                    rnd.RandomString(kValueSize * 2)));
+    }
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  }
+  // Force files to be compacted to L1
+  ASSERT_OK(
+      dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "1"}}));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ("0,1,2", FilesPerLevel());
+  ASSERT_OK(
+      dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"}}));
+
+  ASSERT_GT(SizeAtLevel(1), kNumKeysPerFile * 4 * kValueSize);
+}
+
+TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
+  class TestCompactionFilter : public CompactionFilter {
+    const char* Name() const override { return "TestCompactionFilter"; }
+  };
+  class TestCompactionFilterFactory : public CompactionFilterFactory {
+    const char* Name() const override { return "TestCompactionFilterFactory"; }
+    std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+        const CompactionFilter::Context& /*context*/) override {
+      return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
+    }
+  };
+
+  const int kNumKeysPerFile = 32;
+  const int kNumLevelFiles = 2;
+  const int kValueSize = 100;
+
+  Random rnd(301);
+
+  Options options = CurrentOptions();
+  TestCompactionFilter test_compaction_filter;
+  env_->SetMockSleep();
+  options.env = env_;
+
+  // NOTE: Presumed unnecessary and removed: resetting mock time in env
+
+  enum CompactionFilterType {
+    kUseCompactionFilter,
+    kUseCompactionFilterFactory
+  };
+
+  for (CompactionFilterType comp_filter_type :
+       {kUseCompactionFilter, kUseCompactionFilterFactory}) {
+    // Assert that periodic compactions are not enabled.
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max() - 1,
+              options.periodic_compaction_seconds);
+
+    if (comp_filter_type == kUseCompactionFilter) {
+      options.compaction_filter = &test_compaction_filter;
+      options.compaction_filter_factory.reset();
+    } else if (comp_filter_type == kUseCompactionFilterFactory) {
+      options.compaction_filter = nullptr;
+      options.compaction_filter_factory.reset(
+          new TestCompactionFilterFactory());
     }
     DestroyAndReopen(options);
 
@@ -4284,9 +4695,9 @@ TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
         ASSERT_OK(
             Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
       }
-      Flush();
+      ASSERT_OK(Flush());
     }
-    dbfull()->TEST_WaitForCompact();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
     ASSERT_EQ("2", FilesPerLevel());
     ASSERT_EQ(0, periodic_compactions);
@@ -4294,8 +4705,8 @@ TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
     // Add 31 days and do a write
     env_->MockSleepForSeconds(31 * 24 * 60 * 60);
     ASSERT_OK(Put("a", "1"));
-    Flush();
-    dbfull()->TEST_WaitForCompact();
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
     // Assert that the files stay in the same level
     ASSERT_EQ("3", FilesPerLevel());
     // The two old files go through the periodic compaction process
@@ -4344,16 +4755,16 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
       for (int k = 0; k < 2; ++k) {
         ASSERT_OK(Put(Key(k), rnd.RandomString(1024)));
       }
-      Flush();
+      ASSERT_OK(Flush());
     }
     auto manual_compaction_thread = port::Thread([this]() {
       CompactRangeOptions cro;
       cro.allow_write_stall = false;
-      db_->CompactRange(cro, nullptr, nullptr);
+      ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
     });
 
     manual_compaction_thread.join();
-    dbfull()->TEST_WaitForCompact();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
     ASSERT_EQ(0, NumTableFilesAtLevel(0));
     ASSERT_GT(NumTableFilesAtLevel(1), 0);
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
@@ -4400,17 +4811,17 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
       FlushOptions flush_opts;
       flush_opts.wait = false;
       flush_opts.allow_write_stall = true;
-      dbfull()->Flush(flush_opts);
+      ASSERT_OK(dbfull()->Flush(flush_opts));
     }
 
     auto manual_compaction_thread = port::Thread([this]() {
       CompactRangeOptions cro;
       cro.allow_write_stall = false;
-      db_->CompactRange(cro, nullptr, nullptr);
+      ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
     });
 
     manual_compaction_thread.join();
-    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
     ASSERT_EQ(0, NumTableFilesAtLevel(0));
     ASSERT_GT(NumTableFilesAtLevel(1), 0);
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
@@ -4446,12 +4857,11 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
       for (int k = 0; k < 2; ++k) {
         ASSERT_OK(Put(1, Key(k), rnd.RandomString(1024)));
       }
-      Flush(1);
+      ASSERT_OK(Flush(1));
     }
     auto manual_compaction_thread = port::Thread([this, i]() {
       CompactRangeOptions cro;
       cro.allow_write_stall = false;
-      Status s = db_->CompactRange(cro, handles_[1], nullptr, nullptr);
       if (i == 0) {
         ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
                         .IsColumnFamilyDropped());
@@ -4471,7 +4881,7 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
     manual_compaction_thread.join();
     TEST_SYNC_POINT(
         "DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual");
-    dbfull()->TEST_WaitForCompact();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
   }
 }
@@ -4499,33 +4909,35 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
        {"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-  //used for the delayable flushes
+  // used for the delayable flushes
   FlushOptions flush_opts;
   flush_opts.allow_write_stall = true;
   for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
     for (int j = 0; j < 2; ++j) {
       ASSERT_OK(Put(Key(j), rnd.RandomString(1024)));
     }
-    dbfull()->Flush(flush_opts);
+    ASSERT_OK(dbfull()->Flush(flush_opts));
   }
   auto manual_compaction_thread = port::Thread([this]() {
     CompactRangeOptions cro;
     cro.allow_write_stall = false;
-    db_->CompactRange(cro, nullptr, nullptr);
+    ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
   });
 
   TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
-  Put(ToString(0), rnd.RandomString(1024));
-  dbfull()->Flush(flush_opts);
-  Put(ToString(0), rnd.RandomString(1024));
-  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
+  ASSERT_OK(Put(std::to_string(0), rnd.RandomString(1024)));
+  ASSERT_OK(dbfull()->Flush(flush_opts));
+  ASSERT_OK(Put(std::to_string(0), rnd.RandomString(1024)));
+  TEST_SYNC_POINT(
+      "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
   manual_compaction_thread.join();
 
   // If CompactRange's flush was skipped, the final Put above will still be
   // in the active memtable.
   std::string num_keys_in_memtable;
-  db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable, &num_keys_in_memtable);
-  ASSERT_EQ(ToString(1), num_keys_in_memtable);
+  ASSERT_TRUE(db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable,
+                               &num_keys_in_memtable));
+  ASSERT_EQ(std::to_string(1), num_keys_in_memtable);
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
@@ -4582,7 +4994,7 @@ TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) {
       } else {
         ASSERT_EQ(2, num_memtable_entries);
         // flush anyways to prepare for next iteration
-        db_->Flush(FlushOptions());
+        ASSERT_OK(db_->Flush(FlushOptions()));
       }
     }
   }
@@ -4597,12 +5009,12 @@ TEST_F(DBCompactionTest, CompactionStatsTest) {
 
   for (int i = 0; i < 32; i++) {
     for (int j = 0; j < 5000; j++) {
-      Put(std::to_string(j), std::string(1, 'A'));
+      ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
     }
     ASSERT_OK(Flush());
     ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ColumnFamilyHandleImpl* cfh =
       static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
   ColumnFamilyData* cfd = cfh->cfd();
@@ -4610,6 +5022,98 @@ TEST_F(DBCompactionTest, CompactionStatsTest) {
   VerifyCompactionStats(*cfd, *collector);
 }
 
+TEST_F(DBCompactionTest, SubcompactionEvent) {
+  class SubCompactionEventListener : public EventListener {
+   public:
+    void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
+      InstrumentedMutexLock l(&mutex_);
+      ASSERT_EQ(running_compactions_.find(ci.job_id),
+                running_compactions_.end());
+      running_compactions_.emplace(ci.job_id, std::unordered_set<int>());
+    }
+
+    void OnCompactionCompleted(DB* /*db*/,
+                               const CompactionJobInfo& ci) override {
+      InstrumentedMutexLock l(&mutex_);
+      auto it = running_compactions_.find(ci.job_id);
+      ASSERT_NE(it, running_compactions_.end());
+      ASSERT_EQ(it->second.size(), 0);
+      running_compactions_.erase(it);
+    }
+
+    void OnSubcompactionBegin(const SubcompactionJobInfo& si) override {
+      InstrumentedMutexLock l(&mutex_);
+      auto it = running_compactions_.find(si.job_id);
+      ASSERT_NE(it, running_compactions_.end());
+      auto r = it->second.insert(si.subcompaction_job_id);
+      ASSERT_TRUE(r.second);  // each subcompaction_job_id should be different
+      total_subcompaction_cnt_++;
+    }
+
+    void OnSubcompactionCompleted(const SubcompactionJobInfo& si) override {
+      InstrumentedMutexLock l(&mutex_);
+      auto it = running_compactions_.find(si.job_id);
+      ASSERT_NE(it, running_compactions_.end());
+      auto r = it->second.erase(si.subcompaction_job_id);
+      ASSERT_EQ(r, 1);
+    }
+
+    size_t GetRunningCompactionCount() {
+      InstrumentedMutexLock l(&mutex_);
+      return running_compactions_.size();
+    }
+
+    size_t GetTotalSubcompactionCount() {
+      InstrumentedMutexLock l(&mutex_);
+      return total_subcompaction_cnt_;
+    }
+
+   private:
+    InstrumentedMutex mutex_;
+    std::unordered_map<int, std::unordered_set<int>> running_compactions_;
+    size_t total_subcompaction_cnt_ = 0;
+  };
+
+  Options options = CurrentOptions();
+  options.target_file_size_base = 1024;
+  options.level0_file_num_compaction_trigger = 10;
+  auto* listener = new SubCompactionEventListener();
+  options.listeners.emplace_back(listener);
+
+  DestroyAndReopen(options);
+
+  // generate 4 files @ L2
+  for (int i = 0; i < 4; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 10 + j;
+      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+  MoveFilesToLevel(2);
+
+  // generate 2 files @ L1 which overlaps with L2 files
+  for (int i = 0; i < 2; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 20 + j * 2;
+      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+  MoveFilesToLevel(1);
+  ASSERT_EQ(FilesPerLevel(), "0,2,4");
+
+  CompactRangeOptions comp_opts;
+  comp_opts.max_subcompactions = 4;
+  Status s = dbfull()->CompactRange(comp_opts, nullptr, nullptr);
+  ASSERT_OK(s);
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  // make sure there's no running compaction
+  ASSERT_EQ(listener->GetRunningCompactionCount(), 0);
+  // and sub compaction is triggered
+  ASSERT_GT(listener->GetTotalSubcompactionCount(), 0);
+}
+
 TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
   // LSM setup:
   // L1:      [ba bz]
@@ -4687,7 +5191,7 @@ TEST_F(DBCompactionTest, CompactionHasEmptyOutput) {
   ASSERT_OK(Delete("b"));
   ASSERT_OK(Flush());
 
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ(NumTableFilesAtLevel(0), 0);
   ASSERT_EQ(NumTableFilesAtLevel(1), 0);
 
@@ -4718,10 +5222,10 @@ TEST_F(DBCompactionTest, CompactionLimiter) {
   }
 
   std::shared_ptr<ConcurrentTaskLimiter> unique_limiter(
-    NewConcurrentTaskLimiter("unique_limiter", -1));
+      NewConcurrentTaskLimiter("unique_limiter", -1));
 
-  const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5",
-    "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
+  const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5", "6", "7",
+                            "8",       "9", "a", "b", "c", "d", "e", "f"};
   const unsigned int cf_count = sizeof cf_names / sizeof cf_names[0];
 
   std::unordered_map<std::string, CompactionLimiter*> cf_to_limiter;
@@ -4733,9 +5237,10 @@ TEST_F(DBCompactionTest, CompactionLimiter) {
   options.level0_file_num_compaction_trigger = 4;
   options.level0_slowdown_writes_trigger = 64;
   options.level0_stop_writes_trigger = 64;
-  options.max_background_jobs = kMaxBackgroundThreads; // Enough threads
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
-  options.max_write_buffer_number = 10; // Enough memtables
+  options.max_background_jobs = kMaxBackgroundThreads;  // Enough threads
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
+  options.max_write_buffer_number = 10;  // Enough memtables
   DestroyAndReopen(options);
 
   std::vector<Options> option_vector;
@@ -4763,9 +5268,8 @@ TEST_F(DBCompactionTest, CompactionLimiter) {
     CreateColumnFamilies({cf_names[cf]}, option_vector[cf]);
   }
 
-  ReopenWithColumnFamilies(std::vector<std::string>(cf_names,
-                                                    cf_names + cf_count),
-                           option_vector);
+  ReopenWithColumnFamilies(
+      std::vector<std::string>(cf_names, cf_names + cf_count), option_vector);
 
   port::Mutex mutex;
 
@@ -4820,14 +5324,14 @@ TEST_F(DBCompactionTest, CompactionLimiter) {
     }
 
     for (unsigned int cf = 0; cf < cf_count; cf++) {
-      dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
     }
   }
 
   // Enough L0 files to trigger compaction
   for (unsigned int cf = 0; cf < cf_count; cf++) {
     ASSERT_EQ(NumTableFilesAtLevel(0, cf),
-      options.level0_file_num_compaction_trigger);
+              options.level0_file_num_compaction_trigger);
   }
 
   // Create more files for one column family, which triggers speed up
@@ -4838,7 +5342,7 @@ TEST_F(DBCompactionTest, CompactionLimiter) {
     }
     // put extra key to trigger flush
     ASSERT_OK(Put(0, "", ""));
-    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
     ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
               NumTableFilesAtLevel(0, 0));
   }
@@ -4853,7 +5357,7 @@ TEST_F(DBCompactionTest, CompactionLimiter) {
   }
 
   for (unsigned int cf = 0; cf < cf_count; cf++) {
-    dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
   }
 
   ASSERT_OK(dbfull()->TEST_WaitForCompact());
@@ -4870,12 +5374,12 @@ TEST_F(DBCompactionTest, CompactionLimiter) {
 
   // flush one more file to cf 1
   for (int i = 0; i < kNumKeysPerFile; i++) {
-      ASSERT_OK(Put(cf_test, Key(keyIndex++), ""));
+    ASSERT_OK(Put(cf_test, Key(keyIndex++), ""));
   }
   // put extra key to trigger flush
   ASSERT_OK(Put(cf_test, "", ""));
 
-  dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]);
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]));
   ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test));
 
   Compact(cf_test, Key(0), Key(keyIndex));
@@ -4894,7 +5398,7 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) {
   options.create_if_missing = true;
   options.disable_auto_compactions = true;
   options.use_direct_io_for_flush_and_compaction = GetParam();
-  options.env = new MockEnv(Env::Default());
+  options.env = MockEnv::Create(Env::Default());
   Reopen(options);
   bool readahead = false;
   SyncPoint::GetInstance()->SetCallBack(
@@ -4905,9 +5409,7 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) {
       });
   if (options.use_direct_io_for_flush_and_compaction) {
     SyncPoint::GetInstance()->SetCallBack(
-        "SanitizeOptions:direct_io", [&](void* /*arg*/) {
-          readahead = true;
-        });
+        "SanitizeOptions:direct_io", [&](void* /*arg*/) { readahead = true; });
   }
   SyncPoint::GetInstance()->EnableProcessing();
   CreateAndReopenWithCF({"pikachu"}, options);
@@ -4927,7 +5429,7 @@ class CompactionPriTest : public DBTestBase,
                           public testing::WithParamInterface<uint32_t> {
  public:
   CompactionPriTest()
-      : DBTestBase("/compaction_pri_test", /*env_do_fsync=*/true) {
+      : DBTestBase("compaction_pri_test", /*env_do_fsync=*/true) {
     compaction_pri_ = GetParam();
   }
 
@@ -4961,7 +5463,7 @@ TEST_P(CompactionPriTest, Test) {
     ASSERT_OK(Put(Key(keys[i]), rnd.RandomString(102)));
   }
 
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   for (int i = 0; i < kNKeys; i++) {
     ASSERT_NE("NOT_FOUND", Get(Key(i)));
   }
@@ -4972,1052 +5474,2740 @@ INSTANTIATE_TEST_CASE_P(
     ::testing::Values(CompactionPri::kByCompensatedSize,
                       CompactionPri::kOldestLargestSeqFirst,
                       CompactionPri::kOldestSmallestSeqFirst,
-                      CompactionPri::kMinOverlappingRatio));
-
-class NoopMergeOperator : public MergeOperator {
- public:
-  NoopMergeOperator() {}
-
-  bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
-                   MergeOperationOutput* merge_out) const override {
-    std::string val("bar");
-    merge_out->new_value = val;
-    return true;
-  }
+                      CompactionPri::kMinOverlappingRatio,
+                      CompactionPri::kRoundRobin));
 
-  const char* Name() const override { return "Noop"; }
-};
+TEST_F(DBCompactionTest, PersistRoundRobinCompactCursor) {
+  Options options = CurrentOptions();
+  options.write_buffer_size = 16 * 1024;
+  options.max_bytes_for_level_base = 128 * 1024;
+  options.target_file_size_base = 64 * 1024;
+  options.level0_file_num_compaction_trigger = 4;
+  options.compaction_pri = CompactionPri::kRoundRobin;
+  options.max_bytes_for_level_multiplier = 4;
+  options.num_levels = 3;
+  options.compression = kNoCompression;
 
-TEST_F(DBCompactionTest, PartialManualCompaction) {
-  Options opts = CurrentOptions();
-  opts.num_levels = 3;
-  opts.level0_file_num_compaction_trigger = 10;
-  opts.compression = kNoCompression;
-  opts.merge_operator.reset(new NoopMergeOperator());
-  opts.target_file_size_base = 10240;
-  DestroyAndReopen(opts);
+  DestroyAndReopen(options);
 
   Random rnd(301);
-  for (auto i = 0; i < 8; ++i) {
-    for (auto j = 0; j < 10; ++j) {
-      Merge("foo", rnd.RandomString(1024));
+
+  // 30 Files in L0 to trigger compactions between L1 and L2
+  for (int i = 0; i < 30; i++) {
+    for (int j = 0; j < 16; j++) {
+      ASSERT_OK(Put(rnd.RandomString(24), rnd.RandomString(1000)));
     }
-    Flush();
+    ASSERT_OK(Flush());
   }
 
-  MoveFilesToLevel(2);
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
-  std::string prop;
-  EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
-  uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2;
-  ASSERT_OK(dbfull()->SetOptions(
-      {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
+  VersionSet* const versions = dbfull()->GetVersionSet();
+  assert(versions);
 
-  CompactRangeOptions cro;
-  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
-  dbfull()->CompactRange(cro, nullptr, nullptr);
-}
+  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+  ASSERT_NE(cfd, nullptr);
 
-TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
-  // Regression test for bug where manual compaction hangs forever when the DB
-  // is in read-only mode. Verify it now at least returns, despite failing.
-  const int kNumL0Files = 4;
-  std::unique_ptr<FaultInjectionTestEnv> mock_env(
-      new FaultInjectionTestEnv(env_));
-  Options opts = CurrentOptions();
-  opts.disable_auto_compactions = true;
-  opts.env = mock_env.get();
-  DestroyAndReopen(opts);
+  Version* const current = cfd->current();
+  ASSERT_NE(current, nullptr);
 
-  Random rnd(301);
-  for (int i = 0; i < kNumL0Files; ++i) {
-    // Make sure files are overlapping in key-range to prevent trivial move.
-    Put("key1", rnd.RandomString(1024));
-    Put("key2", rnd.RandomString(1024));
-    Flush();
-  }
-  ASSERT_EQ(kNumL0Files, NumTableFilesAtLevel(0));
+  const VersionStorageInfo* const storage_info = current->storage_info();
+  ASSERT_NE(storage_info, nullptr);
 
-  // Enter read-only mode by failing a write.
-  mock_env->SetFilesystemActive(false);
-  // Make sure this is outside `CompactRange`'s range so that it doesn't fail
-  // early trying to flush memtable.
-  ASSERT_NOK(Put("key3", rnd.RandomString(1024)));
+  const std::vector<InternalKey> compact_cursors =
+      storage_info->GetCompactCursors();
 
-  // In the bug scenario, the first manual compaction would fail and forget to
-  // unregister itself, causing the second one to hang forever due to conflict
-  // with a non-running compaction.
-  CompactRangeOptions cro;
-  cro.exclusive_manual_compaction = false;
-  Slice begin_key("key1");
-  Slice end_key("key2");
-  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
-  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
+  Reopen(options);
 
-  // Close before mock_env destruct.
-  Close();
-}
+  VersionSet* const reopened_versions = dbfull()->GetVersionSet();
+  assert(reopened_versions);
 
-// ManualCompactionBottomLevelOptimization tests the bottom level manual
-// compaction optimization to skip recompacting files created by Ln-1 to Ln
-// compaction
-TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) {
-  Options opts = CurrentOptions();
-  opts.num_levels = 3;
-  opts.level0_file_num_compaction_trigger = 5;
-  opts.compression = kNoCompression;
-  opts.merge_operator.reset(new NoopMergeOperator());
-  opts.target_file_size_base = 1024;
-  opts.max_bytes_for_level_multiplier = 2;
-  opts.disable_auto_compactions = true;
-  DestroyAndReopen(opts);
-  ColumnFamilyHandleImpl* cfh =
-      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
-  ColumnFamilyData* cfd = cfh->cfd();
-  InternalStats* internal_stats_ptr = cfd->internal_stats();
-  ASSERT_NE(internal_stats_ptr, nullptr);
+  ColumnFamilyData* const reopened_cfd =
+      reopened_versions->GetColumnFamilySet()->GetDefault();
+  ASSERT_NE(reopened_cfd, nullptr);
 
-  Random rnd(301);
-  for (auto i = 0; i < 8; ++i) {
-    for (auto j = 0; j < 10; ++j) {
-      ASSERT_OK(
-          Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+  Version* const reopened_current = reopened_cfd->current();
+  ASSERT_NE(reopened_current, nullptr);
+
+  const VersionStorageInfo* const reopened_storage_info =
+      reopened_current->storage_info();
+  ASSERT_NE(reopened_storage_info, nullptr);
+
+  const std::vector<InternalKey> reopened_compact_cursors =
+      reopened_storage_info->GetCompactCursors();
+  const auto icmp = reopened_storage_info->InternalComparator();
+  ASSERT_EQ(compact_cursors.size(), reopened_compact_cursors.size());
+  for (size_t i = 0; i < compact_cursors.size(); i++) {
+    if (compact_cursors[i].Valid()) {
+      ASSERT_EQ(0,
+                icmp->Compare(compact_cursors[i], reopened_compact_cursors[i]));
+    } else {
+      ASSERT_TRUE(!reopened_compact_cursors[i].Valid());
     }
-    Flush();
   }
+}
 
-  MoveFilesToLevel(2);
+TEST_P(RoundRobinSubcompactionsAgainstPressureToken, PressureTokenTest) {
+  const int kKeysPerBuffer = 100;
+  Options options = CurrentOptions();
+  options.num_levels = 4;
+  options.max_bytes_for_level_multiplier = 2;
+  options.level0_file_num_compaction_trigger = 4;
+  options.target_file_size_base = kKeysPerBuffer * 1024;
+  options.compaction_pri = CompactionPri::kRoundRobin;
+  options.max_bytes_for_level_base = 8 * kKeysPerBuffer * 1024;
+  options.disable_auto_compactions = true;
+  // Setup 7 threads but limited subcompactions so that
+  // RoundRobin requires extra compactions from reserved threads
+  options.max_subcompactions = 1;
+  options.max_background_compactions = 7;
+  options.max_compaction_bytes = 100000000;
+  DestroyAndReopen(options);
+  env_->SetBackgroundThreads(7, Env::LOW);
 
-  for (auto i = 0; i < 8; ++i) {
-    for (auto j = 0; j < 10; ++j) {
-      ASSERT_OK(
-          Put("bar" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+  Random rnd(301);
+  const std::vector<int> files_per_level = {0, 15, 25};
+  for (int lvl = 2; lvl > 0; lvl--) {
+    for (int i = 0; i < files_per_level[lvl]; i++) {
+      for (int j = 0; j < kKeysPerBuffer; j++) {
+        // Add (lvl-1) to ensure nearly equivallent number of files
+        // in L2 are overlapped with fils selected to compact from
+        // L1
+        ASSERT_OK(Put(Key(2 * i * kKeysPerBuffer + 2 * j + (lvl - 1)),
+                      rnd.RandomString(1010)));
+      }
+      ASSERT_OK(Flush());
     }
-    Flush();
+    MoveFilesToLevel(lvl);
+    ASSERT_EQ(files_per_level[lvl], NumTableFilesAtLevel(lvl, 0));
   }
-  const std::vector<InternalStats::CompactionStats>& comp_stats =
-      internal_stats_ptr->TEST_GetCompactionStats();
-  int num = comp_stats[2].num_input_files_in_output_level;
-  ASSERT_EQ(num, 0);
+  // 15 files in L1; 25 files in L2
 
-  CompactRangeOptions cro;
-  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
-  dbfull()->CompactRange(cro, nullptr, nullptr);
+  // This is a variable for making sure the following callback is called
+  // and the assertions in it are indeed excuted.
+  bool num_planned_subcompactions_verified = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::GenSubcompactionBoundaries:0", [&](void* arg) {
+        uint64_t num_planned_subcompactions = *(static_cast<uint64_t*>(arg));
+        if (grab_pressure_token_) {
+          // 7 files are selected for round-robin under auto
+          // compaction. The number of planned subcompaction is restricted by
+          // the limited number of max_background_compactions
+          ASSERT_EQ(num_planned_subcompactions, 7);
+        } else {
+          ASSERT_EQ(num_planned_subcompactions, 1);
+        }
+        num_planned_subcompactions_verified = true;
+      });
 
-  const std::vector<InternalStats::CompactionStats>& comp_stats2 =
-      internal_stats_ptr->TEST_GetCompactionStats();
-  num = comp_stats2[2].num_input_files_in_output_level;
-  ASSERT_EQ(num, 0);
-}
+  // The following 3 dependencies have to be added to ensure the auto
+  // compaction and the pressure token is correctly enabled. Same for
+  // RoundRobinSubcompactionsUsingResources and
+  // DBCompactionTest.RoundRobinSubcompactionsShrinkResources
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"RoundRobinSubcompactionsAgainstPressureToken:0",
+        "BackgroundCallCompaction:0"},
+       {"CompactionJob::AcquireSubcompactionResources:0",
+        "RoundRobinSubcompactionsAgainstPressureToken:1"},
+       {"RoundRobinSubcompactionsAgainstPressureToken:2",
+        "CompactionJob::AcquireSubcompactionResources:1"}});
+  SyncPoint::GetInstance()->EnableProcessing();
 
-TEST_F(DBCompactionTest, CompactionDuringShutdown) {
-  Options opts = CurrentOptions();
-  opts.level0_file_num_compaction_trigger = 2;
-  opts.disable_auto_compactions = true;
-  DestroyAndReopen(opts);
-  ColumnFamilyHandleImpl* cfh =
-      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
-  ColumnFamilyData* cfd = cfh->cfd();
-  InternalStats* internal_stats_ptr = cfd->internal_stats();
-  ASSERT_NE(internal_stats_ptr, nullptr);
-
-  Random rnd(301);
-  for (auto i = 0; i < 2; ++i) {
-    for (auto j = 0; j < 10; ++j) {
-      ASSERT_OK(
-          Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
-    }
-    Flush();
+  ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()}));
+  TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:0");
+  TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:1");
+  std::unique_ptr<WriteControllerToken> pressure_token;
+  if (grab_pressure_token_) {
+    pressure_token =
+        dbfull()->TEST_write_controler().GetCompactionPressureToken();
   }
+  TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:2");
 
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
-      [&](void* /*arg*/) { dbfull()->shutting_down_.store(true); });
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
-  ASSERT_OK(dbfull()->error_handler_.GetBGError());
+  ASSERT_OK(dbfull()->WaitForCompact());
+  ASSERT_TRUE(num_planned_subcompactions_verified);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
-// FixFileIngestionCompactionDeadlock tests and verifies that compaction and
-// file ingestion do not cause deadlock in the event of write stall triggered
-// by number of L0 files reaching level0_stop_writes_trigger.
-TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
-  const int kNumKeysPerFile = 100;
-  // Generate SST files.
-  Options options = CurrentOptions();
-
-  // Generate an external SST file containing a single key, i.e. 99
-  std::string sst_files_dir = dbname_ + "/sst_files/";
-  DestroyDir(env_, sst_files_dir);
-  ASSERT_OK(env_->CreateDir(sst_files_dir));
-  SstFileWriter sst_writer(EnvOptions(), options);
-  const std::string sst_file_path = sst_files_dir + "test.sst";
-  ASSERT_OK(sst_writer.Open(sst_file_path));
-  ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
-  ASSERT_OK(sst_writer.Finish());
+INSTANTIATE_TEST_CASE_P(RoundRobinSubcompactionsAgainstPressureToken,
+                        RoundRobinSubcompactionsAgainstPressureToken,
+                        testing::Bool());
 
-  SyncPoint::GetInstance()->DisableProcessing();
-  SyncPoint::GetInstance()->ClearAllCallBacks();
-  SyncPoint::GetInstance()->LoadDependency({
-      {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
-       "BackgroundCallCompaction:0"},
-  });
-  SyncPoint::GetInstance()->EnableProcessing();
+TEST_P(RoundRobinSubcompactionsAgainstResources, SubcompactionsUsingResources) {
+  const int kKeysPerBuffer = 200;
+  Options options = CurrentOptions();
+  options.num_levels = 4;
+  options.level0_file_num_compaction_trigger = 3;
+  options.target_file_size_base = kKeysPerBuffer * 1024;
+  options.compaction_pri = CompactionPri::kRoundRobin;
+  options.max_bytes_for_level_base = 30 * kKeysPerBuffer * 1024;
+  options.disable_auto_compactions = true;
+  options.max_subcompactions = 1;
+  options.max_background_compactions = max_compaction_limits_;
+  // Set a large number for max_compaction_bytes so that one round-robin
+  // compaction is enough to make post-compaction L1 size less than
+  // the maximum size (this test assumes only one round-robin compaction
+  // is triggered by kLevelMaxLevelSize)
+  options.max_compaction_bytes = 100000000;
 
-  options.write_buffer_size = 110 << 10;  // 110KB
-  options.level0_file_num_compaction_trigger =
-      options.level0_stop_writes_trigger;
-  options.max_subcompactions = max_subcompactions_;
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
   DestroyAndReopen(options);
-  Random rnd(301);
+  env_->SetBackgroundThreads(total_low_pri_threads_, Env::LOW);
 
-  // Generate level0_stop_writes_trigger L0 files to trigger write stop
-  for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
-    for (int j = 0; j != kNumKeysPerFile; ++j) {
-      ASSERT_OK(Put(Key(j), rnd.RandomString(990)));
-    }
-    if (0 == i) {
-      // When we reach here, the memtables have kNumKeysPerFile keys. Note that
-      // flush is not yet triggered. We need to write an extra key so that the
-      // write path will call PreprocessWrite and flush the previous key-value
-      // pairs to e flushed. After that, there will be the newest key in the
-      // memtable, and a bunch of L0 files. Since there is already one key in
-      // the memtable, then for i = 1, 2, ..., we do not have to write this
-      // extra key to trigger flush.
-      ASSERT_OK(Put("", ""));
+  Random rnd(301);
+  const std::vector<int> files_per_level = {0, 40, 100};
+  for (int lvl = 2; lvl > 0; lvl--) {
+    for (int i = 0; i < files_per_level[lvl]; i++) {
+      for (int j = 0; j < kKeysPerBuffer; j++) {
+        // Add (lvl-1) to ensure nearly equivallent number of files
+        // in L2 are overlapped with fils selected to compact from
+        // L1
+        ASSERT_OK(Put(Key(2 * i * kKeysPerBuffer + 2 * j + (lvl - 1)),
+                      rnd.RandomString(1010)));
+      }
+      ASSERT_OK(Flush());
     }
-    dbfull()->TEST_WaitForFlushMemTable();
-    ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
+    MoveFilesToLevel(lvl);
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_EQ(files_per_level[lvl], NumTableFilesAtLevel(lvl, 0));
   }
-  // When we reach this point, there will be level0_stop_writes_trigger L0
-  // files and one extra key (99) in memory, which overlaps with the external
-  // SST file. Write stall triggers, and can be cleared only after compaction
-  // reduces the number of L0 files.
-
-  // Compaction will also be triggered since we have reached the threshold for
-  // auto compaction. Note that compaction may begin after the following file
-  // ingestion thread and waits for ingestion to finish.
-
-  // Thread to ingest file with overlapping key range with the current
-  // memtable. Consequently ingestion will trigger a flush. The flush MUST
-  // proceed without waiting for the write stall condition to clear, otherwise
-  // deadlock can happen.
-  port::Thread ingestion_thr([&]() {
-    IngestExternalFileOptions ifo;
-    Status s = db_->IngestExternalFile({sst_file_path}, ifo);
-    ASSERT_OK(s);
-  });
-
-  // More write to trigger write stop
-  ingestion_thr.join();
-  ASSERT_OK(dbfull()->TEST_WaitForCompact());
-  Close();
-}
-
-TEST_F(DBCompactionTest, ConsistencyFailTest) {
-  Options options = CurrentOptions();
-  options.force_consistency_checks = true;
-  DestroyAndReopen(options);
 
+  // 40 files in L1; 100 files in L2
+  // This is a variable for making sure the following callback is called
+  // and the assertions in it are indeed excuted.
+  bool num_planned_subcompactions_verified = false;
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "VersionBuilder::CheckConsistency0", [&](void* arg) {
-        auto p =
-            reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
-        // just swap the two FileMetaData so that we hit error
-        // in CheckConsistency funcion
-        FileMetaData* temp = *(p->first);
-        *(p->first) = *(p->second);
-        *(p->second) = temp;
+      "CompactionJob::GenSubcompactionBoundaries:0", [&](void* arg) {
+        uint64_t num_planned_subcompactions = *(static_cast<uint64_t*>(arg));
+        // More than 10 files are selected for round-robin under auto
+        // compaction. The number of planned subcompaction is restricted by
+        // the minimum number between available threads and compaction limits
+        ASSERT_EQ(num_planned_subcompactions - options.max_subcompactions,
+                  std::min(total_low_pri_threads_, max_compaction_limits_) - 1);
+        num_planned_subcompactions_verified = true;
       });
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"RoundRobinSubcompactionsAgainstResources:0",
+        "BackgroundCallCompaction:0"},
+       {"CompactionJob::AcquireSubcompactionResources:0",
+        "RoundRobinSubcompactionsAgainstResources:1"},
+       {"RoundRobinSubcompactionsAgainstResources:2",
+        "CompactionJob::AcquireSubcompactionResources:1"},
+       {"CompactionJob::ReleaseSubcompactionResources:0",
+        "RoundRobinSubcompactionsAgainstResources:3"},
+       {"RoundRobinSubcompactionsAgainstResources:4",
+        "CompactionJob::ReleaseSubcompactionResources:1"}});
+  SyncPoint::GetInstance()->EnableProcessing();
 
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-
-  for (int k = 0; k < 2; ++k) {
-    ASSERT_OK(Put("foo", "bar"));
-    Flush();
-  }
+  ASSERT_OK(dbfull()->WaitForCompact());
+  ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()}));
+  TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:0");
+  TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:1");
+  auto pressure_token =
+      dbfull()->TEST_write_controler().GetCompactionPressureToken();
 
-  ASSERT_NOK(Put("foo", "bar"));
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:2");
+  TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:3");
+  // We can reserve more threads now except one is being used
+  ASSERT_EQ(total_low_pri_threads_ - 1,
+            env_->ReserveThreads(total_low_pri_threads_, Env::Priority::LOW));
+  ASSERT_EQ(
+      total_low_pri_threads_ - 1,
+      env_->ReleaseThreads(total_low_pri_threads_ - 1, Env::Priority::LOW));
+  TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:4");
+  ASSERT_OK(dbfull()->WaitForCompact());
+  ASSERT_TRUE(num_planned_subcompactions_verified);
+  SyncPoint::GetInstance()->DisableProcessing();
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
-TEST_F(DBCompactionTest, ConsistencyFailTest2) {
+INSTANTIATE_TEST_CASE_P(RoundRobinSubcompactionsAgainstResources,
+                        RoundRobinSubcompactionsAgainstResources,
+                        ::testing::Values(std::make_tuple(1, 5),
+                                          std::make_tuple(5, 1),
+                                          std::make_tuple(10, 5),
+                                          std::make_tuple(5, 10),
+                                          std::make_tuple(10, 10)));
+
+TEST_P(DBCompactionTestWithParam, RoundRobinWithoutAdditionalResources) {
+  const int kKeysPerBuffer = 200;
   Options options = CurrentOptions();
-  options.force_consistency_checks = true;
-  options.target_file_size_base = 1000;
-  options.level0_file_num_compaction_trigger = 2;
-  BlockBasedTableOptions bbto;
-  bbto.block_size = 400;  // small block size
-  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  options.num_levels = 4;
+  options.level0_file_num_compaction_trigger = 3;
+  options.target_file_size_base = kKeysPerBuffer * 1024;
+  options.compaction_pri = CompactionPri::kRoundRobin;
+  options.max_bytes_for_level_base = 30 * kKeysPerBuffer * 1024;
+  options.disable_auto_compactions = true;
+  options.max_subcompactions = max_subcompactions_;
+  options.max_background_compactions = 1;
+  options.max_compaction_bytes = 100000000;
+  // Similar experiment setting as above except the max_subcompactions
+  // is given by max_subcompactions_ (1 or 4), and we fix the
+  // additional resources as (1, 1) and thus no more extra resources
+  // can be used
   DestroyAndReopen(options);
+  env_->SetBackgroundThreads(1, Env::LOW);
+
+  Random rnd(301);
+  const std::vector<int> files_per_level = {0, 33, 100};
+  for (int lvl = 2; lvl > 0; lvl--) {
+    for (int i = 0; i < files_per_level[lvl]; i++) {
+      for (int j = 0; j < kKeysPerBuffer; j++) {
+        // Add (lvl-1) to ensure nearly equivallent number of files
+        // in L2 are overlapped with fils selected to compact from
+        // L1
+        ASSERT_OK(Put(Key(2 * i * kKeysPerBuffer + 2 * j + (lvl - 1)),
+                      rnd.RandomString(1010)));
+      }
+      ASSERT_OK(Flush());
+    }
+    MoveFilesToLevel(lvl);
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_EQ(files_per_level[lvl], NumTableFilesAtLevel(lvl, 0));
+  }
 
+  // 33 files in L1; 100 files in L2
+  // This is a variable for making sure the following callback is called
+  // and the assertions in it are indeed excuted.
+  bool num_planned_subcompactions_verified = false;
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "VersionBuilder::CheckConsistency1", [&](void* arg) {
-        auto p =
-            reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
-        // just swap the two FileMetaData so that we hit error
-        // in CheckConsistency funcion
-        FileMetaData* temp = *(p->first);
-        *(p->first) = *(p->second);
-        *(p->second) = temp;
+      "CompactionJob::GenSubcompactionBoundaries:0", [&](void* arg) {
+        uint64_t num_planned_subcompactions = *(static_cast<uint64_t*>(arg));
+        // At most 4 files are selected for round-robin under auto
+        // compaction. The number of planned subcompaction is restricted by
+        // the max_subcompactions since no extra resources can be used
+        ASSERT_EQ(num_planned_subcompactions, options.max_subcompactions);
+        num_planned_subcompactions_verified = true;
       });
+  // No need to setup dependency for pressure token since
+  // AcquireSubcompactionResources may not be called and it anyway cannot
+  // reserve any additional resources
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBCompactionTest::RoundRobinWithoutAdditionalResources:0",
+        "BackgroundCallCompaction:0"}});
+  SyncPoint::GetInstance()->EnableProcessing();
 
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-
-  Random rnd(301);
-  std::string value = rnd.RandomString(1000);
-
-  ASSERT_OK(Put("foo1", value));
-  ASSERT_OK(Put("z", ""));
-  Flush();
-  ASSERT_OK(Put("foo2", value));
-  ASSERT_OK(Put("z", ""));
-  Flush();
+  ASSERT_OK(dbfull()->WaitForCompact());
+  ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()}));
+  TEST_SYNC_POINT("DBCompactionTest::RoundRobinWithoutAdditionalResources:0");
 
-  // This probably returns non-OK, but we rely on the next Put()
-  // to determine the DB is frozen.
-  dbfull()->TEST_WaitForCompact();
-  ASSERT_NOK(Put("foo", "bar"));
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ASSERT_OK(dbfull()->WaitForCompact());
+  ASSERT_TRUE(num_planned_subcompactions_verified);
+  SyncPoint::GetInstance()->DisableProcessing();
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
-void IngestOneKeyValue(DBImpl* db, const std::string& key,
-                       const std::string& value, const Options& options) {
-  ExternalSstFileInfo info;
-  std::string f = test::PerThreadDBPath("sst_file" + key);
-  EnvOptions env;
-  ROCKSDB_NAMESPACE::SstFileWriter writer(env, options);
-  auto s = writer.Open(f);
-  ASSERT_OK(s);
-  // ASSERT_OK(writer.Put(Key(), ""));
-  ASSERT_OK(writer.Put(key, value));
-
-  ASSERT_OK(writer.Finish(&info));
-  IngestExternalFileOptions ingest_opt;
-
-  ASSERT_OK(db->IngestExternalFile({info.file_path}, ingest_opt));
-}
-
-TEST_P(DBCompactionTestWithParam,
-       FlushAfterIntraL0CompactionCheckConsistencyFail) {
+TEST_F(DBCompactionTest, RoundRobinCutOutputAtCompactCursor) {
   Options options = CurrentOptions();
-  options.force_consistency_checks = true;
+  options.num_levels = 3;
   options.compression = kNoCompression;
-  options.level0_file_num_compaction_trigger = 5;
-  options.max_background_compactions = 2;
-  options.max_subcompactions = max_subcompactions_;
-  DestroyAndReopen(options);
+  options.write_buffer_size = 4 * 1024;
+  options.max_bytes_for_level_base = 64 * 1024;
+  options.max_bytes_for_level_multiplier = 4;
+  options.level0_file_num_compaction_trigger = 4;
+  options.compaction_pri = CompactionPri::kRoundRobin;
 
-  const size_t kValueSize = 1 << 20;
-  Random rnd(301);
-  std::atomic<int> pick_intra_l0_count(0);
-  std::string value(rnd.RandomString(kValueSize));
+  DestroyAndReopen(options);
 
-  // The L0->L1 must be picked before we begin ingesting files to trigger
-  // intra-L0 compaction, and must not finish until after an intra-L0
-  // compaction has been picked.
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"LevelCompactionPicker::PickCompaction:Return",
-        "DBCompactionTestWithParam::"
-        "FlushAfterIntraL0CompactionCheckConsistencyFail:L0ToL1Ready"},
-       {"LevelCompactionPicker::PickCompactionBySize:0",
-        "CompactionJob::Run():Start"}});
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "FindIntraL0Compaction",
-      [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
+  VersionSet* const versions = dbfull()->GetVersionSet();
+  assert(versions);
 
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+  ASSERT_NE(cfd, nullptr);
 
-  // prevents trivial move
-  for (int i = 0; i < 10; ++i) {
-    ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
-  }
-  ASSERT_OK(Flush());
-  Compact("", Key(99));
-  ASSERT_EQ(0, NumTableFilesAtLevel(0));
+  Version* const current = cfd->current();
+  ASSERT_NE(current, nullptr);
 
-  // Flush 5 L0 sst.
-  for (int i = 0; i < 5; ++i) {
-    ASSERT_OK(Put(Key(i + 1), value));
-    ASSERT_OK(Flush());
-  }
-  ASSERT_EQ(5, NumTableFilesAtLevel(0));
+  VersionStorageInfo* storage_info = current->storage_info();
+  ASSERT_NE(storage_info, nullptr);
 
-  // Put one key, to make smallest log sequence number in this memtable is less
-  // than sst which would be ingested in next step.
-  ASSERT_OK(Put(Key(0), "a"));
+  const InternalKey split_cursor = InternalKey(Key(600), 100, kTypeValue);
+  storage_info->AddCursorForOneLevel(2, split_cursor);
 
-  ASSERT_EQ(5, NumTableFilesAtLevel(0));
-  TEST_SYNC_POINT(
-      "DBCompactionTestWithParam::"
-      "FlushAfterIntraL0CompactionCheckConsistencyFail:L0ToL1Ready");
+  Random rnd(301);
 
-  // Ingest 5 L0 sst. And this files would trigger PickIntraL0Compaction.
-  for (int i = 5; i < 10; i++) {
-    ASSERT_EQ(i, NumTableFilesAtLevel(0));
-    IngestOneKeyValue(dbfull(), Key(i), value, options);
+  for (int i = 0; i < 50; i++) {
+    for (int j = 0; j < 50; j++) {
+      ASSERT_OK(Put(Key(j * 2 + i * 100), rnd.RandomString(102)));
+    }
+  }
+  // Add more overlapping files (avoid trivial move) to trigger compaction that
+  // output files in L2. Note that trivial move does not trigger compaction and
+  // in that case the cursor is not necessarily the boundary of file.
+  for (int i = 0; i < 50; i++) {
+    for (int j = 0; j < 50; j++) {
+      ASSERT_OK(Put(Key(j * 2 + 1 + i * 100), rnd.RandomString(1014)));
+    }
   }
 
-  // Put one key, to make biggest log sequence number in this memtable is bigger
-  // than sst which would be ingested in next step.
-  ASSERT_OK(Put(Key(2), "b"));
-  dbfull()->TEST_WaitForCompact();
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
   std::vector<std::vector<FileMetaData>> level_to_files;
   dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                   &level_to_files);
-  ASSERT_GT(level_to_files[0].size(), 0);
-  ASSERT_GT(pick_intra_l0_count.load(), 0);
-
-  ASSERT_OK(Flush());
+  const auto icmp = cfd->current()->storage_info()->InternalComparator();
+  // Files in level 2 should be split by the cursor
+  for (const auto& file : level_to_files[2]) {
+    ASSERT_TRUE(
+        icmp->Compare(file.smallest.Encode(), split_cursor.Encode()) >= 0 ||
+        icmp->Compare(file.largest.Encode(), split_cursor.Encode()) < 0);
+  }
 }
 
-TEST_P(DBCompactionTestWithParam,
-       IntraL0CompactionAfterFlushCheckConsistencyFail) {
-  Options options = CurrentOptions();
-  options.force_consistency_checks = true;
-  options.compression = kNoCompression;
-  options.level0_file_num_compaction_trigger = 5;
-  options.max_background_compactions = 2;
-  options.max_subcompactions = max_subcompactions_;
-  options.write_buffer_size = 2 << 20;
-  options.max_write_buffer_number = 6;
-  DestroyAndReopen(options);
+class NoopMergeOperator : public MergeOperator {
+ public:
+  NoopMergeOperator() {}
+
+  bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
+                   MergeOperationOutput* merge_out) const override {
+    std::string val("bar");
+    merge_out->new_value = val;
+    return true;
+  }
+
+  const char* Name() const override { return "Noop"; }
+};
+
+TEST_F(DBCompactionTest, PartialManualCompaction) {
+  Options opts = CurrentOptions();
+  opts.num_levels = 3;
+  opts.level0_file_num_compaction_trigger = 10;
+  opts.compression = kNoCompression;
+  opts.merge_operator.reset(new NoopMergeOperator());
+  opts.target_file_size_base = 10240;
+  DestroyAndReopen(opts);
+
+  Random rnd(301);
+  for (auto i = 0; i < 8; ++i) {
+    for (auto j = 0; j < 10; ++j) {
+      ASSERT_OK(Merge("foo", rnd.RandomString(1024)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  MoveFilesToLevel(2);
+
+  std::string prop;
+  EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
+  uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2;
+  ASSERT_OK(dbfull()->SetOptions(
+      {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
+
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+}
+
+TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
+  // Regression test for bug where manual compaction hangs forever when the DB
+  // is in read-only mode. Verify it now at least returns, despite failing.
+  const int kNumL0Files = 4;
+  std::unique_ptr<FaultInjectionTestEnv> mock_env(
+      new FaultInjectionTestEnv(env_));
+  Options opts = CurrentOptions();
+  opts.disable_auto_compactions = true;
+  opts.env = mock_env.get();
+  DestroyAndReopen(opts);
+
+  Random rnd(301);
+  for (int i = 0; i < kNumL0Files; ++i) {
+    // Make sure files are overlapping in key-range to prevent trivial move.
+    ASSERT_OK(Put("key1", rnd.RandomString(1024)));
+    ASSERT_OK(Put("key2", rnd.RandomString(1024)));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(kNumL0Files, NumTableFilesAtLevel(0));
+
+  // Enter read-only mode by failing a write.
+  mock_env->SetFilesystemActive(false);
+  // Make sure this is outside `CompactRange`'s range so that it doesn't fail
+  // early trying to flush memtable.
+  ASSERT_NOK(Put("key3", rnd.RandomString(1024)));
+
+  // In the bug scenario, the first manual compaction would fail and forget to
+  // unregister itself, causing the second one to hang forever due to conflict
+  // with a non-running compaction.
+  CompactRangeOptions cro;
+  cro.exclusive_manual_compaction = false;
+  Slice begin_key("key1");
+  Slice end_key("key2");
+  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
+  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
+
+  // Close before mock_env destruct.
+  Close();
+}
+
+// ManualCompactionBottomLevelOptimization tests the bottom level manual
+// compaction optimization to skip recompacting files created by Ln-1 to Ln
+// compaction
+TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) {
+  Options opts = CurrentOptions();
+  opts.num_levels = 3;
+  opts.level0_file_num_compaction_trigger = 5;
+  opts.compression = kNoCompression;
+  opts.merge_operator.reset(new NoopMergeOperator());
+  opts.target_file_size_base = 1024;
+  opts.max_bytes_for_level_multiplier = 2;
+  opts.disable_auto_compactions = true;
+  DestroyAndReopen(opts);
+  ColumnFamilyHandleImpl* cfh =
+      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
+  ColumnFamilyData* cfd = cfh->cfd();
+  InternalStats* internal_stats_ptr = cfd->internal_stats();
+  ASSERT_NE(internal_stats_ptr, nullptr);
+
+  Random rnd(301);
+  for (auto i = 0; i < 8; ++i) {
+    for (auto j = 0; j < 10; ++j) {
+      ASSERT_OK(
+          Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  MoveFilesToLevel(2);
+
+  for (auto i = 0; i < 8; ++i) {
+    for (auto j = 0; j < 10; ++j) {
+      ASSERT_OK(
+          Put("bar" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+    }
+    ASSERT_OK(Flush());
+  }
+  const std::vector<InternalStats::CompactionStats>& comp_stats =
+      internal_stats_ptr->TEST_GetCompactionStats();
+  int num = comp_stats[2].num_input_files_in_output_level;
+  ASSERT_EQ(num, 0);
+
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  const std::vector<InternalStats::CompactionStats>& comp_stats2 =
+      internal_stats_ptr->TEST_GetCompactionStats();
+  num = comp_stats2[2].num_input_files_in_output_level;
+  ASSERT_EQ(num, 0);
+}
+
+TEST_F(DBCompactionTest, ManualCompactionMax) {
+  uint64_t l1_avg_size = 0, l2_avg_size = 0;
+  auto generate_sst_func = [&]() {
+    Random rnd(301);
+    for (auto i = 0; i < 100; i++) {
+      for (auto j = 0; j < 10; j++) {
+        ASSERT_OK(Put(Key(i * 10 + j), rnd.RandomString(1024)));
+      }
+      ASSERT_OK(Flush());
+    }
+    MoveFilesToLevel(2);
+
+    for (auto i = 0; i < 10; i++) {
+      for (auto j = 0; j < 10; j++) {
+        ASSERT_OK(Put(Key(i * 100 + j * 10), rnd.RandomString(1024)));
+      }
+      ASSERT_OK(Flush());
+    }
+    MoveFilesToLevel(1);
+
+    std::vector<std::vector<FileMetaData>> level_to_files;
+    dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
+                                    &level_to_files);
+
+    uint64_t total = 0;
+    for (const auto& file : level_to_files[1]) {
+      total += file.compensated_file_size;
+    }
+    l1_avg_size = total / level_to_files[1].size();
+
+    total = 0;
+    for (const auto& file : level_to_files[2]) {
+      total += file.compensated_file_size;
+    }
+    l2_avg_size = total / level_to_files[2].size();
+  };
+
+  std::atomic_int num_compactions(0);
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BGWorkCompaction", [&](void* /*arg*/) { ++num_compactions; });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Options opts = CurrentOptions();
+  opts.disable_auto_compactions = true;
+
+  // with default setting (1.6G by default), it should cover all files in 1
+  // compaction
+  DestroyAndReopen(opts);
+  generate_sst_func();
+  num_compactions.store(0);
+  CompactRangeOptions cro;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  ASSERT_TRUE(num_compactions.load() == 1);
+
+  // split the compaction to 5
+  int num_split = 5;
+  DestroyAndReopen(opts);
+  generate_sst_func();
+  uint64_t total_size = (l1_avg_size * 10) + (l2_avg_size * 100);
+  opts.max_compaction_bytes = total_size / num_split;
+  opts.target_file_size_base = total_size / num_split;
+  Reopen(opts);
+  num_compactions.store(0);
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  ASSERT_TRUE(num_compactions.load() == num_split);
+
+  // very small max_compaction_bytes, it should still move forward
+  opts.max_compaction_bytes = l1_avg_size / 2;
+  opts.target_file_size_base = l1_avg_size / 2;
+  DestroyAndReopen(opts);
+  generate_sst_func();
+  num_compactions.store(0);
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  ASSERT_TRUE(num_compactions.load() > 10);
+
+  // dynamically set the option
+  num_split = 2;
+  opts.max_compaction_bytes = 0;
+  DestroyAndReopen(opts);
+  generate_sst_func();
+  total_size = (l1_avg_size * 10) + (l2_avg_size * 100);
+  Status s = db_->SetOptions(
+      {{"max_compaction_bytes", std::to_string(total_size / num_split)},
+       {"target_file_size_base", std::to_string(total_size / num_split)}});
+  ASSERT_OK(s);
+
+  num_compactions.store(0);
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  ASSERT_TRUE(num_compactions.load() == num_split);
+}
+
+TEST_F(DBCompactionTest, CompactionDuringShutdown) {
+  Options opts = CurrentOptions();
+  opts.level0_file_num_compaction_trigger = 2;
+  opts.disable_auto_compactions = true;
+  DestroyAndReopen(opts);
+  ColumnFamilyHandleImpl* cfh =
+      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
+  ColumnFamilyData* cfd = cfh->cfd();
+  InternalStats* internal_stats_ptr = cfd->internal_stats();
+  ASSERT_NE(internal_stats_ptr, nullptr);
+
+  Random rnd(301);
+  for (auto i = 0; i < 2; ++i) {
+    for (auto j = 0; j < 10; ++j) {
+      ASSERT_OK(
+          Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
+      [&](void* /*arg*/) { dbfull()->shutting_down_.store(true); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_TRUE(s.ok() || s.IsShutdownInProgress());
+  ASSERT_OK(dbfull()->error_handler_.GetBGError());
+}
+
+// FixFileIngestionCompactionDeadlock tests and verifies that compaction and
+// file ingestion do not cause deadlock in the event of write stall triggered
+// by number of L0 files reaching level0_stop_writes_trigger.
+TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
+  const int kNumKeysPerFile = 100;
+  // Generate SST files.
+  Options options = CurrentOptions();
+
+  // Generate an external SST file containing a single key, i.e. 99
+  std::string sst_files_dir = dbname_ + "/sst_files/";
+  ASSERT_OK(DestroyDir(env_, sst_files_dir));
+  ASSERT_OK(env_->CreateDir(sst_files_dir));
+  SstFileWriter sst_writer(EnvOptions(), options);
+  const std::string sst_file_path = sst_files_dir + "test.sst";
+  ASSERT_OK(sst_writer.Open(sst_file_path));
+  ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
+  ASSERT_OK(sst_writer.Finish());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->LoadDependency({
+      {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
+       "BackgroundCallCompaction:0"},
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  options.write_buffer_size = 110 << 10;  // 110KB
+  options.level0_file_num_compaction_trigger =
+      options.level0_stop_writes_trigger;
+  options.max_subcompactions = max_subcompactions_;
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
+  DestroyAndReopen(options);
+  Random rnd(301);
+
+  // Generate level0_stop_writes_trigger L0 files to trigger write stop
+  for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
+    for (int j = 0; j != kNumKeysPerFile; ++j) {
+      ASSERT_OK(Put(Key(j), rnd.RandomString(990)));
+    }
+    if (i > 0) {
+      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+      ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i);
+    }
+  }
+  // When we reach this point, there will be level0_stop_writes_trigger L0
+  // files and one extra key (99) in memory, which overlaps with the external
+  // SST file. Write stall triggers, and can be cleared only after compaction
+  // reduces the number of L0 files.
+
+  // Compaction will also be triggered since we have reached the threshold for
+  // auto compaction. Note that compaction may begin after the following file
+  // ingestion thread and waits for ingestion to finish.
+
+  // Thread to ingest file with overlapping key range with the current
+  // memtable. Consequently ingestion will trigger a flush. The flush MUST
+  // proceed without waiting for the write stall condition to clear, otherwise
+  // deadlock can happen.
+  port::Thread ingestion_thr([&]() {
+    IngestExternalFileOptions ifo;
+    Status s = db_->IngestExternalFile({sst_file_path}, ifo);
+    ASSERT_OK(s);
+  });
+
+  // More write to trigger write stop
+  ingestion_thr.join();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  Close();
+}
+
+TEST_F(DBCompactionTest, ConsistencyFailTest) {
+  Options options = CurrentOptions();
+  options.force_consistency_checks = true;
+  DestroyAndReopen(options);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "VersionBuilder::CheckConsistency0", [&](void* arg) {
+        auto p =
+            reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
+        // just swap the two FileMetaData so that we hit error
+        // in CheckConsistency funcion
+        FileMetaData* temp = *(p->first);
+        *(p->first) = *(p->second);
+        *(p->second) = temp;
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  for (int k = 0; k < 2; ++k) {
+    ASSERT_OK(Put("foo", "bar"));
+    Status s = Flush();
+    if (k < 1) {
+      ASSERT_OK(s);
+    } else {
+      ASSERT_TRUE(s.IsCorruption());
+    }
+  }
+
+  ASSERT_NOK(Put("foo", "bar"));
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(DBCompactionTest, ConsistencyFailTest2) {
+  Options options = CurrentOptions();
+  options.force_consistency_checks = true;
+  options.target_file_size_base = 1000;
+  options.level0_file_num_compaction_trigger = 2;
+  BlockBasedTableOptions bbto;
+  bbto.block_size = 400;  // small block size
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  DestroyAndReopen(options);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "VersionBuilder::CheckConsistency1", [&](void* arg) {
+        auto p =
+            reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
+        // just swap the two FileMetaData so that we hit error
+        // in CheckConsistency funcion
+        FileMetaData* temp = *(p->first);
+        *(p->first) = *(p->second);
+        *(p->second) = temp;
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  Random rnd(301);
+  std::string value = rnd.RandomString(1000);
+
+  ASSERT_OK(Put("foo1", value));
+  ASSERT_OK(Put("z", ""));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo2", value));
+  ASSERT_OK(Put("z", ""));
+  Status s = Flush();
+  ASSERT_TRUE(s.ok() || s.IsCorruption());
+
+  // This probably returns non-OK, but we rely on the next Put()
+  // to determine the DB is frozen.
+  ASSERT_NOK(dbfull()->TEST_WaitForCompact());
+  ASSERT_NOK(Put("foo", "bar"));
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+void IngestOneKeyValue(DBImpl* db, const std::string& key,
+                       const std::string& value, const Options& options) {
+  ExternalSstFileInfo info;
+  std::string f = test::PerThreadDBPath("sst_file" + key);
+  EnvOptions env;
+  ROCKSDB_NAMESPACE::SstFileWriter writer(env, options);
+  auto s = writer.Open(f);
+  ASSERT_OK(s);
+  // ASSERT_OK(writer.Put(Key(), ""));
+  ASSERT_OK(writer.Put(key, value));
+
+  ASSERT_OK(writer.Finish(&info));
+  IngestExternalFileOptions ingest_opt;
+
+  ASSERT_OK(db->IngestExternalFile({info.file_path}, ingest_opt));
+}
+
+TEST_P(DBCompactionTestWithParam,
+       FlushAfterIntraL0CompactionCheckConsistencyFail) {
+  Options options = CurrentOptions();
+  options.force_consistency_checks = true;
+  options.compression = kNoCompression;
+  options.level0_file_num_compaction_trigger = 5;
+  options.max_background_compactions = 2;
+  options.max_subcompactions = max_subcompactions_;
+  DestroyAndReopen(options);
+
+  const size_t kValueSize = 1 << 20;
+  Random rnd(301);
+  std::atomic<int> pick_intra_l0_count(0);
+  std::string value(rnd.RandomString(kValueSize));
+
+  // The L0->L1 must be picked before we begin ingesting files to trigger
+  // intra-L0 compaction, and must not finish until after an intra-L0
+  // compaction has been picked.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"LevelCompactionPicker::PickCompaction:Return",
+        "DBCompactionTestWithParam::"
+        "FlushAfterIntraL0CompactionCheckConsistencyFail:L0ToL1Ready"},
+       {"LevelCompactionPicker::PickCompactionBySize:0",
+        "CompactionJob::Run():Start"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "FindIntraL0Compaction",
+      [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // prevents trivial move
+  for (int i = 0; i < 10; ++i) {
+    ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
+  }
+  ASSERT_OK(Flush());
+  Compact("", Key(99));
+  ASSERT_EQ(0, NumTableFilesAtLevel(0));
+
+  // Flush 5 L0 sst.
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_OK(Put(Key(i + 1), value));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(5, NumTableFilesAtLevel(0));
+
+  // Put one key, to make smallest log sequence number in this memtable is less
+  // than sst which would be ingested in next step.
+  ASSERT_OK(Put(Key(0), "a"));
+
+  ASSERT_EQ(5, NumTableFilesAtLevel(0));
+  TEST_SYNC_POINT(
+      "DBCompactionTestWithParam::"
+      "FlushAfterIntraL0CompactionCheckConsistencyFail:L0ToL1Ready");
+
+  // Ingest 5 L0 sst. And this files would trigger PickIntraL0Compaction.
+  for (int i = 5; i < 10; i++) {
+    ASSERT_EQ(i, NumTableFilesAtLevel(0));
+    IngestOneKeyValue(dbfull(), Key(i), value, options);
+  }
+
+  // Put one key, to make biggest log sequence number in this memtable is bigger
+  // than sst which would be ingested in next step.
+  ASSERT_OK(Put(Key(2), "b"));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  std::vector<std::vector<FileMetaData>> level_to_files;
+  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
+                                  &level_to_files);
+  ASSERT_GT(level_to_files[0].size(), 0);
+  ASSERT_GT(pick_intra_l0_count.load(), 0);
+
+  ASSERT_OK(Flush());
+}
+
+TEST_P(DBCompactionTestWithParam,
+       IntraL0CompactionAfterFlushCheckConsistencyFail) {
+  Options options = CurrentOptions();
+  options.force_consistency_checks = true;
+  options.compression = kNoCompression;
+  options.level0_file_num_compaction_trigger = 5;
+  options.max_background_compactions = 2;
+  options.max_subcompactions = max_subcompactions_;
+  options.write_buffer_size = 2 << 20;
+  options.max_write_buffer_number = 6;
+  DestroyAndReopen(options);
+
+  const size_t kValueSize = 1 << 20;
+  Random rnd(301);
+  std::string value(rnd.RandomString(kValueSize));
+  std::string value2(rnd.RandomString(kValueSize));
+  std::string bigvalue = value + value;
+
+  // prevents trivial move
+  for (int i = 0; i < 10; ++i) {
+    ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
+  }
+  ASSERT_OK(Flush());
+  Compact("", Key(99));
+  ASSERT_EQ(0, NumTableFilesAtLevel(0));
+
+  std::atomic<int> pick_intra_l0_count(0);
+  // The L0->L1 must be picked before we begin ingesting files to trigger
+  // intra-L0 compaction, and must not finish until after an intra-L0
+  // compaction has been picked.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"LevelCompactionPicker::PickCompaction:Return",
+        "DBCompactionTestWithParam::"
+        "IntraL0CompactionAfterFlushCheckConsistencyFail:L0ToL1Ready"},
+       {"LevelCompactionPicker::PickCompactionBySize:0",
+        "CompactionJob::Run():Start"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "FindIntraL0Compaction",
+      [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  // Make 6 L0 sst.
+  for (int i = 0; i < 6; ++i) {
+    if (i % 2 == 0) {
+      IngestOneKeyValue(dbfull(), Key(i), value, options);
+    } else {
+      ASSERT_OK(Put(Key(i), value));
+      ASSERT_OK(Flush());
+    }
+  }
+
+  ASSERT_EQ(6, NumTableFilesAtLevel(0));
+
+  // Stop run flush job
+  env_->SetBackgroundThreads(1, Env::HIGH);
+  test::SleepingBackgroundTask sleeping_tasks;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks,
+                 Env::Priority::HIGH);
+  sleeping_tasks.WaitUntilSleeping();
+
+  // Put many keys to make memtable request to flush
+  for (int i = 0; i < 6; ++i) {
+    ASSERT_OK(Put(Key(i), bigvalue));
+  }
+
+  ASSERT_EQ(6, NumTableFilesAtLevel(0));
+  TEST_SYNC_POINT(
+      "DBCompactionTestWithParam::"
+      "IntraL0CompactionAfterFlushCheckConsistencyFail:L0ToL1Ready");
+  // ingest file to trigger IntraL0Compaction
+  for (int i = 6; i < 10; ++i) {
+    ASSERT_EQ(i, NumTableFilesAtLevel(0));
+    IngestOneKeyValue(dbfull(), Key(i), value2, options);
+  }
+
+  // Wake up flush job
+  sleeping_tasks.WakeUp();
+  sleeping_tasks.WaitUntilDone();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+  uint64_t error_count = 0;
+  db_->GetIntProperty("rocksdb.background-errors", &error_count);
+  ASSERT_EQ(error_count, 0);
+  ASSERT_GT(pick_intra_l0_count.load(), 0);
+  for (int i = 0; i < 6; ++i) {
+    ASSERT_EQ(bigvalue, Get(Key(i)));
+  }
+  for (int i = 6; i < 10; ++i) {
+    ASSERT_EQ(value2, Get(Key(i)));
+  }
+}
+
+TEST_P(DBCompactionTestWithBottommostParam, SequenceKeysManualCompaction) {
+  constexpr int kSstNum = 10;
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  DestroyAndReopen(options);
+
+  // Generate some sst files on level 0 with sequence keys (no overlap)
+  for (int i = 0; i < kSstNum; i++) {
+    for (int j = 1; j < UCHAR_MAX; j++) {
+      auto key = std::string(kSstNum, '\0');
+      key[kSstNum - i] += static_cast<char>(j);
+      ASSERT_OK(Put(key, std::string(i % 1000, 'A')));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+
+  ASSERT_EQ(std::to_string(kSstNum), FilesPerLevel(0));
+
+  auto cro = CompactRangeOptions();
+  cro.bottommost_level_compaction = bottommost_level_compaction_;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  if (bottommost_level_compaction_ == BottommostLevelCompaction::kForce ||
+      bottommost_level_compaction_ ==
+          BottommostLevelCompaction::kForceOptimized) {
+    // Real compaction to compact all sst files from level 0 to 1 file on level
+    // 1
+    ASSERT_EQ("0,1", FilesPerLevel(0));
+  } else {
+    // Just trivial move from level 0 -> 1
+    ASSERT_EQ("0," + std::to_string(kSstNum), FilesPerLevel(0));
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(
+    DBCompactionTestWithBottommostParam, DBCompactionTestWithBottommostParam,
+    ::testing::Values(BottommostLevelCompaction::kSkip,
+                      BottommostLevelCompaction::kIfHaveCompactionFilter,
+                      BottommostLevelCompaction::kForce,
+                      BottommostLevelCompaction::kForceOptimized));
+
+TEST_F(DBCompactionTest, UpdateLevelSubCompactionTest) {
+  Options options = CurrentOptions();
+  options.max_subcompactions = 10;
+  options.target_file_size_base = 1 << 10;  // 1KB
+  DestroyAndReopen(options);
+
+  bool has_compaction = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        ASSERT_TRUE(compaction->max_subcompactions() == 10);
+        has_compaction = true;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 10);
+  // Trigger compaction
+  for (int i = 0; i < 32; i++) {
+    for (int j = 0; j < 5000; j++) {
+      ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
+    }
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_TRUE(has_compaction);
+
+  has_compaction = false;
+  ASSERT_OK(dbfull()->SetDBOptions({{"max_subcompactions", "2"}}));
+  ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 2);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        ASSERT_TRUE(compaction->max_subcompactions() == 2);
+        has_compaction = true;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Trigger compaction
+  for (int i = 0; i < 32; i++) {
+    for (int j = 0; j < 5000; j++) {
+      ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
+    }
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_TRUE(has_compaction);
+}
+
+TEST_F(DBCompactionTest, UpdateUniversalSubCompactionTest) {
+  Options options = CurrentOptions();
+  options.max_subcompactions = 10;
+  options.compaction_style = kCompactionStyleUniversal;
+  options.target_file_size_base = 1 << 10;  // 1KB
+  DestroyAndReopen(options);
+
+  bool has_compaction = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        ASSERT_TRUE(compaction->max_subcompactions() == 10);
+        has_compaction = true;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Trigger compaction
+  for (int i = 0; i < 32; i++) {
+    for (int j = 0; j < 5000; j++) {
+      ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
+    }
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_TRUE(has_compaction);
+  has_compaction = false;
+
+  ASSERT_OK(dbfull()->SetDBOptions({{"max_subcompactions", "2"}}));
+  ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 2);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        ASSERT_TRUE(compaction->max_subcompactions() == 2);
+        has_compaction = true;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Trigger compaction
+  for (int i = 0; i < 32; i++) {
+    for (int j = 0; j < 5000; j++) {
+      ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
+    }
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_TRUE(has_compaction);
+}
+
+TEST_P(ChangeLevelConflictsWithAuto, TestConflict) {
+  // A `CompactRange()` may race with an automatic compaction, we'll need
+  // to make sure it doesn't corrupte the data.
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 2;
+  Reopen(options);
+
+  ASSERT_OK(Put("foo", "v1"));
+  ASSERT_OK(Put("bar", "v1"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+
+  {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 2;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  }
+  ASSERT_EQ("0,0,1", FilesPerLevel(0));
+
+  // Run a qury to refitting to level 1 while another thread writing to
+  // the same level.
+  SyncPoint::GetInstance()->LoadDependency({
+      // The first two dependencies ensure the foreground creates an L0 file
+      // between the background compaction's L0->L1 and its L1->L2.
+      {
+          "DBImpl::CompactRange:BeforeRefit:1",
+          "AutoCompactionFinished1",
+      },
+      {
+          "AutoCompactionFinished2",
+          "DBImpl::CompactRange:BeforeRefit:2",
+      },
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  std::thread auto_comp([&] {
+    TEST_SYNC_POINT("AutoCompactionFinished1");
+    ASSERT_OK(Put("bar", "v2"));
+    ASSERT_OK(Put("foo", "v2"));
+    ASSERT_OK(Flush());
+    ASSERT_OK(Put("bar", "v3"));
+    ASSERT_OK(Put("foo", "v3"));
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    TEST_SYNC_POINT("AutoCompactionFinished2");
+  });
+
+  {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = GetParam() ? 1 : 0;
+    // This should return non-OK, but it's more important for the test to
+    // make sure that the DB is not corrupted.
+    ASSERT_NOK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  }
+  auto_comp.join();
+  // Refitting didn't happen.
+  SyncPoint::GetInstance()->DisableProcessing();
+
+  // Write something to DB just make sure that consistency check didn't
+  // fail and make the DB readable.
+}
+
+INSTANTIATE_TEST_CASE_P(ChangeLevelConflictsWithAuto,
+                        ChangeLevelConflictsWithAuto, testing::Bool());
+
+TEST_F(DBCompactionTest, ChangeLevelCompactRangeConflictsWithManual) {
+  // A `CompactRange()` with `change_level == true` needs to execute its final
+  // step, `ReFitLevel()`, in isolation. Previously there was a bug where
+  // refitting could target the same level as an ongoing manual compaction,
+  // leading to overlapping files in that level.
+  //
+  // This test ensures that case is not possible by verifying any manual
+  // compaction issued during the `ReFitLevel()` phase fails with
+  // `Status::Incomplete`.
+  Options options = CurrentOptions();
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 3;
+  Reopen(options);
+
+  // Setup an LSM with three levels populated.
+  Random rnd(301);
+  int key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 2;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  }
+  ASSERT_EQ("0,0,2", FilesPerLevel(0));
+
+  GenerateNewFile(&rnd, &key_idx);
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,1,2", FilesPerLevel(0));
+
+  // The background thread will refit L2->L1 while the
+  // foreground thread will try to simultaneously compact L0->L1.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+      // The first two dependencies ensure the foreground creates an L0 file
+      // between the background compaction's L0->L1 and its L1->L2.
+      {
+          "DBImpl::RunManualCompaction()::1",
+          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
+          "PutFG",
+      },
+      {
+          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
+          "FlushedFG",
+          "DBImpl::RunManualCompaction()::2",
+      },
+      // The next two dependencies ensure the foreground invokes
+      // `CompactRange()` while the background is refitting. The
+      // foreground's `CompactRange()` is guaranteed to attempt an L0->L1
+      // as we set it up with an empty memtable and a new L0 file.
+      {
+          "DBImpl::CompactRange:PreRefitLevel",
+          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
+          "CompactFG",
+      },
+      {
+          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
+          "CompactedFG",
+          "DBImpl::CompactRange:PostRefitLevel",
+      },
+  });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ROCKSDB_NAMESPACE::port::Thread refit_level_thread([&] {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 1;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  });
+
+  TEST_SYNC_POINT(
+      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:PutFG");
+  // Make sure we have something new to compact in the foreground.
+  // Note key 1 is carefully chosen as it ensures the file we create here
+  // overlaps with one of the files being refitted L2->L1 in the background.
+  // If we chose key 0, the file created here would not overlap.
+  ASSERT_OK(Put(Key(1), "val"));
+  ASSERT_OK(Flush());
+  TEST_SYNC_POINT(
+      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:FlushedFG");
+
+  TEST_SYNC_POINT(
+      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:CompactFG");
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+                  .IsIncomplete());
+  TEST_SYNC_POINT(
+      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
+      "CompactedFG");
+  refit_level_thread.join();
+}
+
+TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) {
+  // This test is added to ensure that RefitLevel() error paths are clearing
+  // internal flags and to test that subsequent valid RefitLevel() calls
+  // succeeds
+  Options options = CurrentOptions();
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 3;
+  Reopen(options);
+
+  ASSERT_EQ("", FilesPerLevel(0));
+
+  // Setup an LSM with three levels populated.
+  Random rnd(301);
+  int key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1", FilesPerLevel(0));
+  {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 2;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  }
+  ASSERT_EQ("0,0,2", FilesPerLevel(0));
+
+  auto start_idx = key_idx;
+  GenerateNewFile(&rnd, &key_idx);
+  GenerateNewFile(&rnd, &key_idx);
+  auto end_idx = key_idx - 1;
+  ASSERT_EQ("1,1,2", FilesPerLevel(0));
+
+  // Next two CompactRange() calls are used to test exercise error paths within
+  // RefitLevel() before triggering a valid RefitLevel() call
+
+  // Trigger a refit to L1 first
+  {
+    std::string begin_string = Key(start_idx);
+    std::string end_string = Key(end_idx);
+    Slice begin(begin_string);
+    Slice end(end_string);
+
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 1;
+    ASSERT_OK(dbfull()->CompactRange(cro, &begin, &end));
+  }
+  ASSERT_EQ("0,3,2", FilesPerLevel(0));
+
+  // Try a refit from L2->L1 - this should fail and exercise error paths in
+  // RefitLevel()
+  {
+    // Select key range that matches the bottom most level (L2)
+    std::string begin_string = Key(0);
+    std::string end_string = Key(start_idx - 1);
+    Slice begin(begin_string);
+    Slice end(end_string);
+
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 1;
+    ASSERT_NOK(dbfull()->CompactRange(cro, &begin, &end));
+  }
+  ASSERT_EQ("0,3,2", FilesPerLevel(0));
+
+  // Try a valid Refit request to ensure, the path is still working
+  {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 1;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  }
+  ASSERT_EQ("0,5", FilesPerLevel(0));
+}
+
+TEST_F(DBCompactionTest, CompactionWithBlob) {
+  Options options;
+  options.env = env_;
+  options.disable_auto_compactions = true;
+
+  Reopen(options);
+
+  constexpr char first_key[] = "first_key";
+  constexpr char second_key[] = "second_key";
+  constexpr char first_value[] = "first_value";
+  constexpr char second_value[] = "second_value";
+  constexpr char third_value[] = "third_value";
+
+  ASSERT_OK(Put(first_key, first_value));
+  ASSERT_OK(Put(second_key, first_value));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put(first_key, second_value));
+  ASSERT_OK(Put(second_key, second_value));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put(first_key, third_value));
+  ASSERT_OK(Put(second_key, third_value));
+  ASSERT_OK(Flush());
+
+  options.enable_blob_files = true;
+
+  Reopen(options);
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+  ASSERT_EQ(Get(first_key), third_value);
+  ASSERT_EQ(Get(second_key), third_value);
+
+  VersionSet* const versions = dbfull()->GetVersionSet();
+  assert(versions);
+
+  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+  ASSERT_NE(cfd, nullptr);
+
+  Version* const current = cfd->current();
+  ASSERT_NE(current, nullptr);
+
+  const VersionStorageInfo* const storage_info = current->storage_info();
+  ASSERT_NE(storage_info, nullptr);
+
+  const auto& l1_files = storage_info->LevelFiles(1);
+  ASSERT_EQ(l1_files.size(), 1);
+
+  const FileMetaData* const table_file = l1_files[0];
+  ASSERT_NE(table_file, nullptr);
+
+  const auto& blob_files = storage_info->GetBlobFiles();
+  ASSERT_EQ(blob_files.size(), 1);
+
+  const auto& blob_file = blob_files.front();
+  ASSERT_NE(blob_file, nullptr);
+
+  ASSERT_EQ(table_file->smallest.user_key(), first_key);
+  ASSERT_EQ(table_file->largest.user_key(), second_key);
+  ASSERT_EQ(table_file->oldest_blob_file_number,
+            blob_file->GetBlobFileNumber());
+
+  ASSERT_EQ(blob_file->GetTotalBlobCount(), 2);
+
+  const InternalStats* const internal_stats = cfd->internal_stats();
+  ASSERT_NE(internal_stats, nullptr);
+
+  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+  ASSERT_GE(compaction_stats.size(), 2);
+  ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
+  ASSERT_EQ(compaction_stats[1].bytes_written, table_file->fd.GetFileSize());
+  ASSERT_EQ(compaction_stats[1].bytes_written_blob,
+            blob_file->GetTotalBlobBytes());
+  ASSERT_EQ(compaction_stats[1].num_output_files, 1);
+  ASSERT_EQ(compaction_stats[1].num_output_files_blob, 1);
+}
+
+class DBCompactionTestBlobError
+    : public DBCompactionTest,
+      public testing::WithParamInterface<std::string> {
+ public:
+  DBCompactionTestBlobError() : sync_point_(GetParam()) {}
+
+  std::string sync_point_;
+};
+
+INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobError, DBCompactionTestBlobError,
+                        ::testing::ValuesIn(std::vector<std::string>{
+                            "BlobFileBuilder::WriteBlobToFile:AddRecord",
+                            "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
+
+TEST_P(DBCompactionTestBlobError, CompactionError) {
+  Options options;
+  options.disable_auto_compactions = true;
+  options.env = env_;
+
+  Reopen(options);
+
+  constexpr char first_key[] = "first_key";
+  constexpr char second_key[] = "second_key";
+  constexpr char first_value[] = "first_value";
+  constexpr char second_value[] = "second_value";
+  constexpr char third_value[] = "third_value";
+
+  ASSERT_OK(Put(first_key, first_value));
+  ASSERT_OK(Put(second_key, first_value));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put(first_key, second_value));
+  ASSERT_OK(Put(second_key, second_value));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put(first_key, third_value));
+  ASSERT_OK(Put(second_key, third_value));
+  ASSERT_OK(Flush());
+
+  options.enable_blob_files = true;
+
+  Reopen(options);
+
+  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
+    Status* const s = static_cast<Status*>(arg);
+    assert(s);
+
+    (*s) = Status::IOError(sync_point_);
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), begin, end).IsIOError());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  VersionSet* const versions = dbfull()->GetVersionSet();
+  assert(versions);
+
+  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+  ASSERT_NE(cfd, nullptr);
+
+  Version* const current = cfd->current();
+  ASSERT_NE(current, nullptr);
+
+  const VersionStorageInfo* const storage_info = current->storage_info();
+  ASSERT_NE(storage_info, nullptr);
+
+  const auto& l1_files = storage_info->LevelFiles(1);
+  ASSERT_TRUE(l1_files.empty());
+
+  const auto& blob_files = storage_info->GetBlobFiles();
+  ASSERT_TRUE(blob_files.empty());
+
+  const InternalStats* const internal_stats = cfd->internal_stats();
+  ASSERT_NE(internal_stats, nullptr);
+
+  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+  ASSERT_GE(compaction_stats.size(), 2);
+
+  if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
+    ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
+    ASSERT_EQ(compaction_stats[1].bytes_written, 0);
+    ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
+    ASSERT_EQ(compaction_stats[1].num_output_files, 0);
+    ASSERT_EQ(compaction_stats[1].num_output_files_blob, 0);
+  } else {
+    // SST file writing succeeded; blob file writing failed (during Finish)
+    ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
+    ASSERT_GT(compaction_stats[1].bytes_written, 0);
+    ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
+    ASSERT_EQ(compaction_stats[1].num_output_files, 1);
+    ASSERT_EQ(compaction_stats[1].num_output_files_blob, 0);
+  }
+}
+
+class DBCompactionTestBlobGC
+    : public DBCompactionTest,
+      public testing::WithParamInterface<std::tuple<double, bool>> {
+ public:
+  DBCompactionTestBlobGC()
+      : blob_gc_age_cutoff_(std::get<0>(GetParam())),
+        updated_enable_blob_files_(std::get<1>(GetParam())) {}
+
+  double blob_gc_age_cutoff_;
+  bool updated_enable_blob_files_;
+};
+
+INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobGC, DBCompactionTestBlobGC,
+                        ::testing::Combine(::testing::Values(0.0, 0.5, 1.0),
+                                           ::testing::Bool()));
+
+TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGCOverrides) {
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.blob_file_size = 32;  // one blob per file
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0;
+
+  DestroyAndReopen(options);
+
+  for (int i = 0; i < 128; i += 2) {
+    ASSERT_OK(Put("key" + std::to_string(i), "value" + std::to_string(i)));
+    ASSERT_OK(
+        Put("key" + std::to_string(i + 1), "value" + std::to_string(i + 1)));
+    ASSERT_OK(Flush());
+  }
+
+  std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
+  ASSERT_EQ(original_blob_files.size(), 128);
+
+  // Note: turning off enable_blob_files before the compaction results in
+  // garbage collected values getting inlined.
+  ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
+
+  CompactRangeOptions cro;
+  cro.blob_garbage_collection_policy = BlobGarbageCollectionPolicy::kForce;
+  cro.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_;
+
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+  // Check that the GC stats are correct
+  {
+    VersionSet* const versions = dbfull()->GetVersionSet();
+    assert(versions);
+    assert(versions->GetColumnFamilySet());
+
+    ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+    assert(cfd);
+
+    const InternalStats* const internal_stats = cfd->internal_stats();
+    assert(internal_stats);
+
+    const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+    ASSERT_GE(compaction_stats.size(), 2);
+
+    ASSERT_GE(compaction_stats[1].bytes_read_blob, 0);
+    ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
+  }
+
+  const size_t cutoff_index = static_cast<size_t>(
+      cro.blob_garbage_collection_age_cutoff * original_blob_files.size());
+  const size_t expected_num_files = original_blob_files.size() - cutoff_index;
+
+  const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
+
+  ASSERT_EQ(new_blob_files.size(), expected_num_files);
+
+  // Original blob files below the cutoff should be gone, original blob files
+  // at or above the cutoff should be still there
+  for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
+    ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
+  }
+
+  for (size_t i = 0; i < 128; ++i) {
+    ASSERT_EQ(Get("key" + std::to_string(i)), "value" + std::to_string(i));
+  }
+}
+
+TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) {
+  Options options;
+  options.env = env_;
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.blob_file_size = 32;  // one blob per file
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_;
+
+  Reopen(options);
+
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "first_value";
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "second_value";
+
+  ASSERT_OK(Put(first_key, first_value));
+  ASSERT_OK(Put(second_key, second_value));
+  ASSERT_OK(Flush());
+
+  constexpr char third_key[] = "third_key";
+  constexpr char third_value[] = "third_value";
+  constexpr char fourth_key[] = "fourth_key";
+  constexpr char fourth_value[] = "fourth_value";
+
+  ASSERT_OK(Put(third_key, third_value));
+  ASSERT_OK(Put(fourth_key, fourth_value));
+  ASSERT_OK(Flush());
+
+  const std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
+
+  ASSERT_EQ(original_blob_files.size(), 4);
+
+  const size_t cutoff_index = static_cast<size_t>(
+      options.blob_garbage_collection_age_cutoff * original_blob_files.size());
+
+  // Note: turning off enable_blob_files before the compaction results in
+  // garbage collected values getting inlined.
+  size_t expected_number_of_files = original_blob_files.size();
+
+  if (!updated_enable_blob_files_) {
+    ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
+
+    expected_number_of_files -= cutoff_index;
+  }
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+  ASSERT_EQ(Get(first_key), first_value);
+  ASSERT_EQ(Get(second_key), second_value);
+  ASSERT_EQ(Get(third_key), third_value);
+  ASSERT_EQ(Get(fourth_key), fourth_value);
+
+  const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
+
+  ASSERT_EQ(new_blob_files.size(), expected_number_of_files);
+
+  // Original blob files below the cutoff should be gone, original blob files at
+  // or above the cutoff should be still there
+  for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
+    ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
+  }
+
+  VersionSet* const versions = dbfull()->GetVersionSet();
+  assert(versions);
+  assert(versions->GetColumnFamilySet());
+
+  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+  assert(cfd);
+
+  const InternalStats* const internal_stats = cfd->internal_stats();
+  assert(internal_stats);
+
+  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+  ASSERT_GE(compaction_stats.size(), 2);
+
+  if (blob_gc_age_cutoff_ > 0.0) {
+    ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
+
+    if (updated_enable_blob_files_) {
+      // GC relocated some blobs to new blob files
+      ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
+      ASSERT_EQ(compaction_stats[1].bytes_read_blob,
+                compaction_stats[1].bytes_written_blob);
+    } else {
+      // GC moved some blobs back to the LSM, no new blob files
+      ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
+    }
+  } else {
+    ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
+    ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
+  }
+}
+
+TEST_F(DBCompactionTest, CompactionWithBlobGCError_CorruptIndex) {
+  Options options;
+  options.env = env_;
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 1.0;
+
+  Reopen(options);
+
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "first_value";
+  ASSERT_OK(Put(first_key, first_value));
+
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "second_value";
+  ASSERT_OK(Put(second_key, second_value));
+
+  ASSERT_OK(Flush());
+
+  constexpr char third_key[] = "third_key";
+  constexpr char third_value[] = "third_value";
+  ASSERT_OK(Put(third_key, third_value));
+
+  constexpr char fourth_key[] = "fourth_key";
+  constexpr char fourth_value[] = "fourth_value";
+  ASSERT_OK(Put(fourth_key, fourth_value));
+
+  ASSERT_OK(Flush());
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::GarbageCollectBlobIfNeeded::TamperWithBlobIndex",
+      [](void* arg) {
+        Slice* const blob_index = static_cast<Slice*>(arg);
+        assert(blob_index);
+        assert(!blob_index->empty());
+        blob_index->remove_prefix(1);
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  ASSERT_TRUE(
+      db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(DBCompactionTest, CompactionWithBlobGCError_InlinedTTLIndex) {
+  constexpr uint64_t min_blob_size = 10;
+
+  Options options;
+  options.env = env_;
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.min_blob_size = min_blob_size;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 1.0;
+
+  Reopen(options);
 
-  const size_t kValueSize = 1 << 20;
-  Random rnd(301);
-  std::string value(rnd.RandomString(kValueSize));
-  std::string value2(rnd.RandomString(kValueSize));
-  std::string bigvalue = value + value;
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "first_value";
+  ASSERT_OK(Put(first_key, first_value));
+
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "second_value";
+  ASSERT_OK(Put(second_key, second_value));
 
-  // prevents trivial move
-  for (int i = 0; i < 10; ++i) {
-    ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
-  }
   ASSERT_OK(Flush());
-  Compact("", Key(99));
-  ASSERT_EQ(0, NumTableFilesAtLevel(0));
 
-  std::atomic<int> pick_intra_l0_count(0);
-  // The L0->L1 must be picked before we begin ingesting files to trigger
-  // intra-L0 compaction, and must not finish until after an intra-L0
-  // compaction has been picked.
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"LevelCompactionPicker::PickCompaction:Return",
-        "DBCompactionTestWithParam::"
-        "IntraL0CompactionAfterFlushCheckConsistencyFail:L0ToL1Ready"},
-       {"LevelCompactionPicker::PickCompactionBySize:0",
-        "CompactionJob::Run():Start"}});
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "FindIntraL0Compaction",
-      [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-  // Make 6 L0 sst.
-  for (int i = 0; i < 6; ++i) {
-    if (i % 2 == 0) {
-      IngestOneKeyValue(dbfull(), Key(i), value, options);
-    } else {
-      ASSERT_OK(Put(Key(i), value));
-      ASSERT_OK(Flush());
-    }
-  }
+  constexpr char third_key[] = "third_key";
+  constexpr char third_value[] = "third_value";
+  ASSERT_OK(Put(third_key, third_value));
 
-  ASSERT_EQ(6, NumTableFilesAtLevel(0));
+  constexpr char fourth_key[] = "fourth_key";
+  constexpr char blob[] = "short";
+  static_assert(sizeof(short) - 1 < min_blob_size,
+                "Blob too long to be inlined");
 
-  // Stop run flush job
-  env_->SetBackgroundThreads(1, Env::HIGH);
-  test::SleepingBackgroundTask sleeping_tasks;
-  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks,
-                 Env::Priority::HIGH);
-  sleeping_tasks.WaitUntilSleeping();
+  // Fake an inlined TTL blob index.
+  std::string blob_index;
 
-  // Put many keys to make memtable request to flush
-  for (int i = 0; i < 6; ++i) {
-    ASSERT_OK(Put(Key(i), bigvalue));
-  }
+  constexpr uint64_t expiration = 1234567890;
 
-  ASSERT_EQ(6, NumTableFilesAtLevel(0));
-  TEST_SYNC_POINT(
-      "DBCompactionTestWithParam::"
-      "IntraL0CompactionAfterFlushCheckConsistencyFail:L0ToL1Ready");
-  // ingest file to trigger IntraL0Compaction
-  for (int i = 6; i < 10; ++i) {
-    ASSERT_EQ(i, NumTableFilesAtLevel(0));
-    IngestOneKeyValue(dbfull(), Key(i), value2, options);
-  }
+  BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
 
-  // Wake up flush job
-  sleeping_tasks.WakeUp();
-  sleeping_tasks.WaitUntilDone();
-  dbfull()->TEST_WaitForCompact();
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  WriteBatch batch;
+  ASSERT_OK(
+      WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key, blob_index));
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
 
-  uint64_t error_count = 0;
-  db_->GetIntProperty("rocksdb.background-errors", &error_count);
-  ASSERT_EQ(error_count, 0);
-  ASSERT_GT(pick_intra_l0_count.load(), 0);
-  for (int i = 0; i < 6; ++i) {
-    ASSERT_EQ(bigvalue, Get(Key(i)));
-  }
-  for (int i = 6; i < 10; ++i) {
-    ASSERT_EQ(value2, Get(Key(i)));
-  }
+  ASSERT_OK(Flush());
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  ASSERT_TRUE(
+      db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption());
 }
 
-TEST_P(DBCompactionTestWithBottommostParam, SequenceKeysManualCompaction) {
-  constexpr int kSstNum = 10;
-  Options options = CurrentOptions();
+TEST_F(DBCompactionTest, CompactionWithBlobGCError_IndexWithInvalidFileNumber) {
+  Options options;
+  options.env = env_;
   options.disable_auto_compactions = true;
-  DestroyAndReopen(options);
+  options.enable_blob_files = true;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 1.0;
 
-  // Generate some sst files on level 0 with sequence keys (no overlap)
-  for (int i = 0; i < kSstNum; i++) {
-    for (int j = 1; j < UCHAR_MAX; j++) {
-      auto key = std::string(kSstNum, '\0');
-      key[kSstNum - i] += static_cast<char>(j);
-      Put(key, std::string(i % 1000, 'A'));
-    }
-    ASSERT_OK(Flush());
-  }
-  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  Reopen(options);
 
-  ASSERT_EQ(ToString(kSstNum), FilesPerLevel(0));
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "first_value";
+  ASSERT_OK(Put(first_key, first_value));
 
-  auto cro = CompactRangeOptions();
-  cro.bottommost_level_compaction = bottommost_level_compaction_;
-  db_->CompactRange(cro, nullptr, nullptr);
-  if (bottommost_level_compaction_ == BottommostLevelCompaction::kForce ||
-      bottommost_level_compaction_ ==
-          BottommostLevelCompaction::kForceOptimized) {
-    // Real compaction to compact all sst files from level 0 to 1 file on level
-    // 1
-    ASSERT_EQ("0,1", FilesPerLevel(0));
-  } else {
-    // Just trivial move from level 0 -> 1
-    ASSERT_EQ("0," + ToString(kSstNum), FilesPerLevel(0));
-  }
-}
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "second_value";
+  ASSERT_OK(Put(second_key, second_value));
 
-INSTANTIATE_TEST_CASE_P(
-    DBCompactionTestWithBottommostParam, DBCompactionTestWithBottommostParam,
-    ::testing::Values(BottommostLevelCompaction::kSkip,
-                      BottommostLevelCompaction::kIfHaveCompactionFilter,
-                      BottommostLevelCompaction::kForce,
-                      BottommostLevelCompaction::kForceOptimized));
+  ASSERT_OK(Flush());
 
-TEST_F(DBCompactionTest, UpdateLevelSubCompactionTest) {
+  constexpr char third_key[] = "third_key";
+  constexpr char third_value[] = "third_value";
+  ASSERT_OK(Put(third_key, third_value));
+
+  constexpr char fourth_key[] = "fourth_key";
+
+  // Fake a blob index referencing a non-existent blob file.
+  std::string blob_index;
+
+  constexpr uint64_t blob_file_number = 1000;
+  constexpr uint64_t offset = 1234;
+  constexpr uint64_t size = 5678;
+
+  BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
+                        kNoCompression);
+
+  WriteBatch batch;
+  ASSERT_OK(
+      WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key, blob_index));
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  ASSERT_OK(Flush());
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  ASSERT_TRUE(
+      db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption());
+}
+
+TEST_F(DBCompactionTest, CompactionWithChecksumHandoff1) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
   Options options = CurrentOptions();
-  options.max_subcompactions = 10;
-  options.target_file_size_base = 1 << 10;  // 1KB
-  DestroyAndReopen(options);
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 3;
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.checksum_handoff_file_types.Add(FileType::kTableFile);
+  Status s;
+  Reopen(options);
 
-  bool has_compaction = false;
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s, Status::OK());
+  Destroy(options);
+  Reopen(options);
+
+  // The hash does not match, compaction write fails
+  // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
+  // Since the file system returns IOStatus::Corruption, it is an
+  // unrecoverable error.
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
-        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
-        ASSERT_TRUE(compaction->max_subcompactions() == 10);
-        has_compaction = true;
+      "BackgroundCallCompaction:0", [&](void*) {
+        fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s.severity(),
+            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  Destroy(options);
+  Reopen(options);
+
+  // The file system does not support checksum handoff. The check
+  // will be ignored.
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s, Status::OK());
+
+  // Each write will be similated as corrupted.
+  // Since the file system returns IOStatus::Corruption, it is an
+  // unrecoverable error.
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BackgroundCallCompaction:0",
+      [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s.severity(),
+            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
+  SyncPoint::GetInstance()->DisableProcessing();
 
-  ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 10);
-  // Trigger compaction
-  for (int i = 0; i < 32; i++) {
-    for (int j = 0; j < 5000; j++) {
-      Put(std::to_string(j), std::string(1, 'A'));
-    }
-    ASSERT_OK(Flush());
-    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  Destroy(options);
+}
+
+TEST_F(DBCompactionTest, CompactionWithChecksumHandoff2) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+    return;
   }
-  dbfull()->TEST_WaitForCompact();
-  ASSERT_TRUE(has_compaction);
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 3;
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  Status s;
+  Reopen(options);
 
-  has_compaction = false;
-  ASSERT_OK(dbfull()->SetDBOptions({{"max_subcompactions", "2"}}));
-  ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 2);
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s, Status::OK());
+  Destroy(options);
+  Reopen(options);
 
+  // options is not set, the checksum handoff will not be triggered
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
-        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
-        ASSERT_TRUE(compaction->max_subcompactions() == 2);
-        has_compaction = true;
+      "BackgroundCallCompaction:0", [&](void*) {
+        fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s, Status::OK());
+  SyncPoint::GetInstance()->DisableProcessing();
+  Destroy(options);
+  Reopen(options);
 
-  // Trigger compaction
-  for (int i = 0; i < 32; i++) {
-    for (int j = 0; j < 5000; j++) {
-      Put(std::to_string(j), std::string(1, 'A'));
-    }
-    ASSERT_OK(Flush());
-    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
-  }
-  dbfull()->TEST_WaitForCompact();
-  ASSERT_TRUE(has_compaction);
+  // The file system does not support checksum handoff. The check
+  // will be ignored.
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s, Status::OK());
+
+  // options is not set, the checksum handoff will not be triggered
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BackgroundCallCompaction:0",
+      [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s, Status::OK());
+
+  Destroy(options);
 }
 
-TEST_F(DBCompactionTest, UpdateUniversalSubCompactionTest) {
+TEST_F(DBCompactionTest, CompactionWithChecksumHandoffManifest1) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
   Options options = CurrentOptions();
-  options.max_subcompactions = 10;
-  options.compaction_style = kCompactionStyleUniversal;
-  options.target_file_size_base = 1 << 10;  // 1KB
-  DestroyAndReopen(options);
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 3;
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
+  Status s;
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  Reopen(options);
 
-  bool has_compaction = false;
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s, Status::OK());
+  Destroy(options);
+  Reopen(options);
+
+  // The hash does not match, compaction write fails
+  // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
+  // Since the file system returns IOStatus::Corruption, it is mapped to
+  // kFatalError error.
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
-        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
-        ASSERT_TRUE(compaction->max_subcompactions() == 10);
-        has_compaction = true;
+      "BackgroundCallCompaction:0", [&](void*) {
+        fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  Destroy(options);
+}
 
-  // Trigger compaction
-  for (int i = 0; i < 32; i++) {
-    for (int j = 0; j < 5000; j++) {
-      Put(std::to_string(j), std::string(1, 'A'));
-    }
-    ASSERT_OK(Flush());
-    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+TEST_F(DBCompactionTest, CompactionWithChecksumHandoffManifest2) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+    return;
   }
-  dbfull()->TEST_WaitForCompact();
-  ASSERT_TRUE(has_compaction);
-  has_compaction = false;
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 3;
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
+  Status s;
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
+  Reopen(options);
+
+  // The file system does not support checksum handoff. The check
+  // will be ignored.
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s, Status::OK());
+
+  // Each write will be similated as corrupted.
+  // Since the file system returns IOStatus::Corruption, it is mapped to
+  // kFatalError error.
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  ASSERT_OK(Put(Key(0), "value1"));
+  ASSERT_OK(Put(Key(2), "value2"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BackgroundCallCompaction:0",
+      [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(Put(Key(1), "value3"));
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+  SyncPoint::GetInstance()->DisableProcessing();
 
-  ASSERT_OK(dbfull()->SetDBOptions({{"max_subcompactions", "2"}}));
-  ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 2);
+  Destroy(options);
+}
+
+TEST_F(DBCompactionTest, FIFOWarm) {
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleFIFO;
+  options.num_levels = 1;
+  options.max_open_files = -1;
+  options.level0_file_num_compaction_trigger = 2;
+  options.create_if_missing = true;
+  CompactionOptionsFIFO fifo_options;
+  fifo_options.age_for_warm = 1000;
+  fifo_options.max_table_files_size = 100000000;
+  options.compaction_options_fifo = fifo_options;
+  env_->SetMockSleep();
+  Reopen(options);
 
+  int total_warm = 0;
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
-        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
-        ASSERT_TRUE(compaction->max_subcompactions() == 2);
-        has_compaction = true;
+      "NewWritableFile::FileOptions.temperature", [&](void* arg) {
+        Temperature temperature = *(static_cast<Temperature*>(arg));
+        if (temperature == Temperature::kWarm) {
+          total_warm++;
+        }
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-  // Trigger compaction
-  for (int i = 0; i < 32; i++) {
-    for (int j = 0; j < 5000; j++) {
-      Put(std::to_string(j), std::string(1, 'A'));
-    }
-    ASSERT_OK(Flush());
-    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
-  }
-  dbfull()->TEST_WaitForCompact();
-  ASSERT_TRUE(has_compaction);
+  // The file system does not support checksum handoff. The check
+  // will be ignored.
+  ASSERT_OK(Put(Key(0), "value1"));
+  env_->MockSleepForSeconds(800);
+  ASSERT_OK(Put(Key(2), "value2"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put(Key(0), "value1"));
+  env_->MockSleepForSeconds(800);
+  ASSERT_OK(Put(Key(2), "value2"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put(Key(0), "value1"));
+  env_->MockSleepForSeconds(800);
+  ASSERT_OK(Put(Key(2), "value2"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_OK(Put(Key(0), "value1"));
+  env_->MockSleepForSeconds(800);
+  ASSERT_OK(Put(Key(2), "value2"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+  ColumnFamilyMetaData metadata;
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(4, metadata.file_count);
+  ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
+  ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[1].temperature);
+  ASSERT_EQ(Temperature::kWarm, metadata.levels[0].files[2].temperature);
+  ASSERT_EQ(Temperature::kWarm, metadata.levels[0].files[3].temperature);
+  ASSERT_EQ(2, total_warm);
+
+  Destroy(options);
 }
 
-TEST_P(ChangeLevelConflictsWithAuto, TestConflict) {
-  // A `CompactRange()` may race with an automatic compaction, we'll need
-  // to make sure it doesn't corrupte the data.
+TEST_F(DBCompactionTest, DisableMultiManualCompaction) {
+  const int kNumL0Files = 10;
+
   Options options = CurrentOptions();
-  options.level0_file_num_compaction_trigger = 2;
+  options.level0_file_num_compaction_trigger = kNumL0Files;
   Reopen(options);
 
-  ASSERT_OK(Put("foo", "v1"));
-  ASSERT_OK(Put("bar", "v1"));
-  ASSERT_OK(Flush());
-  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  // Generate 2 levels of file to make sure the manual compaction is not skipped
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(Put(Key(i), "value"));
+    if (i % 2) {
+      ASSERT_OK(Flush());
+    }
+  }
+  MoveFilesToLevel(2);
 
-  {
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 2;
-    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(Put(Key(i), "value"));
+    if (i % 2) {
+      ASSERT_OK(Flush());
+    }
   }
-  ASSERT_EQ("0,0,1", FilesPerLevel(0));
+  MoveFilesToLevel(1);
 
-  // Run a qury to refitting to level 1 while another thread writing to
-  // the same level.
-  SyncPoint::GetInstance()->LoadDependency({
-      // The first two dependencies ensure the foreground creates an L0 file
-      // between the background compaction's L0->L1 and its L1->L2.
-      {
-          "DBImpl::CompactRange:BeforeRefit:1",
-          "AutoCompactionFinished1",
-      },
-      {
-          "AutoCompactionFinished2",
-          "DBImpl::CompactRange:BeforeRefit:2",
-      },
-  });
-  SyncPoint::GetInstance()->EnableProcessing();
+  // Block compaction queue
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
 
-  std::thread auto_comp([&] {
-    TEST_SYNC_POINT("AutoCompactionFinished1");
-    ASSERT_OK(Put("bar", "v2"));
-    ASSERT_OK(Put("foo", "v2"));
-    ASSERT_OK(Flush());
-    ASSERT_OK(Put("bar", "v3"));
-    ASSERT_OK(Put("foo", "v3"));
-    ASSERT_OK(Flush());
-    dbfull()->TEST_WaitForCompact();
-    TEST_SYNC_POINT("AutoCompactionFinished2");
+  port::Thread compact_thread1([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = false;
+    std::string begin_str = Key(0);
+    std::string end_str = Key(3);
+    Slice b = begin_str;
+    Slice e = end_str;
+    auto s = db_->CompactRange(cro, &b, &e);
+    ASSERT_TRUE(s.IsIncomplete());
   });
 
-  {
+  port::Thread compact_thread2([&]() {
     CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = GetParam() ? 1 : 0;
-    // This should return non-OK, but it's more important for the test to
-    // make sure that the DB is not corrupted.
-    dbfull()->CompactRange(cro, nullptr, nullptr);
-  }
-  auto_comp.join();
-  // Refitting didn't happen.
-  SyncPoint::GetInstance()->DisableProcessing();
+    cro.exclusive_manual_compaction = false;
+    std::string begin_str = Key(4);
+    std::string end_str = Key(7);
+    Slice b = begin_str;
+    Slice e = end_str;
+    auto s = db_->CompactRange(cro, &b, &e);
+    ASSERT_TRUE(s.IsIncomplete());
+  });
 
-  // Write something to DB just make sure that consistency check didn't
-  // fail and make the DB readable.
+  // Disable manual compaction should cancel both manual compactions and both
+  // compaction should return incomplete.
+  db_->DisableManualCompaction();
+
+  compact_thread1.join();
+  compact_thread2.join();
+
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 }
 
-INSTANTIATE_TEST_CASE_P(ChangeLevelConflictsWithAuto,
-                        ChangeLevelConflictsWithAuto, testing::Bool());
+TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) {
+  const int kNumL0Files = 4;
 
-TEST_F(DBCompactionTest, ChangeLevelCompactRangeConflictsWithManual) {
-  // A `CompactRange()` with `change_level == true` needs to execute its final
-  // step, `ReFitLevel()`, in isolation. Previously there was a bug where
-  // refitting could target the same level as an ongoing manual compaction,
-  // leading to overlapping files in that level.
-  //
-  // This test ensures that case is not possible by verifying any manual
-  // compaction issued during the `ReFitLevel()` phase fails with
-  // `Status::Incomplete`.
   Options options = CurrentOptions();
-  options.memtable_factory.reset(
-      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
-  options.level0_file_num_compaction_trigger = 2;
-  options.num_levels = 3;
+  options.level0_file_num_compaction_trigger = kNumL0Files;
   Reopen(options);
 
-  // Setup an LSM with three levels populated.
-  Random rnd(301);
-  int key_idx = 0;
-  GenerateNewFile(&rnd, &key_idx);
-  {
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 2;
-    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  // generate files, but avoid trigger auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
   }
-  ASSERT_EQ("0,0,2", FilesPerLevel(0));
-
-  GenerateNewFile(&rnd, &key_idx);
-  GenerateNewFile(&rnd, &key_idx);
-  ASSERT_EQ("1,1,2", FilesPerLevel(0));
 
-  // The background thread will refit L2->L1 while the
-  // foreground thread will try to simultaneously compact L0->L1.
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
-      // The first two dependencies ensure the foreground creates an L0 file
-      // between the background compaction's L0->L1 and its L1->L2.
-      {
-          "DBImpl::RunManualCompaction()::1",
-          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
-          "PutFG",
-      },
-      {
-          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
-          "FlushedFG",
-          "DBImpl::RunManualCompaction()::2",
-      },
-      // The next two dependencies ensure the foreground invokes
-      // `CompactRange()` while the background is refitting. The
-      // foreground's `CompactRange()` is guaranteed to attempt an L0->L1
-      // as we set it up with an empty memtable and a new L0 file.
-      {
-          "DBImpl::CompactRange:PreRefitLevel",
-          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
-          "CompactFG",
-      },
-      {
-          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
-          "CompactedFG",
-          "DBImpl::CompactRange:PostRefitLevel",
-      },
-  });
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  // make sure the manual compaction background is started but not yet set the
+  // status to in_progress, then cancel the manual compaction, which should not
+  // result in segfault
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkCompaction",
+        "DBCompactionTest::DisableJustStartedManualCompaction:"
+        "PreDisableManualCompaction"},
+       {"DBImpl::RunManualCompaction:Unscheduled",
+        "BackgroundCallCompaction:0"}});
+  SyncPoint::GetInstance()->EnableProcessing();
 
-  ROCKSDB_NAMESPACE::port::Thread refit_level_thread([&] {
+  port::Thread compact_thread([&]() {
     CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 1;
-    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
   });
-
-  TEST_SYNC_POINT(
-      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:PutFG");
-  // Make sure we have something new to compact in the foreground.
-  // Note key 1 is carefully chosen as it ensures the file we create here
-  // overlaps with one of the files being refitted L2->L1 in the background.
-  // If we chose key 0, the file created here would not overlap.
-  ASSERT_OK(Put(Key(1), "val"));
-  ASSERT_OK(Flush());
   TEST_SYNC_POINT(
-      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:FlushedFG");
+      "DBCompactionTest::DisableJustStartedManualCompaction:"
+      "PreDisableManualCompaction");
+  db_->DisableManualCompaction();
 
-  TEST_SYNC_POINT(
-      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:CompactFG");
-  ASSERT_TRUE(dbfull()
-                  ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
-                  .IsIncomplete());
-  TEST_SYNC_POINT(
-      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
-      "CompactedFG");
-  refit_level_thread.join();
+  compact_thread.join();
 }
 
-TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) {
-  // This test is added to ensure that RefitLevel() error paths are clearing
-  // internal flags and to test that subsequent valid RefitLevel() calls
-  // succeeds
+TEST_F(DBCompactionTest, DisableInProgressManualCompaction) {
+  const int kNumL0Files = 4;
+
   Options options = CurrentOptions();
-  options.memtable_factory.reset(
-      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
-  options.level0_file_num_compaction_trigger = 2;
-  options.num_levels = 3;
+  options.level0_file_num_compaction_trigger = kNumL0Files;
   Reopen(options);
 
-  ASSERT_EQ("", FilesPerLevel(0));
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BackgroundCompaction:InProgress",
+        "DBCompactionTest::DisableInProgressManualCompaction:"
+        "PreDisableManualCompaction"},
+       {"DBImpl::RunManualCompaction:Unscheduled",
+        "CompactionJob::Run():Start"}});
+  SyncPoint::GetInstance()->EnableProcessing();
 
-  // Setup an LSM with three levels populated.
-  Random rnd(301);
-  int key_idx = 0;
-  GenerateNewFile(&rnd, &key_idx);
-  ASSERT_EQ("1", FilesPerLevel(0));
-  {
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 2;
-    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  // generate files, but avoid trigger auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
   }
-  ASSERT_EQ("0,0,2", FilesPerLevel(0));
 
-  auto start_idx = key_idx;
-  GenerateNewFile(&rnd, &key_idx);
-  GenerateNewFile(&rnd, &key_idx);
-  auto end_idx = key_idx - 1;
-  ASSERT_EQ("1,1,2", FilesPerLevel(0));
+  port::Thread compact_thread([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
+  });
 
-  // Next two CompactRange() calls are used to test exercise error paths within
-  // RefitLevel() before triggering a valid RefitLevel() call
+  TEST_SYNC_POINT(
+      "DBCompactionTest::DisableInProgressManualCompaction:"
+      "PreDisableManualCompaction");
+  db_->DisableManualCompaction();
 
-  // Trigger a refit to L1 first
-  {
-    std::string begin_string = Key(start_idx);
-    std::string end_string = Key(end_idx);
-    Slice begin(begin_string);
-    Slice end(end_string);
+  compact_thread.join();
+}
 
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 1;
-    ASSERT_OK(dbfull()->CompactRange(cro, &begin, &end));
-  }
-  ASSERT_EQ("0,3,2", FilesPerLevel(0));
+TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) {
+  const int kNumL0Files = 4;
 
-  // Try a refit from L2->L1 - this should fail and exercise error paths in
-  // RefitLevel()
-  {
-    // Select key range that matches the bottom most level (L2)
-    std::string begin_string = Key(0);
-    std::string end_string = Key(start_idx - 1);
-    Slice begin(begin_string);
-    Slice end(end_string);
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::RunManualCompaction:Scheduled",
+        "DBCompactionTest::DisableManualCompactionThreadQueueFull:"
+        "PreDisableManualCompaction"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  Reopen(options);
 
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 1;
-    ASSERT_NOK(dbfull()->CompactRange(cro, &begin, &end));
-  }
-  ASSERT_EQ("0,3,2", FilesPerLevel(0));
+  // Block compaction queue
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
 
-  // Try a valid Refit request to ensure, the path is still working
-  {
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 1;
-    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  // generate files, but avoid trigger auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
   }
-  ASSERT_EQ("0,5", FilesPerLevel(0));
-}
 
-TEST_F(DBCompactionTest, CompactionWithBlob) {
-  Options options;
-  options.env = env_;
-  options.disable_auto_compactions = true;
+  port::Thread compact_thread([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
+  });
 
-  Reopen(options);
+  TEST_SYNC_POINT(
+      "DBCompactionTest::DisableManualCompactionThreadQueueFull:"
+      "PreDisableManualCompaction");
+
+  // Generate more files to trigger auto compaction which is scheduled after
+  // manual compaction. Has to generate 4 more files because existing files are
+  // pending compaction
+  for (int i = 0; i < kNumL0Files; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(std::to_string(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0));
 
-  constexpr char first_key[] = "first_key";
-  constexpr char second_key[] = "second_key";
-  constexpr char first_value[] = "first_value";
-  constexpr char second_value[] = "second_value";
-  constexpr char third_value[] = "third_value";
+  db_->DisableManualCompaction();
 
-  ASSERT_OK(Put(first_key, first_value));
-  ASSERT_OK(Put(second_key, first_value));
-  ASSERT_OK(Flush());
+  // CompactRange should return before the compaction has the chance to run
+  compact_thread.join();
 
-  ASSERT_OK(Put(first_key, second_value));
-  ASSERT_OK(Put(second_key, second_value));
-  ASSERT_OK(Flush());
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+}
 
-  ASSERT_OK(Put(first_key, third_value));
-  ASSERT_OK(Put(second_key, third_value));
-  ASSERT_OK(Flush());
+TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
+  const int kNumL0Files = 4;
 
-  options.enable_blob_files = true;
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::RunManualCompaction:Scheduled",
+        "DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
+        "PreDisableManualCompaction"}});
+  SyncPoint::GetInstance()->EnableProcessing();
 
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
   Reopen(options);
 
-  constexpr Slice* begin = nullptr;
-  constexpr Slice* end = nullptr;
+  // Block compaction queue
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
 
-  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+  // generate files, but avoid trigger auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
 
-  ASSERT_EQ(Get(first_key), third_value);
-  ASSERT_EQ(Get(second_key), third_value);
+  port::Thread compact_thread([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
+  });
 
-  VersionSet* const versions = dbfull()->TEST_GetVersionSet();
-  assert(versions);
+  TEST_SYNC_POINT(
+      "DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
+      "PreDisableManualCompaction");
+
+  // Generate more files to trigger auto compaction which is scheduled after
+  // manual compaction. Has to generate 4 more files because existing files are
+  // pending compaction
+  for (int i = 0; i < kNumL0Files; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(std::to_string(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0));
 
-  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
-  assert(cfd);
+  db_->DisableManualCompaction();
 
-  Version* const current = cfd->current();
-  assert(current);
+  // CompactRange should return before the compaction has the chance to run
+  compact_thread.join();
 
-  const VersionStorageInfo* const storage_info = current->storage_info();
-  assert(storage_info);
+  // Try close DB while manual compaction is canceled but still in the queue.
+  // And an auto-triggered compaction is also in the queue.
+  auto s = db_->Close();
+  ASSERT_OK(s);
 
-  const auto& l1_files = storage_info->LevelFiles(1);
-  ASSERT_EQ(l1_files.size(), 1);
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+}
 
-  const FileMetaData* const table_file = l1_files[0];
-  assert(table_file);
+TEST_F(DBCompactionTest, DBCloseWithManualCompaction) {
+  const int kNumL0Files = 4;
 
-  const auto& blob_files = storage_info->GetBlobFiles();
-  ASSERT_EQ(blob_files.size(), 1);
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::RunManualCompaction:Scheduled",
+        "DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
+        "PreDisableManualCompaction"}});
+  SyncPoint::GetInstance()->EnableProcessing();
 
-  const auto& blob_file = blob_files.begin()->second;
-  assert(blob_file);
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  Reopen(options);
 
-  ASSERT_EQ(table_file->smallest.user_key(), first_key);
-  ASSERT_EQ(table_file->largest.user_key(), second_key);
-  ASSERT_EQ(table_file->oldest_blob_file_number,
-            blob_file->GetBlobFileNumber());
+  // Block compaction queue
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
 
-  ASSERT_EQ(blob_file->GetTotalBlobCount(), 2);
+  // generate files, but avoid trigger auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
 
-  const InternalStats* const internal_stats = cfd->internal_stats();
-  assert(internal_stats);
+  port::Thread compact_thread([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
+  });
 
-  const uint64_t expected_bytes =
-      table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
+  TEST_SYNC_POINT(
+      "DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
+      "PreDisableManualCompaction");
+
+  // Generate more files to trigger auto compaction which is scheduled after
+  // manual compaction. Has to generate 4 more files because existing files are
+  // pending compaction
+  for (int i = 0; i < kNumL0Files; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(std::to_string(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0));
 
-  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
-  ASSERT_GE(compaction_stats.size(), 2);
-  ASSERT_EQ(compaction_stats[1].bytes_written, expected_bytes);
-  ASSERT_EQ(compaction_stats[1].num_output_files, 2);
-}
+  // Close DB with manual compaction and auto triggered compaction in the queue.
+  auto s = db_->Close();
+  ASSERT_OK(s);
 
-class DBCompactionTestBlobError
-    : public DBCompactionTest,
-      public testing::WithParamInterface<std::string> {
- public:
-  DBCompactionTestBlobError()
-      : fault_injection_env_(env_), sync_point_(GetParam()) {}
-  ~DBCompactionTestBlobError() { Close(); }
+  // manual compaction thread should return with Incomplete().
+  compact_thread.join();
 
-  FaultInjectionTestEnv fault_injection_env_;
-  std::string sync_point_;
-};
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+}
 
-INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobError, DBCompactionTestBlobError,
-                        ::testing::ValuesIn(std::vector<std::string>{
-                            "BlobFileBuilder::WriteBlobToFile:AddRecord",
-                            "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
+TEST_F(DBCompactionTest,
+       DisableManualCompactionDoesNotWaitForDrainingAutomaticCompaction) {
+  // When `CompactRangeOptions::exclusive_manual_compaction == true`, we wait
+  // for automatic compactions to drain before starting the manual compaction.
+  // This test verifies `DisableManualCompaction()` can cancel such a compaction
+  // without waiting for the drain to complete.
+  const int kNumL0Files = 4;
 
-TEST_P(DBCompactionTestBlobError, CompactionError) {
-  Options options;
-  options.disable_auto_compactions = true;
-  options.env = env_;
+  // Enforces manual compaction enters wait loop due to pending automatic
+  // compaction.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkCompaction", "DBImpl::RunManualCompaction:NotScheduled"},
+       {"DBImpl::RunManualCompaction:WaitScheduled",
+        "BackgroundCallCompaction:0"}});
+  // The automatic compaction will cancel the waiting manual compaction.
+  // Completing this implies the cancellation did not wait on automatic
+  // compactions to finish.
+  bool callback_completed = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BackgroundCallCompaction:0", [&](void* /*arg*/) {
+        db_->DisableManualCompaction();
+        callback_completed = true;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
   Reopen(options);
 
-  constexpr char first_key[] = "first_key";
-  constexpr char second_key[] = "second_key";
-  constexpr char first_value[] = "first_value";
-  constexpr char second_value[] = "second_value";
-  constexpr char third_value[] = "third_value";
+  for (int i = 0; i < kNumL0Files; ++i) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
 
-  ASSERT_OK(Put(first_key, first_value));
-  ASSERT_OK(Put(second_key, first_value));
-  ASSERT_OK(Flush());
+  CompactRangeOptions cro;
+  cro.exclusive_manual_compaction = true;
+  ASSERT_TRUE(db_->CompactRange(cro, nullptr, nullptr).IsIncomplete());
 
-  ASSERT_OK(Put(first_key, second_value));
-  ASSERT_OK(Put(second_key, second_value));
-  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_TRUE(callback_completed);
+}
 
-  ASSERT_OK(Put(first_key, third_value));
-  ASSERT_OK(Put(second_key, third_value));
-  ASSERT_OK(Flush());
+TEST_F(DBCompactionTest, ChangeLevelConflictsWithManual) {
+  Options options = CurrentOptions();
+  options.num_levels = 3;
+  Reopen(options);
 
-  options.enable_blob_files = true;
-  options.env = &fault_injection_env_;
+  // Setup an LSM with L2 populated.
+  Random rnd(301);
+  ASSERT_OK(Put(Key(0), rnd.RandomString(990)));
+  ASSERT_OK(Put(Key(1), rnd.RandomString(990)));
+  {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 2;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  }
+  ASSERT_EQ("0,0,1", FilesPerLevel(0));
 
-  Reopen(options);
+  // The background thread will refit L2->L1 while the foreground thread will
+  // attempt to run a compaction on new data. The following dependencies
+  // ensure the background manual compaction's refitting phase disables manual
+  // compaction immediately before the foreground manual compaction can register
+  // itself. Manual compaction is kept disabled until the foreground manual
+  // checks for the failure once.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+      // Only do Put()s for foreground CompactRange() once the background
+      // CompactRange() has reached the refitting phase.
+      {
+          "DBImpl::CompactRange:BeforeRefit:1",
+          "DBCompactionTest::ChangeLevelConflictsWithManual:"
+          "PreForegroundCompactRange",
+      },
+      // Right before we register the manual compaction, proceed with
+      // the refitting phase so manual compactions are disabled. Stay in
+      // the refitting phase with manual compactions disabled until it is
+      // noticed.
+      {
+          "DBImpl::RunManualCompaction:0",
+          "DBImpl::CompactRange:BeforeRefit:2",
+      },
+      {
+          "DBImpl::CompactRange:PreRefitLevel",
+          "DBImpl::RunManualCompaction:1",
+      },
+      {
+          "DBImpl::RunManualCompaction:PausedAtStart",
+          "DBImpl::CompactRange:PostRefitLevel",
+      },
+      // If compaction somehow were scheduled, let's let it run after reenabling
+      // manual compactions. This dependency is not expected to be hit but is
+      // here for speculatively coercing future bugs.
+      {
+          "DBImpl::CompactRange:PostRefitLevel:ManualCompactionEnabled",
+          "BackgroundCallCompaction:0",
+      },
+  });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
-    fault_injection_env_.SetFilesystemActive(false,
-                                             Status::IOError(sync_point_));
+  ROCKSDB_NAMESPACE::port::Thread refit_level_thread([&] {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 1;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
   });
-  SyncPoint::GetInstance()->EnableProcessing();
 
-  constexpr Slice* begin = nullptr;
-  constexpr Slice* end = nullptr;
+  TEST_SYNC_POINT(
+      "DBCompactionTest::ChangeLevelConflictsWithManual:"
+      "PreForegroundCompactRange");
+  ASSERT_OK(Put(Key(0), rnd.RandomString(990)));
+  ASSERT_OK(Put(Key(1), rnd.RandomString(990)));
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+                  .IsIncomplete());
 
-  ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), begin, end).IsIOError());
+  refit_level_thread.join();
+}
 
-  SyncPoint::GetInstance()->DisableProcessing();
-  SyncPoint::GetInstance()->ClearAllCallBacks();
+TEST_F(DBCompactionTest, BottomPriCompactionCountsTowardConcurrencyLimit) {
+  // Flushes several files to trigger compaction while lock is released during
+  // a bottom-pri compaction. Verifies it does not get scheduled to thread pool
+  // because per-DB limit for compaction parallelism is one (default).
+  const int kNumL0Files = 4;
+  const int kNumLevels = 3;
 
-  VersionSet* const versions = dbfull()->TEST_GetVersionSet();
-  assert(versions);
+  env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
 
-  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
-  assert(cfd);
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  options.num_levels = kNumLevels;
+  DestroyAndReopen(options);
 
-  Version* const current = cfd->current();
-  assert(current);
+  // Setup last level to be non-empty since it's a bit unclear whether
+  // compaction to an empty level would be considered "bottommost".
+  ASSERT_OK(Put(Key(0), "val"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(kNumLevels - 1);
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkBottomCompaction",
+        "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+        "PreTriggerCompaction"},
+       {"DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+        "PostTriggerCompaction",
+        "BackgroundCallCompaction:0"}});
+  SyncPoint::GetInstance()->EnableProcessing();
 
-  const VersionStorageInfo* const storage_info = current->storage_info();
-  assert(storage_info);
+  port::Thread compact_range_thread([&] {
+    CompactRangeOptions cro;
+    cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+    cro.exclusive_manual_compaction = false;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  });
 
-  const auto& l1_files = storage_info->LevelFiles(1);
-  ASSERT_TRUE(l1_files.empty());
+  // Sleep in the low-pri thread so any newly scheduled compaction will be
+  // queued. Otherwise it might finish before we check its existence.
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
 
-  const auto& blob_files = storage_info->GetBlobFiles();
-  ASSERT_TRUE(blob_files.empty());
+  TEST_SYNC_POINT(
+      "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+      "PreTriggerCompaction");
+  for (int i = 0; i < kNumL0Files; ++i) {
+    ASSERT_OK(Put(Key(0), "val"));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(0u, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
+  TEST_SYNC_POINT(
+      "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+      "PostTriggerCompaction");
 
-  const InternalStats* const internal_stats = cfd->internal_stats();
-  assert(internal_stats);
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+  compact_range_thread.join();
+}
 
-  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
-  ASSERT_GE(compaction_stats.size(), 2);
+TEST_F(DBCompactionTest, BottommostFileCompactionAllowIngestBehind) {
+  // allow_ingest_behind prevents seqnum zeroing, and could cause
+  // compaction loop with reason kBottommostFiles.
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.compaction_style = kCompactionStyleLevel;
+  options.allow_ingest_behind = true;
+  options.comparator = BytewiseComparator();
+  DestroyAndReopen(options);
 
-  if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
-    ASSERT_EQ(compaction_stats[1].bytes_written, 0);
-    ASSERT_EQ(compaction_stats[1].num_output_files, 0);
-  } else {
-    // SST file writing succeeded; blob file writing failed (during Finish)
-    ASSERT_GT(compaction_stats[1].bytes_written, 0);
-    ASSERT_EQ(compaction_stats[1].num_output_files, 1);
-  }
+  WriteOptions write_opts;
+  ASSERT_OK(db_->Put(write_opts, "infinite", "compaction loop"));
+  ASSERT_OK(db_->Put(write_opts, "infinite", "loop"));
+
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+  ASSERT_OK(db_->Put(write_opts, "bumpseqnum", ""));
+  ASSERT_OK(Flush());
+  auto snapshot = db_->GetSnapshot();
+  // Bump up oldest_snapshot_seqnum_ in VersionStorageInfo.
+  db_->ReleaseSnapshot(snapshot);
+  bool compacted = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* /* arg */) {
+        // There should not be a compaction.
+        compacted = true;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  // Wait for compaction to be scheduled.
+  env_->SleepForMicroseconds(2000000);
+  ASSERT_FALSE(compacted);
+  // The following assert can be used to check for compaction loop:
+  // it used to wait forever before the fix.
+  // ASSERT_OK(dbfull()->TEST_WaitForCompact(true /* wait_unscheduled */));
 }
 
 #endif  // !defined(ROCKSDB_LITE)
@@ -6030,8 +8220,8 @@ int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 #else
-  (void) argc;
-  (void) argv;
+  (void)argc;
+  (void)argv;
   return 0;
 #endif
 }