]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compaction/compaction_job.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_job.cc
index a517a2015b9d70724575f2c6ad69f83db44b3824..1da1bcda87261074980d7d18d45ebca2d19cee0a 100644 (file)
 
 #include <algorithm>
 #include <cinttypes>
-#include <functional>
-#include <list>
 #include <memory>
-#include <random>
+#include <optional>
 #include <set>
-#include <thread>
 #include <utility>
 #include <vector>
 
+#include "db/blob/blob_counting_iterator.h"
 #include "db/blob/blob_file_addition.h"
 #include "db/blob/blob_file_builder.h"
 #include "db/builder.h"
+#include "db/compaction/clipping_iterator.h"
+#include "db/compaction/compaction_state.h"
 #include "db/db_impl/db_impl.h"
-#include "db/db_iter.h"
 #include "db/dbformat.h"
 #include "db/error_handler.h"
 #include "db/event_helpers.h"
-#include "db/log_reader.h"
+#include "db/history_trimming_iterator.h"
 #include "db/log_writer.h"
-#include "db/memtable.h"
-#include "db/memtable_list.h"
-#include "db/merge_context.h"
 #include "db/merge_helper.h"
-#include "db/output_validator.h"
 #include "db/range_del_aggregator.h"
+#include "db/version_edit.h"
 #include "db/version_set.h"
 #include "file/filename.h"
 #include "file/read_write_util.h"
 #include "logging/log_buffer.h"
 #include "logging/logging.h"
 #include "monitoring/iostats_context_imp.h"
-#include "monitoring/perf_context_imp.h"
 #include "monitoring/thread_status_util.h"
+#include "options/configurable_helper.h"
+#include "options/options_helper.h"
 #include "port/port.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
-#include "rocksdb/sst_partitioner.h"
+#include "rocksdb/options.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/status.h"
 #include "rocksdb/table.h"
-#include "table/block_based/block.h"
-#include "table/block_based/block_based_table_factory.h"
+#include "rocksdb/utilities/options_type.h"
 #include "table/merging_iterator.h"
 #include "table/table_builder.h"
+#include "table/unique_id_impl.h"
 #include "test_util/sync_point.h"
-#include "util/coding.h"
-#include "util/hash.h"
-#include "util/mutexlock.h"
-#include "util/random.h"
 #include "util/stop_watch.h"
-#include "util/string_util.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -101,6 +93,12 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
       return "ExternalSstIngestion";
     case CompactionReason::kPeriodicCompaction:
       return "PeriodicCompaction";
+    case CompactionReason::kChangeTemperature:
+      return "ChangeTemperature";
+    case CompactionReason::kForcedBlobGC:
+      return "ForcedBlobGC";
+    case CompactionReason::kRoundRobinTtl:
+      return "RoundRobinTtl";
     case CompactionReason::kNumOfReasons:
       // fall through
     default:
@@ -109,215 +107,57 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
   }
 }
 
-// Maintains state for each sub-compaction
-struct CompactionJob::SubcompactionState {
-  const Compaction* compaction;
-  std::unique_ptr<CompactionIterator> c_iter;
-
-  // The boundaries of the key-range this compaction is interested in. No two
-  // subcompactions may have overlapping key-ranges.
-  // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
-  Slice *start, *end;
-
-  // The return status of this subcompaction
-  Status status;
-
-  // The return IO Status of this subcompaction
-  IOStatus io_status;
-
-  // Files produced by this subcompaction
-  struct Output {
-    Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
-           bool _enable_order_check, bool _enable_hash)
-        : meta(std::move(_meta)),
-          validator(_icmp, _enable_order_check, _enable_hash),
-          finished(false) {}
-    FileMetaData meta;
-    OutputValidator validator;
-    bool finished;
-    std::shared_ptr<const TableProperties> table_properties;
-  };
-
-  // State kept for output being generated
-  std::vector<Output> outputs;
-  std::vector<BlobFileAddition> blob_file_additions;
-  std::unique_ptr<WritableFileWriter> outfile;
-  std::unique_ptr<TableBuilder> builder;
-
-  Output* current_output() {
-    if (outputs.empty()) {
-      // This subcompaction's output could be empty if compaction was aborted
-      // before this subcompaction had a chance to generate any output files.
-      // When subcompactions are executed sequentially this is more likely and
-      // will be particulalry likely for the later subcompactions to be empty.
-      // Once they are run in parallel however it should be much rarer.
-      return nullptr;
-    } else {
-      return &outputs.back();
-    }
-  }
-
-  uint64_t current_output_file_size = 0;
-
-  // State during the subcompaction
-  uint64_t total_bytes = 0;
-  uint64_t num_output_records = 0;
-  CompactionJobStats compaction_job_stats;
-  uint64_t approx_size = 0;
-  // An index that used to speed up ShouldStopBefore().
-  size_t grandparent_index = 0;
-  // The number of bytes overlapping between the current output and
-  // grandparent files used in ShouldStopBefore().
-  uint64_t overlapped_bytes = 0;
-  // A flag determine whether the key has been seen in ShouldStopBefore()
-  bool seen_key = false;
-
-  SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size)
-      : compaction(c), start(_start), end(_end), approx_size(size) {
-    assert(compaction != nullptr);
-  }
-
-  // Adds the key and value to the builder
-  // If paranoid is true, adds the key-value to the paranoid hash
-  Status AddToBuilder(const Slice& key, const Slice& value) {
-    auto curr = current_output();
-    assert(builder != nullptr);
-    assert(curr != nullptr);
-    Status s = curr->validator.Add(key, value);
-    if (!s.ok()) {
-      return s;
-    }
-    builder->Add(key, value);
-    return Status::OK();
-  }
-
-  // Returns true iff we should stop building the current output
-  // before processing "internal_key".
-  bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
-    const InternalKeyComparator* icmp =
-        &compaction->column_family_data()->internal_comparator();
-    const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
-
-    // Scan to find earliest grandparent file that contains key.
-    while (grandparent_index < grandparents.size() &&
-           icmp->Compare(internal_key,
-                         grandparents[grandparent_index]->largest.Encode()) >
-               0) {
-      if (seen_key) {
-        overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
-      }
-      assert(grandparent_index + 1 >= grandparents.size() ||
-             icmp->Compare(
-                 grandparents[grandparent_index]->largest.Encode(),
-                 grandparents[grandparent_index + 1]->smallest.Encode()) <= 0);
-      grandparent_index++;
-    }
-    seen_key = true;
-
-    if (overlapped_bytes + curr_file_size >
-        compaction->max_compaction_bytes()) {
-      // Too much overlap for current output; start new output
-      overlapped_bytes = 0;
-      return true;
-    }
-
-    return false;
-  }
-};
-
-// Maintains state for the entire compaction
-struct CompactionJob::CompactionState {
-  Compaction* const compaction;
-
-  // REQUIRED: subcompaction states are stored in order of increasing
-  // key-range
-  std::vector<CompactionJob::SubcompactionState> sub_compact_states;
-  Status status;
-
-  size_t num_output_files = 0;
-  uint64_t total_bytes = 0;
-  size_t num_blob_output_files = 0;
-  uint64_t total_blob_bytes = 0;
-  uint64_t num_output_records = 0;
-
-  explicit CompactionState(Compaction* c) : compaction(c) {}
-
-  Slice SmallestUserKey() {
-    for (const auto& sub_compact_state : sub_compact_states) {
-      if (!sub_compact_state.outputs.empty() &&
-          sub_compact_state.outputs[0].finished) {
-        return sub_compact_state.outputs[0].meta.smallest.user_key();
-      }
-    }
-    // If there is no finished output, return an empty slice.
-    return Slice(nullptr, 0);
-  }
-
-  Slice LargestUserKey() {
-    for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
-         ++it) {
-      if (!it->outputs.empty() && it->current_output()->finished) {
-        assert(it->current_output() != nullptr);
-        return it->current_output()->meta.largest.user_key();
-      }
-    }
-    // If there is no finished output, return an empty slice.
-    return Slice(nullptr, 0);
-  }
-};
-
-void CompactionJob::AggregateStatistics() {
-  assert(compact_);
-
-  for (SubcompactionState& sc : compact_->sub_compact_states) {
-    auto& outputs = sc.outputs;
-
-    if (!outputs.empty() && !outputs.back().meta.fd.file_size) {
-      // An error occurred, so ignore the last output.
-      outputs.pop_back();
-    }
-
-    compact_->num_output_files += outputs.size();
-    compact_->total_bytes += sc.total_bytes;
-
-    const auto& blobs = sc.blob_file_additions;
-
-    compact_->num_blob_output_files += blobs.size();
-
-    for (const auto& blob : blobs) {
-      compact_->total_blob_bytes += blob.GetTotalBlobBytes();
-    }
-
-    compact_->num_output_records += sc.num_output_records;
-
-    compaction_job_stats_->Add(sc.compaction_job_stats);
+const char* GetCompactionPenultimateOutputRangeTypeString(
+    Compaction::PenultimateOutputRangeType range_type) {
+  switch (range_type) {
+    case Compaction::PenultimateOutputRangeType::kNotSupported:
+      return "NotSupported";
+    case Compaction::PenultimateOutputRangeType::kFullRange:
+      return "FullRange";
+    case Compaction::PenultimateOutputRangeType::kNonLastRange:
+      return "NonLastRange";
+    case Compaction::PenultimateOutputRangeType::kDisabled:
+      return "Disabled";
+    default:
+      assert(false);
+      return "Invalid";
   }
 }
 
 CompactionJob::CompactionJob(
     int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
-    const FileOptions& file_options, VersionSet* versions,
-    const std::atomic<bool>* shutting_down,
-    const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
-    FSDirectory* db_directory, FSDirectory* output_directory,
-    FSDirectory* blob_output_directory, Statistics* stats,
-    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
+    const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
+    VersionSet* versions, const std::atomic<bool>* shutting_down,
+    LogBuffer* log_buffer, FSDirectory* db_directory,
+    FSDirectory* output_directory, FSDirectory* blob_output_directory,
+    Statistics* stats, InstrumentedMutex* db_mutex,
+    ErrorHandler* db_error_handler,
     std::vector<SequenceNumber> existing_snapshots,
     SequenceNumber earliest_write_conflict_snapshot,
-    const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
-    EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
-    const std::string& dbname, CompactionJobStats* compaction_job_stats,
-    Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
-    const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
-    const std::string& db_session_id, std::string full_history_ts_low)
-    : job_id_(job_id),
-      compact_(new CompactionState(compaction)),
-      compaction_job_stats_(compaction_job_stats),
+    const SnapshotChecker* snapshot_checker, JobContext* job_context,
+    std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
+    bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
+    CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
+    const std::shared_ptr<IOTracer>& io_tracer,
+    const std::atomic<bool>& manual_compaction_canceled,
+    const std::string& db_id, const std::string& db_session_id,
+    std::string full_history_ts_low, std::string trim_ts,
+    BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
+    int* bg_bottom_compaction_scheduled)
+    : compact_(new CompactionState(compaction)),
       compaction_stats_(compaction->compaction_reason(), 1),
+      db_options_(db_options),
+      mutable_db_options_copy_(mutable_db_options),
+      log_buffer_(log_buffer),
+      output_directory_(output_directory),
+      stats_(stats),
+      bottommost_level_(false),
+      write_hint_(Env::WLTH_NOT_SET),
+      compaction_job_stats_(compaction_job_stats),
+      job_id_(job_id),
       dbname_(dbname),
       db_id_(db_id),
       db_session_id_(db_session_id),
-      db_options_(db_options),
       file_options_(file_options),
       env_(db_options.env),
       io_tracer_(io_tracer),
@@ -326,28 +166,29 @@ CompactionJob::CompactionJob(
           fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
       versions_(versions),
       shutting_down_(shutting_down),
-      manual_compaction_paused_(manual_compaction_paused),
-      preserve_deletes_seqnum_(preserve_deletes_seqnum),
-      log_buffer_(log_buffer),
+      manual_compaction_canceled_(manual_compaction_canceled),
       db_directory_(db_directory),
-      output_directory_(output_directory),
       blob_output_directory_(blob_output_directory),
-      stats_(stats),
       db_mutex_(db_mutex),
       db_error_handler_(db_error_handler),
       existing_snapshots_(std::move(existing_snapshots)),
       earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
       snapshot_checker_(snapshot_checker),
+      job_context_(job_context),
       table_cache_(std::move(table_cache)),
       event_logger_(event_logger),
-      bottommost_level_(false),
       paranoid_file_checks_(paranoid_file_checks),
       measure_io_stats_(measure_io_stats),
-      write_hint_(Env::WLTH_NOT_SET),
       thread_pri_(thread_pri),
-      full_history_ts_low_(std::move(full_history_ts_low)) {
+      full_history_ts_low_(std::move(full_history_ts_low)),
+      trim_ts_(std::move(trim_ts)),
+      blob_callback_(blob_callback),
+      extra_num_subcompaction_threads_reserved_(0),
+      bg_compaction_scheduled_(bg_compaction_scheduled),
+      bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) {
   assert(compaction_job_stats_ != nullptr);
   assert(log_buffer_ != nullptr);
+
   const auto* cfd = compact_->compaction->column_family_data();
   ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
                                     db_options_.enable_thread_tracking);
@@ -407,39 +248,194 @@ void CompactionJob::Prepare() {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_PREPARE);
 
-  // Generate file_levels_ for compaction berfore making Iterator
+  // Generate file_levels_ for compaction before making Iterator
   auto* c = compact_->compaction;
-  assert(c->column_family_data() != nullptr);
-  assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
+  ColumnFamilyData* cfd = c->column_family_data();
+  assert(cfd != nullptr);
+  assert(cfd->current()->storage_info()->NumLevelFiles(
              compact_->compaction->level()) > 0);
 
-  write_hint_ =
-      c->column_family_data()->CalculateSSTWriteHint(c->output_level());
+  write_hint_ = cfd->CalculateSSTWriteHint(c->output_level());
   bottommost_level_ = c->bottommost_level();
 
   if (c->ShouldFormSubcompactions()) {
-    {
-      StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
-      GenSubcompactionBoundaries();
-    }
-    assert(sizes_.size() == boundaries_.size() + 1);
-
+    StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
+    GenSubcompactionBoundaries();
+  }
+  if (boundaries_.size() > 1) {
     for (size_t i = 0; i <= boundaries_.size(); i++) {
-      Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
-      Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
-      compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
+      compact_->sub_compact_states.emplace_back(
+          c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt,
+          (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
+                                    : std::nullopt,
+          static_cast<uint32_t>(i));
+      // assert to validate that boundaries don't have same user keys (without
+      // timestamp part).
+      assert(i == 0 || i == boundaries_.size() ||
+             cfd->user_comparator()->CompareWithoutTimestamp(
+                 boundaries_[i - 1], boundaries_[i]) < 0);
     }
     RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                       compact_->sub_compact_states.size());
   } else {
-    constexpr Slice* start = nullptr;
-    constexpr Slice* end = nullptr;
-    constexpr uint64_t size = 0;
+    compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt,
+                                              /*sub_job_id*/ 0);
+  }
+
+  // collect all seqno->time information from the input files which will be used
+  // to encode seqno->time to the output files.
+  uint64_t preserve_time_duration =
+      std::max(c->immutable_options()->preserve_internal_time_seconds,
+               c->immutable_options()->preclude_last_level_data_seconds);
+
+  if (preserve_time_duration > 0) {
+    // setup seqno_time_mapping_
+    seqno_time_mapping_.SetMaxTimeDuration(preserve_time_duration);
+    for (const auto& each_level : *c->inputs()) {
+      for (const auto& fmd : each_level.files) {
+        std::shared_ptr<const TableProperties> tp;
+        Status s = cfd->current()->GetTableProperties(&tp, fmd, nullptr);
+        if (s.ok()) {
+          seqno_time_mapping_.Add(tp->seqno_to_time_mapping)
+              .PermitUncheckedError();
+          seqno_time_mapping_.Add(fmd->fd.smallest_seqno,
+                                  fmd->oldest_ancester_time);
+        }
+      }
+    }
 
-    compact_->sub_compact_states.emplace_back(c, start, end, size);
+    auto status = seqno_time_mapping_.Sort();
+    if (!status.ok()) {
+      ROCKS_LOG_WARN(db_options_.info_log,
+                     "Invalid sequence number to time mapping: Status: %s",
+                     status.ToString().c_str());
+    }
+    int64_t _current_time = 0;
+    status = db_options_.clock->GetCurrentTime(&_current_time);
+    if (!status.ok()) {
+      ROCKS_LOG_WARN(db_options_.info_log,
+                     "Failed to get current time in compaction: Status: %s",
+                     status.ToString().c_str());
+      // preserve all time information
+      preserve_time_min_seqno_ = 0;
+      preclude_last_level_min_seqno_ = 0;
+    } else {
+      seqno_time_mapping_.TruncateOldEntries(_current_time);
+      uint64_t preserve_time =
+          static_cast<uint64_t>(_current_time) > preserve_time_duration
+              ? _current_time - preserve_time_duration
+              : 0;
+      preserve_time_min_seqno_ =
+          seqno_time_mapping_.GetOldestSequenceNum(preserve_time);
+      if (c->immutable_options()->preclude_last_level_data_seconds > 0) {
+        uint64_t preclude_last_level_time =
+            static_cast<uint64_t>(_current_time) >
+                    c->immutable_options()->preclude_last_level_data_seconds
+                ? _current_time -
+                      c->immutable_options()->preclude_last_level_data_seconds
+                : 0;
+        preclude_last_level_min_seqno_ =
+            seqno_time_mapping_.GetOldestSequenceNum(preclude_last_level_time);
+      }
+    }
   }
 }
 
+uint64_t CompactionJob::GetSubcompactionsLimit() {
+  return extra_num_subcompaction_threads_reserved_ +
+         std::max(
+             std::uint64_t(1),
+             static_cast<uint64_t>(compact_->compaction->max_subcompactions()));
+}
+
+void CompactionJob::AcquireSubcompactionResources(
+    int num_extra_required_subcompactions) {
+  TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0");
+  TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1");
+  int max_db_compactions =
+      DBImpl::GetBGJobLimits(
+          mutable_db_options_copy_.max_background_flushes,
+          mutable_db_options_copy_.max_background_compactions,
+          mutable_db_options_copy_.max_background_jobs,
+          versions_->GetColumnFamilySet()
+              ->write_controller()
+              ->NeedSpeedupCompaction())
+          .max_compactions;
+  InstrumentedMutexLock l(db_mutex_);
+  // Apply min function first since We need to compute the extra subcompaction
+  // against compaction limits. And then try to reserve threads for extra
+  // subcompactions. The actual number of reserved threads could be less than
+  // the desired number.
+  int available_bg_compactions_against_db_limit =
+      std::max(max_db_compactions - *bg_compaction_scheduled_ -
+                   *bg_bottom_compaction_scheduled_,
+               0);
+  // Reservation only supports backgrdoun threads of which the priority is
+  // between BOTTOM and HIGH. Need to degrade the priority to HIGH if the
+  // origin thread_pri_ is higher than that. Similar to ReleaseThreads().
+  extra_num_subcompaction_threads_reserved_ =
+      env_->ReserveThreads(std::min(num_extra_required_subcompactions,
+                                    available_bg_compactions_against_db_limit),
+                           std::min(thread_pri_, Env::Priority::HIGH));
+
+  // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
+  // depending on if this compaction has the bottommost priority
+  if (thread_pri_ == Env::Priority::BOTTOM) {
+    *bg_bottom_compaction_scheduled_ +=
+        extra_num_subcompaction_threads_reserved_;
+  } else {
+    *bg_compaction_scheduled_ += extra_num_subcompaction_threads_reserved_;
+  }
+}
+
+void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) {
+  // Do nothing when we have zero resources to shrink
+  if (num_extra_resources == 0) return;
+  db_mutex_->Lock();
+  // We cannot release threads more than what we reserved before
+  int extra_num_subcompaction_threads_released = env_->ReleaseThreads(
+      (int)num_extra_resources, std::min(thread_pri_, Env::Priority::HIGH));
+  // Update the number of reserved threads and the number of background
+  // scheduled compactions for this compaction job
+  extra_num_subcompaction_threads_reserved_ -=
+      extra_num_subcompaction_threads_released;
+  // TODO (zichen): design a test case with new subcompaction partitioning
+  // when the number of actual partitions is less than the number of planned
+  // partitions
+  assert(extra_num_subcompaction_threads_released == (int)num_extra_resources);
+  // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
+  // depending on if this compaction has the bottommost priority
+  if (thread_pri_ == Env::Priority::BOTTOM) {
+    *bg_bottom_compaction_scheduled_ -=
+        extra_num_subcompaction_threads_released;
+  } else {
+    *bg_compaction_scheduled_ -= extra_num_subcompaction_threads_released;
+  }
+  db_mutex_->Unlock();
+  TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0");
+}
+
+void CompactionJob::ReleaseSubcompactionResources() {
+  if (extra_num_subcompaction_threads_reserved_ == 0) {
+    return;
+  }
+  {
+    InstrumentedMutexLock l(db_mutex_);
+    // The number of reserved threads becomes larger than 0 only if the
+    // compaction prioity is round robin and there is no sufficient
+    // sub-compactions available
+
+    // The scheduled compaction must be no less than 1 + extra number
+    // subcompactions using acquired resources since this compaction job has not
+    // finished yet
+    assert(*bg_bottom_compaction_scheduled_ >=
+               1 + extra_num_subcompaction_threads_reserved_ ||
+           *bg_compaction_scheduled_ >=
+               1 + extra_num_subcompaction_threads_reserved_);
+  }
+  ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_);
+}
+
 struct RangeWithSize {
   Range range;
   uint64_t size;
@@ -449,15 +445,51 @@ struct RangeWithSize {
 };
 
 void CompactionJob::GenSubcompactionBoundaries() {
+  // The goal is to find some boundary keys so that we can evenly partition
+  // the compaction input data into max_subcompactions ranges.
+  // For every input file, we ask TableReader to estimate 128 anchor points
+  // that evenly partition the input file into 128 ranges and the range
+  // sizes. This can be calculated by scanning index blocks of the file.
+  // Once we have the anchor points for all the input files, we merge them
+  // together and try to find keys dividing ranges evenly.
+  // For example, if we have two input files, and each returns following
+  // ranges:
+  //   File1: (a1, 1000), (b1, 1200), (c1, 1100)
+  //   File2: (a2, 1100), (b2, 1000), (c2, 1000)
+  // We total sort the keys to following:
+  //  (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000)
+  // We calculate the total size by adding up all ranges' size, which is 6400.
+  // If we would like to partition into 2 subcompactions, the target of the
+  // range size is 3200. Based on the size, we take "b1" as the partition key
+  // since the first three ranges would hit 3200.
+  //
+  // Note that the ranges are actually overlapping. For example, in the example
+  // above, the range ending with "b1" is overlapping with the range ending with
+  // "b2". So the size 1000+1100+1200 is an underestimation of data size up to
+  // "b1". In extreme cases where we only compact N L0 files, a range can
+  // overlap with N-1 other ranges. Since we requested a relatively large number
+  // (128) of ranges from each input files, even N range overlapping would
+  // cause relatively small inaccuracy.
+
   auto* c = compact_->compaction;
+  if (c->max_subcompactions() <= 1 &&
+      !(c->immutable_options()->compaction_pri == kRoundRobin &&
+        c->immutable_options()->compaction_style == kCompactionStyleLevel)) {
+    return;
+  }
   auto* cfd = c->column_family_data();
   const Comparator* cfd_comparator = cfd->user_comparator();
-  std::vector<Slice> bounds;
+  const InternalKeyComparator& icomp = cfd->internal_comparator();
+
+  auto* v = compact_->compaction->input_version();
+  int base_level = v->storage_info()->base_level();
+  InstrumentedMutexUnlock unlock_guard(db_mutex_);
+
+  uint64_t total_size = 0;
+  std::vector<TableReader::Anchor> all_anchors;
   int start_lvl = c->start_level();
   int out_lvl = c->output_level();
 
-  // Add the starting and/or ending key of certain input files as a potential
-  // boundary
   for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
     int lvl = c->level(lvl_idx);
     if (lvl >= start_lvl && lvl <= out_lvl) {
@@ -468,112 +500,115 @@ void CompactionJob::GenSubcompactionBoundaries() {
         continue;
       }
 
-      if (lvl == 0) {
-        // For level 0 add the starting and ending key of each file since the
-        // files may have greatly differing key ranges (not range-partitioned)
-        for (size_t i = 0; i < num_files; i++) {
-          bounds.emplace_back(flevel->files[i].smallest_key);
-          bounds.emplace_back(flevel->files[i].largest_key);
+      for (size_t i = 0; i < num_files; i++) {
+        FileMetaData* f = flevel->files[i].file_metadata;
+        std::vector<TableReader::Anchor> my_anchors;
+        Status s = cfd->table_cache()->ApproximateKeyAnchors(
+            ReadOptions(), icomp, *f, my_anchors);
+        if (!s.ok() || my_anchors.empty()) {
+          my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
         }
-      } else {
-        // For all other levels add the smallest/largest key in the level to
-        // encompass the range covered by that level
-        bounds.emplace_back(flevel->files[0].smallest_key);
-        bounds.emplace_back(flevel->files[num_files - 1].largest_key);
-        if (lvl == out_lvl) {
-          // For the last level include the starting keys of all files since
-          // the last level is the largest and probably has the widest key
-          // range. Since it's range partitioned, the ending key of one file
-          // and the starting key of the next are very close (or identical).
-          for (size_t i = 1; i < num_files; i++) {
-            bounds.emplace_back(flevel->files[i].smallest_key);
-          }
+        for (auto& ac : my_anchors) {
+          // Can be optimize to avoid this loop.
+          total_size += ac.range_size;
         }
+
+        all_anchors.insert(all_anchors.end(), my_anchors.begin(),
+                           my_anchors.end());
       }
     }
   }
-
-  std::sort(bounds.begin(), bounds.end(),
-            [cfd_comparator](const Slice& a, const Slice& b) -> bool {
-              return cfd_comparator->Compare(ExtractUserKey(a),
-                                             ExtractUserKey(b)) < 0;
-            });
-  // Remove duplicated entries from bounds
-  bounds.erase(
-      std::unique(bounds.begin(), bounds.end(),
-                  [cfd_comparator](const Slice& a, const Slice& b) -> bool {
-                    return cfd_comparator->Compare(ExtractUserKey(a),
-                                                   ExtractUserKey(b)) == 0;
+  // Here we total sort all the anchor points across all files and go through
+  // them in the sorted order to find partitioning boundaries.
+  // Not the most efficient implementation. A much more efficient algorithm
+  // probably exists. But they are more complex. If performance turns out to
+  // be a problem, we can optimize.
+  std::sort(
+      all_anchors.begin(), all_anchors.end(),
+      [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
+        return cfd_comparator->CompareWithoutTimestamp(a.user_key, b.user_key) <
+               0;
+      });
+
+  // Remove duplicated entries from boundaries.
+  all_anchors.erase(
+      std::unique(all_anchors.begin(), all_anchors.end(),
+                  [cfd_comparator](TableReader::Anchor& a,
+                                   TableReader::Anchor& b) -> bool {
+                    return cfd_comparator->CompareWithoutTimestamp(
+                               a.user_key, b.user_key) == 0;
                   }),
-      bounds.end());
-
-  // Combine consecutive pairs of boundaries into ranges with an approximate
-  // size of data covered by keys in that range
-  uint64_t sum = 0;
-  std::vector<RangeWithSize> ranges;
-  // Get input version from CompactionState since it's already referenced
-  // earlier in SetInputVersioCompaction::SetInputVersion and will not change
-  // when db_mutex_ is released below
-  auto* v = compact_->compaction->input_version();
-  for (auto it = bounds.begin();;) {
-    const Slice a = *it;
-    ++it;
-
-    if (it == bounds.end()) {
-      break;
+      all_anchors.end());
+
+  // Get the number of planned subcompactions, may update reserve threads
+  // and update extra_num_subcompaction_threads_reserved_ for round-robin
+  uint64_t num_planned_subcompactions;
+  if (c->immutable_options()->compaction_pri == kRoundRobin &&
+      c->immutable_options()->compaction_style == kCompactionStyleLevel) {
+    // For round-robin compaction prioity, we need to employ more
+    // subcompactions (may exceed the max_subcompaction limit). The extra
+    // subcompactions will be executed using reserved threads and taken into
+    // account bg_compaction_scheduled or bg_bottom_compaction_scheduled.
+
+    // Initialized by the number of input files
+    num_planned_subcompactions = static_cast<uint64_t>(c->num_input_files(0));
+    uint64_t max_subcompactions_limit = GetSubcompactionsLimit();
+    if (max_subcompactions_limit < num_planned_subcompactions) {
+      // Assert two pointers are not empty so that we can use extra
+      // subcompactions against db compaction limits
+      assert(bg_bottom_compaction_scheduled_ != nullptr);
+      assert(bg_compaction_scheduled_ != nullptr);
+      // Reserve resources when max_subcompaction is not sufficient
+      AcquireSubcompactionResources(
+          (int)(num_planned_subcompactions - max_subcompactions_limit));
+      // Subcompactions limit changes after acquiring additional resources.
+      // Need to call GetSubcompactionsLimit() again to update the number
+      // of planned subcompactions
+      num_planned_subcompactions =
+          std::min(num_planned_subcompactions, GetSubcompactionsLimit());
+    } else {
+      num_planned_subcompactions = max_subcompactions_limit;
     }
+  } else {
+    num_planned_subcompactions = GetSubcompactionsLimit();
+  }
 
-    const Slice b = *it;
+  TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0",
+                           &num_planned_subcompactions);
+  if (num_planned_subcompactions == 1) return;
 
-    // ApproximateSize could potentially create table reader iterator to seek
-    // to the index block and may incur I/O cost in the process. Unlock db
-    // mutex to reduce contention
-    db_mutex_->Unlock();
-    uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
-                                               b, start_lvl, out_lvl + 1,
-                                               TableReaderCaller::kCompaction);
-    db_mutex_->Lock();
-    ranges.emplace_back(a, b, size);
-    sum += size;
+  // Group the ranges into subcompactions
+  uint64_t target_range_size = std::max(
+      total_size / num_planned_subcompactions,
+      MaxFileSizeForLevel(
+          *(c->mutable_cf_options()), out_lvl,
+          c->immutable_options()->compaction_style, base_level,
+          c->immutable_options()->level_compaction_dynamic_level_bytes));
+
+  if (target_range_size >= total_size) {
+    return;
   }
 
-  // Group the ranges into subcompactions
-  const double min_file_fill_percent = 4.0 / 5;
-  int base_level = v->storage_info()->base_level();
-  uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
-      sum / min_file_fill_percent /
-      MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
-          c->immutable_cf_options()->compaction_style, base_level,
-          c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
-  uint64_t subcompactions =
-      std::min({static_cast<uint64_t>(ranges.size()),
-                static_cast<uint64_t>(c->max_subcompactions()),
-                max_output_files});
-
-  if (subcompactions > 1) {
-    double mean = sum * 1.0 / subcompactions;
-    // Greedily add ranges to the subcompaction until the sum of the ranges'
-    // sizes becomes >= the expected mean size of a subcompaction
-    sum = 0;
-    for (size_t i = 0; i + 1 < ranges.size(); i++) {
-      sum += ranges[i].size;
-      if (subcompactions == 1) {
-        // If there's only one left to schedule then it goes to the end so no
-        // need to put an end boundary
-        continue;
-      }
-      if (sum >= mean) {
-        boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
-        sizes_.emplace_back(sum);
-        subcompactions--;
-        sum = 0;
-      }
+  uint64_t next_threshold = target_range_size;
+  uint64_t cumulative_size = 0;
+  uint64_t num_actual_subcompactions = 1U;
+  for (TableReader::Anchor& anchor : all_anchors) {
+    cumulative_size += anchor.range_size;
+    if (cumulative_size > next_threshold) {
+      next_threshold += target_range_size;
+      num_actual_subcompactions++;
+      boundaries_.push_back(anchor.user_key);
+    }
+    if (num_actual_subcompactions == num_planned_subcompactions) {
+      break;
     }
-    sizes_.emplace_back(sum + ranges.back().size);
-  } else {
-    // Only one range so its size is the total sum of sizes computed above
-    sizes_.emplace_back(sum);
   }
+  TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1",
+                           &num_actual_subcompactions);
+  // Shrink extra subcompactions resources when extra resrouces are acquired
+  ShrinkSubcompactionResources(
+      std::min((int)(num_planned_subcompactions - num_actual_subcompactions),
+               extra_num_subcompaction_threads_reserved_));
 }
 
 Status CompactionJob::Run() {
@@ -585,7 +620,7 @@ Status CompactionJob::Run() {
 
   const size_t num_threads = compact_->sub_compact_states.size();
   assert(num_threads > 0);
-  const uint64_t start_micros = env_->NowMicros();
+  const uint64_t start_micros = db_options_.clock->NowMicros();
 
   // Launch a thread for each of subcompactions 1...num_threads-1
   std::vector<port::Thread> thread_pool;
@@ -604,16 +639,17 @@ Status CompactionJob::Run() {
     thread.join();
   }
 
-  compaction_stats_.micros = env_->NowMicros() - start_micros;
-  compaction_stats_.cpu_micros = 0;
-  for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
-    compaction_stats_.cpu_micros +=
-        compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
+  compaction_stats_.SetMicros(db_options_.clock->NowMicros() - start_micros);
+
+  for (auto& state : compact_->sub_compact_states) {
+    compaction_stats_.AddCpuMicros(state.compaction_job_stats.cpu_micros);
+    state.RemoveLastEmptyOutput();
   }
 
-  RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
+  RecordTimeToHistogram(stats_, COMPACTION_TIME,
+                        compaction_stats_.stats.micros);
   RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
-                        compaction_stats_.cpu_micros);
+                        compaction_stats_.stats.cpu_micros);
 
   TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
 
@@ -629,7 +665,7 @@ Status CompactionJob::Run() {
       break;
     }
 
-    if (!state.blob_file_additions.empty()) {
+    if (state.Current().HasBlobFileAdditions()) {
       wrote_new_blob_files = true;
     }
   }
@@ -641,12 +677,16 @@ Status CompactionJob::Run() {
     constexpr IODebugContext* dbg = nullptr;
 
     if (output_directory_) {
-      io_s = output_directory_->Fsync(IOOptions(), dbg);
+      io_s = output_directory_->FsyncWithDirOptions(
+          IOOptions(), dbg,
+          DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
     }
 
     if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
         blob_output_directory_ != output_directory_) {
-      io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
+      io_s = blob_output_directory_->FsyncWithDirOptions(
+          IOOptions(), dbg,
+          DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
     }
   }
   if (io_status_.ok()) {
@@ -657,15 +697,15 @@ Status CompactionJob::Run() {
   }
   if (status.ok()) {
     thread_pool.clear();
-    std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
+    std::vector<const CompactionOutputs::Output*> files_output;
     for (const auto& state : compact_->sub_compact_states) {
-      for (const auto& output : state.outputs) {
+      for (const auto& output : state.GetOutputs()) {
         files_output.emplace_back(&output);
       }
     }
     ColumnFamilyData* cfd = compact_->compaction->column_family_data();
-    auto prefix_extractor =
-        compact_->compaction->mutable_cf_options()->prefix_extractor.get();
+    auto& prefix_extractor =
+        compact_->compaction->mutable_cf_options()->prefix_extractor;
     std::atomic<size_t> next_file_idx(0);
     auto verify_table = [&](Status& output_status) {
       while (true) {
@@ -674,11 +714,12 @@ Status CompactionJob::Run() {
           break;
         }
         // Verify that the table is usable
-        // We set for_compaction to false and don't OptimizeForCompactionTableRead
-        // here because this is a special case after we finish the table building
-        // No matter whether use_direct_io_for_flush_and_compaction is true,
-        // we will regard this verification as user reads since the goal is
-        // to cache it here for further user reads
+        // We set for_compaction to false and don't
+        // OptimizeForCompactionTableRead here because this is a special case
+        // after we finish the table building No matter whether
+        // use_direct_io_for_flush_and_compaction is true, we will regard this
+        // verification as user reads since the goal is to cache it here for
+        // further user reads
         ReadOptions read_options;
         InternalIterator* iter = cfd->table_cache()->NewIterator(
             read_options, file_options_, cfd->internal_comparator(),
@@ -724,13 +765,14 @@ Status CompactionJob::Run() {
       }
     };
     for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
-      thread_pool.emplace_back(verify_table,
-                               std::ref(compact_->sub_compact_states[i].status));
+      thread_pool.emplace_back(
+          verify_table, std::ref(compact_->sub_compact_states[i].status));
     }
     verify_table(compact_->sub_compact_states[0].status);
     for (auto& thread : thread_pool) {
       thread.join();
     }
+
     for (const auto& state : compact_->sub_compact_states) {
       if (!state.status.ok()) {
         status = state.status;
@@ -739,11 +781,15 @@ Status CompactionJob::Run() {
     }
   }
 
+  ReleaseSubcompactionResources();
+  TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:0");
+  TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:1");
+
   TablePropertiesCollection tp;
   for (const auto& state : compact_->sub_compact_states) {
-    for (const auto& output : state.outputs) {
+    for (const auto& output : state.GetOutputs()) {
       auto fn =
-          TableFileName(state.compaction->immutable_cf_options()->cf_paths,
+          TableFileName(state.compaction->immutable_options()->cf_paths,
                         output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
       tp[fn] = output.table_properties;
     }
@@ -751,7 +797,7 @@ Status CompactionJob::Run() {
   compact_->compaction->SetOutputTableProperties(std::move(tp));
 
   // Finish up all book-keeping to unify the subcompaction results
-  AggregateStatistics();
+  compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
   UpdateCompactionStats();
 
   RecordCompactionIOStats();
@@ -773,8 +819,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
   assert(cfd);
 
-  cfd->internal_stats()->AddCompactionStats(
-      compact_->compaction->output_level(), thread_pri_, compaction_stats_);
+  int output_level = compact_->compaction->output_level();
+  cfd->internal_stats()->AddCompactionStats(output_level, thread_pri_,
+                                            compaction_stats_);
 
   if (status.ok()) {
     status = InstallCompactionResults(mutable_cf_options);
@@ -785,77 +832,98 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
 
   VersionStorageInfo::LevelSummaryStorage tmp;
   auto vstorage = cfd->current()->storage_info();
-  const auto& stats = compaction_stats_;
+  const auto& stats = compaction_stats_.stats;
 
   double read_write_amp = 0.0;
   double write_amp = 0.0;
   double bytes_read_per_sec = 0;
   double bytes_written_per_sec = 0;
 
-  if (stats.bytes_read_non_output_levels > 0) {
-    read_write_amp = (stats.bytes_written + stats.bytes_read_output_level +
-                      stats.bytes_read_non_output_levels) /
-                     static_cast<double>(stats.bytes_read_non_output_levels);
-    write_amp = stats.bytes_written /
-                static_cast<double>(stats.bytes_read_non_output_levels);
+  const uint64_t bytes_read_non_output_and_blob =
+      stats.bytes_read_non_output_levels + stats.bytes_read_blob;
+  const uint64_t bytes_read_all =
+      stats.bytes_read_output_level + bytes_read_non_output_and_blob;
+  const uint64_t bytes_written_all =
+      stats.bytes_written + stats.bytes_written_blob;
+
+  if (bytes_read_non_output_and_blob > 0) {
+    read_write_amp = (bytes_written_all + bytes_read_all) /
+                     static_cast<double>(bytes_read_non_output_and_blob);
+    write_amp =
+        bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob);
   }
   if (stats.micros > 0) {
-    bytes_read_per_sec =
-        (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
-        static_cast<double>(stats.micros);
+    bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros);
     bytes_written_per_sec =
-        stats.bytes_written / static_cast<double>(stats.micros);
+        bytes_written_all / static_cast<double>(stats.micros);
   }
 
   const std::string& column_family_name = cfd->GetName();
 
+  constexpr double kMB = 1048576.0;
+
   ROCKS_LOG_BUFFER(
       log_buffer_,
       "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
-      "files in(%d, %d) out(%d) "
-      "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
-      "write-amplify(%.1f) %s, records in: %" PRIu64
+      "files in(%d, %d) out(%d +%d blob) "
+      "MB in(%.1f, %.1f +%.1f blob) out(%.1f +%.1f blob), "
+      "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
       ", records dropped: %" PRIu64 " output_compression: %s\n",
       column_family_name.c_str(), vstorage->LevelSummary(&tmp),
       bytes_read_per_sec, bytes_written_per_sec,
       compact_->compaction->output_level(),
       stats.num_input_files_in_non_output_levels,
       stats.num_input_files_in_output_level, stats.num_output_files,
-      stats.bytes_read_non_output_levels / 1048576.0,
-      stats.bytes_read_output_level / 1048576.0,
-      stats.bytes_written / 1048576.0, read_write_amp, write_amp,
-      status.ToString().c_str(), stats.num_input_records,
+      stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
+      stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
+      stats.bytes_written / kMB, stats.bytes_written_blob / kMB, read_write_amp,
+      write_amp, status.ToString().c_str(), stats.num_input_records,
       stats.num_dropped_records,
       CompressionTypeToString(compact_->compaction->output_compression())
           .c_str());
 
   const auto& blob_files = vstorage->GetBlobFiles();
   if (!blob_files.empty()) {
-    ROCKS_LOG_BUFFER(log_buffer_,
-                     "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
-                     "\n",
-                     column_family_name.c_str(), blob_files.begin()->first,
-                     blob_files.rbegin()->first);
+    assert(blob_files.front());
+    assert(blob_files.back());
+
+    ROCKS_LOG_BUFFER(
+        log_buffer_,
+        "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
+        column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
+        blob_files.back()->GetBlobFileNumber());
+  }
+
+  if (compaction_stats_.has_penultimate_level_output) {
+    ROCKS_LOG_BUFFER(
+        log_buffer_,
+        "[%s] has Penultimate Level output: %" PRIu64
+        ", level %d, number of files: %" PRIu64 ", number of records: %" PRIu64,
+        column_family_name.c_str(),
+        compaction_stats_.penultimate_level_stats.bytes_written,
+        compact_->compaction->GetPenultimateLevel(),
+        compaction_stats_.penultimate_level_stats.num_output_files,
+        compaction_stats_.penultimate_level_stats.num_output_records);
   }
 
   UpdateCompactionJobStats(stats);
 
-  auto stream = event_logger_->LogToBuffer(log_buffer_);
+  auto stream = event_logger_->LogToBuffer(log_buffer_, 8192);
   stream << "job" << job_id_ << "event"
          << "compaction_finished"
          << "compaction_time_micros" << stats.micros
          << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
          << compact_->compaction->output_level() << "num_output_files"
-         << compact_->num_output_files << "total_output_size"
-         << compact_->total_bytes;
+         << stats.num_output_files << "total_output_size"
+         << stats.bytes_written;
 
-  if (compact_->num_blob_output_files > 0) {
-    stream << "num_blob_output_files" << compact_->num_blob_output_files
-           << "total_blob_output_size" << compact_->total_blob_bytes;
+  if (stats.num_output_files_blob > 0) {
+    stream << "num_blob_output_files" << stats.num_output_files_blob
+           << "total_blob_output_size" << stats.bytes_written_blob;
   }
 
   stream << "num_input_records" << stats.num_input_records
-         << "num_output_records" << compact_->num_output_records
+         << "num_output_records" << stats.num_output_records
          << "num_subcompactions" << compact_->sub_compact_states.size()
          << "output_compression"
          << CompressionTypeToString(compact_->compaction->output_compression());
@@ -882,19 +950,109 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   stream.EndArray();
 
   if (!blob_files.empty()) {
-    stream << "blob_file_head" << blob_files.begin()->first;
-    stream << "blob_file_tail" << blob_files.rbegin()->first;
+    assert(blob_files.front());
+    stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();
+
+    assert(blob_files.back());
+    stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
+  }
+
+  if (compaction_stats_.has_penultimate_level_output) {
+    InternalStats::CompactionStats& pl_stats =
+        compaction_stats_.penultimate_level_stats;
+    stream << "penultimate_level_num_output_files" << pl_stats.num_output_files;
+    stream << "penultimate_level_bytes_written" << pl_stats.bytes_written;
+    stream << "penultimate_level_num_output_records"
+           << pl_stats.num_output_records;
+    stream << "penultimate_level_num_output_files_blob"
+           << pl_stats.num_output_files_blob;
+    stream << "penultimate_level_bytes_written_blob"
+           << pl_stats.bytes_written_blob;
   }
 
   CleanupCompaction();
   return status;
 }
 
+void CompactionJob::NotifyOnSubcompactionBegin(
+    SubcompactionState* sub_compact) {
+#ifndef ROCKSDB_LITE
+  Compaction* c = compact_->compaction;
+
+  if (db_options_.listeners.empty()) {
+    return;
+  }
+  if (shutting_down_->load(std::memory_order_acquire)) {
+    return;
+  }
+  if (c->is_manual_compaction() &&
+      manual_compaction_canceled_.load(std::memory_order_acquire)) {
+    return;
+  }
+
+  sub_compact->notify_on_subcompaction_completion = true;
+
+  SubcompactionJobInfo info{};
+  sub_compact->BuildSubcompactionJobInfo(info);
+  info.job_id = static_cast<int>(job_id_);
+  info.thread_id = env_->GetThreadID();
+
+  for (const auto& listener : db_options_.listeners) {
+    listener->OnSubcompactionBegin(info);
+  }
+  info.status.PermitUncheckedError();
+
+#else
+  (void)sub_compact;
+#endif  // ROCKSDB_LITE
+}
+
+void CompactionJob::NotifyOnSubcompactionCompleted(
+    SubcompactionState* sub_compact) {
+#ifndef ROCKSDB_LITE
+
+  if (db_options_.listeners.empty()) {
+    return;
+  }
+  if (shutting_down_->load(std::memory_order_acquire)) {
+    return;
+  }
+
+  if (sub_compact->notify_on_subcompaction_completion == false) {
+    return;
+  }
+
+  SubcompactionJobInfo info{};
+  sub_compact->BuildSubcompactionJobInfo(info);
+  info.job_id = static_cast<int>(job_id_);
+  info.thread_id = env_->GetThreadID();
+
+  for (const auto& listener : db_options_.listeners) {
+    listener->OnSubcompactionCompleted(info);
+  }
+#else
+  (void)sub_compact;
+#endif  // ROCKSDB_LITE
+}
+
 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   assert(sub_compact);
   assert(sub_compact->compaction);
 
-  uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
+#ifndef ROCKSDB_LITE
+  if (db_options_.compaction_service) {
+    CompactionServiceJobStatus comp_status =
+        ProcessKeyValueCompactionWithCompactionService(sub_compact);
+    if (comp_status == CompactionServiceJobStatus::kSuccess ||
+        comp_status == CompactionServiceJobStatus::kFailure) {
+      return;
+    }
+    // fallback to local compaction
+    assert(comp_status == CompactionServiceJobStatus::kUseLocal);
+  }
+#endif  // !ROCKSDB_LITE
+
+  uint64_t prev_cpu_micros = db_options_.clock->CPUMicros();
 
   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
 
@@ -915,22 +1073,115 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     return;
   }
 
-  CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
-                                             existing_snapshots_);
+  NotifyOnSubcompactionBegin(sub_compact);
+
+  auto range_del_agg = std::make_unique<CompactionRangeDelAggregator>(
+      &cfd->internal_comparator(), existing_snapshots_, &full_history_ts_low_,
+      &trim_ts_);
+
+  // TODO: since we already use C++17, should use
+  // std::optional<const Slice> instead.
+  const std::optional<Slice> start = sub_compact->start;
+  const std::optional<Slice> end = sub_compact->end;
+
+  std::optional<Slice> start_without_ts;
+  std::optional<Slice> end_without_ts;
+
   ReadOptions read_options;
   read_options.verify_checksums = true;
   read_options.fill_cache = false;
+  read_options.rate_limiter_priority = GetRateLimiterPriority();
   // Compaction iterators shouldn't be confined to a single prefix.
   // Compactions use Seek() for
   // (a) concurrent compactions,
   // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
   read_options.total_order_seek = true;
 
+  // Remove the timestamps from boundaries because boundaries created in
+  // GenSubcompactionBoundaries doesn't strip away the timestamp.
+  size_t ts_sz = cfd->user_comparator()->timestamp_size();
+  if (start.has_value()) {
+    read_options.iterate_lower_bound = &start.value();
+    if (ts_sz > 0) {
+      start_without_ts = StripTimestampFromUserKey(start.value(), ts_sz);
+      read_options.iterate_lower_bound = &start_without_ts.value();
+    }
+  }
+  if (end.has_value()) {
+    read_options.iterate_upper_bound = &end.value();
+    if (ts_sz > 0) {
+      end_without_ts = StripTimestampFromUserKey(end.value(), ts_sz);
+      read_options.iterate_upper_bound = &end_without_ts.value();
+    }
+  }
+
   // Although the v2 aggregator is what the level iterator(s) know about,
   // the AddTombstones calls will be propagated down to the v1 aggregator.
-  std::unique_ptr<InternalIterator> input(
-      versions_->MakeInputIterator(read_options, sub_compact->compaction,
-                                   &range_del_agg, file_options_for_read_));
+  std::unique_ptr<InternalIterator> raw_input(versions_->MakeInputIterator(
+      read_options, sub_compact->compaction, range_del_agg.get(),
+      file_options_for_read_, start, end));
+  InternalIterator* input = raw_input.get();
+
+  IterKey start_ikey;
+  IterKey end_ikey;
+  Slice start_slice;
+  Slice end_slice;
+
+  static constexpr char kMaxTs[] =
+      "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff";
+  Slice ts_slice;
+  std::string max_ts;
+  if (ts_sz > 0) {
+    if (ts_sz <= strlen(kMaxTs)) {
+      ts_slice = Slice(kMaxTs, ts_sz);
+    } else {
+      max_ts = std::string(ts_sz, '\xff');
+      ts_slice = Slice(max_ts);
+    }
+  }
+
+  if (start.has_value()) {
+    start_ikey.SetInternalKey(start.value(), kMaxSequenceNumber,
+                              kValueTypeForSeek);
+    if (ts_sz > 0) {
+      start_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek,
+                                   &ts_slice);
+    }
+    start_slice = start_ikey.GetInternalKey();
+  }
+  if (end.has_value()) {
+    end_ikey.SetInternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek);
+    if (ts_sz > 0) {
+      end_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek,
+                                 &ts_slice);
+    }
+    end_slice = end_ikey.GetInternalKey();
+  }
+
+  std::unique_ptr<InternalIterator> clip;
+  if (start.has_value() || end.has_value()) {
+    clip = std::make_unique<ClippingIterator>(
+        raw_input.get(), start.has_value() ? &start_slice : nullptr,
+        end.has_value() ? &end_slice : nullptr, &cfd->internal_comparator());
+    input = clip.get();
+  }
+
+  std::unique_ptr<InternalIterator> blob_counter;
+
+  if (sub_compact->compaction->DoesInputReferenceBlobFiles()) {
+    BlobGarbageMeter* meter = sub_compact->Current().CreateBlobGarbageMeter();
+    blob_counter = std::make_unique<BlobCountingIterator>(input, meter);
+    input = blob_counter.get();
+  }
+
+  std::unique_ptr<InternalIterator> trim_history_iter;
+  if (ts_sz > 0 && !trim_ts_.empty()) {
+    trim_history_iter = std::make_unique<HistoryTrimmingIterator>(
+        input, cfd->user_comparator(), trim_ts_);
+    input = trim_history_iter.get();
+  }
+
+  input->SeekToFirst();
 
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
@@ -956,12 +1207,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   }
 
   MergeHelper merge(
-      env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
+      env_, cfd->user_comparator(), cfd->ioptions()->merge_operator.get(),
       compaction_filter, db_options_.info_log.get(),
       false /* internal key corruption is expected */,
       existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
-      snapshot_checker_, compact_->compaction->level(),
-      db_options_.statistics.get());
+      snapshot_checker_, compact_->compaction->level(), db_options_.stats);
 
   const MutableCFOptions* mutable_cf_options =
       sub_compact->compaction->mutable_cf_options();
@@ -969,73 +1219,77 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
 
   std::vector<std::string> blob_file_paths;
 
+  // TODO: BlobDB to support output_to_penultimate_level compaction, which needs
+  //  2 builders, so may need to move to `CompactionOutputs`
   std::unique_ptr<BlobFileBuilder> blob_file_builder(
-      mutable_cf_options->enable_blob_files
+      (mutable_cf_options->enable_blob_files &&
+       sub_compact->compaction->output_level() >=
+           mutable_cf_options->blob_file_starting_level)
           ? new BlobFileBuilder(
-                versions_, env_, fs_.get(),
-                sub_compact->compaction->immutable_cf_options(),
-                mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
-                cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
-                &blob_file_paths, &sub_compact->blob_file_additions)
+                versions_, fs_.get(),
+                sub_compact->compaction->immutable_options(),
+                mutable_cf_options, &file_options_, db_id_, db_session_id_,
+                job_id_, cfd->GetID(), cfd->GetName(), Env::IOPriority::IO_LOW,
+                write_hint_, io_tracer_, blob_callback_,
+                BlobFileCreationReason::kCompaction, &blob_file_paths,
+                sub_compact->Current().GetBlobFileAdditionsPtr())
           : nullptr);
 
   TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
   TEST_SYNC_POINT_CALLBACK(
       "CompactionJob::Run():PausingManualCompaction:1",
       reinterpret_cast<void*>(
-          const_cast<std::atomic<int>*>(manual_compaction_paused_)));
-
-  Slice* start = sub_compact->start;
-  Slice* end = sub_compact->end;
-  if (start != nullptr) {
-    IterKey start_iter;
-    start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
-    input->Seek(start_iter.GetInternalKey());
-  } else {
-    input->SeekToFirst();
-  }
+          const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
 
-  Status status;
   const std::string* const full_history_ts_low =
       full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
-  sub_compact->c_iter.reset(new CompactionIterator(
-      input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
-      &existing_snapshots_, earliest_write_conflict_snapshot_,
+  const SequenceNumber job_snapshot_seq =
+      job_context_ ? job_context_->GetJobSnapshotSequence()
+                   : kMaxSequenceNumber;
+
+  auto c_iter = std::make_unique<CompactionIterator>(
+      input, cfd->user_comparator(), &merge, versions_->LastSequence(),
+      &existing_snapshots_, earliest_write_conflict_snapshot_, job_snapshot_seq,
       snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
-      /*expect_valid_internal_key=*/true, &range_del_agg,
+      /*expect_valid_internal_key=*/true, range_del_agg.get(),
       blob_file_builder.get(), db_options_.allow_data_in_errors,
+      db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
       sub_compact->compaction, compaction_filter, shutting_down_,
-      preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
-      full_history_ts_low));
-  auto c_iter = sub_compact->c_iter.get();
+      db_options_.info_log, full_history_ts_low, preserve_time_min_seqno_,
+      preclude_last_level_min_seqno_);
   c_iter->SeekToFirst();
-  if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
-    // ShouldStopBefore() maintains state based on keys processed so far. The
-    // compaction loop always calls it on the "next" key, thus won't tell it the
-    // first key. So we do that here.
-    sub_compact->ShouldStopBefore(c_iter->key(),
-                                  sub_compact->current_output_file_size);
-  }
+
+  // Assign range delete aggregator to the target output level, which makes sure
+  // it only output to single level
+  sub_compact->AssignRangeDelAggregator(std::move(range_del_agg));
+
   const auto& c_iter_stats = c_iter->iter_stats();
 
-  std::unique_ptr<SstPartitioner> partitioner =
-      sub_compact->compaction->output_level() == 0
-          ? nullptr
-          : sub_compact->compaction->CreateSstPartitioner();
-  std::string last_key_for_partitioner;
+  // define the open and close functions for the compaction files, which will be
+  // used open/close output files when needed.
+  const CompactionFileOpenFunc open_file_func =
+      [this, sub_compact](CompactionOutputs& outputs) {
+        return this->OpenCompactionOutputFile(sub_compact, outputs);
+      };
+  const CompactionFileCloseFunc close_file_func =
+      [this, sub_compact](CompactionOutputs& outputs, const Status& status,
+                          const Slice& next_table_min_key) {
+        return this->FinishCompactionOutputFile(status, sub_compact, outputs,
+                                                next_table_min_key);
+      };
 
+  Status status;
+  TEST_SYNC_POINT_CALLBACK(
+      "CompactionJob::ProcessKeyValueCompaction()::Processing",
+      reinterpret_cast<void*>(
+          const_cast<Compaction*>(sub_compact->compaction)));
   while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
     // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
     // returns true.
-    const Slice& key = c_iter->key();
-    const Slice& value = c_iter->value();
 
-    // If an end key (exclusive) is specified, check if the current key is
-    // >= than it and exit if it is because the iterator is out of its range
-    if (end != nullptr &&
-        cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
-      break;
-    }
+    assert(!end.has_value() || cfd->user_comparator()->Compare(
+                                   c_iter->user_key(), end.value()) < 0);
+
     if (c_iter_stats.num_input_records % kRecordStatsEvery ==
         kRecordStatsEvery - 1) {
       RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
@@ -1043,82 +1297,30 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       RecordCompactionIOStats();
     }
 
-    // Open output file if necessary
-    if (sub_compact->builder == nullptr) {
-      status = OpenCompactionOutputFile(sub_compact);
-      if (!status.ok()) {
-        break;
-      }
-    }
-    status = sub_compact->AddToBuilder(key, value);
+    // Add current compaction_iterator key to target compaction output, if the
+    // output file needs to be close or open, it will call the `open_file_func`
+    // and `close_file_func`.
+    // TODO: it would be better to have the compaction file open/close moved
+    // into `CompactionOutputs` which has the output file information.
+    status = sub_compact->AddToOutput(*c_iter, open_file_func, close_file_func);
     if (!status.ok()) {
       break;
     }
 
-    sub_compact->current_output_file_size =
-        sub_compact->builder->EstimatedFileSize();
-    const ParsedInternalKey& ikey = c_iter->ikey();
-    sub_compact->current_output()->meta.UpdateBoundaries(
-        key, value, ikey.sequence, ikey.type);
-    sub_compact->num_output_records++;
-
-    // Close output file if it is big enough. Two possibilities determine it's
-    // time to close it: (1) the current key should be this file's last key, (2)
-    // the next key should not be in this file.
-    //
-    // TODO(aekmekji): determine if file should be closed earlier than this
-    // during subcompactions (i.e. if output size, estimated by input size, is
-    // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
-    // and 0.6MB instead of 1MB and 0.2MB)
-    bool output_file_ended = false;
-    if (sub_compact->compaction->output_level() != 0 &&
-        sub_compact->current_output_file_size >=
-            sub_compact->compaction->max_output_file_size()) {
-      // (1) this key terminates the file. For historical reasons, the iterator
-      // status before advancing will be given to FinishCompactionOutputFile().
-      output_file_ended = true;
-    }
     TEST_SYNC_POINT_CALLBACK(
         "CompactionJob::Run():PausingManualCompaction:2",
         reinterpret_cast<void*>(
-            const_cast<std::atomic<int>*>(manual_compaction_paused_)));
-    if (partitioner.get()) {
-      last_key_for_partitioner.assign(c_iter->user_key().data_,
-                                      c_iter->user_key().size_);
-    }
+            const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
     c_iter->Next();
     if (c_iter->status().IsManualCompactionPaused()) {
       break;
     }
-    if (!output_file_ended && c_iter->Valid()) {
-      if (((partitioner.get() &&
-            partitioner->ShouldPartition(PartitionerRequest(
-                last_key_for_partitioner, c_iter->user_key(),
-                sub_compact->current_output_file_size)) == kRequired) ||
-           (sub_compact->compaction->output_level() != 0 &&
-            sub_compact->ShouldStopBefore(
-                c_iter->key(), sub_compact->current_output_file_size))) &&
-          sub_compact->builder != nullptr) {
-        // (2) this key belongs to the next file. For historical reasons, the
-        // iterator status after advancing will be given to
-        // FinishCompactionOutputFile().
-        output_file_ended = true;
-      }
-    }
-    if (output_file_ended) {
-      const Slice* next_key = nullptr;
-      if (c_iter->Valid()) {
-        next_key = &c_iter->key();
-      }
-      CompactionIterationStats range_del_out_stats;
-      status = FinishCompactionOutputFile(input->status(), sub_compact,
-                                          &range_del_agg, &range_del_out_stats,
-                                          next_key);
-      RecordDroppedKeys(range_del_out_stats,
-                        &sub_compact->compaction_job_stats);
-    }
   }
 
+  sub_compact->compaction_job_stats.num_blobs_read =
+      c_iter_stats.num_blobs_read;
+  sub_compact->compaction_job_stats.total_blob_bytes_read =
+      c_iter_stats.total_blob_bytes_read;
   sub_compact->compaction_job_stats.num_input_deletion_records =
       c_iter_stats.num_input_deletion_records;
   sub_compact->compaction_job_stats.num_corrupt_keys =
@@ -1134,6 +1336,16 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
 
   RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
              c_iter_stats.total_filter_time);
+
+  if (c_iter_stats.num_blobs_relocated > 0) {
+    RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
+               c_iter_stats.num_blobs_relocated);
+  }
+  if (c_iter_stats.total_blob_bytes_relocated > 0) {
+    RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED,
+               c_iter_stats.total_blob_bytes_relocated);
+  }
+
   RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
   RecordCompactionIOStats();
 
@@ -1146,8 +1358,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     status = Status::ShutdownInProgress("Database shutdown");
   }
   if ((status.ok() || status.IsColumnFamilyDropped()) &&
-      (manual_compaction_paused_ &&
-       manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
+      (manual_compaction_canceled_.load(std::memory_order_relaxed))) {
     status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
   }
   if (status.ok()) {
@@ -1157,34 +1368,25 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     status = c_iter->status();
   }
 
-  if (status.ok() && sub_compact->builder == nullptr &&
-      sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
-    // handle subcompaction containing only range deletions
-    status = OpenCompactionOutputFile(sub_compact);
-  }
-
   // Call FinishCompactionOutputFile() even if status is not ok: it needs to
-  // close the output file.
-  if (sub_compact->builder != nullptr) {
-    CompactionIterationStats range_del_out_stats;
-    Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
-                                          &range_del_out_stats);
-    if (!s.ok() && status.ok()) {
-      status = s;
-    }
-    RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
-  }
+  // close the output files. Open file function is also passed, in case there's
+  // only range-dels, no file was opened, to save the range-dels, it need to
+  // create a new output file.
+  status = sub_compact->CloseCompactionFiles(status, open_file_func,
+                                             close_file_func);
 
   if (blob_file_builder) {
     if (status.ok()) {
       status = blob_file_builder->Finish();
+    } else {
+      blob_file_builder->Abandon(status);
     }
-
     blob_file_builder.reset();
+    sub_compact->Current().UpdateBlobStats();
   }
 
   sub_compact->compaction_job_stats.cpu_micros =
-      env_->NowCPUNanos() / 1000 - prev_cpu_micros;
+      db_options_.clock->CPUMicros() - prev_cpu_micros;
 
   if (measure_io_stats_) {
     sub_compact->compaction_job_stats.file_write_nanos +=
@@ -1205,8 +1407,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   }
 #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
   if (!status.ok()) {
-    if (sub_compact->c_iter) {
-      sub_compact->c_iter->status().PermitUncheckedError();
+    if (c_iter) {
+      c_iter->status().PermitUncheckedError();
     }
     if (input) {
       input->status().PermitUncheckedError();
@@ -1214,9 +1416,15 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   }
 #endif  // ROCKSDB_ASSERT_STATUS_CHECKED
 
-  sub_compact->c_iter.reset();
-  input.reset();
+  blob_counter.reset();
+  clip.reset();
+  raw_input.reset();
   sub_compact->status = status;
+  NotifyOnSubcompactionCompleted(sub_compact);
+}
+
+uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) const {
+  return (uint64_t)job_id_ << 32 | sub_compact->sub_job_id;
 }
 
 void CompactionJob::RecordDroppedKeys(
@@ -1258,225 +1466,81 @@ void CompactionJob::RecordDroppedKeys(
 
 Status CompactionJob::FinishCompactionOutputFile(
     const Status& input_status, SubcompactionState* sub_compact,
-    CompactionRangeDelAggregator* range_del_agg,
-    CompactionIterationStats* range_del_out_stats,
-    const Slice* next_table_min_key /* = nullptr */) {
+    CompactionOutputs& outputs, const Slice& next_table_min_key) {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
   assert(sub_compact != nullptr);
-  assert(sub_compact->outfile);
-  assert(sub_compact->builder != nullptr);
-  assert(sub_compact->current_output() != nullptr);
+  assert(outputs.HasBuilder());
 
-  uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
+  FileMetaData* meta = outputs.GetMetaData();
+  uint64_t output_number = meta->fd.GetNumber();
   assert(output_number != 0);
 
   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
-  const Comparator* ucmp = cfd->user_comparator();
   std::string file_checksum = kUnknownFileChecksum;
   std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
 
   // Check for iterator errors
   Status s = input_status;
-  auto meta = &sub_compact->current_output()->meta;
-  assert(meta != nullptr);
-  if (s.ok()) {
-    Slice lower_bound_guard, upper_bound_guard;
-    std::string smallest_user_key;
-    const Slice *lower_bound, *upper_bound;
-    bool lower_bound_from_sub_compact = false;
-    if (sub_compact->outputs.size() == 1) {
-      // For the first output table, include range tombstones before the min key
-      // but after the subcompaction boundary.
-      lower_bound = sub_compact->start;
-      lower_bound_from_sub_compact = true;
-    } else if (meta->smallest.size() > 0) {
-      // For subsequent output tables, only include range tombstones from min
-      // key onwards since the previous file was extended to contain range
-      // tombstones falling before min key.
-      smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
-      lower_bound_guard = Slice(smallest_user_key);
-      lower_bound = &lower_bound_guard;
-    } else {
-      lower_bound = nullptr;
-    }
-    if (next_table_min_key != nullptr) {
-      // This may be the last file in the subcompaction in some cases, so we
-      // need to compare the end key of subcompaction with the next file start
-      // key. When the end key is chosen by the subcompaction, we know that
-      // it must be the biggest key in output file. Therefore, it is safe to
-      // use the smaller key as the upper bound of the output file, to ensure
-      // that there is no overlapping between different output files.
-      upper_bound_guard = ExtractUserKey(*next_table_min_key);
-      if (sub_compact->end != nullptr &&
-          ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
-        upper_bound = sub_compact->end;
-      } else {
-        upper_bound = &upper_bound_guard;
-      }
-    } else {
-      // This is the last file in the subcompaction, so extend until the
-      // subcompaction ends.
-      upper_bound = sub_compact->end;
-    }
-    auto earliest_snapshot = kMaxSequenceNumber;
-    if (existing_snapshots_.size() > 0) {
-      earliest_snapshot = existing_snapshots_[0];
-    }
-    bool has_overlapping_endpoints;
-    if (upper_bound != nullptr && meta->largest.size() > 0) {
-      has_overlapping_endpoints =
-          ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
-    } else {
-      has_overlapping_endpoints = false;
-    }
 
-    // The end key of the subcompaction must be bigger or equal to the upper
-    // bound. If the end of subcompaction is null or the upper bound is null,
-    // it means that this file is the last file in the compaction. So there
-    // will be no overlapping between this file and others.
-    assert(sub_compact->end == nullptr ||
-           upper_bound == nullptr ||
-           ucmp->Compare(*upper_bound , *sub_compact->end) <= 0);
-    auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
-                                         has_overlapping_endpoints);
-    // Position the range tombstone output iterator. There may be tombstone
-    // fragments that are entirely out of range, so make sure that we do not
-    // include those.
-    if (lower_bound != nullptr) {
-      it->Seek(*lower_bound);
-    } else {
-      it->SeekToFirst();
+  // Add range tombstones
+  auto earliest_snapshot = kMaxSequenceNumber;
+  if (existing_snapshots_.size() > 0) {
+    earliest_snapshot = existing_snapshots_[0];
+  }
+  if (s.ok()) {
+    CompactionIterationStats range_del_out_stats;
+    // if the compaction supports per_key_placement, only output range dels to
+    // the penultimate level.
+    // Note: Use `bottommost_level_ = true` for both bottommost and
+    // output_to_penultimate_level compaction here, as it's only used to decide
+    // if range dels could be dropped.
+    if (outputs.HasRangeDel()) {
+      s = outputs.AddRangeDels(
+          sub_compact->start.has_value() ? &(sub_compact->start.value())
+                                         : nullptr,
+          sub_compact->end.has_value() ? &(sub_compact->end.value()) : nullptr,
+          range_del_out_stats, bottommost_level_, cfd->internal_comparator(),
+          earliest_snapshot, next_table_min_key, full_history_ts_low_);
     }
+    RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
     TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
-    for (; it->Valid(); it->Next()) {
-      auto tombstone = it->Tombstone();
-      if (upper_bound != nullptr) {
-        int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
-        if ((has_overlapping_endpoints && cmp < 0) ||
-            (!has_overlapping_endpoints && cmp <= 0)) {
-          // Tombstones starting after upper_bound only need to be included in
-          // the next table. If the current SST ends before upper_bound, i.e.,
-          // `has_overlapping_endpoints == false`, we can also skip over range
-          // tombstones that start exactly at upper_bound. Such range tombstones
-          // will be included in the next file and are not relevant to the point
-          // keys or endpoints of the current file.
-          break;
-        }
-      }
+  }
 
-      if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
-        // TODO(andrewkr): tombstones that span multiple output files are
-        // counted for each compaction output file, so lots of double counting.
-        range_del_out_stats->num_range_del_drop_obsolete++;
-        range_del_out_stats->num_record_drop_obsolete++;
-        continue;
-      }
+  const uint64_t current_entries = outputs.NumEntries();
 
-      auto kv = tombstone.Serialize();
-      assert(lower_bound == nullptr ||
-             ucmp->Compare(*lower_bound, kv.second) < 0);
-      // Range tombstone is not supported by output validator yet.
-      sub_compact->builder->Add(kv.first.Encode(), kv.second);
-      InternalKey smallest_candidate = std::move(kv.first);
-      if (lower_bound != nullptr &&
-          ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
-        // Pretend the smallest key has the same user key as lower_bound
-        // (the max key in the previous table or subcompaction) in order for
-        // files to appear key-space partitioned.
-        //
-        // When lower_bound is chosen by a subcompaction, we know that
-        // subcompactions over smaller keys cannot contain any keys at
-        // lower_bound. We also know that smaller subcompactions exist, because
-        // otherwise the subcompaction woud be unbounded on the left. As a
-        // result, we know that no other files on the output level will contain
-        // actual keys at lower_bound (an output file may have a largest key of
-        // lower_bound@kMaxSequenceNumber, but this only indicates a large range
-        // tombstone was truncated). Therefore, it is safe to use the
-        // tombstone's sequence number, to ensure that keys at lower_bound at
-        // lower levels are covered by truncated tombstones.
-        //
-        // If lower_bound was chosen by the smallest data key in the file,
-        // choose lowest seqnum so this file's smallest internal key comes after
-        // the previous file's largest. The fake seqnum is OK because the read
-        // path's file-picking code only considers user key.
-        smallest_candidate = InternalKey(
-            *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
-            kTypeRangeDeletion);
-      }
-      InternalKey largest_candidate = tombstone.SerializeEndKey();
-      if (upper_bound != nullptr &&
-          ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
-        // Pretend the largest key has the same user key as upper_bound (the
-        // min key in the following table or subcompaction) in order for files
-        // to appear key-space partitioned.
-        //
-        // Choose highest seqnum so this file's largest internal key comes
-        // before the next file's/subcompaction's smallest. The fake seqnum is
-        // OK because the read path's file-picking code only considers the user
-        // key portion.
-        //
-        // Note Seek() also creates InternalKey with (user_key,
-        // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
-        // kTypeRangeDeletion (0xF), so the range tombstone comes before the
-        // Seek() key in InternalKey's ordering. So Seek() will look in the
-        // next file for the user key.
-        largest_candidate =
-            InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
-      }
-#ifndef NDEBUG
-      SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
-      if (meta->smallest.size() > 0) {
-        smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
+  s = outputs.Finish(s, seqno_time_mapping_);
+
+  if (s.ok()) {
+    // With accurate smallest and largest key, we can get a slightly more
+    // accurate oldest ancester time.
+    // This makes oldest ancester time in manifest more accurate than in
+    // table properties. Not sure how to resolve it.
+    if (meta->smallest.size() > 0 && meta->largest.size() > 0) {
+      uint64_t refined_oldest_ancester_time;
+      Slice new_smallest = meta->smallest.user_key();
+      Slice new_largest = meta->largest.user_key();
+      if (!new_largest.empty() && !new_smallest.empty()) {
+        refined_oldest_ancester_time =
+            sub_compact->compaction->MinInputFileOldestAncesterTime(
+                &(meta->smallest), &(meta->largest));
+        if (refined_oldest_ancester_time !=
+            std::numeric_limits<uint64_t>::max()) {
+          meta->oldest_ancester_time = refined_oldest_ancester_time;
+        }
       }
-#endif
-      meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
-                                     tombstone.seq_,
-                                     cfd->internal_comparator());
-
-      // The smallest key in a file is used for range tombstone truncation, so
-      // it cannot have a seqnum of 0 (unless the smallest data key in a file
-      // has a seqnum of 0). Otherwise, the truncated tombstone may expose
-      // deleted keys at lower levels.
-      assert(smallest_ikey_seqnum == 0 ||
-             ExtractInternalKeyFooter(meta->smallest.Encode()) !=
-                 PackSequenceAndType(0, kTypeRangeDeletion));
     }
   }
-  const uint64_t current_entries = sub_compact->builder->NumEntries();
-  if (s.ok()) {
-    s = sub_compact->builder->Finish();
-  } else {
-    sub_compact->builder->Abandon();
-  }
-  IOStatus io_s = sub_compact->builder->io_status();
-  if (s.ok()) {
-    s = io_s;
-  }
-  const uint64_t current_bytes = sub_compact->builder->FileSize();
-  if (s.ok()) {
-    meta->fd.file_size = current_bytes;
-    meta->marked_for_compaction = sub_compact->builder->NeedCompact();
-  }
-  sub_compact->current_output()->finished = true;
-  sub_compact->total_bytes += current_bytes;
 
   // Finish and check for file errors
-  if (s.ok()) {
-    StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
-    io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
-  }
-  if (s.ok() && io_s.ok()) {
-    io_s = sub_compact->outfile->Close();
-  }
+  IOStatus io_s = outputs.WriterSyncClose(s, db_options_.clock, stats_,
+                                          db_options_.use_fsync);
+
   if (s.ok() && io_s.ok()) {
-    // Add the checksum information to file metadata.
-    meta->file_checksum = sub_compact->outfile->GetFileChecksum();
-    meta->file_checksum_func_name =
-        sub_compact->outfile->GetFileChecksumFuncName();
     file_checksum = meta->file_checksum;
     file_checksum_func_name = meta->file_checksum_func_name;
   }
+
   if (s.ok()) {
     s = io_s;
   }
@@ -1486,11 +1550,10 @@ Status CompactionJob::FinishCompactionOutputFile(
     // "normal" status, it does not also need to be checked
     sub_compact->io_status.PermitUncheckedError();
   }
-  sub_compact->outfile.reset();
 
   TableProperties tp;
   if (s.ok()) {
-    tp = sub_compact->builder->GetTableProperties();
+    tp = outputs.GetTableProperties();
   }
 
   if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
@@ -1498,44 +1561,56 @@ Status CompactionJob::FinishCompactionOutputFile(
     // This happens when the output level is bottom level, at the same time
     // the sub_compact output nothing.
     std::string fname =
-        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
+        TableFileName(sub_compact->compaction->immutable_options()->cf_paths,
                       meta->fd.GetNumber(), meta->fd.GetPathId());
-    env_->DeleteFile(fname);
+
+    // TODO(AR) it is not clear if there are any larger implications if
+    // DeleteFile fails here
+    Status ds = env_->DeleteFile(fname);
+    if (!ds.ok()) {
+      ROCKS_LOG_WARN(
+          db_options_.info_log,
+          "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
+          " at bottom level%s",
+          cfd->GetName().c_str(), job_id_, output_number,
+          meta->marked_for_compaction ? " (need compaction)" : "");
+    }
 
     // Also need to remove the file from outputs, or it will be added to the
     // VersionEdit.
-    assert(!sub_compact->outputs.empty());
-    sub_compact->outputs.pop_back();
+    outputs.RemoveLastOutput();
     meta = nullptr;
   }
 
   if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
     // Output to event logger and fire events.
-    sub_compact->current_output()->table_properties =
-        std::make_shared<TableProperties>(tp);
+    outputs.UpdateTableProperties();
     ROCKS_LOG_INFO(db_options_.info_log,
                    "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
-                   " keys, %" PRIu64 " bytes%s",
+                   " keys, %" PRIu64 " bytes%s, temperature: %s",
                    cfd->GetName().c_str(), job_id_, output_number,
-                   current_entries, current_bytes,
-                   meta->marked_for_compaction ? " (need compaction)" : "");
+                   current_entries, meta->fd.file_size,
+                   meta->marked_for_compaction ? " (need compaction)" : "",
+                   temperature_to_string[meta->temperature].c_str());
   }
   std::string fname;
   FileDescriptor output_fd;
   uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
+  Status status_for_listener = s;
   if (meta != nullptr) {
-    fname =
-        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
-                      meta->fd.GetNumber(), meta->fd.GetPathId());
+    fname = GetTableFileName(meta->fd.GetNumber());
     output_fd = meta->fd;
     oldest_blob_file_number = meta->oldest_blob_file_number;
   } else {
     fname = "(nil)";
+    if (s.ok()) {
+      status_for_listener = Status::Aborted("Empty SST file not kept");
+    }
   }
   EventHelpers::LogAndNotifyTableFileCreationFinished(
       event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
       job_id_, output_fd, oldest_blob_file_number, tp,
-      TableFileCreationReason::kCompaction, s, file_checksum,
+      TableFileCreationReason::kCompaction, status_for_listener, file_checksum,
       file_checksum_func_name);
 
 #ifndef ROCKSDB_LITE
@@ -1552,18 +1627,14 @@ Status CompactionJob::FinishCompactionOutputFile(
       // compaction output file (similarly to how flush works when full)?
       s = Status::SpaceLimit("Max allowed space was reached");
       TEST_SYNC_POINT(
-          "CompactionJob::FinishCompactionOutputFile:"
-          "MaxAllowedSpaceReached");
+          "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
       InstrumentedMutexLock l(db_mutex_);
-      // Should handle return error?
-      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction)
-          .PermitUncheckedError();
+      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
     }
   }
 #endif
 
-  sub_compact->builder.reset();
-  sub_compact->current_output_file_size = 0;
+  outputs.ResetBuilder();
   return s;
 }
 
@@ -1576,26 +1647,25 @@ Status CompactionJob::InstallCompactionResults(
   auto* compaction = compact_->compaction;
   assert(compaction);
 
-  // paranoia: verify that the files that we started with
-  // still exist in the current version and in the same original level.
-  // This ensures that a concurrent compaction did not erroneously
-  // pick the same files to compact_.
-  if (!versions_->VerifyCompactionFileConsistency(compaction)) {
-    Compaction::InputLevelSummaryBuffer inputs_summary;
-
-    ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted",
-                    compaction->column_family_data()->GetName().c_str(),
-                    job_id_, compaction->InputLevelSummary(&inputs_summary));
-    return Status::Corruption("Compaction input files inconsistent");
-  }
-
   {
     Compaction::InputLevelSummaryBuffer inputs_summary;
-    ROCKS_LOG_INFO(db_options_.info_log,
-                   "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
-                   compaction->column_family_data()->GetName().c_str(), job_id_,
-                   compaction->InputLevelSummary(&inputs_summary),
-                   compact_->total_bytes + compact_->total_blob_bytes);
+    if (compaction_stats_.has_penultimate_level_output) {
+      ROCKS_LOG_BUFFER(
+          log_buffer_,
+          "[%s] [JOB %d] Compacted %s => output_to_penultimate_level: %" PRIu64
+          " bytes + last: %" PRIu64 " bytes. Total: %" PRIu64 " bytes",
+          compaction->column_family_data()->GetName().c_str(), job_id_,
+          compaction->InputLevelSummary(&inputs_summary),
+          compaction_stats_.penultimate_level_stats.bytes_written,
+          compaction_stats_.stats.bytes_written,
+          compaction_stats_.TotalBytesWritten());
+    } else {
+      ROCKS_LOG_BUFFER(log_buffer_,
+                       "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
+                       compaction->column_family_data()->GetName().c_str(),
+                       job_id_, compaction->InputLevelSummary(&inputs_summary),
+                       compaction_stats_.TotalBytesWritten());
+    }
   }
 
   VersionEdit* const edit = compaction->edit();
@@ -1604,14 +1674,50 @@ Status CompactionJob::InstallCompactionResults(
   // Add compaction inputs
   compaction->AddInputDeletions(edit);
 
+  std::unordered_map<uint64_t, BlobGarbageMeter::BlobStats> blob_total_garbage;
+
   for (const auto& sub_compact : compact_->sub_compact_states) {
-    for (const auto& out : sub_compact.outputs) {
-      edit->AddFile(compaction->output_level(), out.meta);
-    }
+    sub_compact.AddOutputsEdit(edit);
 
-    for (const auto& blob : sub_compact.blob_file_additions) {
+    for (const auto& blob : sub_compact.Current().GetBlobFileAdditions()) {
       edit->AddBlobFile(blob);
     }
+
+    if (sub_compact.Current().GetBlobGarbageMeter()) {
+      const auto& flows = sub_compact.Current().GetBlobGarbageMeter()->flows();
+
+      for (const auto& pair : flows) {
+        const uint64_t blob_file_number = pair.first;
+        const BlobGarbageMeter::BlobInOutFlow& flow = pair.second;
+
+        assert(flow.IsValid());
+        if (flow.HasGarbage()) {
+          blob_total_garbage[blob_file_number].Add(flow.GetGarbageCount(),
+                                                   flow.GetGarbageBytes());
+        }
+      }
+    }
+  }
+
+  for (const auto& pair : blob_total_garbage) {
+    const uint64_t blob_file_number = pair.first;
+    const BlobGarbageMeter::BlobStats& stats = pair.second;
+
+    edit->AddBlobFileGarbage(blob_file_number, stats.GetCount(),
+                             stats.GetBytes());
+  }
+
+  if ((compaction->compaction_reason() ==
+           CompactionReason::kLevelMaxLevelSize ||
+       compaction->compaction_reason() == CompactionReason::kRoundRobinTtl) &&
+      compaction->immutable_options()->compaction_pri == kRoundRobin) {
+    int start_level = compaction->start_level();
+    if (start_level > 0) {
+      auto vstorage = compaction->input_version()->storage_info();
+      edit->AddCompactCursor(start_level,
+                             vstorage->GetNextCompactCursor(
+                                 start_level, compaction->num_input_files(0)));
+    }
   }
 
   return versions_->LogAndApply(compaction->column_family_data(),
@@ -1642,15 +1748,13 @@ void CompactionJob::RecordCompactionIOStats() {
   IOSTATS_RESET(bytes_written);
 }
 
-Status CompactionJob::OpenCompactionOutputFile(
-    SubcompactionState* sub_compact) {
+Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
+                                               CompactionOutputs& outputs) {
   assert(sub_compact != nullptr);
-  assert(sub_compact->builder == nullptr);
+
   // no need to lock because VersionSet::next_file_number_ is atomic
   uint64_t file_number = versions_->NewFileNumber();
-  std::string fname =
-      TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
-                    file_number, sub_compact->compaction->output_path_id());
+  std::string fname = GetTableFileName(file_number);
   // Fire events.
   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
 #ifndef ROCKSDB_LITE
@@ -1665,9 +1769,22 @@ Status CompactionJob::OpenCompactionOutputFile(
   TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
                            &syncpoint_arg);
 #endif
+
+  // Pass temperature of the last level files to FileSystem.
+  FileOptions fo_copy = file_options_;
+  Temperature temperature = sub_compact->compaction->output_temperature();
+  // only set for the last level compaction and also it's not output to
+  // penultimate level (when preclude_last_level feature is enabled)
+  if (temperature == Temperature::kUnknown &&
+      sub_compact->compaction->is_last_level() &&
+      !sub_compact->IsCurrentPenultimateLevel()) {
+    temperature =
+        sub_compact->compaction->mutable_cf_options()->last_level_temperature;
+  }
+  fo_copy.temperature = temperature;
+
   Status s;
-  IOStatus io_s =
-      NewWritableFile(fs_.get(), fname, &writable_file, file_options_);
+  IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
   s = io_s;
   if (sub_compact->io_status.ok()) {
     sub_compact->io_status = io_s;
@@ -1693,7 +1810,7 @@ Status CompactionJob::OpenCompactionOutputFile(
 
   // Try to figure out the output file's oldest ancester time.
   int64_t temp_current_time = 0;
-  auto get_time_status = env_->GetCurrentTime(&temp_current_time);
+  auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
   // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
   if (!get_time_status.ok()) {
     ROCKS_LOG_WARN(db_options_.info_log,
@@ -1701,9 +1818,18 @@ Status CompactionJob::OpenCompactionOutputFile(
                    get_time_status.ToString().c_str());
   }
   uint64_t current_time = static_cast<uint64_t>(temp_current_time);
+  InternalKey tmp_start, tmp_end;
+  if (sub_compact->start.has_value()) {
+    tmp_start.SetMinPossibleForUserKey(sub_compact->start.value());
+  }
+  if (sub_compact->end.has_value()) {
+    tmp_end.SetMinPossibleForUserKey(sub_compact->end.value());
+  }
   uint64_t oldest_ancester_time =
-      sub_compact->compaction->MinInputFileOldestAncesterTime();
-  if (oldest_ancester_time == port::kMaxUint64) {
+      sub_compact->compaction->MinInputFileOldestAncesterTime(
+          sub_compact->start.has_value() ? &tmp_start : nullptr,
+          sub_compact->end.has_value() ? &tmp_end : nullptr);
+  if (oldest_ancester_time == std::numeric_limits<uint64_t>::max()) {
     oldest_ancester_time = current_time;
   }
 
@@ -1714,67 +1840,57 @@ Status CompactionJob::OpenCompactionOutputFile(
                              sub_compact->compaction->output_path_id(), 0);
     meta.oldest_ancester_time = oldest_ancester_time;
     meta.file_creation_time = current_time;
-    sub_compact->outputs.emplace_back(
-        std::move(meta), cfd->internal_comparator(),
-        /*enable_order_check=*/
-        sub_compact->compaction->mutable_cf_options()
-            ->check_flush_compaction_key_order,
-        /*enable_hash=*/paranoid_file_checks_);
+    meta.temperature = temperature;
+    assert(!db_id_.empty());
+    assert(!db_session_id_.empty());
+    s = GetSstInternalUniqueId(db_id_, db_session_id_, meta.fd.GetNumber(),
+                               &meta.unique_id);
+    if (!s.ok()) {
+      ROCKS_LOG_ERROR(db_options_.info_log,
+                      "[%s] [JOB %d] file #%" PRIu64
+                      " failed to generate unique id: %s.",
+                      cfd->GetName().c_str(), job_id_, meta.fd.GetNumber(),
+                      s.ToString().c_str());
+      return s;
+    }
+
+    outputs.AddOutput(std::move(meta), cfd->internal_comparator(),
+                      sub_compact->compaction->mutable_cf_options()
+                          ->check_flush_compaction_key_order,
+                      paranoid_file_checks_);
   }
 
-  writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
+  writable_file->SetIOPriority(GetRateLimiterPriority());
   writable_file->SetWriteLifeTimeHint(write_hint_);
+  FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
   writable_file->SetPreallocationBlockSize(static_cast<size_t>(
       sub_compact->compaction->OutputFilePreallocationSize()));
   const auto& listeners =
-      sub_compact->compaction->immutable_cf_options()->listeners;
-  sub_compact->outfile.reset(new WritableFileWriter(
-      std::move(writable_file), fname, file_options_, env_, io_tracer_,
-      db_options_.statistics.get(), listeners,
-      db_options_.file_checksum_gen_factory.get()));
-
-  // If the Column family flag is to only optimize filters for hits,
-  // we can skip creating filters if this is the bottommost_level where
-  // data is going to be found
-  bool skip_filters =
-      cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
-
-  sub_compact->builder.reset(NewTableBuilder(
+      sub_compact->compaction->immutable_options()->listeners;
+  outputs.AssignFileWriter(new WritableFileWriter(
+      std::move(writable_file), fname, fo_copy, db_options_.clock, io_tracer_,
+      db_options_.stats, listeners, db_options_.file_checksum_gen_factory.get(),
+      tmp_set.Contains(FileType::kTableFile), false));
+
+  TableBuilderOptions tboptions(
       *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
       cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
-      cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
       sub_compact->compaction->output_compression(),
-      0 /*sample_for_compression */,
-      sub_compact->compaction->output_compression_opts(),
-      sub_compact->compaction->output_level(), skip_filters,
-      oldest_ancester_time, 0 /* oldest_key_time */,
-      sub_compact->compaction->max_output_file_size(), current_time, db_id_,
-      db_session_id_));
+      sub_compact->compaction->output_compression_opts(), cfd->GetID(),
+      cfd->GetName(), sub_compact->compaction->output_level(),
+      bottommost_level_, TableFileCreationReason::kCompaction,
+      0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
+      sub_compact->compaction->max_output_file_size(), file_number);
+
+  outputs.NewBuilder(tboptions);
+
   LogFlush(db_options_.info_log);
   return s;
 }
 
 void CompactionJob::CleanupCompaction() {
   for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
-    const auto& sub_status = sub_compact.status;
-
-    if (sub_compact.builder != nullptr) {
-      // May happen if we get a shutdown call in the middle of compaction
-      sub_compact.builder->Abandon();
-      sub_compact.builder.reset();
-    } else {
-      assert(!sub_status.ok() || sub_compact.outfile == nullptr);
-    }
-    for (const auto& out : sub_compact.outputs) {
-      // If this file was inserted into the table cache then remove
-      // them here because this compaction was not committed.
-      if (!sub_status.ok()) {
-        TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
-      }
-    }
-    // TODO: sub_compact.io_status is not checked like status. Not sure if thats
-    // intentional. So ignoring the io_status as of now.
-    sub_compact.io_status.PermitUncheckedError();
+    sub_compact.Cleanup(table_cache_.get());
   }
   delete compact_;
   compact_ = nullptr;
@@ -1795,32 +1911,28 @@ void CompactionJob::UpdateCompactionStats() {
   assert(compact_);
 
   Compaction* compaction = compact_->compaction;
-  compaction_stats_.num_input_files_in_non_output_levels = 0;
-  compaction_stats_.num_input_files_in_output_level = 0;
+  compaction_stats_.stats.num_input_files_in_non_output_levels = 0;
+  compaction_stats_.stats.num_input_files_in_output_level = 0;
   for (int input_level = 0;
        input_level < static_cast<int>(compaction->num_input_levels());
        ++input_level) {
     if (compaction->level(input_level) != compaction->output_level()) {
       UpdateCompactionInputStatsHelper(
-          &compaction_stats_.num_input_files_in_non_output_levels,
-          &compaction_stats_.bytes_read_non_output_levels, input_level);
+          &compaction_stats_.stats.num_input_files_in_non_output_levels,
+          &compaction_stats_.stats.bytes_read_non_output_levels, input_level);
     } else {
       UpdateCompactionInputStatsHelper(
-          &compaction_stats_.num_input_files_in_output_level,
-          &compaction_stats_.bytes_read_output_level, input_level);
+          &compaction_stats_.stats.num_input_files_in_output_level,
+          &compaction_stats_.stats.bytes_read_output_level, input_level);
     }
   }
 
-  compaction_stats_.num_output_files =
-      static_cast<int>(compact_->num_output_files) +
-      static_cast<int>(compact_->num_blob_output_files);
-  compaction_stats_.bytes_written =
-      compact_->total_bytes + compact_->total_blob_bytes;
+  assert(compaction_job_stats_);
+  compaction_stats_.stats.bytes_read_blob =
+      compaction_job_stats_->total_blob_bytes_read;
 
-  if (compaction_stats_.num_input_records > compact_->num_output_records) {
-    compaction_stats_.num_dropped_records =
-        compaction_stats_.num_input_records - compact_->num_output_records;
-  }
+  compaction_stats_.stats.num_dropped_records =
+      compaction_stats_.DroppedRecords();
 }
 
 void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
@@ -1833,7 +1945,7 @@ void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
   for (size_t i = 0; i < num_input_files; ++i) {
     const auto* file_meta = compaction->input(input_level, i);
     *bytes_read += file_meta->fd.GetFileSize();
-    compaction_stats_.num_input_records +=
+    compaction_stats_.stats.num_input_records +=
         static_cast<uint64_t>(file_meta->num_entries);
   }
 }
@@ -1855,8 +1967,10 @@ void CompactionJob::UpdateCompactionJobStats(
 
   // output information
   compaction_job_stats_->total_output_bytes = stats.bytes_written;
-  compaction_job_stats_->num_output_records = compact_->num_output_records;
+  compaction_job_stats_->total_output_bytes_blob = stats.bytes_written_blob;
+  compaction_job_stats_->num_output_records = stats.num_output_records;
   compaction_job_stats_->num_output_files = stats.num_output_files;
+  compaction_job_stats_->num_output_files_blob = stats.num_output_files_blob;
 
   if (stats.num_output_files > 0) {
     CopyPrefix(compact_->SmallestUserKey(),
@@ -1884,7 +1998,7 @@ void CompactionJob::LogCompaction() {
         compaction->InputLevelSummary(&inputs_summary), compaction->score());
     char scratch[2345];
     compaction->Summary(scratch, sizeof(scratch));
-    ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
+    ROCKS_LOG_INFO(db_options_.info_log, "[%s]: Compaction start summary: %s\n",
                    cfd->GetName().c_str(), scratch);
     // build event logger report
     auto stream = event_logger_->Log();
@@ -1893,7 +2007,7 @@ void CompactionJob::LogCompaction() {
            << "compaction_reason"
            << GetCompactionReasonString(compaction->compaction_reason());
     for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
-      stream << ("files_L" + ToString(compaction->level(i)));
+      stream << ("files_L" + std::to_string(compaction->level(i)));
       stream.StartArray();
       for (auto f : *compaction->inputs(i)) {
         stream << f->fd.GetNumber();
@@ -1901,8 +2015,46 @@ void CompactionJob::LogCompaction() {
       stream.EndArray();
     }
     stream << "score" << compaction->score() << "input_data_size"
-           << compaction->CalculateTotalInputSize();
+           << compaction->CalculateTotalInputSize() << "oldest_snapshot_seqno"
+           << (existing_snapshots_.empty()
+                   ? int64_t{-1}  // Use -1 for "none"
+                   : static_cast<int64_t>(existing_snapshots_[0]));
+    if (compaction->SupportsPerKeyPlacement()) {
+      stream << "preclude_last_level_min_seqno"
+             << preclude_last_level_min_seqno_;
+      stream << "penultimate_output_level" << compaction->GetPenultimateLevel();
+      stream << "penultimate_output_range"
+             << GetCompactionPenultimateOutputRangeTypeString(
+                    compaction->GetPenultimateOutputRangeType());
+
+      if (compaction->GetPenultimateOutputRangeType() ==
+          Compaction::PenultimateOutputRangeType::kDisabled) {
+        ROCKS_LOG_WARN(
+            db_options_.info_log,
+            "[%s] [JOB %d] Penultimate level output is disabled, likely "
+            "because of the range conflict in the penultimate level",
+            cfd->GetName().c_str(), job_id_);
+      }
+    }
+  }
+}
+
+std::string CompactionJob::GetTableFileName(uint64_t file_number) {
+  return TableFileName(compact_->compaction->immutable_options()->cf_paths,
+                       file_number, compact_->compaction->output_path_id());
+}
+
+Env::IOPriority CompactionJob::GetRateLimiterPriority() {
+  if (versions_ && versions_->GetColumnFamilySet() &&
+      versions_->GetColumnFamilySet()->write_controller()) {
+    WriteController* write_controller =
+        versions_->GetColumnFamilySet()->write_controller();
+    if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
+      return Env::IO_USER;
+    }
   }
+
+  return Env::IO_LOW;
 }
 
 }  // namespace ROCKSDB_NAMESPACE