]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/column_family.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / column_family.cc
index f9a4ae66d8ef27ba9fa0d8e6f407bd148ac107d0..928a02a1fdab925e3c5c4993d4fd1f22abd6b09f 100644 (file)
@@ -9,36 +9,34 @@
 
 #include "db/column_family.h"
 
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
-#include <vector>
-#include <string>
 #include <algorithm>
+#include <cinttypes>
 #include <limits>
+#include <string>
+#include <vector>
 
-#include "db/compaction_picker.h"
-#include "db/compaction_picker_fifo.h"
-#include "db/compaction_picker_universal.h"
-#include "db/db_impl.h"
+#include "db/compaction/compaction_picker.h"
+#include "db/compaction/compaction_picker_fifo.h"
+#include "db/compaction/compaction_picker_level.h"
+#include "db/compaction/compaction_picker_universal.h"
+#include "db/db_impl/db_impl.h"
 #include "db/internal_stats.h"
 #include "db/job_context.h"
 #include "db/range_del_aggregator.h"
 #include "db/table_properties_collector.h"
 #include "db/version_set.h"
 #include "db/write_controller.h"
+#include "file/sst_file_manager_impl.h"
 #include "memtable/hash_skiplist_rep.h"
 #include "monitoring/thread_status_util.h"
 #include "options/options_helper.h"
-#include "table/block_based_table_factory.h"
+#include "port/port.h"
+#include "table/block_based/block_based_table_factory.h"
 #include "table/merging_iterator.h"
 #include "util/autovector.h"
 #include "util/compression.h"
-#include "util/sst_file_manager_impl.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
     ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
@@ -62,10 +60,12 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
     ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
     JobContext job_context(0);
     mutex_->Lock();
-    if (cfd_->Unref()) {
-      delete cfd_;
+    bool dropped = cfd_->IsDropped();
+    if (cfd_->UnrefAndTryDelete()) {
+      if (dropped) {
+        db_->FindObsoleteFiles(&job_context, false, true);
+      }
     }
-    db_->FindObsoleteFiles(&job_context, false, true);
     mutex_->Unlock();
     if (job_context.HaveSomethingToDelete()) {
       bool defer_purge =
@@ -185,6 +185,11 @@ Status CheckCFPathsSupported(const DBOptions& db_options,
   return Status::OK();
 }
 
+namespace {  // file-local sentinels compared against ttl / periodic_compaction_seconds in SanitizeOptions() and ValidateOptions()
+const uint64_t kDefaultTtl = 0xfffffffffffffffe;  // UINT64_MAX - 1; SanitizeOptions() rewrites a ttl equal to this into 30 days or 0
+const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;  // UINT64_MAX - 1; the "default value" marker for periodic_compaction_seconds
+};  // namespace
+
 ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                     const ColumnFamilyOptions& src) {
   ColumnFamilyOptions result = src;
@@ -225,7 +230,14 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
   if (result.max_write_buffer_number < 2) {
     result.max_write_buffer_number = 2;
   }
-  if (result.max_write_buffer_number_to_maintain < 0) {
+  // Fall back to max_write_buffer_number_to_maintain only if
+  // max_write_buffer_size_to_maintain is not set (i.e. is 0).
+  if (result.max_write_buffer_size_to_maintain < 0) {
+    result.max_write_buffer_size_to_maintain =
+        result.max_write_buffer_number *
+        static_cast<int64_t>(result.write_buffer_size);
+  } else if (result.max_write_buffer_size_to_maintain == 0 &&
+             result.max_write_buffer_number_to_maintain < 0) {
     result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
   }
   // bloom filter size shouldn't exceed 1/4 of memtable size.
@@ -333,6 +345,61 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
     result.max_compaction_bytes = result.target_file_size_base * 25;
   }
 
+  bool is_block_based_table =
+      (result.table_factory->Name() == BlockBasedTableFactory().Name());
+
+  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
+  if (result.ttl == kDefaultTtl) {
+    if (is_block_based_table &&
+        result.compaction_style != kCompactionStyleFIFO) {
+      result.ttl = kAdjustedTtl;
+    } else {
+      result.ttl = 0;
+    }
+  }
+
+  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;
+
+  // Turn on periodic compactions and set them to occur once every 30 days if
+  // compaction filters are used and periodic_compaction_seconds is set to the
+  // default value.
+  if (result.compaction_style != kCompactionStyleFIFO) {
+    if ((result.compaction_filter != nullptr ||
+         result.compaction_filter_factory != nullptr) &&
+        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
+        is_block_based_table) {
+      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
+    }
+  } else {
+    // result.compaction_style == kCompactionStyleFIFO
+    if (result.ttl == 0) {
+      if (is_block_based_table) {
+        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
+          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
+        }
+        result.ttl = result.periodic_compaction_seconds;
+      }
+    } else if (result.periodic_compaction_seconds != 0) {
+      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
+    }
+  }
+
+  // In Universal compaction, TTL compactions work like periodic compactions in
+  // most cases. So, if ttl is set, route it through the periodic compaction
+  // codepath.
+  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
+    if (result.periodic_compaction_seconds != 0) {
+      result.periodic_compaction_seconds =
+          std::min(result.ttl, result.periodic_compaction_seconds);
+    } else {
+      result.periodic_compaction_seconds = result.ttl;
+    }
+  }
+
+  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
+    result.periodic_compaction_seconds = 0;
+  }
+
   return result;
 }
 
@@ -369,13 +436,18 @@ void SuperVersion::Cleanup() {
     to_delete.push_back(m);
   }
   current->Unref();
+  if (cfd->Unref()) {
+    delete cfd;
+  }
 }
 
-void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
-                        Version* new_current) {
+void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
+                        MemTableListVersion* new_imm, Version* new_current) {
+  cfd = new_cfd;
   mem = new_mem;
   imm = new_imm;
   current = new_current;
+  cfd->Ref();
   mem->Ref();
   imm->Ref();
   current->Ref();
@@ -403,7 +475,8 @@ ColumnFamilyData::ColumnFamilyData(
     uint32_t id, const std::string& name, Version* _dummy_versions,
     Cache* _table_cache, WriteBufferManager* write_buffer_manager,
     const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
-    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
+    const FileOptions& file_options, ColumnFamilySet* column_family_set,
+    BlockCacheTracer* const block_cache_tracer)
     : id_(id),
       name_(name),
       dummy_versions_(_dummy_versions),
@@ -420,7 +493,8 @@ ColumnFamilyData::ColumnFamilyData(
       write_buffer_manager_(write_buffer_manager),
       mem_(nullptr),
       imm_(ioptions_.min_write_buffer_number_to_merge,
-           ioptions_.max_write_buffer_number_to_maintain),
+           ioptions_.max_write_buffer_number_to_maintain,
+           ioptions_.max_write_buffer_size_to_maintain),
       super_version_(nullptr),
       super_version_number_(0),
       local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
@@ -443,7 +517,8 @@ ColumnFamilyData::ColumnFamilyData(
   if (_dummy_versions != nullptr) {
     internal_stats_.reset(
         new InternalStats(ioptions_.num_levels, db_options.env, this));
-    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
+    table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
+                                      block_cache_tracer));
     if (ioptions_.compaction_style == kCompactionStyleLevel) {
       compaction_picker_.reset(
           new LevelCompactionPicker(ioptions_, &internal_comparator_));
@@ -508,21 +583,7 @@ ColumnFamilyData::~ColumnFamilyData() {
   // compaction_queue_ and we destroyed it
   assert(!queued_for_flush_);
   assert(!queued_for_compaction_);
-
-  if (super_version_ != nullptr) {
-    // Release SuperVersion reference kept in ThreadLocalPtr.
-    // This must be done outside of mutex_ since unref handler can lock mutex.
-    super_version_->db_mutex->Unlock();
-    local_sv_.reset();
-    super_version_->db_mutex->Lock();
-
-    bool is_last_reference __attribute__((__unused__));
-    is_last_reference = super_version_->Unref();
-    assert(is_last_reference);
-    super_version_->Cleanup();
-    delete super_version_;
-    super_version_ = nullptr;
-  }
+  assert(super_version_ == nullptr);
 
   if (dummy_versions_ != nullptr) {
     // List must be empty
@@ -542,6 +603,36 @@ ColumnFamilyData::~ColumnFamilyData() {
   }
 }
 
+bool ColumnFamilyData::UnrefAndTryDelete() {  // drops one reference; returns true only on the two teardown paths below
+  int old_refs = refs_.fetch_sub(1);  // old_refs is the count *before* this decrement
+  assert(old_refs > 0);
+
+  if (old_refs == 1) {  // the caller held the last reference
+    assert(super_version_ == nullptr);
+    delete this;  // no member may be touched past this point
+    return true;
+  }
+
+  if (old_refs == 2 && super_version_ != nullptr) {
+    // The one remaining reference is the one held by super_version_ (SuperVersion::Init() calls cfd->Ref()).
+    SuperVersion* sv = super_version_;
+    super_version_ = nullptr;  // cleared before the mutex is released below
+    // Release the SuperVersion reference cached in the ThreadLocalPtr (local_sv_).
+    // The db mutex is dropped around the reset because the unref handler can lock it.
+    sv->db_mutex->Unlock();  // NOTE(review): implies the caller enters with db_mutex held -- confirm at call sites
+    local_sv_.reset();
+    sv->db_mutex->Lock();
+
+    if (sv->Unref()) {
+      // SuperVersion::Cleanup() unrefs cfd and deletes it on the last ref, so `this` may be gone afterwards.
+      sv->Cleanup();
+      delete sv;
+      return true;
+    }
+  }
+  return false;  // neither teardown path ran
+}
+
 void ColumnFamilyData::SetDropped() {
   // can't drop default CF
   assert(id_ != 0);
@@ -876,8 +967,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
   return write_stall_condition;
 }
 
-const EnvOptions* ColumnFamilyData::soptions() const {
-  return &(column_family_set_->env_options_);
+const FileOptions* ColumnFamilyData::soptions() const {  // accessor for the FileOptions stored on column_family_set_
+  return &(column_family_set_->file_options_);  // address of a member of the ColumnFamilySet, not a copy
 }
 
 void ColumnFamilyData::SetCurrent(Version* current_version) {
@@ -917,8 +1008,12 @@ bool ColumnFamilyData::NeedsCompaction() const {
 
 Compaction* ColumnFamilyData::PickCompaction(
     const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
+  SequenceNumber earliest_mem_seqno =
+      std::min(mem_->GetEarliestSequenceNumber(),
+               imm_.current()->GetEarliestSequenceNumber(false));
   auto* result = compaction_picker_->PickCompaction(
-      GetName(), mutable_options, current_->storage_info(), log_buffer);
+      GetName(), mutable_options, current_->storage_info(), log_buffer,
+      earliest_mem_seqno);
   if (result != nullptr) {
     result->SetInputVersion(current_);
   }
@@ -989,22 +1084,22 @@ const int ColumnFamilyData::kCompactToBaseLevel = -2;
 
 Compaction* ColumnFamilyData::CompactRange(
     const MutableCFOptions& mutable_cf_options, int input_level,
-    int output_level, uint32_t output_path_id, uint32_t max_subcompactions,
+    int output_level, const CompactRangeOptions& compact_range_options,
     const InternalKey* begin, const InternalKey* end,
-    InternalKey** compaction_end, bool* conflict) {
+    InternalKey** compaction_end, bool* conflict,
+    uint64_t max_file_num_to_ignore) {  // thin wrapper over compaction_picker_->CompactRange(); the result may be nullptr
   auto* result = compaction_picker_->CompactRange(
       GetName(), mutable_cf_options, current_->storage_info(), input_level,
-      output_level, output_path_id, max_subcompactions, begin, end,
-      compaction_end, conflict);
+      output_level, compact_range_options, begin, end, compaction_end, conflict,
+      max_file_num_to_ignore);  // caller arguments are passed through, plus GetName() and current_->storage_info()
   if (result != nullptr) {
     result->SetInputVersion(current_);
   }
   return result;
 }
 
-SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
-    InstrumentedMutex* db_mutex) {
-  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
+SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
+  SuperVersion* sv = GetThreadLocalSuperVersion(db);
   sv->Ref();
   if (!ReturnThreadLocalSuperVersion(sv)) {
     // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
@@ -1016,8 +1111,7 @@ SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
   return sv;
 }
 
-SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
-    InstrumentedMutex* db_mutex) {
+SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
   // The SuperVersion is cached in thread local storage to avoid acquiring
   // mutex when SuperVersion does not change since the last use. When a new
   // SuperVersion is installed, the compaction or flush thread cleans up
@@ -1044,16 +1138,21 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
 
     if (sv && sv->Unref()) {
       RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
-      db_mutex->Lock();
+      db->mutex()->Lock();
       // NOTE: underlying resources held by superversion (sst files) might
       // not be released until the next background job.
       sv->Cleanup();
-      sv_to_delete = sv;
+      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
+        db->AddSuperVersionsToFreeQueue(sv);
+        db->SchedulePurge();
+      } else {
+        sv_to_delete = sv;
+      }
     } else {
-      db_mutex->Lock();
+      db->mutex()->Lock();
     }
     sv = super_version_->Ref();
-    db_mutex->Unlock();
+    db->mutex()->Unlock();
 
     delete sv_to_delete;
   }
@@ -1091,7 +1190,7 @@ void ColumnFamilyData::InstallSuperVersion(
   SuperVersion* new_superversion = sv_context->new_superversion.release();
   new_superversion->db_mutex = db_mutex;
   new_superversion->mutable_cf_options = mutable_cf_options;
-  new_superversion->Init(mem_, imm_.current(), current_);
+  new_superversion->Init(this, mem_, imm_.current(), current_);
   SuperVersion* old_superversion = super_version_;
   super_version_ = new_superversion;
   ++super_version_number_;
@@ -1141,13 +1240,56 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() {
   }
 }
 
+Status ColumnFamilyData::ValidateOptions(  // returns the first failing check's Status, or an OK Status if every check passes
+    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
+  Status s;
+  s = CheckCompressionSupported(cf_options);
+  if (s.ok() && db_options.allow_concurrent_memtable_write) {  // concurrent-write check only runs when that DB option is on
+    s = CheckConcurrentWritesSupported(cf_options);
+  }
+  if (s.ok() && db_options.unordered_write &&
+      cf_options.max_successive_merges != 0) {  // this combination is rejected outright
+    s = Status::InvalidArgument(
+        "max_successive_merges > 0 is incompatible with unordered_write");
+  }
+  if (s.ok()) {
+    s = CheckCFPathsSupported(db_options, cf_options);
+  }
+  if (!s.ok()) {  // each check above is skipped once s is not OK, so this returns the first failure
+    return s;
+  }
+
+  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {  // the kDefaultTtl sentinel is exempt from this check
+    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {  // NOTE(review): if Name() returns const char*, this compares pointers, not contents -- confirm
+      return Status::NotSupported(
+          "TTL is only supported in Block-Based Table format. ");
+    }
+  }
+
+  if (cf_options.periodic_compaction_seconds > 0 &&
+      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {  // same sentinel exemption as for ttl
+    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {  // NOTE(review): same Name() comparison caveat as the ttl check
+      return Status::NotSupported(
+          "Periodic Compaction is only supported in "
+          "Block-Based Table format. ");
+    }
+  }
+  return s;  // s is OK here; any failure has already returned
+}
+
 #ifndef ROCKSDB_LITE
 Status ColumnFamilyData::SetOptions(
-      const std::unordered_map<std::string, std::string>& options_map) {
+    const DBOptions& db_options,
+    const std::unordered_map<std::string, std::string>& options_map) {
   MutableCFOptions new_mutable_cf_options;
   Status s =
       GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                    ioptions_.info_log, &new_mutable_cf_options);
+  if (s.ok()) {
+    ColumnFamilyOptions cf_options =
+        BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
+    s = ValidateOptions(db_options, cf_options);
+  }
   if (s.ok()) {
     mutable_cf_options_ = new_mutable_cf_options;
     mutable_cf_options_.RefreshDerivedOptions(ioptions_);
@@ -1169,22 +1311,35 @@ Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
   // L1: medium, L2: long, ...
   if (level - base_level >= 2) {
     return Env::WLTH_EXTREME;
+  } else if (level < base_level) {
+    // There is no restriction which prevents level passed in to be smaller
+    // than base_level.
+    return Env::WLTH_MEDIUM;
   }
   return static_cast<Env::WriteLifeTimeHint>(level - base_level +
                             static_cast<int>(Env::WLTH_MEDIUM));
 }
 
-Status ColumnFamilyData::AddDirectories() {
+Status ColumnFamilyData::AddDirectories(
+    std::map<std::string, std::shared_ptr<Directory>>* created_dirs) {
   Status s;
+  assert(created_dirs != nullptr);
   assert(data_dirs_.empty());
   for (auto& p : ioptions_.cf_paths) {
-    std::unique_ptr<Directory> path_directory;
-    s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
-    if (!s.ok()) {
-      return s;
+    auto existing_dir = created_dirs->find(p.path);
+
+    if (existing_dir == created_dirs->end()) {
+      std::unique_ptr<Directory> path_directory;
+      s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
+      if (!s.ok()) {
+        return s;
+      }
+      assert(path_directory != nullptr);
+      data_dirs_.emplace_back(path_directory.release());
+      (*created_dirs)[p.path] = data_dirs_.back();
+    } else {
+      data_dirs_.emplace_back(existing_dir->second);
     }
-    assert(path_directory != nullptr);
-    data_dirs_.emplace_back(path_directory.release());
   }
   assert(data_dirs_.size() == ioptions_.cf_paths.size());
   return s;
@@ -1201,21 +1356,23 @@ Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
 
 ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                  const ImmutableDBOptions* db_options,
-                                 const EnvOptions& env_options,
+                                 const FileOptions& file_options,
                                  Cache* table_cache,
                                  WriteBufferManager* write_buffer_manager,
-                                 WriteController* write_controller)
+                                 WriteController* write_controller,
+                                 BlockCacheTracer* const block_cache_tracer)
     : max_column_family_(0),
-      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
-                                      ColumnFamilyOptions(), *db_options,
-                                      env_options, nullptr)),
+      dummy_cfd_(new ColumnFamilyData(
+          0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
+          file_options, nullptr, block_cache_tracer)),
       default_cfd_cache_(nullptr),
       db_name_(dbname),
       db_options_(db_options),
-      env_options_(env_options),
+      file_options_(file_options),
       table_cache_(table_cache),
       write_buffer_manager_(write_buffer_manager),
-      write_controller_(write_controller) {
+      write_controller_(write_controller),
+      block_cache_tracer_(block_cache_tracer) {
   // initialize linked list
   dummy_cfd_->prev_ = dummy_cfd_;
   dummy_cfd_->next_ = dummy_cfd_;
@@ -1226,14 +1383,12 @@ ColumnFamilySet::~ColumnFamilySet() {
     // cfd destructor will delete itself from column_family_data_
     auto cfd = column_family_data_.begin()->second;
     bool last_ref __attribute__((__unused__));
-    last_ref = cfd->Unref();
+    last_ref = cfd->UnrefAndTryDelete();
     assert(last_ref);
-    delete cfd;
   }
   bool dummy_last_ref __attribute__((__unused__));
-  dummy_last_ref = dummy_cfd_->Unref();
+  dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
   assert(dummy_last_ref);
-  delete dummy_cfd_;
 }
 
 ColumnFamilyData* ColumnFamilySet::GetDefault() const {
@@ -1283,7 +1438,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
   assert(column_families_.find(name) == column_families_.end());
   ColumnFamilyData* new_cfd = new ColumnFamilyData(
       id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
-      *db_options_, env_options_, this);
+      *db_options_, file_options_, this, block_cache_tracer_);
   column_families_.insert({name, id});
   column_family_data_.insert({id, new_cfd});
   max_column_family_ = std::max(max_column_family_, id);
@@ -1365,4 +1520,4 @@ const Comparator* GetColumnFamilyUserComparator(
   return nullptr;
 }
 
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE