#include "db/column_family.h"
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
-#include <vector>
-#include <string>
#include <algorithm>
+#include <cinttypes>
#include <limits>
+#include <string>
+#include <vector>
-#include "db/compaction_picker.h"
-#include "db/compaction_picker_fifo.h"
-#include "db/compaction_picker_universal.h"
-#include "db/db_impl.h"
+#include "db/compaction/compaction_picker.h"
+#include "db/compaction/compaction_picker_fifo.h"
+#include "db/compaction/compaction_picker_level.h"
+#include "db/compaction/compaction_picker_universal.h"
+#include "db/db_impl/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
+#include "file/sst_file_manager_impl.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
-#include "table/block_based_table_factory.h"
+#include "port/port.h"
+#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"
-#include "util/sst_file_manager_impl.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
JobContext job_context(0);
mutex_->Lock();
- if (cfd_->Unref()) {
- delete cfd_;
+ bool dropped = cfd_->IsDropped();
+ if (cfd_->UnrefAndTryDelete()) {
+ if (dropped) {
+ db_->FindObsoleteFiles(&job_context, false, true);
+ }
}
- db_->FindObsoleteFiles(&job_context, false, true);
mutex_->Unlock();
if (job_context.HaveSomethingToDelete()) {
bool defer_purge =
return Status::OK();
}
+namespace {
+const uint64_t kDefaultTtl = 0xfffffffffffffffe;  // Sentinel (UINT64_MAX - 1): SanitizeOptions rewrites a ttl equal to this into 30 days or 0.
+const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;  // Same sentinel for periodic_compaction_seconds; SanitizeOptions maps it to 30 days or 0.
+}; // namespace
+
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
const ColumnFamilyOptions& src) {
ColumnFamilyOptions result = src;
if (result.max_write_buffer_number < 2) {
result.max_write_buffer_number = 2;
}
- if (result.max_write_buffer_number_to_maintain < 0) {
+ // Fall back to max_write_buffer_number_to_maintain if
+ // max_write_buffer_size_to_maintain is not set.
+ if (result.max_write_buffer_size_to_maintain < 0) {
+ result.max_write_buffer_size_to_maintain =
+ result.max_write_buffer_number *
+ static_cast<int64_t>(result.write_buffer_size);
+ } else if (result.max_write_buffer_size_to_maintain == 0 &&
+ result.max_write_buffer_number_to_maintain < 0) {
result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
}
// bloom filter size shouldn't exceed 1/4 of memtable size.
result.max_compaction_bytes = result.target_file_size_base * 25;
}
+ bool is_block_based_table =
+ (result.table_factory->Name() == BlockBasedTableFactory().Name());
+
+ const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
+ if (result.ttl == kDefaultTtl) {
+ if (is_block_based_table &&
+ result.compaction_style != kCompactionStyleFIFO) {
+ result.ttl = kAdjustedTtl;
+ } else {
+ result.ttl = 0;
+ }
+ }
+
+ const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;
+
+ // Turn on periodic compactions and set them to occur once every 30 days if
+ // compaction filters are used and periodic_compaction_seconds is set to the
+ // default value.
+ if (result.compaction_style != kCompactionStyleFIFO) {
+ if ((result.compaction_filter != nullptr ||
+ result.compaction_filter_factory != nullptr) &&
+ result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
+ is_block_based_table) {
+ result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
+ }
+ } else {
+ // result.compaction_style == kCompactionStyleFIFO
+ if (result.ttl == 0) {
+ if (is_block_based_table) {
+ if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
+ result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
+ }
+ result.ttl = result.periodic_compaction_seconds;
+ }
+ } else if (result.periodic_compaction_seconds != 0) {
+ result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
+ }
+ }
+
+ // TTL compactions would work similar to Periodic Compactions in Universal in
+ // most of the cases. So, if ttl is set, execute the periodic compaction
+ // codepath.
+ if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
+ if (result.periodic_compaction_seconds != 0) {
+ result.periodic_compaction_seconds =
+ std::min(result.ttl, result.periodic_compaction_seconds);
+ } else {
+ result.periodic_compaction_seconds = result.ttl;
+ }
+ }
+
+ if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
+ result.periodic_compaction_seconds = 0;
+ }
+
return result;
}
to_delete.push_back(m);
}
current->Unref();
+ if (cfd->Unref()) {
+ delete cfd;
+ }
}
-void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
- Version* new_current) {
+void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
+ MemTableListVersion* new_imm, Version* new_current) {
+ cfd = new_cfd;
mem = new_mem;
imm = new_imm;
current = new_current;
+ cfd->Ref();
mem->Ref();
imm->Ref();
current->Ref();
uint32_t id, const std::string& name, Version* _dummy_versions,
Cache* _table_cache, WriteBufferManager* write_buffer_manager,
const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
- const EnvOptions& env_options, ColumnFamilySet* column_family_set)
+ const FileOptions& file_options, ColumnFamilySet* column_family_set,
+ BlockCacheTracer* const block_cache_tracer)
: id_(id),
name_(name),
dummy_versions_(_dummy_versions),
write_buffer_manager_(write_buffer_manager),
mem_(nullptr),
imm_(ioptions_.min_write_buffer_number_to_merge,
- ioptions_.max_write_buffer_number_to_maintain),
+ ioptions_.max_write_buffer_number_to_maintain,
+ ioptions_.max_write_buffer_size_to_maintain),
super_version_(nullptr),
super_version_number_(0),
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
if (_dummy_versions != nullptr) {
internal_stats_.reset(
new InternalStats(ioptions_.num_levels, db_options.env, this));
- table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
+ table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
+ block_cache_tracer));
if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
new LevelCompactionPicker(ioptions_, &internal_comparator_));
// compaction_queue_ and we destroyed it
assert(!queued_for_flush_);
assert(!queued_for_compaction_);
-
- if (super_version_ != nullptr) {
- // Release SuperVersion reference kept in ThreadLocalPtr.
- // This must be done outside of mutex_ since unref handler can lock mutex.
- super_version_->db_mutex->Unlock();
- local_sv_.reset();
- super_version_->db_mutex->Lock();
-
- bool is_last_reference __attribute__((__unused__));
- is_last_reference = super_version_->Unref();
- assert(is_last_reference);
- super_version_->Cleanup();
- delete super_version_;
- super_version_ = nullptr;
- }
+ assert(super_version_ == nullptr);
if (dummy_versions_ != nullptr) {
// List must be empty
}
}
+bool ColumnFamilyData::UnrefAndTryDelete() {  // Drops one reference. Returns true if this call ran `delete this` or cleaned up and deleted super_version_; false otherwise.
+ int old_refs = refs_.fetch_sub(1);  // fetch_sub returns the count as it was *before* the decrement.
+ assert(old_refs > 0);
+
+ if (old_refs == 1) {  // The reference just dropped was the last one.
+ assert(super_version_ == nullptr);
+ delete this;  // No member may be touched after this line.
+ return true;
+ }
+
+ if (old_refs == 2 && super_version_ != nullptr) {  // Exactly one reference remains and a SuperVersion is still installed.
+ // Only the super_version_ holds me (SuperVersion::Init takes a reference via cfd->Ref()).
+ SuperVersion* sv = super_version_;
+ super_version_ = nullptr;  // Detach first so the member no longer points at sv.
+ // Release SuperVersion reference kept in ThreadLocalPtr.
+ // This must be done outside of mutex_ since unref handler can lock mutex.
+ sv->db_mutex->Unlock();  // NOTE(review): implies the caller holds sv->db_mutex on entry -- confirm.
+ local_sv_.reset();
+ sv->db_mutex->Lock();
+
+ if (sv->Unref()) {
+ // May delete this ColumnFamilyData after calling Cleanup()
+ sv->Cleanup();
+ delete sv;
+ return true;
+ }
+ }
+ return false;  // Either other references remain, or sv->Unref() reported that sv is still referenced.
+}
+
void ColumnFamilyData::SetDropped() {
// can't drop default CF
assert(id_ != 0);
return write_stall_condition;
}
-const EnvOptions* ColumnFamilyData::soptions() const {
- return &(column_family_set_->env_options_);
+const FileOptions* ColumnFamilyData::soptions() const {  // Returns the address of the FileOptions stored in the owning ColumnFamilySet.
+ return &(column_family_set_->file_options_);
}
void ColumnFamilyData::SetCurrent(Version* current_version) {
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
+ SequenceNumber earliest_mem_seqno =
+ std::min(mem_->GetEarliestSequenceNumber(),
+ imm_.current()->GetEarliestSequenceNumber(false));
auto* result = compaction_picker_->PickCompaction(
- GetName(), mutable_options, current_->storage_info(), log_buffer);
+ GetName(), mutable_options, current_->storage_info(), log_buffer,
+ earliest_mem_seqno);
if (result != nullptr) {
result->SetInputVersion(current_);
}
Compaction* ColumnFamilyData::CompactRange(
const MutableCFOptions& mutable_cf_options, int input_level,
- int output_level, uint32_t output_path_id, uint32_t max_subcompactions,
+ int output_level, const CompactRangeOptions& compact_range_options,  // Replaces the separate output_path_id / max_subcompactions arguments.
const InternalKey* begin, const InternalKey* end,
- InternalKey** compaction_end, bool* conflict) {
+ InternalKey** compaction_end, bool* conflict,
+ uint64_t max_file_num_to_ignore) {  // Passed through unchanged to the compaction picker below.
auto* result = compaction_picker_->CompactRange(
GetName(), mutable_cf_options, current_->storage_info(), input_level,
- output_level, output_path_id, max_subcompactions, begin, end,
- compaction_end, conflict);
+ output_level, compact_range_options, begin, end, compaction_end, conflict,
+ max_file_num_to_ignore);  // All arguments are forwarded; this wrapper only supplies the CF name and current storage info.
if (result != nullptr) {
result->SetInputVersion(current_);
}
return result;
}
-SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
- InstrumentedMutex* db_mutex) {
- SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
+SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
+ SuperVersion* sv = GetThreadLocalSuperVersion(db);
sv->Ref();
if (!ReturnThreadLocalSuperVersion(sv)) {
// This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
return sv;
}
-SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
- InstrumentedMutex* db_mutex) {
+SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
// The SuperVersion is cached in thread local storage to avoid acquiring
// mutex when SuperVersion does not change since the last use. When a new
// SuperVersion is installed, the compaction or flush thread cleans up
if (sv && sv->Unref()) {
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
- db_mutex->Lock();
+ db->mutex()->Lock();
// NOTE: underlying resources held by superversion (sst files) might
// not be released until the next background job.
sv->Cleanup();
- sv_to_delete = sv;
+ if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
+ db->AddSuperVersionsToFreeQueue(sv);
+ db->SchedulePurge();
+ } else {
+ sv_to_delete = sv;
+ }
} else {
- db_mutex->Lock();
+ db->mutex()->Lock();
}
sv = super_version_->Ref();
- db_mutex->Unlock();
+ db->mutex()->Unlock();
delete sv_to_delete;
}
SuperVersion* new_superversion = sv_context->new_superversion.release();
new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options;
- new_superversion->Init(mem_, imm_.current(), current_);
+ new_superversion->Init(this, mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
++super_version_number_;
}
}
+Status ColumnFamilyData::ValidateOptions(  // Validates cf_options against db_options; returns the first non-OK Status encountered, otherwise OK.
+ const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
+ Status s;
+ s = CheckCompressionSupported(cf_options);
+ if (s.ok() && db_options.allow_concurrent_memtable_write) {  // Only checked when concurrent memtable writes are enabled.
+ s = CheckConcurrentWritesSupported(cf_options);
+ }
+ if (s.ok() && db_options.unordered_write &&
+ cf_options.max_successive_merges != 0) {
+ s = Status::InvalidArgument(
+ "max_successive_merges > 0 is incompatible with unordered_write");
+ }
+ if (s.ok()) {
+ s = CheckCFPathsSupported(db_options, cf_options);
+ }
+ if (!s.ok()) {  // Each check above runs only while s is still OK, so s holds the first failure.
+ return s;
+ }
+
+ if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {  // Non-zero ttl other than the kDefaultTtl sentinel (which SanitizeOptions rewrites).
+ if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {  // Table format is identified by comparing factory names.
+ return Status::NotSupported(
+ "TTL is only supported in Block-Based Table format. ");
+ }
+ }
+
+ if (cf_options.periodic_compaction_seconds > 0 &&
+ cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {  // Same rule as ttl: the sentinel value is exempt from the table-format check.
+ if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
+ return Status::NotSupported(
+ "Periodic Compaction is only supported in "
+ "Block-Based Table format. ");
+ }
+ }
+ return s;  // s is OK here; any failure returned earlier.
+}
+
#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
- const std::unordered_map<std::string, std::string>& options_map) {
+ const DBOptions& db_options,
+ const std::unordered_map<std::string, std::string>& options_map) {
MutableCFOptions new_mutable_cf_options;
Status s =
GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
ioptions_.info_log, &new_mutable_cf_options);
+ if (s.ok()) {
+ ColumnFamilyOptions cf_options =
+ BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
+ s = ValidateOptions(db_options, cf_options);
+ }
if (s.ok()) {
mutable_cf_options_ = new_mutable_cf_options;
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
// L1: medium, L2: long, ...
if (level - base_level >= 2) {
return Env::WLTH_EXTREME;
+ } else if (level < base_level) {
+ // There is no restriction which prevents level passed in to be smaller
+ // than base_level.
+ return Env::WLTH_MEDIUM;
}
return static_cast<Env::WriteLifeTimeHint>(level - base_level +
static_cast<int>(Env::WLTH_MEDIUM));
}
-Status ColumnFamilyData::AddDirectories() {
+Status ColumnFamilyData::AddDirectories(
+ std::map<std::string, std::shared_ptr<Directory>>* created_dirs) {
Status s;
+ assert(created_dirs != nullptr);
assert(data_dirs_.empty());
for (auto& p : ioptions_.cf_paths) {
- std::unique_ptr<Directory> path_directory;
- s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
- if (!s.ok()) {
- return s;
+ auto existing_dir = created_dirs->find(p.path);
+
+ if (existing_dir == created_dirs->end()) {
+ std::unique_ptr<Directory> path_directory;
+ s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
+ if (!s.ok()) {
+ return s;
+ }
+ assert(path_directory != nullptr);
+ data_dirs_.emplace_back(path_directory.release());
+ (*created_dirs)[p.path] = data_dirs_.back();
+ } else {
+ data_dirs_.emplace_back(existing_dir->second);
}
- assert(path_directory != nullptr);
- data_dirs_.emplace_back(path_directory.release());
}
assert(data_dirs_.size() == ioptions_.cf_paths.size());
return s;
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const ImmutableDBOptions* db_options,
- const EnvOptions& env_options,
+ const FileOptions& file_options,
Cache* table_cache,
WriteBufferManager* write_buffer_manager,
- WriteController* write_controller)
+ WriteController* write_controller,
+ BlockCacheTracer* const block_cache_tracer)
: max_column_family_(0),
- dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
- ColumnFamilyOptions(), *db_options,
- env_options, nullptr)),
+ dummy_cfd_(new ColumnFamilyData(
+ 0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
+ file_options, nullptr, block_cache_tracer)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
- env_options_(env_options),
+ file_options_(file_options),
table_cache_(table_cache),
write_buffer_manager_(write_buffer_manager),
- write_controller_(write_controller) {
+ write_controller_(write_controller),
+ block_cache_tracer_(block_cache_tracer) {
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;
dummy_cfd_->next_ = dummy_cfd_;
// cfd destructor will delete itself from column_family_data_
auto cfd = column_family_data_.begin()->second;
bool last_ref __attribute__((__unused__));
- last_ref = cfd->Unref();
+ last_ref = cfd->UnrefAndTryDelete();
assert(last_ref);
- delete cfd;
}
bool dummy_last_ref __attribute__((__unused__));
- dummy_last_ref = dummy_cfd_->Unref();
+ dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
assert(dummy_last_ref);
- delete dummy_cfd_;
}
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
- *db_options_, env_options_, this);
+ *db_options_, file_options_, this, block_cache_tracer_);
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
return nullptr;
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE