]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_compaction_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / db_compaction_test.cc
index f701101dd56078d71dbdd959accad445f5128d48..5136b03921f90e14953890edbda192f6969646b6 100644 (file)
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
@@ -53,7 +53,7 @@ class FlushedFileCollector : public EventListener {
   FlushedFileCollector() {}
   ~FlushedFileCollector() {}
 
-  virtual void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
+  virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
     std::lock_guard<std::mutex> lock(mutex_);
     flushed_files_.push_back(info.file_path);
   }
@@ -74,6 +74,63 @@ class FlushedFileCollector : public EventListener {
   std::mutex mutex_;
 };
 
+class CompactionStatsCollector : public EventListener {
+public:
+  CompactionStatsCollector()
+      : compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) {
+    for (auto& v : compaction_completed_) {
+      v.store(0);
+    }
+  }
+
+  ~CompactionStatsCollector() {}
+
+  virtual void OnCompactionCompleted(DB* /* db */,
+      const CompactionJobInfo& info) override {
+    int k = static_cast<int>(info.compaction_reason);
+    int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
+    assert(k >= 0 && k < num_of_reasons);
+    compaction_completed_[k]++;
+  }
+
+  virtual void OnExternalFileIngested(DB* /* db */,
+      const ExternalFileIngestionInfo& /* info */) override {
+    int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
+    compaction_completed_[k]++;
+  }
+
+  virtual void OnFlushCompleted(DB* /* db */,
+      const FlushJobInfo& /* info */) override {
+    int k = static_cast<int>(CompactionReason::kFlush);
+    compaction_completed_[k]++;
+  }
+
+  int NumberOfCompactions(CompactionReason reason) const {
+    int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
+    int k = static_cast<int>(reason);
+    assert(k >= 0 && k < num_of_reasons);
+    return compaction_completed_.at(k).load();
+  }
+
+private:
+  std::vector<std::atomic<int>> compaction_completed_;
+};
+
+class SstStatsCollector : public EventListener {
+ public:
+  SstStatsCollector() : num_ssts_creation_started_(0) {}
+
+  void OnTableFileCreationStarted(
+      const TableFileCreationBriefInfo& /* info */) override {
+    ++num_ssts_creation_started_;
+  }
+
+  int num_ssts_creation_started() { return num_ssts_creation_started_; }
+
+ private:
+  std::atomic<int> num_ssts_creation_started_;
+};
+
 static const int kCDTValueSize = 1000;
 static const int kCDTKeysPerBuffer = 4;
 static const int kCDTNumLevels = 8;
@@ -154,6 +211,40 @@ void VerifyCompactionResult(
 #endif
 }
 
+/*
+ * Verifies compaction stats of cfd are valid.
+ *
+ * For each level of cfd, its compaction stats are valid if
+ * 1) sum(stat.counts) == stat.count, and
+ * 2) stat.counts[i] == collector.NumberOfCompactions(i)
+ */
+void VerifyCompactionStats(ColumnFamilyData& cfd,
+    const CompactionStatsCollector& collector) {
+#ifndef NDEBUG
+  InternalStats* internal_stats_ptr = cfd.internal_stats();
+  ASSERT_TRUE(internal_stats_ptr != nullptr);
+  const std::vector<InternalStats::CompactionStats>& comp_stats =
+      internal_stats_ptr->TEST_GetCompactionStats();
+  const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
+  std::vector<int> counts(num_of_reasons, 0);
+  // Count the number of compactions caused by each CompactionReason across
+  // all levels.
+  for (const auto& stat : comp_stats) {
+    int sum = 0;
+    for (int i = 0; i < num_of_reasons; i++) {
+      counts[i] += stat.counts[i];
+      sum += stat.counts[i];
+    }
+    ASSERT_EQ(sum, stat.count);
+  }
+  // Verify InternalStats bookkeeping matches that of CompactionStatsCollector,
+  // assuming that all compactions complete.
+  for (int i = 0; i < num_of_reasons; i++) {
+    ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]);
+  }
+#endif /* NDEBUG */
+}
+
 const SstFileMetaData* PickFileRandomly(
     const ColumnFamilyMetaData& cf_meta,
     Random* rand,
@@ -175,6 +266,7 @@ const SstFileMetaData* PickFileRandomly(
 }
 }  // anonymous namespace
 
+#ifndef ROCKSDB_VALGRIND_RUN
 // All the TEST_P tests run once with sub_compactions disabled (i.e.
 // options.max_subcompactions = 1) and once with it enabled
 TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
@@ -217,6 +309,85 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
     ASSERT_GT(db_size[0] / 3, db_size[1]);
   }
 }
+#endif  // ROCKSDB_VALGRIND_RUN
+
+TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) {
+  //  For each options type we test following
+  //  - Enable preserve_deletes
+  //  - write bunch of keys and deletes
+  //  - Set start_seqnum to the beginning; compact; check that keys are present
+  //  - rewind start_seqnum way forward; compact; check that keys are gone
+
+  for (int tid = 0; tid < 3; ++tid) {
+    Options options = DeletionTriggerOptions(CurrentOptions());
+    options.max_subcompactions = max_subcompactions_;
+    options.preserve_deletes=true;
+    options.num_levels = 2;
+
+    if (tid == 1) {
+      options.skip_stats_update_on_db_open = true;
+    } else if (tid == 2) {
+      // third pass with universal compaction
+      options.compaction_style = kCompactionStyleUniversal;
+    }
+
+    DestroyAndReopen(options);
+    Random rnd(301);
+    // highlight the default; all deletes should be preserved
+    SetPreserveDeletesSequenceNumber(0);
+
+    const int kTestSize = kCDTKeysPerBuffer;
+    std::vector<std::string> values;
+    for (int k = 0; k < kTestSize; ++k) {
+      values.push_back(RandomString(&rnd, kCDTValueSize));
+      ASSERT_OK(Put(Key(k), values[k]));
+    }
+
+    for (int k = 0; k < kTestSize; ++k) {
+      ASSERT_OK(Delete(Key(k)));
+    }
+    // to ensure we tackle all tombstones
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 2;
+    cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->CompactRange(cro, nullptr, nullptr);
+
+    // check that normal user iterator doesn't see anything
+    Iterator* db_iter = dbfull()->NewIterator(ReadOptions());
+    int i = 0;
+    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
+      i++;
+    }
+    ASSERT_EQ(i, 0);
+    delete db_iter;
+
+    // check that iterator that sees internal keys sees tombstones
+    ReadOptions ro;
+    ro.iter_start_seqnum=1;
+    db_iter = dbfull()->NewIterator(ro);
+    i = 0;
+    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
+      i++;
+    }
+    ASSERT_EQ(i, 4);
+    delete db_iter;
+
+    // now all deletes should be gone
+    SetPreserveDeletesSequenceNumber(100000000);
+    dbfull()->CompactRange(cro, nullptr, nullptr);
+
+    db_iter = dbfull()->NewIterator(ro);
+    i = 0;
+    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
+      i++;
+    }
+    ASSERT_EQ(i, 0);
+    delete db_iter;
+  }
+}
 
 TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
   // This test verify UpdateAccumulatedStats is not on
@@ -282,7 +453,7 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
       });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "TableCache::GetTableReader:0",
-      [&](void* arg) { num_new_table_reader++; });
+      [&](void* /*arg*/) { num_new_table_reader++; });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   for (int k = 0; k < options.level0_file_num_compaction_trigger; ++k) {
@@ -519,7 +690,6 @@ TEST_F(DBCompactionTest, BGCompactionsAllowed) {
   options.level0_file_num_compaction_trigger = 2;
   options.level0_slowdown_writes_trigger = 20;
   options.soft_pending_compaction_bytes_limit = 1 << 30;  // Infinitely large
-  options.base_background_compactions = 1;
   options.max_background_compactions = 3;
   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
 
@@ -778,6 +948,41 @@ TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
   ASSERT_OK(Put("", ""));
 }
 
+TEST_F(DBCompactionTest, ManualCompactionUnknownOutputSize) {
+  // github issue #2249
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleLevel;
+  options.level0_file_num_compaction_trigger = 3;
+  DestroyAndReopen(options);
+
+  // create two files in l1 that we can compact
+  for (int i = 0; i < 2; ++i) {
+    for (int j = 0; j < options.level0_file_num_compaction_trigger; j++) {
+      // make l0 files' ranges overlap to avoid trivial move
+      Put(std::to_string(2 * i), std::string(1, 'A'));
+      Put(std::to_string(2 * i + 1), std::string(1, 'A'));
+      Flush();
+      dbfull()->TEST_WaitForFlushMemTable();
+    }
+    dbfull()->TEST_WaitForCompact();
+    ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
+    ASSERT_EQ(NumTableFilesAtLevel(1, 0), i + 1);
+  }
+
+  ColumnFamilyMetaData cf_meta;
+  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
+  ASSERT_EQ(2, cf_meta.levels[1].files.size());
+  std::vector<std::string> input_filenames;
+  for (const auto& sst_file : cf_meta.levels[1].files) {
+    input_filenames.push_back(sst_file.name);
+  }
+
+  // note CompactionOptions::output_file_size_limit is unset.
+  CompactionOptions compact_opt;
+  compact_opt.compression = kNoCompression;
+  dbfull()->CompactFiles(compact_opt, input_filenames, 1);
+}
+
 // Check that writes done during a memtable compaction are recovered
 // if the database is shutdown during the memtable compaction.
 TEST_F(DBCompactionTest, RecoverDuringMemtableCompaction) {
@@ -804,7 +1009,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) {
   int32_t trivial_move = 0;
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:TrivialMove",
-      [&](void* arg) { trivial_move++; });
+      [&](void* /*arg*/) { trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   Options options = CurrentOptions();
@@ -861,10 +1066,10 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
   int32_t non_trivial_move = 0;
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:TrivialMove",
-      [&](void* arg) { trivial_move++; });
+      [&](void* /*arg*/) { trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:NonTrivial",
-      [&](void* arg) { non_trivial_move++; });
+      [&](void* /*arg*/) { non_trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   Options options = CurrentOptions();
@@ -960,10 +1165,10 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
   int32_t non_trivial_move = 0;
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:TrivialMove",
-      [&](void* arg) { trivial_move++; });
+      [&](void* /*arg*/) { trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:NonTrivial",
-      [&](void* arg) { non_trivial_move++; });
+      [&](void* /*arg*/) { non_trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   Options options = CurrentOptions();
@@ -1019,10 +1224,10 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
   int32_t non_trivial_move = 0;
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:TrivialMove",
-      [&](void* arg) { trivial_move++; });
+      [&](void* /*arg*/) { trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:NonTrivial",
-      [&](void* arg) { non_trivial_move++; });
+      [&](void* /*arg*/) { non_trivial_move++; });
   bool first = true;
   // Purpose of dependencies:
   // 4 -> 1: ensure the order of two non-trivial compactions
@@ -1033,7 +1238,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
        {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:2"},
        {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:3"}});
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
+      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
         if (first) {
           first = false;
           TEST_SYNC_POINT("DBCompaction::ManualPartial:4");
@@ -1164,17 +1369,17 @@ TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
   int32_t non_trivial_move = 0;
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:TrivialMove",
-      [&](void* arg) { trivial_move++; });
+      [&](void* /*arg*/) { trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:NonTrivial",
-      [&](void* arg) { non_trivial_move++; });
+      [&](void* /*arg*/) { non_trivial_move++; });
   bool first = true;
   bool second = true;
   rocksdb::SyncPoint::GetInstance()->LoadDependency(
       {{"DBCompaction::PartialFill:4", "DBCompaction::PartialFill:1"},
        {"DBCompaction::PartialFill:2", "DBCompaction::PartialFill:3"}});
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
+      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
         if (first) {
           TEST_SYNC_POINT("DBCompaction::PartialFill:4");
           first = false;
@@ -1193,6 +1398,9 @@ TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
   options.max_background_compactions = 3;
 
   DestroyAndReopen(options);
+  // make sure all background compaction jobs can be scheduled
+  auto stop_token =
+      dbfull()->TEST_write_controler().GetCompactionPressureToken();
   int32_t value_size = 10 * 1024;  // 10 KB
 
   // Add 2 non-overlapping files
@@ -1402,15 +1610,185 @@ TEST_F(DBCompactionTest, DeleteFileRange) {
   ASSERT_GT(old_num_files, new_num_files);
 }
 
+TEST_F(DBCompactionTest, DeleteFilesInRanges) {
+  Options options = CurrentOptions();
+  options.write_buffer_size = 10 * 1024 * 1024;
+  options.max_bytes_for_level_multiplier = 2;
+  options.num_levels = 4;
+  options.max_background_compactions = 3;
+  options.disable_auto_compactions = true;
+
+  DestroyAndReopen(options);
+  int32_t value_size = 10 * 1024;  // 10 KB
+
+  Random rnd(301);
+  std::map<int32_t, std::string> values;
+
+  // file [0 => 100), [100 => 200), ... [900, 1000)
+  for (auto i = 0; i < 10; i++) {
+    for (auto j = 0; j < 100; j++) {
+      auto k = i * 100 + j;
+      values[k] = RandomString(&rnd, value_size);
+      ASSERT_OK(Put(Key(k), values[k]));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ("10", FilesPerLevel(0));
+  CompactRangeOptions compact_options;
+  compact_options.change_level = true;
+  compact_options.target_level = 2;
+  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
+  ASSERT_EQ("0,0,10", FilesPerLevel(0));
+
+  // file [0 => 100), [200 => 300), ... [800, 900)
+  for (auto i = 0; i < 10; i+=2) {
+    for (auto j = 0; j < 100; j++) {
+      auto k = i * 100 + j;
+      ASSERT_OK(Put(Key(k), values[k]));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ("5,0,10", FilesPerLevel(0));
+  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
+  ASSERT_EQ("0,5,10", FilesPerLevel(0));
+
+  // Delete files in range [0, 299] (inclusive)
+  {
+    auto begin_str1 = Key(0), end_str1 = Key(100);
+    auto begin_str2 = Key(100), end_str2 = Key(200);
+    auto begin_str3 = Key(200), end_str3 = Key(299);
+    Slice begin1(begin_str1), end1(end_str1);
+    Slice begin2(begin_str2), end2(end_str2);
+    Slice begin3(begin_str3), end3(end_str3);
+    std::vector<RangePtr> ranges;
+    ranges.push_back(RangePtr(&begin1, &end1));
+    ranges.push_back(RangePtr(&begin2, &end2));
+    ranges.push_back(RangePtr(&begin3, &end3));
+    ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
+                                  ranges.data(), ranges.size()));
+    ASSERT_EQ("0,3,7", FilesPerLevel(0));
+
+    // Keys [0, 300) should not exist.
+    for (auto i = 0; i < 300; i++) {
+      ReadOptions ropts;
+      std::string result;
+      auto s = db_->Get(ropts, Key(i), &result);
+      ASSERT_TRUE(s.IsNotFound());
+    }
+    for (auto i = 300; i < 1000; i++) {
+      ASSERT_EQ(Get(Key(i)), values[i]);
+    }
+  }
+
+  // Delete files in range [600, 999) (exclusive)
+  {
+    auto begin_str1 = Key(600), end_str1 = Key(800);
+    auto begin_str2 = Key(700), end_str2 = Key(900);
+    auto begin_str3 = Key(800), end_str3 = Key(999);
+    Slice begin1(begin_str1), end1(end_str1);
+    Slice begin2(begin_str2), end2(end_str2);
+    Slice begin3(begin_str3), end3(end_str3);
+    std::vector<RangePtr> ranges;
+    ranges.push_back(RangePtr(&begin1, &end1));
+    ranges.push_back(RangePtr(&begin2, &end2));
+    ranges.push_back(RangePtr(&begin3, &end3));
+    ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
+                                  ranges.data(), ranges.size(), false));
+    ASSERT_EQ("0,1,4", FilesPerLevel(0));
+
+    // Keys [600, 900) should not exist.
+    for (auto i = 600; i < 900; i++) {
+      ReadOptions ropts;
+      std::string result;
+      auto s = db_->Get(ropts, Key(i), &result);
+      ASSERT_TRUE(s.IsNotFound());
+    }
+    for (auto i = 300; i < 600; i++) {
+      ASSERT_EQ(Get(Key(i)), values[i]);
+    }
+    for (auto i = 900; i < 1000; i++) {
+      ASSERT_EQ(Get(Key(i)), values[i]);
+    }
+  }
+
+  // Delete all files.
+  {
+    RangePtr range;
+    ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(), &range, 1));
+    ASSERT_EQ("", FilesPerLevel(0));
+
+    for (auto i = 0; i < 1000; i++) {
+      ReadOptions ropts;
+      std::string result;
+      auto s = db_->Get(ropts, Key(i), &result);
+      ASSERT_TRUE(s.IsNotFound());
+    }
+  }
+}
+
+TEST_F(DBCompactionTest, DeleteFileRangeFileEndpointsOverlapBug) {
+  // regression test for #2833: groups of files whose user-keys overlap at the
+  // endpoints could be split by `DeleteFilesInRange`. This caused old data to
+  // reappear, either because a new version of the key was removed, or a range
+  // deletion was partially dropped. It could also cause non-overlapping
+  // invariant to be violated if the files dropped by DeleteFilesInRange were
+  // a subset of files that a range deletion spans.
+  const int kNumL0Files = 2;
+  const int kValSize = 8 << 10;  // 8KB
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  options.target_file_size_base = 1 << 10;  // 1KB
+  DestroyAndReopen(options);
+
+  // The snapshot prevents key 1 from having its old version dropped. The low
+  // `target_file_size_base` ensures two keys will be in each output file.
+  const Snapshot* snapshot = nullptr;
+  Random rnd(301);
+  // The value indicates which flush the key belonged to, which is enough
+  // for us to determine the keys' relative ages. After L0 flushes finish,
+  // files look like:
+  //
+  // File 0: 0 -> vals[0], 1 -> vals[0]
+  // File 1:               1 -> vals[1], 2 -> vals[1]
+  //
+  // Then L0->L1 compaction happens, which outputs keys as follows:
+  //
+  // File 0: 0 -> vals[0], 1 -> vals[1]
+  // File 1:               1 -> vals[0], 2 -> vals[1]
+  //
+  // DeleteFilesInRange shouldn't be allowed to drop just file 0, as that
+  // would cause `1 -> vals[0]` (an older key) to reappear.
+  std::string vals[kNumL0Files];
+  for (int i = 0; i < kNumL0Files; ++i) {
+    vals[i] = RandomString(&rnd, kValSize);
+    Put(Key(i), vals[i]);
+    Put(Key(i + 1), vals[i]);
+    Flush();
+    if (i == 0) {
+      snapshot = db_->GetSnapshot();
+    }
+  }
+  dbfull()->TEST_WaitForCompact();
+
+  // Verify `DeleteFilesInRange` can't drop only file 0 which would cause
+  // "1 -> vals[0]" to reappear.
+  std::string begin_str = Key(0), end_str = Key(1);
+  Slice begin = begin_str, end = end_str;
+  ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
+  ASSERT_EQ(vals[1], Get(Key(1)));
+
+  db_->ReleaseSnapshot(snapshot);
+}
+
 TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
   int32_t trivial_move = 0;
   int32_t non_trivial_move = 0;
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:TrivialMove",
-      [&](void* arg) { trivial_move++; });
+      [&](void* /*arg*/) { trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:NonTrivial",
-      [&](void* arg) { non_trivial_move++; });
+      [&](void* /*arg*/) { non_trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   Options options = CurrentOptions();
@@ -1698,6 +2076,125 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) {
   Destroy(options);
 }
 
+TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
+  Options options = CurrentOptions();
+  options.db_paths.emplace_back(dbname_, 500 * 1024);
+  options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
+  options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
+  options.memtable_factory.reset(
+    new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+  options.compaction_style = kCompactionStyleLevel;
+  options.write_buffer_size = 110 << 10;  // 110KB
+  options.arena_block_size = 4 << 10;
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 4;
+  options.max_bytes_for_level_base = 400 * 1024;
+  options.max_subcompactions = max_subcompactions_;
+
+  std::vector<Options> option_vector;
+  option_vector.emplace_back(options);
+  ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
+  // Configure CF1 specific paths.
+  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 500 * 1024);
+  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024);
+  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024);
+  option_vector.emplace_back(DBOptions(options), cf_opt1);
+  CreateColumnFamilies({"one"},option_vector[1]);
+
+  // Configura CF2 specific paths.
+  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024);
+  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024);
+  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024);
+  option_vector.emplace_back(DBOptions(options), cf_opt2);
+  CreateColumnFamilies({"two"},option_vector[2]);
+
+  ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
+
+  Random rnd(301);
+  int key_idx = 0;
+  int key_idx1 = 0;
+  int key_idx2 = 0;
+
+  auto generate_file = [&]() {
+    GenerateNewFile(0, &rnd, &key_idx);
+    GenerateNewFile(1, &rnd, &key_idx1);
+    GenerateNewFile(2, &rnd, &key_idx2);
+  };
+
+  auto check_sstfilecount = [&](int path_id, int expected) {
+    ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
+    ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
+    ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
+  };
+
+  auto check_filesperlevel = [&](const std::string& expected) {
+    ASSERT_EQ(expected, FilesPerLevel(0));
+    ASSERT_EQ(expected, FilesPerLevel(1));
+    ASSERT_EQ(expected, FilesPerLevel(2));
+  };
+
+  auto check_getvalues = [&]() {
+    for (int i = 0; i < key_idx; i++) {
+      auto v = Get(0, Key(i));
+      ASSERT_NE(v, "NOT_FOUND");
+      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
+    }
+
+    for (int i = 0; i < key_idx1; i++) {
+      auto v = Get(1, Key(i));
+      ASSERT_NE(v, "NOT_FOUND");
+      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
+    }
+
+    for (int i = 0; i < key_idx2; i++) {
+      auto v = Get(2, Key(i));
+      ASSERT_NE(v, "NOT_FOUND");
+      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
+    }
+  };
+
+  // Check that default column family uses db_paths.
+  // And Column family "one" uses cf_paths.
+
+  // First three 110KB files are not going to second path.
+  // After that, (100K, 200K)
+  for (int num = 0; num < 3; num++) {
+    generate_file();
+  }
+
+  // Another 110KB triggers a compaction to 400K file to fill up first path
+  generate_file();
+  check_sstfilecount(1, 3);
+
+  // (1, 4)
+  generate_file();
+  check_filesperlevel("1,4");
+  check_sstfilecount(1, 4);
+  check_sstfilecount(0, 1);
+
+  // (1, 4, 1)
+  generate_file();
+  check_filesperlevel("1,4,1");
+  check_sstfilecount(2, 1);
+  check_sstfilecount(1, 4);
+  check_sstfilecount(0, 1);
+
+  // (1, 4, 2)
+  generate_file();
+  check_filesperlevel("1,4,2");
+  check_sstfilecount(2, 2);
+  check_sstfilecount(1, 4);
+  check_sstfilecount(0, 1);
+
+  check_getvalues();
+
+  ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
+
+  check_getvalues();
+
+  Destroy(options, true);
+}
+
 TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
   Random rnd(301);
   int max_key_level_insert = 200;
@@ -1850,6 +2347,47 @@ TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) {
   } while (ChangeCompactOptions());
 }
 
+TEST_F(DBCompactionTest, ManualAutoRace) {
+  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkCompaction", "DBCompactionTest::ManualAutoRace:1"},
+       {"DBImpl::RunManualCompaction:WaitScheduled",
+        "BackgroundCallCompaction:0"}});
+
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  Put(1, "foo", "");
+  Put(1, "bar", "");
+  Flush(1);
+  Put(1, "foo", "");
+  Put(1, "bar", "");
+  // Generate four files in CF 0, which should trigger an auto compaction
+  Put("foo", "");
+  Put("bar", "");
+  Flush();
+  Put("foo", "");
+  Put("bar", "");
+  Flush();
+  Put("foo", "");
+  Put("bar", "");
+  Flush();
+  Put("foo", "");
+  Put("bar", "");
+  Flush();
+
+  // The auto compaction is scheduled but waited until here
+  TEST_SYNC_POINT("DBCompactionTest::ManualAutoRace:1");
+  // The auto compaction will wait until the manual compaction is registerd
+  // before processing so that it will be cancelled.
+  dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
+  ASSERT_EQ("0,1", FilesPerLevel(1));
+
+  // Eventually the cancelled compaction will be rescheduled and executed.
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_P(DBCompactionTestWithParam, ManualCompaction) {
   Options options = CurrentOptions();
   options.max_subcompactions = max_subcompactions_;
@@ -1899,7 +2437,6 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) {
 
     if (iter == 0) {
       options = CurrentOptions();
-      options.max_background_flushes = 0;
       options.num_levels = 3;
       options.create_if_missing = true;
       options.statistics = rocksdb::CreateDBStatistics();
@@ -1940,6 +2477,7 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
 
     // Compaction range overlaps files
     Compact(1, "p1", "p9", 1);
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
     ASSERT_EQ("0,1", FilesPerLevel(1));
     ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
     ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
@@ -1955,6 +2493,7 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
 
     // Compact just the new range
     Compact(1, "b", "f", 1);
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
     ASSERT_EQ("0,2", FilesPerLevel(1));
     ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
     ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
@@ -1971,6 +2510,7 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
     compact_options.target_path_id = 1;
     compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
     db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
     ASSERT_EQ("0,1", FilesPerLevel(1));
     ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
@@ -2248,16 +2788,16 @@ TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
 
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "Compaction::InputCompressionMatchesOutput:Matches",
-      [&](void* arg) { matches++; });
+      [&](void* /*arg*/) { matches++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "Compaction::InputCompressionMatchesOutput:DidntMatch",
-      [&](void* arg) { didnt_match++; });
+      [&](void* /*arg*/) { didnt_match++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:NonTrivial",
-      [&](void* arg) { non_trivial++; });
+      [&](void* /*arg*/) { non_trivial++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:TrivialMove",
-      [&](void* arg) { trivial_move++; });
+      [&](void* /*arg*/) { trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   Reopen(options);
@@ -2345,15 +2885,12 @@ TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) {
   options.hard_pending_compaction_bytes_limit = 100;
   options.create_if_missing = true;
   DestroyAndReopen(options);
-  ASSERT_EQ(5, db_->GetOptions().base_background_compactions);
   ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit);
 
-  options.base_background_compactions = 4;
   options.max_background_compactions = 3;
   options.soft_pending_compaction_bytes_limit = 200;
   options.hard_pending_compaction_bytes_limit = 150;
   DestroyAndReopen(options);
-  ASSERT_EQ(3, db_->GetOptions().base_background_compactions);
   ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit);
 }
 
@@ -2422,13 +2959,14 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
   int32_t non_trivial_move = 0;
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:TrivialMove",
-      [&](void* arg) { trivial_move++; });
+      [&](void* /*arg*/) { trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundCompaction:NonTrivial",
-      [&](void* arg) { non_trivial_move++; });
+      [&](void* /*arg*/) { non_trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   Options options = CurrentOptions();
+  options.target_file_size_base = 100000000;
   options.write_buffer_size = 100000000;
   options.max_subcompactions = max_subcompactions_;
   DestroyAndReopen(options);
@@ -2528,13 +3066,11 @@ TEST_P(DBCompactionTestWithParam, IntraL0Compaction) {
   // Files 6-9 are the longest span of available files for which
   // work-per-deleted-file decreases (see "score" row above).
   for (int i = 0; i < 10; ++i) {
-    for (int j = 0; j < 2; ++j) {
-      ASSERT_OK(Put(Key(0), ""));  // prevents trivial move
-      if (i == 5) {
-        ASSERT_OK(Put(Key(i + 1), value + value));
-      } else {
-        ASSERT_OK(Put(Key(i + 1), value));
-      }
+    ASSERT_OK(Put(Key(0), ""));  // prevents trivial move
+    if (i == 5) {
+      ASSERT_OK(Put(Key(i + 1), value + value));
+    } else {
+      ASSERT_OK(Put(Key(i + 1), value));
     }
     ASSERT_OK(Flush());
   }
@@ -2549,48 +3085,831 @@ TEST_P(DBCompactionTestWithParam, IntraL0Compaction) {
   ASSERT_EQ(2, level_to_files[0].size());
   ASSERT_GT(level_to_files[1].size(), 0);
   for (int i = 0; i < 2; ++i) {
-    ASSERT_GE(level_to_files[0][0].fd.file_size, 1 << 21);
+    ASSERT_GE(level_to_files[0][i].fd.file_size, 1 << 21);
   }
 }
 
-INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
-                        ::testing::Values(std::make_tuple(1, true),
-                                          std::make_tuple(1, false),
-                                          std::make_tuple(4, true),
-                                          std::make_tuple(4, false)));
-
-TEST_P(DBCompactionDirectIOTest, DirectIO) {
+TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) {
+  // regression test for issue #2722: L0->L0 compaction can resurrect deleted
+  // keys from older L0 files if L1+ files' key-ranges do not include the key.
   Options options = CurrentOptions();
-  Destroy(options);
-  options.create_if_missing = true;
-  options.disable_auto_compactions = true;
-  options.use_direct_io_for_flush_and_compaction = GetParam();
-  options.env = new MockEnv(Env::Default());
-  Reopen(options);
-  SyncPoint::GetInstance()->SetCallBack(
-      "TableCache::NewIterator:for_compaction", [&](void* arg) {
-        bool* use_direct_reads = static_cast<bool*>(arg);
-        ASSERT_EQ(*use_direct_reads,
-                  options.use_direct_io_for_flush_and_compaction);
-      });
-  SyncPoint::GetInstance()->SetCallBack(
-      "CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
-        bool* use_direct_writes = static_cast<bool*>(arg);
-        ASSERT_EQ(*use_direct_writes,
-                  options.use_direct_io_for_flush_and_compaction);
-      });
-  SyncPoint::GetInstance()->EnableProcessing();
-  CreateAndReopenWithCF({"pikachu"}, options);
-  MakeTables(3, "p", "q", 1);
-  ASSERT_EQ("1,1,1", FilesPerLevel(1));
-  Compact(1, "p1", "p9");
-  ASSERT_EQ("0,0,1", FilesPerLevel(1));
-  Destroy(options);
-  delete options.env;
-}
-
-INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest,
-                        testing::Bool());
+  options.compression = kNoCompression;
+  options.level0_file_num_compaction_trigger = 5;
+  options.max_background_compactions = 2;
+  options.max_subcompactions = max_subcompactions_;
+  DestroyAndReopen(options);
+
+  const size_t kValueSize = 1 << 20;
+  Random rnd(301);
+  std::string value(RandomString(&rnd, kValueSize));
+
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"LevelCompactionPicker::PickCompactionBySize:0",
+        "CompactionJob::Run():Start"}});
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  // index:   0   1   2   3   4    5    6   7   8   9
+  // size:  1MB 1MB 1MB 1MB 1MB  1MB  1MB 1MB 1MB 1MB
+  // score:                     1.25 1.33 1.5 2.0 inf
+  //
+  // Files 0-4 will be included in an L0->L1 compaction.
+  //
+  // L0->L0 will be triggered since the sync points guarantee compaction to base
+  // level is still blocked when files 5-9 trigger another compaction. All files
+  // 5-9 are included in the L0->L0 due to work-per-deleted file decreasing.
+  //
+  // Put a key-value in files 0-4. Delete that key in files 5-9. Verify the
+  // L0->L0 preserves the deletion such that the key remains deleted.
+  for (int i = 0; i < 10; ++i) {
+    // key 0 serves both to prevent trivial move and as the key we want to
+    // verify is not resurrected by L0->L0 compaction.
+    if (i < 5) {
+      ASSERT_OK(Put(Key(0), ""));
+    } else {
+      ASSERT_OK(Delete(Key(0)));
+    }
+    ASSERT_OK(Put(Key(i + 1), value));
+    ASSERT_OK(Flush());
+  }
+  dbfull()->TEST_WaitForCompact();
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+  std::vector<std::vector<FileMetaData>> level_to_files;
+  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
+                                  &level_to_files);
+  ASSERT_GE(level_to_files.size(), 2);  // at least L0 and L1
+  // L0 has a single output file from L0->L0
+  ASSERT_EQ(1, level_to_files[0].size());
+  ASSERT_GT(level_to_files[1].size(), 0);
+  ASSERT_GE(level_to_files[0][0].fd.file_size, 1 << 22);
+
+  ReadOptions roptions;
+  std::string result;
+  ASSERT_TRUE(db_->Get(roptions, Key(0), &result).IsNotFound());
+}
+
+TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) {
+  const int kNumFilesTrigger = 3;
+  Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
+  for (bool use_universal_compaction : {false, true}) {
+    Options options = CurrentOptions();
+    if (use_universal_compaction) {
+      options.compaction_style = kCompactionStyleUniversal;
+    } else {
+      options.compaction_style = kCompactionStyleLevel;
+      options.level_compaction_dynamic_level_bytes = true;
+    }
+    options.num_levels = 4;
+    options.write_buffer_size = 100 << 10;     // 100KB
+    options.target_file_size_base = 32 << 10;  // 32KB
+    options.level0_file_num_compaction_trigger = kNumFilesTrigger;
+    // Trigger compaction if size amplification exceeds 110%
+    options.compaction_options_universal.max_size_amplification_percent = 110;
+    DestroyAndReopen(options);
+
+    int num_bottom_pri_compactions = 0;
+    SyncPoint::GetInstance()->SetCallBack(
+        "DBImpl::BGWorkBottomCompaction",
+        [&](void* /*arg*/) { ++num_bottom_pri_compactions; });
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    Random rnd(301);
+    for (int num = 0; num < kNumFilesTrigger; num++) {
+      ASSERT_EQ(NumSortedRuns(), num);
+      int key_idx = 0;
+      GenerateNewFile(&rnd, &key_idx);
+    }
+    dbfull()->TEST_WaitForCompact();
+
+    ASSERT_EQ(1, num_bottom_pri_compactions);
+
+    // Verify that size amplification did occur
+    ASSERT_EQ(NumSortedRuns(), 1);
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  }
+  Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
+}
+
+TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
+  // Deletions can be dropped when compacted to non-last level if they fall
+  // outside the lower-level files' key-ranges.
+  const int kNumL0Files = 4;
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  options.statistics = rocksdb::CreateDBStatistics();
+  DestroyAndReopen(options);
+
+  // put key 1 and 3 in separate L1, L2 files.
+  // So key 0, 2, and 4+ fall outside these levels' key-ranges.
+  for (int level = 2; level >= 1; --level) {
+    for (int i = 0; i < 2; ++i) {
+      Put(Key(2 * i + 1), "val");
+      Flush();
+    }
+    MoveFilesToLevel(level);
+    ASSERT_EQ(2, NumTableFilesAtLevel(level));
+  }
+
+  // Delete keys in range [1, 4]. These L0 files will be compacted with L1:
+  // - Tombstones for keys 2 and 4 can be dropped early.
+  // - Tombstones for keys 1 and 3 must be kept due to L2 files' key-ranges.
+  for (int i = 0; i < kNumL0Files; ++i) {
+    Put(Key(0), "val");  // sentinel to prevent trivial move
+    Delete(Key(i + 1));
+    Flush();
+  }
+  dbfull()->TEST_WaitForCompact();
+
+  for (int i = 0; i < kNumL0Files; ++i) {
+    std::string value;
+    ASSERT_TRUE(db_->Get(ReadOptions(), Key(i + 1), &value).IsNotFound());
+  }
+  ASSERT_EQ(2, options.statistics->getTickerCount(
+                   COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE));
+  ASSERT_EQ(2,
+            options.statistics->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE));
+}
+
+TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) {
+  // https://www.facebook.com/groups/rocksdb.dev/permalink/1389452781153232/
+  // CompactFiles() had a bug where it failed to pick a compaction when an L0
+  // compaction existed, but marked it as scheduled anyways. It'd never be
+  // unmarked as scheduled, so future compactions or DB close could hang.
+  const int kNumL0Files = 5;
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files - 1;
+  options.max_background_compactions = 2;
+  DestroyAndReopen(options);
+
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"LevelCompactionPicker::PickCompaction:Return",
+        "DBCompactionTest::CompactFilesPendingL0Bug:Picked"},
+       {"DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted",
+        "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  auto schedule_multi_compaction_token =
+      dbfull()->TEST_write_controler().GetCompactionPressureToken();
+
+  // Files 0-3 will be included in an L0->L1 compaction.
+  //
+  // File 4 will be included in a call to CompactFiles() while the first
+  // compaction is running.
+  for (int i = 0; i < kNumL0Files - 1; ++i) {
+    ASSERT_OK(Put(Key(0), "val"));  // sentinel to prevent trivial move
+    ASSERT_OK(Put(Key(i + 1), "val"));
+    ASSERT_OK(Flush());
+  }
+  TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:Picked");
+  // file 4 flushed after 0-3 picked
+  ASSERT_OK(Put(Key(kNumL0Files), "val"));
+  ASSERT_OK(Flush());
+
+  // previously DB close would hang forever as this situation caused scheduled
+  // compactions count to never decrement to zero.
+  ColumnFamilyMetaData cf_meta;
+  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
+  ASSERT_EQ(kNumL0Files, cf_meta.levels[0].files.size());
+  std::vector<std::string> input_filenames;
+  input_filenames.push_back(cf_meta.levels[0].files.front().name);
+  ASSERT_OK(dbfull()
+                  ->CompactFiles(CompactionOptions(), input_filenames,
+                                 0 /* output_level */));
+  TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted");
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
+  // Regression test for bug of not pulling in L0 files that overlap the user-
+  // specified input files in time- and key-ranges.
+  Put(Key(0), "old_val");
+  Flush();
+  Put(Key(0), "new_val");
+  Flush();
+
+  ColumnFamilyMetaData cf_meta;
+  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
+  ASSERT_GE(cf_meta.levels.size(), 2);
+  ASSERT_EQ(2, cf_meta.levels[0].files.size());
+
+  // Compacting {new L0 file, L1 file} should pull in the old L0 file since it
+  // overlaps in key-range and time-range.
+  std::vector<std::string> input_filenames;
+  input_filenames.push_back(cf_meta.levels[0].files.front().name);
+  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_filenames,
+                                   1 /* output_level */));
+  ASSERT_EQ("new_val", Get(Key(0)));
+}
+
+TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
+  // bottom-level files may contain deletions due to snapshots protecting the
+  // deleted keys. Once the snapshot is released, we should see files with many
+  // such deletions undergo single-file compactions.
+  const int kNumKeysPerFile = 1024;
+  const int kNumLevelFiles = 4;
+  const int kValueSize = 128;
+  Options options = CurrentOptions();
+  options.compression = kNoCompression;
+  options.level0_file_num_compaction_trigger = kNumLevelFiles;
+  // inflate it a bit to account for key/metadata overhead
+  options.target_file_size_base = 120 * kNumKeysPerFile * kValueSize / 100;
+  Reopen(options);
+
+  Random rnd(301);
+  const Snapshot* snapshot = nullptr;
+  for (int i = 0; i < kNumLevelFiles; ++i) {
+    for (int j = 0; j < kNumKeysPerFile; ++j) {
+      ASSERT_OK(
+          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
+    }
+    if (i == kNumLevelFiles - 1) {
+      snapshot = db_->GetSnapshot();
+      // delete every other key after grabbing a snapshot, so these deletions
+      // and the keys they cover can't be dropped until after the snapshot is
+      // released.
+      for (int j = 0; j < kNumLevelFiles * kNumKeysPerFile; j += 2) {
+        ASSERT_OK(Delete(Key(j)));
+      }
+    }
+    Flush();
+    if (i < kNumLevelFiles - 1) {
+      ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
+    }
+  }
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));
+
+  std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
+  db_->GetLiveFilesMetaData(&pre_release_metadata);
+  // just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST
+  // files does not need to be preserved in case of a future snapshot.
+  ASSERT_OK(Put(Key(0), "val"));
+  // release snapshot and wait for compactions to finish. Single-file
+  // compactions should be triggered, which reduce the size of each bottom-level
+  // file without changing file count.
+  db_->ReleaseSnapshot(snapshot);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        ASSERT_TRUE(compaction->compaction_reason() ==
+                    CompactionReason::kBottommostFiles);
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  dbfull()->TEST_WaitForCompact();
+  db_->GetLiveFilesMetaData(&post_release_metadata);
+  ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size());
+
+  for (size_t i = 0; i < pre_release_metadata.size(); ++i) {
+    const auto& pre_file = pre_release_metadata[i];
+    const auto& post_file = post_release_metadata[i];
+    ASSERT_EQ(1, pre_file.level);
+    ASSERT_EQ(1, post_file.level);
+    // each file is smaller than it was before as it was rewritten without
+    // deletion markers/deleted keys.
+    ASSERT_LT(post_file.size, pre_file.size);
+  }
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
+  const int kNumKeysPerFile = 32;
+  const int kNumLevelFiles = 2;
+  const int kValueSize = 1024;
+
+  Options options = CurrentOptions();
+  options.compression = kNoCompression;
+  options.ttl = 24 * 60 * 60;  // 24 hours
+  options.max_open_files = -1;
+  env_->time_elapse_only_sleep_ = false;
+  options.env = env_;
+
+  env_->addon_time_.store(0);
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+  for (int i = 0; i < kNumLevelFiles; ++i) {
+    for (int j = 0; j < kNumKeysPerFile; ++j) {
+      ASSERT_OK(
+          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
+    }
+    Flush();
+  }
+  dbfull()->TEST_WaitForCompact();
+  MoveFilesToLevel(3);
+  ASSERT_EQ("0,0,0,2", FilesPerLevel());
+
+  // Delete previously written keys.
+  for (int i = 0; i < kNumLevelFiles; ++i) {
+    for (int j = 0; j < kNumKeysPerFile; ++j) {
+      ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
+    }
+    Flush();
+  }
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ("2,0,0,2", FilesPerLevel());
+  MoveFilesToLevel(1);
+  ASSERT_EQ("0,2,0,2", FilesPerLevel());
+
+  env_->addon_time_.fetch_add(36 * 60 * 60);  // 36 hours
+  ASSERT_EQ("0,2,0,2", FilesPerLevel());
+
+  // Just do a simple write + flush so that the Ttl expired files get
+  // compacted.
+  ASSERT_OK(Put("a", "1"));
+  Flush();
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  dbfull()->TEST_WaitForCompact();
+  // All non-L0 files are deleted, as they contained only deleted data.
+  ASSERT_EQ("1", FilesPerLevel());
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+  // Test dynamically changing ttl.
+
+  env_->addon_time_.store(0);
+  DestroyAndReopen(options);
+
+  for (int i = 0; i < kNumLevelFiles; ++i) {
+    for (int j = 0; j < kNumKeysPerFile; ++j) {
+      ASSERT_OK(
+          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
+    }
+    Flush();
+  }
+  dbfull()->TEST_WaitForCompact();
+  MoveFilesToLevel(3);
+  ASSERT_EQ("0,0,0,2", FilesPerLevel());
+
+  // Delete previously written keys.
+  for (int i = 0; i < kNumLevelFiles; ++i) {
+    for (int j = 0; j < kNumKeysPerFile; ++j) {
+      ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
+    }
+    Flush();
+  }
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ("2,0,0,2", FilesPerLevel());
+  MoveFilesToLevel(1);
+  ASSERT_EQ("0,2,0,2", FilesPerLevel());
+
+  // Move time forward by 12 hours, and make sure that compaction still doesn't
+  // trigger as ttl is set to 24 hours.
+  env_->addon_time_.fetch_add(12 * 60 * 60);
+  ASSERT_OK(Put("a", "1"));
+  Flush();
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ("1,2,0,2", FilesPerLevel());
+
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Dynamically change ttl to 10 hours.
+  // This should trigger a ttl compaction, as 12 hours have already passed.
+  ASSERT_OK(dbfull()->SetOptions({{"ttl", "36000"}}));
+  dbfull()->TEST_WaitForCompact();
+  // All non-L0 files are deleted, as they contained only deleted data.
+  ASSERT_EQ("1", FilesPerLevel());
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
+  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
+  // compaction only triggers flush after it's sure stall won't be triggered for
+  // L0 file count going too high.
+  const int kNumL0FilesTrigger = 4;
+  const int kNumL0FilesLimit = 8;
+  // i == 0: verifies normal case where stall is avoided by delay
+  // i == 1: verifies no delay in edge case where stall trigger is same as
+  //         compaction trigger, so stall can't be avoided
+  for (int i = 0; i < 2; ++i) {
+    Options options = CurrentOptions();
+    options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
+    if (i == 0) {
+      options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
+    } else {
+      options.level0_file_num_compaction_trigger = kNumL0FilesLimit;
+    }
+    Reopen(options);
+
+    if (i == 0) {
+      // ensure the auto compaction doesn't finish until manual compaction has
+      // had a chance to be delayed.
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
+            "CompactionJob::Run():End"}});
+    } else {
+      // ensure the auto-compaction doesn't finish until manual compaction has
+      // continued without delay.
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
+    }
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+    Random rnd(301);
+    for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
+      for (int k = 0; k < 2; ++k) {
+        ASSERT_OK(Put(Key(k), RandomString(&rnd, 1024)));
+      }
+      Flush();
+    }
+    auto manual_compaction_thread = port::Thread([this]() {
+      CompactRangeOptions cro;
+      cro.allow_write_stall = false;
+      db_->CompactRange(cro, nullptr, nullptr);
+    });
+
+    manual_compaction_thread.join();
+    dbfull()->TEST_WaitForCompact();
+    ASSERT_EQ(0, NumTableFilesAtLevel(0));
+    ASSERT_GT(NumTableFilesAtLevel(1), 0);
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  }
+}
+
+TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
+  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
+  // compaction only triggers flush after it's sure stall won't be triggered for
+  // immutable memtable count going too high.
+  const int kNumImmMemTableLimit = 8;
+  // i == 0: verifies normal case where stall is avoided by delay
+  // i == 1: verifies no delay in edge case where stall trigger is same as flush
+  //         trigger, so stall can't be avoided
+  for (int i = 0; i < 2; ++i) {
+    Options options = CurrentOptions();
+    options.disable_auto_compactions = true;
+    // the delay limit is one less than the stop limit. This test focuses on
+    // avoiding delay limit, but this option sets stop limit, so add one.
+    options.max_write_buffer_number = kNumImmMemTableLimit + 1;
+    if (i == 1) {
+      options.min_write_buffer_number_to_merge = kNumImmMemTableLimit;
+    }
+    Reopen(options);
+
+    if (i == 0) {
+      // ensure the flush doesn't finish until manual compaction has had a
+      // chance to be delayed.
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
+            "FlushJob::WriteLevel0Table"}});
+    } else {
+      // ensure the flush doesn't finish until manual compaction has continued
+      // without delay.
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"DBImpl::FlushMemTable:StallWaitDone",
+            "FlushJob::WriteLevel0Table"}});
+    }
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+    Random rnd(301);
+    for (int j = 0; j < kNumImmMemTableLimit - 1; ++j) {
+      ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
+      FlushOptions flush_opts;
+      flush_opts.wait = false;
+      flush_opts.allow_write_stall = true;
+      dbfull()->Flush(flush_opts);
+    }
+
+    auto manual_compaction_thread = port::Thread([this]() {
+      CompactRangeOptions cro;
+      cro.allow_write_stall = false;
+      db_->CompactRange(cro, nullptr, nullptr);
+    });
+
+    manual_compaction_thread.join();
+    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_EQ(0, NumTableFilesAtLevel(0));
+    ASSERT_GT(NumTableFilesAtLevel(1), 0);
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  }
+}
+
+TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
+  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, delay
+  // does not hang if CF is dropped or DB is closed
+  const int kNumL0FilesTrigger = 4;
+  const int kNumL0FilesLimit = 8;
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
+  options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
+  // i == 0: DB::DropColumnFamily() on CompactRange's target CF unblocks it
+  // i == 1: DB::CancelAllBackgroundWork() unblocks CompactRange. This is to
+  //         simulate what happens during Close as we can't call Close (it
+  //         blocks on the auto-compaction, making a cycle).
+  for (int i = 0; i < 2; ++i) {
+    CreateAndReopenWithCF({"one"}, options);
+    // The calls to close CF/DB wait until the manual compaction stalls.
+    // The auto-compaction waits until the manual compaction finishes to ensure
+    // the signal comes from closing CF/DB, not from compaction making progress.
+    rocksdb::SyncPoint::GetInstance()->LoadDependency(
+        {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
+          "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
+         {"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
+          "CompactionJob::Run():End"}});
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+    Random rnd(301);
+    for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
+      for (int k = 0; k < 2; ++k) {
+        ASSERT_OK(Put(1, Key(k), RandomString(&rnd, 1024)));
+      }
+      Flush(1);
+    }
+    auto manual_compaction_thread = port::Thread([this]() {
+      CompactRangeOptions cro;
+      cro.allow_write_stall = false;
+      ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
+                      .IsShutdownInProgress());
+    });
+
+    TEST_SYNC_POINT(
+        "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown");
+    if (i == 0) {
+      ASSERT_OK(db_->DropColumnFamily(handles_[1]));
+    } else {
+      dbfull()->CancelAllBackgroundWork(false /* wait */);
+    }
+    manual_compaction_thread.join();
+    TEST_SYNC_POINT(
+        "DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual");
+    dbfull()->TEST_WaitForCompact();
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  }
+}
+
+TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
+  // Verify that, when `CompactRangeOptions::allow_write_stall == false`,
+  // CompactRange skips its flush if the delay is long enough that the memtables
+  // existing at the beginning of the call have already been flushed.
+  const int kNumL0FilesTrigger = 4;
+  const int kNumL0FilesLimit = 8;
+  Options options = CurrentOptions();
+  options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
+  options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
+  Reopen(options);
+
+  Random rnd(301);
+  // The manual flush includes the memtable that was active when CompactRange
+  // began. So it unblocks CompactRange and precludes its flush. Throughout the
+  // test, stall conditions are upheld via high L0 file count.
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
+        "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
+       {"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
+        "DBImpl::FlushMemTable:StallWaitDone"},
+       {"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  //used for the delayable flushes
+  FlushOptions flush_opts;
+  flush_opts.allow_write_stall = true;
+  for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
+    for (int j = 0; j < 2; ++j) {
+      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
+    }
+    dbfull()->Flush(flush_opts);
+  }
+  auto manual_compaction_thread = port::Thread([this]() {
+    CompactRangeOptions cro;
+    cro.allow_write_stall = false;
+    db_->CompactRange(cro, nullptr, nullptr);
+  });
+
+  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
+  Put(ToString(0), RandomString(&rnd, 1024));
+  dbfull()->Flush(flush_opts);
+  Put(ToString(0), RandomString(&rnd, 1024));
+  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
+  manual_compaction_thread.join();
+
+  // If CompactRange's flush was skipped, the final Put above will still be
+  // in the active memtable.
+  std::string num_keys_in_memtable;
+  db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable, &num_keys_in_memtable);
+  ASSERT_EQ(ToString(1), num_keys_in_memtable);
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) {
+  // Verify memtable only gets flushed if it contains data overlapping the range
+  // provided to `CompactRange`. Tests all kinds of overlap/non-overlap.
+  const int kNumEndpointKeys = 5;
+  std::string keys[kNumEndpointKeys] = {"a", "b", "c", "d", "e"};
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  // One extra iteration for nullptr, which means left side of interval is
+  // unbounded.
+  for (int i = 0; i <= kNumEndpointKeys; ++i) {
+    Slice begin;
+    Slice* begin_ptr;
+    if (i == 0) {
+      begin_ptr = nullptr;
+    } else {
+      begin = keys[i - 1];
+      begin_ptr = &begin;
+    }
+    // Start at `i` so right endpoint comes after left endpoint. One extra
+    // iteration for nullptr, which means right side of interval is unbounded.
+    for (int j = std::max(0, i - 1); j <= kNumEndpointKeys; ++j) {
+      Slice end;
+      Slice* end_ptr;
+      if (j == kNumEndpointKeys) {
+        end_ptr = nullptr;
+      } else {
+        end = keys[j];
+        end_ptr = &end;
+      }
+      ASSERT_OK(Put("b", "val"));
+      ASSERT_OK(Put("d", "val"));
+      CompactRangeOptions compact_range_opts;
+      ASSERT_OK(db_->CompactRange(compact_range_opts, begin_ptr, end_ptr));
+
+      uint64_t get_prop_tmp, num_memtable_entries = 0;
+      ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesImmMemTables,
+                                      &get_prop_tmp));
+      num_memtable_entries += get_prop_tmp;
+      ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
+                                      &get_prop_tmp));
+      num_memtable_entries += get_prop_tmp;
+      if (begin_ptr == nullptr || end_ptr == nullptr ||
+          (i <= 4 && j >= 1 && (begin != "c" || end != "c"))) {
+        // In this case `CompactRange`'s range overlapped in some way with the
+        // memtable's range, so flush should've happened. Then "b" and "d" won't
+        // be in the memtable.
+        ASSERT_EQ(0, num_memtable_entries);
+      } else {
+        ASSERT_EQ(2, num_memtable_entries);
+        // flush anyways to prepare for next iteration
+        db_->Flush(FlushOptions());
+      }
+    }
+  }
+}
+
+TEST_F(DBCompactionTest, CompactionStatsTest) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 2;
+  CompactionStatsCollector* collector = new CompactionStatsCollector();
+  options.listeners.emplace_back(collector);
+  DestroyAndReopen(options);
+
+  for (int i = 0; i < 32; i++) {
+    for (int j = 0; j < 5000; j++) {
+      Put(std::to_string(j), std::string(1, 'A'));
+    }
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  }
+  dbfull()->TEST_WaitForCompact();
+  ColumnFamilyHandleImpl* cfh =
+      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
+  ColumnFamilyData* cfd = cfh->cfd();
+
+  VerifyCompactionStats(*cfd, *collector);
+}
+
+TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
+  // LSM setup:
+  // L1:      [ba bz]
+  // L2: [a b]       [c d]
+  // L3: [a b]       [c d]
+  //
+  // Thread 1:                        Thread 2:
+  // Begin compacting all L2->L3
+  //                                  Compact [ba bz] L1->L3
+  // End compacting all L2->L3
+  //
+  // The compaction operation in thread 2 should be disallowed because the range
+  // overlaps with the compaction in thread 1, which also covers that range in
+  // L3.
+  Options options = CurrentOptions();
+  FlushedFileCollector* collector = new FlushedFileCollector();
+  options.listeners.emplace_back(collector);
+  Reopen(options);
+
+  for (int level = 3; level >= 2; --level) {
+    ASSERT_OK(Put("a", "val"));
+    ASSERT_OK(Put("b", "val"));
+    ASSERT_OK(Flush());
+    ASSERT_OK(Put("c", "val"));
+    ASSERT_OK(Put("d", "val"));
+    ASSERT_OK(Flush());
+    MoveFilesToLevel(level);
+  }
+  ASSERT_OK(Put("ba", "val"));
+  ASSERT_OK(Put("bz", "val"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+
+  SyncPoint::GetInstance()->LoadDependency({
+      {"CompactFilesImpl:0",
+       "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"},
+      {"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End",
+       "CompactFilesImpl:1"},
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  auto bg_thread = port::Thread([&]() {
+    // Thread 1
+    std::vector<std::string> filenames = collector->GetFlushedFiles();
+    filenames.pop_back();
+    ASSERT_OK(db_->CompactFiles(CompactionOptions(), filenames,
+                                3 /* output_level */));
+  });
+
+  // Thread 2
+  TEST_SYNC_POINT(
+      "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin");
+  std::string filename = collector->GetFlushedFiles().back();
+  ASSERT_FALSE(
+      db_->CompactFiles(CompactionOptions(), {filename}, 3 /* output_level */)
+          .ok());
+  TEST_SYNC_POINT(
+      "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End");
+
+  bg_thread.join();
+}
+
+TEST_F(DBCompactionTest, CompactionHasEmptyOutput) {
+  Options options = CurrentOptions();
+  SstStatsCollector* collector = new SstStatsCollector();
+  options.level0_file_num_compaction_trigger = 2;
+  options.listeners.emplace_back(collector);
+  Reopen(options);
+
+  // Make sure the L0 files overlap to prevent trivial move.
+  ASSERT_OK(Put("a", "val"));
+  ASSERT_OK(Put("b", "val"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Delete("a"));
+  ASSERT_OK(Delete("b"));
+  ASSERT_OK(Flush());
+
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+
+  // Expect one file creation to start for each flush, and zero for compaction
+  // since no keys are written.
+  ASSERT_EQ(2, collector->num_ssts_creation_started());
+}
+
+INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
+                        ::testing::Values(std::make_tuple(1, true),
+                                          std::make_tuple(1, false),
+                                          std::make_tuple(4, true),
+                                          std::make_tuple(4, false)));
+
+TEST_P(DBCompactionDirectIOTest, DirectIO) {
+  Options options = CurrentOptions();
+  Destroy(options);
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.use_direct_io_for_flush_and_compaction = GetParam();
+  options.env = new MockEnv(Env::Default());
+  Reopen(options);
+  bool readahead = false;
+  SyncPoint::GetInstance()->SetCallBack(
+      "TableCache::NewIterator:for_compaction", [&](void* arg) {
+        bool* use_direct_reads = static_cast<bool*>(arg);
+        ASSERT_EQ(*use_direct_reads,
+                  options.use_direct_reads);
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
+        bool* use_direct_writes = static_cast<bool*>(arg);
+        ASSERT_EQ(*use_direct_writes,
+                  options.use_direct_io_for_flush_and_compaction);
+      });
+  if (options.use_direct_io_for_flush_and_compaction) {
+    SyncPoint::GetInstance()->SetCallBack(
+        "SanitizeOptions:direct_io", [&](void* /*arg*/) {
+          readahead = true;
+        });
+  }
+  SyncPoint::GetInstance()->EnableProcessing();
+  CreateAndReopenWithCF({"pikachu"}, options);
+  MakeTables(3, "p", "q", 1);
+  ASSERT_EQ("1,1,1", FilesPerLevel(1));
+  Compact(1, "p1", "p9");
+  ASSERT_EQ(readahead, options.use_direct_reads);
+  ASSERT_EQ("0,0,1", FilesPerLevel(1));
+  Destroy(options);
+  delete options.env;
+}
+
+INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest,
+                        testing::Bool());
 
 class CompactionPriTest : public DBTestBase,
                           public testing::WithParamInterface<uint32_t> {
@@ -2642,6 +3961,50 @@ INSTANTIATE_TEST_CASE_P(
                       CompactionPri::kOldestSmallestSeqFirst,
                       CompactionPri::kMinOverlappingRatio));
 
+class NoopMergeOperator : public MergeOperator {
+ public:
+  NoopMergeOperator() {}
+
+  virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
+                           MergeOperationOutput* merge_out) const override {
+    std::string val("bar");
+    merge_out->new_value = val;
+    return true;
+  }
+
+  virtual const char* Name() const override { return "Noop"; }
+};
+
+TEST_F(DBCompactionTest, PartialManualCompaction) {
+  Options opts = CurrentOptions();
+  opts.num_levels = 3;
+  opts.level0_file_num_compaction_trigger = 10;
+  opts.compression = kNoCompression;
+  opts.merge_operator.reset(new NoopMergeOperator());
+  opts.target_file_size_base = 10240;
+  DestroyAndReopen(opts);
+
+  Random rnd(301);
+  for (auto i = 0; i < 8; ++i) {
+    for (auto j = 0; j < 10; ++j) {
+      Merge("foo", RandomString(&rnd, 1024));
+    }
+    Flush();
+  }
+
+  MoveFilesToLevel(2);
+
+  std::string prop;
+  EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
+  uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2;
+  ASSERT_OK(dbfull()->SetOptions(
+      {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
+
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  dbfull()->CompactRange(cro, nullptr, nullptr);
+}
+
 #endif // !defined(ROCKSDB_LITE)
 }  // namespace rocksdb
 
@@ -2651,6 +4014,8 @@ int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 #else
+  (void) argc;
+  (void) argv;
   return 0;
 #endif
 }