]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compaction/compaction_job.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_job.cc
index 576ec7b4592d5b499209d7a899fc4e10d67f1980..a517a2015b9d70724575f2c6ad69f83db44b3824 100644 (file)
@@ -7,6 +7,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
+#include "db/compaction/compaction_job.h"
+
 #include <algorithm>
 #include <cinttypes>
 #include <functional>
@@ -18,8 +20,9 @@
 #include <utility>
 #include <vector>
 
+#include "db/blob/blob_file_addition.h"
+#include "db/blob/blob_file_builder.h"
 #include "db/builder.h"
-#include "db/compaction/compaction_job.h"
 #include "db/db_impl/db_impl.h"
 #include "db/db_iter.h"
 #include "db/dbformat.h"
@@ -31,6 +34,7 @@
 #include "db/memtable_list.h"
 #include "db/merge_context.h"
 #include "db/merge_helper.h"
+#include "db/output_validator.h"
 #include "db/range_del_aggregator.h"
 #include "db/version_set.h"
 #include "file/filename.h"
@@ -45,6 +49,7 @@
 #include "port/port.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
+#include "rocksdb/sst_partitioner.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/status.h"
 #include "rocksdb/table.h"
@@ -54,6 +59,7 @@
 #include "table/table_builder.h"
 #include "test_util/sync_point.h"
 #include "util/coding.h"
+#include "util/hash.h"
 #include "util/mutexlock.h"
 #include "util/random.h"
 #include "util/stop_watch.h"
@@ -116,20 +122,31 @@ struct CompactionJob::SubcompactionState {
   // The return status of this subcompaction
   Status status;
 
+  // The return IO Status of this subcompaction
+  IOStatus io_status;
+
   // Files produced by this subcompaction
   struct Output {
+    Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
+           bool _enable_order_check, bool _enable_hash)
+        : meta(std::move(_meta)),
+          validator(_icmp, _enable_order_check, _enable_hash),
+          finished(false) {}
     FileMetaData meta;
+    OutputValidator validator;
     bool finished;
     std::shared_ptr<const TableProperties> table_properties;
   };
 
   // State kept for output being generated
   std::vector<Output> outputs;
+  std::vector<BlobFileAddition> blob_file_additions;
   std::unique_ptr<WritableFileWriter> outfile;
   std::unique_ptr<TableBuilder> builder;
+
   Output* current_output() {
     if (outputs.empty()) {
-      // This subcompaction's outptut could be empty if compaction was aborted
+      // This subcompaction's output could be empty if compaction was aborted
       // before this subcompaction had a chance to generate any output files.
       // When subcompactions are executed sequentially this is more likely and
       // will be particulalry likely for the later subcompactions to be empty.
@@ -140,13 +157,13 @@ struct CompactionJob::SubcompactionState {
     }
   }
 
-  uint64_t current_output_file_size;
+  uint64_t current_output_file_size = 0;
 
   // State during the subcompaction
-  uint64_t total_bytes;
-  uint64_t num_output_records;
+  uint64_t total_bytes = 0;
+  uint64_t num_output_records = 0;
   CompactionJobStats compaction_job_stats;
-  uint64_t approx_size;
+  uint64_t approx_size = 0;
   // An index that used to speed up ShouldStopBefore().
   size_t grandparent_index = 0;
   // The number of bytes overlapping between the current output and
@@ -155,49 +172,25 @@ struct CompactionJob::SubcompactionState {
   // A flag determine whether the key has been seen in ShouldStopBefore()
   bool seen_key = false;
 
-  SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
-                     uint64_t size = 0)
-      : compaction(c),
-        start(_start),
-        end(_end),
-        outfile(nullptr),
-        builder(nullptr),
-        current_output_file_size(0),
-        total_bytes(0),
-        num_output_records(0),
-        approx_size(size),
-        grandparent_index(0),
-        overlapped_bytes(0),
-        seen_key(false) {
+  SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size)
+      : compaction(c), start(_start), end(_end), approx_size(size) {
     assert(compaction != nullptr);
   }
 
-  SubcompactionState(SubcompactionState&& o) { *this = std::move(o); }
-
-  SubcompactionState& operator=(SubcompactionState&& o) {
-    compaction = std::move(o.compaction);
-    start = std::move(o.start);
-    end = std::move(o.end);
-    status = std::move(o.status);
-    outputs = std::move(o.outputs);
-    outfile = std::move(o.outfile);
-    builder = std::move(o.builder);
-    current_output_file_size = std::move(o.current_output_file_size);
-    total_bytes = std::move(o.total_bytes);
-    num_output_records = std::move(o.num_output_records);
-    compaction_job_stats = std::move(o.compaction_job_stats);
-    approx_size = std::move(o.approx_size);
-    grandparent_index = std::move(o.grandparent_index);
-    overlapped_bytes = std::move(o.overlapped_bytes);
-    seen_key = std::move(o.seen_key);
-    return *this;
+  // Adds the key and value to the builder
+  // If paranoid is true, adds the key-value to the paranoid hash
+  Status AddToBuilder(const Slice& key, const Slice& value) {
+    auto curr = current_output();
+    assert(builder != nullptr);
+    assert(curr != nullptr);
+    Status s = curr->validator.Add(key, value);
+    if (!s.ok()) {
+      return s;
+    }
+    builder->Add(key, value);
+    return Status::OK();
   }
 
-  // Because member std::unique_ptrs do not have these.
-  SubcompactionState(const SubcompactionState&) = delete;
-
-  SubcompactionState& operator=(const SubcompactionState&) = delete;
-
   // Returns true iff we should stop building the current output
   // before processing "internal_key".
   bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
@@ -241,21 +234,13 @@ struct CompactionJob::CompactionState {
   std::vector<CompactionJob::SubcompactionState> sub_compact_states;
   Status status;
 
-  uint64_t total_bytes;
-  uint64_t num_output_records;
+  size_t num_output_files = 0;
+  uint64_t total_bytes = 0;
+  size_t num_blob_output_files = 0;
+  uint64_t total_blob_bytes = 0;
+  uint64_t num_output_records = 0;
 
-  explicit CompactionState(Compaction* c)
-      : compaction(c),
-        total_bytes(0),
-        num_output_records(0) {}
-
-  size_t NumOutputFiles() {
-    size_t total = 0;
-    for (auto& s : sub_compact_states) {
-      total += s.outputs.size();
-    }
-    return total;
-  }
+  explicit CompactionState(Compaction* c) : compaction(c) {}
 
   Slice SmallestUserKey() {
     for (const auto& sub_compact_state : sub_compact_states) {
@@ -282,14 +267,30 @@ struct CompactionJob::CompactionState {
 };
 
 void CompactionJob::AggregateStatistics() {
+  assert(compact_);
+
   for (SubcompactionState& sc : compact_->sub_compact_states) {
+    auto& outputs = sc.outputs;
+
+    if (!outputs.empty() && !outputs.back().meta.fd.file_size) {
+      // An error occurred, so ignore the last output.
+      outputs.pop_back();
+    }
+
+    compact_->num_output_files += outputs.size();
     compact_->total_bytes += sc.total_bytes;
-    compact_->num_output_records += sc.num_output_records;
-  }
-  if (compaction_job_stats_) {
-    for (SubcompactionState& sc : compact_->sub_compact_states) {
-      compaction_job_stats_->Add(sc.compaction_job_stats);
+
+    const auto& blobs = sc.blob_file_additions;
+
+    compact_->num_blob_output_files += blobs.size();
+
+    for (const auto& blob : blobs) {
+      compact_->total_blob_bytes += blob.GetTotalBlobBytes();
     }
+
+    compact_->num_output_records += sc.num_output_records;
+
+    compaction_job_stats_->Add(sc.compaction_job_stats);
   }
 }
 
@@ -298,23 +299,29 @@ CompactionJob::CompactionJob(
     const FileOptions& file_options, VersionSet* versions,
     const std::atomic<bool>* shutting_down,
     const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
-    Directory* db_directory, Directory* output_directory, Statistics* stats,
+    FSDirectory* db_directory, FSDirectory* output_directory,
+    FSDirectory* blob_output_directory, Statistics* stats,
     InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
     std::vector<SequenceNumber> existing_snapshots,
     SequenceNumber earliest_write_conflict_snapshot,
     const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
     EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
     const std::string& dbname, CompactionJobStats* compaction_job_stats,
-    Env::Priority thread_pri, const std::atomic<bool>* manual_compaction_paused)
+    Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
+    const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
+    const std::string& db_session_id, std::string full_history_ts_low)
     : job_id_(job_id),
       compact_(new CompactionState(compaction)),
       compaction_job_stats_(compaction_job_stats),
       compaction_stats_(compaction->compaction_reason(), 1),
       dbname_(dbname),
+      db_id_(db_id),
+      db_session_id_(db_session_id),
       db_options_(db_options),
       file_options_(file_options),
       env_(db_options.env),
-      fs_(db_options.fs.get()),
+      io_tracer_(io_tracer),
+      fs_(db_options.fs, io_tracer),
       file_options_for_read_(
           fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
       versions_(versions),
@@ -324,6 +331,7 @@ CompactionJob::CompactionJob(
       log_buffer_(log_buffer),
       db_directory_(db_directory),
       output_directory_(output_directory),
+      blob_output_directory_(blob_output_directory),
       stats_(stats),
       db_mutex_(db_mutex),
       db_error_handler_(db_error_handler),
@@ -336,7 +344,9 @@ CompactionJob::CompactionJob(
       paranoid_file_checks_(paranoid_file_checks),
       measure_io_stats_(measure_io_stats),
       write_hint_(Env::WLTH_NOT_SET),
-      thread_pri_(thread_pri) {
+      thread_pri_(thread_pri),
+      full_history_ts_low_(std::move(full_history_ts_low)) {
+  assert(compaction_job_stats_ != nullptr);
   assert(log_buffer_ != nullptr);
   const auto* cfd = compact_->compaction->column_family_data();
   ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
@@ -388,10 +398,9 @@ void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
   // to ensure GetThreadList() can always show them all together.
   ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
 
-  if (compaction_job_stats_) {
-    compaction_job_stats_->is_manual_compaction =
-        compaction->is_manual_compaction();
-  }
+  compaction_job_stats_->is_manual_compaction =
+      compaction->is_manual_compaction();
+  compaction_job_stats_->is_full_compaction = compaction->is_full_compaction();
 }
 
 void CompactionJob::Prepare() {
@@ -423,7 +432,11 @@ void CompactionJob::Prepare() {
     RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                       compact_->sub_compact_states.size());
   } else {
-    compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
+    constexpr Slice* start = nullptr;
+    constexpr Slice* end = nullptr;
+    constexpr uint64_t size = 0;
+
+    compact_->sub_compact_states.emplace_back(c, start, end, size);
   }
 }
 
@@ -542,7 +555,7 @@ void CompactionJob::GenSubcompactionBoundaries() {
     // Greedily add ranges to the subcompaction until the sum of the ranges'
     // sizes becomes >= the expected mean size of a subcompaction
     sum = 0;
-    for (size_t i = 0; i < ranges.size() - 1; i++) {
+    for (size_t i = 0; i + 1 < ranges.size(); i++) {
       sum += ranges[i].size;
       if (subcompactions == 1) {
         // If there's only one left to schedule then it goes to the end so no
@@ -606,33 +619,58 @@ Status CompactionJob::Run() {
 
   // Check if any thread encountered an error during execution
   Status status;
+  IOStatus io_s;
+  bool wrote_new_blob_files = false;
+
   for (const auto& state : compact_->sub_compact_states) {
     if (!state.status.ok()) {
       status = state.status;
+      io_s = state.io_status;
       break;
     }
+
+    if (!state.blob_file_additions.empty()) {
+      wrote_new_blob_files = true;
+    }
   }
 
-  if (status.ok() && output_directory_) {
-    status = output_directory_->Fsync();
+  if (io_status_.ok()) {
+    io_status_ = io_s;
   }
+  if (status.ok()) {
+    constexpr IODebugContext* dbg = nullptr;
+
+    if (output_directory_) {
+      io_s = output_directory_->Fsync(IOOptions(), dbg);
+    }
 
+    if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
+        blob_output_directory_ != output_directory_) {
+      io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
+    }
+  }
+  if (io_status_.ok()) {
+    io_status_ = io_s;
+  }
+  if (status.ok()) {
+    status = io_s;
+  }
   if (status.ok()) {
     thread_pool.clear();
-    std::vector<const FileMetaData*> files_meta;
+    std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
     for (const auto& state : compact_->sub_compact_states) {
       for (const auto& output : state.outputs) {
-        files_meta.emplace_back(&output.meta);
+        files_output.emplace_back(&output);
       }
     }
     ColumnFamilyData* cfd = compact_->compaction->column_family_data();
     auto prefix_extractor =
         compact_->compaction->mutable_cf_options()->prefix_extractor.get();
-    std::atomic<size_t> next_file_meta_idx(0);
+    std::atomic<size_t> next_file_idx(0);
     auto verify_table = [&](Status& output_status) {
       while (true) {
-        size_t file_idx = next_file_meta_idx.fetch_add(1);
-        if (file_idx >= files_meta.size()) {
+        size_t file_idx = next_file_idx.fetch_add(1);
+        if (file_idx >= files_output.size()) {
           break;
         }
         // Verify that the table is usable
@@ -641,21 +679,40 @@ Status CompactionJob::Run() {
         // No matter whether use_direct_io_for_flush_and_compaction is true,
         // we will regard this verification as user reads since the goal is
         // to cache it here for further user reads
+        ReadOptions read_options;
         InternalIterator* iter = cfd->table_cache()->NewIterator(
-            ReadOptions(), file_options_, cfd->internal_comparator(),
-            *files_meta[file_idx], /*range_del_agg=*/nullptr, prefix_extractor,
+            read_options, file_options_, cfd->internal_comparator(),
+            files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
+            prefix_extractor,
             /*table_reader_ptr=*/nullptr,
             cfd->internal_stats()->GetFileReadHist(
                 compact_->compaction->output_level()),
             TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
             /*skip_filters=*/false, compact_->compaction->output_level(),
+            MaxFileSizeForL0MetaPin(
+                *compact_->compaction->mutable_cf_options()),
             /*smallest_compaction_key=*/nullptr,
-            /*largest_compaction_key=*/nullptr);
+            /*largest_compaction_key=*/nullptr,
+            /*allow_unprepared_value=*/false);
         auto s = iter->status();
 
         if (s.ok() && paranoid_file_checks_) {
-          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
-          s = iter->status();
+          OutputValidator validator(cfd->internal_comparator(),
+                                    /*_enable_order_check=*/true,
+                                    /*_enable_hash=*/true);
+          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+            s = validator.Add(iter->key(), iter->value());
+            if (!s.ok()) {
+              break;
+            }
+          }
+          if (s.ok()) {
+            s = iter->status();
+          }
+          if (s.ok() &&
+              !validator.CompareValidator(files_output[file_idx]->validator)) {
+            s = Status::Corruption("Paranoid checksums do not match");
+          }
         }
 
         delete iter;
@@ -696,6 +753,7 @@ Status CompactionJob::Run() {
   // Finish up all book-keeping to unify the subcompaction results
   AggregateStatistics();
   UpdateCompactionStats();
+
   RecordCompactionIOStats();
   LogFlush(db_options_.info_log);
   TEST_SYNC_POINT("CompactionJob::Run():End");
@@ -705,17 +763,26 @@ Status CompactionJob::Run() {
 }
 
 Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
+  assert(compact_);
+
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_INSTALL);
   db_mutex_->AssertHeld();
   Status status = compact_->status;
+
   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
+  assert(cfd);
+
   cfd->internal_stats()->AddCompactionStats(
       compact_->compaction->output_level(), thread_pri_, compaction_stats_);
 
   if (status.ok()) {
     status = InstallCompactionResults(mutable_cf_options);
   }
+  if (!versions_->io_status().ok()) {
+    io_status_ = versions_->io_status();
+  }
+
   VersionStorageInfo::LevelSummaryStorage tmp;
   auto vstorage = cfd->current()->storage_info();
   const auto& stats = compaction_stats_;
@@ -740,6 +807,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
         stats.bytes_written / static_cast<double>(stats.micros);
   }
 
+  const std::string& column_family_name = cfd->GetName();
+
   ROCKS_LOG_BUFFER(
       log_buffer_,
       "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
@@ -747,8 +816,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
       "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
       "write-amplify(%.1f) %s, records in: %" PRIu64
       ", records dropped: %" PRIu64 " output_compression: %s\n",
-      cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec,
-      bytes_written_per_sec, compact_->compaction->output_level(),
+      column_family_name.c_str(), vstorage->LevelSummary(&tmp),
+      bytes_read_per_sec, bytes_written_per_sec,
+      compact_->compaction->output_level(),
       stats.num_input_files_in_non_output_levels,
       stats.num_input_files_in_output_level, stats.num_output_files,
       stats.bytes_read_non_output_levels / 1048576.0,
@@ -759,6 +829,15 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
       CompressionTypeToString(compact_->compaction->output_compression())
           .c_str());
 
+  const auto& blob_files = vstorage->GetBlobFiles();
+  if (!blob_files.empty()) {
+    ROCKS_LOG_BUFFER(log_buffer_,
+                     "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
+                     "\n",
+                     column_family_name.c_str(), blob_files.begin()->first,
+                     blob_files.rbegin()->first);
+  }
+
   UpdateCompactionJobStats(stats);
 
   auto stream = event_logger_->LogToBuffer(log_buffer_);
@@ -767,21 +846,26 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
          << "compaction_time_micros" << stats.micros
          << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
          << compact_->compaction->output_level() << "num_output_files"
-         << compact_->NumOutputFiles() << "total_output_size"
-         << compact_->total_bytes << "num_input_records"
-         << stats.num_input_records << "num_output_records"
-         << compact_->num_output_records << "num_subcompactions"
-         << compact_->sub_compact_states.size() << "output_compression"
-         << CompressionTypeToString(compact_->compaction->output_compression());
+         << compact_->num_output_files << "total_output_size"
+         << compact_->total_bytes;
 
-  if (compaction_job_stats_ != nullptr) {
-    stream << "num_single_delete_mismatches"
-           << compaction_job_stats_->num_single_del_mismatch;
-    stream << "num_single_delete_fallthrough"
-           << compaction_job_stats_->num_single_del_fallthru;
+  if (compact_->num_blob_output_files > 0) {
+    stream << "num_blob_output_files" << compact_->num_blob_output_files
+           << "total_blob_output_size" << compact_->total_blob_bytes;
   }
 
-  if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
+  stream << "num_input_records" << stats.num_input_records
+         << "num_output_records" << compact_->num_output_records
+         << "num_subcompactions" << compact_->sub_compact_states.size()
+         << "output_compression"
+         << CompressionTypeToString(compact_->compaction->output_compression());
+
+  stream << "num_single_delete_mismatches"
+         << compaction_job_stats_->num_single_del_mismatch;
+  stream << "num_single_delete_fallthrough"
+         << compaction_job_stats_->num_single_del_fallthru;
+
+  if (measure_io_stats_) {
     stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
     stream << "file_range_sync_nanos"
            << compaction_job_stats_->file_range_sync_nanos;
@@ -797,12 +881,18 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   }
   stream.EndArray();
 
+  if (!blob_files.empty()) {
+    stream << "blob_file_head" << blob_files.begin()->first;
+    stream << "blob_file_tail" << blob_files.rbegin()->first;
+  }
+
   CleanupCompaction();
   return status;
 }
 
 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
-  assert(sub_compact != nullptr);
+  assert(sub_compact);
+  assert(sub_compact->compaction);
 
   uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
 
@@ -827,11 +917,20 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
 
   CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
                                              existing_snapshots_);
+  ReadOptions read_options;
+  read_options.verify_checksums = true;
+  read_options.fill_cache = false;
+  // Compaction iterators shouldn't be confined to a single prefix.
+  // Compactions use Seek() for
+  // (a) concurrent compactions,
+  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
+  read_options.total_order_seek = true;
 
   // Although the v2 aggregator is what the level iterator(s) know about,
   // the AddTombstones calls will be propagated down to the v1 aggregator.
-  std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
-      sub_compact->compaction, &range_del_agg, file_options_for_read_));
+  std::unique_ptr<InternalIterator> input(
+      versions_->MakeInputIterator(read_options, sub_compact->compaction,
+                                   &range_del_agg, file_options_for_read_));
 
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
@@ -864,11 +963,27 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       snapshot_checker_, compact_->compaction->level(),
       db_options_.statistics.get());
 
+  const MutableCFOptions* mutable_cf_options =
+      sub_compact->compaction->mutable_cf_options();
+  assert(mutable_cf_options);
+
+  std::vector<std::string> blob_file_paths;
+
+  std::unique_ptr<BlobFileBuilder> blob_file_builder(
+      mutable_cf_options->enable_blob_files
+          ? new BlobFileBuilder(
+                versions_, env_, fs_.get(),
+                sub_compact->compaction->immutable_cf_options(),
+                mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
+                cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
+                &blob_file_paths, &sub_compact->blob_file_additions)
+          : nullptr);
+
   TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
   TEST_SYNC_POINT_CALLBACK(
       "CompactionJob::Run():PausingManualCompaction:1",
       reinterpret_cast<void*>(
-          const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
+          const_cast<std::atomic<int>*>(manual_compaction_paused_)));
 
   Slice* start = sub_compact->start;
   Slice* end = sub_compact->end;
@@ -881,13 +996,17 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   }
 
   Status status;
+  const std::string* const full_history_ts_low =
+      full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
   sub_compact->c_iter.reset(new CompactionIterator(
       input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
       &existing_snapshots_, earliest_write_conflict_snapshot_,
-      snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
-      &range_del_agg, sub_compact->compaction, compaction_filter,
-      shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_,
-      db_options_.info_log));
+      snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
+      /*expect_valid_internal_key=*/true, &range_del_agg,
+      blob_file_builder.get(), db_options_.allow_data_in_errors,
+      sub_compact->compaction, compaction_filter, shutting_down_,
+      preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
+      full_history_ts_low));
   auto c_iter = sub_compact->c_iter.get();
   c_iter->SeekToFirst();
   if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
@@ -899,6 +1018,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   }
   const auto& c_iter_stats = c_iter->iter_stats();
 
+  std::unique_ptr<SstPartitioner> partitioner =
+      sub_compact->compaction->output_level() == 0
+          ? nullptr
+          : sub_compact->compaction->CreateSstPartitioner();
+  std::string last_key_for_partitioner;
+
   while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
     // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
     // returns true.
@@ -925,10 +1050,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
         break;
       }
     }
-    assert(sub_compact->builder != nullptr);
-    assert(sub_compact->current_output() != nullptr);
-    sub_compact->builder->Add(key, value);
-    sub_compact->current_output_file_size = sub_compact->builder->FileSize();
+    status = sub_compact->AddToBuilder(key, value);
+    if (!status.ok()) {
+      break;
+    }
+
+    sub_compact->current_output_file_size =
+        sub_compact->builder->EstimatedFileSize();
     const ParsedInternalKey& ikey = c_iter->ikey();
     sub_compact->current_output()->meta.UpdateBoundaries(
         key, value, ikey.sequence, ikey.type);
@@ -943,33 +1071,39 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
     // and 0.6MB instead of 1MB and 0.2MB)
     bool output_file_ended = false;
-    Status input_status;
     if (sub_compact->compaction->output_level() != 0 &&
         sub_compact->current_output_file_size >=
             sub_compact->compaction->max_output_file_size()) {
       // (1) this key terminates the file. For historical reasons, the iterator
       // status before advancing will be given to FinishCompactionOutputFile().
-      input_status = input->status();
       output_file_ended = true;
     }
     TEST_SYNC_POINT_CALLBACK(
         "CompactionJob::Run():PausingManualCompaction:2",
         reinterpret_cast<void*>(
-            const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
+            const_cast<std::atomic<int>*>(manual_compaction_paused_)));
+    if (partitioner.get()) {
+      last_key_for_partitioner.assign(c_iter->user_key().data_,
+                                      c_iter->user_key().size_);
+    }
     c_iter->Next();
     if (c_iter->status().IsManualCompactionPaused()) {
       break;
     }
-    if (!output_file_ended && c_iter->Valid() &&
-        sub_compact->compaction->output_level() != 0 &&
-        sub_compact->ShouldStopBefore(c_iter->key(),
-                                      sub_compact->current_output_file_size) &&
-        sub_compact->builder != nullptr) {
-      // (2) this key belongs to the next file. For historical reasons, the
-      // iterator status after advancing will be given to
-      // FinishCompactionOutputFile().
-      input_status = input->status();
-      output_file_ended = true;
+    if (!output_file_ended && c_iter->Valid()) {
+      if (((partitioner.get() &&
+            partitioner->ShouldPartition(PartitionerRequest(
+                last_key_for_partitioner, c_iter->user_key(),
+                sub_compact->current_output_file_size)) == kRequired) ||
+           (sub_compact->compaction->output_level() != 0 &&
+            sub_compact->ShouldStopBefore(
+                c_iter->key(), sub_compact->current_output_file_size))) &&
+          sub_compact->builder != nullptr) {
+        // (2) this key belongs to the next file. For historical reasons, the
+        // iterator status after advancing will be given to
+        // FinishCompactionOutputFile().
+        output_file_ended = true;
+      }
     }
     if (output_file_ended) {
       const Slice* next_key = nullptr;
@@ -977,9 +1111,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
         next_key = &c_iter->key();
       }
       CompactionIterationStats range_del_out_stats;
-      status =
-          FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
-                                     &range_del_out_stats, next_key);
+      status = FinishCompactionOutputFile(input->status(), sub_compact,
+                                          &range_del_agg, &range_del_out_stats,
+                                          next_key);
       RecordDroppedKeys(range_del_out_stats,
                         &sub_compact->compaction_job_stats);
     }
@@ -1013,7 +1147,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   }
   if ((status.ok() || status.IsColumnFamilyDropped()) &&
       (manual_compaction_paused_ &&
-       manual_compaction_paused_->load(std::memory_order_relaxed))) {
+       manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
     status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
   }
   if (status.ok()) {
@@ -1035,12 +1169,20 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     CompactionIterationStats range_del_out_stats;
     Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
                                           &range_del_out_stats);
-    if (status.ok()) {
+    if (!s.ok() && status.ok()) {
       status = s;
     }
     RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
   }
 
+  if (blob_file_builder) {
+    if (status.ok()) {
+      status = blob_file_builder->Finish();
+    }
+
+    blob_file_builder.reset();
+  }
+
   sub_compact->compaction_job_stats.cpu_micros =
       env_->NowCPUNanos() / 1000 - prev_cpu_micros;
 
@@ -1061,6 +1203,16 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       SetPerfLevel(prev_perf_level);
     }
   }
+#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
+  if (!status.ok()) {
+    if (sub_compact->c_iter) {
+      sub_compact->c_iter->status().PermitUncheckedError();
+    }
+    if (input) {
+      input->status().PermitUncheckedError();
+    }
+  }
+#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
 
   sub_compact->c_iter.reset();
   input.reset();
@@ -1121,6 +1273,8 @@ Status CompactionJob::FinishCompactionOutputFile(
 
   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
   const Comparator* ucmp = cfd->user_comparator();
+  std::string file_checksum = kUnknownFileChecksum;
+  std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
 
   // Check for iterator errors
   Status s = input_status;
@@ -1194,6 +1348,7 @@ Status CompactionJob::FinishCompactionOutputFile(
     } else {
       it->SeekToFirst();
     }
+    TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
     for (; it->Valid(); it->Next()) {
       auto tombstone = it->Tombstone();
       if (upper_bound != nullptr) {
@@ -1221,6 +1376,7 @@ Status CompactionJob::FinishCompactionOutputFile(
       auto kv = tombstone.Serialize();
       assert(lower_bound == nullptr ||
              ucmp->Compare(*lower_bound, kv.second) < 0);
+      // Range tombstone is not supported by output validator yet.
       sub_compact->builder->Add(kv.first.Encode(), kv.second);
       InternalKey smallest_candidate = std::move(kv.first);
       if (lower_bound != nullptr &&
@@ -1286,7 +1442,6 @@ Status CompactionJob::FinishCompactionOutputFile(
              ExtractInternalKeyFooter(meta->smallest.Encode()) !=
                  PackSequenceAndType(0, kTypeRangeDeletion));
     }
-    meta->marked_for_compaction = sub_compact->builder->NeedCompact();
   }
   const uint64_t current_entries = sub_compact->builder->NumEntries();
   if (s.ok()) {
@@ -1294,14 +1449,14 @@ Status CompactionJob::FinishCompactionOutputFile(
   } else {
     sub_compact->builder->Abandon();
   }
+  IOStatus io_s = sub_compact->builder->io_status();
+  if (s.ok()) {
+    s = io_s;
+  }
   const uint64_t current_bytes = sub_compact->builder->FileSize();
   if (s.ok()) {
-    // Add the checksum information to file metadata.
-    meta->file_checksum = sub_compact->builder->GetFileChecksum();
-    meta->file_checksum_func_name =
-        sub_compact->builder->GetFileChecksumFuncName();
-
     meta->fd.file_size = current_bytes;
+    meta->marked_for_compaction = sub_compact->builder->NeedCompact();
   }
   sub_compact->current_output()->finished = true;
   sub_compact->total_bytes += current_bytes;
@@ -1309,10 +1464,27 @@ Status CompactionJob::FinishCompactionOutputFile(
   // Finish and check for file errors
   if (s.ok()) {
     StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
-    s = sub_compact->outfile->Sync(db_options_.use_fsync);
+    io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
+  }
+  if (s.ok() && io_s.ok()) {
+    io_s = sub_compact->outfile->Close();
+  }
+  if (s.ok() && io_s.ok()) {
+    // Add the checksum information to file metadata.
+    meta->file_checksum = sub_compact->outfile->GetFileChecksum();
+    meta->file_checksum_func_name =
+        sub_compact->outfile->GetFileChecksumFuncName();
+    file_checksum = meta->file_checksum;
+    file_checksum_func_name = meta->file_checksum_func_name;
   }
   if (s.ok()) {
-    s = sub_compact->outfile->Close();
+    s = io_s;
+  }
+  if (sub_compact->io_status.ok()) {
+    sub_compact->io_status = io_s;
+    // Since this error is really a copy of the
+    // "normal" status, it does not also need to be checked
+    sub_compact->io_status.PermitUncheckedError();
   }
   sub_compact->outfile.reset();
 
@@ -1363,14 +1535,18 @@ Status CompactionJob::FinishCompactionOutputFile(
   EventHelpers::LogAndNotifyTableFileCreationFinished(
       event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
       job_id_, output_fd, oldest_blob_file_number, tp,
-      TableFileCreationReason::kCompaction, s);
+      TableFileCreationReason::kCompaction, s, file_checksum,
+      file_checksum_func_name);
 
 #ifndef ROCKSDB_LITE
   // Report new file to SstFileManagerImpl
   auto sfm =
       static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
   if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
-    sfm->OnAddFile(fname);
+    Status add_s = sfm->OnAddFile(fname);
+    if (!add_s.ok() && s.ok()) {
+      s = add_s;
+    }
     if (sfm->IsMaxAllowedSpaceReached()) {
       // TODO(ajkr): should we return OK() if max space was reached by the final
       // compaction output file (similarly to how flush works when full)?
@@ -1379,7 +1555,9 @@ Status CompactionJob::FinishCompactionOutputFile(
           "CompactionJob::FinishCompactionOutputFile:"
           "MaxAllowedSpaceReached");
       InstrumentedMutexLock l(db_mutex_);
-      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
+      // Should handle return error?
+      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction)
+          .PermitUncheckedError();
     }
   }
 #endif
@@ -1391,9 +1569,13 @@ Status CompactionJob::FinishCompactionOutputFile(
 
 Status CompactionJob::InstallCompactionResults(
     const MutableCFOptions& mutable_cf_options) {
+  assert(compact_);
+
   db_mutex_->AssertHeld();
 
   auto* compaction = compact_->compaction;
+  assert(compaction);
+
   // paranoia: verify that the files that we started with
   // still exist in the current version and in the same original level.
   // This ensures that a concurrent compaction did not erroneously
@@ -1409,31 +1591,52 @@ Status CompactionJob::InstallCompactionResults(
 
   {
     Compaction::InputLevelSummaryBuffer inputs_summary;
-    ROCKS_LOG_INFO(
-        db_options_.info_log, "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
-        compaction->column_family_data()->GetName().c_str(), job_id_,
-        compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
+    ROCKS_LOG_INFO(db_options_.info_log,
+                   "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
+                   compaction->column_family_data()->GetName().c_str(), job_id_,
+                   compaction->InputLevelSummary(&inputs_summary),
+                   compact_->total_bytes + compact_->total_blob_bytes);
   }
 
+  VersionEdit* const edit = compaction->edit();
+  assert(edit);
+
   // Add compaction inputs
-  compaction->AddInputDeletions(compact_->compaction->edit());
+  compaction->AddInputDeletions(edit);
 
   for (const auto& sub_compact : compact_->sub_compact_states) {
     for (const auto& out : sub_compact.outputs) {
-      compaction->edit()->AddFile(compaction->output_level(), out.meta);
+      edit->AddFile(compaction->output_level(), out.meta);
+    }
+
+    for (const auto& blob : sub_compact.blob_file_additions) {
+      edit->AddBlobFile(blob);
     }
   }
+
   return versions_->LogAndApply(compaction->column_family_data(),
-                                mutable_cf_options, compaction->edit(),
-                                db_mutex_, db_directory_);
+                                mutable_cf_options, edit, db_mutex_,
+                                db_directory_);
 }
 
 void CompactionJob::RecordCompactionIOStats() {
   RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
+  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
+  CompactionReason compaction_reason =
+      compact_->compaction->compaction_reason();
+  if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
+    RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
+    RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
+  } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
+    RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
+    RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
+  } else if (compaction_reason == CompactionReason::kTtl) {
+    RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
+    RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
+  }
   ThreadStatusUtil::IncreaseThreadOperationProperty(
       ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
   IOSTATS_RESET(bytes_read);
-  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
   ThreadStatusUtil::IncreaseThreadOperationProperty(
       ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
   IOSTATS_RESET(bytes_written);
@@ -1462,7 +1665,16 @@ Status CompactionJob::OpenCompactionOutputFile(
   TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
                            &syncpoint_arg);
 #endif
-  Status s = NewWritableFile(fs_, fname, &writable_file, file_options_);
+  Status s;
+  IOStatus io_s =
+      NewWritableFile(fs_.get(), fname, &writable_file, file_options_);
+  s = io_s;
+  if (sub_compact->io_status.ok()) {
+    sub_compact->io_status = io_s;
+    // Since this error is really a copy of the io_s that is checked below as s,
+    // it does not also need to be checked.
+    sub_compact->io_status.PermitUncheckedError();
+  }
   if (!s.ok()) {
     ROCKS_LOG_ERROR(
         db_options_.info_log,
@@ -1474,7 +1686,8 @@ Status CompactionJob::OpenCompactionOutputFile(
     EventHelpers::LogAndNotifyTableFileCreationFinished(
         event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
         fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
-        TableProperties(), TableFileCreationReason::kCompaction, s);
+        TableProperties(), TableFileCreationReason::kCompaction, s,
+        kUnknownFileChecksum, kUnknownFileChecksumFuncName);
     return s;
   }
 
@@ -1496,13 +1709,17 @@ Status CompactionJob::OpenCompactionOutputFile(
 
   // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
   {
-    SubcompactionState::Output out;
-    out.meta.fd = FileDescriptor(file_number,
-                                 sub_compact->compaction->output_path_id(), 0);
-    out.meta.oldest_ancester_time = oldest_ancester_time;
-    out.meta.file_creation_time = current_time;
-    out.finished = false;
-    sub_compact->outputs.push_back(out);
+    FileMetaData meta;
+    meta.fd = FileDescriptor(file_number,
+                             sub_compact->compaction->output_path_id(), 0);
+    meta.oldest_ancester_time = oldest_ancester_time;
+    meta.file_creation_time = current_time;
+    sub_compact->outputs.emplace_back(
+        std::move(meta), cfd->internal_comparator(),
+        /*enable_order_check=*/
+        sub_compact->compaction->mutable_cf_options()
+            ->check_flush_compaction_key_order,
+        /*enable_hash=*/paranoid_file_checks_);
   }
 
   writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
@@ -1511,10 +1728,10 @@ Status CompactionJob::OpenCompactionOutputFile(
       sub_compact->compaction->OutputFilePreallocationSize()));
   const auto& listeners =
       sub_compact->compaction->immutable_cf_options()->listeners;
-  sub_compact->outfile.reset(
-      new WritableFileWriter(std::move(writable_file), fname, file_options_,
-                             env_, db_options_.statistics.get(), listeners,
-                             db_options_.sst_file_checksum_func.get()));
+  sub_compact->outfile.reset(new WritableFileWriter(
+      std::move(writable_file), fname, file_options_, env_, io_tracer_,
+      db_options_.statistics.get(), listeners,
+      db_options_.file_checksum_gen_factory.get()));
 
   // If the Column family flag is to only optimize filters for hits,
   // we can skip creating filters if this is the bottommost_level where
@@ -1531,7 +1748,8 @@ Status CompactionJob::OpenCompactionOutputFile(
       sub_compact->compaction->output_compression_opts(),
       sub_compact->compaction->output_level(), skip_filters,
       oldest_ancester_time, 0 /* oldest_key_time */,
-      sub_compact->compaction->max_output_file_size(), current_time));
+      sub_compact->compaction->max_output_file_size(), current_time, db_id_,
+      db_session_id_));
   LogFlush(db_options_.info_log);
   return s;
 }
@@ -1554,6 +1772,9 @@ void CompactionJob::CleanupCompaction() {
         TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
       }
     }
+    // TODO: sub_compact.io_status is not checked like status. Not sure if thats
+    // intentional. So ignoring the io_status as of now.
+    sub_compact.io_status.PermitUncheckedError();
   }
   delete compact_;
   compact_ = nullptr;
@@ -1571,6 +1792,8 @@ void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
 #endif  // !ROCKSDB_LITE
 
 void CompactionJob::UpdateCompactionStats() {
+  assert(compact_);
+
   Compaction* compaction = compact_->compaction;
   compaction_stats_.num_input_files_in_non_output_levels = 0;
   compaction_stats_.num_input_files_in_output_level = 0;
@@ -1588,27 +1811,15 @@ void CompactionJob::UpdateCompactionStats() {
     }
   }
 
-  uint64_t num_output_records = 0;
-
-  for (const auto& sub_compact : compact_->sub_compact_states) {
-    size_t num_output_files = sub_compact.outputs.size();
-    if (sub_compact.builder != nullptr) {
-      // An error occurred so ignore the last output.
-      assert(num_output_files > 0);
-      --num_output_files;
-    }
-    compaction_stats_.num_output_files += static_cast<int>(num_output_files);
-
-    num_output_records += sub_compact.num_output_records;
+  compaction_stats_.num_output_files =
+      static_cast<int>(compact_->num_output_files) +
+      static_cast<int>(compact_->num_blob_output_files);
+  compaction_stats_.bytes_written =
+      compact_->total_bytes + compact_->total_blob_bytes;
 
-    for (const auto& out : sub_compact.outputs) {
-      compaction_stats_.bytes_written += out.meta.fd.file_size;
-    }
-  }
-
-  if (compaction_stats_.num_input_records > num_output_records) {
+  if (compaction_stats_.num_input_records > compact_->num_output_records) {
     compaction_stats_.num_dropped_records =
-        compaction_stats_.num_input_records - num_output_records;
+        compaction_stats_.num_input_records - compact_->num_output_records;
   }
 }
 
@@ -1630,32 +1841,29 @@ void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
 void CompactionJob::UpdateCompactionJobStats(
     const InternalStats::CompactionStats& stats) const {
 #ifndef ROCKSDB_LITE
-  if (compaction_job_stats_) {
-    compaction_job_stats_->elapsed_micros = stats.micros;
-
-    // input information
-    compaction_job_stats_->total_input_bytes =
-        stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
-    compaction_job_stats_->num_input_records = stats.num_input_records;
-    compaction_job_stats_->num_input_files =
-        stats.num_input_files_in_non_output_levels +
-        stats.num_input_files_in_output_level;
-    compaction_job_stats_->num_input_files_at_output_level =
-        stats.num_input_files_in_output_level;
-
-    // output information
-    compaction_job_stats_->total_output_bytes = stats.bytes_written;
-    compaction_job_stats_->num_output_records = compact_->num_output_records;
-    compaction_job_stats_->num_output_files = stats.num_output_files;
-
-    if (compact_->NumOutputFiles() > 0U) {
-      CopyPrefix(compact_->SmallestUserKey(),
-                 CompactionJobStats::kMaxPrefixLength,
-                 &compaction_job_stats_->smallest_output_key_prefix);
-      CopyPrefix(compact_->LargestUserKey(),
-                 CompactionJobStats::kMaxPrefixLength,
-                 &compaction_job_stats_->largest_output_key_prefix);
-    }
+  compaction_job_stats_->elapsed_micros = stats.micros;
+
+  // input information
+  compaction_job_stats_->total_input_bytes =
+      stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
+  compaction_job_stats_->num_input_records = stats.num_input_records;
+  compaction_job_stats_->num_input_files =
+      stats.num_input_files_in_non_output_levels +
+      stats.num_input_files_in_output_level;
+  compaction_job_stats_->num_input_files_at_output_level =
+      stats.num_input_files_in_output_level;
+
+  // output information
+  compaction_job_stats_->total_output_bytes = stats.bytes_written;
+  compaction_job_stats_->num_output_records = compact_->num_output_records;
+  compaction_job_stats_->num_output_files = stats.num_output_files;
+
+  if (stats.num_output_files > 0) {
+    CopyPrefix(compact_->SmallestUserKey(),
+               CompactionJobStats::kMaxPrefixLength,
+               &compaction_job_stats_->smallest_output_key_prefix);
+    CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
+               &compaction_job_stats_->largest_output_key_prefix);
   }
 #else
   (void)stats;