]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_impl_compaction_flush.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / db_impl_compaction_flush.cc
index dd3e4a7d410ae71771f6826746b9797393028288..eef8cf98de641fb60bc16fe95cb0021c7fee9f44 100644 (file)
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
@@ -14,6 +14,8 @@
 #include <inttypes.h>
 
 #include "db/builder.h"
+#include "db/error_handler.h"
+#include "db/event_helpers.h"
 #include "monitoring/iostats_context_imp.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/thread_status_updater.h"
 #include "util/sync_point.h"
 
 namespace rocksdb {
+
+bool DBImpl::EnoughRoomForCompaction(
+    ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
+    bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
+  // Check if we have enough room to do the compaction
+  bool enough_room = true;
+#ifndef ROCKSDB_LITE
+  auto sfm = static_cast<SstFileManagerImpl*>(
+      immutable_db_options_.sst_file_manager.get());
+  if (sfm) {
+    // Pass the current bg_error_ to SFM so it can decide what checks to
+    // perform. If this DB instance hasn't seen any error yet, the SFM can be
+    // optimistic and not do disk space checks
+    enough_room =
+        sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError());
+    if (enough_room) {
+      *sfm_reserved_compact_space = true;
+    }
+  }
+#else
+  (void)cfd;
+  (void)inputs;
+  (void)sfm_reserved_compact_space;
+#endif  // ROCKSDB_LITE
+  if (!enough_room) {
+    // Just in case tests want to change the value of enough_room
+    TEST_SYNC_POINT_CALLBACK(
+        "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
+    ROCKS_LOG_BUFFER(log_buffer,
+                     "Cancelled compaction because not enough room");
+    RecordTick(stats_, COMPACTION_CANCELLED, 1);
+  }
+  return enough_room;
+}
+
 Status DBImpl::SyncClosedLogs(JobContext* job_context) {
   TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
   mutex_.AssertHeld();
@@ -48,6 +85,9 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
                      "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                      log->get_log_number());
       s = log->file()->Sync(immutable_db_options_.use_fsync);
+      if (!s.ok()) {
+        break;
+      }
     }
     if (s.ok()) {
       s = directories_.GetWalDir()->Fsync();
@@ -59,7 +99,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
     // "number < current_log_number".
     MarkLogsSynced(current_log_number - 1, true, s);
     if (!s.ok()) {
-      bg_error_ = s;
+      error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
       TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
       return s;
     }
@@ -69,7 +109,8 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
 
 Status DBImpl::FlushMemTableToOutputFile(
     ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
-    bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
+    bool* made_progress, JobContext* job_context,
+    SuperVersionContext* superversion_context, LogBuffer* log_buffer) {
   mutex_.AssertHeld();
   assert(cfd->imm()->NumNotFlushed() != 0);
   assert(cfd->imm()->IsFlushPending());
@@ -78,11 +119,15 @@ Status DBImpl::FlushMemTableToOutputFile(
   std::vector<SequenceNumber> snapshot_seqs =
       snapshots_.GetAll(&earliest_write_conflict_snapshot);
 
+  auto snapshot_checker = snapshot_checker_.get();
+  if (use_custom_gc_ && snapshot_checker == nullptr) {
+    snapshot_checker = DisableGCSnapshotChecker::Instance();
+  }
   FlushJob flush_job(
-      dbname_, cfd, immutable_db_options_, mutable_cf_options, env_options_,
-      versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
-      earliest_write_conflict_snapshot, job_context, log_buffer,
-      directories_.GetDbDir(), directories_.GetDataDir(0U),
+      dbname_, cfd, immutable_db_options_, mutable_cf_options,
+      env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
+      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
+      job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
       GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
       &event_logger_, mutable_cf_options.report_bg_io_stats);
 
@@ -115,14 +160,14 @@ Status DBImpl::FlushMemTableToOutputFile(
   // and EventListener callback will be called when the db_mutex
   // is unlocked by the current thread.
   if (s.ok()) {
-    s = flush_job.Run(&file_meta);
+    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
   } else {
     flush_job.Cancel();
   }
 
   if (s.ok()) {
-    InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
-                                              mutable_cf_options);
+    InstallSuperVersionAndScheduleWork(cfd, superversion_context,
+                                       mutable_cf_options);
     if (made_progress) {
       *made_progress = 1;
     }
@@ -132,11 +177,9 @@ Status DBImpl::FlushMemTableToOutputFile(
                      cfd->current()->storage_info()->LevelSummary(&tmp));
   }
 
-  if (!s.ok() && !s.IsShutdownInProgress() &&
-      immutable_db_options_.paranoid_checks && bg_error_.ok()) {
-    // if a bad error happened (not ShutdownInProgress) and paranoid_checks is
-    // true, mark DB read-only
-    bg_error_ = s;
+  if (!s.ok() && !s.IsShutdownInProgress()) {
+    Status new_bg_error = s;
+    error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
   }
   if (s.ok()) {
 #ifndef ROCKSDB_LITE
@@ -148,13 +191,14 @@ Status DBImpl::FlushMemTableToOutputFile(
     if (sfm) {
       // Notify sst_file_manager that a new file was added
       std::string file_path = MakeTableFileName(
-          immutable_db_options_.db_paths[0].path, file_meta.fd.GetNumber());
+          cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
       sfm->OnAddFile(file_path);
-      if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) {
-        bg_error_ = Status::IOError("Max allowed space was reached");
+      if (sfm->IsMaxAllowedSpaceReached()) {
+        Status new_bg_error = Status::SpaceLimit("Max allowed space was reached");
         TEST_SYNC_POINT_CALLBACK(
             "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
-            &bg_error_);
+            &new_bg_error);
+        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
       }
     }
 #endif  // ROCKSDB_LITE
@@ -162,6 +206,25 @@ Status DBImpl::FlushMemTableToOutputFile(
   return s;
 }
 
+Status DBImpl::FlushMemTablesToOutputFiles(
+    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
+    JobContext* job_context, LogBuffer* log_buffer) {
+  Status s;
+  for (auto& arg : bg_flush_args) {
+    ColumnFamilyData* cfd = arg.cfd_;
+    const MutableCFOptions& mutable_cf_options =
+        *cfd->GetLatestMutableCFOptions();
+    SuperVersionContext* superversion_context = arg.superversion_context_;
+    s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
+                                  job_context, superversion_context,
+                                  log_buffer);
+    if (!s.ok()) {
+      break;
+    }
+  }
+  return s;
+}
+
 void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                 const MutableCFOptions& mutable_cf_options,
                                 int job_id, TableProperties prop) {
@@ -186,15 +249,16 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
     info.cf_name = cfd->GetName();
     // TODO(yhchiang): make db_paths dynamic in case flush does not
     //                 go to L0 in the future.
-    info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path,
+    info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
                                        file_meta->fd.GetNumber());
     info.thread_id = env_->GetThreadID();
     info.job_id = job_id;
     info.triggered_writes_slowdown = triggered_writes_slowdown;
     info.triggered_writes_stop = triggered_writes_stop;
-    info.smallest_seqno = file_meta->smallest_seqno;
-    info.largest_seqno = file_meta->largest_seqno;
+    info.smallest_seqno = file_meta->fd.smallest_seqno;
+    info.largest_seqno = file_meta->fd.largest_seqno;
     info.table_properties = prop;
+    info.flush_reason = cfd->GetFlushReason();
     for (auto listener : immutable_db_options_.listeners) {
       listener->OnFlushBegin(this, info);
     }
@@ -202,6 +266,12 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
   mutex_.Lock();
 // no need to signal bg_cv_ as it will be signaled at the end of the
 // flush process.
+#else
+  (void)cfd;
+  (void)file_meta;
+  (void)mutable_cf_options;
+  (void)job_id;
+  (void)prop;
 #endif  // ROCKSDB_LITE
 }
 
@@ -230,15 +300,16 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
     info.cf_name = cfd->GetName();
     // TODO(yhchiang): make db_paths dynamic in case flush does not
     //                 go to L0 in the future.
-    info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path,
+    info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
                                        file_meta->fd.GetNumber());
     info.thread_id = env_->GetThreadID();
     info.job_id = job_id;
     info.triggered_writes_slowdown = triggered_writes_slowdown;
     info.triggered_writes_stop = triggered_writes_stop;
-    info.smallest_seqno = file_meta->smallest_seqno;
-    info.largest_seqno = file_meta->largest_seqno;
+    info.smallest_seqno = file_meta->fd.smallest_seqno;
+    info.largest_seqno = file_meta->fd.largest_seqno;
     info.table_properties = prop;
+    info.flush_reason = cfd->GetFlushReason();
     for (auto listener : immutable_db_options_.listeners) {
       listener->OnFlushCompleted(this, info);
     }
@@ -246,24 +317,48 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
   mutex_.Lock();
   // no need to signal bg_cv_ as it will be signaled at the end of the
   // flush process.
+#else
+  (void)cfd;
+  (void)file_meta;
+  (void)mutable_cf_options;
+  (void)job_id;
+  (void)prop;
 #endif  // ROCKSDB_LITE
 }
 
 Status DBImpl::CompactRange(const CompactRangeOptions& options,
                             ColumnFamilyHandle* column_family,
                             const Slice* begin, const Slice* end) {
-  if (options.target_path_id >= immutable_db_options_.db_paths.size()) {
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  auto cfd = cfh->cfd();
+
+  if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
     return Status::InvalidArgument("Invalid target path ID");
   }
 
-  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-  auto cfd = cfh->cfd();
   bool exclusive = options.exclusive_manual_compaction;
 
-  Status s = FlushMemTable(cfd, FlushOptions());
-  if (!s.ok()) {
-    LogFlush(immutable_db_options_.info_log);
-    return s;
+  bool flush_needed = true;
+  if (begin != nullptr && end != nullptr) {
+    // TODO(ajkr): We could also optimize away the flush in certain cases where
+    // one/both sides of the interval are unbounded. But it requires more
+    // changes to RangesOverlapWithMemtables.
+    Range range(*begin, *end);
+    SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+    cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
+    CleanupSuperVersion(super_version);
+  }
+
+  Status s;
+  if (flush_needed) {
+    FlushOptions fo;
+    fo.allow_write_stall = options.allow_write_stall;
+    s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
+      false /* writes_stopped*/);
+    if (!s.ok()) {
+      LogFlush(immutable_db_options_.info_log);
+      return s;
+    }
   }
 
   int max_level_with_files = 0;
@@ -282,10 +377,14 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
   if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
       cfd->NumberLevels() > 1) {
     // Always compact all files together.
-    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
-                            cfd->NumberLevels() - 1, options.target_path_id,
-                            begin, end, exclusive);
     final_output_level = cfd->NumberLevels() - 1;
+    // if bottom most level is reserved
+    if (immutable_db_options_.allow_ingest_behind) {
+      final_output_level--;
+    }
+    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
+                            final_output_level, options.target_path_id,
+                            options.max_subcompactions, begin, end, exclusive);
   } else {
     for (int level = 0; level <= max_level_with_files; level++) {
       int output_level;
@@ -319,7 +418,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
         }
       }
       s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
-                              begin, end, exclusive);
+                              options.max_subcompactions, begin, end, exclusive);
       if (!s.ok()) {
         break;
       }
@@ -358,13 +457,19 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
   return s;
 }
 
-Status DBImpl::CompactFiles(
-    const CompactionOptions& compact_options,
-    ColumnFamilyHandle* column_family,
-    const std::vector<std::string>& input_file_names,
-    const int output_level, const int output_path_id) {
+Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
+                            ColumnFamilyHandle* column_family,
+                            const std::vector<std::string>& input_file_names,
+                            const int output_level, const int output_path_id,
+                            std::vector<std::string>* const output_file_names) {
 #ifdef ROCKSDB_LITE
-    // not supported in lite version
+  (void)compact_options;
+  (void)column_family;
+  (void)input_file_names;
+  (void)output_level;
+  (void)output_path_id;
+  (void)output_file_names;
+  // not supported in lite version
   return Status::NotSupported("Not supported in ROCKSDB LITE");
 #else
   if (column_family == nullptr) {
@@ -388,9 +493,9 @@ Status DBImpl::CompactFiles(
     // IngestExternalFile() calls to finish.
     WaitForIngestFile();
 
-    s = CompactFilesImpl(compact_options, cfd, sv->current,
-                         input_file_names, output_level,
-                         output_path_id, &job_context, &log_buffer);
+    s = CompactFilesImpl(compact_options, cfd, sv->current, input_file_names,
+                         output_file_names, output_level, output_path_id,
+                         &job_context, &log_buffer);
   }
   if (sv->Unref()) {
     mutex_.Lock();
@@ -410,7 +515,8 @@ Status DBImpl::CompactFiles(
   }  // release the mutex
 
   // delete unnecessary files if any, this is done outside the mutex
-  if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+  if (job_context.HaveSomethingToClean() ||
+      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
     // Have to flush the info logs before bg_compaction_scheduled_--
     // because if bg_flush_scheduled_ becomes 0 and the lock is
     // released, the deconstructor of DB can kick in and destroy all the
@@ -432,8 +538,8 @@ Status DBImpl::CompactFiles(
 Status DBImpl::CompactFilesImpl(
     const CompactionOptions& compact_options, ColumnFamilyData* cfd,
     Version* version, const std::vector<std::string>& input_file_names,
-    const int output_level, int output_path_id, JobContext* job_context,
-    LogBuffer* log_buffer) {
+    std::vector<std::string>* const output_file_names, const int output_level,
+    int output_path_id, JobContext* job_context, LogBuffer* log_buffer) {
   mutex_.AssertHeld();
 
   if (shutting_down_.load(std::memory_order_acquire)) {
@@ -451,7 +557,7 @@ Status DBImpl::CompactFilesImpl(
   version->GetColumnFamilyMetaData(&cf_meta);
 
   if (output_path_id < 0) {
-    if (immutable_db_options_.db_paths.size() == 1U) {
+    if (cfd->ioptions()->cf_paths.size() == 1U) {
       output_path_id = 0;
     } else {
       return Status::NotSupported(
@@ -480,6 +586,16 @@ Status DBImpl::CompactFilesImpl(
           "files are already being compacted");
     }
   }
+  bool sfm_reserved_compact_space = false;
+  // First check if we have enough room to do the compaction
+  bool enough_room = EnoughRoomForCompaction(
+      cfd, input_files, &sfm_reserved_compact_space, log_buffer);
+
+  if (!enough_room) {
+    // m's vars will get set properly at the end of this function,
+    // as long as status == CompactionTooLarge
+    return Status::CompactionTooLarge();
+  }
 
   // At this point, CompactFiles will be run.
   bg_compaction_scheduled_++;
@@ -489,9 +605,10 @@ Status DBImpl::CompactFilesImpl(
   c.reset(cfd->compaction_picker()->CompactFiles(
       compact_options, input_files, output_level, version->storage_info(),
       *cfd->GetLatestMutableCFOptions(), output_path_id));
-  if (!c) {
-    return Status::Aborted("Another Level 0 compaction is running");
-  }
+  // we already sanitized the set of input files and checked for conflicts
+  // without releasing the lock, so we're guaranteed a compaction can be formed.
+  assert(c != nullptr);
+
   c->SetInputVersion(version);
   // deletion compaction currently not allowed in CompactFiles.
   assert(!c->deletion_compaction());
@@ -503,13 +620,19 @@ Status DBImpl::CompactFilesImpl(
   auto pending_outputs_inserted_elem =
       CaptureCurrentFileNumberInPendingOutputs();
 
+  auto snapshot_checker = snapshot_checker_.get();
+  if (use_custom_gc_ && snapshot_checker == nullptr) {
+    snapshot_checker = DisableGCSnapshotChecker::Instance();
+  }
   assert(is_snapshot_supported_ || snapshots_.empty());
   CompactionJob compaction_job(
-      job_context->job_id, c.get(), immutable_db_options_, env_options_,
-      versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
-      directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
-      snapshot_seqs, earliest_write_conflict_snapshot, table_cache_,
-      &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
+      job_context->job_id, c.get(), immutable_db_options_,
+      env_options_for_compaction_, versions_.get(), &shutting_down_,
+      preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
+      GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
+      &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
+      snapshot_checker, table_cache_, &event_logger_,
+      c->mutable_cf_options()->paranoid_file_checks,
       c->mutable_cf_options()->report_bg_io_stats, dbname_,
       nullptr);  // Here we pass a nullptr for CompactionJobStats because
                  // CompactFiles does not trigger OnCompactionCompleted(),
@@ -544,10 +667,19 @@ Status DBImpl::CompactFilesImpl(
 
   Status status = compaction_job.Install(*c->mutable_cf_options());
   if (status.ok()) {
-    InstallSuperVersionAndScheduleWorkWrapper(
-        c->column_family_data(), job_context, *c->mutable_cf_options());
+    InstallSuperVersionAndScheduleWork(
+        c->column_family_data(), &job_context->superversion_contexts[0],
+        *c->mutable_cf_options(), FlushReason::kManualCompaction);
   }
   c->ReleaseCompactionFiles(s);
+#ifndef ROCKSDB_LITE
+  // Need to make sure SstFileManager does its bookkeeping
+  auto sfm = static_cast<SstFileManagerImpl*>(
+      immutable_db_options_.sst_file_manager.get());
+  if (sfm && sfm_reserved_compact_space) {
+    sfm->OnCompactionCompletion(c.get());
+  }
+#endif  // ROCKSDB_LITE
 
   ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
 
@@ -560,8 +692,15 @@ Status DBImpl::CompactFilesImpl(
                    "[%s] [JOB %d] Compaction error: %s",
                    c->column_family_data()->GetName().c_str(),
                    job_context->job_id, status.ToString().c_str());
-    if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
-      bg_error_ = status;
+    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
+  }
+
+  if (output_file_names != nullptr) {
+    for (const auto newf : c->edit()->GetNewFiles()) {
+      (*output_file_names)
+          .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
+                                   newf.second.fd.GetNumber(),
+                                   newf.second.fd.GetPathId()));
     }
   }
 
@@ -571,6 +710,7 @@ Status DBImpl::CompactFilesImpl(
   if (bg_compaction_scheduled_ == 0) {
     bg_cv_.SignalAll();
   }
+  TEST_SYNC_POINT("CompactFilesImpl:End");
 
   return status;
 }
@@ -579,7 +719,8 @@ Status DBImpl::CompactFilesImpl(
 Status DBImpl::PauseBackgroundWork() {
   InstrumentedMutexLock guard_lock(&mutex_);
   bg_compaction_paused_++;
-  while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0) {
+  while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
+         bg_flush_scheduled_ > 0) {
     bg_cv_.Wait();
   }
   bg_work_paused_++;
@@ -604,9 +745,8 @@ Status DBImpl::ContinueBackgroundWork() {
 }
 
 void DBImpl::NotifyOnCompactionCompleted(
-    ColumnFamilyData* cfd, Compaction *c, const Status &st,
-    const CompactionJobStats& compaction_job_stats,
-    const int job_id) {
+    ColumnFamilyData* cfd, Compaction* c, const Status& st,
+    const CompactionJobStats& compaction_job_stats, const int job_id) {
 #ifndef ROCKSDB_LITE
   if (immutable_db_options_.listeners.size() == 0U) {
     return;
@@ -615,6 +755,8 @@ void DBImpl::NotifyOnCompactionCompleted(
   if (shutting_down_.load(std::memory_order_acquire)) {
     return;
   }
+  Version* current = cfd->current();
+  current->Ref();
   // release lock while notifying events
   mutex_.Unlock();
   TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
@@ -632,12 +774,12 @@ void DBImpl::NotifyOnCompactionCompleted(
     info.compression = c->output_compression();
     for (size_t i = 0; i < c->num_input_levels(); ++i) {
       for (const auto fmd : *c->inputs(i)) {
-        auto fn = TableFileName(immutable_db_options_.db_paths,
+        auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
                                 fmd->fd.GetNumber(), fmd->fd.GetPathId());
         info.input_files.push_back(fn);
         if (info.table_properties.count(fn) == 0) {
           std::shared_ptr<const TableProperties> tp;
-          auto s = cfd->current()->GetTableProperties(&tp, fmd, &fn);
+          auto s = current->GetTableProperties(&tp, fmd, &fn);
           if (s.ok()) {
             info.table_properties[fn] = tp;
           }
@@ -645,17 +787,24 @@ void DBImpl::NotifyOnCompactionCompleted(
       }
     }
     for (const auto newf : c->edit()->GetNewFiles()) {
-      info.output_files.push_back(TableFileName(immutable_db_options_.db_paths,
-                                                newf.second.fd.GetNumber(),
-                                                newf.second.fd.GetPathId()));
+      info.output_files.push_back(TableFileName(
+          c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(),
+          newf.second.fd.GetPathId()));
     }
     for (auto listener : immutable_db_options_.listeners) {
       listener->OnCompactionCompleted(this, info);
     }
   }
   mutex_.Lock();
+  current->Unref();
   // no need to signal bg_cv_ as it will be signaled at the end of the
   // flush process.
+#else
+  (void)cfd;
+  (void)c;
+  (void)st;
+  (void)compaction_job_stats;
+  (void)job_id;
 #endif  // ROCKSDB_LITE
 }
 
@@ -667,8 +816,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
     return Status::InvalidArgument("Target level exceeds number of levels");
   }
 
-  std::unique_ptr<SuperVersion> superversion_to_free;
-  std::unique_ptr<SuperVersion> new_superversion(new SuperVersion());
+  SuperVersionContext sv_context(/* create_superversion */ true);
 
   Status status;
 
@@ -714,7 +862,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
       edit.DeleteFile(level, f->fd.GetNumber());
       edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                    f->fd.GetFileSize(), f->smallest, f->largest,
-                   f->smallest_seqno, f->largest_seqno,
+                   f->fd.smallest_seqno, f->fd.largest_seqno,
                    f->marked_for_compaction);
     }
     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
@@ -723,8 +871,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
 
     status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
                                     directories_.GetDbDir());
-    superversion_to_free.reset(InstallSuperVersionAndScheduleWork(
-        cfd, new_superversion.release(), mutable_cf_options));
+    InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
 
     ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
                     cfd->GetName().c_str(), status.ToString().data());
@@ -736,6 +883,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
     }
   }
 
+  sv_context.Clean();
   refitting_level_ = false;
 
   return status;
@@ -746,25 +894,96 @@ int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
   return cfh->cfd()->NumberLevels();
 }
 
-int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) {
+int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
   return 0;
 }
 
 int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
   InstrumentedMutexLock l(&mutex_);
-  return cfh->cfd()->GetSuperVersion()->
-      mutable_cf_options.level0_stop_writes_trigger;
+  return cfh->cfd()
+      ->GetSuperVersion()
+      ->mutable_cf_options.level0_stop_writes_trigger;
 }
 
 Status DBImpl::Flush(const FlushOptions& flush_options,
                      ColumnFamilyHandle* column_family) {
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-  return FlushMemTable(cfh->cfd(), flush_options);
+  ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
+                 cfh->GetName().c_str());
+  Status s =
+      FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
+  ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                 "[%s] Manual flush finished, status: %s\n",
+                 cfh->GetName().c_str(), s.ToString().c_str());
+  return s;
+}
+
+
+Status DBImpl::FlushAllCFs(FlushReason flush_reason) {
+  Status s;
+  WriteContext context;
+  WriteThread::Writer w;
+
+  mutex_.AssertHeld();
+  write_thread_.EnterUnbatched(&w, &mutex_);
+
+  FlushRequest flush_req;
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() &&
+        cached_recoverable_state_empty_.load()) {
+      // Nothing to flush
+      continue;
+    }
+
+    // SwitchMemtable() will release and reacquire mutex during execution
+    s = SwitchMemtable(cfd, &context);
+    if (!s.ok()) {
+      break;
+    }
+
+    cfd->imm()->FlushRequested();
+
+    flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
+  }
+
+  // schedule flush
+  if (s.ok() && !flush_req.empty()) {
+    SchedulePendingFlush(flush_req, flush_reason);
+    MaybeScheduleFlushOrCompaction();
+  }
+
+  write_thread_.ExitUnbatched(&w);
+
+  if (s.ok()) {
+    for (auto& flush : flush_req) {
+      auto cfd = flush.first;
+      auto flush_memtable_id = flush.second;
+      while (cfd->imm()->NumNotFlushed() > 0 &&
+             cfd->imm()->GetEarliestMemTableID() <= flush_memtable_id) {
+        if (!error_handler_.GetRecoveryError().ok()) {
+          break;
+        }
+        if (cfd->IsDropped()) {
+          // FlushJob cannot flush a dropped CF, if we did not break here
+          // we will loop forever since cfd->imm()->NumNotFlushed() will never
+          // drop to zero
+          continue;
+        }
+        cfd->Ref();
+        bg_cv_.Wait();
+        cfd->Unref();
+      }
+    }
+  }
+
+  flush_req.clear();
+  return s;
 }
 
 Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                                    int output_level, uint32_t output_path_id,
+                                   uint32_t max_subcompactions,
                                    const Slice* begin, const Slice* end,
                                    bool exclusive, bool disallow_trivial_move) {
   assert(input_level == ColumnFamilyData::kCompactAllLevels ||
@@ -775,7 +994,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
 
   bool scheduled = false;
   bool manual_conflict = false;
-  ManualCompaction manual;
+  ManualCompactionState manual;
   manual.cfd = cfd;
   manual.input_level = input_level;
   manual.output_level = output_level;
@@ -792,7 +1011,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
       cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
     manual.begin = nullptr;
   } else {
-    begin_storage.SetMaxPossibleForUserKey(*begin);
+    begin_storage.SetMinPossibleForUserKey(*begin);
     manual.begin = &begin_storage;
   }
   if (end == nullptr ||
@@ -800,7 +1019,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
       cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
     manual.end = nullptr;
   } else {
-    end_storage.SetMinPossibleForUserKey(*end);
+    end_storage.SetMaxPossibleForUserKey(*end);
     manual.end = &end_storage;
   }
 
@@ -825,7 +1044,9 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
   AddManualCompaction(&manual);
   TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
   if (exclusive) {
-    while (bg_compaction_scheduled_ > 0) {
+    while (bg_bottom_compaction_scheduled_ > 0 ||
+           bg_compaction_scheduled_ > 0) {
+      TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
       ROCKS_LOG_INFO(
           immutable_db_options_.info_log,
           "[%s] Manual compaction waiting for all other scheduled background "
@@ -844,15 +1065,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
   while (!manual.done) {
     assert(HasPendingManualCompaction());
     manual_conflict = false;
+    Compaction* compaction = nullptr;
     if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
         scheduled ||
-        ((manual.manual_end = &manual.tmp_storage1)&&(
-             (manual.compaction = manual.cfd->CompactRange(
-                  *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
-                  manual.output_level, manual.output_path_id, manual.begin,
-                  manual.end, &manual.manual_end, &manual_conflict)) ==
-             nullptr) &&
-         manual_conflict)) {
+        (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
+         ((compaction = manual.cfd->CompactRange(
+               *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
+               manual.output_level, manual.output_path_id, max_subcompactions,
+               manual.begin, manual.end, &manual.manual_end,
+               &manual_conflict)) == nullptr &&
+          manual_conflict))) {
       // exclusive manual compactions should not see a conflict during
       // CompactRange
       assert(!exclusive || !manual_conflict);
@@ -864,14 +1086,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
         manual.incomplete = false;
       }
     } else if (!scheduled) {
-      if (manual.compaction == nullptr) {
+      if (compaction == nullptr) {
         manual.done = true;
         bg_cv_.SignalAll();
         continue;
       }
       ca = new CompactionArg;
       ca->db = this;
-      ca->m = &manual;
+      ca->prepicked_compaction = new PrepickedCompaction;
+      ca->prepicked_compaction->manual_compaction_state = &manual;
+      ca->prepicked_compaction->compaction = compaction;
       manual.incomplete = false;
       bg_compaction_scheduled_++;
       env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
@@ -889,62 +1113,169 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
 
 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                              const FlushOptions& flush_options,
-                             bool writes_stopped) {
+                             FlushReason flush_reason, bool writes_stopped) {
   Status s;
+  uint64_t flush_memtable_id = 0;
+  if (!flush_options.allow_write_stall) {
+    bool flush_needed = true;
+    s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
+    TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
+    if (!s.ok() || !flush_needed) {
+      return s;
+    }
+  }
+  FlushRequest flush_req;
   {
     WriteContext context;
     InstrumentedMutexLock guard_lock(&mutex_);
 
-    if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty()) {
-      // Nothing to flush
-      return Status::OK();
-    }
-
     WriteThread::Writer w;
     if (!writes_stopped) {
       write_thread_.EnterUnbatched(&w, &mutex_);
     }
 
-    // SwitchMemtable() will release and reacquire mutex
-    // during execution
-    s = SwitchMemtable(cfd, &context);
+    if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
+        !cached_recoverable_state_empty_.load()) {
+      s = SwitchMemtable(cfd, &context);
+      flush_memtable_id = cfd->imm()->GetLatestMemTableID();
+      flush_req.emplace_back(cfd, flush_memtable_id);
+    }
+
+    if (s.ok() && !flush_req.empty()) {
+      for (auto& elem : flush_req) {
+        ColumnFamilyData* loop_cfd = elem.first;
+        loop_cfd->imm()->FlushRequested();
+      }
+      SchedulePendingFlush(flush_req, flush_reason);
+      MaybeScheduleFlushOrCompaction();
+    }
 
     if (!writes_stopped) {
       write_thread_.ExitUnbatched(&w);
     }
-
-    cfd->imm()->FlushRequested();
-
-    // schedule flush
-    SchedulePendingFlush(cfd);
-    MaybeScheduleFlushOrCompaction();
   }
 
   if (s.ok() && flush_options.wait) {
-    // Wait until the compaction completes
-    s = WaitForFlushMemTable(cfd);
+    autovector<ColumnFamilyData*> cfds;
+    autovector<const uint64_t*> flush_memtable_ids;
+    for (auto& iter : flush_req) {
+      cfds.push_back(iter.first);
+      flush_memtable_ids.push_back(&(iter.second));
+    }
+    s = WaitForFlushMemTables(cfds, flush_memtable_ids);
   }
+  TEST_SYNC_POINT("FlushMemTableFinished");
   return s;
 }
 
-Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
-  Status s;
+// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
+// cause write stall, for example if one memtable is being flushed already.
+// This method tries to avoid write stall (similar to CompactRange() behavior)
+// it emulates how the SuperVersion / LSM would change if flush happens, checks
+// it against various constrains and delays flush if it'd cause write stall.
+// Called should check status and flush_needed to see if flush already happened.
+Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
+    bool* flush_needed) {
+  {
+    *flush_needed = true;
+    InstrumentedMutexLock l(&mutex_);
+    uint64_t orig_active_memtable_id = cfd->mem()->GetID();
+    WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
+    do {
+      if (write_stall_condition != WriteStallCondition::kNormal) {
+        TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
+        ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                       "[%s] WaitUntilFlushWouldNotStallWrites"
+                       " waiting on stall conditions to clear",
+                       cfd->GetName().c_str());
+        bg_cv_.Wait();
+      }
+      if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
+        return Status::ShutdownInProgress();
+      }
+
+      uint64_t earliest_memtable_id =
+          std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
+      if (earliest_memtable_id > orig_active_memtable_id) {
+        // We waited so long that the memtable we were originally waiting on was
+        // flushed.
+        *flush_needed = false;
+        return Status::OK();
+      }
+
+      const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
+      const auto* vstorage = cfd->current()->storage_info();
+
+      // Skip stalling check if we're below auto-flush and auto-compaction
+      // triggers. If it stalled in these conditions, that'd mean the stall
+      // triggers are so low that stalling is needed for any background work. In
+      // that case we shouldn't wait since background work won't be scheduled.
+      if (cfd->imm()->NumNotFlushed() <
+              cfd->ioptions()->min_write_buffer_number_to_merge &&
+          vstorage->l0_delay_trigger_count() <
+              mutable_cf_options.level0_file_num_compaction_trigger) {
+        break;
+      }
+
+      // check whether one extra immutable memtable or an extra L0 file would
+      // cause write stalling mode to be entered. It could still enter stall
+      // mode due to pending compaction bytes, but that's less common
+      write_stall_condition =
+          ColumnFamilyData::GetWriteStallConditionAndCause(
+              cfd->imm()->NumNotFlushed() + 1,
+              vstorage->l0_delay_trigger_count() + 1,
+              vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
+              .first;
+    } while (write_stall_condition != WriteStallCondition::kNormal);
+  }
+  return Status::OK();
+}
+
+// Wait for memtables to be flushed for multiple column families.
+// let N = cfds.size()
+// for i in [0, N),
+//  1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
+//     have to be flushed for THIS column family;
+//  2) if flush_memtable_ids[i] is null, then all memtables in THIS column
+//     family have to be flushed.
+// Finish waiting when ALL column families finish flushing memtables.
+Status DBImpl::WaitForFlushMemTables(
+    const autovector<ColumnFamilyData*>& cfds,
+    const autovector<const uint64_t*>& flush_memtable_ids) {
+  int num = static_cast<int>(cfds.size());
   // Wait until the compaction completes
   InstrumentedMutexLock l(&mutex_);
-  while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) {
+  while (!error_handler_.IsDBStopped()) {
     if (shutting_down_.load(std::memory_order_acquire)) {
       return Status::ShutdownInProgress();
     }
-    if (cfd->IsDropped()) {
-      // FlushJob cannot flush a dropped CF, if we did not break here
-      // we will loop forever since cfd->imm()->NumNotFlushed() will never
-      // drop to zero
+    // Number of column families that have been dropped.
+    int num_dropped = 0;
+    // Number of column families that have finished flush.
+    int num_finished = 0;
+    for (int i = 0; i < num; ++i) {
+      if (cfds[i]->IsDropped()) {
+        ++num_dropped;
+      } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
+                 (flush_memtable_ids[i] != nullptr &&
+                  cfds[i]->imm()->GetEarliestMemTableID() >
+                      *flush_memtable_ids[i])) {
+        ++num_finished;
+      }
+    }
+    if (1 == num_dropped && 1 == num) {
       return Status::InvalidArgument("Cannot flush a dropped CF");
     }
+    // Column families involved in this flush request have either been dropped
+    // or finished flush. Then it's time to finish waiting.
+    if (num_dropped + num_finished == num) {
+      break;
+    }
     bg_cv_.Wait();
   }
-  if (!bg_error_.ok()) {
-    s = bg_error_;
+  Status s;
+  if (error_handler_.IsDBStopped()) {
+    s = error_handler_.GetBGError();
   }
   return s;
 }
@@ -972,27 +1303,31 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
   if (bg_work_paused_ > 0) {
     // we paused the background work
     return;
+  } else if (error_handler_.IsBGWorkStopped() &&
+      !error_handler_.IsRecoveryInProgress()) {
+    // There has been a hard error and this call is not part of the recovery
+    // sequence. Bail out here so we don't get into an endless loop of
+    // scheduling BG work which will again call this function
+    return;
   } else if (shutting_down_.load(std::memory_order_acquire)) {
     // DB is being deleted; no more background compactions
     return;
   }
-
-  while (unscheduled_flushes_ > 0 &&
-         bg_flush_scheduled_ < immutable_db_options_.max_background_flushes) {
-    unscheduled_flushes_--;
+  auto bg_job_limits = GetBGJobLimits();
+  bool is_flush_pool_empty =
+      env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
+  while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
+         bg_flush_scheduled_ < bg_job_limits.max_flushes) {
     bg_flush_scheduled_++;
     env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
   }
 
-  auto bg_compactions_allowed = BGCompactionsAllowed();
-
-  // special case -- if max_background_flushes == 0, then schedule flush on a
-  // compaction thread
-  if (immutable_db_options_.max_background_flushes == 0) {
+  // special case -- if high-pri (flush) thread pool is empty, then schedule
+  // flushes in low-pri (compaction) thread pool.
+  if (is_flush_pool_empty) {
     while (unscheduled_flushes_ > 0 &&
            bg_flush_scheduled_ + bg_compaction_scheduled_ <
-               bg_compactions_allowed) {
-      unscheduled_flushes_--;
+               bg_job_limits.max_flushes) {
       bg_flush_scheduled_++;
       env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
     }
@@ -1001,19 +1336,26 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
   if (bg_compaction_paused_ > 0) {
     // we paused the background compaction
     return;
+  } else if (error_handler_.IsBGWorkStopped()) {
+    // Compaction is not part of the recovery sequence from a hard error. We
+    // might get here because recovery might do a flush and install a new
+    // super version, which will try to schedule pending compactions. Bail
+    // out here and let the higher level recovery handle compactions
+    return;
   }
 
   if (HasExclusiveManualCompaction()) {
     // only manual compactions are allowed to run. don't schedule automatic
     // compactions
+    TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
     return;
   }
 
-  while (bg_compaction_scheduled_ < bg_compactions_allowed &&
+  while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
          unscheduled_compactions_ > 0) {
     CompactionArg* ca = new CompactionArg;
     ca->db = this;
-    ca->m = nullptr;
+    ca->prepicked_compaction = nullptr;
     bg_compaction_scheduled_++;
     unscheduled_compactions_--;
     env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
@@ -1021,66 +1363,88 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
   }
 }
 
-int DBImpl::BGCompactionsAllowed() const {
+DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
   mutex_.AssertHeld();
-  if (write_controller_.NeedSpeedupCompaction()) {
-    return mutable_db_options_.max_background_compactions;
+  return GetBGJobLimits(immutable_db_options_.max_background_flushes,
+                        mutable_db_options_.max_background_compactions,
+                        mutable_db_options_.max_background_jobs,
+                        write_controller_.NeedSpeedupCompaction());
+}
+
+DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
+                                           int max_background_compactions,
+                                           int max_background_jobs,
+                                           bool parallelize_compactions) {
+  BGJobLimits res;
+  if (max_background_flushes == -1 && max_background_compactions == -1) {
+    // for our first stab implementing max_background_jobs, simply allocate a
+    // quarter of the threads to flushes.
+    res.max_flushes = std::max(1, max_background_jobs / 4);
+    res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
   } else {
-    return mutable_db_options_.base_background_compactions;
+    // compatibility code in case users haven't migrated to max_background_jobs,
+    // which automatically computes flush/compaction limits
+    res.max_flushes = std::max(1, max_background_flushes);
+    res.max_compactions = std::max(1, max_background_compactions);
   }
+  if (!parallelize_compactions) {
+    // throttle background compactions until we deem necessary
+    res.max_compactions = 1;
+  }
+  return res;
 }
 
 void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
-  assert(!cfd->pending_compaction());
+  assert(!cfd->queued_for_compaction());
   cfd->Ref();
   compaction_queue_.push_back(cfd);
-  cfd->set_pending_compaction(true);
+  cfd->set_queued_for_compaction(true);
 }
 
 ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
   assert(!compaction_queue_.empty());
   auto cfd = *compaction_queue_.begin();
   compaction_queue_.pop_front();
-  assert(cfd->pending_compaction());
-  cfd->set_pending_compaction(false);
+  assert(cfd->queued_for_compaction());
+  cfd->set_queued_for_compaction(false);
   return cfd;
 }
 
-void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd) {
-  assert(!cfd->pending_flush());
-  cfd->Ref();
-  flush_queue_.push_back(cfd);
-  cfd->set_pending_flush(true);
-}
-
-ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() {
+DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
   assert(!flush_queue_.empty());
-  auto cfd = *flush_queue_.begin();
+  FlushRequest flush_req = flush_queue_.front();
+  assert(unscheduled_flushes_ >= static_cast<int>(flush_req.size()));
+  unscheduled_flushes_ -= static_cast<int>(flush_req.size());
   flush_queue_.pop_front();
-  assert(cfd->pending_flush());
-  cfd->set_pending_flush(false);
-  return cfd;
+  // TODO: need to unset flush reason?
+  return flush_req;
 }
 
-void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd) {
-  if (!cfd->pending_flush() && cfd->imm()->IsFlushPending()) {
-    AddToFlushQueue(cfd);
-    ++unscheduled_flushes_;
+void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
+                                  FlushReason flush_reason) {
+  if (flush_req.empty()) {
+    return;
+  }
+  for (auto& iter : flush_req) {
+    ColumnFamilyData* cfd = iter.first;
+    cfd->Ref();
+    cfd->SetFlushReason(flush_reason);
   }
+  unscheduled_flushes_ += static_cast<int>(flush_req.size());
+  flush_queue_.push_back(flush_req);
 }
 
 void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
-  if (!cfd->pending_compaction() && cfd->NeedsCompaction()) {
+  if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
     AddToCompactionQueue(cfd);
     ++unscheduled_compactions_;
   }
 }
 
-void DBImpl::SchedulePendingPurge(std::string fname, FileType type,
-                                  uint64_t number, uint32_t path_id,
-                                  int job_id) {
+void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
+                                  FileType type, uint64_t number, int job_id) {
   mutex_.AssertHeld();
-  PurgeFileInfo file_info(fname, type, number, path_id, job_id);
+  PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
   purge_queue_.push_back(std::move(file_info));
 }
 
@@ -1096,7 +1460,23 @@ void DBImpl::BGWorkCompaction(void* arg) {
   delete reinterpret_cast<CompactionArg*>(arg);
   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
   TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
-  reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(ca.m);
+  auto prepicked_compaction =
+      static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
+  reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(
+      prepicked_compaction, Env::Priority::LOW);
+  delete prepicked_compaction;
+}
+
+void DBImpl::BGWorkBottomCompaction(void* arg) {
+  CompactionArg ca = *(static_cast<CompactionArg*>(arg));
+  delete static_cast<CompactionArg*>(arg);
+  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
+  TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
+  auto* prepicked_compaction = ca.prepicked_compaction;
+  assert(prepicked_compaction && prepicked_compaction->compaction &&
+         !prepicked_compaction->manual_compaction_state);
+  ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
+  delete prepicked_compaction;
 }
 
 void DBImpl::BGWorkPurge(void* db) {
@@ -1109,57 +1489,87 @@ void DBImpl::BGWorkPurge(void* db) {
 void DBImpl::UnscheduleCallback(void* arg) {
   CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
   delete reinterpret_cast<CompactionArg*>(arg);
-  if ((ca.m != nullptr) && (ca.m->compaction != nullptr)) {
-    delete ca.m->compaction;
+  if (ca.prepicked_compaction != nullptr) {
+    if (ca.prepicked_compaction->compaction != nullptr) {
+      delete ca.prepicked_compaction->compaction;
+    }
+    delete ca.prepicked_compaction;
   }
   TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
 }
 
 Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
-                               LogBuffer* log_buffer) {
+                               LogBuffer* log_buffer, FlushReason* reason) {
   mutex_.AssertHeld();
 
-  Status status = bg_error_;
-  if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
-    status = Status::ShutdownInProgress();
+  Status status;
+  *reason = FlushReason::kOthers;
+  // If BG work is stopped due to an error, but a recovery is in progress,
+  // that means this flush is part of the recovery. So allow it to go through
+  if (!error_handler_.IsBGWorkStopped()) {
+    if (shutting_down_.load(std::memory_order_acquire)) {
+      status = Status::ShutdownInProgress();
+    }
+  } else if (!error_handler_.IsRecoveryInProgress()) {
+    status = error_handler_.GetBGError();
   }
 
   if (!status.ok()) {
     return status;
   }
 
-  ColumnFamilyData* cfd = nullptr;
+  autovector<BGFlushArg> bg_flush_args;
+  std::vector<SuperVersionContext>& superversion_contexts =
+      job_context->superversion_contexts;
   while (!flush_queue_.empty()) {
     // This cfd is already referenced
-    auto first_cfd = PopFirstFromFlushQueue();
-
-    if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
-      // can't flush this CF, try next one
-      if (first_cfd->Unref()) {
-        delete first_cfd;
+    const FlushRequest& flush_req = PopFirstFromFlushQueue();
+    superversion_contexts.clear();
+    superversion_contexts.reserve(flush_req.size());
+
+    for (const auto& iter : flush_req) {
+      ColumnFamilyData* cfd = iter.first;
+      if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
+        // can't flush this CF, try next one
+        if (cfd->Unref()) {
+          delete cfd;
+        }
+        continue;
       }
-      continue;
+      superversion_contexts.emplace_back(SuperVersionContext(true));
+      bg_flush_args.emplace_back(cfd, iter.second,
+                                 &(superversion_contexts.back()));
+    }
+    if (!bg_flush_args.empty()) {
+      break;
     }
-
-    // found a flush!
-    cfd = first_cfd;
-    break;
   }
 
-  if (cfd != nullptr) {
-    const MutableCFOptions mutable_cf_options =
-        *cfd->GetLatestMutableCFOptions();
-    ROCKS_LOG_BUFFER(
-        log_buffer,
-        "Calling FlushMemTableToOutputFile with column "
-        "family [%s], flush slots available %d, compaction slots allowed %d, "
-        "compaction slots scheduled %d",
-        cfd->GetName().c_str(), immutable_db_options_.max_background_flushes,
-        bg_flush_scheduled_, BGCompactionsAllowed() - bg_compaction_scheduled_);
-    status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
-                                       job_context, log_buffer);
-    if (cfd->Unref()) {
-      delete cfd;
+  if (!bg_flush_args.empty()) {
+    auto bg_job_limits = GetBGJobLimits();
+    for (const auto& arg : bg_flush_args) {
+      ColumnFamilyData* cfd = arg.cfd_;
+      ROCKS_LOG_BUFFER(
+          log_buffer,
+          "Calling FlushMemTableToOutputFile with column "
+          "family [%s], flush slots available %d, compaction slots available "
+          "%d, "
+          "flush slots scheduled %d, compaction slots scheduled %d",
+          cfd->GetName().c_str(), bg_job_limits.max_flushes,
+          bg_job_limits.max_compactions, bg_flush_scheduled_,
+          bg_compaction_scheduled_);
+    }
+    status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
+                                         job_context, log_buffer);
+    // All the CFDs in the FlushReq must have the same flush reason, so just
+    // grab the first one
+    *reason = bg_flush_args[0].cfd_->GetFlushReason();
+    for (auto& arg : bg_flush_args) {
+      ColumnFamilyData* cfd = arg.cfd_;
+      if (cfd->Unref()) {
+        delete cfd;
+        arg.cfd_ = nullptr;
+      }
     }
   }
   return status;
@@ -1168,7 +1578,6 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
 void DBImpl::BackgroundCallFlush() {
   bool made_progress = false;
   JobContext job_context(next_job_id_.fetch_add(1), true);
-  assert(bg_flush_scheduled_);
 
   TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
 
@@ -1176,19 +1585,23 @@ void DBImpl::BackgroundCallFlush() {
                        immutable_db_options_.info_log.get());
   {
     InstrumentedMutexLock l(&mutex_);
+    assert(bg_flush_scheduled_);
     num_running_flushes_++;
 
     auto pending_outputs_inserted_elem =
         CaptureCurrentFileNumberInPendingOutputs();
+    FlushReason reason;
 
-    Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer);
-    if (!s.ok() && !s.IsShutdownInProgress()) {
+    Status s =
+        BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason);
+    if (!s.ok() && !s.IsShutdownInProgress() &&
+        reason != FlushReason::kErrorRecovery) {
       // Wait a little bit before retrying background flush in
       // case this is an environmental problem and we do not want to
       // chew up resources for failed flushes for the duration of
       // the problem.
       uint64_t error_cnt =
-        default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
+          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
       mutex_.Unlock();
       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
@@ -1207,8 +1620,10 @@ void DBImpl::BackgroundCallFlush() {
     // created. Thus, we force full scan in FindObsoleteFiles()
     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
     // delete unnecessary files if any, this is done outside the mutex
-    if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+    if (job_context.HaveSomethingToClean() ||
+        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
       mutex_.Unlock();
+      TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
       // Have to flush the info logs before bg_flush_scheduled_--
       // because if bg_flush_scheduled_ becomes 0 and the lock is
       // released, the deconstructor of DB can kick in and destroy all the
@@ -1235,9 +1650,9 @@ void DBImpl::BackgroundCallFlush() {
   }
 }
 
-void DBImpl::BackgroundCallCompaction(void* arg) {
+void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
+                                      Env::Priority bg_thread_pri) {
   bool made_progress = false;
-  ManualCompaction* m = reinterpret_cast<ManualCompaction*>(arg);
   JobContext job_context(next_job_id_.fetch_add(1), true);
   TEST_SYNC_POINT("BackgroundCallCompaction:0");
   MaybeDumpStats();
@@ -1255,9 +1670,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
     auto pending_outputs_inserted_elem =
         CaptureCurrentFileNumberInPendingOutputs();
 
-    assert(bg_compaction_scheduled_);
-    Status s =
-        BackgroundCompaction(&made_progress, &job_context, &log_buffer, m);
+    assert((bg_thread_pri == Env::Priority::BOTTOM &&
+            bg_bottom_compaction_scheduled_) ||
+           (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
+    Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
+                                    prepicked_compaction);
     TEST_SYNC_POINT("BackgroundCallCompaction:1");
     if (!s.ok() && !s.IsShutdownInProgress()) {
       // Wait a little bit before retrying background compaction in
@@ -1284,9 +1701,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
     // have created (they might not be all recorded in job_context in case of a
     // failure). Thus, we force full scan in FindObsoleteFiles()
     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
+    TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
 
     // delete unnecessary files if any, this is done outside the mutex
-    if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+    if (job_context.HaveSomethingToClean() ||
+        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
       mutex_.Unlock();
       // Have to flush the info logs before bg_compaction_scheduled_--
       // because if bg_flush_scheduled_ becomes 0 and the lock is
@@ -1296,6 +1715,7 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
       log_buffer.FlushBufferToLog();
       if (job_context.HaveSomethingToDelete()) {
         PurgeObsoleteFiles(job_context);
+        TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
       }
       job_context.Clean();
       mutex_.Lock();
@@ -1303,17 +1723,24 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
 
     assert(num_running_compactions_ > 0);
     num_running_compactions_--;
-    bg_compaction_scheduled_--;
+    if (bg_thread_pri == Env::Priority::LOW) {
+      bg_compaction_scheduled_--;
+    } else {
+      assert(bg_thread_pri == Env::Priority::BOTTOM);
+      bg_bottom_compaction_scheduled_--;
+    }
 
     versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
 
     // See if there's more work to be done
     MaybeScheduleFlushOrCompaction();
-    if (made_progress || bg_compaction_scheduled_ == 0 ||
-        HasPendingManualCompaction()) {
+    if (made_progress ||
+        (bg_compaction_scheduled_ == 0 &&
+         bg_bottom_compaction_scheduled_ == 0) ||
+        HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
       // signal if
       // * made_progress -- need to wakeup DelayWrite
-      // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
+      // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
       // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
       // If none of this is true, there is no need to signal since nobody is
       // waiting for it
@@ -1328,23 +1755,41 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
 
 Status DBImpl::BackgroundCompaction(bool* made_progress,
                                     JobContext* job_context,
-                                    LogBuffer* log_buffer, void* arg) {
-  ManualCompaction* manual_compaction =
-      reinterpret_cast<ManualCompaction*>(arg);
+                                    LogBuffer* log_buffer,
+                                    PrepickedCompaction* prepicked_compaction) {
+  ManualCompactionState* manual_compaction =
+      prepicked_compaction == nullptr
+          ? nullptr
+          : prepicked_compaction->manual_compaction_state;
   *made_progress = false;
   mutex_.AssertHeld();
   TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
 
   bool is_manual = (manual_compaction != nullptr);
+  unique_ptr<Compaction> c;
+  if (prepicked_compaction != nullptr &&
+      prepicked_compaction->compaction != nullptr) {
+    c.reset(prepicked_compaction->compaction);
+  }
+  bool is_prepicked = is_manual || c;
 
   // (manual_compaction->in_progress == false);
   bool trivial_move_disallowed =
       is_manual && manual_compaction->disallow_trivial_move;
 
   CompactionJobStats compaction_job_stats;
-  Status status = bg_error_;
-  if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
-    status = Status::ShutdownInProgress();
+  Status status;
+  if (!error_handler_.IsBGWorkStopped()) {
+    if (shutting_down_.load(std::memory_order_acquire)) {
+      status = Status::ShutdownInProgress();
+    }
+  } else {
+    status = error_handler_.GetBGError();
+    // If we get here, it means a hard error happened after this compaction
+    // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
+    // a chance to execute. Since we didn't pop a cfd from the compaction
+    // queue, increment unscheduled_compactions_
+    unscheduled_compactions_++;
   }
 
   if (!status.ok()) {
@@ -1352,7 +1797,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
       manual_compaction->status = status;
       manual_compaction->done = true;
       manual_compaction->in_progress = false;
-      delete manual_compaction->compaction;
       manual_compaction = nullptr;
     }
     return status;
@@ -1363,13 +1807,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     manual_compaction->in_progress = true;
   }
 
-  unique_ptr<Compaction> c;
   // InternalKey manual_end_storage;
   // InternalKey* manual_end = &manual_end_storage;
+  bool sfm_reserved_compact_space = false;
   if (is_manual) {
-    ManualCompaction* m = manual_compaction;
+    ManualCompactionState* m = manual_compaction;
     assert(m->in_progress);
-    c.reset(std::move(m->compaction));
     if (!c) {
       m->done = true;
       m->manual_end = nullptr;
@@ -1380,18 +1823,41 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
                        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
                        (m->end ? m->end->DebugString().c_str() : "(end)"));
     } else {
-      ROCKS_LOG_BUFFER(
-          log_buffer,
-          "[%s] Manual compaction from level-%d to level-%d from %s .. "
-          "%s; will stop at %s\n",
-          m->cfd->GetName().c_str(), m->input_level, c->output_level(),
-          (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
-          (m->end ? m->end->DebugString().c_str() : "(end)"),
-          ((m->done || m->manual_end == nullptr)
-               ? "(end)"
-               : m->manual_end->DebugString().c_str()));
-    }
-  } else if (!compaction_queue_.empty()) {
+      // First check if we have enough room to do the compaction
+      bool enough_room = EnoughRoomForCompaction(
+          m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
+
+      if (!enough_room) {
+        // Then don't do the compaction
+        c->ReleaseCompactionFiles(status);
+        c.reset();
+        // m's vars will get set properly at the end of this function,
+        // as long as status == CompactionTooLarge
+        status = Status::CompactionTooLarge();
+      } else {
+        ROCKS_LOG_BUFFER(
+            log_buffer,
+            "[%s] Manual compaction from level-%d to level-%d from %s .. "
+            "%s; will stop at %s\n",
+            m->cfd->GetName().c_str(), m->input_level, c->output_level(),
+            (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
+            (m->end ? m->end->DebugString().c_str() : "(end)"),
+            ((m->done || m->manual_end == nullptr)
+                 ? "(end)"
+                 : m->manual_end->DebugString().c_str()));
+      }
+    }
+  } else if (!is_prepicked && !compaction_queue_.empty()) {
+    if (HasExclusiveManualCompaction()) {
+      // Can't compact right now, but try again later
+      TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
+
+      // Stay in the compaction queue.
+      unscheduled_compactions_++;
+
+      return Status::OK();
+    }
+
     // cfd is referenced here
     auto cfd = PopFirstFromCompactionQueue();
     // We unreference here because the following code will take a Ref() on
@@ -1406,12 +1872,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
       return Status::OK();
     }
 
-    if (HaveManualCompaction(cfd)) {
-      // Can't compact right now, but try again later
-      TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
-      return Status::OK();
-    }
-
     // Pick up latest mutable CF Options and use it throughout the
     // compaction job
     // Compaction makes a copy of the latest MutableCFOptions. It should be used
@@ -1425,27 +1885,48 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
       TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
       c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
       TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
+
       if (c != nullptr) {
-        // update statistics
-        MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
-                    c->inputs(0)->size());
-        // There are three things that can change compaction score:
-        // 1) When flush or compaction finish. This case is covered by
-        // InstallSuperVersionAndScheduleWork
-        // 2) When MutableCFOptions changes. This case is also covered by
-        // InstallSuperVersionAndScheduleWork, because this is when the new
-        // options take effect.
-        // 3) When we Pick a new compaction, we "remove" those files being
-        // compacted from the calculation, which then influences compaction
-        // score. Here we check if we need the new compaction even without the
-        // files that are currently being compacted. If we need another
-        // compaction, we might be able to execute it in parallel, so we add it
-        // to the queue and schedule a new thread.
-        if (cfd->NeedsCompaction()) {
-          // Yes, we need more compactions!
+        bool enough_room = EnoughRoomForCompaction(
+            cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
+
+        if (!enough_room) {
+          // Then don't do the compaction
+          c->ReleaseCompactionFiles(status);
+          c->column_family_data()
+              ->current()
+              ->storage_info()
+              ->ComputeCompactionScore(*(c->immutable_cf_options()),
+                                       *(c->mutable_cf_options()));
           AddToCompactionQueue(cfd);
           ++unscheduled_compactions_;
-          MaybeScheduleFlushOrCompaction();
+
+          c.reset();
+          // Don't need to sleep here, because BackgroundCallCompaction
+          // will sleep if !s.ok()
+          status = Status::CompactionTooLarge();
+        } else {
+          // update statistics
+          MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
+                      c->inputs(0)->size());
+          // There are three things that can change compaction score:
+          // 1) When flush or compaction finish. This case is covered by
+          // InstallSuperVersionAndScheduleWork
+          // 2) When MutableCFOptions changes. This case is also covered by
+          // InstallSuperVersionAndScheduleWork, because this is when the new
+          // options take effect.
+          // 3) When we Pick a new compaction, we "remove" those files being
+          // compacted from the calculation, which then influences compaction
+          // score. Here we check if we need the new compaction even without the
+          // files that are currently being compacted. If we need another
+          // compaction, we might be able to execute it in parallel, so we add
+          // it to the queue and schedule a new thread.
+          if (cfd->NeedsCompaction()) {
+            // Yes, we need more compactions!
+            AddToCompactionQueue(cfd);
+            ++unscheduled_compactions_;
+            MaybeScheduleFlushOrCompaction();
+          }
         }
       }
     }
@@ -1470,8 +1951,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     status = versions_->LogAndApply(c->column_family_data(),
                                     *c->mutable_cf_options(), c->edit(),
                                     &mutex_, directories_.GetDbDir());
-    InstallSuperVersionAndScheduleWorkWrapper(
-        c->column_family_data(), job_context, *c->mutable_cf_options());
+    InstallSuperVersionAndScheduleWork(
+        c->column_family_data(), &job_context->superversion_contexts[0],
+        *c->mutable_cf_options(), FlushReason::kAutoCompaction);
     ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
                      c->column_family_data()->GetName().c_str(),
                      c->num_input_files(0));
@@ -1499,14 +1981,14 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
         c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
         c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
                            f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
-                           f->largest, f->smallest_seqno, f->largest_seqno,
-                           f->marked_for_compaction);
-
-        ROCKS_LOG_BUFFER(log_buffer, "[%s] Moving #%" PRIu64
-                                     " to level-%d %" PRIu64 " bytes\n",
-                         c->column_family_data()->GetName().c_str(),
-                         f->fd.GetNumber(), c->output_level(),
-                         f->fd.GetFileSize());
+                           f->largest, f->fd.smallest_seqno,
+                           f->fd.largest_seqno, f->marked_for_compaction);
+
+        ROCKS_LOG_BUFFER(
+            log_buffer,
+            "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
+            c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
+            c->output_level(), f->fd.GetFileSize());
         ++moved_files;
         moved_bytes += f->fd.GetFileSize();
       }
@@ -1516,8 +1998,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
                                     *c->mutable_cf_options(), c->edit(),
                                     &mutex_, directories_.GetDbDir());
     // Use latest MutableCFOptions
-    InstallSuperVersionAndScheduleWorkWrapper(
-        c->column_family_data(), job_context, *c->mutable_cf_options());
+    InstallSuperVersionAndScheduleWork(
+        c->column_family_data(), &job_context->superversion_contexts[0],
+        *c->mutable_cf_options(), FlushReason::kAutoCompaction);
 
     VersionStorageInfo::LevelSummaryStorage tmp;
     c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
@@ -1539,22 +2022,47 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
 
     // Clear Instrument
     ThreadStatusUtil::ResetThreadStatus();
+  } else if (!is_prepicked && c->output_level() > 0 &&
+             c->output_level() ==
+                 c->column_family_data()
+                     ->current()
+                     ->storage_info()
+                     ->MaxOutputLevel(
+                         immutable_db_options_.allow_ingest_behind) &&
+             env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
+    // Forward compactions involving last level to the bottom pool if it exists,
+    // such that compactions unlikely to contribute to write stalls can be
+    // delayed or deprioritized.
+    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
+    CompactionArg* ca = new CompactionArg;
+    ca->db = this;
+    ca->prepicked_compaction = new PrepickedCompaction;
+    ca->prepicked_compaction->compaction = c.release();
+    ca->prepicked_compaction->manual_compaction_state = nullptr;
+    ++bg_bottom_compaction_scheduled_;
+    env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
+                   this, &DBImpl::UnscheduleCallback);
   } else {
-    int output_level  __attribute__((unused)) = c->output_level();
+    int output_level __attribute__((__unused__));
+    output_level = c->output_level();
     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
                              &output_level);
-
     SequenceNumber earliest_write_conflict_snapshot;
     std::vector<SequenceNumber> snapshot_seqs =
         snapshots_.GetAll(&earliest_write_conflict_snapshot);
 
+    auto snapshot_checker = snapshot_checker_.get();
+    if (use_custom_gc_ && snapshot_checker == nullptr) {
+      snapshot_checker = DisableGCSnapshotChecker::Instance();
+    }
     assert(is_snapshot_supported_ || snapshots_.empty());
     CompactionJob compaction_job(
-        job_context->job_id, c.get(), immutable_db_options_, env_options_,
-        versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
-        directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
-        &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
-        table_cache_, &event_logger_,
+        job_context->job_id, c.get(), immutable_db_options_,
+        env_options_for_compaction_, versions_.get(), &shutting_down_,
+        preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
+        GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
+        &mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
+        snapshot_checker, table_cache_, &event_logger_,
         c->mutable_cf_options()->paranoid_file_checks,
         c->mutable_cf_options()->report_bg_io_stats, dbname_,
         &compaction_job_stats);
@@ -1567,35 +2075,60 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
 
     status = compaction_job.Install(*c->mutable_cf_options());
     if (status.ok()) {
-      InstallSuperVersionAndScheduleWorkWrapper(
-          c->column_family_data(), job_context, *c->mutable_cf_options());
+      InstallSuperVersionAndScheduleWork(
+          c->column_family_data(), &job_context->superversion_contexts[0],
+          *c->mutable_cf_options(), FlushReason::kAutoCompaction);
     }
     *made_progress = true;
   }
   if (c != nullptr) {
     c->ReleaseCompactionFiles(status);
     *made_progress = true;
-    NotifyOnCompactionCompleted(
-        c->column_family_data(), c.get(), status,
-        compaction_job_stats, job_context->job_id);
+
+#ifndef ROCKSDB_LITE
+    // Need to make sure SstFileManager does its bookkeeping
+    auto sfm = static_cast<SstFileManagerImpl*>(
+        immutable_db_options_.sst_file_manager.get());
+    if (sfm && sfm_reserved_compact_space) {
+      sfm->OnCompactionCompletion(c.get());
+    }
+#endif  // ROCKSDB_LITE
+
+    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
+                                compaction_job_stats, job_context->job_id);
   }
-  // this will unref its input_version and column_family_data
-  c.reset();
 
-  if (status.ok()) {
+  if (status.ok() || status.IsCompactionTooLarge()) {
     // Done
   } else if (status.IsShutdownInProgress()) {
     // Ignore compaction errors found during shutting down
   } else {
     ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                    status.ToString().c_str());
-    if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
-      bg_error_ = status;
+    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
+    if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
+      // Put this cfd back in the compaction queue so we can retry after some
+      // time
+      auto cfd = c->column_family_data();
+      assert(cfd != nullptr);
+      // Since this compaction failed, we need to recompute the score so it
+      // takes the original input files into account
+      c->column_family_data()
+          ->current()
+          ->storage_info()
+          ->ComputeCompactionScore(*(c->immutable_cf_options()),
+                                   *(c->mutable_cf_options()));
+      if (!cfd->queued_for_compaction()) {
+        AddToCompactionQueue(cfd);
+        ++unscheduled_compactions_;
+      }
     }
   }
+  // this will unref its input_version and column_family_data
+  c.reset();
 
   if (is_manual) {
-    ManualCompaction* m = manual_compaction;
+    ManualCompactionState* m = manual_compaction;
     if (!status.ok()) {
       m->status = status;
       m->done = true;
@@ -1628,7 +2161,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
       m->begin = &m->tmp_storage;
       m->incomplete = true;
     }
-    m->in_progress = false; // not being processed anymore
+    m->in_progress = false;  // not being processed anymore
   }
   TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
   return status;
@@ -1638,13 +2171,13 @@ bool DBImpl::HasPendingManualCompaction() {
   return (!manual_compaction_dequeue_.empty());
 }
 
-void DBImpl::AddManualCompaction(DBImpl::ManualCompaction* m) {
+void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
   manual_compaction_dequeue_.push_back(m);
 }
 
-void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
+void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
   // Remove from queue
-  std::deque<ManualCompaction*>::iterator it =
+  std::deque<ManualCompactionState*>::iterator it =
       manual_compaction_dequeue_.begin();
   while (it != manual_compaction_dequeue_.end()) {
     if (m == (*it)) {
@@ -1657,16 +2190,17 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
   return;
 }
 
-bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
+bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
   if (num_running_ingest_file_ > 0) {
     // We need to wait for other IngestExternalFile() calls to finish
     // before running a manual compaction.
     return true;
   }
   if (m->exclusive) {
-    return (bg_compaction_scheduled_ > 0);
+    return (bg_bottom_compaction_scheduled_ > 0 ||
+            bg_compaction_scheduled_ > 0);
   }
-  std::deque<ManualCompaction*>::iterator it =
+  std::deque<ManualCompactionState*>::iterator it =
       manual_compaction_dequeue_.begin();
   bool seen = false;
   while (it != manual_compaction_dequeue_.end()) {
@@ -1687,7 +2221,7 @@ bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
 
 bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
   // Remove from priority queue
-  std::deque<ManualCompaction*>::iterator it =
+  std::deque<ManualCompactionState*>::iterator it =
       manual_compaction_dequeue_.begin();
   while (it != manual_compaction_dequeue_.end()) {
     if ((*it)->exclusive) {
@@ -1695,7 +2229,7 @@ bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
     }
     if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
       // Allow automatic compaction if manual compaction is
-      // is in progress
+      // in progress
       return true;
     }
     it++;
@@ -1705,7 +2239,7 @@ bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
 
 bool DBImpl::HasExclusiveManualCompaction() {
   // Remove from priority queue
-  std::deque<ManualCompaction*>::iterator it =
+  std::deque<ManualCompactionState*>::iterator it =
       manual_compaction_dequeue_.begin();
   while (it != manual_compaction_dequeue_.end()) {
     if ((*it)->exclusive) {
@@ -1716,7 +2250,7 @@ bool DBImpl::HasExclusiveManualCompaction() {
   return false;
 }
 
-bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) {
+bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
   if ((m->exclusive) || (m1->exclusive)) {
     return true;
   }
@@ -1726,31 +2260,24 @@ bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) {
   return true;
 }
 
-// JobContext gets created and destructed outside of the lock --
-// we
-// use this convinently to:
+// SuperVersionContext gets created and destructed outside of the lock --
+// we use this conveniently to:
 // * malloc one SuperVersion() outside of the lock -- new_superversion
 // * delete SuperVersion()s outside of the lock -- superversions_to_free
 //
 // However, if InstallSuperVersionAndScheduleWork() gets called twice with the
-// same job_context, we can't reuse the SuperVersion() that got
+// same sv_context, we can't reuse the SuperVersion() that got
 // malloced because
 // first call already used it. In that rare case, we take a hit and create a
 // new SuperVersion() inside of the mutex. We do similar thing
 // for superversion_to_free
-void DBImpl::InstallSuperVersionAndScheduleWorkWrapper(
-    ColumnFamilyData* cfd, JobContext* job_context,
-    const MutableCFOptions& mutable_cf_options) {
-  mutex_.AssertHeld();
-  SuperVersion* old_superversion = InstallSuperVersionAndScheduleWork(
-      cfd, job_context->new_superversion, mutable_cf_options);
-  job_context->new_superversion = nullptr;
-  job_context->superversions_to_free.push_back(old_superversion);
-}
 
-SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
-    ColumnFamilyData* cfd, SuperVersion* new_sv,
-    const MutableCFOptions& mutable_cf_options) {
+void DBImpl::InstallSuperVersionAndScheduleWork(
+    ColumnFamilyData* cfd, SuperVersionContext* sv_context,
+    const MutableCFOptions& mutable_cf_options,
+    FlushReason /* flush_reason */) {
+  // TODO(yanqin) investigate if 'flush_reason' can be removed since it's not
+  // used.
   mutex_.AssertHeld();
 
   // Update max_total_in_memory_state_
@@ -1761,20 +2288,60 @@ SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
                         old_sv->mutable_cf_options.max_write_buffer_number;
   }
 
-  auto* old = cfd->InstallSuperVersion(
-      new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);
+  // this branch is unlikely to step in
+  if (UNLIKELY(sv_context->new_superversion == nullptr)) {
+    sv_context->NewSuperVersion();
+  }
+  cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
 
   // Whenever we install new SuperVersion, we might need to issue new flushes or
   // compactions.
-  SchedulePendingFlush(cfd);
   SchedulePendingCompaction(cfd);
   MaybeScheduleFlushOrCompaction();
 
   // Update max_total_in_memory_state_
-  max_total_in_memory_state_ =
-      max_total_in_memory_state_ - old_memtable_size +
-      mutable_cf_options.write_buffer_size *
-      mutable_cf_options.max_write_buffer_number;
-  return old;
+  max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
+                               mutable_cf_options.write_buffer_size *
+                                   mutable_cf_options.max_write_buffer_number;
+}
+
+// ShouldPurge is called by FindObsoleteFiles when doing a full scan,
+// and db mutex (mutex_) should already be held. This function performs a
+// linear scan of an vector (files_grabbed_for_purge_) in search of a
+// certain element. We expect FindObsoleteFiles with full scan to occur once
+// every 10 hours by default, and the size of the vector is small.
+// Therefore, the cost is affordable even if the mutex is held.
+// Actually, the current implementation of FindObsoleteFiles with
+// full_scan=true can issue I/O requests to obtain list of files in
+// directories, e.g. env_->getChildren while holding db mutex.
+// In the future, if we want to reduce the cost of search, we may try to keep
+// the vector sorted.
+bool DBImpl::ShouldPurge(uint64_t file_number) const {
+  for (auto fn : files_grabbed_for_purge_) {
+    if (file_number == fn) {
+      return false;
+    }
+  }
+  for (const auto& purge_file_info : purge_queue_) {
+    if (purge_file_info.number == file_number) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
+// (mutex_) should already be held.
+void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
+  files_grabbed_for_purge_.emplace_back(file_number);
+}
+
+void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
+  InstrumentedMutexLock l(&mutex_);
+  // snapshot_checker_ should only set once. If we need to set it multiple
+  // times, we need to make sure the old one is not deleted while it is still
+  // using by a compaction job.
+  assert(!snapshot_checker_);
+  snapshot_checker_.reset(snapshot_checker);
 }
 }  // namespace rocksdb