]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compaction_job.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / compaction_job.cc
index 0f0356002a45dfca94c9df7a1afc3b4db3e01097..65e9719a396dcc4a1b39061601db7057e20c2009 100644 (file)
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 #include <vector>
 
 #include "db/builder.h"
+#include "db/db_impl.h"
 #include "db/db_iter.h"
 #include "db/dbformat.h"
+#include "db/error_handler.h"
 #include "db/event_helpers.h"
 #include "db/log_reader.h"
 #include "db/log_writer.h"
 #include "db/memtable_list.h"
 #include "db/merge_context.h"
 #include "db/merge_helper.h"
+#include "db/range_del_aggregator.h"
 #include "db/version_set.h"
 #include "monitoring/iostats_context_imp.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/thread_status_util.h"
-#include "port/likely.h"
 #include "port/port.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
 
 namespace rocksdb {
 
+const char* GetCompactionReasonString(CompactionReason compaction_reason) {
+  switch (compaction_reason) {
+    case CompactionReason::kUnknown:
+      return "Unknown";
+    case CompactionReason::kLevelL0FilesNum:
+      return "LevelL0FilesNum";
+    case CompactionReason::kLevelMaxLevelSize:
+      return "LevelMaxLevelSize";
+    case CompactionReason::kUniversalSizeAmplification:
+      return "UniversalSizeAmplification";
+    case CompactionReason::kUniversalSizeRatio:
+      return "UniversalSizeRatio";
+    case CompactionReason::kUniversalSortedRunNum:
+      return "UniversalSortedRunNum";
+    case CompactionReason::kFIFOMaxSize:
+      return "FIFOMaxSize";
+    case CompactionReason::kFIFOReduceNumFiles:
+      return "FIFOReduceNumFiles";
+    case CompactionReason::kFIFOTtl:
+      return "FIFOTtl";
+    case CompactionReason::kManualCompaction:
+      return "ManualCompaction";
+    case CompactionReason::kFilesMarkedForCompaction:
+      return "FilesMarkedForCompaction";
+    case CompactionReason::kBottommostFiles:
+      return "BottommostFiles";
+    case CompactionReason::kTtl:
+      return "Ttl";
+    case CompactionReason::kFlush:
+      return "Flush";
+    case CompactionReason::kExternalSstIngestion:
+      return "ExternalSstIngestion";
+    case CompactionReason::kNumOfReasons:
+      // fall through
+    default:
+      assert(false);
+      return "Invalid";
+  }
+}
+
 // Maintains state for each sub-compaction
 struct CompactionJob::SubcompactionState {
   const Compaction* compaction;
@@ -115,7 +157,6 @@ struct CompactionJob::SubcompactionState {
   uint64_t overlapped_bytes = 0;
   // A flag determine whether the key has been seen in ShouldStopBefore()
   bool seen_key = false;
-  std::string compression_dict;
 
   SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
                      uint64_t size = 0)
@@ -131,8 +172,7 @@ struct CompactionJob::SubcompactionState {
         approx_size(size),
         grandparent_index(0),
         overlapped_bytes(0),
-        seen_key(false),
-        compression_dict() {
+        seen_key(false) {
     assert(compaction != nullptr);
   }
 
@@ -155,11 +195,10 @@ struct CompactionJob::SubcompactionState {
     grandparent_index = std::move(o.grandparent_index);
     overlapped_bytes = std::move(o.overlapped_bytes);
     seen_key = std::move(o.seen_key);
-    compression_dict = std::move(o.compression_dict);
     return *this;
   }
 
-  // Because member unique_ptrs do not have these.
+  // Because member std::unique_ptrs do not have these.
   SubcompactionState(const SubcompactionState&) = delete;
 
   SubcompactionState& operator=(const SubcompactionState&) = delete;
@@ -264,37 +303,46 @@ void CompactionJob::AggregateStatistics() {
 
 CompactionJob::CompactionJob(
     int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
-    const EnvOptions& env_options, VersionSet* versions,
-    const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
+    const EnvOptions env_options, VersionSet* versions,
+    const std::atomic<bool>* shutting_down,
+    const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
     Directory* db_directory, Directory* output_directory, Statistics* stats,
-    InstrumentedMutex* db_mutex, Status* db_bg_error,
+    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
     std::vector<SequenceNumber> existing_snapshots,
     SequenceNumber earliest_write_conflict_snapshot,
-    std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
-    bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
-    CompactionJobStats* compaction_job_stats)
+    const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
+    EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
+    const std::string& dbname, CompactionJobStats* compaction_job_stats,
+    Env::Priority thread_pri)
     : job_id_(job_id),
       compact_(new CompactionState(compaction)),
       compaction_job_stats_(compaction_job_stats),
-      compaction_stats_(1),
+      compaction_stats_(compaction->compaction_reason(), 1),
       dbname_(dbname),
       db_options_(db_options),
       env_options_(env_options),
       env_(db_options.env),
+      env_optiosn_for_read_(
+          env_->OptimizeForCompactionTableRead(env_options, db_options_)),
       versions_(versions),
       shutting_down_(shutting_down),
+      preserve_deletes_seqnum_(preserve_deletes_seqnum),
       log_buffer_(log_buffer),
       db_directory_(db_directory),
       output_directory_(output_directory),
       stats_(stats),
       db_mutex_(db_mutex),
-      db_bg_error_(db_bg_error),
+      db_error_handler_(db_error_handler),
       existing_snapshots_(std::move(existing_snapshots)),
       earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
+      snapshot_checker_(snapshot_checker),
       table_cache_(std::move(table_cache)),
       event_logger_(event_logger),
+      bottommost_level_(false),
       paranoid_file_checks_(paranoid_file_checks),
-      measure_io_stats_(measure_io_stats) {
+      measure_io_stats_(measure_io_stats),
+      write_hint_(Env::WLTH_NOT_SET),
+      thread_pri_(thread_pri) {
   assert(log_buffer_ != nullptr);
   const auto* cfd = compact_->compaction->column_family_data();
   ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
@@ -308,15 +356,13 @@ CompactionJob::~CompactionJob() {
   ThreadStatusUtil::ResetThreadStatus();
 }
 
-void CompactionJob::ReportStartedCompaction(
-    Compaction* compaction) {
+void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
   const auto* cfd = compact_->compaction->column_family_data();
   ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
                                     db_options_.enable_thread_tracking);
 
-  ThreadStatusUtil::SetThreadOperationProperty(
-      ThreadStatus::COMPACTION_JOB_ID,
-      job_id_);
+  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
+                                               job_id_);
 
   ThreadStatusUtil::SetThreadOperationProperty(
       ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
@@ -346,8 +392,7 @@ void CompactionJob::ReportStartedCompaction(
 
   // Set the thread operation after operation properties
   // to ensure GetThreadList() can always show them all together.
-  ThreadStatusUtil::SetThreadOperation(
-      ThreadStatus::OP_COMPACTION);
+  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
 
   if (compaction_job_stats_) {
     compaction_job_stats_->is_manual_compaction =
@@ -362,18 +407,19 @@ void CompactionJob::Prepare() {
   // Generate file_levels_ for compaction berfore making Iterator
   auto* c = compact_->compaction;
   assert(c->column_family_data() != nullptr);
-  assert(c->column_family_data()->current()->storage_info()
-      ->NumLevelFiles(compact_->compaction->level()) > 0);
+  assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
+             compact_->compaction->level()) > 0);
 
+  write_hint_ =
+      c->column_family_data()->CalculateSSTWriteHint(c->output_level());
   // Is this compaction producing files at the bottommost level?
   bottommost_level_ = c->bottommost_level();
 
   if (c->ShouldFormSubcompactions()) {
-    const uint64_t start_micros = env_->NowMicros();
-    GenSubcompactionBoundaries();
-    MeasureTime(stats_, SUBCOMPACTION_SETUP_TIME,
-                env_->NowMicros() - start_micros);
-
+    {
+      StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
+      GenSubcompactionBoundaries();
+    }
     assert(sizes_.size() == boundaries_.size() + 1);
 
     for (size_t i = 0; i <= boundaries_.size(); i++) {
@@ -381,8 +427,8 @@ void CompactionJob::Prepare() {
       Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
       compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
     }
-    MeasureTime(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
-                compact_->sub_compact_states.size());
+    RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
+                      compact_->sub_compact_states.size());
   } else {
     compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
   }
@@ -447,20 +493,27 @@ void CompactionJob::GenSubcompactionBoundaries() {
   }
 
   std::sort(bounds.begin(), bounds.end(),
-    [cfd_comparator] (const Slice& a, const Slice& b) -> bool {
-      return cfd_comparator->Compare(ExtractUserKey(a), ExtractUserKey(b)) < 0;
-    });
+            [cfd_comparator](const Slice& a, const Slice& b) -> bool {
+              return cfd_comparator->Compare(ExtractUserKey(a),
+                                             ExtractUserKey(b)) < 0;
+            });
   // Remove duplicated entries from bounds
-  bounds.erase(std::unique(bounds.begin(), bounds.end(),
-    [cfd_comparator] (const Slice& a, const Slice& b) -> bool {
-      return cfd_comparator->Compare(ExtractUserKey(a), ExtractUserKey(b)) == 0;
-    }), bounds.end());
+  bounds.erase(
+      std::unique(bounds.begin(), bounds.end(),
+                  [cfd_comparator](const Slice& a, const Slice& b) -> bool {
+                    return cfd_comparator->Compare(ExtractUserKey(a),
+                                                   ExtractUserKey(b)) == 0;
+                  }),
+      bounds.end());
 
   // Combine consecutive pairs of boundaries into ranges with an approximate
   // size of data covered by keys in that range
   uint64_t sum = 0;
   std::vector<RangeWithSize> ranges;
-  auto* v = cfd->current();
+  // Get input version from CompactionState since it's already referenced
+  // earlier in SetInputVersioCompaction::SetInputVersion and will not change
+  // when db_mutex_ is released below
+  auto* v = compact_->compaction->input_version();
   for (auto it = bounds.begin();;) {
     const Slice a = *it;
     it++;
@@ -470,19 +523,28 @@ void CompactionJob::GenSubcompactionBoundaries() {
     }
 
     const Slice b = *it;
+
+    // ApproximateSize could potentially create table reader iterator to seek
+    // to the index block and may incur I/O cost in the process. Unlock db
+    // mutex to reduce contention
+    db_mutex_->Unlock();
     uint64_t size = versions_->ApproximateSize(v, a, b, start_lvl, out_lvl + 1);
+    db_mutex_->Lock();
     ranges.emplace_back(a, b, size);
     sum += size;
   }
 
   // Group the ranges into subcompactions
   const double min_file_fill_percent = 4.0 / 5;
-  uint64_t max_output_files = static_cast<uint64_t>(
-      std::ceil(sum / min_file_fill_percent /
-                c->mutable_cf_options()->MaxFileSizeForLevel(out_lvl)));
+  int base_level = v->storage_info()->base_level();
+  uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
+      sum / min_file_fill_percent /
+      MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
+          c->immutable_cf_options()->compaction_style, base_level,
+          c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
   uint64_t subcompactions =
       std::min({static_cast<uint64_t>(ranges.size()),
-                static_cast<uint64_t>(db_options_.max_subcompactions),
+                static_cast<uint64_t>(c->max_subcompactions()),
                 max_output_files});
 
   if (subcompactions > 1) {
@@ -539,12 +601,18 @@ Status CompactionJob::Run() {
     thread.join();
   }
 
-  if (output_directory_) {
-    output_directory_->Fsync();
+  compaction_stats_.micros = env_->NowMicros() - start_micros;
+  compaction_stats_.cpu_micros = 0;
+  for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
+    compaction_stats_.cpu_micros +=
+        compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
   }
 
-  compaction_stats_.micros = env_->NowMicros() - start_micros;
-  MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
+  RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
+  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
+                        compaction_stats_.cpu_micros);
+
+  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
 
   // Check if any thread encountered an error during execution
   Status status;
@@ -555,11 +623,79 @@ Status CompactionJob::Run() {
     }
   }
 
+  if (status.ok() && output_directory_) {
+    status = output_directory_->Fsync();
+  }
+
+  if (status.ok()) {
+    thread_pool.clear();
+    std::vector<const FileMetaData*> files_meta;
+    for (const auto& state : compact_->sub_compact_states) {
+      for (const auto& output : state.outputs) {
+        files_meta.emplace_back(&output.meta);
+      }
+    }
+    ColumnFamilyData* cfd = compact_->compaction->column_family_data();
+    auto prefix_extractor =
+        compact_->compaction->mutable_cf_options()->prefix_extractor.get();
+    std::atomic<size_t> next_file_meta_idx(0);
+    auto verify_table = [&](Status& output_status) {
+      while (true) {
+        size_t file_idx = next_file_meta_idx.fetch_add(1);
+        if (file_idx >= files_meta.size()) {
+          break;
+        }
+        // Verify that the table is usable
+        // We set for_compaction to false and don't OptimizeForCompactionTableRead
+        // here because this is a special case after we finish the table building
+        // No matter whether use_direct_io_for_flush_and_compaction is true,
+        // we will regard this verification as user reads since the goal is
+        // to cache it here for further user reads
+        InternalIterator* iter = cfd->table_cache()->NewIterator(
+            ReadOptions(), env_options_, cfd->internal_comparator(),
+            *files_meta[file_idx], nullptr /* range_del_agg */,
+            prefix_extractor, nullptr,
+            cfd->internal_stats()->GetFileReadHist(
+                compact_->compaction->output_level()),
+            false, nullptr /* arena */, false /* skip_filters */,
+            compact_->compaction->output_level());
+        auto s = iter->status();
+
+        if (s.ok() && paranoid_file_checks_) {
+          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
+          s = iter->status();
+        }
+
+        delete iter;
+
+        if (!s.ok()) {
+          output_status = s;
+          break;
+        }
+      }
+    };
+    for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
+      thread_pool.emplace_back(verify_table,
+                               std::ref(compact_->sub_compact_states[i].status));
+    }
+    verify_table(compact_->sub_compact_states[0].status);
+    for (auto& thread : thread_pool) {
+      thread.join();
+    }
+    for (const auto& state : compact_->sub_compact_states) {
+      if (!state.status.ok()) {
+        status = state.status;
+        break;
+      }
+    }
+  }
+
   TablePropertiesCollection tp;
   for (const auto& state : compact_->sub_compact_states) {
     for (const auto& output : state.outputs) {
-      auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(),
-                              output.meta.fd.GetPathId());
+      auto fn =
+          TableFileName(state.compaction->immutable_cf_options()->cf_paths,
+                        output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
       tp[fn] = output.table_properties;
     }
   }
@@ -583,7 +719,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   Status status = compact_->status;
   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
   cfd->internal_stats()->AddCompactionStats(
-      compact_->compaction->output_level(), compaction_stats_);
+      compact_->compaction->output_level(), thread_pri_, compaction_stats_);
 
   if (status.ok()) {
     status = InstallCompactionResults(mutable_cf_options);
@@ -594,6 +730,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
 
   double read_write_amp = 0.0;
   double write_amp = 0.0;
+  double bytes_read_per_sec = 0;
+  double bytes_written_per_sec = 0;
+
   if (stats.bytes_read_non_output_levels > 0) {
     read_write_amp = (stats.bytes_written + stats.bytes_read_output_level +
                       stats.bytes_read_non_output_levels) /
@@ -601,37 +740,47 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
     write_amp = stats.bytes_written /
                 static_cast<double>(stats.bytes_read_non_output_levels);
   }
+  if (stats.micros > 0) {
+    bytes_read_per_sec =
+        (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
+        static_cast<double>(stats.micros);
+    bytes_written_per_sec =
+        stats.bytes_written / static_cast<double>(stats.micros);
+  }
+
   ROCKS_LOG_BUFFER(
       log_buffer_,
       "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
       "files in(%d, %d) out(%d) "
       "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
-      "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
-      cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
-      (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
-          static_cast<double>(stats.micros),
-      stats.bytes_written / static_cast<double>(stats.micros),
-      compact_->compaction->output_level(),
+      "write-amplify(%.1f) %s, records in: %" PRIu64
+      ", records dropped: %" PRIu64 " output_compression: %s\n",
+      cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec,
+      bytes_written_per_sec, compact_->compaction->output_level(),
       stats.num_input_files_in_non_output_levels,
       stats.num_input_files_in_output_level, stats.num_output_files,
       stats.bytes_read_non_output_levels / 1048576.0,
       stats.bytes_read_output_level / 1048576.0,
       stats.bytes_written / 1048576.0, read_write_amp, write_amp,
       status.ToString().c_str(), stats.num_input_records,
-      stats.num_dropped_records);
+      stats.num_dropped_records,
+      CompressionTypeToString(compact_->compaction->output_compression())
+          .c_str());
 
   UpdateCompactionJobStats(stats);
 
   auto stream = event_logger_->LogToBuffer(log_buffer_);
-  stream << "job" << job_id_
-         << "event" << "compaction_finished"
+  stream << "job" << job_id_ << "event"
+         << "compaction_finished"
          << "compaction_time_micros" << compaction_stats_.micros
+         << "compaction_time_cpu_micros" << compaction_stats_.cpu_micros
          << "output_level" << compact_->compaction->output_level()
          << "num_output_files" << compact_->NumOutputFiles()
-         << "total_output_size" << compact_->total_bytes
-         << "num_input_records" << compact_->num_input_records
-         << "num_output_records" << compact_->num_output_records
-         << "num_subcompactions" << compact_->sub_compact_states.size();
+         << "total_output_size" << compact_->total_bytes << "num_input_records"
+         << compact_->num_input_records << "num_output_records"
+         << compact_->num_output_records << "num_subcompactions"
+         << compact_->sub_compact_states.size() << "output_compression"
+         << CompressionTypeToString(compact_->compaction->output_compression());
 
   if (compaction_job_stats_ != nullptr) {
     stream << "num_single_delete_mismatches"
@@ -662,11 +811,35 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
 
 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   assert(sub_compact != nullptr);
+
+  uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
+
   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
-  std::unique_ptr<RangeDelAggregator> range_del_agg(
-      new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_));
+
+  // Create compaction filter and fail the compaction if
+  // IgnoreSnapshots() = false because it is not supported anymore
+  const CompactionFilter* compaction_filter =
+      cfd->ioptions()->compaction_filter;
+  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
+  if (compaction_filter == nullptr) {
+    compaction_filter_from_factory =
+        sub_compact->compaction->CreateCompactionFilter();
+    compaction_filter = compaction_filter_from_factory.get();
+  }
+  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
+    sub_compact->status = Status::NotSupported(
+        "CompactionFilter::IgnoreSnapshots() = false is not supported "
+        "anymore.");
+    return;
+  }
+
+  CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
+                                             existing_snapshots_);
+
+  // Although the v2 aggregator is what the level iterator(s) know about,
+  // the AddTombstones calls will be propagated down to the v1 aggregator.
   std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
-      sub_compact->compaction, range_del_agg.get()));
+      sub_compact->compaction, &range_del_agg, env_optiosn_for_read_));
 
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
@@ -678,54 +851,26 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   uint64_t prev_fsync_nanos = 0;
   uint64_t prev_range_sync_nanos = 0;
   uint64_t prev_prepare_write_nanos = 0;
+  uint64_t prev_cpu_write_nanos = 0;
+  uint64_t prev_cpu_read_nanos = 0;
   if (measure_io_stats_) {
     prev_perf_level = GetPerfLevel();
-    SetPerfLevel(PerfLevel::kEnableTime);
+    SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
     prev_write_nanos = IOSTATS(write_nanos);
     prev_fsync_nanos = IOSTATS(fsync_nanos);
     prev_range_sync_nanos = IOSTATS(range_sync_nanos);
     prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
+    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
+    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
   }
 
-  const MutableCFOptions* mutable_cf_options =
-      sub_compact->compaction->mutable_cf_options();
-
-  // To build compression dictionary, we sample the first output file, assuming
-  // it'll reach the maximum length, and then use the dictionary for compressing
-  // subsequent output files. The dictionary may be less than max_dict_bytes if
-  // the first output file's length is less than the maximum.
-  const int kSampleLenShift = 6;  // 2^6 = 64-byte samples
-  std::set<size_t> sample_begin_offsets;
-  if (bottommost_level_ &&
-      cfd->ioptions()->compression_opts.max_dict_bytes > 0) {
-    const size_t kMaxSamples =
-        cfd->ioptions()->compression_opts.max_dict_bytes >> kSampleLenShift;
-    const size_t kOutFileLen = mutable_cf_options->MaxFileSizeForLevel(
-        compact_->compaction->output_level());
-    if (kOutFileLen != port::kMaxSizet) {
-      const size_t kOutFileNumSamples = kOutFileLen >> kSampleLenShift;
-      Random64 generator{versions_->NewFileNumber()};
-      for (size_t i = 0; i < kMaxSamples; ++i) {
-        sample_begin_offsets.insert(generator.Uniform(kOutFileNumSamples)
-                                    << kSampleLenShift);
-      }
-    }
-  }
-
-  auto compaction_filter = cfd->ioptions()->compaction_filter;
-  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
-  if (compaction_filter == nullptr) {
-    compaction_filter_from_factory =
-        sub_compact->compaction->CreateCompactionFilter();
-    compaction_filter = compaction_filter_from_factory.get();
-  }
   MergeHelper merge(
       env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
       compaction_filter, db_options_.info_log.get(),
       false /* internal key corruption is expected */,
       existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
-      compact_->compaction->level(), db_options_.statistics.get(),
-      shutting_down_);
+      snapshot_checker_, compact_->compaction->level(),
+      db_options_.statistics.get(), shutting_down_);
 
   TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
 
@@ -739,40 +884,23 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     input->SeekToFirst();
   }
 
-  // we allow only 1 compaction event listener. Used by blob storage
-  CompactionEventListener* comp_event_listener = nullptr;
-#ifndef ROCKSDB_LITE
-  for (auto& celitr : cfd->ioptions()->listeners) {
-    comp_event_listener = celitr->GetCompactionEventListener();
-    if (comp_event_listener != nullptr) {
-      break;
-    }
-  }
-#endif  // ROCKSDB_LITE
-
   Status status;
   sub_compact->c_iter.reset(new CompactionIterator(
       input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
-      &existing_snapshots_, earliest_write_conflict_snapshot_, env_, false,
-      range_del_agg.get(), sub_compact->compaction, compaction_filter,
-      comp_event_listener, shutting_down_));
+      &existing_snapshots_, earliest_write_conflict_snapshot_,
+      snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
+      &range_del_agg, sub_compact->compaction, compaction_filter,
+      shutting_down_, preserve_deletes_seqnum_));
   auto c_iter = sub_compact->c_iter.get();
   c_iter->SeekToFirst();
-  if (c_iter->Valid() &&
-      sub_compact->compaction->output_level() != 0) {
+  if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
     // ShouldStopBefore() maintains state based on keys processed so far. The
     // compaction loop always calls it on the "next" key, thus won't tell it the
     // first key. So we do that here.
-    sub_compact->ShouldStopBefore(
-      c_iter->key(), sub_compact->current_output_file_size);
+    sub_compact->ShouldStopBefore(c_iter->key(),
+                                  sub_compact->current_output_file_size);
   }
   const auto& c_iter_stats = c_iter->iter_stats();
-  auto sample_begin_offset_iter = sample_begin_offsets.cbegin();
-  // data_begin_offset and compression_dict are only valid while generating
-  // dictionary from the first output file.
-  size_t data_begin_offset = 0;
-  std::string compression_dict;
-  compression_dict.reserve(cfd->ioptions()->compression_opts.max_dict_bytes);
 
   while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
     // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
@@ -808,55 +936,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
         key, c_iter->ikey().sequence);
     sub_compact->num_output_records++;
 
-    if (sub_compact->outputs.size() == 1) {  // first output file
-      // Check if this key/value overlaps any sample intervals; if so, appends
-      // overlapping portions to the dictionary.
-      for (const auto& data_elmt : {key, value}) {
-        size_t data_end_offset = data_begin_offset + data_elmt.size();
-        while (sample_begin_offset_iter != sample_begin_offsets.cend() &&
-               *sample_begin_offset_iter < data_end_offset) {
-          size_t sample_end_offset =
-              *sample_begin_offset_iter + (1 << kSampleLenShift);
-          // Invariant: Because we advance sample iterator while processing the
-          // data_elmt containing the sample's last byte, the current sample
-          // cannot end before the current data_elmt.
-          assert(data_begin_offset < sample_end_offset);
-
-          size_t data_elmt_copy_offset, data_elmt_copy_len;
-          if (*sample_begin_offset_iter <= data_begin_offset) {
-            // The sample starts before data_elmt starts, so take bytes starting
-            // at the beginning of data_elmt.
-            data_elmt_copy_offset = 0;
-          } else {
-            // data_elmt starts before the sample starts, so take bytes starting
-            // at the below offset into data_elmt.
-            data_elmt_copy_offset =
-                *sample_begin_offset_iter - data_begin_offset;
-          }
-          if (sample_end_offset <= data_end_offset) {
-            // The sample ends before data_elmt ends, so take as many bytes as
-            // needed.
-            data_elmt_copy_len =
-                sample_end_offset - (data_begin_offset + data_elmt_copy_offset);
-          } else {
-            // data_elmt ends before the sample ends, so take all remaining
-            // bytes in data_elmt.
-            data_elmt_copy_len =
-                data_end_offset - (data_begin_offset + data_elmt_copy_offset);
-          }
-          compression_dict.append(&data_elmt.data()[data_elmt_copy_offset],
-                                  data_elmt_copy_len);
-          if (sample_end_offset > data_end_offset) {
-            // Didn't finish sample. Try to finish it with the next data_elmt.
-            break;
-          }
-          // Next sample may require bytes from same data_elmt.
-          sample_begin_offset_iter++;
-        }
-        data_begin_offset = data_end_offset;
-      }
-    }
-
     // Close output file if it is big enough. Two possibilities determine it's
     // time to close it: (1) the current key should be this file's last key, (2)
     // the next key should not be in this file.
@@ -878,8 +957,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     c_iter->Next();
     if (!output_file_ended && c_iter->Valid() &&
         sub_compact->compaction->output_level() != 0 &&
-        sub_compact->ShouldStopBefore(
-          c_iter->key(), sub_compact->current_output_file_size) &&
+        sub_compact->ShouldStopBefore(c_iter->key(),
+                                      sub_compact->current_output_file_size) &&
         sub_compact->builder != nullptr) {
       // (2) this key belongs to the next file. For historical reasons, the
       // iterator status after advancing will be given to
@@ -893,16 +972,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
         next_key = &c_iter->key();
       }
       CompactionIterationStats range_del_out_stats;
-      status = FinishCompactionOutputFile(input_status, sub_compact,
-                                          range_del_agg.get(),
-                                          &range_del_out_stats, next_key);
+      status =
+          FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
+                                     &range_del_out_stats, next_key);
       RecordDroppedKeys(range_del_out_stats,
                         &sub_compact->compaction_job_stats);
-      if (sub_compact->outputs.size() == 1) {
-        // Use dictionary from first output file for compression of subsequent
-        // files.
-        sub_compact->compression_dict = std::move(compression_dict);
-      }
     }
   }
 
@@ -925,8 +999,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
   RecordCompactionIOStats();
 
-  if (status.ok() && (shutting_down_->load(std::memory_order_relaxed) ||
-                      cfd->IsDropped())) {
+  if (status.ok() &&
+      (shutting_down_->load(std::memory_order_relaxed) || cfd->IsDropped())) {
     status = Status::ShutdownInProgress(
         "Database shutdown or Column family drop during compaction");
   }
@@ -938,8 +1012,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   }
 
   if (status.ok() && sub_compact->builder == nullptr &&
-      sub_compact->outputs.size() == 0 &&
-      range_del_agg->ShouldAddTombstones(bottommost_level_)) {
+      sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
     // handle subcompaction containing only range deletions
     status = OpenCompactionOutputFile(sub_compact);
   }
@@ -948,14 +1021,17 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   // close the output file.
   if (sub_compact->builder != nullptr) {
     CompactionIterationStats range_del_out_stats;
-    Status s = FinishCompactionOutputFile(
-        status, sub_compact, range_del_agg.get(), &range_del_out_stats);
+    Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
+                                          &range_del_out_stats);
     if (status.ok()) {
       status = s;
     }
     RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
   }
 
+  sub_compact->compaction_job_stats.cpu_micros =
+      env_->NowCPUNanos() / 1000 - prev_cpu_micros;
+
   if (measure_io_stats_) {
     sub_compact->compaction_job_stats.file_write_nanos +=
         IOSTATS(write_nanos) - prev_write_nanos;
@@ -965,7 +1041,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
         IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
     sub_compact->compaction_job_stats.file_prepare_write_nanos +=
         IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
-    if (prev_perf_level != PerfLevel::kEnableTime) {
+    sub_compact->compaction_job_stats.cpu_micros -=
+        (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
+         IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
+        1000;
+    if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
       SetPerfLevel(prev_perf_level);
     }
   }
@@ -1006,11 +1086,15 @@ void CompactionJob::RecordDroppedKeys(
     RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
                c_iter_stats.num_range_del_drop_obsolete);
   }
+  if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
+    RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
+               c_iter_stats.num_optimized_del_drop_obsolete);
+  }
 }
 
 Status CompactionJob::FinishCompactionOutputFile(
     const Status& input_status, SubcompactionState* sub_compact,
-    RangeDelAggregator* range_del_agg,
+    CompactionRangeDelAggregator* range_del_agg,
     CompactionIterationStats* range_del_out_stats,
     const Slice* next_table_min_key /* = nullptr */) {
   AutoThreadOperationStageUpdater stage_updater(
@@ -1023,49 +1107,185 @@ Status CompactionJob::FinishCompactionOutputFile(
   uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
   assert(output_number != 0);
 
-  TableProperties table_properties;
+  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
+  const Comparator* ucmp = cfd->user_comparator();
+
   // Check for iterator errors
   Status s = input_status;
   auto meta = &sub_compact->current_output()->meta;
+  assert(meta != nullptr);
   if (s.ok()) {
     Slice lower_bound_guard, upper_bound_guard;
+    std::string smallest_user_key;
     const Slice *lower_bound, *upper_bound;
+    bool lower_bound_from_sub_compact = false;
     if (sub_compact->outputs.size() == 1) {
       // For the first output table, include range tombstones before the min key
       // but after the subcompaction boundary.
       lower_bound = sub_compact->start;
+      lower_bound_from_sub_compact = true;
     } else if (meta->smallest.size() > 0) {
       // For subsequent output tables, only include range tombstones from min
       // key onwards since the previous file was extended to contain range
       // tombstones falling before min key.
-      lower_bound_guard = meta->smallest.user_key();
+      smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
+      lower_bound_guard = Slice(smallest_user_key);
       lower_bound = &lower_bound_guard;
     } else {
       lower_bound = nullptr;
     }
     if (next_table_min_key != nullptr) {
-      // This isn't the last file in the subcompaction, so extend until the next
-      // file starts.
+      // This may be the last file in the subcompaction in some cases, so we
+      // need to compare the end key of subcompaction with the next file start
+      // key. When the end key is chosen by the subcompaction, we know that
+      // it must be the biggest key in output file. Therefore, it is safe to
+      // use the smaller key as the upper bound of the output file, to ensure
+      // that there is no overlapping between different output files.
       upper_bound_guard = ExtractUserKey(*next_table_min_key);
-      upper_bound = &upper_bound_guard;
+      if (sub_compact->end != nullptr &&
+          ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
+        upper_bound = sub_compact->end;
+      } else {
+        upper_bound = &upper_bound_guard;
+      }
     } else {
       // This is the last file in the subcompaction, so extend until the
       // subcompaction ends.
       upper_bound = sub_compact->end;
     }
-    range_del_agg->AddToBuilder(sub_compact->builder.get(), lower_bound,
-                                upper_bound, meta, range_del_out_stats,
-                                bottommost_level_);
+    auto earliest_snapshot = kMaxSequenceNumber;
+    if (existing_snapshots_.size() > 0) {
+      earliest_snapshot = existing_snapshots_[0];
+    }
+    bool has_overlapping_endpoints;
+    if (upper_bound != nullptr && meta->largest.size() > 0) {
+      has_overlapping_endpoints =
+          ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
+    } else {
+      has_overlapping_endpoints = false;
+    }
+
+    // The end key of the subcompaction must be bigger or equal to the upper
+    // bound. If the end of subcompaction is null or the upper bound is null,
+    // it means that this file is the last file in the compaction. So there
+    // will be no overlapping between this file and others.
+    assert(sub_compact->end == nullptr ||
+           upper_bound == nullptr ||
+           ucmp->Compare(*upper_bound , *sub_compact->end) <= 0);
+    auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
+                                         has_overlapping_endpoints);
+    // Position the range tombstone output iterator. There may be tombstone
+    // fragments that are entirely out of range, so make sure that we do not
+    // include those.
+    if (lower_bound != nullptr) {
+      it->Seek(*lower_bound);
+    } else {
+      it->SeekToFirst();
+    }
+    for (; it->Valid(); it->Next()) {
+      auto tombstone = it->Tombstone();
+      if (upper_bound != nullptr) {
+        int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
+        if ((has_overlapping_endpoints && cmp < 0) ||
+            (!has_overlapping_endpoints && cmp <= 0)) {
+          // Tombstones starting after upper_bound only need to be included in
+          // the next table. If the current SST ends before upper_bound, i.e.,
+          // `has_overlapping_endpoints == false`, we can also skip over range
+          // tombstones that start exactly at upper_bound. Such range tombstones
+          // will be included in the next file and are not relevant to the point
+          // keys or endpoints of the current file.
+          break;
+        }
+      }
+
+      if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
+        // TODO(andrewkr): tombstones that span multiple output files are
+        // counted for each compaction output file, so lots of double counting.
+        range_del_out_stats->num_range_del_drop_obsolete++;
+        range_del_out_stats->num_record_drop_obsolete++;
+        continue;
+      }
+
+      auto kv = tombstone.Serialize();
+      assert(lower_bound == nullptr ||
+             ucmp->Compare(*lower_bound, kv.second) < 0);
+      sub_compact->builder->Add(kv.first.Encode(), kv.second);
+      InternalKey smallest_candidate = std::move(kv.first);
+      if (lower_bound != nullptr &&
+          ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
+        // Pretend the smallest key has the same user key as lower_bound
+        // (the max key in the previous table or subcompaction) in order for
+        // files to appear key-space partitioned.
+        //
+        // When lower_bound is chosen by a subcompaction, we know that
+        // subcompactions over smaller keys cannot contain any keys at
+        // lower_bound. We also know that smaller subcompactions exist, because
+        // otherwise the subcompaction woud be unbounded on the left. As a
+        // result, we know that no other files on the output level will contain
+        // actual keys at lower_bound (an output file may have a largest key of
+        // lower_bound@kMaxSequenceNumber, but this only indicates a large range
+        // tombstone was truncated). Therefore, it is safe to use the
+        // tombstone's sequence number, to ensure that keys at lower_bound at
+        // lower levels are covered by truncated tombstones.
+        //
+        // If lower_bound was chosen by the smallest data key in the file,
+        // choose lowest seqnum so this file's smallest internal key comes after
+        // the previous file's largest. The fake seqnum is OK because the read
+        // path's file-picking code only considers user key.
+        smallest_candidate = InternalKey(
+            *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
+            kTypeRangeDeletion);
+      }
+      InternalKey largest_candidate = tombstone.SerializeEndKey();
+      if (upper_bound != nullptr &&
+          ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
+        // Pretend the largest key has the same user key as upper_bound (the
+        // min key in the following table or subcompaction) in order for files
+        // to appear key-space partitioned.
+        //
+        // Choose highest seqnum so this file's largest internal key comes
+        // before the next file's/subcompaction's smallest. The fake seqnum is
+        // OK because the read path's file-picking code only considers the user
+        // key portion.
+        //
+        // Note Seek() also creates InternalKey with (user_key,
+        // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
+        // kTypeRangeDeletion (0xF), so the range tombstone comes before the
+        // Seek() key in InternalKey's ordering. So Seek() will look in the
+        // next file for the user key.
+        largest_candidate =
+            InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
+      }
+#ifndef NDEBUG
+      SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
+      if (meta->smallest.size() > 0) {
+        smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
+      }
+#endif
+      meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
+                                     tombstone.seq_,
+                                     cfd->internal_comparator());
+
+      // The smallest key in a file is used for range tombstone truncation, so
+      // it cannot have a seqnum of 0 (unless the smallest data key in a file
+      // has a seqnum of 0). Otherwise, the truncated tombstone may expose
+      // deleted keys at lower levels.
+      assert(smallest_ikey_seqnum == 0 ||
+             ExtractInternalKeyFooter(meta->smallest.Encode()) !=
+                 PackSequenceAndType(0, kTypeRangeDeletion));
+    }
+    meta->marked_for_compaction = sub_compact->builder->NeedCompact();
   }
   const uint64_t current_entries = sub_compact->builder->NumEntries();
-  meta->marked_for_compaction = sub_compact->builder->NeedCompact();
   if (s.ok()) {
     s = sub_compact->builder->Finish();
   } else {
     sub_compact->builder->Abandon();
   }
   const uint64_t current_bytes = sub_compact->builder->FileSize();
-  meta->fd.file_size = current_bytes;
+  if (s.ok()) {
+    meta->fd.file_size = current_bytes;
+  }
   sub_compact->current_output()->finished = true;
   sub_compact->total_bytes += current_bytes;
 
@@ -1079,65 +1299,67 @@ Status CompactionJob::FinishCompactionOutputFile(
   }
   sub_compact->outfile.reset();
 
-  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
   TableProperties tp;
-  if (s.ok() && current_entries > 0) {
-    // Verify that the table is usable
-    // We set for_compaction to false and don't OptimizeForCompactionTableRead
-    // here because this is a special case after we finish the table building
-    // No matter whether use_direct_io_for_flush_and_compaction is true,
-    // we will regrad this verification as user reads since the goal is
-    // to cache it here for further user reads
-    InternalIterator* iter = cfd->table_cache()->NewIterator(
-        ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
-        nullptr /* range_del_agg */, nullptr,
-        cfd->internal_stats()->GetFileReadHist(
-            compact_->compaction->output_level()),
-        false);
-    s = iter->status();
-
-    if (s.ok() && paranoid_file_checks_) {
-      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
-      s = iter->status();
-    }
+  if (s.ok()) {
+    tp = sub_compact->builder->GetTableProperties();
+  }
 
-    delete iter;
+  if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
+    // If there is nothing to output, no necessary to generate a sst file.
+    // This happens when the output level is bottom level, at the same time
+    // the sub_compact output nothing.
+    std::string fname =
+        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
+                      meta->fd.GetNumber(), meta->fd.GetPathId());
+    env_->DeleteFile(fname);
+
+    // Also need to remove the file from outputs, or it will be added to the
+    // VersionEdit.
+    assert(!sub_compact->outputs.empty());
+    sub_compact->outputs.pop_back();
+    meta = nullptr;
+  }
 
+  if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
     // Output to event logger and fire events.
-    if (s.ok()) {
-      tp = sub_compact->builder->GetTableProperties();
-      sub_compact->current_output()->table_properties =
-          std::make_shared<TableProperties>(tp);
-      ROCKS_LOG_INFO(db_options_.info_log,
-                     "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
-                     " keys, %" PRIu64 " bytes%s",
-                     cfd->GetName().c_str(), job_id_, output_number,
-                     current_entries, current_bytes,
-                     meta->marked_for_compaction ? " (need compaction)" : "");
-    }
+    sub_compact->current_output()->table_properties =
+        std::make_shared<TableProperties>(tp);
+    ROCKS_LOG_INFO(db_options_.info_log,
+                   "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
+                   " keys, %" PRIu64 " bytes%s",
+                   cfd->GetName().c_str(), job_id_, output_number,
+                   current_entries, current_bytes,
+                   meta->marked_for_compaction ? " (need compaction)" : "");
+  }
+  std::string fname;
+  FileDescriptor output_fd;
+  if (meta != nullptr) {
+    fname =
+        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
+                      meta->fd.GetNumber(), meta->fd.GetPathId());
+    output_fd = meta->fd;
+  } else {
+    fname = "(nil)";
   }
-  std::string fname = TableFileName(db_options_.db_paths, meta->fd.GetNumber(),
-                                    meta->fd.GetPathId());
   EventHelpers::LogAndNotifyTableFileCreationFinished(
       event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
-      job_id_, meta->fd, tp, TableFileCreationReason::kCompaction, s);
+      job_id_, output_fd, tp, TableFileCreationReason::kCompaction, s);
 
 #ifndef ROCKSDB_LITE
   // Report new file to SstFileManagerImpl
   auto sfm =
       static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
-  if (sfm && meta->fd.GetPathId() == 0) {
-    auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
-                            meta->fd.GetPathId());
-    sfm->OnAddFile(fn);
+  if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
+    sfm->OnAddFile(fname);
     if (sfm->IsMaxAllowedSpaceReached()) {
+      // TODO(ajkr): should we return OK() if max space was reached by the final
+      // compaction output file (similarly to how flush works when full)?
+      s = Status::SpaceLimit("Max allowed space was reached");
+      TEST_SYNC_POINT(
+          "CompactionJob::FinishCompactionOutputFile:"
+          "MaxAllowedSpaceReached");
       InstrumentedMutexLock l(db_mutex_);
-      if (db_bg_error_->ok()) {
-        s = Status::IOError("Max allowed space was reached");
-        *db_bg_error_ = s;
-        TEST_SYNC_POINT(
-            "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
-      }
+      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
     }
   }
 #endif
@@ -1173,7 +1395,7 @@ Status CompactionJob::InstallCompactionResults(
         compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
   }
 
-  // Add compaction outputs
+  // Add compaction inputs
   compaction->AddInputDeletions(compact_->compaction->edit());
 
   for (const auto& sub_compact : compact_->sub_compact_states) {
@@ -1203,8 +1425,9 @@ Status CompactionJob::OpenCompactionOutputFile(
   assert(sub_compact->builder == nullptr);
   // no need to lock because VersionSet::next_file_number_ is atomic
   uint64_t file_number = versions_->NewFileNumber();
-  std::string fname = TableFileName(db_options_.db_paths, file_number,
-                                    sub_compact->compaction->output_path_id());
+  std::string fname =
+      TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
+                    file_number, sub_compact->compaction->output_path_id());
   // Fire events.
   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
 #ifndef ROCKSDB_LITE
@@ -1213,12 +1436,13 @@ Status CompactionJob::OpenCompactionOutputFile(
       TableFileCreationReason::kCompaction);
 #endif  // !ROCKSDB_LITE
   // Make the output file
-  unique_ptr<WritableFile> writable_file;
-  EnvOptions opt_env_opts =
-      env_->OptimizeForCompactionTableWrite(env_options_, db_options_);
+  std::unique_ptr<WritableFile> writable_file;
+#ifndef NDEBUG
+  bool syncpoint_arg = env_options_.use_direct_writes;
   TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
-                           &opt_env_opts.use_direct_writes);
-  Status s = NewWritableFile(env_, fname, &writable_file, opt_env_opts);
+                           &syncpoint_arg);
+#endif
+  Status s = NewWritableFile(env_, fname, &writable_file, env_options_);
   if (!s.ok()) {
     ROCKS_LOG_ERROR(
         db_options_.info_log,
@@ -1241,24 +1465,47 @@ Status CompactionJob::OpenCompactionOutputFile(
 
   sub_compact->outputs.push_back(out);
   writable_file->SetIOPriority(Env::IO_LOW);
+  writable_file->SetWriteLifeTimeHint(write_hint_);
   writable_file->SetPreallocationBlockSize(static_cast<size_t>(
       sub_compact->compaction->OutputFilePreallocationSize()));
-  sub_compact->outfile.reset(new WritableFileWriter(
-      std::move(writable_file), env_options_, db_options_.statistics.get()));
+  const auto& listeners =
+      sub_compact->compaction->immutable_cf_options()->listeners;
+  sub_compact->outfile.reset(
+      new WritableFileWriter(std::move(writable_file), fname, env_options_,
+                             env_, db_options_.statistics.get(), listeners));
 
   // If the Column family flag is to only optimize filters for hits,
   // we can skip creating filters if this is the bottommost_level where
   // data is going to be found
   bool skip_filters =
       cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
+
+  uint64_t output_file_creation_time =
+      sub_compact->compaction->MaxInputFileCreationTime();
+  if (output_file_creation_time == 0) {
+    int64_t _current_time = 0;
+    auto status = db_options_.env->GetCurrentTime(&_current_time);
+    // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
+    if (!status.ok()) {
+      ROCKS_LOG_WARN(
+          db_options_.info_log,
+          "Failed to get current time to populate creation_time property. "
+          "Status: %s",
+          status.ToString().c_str());
+    }
+    output_file_creation_time = static_cast<uint64_t>(_current_time);
+  }
+
   sub_compact->builder.reset(NewTableBuilder(
-      *cfd->ioptions(), cfd->internal_comparator(),
-      cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
-      sub_compact->outfile.get(), sub_compact->compaction->output_compression(),
-      cfd->ioptions()->compression_opts,
-      sub_compact->compaction->output_level(),
-      &sub_compact->compression_dict,
-      skip_filters));
+      *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
+      cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
+      cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
+      sub_compact->compaction->output_compression(),
+      0 /*sample_for_compression */,
+      sub_compact->compaction->output_compression_opts(),
+      sub_compact->compaction->output_level(), skip_filters,
+      output_file_creation_time, 0 /* oldest_key_time */,
+      sub_compact->compaction->max_output_file_size()));
   LogFlush(db_options_.info_log);
   return s;
 }
@@ -1288,8 +1535,7 @@ void CompactionJob::CleanupCompaction() {
 
 #ifndef ROCKSDB_LITE
 namespace {
-void CopyPrefix(
-    const Slice& src, size_t prefix_length, std::string* dst) {
+void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
   assert(prefix_length > 0);
   size_t length = src.size() > prefix_length ? prefix_length : src.size();
   dst->assign(src.data(), length);
@@ -1308,13 +1554,11 @@ void CompactionJob::UpdateCompactionStats() {
     if (compaction->level(input_level) != compaction->output_level()) {
       UpdateCompactionInputStatsHelper(
           &compaction_stats_.num_input_files_in_non_output_levels,
-          &compaction_stats_.bytes_read_non_output_levels,
-          input_level);
+          &compaction_stats_.bytes_read_non_output_levels, input_level);
     } else {
       UpdateCompactionInputStatsHelper(
           &compaction_stats_.num_input_files_in_output_level,
-          &compaction_stats_.bytes_read_output_level,
-          input_level);
+          &compaction_stats_.bytes_read_output_level, input_level);
     }
   }
 
@@ -1337,8 +1581,9 @@ void CompactionJob::UpdateCompactionStats() {
   }
 }
 
-void CompactionJob::UpdateCompactionInputStatsHelper(
-    int* num_files, uint64_t* bytes_read, int input_level) {
+void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
+                                                     uint64_t* bytes_read,
+                                                     int input_level) {
   const Compaction* compaction = compact_->compaction;
   auto num_input_files = compaction->num_input_files(input_level);
   *num_files += static_cast<int>(num_input_files);
@@ -1359,10 +1604,8 @@ void CompactionJob::UpdateCompactionJobStats(
 
     // input information
     compaction_job_stats_->total_input_bytes =
-        stats.bytes_read_non_output_levels +
-        stats.bytes_read_output_level;
-    compaction_job_stats_->num_input_records =
-        compact_->num_input_records;
+        stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
+    compaction_job_stats_->num_input_records = compact_->num_input_records;
     compaction_job_stats_->num_input_files =
         stats.num_input_files_in_non_output_levels +
         stats.num_input_files_in_output_level;
@@ -1371,21 +1614,20 @@ void CompactionJob::UpdateCompactionJobStats(
 
     // output information
     compaction_job_stats_->total_output_bytes = stats.bytes_written;
-    compaction_job_stats_->num_output_records =
-        compact_->num_output_records;
+    compaction_job_stats_->num_output_records = compact_->num_output_records;
     compaction_job_stats_->num_output_files = stats.num_output_files;
 
     if (compact_->NumOutputFiles() > 0U) {
-      CopyPrefix(
-          compact_->SmallestUserKey(),
-          CompactionJobStats::kMaxPrefixLength,
-          &compaction_job_stats_->smallest_output_key_prefix);
-      CopyPrefix(
-          compact_->LargestUserKey(),
-          CompactionJobStats::kMaxPrefixLength,
-          &compaction_job_stats_->largest_output_key_prefix);
+      CopyPrefix(compact_->SmallestUserKey(),
+                 CompactionJobStats::kMaxPrefixLength,
+                 &compaction_job_stats_->smallest_output_key_prefix);
+      CopyPrefix(compact_->LargestUserKey(),
+                 CompactionJobStats::kMaxPrefixLength,
+                 &compaction_job_stats_->largest_output_key_prefix);
     }
   }
+#else
+  (void)stats;
 #endif  // !ROCKSDB_LITE
 }
 
@@ -1408,7 +1650,9 @@ void CompactionJob::LogCompaction() {
     // build event logger report
     auto stream = event_logger_->Log();
     stream << "job" << job_id_ << "event"
-           << "compaction_started";
+           << "compaction_started"
+           << "compaction_reason"
+           << GetCompactionReasonString(compaction->compaction_reason());
     for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
       stream << ("files_L" + ToString(compaction->level(i)));
       stream.StartArray();