]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/column_family.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / column_family.cc
index b4e9b82d39108a6cf14fdcd8719b9351f3113963..268060ddf538df99811978133e4848b3473fbef0 100644 (file)
@@ -17,6 +17,7 @@
 #include <vector>
 
 #include "db/blob/blob_file_cache.h"
+#include "db/blob/blob_source.h"
 #include "db/compaction/compaction_picker.h"
 #include "db/compaction/compaction_picker_fifo.h"
 #include "db/compaction/compaction_picker_level.h"
 #include "db/version_set.h"
 #include "db/write_controller.h"
 #include "file/sst_file_manager_impl.h"
-#include "memtable/hash_skiplist_rep.h"
+#include "logging/logging.h"
 #include "monitoring/thread_status_util.h"
 #include "options/options_helper.h"
 #include "port/port.h"
+#include "rocksdb/convenience.h"
 #include "rocksdb/table.h"
 #include "table/merging_iterator.h"
 #include "util/autovector.h"
@@ -74,11 +76,6 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
       bool defer_purge =
           db_->immutable_db_options().avoid_unnecessary_blocking_io;
       db_->PurgeObsoleteFiles(job_context, defer_purge);
-      if (defer_purge) {
-        mutex_->Lock();
-        db_->SchedulePurge();
-        mutex_->Unlock();
-      }
     }
     job_context.Clean();
   }
@@ -108,8 +105,9 @@ const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
 
 void GetIntTblPropCollectorFactory(
     const ImmutableCFOptions& ioptions,
-    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
-        int_tbl_prop_collector_factories) {
+    IntTblPropCollectorFactories* int_tbl_prop_collector_factories) {
+  assert(int_tbl_prop_collector_factories);
+
   auto& collector_factories = ioptions.table_properties_collector_factories;
   for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
        ++i) {
@@ -139,9 +137,15 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
     }
   }
   if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
-    if (!ZSTD_TrainDictionarySupported()) {
+    if (cf_options.compression_opts.use_zstd_dict_trainer) {
+      if (!ZSTD_TrainDictionarySupported()) {
+        return Status::InvalidArgument(
+            "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
+            "is not linked with the binary.");
+      }
+    } else if (!ZSTD_FinalizeDictionarySupported()) {
       return Status::InvalidArgument(
-          "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
+          "zstd finalizeDictionary cannot be used because ZSTD 1.4.5+ "
           "is not linked with the binary.");
     }
     if (cf_options.compression_opts.max_dict_bytes == 0) {
@@ -188,8 +192,7 @@ Status CheckCFPathsSupported(const DBOptions& db_options,
       return Status::NotSupported(
           "More than one CF paths are only supported in "
           "universal and level compaction styles. ");
-    } else if (cf_options.cf_paths.empty() &&
-               db_options.db_paths.size() > 1) {
+    } else if (cf_options.cf_paths.empty() && db_options.db_paths.size() > 1) {
       return Status::NotSupported(
           "More than one DB paths are only supported in "
           "universal and level compaction styles. ");
@@ -201,7 +204,7 @@ Status CheckCFPathsSupported(const DBOptions& db_options,
 namespace {
 const uint64_t kDefaultTtl = 0xfffffffffffffffe;
 const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
-};  // namespace
+}  // anonymous namespace
 
 ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                     const ColumnFamilyOptions& src) {
@@ -209,11 +212,13 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
   size_t clamp_max = std::conditional<
       sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
       std::integral_constant<uint64_t, 64ull << 30>>::type::value;
-  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
+  ClipToRange(&result.write_buffer_size, (static_cast<size_t>(64)) << 10,
+              clamp_max);
   // if user sets arena_block_size, we trust user to use this value. Otherwise,
   // calculate a proper value from writer_buffer_size;
   if (result.arena_block_size <= 0) {
-    result.arena_block_size = result.write_buffer_size / 8;
+    result.arena_block_size =
+        std::min(size_t{1024 * 1024}, result.write_buffer_size / 8);
 
     // Align up to 4k
     const size_t align = 4 * 1024;
@@ -227,6 +232,21 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
     result.min_write_buffer_number_to_merge = 1;
   }
 
+  if (db_options.atomic_flush && result.min_write_buffer_number_to_merge > 1) {
+    ROCKS_LOG_WARN(
+        db_options.logger,
+        "Currently, if atomic_flush is true, then triggering flush for any "
+        "column family internally (non-manual flush) will trigger flushing "
+        "all column families even if the number of memtables is smaller "
+        "min_write_buffer_number_to_merge. Therefore, configuring "
+        "min_write_buffer_number_to_merge > 1 is not compatible and should "
+        "be satinized to 1. Not doing so will lead to data loss and "
+        "inconsistent state across multiple column families when WAL is "
+        "disabled, which is a common setting for atomic flush");
+
+    result.min_write_buffer_number_to_merge = 1;
+  }
+
   if (result.num_levels < 1) {
     result.num_levels = 1;
   }
@@ -270,7 +290,6 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
   }
 
   if (result.compaction_style == kCompactionStyleFIFO) {
-    result.num_levels = 1;
     // since we delete level0 files in FIFO compaction when there are too many
     // of them, these options don't really mean anything
     result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
@@ -282,7 +301,7 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
   }
 
   if (result.level0_file_num_compaction_trigger == 0) {
-    ROCKS_LOG_WARN(db_options.info_log.get(),
+    ROCKS_LOG_WARN(db_options.logger,
                    "level0_file_num_compaction_trigger cannot be 0");
     result.level0_file_num_compaction_trigger = 1;
   }
@@ -291,7 +310,7 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
           result.level0_slowdown_writes_trigger ||
       result.level0_slowdown_writes_trigger <
           result.level0_file_num_compaction_trigger) {
-    ROCKS_LOG_WARN(db_options.info_log.get(),
+    ROCKS_LOG_WARN(db_options.logger,
                    "This condition must be satisfied: "
                    "level0_stop_writes_trigger(%d) >= "
                    "level0_slowdown_writes_trigger(%d) >= "
@@ -308,7 +327,7 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
         result.level0_slowdown_writes_trigger) {
       result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
     }
-    ROCKS_LOG_WARN(db_options.info_log.get(),
+    ROCKS_LOG_WARN(db_options.logger,
                    "Adjust the value to "
                    "level0_stop_writes_trigger(%d)"
                    "level0_slowdown_writes_trigger(%d)"
@@ -333,9 +352,12 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
   // were not deleted yet, when we open the DB we will find these .trash files
   // and schedule them to be deleted (or delete immediately if SstFileManager
   // was not used)
-  auto sfm = static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
+  auto sfm =
+      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
   for (size_t i = 0; i < result.cf_paths.size(); i++) {
-    DeleteScheduler::CleanupDirectory(db_options.env, sfm, result.cf_paths[i].path);
+    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
+                                      result.cf_paths[i].path)
+        .PermitUncheckedError();
   }
 #endif
 
@@ -344,12 +366,18 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
   }
 
   if (result.level_compaction_dynamic_level_bytes) {
-    if (result.compaction_style != kCompactionStyleLevel ||
-        result.cf_paths.size() > 1U) {
-      // 1. level_compaction_dynamic_level_bytes only makes sense for
-      //    level-based compaction.
-      // 2. we don't yet know how to make both of this feature and multiple
-      //    DB path work.
+    if (result.compaction_style != kCompactionStyleLevel) {
+      ROCKS_LOG_WARN(db_options.info_log.get(),
+                     "level_compaction_dynamic_level_bytes only makes sense"
+                     "for level-based compaction");
+      result.level_compaction_dynamic_level_bytes = false;
+    } else if (result.cf_paths.size() > 1U) {
+      // we don't yet know how to make both of this feature and multiple
+      // DB path work.
+      ROCKS_LOG_WARN(db_options.info_log.get(),
+                     "multiple cf_paths/db_paths and"
+                     "level_compaction_dynamic_level_bytes"
+                     "can't be used together");
       result.level_compaction_dynamic_level_bytes = false;
     }
   }
@@ -440,6 +468,9 @@ bool SuperVersion::Unref() {
 
 void SuperVersion::Cleanup() {
   assert(refs.load(std::memory_order_relaxed) == 0);
+  // Since this SuperVersion object is being deleted,
+  // decrement reference to the immutable MemtableList
+  // this SV object was pointing to.
   imm->Unref(&to_delete);
   MemTable* m = mem->Unref();
   if (m != nullptr) {
@@ -449,9 +480,7 @@ void SuperVersion::Cleanup() {
     to_delete.push_back(m);
   }
   current->Unref();
-  if (cfd->Unref()) {
-    delete cfd;
-  }
+  cfd->UnrefAndTryDelete();
 }
 
 void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
@@ -469,10 +498,10 @@ void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
 
 namespace {
 void SuperVersionUnrefHandle(void* ptr) {
-  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
-  // destroyed. When former happens, the thread shouldn't see kSVInUse.
-  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
-  // well.
+  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
+  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
+  // When the latter happens, only super_version_ holds a reference
+  // to ColumnFamilyData, so no further queries are possible.
   SuperVersion* sv = static_cast<SuperVersion*>(ptr);
   bool was_last_ref __attribute__((__unused__));
   was_last_ref = sv->Unref();
@@ -493,15 +522,17 @@ std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
   return paths;
 }
 
-const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = port::kMaxUint32;
+const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId =
+    std::numeric_limits<uint32_t>::max();
 
 ColumnFamilyData::ColumnFamilyData(
     uint32_t id, const std::string& name, Version* _dummy_versions,
     Cache* _table_cache, WriteBufferManager* write_buffer_manager,
     const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
-    const FileOptions& file_options, ColumnFamilySet* column_family_set,
+    const FileOptions* file_options, ColumnFamilySet* column_family_set,
     BlockCacheTracer* const block_cache_tracer,
-    const std::shared_ptr<IOTracer>& io_tracer)
+    const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id,
+    const std::string& db_session_id)
     : id_(id),
       name_(name),
       dummy_versions_(_dummy_versions),
@@ -533,7 +564,8 @@ ColumnFamilyData::ColumnFamilyData(
       prev_compaction_needed_bytes_(0),
       allow_2pc_(db_options.allow_2pc),
       last_memtable_id_(0),
-      db_paths_registered_(false) {
+      db_paths_registered_(false),
+      mempurge_used_(false) {
   if (id_ != kDummyColumnFamilyDataId) {
     // TODO(cc): RegisterDbPaths can be expensive, considering moving it
     // outside of this constructor which might be called with db mutex held.
@@ -544,7 +576,7 @@ ColumnFamilyData::ColumnFamilyData(
       db_paths_registered_ = true;
     } else {
       ROCKS_LOG_ERROR(
-          ioptions_.info_log,
+          ioptions_.logger,
           "Failed to register data paths of column family (id: %d, name: %s)",
           id_, name_.c_str());
     }
@@ -557,12 +589,15 @@ ColumnFamilyData::ColumnFamilyData(
   // if _dummy_versions is nullptr, then this is a dummy column family.
   if (_dummy_versions != nullptr) {
     internal_stats_.reset(
-        new InternalStats(ioptions_.num_levels, db_options.env, this));
+        new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
     table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
-                                      block_cache_tracer, io_tracer));
+                                      block_cache_tracer, io_tracer,
+                                      db_session_id));
     blob_file_cache_.reset(
         new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
-                          internal_stats_->GetBlobFileReadHist()));
+                          internal_stats_->GetBlobFileReadHist(), io_tracer));
+    blob_source_.reset(new BlobSource(ioptions(), db_id, db_session_id,
+                                      blob_file_cache_.get()));
 
     if (ioptions_.compaction_style == kCompactionStyleLevel) {
       compaction_picker_.reset(
@@ -575,15 +610,15 @@ ColumnFamilyData::ColumnFamilyData(
       compaction_picker_.reset(
           new FIFOCompactionPicker(ioptions_, &internal_comparator_));
     } else if (ioptions_.compaction_style == kCompactionStyleNone) {
-      compaction_picker_.reset(new NullCompactionPicker(
-          ioptions_, &internal_comparator_));
-      ROCKS_LOG_WARN(ioptions_.info_log,
+      compaction_picker_.reset(
+          new NullCompactionPicker(ioptions_, &internal_comparator_));
+      ROCKS_LOG_WARN(ioptions_.logger,
                      "Column family %s does not use any background compaction. "
                      "Compactions can only be done via CompactFiles\n",
                      GetName().c_str());
 #endif  // !ROCKSDB_LITE
     } else {
-      ROCKS_LOG_ERROR(ioptions_.info_log,
+      ROCKS_LOG_ERROR(ioptions_.logger,
                       "Unable to recognize the specified compaction style %d. "
                       "Column family %s will use kCompactionStyleLevel.\n",
                       ioptions_.compaction_style, GetName().c_str());
@@ -592,16 +627,36 @@ ColumnFamilyData::ColumnFamilyData(
     }
 
     if (column_family_set_->NumberOfColumnFamilies() < 10) {
-      ROCKS_LOG_INFO(ioptions_.info_log,
+      ROCKS_LOG_INFO(ioptions_.logger,
                      "--------------- Options for column family [%s]:\n",
                      name.c_str());
-      initial_cf_options_.Dump(ioptions_.info_log);
+      initial_cf_options_.Dump(ioptions_.logger);
     } else {
-      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
+      ROCKS_LOG_INFO(ioptions_.logger, "\t(skipping printing options)\n");
     }
   }
 
   RecalculateWriteStallConditions(mutable_cf_options_);
+
+  if (cf_options.table_factory->IsInstanceOf(
+          TableFactory::kBlockBasedTableName()) &&
+      cf_options.table_factory->GetOptions<BlockBasedTableOptions>()) {
+    const BlockBasedTableOptions* bbto =
+        cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
+    const auto& options_overrides = bbto->cache_usage_options.options_overrides;
+    const auto file_metadata_charged =
+        options_overrides.at(CacheEntryRole::kFileMetadata).charged;
+    if (bbto->block_cache &&
+        file_metadata_charged == CacheEntryRoleOptions::Decision::kEnabled) {
+      // TODO(hx235): Add a `ConcurrentCacheReservationManager` at DB scope
+      // responsible for reservation of `ObsoleteFileInfo` so that we can keep
+      // this `file_metadata_cache_res_mgr_` nonconcurrent
+      file_metadata_cache_res_mgr_.reset(new ConcurrentCacheReservationManager(
+          std::make_shared<
+              CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>>(
+              bbto->block_cache)));
+    }
+  }
 }
 
 // DB mutex held
@@ -632,7 +687,7 @@ ColumnFamilyData::~ColumnFamilyData() {
 
   if (dummy_versions_ != nullptr) {
     // List must be empty
-    assert(dummy_versions_->TEST_Next() == dummy_versions_);
+    assert(dummy_versions_->Next() == dummy_versions_);
     bool deleted __attribute__((__unused__));
     deleted = dummy_versions_->Unref();
     assert(deleted);
@@ -653,7 +708,7 @@ ColumnFamilyData::~ColumnFamilyData() {
     Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
     if (!s.ok()) {
       ROCKS_LOG_ERROR(
-          ioptions_.info_log,
+          ioptions_.logger,
           "Failed to unregister data paths of column family (id: %d, name: %s)",
           id_, name_.c_str());
     }
@@ -674,14 +729,13 @@ bool ColumnFamilyData::UnrefAndTryDelete() {
     // Only the super_version_ holds me
     SuperVersion* sv = super_version_;
     super_version_ = nullptr;
-    // Release SuperVersion reference kept in ThreadLocalPtr.
-    // This must be done outside of mutex_ since unref handler can lock mutex.
-    sv->db_mutex->Unlock();
+
+    // Release SuperVersion references kept in ThreadLocalPtr.
     local_sv_.reset();
-    sv->db_mutex->Lock();
 
     if (sv->Unref()) {
-      // May delete this ColumnFamilyData after calling Cleanup()
+      // Note: sv will delete this ColumnFamilyData during Cleanup()
+      assert(sv->cfd == this);
       sv->Cleanup();
       delete sv;
       return true;
@@ -708,9 +762,7 @@ uint64_t ColumnFamilyData::OldestLogToKeep() {
   auto current_log = GetLogNumber();
 
   if (allow_2pc_) {
-    autovector<MemTable*> empty_list;
-    auto imm_prep_log =
-        imm()->PrecomputeMinLogContainingPrepSection(empty_list);
+    auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection();
     auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
 
     if (imm_prep_log > 0 && imm_prep_log < current_log) {
@@ -735,13 +787,13 @@ namespace {
 std::unique_ptr<WriteControllerToken> SetupDelay(
     WriteController* write_controller, uint64_t compaction_needed_bytes,
     uint64_t prev_compaction_need_bytes, bool penalize_stop,
-    bool auto_comapctions_disabled) {
+    bool auto_compactions_disabled) {
   const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.
 
   uint64_t max_write_rate = write_controller->max_delayed_write_rate();
   uint64_t write_rate = write_controller->delayed_write_rate();
 
-  if (auto_comapctions_disabled) {
+  if (auto_compactions_disabled) {
     // When auto compaction is disabled, always use the value user gave.
     write_rate = max_write_rate;
   } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
@@ -819,20 +871,21 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
   // condition.
   // Or twice as compaction trigger, if it is smaller.
   int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
-  if (res >= port::kMaxInt32) {
-    return port::kMaxInt32;
+  if (res >= std::numeric_limits<int32_t>::max()) {
+    return std::numeric_limits<int32_t>::max();
   } else {
     // res fits in int
     return static_cast<int>(res);
   }
 }
-}  // namespace
+}  // anonymous namespace
 
 std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
 ColumnFamilyData::GetWriteStallConditionAndCause(
     int num_unflushed_memtables, int num_l0_files,
     uint64_t num_compaction_needed_bytes,
-    const MutableCFOptions& mutable_cf_options) {
+    const MutableCFOptions& mutable_cf_options,
+    const ImmutableCFOptions& immutable_cf_options) {
   if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
     return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
   } else if (!mutable_cf_options.disable_auto_compactions &&
@@ -846,7 +899,9 @@ ColumnFamilyData::GetWriteStallConditionAndCause(
             WriteStallCause::kPendingCompactionBytes};
   } else if (mutable_cf_options.max_write_buffer_number > 3 &&
              num_unflushed_memtables >=
-                 mutable_cf_options.max_write_buffer_number - 1) {
+                 mutable_cf_options.max_write_buffer_number - 1 &&
+             num_unflushed_memtables - 1 >=
+                 immutable_cf_options.min_write_buffer_number_to_merge) {
     return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
   } else if (!mutable_cf_options.disable_auto_compactions &&
              mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
@@ -864,7 +919,7 @@ ColumnFamilyData::GetWriteStallConditionAndCause(
 }
 
 WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
-      const MutableCFOptions& mutable_cf_options) {
+    const MutableCFOptions& mutable_cf_options) {
   auto write_stall_condition = WriteStallCondition::kNormal;
   if (current_ != nullptr) {
     auto* vstorage = current_->storage_info();
@@ -874,7 +929,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
 
     auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
         imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
-        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
+        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
+        *ioptions());
     write_stall_condition = write_stall_condition_and_cause.first;
     auto write_stall_cause = write_stall_condition_and_cause.second;
 
@@ -886,7 +942,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
       write_controller_token_ = write_controller->GetStopToken();
       internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
       ROCKS_LOG_WARN(
-          ioptions_.info_log,
+          ioptions_.logger,
           "[%s] Stopping writes because we have %d immutable memtables "
           "(waiting for flush), max_write_buffer_number is set to %d",
           name_.c_str(), imm()->NumNotFlushed(),
@@ -899,7 +955,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
         internal_stats_->AddCFStats(
             InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
       }
-      ROCKS_LOG_WARN(ioptions_.info_log,
+      ROCKS_LOG_WARN(ioptions_.logger,
                      "[%s] Stopping writes because we have %d level-0 files",
                      name_.c_str(), vstorage->l0_delay_trigger_count());
     } else if (write_stall_condition == WriteStallCondition::kStopped &&
@@ -908,7 +964,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
       internal_stats_->AddCFStats(
           InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
       ROCKS_LOG_WARN(
-          ioptions_.info_log,
+          ioptions_.logger,
           "[%s] Stopping writes because of estimated pending compaction "
           "bytes %" PRIu64,
           name_.c_str(), compaction_needed_bytes);
@@ -920,7 +976,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
                      mutable_cf_options.disable_auto_compactions);
       internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
       ROCKS_LOG_WARN(
-          ioptions_.info_log,
+          ioptions_.logger,
           "[%s] Stalling writes because we have %d immutable memtables "
           "(waiting for flush), max_write_buffer_number is set to %d "
           "rate %" PRIu64,
@@ -942,7 +998,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
         internal_stats_->AddCFStats(
             InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
       }
-      ROCKS_LOG_WARN(ioptions_.info_log,
+      ROCKS_LOG_WARN(ioptions_.logger,
                      "[%s] Stalling writes because we have %d level-0 files "
                      "rate %" PRIu64,
                      name_.c_str(), vstorage->l0_delay_trigger_count(),
@@ -956,7 +1012,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
           mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
           (compaction_needed_bytes -
            mutable_cf_options.soft_pending_compaction_bytes_limit) >
-              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
+              3 *
+                  (mutable_cf_options.hard_pending_compaction_bytes_limit -
                    mutable_cf_options.soft_pending_compaction_bytes_limit) /
                   4;
 
@@ -967,7 +1024,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
       internal_stats_->AddCFStats(
           InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
       ROCKS_LOG_WARN(
-          ioptions_.info_log,
+          ioptions_.logger,
           "[%s] Stalling writes because of estimated pending compaction "
           "bytes %" PRIu64 " rate %" PRIu64,
           name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
@@ -981,7 +1038,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
         write_controller_token_ =
             write_controller->GetCompactionPressureToken();
         ROCKS_LOG_INFO(
-            ioptions_.info_log,
+            ioptions_.logger,
             "[%s] Increasing compaction threads because we have %d level-0 "
             "files ",
             name_.c_str(), vstorage->l0_delay_trigger_count());
@@ -995,7 +1052,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
             write_controller->GetCompactionPressureToken();
         if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
           ROCKS_LOG_INFO(
-              ioptions_.info_log,
+              ioptions_.logger,
               "[%s] Increasing compaction threads because of estimated pending "
               "compaction "
               "bytes %" PRIu64,
@@ -1040,6 +1097,10 @@ uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
   return VersionSet::GetTotalSstFilesSize(dummy_versions_);
 }
 
+uint64_t ColumnFamilyData::GetTotalBlobFileSize() const {
+  return VersionSet::GetTotalBlobFileSize(dummy_versions_);
+}
+
 uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
   return current_->GetSstFilesSize();
 }
@@ -1098,13 +1159,14 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
   MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
   merge_iter_builder.AddIterator(
       super_version->mem->NewIterator(read_opts, &arena));
-  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
+  super_version->imm->AddIterators(read_opts, &merge_iter_builder,
+                                   false /* add_range_tombstone_iter */);
   ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
 
   auto read_seq = super_version->current->version_set()->LastSequence();
   ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
-  auto* active_range_del_iter =
-      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
+  auto* active_range_del_iter = super_version->mem->NewRangeTombstoneIterator(
+      read_opts, read_seq, false /* immutable_memtable */);
   range_del_agg.AddTombstones(
       std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
   Status status;
@@ -1149,12 +1211,12 @@ Compaction* ColumnFamilyData::CompactRange(
     int output_level, const CompactRangeOptions& compact_range_options,
     const InternalKey* begin, const InternalKey* end,
     InternalKey** compaction_end, bool* conflict,
-    uint64_t max_file_num_to_ignore) {
+    uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
   auto* result = compaction_picker_->CompactRange(
       GetName(), mutable_cf_options, mutable_db_options,
       current_->storage_info(), input_level, output_level,
       compact_range_options, begin, end, compaction_end, conflict,
-      max_file_num_to_ignore);
+      max_file_num_to_ignore, trim_ts);
   if (result != nullptr) {
     result->SetInputVersion(current_);
   }
@@ -1196,11 +1258,11 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
   SuperVersion* sv = static_cast<SuperVersion*>(ptr);
   if (sv == SuperVersion::kSVObsolete ||
       sv->version_number != super_version_number_.load()) {
-    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
+    RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_ACQUIRES);
     SuperVersion* sv_to_delete = nullptr;
 
     if (sv && sv->Unref()) {
-      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
+      RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_CLEANUPS);
       db->mutex()->Lock();
       // NOTE: underlying resources held by superversion (sst files) might
       // not be released until the next background job.
@@ -1241,26 +1303,34 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
   return false;
 }
 
-void ColumnFamilyData::InstallSuperVersion(
-    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
+void ColumnFamilyData::InstallSuperVersion(SuperVersionContext* sv_context,
+                                           InstrumentedMutex* db_mutex) {
   db_mutex->AssertHeld();
-  return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
+  return InstallSuperVersion(sv_context, mutable_cf_options_);
 }
 
 void ColumnFamilyData::InstallSuperVersion(
-    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
+    SuperVersionContext* sv_context,
     const MutableCFOptions& mutable_cf_options) {
   SuperVersion* new_superversion = sv_context->new_superversion.release();
-  new_superversion->db_mutex = db_mutex;
   new_superversion->mutable_cf_options = mutable_cf_options;
   new_superversion->Init(this, mem_, imm_.current(), current_);
   SuperVersion* old_superversion = super_version_;
   super_version_ = new_superversion;
   ++super_version_number_;
   super_version_->version_number = super_version_number_;
-  super_version_->write_stall_condition =
-      RecalculateWriteStallConditions(mutable_cf_options);
-
+  if (old_superversion == nullptr || old_superversion->current != current() ||
+      old_superversion->mem != mem_ ||
+      old_superversion->imm != imm_.current()) {
+    // Should not recalculate slow down condition if nothing has changed, since
+    // currently RecalculateWriteStallConditions() treats it as further slowing
+    // down is needed.
+    super_version_->write_stall_condition =
+        RecalculateWriteStallConditions(mutable_cf_options);
+  } else {
+    super_version_->write_stall_condition =
+        old_superversion->write_stall_condition;
+  }
   if (old_superversion != nullptr) {
     // Reset SuperVersions cached in thread local storage.
     // This should be done before old_superversion->Unref(). That's to ensure
@@ -1339,24 +1409,54 @@ Status ColumnFamilyData::ValidateOptions(
           "Block-Based Table format. ");
     }
   }
+
+  if (cf_options.enable_blob_garbage_collection) {
+    if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
+        cf_options.blob_garbage_collection_age_cutoff > 1.0) {
+      return Status::InvalidArgument(
+          "The age cutoff for blob garbage collection should be in the range "
+          "[0.0, 1.0].");
+    }
+    if (cf_options.blob_garbage_collection_force_threshold < 0.0 ||
+        cf_options.blob_garbage_collection_force_threshold > 1.0) {
+      return Status::InvalidArgument(
+          "The garbage ratio threshold for forcing blob garbage collection "
+          "should be in the range [0.0, 1.0].");
+    }
+  }
+
+  if (cf_options.compaction_style == kCompactionStyleFIFO &&
+      db_options.max_open_files != -1 && cf_options.ttl > 0) {
+    return Status::NotSupported(
+        "FIFO compaction only supported with max_open_files = -1.");
+  }
+
+  std::vector<uint32_t> supported{0, 1, 2, 4, 8};
+  if (std::find(supported.begin(), supported.end(),
+                cf_options.memtable_protection_bytes_per_key) ==
+      supported.end()) {
+    return Status::NotSupported(
+        "Memtable per key-value checksum protection only supports 0, 1, 2, 4 "
+        "or 8 bytes per key.");
+  }
   return s;
 }
 
 #ifndef ROCKSDB_LITE
 Status ColumnFamilyData::SetOptions(
-    const DBOptions& db_options,
+    const DBOptions& db_opts,
     const std::unordered_map<std::string, std::string>& options_map) {
-  MutableCFOptions new_mutable_cf_options;
-  Status s =
-      GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
-                                   ioptions_.info_log, &new_mutable_cf_options);
+  ColumnFamilyOptions cf_opts =
+      BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
+  ConfigOptions config_opts;
+  config_opts.mutable_options_only = true;
+  Status s = GetColumnFamilyOptionsFromMap(config_opts, cf_opts, options_map,
+                                           &cf_opts);
   if (s.ok()) {
-    ColumnFamilyOptions cf_options =
-        BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
-    s = ValidateOptions(db_options, cf_options);
+    s = ValidateOptions(db_opts, cf_opts);
   }
   if (s.ok()) {
-    mutable_cf_options_ = new_mutable_cf_options;
+    mutable_cf_options_ = MutableCFOptions(cf_opts);
     mutable_cf_options_.RefreshDerivedOptions(ioptions_);
   }
   return s;
@@ -1381,8 +1481,8 @@ Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
     // than base_level.
     return Env::WLTH_MEDIUM;
   }
-  return static_cast<Env::WriteLifeTimeHint>(level - base_level +
-                            static_cast<int>(Env::WLTH_MEDIUM));
+  return static_cast<Env::WriteLifeTimeHint>(
+      level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
 }
 
 Status ColumnFamilyData::AddDirectories(
@@ -1395,7 +1495,8 @@ Status ColumnFamilyData::AddDirectories(
 
     if (existing_dir == created_dirs->end()) {
       std::unique_ptr<FSDirectory> path_directory;
-      s = DBImpl::CreateAndNewDirectory(ioptions_.fs, p.path, &path_directory);
+      s = DBImpl::CreateAndNewDirectory(ioptions_.fs.get(), p.path,
+                                        &path_directory);
       if (!s.ok()) {
         return s;
       }
@@ -1426,21 +1527,25 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                  WriteBufferManager* _write_buffer_manager,
                                  WriteController* _write_controller,
                                  BlockCacheTracer* const block_cache_tracer,
-                                 const std::shared_ptr<IOTracer>& io_tracer)
+                                 const std::shared_ptr<IOTracer>& io_tracer,
+                                 const std::string& db_id,
+                                 const std::string& db_session_id)
     : max_column_family_(0),
+      file_options_(file_options),
       dummy_cfd_(new ColumnFamilyData(
           ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
-          nullptr, ColumnFamilyOptions(), *db_options, file_options, nullptr,
-          block_cache_tracer, io_tracer)),
+          nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr,
+          block_cache_tracer, io_tracer, db_id, db_session_id)),
       default_cfd_cache_(nullptr),
       db_name_(dbname),
       db_options_(db_options),
-      file_options_(file_options),
       table_cache_(table_cache),
       write_buffer_manager_(_write_buffer_manager),
       write_controller_(_write_controller),
       block_cache_tracer_(block_cache_tracer),
-      io_tracer_(io_tracer) {
+      io_tracer_(io_tracer),
+      db_id_(db_id),
+      db_session_id_(db_session_id) {
   // initialize linked list
   dummy_cfd_->prev_ = dummy_cfd_;
   dummy_cfd_->next_ = dummy_cfd_;
@@ -1473,8 +1578,8 @@ ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
   }
 }
 
-ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
-    const {
+ColumnFamilyData* ColumnFamilySet::GetColumnFamily(
+    const std::string& name) const {
   auto cfd_iter = column_families_.find(name);
   if (cfd_iter != column_families_.end()) {
     auto cfd = GetColumnFamily(cfd_iter->second);
@@ -1506,7 +1611,8 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
   assert(column_families_.find(name) == column_families_.end());
   ColumnFamilyData* new_cfd = new ColumnFamilyData(
       id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
-      *db_options_, file_options_, this, block_cache_tracer_, io_tracer_);
+      *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
+      db_id_, db_session_id_);
   column_families_.insert({name, id});
   column_family_data_.insert({id, new_cfd});
   max_column_family_ = std::max(max_column_family_, id);
@@ -1522,20 +1628,6 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
   return new_cfd;
 }
 
-// REQUIRES: DB mutex held
-void ColumnFamilySet::FreeDeadColumnFamilies() {
-  autovector<ColumnFamilyData*> to_delete;
-  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
-    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
-      to_delete.push_back(cfd);
-    }
-  }
-  for (auto cfd : to_delete) {
-    // this is very rare, so it's not a problem that we do it under a mutex
-    delete cfd;
-  }
-}
-
 // under a DB mutex AND from a write thread
 void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
   auto cfd_iter = column_family_data_.find(cfd->GetID());