]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compaction/compaction_picker_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_picker_test.cc
index 278bdb06a3fd209e581c363846974d43dd9060f9..c415fd811a4de936ea5eb4d6a2c149e4b27ef843 100644 (file)
@@ -12,7 +12,6 @@
 #include "db/compaction/compaction_picker_level.h"
 #include "db/compaction/compaction_picker_universal.h"
 
-#include "logging/logging.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 #include "util/string_util.h"
@@ -33,6 +32,7 @@ class CompactionPickerTest : public testing::Test {
   Options options_;
   ImmutableCFOptions ioptions_;
   MutableCFOptions mutable_cf_options_;
+  MutableDBOptions mutable_db_options_;
   LevelCompactionPicker level_compaction_picker;
   std::string cf_name_;
   CountingLogger logger_;
@@ -52,6 +52,7 @@ class CompactionPickerTest : public testing::Test {
         icmp_(ucmp_),
         ioptions_(options_),
         mutable_cf_options_(options_),
+        mutable_db_options_(),
         level_compaction_picker(ioptions_, &icmp_),
         cf_name_("dummy"),
         log_buffer_(InfoLogLevel::INFO_LEVEL, &logger_),
@@ -78,8 +79,17 @@ class CompactionPickerTest : public testing::Test {
     vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
   }
 
+  // Create a new VersionStorageInfo object so we can add mode files and then
+  // merge it with the existing VersionStorageInfo
+  void AddVersionStorage() {
+    temp_vstorage_.reset(new VersionStorageInfo(
+        &icmp_, ucmp_, options_.num_levels, ioptions_.compaction_style,
+        vstorage_.get(), false));
+  }
+
   void DeleteVersionStorage() {
     vstorage_.reset();
+    temp_vstorage_.reset();
     files_.clear();
     file_map_.clear();
     input_files_.clear();
@@ -88,18 +98,24 @@ class CompactionPickerTest : public testing::Test {
   void Add(int level, uint32_t file_number, const char* smallest,
            const char* largest, uint64_t file_size = 1, uint32_t path_id = 0,
            SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
-           size_t compensated_file_size = 0) {
-    assert(level < vstorage_->num_levels());
+           size_t compensated_file_size = 0, bool marked_for_compact = false) {
+    VersionStorageInfo* vstorage;
+    if (temp_vstorage_) {
+      vstorage = temp_vstorage_.get();
+    } else {
+      vstorage = vstorage_.get();
+    }
+    assert(level < vstorage->num_levels());
     FileMetaData* f = new FileMetaData(
         file_number, path_id, file_size,
         InternalKey(smallest, smallest_seq, kTypeValue),
         InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
-        largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
+        largest_seq, marked_for_compact, kInvalidBlobFileNumber,
         kUnknownOldestAncesterTime, kUnknownFileCreationTime,
         kUnknownFileChecksum, kUnknownFileChecksumFuncName);
     f->compensated_file_size =
         (compensated_file_size != 0) ? compensated_file_size : file_size;
-    vstorage_->AddFile(level, f);
+    vstorage->AddFile(level, f);
     files_.emplace_back(f);
     file_map_.insert({file_number, {f, level}});
   }
@@ -122,6 +138,12 @@ class CompactionPickerTest : public testing::Test {
   }
 
   void UpdateVersionStorageInfo() {
+    if (temp_vstorage_) {
+      VersionBuilder builder(FileOptions(), &ioptions_, nullptr,
+                             vstorage_.get(), nullptr);
+      builder.SaveTo(temp_vstorage_.get());
+      vstorage_ = std::move(temp_vstorage_);
+    }
     vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
     vstorage_->UpdateFilesByCompactionPri(ioptions_.compaction_pri);
     vstorage_->UpdateNumNonEmptyLevels();
@@ -132,13 +154,36 @@ class CompactionPickerTest : public testing::Test {
     vstorage_->ComputeFilesMarkedForCompaction();
     vstorage_->SetFinalized();
   }
+  void AddFileToVersionStorage(int level, uint32_t file_number,
+                               const char* smallest, const char* largest,
+                               uint64_t file_size = 1, uint32_t path_id = 0,
+                               SequenceNumber smallest_seq = 100,
+                               SequenceNumber largest_seq = 100,
+                               size_t compensated_file_size = 0,
+                               bool marked_for_compact = false) {
+    VersionStorageInfo* base_vstorage = vstorage_.release();
+    vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels,
+                                           kCompactionStyleUniversal,
+                                           base_vstorage, false));
+    Add(level, file_number, smallest, largest, file_size, path_id, smallest_seq,
+        largest_seq, compensated_file_size, marked_for_compact);
+
+    VersionBuilder builder(FileOptions(), &ioptions_, nullptr, base_vstorage,
+                           nullptr);
+    builder.SaveTo(vstorage_.get());
+    UpdateVersionStorageInfo();
+  }
+
+ private:
+  std::unique_ptr<VersionStorageInfo> temp_vstorage_;
 };
 
 TEST_F(CompactionPickerTest, Empty) {
   NewVersionStorage(6, kCompactionStyleLevel);
   UpdateVersionStorageInfo();
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() == nullptr);
 }
 
@@ -149,7 +194,8 @@ TEST_F(CompactionPickerTest, Single) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() == nullptr);
 }
 
@@ -162,7 +208,8 @@ TEST_F(CompactionPickerTest, Level0Trigger) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_files(0));
   ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@@ -175,7 +222,8 @@ TEST_F(CompactionPickerTest, Level1Trigger) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_files(0));
   ASSERT_EQ(66U, compaction->input(0, 0)->fd.GetNumber());
@@ -193,7 +241,8 @@ TEST_F(CompactionPickerTest, Level1Trigger2) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_files(0));
   ASSERT_EQ(2U, compaction->num_input_files(1));
@@ -224,7 +273,8 @@ TEST_F(CompactionPickerTest, LevelMaxScore) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_files(0));
   ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber());
@@ -271,7 +321,8 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_files(0));
   ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@@ -295,7 +346,8 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic2) {
   ASSERT_EQ(vstorage_->base_level(), num_levels - 2);
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_files(0));
   ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@@ -320,7 +372,8 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic3) {
   ASSERT_EQ(vstorage_->base_level(), num_levels - 3);
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_files(0));
   ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@@ -349,7 +402,8 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic4) {
   ASSERT_EQ(vstorage_->base_level(), num_levels - 3);
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_files(0));
   ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@@ -371,8 +425,8 @@ TEST_F(CompactionPickerTest, LevelTriggerDynamic4) {
   mutable_cf_options_.max_bytes_for_level_multiplier = 10;
   NewVersionStorage(num_levels, kCompactionStyleLevel);
   Add(0, 1U, "150", "200");
-  Add(num_levels - 1, 3U, "200", "250", 300U);
-  Add(num_levels - 1, 4U, "300", "350", 3000U);
+  Add(num_levels - 1, 2U, "200", "250", 300U);
+  Add(num_levels - 1, 3U, "300", "350", 3000U);
   Add(num_levels - 1, 4U, "400", "450", 3U);
   Add(num_levels - 2, 5U, "150", "180", 300U);
   Add(num_levels - 2, 6U, "181", "350", 500U);
@@ -381,7 +435,8 @@ TEST_F(CompactionPickerTest, LevelTriggerDynamic4) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_files(0));
   ASSERT_EQ(5U, compaction->input(0, 0)->fd.GetNumber());
@@ -438,7 +493,8 @@ TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) {
 
   std::unique_ptr<Compaction> compaction(
       universal_compaction_picker.PickCompaction(
-          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
 
   // output level should be the one above the bottom-most
   ASSERT_EQ(1, compaction->output_level());
@@ -472,7 +528,8 @@ TEST_F(CompactionPickerTest, CannotTrivialMoveUniversal) {
 
   std::unique_ptr<Compaction> compaction(
       universal_compaction_picker.PickCompaction(
-          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
 
   ASSERT_TRUE(!compaction->is_trivial_move());
 }
@@ -498,7 +555,8 @@ TEST_F(CompactionPickerTest, AllowsTrivialMoveUniversal) {
 
   std::unique_ptr<Compaction> compaction(
       universal_compaction_picker.PickCompaction(
-          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
 
   ASSERT_TRUE(compaction->is_trivial_move());
 }
@@ -526,7 +584,8 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction1) {
 
   std::unique_ptr<Compaction> compaction(
       universal_compaction_picker.PickCompaction(
-          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
 
   ASSERT_TRUE(compaction);
   ASSERT_EQ(4, compaction->output_level());
@@ -556,7 +615,8 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction2) {
 
   std::unique_ptr<Compaction> compaction(
       universal_compaction_picker.PickCompaction(
-          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
 
   ASSERT_FALSE(compaction);
 }
@@ -582,7 +642,8 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction3) {
 
   std::unique_ptr<Compaction> compaction(
       universal_compaction_picker.PickCompaction(
-          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
 
   ASSERT_FALSE(compaction);
 }
@@ -612,7 +673,8 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction4) {
 
   std::unique_ptr<Compaction> compaction(
       universal_compaction_picker.PickCompaction(
-          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
   ASSERT_TRUE(!compaction ||
               compaction->start_level() != compaction->output_level());
 }
@@ -632,7 +694,8 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction5) {
 
   std::unique_ptr<Compaction> compaction(
       universal_compaction_picker.PickCompaction(
-          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
   ASSERT_TRUE(compaction);
   ASSERT_EQ(0, compaction->start_level());
   ASSERT_EQ(1U, compaction->num_input_files(0));
@@ -656,7 +719,8 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction6) {
 
   std::unique_ptr<Compaction> compaction(
       universal_compaction_picker.PickCompaction(
-          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
   ASSERT_TRUE(compaction);
   ASSERT_EQ(4, compaction->start_level());
   ASSERT_EQ(2U, compaction->num_input_files(0));
@@ -716,7 +780,8 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping1) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_files(0));
   // Pick file 8 because it overlaps with 0 files on level 3.
@@ -739,7 +804,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping2) {
   Add(2, 8U, "201", "300",
       60000000U);  // Overlaps with file 28, 29, total size 521M
 
-  Add(3, 26U, "100", "110", 261000000U);
+  Add(3, 25U, "100", "110", 261000000U);
   Add(3, 26U, "150", "170", 261000000U);
   Add(3, 27U, "171", "179", 260000000U);
   Add(3, 28U, "191", "220", 260000000U);
@@ -748,7 +813,8 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping2) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_files(0));
   // Picking file 7 because overlapping ratio is the biggest.
@@ -775,7 +841,8 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping3) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_files(0));
   // Picking file 8 because overlapping ratio is the biggest.
@@ -804,7 +871,8 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping4) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_files(0));
   // Picking file 8 because overlapping ratio is the biggest.
@@ -831,7 +899,8 @@ TEST_F(CompactionPickerTest, ParentIndexResetBug) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
 }
 
 // This test checks ExpandWhileOverlapping() by having overlapping user keys
@@ -848,7 +917,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-              cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_levels());
   ASSERT_EQ(2U, compaction->num_input_files(0));
@@ -867,7 +937,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys2) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-              cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_EQ(2U, compaction->num_input_files(0));
@@ -894,7 +965,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys3) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-              cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_EQ(5U, compaction->num_input_files(0));
@@ -924,7 +996,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys4) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_EQ(1U, compaction->num_input_files(0));
@@ -947,7 +1020,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys5) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() == nullptr);
 }
 
@@ -968,7 +1042,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys6) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_EQ(1U, compaction->num_input_files(0));
@@ -988,7 +1063,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys7) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_GE(1U, compaction->num_input_files(0));
@@ -1016,7 +1092,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys8) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_EQ(3U, compaction->num_input_files(0));
@@ -1048,7 +1125,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys9) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_EQ(5U, compaction->num_input_files(0));
@@ -1088,7 +1166,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys10) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_EQ(1U, compaction->num_input_files(0));
@@ -1126,7 +1205,8 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys11) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_EQ(1U, compaction->num_input_files(0));
@@ -1163,7 +1243,8 @@ TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri1) {
   ASSERT_EQ(0, vstorage_->CompactionScoreLevel(0));
   ASSERT_EQ(1, vstorage_->CompactionScoreLevel(1));
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() == nullptr);
 }
 
@@ -1193,7 +1274,8 @@ TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri2) {
   ASSERT_EQ(0, vstorage_->CompactionScoreLevel(0));
   ASSERT_EQ(1, vstorage_->CompactionScoreLevel(1));
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
 }
 
@@ -1226,7 +1308,8 @@ TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri3) {
   ASSERT_EQ(1, vstorage_->CompactionScoreLevel(0));
   ASSERT_EQ(0, vstorage_->CompactionScoreLevel(1));
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
 }
 
@@ -1255,7 +1338,7 @@ TEST_F(CompactionPickerTest, EstimateCompactionBytesNeeded1) {
   // Size ratio L4/L3 is 9.9
   // After merge from L3, L4 size is 1000900
   Add(4, 11U, "400", "500", 999900);
-  Add(5, 11U, "400", "500", 8007200);
+  Add(5, 12U, "400", "500", 8007200);
 
   UpdateVersionStorageInfo();
 
@@ -1520,7 +1603,8 @@ TEST_F(CompactionPickerTest, MaxCompactionBytesHit) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_EQ(1U, compaction->num_input_files(0));
@@ -1544,7 +1628,8 @@ TEST_F(CompactionPickerTest, MaxCompactionBytesNotHit) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(2U, compaction->num_input_levels());
   ASSERT_EQ(3U, compaction->num_input_files(0));
@@ -1568,16 +1653,43 @@ TEST_F(CompactionPickerTest, IsTrivialMoveOn) {
 
   Add(3, 5U, "120", "130", 7000U);
   Add(3, 6U, "170", "180", 7000U);
-  Add(3, 5U, "220", "230", 7000U);
-  Add(3, 5U, "270", "280", 7000U);
+  Add(3, 7U, "220", "230", 7000U);
+  Add(3, 8U, "270", "280", 7000U);
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-    cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_TRUE(compaction->IsTrivialMove());
 }
 
+TEST_F(CompactionPickerTest, IsTrivialMoveOffSstPartitioned) {
+  mutable_cf_options_.max_bytes_for_level_base = 10000u;
+  mutable_cf_options_.max_compaction_bytes = 10001u;
+  ioptions_.level_compaction_dynamic_level_bytes = false;
+  ioptions_.sst_partitioner_factory = NewSstPartitionerFixedPrefixFactory(1);
+  NewVersionStorage(6, kCompactionStyleLevel);
+  // A compaction should be triggered and pick file 2
+  Add(1, 1U, "100", "150", 3000U);
+  Add(1, 2U, "151", "200", 3001U);
+  Add(1, 3U, "201", "250", 3000U);
+  Add(1, 4U, "251", "300", 3000U);
+
+  Add(3, 5U, "120", "130", 7000U);
+  Add(3, 6U, "170", "180", 7000U);
+  Add(3, 7U, "220", "230", 7000U);
+  Add(3, 8U, "270", "280", 7000U);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  // No trivial move, because partitioning is applied
+  ASSERT_TRUE(!compaction->IsTrivialMove());
+}
+
 TEST_F(CompactionPickerTest, IsTrivialMoveOff) {
   mutable_cf_options_.max_bytes_for_level_base = 1000000u;
   mutable_cf_options_.max_compaction_bytes = 10000u;
@@ -1594,7 +1706,8 @@ TEST_F(CompactionPickerTest, IsTrivialMoveOff) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-    cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_FALSE(compaction->IsTrivialMove());
 }
@@ -1619,7 +1732,8 @@ TEST_F(CompactionPickerTest, CacheNextCompactionIndex) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_levels());
   ASSERT_EQ(1U, compaction->num_input_files(0));
@@ -1628,7 +1742,8 @@ TEST_F(CompactionPickerTest, CacheNextCompactionIndex) {
   ASSERT_EQ(2, vstorage_->NextCompactionIndex(1 /* level */));
 
   compaction.reset(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_levels());
   ASSERT_EQ(1U, compaction->num_input_files(0));
@@ -1637,7 +1752,8 @@ TEST_F(CompactionPickerTest, CacheNextCompactionIndex) {
   ASSERT_EQ(3, vstorage_->NextCompactionIndex(1 /* level */));
 
   compaction.reset(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() == nullptr);
   ASSERT_EQ(4, vstorage_->NextCompactionIndex(1 /* level */));
 }
@@ -1662,7 +1778,8 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesNotHit) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_levels());
   ASSERT_EQ(5U, compaction->num_input_files(0));
@@ -1692,7 +1809,8 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesHit) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_levels());
   ASSERT_EQ(4U, compaction->num_input_files(0));
@@ -1724,7 +1842,8 @@ TEST_F(CompactionPickerTest, IntraL0ForEarliestSeqno) {
   UpdateVersionStorageInfo();
 
   std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
-      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_, 107));
+      cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+      &log_buffer_, 107));
   ASSERT_TRUE(compaction.get() != nullptr);
   ASSERT_EQ(1U, compaction->num_input_levels());
   ASSERT_EQ(4U, compaction->num_input_files(0));
@@ -1733,6 +1852,336 @@ TEST_F(CompactionPickerTest, IntraL0ForEarliestSeqno) {
   ASSERT_EQ(0, compaction->output_level());
 }
 
+#ifndef ROCKSDB_LITE
+TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap) {
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  // This test covers the case where a "regular" universal compaction is
+  // scheduled first, followed by a delete triggered compaction. The latter
+  // should fail
+  NewVersionStorage(5, kCompactionStyleUniversal);
+
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
+  Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300);
+  Add(3, 5U, "010", "080", 8 * kFileSize, 0, 200, 251);
+  Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
+  Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
+
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+
+  ASSERT_TRUE(compaction);
+  // Validate that its a compaction to reduce sorted runs
+  ASSERT_EQ(CompactionReason::kUniversalSortedRunNum,
+            compaction->compaction_reason());
+  ASSERT_EQ(0, compaction->output_level());
+  ASSERT_EQ(0, compaction->start_level());
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+
+  AddVersionStorage();
+  // Simulate a flush and mark the file for compaction
+  Add(0, 7U, "150", "200", kFileSize, 0, 551, 600, 0, true);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction2(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+  ASSERT_FALSE(compaction2);
+}
+
+TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap2) {
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  // This test covers the case where a delete triggered compaction is
+  // scheduled first, followed by a "regular" compaction. The latter
+  // should fail
+  NewVersionStorage(5, kCompactionStyleUniversal);
+
+  // Mark file number 4 for compaction
+  Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
+  Add(3, 5U, "240", "290", 8 * kFileSize, 0, 201, 250);
+  Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
+  Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+
+  ASSERT_TRUE(compaction);
+  // Validate that its a delete triggered compaction
+  ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
+            compaction->compaction_reason());
+  ASSERT_EQ(3, compaction->output_level());
+  ASSERT_EQ(0, compaction->start_level());
+  ASSERT_EQ(1U, compaction->num_input_files(0));
+  ASSERT_EQ(1U, compaction->num_input_files(1));
+
+  AddVersionStorage();
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction2(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+  ASSERT_FALSE(compaction2);
+}
+
+TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) {
+  // The case where universal periodic compaction can be picked
+  // with some newer files being compacted.
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+
+  bool input_level_overlap = false;
+  bool output_level_overlap = false;
+  // Let's mark 2 files in 2 different levels for compaction. The
+  // compaction picker will randomly pick one, so use the sync point to
+  // ensure a deterministic order. Loop until both cases are covered
+  size_t random_index = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionPicker::PickFilesMarkedForCompaction", [&](void* arg) {
+        size_t* index = static_cast<size_t*>(arg);
+        *index = random_index;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  while (!input_level_overlap || !output_level_overlap) {
+    // Ensure that the L0 file gets picked first
+    random_index = !input_level_overlap ? 0 : 1;
+    UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+    NewVersionStorage(5, kCompactionStyleUniversal);
+
+    Add(0, 1U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
+    Add(3, 2U, "010", "020", 2 * kFileSize, 0, 201, 248);
+    Add(3, 3U, "250", "270", 2 * kFileSize, 0, 202, 249);
+    Add(3, 4U, "290", "310", 2 * kFileSize, 0, 203, 250);
+    Add(3, 5U, "310", "320", 2 * kFileSize, 0, 204, 251, 0, true);
+    Add(4, 6U, "301", "350", 8 * kFileSize, 0, 101, 150);
+    Add(4, 7U, "501", "750", 8 * kFileSize, 0, 101, 150);
+    UpdateVersionStorageInfo();
+
+    std::unique_ptr<Compaction> compaction(
+        universal_compaction_picker.PickCompaction(
+            cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+            &log_buffer_));
+
+    ASSERT_TRUE(compaction);
+    // Validate that its a delete triggered compaction
+    ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
+              compaction->compaction_reason());
+    ASSERT_TRUE(compaction->start_level() == 0 ||
+                compaction->start_level() == 3);
+    if (compaction->start_level() == 0) {
+      // The L0 file was picked. The next compaction will detect an
+      // overlap on its input level
+      input_level_overlap = true;
+      ASSERT_EQ(3, compaction->output_level());
+      ASSERT_EQ(1U, compaction->num_input_files(0));
+      ASSERT_EQ(3U, compaction->num_input_files(1));
+    } else {
+      // The level 3 file was picked. The next compaction will pick
+      // the L0 file and will detect overlap when adding output
+      // level inputs
+      output_level_overlap = true;
+      ASSERT_EQ(4, compaction->output_level());
+      ASSERT_EQ(2U, compaction->num_input_files(0));
+      ASSERT_EQ(1U, compaction->num_input_files(1));
+    }
+
+    vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
+    // After recomputing the compaction score, only one marked file will remain
+    random_index = 0;
+    std::unique_ptr<Compaction> compaction2(
+        universal_compaction_picker.PickCompaction(
+            cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+            &log_buffer_));
+    ASSERT_FALSE(compaction2);
+    DeleteVersionStorage();
+  }
+}
+
+TEST_F(CompactionPickerTest, UniversalMarkedL0NoOverlap) {
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  // This test covers the case where a delete triggered compaction is
+  // scheduled and should result in a full compaction
+  NewVersionStorage(1, kCompactionStyleUniversal);
+
+  // Mark file number 4 for compaction
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 260, 300, 0, true);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 201, 250);
+  Add(0, 3U, "301", "350", 4 * kFileSize, 0, 101, 150);
+  Add(0, 6U, "501", "750", 8 * kFileSize, 0, 50, 100);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+
+  ASSERT_TRUE(compaction);
+  // Validate that its a delete triggered compaction
+  ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
+            compaction->compaction_reason());
+  ASSERT_EQ(0, compaction->output_level());
+  ASSERT_EQ(0, compaction->start_level());
+  ASSERT_EQ(4U, compaction->num_input_files(0));
+  ASSERT_TRUE(file_map_[4].first->being_compacted);
+  ASSERT_TRUE(file_map_[5].first->being_compacted);
+  ASSERT_TRUE(file_map_[3].first->being_compacted);
+  ASSERT_TRUE(file_map_[6].first->being_compacted);
+}
+
+TEST_F(CompactionPickerTest, UniversalMarkedL0WithOverlap) {
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  // This test covers the case where a file is being compacted, and a
+  // delete triggered compaction is then scheduled. The latter should stop
+  // at the first file being compacted
+  NewVersionStorage(1, kCompactionStyleUniversal);
+
+  // Mark file number 4 for compaction
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 260, 300, 0, true);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 201, 250);
+  Add(0, 3U, "301", "350", 4 * kFileSize, 0, 101, 150);
+  Add(0, 6U, "501", "750", 8 * kFileSize, 0, 50, 100);
+  UpdateVersionStorageInfo();
+  file_map_[3].first->being_compacted = true;
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+
+  ASSERT_TRUE(compaction);
+  // Validate that its a delete triggered compaction
+  ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
+            compaction->compaction_reason());
+  ASSERT_EQ(0, compaction->output_level());
+  ASSERT_EQ(0, compaction->start_level());
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+  ASSERT_TRUE(file_map_[4].first->being_compacted);
+  ASSERT_TRUE(file_map_[5].first->being_compacted);
+}
+
+TEST_F(CompactionPickerTest, UniversalMarkedL0Overlap2) {
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  // This test covers the case where a delete triggered compaction is
+  // scheduled first, followed by a "regular" compaction. The latter
+  // should fail
+  NewVersionStorage(1, kCompactionStyleUniversal);
+
+  // Mark file number 4 for compaction
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 260, 300);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 201, 250, 0, true);
+  Add(0, 3U, "301", "350", 4 * kFileSize, 0, 101, 150);
+  Add(0, 6U, "501", "750", 8 * kFileSize, 0, 50, 100);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+
+  ASSERT_TRUE(compaction);
+  // Validate that its a delete triggered compaction
+  ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
+            compaction->compaction_reason());
+  ASSERT_EQ(0, compaction->output_level());
+  ASSERT_EQ(0, compaction->start_level());
+  ASSERT_EQ(3U, compaction->num_input_files(0));
+  ASSERT_TRUE(file_map_[5].first->being_compacted);
+  ASSERT_TRUE(file_map_[3].first->being_compacted);
+  ASSERT_TRUE(file_map_[6].first->being_compacted);
+
+  AddVersionStorage();
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(0, 2U, "201", "250", kFileSize, 0, 401, 450);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction2(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+  ASSERT_TRUE(compaction2);
+  ASSERT_EQ(3U, compaction->num_input_files(0));
+  ASSERT_TRUE(file_map_[1].first->being_compacted);
+  ASSERT_TRUE(file_map_[2].first->being_compacted);
+  ASSERT_TRUE(file_map_[4].first->being_compacted);
+}
+
+TEST_F(CompactionPickerTest, UniversalMarkedManualCompaction) {
+  const uint64_t kFileSize = 100000;
+  const int kNumLevels = 7;
+
+  // This test makes sure the `files_marked_for_compaction_` is updated after
+  // creating manual compaction.
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  NewVersionStorage(kNumLevels, kCompactionStyleUniversal);
+
+  // Add 3 files marked for compaction
+  Add(0, 3U, "301", "350", 4 * kFileSize, 0, 101, 150, 0, true);
+  Add(0, 4U, "260", "300", 1 * kFileSize, 0, 260, 300, 0, true);
+  Add(0, 5U, "240", "290", 2 * kFileSize, 0, 201, 250, 0, true);
+  UpdateVersionStorageInfo();
+
+  // All 3 files are marked for compaction
+  ASSERT_EQ(3U, vstorage_->FilesMarkedForCompaction().size());
+
+  bool manual_conflict = false;
+  InternalKey* manual_end = NULL;
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.CompactRange(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          ColumnFamilyData::kCompactAllLevels, 6, CompactRangeOptions(), NULL,
+          NULL, &manual_end, &manual_conflict, port::kMaxUint64));
+
+  ASSERT_TRUE(compaction);
+
+  ASSERT_EQ(CompactionReason::kManualCompaction,
+            compaction->compaction_reason());
+  ASSERT_EQ(kNumLevels - 1, compaction->output_level());
+  ASSERT_EQ(0, compaction->start_level());
+  ASSERT_EQ(3U, compaction->num_input_files(0));
+  ASSERT_TRUE(file_map_[3].first->being_compacted);
+  ASSERT_TRUE(file_map_[4].first->being_compacted);
+  ASSERT_TRUE(file_map_[5].first->being_compacted);
+
+  // After creating the manual compaction, all files should be cleared from
+  // `FilesMarkedForCompaction`. So they won't be picked by others.
+  ASSERT_EQ(0U, vstorage_->FilesMarkedForCompaction().size());
+}
+
+#endif  // ROCKSDB_LITE
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {