]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compaction_job.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / compaction_job.cc
index b0a19ead40aebcf929fbb0172dc3536457a722b7..65e9719a396dcc4a1b39061601db7057e20c2009 100644 (file)
@@ -36,6 +36,7 @@
 #include "db/memtable_list.h"
 #include "db/merge_context.h"
 #include "db/merge_helper.h"
+#include "db/range_del_aggregator.h"
 #include "db/version_set.h"
 #include "monitoring/iostats_context_imp.h"
 #include "monitoring/perf_context_imp.h"
@@ -156,7 +157,6 @@ struct CompactionJob::SubcompactionState {
   uint64_t overlapped_bytes = 0;
   // A flag determine whether the key has been seen in ShouldStopBefore()
   bool seen_key = false;
-  std::string compression_dict;
 
   SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
                      uint64_t size = 0)
@@ -172,8 +172,7 @@ struct CompactionJob::SubcompactionState {
         approx_size(size),
         grandparent_index(0),
         overlapped_bytes(0),
-        seen_key(false),
-        compression_dict() {
+        seen_key(false) {
     assert(compaction != nullptr);
   }
 
@@ -196,11 +195,10 @@ struct CompactionJob::SubcompactionState {
     grandparent_index = std::move(o.grandparent_index);
     overlapped_bytes = std::move(o.overlapped_bytes);
     seen_key = std::move(o.seen_key);
-    compression_dict = std::move(o.compression_dict);
     return *this;
   }
 
-  // Because member unique_ptrs do not have these.
+  // Because member std::unique_ptrs do not have these.
   SubcompactionState(const SubcompactionState&) = delete;
 
   SubcompactionState& operator=(const SubcompactionState&) = delete;
@@ -314,7 +312,8 @@ CompactionJob::CompactionJob(
     SequenceNumber earliest_write_conflict_snapshot,
     const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
     EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
-    const std::string& dbname, CompactionJobStats* compaction_job_stats)
+    const std::string& dbname, CompactionJobStats* compaction_job_stats,
+    Env::Priority thread_pri)
     : job_id_(job_id),
       compact_(new CompactionState(compaction)),
       compaction_job_stats_(compaction_job_stats),
@@ -342,7 +341,8 @@ CompactionJob::CompactionJob(
       bottommost_level_(false),
       paranoid_file_checks_(paranoid_file_checks),
       measure_io_stats_(measure_io_stats),
-      write_hint_(Env::WLTH_NOT_SET) {
+      write_hint_(Env::WLTH_NOT_SET),
+      thread_pri_(thread_pri) {
   assert(log_buffer_ != nullptr);
   const auto* cfd = compact_->compaction->column_family_data();
   ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
@@ -416,11 +416,10 @@ void CompactionJob::Prepare() {
   bottommost_level_ = c->bottommost_level();
 
   if (c->ShouldFormSubcompactions()) {
-    const uint64_t start_micros = env_->NowMicros();
-    GenSubcompactionBoundaries();
-    MeasureTime(stats_, SUBCOMPACTION_SETUP_TIME,
-                env_->NowMicros() - start_micros);
-
+    {
+      StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
+      GenSubcompactionBoundaries();
+    }
     assert(sizes_.size() == boundaries_.size() + 1);
 
     for (size_t i = 0; i <= boundaries_.size(); i++) {
@@ -428,8 +427,8 @@ void CompactionJob::Prepare() {
       Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
       compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
     }
-    MeasureTime(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
-                compact_->sub_compact_states.size());
+    RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
+                      compact_->sub_compact_states.size());
   } else {
     compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
   }
@@ -511,7 +510,10 @@ void CompactionJob::GenSubcompactionBoundaries() {
   // size of data covered by keys in that range
   uint64_t sum = 0;
   std::vector<RangeWithSize> ranges;
-  auto* v = cfd->current();
+  // Get input version from CompactionState since it's already referenced
+  // earlier in SetInputVersioCompaction::SetInputVersion and will not change
+  // when db_mutex_ is released below
+  auto* v = compact_->compaction->input_version();
   for (auto it = bounds.begin();;) {
     const Slice a = *it;
     it++;
@@ -521,7 +523,13 @@ void CompactionJob::GenSubcompactionBoundaries() {
     }
 
     const Slice b = *it;
+
+    // ApproximateSize could potentially create table reader iterator to seek
+    // to the index block and may incur I/O cost in the process. Unlock db
+    // mutex to reduce contention
+    db_mutex_->Unlock();
     uint64_t size = versions_->ApproximateSize(v, a, b, start_lvl, out_lvl + 1);
+    db_mutex_->Lock();
     ranges.emplace_back(a, b, size);
     sum += size;
   }
@@ -594,7 +602,15 @@ Status CompactionJob::Run() {
   }
 
   compaction_stats_.micros = env_->NowMicros() - start_micros;
-  MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
+  compaction_stats_.cpu_micros = 0;
+  for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
+    compaction_stats_.cpu_micros +=
+        compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
+  }
+
+  RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
+  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
+                        compaction_stats_.cpu_micros);
 
   TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
 
@@ -703,7 +719,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   Status status = compact_->status;
   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
   cfd->internal_stats()->AddCompactionStats(
-      compact_->compaction->output_level(), compaction_stats_);
+      compact_->compaction->output_level(), thread_pri_, compaction_stats_);
 
   if (status.ok()) {
     status = InstallCompactionResults(mutable_cf_options);
@@ -757,6 +773,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   stream << "job" << job_id_ << "event"
          << "compaction_finished"
          << "compaction_time_micros" << compaction_stats_.micros
+         << "compaction_time_cpu_micros" << compaction_stats_.cpu_micros
          << "output_level" << compact_->compaction->output_level()
          << "num_output_files" << compact_->NumOutputFiles()
          << "total_output_size" << compact_->total_bytes << "num_input_records"
@@ -794,11 +811,35 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
 
 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   assert(sub_compact != nullptr);
+
+  uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
+
   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
-  std::unique_ptr<RangeDelAggregator> range_del_agg(
-      new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_));
+
+  // Create compaction filter and fail the compaction if
+  // IgnoreSnapshots() = false because it is not supported anymore
+  const CompactionFilter* compaction_filter =
+      cfd->ioptions()->compaction_filter;
+  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
+  if (compaction_filter == nullptr) {
+    compaction_filter_from_factory =
+        sub_compact->compaction->CreateCompactionFilter();
+    compaction_filter = compaction_filter_from_factory.get();
+  }
+  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
+    sub_compact->status = Status::NotSupported(
+        "CompactionFilter::IgnoreSnapshots() = false is not supported "
+        "anymore.");
+    return;
+  }
+
+  CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
+                                             existing_snapshots_);
+
+  // Although the v2 aggregator is what the level iterator(s) know about,
+  // the AddTombstones calls will be propagated down to the v1 aggregator.
   std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
-      sub_compact->compaction, range_del_agg.get(), env_optiosn_for_read_));
+      sub_compact->compaction, &range_del_agg, env_optiosn_for_read_));
 
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
@@ -810,58 +851,19 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   uint64_t prev_fsync_nanos = 0;
   uint64_t prev_range_sync_nanos = 0;
   uint64_t prev_prepare_write_nanos = 0;
+  uint64_t prev_cpu_write_nanos = 0;
+  uint64_t prev_cpu_read_nanos = 0;
   if (measure_io_stats_) {
     prev_perf_level = GetPerfLevel();
-    SetPerfLevel(PerfLevel::kEnableTime);
+    SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
     prev_write_nanos = IOSTATS(write_nanos);
     prev_fsync_nanos = IOSTATS(fsync_nanos);
     prev_range_sync_nanos = IOSTATS(range_sync_nanos);
     prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
+    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
+    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
   }
 
-  const MutableCFOptions* mutable_cf_options =
-      sub_compact->compaction->mutable_cf_options();
-
-  // To build compression dictionary, we sample the first output file, assuming
-  // it'll reach the maximum length. We optionally pass these samples through
-  // zstd's dictionary trainer, or just use them directly. Then, the dictionary
-  // is used for compressing subsequent output files in the same subcompaction.
-  const bool kUseZstdTrainer =
-      sub_compact->compaction->output_compression_opts().zstd_max_train_bytes >
-      0;
-  const size_t kSampleBytes =
-      kUseZstdTrainer
-          ? sub_compact->compaction->output_compression_opts()
-                .zstd_max_train_bytes
-          : sub_compact->compaction->output_compression_opts().max_dict_bytes;
-  const int kSampleLenShift = 6;  // 2^6 = 64-byte samples
-  std::set<size_t> sample_begin_offsets;
-  if (bottommost_level_ && kSampleBytes > 0) {
-    const size_t kMaxSamples = kSampleBytes >> kSampleLenShift;
-    const size_t kOutFileLen =
-        static_cast<size_t>(MaxFileSizeForLevel(*mutable_cf_options,
-            compact_->compaction->output_level(),
-            cfd->ioptions()->compaction_style,
-            compact_->compaction->GetInputBaseLevel(),
-            cfd->ioptions()->level_compaction_dynamic_level_bytes));
-    if (kOutFileLen != port::kMaxSizet) {
-      const size_t kOutFileNumSamples = kOutFileLen >> kSampleLenShift;
-      Random64 generator{versions_->NewFileNumber()};
-      for (size_t i = 0; i < kMaxSamples; ++i) {
-        sample_begin_offsets.insert(
-            static_cast<size_t>(generator.Uniform(kOutFileNumSamples))
-            << kSampleLenShift);
-      }
-    }
-  }
-
-  auto compaction_filter = cfd->ioptions()->compaction_filter;
-  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
-  if (compaction_filter == nullptr) {
-    compaction_filter_from_factory =
-        sub_compact->compaction->CreateCompactionFilter();
-    compaction_filter = compaction_filter_from_factory.get();
-  }
   MergeHelper merge(
       env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
       compaction_filter, db_options_.info_log.get(),
@@ -887,7 +889,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
       &existing_snapshots_, earliest_write_conflict_snapshot_,
       snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
-      range_del_agg.get(), sub_compact->compaction, compaction_filter,
+      &range_del_agg, sub_compact->compaction, compaction_filter,
       shutting_down_, preserve_deletes_seqnum_));
   auto c_iter = sub_compact->c_iter.get();
   c_iter->SeekToFirst();
@@ -899,12 +901,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
                                   sub_compact->current_output_file_size);
   }
   const auto& c_iter_stats = c_iter->iter_stats();
-  auto sample_begin_offset_iter = sample_begin_offsets.cbegin();
-  // data_begin_offset and dict_sample_data are only valid while generating
-  // dictionary from the first output file.
-  size_t data_begin_offset = 0;
-  std::string dict_sample_data;
-  dict_sample_data.reserve(kSampleBytes);
 
   while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
     // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
@@ -940,55 +936,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
         key, c_iter->ikey().sequence);
     sub_compact->num_output_records++;
 
-    if (sub_compact->outputs.size() == 1) {  // first output file
-      // Check if this key/value overlaps any sample intervals; if so, appends
-      // overlapping portions to the dictionary.
-      for (const auto& data_elmt : {key, value}) {
-        size_t data_end_offset = data_begin_offset + data_elmt.size();
-        while (sample_begin_offset_iter != sample_begin_offsets.cend() &&
-               *sample_begin_offset_iter < data_end_offset) {
-          size_t sample_end_offset =
-              *sample_begin_offset_iter + (1 << kSampleLenShift);
-          // Invariant: Because we advance sample iterator while processing the
-          // data_elmt containing the sample's last byte, the current sample
-          // cannot end before the current data_elmt.
-          assert(data_begin_offset < sample_end_offset);
-
-          size_t data_elmt_copy_offset, data_elmt_copy_len;
-          if (*sample_begin_offset_iter <= data_begin_offset) {
-            // The sample starts before data_elmt starts, so take bytes starting
-            // at the beginning of data_elmt.
-            data_elmt_copy_offset = 0;
-          } else {
-            // data_elmt starts before the sample starts, so take bytes starting
-            // at the below offset into data_elmt.
-            data_elmt_copy_offset =
-                *sample_begin_offset_iter - data_begin_offset;
-          }
-          if (sample_end_offset <= data_end_offset) {
-            // The sample ends before data_elmt ends, so take as many bytes as
-            // needed.
-            data_elmt_copy_len =
-                sample_end_offset - (data_begin_offset + data_elmt_copy_offset);
-          } else {
-            // data_elmt ends before the sample ends, so take all remaining
-            // bytes in data_elmt.
-            data_elmt_copy_len =
-                data_end_offset - (data_begin_offset + data_elmt_copy_offset);
-          }
-          dict_sample_data.append(&data_elmt.data()[data_elmt_copy_offset],
-                                  data_elmt_copy_len);
-          if (sample_end_offset > data_end_offset) {
-            // Didn't finish sample. Try to finish it with the next data_elmt.
-            break;
-          }
-          // Next sample may require bytes from same data_elmt.
-          sample_begin_offset_iter++;
-        }
-        data_begin_offset = data_end_offset;
-      }
-    }
-
     // Close output file if it is big enough. Two possibilities determine it's
     // time to close it: (1) the current key should be this file's last key, (2)
     // the next key should not be in this file.
@@ -1025,23 +972,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
         next_key = &c_iter->key();
       }
       CompactionIterationStats range_del_out_stats;
-      status = FinishCompactionOutputFile(input_status, sub_compact,
-                                          range_del_agg.get(),
-                                          &range_del_out_stats, next_key);
+      status =
+          FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
+                                     &range_del_out_stats, next_key);
       RecordDroppedKeys(range_del_out_stats,
                         &sub_compact->compaction_job_stats);
-      if (sub_compact->outputs.size() == 1) {
-        // Use samples from first output file to create dictionary for
-        // compression of subsequent files.
-        if (kUseZstdTrainer) {
-          sub_compact->compression_dict = ZSTD_TrainDictionary(
-              dict_sample_data, kSampleLenShift,
-              sub_compact->compaction->output_compression_opts()
-                  .max_dict_bytes);
-        } else {
-          sub_compact->compression_dict = std::move(dict_sample_data);
-        }
-      }
     }
   }
 
@@ -1077,8 +1012,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   }
 
   if (status.ok() && sub_compact->builder == nullptr &&
-      sub_compact->outputs.size() == 0 &&
-      !range_del_agg->IsEmpty()) {
+      sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
     // handle subcompaction containing only range deletions
     status = OpenCompactionOutputFile(sub_compact);
   }
@@ -1087,14 +1021,17 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   // close the output file.
   if (sub_compact->builder != nullptr) {
     CompactionIterationStats range_del_out_stats;
-    Status s = FinishCompactionOutputFile(
-        status, sub_compact, range_del_agg.get(), &range_del_out_stats);
+    Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
+                                          &range_del_out_stats);
     if (status.ok()) {
       status = s;
     }
     RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
   }
 
+  sub_compact->compaction_job_stats.cpu_micros =
+      env_->NowCPUNanos() / 1000 - prev_cpu_micros;
+
   if (measure_io_stats_) {
     sub_compact->compaction_job_stats.file_write_nanos +=
         IOSTATS(write_nanos) - prev_write_nanos;
@@ -1104,7 +1041,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
         IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
     sub_compact->compaction_job_stats.file_prepare_write_nanos +=
         IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
-    if (prev_perf_level != PerfLevel::kEnableTime) {
+    sub_compact->compaction_job_stats.cpu_micros -=
+        (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
+         IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
+        1000;
+    if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
       SetPerfLevel(prev_perf_level);
     }
   }
@@ -1153,7 +1094,7 @@ void CompactionJob::RecordDroppedKeys(
 
 Status CompactionJob::FinishCompactionOutputFile(
     const Status& input_status, SubcompactionState* sub_compact,
-    RangeDelAggregator* range_del_agg,
+    CompactionRangeDelAggregator* range_del_agg,
     CompactionIterationStats* range_del_out_stats,
     const Slice* next_table_min_key /* = nullptr */) {
   AutoThreadOperationStageUpdater stage_updater(
@@ -1177,10 +1118,12 @@ Status CompactionJob::FinishCompactionOutputFile(
     Slice lower_bound_guard, upper_bound_guard;
     std::string smallest_user_key;
     const Slice *lower_bound, *upper_bound;
+    bool lower_bound_from_sub_compact = false;
     if (sub_compact->outputs.size() == 1) {
       // For the first output table, include range tombstones before the min key
       // but after the subcompaction boundary.
       lower_bound = sub_compact->start;
+      lower_bound_from_sub_compact = true;
     } else if (meta->smallest.size() > 0) {
       // For subsequent output tables, only include range tombstones from min
       // key onwards since the previous file was extended to contain range
@@ -1192,10 +1135,19 @@ Status CompactionJob::FinishCompactionOutputFile(
       lower_bound = nullptr;
     }
     if (next_table_min_key != nullptr) {
-      // This isn't the last file in the subcompaction, so extend until the next
-      // file starts.
+      // This may be the last file in the subcompaction in some cases, so we
+      // need to compare the end key of subcompaction with the next file start
+      // key. When the end key is chosen by the subcompaction, we know that
+      // it must be the biggest key in output file. Therefore, it is safe to
+      // use the smaller key as the upper bound of the output file, to ensure
+      // that there is no overlapping between different output files.
       upper_bound_guard = ExtractUserKey(*next_table_min_key);
-      upper_bound = &upper_bound_guard;
+      if (sub_compact->end != nullptr &&
+          ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
+        upper_bound = sub_compact->end;
+      } else {
+        upper_bound = &upper_bound_guard;
+      }
     } else {
       // This is the last file in the subcompaction, so extend until the
       // subcompaction ends.
@@ -1205,18 +1157,45 @@ Status CompactionJob::FinishCompactionOutputFile(
     if (existing_snapshots_.size() > 0) {
       earliest_snapshot = existing_snapshots_[0];
     }
-    auto it = range_del_agg->NewIterator();
+    bool has_overlapping_endpoints;
+    if (upper_bound != nullptr && meta->largest.size() > 0) {
+      has_overlapping_endpoints =
+          ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
+    } else {
+      has_overlapping_endpoints = false;
+    }
+
+    // The end key of the subcompaction must be bigger or equal to the upper
+    // bound. If the end of subcompaction is null or the upper bound is null,
+    // it means that this file is the last file in the compaction. So there
+    // will be no overlapping between this file and others.
+    assert(sub_compact->end == nullptr ||
+           upper_bound == nullptr ||
+           ucmp->Compare(*upper_bound , *sub_compact->end) <= 0);
+    auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
+                                         has_overlapping_endpoints);
+    // Position the range tombstone output iterator. There may be tombstone
+    // fragments that are entirely out of range, so make sure that we do not
+    // include those.
     if (lower_bound != nullptr) {
       it->Seek(*lower_bound);
+    } else {
+      it->SeekToFirst();
     }
     for (; it->Valid(); it->Next()) {
       auto tombstone = it->Tombstone();
-      if (upper_bound != nullptr &&
-          ucmp->Compare(*upper_bound, tombstone.start_key_) <= 0) {
-        // Tombstones starting at upper_bound or later only need to be included
-        // in the next table. Break because subsequent tombstones will start
-        // even later.
-        break;
+      if (upper_bound != nullptr) {
+        int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
+        if ((has_overlapping_endpoints && cmp < 0) ||
+            (!has_overlapping_endpoints && cmp <= 0)) {
+          // Tombstones starting after upper_bound only need to be included in
+          // the next table. If the current SST ends before upper_bound, i.e.,
+          // `has_overlapping_endpoints == false`, we can also skip over range
+          // tombstones that start exactly at upper_bound. Such range tombstones
+          // will be included in the next file and are not relevant to the point
+          // keys or endpoints of the current file.
+          break;
+        }
       }
 
       if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
@@ -1228,6 +1207,8 @@ Status CompactionJob::FinishCompactionOutputFile(
       }
 
       auto kv = tombstone.Serialize();
+      assert(lower_bound == nullptr ||
+             ucmp->Compare(*lower_bound, kv.second) < 0);
       sub_compact->builder->Add(kv.first.Encode(), kv.second);
       InternalKey smallest_candidate = std::move(kv.first);
       if (lower_bound != nullptr &&
@@ -1236,11 +1217,24 @@ Status CompactionJob::FinishCompactionOutputFile(
         // (the max key in the previous table or subcompaction) in order for
         // files to appear key-space partitioned.
         //
-        // Choose lowest seqnum so this file's smallest internal key comes
-        // after the previous file's/subcompaction's largest. The fake seqnum
-        // is OK because the read path's file-picking code only considers user
-        // key.
-        smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion);
+        // When lower_bound is chosen by a subcompaction, we know that
+        // subcompactions over smaller keys cannot contain any keys at
+        // lower_bound. We also know that smaller subcompactions exist, because
+        // otherwise the subcompaction woud be unbounded on the left. As a
+        // result, we know that no other files on the output level will contain
+        // actual keys at lower_bound (an output file may have a largest key of
+        // lower_bound@kMaxSequenceNumber, but this only indicates a large range
+        // tombstone was truncated). Therefore, it is safe to use the
+        // tombstone's sequence number, to ensure that keys at lower_bound at
+        // lower levels are covered by truncated tombstones.
+        //
+        // If lower_bound was chosen by the smallest data key in the file,
+        // choose lowest seqnum so this file's smallest internal key comes after
+        // the previous file's largest. The fake seqnum is OK because the read
+        // path's file-picking code only considers user key.
+        smallest_candidate = InternalKey(
+            *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
+            kTypeRangeDeletion);
       }
       InternalKey largest_candidate = tombstone.SerializeEndKey();
       if (upper_bound != nullptr &&
@@ -1262,9 +1256,23 @@ Status CompactionJob::FinishCompactionOutputFile(
         largest_candidate =
             InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
       }
+#ifndef NDEBUG
+      SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
+      if (meta->smallest.size() > 0) {
+        smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
+      }
+#endif
       meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
                                      tombstone.seq_,
                                      cfd->internal_comparator());
+
+      // The smallest key in a file is used for range tombstone truncation, so
+      // it cannot have a seqnum of 0 (unless the smallest data key in a file
+      // has a seqnum of 0). Otherwise, the truncated tombstone may expose
+      // deleted keys at lower levels.
+      assert(smallest_ikey_seqnum == 0 ||
+             ExtractInternalKeyFooter(meta->smallest.Encode()) !=
+                 PackSequenceAndType(0, kTypeRangeDeletion));
     }
     meta->marked_for_compaction = sub_compact->builder->NeedCompact();
   }
@@ -1342,10 +1350,7 @@ Status CompactionJob::FinishCompactionOutputFile(
   auto sfm =
       static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
   if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
-    auto fn =
-        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
-                      meta->fd.GetNumber(), meta->fd.GetPathId());
-    sfm->OnAddFile(fn);
+    sfm->OnAddFile(fname);
     if (sfm->IsMaxAllowedSpaceReached()) {
       // TODO(ajkr): should we return OK() if max space was reached by the final
       // compaction output file (similarly to how flush works when full)?
@@ -1431,7 +1436,7 @@ Status CompactionJob::OpenCompactionOutputFile(
       TableFileCreationReason::kCompaction);
 #endif  // !ROCKSDB_LITE
   // Make the output file
-  unique_ptr<WritableFile> writable_file;
+  std::unique_ptr<WritableFile> writable_file;
 #ifndef NDEBUG
   bool syncpoint_arg = env_options_.use_direct_writes;
   TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
@@ -1463,9 +1468,11 @@ Status CompactionJob::OpenCompactionOutputFile(
   writable_file->SetWriteLifeTimeHint(write_hint_);
   writable_file->SetPreallocationBlockSize(static_cast<size_t>(
       sub_compact->compaction->OutputFilePreallocationSize()));
+  const auto& listeners =
+      sub_compact->compaction->immutable_cf_options()->listeners;
   sub_compact->outfile.reset(
       new WritableFileWriter(std::move(writable_file), fname, env_options_,
-                             db_options_.statistics.get()));
+                             env_, db_options_.statistics.get(), listeners));
 
   // If the Column family flag is to only optimize filters for hits,
   // we can skip creating filters if this is the bottommost_level where
@@ -1494,9 +1501,11 @@ Status CompactionJob::OpenCompactionOutputFile(
       cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
       cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
       sub_compact->compaction->output_compression(),
+      0 /*sample_for_compression */,
       sub_compact->compaction->output_compression_opts(),
-      sub_compact->compaction->output_level(), &sub_compact->compression_dict,
-      skip_filters, output_file_creation_time));
+      sub_compact->compaction->output_level(), skip_filters,
+      output_file_creation_time, 0 /* oldest_key_time */,
+      sub_compact->compaction->max_output_file_size()));
   LogFlush(db_options_.info_log);
   return s;
 }