]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compaction/compaction_job_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_job_test.cc
index fe53f0b00d22fa577fb0cc7bd35c7d75d73645ac..c87871100ce1a119d287cde67958ccdcfc552469 100644 (file)
@@ -5,6 +5,8 @@
 
 #ifndef ROCKSDB_LITE
 
+#include "db/compaction/compaction_job.h"
+
 #include <algorithm>
 #include <array>
 #include <cinttypes>
 
 #include "db/blob/blob_index.h"
 #include "db/column_family.h"
-#include "db/compaction/compaction_job.h"
 #include "db/db_impl/db_impl.h"
 #include "db/error_handler.h"
 #include "db/version_set.h"
+#include "file/random_access_file_reader.h"
 #include "file/writable_file_writer.h"
+#include "options/options_helper.h"
 #include "rocksdb/cache.h"
+#include "rocksdb/convenience.h"
 #include "rocksdb/db.h"
+#include "rocksdb/file_system.h"
 #include "rocksdb/options.h"
 #include "rocksdb/write_buffer_manager.h"
 #include "table/mock_table.h"
+#include "table/unique_id_impl.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 #include "util/string_util.h"
@@ -34,7 +40,7 @@ namespace ROCKSDB_NAMESPACE {
 namespace {
 
 void VerifyInitializationOfCompactionJobStats(
-      const CompactionJobStats& compaction_job_stats) {
+    const CompactionJobStats& compaction_job_stats) {
 #if !defined(IOS_CROSS_COMPILE)
   ASSERT_EQ(compaction_job_stats.elapsed_micros, 0U);
 
@@ -65,15 +71,141 @@ void VerifyInitializationOfCompactionJobStats(
 #endif  // !defined(IOS_CROSS_COMPILE)
 }
 
+// Mock FSWritableFile for testing io priority.
+// Only override the essential functions for testing compaction io priority.
+class MockTestWritableFile : public FSWritableFileOwnerWrapper {
+ public:
+  MockTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
+                       Env::IOPriority io_priority)
+      : FSWritableFileOwnerWrapper(std::move(file)),
+        write_io_priority_(io_priority) {}
+  IOStatus Append(const Slice& data, const IOOptions& options,
+                  IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Append(data, options, dbg);
+  }
+  IOStatus Append(const Slice& data, const IOOptions& options,
+                  const DataVerificationInfo& verification_info,
+                  IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Append(data, options, verification_info, dbg);
+  }
+  IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Close(options, dbg);
+  }
+  IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Flush(options, dbg);
+  }
+  IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Sync(options, dbg);
+  }
+  IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Fsync(options, dbg);
+  }
+  uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->GetFileSize(options, dbg);
+  }
+  IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& options,
+                     IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->RangeSync(offset, nbytes, options, dbg);
+  }
+
+  void PrepareWrite(size_t offset, size_t len, const IOOptions& options,
+                    IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    target()->PrepareWrite(offset, len, options, dbg);
+  }
+
+  IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& options,
+                    IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
+    return target()->Allocate(offset, len, options, dbg);
+  }
+
+ private:
+  Env::IOPriority write_io_priority_;
+};
+
+// Mock FSRandomAccessFile for testing io priority.
+// Only override the essential functions for testing compaction io priority.
+class MockTestRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
+ public:
+  MockTestRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& file,
+                           Env::IOPriority io_priority)
+      : FSRandomAccessFileOwnerWrapper(std::move(file)),
+        read_io_priority_(io_priority) {}
+
+  IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
+                Slice* result, char* scratch,
+                IODebugContext* dbg) const override {
+    EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
+    return target()->Read(offset, n, options, result, scratch, dbg);
+  }
+  IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
+                    IODebugContext* dbg) override {
+    EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
+    return target()->Prefetch(offset, n, options, dbg);
+  }
+
+ private:
+  Env::IOPriority read_io_priority_;
+};
+
+// Mock FileSystem for testing io priority.
+class MockTestFileSystem : public FileSystemWrapper {
+ public:
+  explicit MockTestFileSystem(const std::shared_ptr<FileSystem>& base,
+                              Env::IOPriority read_io_priority,
+                              Env::IOPriority write_io_priority)
+      : FileSystemWrapper(base),
+        read_io_priority_(read_io_priority),
+        write_io_priority_(write_io_priority) {}
+
+  static const char* kClassName() { return "MockTestFileSystem"; }
+  const char* Name() const override { return kClassName(); }
+
+  IOStatus NewRandomAccessFile(const std::string& fname,
+                               const FileOptions& file_opts,
+                               std::unique_ptr<FSRandomAccessFile>* result,
+                               IODebugContext* dbg) override {
+    IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
+    EXPECT_OK(s);
+    result->reset(
+        new MockTestRandomAccessFile(std::move(*result), read_io_priority_));
+    return s;
+  }
+  IOStatus NewWritableFile(const std::string& fname,
+                           const FileOptions& file_opts,
+                           std::unique_ptr<FSWritableFile>* result,
+                           IODebugContext* dbg) override {
+    IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
+    EXPECT_OK(s);
+    result->reset(
+        new MockTestWritableFile(std::move(*result), write_io_priority_));
+    return s;
+  }
+
+ private:
+  Env::IOPriority read_io_priority_;
+  Env::IOPriority write_io_priority_;
+};
+
+enum TableTypeForTest : uint8_t { kMockTable = 0, kBlockBasedTable = 1 };
+
 }  // namespace
 
 class CompactionJobTestBase : public testing::Test {
  protected:
   CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
-                        std::function<std::string(uint64_t)> encode_u64_ts)
-      : env_(Env::Default()),
-        fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
-        dbname_(std::move(dbname)),
+                        std::function<std::string(uint64_t)> encode_u64_ts,
+                        bool test_io_priority, TableTypeForTest table_type)
+      : dbname_(std::move(dbname)),
         ucmp_(ucmp),
         db_options_(),
         mutable_cf_options_(cf_options_),
@@ -83,12 +215,23 @@ class CompactionJobTestBase : public testing::Test {
         versions_(new VersionSet(
             dbname_, &db_options_, env_options_, table_cache_.get(),
             &write_buffer_manager_, &write_controller_,
-            /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)),
+            /*block_cache_tracer=*/nullptr,
+            /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "")),
         shutting_down_(false),
-        preserve_deletes_seqnum_(0),
         mock_table_factory_(new mock::MockTableFactory()),
         error_handler_(nullptr, db_options_, &mutex_),
-        encode_u64_ts_(std::move(encode_u64_ts)) {}
+        encode_u64_ts_(std::move(encode_u64_ts)),
+        test_io_priority_(test_io_priority),
+        table_type_(table_type) {
+    Env* base_env = Env::Default();
+    EXPECT_OK(
+        test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
+    env_ = base_env;
+    fs_ = env_->GetFileSystem();
+    // set default for the tests
+    mutable_cf_options_.target_file_size_base = 1024 * 1024;
+    mutable_cf_options_.max_compaction_bytes = 10 * 1024 * 1024;
+  }
 
   void SetUp() override {
     EXPECT_OK(env_->CreateDirIfMissing(dbname_));
@@ -97,7 +240,14 @@ class CompactionJobTestBase : public testing::Test {
     db_options_.db_paths.emplace_back(dbname_,
                                       std::numeric_limits<uint64_t>::max());
     cf_options_.comparator = ucmp_;
-    cf_options_.table_factory = mock_table_factory_;
+    if (table_type_ == TableTypeForTest::kBlockBasedTable) {
+      BlockBasedTableOptions table_options;
+      cf_options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
+    } else if (table_type_ == TableTypeForTest::kMockTable) {
+      cf_options_.table_factory = mock_table_factory_;
+    } else {
+      assert(false);
+    }
   }
 
   std::string GenerateFileName(uint64_t file_number) {
@@ -137,6 +287,33 @@ class CompactionJobTestBase : public testing::Test {
     return blob_index;
   }
 
+  // Creates a table with the specificied key value pairs.
+  void CreateTable(const std::string& table_name,
+                   const mock::KVVector& contents, uint64_t& file_size) {
+    std::unique_ptr<WritableFileWriter> file_writer;
+    Status s = WritableFileWriter::Create(fs_, table_name, FileOptions(),
+                                          &file_writer, nullptr);
+    ASSERT_OK(s);
+    std::unique_ptr<TableBuilder> table_builder(
+        cf_options_.table_factory->NewTableBuilder(
+            TableBuilderOptions(*cfd_->ioptions(), mutable_cf_options_,
+                                cfd_->internal_comparator(),
+                                cfd_->int_tbl_prop_collector_factories(),
+                                CompressionType::kNoCompression,
+                                CompressionOptions(), 0 /* column_family_id */,
+                                kDefaultColumnFamilyName, -1 /* level */),
+            file_writer.get()));
+    // Build table.
+    for (auto kv : contents) {
+      std::string key;
+      std::string value;
+      std::tie(key, value) = kv;
+      table_builder->Add(key, value);
+    }
+    ASSERT_OK(table_builder->Finish());
+    file_size = table_builder->FileSize();
+  }
+
   void AddMockFile(const mock::KVVector& contents, int level = 0) {
     assert(contents.size() > 0);
 
@@ -190,22 +367,111 @@ class CompactionJobTestBase : public testing::Test {
     }
 
     uint64_t file_number = versions_->NewFileNumber();
-    EXPECT_OK(mock_table_factory_->CreateMockTable(
-        env_, GenerateFileName(file_number), std::move(contents)));
+
+    uint64_t file_size = 0;
+    if (table_type_ == TableTypeForTest::kBlockBasedTable) {
+      CreateTable(GenerateFileName(file_number), contents, file_size);
+    } else if (table_type_ == TableTypeForTest::kMockTable) {
+      file_size = 10;
+      EXPECT_OK(mock_table_factory_->CreateMockTable(
+          env_, GenerateFileName(file_number), std::move(contents)));
+    } else {
+      assert(false);
+    }
 
     VersionEdit edit;
-    edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key,
-                 smallest_seqno, largest_seqno, false, oldest_blob_file_number,
-                 kUnknownOldestAncesterTime, kUnknownFileCreationTime,
-                 kUnknownFileChecksum, kUnknownFileChecksumFuncName);
+    edit.AddFile(level, file_number, 0, file_size, smallest_key, largest_key,
+                 smallest_seqno, largest_seqno, false, Temperature::kUnknown,
+                 oldest_blob_file_number, kUnknownOldestAncesterTime,
+                 kUnknownFileCreationTime, kUnknownFileChecksum,
+                 kUnknownFileChecksumFuncName, kNullUniqueId64x2);
 
     mutex_.Lock();
     EXPECT_OK(
         versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
-                               mutable_cf_options_, &edit, &mutex_));
+                               mutable_cf_options_, &edit, &mutex_, nullptr));
     mutex_.Unlock();
   }
 
+  void VerifyTables(int output_level,
+                    const std::vector<mock::KVVector>& expected_results,
+                    std::vector<uint64_t> expected_oldest_blob_file_numbers) {
+    if (expected_results.empty()) {
+      ASSERT_EQ(compaction_job_stats_.num_output_files, 0U);
+      return;
+    }
+    int expected_output_file_num = 0;
+    for (const auto& e : expected_results) {
+      if (!e.empty()) {
+        ++expected_output_file_num;
+      }
+    }
+    ASSERT_EQ(expected_output_file_num, compaction_job_stats_.num_output_files);
+    if (expected_output_file_num == 0) {
+      return;
+    }
+
+    if (expected_oldest_blob_file_numbers.empty()) {
+      expected_oldest_blob_file_numbers.resize(expected_output_file_num,
+                                               kInvalidBlobFileNumber);
+    }
+
+    auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+    if (table_type_ == TableTypeForTest::kMockTable) {
+      ASSERT_EQ(compaction_job_stats_.num_output_files,
+                expected_results.size());
+      mock_table_factory_->AssertLatestFiles(expected_results);
+    } else {
+      assert(table_type_ == TableTypeForTest::kBlockBasedTable);
+    }
+
+    auto output_files =
+        cfd->current()->storage_info()->LevelFiles(output_level);
+    ASSERT_EQ(expected_output_file_num, output_files.size());
+
+    if (table_type_ == TableTypeForTest::kMockTable) {
+      assert(output_files.size() ==
+             static_cast<size_t>(expected_output_file_num));
+      const FileMetaData* const output_file = output_files[0];
+      ASSERT_EQ(output_file->oldest_blob_file_number,
+                expected_oldest_blob_file_numbers[0]);
+      return;
+    }
+
+    for (size_t i = 0; i < expected_results.size(); ++i) {
+      const FileMetaData* const output_file = output_files[i];
+      std::string file_name = GenerateFileName(output_file->fd.GetNumber());
+      const auto& fs = env_->GetFileSystem();
+      std::unique_ptr<RandomAccessFileReader> freader;
+      IOStatus ios = RandomAccessFileReader::Create(
+          fs, file_name, FileOptions(), &freader, nullptr);
+      ASSERT_OK(ios);
+      std::unique_ptr<TableReader> table_reader;
+      uint64_t file_size = output_file->fd.GetFileSize();
+      ReadOptions read_opts;
+      Status s = cf_options_.table_factory->NewTableReader(
+          read_opts,
+          TableReaderOptions(*cfd->ioptions(), nullptr, FileOptions(),
+                             cfd_->internal_comparator()),
+          std::move(freader), file_size, &table_reader, false);
+      ASSERT_OK(s);
+      assert(table_reader);
+      std::unique_ptr<InternalIterator> iiter(
+          table_reader->NewIterator(read_opts, nullptr, nullptr, true,
+                                    TableReaderCaller::kUncategorized));
+      assert(iiter);
+
+      mock::KVVector from_db;
+      for (iiter->SeekToFirst(); iiter->Valid(); iiter->Next()) {
+        const Slice key = iiter->key();
+        const Slice value = iiter->value();
+        from_db.emplace_back(
+            make_pair(key.ToString(false), value.ToString(false)));
+      }
+      ASSERT_EQ(expected_results[i], from_db);
+    }
+  }
+
   void SetLastSequence(const SequenceNumber sequence_number) {
     versions_->SetLastAllocatedSequence(sequence_number + 1);
     versions_->SetLastPublishedSequence(sequence_number + 1);
@@ -227,14 +493,13 @@ class CompactionJobTestBase : public testing::Test {
     for (int i = 0; i < 2; ++i) {
       auto contents = mock::MakeMockFile();
       for (int k = 0; k < kKeysPerFile; ++k) {
-        auto key = ToString(i * kMatchingKeys + k);
-        auto value = ToString(i * kKeysPerFile + k);
+        auto key = std::to_string(i * kMatchingKeys + k);
+        auto value = std::to_string(i * kKeysPerFile + k);
         InternalKey internal_key(key, ++sequence_number, kTypeValue);
 
         // This is how the key will look like once it's written in bottommost
         // file
-        InternalKey bottommost_internal_key(
-            key, 0, kTypeValue);
+        InternalKey bottommost_internal_key(key, 0, kTypeValue);
 
         if (corrupt_id(k)) {
           test::CorruptKeyType(&internal_key);
@@ -264,12 +529,20 @@ class CompactionJobTestBase : public testing::Test {
   void NewDB() {
     EXPECT_OK(DestroyDB(dbname_, Options()));
     EXPECT_OK(env_->CreateDirIfMissing(dbname_));
+
+    std::shared_ptr<Logger> info_log;
+    DBOptions db_opts = BuildDBOptions(db_options_, mutable_db_options_);
+    Status s = CreateLoggerFromOptions(dbname_, db_opts, &info_log);
+    ASSERT_OK(s);
+    db_options_.info_log = info_log;
+
     versions_.reset(
         new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                        &write_buffer_manager_, &write_controller_,
-                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
+                       /*db_id*/ "", /*db_session_id*/ ""));
     compaction_job_stats_.Reset();
-    SetIdentityFile(env_, dbname_);
+    ASSERT_OK(SetIdentityFile(env_, dbname_));
 
     VersionEdit new_db;
     new_db.SetLogNumber(0);
@@ -277,12 +550,13 @@ class CompactionJobTestBase : public testing::Test {
     new_db.SetLastSequence(0);
 
     const std::string manifest = DescriptorFileName(dbname_, 1);
-    std::unique_ptr<WritableFile> file;
-    Status s = env_->NewWritableFile(
-        manifest, &file, env_->OptimizeForManifestWrite(env_options_));
+    std::unique_ptr<WritableFileWriter> file_writer;
+    const auto& fs = env_->GetFileSystem();
+    s = WritableFileWriter::Create(fs, manifest,
+                                   fs->OptimizeForManifestWrite(env_options_),
+                                   &file_writer, nullptr);
+
     ASSERT_OK(s);
-    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
-        NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
     {
       log::Writer log(std::move(file_writer), 0, false);
       std::string record;
@@ -304,35 +578,71 @@ class CompactionJobTestBase : public testing::Test {
     cfd_ = versions_->GetColumnFamilySet()->GetDefault();
   }
 
+  // input_files[i] on input_levels[i]
+  void RunLastLevelCompaction(
+      const std::vector<std::vector<FileMetaData*>>& input_files,
+      const std::vector<int> input_levels,
+      std::function<void(Compaction& comp)>&& verify_func,
+      const std::vector<SequenceNumber>& snapshots = {}) {
+    const int kLastLevel = cf_options_.num_levels - 1;
+    verify_per_key_placement_ = std::move(verify_func);
+    mock::KVVector empty_map;
+    RunCompaction(input_files, input_levels, {empty_map}, snapshots,
+                  kMaxSequenceNumber, kLastLevel, false);
+  }
+
+  // input_files[i] on input_levels[i]
   void RunCompaction(
       const std::vector<std::vector<FileMetaData*>>& input_files,
-      const mock::KVVector& expected_results,
+      const std::vector<int>& input_levels,
+      const std::vector<mock::KVVector>& expected_results,
       const std::vector<SequenceNumber>& snapshots = {},
       SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
       int output_level = 1, bool verify = true,
-      uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber) {
+      std::vector<uint64_t> expected_oldest_blob_file_numbers = {},
+      bool check_get_priority = false,
+      Env::IOPriority read_io_priority = Env::IO_TOTAL,
+      Env::IOPriority write_io_priority = Env::IO_TOTAL,
+      int max_subcompactions = 0) {
+    // For compaction, set fs as MockTestFileSystem to check the io_priority.
+    if (test_io_priority_) {
+      db_options_.fs.reset(
+          new MockTestFileSystem(fs_, read_io_priority, write_io_priority));
+    }
+
     auto cfd = versions_->GetColumnFamilySet()->GetDefault();
 
     size_t num_input_files = 0;
     std::vector<CompactionInputFiles> compaction_input_files;
-    for (size_t level = 0; level < input_files.size(); level++) {
-      auto level_files = input_files[level];
+    for (size_t i = 0; i < input_files.size(); ++i) {
+      auto level_files = input_files[i];
       CompactionInputFiles compaction_level;
-      compaction_level.level = static_cast<int>(level);
+      compaction_level.level = input_levels[i];
       compaction_level.files.insert(compaction_level.files.end(),
-          level_files.begin(), level_files.end());
+                                    level_files.begin(), level_files.end());
       compaction_input_files.push_back(compaction_level);
       num_input_files += level_files.size();
     }
 
+    std::vector<FileMetaData*> grandparents;
+    // it should actually be the next non-empty level
+    const int kGrandparentsLevel = output_level + 1;
+    if (kGrandparentsLevel < cf_options_.num_levels) {
+      grandparents =
+          cfd_->current()->storage_info()->LevelFiles(kGrandparentsLevel);
+    }
+
     Compaction compaction(
         cfd->current()->storage_info(), *cfd->ioptions(),
         *cfd->GetLatestMutableCFOptions(), mutable_db_options_,
-        compaction_input_files, output_level, 1024 * 1024, 10 * 1024 * 1024, 0,
-        kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts, 0,
-        {}, true);
+        compaction_input_files, output_level,
+        mutable_cf_options_.target_file_size_base,
+        mutable_cf_options_.max_compaction_bytes, 0, kNoCompression,
+        cfd->GetLatestMutableCFOptions()->compression_opts,
+        Temperature::kUnknown, max_subcompactions, grandparents, true);
     compaction.SetInputVersion(cfd->current());
 
+    assert(db_options_.info_log);
     LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
     mutex_.Lock();
     EventLogger event_logger(db_options_.info_log.get());
@@ -340,15 +650,17 @@ class CompactionJobTestBase : public testing::Test {
     SnapshotChecker* snapshot_checker = nullptr;
     ASSERT_TRUE(full_history_ts_low_.empty() ||
                 ucmp_->timestamp_size() == full_history_ts_low_.size());
+    const std::atomic<bool> kManualCompactionCanceledFalse{false};
     CompactionJob compaction_job(
-        0, &compaction, db_options_, env_options_, versions_.get(),
-        &shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr,
-        nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots,
-        earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
-        &event_logger, false, false, dbname_, &compaction_job_stats_,
-        Env::Priority::USER, nullptr /* IOTracer */,
-        /*manual_compaction_paused=*/nullptr, /*db_id=*/"",
-        /*db_session_id=*/"", full_history_ts_low_);
+        0, &compaction, db_options_, mutable_db_options_, env_options_,
+        versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr,
+        nullptr, nullptr, &mutex_, &error_handler_, snapshots,
+        earliest_write_conflict_snapshot, snapshot_checker, nullptr,
+        table_cache_, &event_logger, false, false, dbname_,
+        &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */,
+        /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
+        env_->GenerateUniqueId(), DBImpl::GenerateDbSessionId(nullptr),
+        full_history_ts_low_);
     VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
 
     compaction_job.Prepare();
@@ -360,26 +672,50 @@ class CompactionJobTestBase : public testing::Test {
     ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions()));
     ASSERT_OK(compaction_job.io_status());
     mutex_.Unlock();
+    log_buffer.FlushBufferToLog();
 
     if (verify) {
       ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
       ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);
 
-      if (expected_results.empty()) {
-        ASSERT_EQ(compaction_job_stats_.num_output_files, 0U);
-      } else {
-        ASSERT_EQ(compaction_job_stats_.num_output_files, 1U);
-        mock_table_factory_->AssertLatestFile(expected_results);
-
-        auto output_files =
-            cfd->current()->storage_info()->LevelFiles(output_level);
-        ASSERT_EQ(output_files.size(), 1);
-        ASSERT_EQ(output_files[0]->oldest_blob_file_number,
-                  expected_oldest_blob_file_number);
-      }
+      VerifyTables(output_level, expected_results,
+                   expected_oldest_blob_file_numbers);
+    }
+
+    if (check_get_priority) {
+      CheckGetRateLimiterPriority(compaction_job);
+    }
+
+    if (verify_per_key_placement_) {
+      // Verify per_key_placement compaction
+      assert(compaction.SupportsPerKeyPlacement());
+      verify_per_key_placement_(compaction);
     }
   }
 
+  void CheckGetRateLimiterPriority(CompactionJob& compaction_job) {
+    // When the state from WriteController is normal.
+    ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_LOW);
+
+    WriteController* write_controller =
+        compaction_job.versions_->GetColumnFamilySet()->write_controller();
+
+    {
+      // When the state from WriteController is Delayed.
+      std::unique_ptr<WriteControllerToken> delay_token =
+          write_controller->GetDelayToken(1000000);
+      ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER);
+    }
+
+    {
+      // When the state from WriteController is Stopped.
+      std::unique_ptr<WriteControllerToken> stop_token =
+          write_controller->GetStopToken();
+      ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER);
+    }
+  }
+
+  std::shared_ptr<Env> env_guard_;
   Env* env_;
   std::shared_ptr<FileSystem> fs_;
   std::string dbname_;
@@ -395,7 +731,6 @@ class CompactionJobTestBase : public testing::Test {
   std::unique_ptr<VersionSet> versions_;
   InstrumentedMutex mutex_;
   std::atomic<bool> shutting_down_;
-  SequenceNumber preserve_deletes_seqnum_;
   std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
   CompactionJobStats compaction_job_stats_;
   ColumnFamilyData* cfd_;
@@ -404,15 +739,19 @@ class CompactionJobTestBase : public testing::Test {
   ErrorHandler error_handler_;
   std::string full_history_ts_low_;
   const std::function<std::string(uint64_t)> encode_u64_ts_;
+  const bool test_io_priority_;
+  std::function<void(Compaction& comp)> verify_per_key_placement_;
+  const TableTypeForTest table_type_ = kMockTable;
 };
 
 // TODO(icanadi) Make it simpler once we mock out VersionSet
 class CompactionJobTest : public CompactionJobTestBase {
  public:
   CompactionJobTest()
-      : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_test"),
-                              BytewiseComparator(),
-                              [](uint64_t /*ts*/) { return ""; }) {}
+      : CompactionJobTestBase(
+            test::PerThreadDBPath("compaction_job_test"), BytewiseComparator(),
+            [](uint64_t /*ts*/) { return ""; }, /*test_io_priority=*/false,
+            TableTypeForTest::kMockTable) {}
 };
 
 TEST_F(CompactionJobTest, Simple) {
@@ -420,9 +759,10 @@ TEST_F(CompactionJobTest, Simple) {
 
   auto expected_results = CreateTwoFiles(false);
   auto cfd = versions_->GetColumnFamilySet()->GetDefault();
-  auto files = cfd->current()->storage_info()->LevelFiles(0);
+  constexpr int input_level = 0;
+  auto files = cfd->current()->storage_info()->LevelFiles(input_level);
   ASSERT_EQ(2U, files.size());
-  RunCompaction({ files }, expected_results);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) {
@@ -430,8 +770,9 @@ TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) {
 
   auto expected_results = CreateTwoFiles(true);
   auto cfd = versions_->GetColumnFamilySet()->GetDefault();
-  auto files = cfd->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+  constexpr int input_level = 0;
+  auto files = cfd->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
   ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U);
 }
 
@@ -450,8 +791,9 @@ TEST_F(CompactionJobTest, SimpleDeletion) {
       mock::MakeMockFile({{KeyStr("b", 0U, kTypeValue), "val"}});
 
   SetLastSequence(4U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTest, OutputNothing) {
@@ -468,8 +810,10 @@ TEST_F(CompactionJobTest, OutputNothing) {
   auto expected_results = mock::MakeMockFile();
 
   SetLastSequence(4U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTest, SimpleOverwrite) {
@@ -490,8 +834,9 @@ TEST_F(CompactionJobTest, SimpleOverwrite) {
                           {KeyStr("b", 0U, kTypeValue), "val3"}});
 
   SetLastSequence(4U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTest, SimpleNonLastLevel) {
@@ -518,9 +863,12 @@ TEST_F(CompactionJobTest, SimpleNonLastLevel) {
                           {KeyStr("b", 6U, kTypeValue), "val3"}});
 
   SetLastSequence(6U);
-  auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
-  auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
-  RunCompaction({lvl0_files, lvl1_files}, expected_results);
+  const std::vector<int> input_levels = {0, 1};
+  auto lvl0_files =
+      cfd_->current()->storage_info()->LevelFiles(input_levels[0]);
+  auto lvl1_files =
+      cfd_->current()->storage_info()->LevelFiles(input_levels[1]);
+  RunCompaction({lvl0_files, lvl1_files}, input_levels, {expected_results});
 }
 
 TEST_F(CompactionJobTest, SimpleMerge) {
@@ -543,8 +891,9 @@ TEST_F(CompactionJobTest, SimpleMerge) {
                           {KeyStr("b", 0U, kTypeValue), "1,2"}});
 
   SetLastSequence(5U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTest, NonAssocMerge) {
@@ -567,8 +916,9 @@ TEST_F(CompactionJobTest, NonAssocMerge) {
                           {KeyStr("b", 0U, kTypeValue), "1,2"}});
 
   SetLastSequence(5U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 // Filters merge operands with value 10.
@@ -594,8 +944,9 @@ TEST_F(CompactionJobTest, MergeOperandFilter) {
                           {KeyStr("b", 0U, kTypeValue), test::EncodeInt(2U)}});
 
   SetLastSequence(5U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTest, FilterSomeMergeOperands) {
@@ -630,8 +981,9 @@ TEST_F(CompactionJobTest, FilterSomeMergeOperands) {
   });
 
   SetLastSequence(5U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 // Test where all operands/merge results are filtered out.
@@ -664,10 +1016,11 @@ TEST_F(CompactionJobTest, FilterAllMergeOperands) {
   AddMockFile(file3, 2);
 
   SetLastSequence(11U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
 
   mock::KVVector empty_map;
-  RunCompaction({files}, empty_map);
+  RunCompaction({files}, {input_level}, {empty_map});
 }
 
 TEST_F(CompactionJobTest, SimpleSingleDelete) {
@@ -692,8 +1045,9 @@ TEST_F(CompactionJobTest, SimpleSingleDelete) {
       mock::MakeMockFile({{KeyStr("a", 5U, kTypeDeletion), ""}});
 
   SetLastSequence(6U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTest, SingleDeleteSnapshots) {
@@ -757,8 +1111,9 @@ TEST_F(CompactionJobTest, SingleDeleteSnapshots) {
   });
 
   SetLastSequence(22U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results, {10U, 20U}, 10U);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results}, {10U, 20U}, 10U);
 }
 
 TEST_F(CompactionJobTest, EarliestWriteConflictSnapshot) {
@@ -835,8 +1190,10 @@ TEST_F(CompactionJobTest, EarliestWriteConflictSnapshot) {
   });
 
   SetLastSequence(24U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results, {10U, 20U, 30U}, 20U);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results}, {10U, 20U, 30U},
+                20U);
 }
 
 TEST_F(CompactionJobTest, SingleDeleteZeroSeq) {
@@ -858,8 +1215,9 @@ TEST_F(CompactionJobTest, SingleDeleteZeroSeq) {
   });
 
   SetLastSequence(22U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results, {});
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results}, {});
 }
 
 TEST_F(CompactionJobTest, MultiSingleDelete) {
@@ -878,10 +1236,10 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
   //      -> Snapshot Put
   // K: SDel SDel Put SDel Put Put Snapshot SDel Put SDel SDel Put SDel
   //      -> Snapshot Put Snapshot SDel
-  // L: SDel Put Del Put SDel Snapshot Del Put Del SDel Put SDel
-  //      -> Snapshot SDel
-  // M: (Put) SDel Put Del Put SDel Snapshot Put Del SDel Put SDel Del
-  //      -> SDel Snapshot Del
+  // L: SDel Put SDel Put SDel Snapshot SDel Put SDel SDel Put SDel
+  //      -> Snapshot SDel Put SDel
+  // M: (Put) SDel Put SDel Put SDel Snapshot Put SDel SDel Put SDel SDel
+  //      -> SDel Snapshot Put SDel
   NewDB();
 
   auto file1 = mock::MakeMockFile({
@@ -912,14 +1270,14 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
       {KeyStr("L", 16U, kTypeSingleDeletion), ""},
       {KeyStr("L", 15U, kTypeValue), "val"},
       {KeyStr("L", 14U, kTypeSingleDeletion), ""},
-      {KeyStr("L", 13U, kTypeDeletion), ""},
+      {KeyStr("L", 13U, kTypeSingleDeletion), ""},
       {KeyStr("L", 12U, kTypeValue), "val"},
-      {KeyStr("L", 11U, kTypeDeletion), ""},
-      {KeyStr("M", 16U, kTypeDeletion), ""},
+      {KeyStr("L", 11U, kTypeSingleDeletion), ""},
+      {KeyStr("M", 16U, kTypeSingleDeletion), ""},
       {KeyStr("M", 15U, kTypeSingleDeletion), ""},
       {KeyStr("M", 14U, kTypeValue), "val"},
       {KeyStr("M", 13U, kTypeSingleDeletion), ""},
-      {KeyStr("M", 12U, kTypeDeletion), ""},
+      {KeyStr("M", 12U, kTypeSingleDeletion), ""},
       {KeyStr("M", 11U, kTypeValue), "val"},
   });
   AddMockFile(file1);
@@ -960,12 +1318,12 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
       {KeyStr("K", 1U, kTypeSingleDeletion), ""},
       {KeyStr("L", 5U, kTypeSingleDeletion), ""},
       {KeyStr("L", 4U, kTypeValue), "val"},
-      {KeyStr("L", 3U, kTypeDeletion), ""},
+      {KeyStr("L", 3U, kTypeSingleDeletion), ""},
       {KeyStr("L", 2U, kTypeValue), "val"},
       {KeyStr("L", 1U, kTypeSingleDeletion), ""},
       {KeyStr("M", 10U, kTypeSingleDeletion), ""},
       {KeyStr("M", 7U, kTypeValue), "val"},
-      {KeyStr("M", 5U, kTypeDeletion), ""},
+      {KeyStr("M", 5U, kTypeSingleDeletion), ""},
       {KeyStr("M", 4U, kTypeValue), "val"},
       {KeyStr("M", 3U, kTypeSingleDeletion), ""},
   });
@@ -1007,12 +1365,15 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
                           {KeyStr("K", 8U, kTypeValue), "val3"},
                           {KeyStr("L", 16U, kTypeSingleDeletion), ""},
                           {KeyStr("L", 15U, kTypeValue), ""},
-                          {KeyStr("M", 16U, kTypeDeletion), ""},
+                          {KeyStr("L", 11U, kTypeSingleDeletion), ""},
+                          {KeyStr("M", 15U, kTypeSingleDeletion), ""},
+                          {KeyStr("M", 14U, kTypeValue), ""},
                           {KeyStr("M", 3U, kTypeSingleDeletion), ""}});
 
   SetLastSequence(22U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results, {10U}, 10U);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results}, {10U}, 10U);
 }
 
 // This test documents the behavior where a corrupt key follows a deletion or a
@@ -1041,8 +1402,9 @@ TEST_F(CompactionJobTest, DISABLED_CorruptionAfterDeletion) {
                           {test::KeyStr("c", 0U, kTypeValue), "val2"}});
 
   SetLastSequence(6U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTest, OldestBlobFileNumber) {
@@ -1087,17 +1449,686 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) {
                           expected_blob4, expected_blob5, expected_blob6});
 
   SetLastSequence(6U);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results, std::vector<SequenceNumber>(),
-                kMaxSequenceNumber, /* output_level */ 1, /* verify */ true,
-                /* expected_oldest_blob_file_number */ 19);
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results},
+                std::vector<SequenceNumber>(), kMaxSequenceNumber,
+                /* output_level */ 1, /* verify */ true,
+                /* expected_oldest_blob_file_numbers */ {19});
+}
+
+TEST_F(CompactionJobTest, VerifyPenultimateLevelOutput) {
+  cf_options_.bottommost_temperature = Temperature::kCold;
+  SyncPoint::GetInstance()->SetCallBack(
+      "Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
+        auto supports_per_key_placement = static_cast<bool*>(arg);
+        *supports_per_key_placement = true;
+      });
+
+  std::atomic_uint64_t latest_cold_seq = 0;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::PrepareOutput.context", [&](void* arg) {
+        auto context = static_cast<PerKeyPlacementContext*>(arg);
+        context->output_to_penultimate_level =
+            context->seq_num > latest_cold_seq;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  NewDB();
+
+  // Add files on different levels that may overlap
+  auto file0_1 = mock::MakeMockFile({{KeyStr("z", 12U, kTypeValue), "val"}});
+  AddMockFile(file0_1);
+
+  auto file1_1 = mock::MakeMockFile({{KeyStr("b", 10U, kTypeValue), "val"},
+                                     {KeyStr("f", 11U, kTypeValue), "val"}});
+  AddMockFile(file1_1, 1);
+  auto file1_2 = mock::MakeMockFile({{KeyStr("j", 12U, kTypeValue), "val"},
+                                     {KeyStr("k", 13U, kTypeValue), "val"}});
+  AddMockFile(file1_2, 1);
+  auto file1_3 = mock::MakeMockFile({{KeyStr("p", 14U, kTypeValue), "val"},
+                                     {KeyStr("u", 15U, kTypeValue), "val"}});
+  AddMockFile(file1_3, 1);
+
+  auto file2_1 = mock::MakeMockFile({{KeyStr("f", 8U, kTypeValue), "val"},
+                                     {KeyStr("h", 9U, kTypeValue), "val"}});
+  AddMockFile(file2_1, 2);
+  auto file2_2 = mock::MakeMockFile({{KeyStr("m", 6U, kTypeValue), "val"},
+                                     {KeyStr("p", 7U, kTypeValue), "val"}});
+  AddMockFile(file2_2, 2);
+
+  auto file3_1 = mock::MakeMockFile({{KeyStr("g", 2U, kTypeValue), "val"},
+                                     {KeyStr("k", 3U, kTypeValue), "val"}});
+  AddMockFile(file3_1, 3);
+  auto file3_2 = mock::MakeMockFile({{KeyStr("v", 4U, kTypeValue), "val"},
+                                     {KeyStr("x", 5U, kTypeValue), "val"}});
+  AddMockFile(file3_2, 3);
+
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  const std::vector<int> input_levels = {0, 1, 2, 3};
+  auto files0 = cfd->current()->storage_info()->LevelFiles(input_levels[0]);
+  auto files1 = cfd->current()->storage_info()->LevelFiles(input_levels[1]);
+  auto files2 = cfd->current()->storage_info()->LevelFiles(input_levels[2]);
+  auto files3 = cfd->current()->storage_info()->LevelFiles(input_levels[3]);
+
+  RunLastLevelCompaction(
+      {files0, files1, files2, files3}, input_levels,
+      /*verify_func=*/[&](Compaction& comp) {
+        for (char c = 'a'; c <= 'z'; c++) {
+          std::string c_str;
+          c_str = c;
+          const Slice key(c_str);
+          if (c == 'a') {
+            ASSERT_FALSE(comp.WithinPenultimateLevelOutputRange(key));
+          } else {
+            ASSERT_TRUE(comp.WithinPenultimateLevelOutputRange(key));
+          }
+        }
+      });
+}
+
+TEST_F(CompactionJobTest, NoEnforceSingleDeleteContract) {
+  db_options_.enforce_single_del_contracts = false;
+  NewDB();
+
+  auto file =
+      mock::MakeMockFile({{KeyStr("a", 4U, kTypeSingleDeletion), ""},
+                          {KeyStr("a", 3U, kTypeDeletion), "dontcare"}});
+  AddMockFile(file);
+  SetLastSequence(4U);
+
+  auto expected_results = mock::MakeMockFile();
+  constexpr int input_level = 0;
+  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
+}
+
+TEST_F(CompactionJobTest, InputSerialization) {
+  // Setup a random CompactionServiceInput
+  CompactionServiceInput input;
+  const int kStrMaxLen = 1000;
+  Random rnd(static_cast<uint32_t>(time(nullptr)));
+  Random64 rnd64(time(nullptr));
+  input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
+  input.column_family.options.comparator = ReverseBytewiseComparator();
+  input.column_family.options.max_bytes_for_level_base =
+      rnd64.Uniform(UINT64_MAX);
+  input.column_family.options.disable_auto_compactions = rnd.OneIn(2);
+  input.column_family.options.compression = kZSTD;
+  input.column_family.options.compression_opts.level = 4;
+  input.db_options.max_background_flushes = 10;
+  input.db_options.paranoid_checks = rnd.OneIn(2);
+  input.db_options.statistics = CreateDBStatistics();
+  input.db_options.env = env_;
+  while (!rnd.OneIn(10)) {
+    input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
+  }
+  while (!rnd.OneIn(10)) {
+    input.input_files.emplace_back(rnd.RandomString(
+        rnd.Uniform(kStrMaxLen - 1) +
+        1));  // input file name should have at least one character
+  }
+  input.output_level = 4;
+  input.has_begin = rnd.OneIn(2);
+  if (input.has_begin) {
+    input.begin = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
+  }
+  input.has_end = rnd.OneIn(2);
+  if (input.has_end) {
+    input.end = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
+  }
+
+  std::string output;
+  ASSERT_OK(input.Write(&output));
+
+  // Test deserialization
+  CompactionServiceInput deserialized1;
+  ASSERT_OK(CompactionServiceInput::Read(output, &deserialized1));
+  ASSERT_TRUE(deserialized1.TEST_Equals(&input));
+
+  // Test mismatch
+  deserialized1.db_options.max_background_flushes += 10;
+  std::string mismatch;
+  ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
+  ASSERT_EQ(mismatch, "db_options.max_background_flushes");
+
+  // Test unknown field
+  CompactionServiceInput deserialized2;
+  output.clear();
+  ASSERT_OK(input.Write(&output));
+  output.append("new_field=123;");
+
+  ASSERT_OK(CompactionServiceInput::Read(output, &deserialized2));
+  ASSERT_TRUE(deserialized2.TEST_Equals(&input));
+
+  // Test missing field
+  CompactionServiceInput deserialized3;
+  deserialized3.output_level = 0;
+  std::string to_remove = "output_level=4;";
+  size_t pos = output.find(to_remove);
+  ASSERT_TRUE(pos != std::string::npos);
+  output.erase(pos, to_remove.length());
+  ASSERT_OK(CompactionServiceInput::Read(output, &deserialized3));
+  mismatch.clear();
+  ASSERT_FALSE(deserialized3.TEST_Equals(&input, &mismatch));
+  ASSERT_EQ(mismatch, "output_level");
+
+  // manually set the value back, should match the original structure
+  deserialized3.output_level = 4;
+  ASSERT_TRUE(deserialized3.TEST_Equals(&input));
+
+  // Test invalid version
+  output.clear();
+  ASSERT_OK(input.Write(&output));
+
+  uint32_t data_version = DecodeFixed32(output.data());
+  const size_t kDataVersionSize = sizeof(data_version);
+  ASSERT_EQ(data_version,
+            1U);  // Update once the default data version is changed
+  char buf[kDataVersionSize];
+  EncodeFixed32(buf, data_version + 10);  // make sure it's not valid
+  output.replace(0, kDataVersionSize, buf, kDataVersionSize);
+  Status s = CompactionServiceInput::Read(output, &deserialized3);
+  ASSERT_TRUE(s.IsNotSupported());
+}
+
+TEST_F(CompactionJobTest, ResultSerialization) {
+  // Setup a random CompactionServiceResult
+  CompactionServiceResult result;
+  const int kStrMaxLen = 1000;
+  Random rnd(static_cast<uint32_t>(time(nullptr)));
+  Random64 rnd64(time(nullptr));
+  std::vector<Status> status_list = {
+      Status::OK(),
+      Status::InvalidArgument("invalid option"),
+      Status::Aborted("failed to run"),
+      Status::NotSupported("not supported option"),
+  };
+  result.status =
+      status_list.at(rnd.Uniform(static_cast<int>(status_list.size())));
+  while (!rnd.OneIn(10)) {
+    UniqueId64x2 id{rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX)};
+    result.output_files.emplace_back(
+        rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX),
+        rnd64.Uniform(UINT64_MAX),
+        rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
+        rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
+        rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX),
+        rnd64.Uniform(UINT64_MAX), rnd.OneIn(2), id);
+  }
+  result.output_level = rnd.Uniform(10);
+  result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen));
+  result.num_output_records = rnd64.Uniform(UINT64_MAX);
+  result.total_bytes = rnd64.Uniform(UINT64_MAX);
+  result.bytes_read = 123;
+  result.bytes_written = rnd64.Uniform(UINT64_MAX);
+  result.stats.elapsed_micros = rnd64.Uniform(UINT64_MAX);
+  result.stats.num_output_files = rnd.Uniform(1000);
+  result.stats.is_full_compaction = rnd.OneIn(2);
+  result.stats.num_single_del_mismatch = rnd64.Uniform(UINT64_MAX);
+  result.stats.num_input_files = 9;
+
+  std::string output;
+  ASSERT_OK(result.Write(&output));
+
+  // Test deserialization
+  CompactionServiceResult deserialized1;
+  ASSERT_OK(CompactionServiceResult::Read(output, &deserialized1));
+  ASSERT_TRUE(deserialized1.TEST_Equals(&result));
+
+  // Test mismatch
+  deserialized1.stats.num_input_files += 10;
+  std::string mismatch;
+  ASSERT_FALSE(deserialized1.TEST_Equals(&result, &mismatch));
+  ASSERT_EQ(mismatch, "stats.num_input_files");
+
+  // Test unique id mismatch
+  if (!result.output_files.empty()) {
+    CompactionServiceResult deserialized_tmp;
+    ASSERT_OK(CompactionServiceResult::Read(output, &deserialized_tmp));
+    deserialized_tmp.output_files[0].unique_id[0] += 1;
+    ASSERT_FALSE(deserialized_tmp.TEST_Equals(&result, &mismatch));
+    ASSERT_EQ(mismatch, "output_files.unique_id");
+    deserialized_tmp.status.PermitUncheckedError();
+  }
+
+  // Test unknown field
+  CompactionServiceResult deserialized2;
+  output.clear();
+  ASSERT_OK(result.Write(&output));
+  output.append("new_field=123;");
+
+  ASSERT_OK(CompactionServiceResult::Read(output, &deserialized2));
+  ASSERT_TRUE(deserialized2.TEST_Equals(&result));
+
+  // Test missing field
+  CompactionServiceResult deserialized3;
+  deserialized3.bytes_read = 0;
+  std::string to_remove = "bytes_read=123;";
+  size_t pos = output.find(to_remove);
+  ASSERT_TRUE(pos != std::string::npos);
+  output.erase(pos, to_remove.length());
+  ASSERT_OK(CompactionServiceResult::Read(output, &deserialized3));
+  mismatch.clear();
+  ASSERT_FALSE(deserialized3.TEST_Equals(&result, &mismatch));
+  ASSERT_EQ(mismatch, "bytes_read");
+
+  deserialized3.bytes_read = 123;
+  ASSERT_TRUE(deserialized3.TEST_Equals(&result));
+
+  // Test invalid version
+  output.clear();
+  ASSERT_OK(result.Write(&output));
+
+  uint32_t data_version = DecodeFixed32(output.data());
+  const size_t kDataVersionSize = sizeof(data_version);
+  ASSERT_EQ(data_version,
+            1U);  // Update once the default data version is changed
+  char buf[kDataVersionSize];
+  EncodeFixed32(buf, data_version + 10);  // make sure it's not valid
+  output.replace(0, kDataVersionSize, buf, kDataVersionSize);
+  Status s = CompactionServiceResult::Read(output, &deserialized3);
+  ASSERT_TRUE(s.IsNotSupported());
+  for (const auto& item : status_list) {
+    item.PermitUncheckedError();
+  }
+}
+
+class CompactionJobDynamicFileSizeTest
+    : public CompactionJobTestBase,
+      public ::testing::WithParamInterface<bool> {
+ public:
+  CompactionJobDynamicFileSizeTest()
+      : CompactionJobTestBase(
+            test::PerThreadDBPath("compaction_job_dynamic_file_size_test"),
+            BytewiseComparator(), [](uint64_t /*ts*/) { return ""; },
+            /*test_io_priority=*/false, TableTypeForTest::kMockTable) {}
+};
+
+TEST_P(CompactionJobDynamicFileSizeTest, CutForMaxCompactionBytes) {
+  // dynamic_file_size option should have no impact on cutting for max
+  // compaction bytes.
+  bool enable_dyanmic_file_size = GetParam();
+  cf_options_.level_compaction_dynamic_file_size = enable_dyanmic_file_size;
+
+  NewDB();
+  mutable_cf_options_.target_file_size_base = 80;
+  mutable_cf_options_.max_compaction_bytes = 21;
+
+  auto file1 = mock::MakeMockFile({
+      {KeyStr("c", 5U, kTypeValue), "val2"},
+      {KeyStr("n", 6U, kTypeValue), "val3"},
+  });
+  AddMockFile(file1);
+
+  auto file2 = mock::MakeMockFile({{KeyStr("h", 3U, kTypeValue), "val"},
+                                   {KeyStr("j", 4U, kTypeValue), "val"}});
+  AddMockFile(file2, 1);
+
+  // Create three L2 files, each size 10.
+  // max_compaction_bytes 21 means the compaction output in L1 will
+  // be cut to at least two files.
+  auto file3 = mock::MakeMockFile({{KeyStr("b", 1U, kTypeValue), "val"},
+                                   {KeyStr("c", 1U, kTypeValue), "val"},
+                                   {KeyStr("c1", 1U, kTypeValue), "val"},
+                                   {KeyStr("c2", 1U, kTypeValue), "val"},
+                                   {KeyStr("c3", 1U, kTypeValue), "val"},
+                                   {KeyStr("c4", 1U, kTypeValue), "val"},
+                                   {KeyStr("d", 1U, kTypeValue), "val"},
+                                   {KeyStr("e", 2U, kTypeValue), "val"}});
+  AddMockFile(file3, 2);
+
+  auto file4 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
+                                   {KeyStr("i", 1U, kTypeValue), "val"},
+                                   {KeyStr("i1", 1U, kTypeValue), "val"},
+                                   {KeyStr("i2", 1U, kTypeValue), "val"},
+                                   {KeyStr("i3", 1U, kTypeValue), "val"},
+                                   {KeyStr("i4", 1U, kTypeValue), "val"},
+                                   {KeyStr("j", 1U, kTypeValue), "val"},
+                                   {KeyStr("k", 2U, kTypeValue), "val"}});
+  AddMockFile(file4, 2);
+
+  auto file5 = mock::MakeMockFile({{KeyStr("l", 1U, kTypeValue), "val"},
+                                   {KeyStr("m", 1U, kTypeValue), "val"},
+                                   {KeyStr("m1", 1U, kTypeValue), "val"},
+                                   {KeyStr("m2", 1U, kTypeValue), "val"},
+                                   {KeyStr("m3", 1U, kTypeValue), "val"},
+                                   {KeyStr("m4", 1U, kTypeValue), "val"},
+                                   {KeyStr("n", 1U, kTypeValue), "val"},
+                                   {KeyStr("o", 2U, kTypeValue), "val"}});
+  AddMockFile(file5, 2);
+
+  // The expected output should be:
+  //  L1:   [c,   h,  j]        [n]
+  //  L2: [b ... e] [h ... k] [l ... o]
+  // It's better to have "j" in the first file, because anyway it's overlapping
+  // with the second file on L2.
+  // (Note: before this PR, it was cut at "h" because it's using the internal
+  // comparator which think L1 "h" with seqno 3 is smaller than L2 "h" with
+  // seqno 1, but actually they're overlapped with the compaction picker).
+
+  auto expected_file1 =
+      mock::MakeMockFile({{KeyStr("c", 5U, kTypeValue), "val2"},
+                          {KeyStr("h", 3U, kTypeValue), "val"},
+                          {KeyStr("j", 4U, kTypeValue), "val"}});
+  auto expected_file2 =
+      mock::MakeMockFile({{KeyStr("n", 6U, kTypeValue), "val3"}});
+
+  SetLastSequence(6U);
+
+  const std::vector<int> input_levels = {0, 1};
+  auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
+  auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
+
+  RunCompaction({lvl0_files, lvl1_files}, input_levels,
+                {expected_file1, expected_file2});
+}
+
+TEST_P(CompactionJobDynamicFileSizeTest, CutToSkipGrandparentFile) {
+  bool enable_dyanmic_file_size = GetParam();
+  cf_options_.level_compaction_dynamic_file_size = enable_dyanmic_file_size;
+
+  NewDB();
+  // Make sure the grandparent level file size (10) qualifies skipping.
+  // Currently, it has to be > 1/8 of target file size.
+  mutable_cf_options_.target_file_size_base = 70;
+
+  auto file1 = mock::MakeMockFile({
+      {KeyStr("a", 5U, kTypeValue), "val2"},
+      {KeyStr("z", 6U, kTypeValue), "val3"},
+  });
+  AddMockFile(file1);
+
+  auto file2 = mock::MakeMockFile({{KeyStr("c", 3U, kTypeValue), "val"},
+                                   {KeyStr("x", 4U, kTypeValue), "val"}});
+  AddMockFile(file2, 1);
+
+  auto file3 = mock::MakeMockFile({{KeyStr("b", 1U, kTypeValue), "val"},
+                                   {KeyStr("d", 2U, kTypeValue), "val"}});
+  AddMockFile(file3, 2);
+
+  auto file4 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
+                                   {KeyStr("i", 2U, kTypeValue), "val"}});
+  AddMockFile(file4, 2);
+
+  auto file5 = mock::MakeMockFile({{KeyStr("v", 1U, kTypeValue), "val"},
+                                   {KeyStr("y", 2U, kTypeValue), "val"}});
+  AddMockFile(file5, 2);
+
+  auto expected_file1 =
+      mock::MakeMockFile({{KeyStr("a", 5U, kTypeValue), "val2"},
+                          {KeyStr("c", 3U, kTypeValue), "val"}});
+  auto expected_file2 =
+      mock::MakeMockFile({{KeyStr("x", 4U, kTypeValue), "val"},
+                          {KeyStr("z", 6U, kTypeValue), "val3"}});
+
+  auto expected_file_disable_dynamic_file_size =
+      mock::MakeMockFile({{KeyStr("a", 5U, kTypeValue), "val2"},
+                          {KeyStr("c", 3U, kTypeValue), "val"},
+                          {KeyStr("x", 4U, kTypeValue), "val"},
+                          {KeyStr("z", 6U, kTypeValue), "val3"}});
+
+  SetLastSequence(6U);
+  const std::vector<int> input_levels = {0, 1};
+  auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
+  auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
+  if (enable_dyanmic_file_size) {
+    RunCompaction({lvl0_files, lvl1_files}, input_levels,
+                  {expected_file1, expected_file2});
+  } else {
+    RunCompaction({lvl0_files, lvl1_files}, input_levels,
+                  {expected_file_disable_dynamic_file_size});
+  }
+}
+
+TEST_P(CompactionJobDynamicFileSizeTest, CutToAlignGrandparentBoundary) {
+  bool enable_dyanmic_file_size = GetParam();
+  cf_options_.level_compaction_dynamic_file_size = enable_dyanmic_file_size;
+  NewDB();
+
+  // MockTable has 1 byte per entry by default and each file is 10 bytes.
+  // When the file size is smaller than 100, it won't cut file earlier to align
+  // with its grandparent boundary.
+  const size_t kKeyValueSize = 10000;
+  mock_table_factory_->SetKeyValueSize(kKeyValueSize);
+
+  mutable_cf_options_.target_file_size_base = 10 * kKeyValueSize;
+
+  mock::KVVector file1;
+  char ch = 'd';
+  // Add value from d -> o
+  for (char i = 0; i < 12; i++) {
+    file1.emplace_back(KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
+                       "val" + std::to_string(i));
+  }
+
+  AddMockFile(file1);
+
+  auto file2 = mock::MakeMockFile({{KeyStr("e", 3U, kTypeValue), "val"},
+                                   {KeyStr("s", 4U, kTypeValue), "val"}});
+  AddMockFile(file2, 1);
+
+  // the 1st grandparent file should be skipped
+  auto file3 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"},
+                                   {KeyStr("b", 2U, kTypeValue), "val"}});
+  AddMockFile(file3, 2);
+
+  auto file4 = mock::MakeMockFile({{KeyStr("c", 1U, kTypeValue), "val"},
+                                   {KeyStr("e", 2U, kTypeValue), "val"}});
+  AddMockFile(file4, 2);
+
+  auto file5 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
+                                   {KeyStr("j", 2U, kTypeValue), "val"}});
+  AddMockFile(file5, 2);
+
+  auto file6 = mock::MakeMockFile({{KeyStr("k", 1U, kTypeValue), "val"},
+                                   {KeyStr("n", 2U, kTypeValue), "val"}});
+  AddMockFile(file6, 2);
+
+  auto file7 = mock::MakeMockFile({{KeyStr("q", 1U, kTypeValue), "val"},
+                                   {KeyStr("t", 2U, kTypeValue), "val"}});
+  AddMockFile(file7, 2);
+
+  // The expected outputs are:
+  //  L1:         [d,e,f,g,h,i,j] [k,l,m,n,o,s]
+  //  L2: [a, b] [c,  e]   [h, j] [k, n]  [q, t]
+  // The first output cut earlier at "j", so it could be aligned with L2 files.
+  // If dynamic_file_size is not enabled, it will be cut based on the
+  // target_file_size
+  mock::KVVector expected_file1;
+  for (char i = 0; i < 7; i++) {
+    expected_file1.emplace_back(
+        KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
+        "val" + std::to_string(i));
+  }
+
+  mock::KVVector expected_file2;
+  for (char i = 7; i < 12; i++) {
+    expected_file2.emplace_back(
+        KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
+        "val" + std::to_string(i));
+  }
+  expected_file2.emplace_back(KeyStr("s", 4U, kTypeValue), "val");
+
+  mock::KVVector expected_file_disable_dynamic_file_size1;
+  for (char i = 0; i < 10; i++) {
+    expected_file_disable_dynamic_file_size1.emplace_back(
+        KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
+        "val" + std::to_string(i));
+  }
+
+  mock::KVVector expected_file_disable_dynamic_file_size2;
+  for (char i = 10; i < 12; i++) {
+    expected_file_disable_dynamic_file_size2.emplace_back(
+        KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
+        "val" + std::to_string(i));
+  }
+
+  expected_file_disable_dynamic_file_size2.emplace_back(
+      KeyStr("s", 4U, kTypeValue), "val");
+
+  SetLastSequence(22U);
+  const std::vector<int> input_levels = {0, 1};
+  auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
+  auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
+  if (enable_dyanmic_file_size) {
+    RunCompaction({lvl0_files, lvl1_files}, input_levels,
+                  {expected_file1, expected_file2});
+  } else {
+    RunCompaction({lvl0_files, lvl1_files}, input_levels,
+                  {expected_file_disable_dynamic_file_size1,
+                   expected_file_disable_dynamic_file_size2});
+  }
+}
+
+TEST_P(CompactionJobDynamicFileSizeTest, CutToAlignGrandparentBoundarySameKey) {
+  bool enable_dyanmic_file_size = GetParam();
+  cf_options_.level_compaction_dynamic_file_size = enable_dyanmic_file_size;
+  NewDB();
+
+  // MockTable has 1 byte per entry by default and each file is 10 bytes.
+  // When the file size is smaller than 100, it won't cut file earlier to align
+  // with its grandparent boundary.
+  const size_t kKeyValueSize = 10000;
+  mock_table_factory_->SetKeyValueSize(kKeyValueSize);
+
+  mutable_cf_options_.target_file_size_base = 10 * kKeyValueSize;
+
+  mock::KVVector file1;
+  for (int i = 0; i < 7; i++) {
+    file1.emplace_back(KeyStr("a", 100 - i, kTypeValue),
+                       "val" + std::to_string(100 - i));
+  }
+  file1.emplace_back(KeyStr("b", 90, kTypeValue), "valb");
+
+  AddMockFile(file1);
+
+  auto file2 = mock::MakeMockFile({{KeyStr("a", 93U, kTypeValue), "val93"},
+                                   {KeyStr("b", 90U, kTypeValue), "valb"}});
+  AddMockFile(file2, 1);
+
+  auto file3 = mock::MakeMockFile({{KeyStr("a", 89U, kTypeValue), "val"},
+                                   {KeyStr("a", 88U, kTypeValue), "val"}});
+  AddMockFile(file3, 2);
+
+  auto file4 = mock::MakeMockFile({{KeyStr("a", 87U, kTypeValue), "val"},
+                                   {KeyStr("a", 86U, kTypeValue), "val"}});
+  AddMockFile(file4, 2);
+
+  auto file5 = mock::MakeMockFile({{KeyStr("b", 85U, kTypeValue), "val"},
+                                   {KeyStr("b", 84U, kTypeValue), "val"}});
+  AddMockFile(file5, 2);
+
+  mock::KVVector expected_file1;
+  mock::KVVector expected_file_disable_dynamic_file_size;
+
+  for (int i = 0; i < 8; i++) {
+    expected_file1.emplace_back(KeyStr("a", 100 - i, kTypeValue),
+                                "val" + std::to_string(100 - i));
+    expected_file_disable_dynamic_file_size.emplace_back(
+        KeyStr("a", 100 - i, kTypeValue), "val" + std::to_string(100 - i));
+  }
+
+  // make sure `b` is cut in a separated file (so internally it's not using
+  // internal comparator, which will think the "b:90" (seqno 90) here is smaller
+  // than "b:85" on L2.)
+  auto expected_file2 =
+      mock::MakeMockFile({{KeyStr("b", 90U, kTypeValue), "valb"}});
+
+  expected_file_disable_dynamic_file_size.emplace_back(
+      KeyStr("b", 90U, kTypeValue), "valb");
+
+  SetLastSequence(122U);
+  const std::vector<int> input_levels = {0, 1};
+  auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
+  auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
+
+  // Just keep all the history
+  std::vector<SequenceNumber> snapshots;
+  for (int i = 80; i <= 100; i++) {
+    snapshots.emplace_back(i);
+  }
+  if (enable_dyanmic_file_size) {
+    RunCompaction({lvl0_files, lvl1_files}, input_levels,
+                  {expected_file1, expected_file2}, snapshots);
+  } else {
+    RunCompaction({lvl0_files, lvl1_files}, input_levels,
+                  {expected_file_disable_dynamic_file_size}, snapshots);
+  }
 }
 
+TEST_P(CompactionJobDynamicFileSizeTest, CutForMaxCompactionBytesSameKey) {
+  // dynamic_file_size option should have no impact on cutting for max
+  // compaction bytes.
+  bool enable_dyanmic_file_size = GetParam();
+  cf_options_.level_compaction_dynamic_file_size = enable_dyanmic_file_size;
+
+  NewDB();
+  mutable_cf_options_.target_file_size_base = 80;
+  mutable_cf_options_.max_compaction_bytes = 20;
+
+  auto file1 = mock::MakeMockFile({{KeyStr("a", 104U, kTypeValue), "val1"},
+                                   {KeyStr("b", 103U, kTypeValue), "val"}});
+  AddMockFile(file1);
+
+  auto file2 = mock::MakeMockFile({{KeyStr("a", 102U, kTypeValue), "val2"},
+                                   {KeyStr("c", 101U, kTypeValue), "val"}});
+  AddMockFile(file2, 1);
+
+  for (int i = 0; i < 10; i++) {
+    auto file =
+        mock::MakeMockFile({{KeyStr("a", 100 - (i * 2), kTypeValue), "val"},
+                            {KeyStr("a", 99 - (i * 2), kTypeValue), "val"}});
+    AddMockFile(file, 2);
+  }
+
+  for (int i = 0; i < 10; i++) {
+    auto file =
+        mock::MakeMockFile({{KeyStr("b", 80 - (i * 2), kTypeValue), "val"},
+                            {KeyStr("b", 79 - (i * 2), kTypeValue), "val"}});
+    AddMockFile(file, 2);
+  }
+
+  auto file5 = mock::MakeMockFile({{KeyStr("c", 60U, kTypeValue), "valc"},
+                                   {KeyStr("c", 59U, kTypeValue), "valc"}});
+
+  // "a" has 10 overlapped grandparent files (each size 10), which is far
+  // exceeded the `max_compaction_bytes`, but make sure 2 "a" are not separated,
+  // as splitting them won't help reducing the compaction size.
+  // also make sure "b" and "c" are cut separately.
+  mock::KVVector expected_file1 =
+      mock::MakeMockFile({{KeyStr("a", 104U, kTypeValue), "val1"},
+                          {KeyStr("a", 102U, kTypeValue), "val2"}});
+  mock::KVVector expected_file2 =
+      mock::MakeMockFile({{KeyStr("b", 103U, kTypeValue), "val"}});
+  mock::KVVector expected_file3 =
+      mock::MakeMockFile({{KeyStr("c", 101U, kTypeValue), "val"}});
+
+  SetLastSequence(122U);
+  const std::vector<int> input_levels = {0, 1};
+  auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
+  auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
+
+  // Just keep all the history
+  std::vector<SequenceNumber> snapshots;
+  for (int i = 80; i <= 105; i++) {
+    snapshots.emplace_back(i);
+  }
+  RunCompaction({lvl0_files, lvl1_files}, input_levels,
+                {expected_file1, expected_file2, expected_file3}, snapshots);
+}
+
+INSTANTIATE_TEST_CASE_P(CompactionJobDynamicFileSizeTest,
+                        CompactionJobDynamicFileSizeTest, testing::Bool());
+
 class CompactionJobTimestampTest : public CompactionJobTestBase {
  public:
   CompactionJobTimestampTest()
       : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
-                              test::ComparatorWithU64Ts(), test::EncodeInt) {}
+                              test::BytewiseComparatorWithU64TsWrapper(),
+                              test::EncodeInt, /*test_io_priority=*/false,
+                              TableTypeForTest::kMockTable) {}
 };
 
 TEST_F(CompactionJobTimestampTest, GCDisabled) {
@@ -1106,13 +2137,16 @@ TEST_F(CompactionJobTimestampTest, GCDisabled) {
   auto file1 =
       mock::MakeMockFile({{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
                           {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
-                          {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"}});
+                          {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
+                          {KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"}});
+
   AddMockFile(file1);
 
   auto file2 = mock::MakeMockFile(
-      {{KeyStr("b", 7, ValueType::kTypeDeletionWithTimestamp, 97), ""},
-       {KeyStr("c", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
-       {KeyStr("c", 5, ValueType::kTypeValue, 95), "c5"}});
+      {{KeyStr("b", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
+       {KeyStr("c", 5, ValueType::kTypeDeletionWithTimestamp, 95), ""},
+       {KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
+       {KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
   AddMockFile(file2);
 
   SetLastSequence(10);
@@ -1121,11 +2155,14 @@ TEST_F(CompactionJobTimestampTest, GCDisabled) {
       {{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
        {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
        {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
-       {KeyStr("b", 7, ValueType::kTypeDeletionWithTimestamp, 97), ""},
-       {KeyStr("c", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
-       {KeyStr("c", 5, ValueType::kTypeValue, 95), "c5"}});
-  const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected_results);
+       {KeyStr("b", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
+       {KeyStr("c", 5, ValueType::kTypeDeletionWithTimestamp, 95), ""},
+       {KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
+       {KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"},
+       {KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
+  constexpr int input_level = 0;
+  const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTimestampTest, NoKeyExpired) {
@@ -1150,10 +2187,11 @@ TEST_F(CompactionJobTimestampTest, NoKeyExpired) {
                           {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
                           {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"},
                           {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
-  const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
+  constexpr int input_level = 0;
+  const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
 
   full_history_ts_low_ = encode_u64_ts_(0);
-  RunCompaction({files}, expected_results);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTimestampTest, AllKeysExpired) {
@@ -1161,23 +2199,26 @@ TEST_F(CompactionJobTimestampTest, AllKeysExpired) {
 
   auto file1 = mock::MakeMockFile(
       {{KeyStr("a", 5, ValueType::kTypeDeletionWithTimestamp, 100), ""},
-       {KeyStr("b", 6, ValueType::kTypeValue, 99), "b6"}});
+       {KeyStr("b", 6, ValueType::kTypeSingleDeletion, 99), ""},
+       {KeyStr("c", 7, ValueType::kTypeValue, 98), "c7"}});
   AddMockFile(file1);
 
   auto file2 = mock::MakeMockFile(
-      {{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
-       {KeyStr("b", 3, ValueType::kTypeDeletionWithTimestamp, 97), ""},
-       {KeyStr("b", 2, ValueType::kTypeValue, 96), "b2"}});
+      {{KeyStr("a", 4, ValueType::kTypeValue, 97), "a4"},
+       {KeyStr("b", 3, ValueType::kTypeValue, 96), "b3"},
+       {KeyStr("c", 2, ValueType::kTypeDeletionWithTimestamp, 95), ""},
+       {KeyStr("c", 1, ValueType::kTypeValue, 94), "c1"}});
   AddMockFile(file2);
 
-  SetLastSequence(6);
+  SetLastSequence(7);
 
   auto expected_results =
-      mock::MakeMockFile({{KeyStr("b", 0, ValueType::kTypeValue, 0), "b6"}});
-  const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
+      mock::MakeMockFile({{KeyStr("c", 0, ValueType::kTypeValue, 0), "c7"}});
+  constexpr int input_level = 0;
+  const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
 
   full_history_ts_low_ = encode_u64_ts_(std::numeric_limits<uint64_t>::max());
-  RunCompaction({files}, expected_results);
+  RunCompaction({files}, {input_level}, {expected_results});
 }
 
 TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
@@ -1198,17 +2239,203 @@ TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
 
   auto expected_results =
       mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
+                          {KeyStr("a", 0, ValueType::kTypeValue, 0), "a3"},
                           {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
-  const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
+  constexpr int input_level = 0;
+  const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
 
   full_history_ts_low_ = encode_u64_ts_(49);
-  RunCompaction({files}, expected_results);
+  RunCompaction({files}, {input_level}, {expected_results});
+}
+
+class CompactionJobTimestampTestWithBbTable : public CompactionJobTestBase {
+ public:
+  // Block-based table is needed if we want to test subcompaction partitioning
+  // with anchors.
+  explicit CompactionJobTimestampTestWithBbTable()
+      : CompactionJobTestBase(
+            test::PerThreadDBPath("compaction_job_ts_bbt_test"),
+            test::BytewiseComparatorWithU64TsWrapper(), test::EncodeInt,
+            /*test_io_priority=*/false, TableTypeForTest::kBlockBasedTable) {}
+};
+
+TEST_F(CompactionJobTimestampTestWithBbTable, SubcompactionAnchorL1) {
+  cf_options_.target_file_size_base = 20;
+  mutable_cf_options_.target_file_size_base = 20;
+  NewDB();
+
+  const std::vector<std::string> keys = {
+      KeyStr("a", 20, ValueType::kTypeValue, 200),
+      KeyStr("b", 21, ValueType::kTypeValue, 210),
+      KeyStr("b", 20, ValueType::kTypeValue, 200),
+      KeyStr("b", 18, ValueType::kTypeValue, 180),
+      KeyStr("c", 17, ValueType::kTypeValue, 170),
+      KeyStr("c", 16, ValueType::kTypeValue, 160),
+      KeyStr("c", 15, ValueType::kTypeValue, 150)};
+  const std::vector<std::string> values = {"a20", "b21", "b20", "b18",
+                                           "c17", "c16", "c15"};
+
+  constexpr int input_level = 1;
+
+  auto file1 = mock::MakeMockFile(
+      {{keys[0], values[0]}, {keys[1], values[1]}, {keys[2], values[2]}});
+  AddMockFile(file1, input_level);
+
+  auto file2 = mock::MakeMockFile(
+      {{keys[3], values[3]}, {keys[4], values[4]}, {keys[5], values[5]}});
+  AddMockFile(file2, input_level);
+
+  auto file3 = mock::MakeMockFile({{keys[6], values[6]}});
+  AddMockFile(file3, input_level);
+
+  SetLastSequence(20);
+
+  auto output1 = mock::MakeMockFile({{keys[0], values[0]}});
+  auto output2 = mock::MakeMockFile(
+      {{keys[1], values[1]}, {keys[2], values[2]}, {keys[3], values[3]}});
+  auto output3 = mock::MakeMockFile(
+      {{keys[4], values[4]}, {keys[5], values[5]}, {keys[6], values[6]}});
+
+  auto expected_results =
+      std::vector<mock::KVVector>{output1, output2, output3};
+  const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
+
+  constexpr int output_level = 2;
+  constexpr int max_subcompactions = 4;
+  RunCompaction({files}, {input_level}, expected_results, /*snapshots=*/{},
+                /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
+                output_level, /*verify=*/true, {kInvalidBlobFileNumber},
+                /*check_get_priority=*/false, Env::IO_TOTAL, Env::IO_TOTAL,
+                max_subcompactions);
+}
+
+TEST_F(CompactionJobTimestampTestWithBbTable, SubcompactionL0) {
+  cf_options_.target_file_size_base = 20;
+  mutable_cf_options_.target_file_size_base = 20;
+  NewDB();
+
+  const std::vector<std::string> keys = {
+      KeyStr("a", 20, ValueType::kTypeValue, 200),
+      KeyStr("b", 20, ValueType::kTypeValue, 200),
+      KeyStr("b", 19, ValueType::kTypeValue, 190),
+      KeyStr("b", 18, ValueType::kTypeValue, 180),
+      KeyStr("c", 17, ValueType::kTypeValue, 170),
+      KeyStr("c", 16, ValueType::kTypeValue, 160),
+      KeyStr("c", 15, ValueType::kTypeValue, 150)};
+  const std::vector<std::string> values = {"a20", "b20", "b19", "b18",
+                                           "c17", "c16", "c15"};
+
+  constexpr int input_level = 0;
+
+  auto file1 = mock::MakeMockFile({{keys[5], values[5]}, {keys[6], values[6]}});
+  AddMockFile(file1, input_level);
+
+  auto file2 = mock::MakeMockFile({{keys[3], values[3]}, {keys[4], values[4]}});
+  AddMockFile(file2, input_level);
+
+  auto file3 = mock::MakeMockFile(
+      {{keys[0], values[0]}, {keys[1], values[1]}, {keys[2], values[2]}});
+  AddMockFile(file3, input_level);
+
+  SetLastSequence(20);
+
+  auto output1 = mock::MakeMockFile({{keys[0], values[0]}});
+  auto output2 = mock::MakeMockFile(
+      {{keys[1], values[1]}, {keys[2], values[2]}, {keys[3], values[3]}});
+  auto output3 = mock::MakeMockFile(
+      {{keys[4], values[4]}, {keys[5], values[5]}, {keys[6], values[6]}});
+
+  auto expected_results =
+      std::vector<mock::KVVector>{output1, output2, output3};
+  const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
+
+  constexpr int output_level = 1;
+  constexpr int max_subcompactions = 4;
+  RunCompaction({files}, {input_level}, expected_results, /*snapshots=*/{},
+                /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
+                output_level, /*verify=*/true, {kInvalidBlobFileNumber},
+                /*check_get_priority=*/false, Env::IO_TOTAL, Env::IO_TOTAL,
+                max_subcompactions);
+}
+
+// The io priority of the compaction reads and writes are different from
+// other DB reads and writes. To prepare the compaction input files, use the
+// default filesystem from Env. To test the io priority of the compaction
+// reads and writes, db_options_.fs is set as MockTestFileSystem.
+class CompactionJobIOPriorityTest : public CompactionJobTestBase {
+ public:
+  CompactionJobIOPriorityTest()
+      : CompactionJobTestBase(
+            test::PerThreadDBPath("compaction_job_io_priority_test"),
+            BytewiseComparator(), [](uint64_t /*ts*/) { return ""; },
+            /*test_io_priority=*/true, TableTypeForTest::kBlockBasedTable) {}
+};
+
+TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) {
+  // When the state from WriteController is normal.
+  NewDB();
+  mock::KVVector expected_results = CreateTwoFiles(false);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  constexpr int input_level = 0;
+  auto files = cfd->current()->storage_info()->LevelFiles(input_level);
+  ASSERT_EQ(2U, files.size());
+  RunCompaction({files}, {input_level}, {expected_results}, {},
+                kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
+                Env::IO_LOW, Env::IO_LOW);
+}
+
+TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) {
+  // When the state from WriteController is Delayed.
+  NewDB();
+  mock::KVVector expected_results = CreateTwoFiles(false);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  constexpr int input_level = 0;
+  auto files = cfd->current()->storage_info()->LevelFiles(input_level);
+  ASSERT_EQ(2U, files.size());
+  {
+    std::unique_ptr<WriteControllerToken> delay_token =
+        write_controller_.GetDelayToken(1000000);
+    RunCompaction({files}, {input_level}, {expected_results}, {},
+                  kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
+                  Env::IO_USER, Env::IO_USER);
+  }
+}
+
+TEST_F(CompactionJobIOPriorityTest, WriteControllerStateStalled) {
+  // When the state from WriteController is Stalled.
+  NewDB();
+  mock::KVVector expected_results = CreateTwoFiles(false);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  constexpr int input_level = 0;
+  auto files = cfd->current()->storage_info()->LevelFiles(input_level);
+  ASSERT_EQ(2U, files.size());
+  {
+    std::unique_ptr<WriteControllerToken> stop_token =
+        write_controller_.GetStopToken();
+    RunCompaction({files}, {input_level}, {expected_results}, {},
+                  kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
+                  Env::IO_USER, Env::IO_USER);
+  }
+}
+
+TEST_F(CompactionJobIOPriorityTest, GetRateLimiterPriority) {
+  NewDB();
+  mock::KVVector expected_results = CreateTwoFiles(false);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  constexpr int input_level = 0;
+  auto files = cfd->current()->storage_info()->LevelFiles(input_level);
+  ASSERT_EQ(2U, files.size());
+  RunCompaction({files}, {input_level}, {expected_results}, {},
+                kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, true,
+                Env::IO_LOW, Env::IO_LOW);
 }
 
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   ::testing::InitGoogleTest(&argc, argv);
+  RegisterCustomObjects(argc, argv);
   return RUN_ALL_TESTS();
 }