]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_impl_files.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / db_impl_files.cc
index ea6095fcdf7d23e0a1e68594a1b4aec64e540450..99fb1df40cb225cc3781b0d6a458c28a94e7cdb8 100644 (file)
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 #define __STDC_FORMAT_MACROS
 #endif
 #include <inttypes.h>
+#include <set>
+#include <unordered_set>
 #include "db/event_helpers.h"
+#include "db/memtable_list.h"
 #include "util/file_util.h"
 #include "util/sst_file_manager_impl.h"
 
-
 namespace rocksdb {
-uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
-  if (!allow_2pc()) {
-    return 0;
-  }
-
-  uint64_t min_log = 0;
-
-  // we must look through the memtables for two phase transactions
-  // that have been committed but not yet flushed
-  for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
-    if (loop_cfd->IsDropped()) {
-      continue;
-    }
-
-    auto log = loop_cfd->imm()->GetMinLogContainingPrepSection();
-
-    if (log > 0 && (min_log == 0 || log < min_log)) {
-      min_log = log;
-    }
-
-    log = loop_cfd->mem()->GetMinLogContainingPrepSection();
-
-    if (log > 0 && (min_log == 0 || log < min_log)) {
-      min_log = log;
-    }
-  }
-
-  return min_log;
-}
-
-void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
-  assert(log != 0);
-  std::lock_guard<std::mutex> lock(prep_heap_mutex_);
-  auto it = prepared_section_completed_.find(log);
-  assert(it != prepared_section_completed_.end());
-  it->second += 1;
-}
-
-void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
-  assert(log != 0);
-  std::lock_guard<std::mutex> lock(prep_heap_mutex_);
-  min_log_with_prep_.push(log);
-  auto it = prepared_section_completed_.find(log);
-  if (it == prepared_section_completed_.end()) {
-    prepared_section_completed_[log] = 0;
-  }
-}
-
-uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
-
-  if (!allow_2pc()) {
-    return 0;
-  }
-
-  std::lock_guard<std::mutex> lock(prep_heap_mutex_);
-  uint64_t min_log = 0;
-
-  // first we look in the prepared heap where we keep
-  // track of transactions that have been prepared (written to WAL)
-  // but not yet committed.
-  while (!min_log_with_prep_.empty()) {
-    min_log = min_log_with_prep_.top();
-
-    auto it = prepared_section_completed_.find(min_log);
-
-    // value was marked as 'deleted' from heap
-    if (it != prepared_section_completed_.end() && it->second > 0) {
-      it->second -= 1;
-      min_log_with_prep_.pop();
-
-      // back to squere one...
-      min_log = 0;
-      continue;
-    } else {
-      // found a valid value
-      break;
-    }
-  }
-
-  return min_log;
-}
-
 uint64_t DBImpl::MinLogNumberToKeep() {
-  uint64_t log_number = versions_->MinLogNumber();
-
   if (allow_2pc()) {
-    // if are 2pc we must consider logs containing prepared
-    // sections of outstanding transactions.
-    //
-    // We must check min logs with outstanding prep before we check
-    // logs referneces by memtables because a log referenced by the
-    // first data structure could transition to the second under us.
-    //
-    // TODO(horuff): iterating over all column families under db mutex.
-    // should find more optimial solution
-    auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
-
-    if (min_log_in_prep_heap != 0 && min_log_in_prep_heap < log_number) {
-      log_number = min_log_in_prep_heap;
-    }
-
-    auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
-
-    if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < log_number) {
-      log_number = min_log_refed_by_mem;
-    }
+    return versions_->min_log_number_to_keep_2pc();
+  } else {
+    return versions_->MinLogNumberWithUnflushedData();
   }
-  return log_number;
 }
 
 // * Returns the list of live files in 'sst_live'
@@ -148,7 +48,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
 
   bool doing_the_full_scan = false;
 
-  // logic for figurint out if we're doing the full scan
+  // logic for figuring out if we're doing the full scan
   if (no_full_scan) {
     doing_the_full_scan = false;
   } else if (force ||
@@ -168,7 +68,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   // threads
   // Since job_context->min_pending_output is set, until file scan finishes,
   // mutex_ cannot be released. Otherwise, we might see no min_pending_output
-  // here but later find newer generated unfinalized files while scannint.
+  // here but later find newer generated unfinalized files while scanning.
   if (!pending_outputs_.empty()) {
     job_context->min_pending_output = *pending_outputs_.begin();
   } else {
@@ -182,27 +82,69 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
                               &job_context->manifest_delete_files,
                               job_context->min_pending_output);
 
+  // Mark the elements in job_context->sst_delete_files as grabbedForPurge
+  // so that other threads calling FindObsoleteFiles with full_scan=true
+  // will not add these files to candidate list for purge.
+  for (const auto& sst_to_del : job_context->sst_delete_files) {
+    MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
+  }
+
   // store the current filenum, lognum, etc
   job_context->manifest_file_number = versions_->manifest_file_number();
   job_context->pending_manifest_file_number =
       versions_->pending_manifest_file_number();
   job_context->log_number = MinLogNumberToKeep();
-
   job_context->prev_log_number = versions_->prev_log_number();
 
   versions_->AddLiveFiles(&job_context->sst_live);
   if (doing_the_full_scan) {
+    InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
+        dbname_);
+    std::set<std::string> paths;
     for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
          path_id++) {
+      paths.insert(immutable_db_options_.db_paths[path_id].path);
+    }
+
+    // Note that if cf_paths is not specified in the ColumnFamilyOptions
+    // of a particular column family, we use db_paths as the cf_paths
+    // setting. Hence, there can be multiple duplicates of files from db_paths
+    // in the following code. The duplicate are removed while identifying
+    // unique files in PurgeObsoleteFiles.
+    for (auto cfd : *versions_->GetColumnFamilySet()) {
+      for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size();
+           path_id++) {
+        auto& path = cfd->ioptions()->cf_paths[path_id].path;
+
+        if (paths.find(path) == paths.end()) {
+          paths.insert(path);
+        }
+      }
+    }
+
+    for (auto& path : paths) {
       // set of all files in the directory. We'll exclude files that are still
       // alive in the subsequent processings.
       std::vector<std::string> files;
-      env_->GetChildren(immutable_db_options_.db_paths[path_id].path,
-                        &files);  // Ignore errors
-      for (std::string file : files) {
+      env_->GetChildren(path, &files);  // Ignore errors
+      for (const std::string& file : files) {
+        uint64_t number;
+        FileType type;
+        // 1. If we cannot parse the file name, we skip;
+        // 2. If the file with file_number equals number has already been
+        // grabbed for purge by another compaction job, or it has already been
+        // schedule for purge, we also skip it if we
+        // are doing full scan in order to avoid double deletion of the same
+        // file under race conditions. See
+        // https://github.com/facebook/rocksdb/issues/3573
+        if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) ||
+            !ShouldPurge(number)) {
+          continue;
+        }
+
         // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
         job_context->full_scan_candidate_files.emplace_back(
-            "/" + file, static_cast<uint32_t>(path_id));
+            "/" + file, path);
       }
     }
 
@@ -211,8 +153,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
       std::vector<std::string> log_files;
       env_->GetChildren(immutable_db_options_.wal_dir,
                         &log_files);  // Ignore errors
-      for (std::string log_file : log_files) {
-        job_context->full_scan_candidate_files.emplace_back(log_file, 0);
+      for (const std::string& log_file : log_files) {
+        job_context->full_scan_candidate_files.emplace_back(log_file,
+            immutable_db_options_.wal_dir);
       }
     }
     // Add info log files in db_log_dir
@@ -221,8 +164,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
       std::vector<std::string> info_log_files;
       // Ignore errors
       env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
-      for (std::string log_file : info_log_files) {
-        job_context->full_scan_candidate_files.emplace_back(log_file, 0);
+      for (std::string& log_file : info_log_files) {
+        job_context->full_scan_candidate_files.emplace_back(log_file,
+            immutable_db_options_.db_log_dir);
       }
     }
   }
@@ -236,11 +180,11 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
     while (alive_log_files_.begin()->number < min_log_number) {
       auto& earliest = *alive_log_files_.begin();
       if (immutable_db_options_.recycle_log_file_num >
-          log_recycle_files.size()) {
+          log_recycle_files_.size()) {
         ROCKS_LOG_INFO(immutable_db_options_.info_log,
                        "adding log %" PRIu64 " to recycle list\n",
                        earliest.number);
-        log_recycle_files.push_back(earliest.number);
+        log_recycle_files_.push_back(earliest.number);
       } else {
         job_context->log_delete_files.push_back(earliest.number);
       }
@@ -250,7 +194,13 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
       }
       job_context->size_log_to_delete += earliest.size;
       total_log_size_ -= earliest.size;
+      if (two_write_queues_) {
+        log_write_mutex_.Lock();
+      }
       alive_log_files_.pop_front();
+      if (two_write_queues_) {
+        log_write_mutex_.Unlock();
+      }
       // Current log should always stay alive since it can't have
       // number < MinLogNumber().
       assert(alive_log_files_.size());
@@ -263,7 +213,10 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
         continue;
       }
       logs_to_free_.push_back(log.ReleaseWriter());
-      logs_.pop_front();
+      {
+        InstrumentedMutexLock wl(&log_write_mutex_);
+        logs_.pop_front();
+      }
     }
     // Current log cannot be obsolete.
     assert(!logs_.empty());
@@ -272,8 +225,11 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   // We're just cleaning up for DB::Write().
   assert(job_context->logs_to_free.empty());
   job_context->logs_to_free = logs_to_free_;
-  job_context->log_recycle_files.assign(log_recycle_files.begin(),
-                                        log_recycle_files.end());
+  job_context->log_recycle_files.assign(log_recycle_files_.begin(),
+                                        log_recycle_files_.end());
+  if (job_context->HaveSomethingToDelete()) {
+    ++pending_purge_obsolete_files_;
+  }
   logs_to_free_.clear();
 }
 
@@ -285,21 +241,24 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
   } else if (first.file_name < second.file_name) {
     return false;
   } else {
-    return (first.path_id > second.path_id);
+    return (first.file_path > second.file_path);
   }
 }
 };  // namespace
 
 // Delete obsolete files and log status and information of file deletion
-void DBImpl::DeleteObsoleteFileImpl(Status file_deletion_status, int job_id,
-                                    const std::string& fname, FileType type,
-                                    uint64_t number, uint32_t path_id) {
+void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
+                                    const std::string& path_to_sync,
+                                    FileType type, uint64_t number) {
+  Status file_deletion_status;
   if (type == kTableFile) {
     file_deletion_status =
-        DeleteSSTFile(&immutable_db_options_, fname, path_id);
+        DeleteSSTFile(&immutable_db_options_, fname, path_to_sync);
   } else {
     file_deletion_status = env_->DeleteFile(fname);
   }
+  TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
+      &file_deletion_status);
   if (file_deletion_status.ok()) {
     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                     "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
@@ -326,19 +285,16 @@ void DBImpl::DeleteObsoleteFileImpl(Status file_deletion_status, int job_id,
 }
 
 // Diffs the files listed in filenames and those that do not
-// belong to live files are posibly removed. Also, removes all the
+// belong to live files are possibly removed. Also, removes all the
 // files in sst_delete_files and log_delete_files.
 // It is not necessary to hold the mutex when invoking this method.
-void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
+void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
+  TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
   // we'd better have sth to delete
   assert(state.HaveSomethingToDelete());
 
-  // this checks if FindObsoleteFiles() was run before. If not, don't do
-  // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
-  // run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
-  if (state.manifest_file_number == 0) {
-    return;
-  }
+  // FindObsoleteFiles() should've populated this so nonzero
+  assert(state.manifest_file_number != 0);
 
   // Now, convert live list to an unordered map, WITHOUT mutex held;
   // set is slow.
@@ -355,20 +311,23 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
       state.log_delete_files.size() + state.manifest_delete_files.size());
   // We may ignore the dbname when generating the file names.
   const char* kDumbDbName = "";
-  for (auto file : state.sst_delete_files) {
+  for (auto& file : state.sst_delete_files) {
     candidate_files.emplace_back(
-        MakeTableFileName(kDumbDbName, file->fd.GetNumber()),
-        file->fd.GetPathId());
-    delete file;
+        MakeTableFileName(kDumbDbName, file.metadata->fd.GetNumber()), file.path);
+    if (file.metadata->table_reader_handle) {
+      table_cache_->Release(file.metadata->table_reader_handle);
+    }
+    file.DeleteMetadata();
   }
 
   for (auto file_num : state.log_delete_files) {
     if (file_num > 0) {
-      candidate_files.emplace_back(LogFileName(kDumbDbName, file_num), 0);
+      candidate_files.emplace_back(LogFileName(kDumbDbName, file_num),
+          immutable_db_options_.wal_dir);
     }
   }
   for (const auto& filename : state.manifest_delete_files) {
-    candidate_files.emplace_back(filename, 0);
+    candidate_files.emplace_back(filename, dbname_);
   }
 
   // dedup state.candidate_files so we don't try to delete the same
@@ -391,9 +350,32 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
   std::vector<std::string> old_info_log_files;
   InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
                                 dbname_);
+
+  // File numbers of most recent two OPTIONS file in candidate_files (found in
+  // previos FindObsoleteFiles(full_scan=true))
+  // At this point, there must not be any duplicate file numbers in
+  // candidate_files.
+  uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
+  uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min();
   for (const auto& candidate_file : candidate_files) {
-    std::string to_delete = candidate_file.file_name;
-    uint32_t path_id = candidate_file.path_id;
+    const std::string& fname = candidate_file.file_name;
+    uint64_t number;
+    FileType type;
+    if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) ||
+        type != kOptionsFile) {
+      continue;
+    }
+    if (number > optsfile_num1) {
+      optsfile_num2 = optsfile_num1;
+      optsfile_num1 = number;
+    } else if (number > optsfile_num2) {
+      optsfile_num2 = number;
+    }
+  }
+
+  std::unordered_set<uint64_t> files_to_del;
+  for (const auto& candidate_file : candidate_files) {
+    const std::string& to_delete = candidate_file.file_name;
     uint64_t number;
     FileType type;
     // Ignore file if we cannot recognize it.
@@ -419,6 +401,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
         // DontDeletePendingOutputs fail
         keep = (sst_live_map.find(number) != sst_live_map.end()) ||
                number >= state.min_pending_output;
+        if (!keep) {
+          files_to_del.insert(number);
+        }
         break;
       case kTempFile:
         // Any temp files that are currently being written to must
@@ -439,11 +424,19 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
           old_info_log_files.push_back(to_delete);
         }
         break;
+      case kOptionsFile:
+        keep = (number >= optsfile_num2);
+        TEST_SYNC_POINT_CALLBACK(
+            "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:1",
+            reinterpret_cast<void*>(&number));
+        TEST_SYNC_POINT_CALLBACK(
+            "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:2",
+            reinterpret_cast<void*>(&keep));
+        break;
       case kCurrentFile:
       case kDBLockFile:
       case kIdentityFile:
       case kMetaDatabase:
-      case kOptionsFile:
       case kBlobFile:
         keep = true;
         break;
@@ -454,13 +447,16 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
     }
 
     std::string fname;
+    std::string dir_to_sync;
     if (type == kTableFile) {
       // evict from cache
       TableCache::Evict(table_cache_.get(), number);
-      fname = TableFileName(immutable_db_options_.db_paths, number, path_id);
+      fname = MakeTableFileName(candidate_file.file_path, number);
+      dir_to_sync = candidate_file.file_path;
     } else {
-      fname = ((type == kLogFile) ? immutable_db_options_.wal_dir : dbname_) +
-              "/" + to_delete;
+      dir_to_sync =
+          (type == kLogFile) ? immutable_db_options_.wal_dir : dbname_;
+      fname = dir_to_sync + "/" + to_delete;
     }
 
 #ifndef ROCKSDB_LITE
@@ -474,13 +470,25 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
     Status file_deletion_status;
     if (schedule_only) {
       InstrumentedMutexLock guard_lock(&mutex_);
-      SchedulePendingPurge(fname, type, number, path_id, state.job_id);
+      SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
     } else {
-      DeleteObsoleteFileImpl(file_deletion_status, state.job_id, fname, type,
-                             number, path_id);
+      DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
     }
   }
 
+  {
+    // After purging obsolete files, remove them from files_grabbed_for_purge_.
+    // Use a temporary vector to perform bulk deletion via swap.
+    InstrumentedMutexLock guard_lock(&mutex_);
+    std::vector<uint64_t> tmp;
+    for (auto fn : files_grabbed_for_purge_) {
+      if (files_to_del.count(fn) == 0) {
+        tmp.emplace_back(fn);
+      }
+    }
+    files_grabbed_for_purge_.swap(tmp);
+  }
+
   // Delete old info log files.
   size_t old_info_log_file_count = old_info_log_files.size();
   if (old_info_log_file_count != 0 &&
@@ -519,6 +527,13 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
   wal_manager_.PurgeObsoleteWALFiles();
 #endif  // ROCKSDB_LITE
   LogFlush(immutable_db_options_.info_log);
+  InstrumentedMutexLock l(&mutex_);
+  --pending_purge_obsolete_files_;
+  assert(pending_purge_obsolete_files_ >= 0);
+  if (pending_purge_obsolete_files_ == 0) {
+    bg_cv_.SignalAll();
+  }
+  TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
 }
 
 void DBImpl::DeleteObsoleteFiles() {
@@ -533,4 +548,95 @@ void DBImpl::DeleteObsoleteFiles() {
   job_context.Clean();
   mutex_.Lock();
 }
+
+uint64_t FindMinPrepLogReferencedByMemTable(
+    VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
+    const autovector<MemTable*>& memtables_to_flush) {
+  uint64_t min_log = 0;
+
+  // we must look through the memtables for two phase transactions
+  // that have been committed but not yet flushed
+  for (auto loop_cfd : *vset->GetColumnFamilySet()) {
+    if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) {
+      continue;
+    }
+
+    auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
+        memtables_to_flush);
+
+    if (log > 0 && (min_log == 0 || log < min_log)) {
+      min_log = log;
+    }
+
+    log = loop_cfd->mem()->GetMinLogContainingPrepSection();
+
+    if (log > 0 && (min_log == 0 || log < min_log)) {
+      min_log = log;
+    }
+  }
+
+  return min_log;
+}
+
+uint64_t PrecomputeMinLogNumberToKeep(
+    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
+    autovector<VersionEdit*> edit_list,
+    const autovector<MemTable*>& memtables_to_flush,
+    LogsWithPrepTracker* prep_tracker) {
+  assert(vset != nullptr);
+  assert(prep_tracker != nullptr);
+  // Calculate updated min_log_number_to_keep
+  // Since the function should only be called in 2pc mode, log number in
+  // the version edit should be sufficient.
+
+  // Precompute the min log number containing unflushed data for the column
+  // family being flushed (`cfd_to_flush`).
+  uint64_t cf_min_log_number_to_keep = 0;
+  for (auto& e : edit_list) {
+    if (e->has_log_number()) {
+      cf_min_log_number_to_keep =
+          std::max(cf_min_log_number_to_keep, e->log_number());
+    }
+  }
+  if (cf_min_log_number_to_keep == 0) {
+    // No version edit contains information on log number. The log number
+    // for this column family should stay the same as it is.
+    cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber();
+  }
+
+  // Get min log number containing unflushed data for other column families.
+  uint64_t min_log_number_to_keep =
+      vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush);
+  if (cf_min_log_number_to_keep != 0) {
+    min_log_number_to_keep =
+        std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
+  }
+
+  // if are 2pc we must consider logs containing prepared
+  // sections of outstanding transactions.
+  //
+  // We must check min logs with outstanding prep before we check
+  // logs references by memtables because a log referenced by the
+  // first data structure could transition to the second under us.
+  //
+  // TODO: iterating over all column families under db mutex.
+  // should find more optimal solution
+  auto min_log_in_prep_heap =
+      prep_tracker->FindMinLogContainingOutstandingPrep();
+
+  if (min_log_in_prep_heap != 0 &&
+      min_log_in_prep_heap < min_log_number_to_keep) {
+    min_log_number_to_keep = min_log_in_prep_heap;
+  }
+
+  uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
+      vset, &cfd_to_flush, memtables_to_flush);
+
+  if (min_log_refed_by_mem != 0 &&
+      min_log_refed_by_mem < min_log_number_to_keep) {
+    min_log_number_to_keep = min_log_refed_by_mem;
+  }
+  return min_log_number_to_keep;
+}
+
 }  // namespace rocksdb