#include <limits>
#include <vector>

#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_source.h"
#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "file/sst_file_manager_impl.h"
#include "logging/logging.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "rocksdb/convenience.h"
#include "rocksdb/table.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
bool defer_purge =
db_->immutable_db_options().avoid_unnecessary_blocking_io;
db_->PurgeObsoleteFiles(job_context, defer_purge);
- if (defer_purge) {
- mutex_->Lock();
- db_->SchedulePurge();
- mutex_->Unlock();
- }
}
job_context.Clean();
}
void GetIntTblPropCollectorFactory(
const ImmutableCFOptions& ioptions,
- std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
- int_tbl_prop_collector_factories) {
+ IntTblPropCollectorFactories* int_tbl_prop_collector_factories) {
+ assert(int_tbl_prop_collector_factories);
+
auto& collector_factories = ioptions.table_properties_collector_factories;
for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
++i) {
}
}
if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
- if (!ZSTD_TrainDictionarySupported()) {
+ if (cf_options.compression_opts.use_zstd_dict_trainer) {
+ if (!ZSTD_TrainDictionarySupported()) {
+ return Status::InvalidArgument(
+ "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
+ "is not linked with the binary.");
+ }
+ } else if (!ZSTD_FinalizeDictionarySupported()) {
return Status::InvalidArgument(
- "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
+ "zstd finalizeDictionary cannot be used because ZSTD 1.4.5+ "
"is not linked with the binary.");
}
if (cf_options.compression_opts.max_dict_bytes == 0) {
return Status::NotSupported(
"More than one CF paths are only supported in "
"universal and level compaction styles. ");
- } else if (cf_options.cf_paths.empty() &&
- db_options.db_paths.size() > 1) {
+ } else if (cf_options.cf_paths.empty() && db_options.db_paths.size() > 1) {
return Status::NotSupported(
"More than one DB paths are only supported in "
"universal and level compaction styles. ");
namespace {
// Sentinel values meaning "not explicitly set by the user". Both are
// max-uint64 minus one so that an explicit std::numeric_limits max can
// still be distinguished from "default".
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // anonymous namespace
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
const ColumnFamilyOptions& src) {
size_t clamp_max = std::conditional<
sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
std::integral_constant<uint64_t, 64ull << 30>>::type::value;
- ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
+ ClipToRange(&result.write_buffer_size, (static_cast<size_t>(64)) << 10,
+ clamp_max);
// if user sets arena_block_size, we trust user to use this value. Otherwise,
// calculate a proper value from writer_buffer_size;
if (result.arena_block_size <= 0) {
- result.arena_block_size = result.write_buffer_size / 8;
+ result.arena_block_size =
+ std::min(size_t{1024 * 1024}, result.write_buffer_size / 8);
// Align up to 4k
const size_t align = 4 * 1024;
result.min_write_buffer_number_to_merge = 1;
}
+ if (db_options.atomic_flush && result.min_write_buffer_number_to_merge > 1) {
+ ROCKS_LOG_WARN(
+ db_options.logger,
+ "Currently, if atomic_flush is true, then triggering flush for any "
+ "column family internally (non-manual flush) will trigger flushing "
+ "all column families even if the number of memtables is smaller "
+ "min_write_buffer_number_to_merge. Therefore, configuring "
+ "min_write_buffer_number_to_merge > 1 is not compatible and should "
+ "be satinized to 1. Not doing so will lead to data loss and "
+ "inconsistent state across multiple column families when WAL is "
+ "disabled, which is a common setting for atomic flush");
+
+ result.min_write_buffer_number_to_merge = 1;
+ }
+
if (result.num_levels < 1) {
result.num_levels = 1;
}
}
if (result.compaction_style == kCompactionStyleFIFO) {
- result.num_levels = 1;
// since we delete level0 files in FIFO compaction when there are too many
// of them, these options don't really mean anything
result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
}
if (result.level0_file_num_compaction_trigger == 0) {
- ROCKS_LOG_WARN(db_options.info_log.get(),
+ ROCKS_LOG_WARN(db_options.logger,
"level0_file_num_compaction_trigger cannot be 0");
result.level0_file_num_compaction_trigger = 1;
}
result.level0_slowdown_writes_trigger ||
result.level0_slowdown_writes_trigger <
result.level0_file_num_compaction_trigger) {
- ROCKS_LOG_WARN(db_options.info_log.get(),
+ ROCKS_LOG_WARN(db_options.logger,
"This condition must be satisfied: "
"level0_stop_writes_trigger(%d) >= "
"level0_slowdown_writes_trigger(%d) >= "
result.level0_slowdown_writes_trigger) {
result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
}
- ROCKS_LOG_WARN(db_options.info_log.get(),
+ ROCKS_LOG_WARN(db_options.logger,
"Adjust the value to "
"level0_stop_writes_trigger(%d)"
"level0_slowdown_writes_trigger(%d)"
// were not deleted yet, when we open the DB we will find these .trash files
// and schedule them to be deleted (or delete immediately if SstFileManager
// was not used)
- auto sfm = static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
+ auto sfm =
+ static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
for (size_t i = 0; i < result.cf_paths.size(); i++) {
- DeleteScheduler::CleanupDirectory(db_options.env, sfm, result.cf_paths[i].path);
+ DeleteScheduler::CleanupDirectory(db_options.env, sfm,
+ result.cf_paths[i].path)
+ .PermitUncheckedError();
}
#endif
}
if (result.level_compaction_dynamic_level_bytes) {
- if (result.compaction_style != kCompactionStyleLevel ||
- result.cf_paths.size() > 1U) {
- // 1. level_compaction_dynamic_level_bytes only makes sense for
- // level-based compaction.
- // 2. we don't yet know how to make both of this feature and multiple
- // DB path work.
+ if (result.compaction_style != kCompactionStyleLevel) {
+ ROCKS_LOG_WARN(db_options.info_log.get(),
+ "level_compaction_dynamic_level_bytes only makes sense"
+ "for level-based compaction");
+ result.level_compaction_dynamic_level_bytes = false;
+ } else if (result.cf_paths.size() > 1U) {
+ // we don't yet know how to make both of this feature and multiple
+ // DB path work.
+ ROCKS_LOG_WARN(db_options.info_log.get(),
+ "multiple cf_paths/db_paths and"
+ "level_compaction_dynamic_level_bytes"
+ "can't be used together");
result.level_compaction_dynamic_level_bytes = false;
}
}
// Releases every reference this SuperVersion holds (immutable memtable
// list, active memtable, current Version, and the owning
// ColumnFamilyData). Memtables whose last reference was dropped are
// collected into to_delete for the caller to free outside any mutex.
// Precondition: this SuperVersion's own refcount has already reached 0.
void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  // Since this SuperVersion object is being deleted,
  // decrement reference to the immutable MemtableList
  // this SV object was pointing to.
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    // Last reference to the active memtable: defer its deletion.
    to_delete.push_back(m);
  }
  current->Unref();
  // May delete the ColumnFamilyData (and, recursively, resources it
  // owns) if this was the last reference to it.
  cfd->UnrefAndTryDelete();
}
void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
namespace {
void SuperVersionUnrefHandle(void* ptr) {
- // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
- // destroyed. When former happens, the thread shouldn't see kSVInUse.
- // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
- // well.
+ // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
+ // destroyed. When the former happens, the thread shouldn't see kSVInUse.
+ // When the latter happens, only super_version_ holds a reference
+ // to ColumnFamilyData, so no further queries are possible.
SuperVersion* sv = static_cast<SuperVersion*>(ptr);
bool was_last_ref __attribute__((__unused__));
was_last_ref = sv->Unref();
return paths;
}
-const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = port::kMaxUint32;
+const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId =
+ std::numeric_limits<uint32_t>::max();
ColumnFamilyData::ColumnFamilyData(
uint32_t id, const std::string& name, Version* _dummy_versions,
Cache* _table_cache, WriteBufferManager* write_buffer_manager,
const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
- const FileOptions& file_options, ColumnFamilySet* column_family_set,
+ const FileOptions* file_options, ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer,
- const std::shared_ptr<IOTracer>& io_tracer)
+ const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id,
+ const std::string& db_session_id)
: id_(id),
name_(name),
dummy_versions_(_dummy_versions),
prev_compaction_needed_bytes_(0),
allow_2pc_(db_options.allow_2pc),
last_memtable_id_(0),
- db_paths_registered_(false) {
+ db_paths_registered_(false),
+ mempurge_used_(false) {
if (id_ != kDummyColumnFamilyDataId) {
// TODO(cc): RegisterDbPaths can be expensive, considering moving it
// outside of this constructor which might be called with db mutex held.
db_paths_registered_ = true;
} else {
ROCKS_LOG_ERROR(
- ioptions_.info_log,
+ ioptions_.logger,
"Failed to register data paths of column family (id: %d, name: %s)",
id_, name_.c_str());
}
// if _dummy_versions is nullptr, then this is a dummy column family.
if (_dummy_versions != nullptr) {
internal_stats_.reset(
- new InternalStats(ioptions_.num_levels, db_options.env, this));
+ new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
- block_cache_tracer, io_tracer));
+ block_cache_tracer, io_tracer,
+ db_session_id));
blob_file_cache_.reset(
new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
- internal_stats_->GetBlobFileReadHist()));
+ internal_stats_->GetBlobFileReadHist(), io_tracer));
+ blob_source_.reset(new BlobSource(ioptions(), db_id, db_session_id,
+ blob_file_cache_.get()));
if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
compaction_picker_.reset(
new FIFOCompactionPicker(ioptions_, &internal_comparator_));
} else if (ioptions_.compaction_style == kCompactionStyleNone) {
- compaction_picker_.reset(new NullCompactionPicker(
- ioptions_, &internal_comparator_));
- ROCKS_LOG_WARN(ioptions_.info_log,
+ compaction_picker_.reset(
+ new NullCompactionPicker(ioptions_, &internal_comparator_));
+ ROCKS_LOG_WARN(ioptions_.logger,
"Column family %s does not use any background compaction. "
"Compactions can only be done via CompactFiles\n",
GetName().c_str());
#endif // !ROCKSDB_LITE
} else {
- ROCKS_LOG_ERROR(ioptions_.info_log,
+ ROCKS_LOG_ERROR(ioptions_.logger,
"Unable to recognize the specified compaction style %d. "
"Column family %s will use kCompactionStyleLevel.\n",
ioptions_.compaction_style, GetName().c_str());
}
if (column_family_set_->NumberOfColumnFamilies() < 10) {
- ROCKS_LOG_INFO(ioptions_.info_log,
+ ROCKS_LOG_INFO(ioptions_.logger,
"--------------- Options for column family [%s]:\n",
name.c_str());
- initial_cf_options_.Dump(ioptions_.info_log);
+ initial_cf_options_.Dump(ioptions_.logger);
} else {
- ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
+ ROCKS_LOG_INFO(ioptions_.logger, "\t(skipping printing options)\n");
}
}
RecalculateWriteStallConditions(mutable_cf_options_);
+
+ if (cf_options.table_factory->IsInstanceOf(
+ TableFactory::kBlockBasedTableName()) &&
+ cf_options.table_factory->GetOptions<BlockBasedTableOptions>()) {
+ const BlockBasedTableOptions* bbto =
+ cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
+ const auto& options_overrides = bbto->cache_usage_options.options_overrides;
+ const auto file_metadata_charged =
+ options_overrides.at(CacheEntryRole::kFileMetadata).charged;
+ if (bbto->block_cache &&
+ file_metadata_charged == CacheEntryRoleOptions::Decision::kEnabled) {
+ // TODO(hx235): Add a `ConcurrentCacheReservationManager` at DB scope
+ // responsible for reservation of `ObsoleteFileInfo` so that we can keep
+ // this `file_metadata_cache_res_mgr_` nonconcurrent
+ file_metadata_cache_res_mgr_.reset(new ConcurrentCacheReservationManager(
+ std::make_shared<
+ CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>>(
+ bbto->block_cache)));
+ }
+ }
}
// DB mutex held
if (dummy_versions_ != nullptr) {
// List must be empty
- assert(dummy_versions_->TEST_Next() == dummy_versions_);
+ assert(dummy_versions_->Next() == dummy_versions_);
bool deleted __attribute__((__unused__));
deleted = dummy_versions_->Unref();
assert(deleted);
Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
if (!s.ok()) {
ROCKS_LOG_ERROR(
- ioptions_.info_log,
+ ioptions_.logger,
"Failed to unregister data paths of column family (id: %d, name: %s)",
id_, name_.c_str());
}
// Only the super_version_ holds me
SuperVersion* sv = super_version_;
super_version_ = nullptr;
- // Release SuperVersion reference kept in ThreadLocalPtr.
- // This must be done outside of mutex_ since unref handler can lock mutex.
- sv->db_mutex->Unlock();
+
+ // Release SuperVersion references kept in ThreadLocalPtr.
local_sv_.reset();
- sv->db_mutex->Lock();
if (sv->Unref()) {
- // May delete this ColumnFamilyData after calling Cleanup()
+ // Note: sv will delete this ColumnFamilyData during Cleanup()
+ assert(sv->cfd == this);
sv->Cleanup();
delete sv;
return true;
auto current_log = GetLogNumber();
if (allow_2pc_) {
- autovector<MemTable*> empty_list;
- auto imm_prep_log =
- imm()->PrecomputeMinLogContainingPrepSection(empty_list);
+ auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection();
auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
if (imm_prep_log > 0 && imm_prep_log < current_log) {
std::unique_ptr<WriteControllerToken> SetupDelay(
WriteController* write_controller, uint64_t compaction_needed_bytes,
uint64_t prev_compaction_need_bytes, bool penalize_stop,
- bool auto_comapctions_disabled) {
+ bool auto_compactions_disabled) {
const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
uint64_t max_write_rate = write_controller->max_delayed_write_rate();
uint64_t write_rate = write_controller->delayed_write_rate();
- if (auto_comapctions_disabled) {
+ if (auto_compactions_disabled) {
// When auto compaction is disabled, always use the value user gave.
write_rate = max_write_rate;
} else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
// condition.
// Or twice as compaction trigger, if it is smaller.
int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
- if (res >= port::kMaxInt32) {
- return port::kMaxInt32;
+ if (res >= std::numeric_limits<int32_t>::max()) {
+ return std::numeric_limits<int32_t>::max();
} else {
// res fits in int
return static_cast<int>(res);
}
}
-} // namespace
+} // anonymous namespace
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
int num_unflushed_memtables, int num_l0_files,
uint64_t num_compaction_needed_bytes,
- const MutableCFOptions& mutable_cf_options) {
+ const MutableCFOptions& mutable_cf_options,
+ const ImmutableCFOptions& immutable_cf_options) {
if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
} else if (!mutable_cf_options.disable_auto_compactions &&
WriteStallCause::kPendingCompactionBytes};
} else if (mutable_cf_options.max_write_buffer_number > 3 &&
num_unflushed_memtables >=
- mutable_cf_options.max_write_buffer_number - 1) {
+ mutable_cf_options.max_write_buffer_number - 1 &&
+ num_unflushed_memtables - 1 >=
+ immutable_cf_options.min_write_buffer_number_to_merge) {
return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
} else if (!mutable_cf_options.disable_auto_compactions &&
mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
}
WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
- const MutableCFOptions& mutable_cf_options) {
+ const MutableCFOptions& mutable_cf_options) {
auto write_stall_condition = WriteStallCondition::kNormal;
if (current_ != nullptr) {
auto* vstorage = current_->storage_info();
auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
- vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
+ vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
+ *ioptions());
write_stall_condition = write_stall_condition_and_cause.first;
auto write_stall_cause = write_stall_condition_and_cause.second;
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
ROCKS_LOG_WARN(
- ioptions_.info_log,
+ ioptions_.logger,
"[%s] Stopping writes because we have %d immutable memtables "
"(waiting for flush), max_write_buffer_number is set to %d",
name_.c_str(), imm()->NumNotFlushed(),
internal_stats_->AddCFStats(
InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
}
- ROCKS_LOG_WARN(ioptions_.info_log,
+ ROCKS_LOG_WARN(ioptions_.logger,
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (write_stall_condition == WriteStallCondition::kStopped &&
internal_stats_->AddCFStats(
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
ROCKS_LOG_WARN(
- ioptions_.info_log,
+ ioptions_.logger,
"[%s] Stopping writes because of estimated pending compaction "
"bytes %" PRIu64,
name_.c_str(), compaction_needed_bytes);
mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
ROCKS_LOG_WARN(
- ioptions_.info_log,
+ ioptions_.logger,
"[%s] Stalling writes because we have %d immutable memtables "
"(waiting for flush), max_write_buffer_number is set to %d "
"rate %" PRIu64,
internal_stats_->AddCFStats(
InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
}
- ROCKS_LOG_WARN(ioptions_.info_log,
+ ROCKS_LOG_WARN(ioptions_.logger,
"[%s] Stalling writes because we have %d level-0 files "
"rate %" PRIu64,
name_.c_str(), vstorage->l0_delay_trigger_count(),
mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
(compaction_needed_bytes -
mutable_cf_options.soft_pending_compaction_bytes_limit) >
- 3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
+ 3 *
+ (mutable_cf_options.hard_pending_compaction_bytes_limit -
mutable_cf_options.soft_pending_compaction_bytes_limit) /
4;
internal_stats_->AddCFStats(
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
ROCKS_LOG_WARN(
- ioptions_.info_log,
+ ioptions_.logger,
"[%s] Stalling writes because of estimated pending compaction "
"bytes %" PRIu64 " rate %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller_token_ =
write_controller->GetCompactionPressureToken();
ROCKS_LOG_INFO(
- ioptions_.info_log,
+ ioptions_.logger,
"[%s] Increasing compaction threads because we have %d level-0 "
"files ",
name_.c_str(), vstorage->l0_delay_trigger_count());
write_controller->GetCompactionPressureToken();
if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
ROCKS_LOG_INFO(
- ioptions_.info_log,
+ ioptions_.logger,
"[%s] Increasing compaction threads because of estimated pending "
"compaction "
"bytes %" PRIu64,
return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}
+uint64_t ColumnFamilyData::GetTotalBlobFileSize() const {
+ return VersionSet::GetTotalBlobFileSize(dummy_versions_);
+}
+
// Total size of the SST files referenced by the current Version of this
// column family (delegates to Version::GetSstFilesSize on current_).
uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}
MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(read_opts, &arena));
- super_version->imm->AddIterators(read_opts, &merge_iter_builder);
+ super_version->imm->AddIterators(read_opts, &merge_iter_builder,
+ false /* add_range_tombstone_iter */);
ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
auto read_seq = super_version->current->version_set()->LastSequence();
ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
- auto* active_range_del_iter =
- super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
+ auto* active_range_del_iter = super_version->mem->NewRangeTombstoneIterator(
+ read_opts, read_seq, false /* immutable_memtable */);
range_del_agg.AddTombstones(
std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
Status status;
int output_level, const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* conflict,
- uint64_t max_file_num_to_ignore) {
+ uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
auto* result = compaction_picker_->CompactRange(
GetName(), mutable_cf_options, mutable_db_options,
current_->storage_info(), input_level, output_level,
compact_range_options, begin, end, compaction_end, conflict,
- max_file_num_to_ignore);
+ max_file_num_to_ignore, trim_ts);
if (result != nullptr) {
result->SetInputVersion(current_);
}
SuperVersion* sv = static_cast<SuperVersion*>(ptr);
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != super_version_number_.load()) {
- RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
+ RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_ACQUIRES);
SuperVersion* sv_to_delete = nullptr;
if (sv && sv->Unref()) {
- RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
+ RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_CLEANUPS);
db->mutex()->Lock();
// NOTE: underlying resources held by superversion (sst files) might
// not be released until the next background job.
return false;
}
-void ColumnFamilyData::InstallSuperVersion(
- SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
+void ColumnFamilyData::InstallSuperVersion(SuperVersionContext* sv_context,
+ InstrumentedMutex* db_mutex) {
db_mutex->AssertHeld();
- return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
+ return InstallSuperVersion(sv_context, mutable_cf_options_);
}
void ColumnFamilyData::InstallSuperVersion(
- SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
+ SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options) {
SuperVersion* new_superversion = sv_context->new_superversion.release();
- new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options;
new_superversion->Init(this, mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
++super_version_number_;
super_version_->version_number = super_version_number_;
- super_version_->write_stall_condition =
- RecalculateWriteStallConditions(mutable_cf_options);
-
+ if (old_superversion == nullptr || old_superversion->current != current() ||
+ old_superversion->mem != mem_ ||
+ old_superversion->imm != imm_.current()) {
+ // Should not recalculate slow down condition if nothing has changed, since
+ // currently RecalculateWriteStallConditions() treats it as further slowing
+ // down is needed.
+ super_version_->write_stall_condition =
+ RecalculateWriteStallConditions(mutable_cf_options);
+ } else {
+ super_version_->write_stall_condition =
+ old_superversion->write_stall_condition;
+ }
if (old_superversion != nullptr) {
// Reset SuperVersions cached in thread local storage.
// This should be done before old_superversion->Unref(). That's to ensure
"Block-Based Table format. ");
}
}
+
+ if (cf_options.enable_blob_garbage_collection) {
+ if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
+ cf_options.blob_garbage_collection_age_cutoff > 1.0) {
+ return Status::InvalidArgument(
+ "The age cutoff for blob garbage collection should be in the range "
+ "[0.0, 1.0].");
+ }
+ if (cf_options.blob_garbage_collection_force_threshold < 0.0 ||
+ cf_options.blob_garbage_collection_force_threshold > 1.0) {
+ return Status::InvalidArgument(
+ "The garbage ratio threshold for forcing blob garbage collection "
+ "should be in the range [0.0, 1.0].");
+ }
+ }
+
+ if (cf_options.compaction_style == kCompactionStyleFIFO &&
+ db_options.max_open_files != -1 && cf_options.ttl > 0) {
+ return Status::NotSupported(
+ "FIFO compaction only supported with max_open_files = -1.");
+ }
+
+ std::vector<uint32_t> supported{0, 1, 2, 4, 8};
+ if (std::find(supported.begin(), supported.end(),
+ cf_options.memtable_protection_bytes_per_key) ==
+ supported.end()) {
+ return Status::NotSupported(
+ "Memtable per key-value checksum protection only supports 0, 1, 2, 4 "
+ "or 8 bytes per key.");
+ }
return s;
}
#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
- const DBOptions& db_options,
+ const DBOptions& db_opts,
const std::unordered_map<std::string, std::string>& options_map) {
- MutableCFOptions new_mutable_cf_options;
- Status s =
- GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
- ioptions_.info_log, &new_mutable_cf_options);
+ ColumnFamilyOptions cf_opts =
+ BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
+ ConfigOptions config_opts;
+ config_opts.mutable_options_only = true;
+ Status s = GetColumnFamilyOptionsFromMap(config_opts, cf_opts, options_map,
+ &cf_opts);
if (s.ok()) {
- ColumnFamilyOptions cf_options =
- BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
- s = ValidateOptions(db_options, cf_options);
+ s = ValidateOptions(db_opts, cf_opts);
}
if (s.ok()) {
- mutable_cf_options_ = new_mutable_cf_options;
+ mutable_cf_options_ = MutableCFOptions(cf_opts);
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
}
return s;
// than base_level.
return Env::WLTH_MEDIUM;
}
- return static_cast<Env::WriteLifeTimeHint>(level - base_level +
- static_cast<int>(Env::WLTH_MEDIUM));
+ return static_cast<Env::WriteLifeTimeHint>(
+ level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}
Status ColumnFamilyData::AddDirectories(
if (existing_dir == created_dirs->end()) {
std::unique_ptr<FSDirectory> path_directory;
- s = DBImpl::CreateAndNewDirectory(ioptions_.fs, p.path, &path_directory);
+ s = DBImpl::CreateAndNewDirectory(ioptions_.fs.get(), p.path,
+ &path_directory);
if (!s.ok()) {
return s;
}
WriteBufferManager* _write_buffer_manager,
WriteController* _write_controller,
BlockCacheTracer* const block_cache_tracer,
- const std::shared_ptr<IOTracer>& io_tracer)
+ const std::shared_ptr<IOTracer>& io_tracer,
+ const std::string& db_id,
+ const std::string& db_session_id)
: max_column_family_(0),
+ file_options_(file_options),
dummy_cfd_(new ColumnFamilyData(
ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
- nullptr, ColumnFamilyOptions(), *db_options, file_options, nullptr,
- block_cache_tracer, io_tracer)),
+ nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr,
+ block_cache_tracer, io_tracer, db_id, db_session_id)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
- file_options_(file_options),
table_cache_(table_cache),
write_buffer_manager_(_write_buffer_manager),
write_controller_(_write_controller),
block_cache_tracer_(block_cache_tracer),
- io_tracer_(io_tracer) {
+ io_tracer_(io_tracer),
+ db_id_(db_id),
+ db_session_id_(db_session_id) {
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;
dummy_cfd_->next_ = dummy_cfd_;
}
}
-ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
- const {
+ColumnFamilyData* ColumnFamilySet::GetColumnFamily(
+ const std::string& name) const {
auto cfd_iter = column_families_.find(name);
if (cfd_iter != column_families_.end()) {
auto cfd = GetColumnFamily(cfd_iter->second);
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
- *db_options_, file_options_, this, block_cache_tracer_, io_tracer_);
+ *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
+ db_id_, db_session_id_);
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
return new_cfd;
}
-// REQUIRES: DB mutex held
-void ColumnFamilySet::FreeDeadColumnFamilies() {
- autovector<ColumnFamilyData*> to_delete;
- for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
- if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
- to_delete.push_back(cfd);
- }
- }
- for (auto cfd : to_delete) {
- // this is very rare, so it's not a problem that we do it under a mutex
- delete cfd;
- }
-}
-
// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
auto cfd_iter = column_family_data_.find(cfd->GetID());